Engine.icl 11.7 KB
Newer Older
1
implementation module iTasks.Engine
2

3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
import Data.Func
import Data.Functor
import Data.Queue
import Internet.HTTP
import StdEnv
import System.CommandLine
import System.Directory
import System.File
import System.FilePath
import System.GetOpt
import System.OS
import Text
import iTasks.Internal.Distributed.Symbols
import iTasks.Internal.EngineTasks
import iTasks.Internal.IWorld
import iTasks.Internal.SDS
import iTasks.Internal.SDSService
import iTasks.Internal.TaskServer
import iTasks.Internal.TaskStore
import iTasks.Internal.Util
import iTasks.SDS.Sources.System
24
import iTasks.WF.Combinators.Common
25 26
import iTasks.WF.Definition
import iTasks.WF.Tasks.SDS
27
import iTasks.WF.Tasks.System
28

29
import qualified Data.Map as DM
30

31 32
from TCPIP import :: Timeout
from StdFunc import :: St, seqList
33

Mart Lubbers's avatar
Mart Lubbers committed
34 35
MAX_EVENTS 		        :== 5

36
derive class iTask EngineOptions
37

38 39
doTasks :: a !*World -> *World | Startable a
doTasks startable world = doTasksWithOptions defaultEngineCLIOptions startable world
40

41
doTasksWithOptions :: ([String] EngineOptions -> MaybeError [String] EngineOptions) a !*World -> *World | Startable a
42
doTasksWithOptions initFun startable world
43 44 45 46 47 48 49
	# (cli,world)                = getCommandLine world
	# (options,world)            = defaultEngineOptions world
	# mbOptions                  = initFun cli options
	| mbOptions =:(Error _)      = show (fromError mbOptions) world
	# options                    = fromOk mbOptions
	# iworld                     = createIWorld options world
	# (symbolsResult, iworld)    = initSymbolsShare options.distributed options.appName iworld
Haye Böhm's avatar
Haye Böhm committed
50
	| symbolsResult =: (Error _) = show ["Error reading symbols while required: " +++ fromError symbolsResult] (destroyIWorld iworld)
Mart Lubbers's avatar
Mart Lubbers committed
51
	# iworld                     = serve (startupTasks options) (tcpTasks options.serverPort options.keepaliveTime) (timeout options.timeout) iworld
52
	= destroyIWorld iworld
53
where
54
    webTasks = [t \\ WebTask t <- toStartable startable]
55
	startupTasks {distributed, sdsPort}
Mart Lubbers's avatar
cleanup  
Mart Lubbers committed
56
		//If distributed, start sds service task
57
		=  (if distributed [startTask (sdsServiceTask sdsPort)] [])
58 59 60
		++ [startTask flushWritesWhenIdle
		//If there no webtasks, stop when stable, otherwise cleanup old sessions
		   ,startTask if (webTasks =: []) stopOnStable removeOutdatedSessions
Mart Lubbers's avatar
cleanup  
Mart Lubbers committed
61
		//Start all startup tasks
62
		   :[t \\ StartupTask t <- toStartable startable]]
Mart Lubbers's avatar
cleanup  
Mart Lubbers committed
63

64
	startTask t = {StartupTask|attributes=defaultValue,task=TaskWrapper t}
Mart Lubbers's avatar
cleanup  
Mart Lubbers committed
65

Haye Böhm's avatar
Haye Böhm committed
66
	initSymbolsShare False _ iworld = (Ok (), iworld)
Haye Böhm's avatar
Haye Böhm committed
67
	initSymbolsShare True appName iworld = case storeSymbols (IF_WINDOWS (appName +++ ".exe") appName) iworld of
Haye Böhm's avatar
Haye Böhm committed
68
		(Error (e, s), iworld) = (Error s, iworld)
69
		(Ok noSymbols, iworld) = (Ok (),  {iworld & world = show ["Read number of symbols: " +++ toString noSymbols] iworld.world})
Haye Böhm's avatar
Haye Böhm committed
70

71 72 73 74 75
	//Only run a webserver if there are tasks that are started through the web
	tcpTasks serverPort keepaliveTime
		| webTasks =: [] = []
		| otherwise
			= [(serverPort,httpServer serverPort keepaliveTime (engineWebService webTasks) taskOutput)]
76

77
	// The iTasks engine consist of a set of HTTP Web services
78
	engineWebService :: [WebTask] -> [WebService (Map InstanceNo TaskOutput) (Map InstanceNo TaskOutput)]
79 80 81 82 83 84 85 86 87 88 89 90 91
	engineWebService webtasks =
		[taskUIService webtasks
		,documentService
		,staticResourceService [path \\ {WebTask|path} <- webtasks]
		]

	show :: ![String] !*World -> *World
	show lines world
		# (console,world)	= stdio world
		# console			= seqSt (\s c -> fwrites (s +++ OS_NEWLINE) c) lines console
		# (_,world)			= fclose console world
		= world

92
defaultEngineCLIOptions :: [String] EngineOptions -> MaybeError [String] EngineOptions
93 94 95 96 97 98 99 100 101 102
defaultEngineCLIOptions [argv0:argv] defaults
	# (settings, positionals, errs) = getOpt Permute opts argv
	| not (errs =: []) = Error errs
	| not (positionals =: []) = Error ["Positional arguments not allowed"]
	= case foldl (o) id settings (Just defaults) of
		Nothing = (Error [usageInfo ("Usage " +++ argv0 +++ "[OPTIONS]") opts])
		Just settings = Ok settings
where
	opts :: [OptDescr ((Maybe EngineOptions) -> Maybe EngineOptions)]
	opts =
103
		[ Option ['?'] ["help"] (NoArg (\_->Nothing))
104 105
			"Display this message"
		, Option ['p'] ["port"] (ReqArg (\p->fmap \o->{o & serverPort=toInt p}) "PORT")
106
			("Specify the HTTP port (default: " +++ toString defaults.serverPort +++ ")")
107 108
		, Option [] ["timeout"] (OptArg (\mp->fmap \o->{o & timeout=fmap toInt mp}) "MILLISECONDS")
			"Specify the timeout in ms (default: 500)\nIf not given, use an indefinite timeout."
109 110 111
		, Option [] ["allowed-hosts"] (ReqArg (\p->fmap \o->{o & allowedHosts = split "," p}) "IPADRESSES")
			("Specify a comma separated white list of hosts that are allowed to connected to this application\ndefault: "
			 +++ join "," defaults.allowedHosts)
112 113
		, Option [] ["keepalive"] (ReqArg (\p->fmap \o->{o & keepaliveTime={tv_sec=toInt p,tv_nsec=0}}) "SECONDS")
			"Specify the keepalive time in seconds (default: 300)"
114 115
		, Option [] ["maxevents"] (ReqArg (\p->fmap \o->{o & maxEvents=toInt p}) "NUM")
			"Specify the maximum number of events to process per loop (default: 5)"
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
		, Option [] ["sessiontime"] (ReqArg (\p->fmap \o->{o & sessionTime={tv_sec=toInt p,tv_nsec=0}}) "SECONDS")
			"Specify the expiry time for a session in seconds (default: 60)"
		, Option [] ["autolayout"] (NoArg (fmap \o->{o & autoLayout=True}))
			"Enable autolayouting (default)"
		, Option [] ["no-autolayout"] (NoArg (fmap \o->{o & autoLayout=False}))
			"Disable autolayouting"
		, Option [] ["persist-tasks"] (NoArg (fmap \o->{o & persistTasks=True}))
			"Enable the persistence of tasks"
		, Option [] ["no-persist-tasks"] (NoArg (fmap \o->{o & persistTasks=False}))
			"Disable the persistence of tasks (default)"
		, Option [] ["webdir"] (ReqArg (\p->fmap \o->{o & webDirPath=p}) "PATH")
			("Specify the folder containing static web content\ndefault: " +++ defaults.webDirPath)
		, Option [] ["storedir"] (ReqArg (\p->fmap \o->{o & storeDirPath=p}) "PATH")
			("Specify the folder containing the data stores\ndefault: " +++ defaults.storeDirPath)
		, Option [] ["tempdir"] (ReqArg (\p->fmap \o->{o & tempDirPath=p}) "PATH")
			("Specify the folder containing the temporary files\ndefault: " +++ defaults.tempDirPath)
132 133
		, Option [] ["bytecodepath"] (ReqArg (\p->fmap \o->{o & byteCodePath=p}) "PATH")
			("Specify the app's bytecode file\ndefault: " +++ defaults.byteCodePath)
134 135
		, Option [] ["distributed"] (NoArg (fmap \o->{o & distributed=True}))
			"Enable distributed mode (populate the symbols share)"
136 137
		, Option ['s'] ["sdsPort"] (ReqArg (\p->fmap \o->{o & sdsPort=toInt p}) "SDSPORT")
			("Specify the SDS port (default: " +++ toString defaults.sdsPort +++ ")")
138
		]
139

140 141
onStartup :: (Task a) -> StartableTask | iTask a
onStartup task = StartupTask {StartupTask|attributes = defaultValue, task = TaskWrapper task}
142

143 144 145 146 147 148 149 150
onRequest :: String (Task a) -> StartableTask | iTask a
onRequest path task = WebTask {WebTask|path = path, task = WebTaskWrapper (const task)}

onStartupWithAttributes :: (Task a) TaskAttributes -> StartableTask | iTask a
onStartupWithAttributes task attributes = StartupTask {StartupTask|attributes = attributes, task = TaskWrapper task}

onRequestFromRequest :: String (HTTPRequest -> Task a) -> StartableTask | iTask a
onRequestFromRequest path task = WebTask {WebTask|path = path, task = WebTaskWrapper task}
151 152

class Startable a
Bas Lijnse's avatar
Bas Lijnse committed
153
where
154 155 156
	toStartable :: !a -> [StartableTask]

instance Startable (Task a) | iTask a //Default as web task
157
where
158
	toStartable task =
159 160
		[onStartup viewWebServerInstructions
		,onRequest "/" task
161
		]
162

163
instance Startable (HTTPRequest -> Task a) | iTask a //As web task
164
where
165
	toStartable task =
166 167
		[onStartup viewWebServerInstructions
		,onRequestFromRequest "/" task
168
		]
169

170
instance Startable StartableTask
171
where
172
	toStartable task = [task]
173

174
instance Startable [StartableTask]
175
where
176
	toStartable list = list
177

178 179 180 181
instance Startable (a,b) | Startable a & Startable b
where
	toStartable (x,y) = toStartable x ++ toStartable y

182 183 184 185 186 187 188 189 190 191 192 193
viewWebServerInstructions :: Task String
viewWebServerInstructions
	=   get applicationOptions
	>>- \{EngineOptions|appName,serverPort} ->
			traceValue (join OS_NEWLINE
				["*** " +++ appName +++ " HTTP server ***"
				,""
				,"Running at http://localhost" +++
					if (serverPort == 80)
						"/"
						(":" +++ toString serverPort +++ "/")
				])
194

195 196
defaultEngineOptions :: !*World -> (!EngineOptions,!*World)
defaultEngineOptions world
197
	# (appPath,world)    = determineAppPath world
198 199 200
	# (appVersion,world) = determineAppVersion appPath world
	# appDir             = takeDirectory appPath
	# appName            = (dropExtension o dropDirectory) appPath
201
	# options =
202 203 204 205 206
		{ appName        = appName
		, appPath        = appPath
		, appVersion     = appVersion
		, serverPort     = IF_POSIX_OR_WINDOWS 8080 80
		, serverUrl      = "http://localhost/"
207
		, allowedHosts   = ["127.0.0.1"]
208 209 210 211 212 213 214
		, keepaliveTime  = {tv_sec=300,tv_nsec=0} // 5 minutes
		, sessionTime    = {tv_sec=60,tv_nsec=0}  // 1 minute, (the client pings every 10 seconds by default)
		, persistTasks   = False
		, autoLayout     = True
		, distributed    = False
		, maxEvents      = 5
		, sdsPort        = 9090
Mart Lubbers's avatar
Mart Lubbers committed
215
		, timeout        = Nothing//Just 500
216 217 218
		, webDirPath     = appDir </> appName +++ "-www"
		, storeDirPath   = appDir </> appName +++ "-data" </> "stores"
		, tempDirPath    = appDir </> appName +++ "-data" </> "tmp"
219
		, byteCodePath   = appDir </> appName +++ ".bc"
220 221 222
		}
	= (options,world)

ecrombag's avatar
ecrombag committed
223
// Determines the server executables path
224
determineAppPath :: !*World -> (!FilePath, !*World)
ecrombag's avatar
ecrombag committed
225
determineAppPath world
226
	# ([arg:_],world) = getCommandLine world
227
	| dropDirectory arg <> "ConsoleClient.exe"	= toCanonicalPath arg world
228 229 230
	//Using dynamic linker:
	# (res, world)				= getCurrentDirectory world
	| isError res				= abort "Cannot get current directory."
231
	# currentDirectory			= fromOk res
232 233
	# (res, world)				= readDirectory currentDirectory world
	| isError res				= abort "Cannot read current directory."
234
	# batchfiles				= [f \\ f <- fromOk res | takeExtension f == "bat" ]
235 236 237 238 239
	| isEmpty batchfiles		= abort "No dynamic linker batch file found."
	# (infos, world)			= seqList (map getFileInfo batchfiles) world
	| any isError infos	 		= abort "Cannot get file information."
	= (currentDirectory </> (fst o hd o sortBy cmpFileTime) (zip2 batchfiles infos), world)
	where
240
		cmpFileTime (_,Ok {FileInfo | lastModifiedTime = x})
241
					(_,Ok {FileInfo | lastModifiedTime = y}) = timeGm x > timeGm y
242

243
//By default, we use the modification time of the application executable as version id
244
determineAppVersion :: !FilePath!*World -> (!String,!*World)
245 246
determineAppVersion appPath world
	# (res,world)       = getFileInfo appPath world
247
	| res =: (Error _)  = ("unknown",world)
248 249 250
	# tm				= (fromOk res).lastModifiedTime
	# version           = strfTime "%Y%m%d-%H%M%S" tm
	= (version,world)
251 252 253 254 255 256 257 258 259 260 261 262

timeout :: !(Maybe Timeout) !*IWorld -> (!Maybe Timeout,!*IWorld)
timeout mt iworld = case read taskEvents EmptyContext iworld of
	//No events
	(Ok (ReadingDone (Queue [] [])),iworld=:{sdsNotifyRequests,world})
		# (ts, world) = nsTime world
		= ( minListBy lesser [mt:flatten (map (getTimeoutFromClock ts) ('DM'.elems sdsNotifyRequests))]
		  , {iworld & world = world})
	(Ok (ReadingDone (Queue _ _)), iworld)               = (Just 0,iworld)   //There are still events, don't wait
	(Error _,iworld)            = (Just 500,iworld) //Keep retrying, but not too fast
where
	lesser (Just x) (Just y) = x < y
263 264
	lesser (Just _) Nothing  = True
	lesser _        _        = False
265 266 267 268 269 270 271 272 273 274 275 276 277

	getTimeoutFromClock :: Timespec (Map SDSNotifyRequest Timespec) -> [Maybe Timeout]
	getTimeoutFromClock now requests = map getTimeoutFromClock` ('DM'.toList requests)
	where
		getTimeoutFromClock` :: (!SDSNotifyRequest, !Timespec) -> Maybe Timeout
		getTimeoutFromClock` (snr=:{cmpParam=(ts :: ClockParameter Timespec)}, reqTimespec)
			| startsWith "$IWorld:timespec$" snr.reqSDSId && ts.interval <> zero
				# fire = iworldTimespecNextFire now reqTimespec ts
				= Just (max 0 (toMs fire - toMs now))
			= mt
		getTimeoutFromClock` _ = mt

	toMs x = x.tv_sec * 1000 + x.tv_nsec / 1000000