Engine.icl 12.5 KB
Newer Older
1
implementation module iTasks.Engine
2

3 4
import Data.Func
import Data.Functor
5
import Data.List
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
import Data.Queue
import Internet.HTTP
import StdEnv
import System.CommandLine
import System.Directory
import System.File
import System.FilePath
import System.GetOpt
import System.OS
import Text
import iTasks.Internal.Distributed.Symbols
import iTasks.Internal.EngineTasks
import iTasks.Internal.IWorld
import iTasks.Internal.SDS
import iTasks.Internal.SDSService
import iTasks.Internal.TaskServer
import iTasks.Internal.TaskStore
import iTasks.Internal.Util
import iTasks.SDS.Sources.System
25
import iTasks.WF.Combinators.Common
26 27
import iTasks.WF.Definition
import iTasks.WF.Tasks.SDS
28
import iTasks.WF.Tasks.System
Mart Lubbers's avatar
Mart Lubbers committed
29
import iTasks.WF.Derives
30

31
import qualified Data.Map as DM
32

33 34
from TCPIP import :: Timeout
from StdFunc import :: St, seqList
35

Mart Lubbers's avatar
Mart Lubbers committed
36 37
MAX_EVENTS 		        :== 5

38
derive class iTask EngineOptions
39

40 41
doTasks :: a !*World -> *World | Startable a
doTasks startable world = doTasksWithOptions defaultEngineCLIOptions startable world
42

43
doTasksWithOptions :: ([String] EngineOptions -> MaybeError [String] EngineOptions) a !*World -> *World | Startable a
44
doTasksWithOptions initFun startable world
45 46 47
	# (cli,world)                = getCommandLine world
	# (options,world)            = defaultEngineOptions world
	# mbOptions                  = initFun cli options
48
	| mbOptions =:(Error _)      = show (fromError mbOptions) (setReturnCode 1 world)
49
	# options                    = fromOk mbOptions
50
	# mbIWorld                   = createIWorld options world
51 52 53 54
	| mbIWorld =: Left _
		# (Left (err, world)) = mbIWorld
		= show [err] (setReturnCode 1 world)
	# (Right iworld)             = mbIWorld
55
	# (symbolsResult, iworld)    = initSymbolsShare options.distributed options.appName iworld
56
	| symbolsResult =: (Error _) = show ["Error reading symbols while required: " +++ fromError symbolsResult] (setReturnCode 1 (destroyIWorld iworld))
57
	# iworld = if (hasDup requestPaths)
58
		(iShow ["Warning: duplicate paths in the web tasks: " +++ join ", " ["'" +++ p +++ "'"\\p<-requestPaths]] iworld)
59
		iworld
Mart Lubbers's avatar
Mart Lubbers committed
60
	# iworld                     = serve (startupTasks options) (tcpTasks options.serverPort options.keepaliveTime) (timeout options.timeout) iworld
61
	= destroyIWorld iworld
62
where
63
	requestPaths = [path\\{path}<-webTasks]
64
	webTasks = [t \\ WebTask t <- toStartable startable]
65
	startupTasks {distributed, sdsPort}
66
		=  if webTasks=:[]
67 68 69 70 71
		   //if there are no webtasks: stop when stable
		   [systemTask (startTask stopOnStable)]
		   //if there are: show instructions andcleanup old sessions
		   [startTask viewWebServerInstructions
		   ,systemTask (startTask removeOutdatedSessions)]
Mart Lubbers's avatar
cleanup  
Mart Lubbers committed
72
		//If distributed, start sds service task
73
		++ (if distributed [systemTask (startTask (sdsServiceTask sdsPort))] [])
74
		++ [systemTask (startTask flushWritesWhenIdle)
Mart Lubbers's avatar
cleanup  
Mart Lubbers committed
75
		//Start all startup tasks
76
		   :[t \\ StartupTask t <- toStartable startable]]
Mart Lubbers's avatar
cleanup  
Mart Lubbers committed
77

78
	startTask t = {StartupTask|attributes=defaultValue,task=TaskWrapper t}
79
	systemTask t = {StartupTask|t&attributes='DM'.put "system" (JSONBool True) t.StartupTask.attributes}
Mart Lubbers's avatar
cleanup  
Mart Lubbers committed
80

Haye Böhm's avatar
Haye Böhm committed
81
	initSymbolsShare False _ iworld = (Ok (), iworld)
Haye Böhm's avatar
Haye Böhm committed
82
	initSymbolsShare True appName iworld = case storeSymbols (IF_WINDOWS (appName +++ ".exe") appName) iworld of
Haye Böhm's avatar
Haye Böhm committed
83
		(Error (e, s), iworld) = (Error s, iworld)
84
		(Ok noSymbols, iworld) = (Ok (),  {iworld & world = show ["Read number of symbols: " +++ toString noSymbols] iworld.world})
Haye Böhm's avatar
Haye Böhm committed
85

86 87 88 89 90
	//Only run a webserver if there are tasks that are started through the web
	tcpTasks serverPort keepaliveTime
		| webTasks =: [] = []
		| otherwise
			= [(serverPort,httpServer serverPort keepaliveTime (engineWebService webTasks) taskOutput)]
91

92
	// The iTasks engine consist of a set of HTTP Web services
93
	engineWebService :: [WebTask] -> [WebService (Map InstanceNo TaskOutput) (Map InstanceNo TaskOutput)]
94 95 96 97 98 99 100 101 102 103 104 105 106
	engineWebService webtasks =
		[taskUIService webtasks
		,documentService
		,staticResourceService [path \\ {WebTask|path} <- webtasks]
		]

	show :: ![String] !*World -> *World
	show lines world
		# (console,world)	= stdio world
		# console			= seqSt (\s c -> fwrites (s +++ OS_NEWLINE) c) lines console
		# (_,world)			= fclose console world
		= world

107
defaultEngineCLIOptions :: [String] EngineOptions -> MaybeError [String] EngineOptions
108 109 110 111 112 113 114 115 116 117
defaultEngineCLIOptions [argv0:argv] defaults
	# (settings, positionals, errs) = getOpt Permute opts argv
	| not (errs =: []) = Error errs
	| not (positionals =: []) = Error ["Positional arguments not allowed"]
	= case foldl (o) id settings (Just defaults) of
		Nothing = (Error [usageInfo ("Usage " +++ argv0 +++ "[OPTIONS]") opts])
		Just settings = Ok settings
where
	opts :: [OptDescr ((Maybe EngineOptions) -> Maybe EngineOptions)]
	opts =
118
		[ Option ['?'] ["help"] (NoArg (\_->Nothing))
119 120
			"Display this message"
		, Option ['p'] ["port"] (ReqArg (\p->fmap \o->{o & serverPort=toInt p}) "PORT")
121
			("Specify the HTTP port (default: " +++ toString defaults.serverPort +++ ")")
122 123
		, Option [] ["timeout"] (OptArg (\mp->fmap \o->{o & timeout=fmap toInt mp}) "MILLISECONDS")
			"Specify the timeout in ms (default: 500)\nIf not given, use an indefinite timeout."
124
		, Option [] ["allowed-hosts"] (ReqArg (\p->fmap \o->{o & allowedHosts = if (p == "") [] (split "," p)}) "IPADRESSES")
125 126
			("Specify a comma separated white list of hosts that are allowed to connected to this application\ndefault: "
			 +++ join "," defaults.allowedHosts)
127 128
		, Option [] ["keepalive"] (ReqArg (\p->fmap \o->{o & keepaliveTime={tv_sec=toInt p,tv_nsec=0}}) "SECONDS")
			"Specify the keepalive time in seconds (default: 300)"
129 130
		, Option [] ["maxevents"] (ReqArg (\p->fmap \o->{o & maxEvents=toInt p}) "NUM")
			"Specify the maximum number of events to process per loop (default: 5)"
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
		, Option [] ["sessiontime"] (ReqArg (\p->fmap \o->{o & sessionTime={tv_sec=toInt p,tv_nsec=0}}) "SECONDS")
			"Specify the expiry time for a session in seconds (default: 60)"
		, Option [] ["autolayout"] (NoArg (fmap \o->{o & autoLayout=True}))
			"Enable autolayouting (default)"
		, Option [] ["no-autolayout"] (NoArg (fmap \o->{o & autoLayout=False}))
			"Disable autolayouting"
		, Option [] ["persist-tasks"] (NoArg (fmap \o->{o & persistTasks=True}))
			"Enable the persistence of tasks"
		, Option [] ["no-persist-tasks"] (NoArg (fmap \o->{o & persistTasks=False}))
			"Disable the persistence of tasks (default)"
		, Option [] ["webdir"] (ReqArg (\p->fmap \o->{o & webDirPath=p}) "PATH")
			("Specify the folder containing static web content\ndefault: " +++ defaults.webDirPath)
		, Option [] ["storedir"] (ReqArg (\p->fmap \o->{o & storeDirPath=p}) "PATH")
			("Specify the folder containing the data stores\ndefault: " +++ defaults.storeDirPath)
		, Option [] ["tempdir"] (ReqArg (\p->fmap \o->{o & tempDirPath=p}) "PATH")
			("Specify the folder containing the temporary files\ndefault: " +++ defaults.tempDirPath)
147 148
		, Option [] ["bytecodepath"] (ReqArg (\p->fmap \o->{o & byteCodePath=p}) "PATH")
			("Specify the app's bytecode file\ndefault: " +++ defaults.byteCodePath)
149 150
		, Option [] ["distributed"] (NoArg (fmap \o->{o & distributed=True}))
			"Enable distributed mode (populate the symbols share)"
151 152
		, Option ['s'] ["sdsPort"] (ReqArg (\p->fmap \o->{o & sdsPort=toInt p}) "SDSPORT")
			("Specify the SDS port (default: " +++ toString defaults.sdsPort +++ ")")
153
		]
154

155 156
onStartup :: (Task a) -> StartableTask | iTask a
onStartup task = StartupTask {StartupTask|attributes = defaultValue, task = TaskWrapper task}
157

158 159 160 161 162 163 164 165
onRequest :: String (Task a) -> StartableTask | iTask a
onRequest path task = WebTask {WebTask|path = path, task = WebTaskWrapper (const task)}

onStartupWithAttributes :: (Task a) TaskAttributes -> StartableTask | iTask a
onStartupWithAttributes task attributes = StartupTask {StartupTask|attributes = attributes, task = TaskWrapper task}

onRequestFromRequest :: String (HTTPRequest -> Task a) -> StartableTask | iTask a
onRequestFromRequest path task = WebTask {WebTask|path = path, task = WebTaskWrapper task}
166 167

class Startable a
Bas Lijnse's avatar
Bas Lijnse committed
168
where
169 170 171
	toStartable :: !a -> [StartableTask]

instance Startable (Task a) | iTask a //Default as web task
172
where
173
	toStartable task = [onRequest "/" task]
174

175
instance Startable (HTTPRequest -> Task a) | iTask a //As web task
176
where
177
	toStartable task = [onRequestFromRequest "/" task]
178

179
instance Startable StartableTask
180
where
181
	toStartable task = [task]
182

183
instance Startable [StartableTask]
184
where
185
	toStartable list = list
186

187 188 189 190
instance Startable (a,b) | Startable a & Startable b
where
	toStartable (x,y) = toStartable x ++ toStartable y

191 192 193 194 195 196 197 198 199 200 201 202
viewWebServerInstructions :: Task String
viewWebServerInstructions
	=   get applicationOptions
	>>- \{EngineOptions|appName,serverPort} ->
			traceValue (join OS_NEWLINE
				["*** " +++ appName +++ " HTTP server ***"
				,""
				,"Running at http://localhost" +++
					if (serverPort == 80)
						"/"
						(":" +++ toString serverPort +++ "/")
				])
203

204 205
defaultEngineOptions :: !*World -> (!EngineOptions,!*World)
defaultEngineOptions world
206
	# (appPath,world)    = determineAppPath world
207 208
	# (appVersion,world) = determineAppVersion appPath world
	# appDir             = takeDirectory appPath
209
	# appName            = (if (takeExtension appPath == "exe") dropExtension id o dropDirectory) appPath
210
	# options =
211 212 213 214 215
		{ appName        = appName
		, appPath        = appPath
		, appVersion     = appVersion
		, serverPort     = IF_POSIX_OR_WINDOWS 8080 80
		, serverUrl      = "http://localhost/"
216
		, allowedHosts   = ["127.0.0.1"]
217 218 219 220 221 222 223
		, keepaliveTime  = {tv_sec=300,tv_nsec=0} // 5 minutes
		, sessionTime    = {tv_sec=60,tv_nsec=0}  // 1 minute, (the client pings every 10 seconds by default)
		, persistTasks   = False
		, autoLayout     = True
		, distributed    = False
		, maxEvents      = 5
		, sdsPort        = 9090
Mart Lubbers's avatar
Mart Lubbers committed
224
		, timeout        = Nothing//Just 500
225 226 227
		, webDirPath     = appDir </> appName +++ "-www"
		, storeDirPath   = appDir </> appName +++ "-data" </> "stores"
		, tempDirPath    = appDir </> appName +++ "-data" </> "tmp"
228
		, byteCodePath   = appDir </> appName +++ ".bc"
229 230 231
		}
	= (options,world)

ecrombag's avatar
ecrombag committed
232
// Determines the server executables path
233
determineAppPath :: !*World -> (!FilePath, !*World)
ecrombag's avatar
ecrombag committed
234
determineAppPath world
235
	# ([arg:_],world) = getCommandLine world
236
	| dropDirectory arg <> "ConsoleClient.exe"	= toCanonicalPath arg world
237 238 239
	//Using dynamic linker:
	# (res, world)				= getCurrentDirectory world
	| isError res				= abort "Cannot get current directory."
240
	# currentDirectory			= fromOk res
241 242
	# (res, world)				= readDirectory currentDirectory world
	| isError res				= abort "Cannot read current directory."
243
	# batchfiles				= [f \\ f <- fromOk res | takeExtension f == "bat" ]
244 245 246 247 248
	| isEmpty batchfiles		= abort "No dynamic linker batch file found."
	# (infos, world)			= seqList (map getFileInfo batchfiles) world
	| any isError infos	 		= abort "Cannot get file information."
	= (currentDirectory </> (fst o hd o sortBy cmpFileTime) (zip2 batchfiles infos), world)
	where
249
		cmpFileTime (_,Ok {FileInfo | lastModifiedTime = x})
250
					(_,Ok {FileInfo | lastModifiedTime = y}) = timeGm x > timeGm y
251

252
//By default, we use the modification time of the application executable as version id
253
determineAppVersion :: !FilePath!*World -> (!String,!*World)
254 255
determineAppVersion appPath world
	# (res,world)       = getFileInfo appPath world
256
	| res =: (Error _)  = ("unknown",world)
257 258 259
	# tm				= (fromOk res).lastModifiedTime
	# version           = strfTime "%Y%m%d-%H%M%S" tm
	= (version,world)
260 261 262 263 264 265 266 267 268 269 270 271

timeout :: !(Maybe Timeout) !*IWorld -> (!Maybe Timeout,!*IWorld)
timeout mt iworld = case read taskEvents EmptyContext iworld of
	//No events
	(Ok (ReadingDone (Queue [] [])),iworld=:{sdsNotifyRequests,world})
		# (ts, world) = nsTime world
		= ( minListBy lesser [mt:flatten (map (getTimeoutFromClock ts) ('DM'.elems sdsNotifyRequests))]
		  , {iworld & world = world})
	(Ok (ReadingDone (Queue _ _)), iworld)               = (Just 0,iworld)   //There are still events, don't wait
	(Error _,iworld)            = (Just 500,iworld) //Keep retrying, but not too fast
where
	lesser (Just x) (Just y) = x < y
272 273
	lesser (Just _) Nothing  = True
	lesser _        _        = False
274 275 276 277 278 279

	getTimeoutFromClock :: Timespec (Map SDSNotifyRequest Timespec) -> [Maybe Timeout]
	getTimeoutFromClock now requests = map getTimeoutFromClock` ('DM'.toList requests)
	where
		getTimeoutFromClock` :: (!SDSNotifyRequest, !Timespec) -> Maybe Timeout
		getTimeoutFromClock` (snr=:{cmpParam=(ts :: ClockParameter Timespec)}, reqTimespec)
280
			| dependsOnClock snr && ts.interval <> zero
281 282 283 284 285
				# fire = iworldTimespecNextFire now reqTimespec ts
				= Just (max 0 (toMs fire - toMs now))
			= mt
		getTimeoutFromClock` _ = mt

286
	dependsOnClock :: !SDSNotifyRequest -> Bool
287
	dependsOnClock snr = indexOf "$IWorld:timespec$" snr.reqSDSId >= 0
288

289
	toMs x = x.tv_sec * 1000 + x.tv_nsec / 1000000