TaskServer.icl 22.7 KB
Newer Older
1
implementation module iTasks._Framework.TaskServer
2

3
import StdFile, StdBool, StdInt, StdClass, StdList, StdMisc, StdArray, StdTuple, StdOrdList
4
import Data.Maybe, Data.Functor, Data.Error, System.Time, Text
Jurriën Stutterheim's avatar
Jurriën Stutterheim committed
5
from Data.Map import :: Map (..)
6 7
import qualified Data.List as DL
import qualified Data.Map as DM
8
import qualified iTasks._Framework.SDS as SDS
9 10
import TCPChannelClass, TCPChannels, TCPEvent, TCPStringChannels, TCPDef, tcp

11 12 13
import iTasks._Framework.IWorld
import iTasks._Framework.Task
import iTasks._Framework.TaskEval
14

15 16 17
//Helper type that holds the mainloop instances during a select call
//in these mainloop instances the unique listeners and read channels
//have been temporarily removed.
18
:: *IOTaskInstanceDuringSelect
19 20
    = ListenerInstanceDS !ListenerInstanceOpts //Listener without its TCP_Listener (toSelectSet moves it to the select set)
    | ConnectionInstanceDS !ConnectionInstanceOpts !*TCP_SChannel //Connection that only keeps its send channel; the receive channel is in the select set
21
    | BackgroundInstanceDS !BackgroundInstanceOpts !BackgroundTask //Background task, carried through the select call unchanged
22

23
//Run the task server: set up the initial I/O instances (see init) and then
//enter the main loop. 'determineTimeout' is handed to the loop unchanged.
serve :: !Int !ConnectionTask ![BackgroundTask] (*IWorld -> (!Maybe Timeout,!*IWorld)) *IWorld -> *IWorld
serve port ct bt determineTimeout iworld
    # iworld = init port ct bt iworld
    = loop determineTimeout iworld
26

27
//Open the TCP listener on 'port' and install it, together with the given
//background tasks, as the initial todo list of the I/O task queue.
//Aborts the program when the listener cannot be opened.
init :: !Int !ConnectionTask ![BackgroundTask] !*IWorld -> *IWorld
init port ct bt iworld=:{IWorld|ioTasks,world}
    # (success, mbListener, world) = openTCP_Listener port world
    | not success = abort ("Error: port "+++ toString port +++ " already in use.\n")
    //The initial listener is registered under task id 0-0
    # rootId        = TaskId 0 0
    # listenerOpts  = {ListenerInstanceOpts|taskId=rootId, nextConnectionId=0, port=port, connectionTask=ct, removeOnClose = True}
    # listenerInst  = ListenerInstance listenerOpts (fromJust mbListener)
    //Every initial background task is registered with background id 0
    # backgrounds   = [BackgroundInstance {bgInstId=0} task \\ task <- bt]
    # initialStates = 'DM'.fromList [(rootId, IOActive 'DM'.newMap)]
    = {iworld & ioTasks = {done=[],todo=[listenerInst:backgrounds]}, ioStates = initialStates, world = world}
34

35 36
//Main loop of the task server. Each round: determine the select timeout,
//select on all I/O instances, process every instance once, and repeat until
//the 'shutdown' flag of the IWorld is set.
loop :: !(*IWorld -> (!Maybe Timeout,!*IWorld)) !*IWorld -> *IWorld
loop determineTimeout iworld
    //Ask the caller how long the select call may block
    # (mbTimeout,iworld=:{IWorld|ioTasks={todo},world}) = determineTimeout iworld
    //Find out which instances have I/O pending
    # (instances,ready,world) = select mbTimeout todo world
    //Handle every instance once; 'process' collects the survivors in 'done' (newest first)
    # iworld=:{shutdown,ioTasks={done}} = process 0 ready {iworld & ioTasks = {done=[],todo=instances}, world = world}
    //Restore the original order and schedule everything for the next round
    # iworld = {iworld & ioTasks = {done=[],todo=reverse done}}
    | not shutdown = loop determineTimeout iworld
    | otherwise    = halt iworld
47

48
//Perform one select call over all listeners and receive channels of the given
//instances. Returns the (restored) instances together with the select results,
//whose indices refer to positions in the returned instance list.
select :: (Maybe Timeout) *[IOTaskInstance] *World -> (!*[IOTaskInstance],![(Int,SelectResult)],!*World)
select mbTimeout mlInstances world
    //Temporarily take the unique listeners and receive channels out of the instances
    # (listeners,rChannels,stripped) = toSelectSet mlInstances
    # selectSet = TCP_Pair (TCP_Listeners listeners) (TCP_RChannels rChannels)
    # (ready,TCP_Pair (TCP_Listeners listeners) (TCP_RChannels rChannels),_,world)
        = selectChannel_MT mbTimeout selectSet TCP_Void world
    //Put the channels back and translate the select indices
    # (restored,ready) = fromSelectSet listeners rChannels stripped ready
    = (restored,ready,world)
57

58
//Split the instances into the listeners and receive channels needed for the
//select call, plus the remainder of each instance (in the original order).
toSelectSet :: !*[IOTaskInstance] -> *(!*[*TCP_Listener],!*[*TCP_RChannel],!*[*IOTaskInstanceDuringSelect])
toSelectSet [] = ([],[],[])
toSelectSet [ListenerInstance opts listener:rest]
    # (ls,rs,ds) = toSelectSet rest
    = ([listener:ls],rs,[ListenerInstanceDS opts:ds])
toSelectSet [ConnectionInstance opts {rChannel,sChannel}:rest]
    # (ls,rs,ds) = toSelectSet rest
    = (ls,[rChannel:rs],[ConnectionInstanceDS opts sChannel:ds])
toSelectSet [BackgroundInstance opts bt:rest]
    # (ls,rs,ds) = toSelectSet rest
    = (ls,rs,[BackgroundInstanceDS opts bt:ds])
66

67 68 69 70
/* Restore the list of main loop instances after a select call.
    In the same pass the indices in the select result are translated to the
    indices of the restored main loop instance list.

    The select call is made on (listeners, receive channels): listeners are
    treated as select indices 0..numListeners-1 and receive channels as the
    indices after that, while listeners and connections may be interleaved
    in any order in the instance list. Therefore the result for each instance
    is looked up by its own select index, instead of only comparing against
    the head of the (sorted) select result: the latter loses events whenever
    a connection precedes a listener in the instance list.

    The returned select result is ordered by increasing instance index.
*/
fromSelectSet :: !*[*TCP_Listener] !*[*TCP_RChannel] !*[*IOTaskInstanceDuringSelect] ![(!Int,!SelectResult)] -> *(![*IOTaskInstance],![(!Int,!SelectResult)])
fromSelectSet ls rs is chList
    # (numListeners,ls) = ulength ls
    = restore 0 numListeners 0 0 ls rs is
where
    //i: index in the instance list, seenL/seenR: number of listeners/receivers restored so far
    restore i numListeners seenL seenR ls rs [] = ([],[])
    //Listeners: the n-th listener has select index n
    restore i numListeners seenL seenR [l:ls] rs [ListenerInstanceDS opts:is]
        # (is,ch) = restore (i+1) numListeners (seenL+1) seenR ls rs is
        = ([ListenerInstance opts l:is],report i seenL ch)
    //Receivers: the n-th receive channel has select index numListeners + n
    restore i numListeners seenL seenR ls [rChannel:rs] [ConnectionInstanceDS opts sChannel:is]
        # (is,ch) = restore (i+1) numListeners seenL (seenR+1) ls rs is
        = ([ConnectionInstance opts {rChannel=rChannel,sChannel=sChannel}:is],report i (numListeners + seenR) ch)
    //Background tasks do not take part in the select call
    restore i numListeners seenL seenR ls rs [BackgroundInstanceDS opts bt:is]
        # (is,ch) = restore (i+1) numListeners seenL seenR ls rs is
        = ([BackgroundInstance opts bt:is],ch)

    //Prepend the select result for 'selectIndex' (if there is one), relabelled with instance index i
    report i selectIndex ch = case [what \\ (c,what) <- chList | c == selectIndex] of
        [what:_] = [(i,what):ch]
        []       = ch

    //Length of a unique list, returning the list itself as well
    ulength [] = (0,[])
    ulength [x:xs]
        # (n,xs) = ulength xs
        = (n + 1,[x:xs])
109

110
//TODO: Use share notification to trigger task re-evaluation based on io events
111
//Process all instances on the todo list once, in order.
//  i      : index of the instance at the head of the todo list
//  chList : select results, ordered by instance index (see checkSelect)
//Instances that stay alive are prepended to the done list; instances that are
//closed or whose I/O state is no longer active are dropped after closing
//their channels.
process :: !Int [(!Int,!SelectResult)] !*IWorld -> *IWorld
process i chList iworld=:{ioTasks={done,todo=[]}} = iworld
process i chList iworld=:{ioTasks={done,todo=[ListenerInstance lopts listener:todo]},ioStates,world}
    # (TaskId instanceNo _) = lopts.ListenerInstanceOpts.taskId
    = case 'DM'.get lopts.ListenerInstanceOpts.taskId ioStates of
        //Active listener:
        Just (IOActive conStates)
            # (mbSelect,chList) = checkSelect i chList
            | mbSelect =:(Just _)
                # (tReport, mbNewConn, listener, world)   = receive_MT (Just 0) listener world
                | tReport == TR_Success
                    # (ip,{rChannel,sChannel}) = fromJust mbNewConn
                    # (ConnectionTask handlers sds) = lopts.ListenerInstanceOpts.connectionTask
                    # (mbr,iworld) = 'SDS'.read sds {iworld & ioTasks={done=done,todo=todo},world=world}
                    | mbr =:(Error _)
                        # iworld=:{ioTasks={done,todo},world} = queueRefresh [(instanceNo,"IO Exception for instance "<+++instanceNo)] iworld
                        # ioStates = 'DM'.put lopts.ListenerInstanceOpts.taskId (IOException (snd (fromError mbr))) ioStates
                        //The connection was already accepted: close it instead of leaking it
                        # world = closeRChannel rChannel world
                        # world = closeChannel sChannel world
                        //The listener itself is closed and dropped as well
                        # world = closeRChannel listener world
                        = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}
                    # (mbConState,mbw,out,close,iworld) = handlers.ConnectionHandlersIWorld.onConnect (toString ip) (fromOk mbr) iworld
                    # iworld = queueRefresh [(instanceNo,"New TCP connection for instance "<+++instanceNo)] iworld
                    # iworld=:{ioTasks={done,todo},world}  = writeShareIfNeeded sds mbw iworld
                    | mbConState =:(Error _)
                        # ioStates = 'DM'.put lopts.ListenerInstanceOpts.taskId (IOException (fromError mbConState)) ioStates
                        //The connection is not going to be used: close it instead of leaking it
                        # world = closeRChannel rChannel world
                        # world = closeChannel sChannel world
                        = process (i+1) chList {iworld & ioTasks={done=[ListenerInstance lopts listener:done],todo=todo}, ioStates = ioStates, world=world}
                    # conStates = 'DM'.put lopts.ListenerInstanceOpts.nextConnectionId (fromOk mbConState,close) conStates
                    //Send the initial output (if any)
                    # (sChannel,world) = case out of
                        []          = (sChannel,world)
                        data        = foldl (\(s,w) d -> send (toByteSeq d) s w) (sChannel,world) data
                    | close
                    //Close the connection immediately
                        # world = closeRChannel rChannel world
                        # world = closeChannel sChannel world
                        //Remove the connection state if configured in the connection listener options
                        //NOTE(review): nextConnectionId is not incremented here, so a state that is kept
                        //is overwritten by the next accepted connection - confirm that this is intended
                        # conStates = if lopts.ListenerInstanceOpts.removeOnClose
                            ('DM'.del lopts.ListenerInstanceOpts.nextConnectionId conStates)
                            conStates
                        # ioStates  = 'DM'.put lopts.ListenerInstanceOpts.taskId (IOActive conStates) ioStates
                        = process (i+1) chList {iworld & ioTasks={done=[ListenerInstance lopts listener:done],todo=todo}, ioStates = ioStates, world=world}
                    | otherwise
                    //Persist the connection
                        # copts = {ConnectionInstanceOpts|taskId = lopts.ListenerInstanceOpts.taskId
                                  ,connectionId = lopts.ListenerInstanceOpts.nextConnectionId
                                  ,remoteHost = ip, connectionTask = lopts.ListenerInstanceOpts.connectionTask
                                  ,removeOnClose = lopts.ListenerInstanceOpts.removeOnClose}
                        //The new connection is appended to the todo list, so it is visited later in this same pass
                        # todo = todo ++ [ConnectionInstance copts {rChannel=rChannel,sChannel=sChannel}]
                        # lopts = {lopts & nextConnectionId = lopts.nextConnectionId + 1}
                        # ioStates  = 'DM'.put lopts.ListenerInstanceOpts.taskId (IOActive conStates) ioStates
                        = process (i+1) chList {iworld & ioTasks={done=[ListenerInstance lopts listener:done],todo=todo}, ioStates = ioStates, world=world}
                //We did not properly accept a connection
                | otherwise
                    = process (i+1) chList {iworld & ioTasks={done=[ListenerInstance lopts listener:done],todo=todo}, world=world}
            //Nothing to do
            | otherwise
                = process (i+1) chList {iworld & ioTasks={done=[ListenerInstance lopts listener:done],todo=todo}, world=world}
        //Destroyed listener:
        Just (IODestroyed conStates)
            # world = closeRChannel listener world
            //If there are no connections belonging to this listener we can clean up, if there are the last connection will cleanup
            # ioStates = if ('DM'.mapSize conStates == 0) ('DM'.del lopts.ListenerInstanceOpts.taskId ioStates) ioStates
            = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}
        //There was an exception or the state has already been removed
        _
            # world = closeRChannel listener world
            = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}

process i chList iworld=:{ioTasks={done,todo=[ConnectionInstance opts {rChannel,sChannel}:todo]},ioStates,world}
    # (TaskId instanceNo _) = opts.ConnectionInstanceOpts.taskId
    = case 'DM'.get opts.ConnectionInstanceOpts.taskId ioStates of
        Just (IOActive conStates)
            # (ConnectionTask handlers sds) = opts.ConnectionInstanceOpts.connectionTask
            # (mbSelect,chList) = checkSelect i chList
            # mbConState = 'DM'.get opts.ConnectionInstanceOpts.connectionId conStates
            | mbConState =: Nothing
                //Set exception, close connection and continue
                # ioStates = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOException "Missing connection state") ioStates
                # world = closeRChannel rChannel world
                # world = closeChannel sChannel world
                = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}
            # conState = fst (fromJust mbConState)
            //Read sds
            # (mbr,iworld=:{ioTasks={done,todo},world}) = 'SDS'.read sds {iworld & ioTasks={done=done,todo=todo},world=world}
            | mbr =:(Error _)
                # ioStates = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOException (snd (fromError mbr))) ioStates
                # world = closeRChannel rChannel world
                # world = closeChannel sChannel world
                = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}
            //Check if disconnected
            | mbSelect =:(Just SR_Disconnected) || mbSelect=:(Just SR_EOM)
                //Call disconnect function
                # (conState,mbw,iworld) = handlers.ConnectionHandlersIWorld.onDisconnect conState (fromOk mbr) {iworld & ioTasks={done=done,todo=todo},ioStates=ioStates,world=world}
                # iworld = queueRefresh [(instanceNo,"TCP connection disconnected for "<+++instanceNo)] iworld
                # iworld=:{world,ioStates} = writeShareIfNeeded sds mbw iworld
                //Store the final connection state (marked as closed) or the exception
                # ioStates = case conState of
                    Ok state
                        = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOActive ('DM'.put opts.ConnectionInstanceOpts.connectionId (state,True) conStates)) ioStates
                    Error e
                        = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOException e) ioStates
                # world = closeRChannel rChannel world
                # world = closeChannel sChannel world
                = process (i+1) chList {iworld & ioStates = ioStates, world=world}
            //Read channel data
            # (data,rChannel,world) = case mbSelect of
                Just SR_Available
                    # (data,rChannel,world) = receive rChannel world
                    = (Just (toString data),rChannel,world)
                _
                    = (Nothing,rChannel,world)
            //Call whileConnected function
            # (mbConState,mbw,out,close,iworld)
                = handlers.ConnectionHandlersIWorld.whileConnected data conState (fromOk mbr) {iworld & ioTasks={done=done,todo=todo},ioStates=ioStates,world=world}
            //Queue refresh when there was new data or when the connection was closed
            # iworld = if (isNothing data) iworld (queueRefresh [(instanceNo, "New TCP data for "<+++instanceNo)] iworld)
            # iworld = if close (queueRefresh [(instanceNo, "TCP connection closed for "<+++instanceNo)] iworld) iworld
            //Write share
            # iworld=:{ioTasks={todo,done},ioStates,world} = writeShareIfNeeded sds mbw iworld
            | mbConState =:(Error _)
                # ioStates = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOException (fromError mbConState)) ioStates
                # world = closeRChannel rChannel world
                # world = closeChannel sChannel world
                = process (i+1) chList {iworld & ioTasks={done=done,todo=todo},ioStates=ioStates,world=world}
            # conStates = 'DM'.put opts.ConnectionInstanceOpts.connectionId (fromOk mbConState,close) conStates
            //Send data if produced
            # (sChannel,world) = case out of
                []          = (sChannel,world)
                data        = foldl (\(s,w) d -> send (toByteSeq d) s w) (sChannel,world) data
            | close
                //Remove the connection state if configured in the connection listener options
                # conStates = if opts.ConnectionInstanceOpts.removeOnClose
                    ('DM'.del opts.ConnectionInstanceOpts.connectionId conStates)
                    conStates
                # ioStates  = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOActive conStates) ioStates
                # world = closeRChannel rChannel world
                # world = closeChannel sChannel world
                = process (i+1) chList {iworld & ioTasks={done=done,todo=todo},ioStates=ioStates,world=world}
            | otherwise
                //Persist connection
                # ioStates  = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOActive conStates) ioStates
                = process (i+1) chList {iworld & ioTasks={done=[ConnectionInstance opts {rChannel=rChannel,sChannel=sChannel}:done],todo=todo},ioStates=ioStates,world=world}
        Just (IODestroyed conStates)
            # world = closeRChannel rChannel world
            # world = closeChannel sChannel world
            //Remove the state for this connection
            # conStates = 'DM'.del opts.ConnectionInstanceOpts.connectionId conStates
            //If this is the last connection for this task, we can clean up.
            # ioStates = if ('DM'.mapSize conStates == 0) ('DM'.del opts.ConnectionInstanceOpts.taskId ioStates) ioStates
            = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}
        _
            //No state, just close
            # world = closeRChannel rChannel world
            # world = closeChannel sChannel world
            = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}

//Background tasks are evaluated on every pass and always kept
process i chList iworld=:{ioTasks={done,todo=[BackgroundInstance opts bt=:(BackgroundTask eval):todo]}}
    # iworld=:{ioTasks={done,todo}} = eval {iworld & ioTasks = {done=done,todo=todo}}
    = process (i+1) chList {iworld & ioTasks={done=[BackgroundInstance opts bt:done],todo=todo}}
//Any other instance is kept untouched
process i chList iworld=:{ioTasks={done,todo=[t:todo]}}
    = process (i+1) chList {iworld & ioTasks={done=[t:done],todo=todo}}

//Write the optional value to the share; with Nothing the IWorld is returned
//untouched. The result of the write (including a possible error) is ignored.
writeShareIfNeeded sds mbw iworld = case mbw of
    Nothing
        = iworld
    Just w
        # (_,iworld) = 'SDS'.write w sds iworld //TODO: Deal with exceptions at this level
        = iworld

//Dynamically add a TCP listener on 'port' for task 'taskId'.
//On success the listener is appended to the todo list and the I/O state of
//the task is (re)set to an active state without connections.
//Returns an Error when the listener cannot be opened.
addListener :: !TaskId !Int !Bool !ConnectionTask !*IWorld -> (!MaybeError TaskException (),!*IWorld)
addListener taskId port removeOnClose connectionTask iworld=:{ioTasks={todo,done}, ioStates, world}
    # (success, mbListener, world) = openTCP_Listener port world
    | success
        # listenerOpts = {ListenerInstanceOpts|taskId = taskId, nextConnectionId = 0, port = port, connectionTask= connectionTask, removeOnClose = removeOnClose}
        # listenerInst = ListenerInstance listenerOpts (fromJust mbListener)
        # ioStates     = 'DM'.put taskId (IOActive 'DM'.newMap) ioStates
        = (Ok (),{iworld & ioTasks = {done=done,todo=todo ++ [listenerInst]}, ioStates = ioStates, world = world})
    | otherwise
        = (Error (exception ("Error: port "+++ toString port +++ " already in use.")), {iworld & ioTasks = {done=done,todo=todo},world = world})

//Open an outgoing TCP connection to host:port for task 'taskId' and run the
//onConnect handler of 'connectionTask' on it.
//Returns an Error when the host cannot be resolved, the connection cannot be
//established or the share of the connection task cannot be read.
//When onConnect asks to close, the connection is closed immediately;
//otherwise it is appended to the todo list with connection id 0. An error
//returned by onConnect is then stored as the I/O state of the task, while the
//result of addConnection itself is still Ok.
addConnection :: !TaskId !String !Int !ConnectionTask !*IWorld -> (!MaybeError TaskException (),!*IWorld)
addConnection taskId=:(TaskId instanceNo _) host port connectionTask iworld=:{ioTasks={done,todo},ioStates,world}
    # (mbIP,world) = lookupIPAddress host world
    | mbIP =: Nothing
        = (Error (exception ("Failed to connect to host "+++ host)), {iworld & ioTasks = {done=done,todo=todo}, world = world})
    # (tReport,mbConn,world) = connectTCP_MT Nothing (fromJust mbIP,port) world
    = case mbConn of
        Nothing
            = (Error (exception ("Failed to connect to host "+++ host)), {iworld & ioTasks = {done=done,todo=todo}, world = world})
        Just {DuplexChannel|rChannel,sChannel}
            # ip                                = fromJust mbIP
            # (ConnectionTask handlers sds)     = connectionTask
            // Read share
            # (mbr,iworld) = 'SDS'.read sds {iworld & ioTasks = {done=done,todo=todo}, ioStates = ioStates, world = world}
            | mbr =: (Error _)
                // The connection is already open here: close it instead of leaking it
                # iworld=:{IWorld|world} = iworld
                # world = closeRChannel rChannel world
                # world = closeChannel sChannel world
                = (liftError mbr, {iworld & world = world})
            // Evaluate onConnect handler
            # (mbl,mbw,out,close,iworld=:{IWorld|ioTasks={done,todo},ioStates,world}) = handlers.ConnectionHandlersIWorld.onConnect (toString ip) (fromOk mbr) iworld
            // Write possible output
            # (sChannel,world) = case out of
                []          = (sChannel,world)
                data        = foldl (\(s,w) d -> send (toByteSeq d) s w) (sChannel,world) data
            //Close connection, or add to queue
            | close
                # world = closeRChannel rChannel world
                # world = closeChannel sChannel world
                = (Ok (), {iworld & ioTasks = {done=done,todo=todo}, ioStates = ioStates, world = world})
            | otherwise
                # opts = {ConnectionInstanceOpts|taskId = taskId, connectionId = 0, remoteHost = ip, connectionTask = connectionTask, removeOnClose = False}
                # todo = todo ++ [ConnectionInstance opts {rChannel=rChannel,sChannel=sChannel}]
                # ioStates = case mbl of
                    Ok l        = 'DM'.put taskId (IOActive ('DM'.fromList [(0,(l,False))])) ioStates
                    Error e     = 'DM'.put taskId (IOException e) ioStates
                = (Ok (),{iworld & ioTasks = {done=done,todo=todo}, ioStates = ioStates, world = world})
321

322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
//Dynamically add a background task: it is appended to the end of the todo
//list under the given background task id. Always succeeds.
addBackgroundTask :: !BackgroundTaskId !BackgroundTask !*IWorld -> (!MaybeError TaskException (),!*IWorld)
addBackgroundTask btid bt iworld=:{ioTasks={done,todo}}
    # newInst = BackgroundInstance {BackgroundInstanceOpts|bgInstId=btid} bt
    = (Ok (), {iworld & ioTasks={done=done, todo=todo ++ [newInst]}})

//Dynamically remove a background task: every background instance on the todo
//list with the given id is dropped. Returns an Error when no such instance
//was found on the todo list (the done list is not searched).
removeBackgroundTask :: !BackgroundTaskId !*IWorld -> (!MaybeError TaskException (),!*IWorld)
removeBackgroundTask btid iworld=:{ioTasks={done,todo}}
    # (removed, todo) = prune todo
    # iworld = {iworld & ioTasks={done=done, todo=todo}}
    | removed   = (Ok (), iworld)
    | otherwise = (Error (exception "No backgroundtask with that id"), iworld)
where
    //Filter the instances; the boolean tells whether at least one was dropped
    prune [] = (False, [])
    prune [inst:rest]
        # (removedLater, rest) = prune rest
        # (isTarget, inst)     = matches inst
        | isTarget  = (True, rest)
        | otherwise = (removedLater, [inst:rest])

    //Only background instances with the requested id match
    matches inst=:(BackgroundInstance {bgInstId} _) = (bgInstId == btid, inst)
    matches inst = (False, inst)

340 341 342
//Check whether the first select result belongs to instance i. If so, return
//it together with the remaining results; otherwise return Nothing and the
//unchanged list. Only the head of the list is inspected.
checkSelect :: !Int ![(!Int,!SelectResult)] -> (!Maybe SelectResult,![(!Int,!SelectResult)])
checkSelect _ [] = (Nothing,[])
checkSelect i chList=:[(who,what):rest]
    | i == who  = (Just what,rest)
    | otherwise = (Nothing,chList)
343

344
//Shut down the server: close the channels of all instances on the todo list.
//The world returned by each close call is stored back into the IWorld, so
//that the close operations are part of the result and actually take place.
halt :: !*IWorld -> *IWorld
halt iworld=:{ioTasks={todo=[],done}} = iworld
halt iworld=:{ioTasks={todo=[ListenerInstance _ listener:todo],done},world}
    # world = closeRChannel listener world
    = halt {iworld & ioTasks = {todo=todo,done=done}, world = world}
halt iworld=:{ioTasks={todo=[ConnectionInstance _ {rChannel,sChannel}:todo],done},world}
    # world = closeRChannel rChannel world
    # world = closeChannel sChannel world
    = halt {iworld & ioTasks = {todo=todo,done=done}, world = world}
//Background tasks own no channels, they are simply dropped
halt iworld=:{ioTasks={todo=[BackgroundInstance _ _ :todo],done}}
    = halt {iworld & ioTasks= {todo=todo,done=done}}
355