Commit ce5ea146 authored by Bas Lijnse's avatar Bas Lijnse

Added the possibility to run an iTasks event loop with a list of tasks that...

Added the possibility to run an iTasks event loop with a list of tasks that are immediately created and evaluated
parent e6e59db7
......@@ -20,7 +20,7 @@ MAX_EVENTS :== 5
:: PublishedTask =
{ url :: String
, task :: TaskWrapper
, task :: WebTaskWrapper
}
:: ServerOptions =
......@@ -33,7 +33,8 @@ MAX_EVENTS :== 5
, saplDirPath :: Maybe FilePath
}
:: TaskWrapper = E.a: TaskWrapper (HTTPRequest -> Task a) & iTask a
:: WebTaskWrapper = E.a: WebTaskWrapper (HTTPRequest -> Task a) & iTask a
:: TaskWrapper = E.a: TaskWrapper (Task a) & iTask a
/**
* Starts the task engine with a list of published task definitions.
......@@ -75,6 +76,18 @@ instance Publishable [PublishedTask]
determineAppName :: !*World -> (!String,!*World)
/**
* Start a stripped task engine (without an HTTP server) with a list of tasks to be created
*/
class Runnable a
where
toRunnable :: !a -> [TaskWrapper]
instance Runnable (Task a) | iTask a
instance Runnable [TaskWrapper]
runTasks :: a !*World -> *World | Runnable a
//HACK FOR RUNNING BACKGROUND TASKS ON A CLIENT
background :: !*IWorld -> *IWorld
......@@ -36,14 +36,12 @@ show lines world
# (_,world) = fclose console world
= world
startEngine :: a !*World -> *World | Publishable a
startEngine publishable world
getServerOptions :: !*World -> (!Maybe ServerOptions,!*World)
getServerOptions world
# (opts,world) = getCommandLine world
# (appName,world) = determineAppName world
# (appPath,world) = determineAppPath world
// Show server name
# world = show (infoline appName) world
//Check commandline options
//Check commandline options
# port = fromMaybe DEFAULT_PORT (intOpt "-port" opts)
# keepalive = fromMaybe DEFAULT_KEEPALIVE_TIME (intOpt "-keepalive" opts)
# help = boolOpt "-help" opts
......@@ -51,8 +49,8 @@ startEngine publishable world
# storeOpt = stringOpt "-store" opts
# saplOpt = stringOpt "-sapl" opts
//If -help option is given show help and stop
| help = show instructions world
# options =
| help = (Nothing, show instructions world)
# options =
{ appName = appName
, appPath = appPath
, serverPort = port
......@@ -61,7 +59,7 @@ startEngine publishable world
, storeDirPath = storeOpt
, saplDirPath = saplOpt
}
= startEngineWithOptions publishable options world
= (Just options,world)
where
instructions :: [String]
instructions =
......@@ -75,11 +73,6 @@ where
,""
]
//running :: !Int -> [String]
//running port = ["Running at http://localhost" +++ (if (port == 80) "/" (":" +++ toString port +++ "/"))]
infoline :: !String -> [String]
infoline app = ["*** " +++ app +++ " HTTP server ***",""]
boolOpt :: !String ![String] -> Bool
boolOpt key opts = isMember key opts
......@@ -99,35 +92,51 @@ where
| n == key = Just v
= stringOpt key [v:r]
startEngine :: a !*World -> *World | Publishable a
startEngine publishable world
= case getServerOptions world of
(Nothing,world) = world
(Just options,world) = startEngineWithOptions publishable options world
startEngineWithOptions :: a ServerOptions !*World -> *World | Publishable a
startEngineWithOptions publishable options=:{appName,appPath,serverPort,keepalive,webDirPath,storeDirPath,saplDirPath} world
# world = show (running serverPort) world
# world = show (running appName serverPort) world
# iworld = createIWorld appName appPath webDirPath storeDirPath saplDirPath world
# (res,iworld) = initJSCompilerState iworld
| res =:(Error _) = show ["Fatal error: " +++ fromError res] (destroyIWorld iworld)
// All persistent task instances should receive a reset event to continue their work
# iworld = queueAllPersistent iworld
//Start task server
# iworld = serve serverPort (httpServer serverPort keepalive (engine publishable) allUIChanges)
[BackgroundTask removeOutdatedSessions
,BackgroundTask updateClocks, BackgroundTask (processEvents MAX_EVENTS)] timeout iworld
# iworld = serve [] [(serverPort,httpServer serverPort keepalive (engine publishable) allUIChanges)] backgroundTasks timeout iworld
= destroyIWorld iworld
where
running :: !Int -> [String]
running port = ["Running at http://localhost" +++ (if (port == 80) "/" (":" +++ toString port +++ "/"))]
timeout :: !*IWorld -> (!Maybe Timeout,!*IWorld)
timeout iworld = case 'SDS'.read taskEvents iworld of //Check if there are events in the queue
(Ok (Queue [] []),iworld) = (Just 10,iworld) //Empty queue, don't waste CPU, but refresh
(Ok _,iworld) = (Just 0,iworld) //There are still events, don't wait
(Error _,iworld) = (Just 500,iworld) //Keep retrying, but not too fast
queueAllPersistent :: !*IWorld -> *IWorld
queueAllPersistent iworld
# (mbIndex,iworld) = read (sdsFocus {InstanceFilter|defaultValue & onlySession=Just False} filteredInstanceIndex) iworld
= case mbIndex of
Ok index = queueRefresh [(instanceNo,"Persistent first refresh") \\ (instanceNo,_,_,_)<- index] iworld
_ = iworld
running :: !String !Int -> [String]
running app port = ["*** " +++ app +++ " HTTP server ***"
,""
,"Running at http://localhost" +++ (if (port == 80) "/" (":" +++ toString port +++ "/"))]
runTasks :: a !*World -> *World | Runnable a
runTasks tasks world
= case getServerOptions world of
(Nothing,world) = world
(Just options,world) = runTasksWithOptions tasks options world
runTasksWithOptions :: a ServerOptions !*World -> *World | Runnable a
runTasksWithOptions runnable options=:{appName,appPath,serverPort,keepalive,webDirPath,storeDirPath,saplDirPath} world
# iworld = createIWorld appName appPath webDirPath storeDirPath saplDirPath world
# (res,iworld) = initJSCompilerState iworld
| res =:(Error _) = show ["Fatal error: " +++ fromError res] (destroyIWorld iworld)
# iworld = serve (toRunnable runnable) [] backgroundTasks timeout iworld
= destroyIWorld iworld
backgroundTasks =
[BackgroundTask updateClocks
,BackgroundTask (processEvents MAX_EVENTS)
,BackgroundTask removeOutdatedSessions]
timeout :: !*IWorld -> (!Maybe Timeout,!*IWorld)
timeout iworld = case 'SDS'.read taskEvents iworld of //Check if there are events in the queue
(Ok (Queue [] []),iworld) = (Just 10,iworld) //Empty queue, don't waste CPU, but refresh
(Ok _,iworld) = (Just 0,iworld) //There are still events, don't wait
(Error _,iworld) = (Just 500,iworld) //Keep retrying, but not too fast
updateClocks :: !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
updateClocks iworld=:{IWorld|clocks,world}
......@@ -175,6 +184,7 @@ where
(Ok Nothing,iworld) = iworld
(Error e,iworld) = iworld
//HACK FOR RUNNING BACKGROUND TASKS ON A CLIENT
background :: !*IWorld -> *IWorld
background iworld
......@@ -195,13 +205,13 @@ where
published = publishAll publishable
publish :: String (HTTPRequest -> Task a) -> PublishedTask | iTask a
publish url task = {url = url, task = TaskWrapper (withFinalSessionLayout task)}
publish url task = {url = url, task = WebTaskWrapper (withFinalSessionLayout task)}
withFinalSessionLayout :: (HTTPRequest -> Task a) -> (HTTPRequest -> Task a) | iTask a
withFinalSessionLayout taskf = \req -> tune (ApplyLayout defaultSessionLayout) (taskf req)
publishWithoutLayout :: String (HTTPRequest -> Task a) -> PublishedTask | iTask a
publishWithoutLayout url task = {url = url, task = TaskWrapper task}
publishWithoutLayout url task = {url = url, task = WebTaskWrapper task}
instance Publishable (Task a) | iTask a
where
......@@ -215,6 +225,19 @@ instance Publishable [PublishedTask]
where
publishAll list = list
class Runnable a
where
toRunnable :: !a -> [TaskWrapper]
instance Runnable (Task a) | iTask a
where
toRunnable task = [TaskWrapper task]
instance Runnable [TaskWrapper]
where
toRunnable list = list
// Determines the server executables name
determineAppName :: !*World -> (!String,!*World)
determineAppName world
......
......@@ -11,9 +11,10 @@ from Data.Error import :: MaybeError
from iTasks.API.Core.Types import :: TaskId
from iTasks._Framework.IWorld import :: IWorld, :: BackgroundTaskId
from iTasks._Framework.Task import :: ExternalProcessTask, :: ConnectionTask, :: BackgroundTask, :: TaskException
from iTasks._Framework.Engine import :: TaskWrapper
//Core task server loop
serve :: !Int !ConnectionTask ![BackgroundTask] (*IWorld -> (!Maybe Timeout,!*IWorld)) *IWorld -> *IWorld
serve :: ![TaskWrapper] ![(!Int,!ConnectionTask)] ![BackgroundTask] (*IWorld -> (!Maybe Timeout,!*IWorld)) *IWorld -> *IWorld
//Dynamically add a listener
addListener :: !TaskId !Int !Bool !ConnectionTask !*IWorld -> (!MaybeError TaskException (),!*IWorld)
......
implementation module iTasks._Framework.TaskServer
import StdFile, StdBool, StdInt, StdClass, StdList, StdMisc, StdArray, StdTuple, StdOrdList
import Data.Maybe, Data.Functor, Data.Error, System.Time, Text, Data.Tuple
import Data.Maybe, Data.Functor, Data.Func, Data.Error, System.Time, Text, Data.Tuple
from StdFunc import seq
from Data.Map import :: Map (..)
import qualified System.Process as Process
......@@ -11,10 +11,12 @@ import qualified Data.Map as DM
import qualified iTasks._Framework.SDS as SDS
import TCPChannelClass, TCPChannels, TCPEvent, TCPStringChannels, TCPDef, tcp
import iTasks._Framework.Engine, iTasks._Framework.IWorld, iTasks._Framework.TaskEval, iTasks._Framework.TaskStore
import iTasks._Framework.IWorld
import iTasks._Framework.Task
import iTasks._Framework.TaskEval
from iTasks._Framework.TaskStore import queueRefresh
import iTasks.API.Common.SDSCombinators
//Helper type that holds the mainloop instances during a select call
//in these mainloop instances the unique listeners and read channels
......@@ -25,17 +27,53 @@ from iTasks._Framework.TaskStore import queueRefresh
| ExternalProcessInstanceDS !ExternalProcessInstanceOpts !ProcessHandle !ProcessIO
| BackgroundInstanceDS !BackgroundInstanceOpts !BackgroundTask
serve :: !Int !ConnectionTask ![BackgroundTask] (*IWorld -> (!Maybe Timeout,!*IWorld)) *IWorld -> *IWorld
serve port ct bt determineTimeout iworld
= loop determineTimeout (init port ct bt iworld)
init :: !Int !ConnectionTask ![BackgroundTask] !*IWorld -> *IWorld
init port ct bt iworld=:{IWorld|ioTasks,world}
# (success, mbListener, world) = openTCP_Listener port world
| not success = abort ("Error: port "+++ toString port +++ " already in use.\n")
# opts = {ListenerInstanceOpts|taskId=TaskId 0 0, nextConnectionId=0, port=port, connectionTask=ct, removeOnClose = True}
serve :: ![TaskWrapper] ![(!Int,!ConnectionTask)] ![BackgroundTask] (*IWorld -> (!Maybe Timeout,!*IWorld)) *IWorld -> *IWorld
serve its cts bts determineTimeout iworld
= loop determineTimeout (init its cts bts iworld)
init :: ![TaskWrapper] ![(!Int,!ConnectionTask)] ![BackgroundTask] !*IWorld -> *IWorld
init its cts bts iworld
// Check if it the initial tasks have been added already
# iworld = createInitialInstances its iworld
// All persistent task instances should receive a reset event to continue their work
# iworld=:{IWorld|ioTasks,world} = queueAll iworld
# (listeners,world) = connectAll cts world
# ioStates = 'DM'.fromList [(TaskId 0 0, IOActive 'DM'.newMap)]
= {iworld & ioTasks = {done=[],todo=[ListenerInstance opts (fromJust mbListener):map (BackgroundInstance {bgInstId=0})bt]}, ioStates = ioStates, world = world}
= {iworld & ioTasks = {done=[],todo=listeners ++ map (BackgroundInstance {bgInstId=0}) bts}, ioStates = ioStates, world = world}
where
createInitialInstances :: [TaskWrapper] !*IWorld -> *IWorld
createInitialInstances its iworld
# (mbNextNo,iworld) = read nextInstanceNo iworld
| (mbNextNo =: (Ok 1)) = createAll its iworld //This way we check if it is the initial run of the program
= iworld
createAll :: [TaskWrapper] !*IWorld -> *IWorld
createAll [] iworld = iworld
createAll [TaskWrapper task:ts] iworld
= case createTaskInstance task iworld of
(Ok _,iworld) = createAll ts iworld
(Error (_,e),iworld) = abort e
queueAll :: !*IWorld -> *IWorld
queueAll iworld
# (mbIndex,iworld) = read (sdsFocus defaultValue filteredInstanceIndex) iworld
= case mbIndex of
Ok index = queueRefresh [(instanceNo,"Persistent first refresh") \\ (instanceNo,_,_,_)<- index] iworld
_ = iworld
connectAll :: ![(!Int,!ConnectionTask)] !*World -> *(![*IOTaskInstance],!*World)
connectAll [] world = ([],world)
connectAll [(port,ct):cts] world
# (l,world) = connect port ct world
# (ls,world) = connectAll cts world
= ([l:ls],world)
connect :: !Int !ConnectionTask !*World -> *(!*IOTaskInstance,!*World)
connect port ct world
# (success, mbListener, world) = openTCP_Listener port world
| not success = abort ("Error: port "+++ toString port +++ " already in use.\n")
# opts = {ListenerInstanceOpts|taskId=TaskId 0 0, nextConnectionId=0, port=port, connectionTask=ct, removeOnClose = True}
= (ListenerInstance opts (fromJust mbListener),world)
loop :: !(*IWorld -> (!Maybe Timeout,!*IWorld)) !*IWorld -> *IWorld
loop determineTimeout iworld
......@@ -52,23 +90,26 @@ loop determineTimeout iworld
select :: (Maybe Timeout) *[IOTaskInstance] *World -> (!*[IOTaskInstance],![(Int,SelectResult)],!*World)
select mbTimeout mlInstances world
# (listeners,rChannels,mlInstances)
= toSelectSet mlInstances
# (chList,(TCP_Pair (TCP_Listeners listeners) (TCP_RChannels rChannels)),_,world)
= selectChannel_MT mbTimeout (TCP_Pair (TCP_Listeners listeners) (TCP_RChannels rChannels)) TCP_Void world
# (mlInstances, chList)
= fromSelectSet listeners rChannels mlInstances chList
= (mlInstances, chList, world)
toSelectSet :: !*[IOTaskInstance] -> *(!*[*TCP_Listener],!*[*TCP_RChannel],!*[*IOTaskInstanceDuringSelect])
toSelectSet [] = ([],[],[])
# (empty,listeners,rChannels,mlInstances) = toSelectSet mlInstances
| empty //selectChannel_MT aborts if it is called with an empty list, so we must make sure that never happens
# (mlInstances, chList) = fromSelectSet listeners rChannels mlInstances []
= (mlInstances, chList, world)
| otherwise
# (chList,(TCP_Pair (TCP_Listeners listeners) (TCP_RChannels rChannels)),_,world)
= selectChannel_MT mbTimeout (TCP_Pair (TCP_Listeners listeners) (TCP_RChannels rChannels)) TCP_Void world
# (mlInstances, chList)
= fromSelectSet listeners rChannels mlInstances chList
= (mlInstances, chList, world)
toSelectSet :: !*[IOTaskInstance] -> *(!Bool,!*[*TCP_Listener],!*[*TCP_RChannel],!*[*IOTaskInstanceDuringSelect])
toSelectSet [] = (True,[],[],[])
toSelectSet [i:is]
# (ls,rs,is) = toSelectSet is
# (e,ls,rs,is) = toSelectSet is
= case i of
ListenerInstance opts l = ([l:ls],rs,[ListenerInstanceDS opts:is])
ConnectionInstance opts {rChannel,sChannel} = (ls,[rChannel:rs],[ConnectionInstanceDS opts sChannel:is])
ExternalProcessInstance opts pHandle pIO = (ls, rs, [ExternalProcessInstanceDS opts pHandle pIO : is])
BackgroundInstance opts bt = (ls,rs,[BackgroundInstanceDS opts bt:is])
ListenerInstance opts l = (False,[l:ls],rs,[ListenerInstanceDS opts:is])
ConnectionInstance opts {rChannel,sChannel} = (False,ls,[rChannel:rs],[ConnectionInstanceDS opts sChannel:is])
ExternalProcessInstance opts pHandle pIO = (e, ls, rs, [ExternalProcessInstanceDS opts pHandle pIO : is])
BackgroundInstance opts bt = (e,ls,rs,[BackgroundInstanceDS opts bt:is])
/* Restore the list of main loop instances.
In the same pass also update the indices in the select result to match the
......
......@@ -36,6 +36,10 @@ newDocumentId :: !*IWorld -> (!DocumentId, !*IWorld)
//=== Task instance index: ===
//A global index of all task instances is maintained
//This counter is used to ensure unique instance numbers
nextInstanceNo :: RWShared () Int Int
//This index contains all meta-data about the task instances on this engine
taskInstanceIndex :: RWShared () [TIMeta] [TIMeta]
......
......@@ -327,7 +327,7 @@ where
disconnectFun _ _ (state,instances) iworld = (Nothing, snd (updateInstanceDisconnect instances iworld))
disconnectFun _ _ _ iworld = (Nothing, iworld)
createTaskInstance` req [{PublishedTask|url,task=TaskWrapper task}:taskUrls] iworld
createTaskInstance` req [{PublishedTask|url,task=WebTaskWrapper task}:taskUrls] iworld
| req.HTTPRequest.req_path == uiUrl url = createTaskInstance (task req) iworld
| otherwise = createTaskInstance` req taskUrls iworld
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment