Commit 5578633c authored by Bas Lijnse's avatar Bas Lijnse

Merge branch 'master' into...

Merge branch 'master' into 224-older-applications-are-not-working-with-the-new-api-incidone-shipadventure
parents ed882256 815c8f7e
Pipeline #12435 passed with stage
in 3 minutes and 24 seconds
......@@ -37,6 +37,8 @@ from System.OS import IF_POSIX_OR_WINDOWS
import System.GetOpt
import Data.Functor
MAX_EVENTS :== 5
defaultEngineOptions :: !*World -> (!EngineOptions,!*World)
defaultEngineOptions world
# (appPath,world) = determineAppPath world
......@@ -118,10 +120,15 @@ startEngineWithOptions initFun publishable world
# iworld = createIWorld (fromJust mbOptions) world
# (res,iworld) = initJSCompilerState iworld
| res =:(Error _) = show ["Fatal error: " +++ fromError res] (destroyIWorld iworld)
# iworld = serve [TaskWrapper removeOutdatedSessions] (tcpTasks options.serverPort options.keepaliveTime) (timeout options.timeout) iworld
# iworld = serve [] (tcpTasks options.serverPort options.keepaliveTime) engineTasks (timeout options.timeout) iworld
= destroyIWorld iworld
where
tcpTasks serverPort keepaliveTime = [(serverPort,httpServer serverPort keepaliveTime (engineWebService publishable) taskOutput)]
engineTasks =
[BackgroundTask updateClock
,BackgroundTask (processEvents MAX_EVENTS)
,BackgroundTask removeOutdatedSessions
,BackgroundTask flushWritesWhenIdle]
runTasks :: a !*World -> *World | Runnable a
runTasks tasks world = runTasksWithOptions (\c o -> (Just o,[])) tasks world
......@@ -137,8 +144,13 @@ runTasksWithOptions initFun runnable world
# iworld = createIWorld options world
# (res,iworld) = initJSCompilerState iworld
| res =:(Error _) = show ["Fatal error: " +++ fromError res] (destroyIWorld iworld)
# iworld = serve (toRunnable runnable) [] (timeout options.timeout) iworld
# iworld = serve (toRunnable runnable) [] systemTasks (timeout options.timeout) iworld
= destroyIWorld iworld
where
systemTasks =
[BackgroundTask updateClock
,BackgroundTask (processEvents MAX_EVENTS)
,BackgroundTask stopOnStable]
show :: ![String] !*World -> *World
show lines world
......
......@@ -143,7 +143,7 @@ createClientIWorld serverURL currentInstance
,attachmentChain = []
,nextTaskNo = 6666
}
,sdsNotifyRequests = []
,sdsNotifyRequests = 'Data.Map'.newMap
,memoryShares = 'Data.Map'.newMap
,readCache = 'Data.Map'.newMap
,writeCache = 'Data.Map'.newMap
......
......@@ -7,10 +7,13 @@ from iTasks.WF.Definition import :: TaskException
from Data.Error import :: MaybeError
from Data.Maybe import :: Maybe
from TCPIP import :: Timeout
from iTasks.WF.Definition import :: Task
timeout :: !(Maybe Timeout) !*IWorld -> (!Maybe Timeout,!*IWorld)
removeOutdatedSessions :: Task ()
updateClock :: !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
stopOnStable :: Task ()
removeOutdatedSessions :: !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
flushWritesWhenIdle:: !*IWorld -> (!MaybeError TaskException (), !*IWorld)
stopOnStable :: !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
implementation module iTasks.Internal.EngineTasks
import StdBool, StdOverloaded, StdList, StdOrdList
import qualified Data.Map as DM
import qualified Data.Set as DS
import Data.List
import Data.Functor, Data.Func
import iTasks.Engine
import iTasks.Internal.IWorld
import iTasks.WF.Definition
......@@ -9,7 +13,6 @@ import iTasks.Internal.SDS
import iTasks.Internal.TaskStore
import iTasks.SDS.Definition
import iTasks.SDS.Combinators.Common
import iTasks
from iTasks.Extensions.DateTime import toDate, toTime, instance == Date, instance == Time
from System.Time import time
......@@ -24,8 +27,7 @@ timeout mt iworld = case read taskEvents iworld of
//No events
(Ok (Queue [] []),iworld=:{sdsNotifyRequests,world})
# (ts, world) = nsTime world
# to = minListBy lesser [mt:map (getTimoutFromClock ts) sdsNotifyRequests]
= ( minListBy lesser [mt:map (getTimoutFromClock ts) sdsNotifyRequests]
= ( minListBy lesser [mt:flatten $ map (getTimeoutFromClock ts) $ 'DM'.elems sdsNotifyRequests]
, {iworld & world = world})
(Ok _,iworld) = (Just 0,iworld) //There are still events, don't wait
(Error _,iworld) = (Just 500,iworld) //Keep retrying, but not too fast
......@@ -34,38 +36,53 @@ where
lesser (Just _) Nothing = True
lesser Nothing Nothing = False
getTimoutFromClock :: Timespec SDSNotifyRequest -> Maybe Int
getTimoutFromClock now snr=:{cmpParam=(ts :: ClockParameter Timespec)}
| startsWith "$IWorld:timespec$" snr.reqSDSId && ts.interval <> zero
# fire = iworldTimespecNextFire now snr.reqTimespec ts
= Just (max 0 (toMs fire - toMs now))
= mt
getTimoutFromClock _ _ = mt
getTimeoutFromClock :: Timespec (Map SDSNotifyRequest Timespec) -> [Maybe Timeout]
getTimeoutFromClock now requests = getTimeoutFromClock` <$> 'DM'.toList requests
where
getTimeoutFromClock` :: (!SDSNotifyRequest, !Timespec) -> Maybe Timeout
getTimeoutFromClock` (snr=:{cmpParam=(ts :: ClockParameter Timespec)}, reqTimespec)
| startsWith "$IWorld:timespec$" snr.reqSDSId && ts.interval <> zero
# fire = iworldTimespecNextFire now reqTimespec ts
= Just (max 0 (toMs fire - toMs now))
= mt
getTimeoutFromClock` _ = mt
toMs x = x.tv_sec * 1000 + x.tv_nsec / 1000000
updateClock :: !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
updateClock iworld=:{IWorld|clock,world}
//Determine current date and time
# (timespec,world) = nsTime world
# iworld = {iworld & world = world}
//Write SDS if necessary
# (mbe,iworld) = write timespec (sdsFocus {start=zero,interval=zero} iworldTimespec) iworld
| mbe =:(Error _) = (mbe,iworld)
= (Ok (),iworld)
//When we run the built-in HTTP server we need to do active garbage collection of instances that were created for sessions
removeOutdatedSessions :: Task ()
removeOutdatedSessions = whileUnchanged (sdsFocus {start=Timestamp 0,interval=Timestamp 1} iworldTimestamp)
\_->get (sdsFocus {InstanceFilter|defaultValue & onlySession=Just True} filteredInstanceIndex)
>>- mkInstantTask o const o checkAll removeIfOutdated
removeOutdatedSessions :: !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
removeOutdatedSessions iworld=:{IWorld|options}
# (mbIndex,iworld) = read (sdsFocus {InstanceFilter|defaultValue & onlySession=Just True} filteredInstanceIndex) iworld
= case mbIndex of
Ok index = checkAll removeIfOutdated index iworld
Error e = (Error e, iworld)
where
checkAll f [] iworld = (Ok (),iworld)
checkAll f [x:xs] iworld = case f x iworld of
(Ok (),iworld) = checkAll f xs iworld
(Error e,iworld) = (Error e,iworld)
removeIfOutdated (instanceNo,_,_,_) iworld=:{options={appVersion,sessionTime},clock=tNow}
removeIfOutdated (instanceNo,_,_,_) iworld=:{options={appVersion},clock=tNow}
# (remove,iworld) = case read (sdsFocus instanceNo taskInstanceIO) iworld of
//If there is I/O information, we check that age first
(Ok (Just (client,tInstance)),iworld) //No IO for too long, clean up
= (Ok ((tNow - tInstance) > sessionTime),iworld)
= (Ok ((tNow - tInstance) > options.EngineOptions.sessionTime),iworld)
//If there is no I/O information, get meta-data and check builtId and creation date
(Ok Nothing,iworld)
= case read (sdsFocus instanceNo taskInstanceConstants) iworld of
(Ok {InstanceConstants|build,issuedAt=tInstance},iworld)
| build <> appVersion = (Ok True,iworld)
| (tNow - tInstance) > sessionTime = (Ok True,iworld)
| (tNow - tInstance) > options.EngineOptions.sessionTime = (Ok True,iworld)
= (Ok False,iworld)
(Error e,iworld)
= (Error e,iworld)
......@@ -92,15 +109,19 @@ flushWritesWhenIdle iworld = case read taskEvents iworld of
//When we don't run the built-in HTTP server we don't want to loop forever so we stop the loop
//once all tasks are stable
stopOnStable :: Task ()
stopOnStable = get (sdsFocus {InstanceFilter|defaultValue & includeProgress=True} filteredInstanceIndex)
>>- \index->mkInstantTask \tid iworld=:{shutdown}->case shutdown of
Just _ = (Ok (), iworld)
_ = (Ok (), {iworld & shutdown=
if (allStable index)
(Just (if (exceptionOccurred index) 1 0))
Nothing})
stopOnStable :: !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
stopOnStable iworld=:{IWorld|shutdown}
# (mbIndex,iworld) = read (sdsFocus {InstanceFilter|defaultValue & includeProgress=True} filteredInstanceIndex) iworld
= case mbIndex of
Ok index
# shutdown = case shutdown of
Nothing = if (allStable index) (Just (if (exceptionOccurred index) 1 0)) Nothing
_ = shutdown
= (Ok (), {IWorld|iworld & shutdown = shutdown})
Error e = (Error e, iworld)
where
allStable instances = all (\v -> v =: Stable || v =: (Exception _)) (values instances)
exceptionOccurred instances = any (\v -> v =: (Exception _)) (values instances)
values instances = [value \\ (_,_,Just {InstanceProgress|value},_) <- instances]
......@@ -18,7 +18,7 @@ from iTasks.Internal.TaskEval import :: TaskTime
from iTasks.WF.Definition import :: TaskValue, :: Event, :: TaskId, :: InstanceNo, :: TaskNo
from iTasks.WF.Combinators.Core import :: ParallelTaskType, :: TaskListItem
from iTasks.SDS.Definition import :: SDS, :: RWShared, :: ReadWriteShared, :: Shared, :: ReadOnlyShared
from iTasks.Internal.SDS import :: SDSNotifyRequest, :: JSONShared, :: DeferredWrite
from iTasks.Internal.SDS import :: SDSNotifyRequest, :: JSONShared, :: DeferredWrite, :: SDSIdentity
from iTasks.Extensions.DateTime import :: Time, :: Date, :: DateTime
from Sapl.Linker.LazyLinker import :: LoaderState
......@@ -29,23 +29,23 @@ from TCPIP import :: TCP_Listener, :: TCP_Listener_, :: TCP_RChannel_, :: TCP_SC
CLEAN_HOME_VAR :== "CLEAN_HOME"
:: *IWorld = { options :: !EngineOptions // Engine configuration
, clock :: !Timespec // Server side clock
, current :: !TaskEvalState // Shared state during task evaluation
:: *IWorld = { options :: !EngineOptions // Engine configuration
, clock :: !Timespec // Server side clock
, current :: !TaskEvalState // Shared state during task evaluation
, random :: [Int] // Infinite random stream
, random :: [Int] // Infinite random stream
, sdsNotifyRequests :: ![SDSNotifyRequest] // Notification requests from previously read sds's
, memoryShares :: !Map String Dynamic // Run-time memory shares
, readCache :: !Map (String,String) Dynamic // Cached share reads
, writeCache :: !Map (String,String) (Dynamic,DeferredWrite) // Cached deferred writes
, exposedShares :: !Map String (Dynamic, JSONShared) // Shared source
, jsCompilerState :: !Maybe JSCompilerState // Sapl to Javascript compiler state
, sdsNotifyRequests :: !Map SDSIdentity (Map SDSNotifyRequest Timespec) // Notification requests from previously read sds's
, memoryShares :: !Map String Dynamic // Run-time memory shares
, readCache :: !Map (String,String) Dynamic // Cached share reads
, writeCache :: !Map (String,String) (Dynamic,DeferredWrite) // Cached deferred writes
, exposedShares :: !Map String (Dynamic, JSONShared) // Shared source
, jsCompilerState :: !Maybe JSCompilerState // Sapl to Javascript compiler state
, ioTasks :: !*IOTasks // The low-level input/output tasks
, ioStates :: !IOStates // Results of low-level io tasks, indexed by the high-level taskid that it is linked to
, ioTasks :: !*IOTasks // The low-level input/output tasks
, ioStates :: !IOStates // Results of low-level io tasks, indexed by the high-level taskid that it is linked to
, world :: !*World // The outside world
, world :: !*World // The outside world
//Experimental database connection cache
, resources :: *[*Resource]
......@@ -77,10 +77,11 @@ CLEAN_HOME_VAR :== "CLEAN_HOME"
:: *IOTaskInstance
= ListenerInstance !ListenerInstanceOpts !*TCP_Listener
| ConnectionInstance !ConnectionInstanceOpts !*TCP_DuplexChannel
| BackgroundInstance !BackgroundInstanceOpts !BackgroundTask
:: ListenerInstanceOpts =
{ taskId :: !TaskId //Reference to the task that created the listener
, nextConnectionId :: !ConnectionId
, nextConnectionId :: !ConnectionId
, port :: !Int
, connectionTask :: !ConnectionTask
, removeOnClose :: !Bool //If this flag is set, states of connections accepted by this listener are removed when the connection is closed
......@@ -96,6 +97,12 @@ CLEAN_HOME_VAR :== "CLEAN_HOME"
:: ConnectionId :== Int
:: BackgroundInstanceOpts =
{ bgInstId :: !BackgroundTaskId
}
:: BackgroundTaskId :== Int
:: IOStates :== Map TaskId IOState
:: IOState
= IOActive !(Map ConnectionId (!Dynamic,!Bool)) // Bool: stability
......@@ -138,8 +145,6 @@ destroyIWorld :: !*IWorld -> *World
}
iworldTimespec :: SDS (ClockParameter Timespec) Timespec Timespec
iworldTimestamp :: SDS (ClockParameter Timestamp) Timestamp Timestamp
/*
* Calculate the next fire for the given timespec
*
......@@ -149,7 +154,7 @@ iworldTimestamp :: SDS (ClockParameter Timestamp) Timestamp Timestamp
* @result time to fire next
*/
iworldTimespecNextFire :: Timespec Timespec (ClockParameter Timespec) -> Timespec
iworldTimestamp :: SDS (ClockParameter Timestamp) Timestamp Timestamp
iworldLocalDateTime :: ReadOnlyShared DateTime
iworldLocalDateTime` :: !*IWorld -> (!DateTime, !*IWorld)
......
......@@ -71,7 +71,7 @@ createIWorld options world
,attachmentChain = []
,nextTaskNo = 0
}
,sdsNotifyRequests = []
,sdsNotifyRequests = 'DM'.newMap
,memoryShares = 'DM'.newMap
,readCache = 'DM'.newMap
,writeCache = 'DM'.newMap
......
......@@ -13,16 +13,16 @@ import iTasks.SDS.Definition
//Notification requests are stored in the IWorld
:: SDSNotifyRequest =
{ reqTaskId :: TaskId //Id of the task that read the SDS. This Id also connects a chain of notify requests that were registered together
, reqSDSId :: SDSIdentity //Id of the actual SDS used to create this request (may be a derived one)
, reqTimespec :: Timespec
{ reqTaskId :: !TaskId //Id of the task that read the SDS. This Id also connects a chain of notify requests that were registered together
, reqSDSId :: !SDSIdentity //Id of the actual SDS used to create this request (may be a derived one)
, cmpSDSId :: SDSIdentity //Id of the SDS we are saving for comparison
, cmpParam :: Dynamic //Parameter we are saving for comparison
, cmpParamText :: String //String version of comparison parameter for tracing
, cmpParam :: !Dynamic //Parameter we are saving for comparison
, cmpParamText :: !String //String version of comparison parameter for tracing
}
:: SDSIdentity :== String
instance < SDSNotifyRequest
:: DeferredWrite = E. p r w: DeferredWrite !p !w !(SDS p r w) & iTask p & TC r & TC w
//Internal creation functions:
......
implementation module iTasks.Internal.SDS
from StdFunc import const
import StdString, StdTuple, StdMisc, StdList, StdBool
import StdString, StdTuple, StdMisc, StdList, StdBool, StdFunc
from Data.Map import :: Map
import qualified Data.Map as DM
import Data.Error, Data.Func, Data.Tuple, System.OS, System.Time, Text, Text.GenJSON
......@@ -77,8 +77,13 @@ mbRegister :: !p !(RWShared p r w) !(Maybe TaskId) !SDSIdentity !*IWorld -> *IWo
mbRegister p sds Nothing reqSDSId iworld = iworld
mbRegister p sds (Just taskId) reqSDSId iworld=:{IWorld|sdsNotifyRequests,world}
# (ts, world) = nsTime world
# req = {SDSNotifyRequest|reqTimespec=ts,reqTaskId=taskId,reqSDSId=reqSDSId,cmpSDSId=sdsIdentity sds,cmpParam=dynamic p,cmpParamText=toSingleLineText p}
= {iworld & world=world, sdsNotifyRequests = [req:sdsNotifyRequests]}
# req = {SDSNotifyRequest|reqTaskId=taskId,reqSDSId=reqSDSId,cmpParam=dynamic p,cmpParamText=toSingleLineText p}
= { iworld
& world = world
, sdsNotifyRequests = 'DM'.alter (Just o maybe ('DM'.singleton req ts) ('DM'.put req ts))
(sdsIdentity sds)
sdsNotifyRequests
}
read` :: !p !(Maybe TaskId) !SDSIdentity !(RWShared p r w) !*IWorld -> (!MaybeError TaskException r, !*IWorld) | iTask p & TC r
read` p mbNotify reqSDSId sds=:(SDSSource {SDSSource|read}) env
......@@ -340,13 +345,14 @@ checkRegistrations sdsId pred iworld
= (match,nomatch,iworld)
where
//Find all notify requests for the given share id
lookupRegistrations sdsId iworld=:{sdsNotifyRequests}
= ([reg \\ reg=:{SDSNotifyRequest|cmpSDSId} <- sdsNotifyRequests | cmpSDSId == sdsId],iworld)
lookupRegistrations :: String !*IWorld -> (![(!SDSNotifyRequest, !Timespec)], !*IWorld)
lookupRegistrations sdsId iworld=:{sdsNotifyRequests} =
('DM'.toList $ 'DM'.findWithDefault 'DM'.newMap sdsId sdsNotifyRequests, iworld)
//Match the notify requests against the predicate to determine two sets:
//The registrations that matched the predicate, and those that did not match the predicate
matchRegistrations pred [] = ('Set'.newSet,'Set'.newSet)
matchRegistrations pred [{SDSNotifyRequest|reqTimespec,reqTaskId,cmpParam}:regs]
matchRegistrations pred [({SDSNotifyRequest|reqTaskId,cmpParam}, reqTimespec):regs]
# (match,nomatch) = matchRegistrations pred regs
= case cmpParam of
(p :: p^) = if (pred reqTimespec p)
......@@ -371,13 +377,25 @@ queueNotifyEvents sdsId notify iworld
clearTaskSDSRegistrations :: !(Set TaskId) !*IWorld -> *IWorld
clearTaskSDSRegistrations taskIds iworld=:{IWorld|sdsNotifyRequests}
= {iworld & sdsNotifyRequests = [r \\ r=:{SDSNotifyRequest|reqTaskId} <- sdsNotifyRequests | not ('Set'.member reqTaskId taskIds)]}
= {iworld & sdsNotifyRequests = 'DM'.foldlWithKey clearRegistrationRequests 'DM'.newMap sdsNotifyRequests}
where
clearRegistrationRequests :: (Map SDSIdentity (Map SDSNotifyRequest Timespec))
SDSIdentity
(Map SDSNotifyRequest Timespec)
-> Map SDSIdentity (Map SDSNotifyRequest Timespec)
clearRegistrationRequests notifyRequests sdsIdentity requests
| 'DM'.null filteredRequests = notifyRequests
| otherwise = 'DM'.put sdsIdentity filteredRequests notifyRequests
where
filteredRequests = 'DM'.filterWithKey (\req _ -> not $ 'Set'.member req.reqTaskId taskIds) requests
listAllSDSRegistrations :: *IWorld -> (![(InstanceNo,[(TaskId,SDSIdentity)])],!*IWorld)
listAllSDSRegistrations iworld=:{IWorld|sdsNotifyRequests} = ('DM'.toList (foldr addReg 'DM'.newMap sdsNotifyRequests),iworld)
listAllSDSRegistrations iworld=:{IWorld|sdsNotifyRequests} = ('DM'.toList ('DM'.foldrWithKey addRegs 'DM'.newMap sdsNotifyRequests),iworld)
where
addReg {SDSNotifyRequest|reqTaskId=reqTaskId=:(TaskId taskInstance _),cmpSDSId} list
= 'DM'.put taskInstance [(reqTaskId,cmpSDSId):fromMaybe [] ('DM'.get taskInstance list)] list
addRegs cmpSDSId reqs list = 'DM'.foldlWithKey addReg list reqs
where
addReg list {SDSNotifyRequest|reqTaskId=reqTaskId=:(TaskId taskInstance _)} _
= 'DM'.put taskInstance [(reqTaskId,cmpSDSId):fromMaybe [] ('DM'.get taskInstance list)] list
formatSDSRegistrationsList :: [(InstanceNo,[(TaskId,SDSIdentity)])] -> String
formatSDSRegistrationsList list
......@@ -433,5 +451,8 @@ newURL iworld=:{IWorld|options={serverUrl},random}
// TODO: different URL for clients
getURLbyId :: !String !*IWorld -> (!String, !*IWorld)
getURLbyId sdsId iworld=:{IWorld|options={serverUrl},random}
= ("sds:" +++ serverUrl +++ "/" +++ sdsId, iworld)
= ("sds:" +++ serverUrl +++ "/" +++ sdsId, iworld)
// some efficient order to be able to put notify requests in sets
instance < SDSNotifyRequest where
< x y = (x.reqTaskId, x.reqSDSId, x.cmpParamText) < (y.reqTaskId, y.reqSDSId, y.cmpParamText)
......@@ -10,13 +10,21 @@ from Data.Error import :: MaybeError
from iTasks.WF.Definition import :: TaskId
from iTasks.Internal.Task import :: ConnectionTask, :: TaskException
from iTasks.Internal.IWorld import :: IWorld
from iTasks.Internal.IWorld import :: IWorld, :: BackgroundTaskId
from iTasks.Internal.Task import :: ConnectionTask, :: BackgroundTask, :: TaskException
from iTasks.Engine import :: TaskWrapper
//Core task server loop
serve :: ![TaskWrapper] ![(!Int,!ConnectionTask)] (*IWorld -> (!Maybe Timeout,!*IWorld)) *IWorld -> *IWorld
serve :: ![TaskWrapper] ![(!Int,!ConnectionTask)] ![BackgroundTask] (*IWorld -> (!Maybe Timeout,!*IWorld)) *IWorld -> *IWorld
//Dynamically add a listener
addListener :: !TaskId !Int !Bool !ConnectionTask !*IWorld -> (!MaybeError TaskException (),!*IWorld)
//Dynamically add a connection
addConnection :: !TaskId !String !Int !ConnectionTask !*IWorld -> (!MaybeError TaskException Dynamic,!*IWorld)
//Dynamically add a background task
addBackgroundTask :: !BackgroundTask !*IWorld -> (!MaybeError TaskException BackgroundTaskId,!*IWorld)
//Dynamically remove a background task
removeBackgroundTask :: !BackgroundTaskId !*IWorld -> (!MaybeError TaskException (),!*IWorld)
......@@ -18,28 +18,27 @@ from iTasks.Internal.TaskStore import queueRefresh
import iTasks.WF.Tasks.IO
import iTasks.SDS.Combinators.Common
MAX_EVENTS :== 5
//Helper type that holds the mainloop instances during a select call
//in these mainloop instances the unique listeners and read channels
//have been temporarily removed.
:: *IOTaskInstanceDuringSelect
= ListenerInstanceDS !ListenerInstanceOpts
| ConnectionInstanceDS !ConnectionInstanceOpts !*TCP_SChannel
| BackgroundInstanceDS !BackgroundInstanceOpts !BackgroundTask
serve :: ![TaskWrapper] ![(!Int,!ConnectionTask)] (*IWorld -> (!Maybe Timeout,!*IWorld)) *IWorld -> *IWorld
serve its cts determineTimeout iworld
= loop determineTimeout (init its cts iworld)
serve :: ![TaskWrapper] ![(!Int,!ConnectionTask)] ![BackgroundTask] (*IWorld -> (!Maybe Timeout,!*IWorld)) *IWorld -> *IWorld
serve its cts bts determineTimeout iworld
= loop determineTimeout (init its cts bts iworld)
init :: ![TaskWrapper] ![(!Int,!ConnectionTask)] !*IWorld -> *IWorld
init its cts iworld
init :: ![TaskWrapper] ![(!Int,!ConnectionTask)] ![BackgroundTask] !*IWorld -> *IWorld
init its cts bts iworld
// Check if the initial tasks have been added already
# iworld = createInitialInstances its iworld
// All persistent task instances should receive a reset event to continue their work
# iworld = createInitialInstances its iworld
// All persistent task instances should receive a reset event to continue their work
# iworld=:{IWorld|ioTasks,world} = queueAll iworld
# (listeners,world) = connectAll cts world
# ioStates = 'DM'.fromList [(TaskId 0 0, IOActive 'DM'.newMap)]
= {iworld & ioTasks = {done=[],todo=listeners}, ioStates=ioStates, world=world}
= {iworld & ioTasks = {done=[],todo=listeners ++ map (BackgroundInstance {bgInstId=0}) bts}, ioStates = ioStates, world = world}
where
createInitialInstances :: [TaskWrapper] !*IWorld -> *IWorld
createInitialInstances its iworld
......@@ -80,33 +79,15 @@ loop determineTimeout iworld
# (mbTimeout,iworld=:{IWorld|ioTasks={todo},world}) = determineTimeout iworld
//Check which mainloop tasks have data available
# (todo,chList,world) = select mbTimeout todo world
//Write the clock
# (timespec, world) = nsTime world
# (mbe, iworld) = write timespec (sdsFocus {start=zero,interval=zero} iworldTimespec) {iworld & world=world, ioTasks = {done=[],todo=todo}}
| mbe =:(Error _) = iworld
//Process the select result
# iworld =:{shutdown,ioTasks={done}} = process 0 chList iworld
//Move everything from the done list back to the todo list and process events
# (mbe, iworld) = processEvents MAX_EVENTS {iworld & ioTasks={todo = reverse done,done=[]}}
| mbe =:(Error _) = abort "Error in event processing"
# iworld =:{shutdown,ioTasks={done}} = process 0 chList {iworld & ioTasks = {done=[],todo=todo}, world = world}
//Move everything from the done list back to the todo list
# iworld = {iworld & ioTasks={todo = reverse done,done=[]}}
//Everything needs to be re-evaluated
= case shutdown of
(Just exitCode) = halt exitCode iworld
_ = loop determineTimeout iworld
processEvents :: !Int *IWorld -> *(!MaybeError TaskException (), !*IWorld)
processEvents max iworld
| max <= 0 = (Ok (), iworld)
| otherwise
= case dequeueEvent iworld of
(Nothing,iworld) = (Ok (),iworld)
(Just (instanceNo,event),iworld)
= case evalTaskInstance instanceNo event iworld of
(Ok taskValue,iworld)
= processEvents (max - 1) iworld
(Error msg,iworld=:{IWorld|world})
= (Ok (),{IWorld|iworld & world = world})
select :: (Maybe Timeout) *[IOTaskInstance] *World -> (!*[IOTaskInstance],![(Int,SelectResult)],!*World)
select mbTimeout mlInstances world
# (empty,listeners,rChannels,mlInstances) = toSelectSet mlInstances
......@@ -127,6 +108,7 @@ toSelectSet [i:is]
= case i of
ListenerInstance opts l = (False,[l:ls],rs,[ListenerInstanceDS opts:is])
ConnectionInstance opts {rChannel,sChannel} = (False,ls,[rChannel:rs],[ConnectionInstanceDS opts sChannel:is])
BackgroundInstance opts bt = (e,ls,rs,[BackgroundInstanceDS opts bt:is])
/* Restore the list of main loop instances.
In the same pass also update the indices in the select result to match the
......@@ -161,6 +143,10 @@ where
| otherwise
# (is,ch) = fromSelectSet` (i+1) numListeners numSeenListeners (numSeenReceivers+1) ls rs [(c,what):ch] is
= ([ConnectionInstance opts {rChannel=rChannel,sChannel=sChannel}:is],ch)
//Background tasks
fromSelectSet` i numListeners numSeenListeners numSeenReceivers ls rs ch [BackgroundInstanceDS opts bt:is]
# (is,ch) = fromSelectSet` (i+1) numListeners numSeenListeners numSeenReceivers ls rs ch is
= ([BackgroundInstance opts bt:is],ch)
ulength [] = (0,[])
ulength [x:xs]
......@@ -248,6 +234,11 @@ process i chList iworld=:{ioTasks={done, todo=[ConnectionInstance opts duplexCha
where
(ConnectionTask handlers sds) = opts.ConnectionInstanceOpts.connectionTask
process i chList iworld=:{ioTasks={done,todo=[BackgroundInstance opts bt=:(BackgroundTask eval):todo]}}
# (mbe,iworld=:{ioTasks={done,todo}}) = eval {iworld & ioTasks = {done=done,todo=todo}}
| mbe =: (Error _) = abort (snd (fromError mbe)) //TODO Handle the error without an abort
= process (i+1) chList {iworld & ioTasks={done=[BackgroundInstance opts bt:done],todo=todo}}
process i chList iworld=:{ioTasks={done,todo=[t:todo]}}
= process (i+1) chList {iworld & ioTasks={done=[t:done],todo=todo}}
......@@ -506,6 +497,28 @@ addIOTask taskId sds init ioOps onInitHandler mkIOTaskInstance iworld
# {done, todo} = iworld.ioTasks
= (Ok l, {iworld & ioStates = ioStates, ioTasks = {done = [mkIOTaskInstance initInfo ioChannels : done], todo = todo}})
//Dynamically add a background task
addBackgroundTask :: !BackgroundTask !*IWorld -> (!MaybeError TaskException BackgroundTaskId,!*IWorld)
addBackgroundTask bt iworld=:{ioTasks={done,todo}}
# (todo, i) = appSnd (\is->1 + maxList is) (unzip (map transform todo))
# todo = todo ++ [BackgroundInstance {BackgroundInstanceOpts|bgInstId=i} bt]
= (Ok i, {iworld & ioTasks={done=done, todo=todo}})
where
transform a=:(BackgroundInstance {bgInstId} _) = (a, bgInstId)
transform a = (a, 1)
//Dynamically remove a background task
removeBackgroundTask :: !BackgroundTaskId !*IWorld -> (!MaybeError TaskException (),!*IWorld)
removeBackgroundTask btid iworld=:{ioTasks={done,todo}}
//We filter the tasks and use the boolean state to hold whether a task was dropped
# (r, todo) = foldr (\e (b, l)->let (b`, e`)=drop e in (b` || b, if b` l [e`:l])) (False, []) todo
# iworld = {iworld & ioTasks={done=done, todo=todo}}
| not r = (Error (exception "No backgroundtask with that id"), iworld)
= (Ok (), iworld)
where
drop a=:(BackgroundInstance {bgInstId} _) = (bgInstId == btid, a)
drop a = (False, a)
checkSelect :: !Int ![(!Int,!SelectResult)] -> (!Maybe SelectResult,![(!Int,!SelectResult)])
checkSelect i chList =:[(who,what):ws] | (i == who) = (Just what,ws)
checkSelect i chList = (Nothing,chList)
......@@ -521,3 +534,5 @@ halt exitCode iworld=:{ioTasks={todo=[ConnectionInstance _ {rChannel,sChannel}:t
# world = closeRChannel rChannel world
# world = closeChannel sChannel world
= halt exitCode {iworld & ioTasks = {todo=todo,done=done}}
halt exitCode iworld=:{ioTasks={todo=[BackgroundInstance _ _ :todo],done},world}
= halt exitCode {iworld & ioTasks= {todo=todo,done=done}}
......@@ -937,6 +937,7 @@ extractDownstreamChange (lui,moves)
(Just (InsertChild ui), Just lui) = (ReplaceUI ui,(lui,moves))
(Just RemoveChild, Just lui) = (ReplaceUI (UI UIEmpty 'DM'.newMap []),(lui,moves))