Commit 62e7fb68 authored by Haye Böhm's avatar Haye Böhm

Tracing, fix updating connectionmap with value

parent e81f9eb6
Pipeline #15419 failed with stage
in 4 minutes and 38 seconds
......@@ -82,8 +82,8 @@ where
[BackgroundTask (processEvents MAX_EVENTS)
:if (webTasks =: [])
[BackgroundTask stopOnStable]
[BackgroundTask removeOutdatedSessions
,BackgroundTask flushWritesWhenIdle
[//BackgroundTask removeOutdatedSessions
BackgroundTask flushWritesWhenIdle
]
]
......
......@@ -2,7 +2,7 @@ implementation module iTasks.Internal.AsyncSDS
import Data.Maybe, Data.Either, Data.List, Data.Func
import Text, Text.GenJSON
import StdMisc, StdArray
import StdMisc, StdArray, StdBool
import Internet.HTTP
import iTasks.Engine
......@@ -118,7 +118,7 @@ where
(Error error) = (Error error, Nothing)
(Ok a) = (Ok (Right a), Nothing)
queueServiceRequest service=:(SDSRemoteService (TCPShareOptions {host, port, createMessage, fromTextResponse})) p taskId keepOpen env
queueServiceRequest service=:(SDSRemoteService (TCPShareOptions {host, port, createMessage, fromTextResponse})) p taskId register env
= case addConnection taskId host port connectionTask env of
(Error e, env) = (Error e, env)
(Ok (id, _), env) = (Ok id, env)
......@@ -129,17 +129,19 @@ where
onShareChange = onShareChange,
onDisconnect = onDisconnect}
onConnect connId _ _ = trace_n ("New TCP connection: " +++ toString connId) (Ok ([], []), Nothing, [createMessage p +++ "\n"], False)
onConnect connId _ _ = trace_n ("New TCP connection: " +++ toString connId +++ ". Sending: \n" +++ createMessage p) (Ok ([], []), Nothing, [createMessage p +++ "\n"], False)
onData data (previous, acc) _
| not (trace_tn ("Received " +++ data)) = undef
# newacc = acc ++ [data]
= case fromTextResponse (concat newacc) p of
| register && not (isnull previous) = trace_n "Close previously registered connection" (Ok (previous, newacc), Nothing, [], True)
= case fromTextResponse (concat newacc) p register of
Error e = (Error e, Nothing, [], True)
Ok Nothing = (Ok (previous, newacc), Nothing, [], False)
Ok (Just r)
# rrs = [r : previous]
| not (trace_tn ("Number of r's: " +++ toString (length rrs))) = undef
= (Ok (rrs, []), Nothing, [], False)
Ok (Nothing,response) = (Ok (previous, newacc), Nothing, maybe [] (\resp. [resp]) response, False)
Ok (Just r, Just resp)
| not (trace_tn ("Registering: " +++ resp)) = undef
= (Ok ([r : previous], []), Nothing, [resp], False)
Ok (Just r, Nothing) = trace_n "Not responding, normal read" (Ok ([r : previous], []), Nothing, [], not register)
onShareChange state _ = (Ok state, Nothing, [], False)
onDisconnect state _ = (Ok state, Nothing)
......@@ -187,33 +189,40 @@ queueModify f rsds=:(SDSRemoteSource sds share=:{SDSShareOptions|domain, port})
getAsyncServiceValue :: !(SDSRemoteService p r w) !TaskId !ConnectionId IOStates -> MaybeError TaskException (Maybe r) | TC r & TC w & TC p
getAsyncServiceValue service taskId connectionId ioStates
# getValue = case service of
(SDSRemoteService (HTTPShareOptions _)) = getValueHttp
(SDSRemoteServiceQueued _ _ (HTTPShareOptions _)) = getValueHttp
(SDSRemoteService (TCPShareOptions _)) = getValueTCP
(SDSRemoteServiceQueued _ _ (TCPShareOptions _)) = getValueTCP
SDSRemoteService (HTTPShareOptions _) = getValueHttp
SDSRemoteServiceQueued _ _ (HTTPShareOptions _) = getValueHttp
SDSRemoteService (TCPShareOptions _) = getValueTCP
SDSRemoteServiceQueued _ _ (TCPShareOptions _) = getValueTCP
= case 'DM'.get taskId ioStates of
Nothing = Error (exception "No iostate for this task")
(Just ioState) = case ioState of
(IOException exc) = Error (exception exc)
(IOActive connectionMap) = getValue connectionId connectionMap
(IODestroyed connectionMap) = getValue connectionId connectionMap
Just ioState = case ioState of
IOException exc = Error (exception exc)
IOActive connectionMap = getValue connectionId connectionMap
IODestroyed connectionMap = getValue connectionId connectionMap
where
getValueHttp connectionId connectionMap = case 'DM'.get connectionId connectionMap of
(Just (value :: Either [String] r^, _)) = case value of
Just (value :: Either [String] r^, _) = case value of
(Left _) = Ok Nothing
(Right val) = Ok (Just val)
(Just (dyn, _)) = Error (exception ("Dynamic not of the correct service type, got: " +++ toString (typeCodeOfDynamic dyn) +++ ", required: " +++ toString (typeCodeOfDynamic (dynamic service))))
Just (dyn, _)
# message = "Dynamic not of the correct service type, got: "
+++ toString (typeCodeOfDynamic dyn)
+++ ", required: "
+++ toString (typeCodeOfDynamic (dynamic service))
= Error (exception message)
Nothing = Ok Nothing
getValueTCP connectionId connectionMap
| not (trace_tn ("Get value from TCP service for connection " +++ toString connectionId)) = undef
= case 'DM'.get connectionId connectionMap of
(Just (value :: ([r^], [String]), _))
| not (trace_tn "Got some value..") = undef
= case value of
([], _) = trace_n "No read value yet" (Ok Nothing)
([r : rs],_) = trace_n "Got value!!" Ok (Just r)
(Just (dyn, _)) = Error (exception ("Dynamic not of the correct service type, got: " +++ toString (typeCodeOfDynamic dyn) +++ ", required: " +++ toString (typeCodeOfDynamic (dynamic service))))
Just (value :: ([r^], [String]), _) = case value of
([], _) = Ok Nothing
([r : rs],_) = Ok (Just r)
Just (dyn, _)
# message = "Dynamic not of the correct service type, got: "
+++ toString (typeCodeOfDynamic dyn)
+++ ", required: "
+++ toString (typeCodeOfDynamic (dynamic service))
= Error (exception message)
Nothing = Ok Nothing
getAsyncReadValue :: !(sds p r w) !TaskId !ConnectionId IOStates -> MaybeError TaskException (Maybe r) | TC r
......
......@@ -310,7 +310,7 @@ processIOTask i chList taskId connectionId removeOnClose sds ioOps onCloseHandle
# mbTaskState = 'DM'.get connectionId taskStates
| isNothing mbTaskState
# iworld = if (instanceNo > 0) (queueRefresh [(taskId, "Exception for " <+++ instanceNo)] iworld) iworld
# ioStates = 'DM'.put taskId (IOException ("Missing IO task state for task " +++ toString taskId +++ ". We have: " +++ (concat $ map (\k. toString k +++ ",") $ 'DM'.keys taskStates) +++ ". Required: " +++ toString connectionId)) ioStates
# ioStates = 'DM'.put taskId (IOException "Missing IO task state for task ") ioStates
= ioOps.closeIO (ioChannels, {iworld & ioStates = ioStates})
# taskState = fst (fromJust mbTaskState)
......@@ -373,15 +373,16 @@ processIOTask i chList taskId connectionId removeOnClose sds ioOps onCloseHandle
# {done, todo} = iworld.ioTasks
= {iworld & ioStates = ioStates, ioTasks = {done = [mkIOTaskInstance ioChannels : done], todo = todo}}
IODData data
| not (trace_tn ("Got data for task " +++ toString taskId +++ ", close: " +++ toString close)) = undef
# (mbTaskState, mbw, out, close, iworld) = onDataHandler data taskState r iworld
# (connectionMap, iworld) = connMapForTask taskId iworld
# iworld = if (instanceNo > 0) (queueRefresh [(taskId, "New data for "<+++ instanceNo)] iworld) iworld
# (mbSdsErr, iworld) = writeShareIfNeeded sds mbw iworld
// write data
# (ioChannels, iworld) = seq [ioOps.writeData o \\ o <- out] (ioChannels, iworld)
| mbTaskState =: (Error _) = taskStateException mbTaskState instanceNo ioStates ioOps.closeIO (ioChannels, iworld)
| isError mbSdsErr = sdsException mbSdsErr instanceNo ioStates ioOps.closeIO (ioChannels, iworld)
# ioStates = 'DM'.put taskId (IOActive ('DM'.put connectionId (fromOk mbTaskState, close) connectionMap)) ioStates
# (connectionMap, iworld) = appFst (\map. 'DM'.put connectionId (fromOk mbTaskState, close) map) (connMapForTask taskId iworld)
# ioStates = 'DM'.put taskId (IOActive connectionMap) ioStates
| close = closeConnection connectionMap ioStates ioOps.closeIO (ioChannels, iworld)
| otherwise
// persist connection
......
......@@ -160,6 +160,7 @@ where
reducer _ [(_,attr)] = Ok attr
import StdDebug, StdMisc
taskListItemValue :: !(SharedTaskList a) -> SDSLens (Either Int TaskId) (TaskValue a) () | TC a
taskListItemValue tasklist = mapReadError read (toReadOnly (sdsTranslate "taskListItemValue" listFilter tasklist))
where
......@@ -167,7 +168,7 @@ where
listFilter (Right taskId) = {onlyIndex=Nothing,onlyTaskId=Just [taskId],onlySelf=False,includeValue=True,includeAttributes=False,includeProgress=False}
read (_,items) = case [value \\ {TaskListItem|value} <- items] of
[v:_] = Ok v
vs=:[v:_] = trace_n ("taskListItemValues: " +++ toString (length vs)) (Ok v)
_ = Error (exception "taskListItemValue: item not found")
taskListItemProgress :: !(SharedTaskList a) -> SDSLens (Either Int TaskId) InstanceProgress () | TC a
......
......@@ -280,7 +280,7 @@ required type w. The reducer has the job to turn this ws into w.
{ host :: String
, port :: Int
, createMessage :: p -> String
, fromTextResponse :: String p -> MaybeErrorString (Maybe r)}
, fromTextResponse :: String p Bool -> MaybeErrorString (Maybe r, Maybe String)}
:: SDSRemoteService p r w =
/**
......
......@@ -120,11 +120,16 @@ where
Just a = (ValueResult (Value a True) {lastEvent=ts,removedTasks=[],refreshSensitive=False} (rep event) s, iworld)
Nothing = (ExceptionResult (exception "Corrupt task result"), iworld)
import StdMisc, StdDebug
derive gText Event, Set
watch :: !(sds () r w) -> Task r | iTask r & TC w & Readable, Registrable sds
watch shared = Task (eval shared)
where
eval :: (sds () r w) Event TaskEvalOpts TaskTree *IWorld -> (TaskResult r, !*IWorld) | iTask r & TC w & Readable, Registrable sds
eval shared event evalOpts (TCInit taskId ts) iworld=:{sdsEvalStates} = case 'SDS'.readRegister taskId shared iworld of
eval shared event evalOpts (TCInit taskId ts) iworld=:{sdsEvalStates}
| not (trace_tn ("watch eval init bacause " +++ toSingleLineText event)) = undef
= case 'SDS'.readRegister taskId shared iworld of
(Error e, iworld) = (ExceptionResult e, iworld)
(Ok (ReadingDone val), iworld) = (ValueResult (Value val False) {TaskEvalInfo|lastEvent=ts,removedTasks=[],refreshSensitive=True} (rep event) (TCInit taskId ts), iworld)
(Ok (Reading sds), iworld)
......@@ -132,7 +137,9 @@ where
# sdsEvalStates = 'DM'.put taskId (dynamicResult ('SDS'.readRegister taskId sds)) sdsEvalStates
= (ValueResult NoValue ei (rep event) (TCAwait Read taskId ts (TCInit taskId ts)), {iworld & sdsEvalStates = sdsEvalStates})
eval shared event evalOpts (TCAwait Read taskId ts subtree) iworld=:{sdsEvalStates} = case 'DM'.get taskId sdsEvalStates of
eval shared event evalOpts (TCAwait Read taskId ts subtree) iworld=:{sdsEvalStates}
| not (trace_tn ("watch eval await bacause " +++ toSingleLineText event)) = undef
= case 'DM'.get taskId sdsEvalStates of
Nothing = (ExceptionResult (exception ("No SDS state found for task " +++ toString taskId)), iworld)
Just val = case val iworld of
(Error e, iworld) = (ExceptionResult e, iworld)
......@@ -143,6 +150,7 @@ where
= (ValueResult NoValue {TaskEvalInfo|lastEvent=ts,removedTasks=[],refreshSensitive=True} NoChange (TCAwait Read taskId ts (TCInit taskId ts)), {iworld & sdsEvalStates = sdsEvalStates})
eval shared event repAs ttree=:(TCDestroy _) iworld
| not (trace_tn ("watch eval destroy bacause " +++ toSingleLineText event)) = undef
# iworld = 'SDS'.clearTaskSDSRegistrations ('DS'.singleton $ fromOk $ taskIdFromTaskTree ttree) iworld
= (DestroyedResult,iworld)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment