diff --git a/Libraries/iTasks/Internal/AsyncSDS.icl b/Libraries/iTasks/Internal/AsyncSDS.icl index 9919fa1dd0d166dffb372d54ed0da70c610cba8e..7c3557d9892a27c7798f29bdd27f7f76b17e4748 100644 --- a/Libraries/iTasks/Internal/AsyncSDS.icl +++ b/Libraries/iTasks/Internal/AsyncSDS.icl @@ -31,6 +31,8 @@ onData data (Left acc) _ = (Ok (Left (acc ++ [data])), Nothing, [], False) onShareChange acc _ = (Ok acc, Nothing, [], False) +onDestroy s = (Ok s, []) + queueSDSRequest :: !(SDSRequest p r w) !String !Int !TaskId !{#Symbol} !*IWorld -> (!MaybeError TaskException ConnectionId, !*IWorld) | TC r queueSDSRequest req host port taskId symbols iworld = case addConnection taskId host port connectionTask iworld of @@ -43,7 +45,8 @@ where handlers _ = {ConnectionHandlers| onConnect = onConnect req, onData = onData, onShareChange = onShareChange, - onDisconnect = onDisconnect} + onDisconnect = onDisconnect, + onDestroy = onDestroy} onDisconnect (Left acc) _ # textResponse = concat acc @@ -61,7 +64,8 @@ where handlers _ = {ConnectionHandlers| onConnect = onConnect req, onData = onData, onShareChange = onShareChange, - onDisconnect = onDisconnect} + onDisconnect = onDisconnect, + onDestroy=onDestroy} onDisconnect (Left acc) _ # textResponse = concat acc @@ -79,7 +83,8 @@ where handlers req = {ConnectionHandlers| onConnect = onConnect req, onData = onData, onShareChange = onShareChange, - onDisconnect = onDisconnect} + onDisconnect = onDisconnect, + onDestroy = onDestroy} onDisconnect (Left acc) _ # textResponse = concat acc @@ -98,7 +103,8 @@ where handlers req = {ConnectionHandlers| onConnect = onConnect, onData = onData, onShareChange = onShareChange, - onDisconnect = onDisconnect} + onDisconnect = onDisconnect, + onDestroy = onDestroy} onConnect _ _ _ # req = createRequest p @@ -127,7 +133,8 @@ where handlers = {ConnectionHandlers| onConnect = onConnect, onData = onData, onShareChange = onShareChange, - onDisconnect = onDisconnect} + onDisconnect = onDisconnect, + onDestroy = onDestroy} onConnect connId _ _ = (Ok (Nothing, []), Nothing, [createMessage p +++ "\n"], False) @@ -160,6 +167,7 @@ where , onData = onData , onShareChange = onShareChange , onDisconnect = onDisconnect + , onDestroy = onDestroy } onConnect connId _ _ # req = toWriteRequest p w @@ -191,7 +199,8 @@ where handlers = {ConnectionHandlers| onConnect = onConnect, onData = onData, onShareChange = onShareChange, - onDisconnect = onDisconnect} + onDisconnect = onDisconnect, + onDestroy = onDestroy} onConnect connId _ _ = (Ok (Left ""), Nothing, [toWriteMessage p w +++ "\n"], False) diff --git a/Libraries/iTasks/Internal/SDSService.icl b/Libraries/iTasks/Internal/SDSService.icl index da757a750e672192f65fbdf62fe3aaa7f52cb244..395fa7f6e17d354f0f5a5e1940db388920fccd03 100644 --- a/Libraries/iTasks/Internal/SDSService.icl +++ b/Libraries/iTasks/Internal/SDSService.icl @@ -66,6 +66,7 @@ where , onShareChange = onShareChange , onTick = onTick , onDisconnect = onDisconnect + , onDestroy = \s iw->(Ok s, [], iw) } reevaluateShares :: !{#Symbol} !TaskId ![(ConnectionId, (Bool, String, String))] *IWorld -> (MaybeErrorString [(ConnectionId, (Bool, String, String))], *IWorld) diff --git a/Libraries/iTasks/Internal/Task.dcl b/Libraries/iTasks/Internal/Task.dcl index ba5ec635e725d2ce443e7c703bdba2eace425a04..60cd71f9e2099c2dda059c93274ef018efe05a3a 100644 --- a/Libraries/iTasks/Internal/Task.dcl +++ b/Libraries/iTasks/Internal/Task.dcl @@ -34,6 +34,7 @@ derive gEq Task , onShareChange :: !( l r *IWorld -> *(!MaybeErrorString l, Maybe w, ![String], !Bool, !*IWorld)) , onTick :: !( l r *IWorld -> *(!MaybeErrorString l, Maybe w, ![String], !Bool, !*IWorld)) , onDisconnect :: !( l r *IWorld -> *(!MaybeErrorString l, Maybe w, !*IWorld)) + , onDestroy :: !( l *IWorld -> *(!MaybeErrorString l, ![String], !*IWorld)) } //Background computation tasks diff --git a/Libraries/iTasks/Internal/Task.icl b/Libraries/iTasks/Internal/Task.icl index acbd5596ab15e71ea405132c40e13e7c6e0cfd1e..35ad9f4b81431686ea47f7dab09ffbdf0fa5f6c0 100644 --- a/Libraries/iTasks/Internal/Task.icl +++ b/Libraries/iTasks/Internal/Task.icl @@ -45,8 +45,8 @@ where error = "Creating default task functions is impossible" wrapConnectionTask :: (ConnectionHandlers l r w) (sds () r w) -> ConnectionTask | TC l & TC r & TC w & RWShared sds -wrapConnectionTask ch=:{ConnectionHandlers|onConnect,onData,onShareChange,onDisconnect} sds - = ConnectionTask {ConnectionHandlersIWorld|onConnect=onConnect`,onData=onData`,onShareChange=onShareChange`,onTick=onTick`,onDisconnect=onDisconnect`} (toDynamic sds) +wrapConnectionTask ch=:{ConnectionHandlers|onConnect,onData,onShareChange,onDisconnect,onDestroy} sds + = ConnectionTask {ConnectionHandlersIWorld|onConnect=onConnect`,onData=onData`,onShareChange=onShareChange`,onTick=onTick`,onDisconnect=onDisconnect`,onDestroy=onDestroy`} (toDynamic sds) where onConnect` connId host (r :: r^) env # (mbl, mbw, out, close) = onConnect connId host r @@ -71,10 +71,14 @@ where # (mbl, mbw) = onDisconnect l r = (toDyn <$> mbl, toDyn <$> mbw, env) onDisconnect` l r env = abort ("onDisconnect does not match with type l=" +++ toString (typeCodeOfDynamic l) +++ ", r=" +++ toString (typeCodeOfDynamic r)) + onDestroy` (l :: l^) env + # (mbl, out) = onDestroy l + = (toDyn <$> mbl, out, env) + onDestroy` l env = abort ("onDestroy does not match with type l=" +++ toString (typeCodeOfDynamic l)) wrapIWorldConnectionTask :: (ConnectionHandlersIWorld l r w) (sds () r w) -> ConnectionTask | TC l & TC r & TC w & RWShared sds -wrapIWorldConnectionTask {ConnectionHandlersIWorld|onConnect,onData,onShareChange,onTick,onDisconnect} sds - = ConnectionTask {ConnectionHandlersIWorld|onConnect=onConnect`,onData=onData`,onShareChange=onShareChange`,onTick=onTick`,onDisconnect=onDisconnect`} (toDynamic sds) +wrapIWorldConnectionTask {ConnectionHandlersIWorld|onConnect,onData,onShareChange,onTick,onDisconnect,onDestroy} sds + = ConnectionTask {ConnectionHandlersIWorld|onConnect=onConnect`,onData=onData`,onShareChange=onShareChange`,onTick=onTick`,onDisconnect=onDisconnect`,onDestroy=onDestroy`} (toDynamic sds) where onConnect` connId host (r :: r^) env # (mbl, mbw, out, close, env) = onConnect connId host r env @@ -102,6 +106,10 @@ where # (mbl, mbw, env) = onDisconnect l r env = (toDyn <$> mbl, toDyn <$> mbw, env) onDisconnect` l r env = abort ("onDisconnect does not match with type l=" +++ toString (typeCodeOfDynamic l) +++ ", r=" +++ toString (typeCodeOfDynamic r)) + onDestroy` (l :: l^) env + # (mbl, out, env) = onDestroy l env + = (toDyn <$> mbl, out, env) + onDestroy` l env = abort ("onDestroy does not match with type l=" +++ toString (typeCodeOfDynamic l)) mkInstantTask :: (TaskId *IWorld -> (!MaybeError (Dynamic,String) a,!*IWorld)) -> Task a | iTask a mkInstantTask iworldfun = Task (evalOnce iworldfun) diff --git a/Libraries/iTasks/Internal/TaskServer.icl b/Libraries/iTasks/Internal/TaskServer.icl index 90631223279fd6f7c838007882b8dab4a0ffd0f0..46a7ca470541cf2bad5b577b7e744fe466420582 100644 --- a/Libraries/iTasks/Internal/TaskServer.icl +++ b/Libraries/iTasks/Internal/TaskServer.icl @@ -153,7 +153,7 @@ where = (n + 1,[x:xs]) //TODO: Use share notification to trigger task re-evaluation based on io events -process :: !Int [(!Int,!SelectResult)] !*IWorld -> !*IWorld +process :: !Int [(!Int,!SelectResult)] !*IWorld -> *IWorld process i chList iworld=:{ioTasks={done,todo=[]}} = iworld process i chList iworld=:{ioTasks={done,todo=[ListenerInstance lopts listener:todo]},ioStates,world} # taskId=:(TaskId instanceNo _) = lopts.ListenerInstanceOpts.taskId @@ -228,13 +228,16 @@ process i chList iworld=:{ioTasks={done, todo=[ConnectionInstance opts duplexCha # iworld = processIOTask i chList opts.ConnectionInstanceOpts.taskId opts.ConnectionInstanceOpts.connectionId opts.ConnectionInstanceOpts.removeOnClose sds tcpConnectionIOOps - (\_ -> handlers.ConnectionHandlersIWorld.onDisconnect) handlers.ConnectionHandlersIWorld.onData - handlers.ConnectionHandlersIWorld.onShareChange handlers.ConnectionHandlersIWorld.onTick (ConnectionInstance opts) duplexChannel iworld + (\_ -> handlers.ConnectionHandlersIWorld.onDisconnect) + handlers.ConnectionHandlersIWorld.onData + handlers.ConnectionHandlersIWorld.onShareChange + handlers.ConnectionHandlersIWorld.onTick + handlers.ConnectionHandlersIWorld.onDestroy + (ConnectionInstance opts) duplexChannel iworld = process (i+1) chList iworld where (ConnectionTask handlers sds) = opts.ConnectionInstanceOpts.connectionTask - process i chList iworld=:{ioTasks={done,todo=[BackgroundInstance opts bt=:(BackgroundTask eval):todo]}} # (mbe,iworld=:{ioTasks={done,todo}}) = eval {iworld & ioTasks = {done=done,todo=todo}} | mbe =: (Error _) = abort (snd (fromError mbe)) //TODO Handle the error without an abort @@ -292,15 +295,16 @@ processIOTask :: !Int !(readData Dynamic Dynamic *IWorld -> (!MaybeErrorString Dynamic, !Maybe Dynamic, ![String], !Bool, !*IWorld)) !(Dynamic Dynamic *IWorld -> (!MaybeErrorString Dynamic, !Maybe Dynamic, ![String], !Bool, !*IWorld)) !(Dynamic Dynamic *IWorld -> (!MaybeErrorString Dynamic, !Maybe Dynamic, ![String], !Bool, !*IWorld)) + !(Dynamic *IWorld -> (!MaybeErrorString Dynamic, ![String], !*IWorld)) !(.ioChannels -> *IOTaskInstance) !.ioChannels !*IWorld -> *IWorld processIOTask i chList taskId connectionId removeOnClose sds ioOps onCloseHandler onDataHandler - onShareChangeHandler onTickHandler mkIOTaskInstance ioChannels iworld=:{ioStates} + onShareChangeHandler onTickHandler onDestroyHandler mkIOTaskInstance ioChannels iworld=:{ioStates} + # (TaskId instanceNo _) = taskId = case 'DM'.get taskId ioStates of Just (IOActive taskStates) - # (TaskId instanceNo _) = taskId // get task state # mbTaskState = 'DM'.get connectionId taskStates | isNothing mbTaskState @@ -383,6 +387,18 @@ processIOTask i chList taskId connectionId removeOnClose sds ioOps onCloseHandle # {done, todo} = iworld.ioTasks = {iworld & ioStates = ioStates, ioTasks = {done = [mkIOTaskInstance ioChannels : done], todo = todo}} Just (IODestroyed taskStates) + // get task state one last time + # mbTaskState = 'DM'.get connectionId taskStates + | isNothing mbTaskState + # iworld = if (instanceNo > 0) (queueRefresh [(taskId, "Exception for " <+++ instanceNo)] iworld) iworld + # ioStates = 'DM'.put taskId (IOException "Missing IO task state for task ") ioStates + = ioOps.closeIO (ioChannels, {iworld & ioStates = ioStates}) + # (Just taskState, _) = mbTaskState + //Ondestroy handler + # (mbTaskState, out, iworld) = onDestroyHandler taskState iworld + | mbTaskState =: (Error _) = taskStateException mbTaskState instanceNo ioStates ioOps.closeIO (ioChannels, iworld) + // write data + # (ioChannels, iworld) = seq [ioOps.writeData o \\ o <- out] (ioChannels, iworld) # iworld = ioOps.closeIO (ioChannels, iworld) //Remove the state for this connection //If this is the last connection for this task, we can clean up. diff --git a/Libraries/iTasks/Internal/Tonic/Server.icl b/Libraries/iTasks/Internal/Tonic/Server.icl index 303e26919e8db164af20c22e65cc9f755f32baa9..e04e7cd70b41da3077f8aed5c9ab22a60d060960 100644 --- a/Libraries/iTasks/Internal/Tonic/Server.icl +++ b/Libraries/iTasks/Internal/Tonic/Server.icl @@ -313,6 +313,7 @@ acceptTonicTraces tonicShare , onData = onData , onShareChange = onShareChange , onDisconnect = onDisconnect + , onDestroy = onDestroy } where onConnect :: ConnectionId String TMessageStore @@ -353,3 +354,4 @@ acceptTonicTraces tonicShare onDisconnect st lines = (Ok st, Just lines) + onDestroy st = (Ok st, []) diff --git a/Libraries/iTasks/Internal/WebService.icl b/Libraries/iTasks/Internal/WebService.icl index 0459d36c60a9586419974d9e872294633172f619..b9378c41c52269964f34a9ec50ce71e9ce99c5d6 100644 --- a/Libraries/iTasks/Internal/WebService.icl +++ b/Libraries/iTasks/Internal/WebService.icl @@ -131,7 +131,7 @@ wsockTextMsg payload = [wsockMsgFrame WS_OP_TEXT True payload] httpServer :: !Int !Timespec ![WebService r w] (sds () r w) -> ConnectionTask | TC r & TC w & RWShared sds httpServer port keepAliveTime requestProcessHandlers sds - = wrapIWorldConnectionTask {ConnectionHandlersIWorld|onConnect=onConnect, onData=onData, onShareChange=onShareChange, onTick=onTick, onDisconnect=onDisconnect} sds + = wrapIWorldConnectionTask {ConnectionHandlersIWorld|onConnect=onConnect, onData=onData, onShareChange=onShareChange, onTick=onTick, onDisconnect=onDisconnect, onDestroy=onDestroy} sds where onConnect connId host r iworld=:{IWorld|world,clock} = (Ok (NTIdle host clock),Nothing,[],False,{IWorld|iworld & world = world}) @@ -235,6 +235,8 @@ where = (Ok connState, mbW, env) onDisconnect connState r env = (Ok connState, Nothing, env) + onDestroy s iw = (Ok s, [], iw) + selectHandler req [] = Nothing selectHandler req [h:hs] | h.urlMatchPred req.HTTPRequest.req_path = Just h diff --git a/Libraries/iTasks/WF/Tasks/IO.dcl b/Libraries/iTasks/WF/Tasks/IO.dcl index e9c551d071bf33cfe78b50b4dc6dae0feabee32a..1a45616e94b1797f3a51664ab9f4438f89ac36c7 100644 --- a/Libraries/iTasks/WF/Tasks/IO.dcl +++ b/Libraries/iTasks/WF/Tasks/IO.dcl @@ -13,9 +13,10 @@ from Data.Error import :: MaybeError, :: MaybeErrorString :: ConnectionHandlers l r w = { onConnect :: !(ConnectionId String r -> (!MaybeErrorString l, Maybe w, ![String], !Bool)) - , onData :: !( String l r -> (!MaybeErrorString l, Maybe w, ![String], !Bool)) + , onData :: !( String l r -> (!MaybeErrorString l, Maybe w, ![String], !Bool)) , onShareChange :: !( l r -> (!MaybeErrorString l, Maybe w, ![String], !Bool)) - , onDisconnect :: !( l r -> (!MaybeErrorString l, Maybe w )) + , onDisconnect :: !( l r -> (!MaybeErrorString l, Maybe w )) + , onDestroy :: !( l -> (!MaybeErrorString l, ![String] )) } /**