Engine.icl 12.1 KB
Newer Older
1
implementation module iTasks.Engine
2

3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
import Data.Func
import Data.Functor
import Data.Queue
import Internet.HTTP
import StdEnv
import System.CommandLine
import System.Directory
import System.File
import System.FilePath
import System.GetOpt
import System.OS
import Text
import iTasks.Internal.Distributed.Symbols
import iTasks.Internal.EngineTasks
import iTasks.Internal.IWorld
import iTasks.Internal.SDS
import iTasks.Internal.SDSService
import iTasks.Internal.TaskServer
import iTasks.Internal.TaskStore
import iTasks.Internal.Util
import iTasks.SDS.Sources.System
24
import iTasks.WF.Combinators.Common
25 26
import iTasks.WF.Definition
import iTasks.WF.Tasks.SDS
27
import iTasks.WF.Tasks.System
28

29
import qualified Data.Map as DM
30

31 32
from TCPIP import :: Timeout
from StdFunc import :: St, seqList
33

Mart Lubbers's avatar
Mart Lubbers committed
34 35
MAX_EVENTS 		        :== 5

36
derive class iTask EngineOptions
37

38 39
doTasks :: a !*World -> *World | Startable a
doTasks startable world = doTasksWithOptions defaultEngineCLIOptions startable world
40

41
doTasksWithOptions :: ([String] EngineOptions -> MaybeError [String] EngineOptions) a !*World -> *World | Startable a
42
doTasksWithOptions initFun startable world
43 44 45
	# (cli,world)                = getCommandLine world
	# (options,world)            = defaultEngineOptions world
	# mbOptions                  = initFun cli options
46
	| mbOptions =:(Error _)      = show (fromError mbOptions) (setReturnCode 1 world)
47
	# options                    = fromOk mbOptions
48
	# mbIWorld                   = createIWorld options world
49 50 51 52
	| mbIWorld =: Left _
		# (Left (err, world)) = mbIWorld
		= show [err] (setReturnCode 1 world)
	# (Right iworld)             = mbIWorld
53
	# (symbolsResult, iworld)    = initSymbolsShare options.distributed options.appName iworld
54
	| symbolsResult =: (Error _) = show ["Error reading symbols while required: " +++ fromError symbolsResult] (setReturnCode 1 (destroyIWorld iworld))
Mart Lubbers's avatar
Mart Lubbers committed
55
	# iworld                     = serve (startupTasks options) (tcpTasks options.serverPort options.keepaliveTime) (timeout options.timeout) iworld
56
	= destroyIWorld iworld
57
where
58
    webTasks = [t \\ WebTask t <- toStartable startable]
59
	startupTasks {distributed, sdsPort}
Mart Lubbers's avatar
cleanup  
Mart Lubbers committed
60
		//If distributed, start sds service task
61 62
		=  (if distributed [systemTask (startTask (sdsServiceTask sdsPort))] [])
		++ [systemTask (startTask flushWritesWhenIdle)
63
		//If there no webtasks, stop when stable, otherwise cleanup old sessions
64
		   ,systemTask (startTask if (webTasks =: []) stopOnStable removeOutdatedSessions)
Mart Lubbers's avatar
cleanup  
Mart Lubbers committed
65
		//Start all startup tasks
66
		   :[t \\ StartupTask t <- toStartable startable]]
Mart Lubbers's avatar
cleanup  
Mart Lubbers committed
67

68
	startTask t = {StartupTask|attributes=defaultValue,task=TaskWrapper t}
69
	systemTask t = {StartupTask|t&attributes='DM'.put "system" "yes" t.StartupTask.attributes}
Mart Lubbers's avatar
cleanup  
Mart Lubbers committed
70

Haye Böhm's avatar
Haye Böhm committed
71
	initSymbolsShare False _ iworld = (Ok (), iworld)
Haye Böhm's avatar
Haye Böhm committed
72
	initSymbolsShare True appName iworld = case storeSymbols (IF_WINDOWS (appName +++ ".exe") appName) iworld of
Haye Böhm's avatar
Haye Böhm committed
73
		(Error (e, s), iworld) = (Error s, iworld)
74
		(Ok noSymbols, iworld) = (Ok (),  {iworld & world = show ["Read number of symbols: " +++ toString noSymbols] iworld.world})
Haye Böhm's avatar
Haye Böhm committed
75

76 77 78 79 80
	//Only run a webserver if there are tasks that are started through the web
	tcpTasks serverPort keepaliveTime
		| webTasks =: [] = []
		| otherwise
			= [(serverPort,httpServer serverPort keepaliveTime (engineWebService webTasks) taskOutput)]
81

82
	// The iTasks engine consist of a set of HTTP Web services
83
	engineWebService :: [WebTask] -> [WebService (Map InstanceNo TaskOutput) (Map InstanceNo TaskOutput)]
84 85 86 87 88 89 90 91 92 93 94 95 96
	engineWebService webtasks =
		[taskUIService webtasks
		,documentService
		,staticResourceService [path \\ {WebTask|path} <- webtasks]
		]

	show :: ![String] !*World -> *World
	show lines world
		# (console,world)	= stdio world
		# console			= seqSt (\s c -> fwrites (s +++ OS_NEWLINE) c) lines console
		# (_,world)			= fclose console world
		= world

97
defaultEngineCLIOptions :: [String] EngineOptions -> MaybeError [String] EngineOptions
98 99 100 101 102 103 104 105 106 107
defaultEngineCLIOptions [argv0:argv] defaults
	# (settings, positionals, errs) = getOpt Permute opts argv
	| not (errs =: []) = Error errs
	| not (positionals =: []) = Error ["Positional arguments not allowed"]
	= case foldl (o) id settings (Just defaults) of
		Nothing = (Error [usageInfo ("Usage " +++ argv0 +++ "[OPTIONS]") opts])
		Just settings = Ok settings
where
	opts :: [OptDescr ((Maybe EngineOptions) -> Maybe EngineOptions)]
	opts =
108
		[ Option ['?'] ["help"] (NoArg (\_->Nothing))
109 110
			"Display this message"
		, Option ['p'] ["port"] (ReqArg (\p->fmap \o->{o & serverPort=toInt p}) "PORT")
111
			("Specify the HTTP port (default: " +++ toString defaults.serverPort +++ ")")
112 113
		, Option [] ["timeout"] (OptArg (\mp->fmap \o->{o & timeout=fmap toInt mp}) "MILLISECONDS")
			"Specify the timeout in ms (default: 500)\nIf not given, use an indefinite timeout."
114 115 116
		, Option [] ["allowed-hosts"] (ReqArg (\p->fmap \o->{o & allowedHosts = split "," p}) "IPADRESSES")
			("Specify a comma separated white list of hosts that are allowed to connected to this application\ndefault: "
			 +++ join "," defaults.allowedHosts)
117 118
		, Option [] ["keepalive"] (ReqArg (\p->fmap \o->{o & keepaliveTime={tv_sec=toInt p,tv_nsec=0}}) "SECONDS")
			"Specify the keepalive time in seconds (default: 300)"
119 120
		, Option [] ["maxevents"] (ReqArg (\p->fmap \o->{o & maxEvents=toInt p}) "NUM")
			"Specify the maximum number of events to process per loop (default: 5)"
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
		, Option [] ["sessiontime"] (ReqArg (\p->fmap \o->{o & sessionTime={tv_sec=toInt p,tv_nsec=0}}) "SECONDS")
			"Specify the expiry time for a session in seconds (default: 60)"
		, Option [] ["autolayout"] (NoArg (fmap \o->{o & autoLayout=True}))
			"Enable autolayouting (default)"
		, Option [] ["no-autolayout"] (NoArg (fmap \o->{o & autoLayout=False}))
			"Disable autolayouting"
		, Option [] ["persist-tasks"] (NoArg (fmap \o->{o & persistTasks=True}))
			"Enable the persistence of tasks"
		, Option [] ["no-persist-tasks"] (NoArg (fmap \o->{o & persistTasks=False}))
			"Disable the persistence of tasks (default)"
		, Option [] ["webdir"] (ReqArg (\p->fmap \o->{o & webDirPath=p}) "PATH")
			("Specify the folder containing static web content\ndefault: " +++ defaults.webDirPath)
		, Option [] ["storedir"] (ReqArg (\p->fmap \o->{o & storeDirPath=p}) "PATH")
			("Specify the folder containing the data stores\ndefault: " +++ defaults.storeDirPath)
		, Option [] ["tempdir"] (ReqArg (\p->fmap \o->{o & tempDirPath=p}) "PATH")
			("Specify the folder containing the temporary files\ndefault: " +++ defaults.tempDirPath)
137 138
		, Option [] ["bytecodepath"] (ReqArg (\p->fmap \o->{o & byteCodePath=p}) "PATH")
			("Specify the app's bytecode file\ndefault: " +++ defaults.byteCodePath)
139 140
		, Option [] ["distributed"] (NoArg (fmap \o->{o & distributed=True}))
			"Enable distributed mode (populate the symbols share)"
141 142
		, Option ['s'] ["sdsPort"] (ReqArg (\p->fmap \o->{o & sdsPort=toInt p}) "SDSPORT")
			("Specify the SDS port (default: " +++ toString defaults.sdsPort +++ ")")
143
		]
144

145 146
onStartup :: (Task a) -> StartableTask | iTask a
onStartup task = StartupTask {StartupTask|attributes = defaultValue, task = TaskWrapper task}
147

148 149 150 151 152 153 154 155
onRequest :: String (Task a) -> StartableTask | iTask a
onRequest path task = WebTask {WebTask|path = path, task = WebTaskWrapper (const task)}

onStartupWithAttributes :: (Task a) TaskAttributes -> StartableTask | iTask a
onStartupWithAttributes task attributes = StartupTask {StartupTask|attributes = attributes, task = TaskWrapper task}

onRequestFromRequest :: String (HTTPRequest -> Task a) -> StartableTask | iTask a
onRequestFromRequest path task = WebTask {WebTask|path = path, task = WebTaskWrapper task}
156 157

class Startable a
Bas Lijnse's avatar
Bas Lijnse committed
158
where
159 160 161
	toStartable :: !a -> [StartableTask]

instance Startable (Task a) | iTask a //Default as web task
162
where
163
	toStartable task =
164 165
		[onStartup viewWebServerInstructions
		,onRequest "/" task
166
		]
167

168
instance Startable (HTTPRequest -> Task a) | iTask a //As web task
169
where
170
	toStartable task =
171 172
		[onStartup viewWebServerInstructions
		,onRequestFromRequest "/" task
173
		]
174

175
instance Startable StartableTask
176
where
177
	toStartable task = [task]
178

179
instance Startable [StartableTask]
180
where
181
	toStartable list = list
182

183 184 185 186
instance Startable (a,b) | Startable a & Startable b
where
	toStartable (x,y) = toStartable x ++ toStartable y

187 188 189 190 191 192 193 194 195 196 197 198
viewWebServerInstructions :: Task String
viewWebServerInstructions
	=   get applicationOptions
	>>- \{EngineOptions|appName,serverPort} ->
			traceValue (join OS_NEWLINE
				["*** " +++ appName +++ " HTTP server ***"
				,""
				,"Running at http://localhost" +++
					if (serverPort == 80)
						"/"
						(":" +++ toString serverPort +++ "/")
				])
199

200 201
defaultEngineOptions :: !*World -> (!EngineOptions,!*World)
defaultEngineOptions world
202
	# (appPath,world)    = determineAppPath world
203 204
	# (appVersion,world) = determineAppVersion appPath world
	# appDir             = takeDirectory appPath
205
	# appName            = (if (takeExtension appPath == "exe") dropExtension id o dropDirectory) appPath
206
	# options =
207 208 209 210 211
		{ appName        = appName
		, appPath        = appPath
		, appVersion     = appVersion
		, serverPort     = IF_POSIX_OR_WINDOWS 8080 80
		, serverUrl      = "http://localhost/"
212
		, allowedHosts   = ["127.0.0.1"]
213 214 215 216 217 218 219
		, keepaliveTime  = {tv_sec=300,tv_nsec=0} // 5 minutes
		, sessionTime    = {tv_sec=60,tv_nsec=0}  // 1 minute, (the client pings every 10 seconds by default)
		, persistTasks   = False
		, autoLayout     = True
		, distributed    = False
		, maxEvents      = 5
		, sdsPort        = 9090
Mart Lubbers's avatar
Mart Lubbers committed
220
		, timeout        = Nothing//Just 500
221 222 223
		, webDirPath     = appDir </> appName +++ "-www"
		, storeDirPath   = appDir </> appName +++ "-data" </> "stores"
		, tempDirPath    = appDir </> appName +++ "-data" </> "tmp"
224
		, byteCodePath   = appDir </> appName +++ ".bc"
225 226 227
		}
	= (options,world)

ecrombag's avatar
ecrombag committed
228
// Determines the server executables path
229
determineAppPath :: !*World -> (!FilePath, !*World)
ecrombag's avatar
ecrombag committed
230
determineAppPath world
231
	# ([arg:_],world) = getCommandLine world
232
	| dropDirectory arg <> "ConsoleClient.exe"	= toCanonicalPath arg world
233 234 235
	//Using dynamic linker:
	# (res, world)				= getCurrentDirectory world
	| isError res				= abort "Cannot get current directory."
236
	# currentDirectory			= fromOk res
237 238
	# (res, world)				= readDirectory currentDirectory world
	| isError res				= abort "Cannot read current directory."
239
	# batchfiles				= [f \\ f <- fromOk res | takeExtension f == "bat" ]
240 241 242 243 244
	| isEmpty batchfiles		= abort "No dynamic linker batch file found."
	# (infos, world)			= seqList (map getFileInfo batchfiles) world
	| any isError infos	 		= abort "Cannot get file information."
	= (currentDirectory </> (fst o hd o sortBy cmpFileTime) (zip2 batchfiles infos), world)
	where
245
		cmpFileTime (_,Ok {FileInfo | lastModifiedTime = x})
246
					(_,Ok {FileInfo | lastModifiedTime = y}) = timeGm x > timeGm y
247

248
//By default, we use the modification time of the application executable as version id
249
determineAppVersion :: !FilePath!*World -> (!String,!*World)
250 251
determineAppVersion appPath world
	# (res,world)       = getFileInfo appPath world
252
	| res =: (Error _)  = ("unknown",world)
253 254 255
	# tm				= (fromOk res).lastModifiedTime
	# version           = strfTime "%Y%m%d-%H%M%S" tm
	= (version,world)
256 257 258 259 260 261 262 263 264 265 266 267

timeout :: !(Maybe Timeout) !*IWorld -> (!Maybe Timeout,!*IWorld)
timeout mt iworld = case read taskEvents EmptyContext iworld of
	//No events
	(Ok (ReadingDone (Queue [] [])),iworld=:{sdsNotifyRequests,world})
		# (ts, world) = nsTime world
		= ( minListBy lesser [mt:flatten (map (getTimeoutFromClock ts) ('DM'.elems sdsNotifyRequests))]
		  , {iworld & world = world})
	(Ok (ReadingDone (Queue _ _)), iworld)               = (Just 0,iworld)   //There are still events, don't wait
	(Error _,iworld)            = (Just 500,iworld) //Keep retrying, but not too fast
where
	lesser (Just x) (Just y) = x < y
268 269
	lesser (Just _) Nothing  = True
	lesser _        _        = False
270 271 272 273 274 275 276 277 278 279 280 281 282

	getTimeoutFromClock :: Timespec (Map SDSNotifyRequest Timespec) -> [Maybe Timeout]
	getTimeoutFromClock now requests = map getTimeoutFromClock` ('DM'.toList requests)
	where
		getTimeoutFromClock` :: (!SDSNotifyRequest, !Timespec) -> Maybe Timeout
		getTimeoutFromClock` (snr=:{cmpParam=(ts :: ClockParameter Timespec)}, reqTimespec)
			| startsWith "$IWorld:timespec$" snr.reqSDSId && ts.interval <> zero
				# fire = iworldTimespecNextFire now reqTimespec ts
				= Just (max 0 (toMs fire - toMs now))
			= mt
		getTimeoutFromClock` _ = mt

	toMs x = x.tv_sec * 1000 + x.tv_nsec / 1000000