Commit fdae53e8 authored by Bas Lijnse's avatar Bas Lijnse

Fixed failed decoding of task functions in parallel combinator during cleanup.

This caused some parallel branches to not be fully destroyed.
parent 5d1d99c7
Pipeline #12631 failed with stage
in 1 minute and 26 seconds
......@@ -10,7 +10,7 @@ import iTasks.Internal.Util
from iTasks.WF.Combinators.Core import :: SharedTaskList
from iTasks.WF.Combinators.Core import :: ParallelTaskType(..), :: ParallelTask(..)
from Data.Map as DM import qualified newMap, fromList, toList, get, put, del
from Data.Map as DM import qualified newMap, fromList, toList, get, put, del, mapSize
from Data.Queue import :: Queue (..)
from Data.Queue as DQ import qualified newQueue, enqueue, dequeue, empty
from iTasks.Internal.SDS as SDS import qualified read, write, modify
......@@ -64,7 +64,8 @@ processEvents max iworld=:{IWorld| memoryShares}
= case dequeueEvent iworld of
(Nothing,iworld) = (Ok (),iworld)
(Just (instanceNo,event),iworld)
= case trace_n ("memory shares size " +++ toString (size $ copy_to_string memoryShares)) $ evalTaskInstance instanceNo event iworld of
//# iworld = trace_n ("memory shares size " +++ toString (size $ copy_to_string memoryShares) +++ ", num "+++ toString ('DM'.mapSize memoryShares)) iworld
= case evalTaskInstance instanceNo event iworld of
(Ok taskValue,iworld)
= processEvents (max - 1) iworld
(Error msg,iworld=:{IWorld|world})
......@@ -124,7 +125,7 @@ where
Ok _
//Store updated reduct
# (nextTaskNo,iworld) = getNextTaskNo iworld
# (_,iworld) = 'SDS'.modify (\r -> let x = ((),{TIReduct|r & tree = tree, nextTaskNo = nextTaskNo, nextTaskTime = nextTaskTime + 1}) in trace_n (diffToConsole $ gDiff{|*|} r.tree tree) x)
# (_,iworld) = 'SDS'.modify (\r -> let x = ((),{TIReduct|r & tree = tree, nextTaskNo = nextTaskNo, nextTaskTime = nextTaskTime + 1}) in x)
(sdsFocus instanceNo taskInstanceReduct) iworld
//FIXME: Don't write the full reduct (all parallel shares are triggered then!)
//Store update value
......
......@@ -426,11 +426,8 @@ taskInstanceEmbeddedTask :: RWShared TaskId (Task a) (Task a) | iTask a
taskInstanceEmbeddedTask = sdsLens "taskInstanceEmbeddedTask" param (SDSRead read) (SDSWrite write) (SDSNotifyConst notify) taskInstanceReduct
where
param (TaskId instanceNo _) = instanceNo
//TODO: Investigate why unwrapTask (from clientoverride) no longer works here
read taskId {TIReduct|tasks} = case ('DM'.get taskId tasks) of
(Just dyn) = case dyn of
(task :: Task a^) = Ok task
_ = Error (exception ("Could not unwrap embedded task " <+++ taskId))
(Just dyn) = Ok (unwrapTask dyn)
_ = Error (exception ("Could not find embedded task " <+++ taskId))
write taskId r=:{TIReduct|tasks} w = Ok (Just {TIReduct|r & tasks = 'DM'.put taskId (dynamic w :: Task a^) tasks})
notify taskId _ = const ((==) taskId)
......
......@@ -120,23 +120,25 @@ where
Just rewrite = Right (rewrite,Just ntreea, info.TaskEvalInfo.lastEvent,info.TaskEvalInfo.removedTasks)
ExceptionResult e = case searchContException e conts of
Nothing = (Left (ExceptionResult e))
Just rewrite = (Right (rewrite,Nothing,ts,[])) //TODO: Figure out how to garbage collect after exceptions
Just rewrite = (Right (rewrite,Nothing,ts,[]))
= case mbCont of
Left res = (res,iworld)
Right ((sel,Task evalb,d_json_a),mbTreeA, lastEvent,removedTasks)
//Cleanup state of left-hand side
# iworld = case mbTreeA of
Nothing = iworld
Just treea = snd (evala ResetEvent (extendCallTrace taskId evalOpts) (TCDestroy treea) iworld) //TODO: Check for exceptions during cleanup
//TODO: Check for exceptions during cleanup
Just treea = snd (evala ResetEvent (extendCallTrace taskId evalOpts) (TCDestroy treea) iworld)
# (taskIdb,iworld) = getNextTaskId iworld
# (resb,iworld) = evalb ResetEvent (extendCallTrace taskId evalOpts) (TCInit taskIdb lastEvent) iworld
= case resb of
ValueResult val info change nstateb
ValueResult val info change=:(ReplaceUI _) nstateb
# info = {TaskEvalInfo|info & lastEvent = max ts info.TaskEvalInfo.lastEvent, removedTasks = removedTasks ++ info.TaskEvalInfo.removedTasks}
= (ValueResult val info (doAfterStepLayout ResetEvent change) (TCStep taskId info.TaskEvalInfo.lastEvent (Right (d_json_a,sel,nstateb))),iworld)
ValueResult val info change nstateb
= (ExceptionResult (exception ("Reset event of task in step failed to produce replacement UI: ("+++ toString (toJSON change)+++")")), iworld)
ExceptionResult e = (ExceptionResult e, iworld)
//Eval right-hand side
eval event evalOpts (TCStep taskId ts (Right (enca,sel,treeb))) iworld=:{current={taskTime}}
= case restoreTaskB sel enca of
......@@ -160,7 +162,7 @@ where
(ExceptionResult e,iworld) = (ExceptionResult e,iworld)
(ValueResult _ _ _ _,iworld) = (ExceptionResult (exception "Destroy failed in step"),iworld)
eval event evalOpts (TCDestroy (TCStep taskId ts (Right(enca,sel,treeb)))) iworld
eval event evalOpts (TCDestroy (TCStep taskId ts (Right (enca,sel,treeb)))) iworld
= case restoreTaskB sel enca of
Just (Task evalb) = evalb event (extendCallTrace taskId evalOpts) (TCDestroy treeb) iworld
Nothing = (ExceptionResult (exception "Corrupt task value in step"), iworld)
......@@ -300,16 +302,16 @@ where
//Stopped because of an unhandled exception
(Error e, iworld)
//Clean up before returning the exception
= case destroyParallelTasks taskId ('DM'.fromList taskTrees) iworld of
(res=:(DestroyedResult), iworld) = (fixOverloading res (ExceptionResult e), iworld)
(res=:(ExceptionResult _), iworld) = (fixOverloading res (ExceptionResult e), iworld)
# (res,iworld) = destroyParallelTasks taskId ('DM'.fromList taskTrees) iworld
= (exceptionResult res e,iworld)
where
//We need to know how many branches there are before evaluation to be
//able to determine the correct UI update instructions
prevNumBranches = length taskTrees
fixOverloading :: (TaskResult a) (TaskResult a) -> TaskResult a
fixOverloading _ x = x
exceptionResult :: (TaskResult [(!Int,!TaskValue a)]) TaskException -> (TaskResult [(!Int,!TaskValue a)])
exceptionResult DestroyedResult e = ExceptionResult e
exceptionResult (ExceptionResult _) e = ExceptionResult e
//Cleanup
eval event evalOpts ttree=:(TCDestroy (TCParallel taskId ts taskTrees _)) iworld=:{current}
......@@ -412,13 +414,8 @@ evalParallelTasks listId taskTrees event evalOpts conts completed [] iworld
Nothing //We have evaluated all branches and nothing is added
//Remove all entries that are marked as removed from the list, they have been cleaned up by now
# taskListFilter = {TaskListFilter|onlyIndex=Nothing,onlyTaskId=Nothing,onlySelf=False,includeValue=False,includeAttributes=False,includeProgress=False}
# (mbError,iworld) = modify (\l -> ([taskId \\ x=:{ParallelTaskState|taskId} <- l | isRemoved x],[x \\ x <- l | not (isRemoved x)]))
# (mbError,iworld) = modify (\l -> ((),[x \\ x <- l | not (isRemoved x)]))
(sdsFocus (listId,taskListFilter) taskInstanceParallelTaskList) iworld
| mbError =:(Error _) = (Error (fromError mbError),iworld)
//Remove all task functions of the removed entries from the instance reduct
//TODO Move to `destroyRemoved`
# (TaskId instanceNo _) = listId
# (mbError,iworld) = modify (\(r=:{TIReduct|tasks}) -> ((),{TIReduct|r & tasks = foldr 'DM'.del tasks (fromOk mbError)} )) (sdsFocus instanceNo taskInstanceReduct) iworld
| mbError =:(Error _) = (Error (fromError mbError),iworld)
= (Ok completed,iworld)
......@@ -445,7 +442,7 @@ where
evalParallelTasks listId taskTrees event evalOpts conts completed [t=:{ParallelTaskState|taskId=TaskId _ taskNo}:todo] iworld
= case evalParallelTask listId taskTrees event evalOpts t iworld of
(Error e, iworld) = (Error e,iworld)
(Ok (ExceptionResult e), iworld) = (Error e,iworld)
(Ok (ExceptionResult e), iworld) = (Error e,iworld) //Stop on exceptions
(Ok result=:(ValueResult val evalInfo=:{TaskEvalInfo|lastEvent,removedTasks} rep tree), iworld)
//Add the current result before checking for removals
# completed = [result:completed]
......@@ -468,10 +465,11 @@ evalEmbeddedParallelTask listId taskTrees event evalOpts {ParallelTaskState|task
# (tree,newBranch) = maybe (TCInit taskId taskTime,True) (\tree -> (tree,False)) ('DM'.get taskId taskTrees)
//Evaluate or destroy branch
| change === Just RemoveParallelTask
//# (result,taskTrees,iworld) = destroyEmbeddedParallelTask listId taskId taskTrees iworld
//= (Ok result,iworld)
//TODO: Fully destroy, also remove the task evaluation function and deal with exception
= appFst Ok (evala ResetEvent {mkEvalOpts & noUI = True} (TCDestroy tree) iworld)
# (result,taskTrees,iworld) = destroyEmbeddedParallelTask listId taskId taskTrees iworld
= case result of
(Ok res) = (Ok res,iworld)
(Error [e]) = (Error e, iworld)
(Error _) = (Error (exception "evalEmbeddedParallelTask: multiple exceptions during destruction"), iworld)
| otherwise
# evalOpts = {evalOpts & tonicOpts = {evalOpts.tonicOpts & captureParallel = evalOpts.tonicOpts.inParallel == Just listId
, inParallel = Just listId}}
......@@ -487,7 +485,7 @@ evalEmbeddedParallelTask listId taskTrees event evalOpts {ParallelTaskState|task
ExceptionResult e
//TODO Check exception
//If the exception can not be handled, don't continue evaluating just stop
= (Error e,iworld)
= (Ok (ExceptionResult e),iworld)
ValueResult val evalInfo=:{TaskEvalInfo|lastEvent,removedTasks} rep tree
//Check for a focus event targeted at this branc
# mbNewFocus= case event of
......@@ -546,7 +544,7 @@ destroyParallelTasks listId=:(TaskId instanceNo _) taskTrees iworld
(Ok (),iworld) = (exceptions,iworld)
(Error e,iworld) = ([e:exceptions],iworld)
| exceptions =: []
= (result, iworld)
= (destroyResult result, iworld)
| otherwise
= (ExceptionResult (exception "Multiple exceptions in destroyParallelTasks"),iworld)
where
......@@ -558,6 +556,10 @@ where
(Error e,_,iworld) = (DestroyedResult, e ++ exceptions,iworld)
(Ok res,_,iworld) = (res,exceptions,iworld)
destroyResult :: (TaskResult a) -> (TaskResult [(!Int,!TaskValue a)])
destroyResult DestroyedResult = DestroyedResult
destroyResult (ExceptionResult e) = ExceptionResult e
destroyEmbeddedParallelTask :: TaskId TaskId (Map TaskId TaskTree) *IWorld -> *(MaybeError [TaskException] (TaskResult a),Map TaskId TaskTree,*IWorld) | iTask a
destroyEmbeddedParallelTask listId=:(TaskId instanceNo _) taskId taskTrees iworld=:{current={taskTime}}
// In this fuction we continue as much as possible, even when we encounter exceptions
......@@ -585,8 +587,6 @@ destroyDetachedParallelTask listId=:(TaskId instanceNo _) taskId taskTrees iworl
// That way attach combinators can be programmed to notify the user or simply stop the task
= (Ok DestroyedResult,taskTrees,iworld)
//END
destroyRemoved listId removed [] iworld = ([],iworld)
destroyRemoved listId removed [r=:(ValueResult _ _ _ tree):rs] iworld
= case taskIdFromResult r of
......@@ -594,9 +594,8 @@ destroyRemoved listId removed [r=:(ValueResult _ _ _ tree):rs] iworld
| isMember taskId removed
# (mbRes,_,iworld) = destroyEmbeddedParallelTask listId taskId ('DM'.fromList [(taskId,tree)]) iworld
| mbRes =:(Error _) = ([ExceptionResult (hd (fromError mbRes)):rs],iworld)
# r = fromOk mbRes
# (rs,iworld) = destroyRemoved listId removed rs iworld
= ([r:rs],iworld)
= ([fromOk mbRes:rs],iworld)
| otherwise
# (rs,iworld) = destroyRemoved listId removed rs iworld
= ([r:rs],iworld)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment