Engine.icl 12.6 KB
Newer Older
1
implementation module iTasks.Engine
2

3 4
import Data.Func
import Data.Functor
5
import Data.List
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
import Data.Queue
import Internet.HTTP
import StdEnv
import System.CommandLine
import System.Directory
import System.File
import System.FilePath
import System.GetOpt
import System.OS
import Text
import iTasks.Internal.Distributed.Symbols
import iTasks.Internal.EngineTasks
import iTasks.Internal.IWorld
import iTasks.Internal.SDS
import iTasks.Internal.SDSService
import iTasks.Internal.TaskServer
import iTasks.Internal.TaskStore
import iTasks.Internal.Util
import iTasks.SDS.Sources.System
25
import iTasks.WF.Combinators.Common
26 27
import iTasks.WF.Definition
import iTasks.WF.Tasks.SDS
28
import iTasks.WF.Tasks.System
Mart Lubbers's avatar
Mart Lubbers committed
29
import iTasks.WF.Derives
30

31
import qualified Data.Map as DM
32
import Data.Map.GenJSON
33

34 35
from TCPIP import :: Timeout
from StdFunc import :: St, seqList
36

Mart Lubbers's avatar
Mart Lubbers committed
37 38
//Macro that expands to the constant 5; it is not referenced in the visible part of this module.
//NOTE(review): presumably mirrors the default of the `maxEvents` engine option - confirm before relying on it
MAX_EVENTS 		        :== 5

39
//Generate the instances that make up the iTask class for the EngineOptions record
derive class iTask EngineOptions
40

41
//Run the given startable value as an application.
//The engine options are the built-in defaults, adjusted by the standard
//command line parser (defaultEngineCLIOptions).
doTasks :: a !*World -> *World | Startable a
doTasks startable world = doTasksWithOptions initFun world
where
	//Parse the command line on top of the defaults and pair the result with the startable
	initFun cli defaults = defaultEngineCLIOptions startable cli defaults
43

Bas Lijnse's avatar
Bas Lijnse committed
44 45
//Run an application whose startable value and engine options are computed by `initFun`.
//`initFun` receives the command line and the default engine options; it either
//returns error messages or the startable value together with the final options.
//Every failure path prints its messages and sets the process return code to 1.
doTasksWithOptions :: ([String] EngineOptions -> MaybeError [String] (a,EngineOptions)) !*World -> *World | Startable a
doTasksWithOptions initFun world
	# (cli, world)      = getCommandLine world
	# (defaults, world) = defaultEngineOptions world
	= case initFun cli defaults of
		//The init function rejected the input: report its messages
		Error msgs = show msgs (setReturnCode 1 world)
		Ok (startable, options) = case createIWorld options world of
			//The IWorld could not be created: the error comes with the world to continue with
			Left (err, world) = show [err] (setReturnCode 1 world)
			Right iworld
				# (symbolsResult, iworld) = initSymbolsShare options.distributed options.appName iworld
				| isError symbolsResult
					= show ["Error reading symbols while required: " +++ fromError symbolsResult] (setReturnCode 1 (destroyIWorld iworld))
				//Duplicate request paths are only reported, they do not stop the engine
				# iworld = if (hasDup (requestPaths startable))
					(iShow ["Warning: duplicate paths in the web tasks: " +++ join ", " ["'" +++ p +++ "'"\\p<-requestPaths startable]] iworld)
					iworld
				//Hand control to the server loop; clean up once it returns
				# iworld = serve
					(startupTasks startable options)
					(tcpTasks startable options.serverPort options.keepaliveTime)
					(timeout options.timeout)
					iworld
				= destroyIWorld iworld
63
where
Bas Lijnse's avatar
Bas Lijnse committed
64 65 66 67
	//The URL paths of all web tasks in the startable value
	requestPaths startable = [t.WebTask.path \\ t <- webTasks startable]
	//Only the tasks that are started through a web request
	webTasks startable = [webTask \\ WebTask webTask <- toStartable startable]
	//The tasks that are started together with the engine, in this order:
	//lifecycle tasks, the optional SDS service, the write flusher and
	//finally the startup tasks supplied by the application.
	startupTasks startable {distributed, sdsPort}
		//If there are no web tasks: stop when stable
		//If there are: show instructions and clean up old sessions
		# lifecycleTasks = if ((webTasks startable) =: [])
			[systemTask (startTask stopOnStable)]
			[startTask viewWebServerInstructions
			,systemTask (startTask removeOutdatedSessions)]
		//If distributed, start the SDS service task
		# sdsServiceTasks = if distributed [systemTask (startTask (sdsServiceTask sdsPort))] []
		//All startup tasks of the application itself
		# ownTasks = [t \\ StartupTask t <- toStartable startable]
		= lifecycleTasks ++ sdsServiceTasks ++ [systemTask (startTask flushWritesWhenIdle) : ownTasks]
Mart Lubbers's avatar
cleanup  
Mart Lubbers committed
78

79
	//Wrap a task as a startup task with default attributes
	startTask t = {StartupTask | attributes = defaultValue, task = TaskWrapper t}
	//Mark a startup task as a system task by setting its "system" attribute to true
	systemTask t=:{StartupTask|attributes}
		= {StartupTask | t & attributes = 'DM'.put "system" (JSONBool True) attributes}
Mart Lubbers's avatar
cleanup  
Mart Lubbers committed
81

Haye Böhm's avatar
Haye Böhm committed
82
	//Store the symbols of the application executable, but only in distributed mode.
	//On success the number of symbols read is printed; on failure only the
	//error message of the store is passed on.
	initSymbolsShare distributed appName iworld
		| not distributed = (Ok (), iworld)
		# executable      = IF_WINDOWS (appName +++ ".exe") appName
		# (res, iworld)   = storeSymbols executable iworld
		= case res of
			Error (_, msg) = (Error msg, iworld)
			Ok noSymbols   = (Ok (), {iworld & world = show ["Read number of symbols: " +++ toString noSymbols] iworld.world})
Haye Böhm's avatar
Haye Böhm committed
86

87
	//A web server is only started if there are tasks that are started through the web;
	//in that case a single HTTP server on `serverPort` serves all of them
	tcpTasks startable serverPort keepaliveTime = case webTasks startable of
		[]       = []
		webtasks = [(serverPort, httpServer serverPort keepaliveTime (engineWebService webtasks) taskOutput)]
92

93
	//The iTasks engine consists of a set of HTTP web services:
	//the task UI service, the document service and the static resource service
	engineWebService :: [WebTask] -> [WebService (Map InstanceNo TaskOutput) (Map InstanceNo TaskOutput)]
	engineWebService tasks =
		[ taskUIService tasks
		, documentService
		, staticResourceService [t.WebTask.path \\ t <- tasks]
		]

	//Write every string on its own line to the console (stdio) and close the console again
	show :: ![String] !*World -> *World
	show lines world
		# (out, world) = stdio world
		# out          = putLines lines out
		# (_, world)   = fclose out world
		= world
	where
		//Write the lines front to back, each followed by the platform newline
		putLines []     out = out
		putLines [l:ls] out = putLines ls (fwrites (l +++ OS_NEWLINE) out)


Bas Lijnse's avatar
Bas Lijnse committed
108 109
//The standard command line parser for engine options.
//
//Arguments:
// - tasks:    the startable value; returned unchanged next to the parsed options
// - the command line: program name followed by its arguments
// - defaults: the options to start from; they also provide the default values
//             that are mentioned in the help text
//Result:
// - Ok (tasks, options) when all arguments were understood
// - Error messages when getOpt reports errors, when positional arguments are
//   given, or (with the usage text as message) when --help/-? is given
defaultEngineCLIOptions :: a [String] EngineOptions -> MaybeError [String] (a, EngineOptions)
//Without even a program name there is nothing to parse: keep the defaults
//instead of failing with a pattern match error at run time
defaultEngineCLIOptions tasks [] defaults = Ok (tasks, defaults)
defaultEngineCLIOptions tasks [argv0:argv] defaults
	# (settings, positionals, errs) = getOpt Permute opts argv
	| not (errs =: []) = Error errs
	| not (positionals =: []) = Error ["Positional arguments not allowed"]
	//Every option is a function on Maybe EngineOptions; --help turns the result into Nothing
	= case foldl (o) id settings (Just defaults) of
		Nothing = (Error [usageInfo ("Usage: " +++ argv0 +++ " [OPTIONS]") opts])
		Just settings = Ok (tasks,settings)
where
	//Textual form of the default timeout; Nothing means that there is no timeout
	timeoutDefault :: String
	timeoutDefault = case defaults.timeout of
		Nothing = "indefinite"
		Just ms = toString ms

	opts :: [OptDescr ((Maybe EngineOptions) -> Maybe EngineOptions)]
	opts =
		[ Option ['?'] ["help"] (NoArg (\_->Nothing))
			"Display this message"
		, Option ['p'] ["port"] (ReqArg (\p->fmap \o->{o & serverPort=toInt p}) "PORT")
			("Specify the HTTP port (default: " +++ toString defaults.serverPort +++ ")")
		, Option [] ["timeout"] (OptArg (\mp->fmap \o->{o & timeout=fmap toInt mp}) "MILLISECONDS")
			("Specify the timeout in ms (default: " +++ timeoutDefault +++ ")\nIf no value is given, use an indefinite timeout.")
		, Option [] ["allowed-hosts"] (ReqArg (\p->fmap \o->{o & allowedHosts = if (p == "") [] (split "," p)}) "IPADDRESSES")
			("Specify a comma separated white list of hosts that are allowed to connect to this application\ndefault: "
			 +++ join "," defaults.allowedHosts)
		, Option [] ["keepalive"] (ReqArg (\p->fmap \o->{o & keepaliveTime={tv_sec=toInt p,tv_nsec=0}}) "SECONDS")
			("Specify the keepalive time in seconds (default: " +++ toString defaults.keepaliveTime.tv_sec +++ ")")
		, Option [] ["maxevents"] (ReqArg (\p->fmap \o->{o & maxEvents=toInt p}) "NUM")
			("Specify the maximum number of events to process per loop (default: " +++ toString defaults.maxEvents +++ ")")
		, Option [] ["sessiontime"] (ReqArg (\p->fmap \o->{o & sessionTime={tv_sec=toInt p,tv_nsec=0}}) "SECONDS")
			("Specify the expiry time for a session in seconds (default: " +++ toString defaults.sessionTime.tv_sec +++ ")")
		, Option [] ["autolayout"] (NoArg (fmap \o->{o & autoLayout=True}))
			"Enable autolayouting (default)"
		, Option [] ["no-autolayout"] (NoArg (fmap \o->{o & autoLayout=False}))
			"Disable autolayouting"
		, Option [] ["persist-tasks"] (NoArg (fmap \o->{o & persistTasks=True}))
			"Enable the persistence of tasks"
		, Option [] ["no-persist-tasks"] (NoArg (fmap \o->{o & persistTasks=False}))
			"Disable the persistence of tasks (default)"
		, Option [] ["webdir"] (ReqArg (\p->fmap \o->{o & webDirPath=p}) "PATH")
			("Specify the folder containing static web content\ndefault: " +++ defaults.webDirPath)
		, Option [] ["storedir"] (ReqArg (\p->fmap \o->{o & storeDirPath=p}) "PATH")
			("Specify the folder containing the data stores\ndefault: " +++ defaults.storeDirPath)
		, Option [] ["tempdir"] (ReqArg (\p->fmap \o->{o & tempDirPath=p}) "PATH")
			("Specify the folder containing the temporary files\ndefault: " +++ defaults.tempDirPath)
		, Option [] ["bytecodepath"] (ReqArg (\p->fmap \o->{o & byteCodePath=p}) "PATH")
			("Specify the app's bytecode file\ndefault: " +++ defaults.byteCodePath)
		, Option [] ["distributed"] (NoArg (fmap \o->{o & distributed=True}))
			"Enable distributed mode (populate the symbols share)"
		, Option ['s'] ["sdsPort"] (ReqArg (\p->fmap \o->{o & sdsPort=toInt p}) "SDSPORT")
			("Specify the SDS port (default: " +++ toString defaults.sdsPort +++ ")")
		]
155

156 157
//Start a task when the application starts, using default task attributes
onStartup :: (Task a) -> StartableTask | iTask a
onStartup task = onStartupWithAttributes task defaultValue
158

159 160 161 162 163 164 165 166
//Start a task for requests on the given path; the request itself is ignored
onRequest :: String (Task a) -> StartableTask | iTask a
onRequest path task = onRequestFromRequest path (const task)

//Start a task when the application starts, with the given task attributes
onStartupWithAttributes :: (Task a) TaskAttributes -> StartableTask | iTask a
onStartupWithAttributes task attributes
	# startup = {StartupTask | attributes = attributes, task = TaskWrapper task}
	= StartupTask startup

//Start a task for requests on the given path; the task is computed from the HTTP request
onRequestFromRequest :: String (HTTPRequest -> Task a) -> StartableTask | iTask a
onRequestFromRequest path task
	# webTask = {WebTask | path = path, task = WebTaskWrapper task}
	= WebTask webTask
167 168

//Everything that can be handed to the engine to be started
class Startable a
where
	//The startup and web tasks that the value stands for
	toStartable :: !a -> [StartableTask]

//A plain task is served as a web task on the root path
instance Startable (Task a) | iTask a
where
	toStartable task = [onRequest "/" task]

//A task that depends on the request is served as a web task on the root path
instance Startable (HTTPRequest -> Task a) | iTask a
where
	toStartable task = [onRequestFromRequest "/" task]

//A single startable task stands for itself
instance Startable StartableTask
where
	toStartable startableTask = [startableTask]

//A list of startable tasks is used as is
instance Startable [StartableTask]
where
	toStartable startableTasks = startableTasks

//A pair yields the tasks of its first component followed by those of its second
instance Startable (a,b) | Startable a & Startable b
where
	toStartable (l,r) = flatten [toStartable l, toStartable r]


192 193 194 195 196 197 198 199 200 201 202 203
//Trace a short banner with the application name and the local URL of the HTTP server.
//The port is left out of the URL when the server runs on port 80.
viewWebServerInstructions :: Task String
viewWebServerInstructions
	=   get applicationOptions
	>>- \{EngineOptions|appName,serverPort} -> traceValue (banner appName serverPort)
where
	banner name port = join OS_NEWLINE
		["*** " +++ name +++ " HTTP server ***"
		,""
		,"Running at http://localhost" +++ portSuffix port
		]

	portSuffix port
		| port == 80 = "/"
		| otherwise  = ":" +++ toString port +++ "/"
204

205 206
//Compute the default engine options.
//The application path and version are determined from the running program;
//the application name is the file name of that path (without extension if the
//extension is "exe") and the web, store, temp and bytecode paths are derived
//from the directory and name of the application.
defaultEngineOptions :: !*World -> (!EngineOptions,!*World)
defaultEngineOptions world
	# (appPath,world)    = determineAppPath world
	# (appVersion,world) = determineAppVersion appPath world
	# appDir             = takeDirectory appPath
	# fileName           = dropDirectory appPath
	# appName            = if (takeExtension appPath == "exe") (dropExtension fileName) fileName
	=	( { appName        = appName
		  , appPath        = appPath
		  , appVersion     = appVersion
		  , serverPort     = IF_POSIX_OR_WINDOWS 8080 80
		  , serverUrl      = "http://localhost/"
		  , allowedHosts   = ["127.0.0.1"]
		  , keepaliveTime  = {tv_sec=300,tv_nsec=0} // 5 minutes
		  , sessionTime    = {tv_sec=60,tv_nsec=0}  // 1 minute, (the client pings every 10 seconds by default)
		  , persistTasks   = False
		  , autoLayout     = True
		  , distributed    = False
		  , maxEvents      = 5
		  , sdsPort        = 9090
		  , timeout        = Nothing//Just 500
		  , webDirPath     = appDir </> appName +++ "-www"
		  , storeDirPath   = appDir </> appName +++ "-data" </> "stores"
		  , tempDirPath    = appDir </> appName +++ "-data" </> "tmp"
		  , byteCodePath   = appDir </> appName +++ ".bc"
		  }
		, world
		)


ecrombag's avatar
ecrombag committed
233
// Determines the path of the server executable.
// Normally this is the canonical form of the first command line argument.
// When that argument is "ConsoleClient.exe" (dynamic linker), a batch file
// (extension "bat") from the current directory is used instead; the candidates
// are ordered by their modification time and the first one is taken.
determineAppPath :: !*World -> (!FilePath, !*World)
determineAppPath world
	# ([arg:_],world)			= getCommandLine world
	| dropDirectory arg <> "ConsoleClient.exe"	= toCanonicalPath arg world
	//Using dynamic linker:
	# (mbDir, world)			= getCurrentDirectory world
	| isError mbDir				= abort "Cannot get current directory."
	# currentDirectory			= fromOk mbDir
	# (mbEntries, world)		= readDirectory currentDirectory world
	| isError mbEntries			= abort "Cannot read current directory."
	# batchfiles				= filter (\f -> takeExtension f == "bat") (fromOk mbEntries)
	| batchfiles =: []			= abort "No dynamic linker batch file found."
	# (infos, world)			= seqList (map getFileInfo batchfiles) world
	| any isError infos			= abort "Cannot get file information."
	# ordered					= sortBy newerThan (zip2 batchfiles infos)
	= (currentDirectory </> fst (hd ordered), world)
	where
		//Only defined for successful file infos; errors are ruled out by the guard above
		newerThan (_,Ok {FileInfo | lastModifiedTime = x})
				  (_,Ok {FileInfo | lastModifiedTime = y}) = timeGm x > timeGm y
252

253
//By default, the modification time of the application executable is used as version id.
//The version is formatted as "%Y%m%d-%H%M%S"; it is "unknown" if the file
//information cannot be retrieved.
determineAppVersion :: !FilePath !*World -> (!String,!*World)
determineAppVersion appPath world
	# (res, world) = getFileInfo appPath world
	= case res of
		Error _ = ("unknown", world)
		Ok info = (strfTime "%Y%m%d-%H%M%S" info.lastModifiedTime, world)
261 262 263 264 265 266 267 268 269 270 271 272

//Compute how long the engine may wait before it has to do work again.
// - Events are queued:   do not wait (Just 0)
// - Reading events failed: retry after 500 (Just 500)
// - No events queued:    the smallest of the given timeout `mt` and the timeouts
//   computed for the registered SDS notify requests; Nothing counts as larger
//   than any Just value
timeout :: !(Maybe Timeout) !*IWorld -> (!Maybe Timeout,!*IWorld)
timeout mt iworld = case read taskEvents EmptyContext iworld of
	//No events
	(Ok (ReadingDone (Queue [] [])), iworld=:{sdsNotifyRequests,world})
		# (now, world) = nsTime world
		# candidates   = [mt : [t \\ requests <- 'DM'.elems sdsNotifyRequests, t <- getTimeoutFromClock now requests]]
		= (minListBy lesser candidates, {iworld & world = world})
	//There are still events, don't wait
	(Ok (ReadingDone (Queue _ _)), iworld) = (Just 0, iworld)
	//Keep retrying, but not too fast
	(Error _, iworld) = (Just 500, iworld)
where
	//Ordering on optional timeouts in which Nothing is never the lesser one
	lesser (Just x) (Just y) = x < y
	lesser (Just _) Nothing  = True
	lesser _        _        = False

	//One optional timeout for every notify request in the map
	getTimeoutFromClock :: Timespec (Map SDSNotifyRequest Timespec) -> [Maybe Timeout]
	getTimeoutFromClock now requests = [getTimeoutFromClock` entry \\ entry <- 'DM'.toList requests]
	where
		//A request on the clock share with a non-zero interval yields the (non-negative)
		//number of milliseconds until it fires next; every other request yields `mt`
		getTimeoutFromClock` :: (!SDSNotifyRequest, !Timespec) -> Maybe Timeout
		getTimeoutFromClock` (snr=:{cmpParam=(ts :: ClockParameter Timespec)}, reqTimespec)
			| dependsOnClock snr && ts.interval <> zero
				# fire = iworldTimespecNextFire now reqTimespec ts
				= Just (max 0 (toMs fire - toMs now))
			= mt
		getTimeoutFromClock` _ = mt

	//A request depends on the clock if the id of its SDS contains the timespec share id
	dependsOnClock :: !SDSNotifyRequest -> Bool
	dependsOnClock snr = indexOf "$IWorld:timespec$" snr.reqSDSId >= 0

	//Convert a timespec to whole milliseconds
	toMs x = x.tv_sec * 1000 + x.tv_nsec / 1000000