Commit 64dec19e authored by Bas Lijnse's avatar Bas Lijnse
Browse files

Moved engine background tasks to Internal.EngineTasks

parent 127a2d41
......@@ -22,6 +22,8 @@ import Internet.HTTP, System.Time, System.CommandLine, Data.Func
import iTasks.Internal.IWorld, iTasks.Internal.TaskEval, iTasks.Internal.TaskStore
import iTasks.Internal.Util
import iTasks.Internal.TaskServer
import iTasks.Internal.EngineTasks
from iTasks.Extensions.DateTime import toDate, toTime, instance == Date, instance == Time
from Data.Set import :: Set, newSet
......@@ -130,11 +132,11 @@ startEngineWithOptions initFun publishable world
# iworld = createIWorld (fromJust mbOptions) world
# (res,iworld) = initJSCompilerState iworld
| res =:(Error _) = show ["Fatal error: " +++ fromError res] (destroyIWorld iworld)
# iworld = serve [] (tcpTasks options.serverPort options.keepaliveTime) systemTasks timeout iworld
# iworld = serve [] (tcpTasks options.serverPort options.keepaliveTime) engineTasks timeout iworld
= destroyIWorld iworld
where
tcpTasks serverPort keepaliveTime = [(serverPort,httpServer serverPort keepaliveTime (engine publishable) allUIChanges)]
systemTasks =
tcpTasks serverPort keepaliveTime = [(serverPort,httpServer serverPort keepaliveTime (engineWebService publishable) allUIChanges)]
engineTasks =
[BackgroundTask updateClocks
,BackgroundTask (processEvents MAX_EVENTS)
,BackgroundTask removeOutdatedSessions
......@@ -174,108 +176,9 @@ timeout iworld = case 'SDS'.read taskEvents iworld of //Check if there are event
(Ok _,iworld) = (Just 0,iworld) //There are still events, don't wait
(Error _,iworld) = (Just 500,iworld) //Keep retrying, but not too fast
updateClocks :: !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
updateClocks iworld=:{IWorld|clocks,world}
//Determine current date and time
# (timestamp,world) = time world
# (local,world) = currentLocalDateTimeWorld world
# localDate = toDate local
localTime = toTime local
# (utc,world) = currentUTCDateTimeWorld world
# utcDate = toDate utc
utcTime = toTime utc
# iworld = {iworld & world = world}
//Write SDS's if necessary
# (mbe,iworld) = if (localDate == clocks.localDate) (Ok (),iworld) (write localDate iworldLocalDate iworld)
| mbe =:(Error _) = (mbe,iworld)
# (mbe,iworld) = if (localTime == clocks.localTime) (Ok (),iworld) (write localTime iworldLocalTime iworld)
| mbe =:(Error _) = (mbe,iworld)
# (mbe,iworld) = if (utcDate == clocks.utcDate) (Ok (),iworld) (write utcDate iworldUTCDate iworld)
| mbe =:(Error _) = (mbe,iworld)
# (mbe,iworld) = if (utcTime == clocks.utcTime) (Ok (),iworld) (write utcTime iworldUTCTime iworld)
| mbe =:(Error _) = (mbe,iworld)
# (mbe,iworld) = if (timestamp == clocks.timestamp) (Ok (),iworld) (write timestamp iworldTimestamp iworld)
| mbe =:(Error _) = (mbe,iworld)
= (Ok (),iworld)
//When we run the built-in HTTP server we need to do active garbage collection of instances that were created for sessions
removeOutdatedSessions :: !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
removeOutdatedSessions iworld=:{IWorld|options}
# (mbIndex,iworld) = read (sdsFocus {InstanceFilter|defaultValue & onlySession=Just True} filteredInstanceIndex) iworld
= case mbIndex of
Ok index = checkAll removeIfOutdated index iworld
Error e = (Error e, iworld)
where
checkAll f [] iworld = (Ok (),iworld)
checkAll f [x:xs] iworld = case f x iworld of
(Ok (),iworld) = checkAll f xs iworld
(Error e,iworld) = (Error e,iworld)
removeIfOutdated (instanceNo,_,_,_) iworld=:{options={appVersion},clocks={timestamp}}
# (remove,iworld) = case read (sdsFocus instanceNo taskInstanceIO) iworld of
//If there is I/O information, we check that age first
(Ok (Just (client,Timestamp tInstance)),iworld) //No IO for too long, clean up
= (Ok ((tNow - tInstance) > options.EngineOptions.sessionTime),iworld)
//If there is no I/O information, get meta-data and check builtId and creation date
(Ok Nothing,iworld)
= case read (sdsFocus instanceNo taskInstanceConstants) iworld of
(Ok {InstanceConstants|build,issuedAt},iworld)
| build <> appVersion = (Ok True,iworld)
# (Timestamp tInstance) = issuedAt
| (tNow - tInstance) > options.EngineOptions.sessionTime = (Ok True,iworld)
= (Ok False,iworld)
(Error e,iworld)
= (Error e,iworld)
(Error e,iworld)
= (Error e,iworld)
= case remove of
(Ok True)
# (e,iworld) = deleteTaskInstance instanceNo iworld
| e=:(Error _) = (e,iworld)
# (e,iworld) = 'SDS'.write Nothing (sdsFocus instanceNo taskInstanceIO) iworld
| e=:(Error _) = (e,iworld)
= (Ok (),iworld)
(Ok False)
= (Ok (), iworld)
(Error e)
= (Error e,iworld)
where
(Timestamp tNow) = timestamp
//When the event queue is empty, write deferred SDS's
flushWritesWhenIdle:: !*IWorld -> (!MaybeError TaskException (), !*IWorld)
flushWritesWhenIdle iworld = case read taskEvents iworld of
(Error e,iworld) = (Error e,iworld)
(Ok (Queue [] []),iworld) = flushDeferredSDSWrites iworld
(Ok _,iworld) = (Ok (),iworld)
//When we don't run the built-in HTTP server we don't want to loop forever so we stop the loop
//once all tasks are stable
stopOnStable :: !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
stopOnStable iworld=:{IWorld|shutdown}
# (mbIndex,iworld) = read (sdsFocus {InstanceFilter|defaultValue & includeProgress=True} filteredInstanceIndex) iworld
= case mbIndex of
Ok index
# shutdown = case shutdown of
Nothing = if (allStable index) (Just (if (exceptionOccurred index) 1 0)) Nothing
_ = shutdown
= (Ok (), {IWorld|iworld & shutdown = shutdown})
Error e = (Error e, iworld)
where
allStable instances = all (\v -> v =: Stable || v =: Exception) (values instances)
exceptionOccurred instances = any (\v -> v =: Exception) (values instances)
values instances = [value \\ (_,_,Just {InstanceProgress|value},_) <- instances]
//HACK FOR RUNNING BACKGROUND TASKS ON A CLIENT
background :: !*IWorld -> *IWorld
background iworld
# iworld = snd (processEvents MAX_EVENTS iworld)
# iworld = snd (removeOutdatedSessions iworld)
= iworld
// The iTasks engine consist of a set of HTTP WebService
engine :: publish -> [WebService (Map InstanceNo (Queue UIChange)) (Map InstanceNo (Queue UIChange))] | Publishable publish
engine publishable = [taskUIService published, documentService, sdsService, staticResourceService [url \\ {PublishedTask|url} <- published]]
engineWebService :: publish -> [WebService (Map InstanceNo (Queue UIChange)) (Map InstanceNo (Queue UIChange))] | Publishable publish
engineWebService publishable = [taskUIService published, documentService, sdsService, staticResourceService [url \\ {PublishedTask|url} <- published]]
where
published = publishAll publishable
......
definition module iTasks.Internal.EngineTasks
/**
* This module defines the separate system tasks that the iTasks engine performs
*/
from iTasks.Internal.IWorld import :: IWorld
from iTasks.WF.Definition import :: TaskException
from Data.Error import :: MaybeError
from Data.Maybe import :: Maybe
from TCPIP import :: Timeout
timeout :: !*IWorld -> (!Maybe Timeout,!*IWorld)
updateClocks :: !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
removeOutdatedSessions :: !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
flushWritesWhenIdle:: !*IWorld -> (!MaybeError TaskException (), !*IWorld)
stopOnStable :: !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
implementation module iTasks.Internal.EngineTasks
import StdBool, StdOverloaded
import iTasks.Engine
import iTasks.Internal.IWorld
import iTasks.WF.Definition
import iTasks.Internal.Util
import iTasks.Internal.SDS
import iTasks.Internal.TaskStore
import iTasks.SDS.Definition
import iTasks.SDS.Combinators.Common
from iTasks.Extensions.DateTime import toDate, toTime, instance == Date, instance == Time
from System.Time import time
from TCPIP import :: Timeout
import Data.Queue
timeout :: !*IWorld -> (!Maybe Timeout,!*IWorld)
timeout iworld = case read taskEvents iworld of //Check if there are events in the queue
(Ok (Queue [] []),iworld) = (Just 10,iworld) //Empty queue, don't waste CPU, but refresh
(Ok _,iworld) = (Just 0,iworld) //There are still events, don't wait
(Error _,iworld) = (Just 500,iworld) //Keep retrying, but not too fast
updateClocks :: !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
updateClocks iworld=:{IWorld|clocks,world}
//Determine current date and time
# (timestamp,world) = time world
# (local,world) = currentLocalDateTimeWorld world
# localDate = toDate local
localTime = toTime local
# (utc,world) = currentUTCDateTimeWorld world
# utcDate = toDate utc
utcTime = toTime utc
# iworld = {iworld & world = world}
//Write SDS's if necessary
# (mbe,iworld) = if (localDate == clocks.localDate) (Ok (),iworld) (write localDate iworldLocalDate iworld)
| mbe =:(Error _) = (mbe,iworld)
# (mbe,iworld) = if (localTime == clocks.localTime) (Ok (),iworld) (write localTime iworldLocalTime iworld)
| mbe =:(Error _) = (mbe,iworld)
# (mbe,iworld) = if (utcDate == clocks.utcDate) (Ok (),iworld) (write utcDate iworldUTCDate iworld)
| mbe =:(Error _) = (mbe,iworld)
# (mbe,iworld) = if (utcTime == clocks.utcTime) (Ok (),iworld) (write utcTime iworldUTCTime iworld)
| mbe =:(Error _) = (mbe,iworld)
# (mbe,iworld) = if (timestamp == clocks.timestamp) (Ok (),iworld) (write timestamp iworldTimestamp iworld)
| mbe =:(Error _) = (mbe,iworld)
= (Ok (),iworld)
//When we run the built-in HTTP server we need to do active garbage collection of instances that were created for sessions
removeOutdatedSessions :: !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
removeOutdatedSessions iworld=:{IWorld|options}
# (mbIndex,iworld) = read (sdsFocus {InstanceFilter|defaultValue & onlySession=Just True} filteredInstanceIndex) iworld
= case mbIndex of
Ok index = checkAll removeIfOutdated index iworld
Error e = (Error e, iworld)
where
checkAll f [] iworld = (Ok (),iworld)
checkAll f [x:xs] iworld = case f x iworld of
(Ok (),iworld) = checkAll f xs iworld
(Error e,iworld) = (Error e,iworld)
removeIfOutdated (instanceNo,_,_,_) iworld=:{options={appVersion},clocks={timestamp}}
# (remove,iworld) = case read (sdsFocus instanceNo taskInstanceIO) iworld of
//If there is I/O information, we check that age first
(Ok (Just (client,Timestamp tInstance)),iworld) //No IO for too long, clean up
= (Ok ((tNow - tInstance) > options.EngineOptions.sessionTime),iworld)
//If there is no I/O information, get meta-data and check builtId and creation date
(Ok Nothing,iworld)
= case read (sdsFocus instanceNo taskInstanceConstants) iworld of
(Ok {InstanceConstants|build,issuedAt},iworld)
| build <> appVersion = (Ok True,iworld)
# (Timestamp tInstance) = issuedAt
| (tNow - tInstance) > options.EngineOptions.sessionTime = (Ok True,iworld)
= (Ok False,iworld)
(Error e,iworld)
= (Error e,iworld)
(Error e,iworld)
= (Error e,iworld)
= case remove of
(Ok True)
# (e,iworld) = deleteTaskInstance instanceNo iworld
| e=:(Error _) = (e,iworld)
# (e,iworld) = write Nothing (sdsFocus instanceNo taskInstanceIO) iworld
| e=:(Error _) = (e,iworld)
= (Ok (),iworld)
(Ok False)
= (Ok (), iworld)
(Error e)
= (Error e,iworld)
where
(Timestamp tNow) = timestamp
//When the event queue is empty, write deferred SDS's
flushWritesWhenIdle:: !*IWorld -> (!MaybeError TaskException (), !*IWorld)
flushWritesWhenIdle iworld = case read taskEvents iworld of
(Error e,iworld) = (Error e,iworld)
(Ok (Queue [] []),iworld) = flushDeferredSDSWrites iworld
(Ok _,iworld) = (Ok (),iworld)
//When we don't run the built-in HTTP server we don't want to loop forever so we stop the loop
//once all tasks are stable
stopOnStable :: !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
stopOnStable iworld=:{IWorld|shutdown}
# (mbIndex,iworld) = read (sdsFocus {InstanceFilter|defaultValue & includeProgress=True} filteredInstanceIndex) iworld
= case mbIndex of
Ok index
# shutdown = case shutdown of
Nothing = if (allStable index) (Just (if (exceptionOccurred index) 1 0)) Nothing
_ = shutdown
= (Ok (), {IWorld|iworld & shutdown = shutdown})
Error e = (Error e, iworld)
where
allStable instances = all (\v -> v =: Stable || v =: Exception) (values instances)
exceptionOccurred instances = any (\v -> v =: Exception) (values instances)
values instances = [value \\ (_,_,Just {InstanceProgress|value},_) <- instances]
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment