From fe8fa967b8137b38a41cd82499714e6c4b846746 Mon Sep 17 00:00:00 2001 From: Mart Lubbers Date: Fri, 29 Mar 2019 14:00:01 +0100 Subject: [PATCH] Add ondestroy handler for tcp connections Normally you can gracefull clean up in iTasks by hooking into the onDisconnect handler, however, if the IOtask is destroyed by for example a step, it was impossible to gracefully close the connection. To fix this, this commit: - Patches the tcp connections used internally (async, {web,sds}service, tonic) - Adapts the wrapping tasks from Internal.Task - Adapts processIOTask to get an onDestroy handler - When an IOTask is destroyed, read the task state once more, call the onDestroy handler and possibly write some data once more. This can be used to gracefully close any connections to services that cannot reasonably detect broken TCP connections --- Libraries/iTasks/Internal/AsyncSDS.icl | 21 +++++++++++----- Libraries/iTasks/Internal/SDSService.icl | 1 + Libraries/iTasks/Internal/Task.dcl | 1 + Libraries/iTasks/Internal/Task.icl | 16 +++++++++---- Libraries/iTasks/Internal/TaskServer.icl | 28 +++++++++++++++++----- Libraries/iTasks/Internal/Tonic/Server.icl | 2 ++ Libraries/iTasks/Internal/WebService.icl | 4 +++- Libraries/iTasks/WF/Tasks/IO.dcl | 5 ++-- 8 files changed, 59 insertions(+), 19 deletions(-) diff --git a/Libraries/iTasks/Internal/AsyncSDS.icl b/Libraries/iTasks/Internal/AsyncSDS.icl index 9919fa1dd..7c3557d98 100644 --- a/Libraries/iTasks/Internal/AsyncSDS.icl +++ b/Libraries/iTasks/Internal/AsyncSDS.icl @@ -31,6 +31,8 @@ onData data (Left acc) _ = (Ok (Left (acc ++ [data])), Nothing, [], False) onShareChange acc _ = (Ok acc, Nothing, [], False) +onDestroy s = (Ok s, []) + queueSDSRequest :: !(SDSRequest p r w) !String !Int !TaskId !{#Symbol} !*IWorld -> (!MaybeError TaskException ConnectionId, !*IWorld) | TC r queueSDSRequest req host port taskId symbols iworld = case addConnection taskId host port connectionTask iworld of @@ -43,7 +45,8 @@ where handlers _ = {ConnectionHandlers| onConnect = onConnect req, onData = onData, onShareChange = onShareChange, - onDisconnect = onDisconnect} + onDisconnect = onDisconnect, + onDestroy = onDestroy} onDisconnect (Left acc) _ # textResponse = concat acc @@ -61,7 +64,8 @@ where handlers _ = {ConnectionHandlers| onConnect = onConnect req, onData = onData, onShareChange = onShareChange, - onDisconnect = onDisconnect} + onDisconnect = onDisconnect, + onDestroy=onDestroy} onDisconnect (Left acc) _ # textResponse = concat acc @@ -79,7 +83,8 @@ where handlers req = {ConnectionHandlers| onConnect = onConnect req, onData = onData, onShareChange = onShareChange, - onDisconnect = onDisconnect} + onDisconnect = onDisconnect, + onDestroy = onDestroy} onDisconnect (Left acc) _ # textResponse = concat acc @@ -98,7 +103,8 @@ where handlers req = {ConnectionHandlers| onConnect = onConnect, onData = onData, onShareChange = onShareChange, - onDisconnect = onDisconnect} + onDisconnect = onDisconnect, + onDestroy = onDestroy} onConnect _ _ _ # req = createRequest p @@ -127,7 +133,8 @@ where handlers = {ConnectionHandlers| onConnect = onConnect, onData = onData, onShareChange = onShareChange, - onDisconnect = onDisconnect} + onDisconnect = onDisconnect, + onDestroy = onDestroy} onConnect connId _ _ = (Ok (Nothing, []), Nothing, [createMessage p +++ "\n"], False) @@ -160,6 +167,7 @@ where , onData = onData , onShareChange = onShareChange , onDisconnect = onDisconnect + , onDestroy = onDestroy } onConnect connId _ _ # req = toWriteRequest p w @@ -191,7 +199,8 @@ where handlers = {ConnectionHandlers| onConnect = onConnect, onData = onData, onShareChange = onShareChange, - onDisconnect = onDisconnect} + onDisconnect = onDisconnect, + onDestroy = onDestroy} onConnect connId _ _ = (Ok (Left ""), Nothing, [toWriteMessage p w +++ "\n"], False) diff --git a/Libraries/iTasks/Internal/SDSService.icl b/Libraries/iTasks/Internal/SDSService.icl index da757a750..395fa7f6e 100644 --- a/Libraries/iTasks/Internal/SDSService.icl +++ b/Libraries/iTasks/Internal/SDSService.icl @@ -66,6 +66,7 @@ where , onShareChange = onShareChange , onTick = onTick , onDisconnect = onDisconnect + , onDestroy = \s iw->(Ok s, [], iw) } reevaluateShares :: !{#Symbol} !TaskId ![(ConnectionId, (Bool, String, String))] *IWorld -> (MaybeErrorString [(ConnectionId, (Bool, String, String))], *IWorld) diff --git a/Libraries/iTasks/Internal/Task.dcl b/Libraries/iTasks/Internal/Task.dcl index ba5ec635e..60cd71f9e 100644 --- a/Libraries/iTasks/Internal/Task.dcl +++ b/Libraries/iTasks/Internal/Task.dcl @@ -34,6 +34,7 @@ derive gEq Task , onShareChange :: !( l r *IWorld -> *(!MaybeErrorString l, Maybe w, ![String], !Bool, !*IWorld)) , onTick :: !( l r *IWorld -> *(!MaybeErrorString l, Maybe w, ![String], !Bool, !*IWorld)) , onDisconnect :: !( l r *IWorld -> *(!MaybeErrorString l, Maybe w, !*IWorld)) + , onDestroy :: !( l *IWorld -> *(!MaybeErrorString l, ![String], !*IWorld)) } //Background computation tasks diff --git a/Libraries/iTasks/Internal/Task.icl b/Libraries/iTasks/Internal/Task.icl index acbd5596a..35ad9f4b8 100644 --- a/Libraries/iTasks/Internal/Task.icl +++ b/Libraries/iTasks/Internal/Task.icl @@ -45,8 +45,8 @@ where error = "Creating default task functions is impossible" wrapConnectionTask :: (ConnectionHandlers l r w) (sds () r w) -> ConnectionTask | TC l & TC r & TC w & RWShared sds -wrapConnectionTask ch=:{ConnectionHandlers|onConnect,onData,onShareChange,onDisconnect} sds - = ConnectionTask {ConnectionHandlersIWorld|onConnect=onConnect`,onData=onData`,onShareChange=onShareChange`,onTick=onTick`,onDisconnect=onDisconnect`} (toDynamic sds) +wrapConnectionTask ch=:{ConnectionHandlers|onConnect,onData,onShareChange,onDisconnect,onDestroy} sds + = ConnectionTask {ConnectionHandlersIWorld|onConnect=onConnect`,onData=onData`,onShareChange=onShareChange`,onTick=onTick`,onDisconnect=onDisconnect`,onDestroy=onDestroy`} (toDynamic sds) where onConnect` connId host (r :: r^) env # (mbl, mbw, out, close) = onConnect connId host r @@ -71,10 +71,14 @@ where # (mbl, mbw) = onDisconnect l r = (toDyn <$> mbl, toDyn <$> mbw, env) onDisconnect` l r env = abort ("onDisconnect does not match with type l=" +++ toString (typeCodeOfDynamic l) +++ ", r=" +++ toString (typeCodeOfDynamic r)) + onDestroy` (l :: l^) env + # (mbl, out) = onDestroy l + = (toDyn <$> mbl, out, env) + onDestroy` l env = abort ("onDestroy does not match with type l=" +++ toString (typeCodeOfDynamic l)) wrapIWorldConnectionTask :: (ConnectionHandlersIWorld l r w) (sds () r w) -> ConnectionTask | TC l & TC r & TC w & RWShared sds -wrapIWorldConnectionTask {ConnectionHandlersIWorld|onConnect,onData,onShareChange,onTick,onDisconnect} sds - = ConnectionTask {ConnectionHandlersIWorld|onConnect=onConnect`,onData=onData`,onShareChange=onShareChange`,onTick=onTick`,onDisconnect=onDisconnect`} (toDynamic sds) +wrapIWorldConnectionTask {ConnectionHandlersIWorld|onConnect,onData,onShareChange,onTick,onDisconnect,onDestroy} sds + = ConnectionTask {ConnectionHandlersIWorld|onConnect=onConnect`,onData=onData`,onShareChange=onShareChange`,onTick=onTick`,onDisconnect=onDisconnect`,onDestroy=onDestroy`} (toDynamic sds) where onConnect` connId host (r :: r^) env # (mbl, mbw, out, close, env) = onConnect connId host r env @@ -102,6 +106,10 @@ where # (mbl, mbw, env) = onDisconnect l r env = (toDyn <$> mbl, toDyn <$> mbw, env) onDisconnect` l r env = abort ("onDisconnect does not match with type l=" +++ toString (typeCodeOfDynamic l) +++ ", r=" +++ toString (typeCodeOfDynamic r)) + onDestroy` (l :: l^) env + # (mbl, out, env) = onDestroy l env + = (toDyn <$> mbl, out, env) + onDestroy` l env = abort ("onDestroy does not match with type l=" +++ toString (typeCodeOfDynamic l)) mkInstantTask :: (TaskId *IWorld -> (!MaybeError (Dynamic,String) a,!*IWorld)) -> Task a | iTask a mkInstantTask iworldfun = Task (evalOnce iworldfun) diff --git a/Libraries/iTasks/Internal/TaskServer.icl b/Libraries/iTasks/Internal/TaskServer.icl index 906312232..46a7ca470 100644 --- a/Libraries/iTasks/Internal/TaskServer.icl +++ b/Libraries/iTasks/Internal/TaskServer.icl @@ -153,7 +153,7 @@ where = (n + 1,[x:xs]) //TODO: Use share notification to trigger task re-evaluation based on io events -process :: !Int [(!Int,!SelectResult)] !*IWorld -> !*IWorld +process :: !Int [(!Int,!SelectResult)] !*IWorld -> *IWorld process i chList iworld=:{ioTasks={done,todo=[]}} = iworld process i chList iworld=:{ioTasks={done,todo=[ListenerInstance lopts listener:todo]},ioStates,world} # taskId=:(TaskId instanceNo _) = lopts.ListenerInstanceOpts.taskId @@ -228,13 +228,16 @@ process i chList iworld=:{ioTasks={done, todo=[ConnectionInstance opts duplexCha # iworld = processIOTask i chList opts.ConnectionInstanceOpts.taskId opts.ConnectionInstanceOpts.connectionId opts.ConnectionInstanceOpts.removeOnClose sds tcpConnectionIOOps - (\_ -> handlers.ConnectionHandlersIWorld.onDisconnect) handlers.ConnectionHandlersIWorld.onData - handlers.ConnectionHandlersIWorld.onShareChange handlers.ConnectionHandlersIWorld.onTick (ConnectionInstance opts) duplexChannel iworld + (\_ -> handlers.ConnectionHandlersIWorld.onDisconnect) + handlers.ConnectionHandlersIWorld.onData + handlers.ConnectionHandlersIWorld.onShareChange + handlers.ConnectionHandlersIWorld.onTick + handlers.ConnectionHandlersIWorld.onDestroy + (ConnectionInstance opts) duplexChannel iworld = process (i+1) chList iworld where (ConnectionTask handlers sds) = opts.ConnectionInstanceOpts.connectionTask - process i chList iworld=:{ioTasks={done,todo=[BackgroundInstance opts bt=:(BackgroundTask eval):todo]}} # (mbe,iworld=:{ioTasks={done,todo}}) = eval {iworld & ioTasks = {done=done,todo=todo}} | mbe =: (Error _) = abort (snd (fromError mbe)) //TODO Handle the error without an abort @@ -292,15 +295,16 @@ processIOTask :: !Int !(readData Dynamic Dynamic *IWorld -> (!MaybeErrorString Dynamic, !Maybe Dynamic, ![String], !Bool, !*IWorld)) !(Dynamic Dynamic *IWorld -> (!MaybeErrorString Dynamic, !Maybe Dynamic, ![String], !Bool, !*IWorld)) !(Dynamic Dynamic *IWorld -> (!MaybeErrorString Dynamic, !Maybe Dynamic, ![String], !Bool, !*IWorld)) + !(Dynamic *IWorld -> (!MaybeErrorString Dynamic, ![String], !*IWorld)) !(.ioChannels -> *IOTaskInstance) !.ioChannels !*IWorld -> *IWorld processIOTask i chList taskId connectionId removeOnClose sds ioOps onCloseHandler onDataHandler - onShareChangeHandler onTickHandler mkIOTaskInstance ioChannels iworld=:{ioStates} + onShareChangeHandler onTickHandler onDestroyHandler mkIOTaskInstance ioChannels iworld=:{ioStates} + # (TaskId instanceNo _) = taskId = case 'DM'.get taskId ioStates of Just (IOActive taskStates) - # (TaskId instanceNo _) = taskId // get task state # mbTaskState = 'DM'.get connectionId taskStates | isNothing mbTaskState @@ -383,6 +387,18 @@ processIOTask i chList taskId connectionId removeOnClose sds ioOps onCloseHandle # {done, todo} = iworld.ioTasks = {iworld & ioStates = ioStates, ioTasks = {done = [mkIOTaskInstance ioChannels : done], todo = todo}} Just (IODestroyed taskStates) + // get task state one last time + # mbTaskState = 'DM'.get connectionId taskStates + | isNothing mbTaskState + # iworld = if (instanceNo > 0) (queueRefresh [(taskId, "Exception for " <+++ instanceNo)] iworld) iworld + # ioStates = 'DM'.put taskId (IOException "Missing IO task state for task ") ioStates + = ioOps.closeIO (ioChannels, {iworld & ioStates = ioStates}) + # (Just taskState, _) = mbTaskState + //Ondestroy handler + # (mbTaskState, out, iworld) = onDestroyHandler taskState iworld + | mbTaskState =: (Error _) = taskStateException mbTaskState instanceNo ioStates ioOps.closeIO (ioChannels, iworld) + // write data + # (ioChannels, iworld) = seq [ioOps.writeData o \\ o <- out] (ioChannels, iworld) # iworld = ioOps.closeIO (ioChannels, iworld) //Remove the state for this connection //If this is the last connection for this task, we can clean up. diff --git a/Libraries/iTasks/Internal/Tonic/Server.icl b/Libraries/iTasks/Internal/Tonic/Server.icl index 303e26919..e04e7cd70 100644 --- a/Libraries/iTasks/Internal/Tonic/Server.icl +++ b/Libraries/iTasks/Internal/Tonic/Server.icl @@ -313,6 +313,7 @@ acceptTonicTraces tonicShare , onData = onData , onShareChange = onShareChange , onDisconnect = onDisconnect + , onDestroy = onDestroy } where onConnect :: ConnectionId String TMessageStore @@ -353,3 +354,4 @@ acceptTonicTraces tonicShare onDisconnect st lines = (Ok st, Just lines) + onDestroy st = (Ok st, []) diff --git a/Libraries/iTasks/Internal/WebService.icl b/Libraries/iTasks/Internal/WebService.icl index 0459d36c6..b9378c41c 100644 --- a/Libraries/iTasks/Internal/WebService.icl +++ b/Libraries/iTasks/Internal/WebService.icl @@ -131,7 +131,7 @@ wsockTextMsg payload = [wsockMsgFrame WS_OP_TEXT True payload] httpServer :: !Int !Timespec ![WebService r w] (sds () r w) -> ConnectionTask | TC r & TC w & RWShared sds httpServer port keepAliveTime requestProcessHandlers sds - = wrapIWorldConnectionTask {ConnectionHandlersIWorld|onConnect=onConnect, onData=onData, onShareChange=onShareChange, onTick=onTick, onDisconnect=onDisconnect} sds + = wrapIWorldConnectionTask {ConnectionHandlersIWorld|onConnect=onConnect, onData=onData, onShareChange=onShareChange, onTick=onTick, onDisconnect=onDisconnect, onDestroy=onDestroy} sds where onConnect connId host r iworld=:{IWorld|world,clock} = (Ok (NTIdle host clock),Nothing,[],False,{IWorld|iworld & world = world}) @@ -235,6 +235,8 @@ where = (Ok connState, mbW, env) onDisconnect connState r env = (Ok connState, Nothing, env) + onDestroy s iw = (Ok s, [], iw) + selectHandler req [] = Nothing selectHandler req [h:hs] | h.urlMatchPred req.HTTPRequest.req_path = Just h diff --git a/Libraries/iTasks/WF/Tasks/IO.dcl b/Libraries/iTasks/WF/Tasks/IO.dcl index e9c551d07..1a45616e9 100644 --- a/Libraries/iTasks/WF/Tasks/IO.dcl +++ b/Libraries/iTasks/WF/Tasks/IO.dcl @@ -13,9 +13,10 @@ from Data.Error import :: MaybeError, :: MaybeErrorString :: ConnectionHandlers l r w = { onConnect :: !(ConnectionId String r -> (!MaybeErrorString l, Maybe w, ![String], !Bool)) - , onData :: !( String l r -> (!MaybeErrorString l, Maybe w, ![String], !Bool)) + , onData :: !( String l r -> (!MaybeErrorString l, Maybe w, ![String], !Bool)) , onShareChange :: !( l r -> (!MaybeErrorString l, Maybe w, ![String], !Bool)) - , onDisconnect :: !( l r -> (!MaybeErrorString l, Maybe w )) + , onDisconnect :: !( l r -> (!MaybeErrorString l, Maybe w )) + , onDestroy :: !( l -> (!MaybeErrorString l, ![String] )) } /** -- GitLab