Commit fe8fa967 authored by Mart Lubbers's avatar Mart Lubbers

Add ondestroy handler for tcp connections

Normally you can gracefull clean up in iTasks by hooking into the
onDisconnect handler, however, if the IOtask is destroyed by for example
a step, it was impossible to gracefully close the connection.

To fix this, this commit:

- Patches the tcp connections used internally (async, {web,sds}service, tonic)
- Adapts the wrapping tasks from Internal.Task
- Adapts processIOTask to get an onDestroy handler
- When an IOTask is destroyed, read the task state once more, call the
  onDestroy handler and possibly write some data once more. This can be
  used to gracefully close any connections to services that cannot
  reasonably detect broken TCP connections
parent f3ca2589
......@@ -31,6 +31,8 @@ onData data (Left acc) _ = (Ok (Left (acc ++ [data])), Nothing, [], False)
onShareChange acc _ = (Ok acc, Nothing, [], False)
onDestroy s = (Ok s, [])
queueSDSRequest :: !(SDSRequest p r w) !String !Int !TaskId !{#Symbol} !*IWorld -> (!MaybeError TaskException ConnectionId, !*IWorld) | TC r
queueSDSRequest req host port taskId symbols iworld
= case addConnection taskId host port connectionTask iworld of
......@@ -43,7 +45,8 @@ where
handlers _ = {ConnectionHandlers| onConnect = onConnect req,
onData = onData,
onShareChange = onShareChange,
onDisconnect = onDisconnect}
onDisconnect = onDisconnect,
onDestroy = onDestroy}
onDisconnect (Left acc) _
# textResponse = concat acc
......@@ -61,7 +64,8 @@ where
handlers _ = {ConnectionHandlers| onConnect = onConnect req,
onData = onData,
onShareChange = onShareChange,
onDisconnect = onDisconnect}
onDisconnect = onDisconnect,
onDestroy=onDestroy}
onDisconnect (Left acc) _
# textResponse = concat acc
......@@ -79,7 +83,8 @@ where
handlers req = {ConnectionHandlers| onConnect = onConnect req,
onData = onData,
onShareChange = onShareChange,
onDisconnect = onDisconnect}
onDisconnect = onDisconnect,
onDestroy = onDestroy}
onDisconnect (Left acc) _
# textResponse = concat acc
......@@ -98,7 +103,8 @@ where
handlers req = {ConnectionHandlers| onConnect = onConnect,
onData = onData,
onShareChange = onShareChange,
onDisconnect = onDisconnect}
onDisconnect = onDisconnect,
onDestroy = onDestroy}
onConnect _ _ _
# req = createRequest p
......@@ -127,7 +133,8 @@ where
handlers = {ConnectionHandlers| onConnect = onConnect,
onData = onData,
onShareChange = onShareChange,
onDisconnect = onDisconnect}
onDisconnect = onDisconnect,
onDestroy = onDestroy}
onConnect connId _ _ = (Ok (Nothing, []), Nothing, [createMessage p +++ "\n"], False)
......@@ -160,6 +167,7 @@ where
, onData = onData
, onShareChange = onShareChange
, onDisconnect = onDisconnect
, onDestroy = onDestroy
}
onConnect connId _ _
# req = toWriteRequest p w
......@@ -191,7 +199,8 @@ where
handlers = {ConnectionHandlers| onConnect = onConnect,
onData = onData,
onShareChange = onShareChange,
onDisconnect = onDisconnect}
onDisconnect = onDisconnect,
onDestroy = onDestroy}
onConnect connId _ _ = (Ok (Left ""), Nothing, [toWriteMessage p w +++ "\n"], False)
......
......@@ -66,6 +66,7 @@ where
, onShareChange = onShareChange
, onTick = onTick
, onDisconnect = onDisconnect
, onDestroy = \s iw->(Ok s, [], iw)
}
reevaluateShares :: !{#Symbol} !TaskId ![(ConnectionId, (Bool, String, String))] *IWorld -> (MaybeErrorString [(ConnectionId, (Bool, String, String))], *IWorld)
......
......@@ -34,6 +34,7 @@ derive gEq Task
, onShareChange :: !( l r *IWorld -> *(!MaybeErrorString l, Maybe w, ![String], !Bool, !*IWorld))
, onTick :: !( l r *IWorld -> *(!MaybeErrorString l, Maybe w, ![String], !Bool, !*IWorld))
, onDisconnect :: !( l r *IWorld -> *(!MaybeErrorString l, Maybe w, !*IWorld))
, onDestroy :: !( l *IWorld -> *(!MaybeErrorString l, ![String], !*IWorld))
}
//Background computation tasks
......
......@@ -45,8 +45,8 @@ where
error = "Creating default task functions is impossible"
wrapConnectionTask :: (ConnectionHandlers l r w) (sds () r w) -> ConnectionTask | TC l & TC r & TC w & RWShared sds
wrapConnectionTask ch=:{ConnectionHandlers|onConnect,onData,onShareChange,onDisconnect} sds
= ConnectionTask {ConnectionHandlersIWorld|onConnect=onConnect`,onData=onData`,onShareChange=onShareChange`,onTick=onTick`,onDisconnect=onDisconnect`} (toDynamic sds)
wrapConnectionTask ch=:{ConnectionHandlers|onConnect,onData,onShareChange,onDisconnect,onDestroy} sds
= ConnectionTask {ConnectionHandlersIWorld|onConnect=onConnect`,onData=onData`,onShareChange=onShareChange`,onTick=onTick`,onDisconnect=onDisconnect`,onDestroy=onDestroy`} (toDynamic sds)
where
onConnect` connId host (r :: r^) env
# (mbl, mbw, out, close) = onConnect connId host r
......@@ -71,10 +71,14 @@ where
# (mbl, mbw) = onDisconnect l r
= (toDyn <$> mbl, toDyn <$> mbw, env)
onDisconnect` l r env = abort ("onDisconnect does not match with type l=" +++ toString (typeCodeOfDynamic l) +++ ", r=" +++ toString (typeCodeOfDynamic r))
onDestroy` (l :: l^) env
# (mbl, out) = onDestroy l
= (toDyn <$> mbl, out, env)
onDestroy` l env = abort ("onDestroy does not match with type l=" +++ toString (typeCodeOfDynamic l))
wrapIWorldConnectionTask :: (ConnectionHandlersIWorld l r w) (sds () r w) -> ConnectionTask | TC l & TC r & TC w & RWShared sds
wrapIWorldConnectionTask {ConnectionHandlersIWorld|onConnect,onData,onShareChange,onTick,onDisconnect} sds
= ConnectionTask {ConnectionHandlersIWorld|onConnect=onConnect`,onData=onData`,onShareChange=onShareChange`,onTick=onTick`,onDisconnect=onDisconnect`} (toDynamic sds)
wrapIWorldConnectionTask {ConnectionHandlersIWorld|onConnect,onData,onShareChange,onTick,onDisconnect,onDestroy} sds
= ConnectionTask {ConnectionHandlersIWorld|onConnect=onConnect`,onData=onData`,onShareChange=onShareChange`,onTick=onTick`,onDisconnect=onDisconnect`,onDestroy=onDestroy`} (toDynamic sds)
where
onConnect` connId host (r :: r^) env
# (mbl, mbw, out, close, env) = onConnect connId host r env
......@@ -102,6 +106,10 @@ where
# (mbl, mbw, env) = onDisconnect l r env
= (toDyn <$> mbl, toDyn <$> mbw, env)
onDisconnect` l r env = abort ("onDisconnect does not match with type l=" +++ toString (typeCodeOfDynamic l) +++ ", r=" +++ toString (typeCodeOfDynamic r))
onDestroy` (l :: l^) env
# (mbl, out, env) = onDestroy l env
= (toDyn <$> mbl, out, env)
onDestroy` l env = abort ("onDestroy does not match with type l=" +++ toString (typeCodeOfDynamic l))
mkInstantTask :: (TaskId *IWorld -> (!MaybeError (Dynamic,String) a,!*IWorld)) -> Task a | iTask a
mkInstantTask iworldfun = Task (evalOnce iworldfun)
......
......@@ -153,7 +153,7 @@ where
= (n + 1,[x:xs])
//TODO: Use share notification to trigger task re-evaluation based on io events
process :: !Int [(!Int,!SelectResult)] !*IWorld -> !*IWorld
process :: !Int [(!Int,!SelectResult)] !*IWorld -> *IWorld
process i chList iworld=:{ioTasks={done,todo=[]}} = iworld
process i chList iworld=:{ioTasks={done,todo=[ListenerInstance lopts listener:todo]},ioStates,world}
# taskId=:(TaskId instanceNo _) = lopts.ListenerInstanceOpts.taskId
......@@ -228,13 +228,16 @@ process i chList iworld=:{ioTasks={done, todo=[ConnectionInstance opts duplexCha
# iworld = processIOTask
i chList opts.ConnectionInstanceOpts.taskId opts.ConnectionInstanceOpts.connectionId
opts.ConnectionInstanceOpts.removeOnClose sds tcpConnectionIOOps
(\_ -> handlers.ConnectionHandlersIWorld.onDisconnect) handlers.ConnectionHandlersIWorld.onData
handlers.ConnectionHandlersIWorld.onShareChange handlers.ConnectionHandlersIWorld.onTick (ConnectionInstance opts) duplexChannel iworld
(\_ -> handlers.ConnectionHandlersIWorld.onDisconnect)
handlers.ConnectionHandlersIWorld.onData
handlers.ConnectionHandlersIWorld.onShareChange
handlers.ConnectionHandlersIWorld.onTick
handlers.ConnectionHandlersIWorld.onDestroy
(ConnectionInstance opts) duplexChannel iworld
= process (i+1) chList iworld
where
(ConnectionTask handlers sds) = opts.ConnectionInstanceOpts.connectionTask
process i chList iworld=:{ioTasks={done,todo=[BackgroundInstance opts bt=:(BackgroundTask eval):todo]}}
# (mbe,iworld=:{ioTasks={done,todo}}) = eval {iworld & ioTasks = {done=done,todo=todo}}
| mbe =: (Error _) = abort (snd (fromError mbe)) //TODO Handle the error without an abort
......@@ -292,15 +295,16 @@ processIOTask :: !Int
!(readData Dynamic Dynamic *IWorld -> (!MaybeErrorString Dynamic, !Maybe Dynamic, ![String], !Bool, !*IWorld))
!(Dynamic Dynamic *IWorld -> (!MaybeErrorString Dynamic, !Maybe Dynamic, ![String], !Bool, !*IWorld))
!(Dynamic Dynamic *IWorld -> (!MaybeErrorString Dynamic, !Maybe Dynamic, ![String], !Bool, !*IWorld))
!(Dynamic *IWorld -> (!MaybeErrorString Dynamic, ![String], !*IWorld))
!(.ioChannels -> *IOTaskInstance)
!.ioChannels
!*IWorld
-> *IWorld
processIOTask i chList taskId connectionId removeOnClose sds ioOps onCloseHandler onDataHandler
onShareChangeHandler onTickHandler mkIOTaskInstance ioChannels iworld=:{ioStates}
onShareChangeHandler onTickHandler onDestroyHandler mkIOTaskInstance ioChannels iworld=:{ioStates}
# (TaskId instanceNo _) = taskId
= case 'DM'.get taskId ioStates of
Just (IOActive taskStates)
# (TaskId instanceNo _) = taskId
// get task state
# mbTaskState = 'DM'.get connectionId taskStates
| isNothing mbTaskState
......@@ -383,6 +387,18 @@ processIOTask i chList taskId connectionId removeOnClose sds ioOps onCloseHandle
# {done, todo} = iworld.ioTasks
= {iworld & ioStates = ioStates, ioTasks = {done = [mkIOTaskInstance ioChannels : done], todo = todo}}
Just (IODestroyed taskStates)
// get task state one last time
# mbTaskState = 'DM'.get connectionId taskStates
| isNothing mbTaskState
# iworld = if (instanceNo > 0) (queueRefresh [(taskId, "Exception for " <+++ instanceNo)] iworld) iworld
# ioStates = 'DM'.put taskId (IOException "Missing IO task state for task ") ioStates
= ioOps.closeIO (ioChannels, {iworld & ioStates = ioStates})
# (Just taskState, _) = mbTaskState
//Ondestroy handler
# (mbTaskState, out, iworld) = onDestroyHandler taskState iworld
| mbTaskState =: (Error _) = taskStateException mbTaskState instanceNo ioStates ioOps.closeIO (ioChannels, iworld)
// write data
# (ioChannels, iworld) = seq [ioOps.writeData o \\ o <- out] (ioChannels, iworld)
# iworld = ioOps.closeIO (ioChannels, iworld)
//Remove the state for this connection
//If this is the last connection for this task, we can clean up.
......
......@@ -313,6 +313,7 @@ acceptTonicTraces tonicShare
, onData = onData
, onShareChange = onShareChange
, onDisconnect = onDisconnect
, onDestroy = onDestroy
}
where
onConnect :: ConnectionId String TMessageStore
......@@ -353,3 +354,4 @@ acceptTonicTraces tonicShare
onDisconnect st lines
= (Ok st, Just lines)
onDestroy st = (Ok st, [])
......@@ -131,7 +131,7 @@ wsockTextMsg payload = [wsockMsgFrame WS_OP_TEXT True payload]
httpServer :: !Int !Timespec ![WebService r w] (sds () r w) -> ConnectionTask | TC r & TC w & RWShared sds
httpServer port keepAliveTime requestProcessHandlers sds
= wrapIWorldConnectionTask {ConnectionHandlersIWorld|onConnect=onConnect, onData=onData, onShareChange=onShareChange, onTick=onTick, onDisconnect=onDisconnect} sds
= wrapIWorldConnectionTask {ConnectionHandlersIWorld|onConnect=onConnect, onData=onData, onShareChange=onShareChange, onTick=onTick, onDisconnect=onDisconnect, onDestroy=onDestroy} sds
where
onConnect connId host r iworld=:{IWorld|world,clock}
= (Ok (NTIdle host clock),Nothing,[],False,{IWorld|iworld & world = world})
......@@ -235,6 +235,8 @@ where
= (Ok connState, mbW, env)
onDisconnect connState r env = (Ok connState, Nothing, env)
onDestroy s iw = (Ok s, [], iw)
selectHandler req [] = Nothing
selectHandler req [h:hs]
| h.urlMatchPred req.HTTPRequest.req_path = Just h
......
......@@ -13,9 +13,10 @@ from Data.Error import :: MaybeError, :: MaybeErrorString
:: ConnectionHandlers l r w =
{ onConnect :: !(ConnectionId String r -> (!MaybeErrorString l, Maybe w, ![String], !Bool))
, onData :: !( String l r -> (!MaybeErrorString l, Maybe w, ![String], !Bool))
, onData :: !( String l r -> (!MaybeErrorString l, Maybe w, ![String], !Bool))
, onShareChange :: !( l r -> (!MaybeErrorString l, Maybe w, ![String], !Bool))
, onDisconnect :: !( l r -> (!MaybeErrorString l, Maybe w ))
, onDisconnect :: !( l r -> (!MaybeErrorString l, Maybe w ))
, onDestroy :: !( l -> (!MaybeErrorString l, ![String] ))
}
/**
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment