Engine.icl 12.2 KB
Newer Older
1
implementation module iTasks.Engine
2

3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
import Data.Func
import Data.Functor
import Data.Queue
import Internet.HTTP
import StdEnv
import System.CommandLine
import System.Directory
import System.File
import System.FilePath
import System.GetOpt
import System.OS
import Text
import iTasks.Internal.Distributed.Symbols
import iTasks.Internal.EngineTasks
import iTasks.Internal.IWorld
import iTasks.Internal.SDS
import iTasks.Internal.SDSService
import iTasks.Internal.TaskServer
import iTasks.Internal.TaskStore
import iTasks.Internal.Util
import iTasks.SDS.Sources.System
24
import iTasks.WF.Combinators.Common
25 26
import iTasks.WF.Definition
import iTasks.WF.Tasks.SDS
27
import iTasks.WF.Tasks.System
Mart Lubbers's avatar
Mart Lubbers committed
28
import iTasks.WF.Derives
29

30
import qualified Data.Map as DM
31

32 33
from TCPIP import :: Timeout
from StdFunc import :: St, seqList
34

Mart Lubbers's avatar
Mart Lubbers committed
35 36
MAX_EVENTS 		        :== 5

37
derive class iTask EngineOptions
38

39 40
doTasks :: a !*World -> *World | Startable a
doTasks startable world = doTasksWithOptions defaultEngineCLIOptions startable world
41

42
doTasksWithOptions :: ([String] EngineOptions -> MaybeError [String] EngineOptions) a !*World -> *World | Startable a
43
doTasksWithOptions initFun startable world
44 45 46
	# (cli,world)                = getCommandLine world
	# (options,world)            = defaultEngineOptions world
	# mbOptions                  = initFun cli options
47
	| mbOptions =:(Error _)      = show (fromError mbOptions) (setReturnCode 1 world)
48
	# options                    = fromOk mbOptions
49
	# mbIWorld                   = createIWorld options world
50 51 52 53
	| mbIWorld =: Left _
		# (Left (err, world)) = mbIWorld
		= show [err] (setReturnCode 1 world)
	# (Right iworld)             = mbIWorld
54
	# (symbolsResult, iworld)    = initSymbolsShare options.distributed options.appName iworld
55
	| symbolsResult =: (Error _) = show ["Error reading symbols while required: " +++ fromError symbolsResult] (setReturnCode 1 (destroyIWorld iworld))
Mart Lubbers's avatar
Mart Lubbers committed
56
	# iworld                     = serve (startupTasks options) (tcpTasks options.serverPort options.keepaliveTime) (timeout options.timeout) iworld
57
	= destroyIWorld iworld
58
where
59
    webTasks = [t \\ WebTask t <- toStartable startable]
60
	startupTasks {distributed, sdsPort}
Mart Lubbers's avatar
cleanup  
Mart Lubbers committed
61
		//If distributed, start sds service task
62 63
		=  (if distributed [systemTask (startTask (sdsServiceTask sdsPort))] [])
		++ [systemTask (startTask flushWritesWhenIdle)
64
		//If there no webtasks, stop when stable, otherwise cleanup old sessions
65
		   ,systemTask (startTask if (webTasks =: []) stopOnStable removeOutdatedSessions)
Mart Lubbers's avatar
cleanup  
Mart Lubbers committed
66
		//Start all startup tasks
67
		   :[t \\ StartupTask t <- toStartable startable]]
Mart Lubbers's avatar
cleanup  
Mart Lubbers committed
68

69
	startTask t = {StartupTask|attributes=defaultValue,task=TaskWrapper t}
70
	systemTask t = {StartupTask|t&attributes='DM'.put "system" (JSONBool True) t.StartupTask.attributes}
Mart Lubbers's avatar
cleanup  
Mart Lubbers committed
71

Haye Böhm's avatar
Haye Böhm committed
72
	initSymbolsShare False _ iworld = (Ok (), iworld)
Haye Böhm's avatar
Haye Böhm committed
73
	initSymbolsShare True appName iworld = case storeSymbols (IF_WINDOWS (appName +++ ".exe") appName) iworld of
Haye Böhm's avatar
Haye Böhm committed
74
		(Error (e, s), iworld) = (Error s, iworld)
75
		(Ok noSymbols, iworld) = (Ok (),  {iworld & world = show ["Read number of symbols: " +++ toString noSymbols] iworld.world})
Haye Böhm's avatar
Haye Böhm committed
76

77 78 79 80 81
	//Only run a webserver if there are tasks that are started through the web
	tcpTasks serverPort keepaliveTime
		| webTasks =: [] = []
		| otherwise
			= [(serverPort,httpServer serverPort keepaliveTime (engineWebService webTasks) taskOutput)]
82

83
	// The iTasks engine consist of a set of HTTP Web services
84
	engineWebService :: [WebTask] -> [WebService (Map InstanceNo TaskOutput) (Map InstanceNo TaskOutput)]
85 86 87 88 89 90 91 92 93 94 95 96 97
	engineWebService webtasks =
		[taskUIService webtasks
		,documentService
		,staticResourceService [path \\ {WebTask|path} <- webtasks]
		]

	show :: ![String] !*World -> *World
	show lines world
		# (console,world)	= stdio world
		# console			= seqSt (\s c -> fwrites (s +++ OS_NEWLINE) c) lines console
		# (_,world)			= fclose console world
		= world

98
defaultEngineCLIOptions :: [String] EngineOptions -> MaybeError [String] EngineOptions
99 100 101 102 103 104 105 106 107 108
defaultEngineCLIOptions [argv0:argv] defaults
	# (settings, positionals, errs) = getOpt Permute opts argv
	| not (errs =: []) = Error errs
	| not (positionals =: []) = Error ["Positional arguments not allowed"]
	= case foldl (o) id settings (Just defaults) of
		Nothing = (Error [usageInfo ("Usage " +++ argv0 +++ "[OPTIONS]") opts])
		Just settings = Ok settings
where
	opts :: [OptDescr ((Maybe EngineOptions) -> Maybe EngineOptions)]
	opts =
109
		[ Option ['?'] ["help"] (NoArg (\_->Nothing))
110 111
			"Display this message"
		, Option ['p'] ["port"] (ReqArg (\p->fmap \o->{o & serverPort=toInt p}) "PORT")
112
			("Specify the HTTP port (default: " +++ toString defaults.serverPort +++ ")")
113 114
		, Option [] ["timeout"] (OptArg (\mp->fmap \o->{o & timeout=fmap toInt mp}) "MILLISECONDS")
			"Specify the timeout in ms (default: 500)\nIf not given, use an indefinite timeout."
115
		, Option [] ["allowed-hosts"] (ReqArg (\p->fmap \o->{o & allowedHosts = if (p == "") [] (split "," p)}) "IPADRESSES")
116 117
			("Specify a comma separated white list of hosts that are allowed to connected to this application\ndefault: "
			 +++ join "," defaults.allowedHosts)
118 119
		, Option [] ["keepalive"] (ReqArg (\p->fmap \o->{o & keepaliveTime={tv_sec=toInt p,tv_nsec=0}}) "SECONDS")
			"Specify the keepalive time in seconds (default: 300)"
120 121
		, Option [] ["maxevents"] (ReqArg (\p->fmap \o->{o & maxEvents=toInt p}) "NUM")
			"Specify the maximum number of events to process per loop (default: 5)"
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
		, Option [] ["sessiontime"] (ReqArg (\p->fmap \o->{o & sessionTime={tv_sec=toInt p,tv_nsec=0}}) "SECONDS")
			"Specify the expiry time for a session in seconds (default: 60)"
		, Option [] ["autolayout"] (NoArg (fmap \o->{o & autoLayout=True}))
			"Enable autolayouting (default)"
		, Option [] ["no-autolayout"] (NoArg (fmap \o->{o & autoLayout=False}))
			"Disable autolayouting"
		, Option [] ["persist-tasks"] (NoArg (fmap \o->{o & persistTasks=True}))
			"Enable the persistence of tasks"
		, Option [] ["no-persist-tasks"] (NoArg (fmap \o->{o & persistTasks=False}))
			"Disable the persistence of tasks (default)"
		, Option [] ["webdir"] (ReqArg (\p->fmap \o->{o & webDirPath=p}) "PATH")
			("Specify the folder containing static web content\ndefault: " +++ defaults.webDirPath)
		, Option [] ["storedir"] (ReqArg (\p->fmap \o->{o & storeDirPath=p}) "PATH")
			("Specify the folder containing the data stores\ndefault: " +++ defaults.storeDirPath)
		, Option [] ["tempdir"] (ReqArg (\p->fmap \o->{o & tempDirPath=p}) "PATH")
			("Specify the folder containing the temporary files\ndefault: " +++ defaults.tempDirPath)
138 139
		, Option [] ["bytecodepath"] (ReqArg (\p->fmap \o->{o & byteCodePath=p}) "PATH")
			("Specify the app's bytecode file\ndefault: " +++ defaults.byteCodePath)
140 141
		, Option [] ["distributed"] (NoArg (fmap \o->{o & distributed=True}))
			"Enable distributed mode (populate the symbols share)"
142 143
		, Option ['s'] ["sdsPort"] (ReqArg (\p->fmap \o->{o & sdsPort=toInt p}) "SDSPORT")
			("Specify the SDS port (default: " +++ toString defaults.sdsPort +++ ")")
144
		]
145

146 147
onStartup :: (Task a) -> StartableTask | iTask a
onStartup task = StartupTask {StartupTask|attributes = defaultValue, task = TaskWrapper task}
148

149 150 151 152 153 154 155 156
onRequest :: String (Task a) -> StartableTask | iTask a
onRequest path task = WebTask {WebTask|path = path, task = WebTaskWrapper (const task)}

onStartupWithAttributes :: (Task a) TaskAttributes -> StartableTask | iTask a
onStartupWithAttributes task attributes = StartupTask {StartupTask|attributes = attributes, task = TaskWrapper task}

onRequestFromRequest :: String (HTTPRequest -> Task a) -> StartableTask | iTask a
onRequestFromRequest path task = WebTask {WebTask|path = path, task = WebTaskWrapper task}
157 158

class Startable a
Bas Lijnse's avatar
Bas Lijnse committed
159
where
160 161 162
	toStartable :: !a -> [StartableTask]

instance Startable (Task a) | iTask a //Default as web task
163
where
164
	toStartable task =
165 166
		[onStartup viewWebServerInstructions
		,onRequest "/" task
167
		]
168

169
instance Startable (HTTPRequest -> Task a) | iTask a //As web task
170
where
171
	toStartable task =
172 173
		[onStartup viewWebServerInstructions
		,onRequestFromRequest "/" task
174
		]
175

176
instance Startable StartableTask
177
where
178
	toStartable task = [task]
179

180
instance Startable [StartableTask]
181
where
182
	toStartable list = list
183

184 185 186 187
instance Startable (a,b) | Startable a & Startable b
where
	toStartable (x,y) = toStartable x ++ toStartable y

188 189 190 191 192 193 194 195 196 197 198 199
viewWebServerInstructions :: Task String
viewWebServerInstructions
	=   get applicationOptions
	>>- \{EngineOptions|appName,serverPort} ->
			traceValue (join OS_NEWLINE
				["*** " +++ appName +++ " HTTP server ***"
				,""
				,"Running at http://localhost" +++
					if (serverPort == 80)
						"/"
						(":" +++ toString serverPort +++ "/")
				])
200

201 202
defaultEngineOptions :: !*World -> (!EngineOptions,!*World)
defaultEngineOptions world
203
	# (appPath,world)    = determineAppPath world
204 205
	# (appVersion,world) = determineAppVersion appPath world
	# appDir             = takeDirectory appPath
206
	# appName            = (if (takeExtension appPath == "exe") dropExtension id o dropDirectory) appPath
207
	# options =
208 209 210 211 212
		{ appName        = appName
		, appPath        = appPath
		, appVersion     = appVersion
		, serverPort     = IF_POSIX_OR_WINDOWS 8080 80
		, serverUrl      = "http://localhost/"
213
		, allowedHosts   = ["127.0.0.1"]
214 215 216 217 218 219 220
		, keepaliveTime  = {tv_sec=300,tv_nsec=0} // 5 minutes
		, sessionTime    = {tv_sec=60,tv_nsec=0}  // 1 minute, (the client pings every 10 seconds by default)
		, persistTasks   = False
		, autoLayout     = True
		, distributed    = False
		, maxEvents      = 5
		, sdsPort        = 9090
Mart Lubbers's avatar
Mart Lubbers committed
221
		, timeout        = Nothing//Just 500
222 223 224
		, webDirPath     = appDir </> appName +++ "-www"
		, storeDirPath   = appDir </> appName +++ "-data" </> "stores"
		, tempDirPath    = appDir </> appName +++ "-data" </> "tmp"
225
		, byteCodePath   = appDir </> appName +++ ".bc"
226 227 228
		}
	= (options,world)

ecrombag's avatar
ecrombag committed
229
// Determines the server executables path
230
determineAppPath :: !*World -> (!FilePath, !*World)
ecrombag's avatar
ecrombag committed
231
determineAppPath world
232
	# ([arg:_],world) = getCommandLine world
233
	| dropDirectory arg <> "ConsoleClient.exe"	= toCanonicalPath arg world
234 235 236
	//Using dynamic linker:
	# (res, world)				= getCurrentDirectory world
	| isError res				= abort "Cannot get current directory."
237
	# currentDirectory			= fromOk res
238 239
	# (res, world)				= readDirectory currentDirectory world
	| isError res				= abort "Cannot read current directory."
240
	# batchfiles				= [f \\ f <- fromOk res | takeExtension f == "bat" ]
241 242 243 244 245
	| isEmpty batchfiles		= abort "No dynamic linker batch file found."
	# (infos, world)			= seqList (map getFileInfo batchfiles) world
	| any isError infos	 		= abort "Cannot get file information."
	= (currentDirectory </> (fst o hd o sortBy cmpFileTime) (zip2 batchfiles infos), world)
	where
246
		cmpFileTime (_,Ok {FileInfo | lastModifiedTime = x})
247
					(_,Ok {FileInfo | lastModifiedTime = y}) = timeGm x > timeGm y
248

249
//By default, we use the modification time of the application executable as version id
250
determineAppVersion :: !FilePath!*World -> (!String,!*World)
251 252
determineAppVersion appPath world
	# (res,world)       = getFileInfo appPath world
253
	| res =: (Error _)  = ("unknown",world)
254 255 256
	# tm				= (fromOk res).lastModifiedTime
	# version           = strfTime "%Y%m%d-%H%M%S" tm
	= (version,world)
257 258 259 260 261 262 263 264 265 266 267 268

timeout :: !(Maybe Timeout) !*IWorld -> (!Maybe Timeout,!*IWorld)
timeout mt iworld = case read taskEvents EmptyContext iworld of
	//No events
	(Ok (ReadingDone (Queue [] [])),iworld=:{sdsNotifyRequests,world})
		# (ts, world) = nsTime world
		= ( minListBy lesser [mt:flatten (map (getTimeoutFromClock ts) ('DM'.elems sdsNotifyRequests))]
		  , {iworld & world = world})
	(Ok (ReadingDone (Queue _ _)), iworld)               = (Just 0,iworld)   //There are still events, don't wait
	(Error _,iworld)            = (Just 500,iworld) //Keep retrying, but not too fast
where
	lesser (Just x) (Just y) = x < y
269 270
	lesser (Just _) Nothing  = True
	lesser _        _        = False
271 272 273 274 275 276

	getTimeoutFromClock :: Timespec (Map SDSNotifyRequest Timespec) -> [Maybe Timeout]
	getTimeoutFromClock now requests = map getTimeoutFromClock` ('DM'.toList requests)
	where
		getTimeoutFromClock` :: (!SDSNotifyRequest, !Timespec) -> Maybe Timeout
		getTimeoutFromClock` (snr=:{cmpParam=(ts :: ClockParameter Timespec)}, reqTimespec)
277
			| dependsOnClock snr && ts.interval <> zero
278 279 280 281 282
				# fire = iworldTimespecNextFire now reqTimespec ts
				= Just (max 0 (toMs fire - toMs now))
			= mt
		getTimeoutFromClock` _ = mt

283
	dependsOnClock :: !SDSNotifyRequest -> Bool
284
	dependsOnClock snr = indexOf "$IWorld:timespec$" snr.reqSDSId >= 0
285

286
	toMs x = x.tv_sec * 1000 + x.tv_nsec / 1000000