Commit c32040f8 authored by Bas Lijnse's avatar Bas Lijnse

Combined types for parallel and global task meta data

parent b464b512
Pipeline #33032 failed with stage
in 1 minute and 52 seconds
definition module iTasks.Internal.IWorld
from System.FilePath import :: FilePath
from Data.Map import :: Map
from Data.Maybe import :: Maybe
from Data.Error import :: MaybeError(..), :: MaybeErrorString(..)
from Data.Set import :: Set
from Data.Queue import :: Queue
from Data.Either import :: Either
from StdFile import class FileSystem, class FileEnv
from System.Time import :: Timestamp, :: Timespec
from Text.GenJSON import :: JSONNode
from iTasks.Engine import :: EngineOptions
from iTasks.UI.Definition import :: UI, :: UIType
from iTasks.Internal.TaskState import :: ParallelTaskState, :: TIMeta, :: DeferredJSON
from iTasks.Internal.Task import :: ConnectionTask
from iTasks.Internal.TaskEval import :: TaskTime
from System.FilePath import :: FilePath
from Data.Map import :: Map
from Data.Maybe import :: Maybe
from Data.Error import :: MaybeError(..), :: MaybeErrorString(..)
from Data.Set import :: Set
from Data.Queue import :: Queue
from Data.Either import :: Either
from StdFile import class FileSystem, class FileEnv
from System.Time import :: Timestamp, :: Timespec
from Text.GenJSON import :: JSONNode
from iTasks.Engine import :: EngineOptions
from iTasks.UI.Definition import :: UI, :: UIType
from iTasks.Internal.TaskState import :: TaskMeta
from iTasks.Internal.Task import :: ConnectionTask
from iTasks.Internal.TaskEval import :: TaskTime
from iTasks.Util.DeferredJSON import :: DeferredJSON
from iTasks.WF.Definition import :: TaskValue, :: Event, :: TaskId, :: InstanceNo, :: TaskNo, :: TaskException
from iTasks.WF.Combinators.Core import :: ParallelTaskType, :: TaskListItem
......
......@@ -17,7 +17,7 @@ import iTasks.Internal.SDS
from iTasks.UI.Layout import :: LUI, :: LUIMoves, :: LUIMoveID, :: LUIEffectStage, :: LUINo
from iTasks.Util.DeferredJSON import :: DeferredJSON(..)
from iTasks.Internal.TaskState import :: TIMeta(..) , :: TIType(..), :: TaskChange(..)
from iTasks.Internal.TaskState import :: TaskMeta(..) , :: TIType(..), :: TaskChange(..)
import iTasks.Internal.TaskEval
from iTasks.SDS.Combinators.Common import toDynamic
......
......@@ -21,7 +21,7 @@ import qualified iTasks.Internal.SDS as SDS
from iTasks.SDS.Combinators.Common import sdsFocus, >*|, mapReadWrite, mapReadWriteError
from StdFunc import const, o
derive gEq TIMeta, TIType, TaskChange
derive gEq TaskMeta, TIType, TaskChange
mkEvalOpts :: TaskEvalOpts
mkEvalOpts =
......
......@@ -31,8 +31,10 @@ from System.FilePath import :: FilePath
//FIXME: Extensions should not be imported in core
from iTasks.Extensions.Document import :: Document, :: DocumentId
derive JSONEncode TIMeta, TIType, TIReduct
derive JSONDecode TIMeta, TIType, TIReduct
derive JSONEncode TaskMeta, TIType, TIReduct
derive JSONDecode TaskMeta, TIType, TIReduct
derive gDefault TaskMeta
//Persistent context of active tasks
//Split up version of task instance information
......@@ -49,7 +51,7 @@ derive JSONDecode TIMeta, TIType, TIReduct
}
*/
:: TIMeta =
:: TaskMeta =
//Static information
{ taskId :: !TaskId //Unique global identification
, instanceType :: !TIType //There are 3 types of tasks: startup tasks, sessions, and persistent tasks
......@@ -70,19 +72,6 @@ derive JSONDecode TIMeta, TIType, TIReduct
, initialized :: !Bool //TODO: Get rid of in this record
}
:: ParallelTaskState =
{ taskId :: !TaskId //Identification
, detached :: !Bool
, taskAttributes :: !TaskAttributes //Attributes that reflect the latest attributes from the task UI
, managementAttributes :: !TaskAttributes //Attributes that are explicitly written to the list through the tasklist
, unsyncedAttributes :: !Set String //When the `managementAttributes` are written they need to be synced to the UI on the next evaluation
, createdAt :: !TaskTime //Time the entry was added to the set (used by layouts to highlight new items)
, lastEvent :: !TaskTime //Last modified time
, change :: !Maybe TaskChange //Changes like removing or replacing a parallel task are only done when the
//parallel is evaluated. This field is used to schedule such changes.
, initialized :: !Bool
}
:: TaskChange
= RemoveTask //Mark for removal from the set on the next evaluation
| ReplaceTask !Dynamic //Replace the task on the next evaluation
......@@ -104,7 +93,6 @@ derive JSONDecode TIMeta, TIType, TIReduct
= TIValue !(TaskValue DeferredJSON)
| TIException !Dynamic !String
derive gDefault TIMeta
:: InstanceFilter =
{ //'Vertical' filters
......@@ -143,7 +131,7 @@ newInstanceKey :: !*IWorld -> (!InstanceKey,!*IWorld)
nextInstanceNo :: SimpleSDSLens Int
//This index contains all meta-data about the task instances on this engine
taskInstanceIndex :: SimpleSDSLens [TIMeta]
taskInstanceIndex :: SimpleSDSLens [TaskMeta]
//Task instance state is accessible as shared data sources
filteredInstanceIndex :: SDSLens InstanceFilter [InstanceData] [InstanceData]
......@@ -161,7 +149,7 @@ taskInstanceReduct :: SDSLens InstanceNo (Maybe TIReduct) (Maybe TIRe
taskInstanceValue :: SDSLens InstanceNo (Maybe TIValue) (Maybe TIValue)
taskInstanceShares :: SDSLens InstanceNo (Maybe (Map TaskId DeferredJSON)) (Maybe (Map TaskId DeferredJSON))
taskInstanceParallelTaskLists :: SDSLens InstanceNo (Maybe (Map TaskId [ParallelTaskState])) (Maybe (Map TaskId [ParallelTaskState]))
taskInstanceParallelTaskLists :: SDSLens InstanceNo (Maybe (Map TaskId [TaskMeta])) (Maybe (Map TaskId [TaskMeta]))
taskInstanceParallelValues :: SDSLens InstanceNo (Maybe (Map TaskId (Map TaskId (TaskValue DeferredJSON)))) (Maybe (Map TaskId (Map TaskId (TaskValue DeferredJSON))))
topLevelTaskList :: SDSLens TaskListFilter (!TaskId,![TaskListItem a]) [(TaskId,TaskAttributes)]
......@@ -175,11 +163,11 @@ allInstanceIO :: SimpleSDSLens (Map InstanceNo (!String,Timespec))
localShare :: SDSLens TaskId a a | iTask a
//Core parallel task list state structure
taskInstanceParallelTaskList :: SDSLens (TaskId,TaskListFilter) [ParallelTaskState] [ParallelTaskState]
taskInstanceParallelTaskList :: SDSLens (TaskId,TaskListFilter) [TaskMeta] [TaskMeta]
taskInstanceParallelTaskListValues :: SDSLens (TaskId,TaskListFilter) (Map TaskId (TaskValue DeferredJSON)) (Map TaskId (TaskValue DeferredJSON))
//Private interface used during evaluation of parallel combinator
taskInstanceParallelTaskListItem :: SDSLens (TaskId,TaskId) ParallelTaskState ParallelTaskState
taskInstanceParallelTaskListItem :: SDSLens (TaskId,TaskId) TaskMeta TaskMeta
taskInstanceParallelTaskListValue :: SDSLens (TaskId,TaskId) (TaskValue DeferredJSON) (TaskValue DeferredJSON)
taskInstanceEmbeddedTask :: SDSLens TaskId (Task a) (Task a) | iTask a
......
......@@ -47,18 +47,18 @@ from Data.Queue import :: Queue(..)
from Control.Applicative import class Alternative(<|>)
import Data.GenEq
derive JSONEncode TIMeta, TIType, TIValue, TIReduct, ParallelTaskState, TaskChange, TaskResult, TaskEvalInfo
derive JSONDecode TIMeta, TIType, TIValue, TIReduct, ParallelTaskState, TaskChange, TaskResult, TaskEvalInfo
derive JSONEncode TaskMeta, TIType, TIValue, TIReduct, TaskChange, TaskResult, TaskEvalInfo
derive JSONDecode TaskMeta, TIType, TIValue, TIReduct, TaskChange, TaskResult, TaskEvalInfo
derive gDefault InstanceProgress, TIType, TaskId, ValueStatus, InstanceFilter
gDefault{|TIMeta|}
gDefault{|TaskMeta|}
= {taskId= TaskId 0 0,instanceType=gDefault{|*|},build="",createdAt=gDefault{|*|},valuestatus=gDefault{|*|},attachedTo=[],instanceKey=Nothing
,firstEvent=Nothing,lastEvent=Nothing,taskAttributes='DM'.newMap,managementAttributes='DM'.newMap,unsyncedAttributes = 'DS'.newSet
,change = Nothing, initialized = False}
derive gEq TaskChange
derive gText TaskChange, ParallelTaskState, Set
derive gText TaskChange, Set
derive class iTask InstanceFilter
......@@ -78,7 +78,7 @@ rawInstanceParallels = mbStoreShare NS_TASK_INSTANCES True InDynamicFile
rawInstanceParallelValues = mbStoreShare NS_TASK_INSTANCES True InDynamicFile
//Master instance index
taskInstanceIndex :: SimpleSDSLens [TIMeta]
taskInstanceIndex :: SimpleSDSLens [TaskMeta]
taskInstanceIndex = sdsFocus "instances" rawTaskIndex
//Next instance no counter
......@@ -110,7 +110,7 @@ taskInstanceShares :: SDSLens InstanceNo (Maybe (Map TaskId DeferredJSON)) (Mayb
taskInstanceShares = sdsTranslate "taskInstanceShares" (\t -> t +++> "-shares") rawInstanceShares
//Task instance parallel lists
taskInstanceParallelTaskLists :: SDSLens InstanceNo (Maybe (Map TaskId [ParallelTaskState])) (Maybe (Map TaskId [ParallelTaskState]))
taskInstanceParallelTaskLists :: SDSLens InstanceNo (Maybe (Map TaskId [TaskMeta])) (Maybe (Map TaskId [TaskMeta]))
taskInstanceParallelTaskLists = sdsTranslate "taskInstanceParallelLists" (\t -> t +++> "-tasklists") rawInstanceParallels
taskInstanceParallelValues :: SDSLens InstanceNo (Maybe (Map TaskId (Map TaskId (TaskValue DeferredJSON)))) (Maybe (Map TaskId (Map TaskId (TaskValue DeferredJSON))))
......@@ -232,7 +232,7 @@ where
//Pairwise update (under the assumption that both lists are sorted by ascending instance number)
write` p is [] = [i \\ i <- is | not (filterPredicate p i)] //Remove all items that match the filter but are not in write list
write` p [] ws = [updateColumns p i w \\ w <- ws & i <- repeat defaultValue] //Add new items
write` p [i=:{TIMeta|taskId}:is] [w=:(wNo,_,_,_):ws]
write` p [i=:{TaskMeta|taskId}:is] [w=:(wNo,_,_,_):ws]
| taskId == wNo = [updateColumns p i w:write` p is ws] //Update the appropriate columns
| filterPredicate p i = write` p is [w:ws] //If w is not the next element, it may be because it is outside the filter, if it isn't it is apparently deleted
= [i:write` p is [w:ws]] //I was outside the filter, just leave it unchanged
......@@ -255,11 +255,11 @@ where
newRows rs wfilter ws = [updateColumns wfilter defaultValue w \\ w=:(taskId,_,_,_) <- ws | not (isMember taskId existingInstances)]
where
existingInstances = [taskId \\ {TIMeta|taskId} <- rs]
existingInstances = [taskId \\ {TaskMeta|taskId} <- rs]
selectRows tfilter is = filter (filterPredicate tfilter) is
selectColumns {InstanceFilter|includeConstants,includeProgress,includeAttributes}
{TIMeta|taskId,instanceType,build,createdAt,valuestatus,attachedTo,instanceKey,firstEvent,lastEvent,taskAttributes,managementAttributes}
{TaskMeta|taskId,instanceType,build,createdAt,valuestatus,attachedTo,instanceKey,firstEvent,lastEvent,taskAttributes,managementAttributes}
# listId = case instanceType of
(TIPersistent _ (Just listId)) = listId
_ = TaskId 0 0
......@@ -275,11 +275,11 @@ where
updateColumns {InstanceFilter|includeConstants,includeProgress,includeAttributes} i (iNo,mbC,mbP,mbA)
# i = if includeConstants (maybe i (\{InstanceConstants|type,build,issuedAt}
-> {TIMeta|i & instanceType = instanceType i type mbP ,build=build,createdAt=issuedAt}) mbC) i
# i = if includeProgress (maybe i (\{InstanceProgress|value,attachedTo,instanceKey,firstEvent,lastEvent}-> {TIMeta|i & valuestatus=value,attachedTo=attachedTo,instanceKey=instanceKey,firstEvent=firstEvent,lastEvent=lastEvent}) mbP) i
-> {TaskMeta|i & instanceType = instanceType i type mbP ,build=build,createdAt=issuedAt}) mbC) i
# i = if includeProgress (maybe i (\{InstanceProgress|value,attachedTo,instanceKey,firstEvent,lastEvent}-> {TaskMeta|i & valuestatus=value,attachedTo=attachedTo,instanceKey=instanceKey,firstEvent=firstEvent,lastEvent=lastEvent}) mbP) i
# i = if includeAttributes (maybe i (\(managementAttributes,taskAttributes) ->
{TIMeta|i & managementAttributes = managementAttributes, taskAttributes = taskAttributes}) mbA) i
= {TIMeta|i & taskId = iNo}
{TaskMeta|i & managementAttributes = managementAttributes, taskAttributes = taskAttributes}) mbA) i
= {TaskMeta|i & taskId = iNo}
where
instanceType _ (StartupInstance) _ = TIStartup
instanceType _ (SessionInstance) (Just {InstanceProgress|instanceKey=Just key}) = TISession key
......@@ -288,13 +288,13 @@ where
instanceType {instanceType} _ _ = instanceType
filterPredicate {InstanceFilter|onlyInstanceNo,notInstanceNo,includeSessions,includeDetached,includeStartup,matchAttribute} i
= (maybe True (\m -> isMember i.TIMeta.taskId m) onlyInstanceNo)
&& (maybe True (\m -> not (isMember i.TIMeta.taskId m)) notInstanceNo)
= (maybe True (\m -> isMember i.TaskMeta.taskId m) onlyInstanceNo)
&& (maybe True (\m -> not (isMember i.TaskMeta.taskId m)) notInstanceNo)
&& ((includeSessions && i.instanceType =: (TISession _)) ||
(includeDetached && i.instanceType =: (TIPersistent _ _)) ||
(includeStartup && i.instanceType =: (TIStartup))
)
&& (maybe True (\(mk,mv) -> (maybe False ((==) mv) ('DM'.get mk i.TIMeta.taskAttributes) || maybe False ((==) mv) ('DM'.get mk i.TIMeta.managementAttributes) ) ) matchAttribute)
&& (maybe True (\(mk,mv) -> (maybe False ((==) mv) ('DM'.get mk i.TaskMeta.taskAttributes) || maybe False ((==) mv) ('DM'.get mk i.TaskMeta.managementAttributes) ) ) matchAttribute)
notifyFun _ ws qfilter = any (filterPredicate qfilter) ws
......@@ -388,7 +388,7 @@ where
reducer taskId shares = read taskId shares
taskInstanceParallelTaskList :: SDSLens (TaskId,TaskListFilter) [ParallelTaskState] [ParallelTaskState]
taskInstanceParallelTaskList :: SDSLens (TaskId,TaskListFilter) [TaskMeta] [TaskMeta]
taskInstanceParallelTaskList = sdsLens "taskInstanceParallelTaskList" param (SDSRead read) (SDSWrite write) (SDSNotifyConst notify) (Just \p ws -> read p ws) (removeMaybe (Just 'DM'.newMap) taskInstanceParallelTaskLists)
where
param (TaskId instanceNo _,listFilter) = instanceNo
......@@ -407,7 +407,7 @@ where
|| (listFilter.TaskListFilter.includeAttributes && regListFilter.TaskListFilter.includeAttributes)
|| (listFilter.TaskListFilter.includeProgress && regListFilter.TaskListFilter.includeProgress)) = False
//Check if the written records match the registered filter
| maybe False (\taskIds -> all (\t -> not (isMember t taskIds)) [taskId \\(_,{ParallelTaskState|taskId}) <- states]) regListFilter.onlyTaskId
| maybe False (\taskIds -> all (\t -> not (isMember t taskIds)) [taskId \\(_,{TaskMeta|taskId}) <- states]) regListFilter.onlyTaskId
= False
| maybe False (\indices -> all (\i -> not (isMember i indices)) (map fst states)) regListFilter.onlyIndex
= False
......@@ -418,21 +418,21 @@ where
//enumerate = zip2 [0..]
enumerate l = [(i,x) \\ x <- l & i <- [0..]]
inFilter {TaskListFilter|onlyTaskId,onlyIndex} (index, {ParallelTaskState|taskId})
inFilter {TaskListFilter|onlyTaskId,onlyIndex} (index, {TaskMeta|taskId})
= maybe True (\taskIds -> isMember taskId taskIds) onlyTaskId
&& maybe True (\indices -> isMember index indices) onlyIndex
//ASSUMPTION: BOTH LISTS ARE SORTED BY TASK ID
merge:: TaskListFilter [(Int,ParallelTaskState)] [ParallelTaskState] -> [ParallelTaskState]
merge:: TaskListFilter [(Int,TaskMeta)] [TaskMeta] -> [TaskMeta]
merge listFilter os ns = merge` os ns
where
listLength = length os
merge` [(i,o):os] [n:ns]
| o.ParallelTaskState.taskId == n.ParallelTaskState.taskId //Potential update
| o.TaskMeta.taskId == n.TaskMeta.taskId //Potential update
| inFilter listFilter (i,o) = [n:merge` os ns] //Only update the item if it matches the filter
| otherwise = [o:merge` os ns]
| o.ParallelTaskState.taskId < n.ParallelTaskState.taskId //The taskId of the old item is not in the written set
| o.TaskMeta.taskId < n.TaskMeta.taskId //The taskId of the old item is not in the written set
| inFilter listFilter (i,o) = merge` os [n:ns] //The old item was in the filter, so it was removed
| otherwise = [o:merge` os [n:ns]] //The old item was not in the filter, so it is ok that is not in the written list
| otherwise
......@@ -456,24 +456,20 @@ where
notify (listId,listFilter) states ts (regListId,regListFilter)
= listId == regListId //TODO: If we keep this SDS, we need to be more precise in notifying based on the filter
taskInstanceParallelTaskListItem :: SDSLens (TaskId,TaskId) ParallelTaskState ParallelTaskState
taskInstanceParallelTaskListItem :: SDSLens (TaskId,TaskId) TaskMeta TaskMeta
taskInstanceParallelTaskListItem = sdsLens "taskInstanceParallelTaskListItem" param (SDSRead read) (SDSWrite write) (SDSNotifyConst notify) (Just reducer) taskInstanceParallelTaskList
where
//In this SDS the include value and include attributes flags are used to indicate what is written for notification
//During a read the whole ParallelTaskState record is used
param (listId,taskId)
= (listId,{TaskListFilter|onlyIndex=Nothing,onlyTaskId=Just [taskId],onlySelf=False,includeValue=False,includeAttributes=True,includeProgress=False})
read p=:(listId,taskId) [] = Error (exception ("Could not find parallel task " <+++ taskId <+++ " in list " <+++ listId))
read p=:(_,taskId) [x:xs] = if (x.ParallelTaskState.taskId == taskId) (Ok x) (read p xs)
write (_,taskId) list pts = Ok (Just [if (x.ParallelTaskState.taskId == taskId) pts x \\ x <- list])
read p=:(_,taskId) [x:xs] = if (x.TaskMeta.taskId == taskId) (Ok x) (read p xs)
write (_,taskId) list pts = Ok (Just [if (x.TaskMeta.taskId == taskId) pts x \\ x <- list])
notify (listId,taskId) _ = const ((==) taskId o snd)
reducer p ws = read p ws
taskInstanceParallelTaskListValue :: SDSLens (TaskId,TaskId) (TaskValue DeferredJSON) (TaskValue DeferredJSON)
taskInstanceParallelTaskListValue = sdsLens "taskInstanceParallelTaskListValue" param (SDSRead read) (SDSWrite write) (SDSNotifyConst notify) (Just reducer) taskInstanceParallelTaskListValues
where
//In this SDS the include value and include attributes flags are used to indicate what is written for notification
//During a read the whole ParallelTaskState record is used
param (listId,taskId)
= (listId,{TaskListFilter|onlyIndex=Nothing,onlyTaskId=Just [taskId],onlySelf=False,includeValue=True,includeAttributes=False,includeProgress=False})
read p=:(listId,taskId) values = case 'DM'.get taskId values of
......@@ -512,23 +508,26 @@ where
read (listId,selfId,listFilter) (states,values) = Ok (listId,items)
where
items = [{TaskListItem|taskId = taskId, listId = listId
, detached = detached, self = taskId == selfId
, detached = isDetached listId taskId, self = taskId == selfId
, value = maybe NoValue decode ('DM'.get taskId values), progress = Nothing, attributes = 'DM'.union managementAttributes taskAttributes
} \\ {ParallelTaskState|taskId,detached,taskAttributes,managementAttributes,change} <- states | change =!= Just RemoveTask]
} \\ {TaskMeta|taskId,taskAttributes,managementAttributes,change} <- states | change =!= Just RemoveTask]
decode NoValue = NoValue
decode (Value json stable) = maybe NoValue (\v -> Value v stable) (fromDeferredJSON json)
//When the task is part of another instance than the listid we can conclude that the task is detached
isDetached (TaskId listInstance _) (TaskId taskInstance _) = taskInstance <> listInstance
write (listId,selfId,{TaskListFilter|includeAttributes=False}) _ _ = Ok Nothing
write (listId,selfId,listFilter) (states,values) [] = Ok (Just (states,values))
write (listId,selfId,listFilter) (states,values) [(t,a):updates]
# states = [if (taskId == t) {ParallelTaskState|pts & managementAttributes = a, unsyncedAttributes = 'DS'.fromList $ 'DM'.keys a} pts \\ pts=:{ParallelTaskState|taskId} <- states]
# states = [if (taskId == t) {TaskMeta|meta & managementAttributes = a, unsyncedAttributes = 'DS'.fromList $ 'DM'.keys a} meta \\ meta=:{TaskMeta|taskId} <- states]
= (write (listId,selfId,listFilter) (states,values) updates)
notify (listId,_,_) states ts (regListId,_,_) = regListId == listId //Only check list id, the listFilter is checked one level up
lensReducer (listId, selfId, listFilter) (ws,_)
= (Ok ([(taskId, managementAttributes) \\ {ParallelTaskState|taskId,detached,managementAttributes,change} <- ws | change =!= Just RemoveTask]))
= (Ok ([(taskId, managementAttributes) \\ {TaskMeta|taskId,managementAttributes,change} <- ws | change =!= Just RemoveTask]))
param2 _ (listId,items) = {InstanceFilter|onlyInstanceNo=Just [taskId \\ {TaskListItem|taskId,detached} <- items | detached],notInstanceNo=Nothing
,includeSessions=True,includeDetached=True,includeStartup=True,matchAttribute=Nothing, includeConstants = False, includeAttributes = True,includeProgress = True}
......
......@@ -18,7 +18,6 @@ from Data.Foldable import maximum
import Text.GenJSON
from StdFunc import o, const, id, flip
from iTasks.Internal.TaskState import :: TIMeta(..), :: TIType(..) , :: TaskChange
from iTasks.Internal.TaskEval import :: TaskTime
from iTasks.WF.Combinators.Core import :: AttachmentStatus
......
......@@ -292,23 +292,28 @@ initParallelTask ::
!(ParallelTask a)
!*IWorld
->
(!MaybeError TaskException (ParallelTaskState, Maybe (TaskId,Task a)), !*IWorld)
(!MaybeError TaskException (TaskMeta, Maybe (TaskId,Task a)), !*IWorld)
| iTask a
initParallelTask evalOpts listId parType parTask iworld=:{current={taskTime}}
initParallelTask evalOpts listId parType parTask iworld=:{clock,current={taskTime}}
# (mbTaskStuff,iworld) = case parType of
Embedded = mkEmbedded iworld
(Detached evalDirect attr) = mkDetached evalDirect attr iworld
= case mbTaskStuff of
Ok (taskId,mbTask)
# state =
{ ParallelTaskState
{ TaskMeta
| taskId = taskId
, detached = isNothing mbTask
, instanceType = TIPersistent "FIXME" (Just listId)//FIXME: Redundant information
, build = "FIXME"
, createdAt = clock
, valuestatus = Unstable
, attachedTo = []
, instanceKey = Nothing
, firstEvent = Just clock
, lastEvent = Just clock
, taskAttributes = 'DM'.newMap
, managementAttributes = 'DM'.newMap
, unsyncedAttributes = 'DS'.newSet
, createdAt = taskTime
, lastEvent = taskTime
, change = Nothing
, initialized = False
}
......@@ -333,7 +338,7 @@ where
evalParallelTasks :: !Event !TaskEvalOpts
[TaskCont [(TaskTime,TaskValue a)] (ParallelTaskType,ParallelTask a)]
[(TaskId, TaskResult a)] [ParallelTaskState] (Map TaskId (TaskValue DeferredJSON)) !*IWorld
[(TaskId, TaskResult a)] [TaskMeta] (Map TaskId (TaskValue DeferredJSON)) !*IWorld
->
(MaybeError TaskException [TaskResult a],!*IWorld) | iTask a
evalParallelTasks event evalOpts=:{TaskEvalOpts|taskId=listId} conts completed [] values iworld
......@@ -364,7 +369,7 @@ evalParallelTasks event evalOpts=:{TaskEvalOpts|taskId=listId} conts completed [
# taskListFilter = {TaskListFilter|onlyIndex=Nothing,onlyTaskId=Nothing,onlySelf=False,includeValue=False,includeAttributes=True,includeProgress=True}
# (mbError,iworld) = modify (\states -> states ++ [state]) (sdsFocus (listId,taskListFilter) taskInstanceParallelTaskList) EmptyContext iworld
| mbError =:(Error _) = (liftError mbError,iworld)
# taskId = state.ParallelTaskState.taskId
# taskId = state.TaskMeta.taskId
//Store the task function
# (mbError,iworld) = (write (snd (fromJust mbTask)) (sdsFocus taskId taskInstanceEmbeddedTask) EmptyContext iworld)
| mbError =:(Error _) = (liftError mbError,iworld)
......@@ -373,10 +378,10 @@ evalParallelTasks event evalOpts=:{TaskEvalOpts|taskId=listId} conts completed [
//There is more work to do:
todo = evalParallelTasks event evalOpts conts completed todo values iworld
where
isRemoved {ParallelTaskState|change=Just RemoveTask} = True
isRemoved {TaskMeta|change=Just RemoveTask} = True
isRemoved _ = False
addManagementAttributeChanges {ParallelTaskState|managementAttributes,unsyncedAttributes} (ValueResult val evalInfor rep tree)
addManagementAttributeChanges {TaskMeta|managementAttributes,unsyncedAttributes} (ValueResult val evalInfor rep tree)
//Add the explicit attributes
# rep = case rep of
ReplaceUI (UI type attr items)
......@@ -390,10 +395,10 @@ where
= (ValueResult val evalInfor rep tree)
addManagementAttributeChanges pts c = c
clearAttributeSync pts = {ParallelTaskState| pts & unsyncedAttributes = 'DS'.newSet}
clearAttributeSync meta = {TaskMeta| meta & unsyncedAttributes = 'DS'.newSet}
//Evaluate an embedded parallel task
evalParallelTasks event evalOpts=:{TaskEvalOpts|taskId=listId} conts completed [t=:{ParallelTaskState|taskId=taskId=:(TaskId _ taskNo)}:todo] values iworld
evalParallelTasks event evalOpts=:{TaskEvalOpts|taskId=listId} conts completed [t=:{TaskMeta|taskId=taskId=:(TaskId _ taskNo)}:todo] values iworld
# lastValue = fromMaybe NoValue $ 'DM'.get taskId values
= case evalParallelTask listId event evalOpts t lastValue iworld of
(Error e, iworld) = (Error e,iworld)
......@@ -408,14 +413,16 @@ evalParallelTasks event evalOpts=:{TaskEvalOpts|taskId=listId} conts completed [
(Ok result=:DestroyedResult, iworld)
= evalParallelTasks event evalOpts conts [(taskId, result):completed] todo values iworld
where
evalParallelTask :: TaskId !Event !TaskEvalOpts ParallelTaskState (TaskValue DeferredJSON) !*IWorld
evalParallelTask :: TaskId !Event !TaskEvalOpts TaskMeta (TaskValue DeferredJSON) !*IWorld
-> *(MaybeError TaskException (TaskResult a), !*IWorld) | iTask a
evalParallelTask listId event evalOpts taskState=:{ParallelTaskState|detached} value iworld
evalParallelTask listId=:(TaskId listInstance _) event evalOpts taskState=:{TaskMeta|taskId=TaskId taskInstance _} value iworld
| detached = evalDetachedParallelTask listId event evalOpts taskState iworld
= evalEmbeddedParallelTask listId event evalOpts taskState value iworld
where
detached = taskInstance <> listInstance
evalEmbeddedParallelTask listId event evalOpts
{ParallelTaskState|taskId,createdAt,change,initialized} value iworld=:{current={taskTime}}
{TaskMeta|taskId,createdAt,change,initialized} value iworld=:{current={taskTime}}
//Lookup task evaluation function and task evaluation state
# (mbTask,iworld) = read (sdsFocus taskId taskInstanceEmbeddedTask) EmptyContext iworld
| mbTask =:(Error _) = (Error (fromError mbTask),iworld)
......@@ -446,8 +453,8 @@ where
| mbError =:(Error _) = (Error (fromError mbError), iworld)
//Write meta data
# (mbError,iworld) = modify
(\pts -> {ParallelTaskState|pts &
taskAttributes = taskAttributeUpdate pts.ParallelTaskState.taskAttributes, initialized = True})
(\meta -> {TaskMeta|meta &
taskAttributes = taskAttributeUpdate meta.TaskMeta.taskAttributes, initialized = True})
(sdsFocus (listId,taskId) taskInstanceParallelTaskListItem)
EmptyContext iworld
| mbError =:(Error _) = (Error (fromError mbError),iworld)
......@@ -467,8 +474,8 @@ where
(TaskId instanceNo taskNo) = taskId
//Retrieve result of detached parallel task
evalDetachedParallelTask :: !TaskId !Event !TaskEvalOpts !ParallelTaskState !*IWorld -> *(MaybeError TaskException (TaskResult a), *IWorld) | iTask a
evalDetachedParallelTask listId event evalOpts {ParallelTaskState|taskId=taskId=:(TaskId instanceNo _)} iworld
evalDetachedParallelTask :: !TaskId !Event !TaskEvalOpts !TaskMeta !*IWorld -> *(MaybeError TaskException (TaskResult a), *IWorld) | iTask a
evalDetachedParallelTask listId event evalOpts {TaskMeta|taskId=taskId=:(TaskId instanceNo _)} iworld
= case readRegister listId (sdsFocus instanceNo (removeMaybe Nothing taskInstanceValue)) iworld of
(Error e,iworld) = (Error e,iworld)
(Ok (ReadingDone (TIException dyn msg)),iworld) = (Ok (ExceptionResult (dyn,msg)),iworld)
......@@ -503,10 +510,12 @@ where
minimalTaskListFilter = {TaskListFilter|onlyIndex=Nothing,onlyTaskId=Nothing,onlySelf=False
,includeValue=False,includeAttributes=False,includeProgress=False}
destroyParallelTask listId (_,exceptions,iworld) {ParallelTaskState|taskId,detached}
destroyParallelTask listId=:(TaskId listInstance _) (_,exceptions,iworld) {TaskMeta|taskId=taskId=:(TaskId taskInstance _)}
= case (if detached destroyDetachedParallelTask destroyEmbeddedParallelTask) listId taskId iworld of
(Error e, iworld) = (DestroyedResult, e ++ exceptions,iworld)
(Ok res, iworld) = (res, exceptions,iworld)
where
detached = taskInstance <> listInstance
destroyResult :: (TaskResult a) -> (TaskResult [(Int,TaskValue a)])
destroyResult DestroyedResult = DestroyedResult
......@@ -622,7 +631,7 @@ where
# (mbStateMbTask, iworld) = initParallelTask mkEvalOpts listId parType parTask iworld
= case mbStateMbTask of
Ok (state,mbTask)
# taskId = state.ParallelTaskState.taskId
# taskId = state.TaskMeta.taskId
| listId == TaskId 0 0 //For the top-level list, we don't need to do anything else
//TODO: Make sure we don't lose the attributes!
= (Ok taskId, iworld)
......@@ -671,8 +680,8 @@ where
//When a task is marked as removed, the index of the tasks after that are decreased
markAsRemoved removeId [] = []
markAsRemoved removeId [s=:{ParallelTaskState|taskId}:ss]
| taskId == removeId = [{ParallelTaskState|s & change = Just RemoveTask}:ss]
markAsRemoved removeId [s=:{TaskMeta|taskId}:ss]
| taskId == removeId = [{TaskMeta|s & change = Just RemoveTask}:ss]
| otherwise = [s:markAsRemoved removeId ss]
replaceTask :: !TaskId !(ParallelTask a) !(SharedTaskList a) -> Task () | iTask a
......@@ -705,8 +714,8 @@ where
= (ValueResult (Value () True) (mkTaskEvalInfo lastEval) (mkUIIfReset event (ui UIEmpty)) (treturn ()), iworld)
scheduleReplacement replaceId task [] = []
scheduleReplacement replaceId task [s=:{ParallelTaskState|taskId}:ss]
| taskId == replaceId = [{ParallelTaskState|s & change = Just (ReplaceTask (dynamic task :: Task a^))}:ss]
scheduleReplacement replaceId task [s=:{TaskMeta|taskId}:ss]
| taskId == replaceId = [{TaskMeta|s & change = Just (ReplaceTask (dynamic task :: Task a^))}:ss]
| otherwise = [s:scheduleReplacement replaceId task ss]
attach :: !InstanceNo !Bool -> Task AttachmentStatus
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment