Commit 01da0d24 authored by Haye Böhm's avatar Haye Böhm

Implement different variants of Parallel

parent ce800e12
Pipeline #15425 failed with stage
in 10 minutes and 18 seconds
...@@ -329,8 +329,6 @@ addActorToMap roomViz actor location inventoryForSectionShare shipStatusShare us ...@@ -329,8 +329,6 @@ addActorToMap roomViz actor location inventoryForSectionShare shipStatusShare us
>>| moveAround roomViz actor.userName inventoryForSectionShare shipStatusShare userToActorShare inventoryForAllSectionsShare) >>| moveAround roomViz actor.userName inventoryForSectionShare shipStatusShare userToActorShare inventoryForAllSectionsShare)
(viewInformation ("Section with number: " <+++ location <+++ " does not exist") [] () >>| return ()) (viewInformation ("Section with number: " <+++ location <+++ " does not exist") [] () >>| return ())
import StdDebug
:: UITag :== [Int] :: UITag :== [Int]
:: TaskUITree :: TaskUITree
......
...@@ -82,8 +82,8 @@ where ...@@ -82,8 +82,8 @@ where
[BackgroundTask (processEvents MAX_EVENTS) [BackgroundTask (processEvents MAX_EVENTS)
:if (webTasks =: []) :if (webTasks =: [])
[BackgroundTask stopOnStable] [BackgroundTask stopOnStable]
[//BackgroundTask removeOutdatedSessions [BackgroundTask removeOutdatedSessions
BackgroundTask flushWritesWhenIdle ,BackgroundTask flushWritesWhenIdle
] ]
] ]
......
...@@ -7,8 +7,6 @@ from Data.Func import $ ...@@ -7,8 +7,6 @@ from Data.Func import $
import iTasks import iTasks
import Text.Terminal.VT100 import Text.Terminal.VT100
import StdDebug
import StdArray import StdArray
runProcessInteractive :: !VT100Settings !FilePath ![String] !(Maybe FilePath) -> Task Int runProcessInteractive :: !VT100Settings !FilePath ![String] !(Maybe FilePath) -> Task Int
runProcessInteractive vt100 fp args wd = runProcessInteractive vt100 fp args wd =
......
...@@ -86,7 +86,6 @@ where ...@@ -86,7 +86,6 @@ where
| size textResponse == 0 = (Error ("queueWriteRequest: Server" +++ host +++ " disconnected without responding"), Nothing) | size textResponse == 0 = (Error ("queueWriteRequest: Server" +++ host +++ " disconnected without responding"), Nothing)
= (Ok $ Right $ deserializeFromBase64 textResponse symbols, Nothing) = (Ok $ Right $ deserializeFromBase64 textResponse symbols, Nothing)
import StdDebug, StdMisc
queueServiceRequest :: !(SDSRemoteService p r w) p !TaskId !Bool !*IWorld -> (!MaybeError TaskException ConnectionId, !*IWorld) | gText{|*|} p & TC p & TC r queueServiceRequest :: !(SDSRemoteService p r w) p !TaskId !Bool !*IWorld -> (!MaybeError TaskException ConnectionId, !*IWorld) | gText{|*|} p & TC p & TC r
queueServiceRequest service=:(SDSRemoteService (HTTPShareOptions {host, port, createRequest, fromResponse})) p taskId _ env queueServiceRequest service=:(SDSRemoteService (HTTPShareOptions {host, port, createRequest, fromResponse})) p taskId _ env
= case addConnection taskId host port connectionTask env of = case addConnection taskId host port connectionTask env of
...@@ -129,19 +128,19 @@ where ...@@ -129,19 +128,19 @@ where
onShareChange = onShareChange, onShareChange = onShareChange,
onDisconnect = onDisconnect} onDisconnect = onDisconnect}
onConnect connId _ _ = trace_n ("New TCP connection: " +++ toString connId +++ ". Sending: \n" +++ createMessage p) (Ok ([], []), Nothing, [createMessage p +++ "\n"], False) onConnect connId _ _ = (Ok (Nothing, []), Nothing, [createMessage p +++ "\n"], False)
onData data (previous, acc) _ onData data (previous, acc) _
| not (trace_tn ("Received " +++ data)) = undef
# newacc = acc ++ [data] # newacc = acc ++ [data]
| register && not (isnull previous) = trace_n "Close previously registered connection" (Ok (previous, newacc), Nothing, [], True) // If already a result, and we are registering, then we have received a refresh notification from the server.
| register && isJust previous = (Ok (previous, newacc), Nothing, [], True)
= case fromTextResponse (concat newacc) p register of = case fromTextResponse (concat newacc) p register of
Error e = (Error e, Nothing, [], True) Error e = (Error e, Nothing, [], True)
Ok (Nothing,response) = (Ok (previous, newacc), Nothing, maybe [] (\resp. [resp]) response, False) // No full response yet, keep the old value.
Ok (Just r, Just resp) Ok (Nothing,response) = (Ok (previous, newacc), Nothing, maybe [] (\resp. [resp +++ "\n"]) response, False)
| not (trace_tn ("Registering: " +++ resp)) = undef Ok (Just r, Just resp) = (Ok (Just r, []), Nothing, [resp +++ "\n"], False)
= (Ok ([r : previous], []), Nothing, [resp], False) // Only close the connection when we have a value and when we are not registering.
Ok (Just r, Nothing) = trace_n "Not responding, normal read" (Ok ([r : previous], []), Nothing, [], not register) Ok (Just r, Nothing) = (Ok (Just r, []), Nothing, [], not register)
onShareChange state _ = (Ok state, Nothing, [], False) onShareChange state _ = (Ok state, Nothing, [], False)
onDisconnect state _ = (Ok state, Nothing) onDisconnect state _ = (Ok state, Nothing)
...@@ -214,9 +213,9 @@ where ...@@ -214,9 +213,9 @@ where
getValueTCP connectionId connectionMap getValueTCP connectionId connectionMap
= case 'DM'.get connectionId connectionMap of = case 'DM'.get connectionId connectionMap of
Just (value :: ([r^], [String]), _) = case value of Just (value :: (Maybe r^, [String]), _) = case value of
([], _) = Ok Nothing (Nothing, _) = Ok Nothing
([r : rs],_) = Ok (Just r) (Just r,_) = Ok (Just r)
Just (dyn, _) Just (dyn, _)
# message = "Dynamic not of the correct service type, got: " # message = "Dynamic not of the correct service type, got: "
+++ toString (typeCodeOfDynamic dyn) +++ toString (typeCodeOfDynamic dyn)
......
...@@ -3,7 +3,6 @@ implementation module iTasks.Internal.Distributed.Symbols ...@@ -3,7 +3,6 @@ implementation module iTasks.Internal.Distributed.Symbols
import iTasks import iTasks
import StdFile import StdFile
import StdDebug
import StdArray import StdArray
import symbols_in_program import symbols_in_program
import dynamic_string import dynamic_string
......
...@@ -603,6 +603,7 @@ instance Identifiable SDSParallel where ...@@ -603,6 +603,7 @@ instance Identifiable SDSParallel where
nameSDS (SDSParallel sds1 sds2 {SDSParallelOptions|name}) acc = ["|",name:nameSDS sds1 [",":nameSDS sds2 ["|":acc]]] nameSDS (SDSParallel sds1 sds2 {SDSParallelOptions|name}) acc = ["|",name:nameSDS sds1 [",":nameSDS sds2 ["|":acc]]]
instance Readable SDSParallel where instance Readable SDSParallel where
// TODO: Figure out how to NOT repeat the same code 4 times.
readSDS sds=:(SDSParallel sds1 sds2 opts=:{SDSParallelOptions|param,read,name}) p c mbNotify reqSDSId iworld readSDS sds=:(SDSParallel sds1 sds2 opts=:{SDSParallelOptions|param,read,name}) p c mbNotify reqSDSId iworld
# iworld = mbRegister p sds mbNotify c reqSDSId iworld # iworld = mbRegister p sds mbNotify c reqSDSId iworld
# (p1,p2) = param p # (p1,p2) = param p
...@@ -618,6 +619,51 @@ instance Readable SDSParallel where ...@@ -618,6 +619,51 @@ instance Readable SDSParallel where
(ReadResult r1 ssds1, AsyncRead sds2) = (Ok (AsyncRead (SDSParallel ssds1 sds2 opts)), iworld) (ReadResult r1 ssds1, AsyncRead sds2) = (Ok (AsyncRead (SDSParallel ssds1 sds2 opts)), iworld)
(AsyncRead sds1, AsyncRead sds2) = (Ok (AsyncRead (SDSParallel sds1 sds2 opts)), iworld) (AsyncRead sds1, AsyncRead sds2) = (Ok (AsyncRead (SDSParallel sds1 sds2 opts)), iworld)
readSDS sds=:(SDSParallelWriteLeft sds1 sds2 opts=:{SDSParallelOptions|param,read,name}) p c mbNotify reqSDSId iworld
# iworld = mbRegister p sds mbNotify c reqSDSId iworld
# (p1,p2) = param p
# (res1, iworld) = readSDS sds1 p1 c mbNotify reqSDSId iworld
| res1 =:(Error _)
= (liftError res1, iworld)
# (res2, iworld) = readSDS sds2 p2 c mbNotify reqSDSId iworld
| res2 =:(Error _)
= (liftError res2, iworld)
= case (fromOk res1, fromOk res2) of
(ReadResult r1 ssds1, ReadResult r2 ssds2) = (Ok (ReadResult (read (r1, r2)) (SDSParallel ssds1 ssds2 opts)), iworld)
(AsyncRead sds1, ReadResult r2 ssds2) = (Ok (AsyncRead (SDSParallel sds1 ssds2 opts)), iworld)
(ReadResult r1 ssds1, AsyncRead sds2) = (Ok (AsyncRead (SDSParallel ssds1 sds2 opts)), iworld)
(AsyncRead sds1, AsyncRead sds2) = (Ok (AsyncRead (SDSParallel sds1 sds2 opts)), iworld)
readSDS sds=:(SDSParallelWriteRight sds1 sds2 opts=:{SDSParallelOptions|param,read,name}) p c mbNotify reqSDSId iworld
# iworld = mbRegister p sds mbNotify c reqSDSId iworld
# (p1,p2) = param p
# (res1, iworld) = readSDS sds1 p1 c mbNotify reqSDSId iworld
| res1 =:(Error _)
= (liftError res1, iworld)
# (res2, iworld) = readSDS sds2 p2 c mbNotify reqSDSId iworld
| res2 =:(Error _)
= (liftError res2, iworld)
= case (fromOk res1, fromOk res2) of
(ReadResult r1 ssds1, ReadResult r2 ssds2) = (Ok (ReadResult (read (r1, r2)) (SDSParallel ssds1 ssds2 opts)), iworld)
(AsyncRead sds1, ReadResult r2 ssds2) = (Ok (AsyncRead (SDSParallel sds1 ssds2 opts)), iworld)
(ReadResult r1 ssds1, AsyncRead sds2) = (Ok (AsyncRead (SDSParallel ssds1 sds2 opts)), iworld)
(AsyncRead sds1, AsyncRead sds2) = (Ok (AsyncRead (SDSParallel sds1 sds2 opts)), iworld)
readSDS sds=:(SDSParallelWriteNone sds1 sds2 opts=:{SDSParallelOptions|param,read,name}) p c mbNotify reqSDSId iworld
# iworld = mbRegister p sds mbNotify c reqSDSId iworld
# (p1,p2) = param p
# (res1, iworld) = readSDS sds1 p1 c mbNotify reqSDSId iworld
| res1 =:(Error _)
= (liftError res1, iworld)
# (res2, iworld) = readSDS sds2 p2 c mbNotify reqSDSId iworld
| res2 =:(Error _)
= (liftError res2, iworld)
= case (fromOk res1, fromOk res2) of
(ReadResult r1 ssds1, ReadResult r2 ssds2) = (Ok (ReadResult (read (r1, r2)) (SDSParallel ssds1 ssds2 opts)), iworld)
(AsyncRead sds1, ReadResult r2 ssds2) = (Ok (AsyncRead (SDSParallel sds1 ssds2 opts)), iworld)
(ReadResult r1 ssds1, AsyncRead sds2) = (Ok (AsyncRead (SDSParallel ssds1 sds2 opts)), iworld)
(AsyncRead sds1, AsyncRead sds2) = (Ok (AsyncRead (SDSParallel sds1 sds2 opts)), iworld)
instance Writeable SDSParallel where instance Writeable SDSParallel where
writeSDS sds=:(SDSParallel sds1 sds2 opts=:{SDSParallelOptions|param,writel,writer,name}) p c w iworld writeSDS sds=:(SDSParallel sds1 sds2 opts=:{SDSParallelOptions|param,writel,writer,name}) p c w iworld
# (p1,p2) = param p # (p1,p2) = param p
...@@ -655,6 +701,49 @@ instance Writeable SDSParallel where ...@@ -655,6 +701,49 @@ instance Writeable SDSParallel where
(Ok (AsyncWrite sds1), Ok (WriteResult n2 ssds2)) = (Ok (AsyncWrite (SDSParallel sds1 ssds2 opts)), queueNotifyEvents (sdsIdentity sds2) n2 iworld) (Ok (AsyncWrite sds1), Ok (WriteResult n2 ssds2)) = (Ok (AsyncWrite (SDSParallel sds1 ssds2 opts)), queueNotifyEvents (sdsIdentity sds2) n2 iworld)
(Ok (AsyncWrite sds1), Ok (AsyncWrite sds2)) = (Ok (AsyncWrite (SDSParallel sds1 sds2 opts)), iworld) (Ok (AsyncWrite sds1), Ok (AsyncWrite sds2)) = (Ok (AsyncWrite (SDSParallel sds1 sds2 opts)), iworld)
writeSDS sds=:(SDSParallelWriteLeft sds1 sds2 opts=:{SDSParallelOptions|param,writel,name}) p c w iworld
# p1 = fst (param p)
//Read/write sds1
# (npreds1,iworld) = case writel of
(SDSWrite f) = case readSDS sds1 p1 c Nothing (sdsIdentity sds1) iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (AsyncRead ssds), iworld) = (Ok (AsyncWrite ssds), iworld)
(Ok (ReadResult r1 ssds),iworld) = case f p r1 w of
Error e = (Error e, iworld)
Ok Nothing = (Ok (WriteResult 'Set'.newSet ssds), iworld)
Ok (Just w1) = writeSDS ssds p1 c w1 iworld
(SDSWriteConst f) = case f p w of
Error e = (Error e,iworld)
Ok Nothing = (Ok (WriteResult 'Set'.newSet sds1),iworld)
Ok (Just w1) = writeSDS sds1 p1 c w1 iworld
= case npreds1 of
Error e = (Error e, iworld)
Ok (WriteResult n1 ssds1) = (Ok (WriteResult n1 (SDSParallelWriteLeft ssds1 sds2 opts)), iworld)
Ok (AsyncWrite sds1) = (Ok (AsyncWrite (SDSParallelWriteLeft sds1 sds2 opts)), iworld)
writeSDS sds=:(SDSParallelWriteLeft sds1 sds2 opts=:{SDSParallelOptions|param,writer,name}) p c w iworld
# p2 = snd (param p)
//Read/write sds1
# (npreds2,iworld) = case writer of
(SDSWrite f) = case readSDS sds2 p2 c Nothing (sdsIdentity sds2) iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (AsyncRead ssds), iworld) = (Ok (AsyncWrite ssds), iworld)
(Ok (ReadResult r2 ssds),iworld) = case f p r2 w of
Error e = (Error e, iworld)
Ok Nothing = (Ok (WriteResult 'Set'.newSet ssds), iworld)
Ok (Just w2) = writeSDS ssds p2 c w2 iworld
(SDSWriteConst f) = case f p w of
Error e = (Error e,iworld)
Ok Nothing = (Ok (WriteResult 'Set'.newSet sds2),iworld)
Ok (Just w2) = writeSDS sds2 p2 c w2 iworld
= case npreds2 of
Error e = (Error e, iworld)
Ok (WriteResult n2 ssds2) = (Ok (WriteResult n2 (SDSParallelWriteLeft sds1 ssds2 opts)), iworld)
Ok (AsyncWrite sds2) = (Ok (AsyncWrite (SDSParallelWriteLeft sds1 sds2 opts)), iworld)
writeSDS sds=:(SDSParallelWriteNone sds1 sds2 opts) p c w iworld
= (Ok (WriteResult 'Set'.newSet sds), iworld)
instance Modifiable SDSParallel where instance Modifiable SDSParallel where
modifySDS f sds p context iworld modifySDS f sds p context iworld
= case readSDS sds p context Nothing (sdsIdentity sds) iworld of = case readSDS sds p context Nothing (sdsIdentity sds) iworld of
......
...@@ -75,10 +75,8 @@ where ...@@ -75,10 +75,8 @@ where
# opts = {ListenerInstanceOpts|taskId=TaskId 0 0, port=port, connectionTask=ct, removeOnClose = True} # opts = {ListenerInstanceOpts|taskId=TaskId 0 0, port=port, connectionTask=ct, removeOnClose = True}
= (ListenerInstance opts (fromJust mbListener),world) = (ListenerInstance opts (fromJust mbListener),world)
import StdDebug
loop :: !(*IWorld -> (!Maybe Timeout,!*IWorld)) !*IWorld -> *IWorld loop :: !(*IWorld -> (!Maybe Timeout,!*IWorld)) !*IWorld -> *IWorld
loop determineTimeout iworld=:{ioTasks,sdsNotifyRequests} loop determineTimeout iworld=:{ioTasks,sdsNotifyRequests}
//| not (trace_tn ("Number of registrations:" +++ toString (length (flatten (map 'DM'.keys ('DM'.elems sdsNotifyRequests)))))) = undef
// Also put all done tasks at the end of the todo list, as the previous event handling may have yielded new tasks. // Also put all done tasks at the end of the todo list, as the previous event handling may have yielded new tasks.
# (mbTimeout,iworld=:{IWorld|ioTasks={todo},world}) = determineTimeout {iworld & ioTasks = {done=[], todo = ioTasks.todo ++ (reverse ioTasks.done)}} # (mbTimeout,iworld=:{IWorld|ioTasks={todo},world}) = determineTimeout {iworld & ioTasks = {done=[], todo = ioTasks.todo ++ (reverse ioTasks.done)}}
//Check which mainloop tasks have data available //Check which mainloop tasks have data available
...@@ -373,7 +371,6 @@ processIOTask i chList taskId connectionId removeOnClose sds ioOps onCloseHandle ...@@ -373,7 +371,6 @@ processIOTask i chList taskId connectionId removeOnClose sds ioOps onCloseHandle
# {done, todo} = iworld.ioTasks # {done, todo} = iworld.ioTasks
= {iworld & ioStates = ioStates, ioTasks = {done = [mkIOTaskInstance ioChannels : done], todo = todo}} = {iworld & ioStates = ioStates, ioTasks = {done = [mkIOTaskInstance ioChannels : done], todo = todo}}
IODData data IODData data
| not (trace_tn ("Got data for task " +++ toString taskId +++ ", close: " +++ toString close)) = undef
# (mbTaskState, mbw, out, close, iworld) = onDataHandler data taskState r iworld # (mbTaskState, mbw, out, close, iworld) = onDataHandler data taskState r iworld
# iworld = if (instanceNo > 0) (queueRefresh [(taskId, "New data for "<+++ instanceNo)] iworld) iworld # iworld = if (instanceNo > 0) (queueRefresh [(taskId, "New data for "<+++ instanceNo)] iworld) iworld
# (mbSdsErr, iworld) = writeShareIfNeeded sds mbw iworld # (mbSdsErr, iworld) = writeShareIfNeeded sds mbw iworld
......
...@@ -656,8 +656,6 @@ tonicWrapApp` mn fn nid cases t=:(Task eval) ...@@ -656,8 +656,6 @@ tonicWrapApp` mn fn nid cases t=:(Task eval)
= ([childApp:acc], currActive, iworld) = ([childApp:acc], currActive, iworld)
_ = (acc, currActive, iworld) _ = (acc, currActive, iworld)
import StdDebug
getNode :: !ExprId !TExpr -> Maybe TExpr getNode :: !ExprId !TExpr -> Maybe TExpr
getNode eid expr=:(TVar eid` _ _) getNode eid expr=:(TVar eid` _ _)
| eid == eid` = Just expr | eid == eid` = Just expr
......
...@@ -73,9 +73,9 @@ mapSingle :: !(sds p [r] [w]) -> (SDSLens p r w) | gText{|*|} p & TC p & TC r & ...@@ -73,9 +73,9 @@ mapSingle :: !(sds p [r] [w]) -> (SDSLens p r w) | gText{|*|} p & TC p & TC r &
// The read type is a tuple of both types. // The read type is a tuple of both types.
// The write type can either be a tuple of both write types, only one of them or it is written to none of them (result is a read-only shared). // The write type can either be a tuple of both write types, only one of them or it is written to none of them (result is a read-only shared).
(>*<) infixl 6 :: !(sds1 p rx wx) !(sds2 p ry wy) -> SDSParallel p (rx,ry) (wx,wy) | gText{|*|} p & TC p & TC rx & TC ry & TC wx & TC wy & RWShared sds1 & RWShared sds2 (>*<) infixl 6 :: !(sds1 p rx wx) !(sds2 p ry wy) -> SDSParallel p (rx,ry) (wx,wy) | gText{|*|} p & TC p & TC rx & TC ry & TC wx & TC wy & RWShared sds1 & RWShared sds2
(>*|) infixl 6 :: !(sds1 p rx wx) !(sds2 p ry wy) -> SDSLens p (rx,ry) wx | gText{|*|} p & TC p & TC rx & TC ry & TC wx & TC wy & RWShared sds1 & RWShared sds2 (>*|) infixl 6 :: !(sds1 p rx wx) !(sds2 p ry wy) -> SDSParallel p (rx,ry) wx | gText{|*|} p & TC p & TC rx & TC ry & TC wx & TC wy & RWShared sds1 & Readable sds2
(|*<) infixl 6 :: !(sds1 p rx wx) !(sds2 p ry wy) -> SDSLens p (rx,ry) wy | gText{|*|} p & TC p & TC rx & TC ry & TC wx & TC wy & RWShared sds1 & RWShared sds2 (|*<) infixl 6 :: !(sds1 p rx wx) !(sds2 p ry wy) -> SDSParallel p (rx,ry) wy | gText{|*|} p & TC p & TC rx & TC ry & TC wx & TC wy & Readable sds1 & RWShared sds2
(|*|) infixl 6 :: !(sds1 p rx wx) !(sds2 p ry wy) -> SDSLens p (rx,ry) () | gText{|*|} p & TC p & TC rx & TC ry & TC wx & TC wy & RWShared sds1 & RWShared sds2 (|*|) infixl 6 :: !(sds1 p rx wx) !(sds2 p ry wy) -> SDSParallel p (rx,ry) () | gText{|*|} p & TC p & TC rx & TC ry & TC wx & TC wy & Readable sds1 & Readable sds2
/** /**
* Puts a symmetric lens between two symmetric shared data sources. * Puts a symmetric lens between two symmetric shared data sources.
......
...@@ -100,14 +100,35 @@ where ...@@ -100,14 +100,35 @@ where
write1 _ w = Ok (Just (fst w)) write1 _ w = Ok (Just (fst w))
write2 _ w = Ok (Just (snd w)) write2 _ w = Ok (Just (snd w))
(>*|) infixl 6 :: !(sds1 p rx wx) !(sds2 p ry wy) -> SDSLens p (rx,ry) wx | gText{|*|} p & TC p & TC rx & TC ry & TC wx & TC wy & RWShared sds1 & RWShared sds2 (>*|) infixl 6 :: !(sds1 p rx wx) !(sds2 p ry wy) -> SDSParallel p (rx,ry) wx | gText{|*|} p & TC p & TC rx & TC ry & TC wx & TC wy & RWShared sds1 & Readable sds2
(>*|) l r = mapWrite (\wx _ -> Just (wx, ())) (Just \p (wx,_) . Ok wx) (l >*< toReadOnly r) (>*|) l r = SDSParallelWriteLeft l r opts
where
(|*<) infixl 6 :: !(sds1 p rx wx) !(sds2 p ry wy) -> SDSLens p (rx,ry) wy | gText{|*|} p & TC p & TC rx & TC ry & TC wx & TC wy & RWShared sds1 & RWShared sds2 opts = {SDSParallelOptions| name = ">*|"
(|*<) l r = mapWrite (\wy _ -> Just ((), wy)) (Just \p (_, wy). Ok wy) (toReadOnly l >*< r) , param = \p. (p,p)
, read = id
(|*|) infixl 6 :: !(sds1 p rx wx) !(sds2 p ry wy) -> SDSLens p (rx,ry) () | gText{|*|} p & TC p & TC rx & TC ry & TC wx & TC wy & RWShared sds1 & RWShared sds2 , writel = SDSWriteConst (\_ w. Ok (Just w))
(|*|) l r = toReadOnly (l >*< r) , writer = SDSWriteConst (\_ _. Ok Nothing)
}
(|*<) infixl 6 :: !(sds1 p rx wx) !(sds2 p ry wy) -> SDSParallel p (rx,ry) wy | gText{|*|} p & TC p & TC rx & TC ry & TC wx & TC wy & Readable sds1 & RWShared sds2
(|*<) l r = SDSParallelWriteRight l r opts
where
opts = {SDSParallelOptions| name = "|*<"
, param = \p. (p,p)
, read = id
, writel = SDSWriteConst (\_ _. Ok Nothing)
, writer = SDSWriteConst (\_ w. Ok (Just w))
}
(|*|) infixl 6 :: !(sds1 p rx wx) !(sds2 p ry wy) -> SDSParallel p (rx,ry) () | gText{|*|} p & TC p & TC rx & TC ry & TC wx & TC wy & Readable sds1 & Readable sds2
(|*|) l r = SDSParallelWriteNone l r opts
where
opts = {SDSParallelOptions| name = "|*|"
, param = \p. (p, p)
, read = id
, writel = SDSWriteConst (\_ _. Ok Nothing)
, writer = SDSWriteConst (\_ _. Ok Nothing)
}
symmetricLens :: !(a b -> b) !(b a -> a) !(sds1 p a a) !(sds2 p b b) -> (!SDSLens p a a, !SDSLens p b b) | gText{|*|} p & TC p & TC a & TC b & RWShared sds1 & RWShared sds2 symmetricLens :: !(a b -> b) !(b a -> a) !(sds1 p a a) !(sds2 p b b) -> (!SDSLens p a a, !SDSLens p b b) | gText{|*|} p & TC p & TC a & TC b & RWShared sds1 & RWShared sds2
symmetricLens putr putl sharedA sharedB = (newSharedA,newSharedB) symmetricLens putr putl sharedA sharedB = (newSharedA,newSharedB)
...@@ -160,7 +181,6 @@ where ...@@ -160,7 +181,6 @@ where
reducer _ [(_,attr)] = Ok attr reducer _ [(_,attr)] = Ok attr
import StdDebug, StdMisc
taskListItemValue :: !(SharedTaskList a) -> SDSLens (Either Int TaskId) (TaskValue a) () | TC a taskListItemValue :: !(SharedTaskList a) -> SDSLens (Either Int TaskId) (TaskValue a) () | TC a
taskListItemValue tasklist = mapReadError read (toReadOnly (sdsTranslate "taskListItemValue" listFilter tasklist)) taskListItemValue tasklist = mapReadError read (toReadOnly (sdsTranslate "taskListItemValue" listFilter tasklist))
where where
...@@ -168,7 +188,7 @@ where ...@@ -168,7 +188,7 @@ where
listFilter (Right taskId) = {onlyIndex=Nothing,onlyTaskId=Just [taskId],onlySelf=False,includeValue=True,includeAttributes=False,includeProgress=False} listFilter (Right taskId) = {onlyIndex=Nothing,onlyTaskId=Just [taskId],onlySelf=False,includeValue=True,includeAttributes=False,includeProgress=False}
read (_,items) = case [value \\ {TaskListItem|value} <- items] of read (_,items) = case [value \\ {TaskListItem|value} <- items] of
vs=:[v:_] = trace_n ("taskListItemValues: " +++ toString (length vs)) (Ok v) vs=:[v:_] = (Ok v)
_ = Error (exception "taskListItemValue: item not found") _ = Error (exception "taskListItemValue: item not found")
taskListItemProgress :: !(SharedTaskList a) -> SDSLens (Either Int TaskId) InstanceProgress () | TC a taskListItemProgress :: !(SharedTaskList a) -> SDSLens (Either Int TaskId) InstanceProgress () | TC a
......
...@@ -220,6 +220,10 @@ required type w. The reducer has the job to turn this ws into w. ...@@ -220,6 +220,10 @@ required type w. The reducer has the job to turn this ws into w.
//Read from and write to two independent SDS's //Read from and write to two independent SDS's
:: SDSParallel p r w = E. p1 r1 w1 p2 r2 w2 sds1 sds2: SDSParallel (sds1 p1 r1 w1) (sds2 p2 r2 w2) (SDSParallelOptions p1 r1 w1 p2 r2 w2 p r w) & RWShared sds1 & RWShared sds2 & gText{|*|} p1 & TC p1 & gText{|*|} p2 & TC p2 & TC r1 & TC r2 & TC w1 & TC w2 :: SDSParallel p r w = E. p1 r1 w1 p2 r2 w2 sds1 sds2: SDSParallel (sds1 p1 r1 w1) (sds2 p2 r2 w2) (SDSParallelOptions p1 r1 w1 p2 r2 w2 p r w) & RWShared sds1 & RWShared sds2 & gText{|*|} p1 & TC p1 & gText{|*|} p2 & TC p2 & TC r1 & TC r2 & TC w1 & TC w2
| E. p1 r1 p2 r2 w2 sds1 sds2: SDSParallelWriteLeft (sds1 p1 r1 w) (sds2 p2 r2 w2) (SDSParallelOptions p1 r1 w p2 r2 w2 p r w) & RWShared sds1 & Readable sds2 & gText{|*|} p1 & TC p1 & gText{|*|} p2 & TC p2 & TC r1 & TC r2 & TC w2 & TC w
| E. p1 r1 w1 p2 r2 sds1 sds2: SDSParallelWriteRight (sds1 p1 r1 w1) (sds2 p2 r2 w) (SDSParallelOptions p1 r1 w1 p2 r2 w p r w) & Readable sds1 & RWShared sds2 & gText{|*|} p1 & TC p1 & gText{|*|} p2 & TC p2 & TC r1 & TC r2 & TC w1 & TC w
| E. p1 r1 w1 p2 r2 w2 sds1 sds2: SDSParallelWriteNone (sds1 p1 r1 w1) (sds2 p2 r2 w2) (SDSParallelOptions p1 r1 w1 p2 r2 w2 p r w) & Readable sds1 & Readable sds2 & gText{|*|} p1 & TC p1 & gText{|*|} p2 & TC p2 & TC r1 & TC r2 & TC w1 & TC w2
:: SDSParallelOptions p1 r1 w1 p2 r2 w2 p r w = :: SDSParallelOptions p1 r1 w1 p2 r2 w2 p r w =
{ name :: String { name :: String
, param :: p -> (p1,p2) , param :: p -> (p1,p2)
......
...@@ -145,15 +145,14 @@ where ...@@ -145,15 +145,14 @@ where
res (Value [(_,Value (Left a) sa),(_,Value (Right b) sb)] _) = Value (a,b) (sa && sb) res (Value [(_,Value (Left a) sa),(_,Value (Right b) sb)] _) = Value (a,b) (sa && sb)
res _ = NoValue res _ = NoValue
import StdDebug
feedForward :: (Task a) ((SDSLens () (Maybe a) ()) -> Task b) -> Task b | iTask a & iTask b feedForward :: (Task a) ((SDSLens () (Maybe a) ()) -> Task b) -> Task b | iTask a & iTask b
feedForward taska taskbf = parallel feedForward taska taskbf = parallel
[(Embedded, \s -> taska @ Left) [(Embedded, \s -> taska @ Left)
,(Embedded, \s -> taskbf (mapRead prj (sdsFocus (Left 0) (taskListItemValue s))) @ Right) ,(Embedded, \s -> taskbf (mapRead prj (sdsFocus (Left 0) (taskListItemValue s))) @ Right)
] [] @? res ] [] @? res
where where
prj (Value (Left a) _) = trace_n "prj value" (Just a) prj (Value (Left a) _) = Just a
prj _ = trace_n "prj nothing" Nothing prj _ = Nothing
res (Value [_,(_,Value (Right b) s)] _) = Value b s res (Value [_,(_,Value (Right b) s)] _) = Value b s
res _ = NoValue res _ = NoValue
......
...@@ -13,8 +13,6 @@ import iTasks.Internal.TaskEval ...@@ -13,8 +13,6 @@ import iTasks.Internal.TaskEval
import iTasks.Internal.Util import iTasks.Internal.Util
from iTasks.Internal.SDS import write, read, readRegister from iTasks.Internal.SDS import write, read, readRegister
import StdBool, StdDebug
from Data.Func import mapSt from Data.Func import mapSt
import StdTuple, StdArray, StdList, StdString import StdTuple, StdArray, StdList, StdString
...@@ -28,7 +26,7 @@ withShared initial stask = Task eval ...@@ -28,7 +26,7 @@ withShared initial stask = Task eval
where where
eval event evalOpts (TCInit taskId ts) iworld eval event evalOpts (TCInit taskId ts) iworld
# (taskIda,iworld) = getNextTaskId iworld # (taskIda,iworld) = getNextTaskId iworld
# (e,iworld) = trace_n "Write initial value" (write (initial) (sdsFocus taskId localShare) EmptyContext iworld) # (e,iworld) = write (initial) (sdsFocus taskId localShare) EmptyContext iworld
| isError e | isError e
= (ExceptionResult (fromError e),iworld) = (ExceptionResult (fromError e),iworld)
| otherwise | otherwise
......
...@@ -120,15 +120,11 @@ where ...@@ -120,15 +120,11 @@ where
Just a = (ValueResult (Value a True) {lastEvent=ts,removedTasks=[],refreshSensitive=False} (rep event) s, iworld) Just a = (ValueResult (Value a True) {lastEvent=ts,removedTasks=[],refreshSensitive=False} (rep event) s, iworld)
Nothing = (ExceptionResult (exception "Corrupt task result"), iworld) Nothing = (ExceptionResult (exception "Corrupt task result"), iworld)
import StdMisc, StdDebug
derive gText Event, Set
watch :: !(sds () r w) -> Task r | iTask r & TC w & Readable, Registrable sds watch :: !(sds () r w) -> Task r | iTask r & TC w & Readable, Registrable sds
watch shared = Task (eval shared) watch shared = Task (eval shared)
where where
eval :: (sds () r w) Event TaskEvalOpts TaskTree *IWorld -> (TaskResult r, !*IWorld) | iTask r & TC w & Readable, Registrable sds eval :: (sds () r w) Event TaskEvalOpts TaskTree *IWorld -> (TaskResult r, !*IWorld) | iTask r & TC w & Readable, Registrable sds
eval shared event evalOpts (TCInit taskId ts) iworld=:{sdsEvalStates} eval shared event evalOpts (TCInit taskId ts) iworld=:{sdsEvalStates}
| not (trace_tn ("watch eval init bacause " +++ toSingleLineText event)) = undef
= case 'SDS'.readRegister taskId shared iworld of = case 'SDS'.readRegister taskId shared iworld of
(Error e, iworld) = (ExceptionResult e, iworld) (Error e, iworld) = (ExceptionResult e, iworld)
(Ok (ReadingDone val), iworld) = (ValueResult (Value val False) {TaskEvalInfo|lastEvent=ts,removedTasks=[],refreshSensitive=True} (rep event) (TCInit taskId ts), iworld) (Ok (ReadingDone val), iworld) = (ValueResult (Value val False) {TaskEvalInfo|lastEvent=ts,removedTasks=[],refreshSensitive=True} (rep event) (TCInit taskId ts), iworld)
...@@ -138,7 +134,6 @@ where ...@@ -138,7 +134,6 @@ where
= (ValueResult NoValue ei (rep event) (TCAwait Read taskId ts (TCInit taskId ts)), {iworld & sdsEvalStates = sdsEvalStates}) = (ValueResult NoValue ei (rep event) (TCAwait Read taskId ts (TCInit taskId ts)), {iworld & sdsEvalStates = sdsEvalStates})
eval shared event evalOpts (TCAwait Read taskId ts subtree) iworld=:{sdsEvalStates} eval shared event evalOpts (TCAwait Read taskId ts subtree) iworld=:{sdsEvalStates}
| not (trace_tn ("watch eval await bacause " +++ toSingleLineText event)) = undef
= case 'DM'.get taskId sdsEvalStates of = case 'DM'.get taskId sdsEvalStates of
Nothing = (ExceptionResult (exception ("No SDS state found for task " +++ toString taskId)), iworld) Nothing = (ExceptionResult (exception ("No SDS state found for task " +++ toString taskId)), iworld)
Just val = case val iworld of Just val = case val iworld of
...@@ -150,7 +145,6 @@ where ...@@ -150,7 +145,6 @@ where
= (ValueResult NoValue {TaskEvalInfo|lastEvent=ts,removedTasks=[],refreshSensitive=True} NoChange (TCAwait Read taskId ts (TCInit taskId ts)), {iworld & sdsEvalStates = sdsEvalStates}) = (ValueResult NoValue {TaskEvalInfo|lastEvent=ts,removedTasks=[],refreshSensitive=True} NoChange (TCAwait Read taskId ts (TCInit taskId ts)), {iworld & sdsEvalStates = sdsEvalStates})
eval shared event repAs ttree=:(TCDestroy _) iworld