Verified Commit 7b14aca0 authored by Camil Staps's avatar Camil Staps 🚀

Use a dedicated QueuedEvent record to avoid tuples in the queueEventShare

parent 065fe603
Pipeline #47634 failed with stages
in 9 minutes and 8 seconds
......@@ -42,7 +42,7 @@ processEvents max iworld
= case dequeueEvent iworld of
(Error e, iworld) = (Error e, iworld)
(Ok ?None, iworld) = (Ok (), iworld)
(Ok (?Just (instanceNo,event)), iworld)
(Ok (?Just {instanceNo,event}), iworld)
= case evalTaskInstance instanceNo event iworld of
(Ok taskValue,iworld)
= processEvents (max - 1) iworld
......
......@@ -15,8 +15,13 @@ from Data.Queue import :: Queue
from Data.Set import :: Set
from Text.GenJSON import generic JSONEncode, generic JSONDecode, :: JSONNode
//When events are placed in this queue, the engine will re-evaluate the corresponding task instances.
:: TaskInput :== Queue (InstanceNo,Event)
//* When events are placed in this queue, the engine will re-evaluate the corresponding task instances.
:: TaskInput :== Queue QueuedEvent
:: QueuedEvent =
{ instanceNo :: !InstanceNo
, event :: !Event
}
taskEvents :: SimpleSDSLens TaskInput
......@@ -35,15 +40,14 @@ taskEvents :: SimpleSDSLens TaskInput
taskOutput :: SimpleSDSLens (Map InstanceNo TaskOutput)
taskInstanceOutput :: SDSLens InstanceNo TaskOutput TaskOutput
/**
* Writing in this share queues an event for a task instance
* events are applied in FIFO order when the task instance is evaluated
*
* By splitting up event queuing and instance evaluation, events can come in asynchronously without
* the need to directly processing them.
*/
queueEventShare :: SDSLens () () (InstanceNo, Event)
* Writing in this share queues an event for a task instance.
* Events are applied in FIFO order when the task instance is evaluated.
*
* By splitting up event queuing and instance evaluation, events can come in
* asynchronously without the need to directly processing them.
*/
queueEventShare :: SDSLens () () QueuedEvent
//* Queue an event for a task instance by writing in {{queueEventShare}}
queueEvent :: !InstanceNo !Event !*IWorld -> *IWorld
......@@ -54,10 +58,8 @@ queueRefresh :: !TaskId !*IWorld -> *IWorld
//* Convenience function for queueing multiple refresh multiple refresh events at once.
queueRefreshes :: !(Set TaskId) !*IWorld -> *IWorld
/**
* Dequeue a task event
*/
dequeueEvent :: !*IWorld -> (!MaybeError TaskException (?(InstanceNo,Event)),!*IWorld)
//* Dequeue a task event.
dequeueEvent :: !*IWorld -> (!MaybeError TaskException (?QueuedEvent),!*IWorld)
/**
* Remove all events for a given instance
......
......@@ -28,8 +28,8 @@ import iTasks.SDS.Combinators.Core, iTasks.SDS.Combinators.Common
import iTasks.SDS.Sources.Store
import iTasks.WF.Derives
derive JSONEncode TaskOutputMessage, Queue, Event
derive JSONDecode TaskOutputMessage, Queue, Event
derive JSONEncode TaskOutputMessage, QueuedEvent, Queue, Event
derive JSONDecode TaskOutputMessage, QueuedEvent, Queue, Event
rawInstanceEvents = storeShare NS_TASK_INSTANCES False InMemory (?Just 'DQ'.newQueue)
rawInstanceOutput = storeShare NS_TASK_INSTANCES False InMemory (?Just 'DM'.newMap)
......@@ -50,26 +50,26 @@ where
reducer p ws = Ok (fromMaybe 'DQ'.newQueue ('DM'.get p ws))
queueEvent :: !InstanceNo !Event !*IWorld -> *IWorld
queueEvent ino event iworld = snd (write (ino, event) queueEventShare EmptyContext iworld)
queueEvent ino event iworld = snd (write {instanceNo=ino, event=event} queueEventShare EmptyContext iworld)
queueEventShare :: SDSLens () () (InstanceNo, Event)
queueEventShare = mapReadWrite (const (), writer) ?None taskEvents
queueEventShare :: SDSLens () () QueuedEvent
queueEventShare =: mapReadWrite (const (), writer) ?None taskEvents
where
writer :: (InstanceNo, Event) TaskInput -> ?TaskInput
writer (instanceNo, event) q = ?Just (fromMaybe ('DQ'.enqueue (instanceNo,event) q) (queueWithMergedRefreshEvent q))
writer :: !QueuedEvent !TaskInput -> ?TaskInput
writer qe=:{instanceNo,event} q = ?Just (fromMaybe ('DQ'.enqueue qe q) (queueWithMergedRefreshEvent q))
where
// merge multiple refresh events for same instance
queueWithMergedRefreshEvent :: !(Queue (!InstanceNo, !Event)) -> ?(Queue (!InstanceNo, !Event))
queueWithMergedRefreshEvent :: !(Queue QueuedEvent) -> ?(Queue QueuedEvent)
queueWithMergedRefreshEvent ('DQ'.Queue front back) = case event of
RefreshEvent refreshTasks =
((\front` -> ('DQ'.Queue front` back)) <$> queueWithMergedRefreshEventList front) <|>
((\back` -> ('DQ'.Queue front back`)) <$> queueWithMergedRefreshEventList back)
where
queueWithMergedRefreshEventList :: [(InstanceNo, Event)] -> ?[(InstanceNo, Event)]
queueWithMergedRefreshEventList :: [QueuedEvent] -> ?[QueuedEvent]
queueWithMergedRefreshEventList [] = ?None
queueWithMergedRefreshEventList [hd=:(instanceNo`, event`) : tl] = case event` of
RefreshEvent refreshTasks` | instanceNo` == instanceNo =
?Just [(instanceNo, RefreshEvent ('DS'.union refreshTasks refreshTasks`)) : tl]
queueWithMergedRefreshEventList [hd=:{instanceNo=ino`, event=ev`}:tl] = case ev` of
RefreshEvent refreshTasks` | ino` == instanceNo =
?Just [{instanceNo=instanceNo, event=RefreshEvent ('DS'.union refreshTasks refreshTasks`)}:tl]
_ =
(\tl` -> [hd : tl`]) <$> queueWithMergedRefreshEventList tl
_ = ?None
......@@ -84,9 +84,8 @@ queueRefreshes tasks iworld
# iworld = 'Foldable'.foldl (\w t -> queueEvent (toInstanceNo t) (RefreshEvent ('DS'.singleton t)) w) iworld tasks
= iworld
dequeueEvent :: !*IWorld -> (!MaybeError TaskException (?(InstanceNo,Event)),!*IWorld)
dequeueEvent iworld
= case 'SDS'.read taskEvents 'SDS'.EmptyContext iworld of
dequeueEvent :: !*IWorld -> (!MaybeError TaskException (?QueuedEvent),!*IWorld)
dequeueEvent iworld = case 'SDS'.read taskEvents 'SDS'.EmptyContext iworld of
(Error e, iworld) = (Error e, iworld)
(Ok ('SDS'.ReadingDone queue), iworld)
# (val, queue) = 'DQ'.dequeue queue
......@@ -99,7 +98,9 @@ clearEvents instanceNo iworld
# (_,iworld) = 'SDS'.modify clear taskEvents 'SDS'.EmptyContext iworld
= iworld
where
clear (Queue fs bs) = Queue [f \\ f=:(i,_) <- fs | i <> instanceNo] [b \\ b=:(i,_) <- bs | i <> instanceNo]
clear (Queue fs bs) = Queue
[f \\ f=:{QueuedEvent | instanceNo=i} <- fs | i <> instanceNo]
[b \\ b=:{QueuedEvent | instanceNo=i} <- bs | i <> instanceNo]
queueOutput :: !InstanceNo ![TaskOutputMessage] !*IWorld -> *IWorld
queueOutput instanceNo messages iworld
......
......@@ -566,7 +566,7 @@ halt :: !Int !*IWorld -> *IWorld
halt exitCode iworld
# (merr, iworld) = read allTaskInstances EmptyContext iworld
| isError merr = iShowErr [snd (fromError merr)] (closeChannels iworld)
# iworld = foldr destroy iworld [i.instanceNo\\i<-directResult (fromOk merr)]
# iworld = foldr destroy iworld [i.TaskInstance.instanceNo \\ i <- directResult (fromOk merr)]
= closeChannels iworld
where
destroy :: !InstanceNo !*IWorld -> *IWorld
......
......@@ -36,7 +36,7 @@ derive gEditor TestEvent, StartEvent, EndEvent, TestLocation, FailReason, Counte
derive gText TestEvent, StartEvent, EndEvent, TestLocation, FailReason, CounterExample, FailedAssertion, Relation
derive gEq TestEvent, StartEvent, EndEvent, TestLocation, FailReason, CounterExample, FailedAssertion, Relation
derive class iTask Queue, Event
derive class iTask Queue, Event, QueuedEvent
:: TestStatus =
{ tcpQueue :: !String
......@@ -153,7 +153,7 @@ where
handleResponses share = watch share >>*
[ OnValue $ ifValue (\s -> s.waitRequested=:[_:_]) \{waitRequested} ->
get (taskEvents |*| allTaskInstances) >>- \(Queue ea eb,timeta) ->
let active_instance_nos = [i \\ (i,_) <- ea ++ eb] in
let active_instance_nos = [instanceNo \\ {QueuedEvent|instanceNo} <- ea ++ eb] in
allTasks
[checkInstanceNo no timeta
@! if (isMember no active_instance_nos) ?None (?Just no)
......@@ -173,7 +173,7 @@ where
]
where
checkInstanceNo :: !InstanceNo ![TaskInstance] -> Task InstanceNo
checkInstanceNo no instances = case [i \\ {instanceNo=i} <- instances | i == no] of
checkInstanceNo no instances = case [i \\ {TaskInstance|instanceNo=i} <- instances | i == no] of
[m] -> return m
[] -> throw ("No active task with InstanceNo '"+++toString no+++"' found")
_ -> throw ("More than one active task with InstanceNo '"+++toString no+++"' found")
......
......@@ -736,7 +736,10 @@ where
# (_,iworld) = write meta (sdsFocus (instanceNo,False,True) taskInstance) EmptyContext iworld
//Clear all input and output of that instance
# (_,iworld) = write 'DQ'.newQueue (sdsFocus instanceNo taskInstanceOutput) EmptyContext iworld
# (_,iworld) = modify (\('DQ'.Queue a b) -> 'DQ'.Queue [(i,e) \\(i,e)<- a| i <> instanceNo][(i,e) \\(i,e)<- b| i <> instanceNo]) taskEvents EmptyContext iworld
# (_,iworld) = modify (\('DQ'.Queue a b) -> 'DQ'.Queue
[qe \\ qe=:{QueuedEvent | instanceNo=i} <- a | i <> instanceNo]
[qe \\ qe=:{QueuedEvent | instanceNo=i} <- b | i <> instanceNo])
taskEvents EmptyContext iworld
= eval (ASAttached (status =: (Right True))) build (?Just newKey) event evalOpts iworld
eval _ _ _ DestroyEvent evalOpts=:{TaskEvalOpts|taskId} iworld
......@@ -820,9 +823,9 @@ where
proxy rTaskId=:(TaskId ino tno) eventShare lastVal resultShare event opts iworld
//Determine whether to propagate the event
# propfun = case event of
(EditEvent tid a b) = writeCompletely (ino, EditEvent (patchInstance ino tid) a b) eventShare lastVal mkEmptyUI
(ActionEvent tid a) = writeCompletely (ino, ActionEvent (patchInstance ino tid) a) eventShare lastVal mkEmptyUI
ResetEvent = writeCompletely (ino, ResetEvent) eventShare lastVal mkEmptyUI
(EditEvent tid a b) = writeCompletely {instanceNo=ino, event=EditEvent (patchInstance ino tid) a b} eventShare lastVal mkEmptyUI
(ActionEvent tid a) = writeCompletely {instanceNo=ino, event=ActionEvent (patchInstance ino tid) a} eventShare lastVal mkEmptyUI
ResetEvent = writeCompletely {instanceNo=ino, event=ResetEvent} eventShare lastVal mkEmptyUI
//Refresh is always just for us, read is not used and destroy is handled by the cleanup hook
_ = id
= propfun
......@@ -843,7 +846,7 @@ where
, iworld)
//Remote produced an exception, destroy it and propagate the exception
(AsyncException e)
= writeCompletely (ino, DestroyEvent) eventShare lastVal mkEmptyUI
= writeCompletely {instanceNo=ino, event=DestroyEvent} eventShare lastVal mkEmptyUI
(\event opts iworld->(ExceptionResult e, iworld)) event opts iworld
) event opts iworld
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment