Commit fc723b4f authored by Haye Böhm's avatar Haye Böhm

logging for bug

parent 3a076cf7
......@@ -79,8 +79,7 @@ where
= [(serverPort,httpServer serverPort keepaliveTime (engineWebService webTasks) taskOutput)]
engineTasks =
[BackgroundTask updateClock
,BackgroundTask (processEvents MAX_EVENTS)
[BackgroundTask (processEvents MAX_EVENTS)
:if (webTasks =: [])
[BackgroundTask stopOnStable]
[BackgroundTask removeOutdatedSessions
......
......@@ -35,7 +35,7 @@ instance toUserConstraint UserId
:: UserId :== String
:: Role :== String
:: UserTitle :== String // A descriptive name of a user (not used for identification)
instance toUserConstraint (a,b) | toUserConstraint a & toString b
//* User authentication
......@@ -52,7 +52,7 @@ instance toString Username, Password
instance == Username, Password
instance < Username, Password
derive JSONEncode User, UserConstraint, Username, Password
derive JSONEncode User, UserConstraint, Username, Password
derive JSONDecode User, UserConstraint, Username, Password
derive gDefault User, UserConstraint, Username, Password
derive gEq User, UserConstraint, Username, Password
......@@ -75,7 +75,7 @@ taskInstancesForUser :: SDSLens User [TaskInstance] ()
taskInstancesForCurrentUser :: SDSSequence () [TaskInstance] ()
/*
* Copies authentication attributes of current task
* Copies authentication attributes of current task
* and then attaches it
*/
workOn :: !t -> Task AttachmentStatus | toInstanceNo t
......
......@@ -36,7 +36,7 @@ where
lesser (Just x) (Just y) = x < y
lesser (Just _) Nothing = True
lesser Nothing Nothing = False
getTimeoutFromClock :: Timespec (Map SDSNotifyRequest Timespec) -> [Maybe Timeout]
getTimeoutFromClock now requests = getTimeoutFromClock` <$> 'DM'.toList requests
where
......@@ -66,7 +66,7 @@ removeOutdatedSessions :: !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
removeOutdatedSessions iworld=:{IWorld|options}
# (mbIndex,iworld) = read (sdsFocus {InstanceFilter|defaultValue & onlySession=Just True} filteredInstanceIndex) EmptyContext iworld
= case mbIndex of
Ok (ReadResult index _) = checkAll removeIfOutdated index iworld
Ok (ReadResult index _) = checkAll removeIfOutdated index iworld
Error e = (Error e, iworld)
where
checkAll f [] iworld = (Ok (),iworld)
......@@ -88,7 +88,7 @@ where
= (Ok False,iworld)
(Error e,iworld)
= (Error e,iworld)
(Error e,iworld)
(Error e,iworld)
= (Error e,iworld)
= case remove of
(Ok True)
......@@ -96,7 +96,7 @@ where
| e=:(Error _) = (e,iworld)
= case write Nothing (sdsFocus instanceNo taskInstanceIO) EmptyContext iworld of
(Error e, iworld) = (Error e, iworld)
(Ok Done, iworld) = (Ok (), iworld)
(Ok WritingDone, iworld) = (Ok (), iworld)
(Ok False)
= (Ok (), iworld)
(Error e)
......@@ -114,7 +114,7 @@ flushWritesWhenIdle iworld = case read taskEvents EmptyContext iworld of
stopOnStable :: !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
stopOnStable iworld=:{IWorld|shutdown}
# (mbIndex,iworld) = read (sdsFocus {InstanceFilter|defaultValue & includeProgress=True} filteredInstanceIndex) EmptyContext iworld
= case mbIndex of
= case mbIndex of
Ok (ReadResult index _)
# shutdown = case shutdown of
Nothing = if (allStable index) (Just (if (exceptionOccurred index) 1 0)) Nothing
......@@ -122,7 +122,7 @@ stopOnStable iworld=:{IWorld|shutdown}
= (Ok (), {IWorld|iworld & shutdown = shutdown})
Error e = (Error e, iworld)
where
allStable instances = all (\v -> v =: Stable || v =: (Exception _)) (values instances)
allStable instances = all (\v -> v =: Stable || v =: (Exception _)) (values instances)
exceptionOccurred instances = any (\v -> v =: (Exception _)) (values instances)
values instances = [value \\ (_,_,Just {InstanceProgress|value},_) <- instances]
......
......@@ -18,7 +18,7 @@ from iTasks.Internal.TaskEval import :: TaskTime
from iTasks.WF.Definition import :: TaskValue, :: Event, :: TaskId, :: InstanceNo, :: TaskNo, :: TaskException
from iTasks.WF.Combinators.Core import :: ParallelTaskType, :: TaskListItem
from iTasks.Internal.SDS import :: SDSNotifyRequest, :: DeferredWrite, :: SDSIdentity
from iTasks.SDS.Definition import :: SDSSource, :: SDSLens, :: SDSParallel
from iTasks.SDS.Definition import :: SDSSource, :: SDSLens, :: SDSParallel, class RWShared, class Registrable, class Modifiable, class Identifiable, class Readable, class Writeable
from iTasks.Extensions.DateTime import :: Time, :: Date, :: DateTime
from Sapl.Linker.LazyLinker import :: LoaderState
......
......@@ -100,14 +100,17 @@ read :: !(sds () r w) !TaskContext !*IWorld -> (!MaybeError TaskException
//Read an SDS and register a taskId to be notified when it is written
readRegister :: !TaskId !(sds () r w) !*IWorld -> (!MaybeError TaskException (ReadResult () r w), !*IWorld) | TC r & TC w & Readable, Registrable sds
:: AsyncWrite p r w = Done
:: AsyncWrite p r w = WritingDone
| E. sds: Writing (sds p r w) & Writeable sds & TC p & TC r & TC w
//Write an SDS (and queue evaluation of those task instances which contained tasks that registered for notification)
write :: !w !(sds () r w) !TaskContext !*IWorld -> (!MaybeError TaskException (AsyncWrite () r w), !*IWorld) | TC r & TC w & Writeable sds
:: AsyncModify r w = ModifyingDone w & TC w
| E. sds: Modifying (sds () r w) (r -> MaybeError TaskException w) & Modifiable sds & TC r & TC w
//Read followed by write. The 'a' typed value is a result that is returned
modify :: !(r -> w) !(sds () r w) !TaskContext !*IWorld -> (!MaybeError TaskException (ModifyResult () r w), !*IWorld) | TC r & TC w & Modifiable sds
modify :: !(r -> w) !(sds () r w) !TaskContext !*IWorld -> (!MaybeError TaskException (AsyncModify r w), !*IWorld) | TC r & TC w & Modifiable sds
//Clear all registrations for the given tasks.
//This is normally called by the queueRefresh functions, because once a task is queued
......@@ -118,7 +121,8 @@ queueNotifyEvents :: !String !(Set SDSNotifyRequest) !*IWorld -> *IWorld
//List all current registrations (for debugging purposes)
listAllSDSRegistrations :: *IWorld -> (![(InstanceNo,[(TaskId,SDSIdentity)])],!*IWorld)
formatSDSRegistrationsList :: [(InstanceNo,[(TaskId,SDSIdentity)])] -> String
formatSDSRegistrationsList :: [SDSNotifyRequest] -> String
//Flush all deffered/cached writes of
flushDeferredSDSWrites :: !*IWorld -> (!MaybeError TaskException (), !*IWorld)
......
......@@ -79,6 +79,7 @@ mbRegister p sds (Just taskId) context reqSDSId iworld=:{IWorld|sdsNotifyRequest
# (ts, world) = nsTime world
# req = buildRequest context taskId reqSDSId p
# sdsId = sdsIdentity sds
| not (trace_tn ("Registering " +++ toString taskId +++ " for changes in " +++ sdsId)) = undef
= { iworld
& world = world
, sdsNotifyRequests = 'DM'.alter (Just o maybe ('DM'.singleton req ts) ('DM'.put req ts))
......@@ -103,10 +104,14 @@ where
, cmpParam=dynamic p
, cmpParamText=toSingleLineText p
, remoteOptions = mbRemoteOptions}
import StdDebug
write :: !w !(sds () r w) !TaskContext !*IWorld -> (!MaybeError TaskException !(AsyncWrite () r w), !*IWorld) | TC r & TC w & Writeable sds
write w sds c iworld = case writeSDS sds () c w iworld of
(Ok (WriteResult notify _), iworld) = (Ok Done, queueNotifyEvents (sdsIdentity sds) notify iworld)
write w sds c iworld
| not (trace_tn ("Writing to " +++ sdsIdentity sds)) = undef
= case writeSDS sds () c w iworld of
(Ok (WriteResult notify _), iworld)
| not (trace_tn ("Notifying " +++ toSingleLineText ('Set'.toList notify) +++ " of changes")) = undef
= (Ok WritingDone, queueNotifyEvents (sdsIdentity sds) notify iworld)
(Ok (AsyncWrite sds), iworld) = (Ok (Writing sds), iworld)
(Error e,iworld) = (Error e,iworld)
......@@ -139,11 +144,17 @@ where
//In case of a type mismatch, just ignore (should not happen)
_ = abort "Not matching!"
modify :: !(r -> w) !(sds () r w) !TaskContext !*IWorld -> (!MaybeError TaskException (ModifyResult () r w), !*IWorld) | TC r & TC w & Modifiable sds
modify :: !(r -> w) !(sds () r w) !TaskContext !*IWorld -> (!MaybeError TaskException (AsyncModify r w), !*IWorld) | TC r & TC w & Modifiable sds
modify f sds context iworld
= modifySDS sf sds () context iworld
where
sf = \r. Ok (f r)
| not (trace_tn ("Modify ")) = undef
# (registrations, iworld) = listAllSDSRegistrations iworld
| not (trace_tn ("Notify requests in IWorld: \n" +++ formatRegistrations registrations )) = undef
= case modifySDS (\r. Ok (f r)) sds () context iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (AsyncModify sds f), iworld) = (Ok (Modifying sds f), iworld)
(Ok (ModifyResult notify r w _), iworld)
# iworld = queueNotifyEvents (sdsIdentity sds) notify iworld
= (Ok (ModifyingDone w), iworld)
queueNotifyEvents :: !String !(Set SDSNotifyRequest) !*IWorld -> !*IWorld
queueNotifyEvents sdsId notify iworld
......@@ -181,12 +192,17 @@ where
addReg list {SDSNotifyRequest|reqTaskId=reqTaskId=:(TaskId taskInstance _)} _
= 'DM'.put taskInstance [(reqTaskId,cmpSDSId):fromMaybe [] ('DM'.get taskInstance list)] list
formatSDSRegistrationsList :: [(InstanceNo,[(TaskId,SDSIdentity)])] -> String
formatSDSRegistrationsList list
= 'Text'.join "\n" ( flatten [ [ "Task instance " +++ toString i +++ ":"
:["\t"+++toString taskId +++ "->"+++sdsId\\(taskId,sdsId) <- regs]] \\ (i,regs) <- list
]
)
formatSDSRegistrationsList :: [SDSNotifyRequest] -> String
formatSDSRegistrationsList list = 'Text'.join "\n" lines
where
lines = [ "Task id " +++ toString reqTaskId +++ ": " +++ reqSDSId \\ {reqTaskId, reqSDSId} <- list]
formatRegistrations :: [(InstanceNo,[(TaskId,SDSIdentity)])] -> String
formatRegistrations list = 'Text'.join "\n" lines
where
lines = [toString instanceNo +++ " -> " +++
('Text'.join "\n\t" [toString tId +++ ":" +++ sdsId \\ (tId, sdsId) <- requests])
\\ (instanceNo, requests) <- list]
flushDeferredSDSWrites :: !*IWorld -> (!MaybeError TaskException (), !*IWorld)
flushDeferredSDSWrites iworld=:{writeCache}
......@@ -244,22 +260,23 @@ where
instance Modifiable SDSSource where
modifySDS f sds=:(SDSSource {SDSSourceOptions|name}) p context iworld
| not (trace_tn ("Modify SDSSource: " +++ sdsIdentity sds)) = undef
= case readSDS sds p context Nothing (sdsIdentity sds) iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (ReadResult r ssds), iworld) = case f r of
Error e = (Error e, iworld)
Ok w = case writeSDS ssds p context w iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (WriteResult n ssds), iworld) = (Ok (ModifyResult r w ssds), queueNotifyEvents (sdsIdentity sds) n iworld)
(Ok (WriteResult n ssds), iworld) = (Ok (ModifyResult n r w ssds), iworld)
modifySDS f (SDSValue False v sds) p c iworld = case modifySDS f sds p c iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (ModifyResult r w ssds), iworld) = (Ok (ModifyResult r w (SDSValue False v ssds)), iworld)
(Ok (ModifyResult notify r w ssds), iworld) = (Ok (ModifyResult notify r w (SDSValue False v ssds)), iworld)
(Ok (AsyncModify ssds f), iworld) = (Ok (AsyncModify (SDSValue True v ssds) f), iworld)
modifySDS f (SDSValue True r sds) p c iworld = case f r of
Error e = (Error (exception ""), iworld)
Ok w = (Ok (ModifyResult r w (SDSValue True r sds)), iworld)
Error e = (Error e, iworld)
Ok w = (Ok (ModifyResult 'Set'.newSet r w (SDSValue True r sds)), iworld)
instance Registrable SDSSource
where
......@@ -329,6 +346,7 @@ where
instance Modifiable SDSLens where
modifySDS f sds=:(SDSLens sds1 opts=:{SDSLensOptions|param, read, write, reducer, notify, name}) p context iworld
| not (trace_tn ("Modify SDSLens: " +++ sdsIdentity sds)) = undef
= case reducer of
Nothing = case readSDS sds p context Nothing (sdsIdentity sds) iworld of
(Error e, iworld) = (Error e, iworld)
......@@ -338,21 +356,30 @@ instance Modifiable SDSLens where
Ok w = case writeSDS ssds p context w iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (AsyncWrite sds), iworld) = (Ok (AsyncModify sds f), iworld)
(Ok (WriteResult notify ssds), iworld) = (Ok (ModifyResult r w ssds), queueNotifyEvents (sdsIdentity sds) notify iworld)
(Ok (WriteResult notify ssds), iworld)
| not (trace_tn ("Notify no reducer: " +++ formatSDSRegistrationsList ('Set'.toList notify)))
= (Ok (ModifyResult notify r w ssds), iworld)
Just reducer = case modifySDS sf sds1 (param p) context iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (AsyncModify sds _), iworld) = (Ok (AsyncModify (SDSLens sds opts) f), iworld)
(Ok (ModifyResult rs ws ssds), iworld) = case reducer p ws of
(Ok (ModifyResult toNotify rs ws ssds), iworld) = case reducer p ws of
Error e = (Error e, iworld)
Ok w
# notf = case notify of
SDSNotify f = f p rs w
SDSNotifyConst f = f p w
# (m, nm, iworld) = checkRegistrations (sdsIdentity sds) notf iworld
= case doRead read p rs of
Error e = (Error e, iworld)
Ok r = (Ok (ModifyResult r w (SDSLens ssds opts)), queueNotifyEvents (sdsIdentity sds) m iworld)
Ok r
# (match, nomatch, iworld) = checkRegistrations (sdsIdentity sds) notf iworld
# notify = 'Set'.difference toNotify ('Set'.difference nomatch match)
| not (trace_tn ("Match: " +++ formatSDSRegistrationsList ('Set'.toList match))) = undef
| not (trace_tn ("No match: " +++ formatSDSRegistrationsList ('Set'.toList nomatch))) = undef
| not (trace_tn ("Not notifying: " +++ formatSDSRegistrationsList ('Set'.toList ('Set'.difference nomatch match)))) = undef
| not (trace_tn ("toNotify: " +++ formatSDSRegistrationsList ('Set'.toList toNotify))) = undef
| not (trace_tn ("Notify: " +++ formatSDSRegistrationsList ('Set'.toList notify))) = undef
= (Ok (ModifyResult notify r w (SDSLens ssds opts)), iworld)
where
sf rs
# readV = doRead read p rs
......@@ -421,6 +448,7 @@ instance Writeable SDSCache where
instance Modifiable SDSCache where
modifySDS f sds=:(SDSCache _ opts) p context iworld
| not (trace_tn ("Modify SDSCache: " +++ sdsIdentity sds)) = undef
= case readSDS sds p context Nothing (sdsIdentity sds) iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (AsyncRead sds), iworld) = (Ok (AsyncModify sds f), iworld)
......@@ -428,7 +456,7 @@ instance Modifiable SDSCache where
(Error e) = (Error e, iworld)
(Ok w) = case writeSDS ssds p context w iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (WriteResult notify ssds), iworld) = (Ok (ModifyResult r w sds), queueNotifyEvents (sdsIdentity sds) notify iworld)
(Ok (WriteResult notify ssds), iworld) = (Ok (ModifyResult notify r w sds), iworld)
instance Registrable SDSCache where
readRegisterSDS sds p c taskId iworld = readSDS sds p c (Just taskId) (sdsIdentity sds) iworld
......@@ -460,32 +488,33 @@ instance Writeable SDSSequence where
# (npreds1,iworld) = case writel of
(SDSWrite f) = case f p r1 w of
Error e = (Error e, iworld)
Ok Nothing = (Ok (WriteResult 'Set'.newSet ssds), iworld)
Ok (Just w1) = writeSDS ssds (paraml p) c w1 iworld
Ok Nothing = (Ok (WriteResult 'Set'.newSet ssds), iworld)
Ok (Just w1) = writeSDS ssds (paraml p) c w1 iworld
(SDSWriteConst f) = case f p w of
Error e = (Error e, iworld)
Ok Nothing = (Ok (WriteResult 'Set'.newSet ssds), iworld)
Ok (Just w1) = writeSDS ssds (paraml p) c w1 iworld
Ok Nothing = (Ok (WriteResult 'Set'.newSet ssds), iworld)
Ok (Just w1) = writeSDS ssds (paraml p) c w1 iworld
| npreds1 =:(Error _) = (liftError npreds1, iworld)
//Read/write sds2 if necessary
# (npreds2,iworld) = case writer of
(SDSWrite f) = case readSDS sds2 (paramr p r1) c Nothing (sdsIdentity sds2) iworld of //Also read sds2
(Error e, iworld) = (Error e, iworld)
(Ok (ReadResult r2 ssds),iworld) = case f p r2 w of
Error e = (Error e, iworld)
Ok Nothing = (Ok (WriteResult 'Set'.newSet ssds), iworld)
Ok (Just w2) = writeSDS sds2 (paramr p r1) c w2 iworld
Error e = (Error e, iworld)
Ok Nothing = (Ok (WriteResult 'Set'.newSet ssds), iworld)
Ok (Just w2) = writeSDS sds2 (paramr p r1) c w2 iworld
(SDSWriteConst f) = case f p w of
Error e = (Error e, iworld)
Ok Nothing = (Ok (WriteResult 'Set'.newSet sds2), iworld)
Ok (Just w2) = writeSDS sds2 (paramr p r1) c w2 iworld
Ok Nothing = (Ok (WriteResult 'Set'.newSet sds2), iworld)
Ok (Just w2) = writeSDS sds2 (paramr p r1) c w2 iworld
| npreds2 =:(Error _) = (liftError npreds2, iworld)
= case (npreds1, npreds2) of
(Ok (WriteResult notify1 ssds1), Ok (WriteResult notify2 ssds2)) = (Ok (WriteResult ('Set'.union notify1 notify2) (SDSSequence ssds1 ssds2 opts)), iworld)
(Ok (WriteResult notify1 ssds1), Ok (AsyncWrite sds2)) = (Ok (AsyncWrite (SDSSequence ssds sds2 opts)), queueNotifyEvents (sdsIdentity sds1) notify1 iworld)
instance Modifiable SDSSequence where
modifySDS f sds=:(SDSSequence _ _ opts=:{SDSSequenceOptions|name}) p context iworld
modifySDS f sds p context iworld
| not (trace_tn ("Modify SDSSequence: " +++ sdsIdentity sds)) = undef
= case readSDS sds p context Nothing (sdsIdentity sds) iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (AsyncRead sds), iworld) = (Error (exception "SDSSequence cannot be modified asynchronously in the left SDS."), iworld)
......@@ -494,7 +523,7 @@ instance Modifiable SDSSequence where
Ok w = case writeSDS sds p context w iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (AsyncWrite _), iworld) = (Error (exception "SDSSequence cannot be modified asynchronously"), iworld)
(Ok (WriteResult notify ssds), iworld) = (Ok (ModifyResult r w sds), queueNotifyEvents (sdsIdentity sds) notify iworld)
(Ok (WriteResult notify ssds), iworld) = (Ok (ModifyResult notify r w sds), iworld)
instance Registrable SDSSequence where
readRegisterSDS sds p c taskId iworld = readSDS sds p c (Just taskId) (sdsIdentity sds) iworld
......@@ -566,15 +595,18 @@ instance Writeable SDSSelect where
instance Modifiable SDSSelect where
modifySDS f sds=:(SDSSelect sds1 sds2 opts=:{select}) p context iworld
| not (trace_tn ("Modify SDSSelect: " +++ sdsIdentity sds)) = undef
= case select p of
(Left p1) = case modifySDS f sds1 p1 context iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (AsyncModify sds f), iworld) = (Ok (AsyncModify (SDSSelect sds sds2 opts) f), iworld)
(Ok (ModifyResult r w ssds), iworld) = (Ok (ModifyResult r w (SDSSelect ssds sds2 opts)), iworld)
(Error e, iworld) = (Error e, iworld)
(Ok (AsyncModify sds f), iworld) = (Ok (AsyncModify (SDSSelect sds sds2 opts) f), iworld)
// TODO: Use applicable notify function.
(Ok (ModifyResult notify r w ssds), iworld) = (Ok (ModifyResult 'Set'.newSet r w (SDSSelect ssds sds2 opts)), iworld)
(Right p2) = case modifySDS f sds2 p2 context iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (AsyncModify sds f), iworld) = (Ok (AsyncModify (SDSSelect sds1 sds opts) f), iworld)
(Ok (ModifyResult r w ssds), iworld) = (Ok (ModifyResult r w (SDSSelect sds1 ssds opts)), iworld)
(Error e, iworld) = (Error e, iworld)
(Ok (AsyncModify sds f), iworld) = (Ok (AsyncModify (SDSSelect sds1 sds opts) f), iworld)
// TODO: Use applicable notify function.
(Ok (ModifyResult notify r w ssds), iworld) = (Ok (ModifyResult 'Set'.newSet r w (SDSSelect sds1 ssds opts)), iworld)
instance Registrable SDSSelect where
readRegisterSDS sds p c taskId iworld = readSDS sds p c (Just taskId) (sdsIdentity sds) iworld
......@@ -637,7 +669,8 @@ instance Writeable SDSParallel where
(Ok (AsyncWrite sds1), Ok (AsyncWrite sds2)) = (Ok (AsyncWrite (SDSParallel sds1 sds2 opts)), iworld)
instance Modifiable SDSParallel where
modifySDS f sds=:(SDSParallel sds1 sds2 opts=:{SDSParallelOptions|name}) p context iworld
modifySDS f sds p context iworld
| not (trace_tn ("Modify SDSParallel: " +++ sdsIdentity sds)) = undef
= case readSDS sds p context Nothing (sdsIdentity sds) iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (AsyncRead sds), iworld) = (Ok (AsyncModify sds f), iworld)
......@@ -646,7 +679,7 @@ instance Modifiable SDSParallel where
Ok w = case writeSDS ssds p context w iworld of
(Error e, iworld) = (Error e, iworld)
(Ok (AsyncWrite sds), iworld) = (Ok (AsyncModify sds f), iworld)
(Ok (WriteResult notify ssds), iworld) = (Ok (ModifyResult r w ssds), queueNotifyEvents (sdsIdentity sds) notify iworld)
(Ok (WriteResult notify ssds), iworld) = (Ok (ModifyResult notify r w ssds), iworld)
instance Registrable SDSParallel where
readRegisterSDS sds p c taskId iworld = readSDS sds p c (Just taskId) (sdsIdentity sds) iworld
......@@ -717,7 +750,7 @@ instance Modifiable SDSRemoteSource where
# errorString = "SDSRemoteSourceQueued modify get value<br>Remote to " +++ optionsS opts +++ ": " +++ error
= (Error (exception errorString), iworld)
Ok Nothing = (Ok (AsyncModify sds f), iworld)
Ok (Just (r, w)) = (Ok (ModifyResult r w (SDSRemoteSource subsds opts)), iworld)
Ok (Just (r, w)) = (Ok (ModifyResult 'Set'.newSet r w (SDSRemoteSource subsds opts)), iworld)
instance Registrable SDSRemoteSource where
readRegisterSDS sds p context taskId iworld
......@@ -754,7 +787,7 @@ instance Registrable SDSRemoteService where
readRegisterSDS _ _ _ _ iworld = (Error (exception "registering remote services not possible"), iworld)
instance == SDSNotifyRequest where
(==) r1 r2 = r1.reqTaskId == r2.reqTaskId && r1.remoteOptions == r2.remoteOptions
(==) r1 r2 = (r1.reqTaskId,r1.reqSDSId, r1.cmpParamText) == (r2.reqTaskId, r2.reqSDSId, r2.cmpParamText) && r1.remoteOptions == r2.remoteOptions
instance == RemoteNotifyOptions where
(==) left right = (left.hostToNotify, left.portToNotify, left.remoteSdsId) == (right.hostToNotify, right.portToNotify, right.remoteSdsId)
......@@ -130,7 +130,7 @@ where
(Ok (AsyncWrite sds), iworld) = (Ok (Right (serializeToBase64 (SDSWriteRequest sds p val))), iworld)
(SDSModifyRequest sds p f) = case modifySDS f sds p (TaskContext taskId) iworld of
(Error (_, e), iworld) = (Error e, iworld)
(Ok (ModifyResult r w _), iworld) = (Ok (Left (serializeToBase64 (Ok (r,w)))), iworld)
(Ok (ModifyResult notify r w _), iworld) = (Ok (Left (serializeToBase64 (Ok (r,w)))), iworld)
(Ok (AsyncModify sds f), iworld) = (Ok (Right (serializeToBase64 (SDSModifyRequest sds p f))), iworld)
(SDSRefreshRequest refreshTaskId sdsId)
// If we receive a request to refresh the sds service task, we find all remote
......
......@@ -67,9 +67,11 @@ processEvents max iworld
(Error msg,iworld=:{IWorld|world})
= (Ok (),{IWorld|iworld & world = world})
import StdDebug,StdMisc
//Evaluate a single task instance
evalTaskInstance :: !InstanceNo !Event !*IWorld -> (!MaybeErrorString (TaskValue DeferredJSON),!*IWorld)
evalTaskInstance instanceNo event iworld
| not (trace_tn ("Evaluate " +++ toSingleLineText instanceNo)) = undef
# iworld = mbResetUIState instanceNo event iworld
# (res,iworld) = evalTaskInstance` instanceNo event iworld
= (res,iworld)
......@@ -84,7 +86,7 @@ where
# (oldProgress,iworld) = read (sdsFocus instanceNo taskInstanceProgress) EmptyContext iworld
| isError oldProgress = exitWithException instanceNo ((\(Error (e,msg)) -> msg) oldProgress) iworld
# oldProgress=:{InstanceProgress|value,attachedTo} = directResult (fromOk oldProgress)
//Check exeption
//Check exception
| value =: (Exception _)
# (Exception description) = value
= exitWithException instanceNo description iworld
......@@ -114,7 +116,7 @@ where
// Write the updated progress
# (mbErr,iworld) = if (updateProgress clock newResult oldProgress === oldProgress)
(Ok (),iworld) //Only update progress when something changed
(case (modify (updateProgress clock newResult) (sdsFocus instanceNo taskInstanceProgress) EmptyContext iworld) of
(case trace_n "Modify progress" (modify (updateProgress clock newResult) (sdsFocus instanceNo taskInstanceProgress) EmptyContext iworld) of
(Error e, iworld) = (Error e, iworld)
(Ok _, iworld) = (Ok (), iworld) )
= case mbErr of
......@@ -122,14 +124,14 @@ where
Ok _
//Store updated reduct
# (nextTaskNo,iworld) = getNextTaskNo iworld
# (_,iworld) = modify (\r -> {TIReduct|r & tree = tree, nextTaskNo = nextTaskNo, nextTaskTime = nextTaskTime + 1})
(sdsFocus instanceNo taskInstanceReduct) EmptyContext iworld
# (_,iworld) = trace_n "Modify reduct" (modify (\r -> {TIReduct|r & tree = tree, nextTaskNo = nextTaskNo, nextTaskTime = nextTaskTime + 1})
(sdsFocus instanceNo taskInstanceReduct) EmptyContext iworld)
//FIXME: Don't write the full reduct (all parallel shares are triggered then!)
//Store update value
# newValue = case newResult of
(ValueResult val _ _ _) = TIValue val
(ExceptionResult (e,str)) = TIException e str
# (mbErr,iworld) = if deleted (Ok Done,iworld) (write newValue (sdsFocus instanceNo taskInstanceValue) EmptyContext iworld)
# (mbErr,iworld) = if deleted (Ok WritingDone,iworld) (trace_n "write new value" (write newValue (sdsFocus instanceNo taskInstanceValue) EmptyContext iworld))
= case mbErr of
Error (e,description) = exitWithException instanceNo description iworld
Ok _
......@@ -178,8 +180,8 @@ where
updateInstanceLastIO ::![InstanceNo] !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
updateInstanceLastIO [] iworld = (Ok (),iworld)
updateInstanceLastIO [instanceNo:instanceNos] iworld=:{IWorld|clock}
= case modify (\io -> fmap (appSnd (const clock)) io) (sdsFocus instanceNo taskInstanceIO) EmptyContext iworld of
(Ok (ModifyResult _ _ _),iworld) = updateInstanceLastIO instanceNos iworld
= case trace_n "modify updateInstanceLastIO" (modify (\io -> fmap (appSnd (const clock)) io) (sdsFocus instanceNo taskInstanceIO) EmptyContext iworld) of
(Ok (ModifyingDone _),iworld) = updateInstanceLastIO instanceNos iworld
(Error e,iworld) = (Error e,iworld)
updateInstanceConnect :: !String ![InstanceNo] !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
......@@ -192,8 +194,8 @@ updateInstanceConnect client [instanceNo:instanceNos] iworld=:{IWorld|clock}
updateInstanceDisconnect :: ![InstanceNo] !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
updateInstanceDisconnect [] iworld = (Ok (),iworld)
updateInstanceDisconnect [instanceNo:instanceNos] iworld=:{IWorld|clock}
= case modify (\io -> fmap (appSnd (const clock)) io) (sdsFocus instanceNo taskInstanceIO) EmptyContext iworld of
(Ok (ModifyResult _ _ _),iworld) = updateInstanceDisconnect instanceNos iworld
= case trace_n "modify updateInstanceDisconnect" (modify (\io -> fmap (appSnd (const clock)) io) (sdsFocus instanceNo taskInstanceIO) EmptyContext iworld) of
(Ok (ModifyingDone _),iworld) = updateInstanceDisconnect instanceNos iworld
(Error e,iworld) = (Error e,iworld)
currentInstanceShare :: SDSSource () InstanceNo ()
......
......@@ -75,8 +75,10 @@ where
# opts = {ListenerInstanceOpts|taskId=TaskId 0 0, port=port, connectionTask=ct, removeOnClose = True}
= (ListenerInstance opts (fromJust mbListener),world)
import StdDebug
loop :: !(*IWorld -> (!Maybe Timeout,!*IWorld)) !*IWorld -> *IWorld
loop determineTimeout iworld=:{ioTasks}
loop determineTimeout iworld=:{ioTasks,sdsNotifyRequests}
//| not (trace_tn ("Number of registrations:" +++ toString (length (flatten (map 'DM'.keys ('DM'.elems sdsNotifyRequests)))))) = undef
// Also put all done tasks at the end of the todo list, as the previous event handling may have yielded new tasks.
# (mbTimeout,iworld=:{IWorld|ioTasks={todo},world}) = determineTimeout {iworld & ioTasks = {done=[], todo = ioTasks.todo ++ (reverse ioTasks.done)}}
//Check which mainloop tasks have data available
......@@ -438,7 +440,7 @@ writeShareIfNeeded :: !(sds () r w) !(Maybe w) !*IWorld -> (!MaybeError TaskExce
writeShareIfNeeded sds Nothing iworld = (Ok (), iworld)
writeShareIfNeeded sds (Just w) iworld = case 'SDS'.write w sds EmptyContext iworld of
(Error e, iworld) = (Error e, iworld)
(Ok Done, iworld) = (Ok (), iworld)
(Ok WritingDone, iworld) = (Ok (), iworld)
addListener :: !TaskId !Int !Bool !ConnectionTask !*IWorld -> (!MaybeError TaskException (),!*IWorld)
addListener taskId port removeOnClose connectionTask iworld=:{ioTasks={todo,done}, ioStates, world}
......
......@@ -221,7 +221,7 @@ deleteTaskInstance instanceNo iworld=:{IWorld|options={EngineOptions|persistTask
| mbe =: (Error _) = (Error (exception (fromError mbe)),iworld)
= (Ok (),iworld)
where
toME (Ok ('SDS'.ModifyResult _ _ _)) = Ok ()
toME (Ok ('SDS'.ModifyingDone _)) = Ok ()
toMe (Error e) = (Error e)
......@@ -518,10 +518,13 @@ where
mergeReason x y = concat [x , "; " , y]
_ = Nothing
import StdDebug,StdMisc
queueRefresh :: ![(!TaskId, !String)] !*IWorld -> *IWorld
queueRefresh [] iworld = iworld
queueRefresh tasks iworld
//Clear the instance's share change registrations, we are going to evaluate anyway
# iworld = 'SDS'.clearTaskSDSRegistrations ('DS'.fromList (map fst tasks)) iworld
| not (trace_tn ("Queue refresh event for " +++ concat (map (toSingleLineText o fst) tasks))) = undef
# iworld = foldl (\w (t,r) -> queueEvent (toInstanceNo t) (RefreshEvent ('DS'.singleton t) r) w) iworld tasks
= iworld
......@@ -534,7 +537,7 @@ dequeueEvent iworld
# (val, queue) = 'DQ'.dequeue queue
= case 'SDS'.write queue taskEvents 'SDS'.EmptyContext iworld of
(Error e, iworld) = (Nothing, iworld)
(Ok Done, iworld) = (val, iworld)
(Ok WritingDone