Commit 8cf4905d authored by Bas Lijnse's avatar Bas Lijnse

Unified event queue to be able to decide wait times in the main loop based on...

Unified event queue to be able to decide wait times in the main loop based on pending events (needs more testing)
parent 08dc2177
......@@ -67,7 +67,7 @@ where
// = (ValueResult NoValue (taskInfo ts) norep context, printlnI ("refresh, no session id") iworld)
// Refresh: server restart. anything else?
taskFunc (RefreshEvent _) taskRepOpts context=:(TCBasic taskId ts jsonRes _) iworld
taskFunc (RefreshEvent _ _) taskRepOpts context=:(TCBasic taskId ts jsonRes _) iworld
# (rep, _, iworld) = genRep tasklet taskId taskRepOpts Nothing iworld
//No! because state and value will be out of sync!
......
......@@ -51,7 +51,7 @@ where
| stable
# status = fromJust (fromJSON encv)
# (rep,iworld) = makeRep taskId evalOpts status iworld
# iworld = queueRefresh [taskInstance] ["Checked OS process for instance "<+++ taskInstance] iworld
# iworld = queueRefresh [(taskInstance,"Checked OS process for instance "<+++ taskInstance)] iworld
= (ValueResult (Value status True) {TaskEvalInfo|lastEvent=lastEvent,removedTasks=[],refreshSensitive=True} rep state, iworld)
| otherwise
//Check status
......@@ -64,7 +64,7 @@ where
Just c = (CompletedProcess c,True, TCBasic taskId lastEvent (toJSON (CompletedProcess c)) False)
Nothing = (RunningProcess cmd,False, state)
# (rep,iworld) = makeRep taskId evalOpts status {IWorld|iworld & world = world}
# iworld = queueRefresh [taskInstance] ["Checked OS process for instance "<+++ taskInstance] iworld
# iworld = queueRefresh [(taskInstance,"Checked OS process for instance "<+++ taskInstance)] iworld
= (ValueResult (Value status stable) {TaskEvalInfo|lastEvent=lastEvent,removedTasks=[],refreshSensitive=True} rep state, iworld)
eval event repAs (TCDestroy _) iworld
......
......@@ -56,6 +56,7 @@ allTaskInstances :: ROShared () [TaskInstance]
detachedTaskInstances :: ROShared () [TaskInstance] //Exclude sessions
taskInstanceByNo :: RWShared InstanceNo TaskInstance TaskAttributes
taskInstanceAttributesByNo :: RWShared InstanceNo TaskAttributes TaskAttributes
taskInstancesByAttribute :: ROShared (!String,!String) [TaskInstance] //Parameter is (key,value)
// Application
applicationName :: ReadOnlyShared String // Application name
......
......@@ -55,14 +55,14 @@ currentSessions ::ReadOnlyShared [TaskListItem Void]
currentSessions
= mapRead (map toTaskListItem) (toReadOnly (sdsFocus filter filteredInstanceIndex))
where
filter = {InstanceFilter|onlyInstanceNo=Nothing,notInstanceNo=Nothing,onlySession=Just True
filter = {InstanceFilter|onlyInstanceNo=Nothing,notInstanceNo=Nothing,onlySession=Just True,matchAttribute=Nothing
,includeConstants=True,includeProgress=True,includeAttributes=True}
currentProcesses ::ReadOnlyShared [TaskListItem Void]
currentProcesses
= mapRead (map toTaskListItem) (toReadOnly (sdsFocus filter filteredInstanceIndex))
where
filter = {InstanceFilter|onlyInstanceNo=Nothing,notInstanceNo=Nothing,onlySession=Just False
filter = {InstanceFilter|onlyInstanceNo=Nothing,notInstanceNo=Nothing,onlySession=Just False,matchAttribute=Nothing
,includeConstants=True,includeProgress=True,includeAttributes=True}
toTaskListItem :: !InstanceData -> TaskListItem a
......@@ -89,7 +89,7 @@ allTaskInstances :: ROShared () [TaskInstance]
allTaskInstances
= toReadOnly
(sdsProject (SDSLensRead readInstances) SDSNoWrite
(sdsFocus {InstanceFilter|onlyInstanceNo=Nothing,notInstanceNo=Nothing,onlySession=Nothing,includeConstants=True,includeProgress=True,includeAttributes=True} filteredInstanceIndex))
(sdsFocus {InstanceFilter|onlyInstanceNo=Nothing,notInstanceNo=Nothing,onlySession=Nothing,matchAttribute=Nothing,includeConstants=True,includeProgress=True,includeAttributes=True} filteredInstanceIndex))
where
readInstances is = Ok (map taskInstanceFromInstanceData is)
......@@ -97,7 +97,7 @@ detachedTaskInstances :: ROShared () [TaskInstance]
detachedTaskInstances
= toReadOnly
(sdsProject (SDSLensRead readInstances) SDSNoWrite
(sdsFocus {InstanceFilter|onlyInstanceNo=Nothing,notInstanceNo=Nothing,onlySession=Just False,includeConstants=True,includeProgress=True,includeAttributes=True} filteredInstanceIndex))
(sdsFocus {InstanceFilter|onlyInstanceNo=Nothing,notInstanceNo=Nothing,onlySession=Just False,matchAttribute=Nothing,includeConstants=True,includeProgress=True,includeAttributes=True} filteredInstanceIndex))
where
readInstances is = Ok (map taskInstanceFromInstanceData is)
......@@ -106,7 +106,7 @@ taskInstanceByNo
= sdsProject (SDSLensRead readItem) (SDSLensWrite writeItem)
(sdsTranslate "taskInstanceByNo" filter filteredInstanceIndex)
where
filter no = {InstanceFilter|onlyInstanceNo=Just [no],notInstanceNo=Nothing,onlySession=Nothing,includeConstants=True,includeProgress=True,includeAttributes=True}
filter no = {InstanceFilter|onlyInstanceNo=Just [no],notInstanceNo=Nothing,onlySession=Nothing,matchAttribute=Nothing,includeConstants=True,includeProgress=True,includeAttributes=True}
readItem [i] = Ok (taskInstanceFromInstanceData i)
readItem _ = Error (exception "Task instance not found")
......@@ -119,7 +119,7 @@ taskInstanceAttributesByNo
= sdsProject (SDSLensRead readItem) (SDSLensWrite writeItem)
(sdsTranslate "taskInstanceAttributesByNo" filter filteredInstanceIndex)
where
filter no = {InstanceFilter|onlyInstanceNo=Just [no],notInstanceNo=Nothing,onlySession=Nothing,includeConstants=False,includeProgress=False,includeAttributes=True}
filter no = {InstanceFilter|onlyInstanceNo=Just [no],notInstanceNo=Nothing,onlySession=Nothing,matchAttribute=Nothing,includeConstants=False,includeProgress=False,includeAttributes=True}
readItem [(_,_,_,Just a)] = Ok a
readItem _ = Error (exception "Task instance not found")
......@@ -127,6 +127,14 @@ where
writeItem [(n,c,p,_)] a = Ok (Just [(n,c,p,Just a)])
writeItem _ _ = Error (exception "Task instance not found")
taskInstancesByAttribute :: ROShared (!String,!String) [TaskInstance]
taskInstancesByAttribute
= toReadOnly
(sdsProject (SDSLensRead readInstances) SDSNoWrite
(sdsTranslate "taskInstancesByAttribute" (\p -> {InstanceFilter|onlyInstanceNo=Nothing,notInstanceNo=Nothing,onlySession=Nothing,matchAttribute=Just p,includeConstants=True,includeProgress=True,includeAttributes=True}) filteredInstanceIndex))
where
readInstances is = Ok (map taskInstanceFromInstanceData is)
currentTopTask :: ReadOnlyShared TaskId
currentTopTask = mapRead (\currentInstance -> TaskId currentInstance 0) currentInstanceShare
......
......@@ -252,7 +252,7 @@ where
eval event evalOpts (TCDestroy (TCParallel taskId ts taskTrees)) iworld=:{current}
//Mark all tasks as deleted and use the standar evaluation function to clean up
# taskListFilter = {TaskListFilter|onlyIndex=Nothing,onlyTaskId=Nothing,onlySelf=False,includeValue=False,includeAttributes=False,includeProgress=False}
# (mbError,iworld) = modify (map (\pts -> {ParallelTaskState|pts & change=Just RemoveParallelTask})) (sdsFocus (taskId,taskListFilter) taskInstanceParallelTaskList) iworld
# (mbError,iworld) = modify (\ptss -> ((),map (\pts -> {ParallelTaskState|pts & change=Just RemoveParallelTask}) ptss)) (sdsFocus (taskId,taskListFilter) taskInstanceParallelTaskList) iworld
| mbError =:(Error _) = (ExceptionResult (fromError mbError),iworld)
# (mbResults,iworld) = evalParallelTasks taskId ('DM'.fromList taskTrees) event evalOpts conts [] [] iworld
= (DestroyedResult,iworld)
......@@ -326,7 +326,7 @@ evalParallelTasks listId taskTrees event evalOpts conts completed [] iworld
Nothing //We have evaluated all branches and nothing is added
//Remove all entries that are marked as removed from the list, they have been cleaned up by now
# taskListFilter = {TaskListFilter|onlyIndex=Nothing,onlyTaskId=Nothing,onlySelf=False,includeValue=False,includeAttributes=False,includeProgress=False}
# (mbError,iworld) = modify (\l -> [x \\ x <- l | x.ParallelTaskState.change =!= Just RemoveParallelTask]) (sdsFocus (listId,taskListFilter) taskInstanceParallelTaskList) iworld
# (mbError,iworld) = modify (\l -> ((),[x \\ x <- l | x.ParallelTaskState.change =!= Just RemoveParallelTask])) (sdsFocus (listId,taskListFilter) taskInstanceParallelTaskList) iworld
| mbList =:(Error _) = (Error (fromError mbList),iworld)
= (Ok completed,iworld)
Just (_,(type,task),_) //Add extension
......@@ -335,13 +335,13 @@ evalParallelTasks listId taskTrees event evalOpts conts completed [] iworld
Ok (state,mbTask)
//Update the task list (TODO, be specific about what we are writing here)
# taskListFilter = {TaskListFilter|onlyIndex=Nothing,onlyTaskId=Nothing,onlySelf=False,includeValue=True,includeAttributes=True,includeProgress=True}
# (mbError,iworld) = modify (\states -> states ++ [{ParallelTaskState|state & index = length states}]) (sdsFocus (listId,taskListFilter) taskInstanceParallelTaskList) iworld
# (mbError,iworld) = modify (\states -> ((),states ++ [{ParallelTaskState|state & index = length states}])) (sdsFocus (listId,taskListFilter) taskInstanceParallelTaskList) iworld
| mbError =:(Error _) = (liftError mbError,iworld)
# taskId = state.ParallelTaskState.taskId
//Store the task function
# (mbError,iworld) = write (snd (fromJust mbTask)) (sdsFocus taskId taskInstanceEmbeddedTask) iworld
| mbError =:(Error _) = (liftError mbError,iworld)
= evalParallelTasks listId taskTrees (RefreshEvent Nothing) evalOpts conts completed [state] iworld //Continue
= evalParallelTasks listId taskTrees (RefreshEvent Nothing "Refresh in new parallel branch") evalOpts conts completed [state] iworld //Continue
err = (liftError err, iworld)
todo = evalParallelTasks listId taskTrees event evalOpts conts completed todo iworld //Evaluate the remaining items
......@@ -354,7 +354,7 @@ evalParallelTasks listId taskTrees event evalOpts conts completed [{ParallelTask
# tree = fromMaybe (TCInit taskId taskTime) ('DM'.get taskId taskTrees)
//Evaluate or destroy branch
| change === Just RemoveParallelTask
# (result,iworld) = evala (RefreshEvent Nothing) {mkEvalOpts & noUI = True} (TCDestroy tree) iworld
# (result,iworld) = evala (RefreshEvent Nothing "Destroying parallel branch") {mkEvalOpts & noUI = True} (TCDestroy tree) iworld
//TODO: remove the task evaluation function
= evalParallelTasks listId taskTrees event evalOpts conts [result:completed] todo iworld
| otherwise
......@@ -393,9 +393,9 @@ evalParallelTasks listId taskTrees event evalOpts conts completed [{ParallelTask
# valueChanged = newValue =!= value
//Write updated value, and optionally the new lastFocus time to the tasklist
# (mbError,iworld) = if valueChanged
(modify (\pts -> {ParallelTaskState|pts & value = encode val, lastFocus = maybe pts.ParallelTaskState.lastFocus Just mbNewFocus})
(modify (\pts -> ((),{ParallelTaskState|pts & value = encode val, lastFocus = maybe pts.ParallelTaskState.lastFocus Just mbNewFocus}))
(sdsFocus (listId,taskId,True) taskInstanceParallelTaskListItem) iworld)
(modify (\pts -> {ParallelTaskState|pts & lastFocus = maybe pts.ParallelTaskState.lastFocus Just mbNewFocus})
(modify (\pts -> ((),{ParallelTaskState|pts & lastFocus = maybe pts.ParallelTaskState.lastFocus Just mbNewFocus}))
(sdsFocus (listId,taskId,False) taskInstanceParallelTaskListItem) iworld)
| mbError =:(Error _) = (Error (fromError mbError),iworld)
//Add the current result before checking for removals
......@@ -423,7 +423,7 @@ where
# (Task evala) = fromOk mbTask
//TODO: remove the task evaluation function
# evalOpts = {mkEvalOpts & noUI = True}
# (r,iworld) = evala (RefreshEvent Nothing) evalOpts (TCDestroy tree) iworld
# (r,iworld) = evala (RefreshEvent Nothing "Destroying removed parallel branch") evalOpts (TCDestroy tree) iworld
# (rs,iworld) = destroyRemoved removed rs iworld
= ([r:rs],iworld)
| otherwise
......@@ -498,7 +498,7 @@ where
= (Ok taskId, iworld)
//Update the task list
# taskListFilter = {onlyIndex=Nothing,onlyTaskId=Nothing,onlySelf=False,includeValue=True,includeAttributes=True,includeProgress=True}
# (mbError,iworld) = modify (\states -> states ++ [{ParallelTaskState|state & index = nextIndex states}]) (sdsFocus (listId,taskListFilter) taskInstanceParallelTaskList) iworld
# (mbError,iworld) = modify (\states -> ((),states ++ [{ParallelTaskState|state & index = nextIndex states}])) (sdsFocus (listId,taskListFilter) taskInstanceParallelTaskList) iworld
| mbError =:(Error _) = (liftError mbError,iworld)
//If the task is an embedded one, we also need to store the task function
| mbTask =:(Just _)
......@@ -528,7 +528,7 @@ where
= (ValueResult (Value () True) {lastEvent=ts,removedTasks=[],refreshSensitive=False} (finalizeRep evalOpts NoRep) (TCStable taskId ts (DeferredJSONNode JSONNull)), iworld)
//Mark the task as removed, and update the indices of the tasks afterwards
# taskListFilter = {onlyIndex=Nothing,onlyTaskId=Nothing,onlySelf=False,includeValue=True,includeAttributes=True,includeProgress=True}
# (mbError,iworld) = modify (markAsRemoved removeId) (sdsFocus (listId,taskListFilter) taskInstanceParallelTaskList) iworld
# (mbError,iworld) = modify (\xs -> ((),markAsRemoved removeId xs)) (sdsFocus (listId,taskListFilter) taskInstanceParallelTaskList) iworld
| mbError =:(Error _) = (ExceptionResult (fromError mbError),iworld)
//If it is a detached task, remove the detached instance, if it is embedded, pass notify the currently evaluating parallel
| taskNo == 0 //(if the taskNo equals zero the instance is embedded)
......@@ -574,7 +574,7 @@ where
| otherwise
# task = parTask (sdsTranslate "setTaskAndList" (\listFilter -> (listId,taskId,listFilter)) parallelTaskList)
# taskListFilter = {onlyIndex=Nothing,onlyTaskId=Nothing,onlySelf=False,includeValue=True,includeAttributes=True,includeProgress=True}
# (mbError,iworld) = modify (scheduleReplacement replaceId task) (sdsFocus (listId,taskListFilter) taskInstanceParallelTaskList) iworld
# (mbError,iworld) = modify (\ts -> ((),scheduleReplacement replaceId task ts)) (sdsFocus (listId,taskListFilter) taskInstanceParallelTaskList) iworld
| mbError =:(Error _) = (ExceptionResult (fromError mbError),iworld)
= (ValueResult (Value () True) {lastEvent=ts,removedTasks=[],refreshSensitive=False} (finalizeRep evalOpts NoRep) (TCStable taskId ts (DeferredJSONNode JSONNull)), iworld)
eval _ evalOpts state=:(TCStable taskId ts _) iworld
......@@ -597,7 +597,7 @@ where
# listId = fromOk mbListId
| listId == TaskId 0 0
= (Ok (), iworld)
# (mbError,iworld) = modify (\pts -> {ParallelTaskState|pts & lastFocus = Just taskTime}) (sdsFocus (listId,focusId,False) taskInstanceParallelTaskListItem) iworld
# (mbError,iworld) = modify (\pts -> ((),{ParallelTaskState|pts & lastFocus = Just taskTime})) (sdsFocus (listId,focusId,False) taskInstanceParallelTaskListItem) iworld
| mbError =:(Error _) = (liftError mbError, iworld)
= (Ok (), iworld)
......@@ -611,7 +611,7 @@ where
//Just steal the instance, TODO, make stealing optional
# progress = {InstanceProgress|progress & attachedTo = [taskId:attachmentChain]}
# (_,iworld) = write progress (sdsFocus instanceNo taskInstanceProgress) iworld
# iworld = queueUrgentRefresh [instanceNo] ["attach of " <+++ instanceNo <+++ " requires refresh"] iworld
# iworld = queueRefresh [(instanceNo,"attach of " <+++ instanceNo <+++ " requires refresh")] iworld
= eval event evalOpts (TCBasic taskId ts JSONNull False) iworld
Error e
= (ExceptionResult e,iworld)
......@@ -690,7 +690,7 @@ where
# (resa,iworld)
= evala event (extendCallTrace taskId evalOpts) (TCDestroy treea) iworld
//Remove share from reduct
# (e,iworld) = modify (\shares -> 'DM'.del taskId shares) (sdsFocus instanceNo taskInstanceShares) iworld
# (e,iworld) = modify (\shares -> ((),'DM'.del taskId shares)) (sdsFocus instanceNo taskInstanceShares) iworld
| isError e
= (ExceptionResult (fromError e), iworld)
= (resa,iworld)
......
......@@ -9,8 +9,9 @@ import iTasks.API.Core.Client.Tasklet
import iTasks._Framework.UIDiff
import qualified iTasks._Framework.SDS as SDS
from Data.Map import qualified newMap, toList, get
from Data.Map import qualified newMap, toList, fromList, get
from Data.List import find
from Data.Queue as DQ import qualified newQueue, dequeue
import System.Time, Math.Random
import Text.JSON
......@@ -57,41 +58,41 @@ controllerFunc _ st=:{TaskState | sessionId, instanceNo, task, taskId = Nothing}
# (mbTaskId, iworld) = createClientTaskInstance task sessionId instanceNo iworld
= case mbTaskId of
Ok taskId
# (mbResult,iworld) = evalTaskInstance instanceNo (RefreshEvent Nothing) iworld
# (mbResult,iworld) = evalTaskInstance instanceNo (RefreshEvent Nothing "Client init") iworld
= case mbResult of
Ok (_,_,updates)
= (Just updates, {TaskState | st & taskId = Just taskId}, iworld)
Ok (_,_)
= (Nothing, {TaskState | st & taskId = Just taskId}, iworld)
_ = (Nothing, {TaskState | st & taskId = Just taskId}, iworld)
_ = (Nothing, st, iworld)
// Refresh
controllerFunc _ st=:{TaskState | sessionId, instanceNo, task, taskId = Just t} Nothing Nothing Nothing iworld
# (mbResult,iworld) = evalTaskInstance instanceNo (RefreshEvent Nothing) iworld
# (mbResult,iworld) = evalTaskInstance instanceNo (RefreshEvent Nothing "Client refresh") iworld
= case mbResult of
Ok (_,value,updates)
= (Just updates, {TaskState | st & value = Just value}, iworld)
Ok (_,value)
= (Nothing, {TaskState | st & value = Just value}, iworld)
Error msg = abort msg
// Focus
controllerFunc _ st=:{TaskState | sessionId, instanceNo, task, taskId = Just t} (Just eventNo) Nothing Nothing iworld
# iworld = trace_n "c_focus" iworld
# (mbResult,iworld) = evalTaskInstance instanceNo (FocusEvent eventNo t) iworld
= case mbResult of
Ok (_,value,updates)
= (Just updates, {TaskState | st & value = Just value}, iworld)
Ok (_,value)
= (Nothing, {TaskState | st & value = Just value}, iworld)
Error msg = abort msg
// Edit
controllerFunc taskId st=:{TaskState | sessionId, instanceNo} (Just eventNo) (Just name) (Just jsonval) iworld
# (mbResult,iworld) = evalTaskInstance instanceNo (EditEvent eventNo taskId name (fromString jsonval)) iworld
= case mbResult of
Ok (_,value,updates)
= (Just updates, {TaskState | st & value = Just value}, iworld)
Ok (_,value)
= (Nothing, {TaskState | st & value = Just value}, iworld)
Error msg = abort msg
// Action
controllerFunc taskId st=:{TaskState | sessionId, instanceNo} (Just eventNo) (Just name) Nothing iworld
# (mbResult,iworld) = evalTaskInstance instanceNo (ActionEvent eventNo taskId name) iworld
= case mbResult of
Ok (_,value,updates)
= (Just updates, {TaskState | st & value = Just value}, iworld)
Ok (_,value)
= (Nothing, {TaskState | st & value = Just value}, iworld)
Error msg = abort msg
newWorld :: *World
......@@ -99,17 +100,23 @@ newWorld = undef
getUIUpdates :: !*IWorld -> (!Maybe [(InstanceNo, [String])], *IWorld)
getUIUpdates iworld
= case 'SDS'.read taskOutput iworld of
(Ok uiUpdates,iworld)
= case 'Data.Map'.toList uiUpdates of
[] = (Nothing, iworld)
msgs
# (_,iworld) = 'SDS'.write 'Data.Map'.newMap taskOutput iworld
= (Just (map getUpdates msgs), iworld)
= case 'SDS'.read taskInstanceUIs iworld of
(Ok uiStates,iworld)
= case 'Data.Map'.toList uiStates of
[] = (Nothing,iworld)
states
# (_,iworld) = 'SDS'.write ('Data.Map'.fromList (map clearOutput states)) taskInstanceUIs iworld
= (Just (map getUpdates states), iworld)
(_,iworld)
= (Nothing, iworld)
where
getUpdates (instanceNo,upds) = (instanceNo, [toString (encodeUIUpdates upds)])
getUpdates (instanceNo,UIEnabled _ _ upds) = (instanceNo, [toString (encodeUIUpdates (toList upds))])
toList q = case 'DQ'.dequeue q of //TODO SHOULD BE IN Data.Queue
(Nothing,q) = []
(Just x,q) = [x:toList q]
clearOutput (instanceNo,UIEnabled version refUI _) = (instanceNo, UIEnabled version refUI 'DQ'.newQueue)
clearOutput state = state
createClientIWorld :: !String !InstanceNo -> *IWorld
createClientIWorld serverURL currentInstance
......@@ -145,7 +152,6 @@ createClientIWorld serverURL currentInstance
,cachedShares = 'Data.Map'.newMap
,exposedShares = 'Data.Map'.newMap
,jsCompilerState = locundef "jsCompilerState"
,refreshQueue = []
,shutdown = False
,random = genRandInt seed
,ioTasks = {done=[],todo=[]}
......
......@@ -3,15 +3,18 @@ implementation module iTasks._Framework.Engine
import StdMisc, StdArray, StdList, StdOrdList, StdTuple, StdChar, StdFile, StdBool, StdEnum
from StdFunc import o, seqList, ::St, const
from Data.Map import :: Map
from Data.Queue import :: Queue(..)
import qualified Data.Map as DM
import Data.Error, Data.Func, Data.Tuple, Math.Random, Internet.HTTP, Text, Text.Encodings.MIME, Text.Encodings.UrlEncoding
import System.Time, System.CommandLine, System.Environment, System.OSError, System.File, System.FilePath, System.Directory
import iTasks._Framework.Util, iTasks._Framework.HtmlUtil
import iTasks._Framework.IWorld, iTasks._Framework.WebService, iTasks._Framework.SDSService
import iTasks.API.Common.SDSCombinators
import qualified iTasks._Framework.SDS as SDS
CLEAN_HOME_VAR :== "CLEAN_HOME"
SESSION_TIMEOUT :== fromString "0000-00-00 00:10:00"
MAX_EVENTS :== 5
//The following modules are excluded by the SAPL -> Javascript compiler
//because they contain functions implemented in ABC code that cannot
......@@ -71,7 +74,7 @@ startEngine publishable world
// mark all instance as outdated initially
# iworld = queueAllPersistent iworld
//Start task server
# iworld = serve port (httpServer port keepalive (engine publishable) taskOutput) [BackgroundTask removeOutdatedSessions,BackgroundTask refreshTaskInstances] timeout iworld
# iworld = serve port (httpServer port keepalive (engine publishable) taskInstanceUIs) [BackgroundTask removeOutdatedSessions,BackgroundTask updateClocks, BackgroundTask (processEvents MAX_EVENTS)] timeout iworld
= finalizeIWorld iworld
where
infoline :: !String -> [String]
......@@ -120,7 +123,10 @@ where
= stringOpt key [v:r]
timeout :: !*IWorld -> (!Maybe Timeout,!*IWorld)
timeout iworld = (Just 25, iworld) //Run 40 times a second, using blocking behaviour
timeout iworld = case 'SDS'.read taskEvents iworld of //Check if there are events in the queue
(Ok (Queue [] []),iworld) = (Just 100,iworld) //Empty queue, don't waste CPU, but refresh
(Ok _,iworld) = (Just 0,iworld) //There are still events, don't wait
(Error _,iworld) = (Just 500,iworld) //Keep retrying, but not too fast
//Read the content of the master instance index on disk to the "ti" field in the iworld
clearConnections :: !*IWorld -> *IWorld
......@@ -128,22 +134,15 @@ where
where
//When the server starts we make sure all have a blank connectedTo field
filter = {InstanceFilter|defaultValue & includeProgress = True}
clear index = [(n,c,Just {InstanceProgress|p & connectedTo = Nothing},a) \\(n,c,Just p,a) <-index]
clear index = ((),[(n,c,Just {InstanceProgress|p & connectedTo = Nothing},a) \\(n,c,Just p,a) <-index])
queueAllPersistent :: !*IWorld -> *IWorld
queueAllPersistent iworld
# (mbIndex,iworld) = read (sdsFocus {InstanceFilter|defaultValue & onlySession=Just False} filteredInstanceIndex) iworld
= case mbIndex of
Ok index = queueRefresh [instanceNo \\ (instanceNo,_,_,_)<- index] [] iworld
Ok index = queueRefresh [(instanceNo,"Persistent first refresh") \\ (instanceNo,_,_,_)<- index] iworld
_ = iworld
refreshTaskInstances :: !*IWorld -> *IWorld
refreshTaskInstances iworld
# iworld = updateClocks iworld
= case dequeueRefresh iworld of
(Just instanceNo,mbReason,iworld) = refreshTaskInstance instanceNo mbReason iworld
(_,_,iworld) = iworld
removeOutdatedSessions :: !*IWorld -> *IWorld
removeOutdatedSessions iworld
# (mbIndex,iworld) = read (sdsFocus {InstanceFilter|defaultValue & onlySession=Just True,includeProgress=True} filteredInstanceIndex) iworld
......@@ -159,14 +158,14 @@ where
//HACK FOR RUNNING BACKGROUND TASKS ON A CLIENT
background :: !*IWorld -> *IWorld
background iworld = removeOutdatedSessions (refreshTaskInstances iworld)
background iworld = (processEvents MAX_EVENTS o removeOutdatedSessions) iworld
// The iTasks engine consist of a set of HTTP request handlers
engine :: publish -> [(!String -> Bool
,!Bool
,!(HTTPRequest (Map InstanceNo [UIUpdate]) *IWorld -> (!HTTPResponse,!Maybe ConnectionType, !Maybe (Map InstanceNo [UIUpdate]), !*IWorld))
,!(HTTPRequest (Map InstanceNo [UIUpdate]) (Maybe {#Char}) ConnectionType *IWorld -> (![{#Char}], !Bool, !ConnectionType, !Maybe (Map InstanceNo [UIUpdate]), !*IWorld))
,!(HTTPRequest (Map InstanceNo [UIUpdate]) ConnectionType *IWorld -> (!Maybe (Map InstanceNo [UIUpdate]), !*IWorld))
,!(HTTPRequest (Map InstanceNo TIUIState) *IWorld -> (!HTTPResponse,!Maybe ConnectionType, !Maybe (Map InstanceNo TIUIState), !*IWorld))
,!(HTTPRequest (Map InstanceNo TIUIState) (Maybe {#Char}) ConnectionType *IWorld -> (![{#Char}], !Bool, !ConnectionType, !Maybe (Map InstanceNo TIUIState), !*IWorld))
,!(HTTPRequest (Map InstanceNo TIUIState) ConnectionType *IWorld -> (!Maybe (Map InstanceNo TIUIState), !*IWorld))
)] | Publishable publish
engine publishable
= taskHandlers (publishAll publishable) ++ defaultHandlers
......@@ -251,7 +250,6 @@ initIWorld mbSDKPath mbWebdirPaths mbStorePath mbSaplPath world
,cachedShares = 'DM'.newMap
,exposedShares = 'DM'.newMap
,jsCompilerState = (lst, ftmap, flavour, Nothing, 'DM'.newMap)
,refreshQueue = []
,shutdown = False
,ioTasks = {done = [], todo = []}
,ioStates = 'DM'.newMap
......@@ -316,9 +314,9 @@ where
simpleHTTPResponse ::
(!(String -> Bool),HTTPRequest *IWorld -> (!HTTPResponse,*IWorld))
->
(!(String -> Bool),!Bool,!(HTTPRequest (Map InstanceNo [UIUpdate]) *IWorld -> (HTTPResponse, Maybe loc, Maybe (Map InstanceNo [UIUpdate]) ,*IWorld))
,!(HTTPRequest (Map InstanceNo [UIUpdate]) (Maybe {#Char}) loc *IWorld -> (![{#Char}], !Bool, loc, Maybe (Map InstanceNo [UIUpdate]) ,!*IWorld))
,!(HTTPRequest (Map InstanceNo [UIUpdate]) loc *IWorld -> (!Maybe (Map InstanceNo [UIUpdate]),!*IWorld)))
(!(String -> Bool),!Bool,!(HTTPRequest (Map InstanceNo TIUIState) *IWorld -> (HTTPResponse, Maybe loc, Maybe (Map InstanceNo TIUIState) ,*IWorld))
,!(HTTPRequest (Map InstanceNo TIUIState) (Maybe {#Char}) loc *IWorld -> (![{#Char}], !Bool, loc, Maybe (Map InstanceNo TIUIState) ,!*IWorld))
,!(HTTPRequest (Map InstanceNo TIUIState) loc *IWorld -> (!Maybe (Map InstanceNo TIUIState),!*IWorld)))
simpleHTTPResponse (pred,responseFun) = (pred,True,initFun,dataFun,lostFun)
where
initFun req _ env
......
......@@ -12,7 +12,7 @@ from iTasks.API.Core.Types import :: Date, :: Time, :: DateTime, :: Con
from iTasks._Framework.UIDefinition import :: UIDef, :: UIControl, :: UIEditletOpts
from iTasks._Framework.UIDiff import :: UIUpdate, :: UIEditletDiffs, :: ReferenceVersion, :: MessageType
from iTasks._Framework.TaskState import :: ParallelTaskState, :: TIMeta, :: DeferredJSON
from iTasks._Framework.Task import :: TaskValue, :: ConnectionTask, :: BackgroundTask
from iTasks._Framework.Task import :: TaskValue, :: ConnectionTask, :: BackgroundTask, :: Event
from iTasks._Framework.SDS import :: SDSNotifyRequest, :: BasicShareId
from iTasks._Framework.SDS import :: RWShared, :: ReadWriteShared, :: Shared, :: JSONShared
......@@ -40,11 +40,8 @@ from TCPIP import :: TCP_Listener, :: TCP_Listener_, :: TCP_RChannel_, :: TCP_SC
,!Maybe ParserState // Some information collected by the parser for the code generator
,!Map InstanceNo (Set String)) // Per client information of the names of the already generated functions
, refreshQueue :: ![(!InstanceNo,!Maybe String)] // Instances that need refreshing (optionally with an explanation)
, ioTasks :: !*IOTasks // The low-level input/output tasks
, ioStates :: !IOStates // Results of low-level io tasks, indexed by the high-level taskid that it is linked to
, ioTasks :: !*IOTasks // The low-level input/output tasks
, ioStates :: !IOStates // Results of low-level io tasks, indexed by the high-level taskid that it is linked to
, world :: !*World // The outside world
......@@ -136,12 +133,4 @@ iworldUTCTime :: Shared Time
//Update the clock shares
updateClocks :: !*IWorld -> *IWorld
getResponseExpiry :: !InstanceNo !*IWorld -> (!Maybe Int, !*IWorld)
/*
addUIUpdates :: !InstanceNo ![UIUpdate] !*IWorld -> *IWorld
popUIUpdates :: ![InstanceNo] !*IWorld -> (![(!InstanceNo,![UIUpdate])],!*IWorld)
clearUIUpdates :: !InstanceNo !*IWorld -> *IWorld
*/
instance FileSystem IWorld
......@@ -57,16 +57,6 @@ updateClocks iworld=:{IWorld|clocks,world}
# iworld = if (utcTime == clocks.utcTime) iworld (snd (write utcTime iworldUTCTime iworld))
= iworld
//Determine the expiration of request, thereby determining the poll interval of
//polling clients
REGULAR_EXPIRY :== 10000
FAST_EXPIRY :== 100
IMMEDIATE_EXPIRY :== 0
getResponseExpiry :: !InstanceNo !*IWorld -> (!Maybe Int, !*IWorld)
getResponseExpiry instanceNo iworld=:{refreshQueue=[]} = (Just REGULAR_EXPIRY,iworld)
getResponseExpiry instanceNo iworld=:{refreshQueue} = (Just FAST_EXPIRY,iworld)
//Wrapper instance for file access
instance FileSystem IWorld
where
......
......@@ -125,8 +125,9 @@ read :: !(RWShared () r w) !*IWorld -> (!MaybeError TaskException r,
readRegister :: !TaskId !(RWShared () r w) !*IWorld -> (!MaybeError TaskException r, !*IWorld)
//Write an SDS (and queue evaluation of those task instances which contained tasks that registered for notification)
write :: !w !(RWShared () r w) !*IWorld -> (!MaybeError TaskException (), !*IWorld)
//Read followed by write
modify :: !(r -> w) !(RWShared () r w) !*IWorld -> (!MaybeError TaskException (), !*IWorld)
//Read followed by write. The 'a' typed value is a result that is returned
modify :: !(r -> (!a,!w)) !(RWShared () r w) !*IWorld -> (!MaybeError TaskException a, !*IWorld)
//Force notify (queue evaluation of task instances that registered for notification)
notify :: !(RWShared () r w) !*IWorld -> (!MaybeError TaskException (), !*IWorld)
......
......@@ -128,7 +128,7 @@ write w sds iworld
= case write` () w sds iworld of
(Ok notify, iworld)
# instanceNos = [no \\ (TaskId no _) <- 'Set'.toList notify]
# iworld = queueRefresh instanceNos [] iworld
# iworld = queueRefresh [(i,"Notification for write of " +++ sdsIdentity sds) \\ i <- instanceNos] iworld
# iworld = clearInstanceSDSRegistrations instanceNos iworld
= (Ok (), iworld)
(Error e,iworld) = (Error e,iworld)
......@@ -296,9 +296,11 @@ where
//In case of a type mismatch, just ignore (should not happen)
_ = (match,nomatch)
modify :: !(r -> w) !(RWShared () r w) !*IWorld -> (!MaybeError TaskException (), !*IWorld)
modify :: !(r -> (!a,!w)) !(RWShared () r w) !*IWorld -> (!MaybeError TaskException a, !*IWorld)
modify f sds iworld = case read sds iworld of
(Ok r,iworld) = write (f r) sds iworld
(Ok r,iworld) = let (a,w) = f r in case write w sds iworld of
(Ok (),iworld) = (Ok a,iworld)
(Error e,iworld) = (Error e, iworld)
(Error e,iworld) = (Error e,iworld)
notify :: !(RWShared () r w) !*IWorld -> (!MaybeError TaskException (), !*IWorld)
......
......@@ -5,6 +5,7 @@ from iTasks._Framework.IWorld import :: IWorld
from iTasks._Framework.Engine import :: ConnectionType
from iTasks._Framework.SDS import :: RWShared
from iTasks._Framework.Task import :: Task, :: InstanceNo
from iTasks._Framework.TaskState import :: TIUIState
from iTasks._Framework.UIDiff import :: UIUpdate
import iTasks._Framework.Generic
......@@ -13,9 +14,9 @@ import Data.Maybe, Data.Void, Data.Error, Text.JSON
sdsService :: (!(String -> Bool)
,!Bool
,!(HTTPRequest (Map InstanceNo