Commit 07539bcb authored by Haye Böhm's avatar Haye Böhm

add error logging to SDSService, AsyncSDS module

parent 338d4850
Pipeline #14176 failed with stage
in 1 minute and 25 seconds
implementation module iTasks.Internal.AsyncSDS
import Data.Maybe, Data.Either, Data.List
import Data.Maybe, Data.Either, Data.List, Data.Func
import Text, Text.GenJSON
import StdMisc, StdArray
import Internet.HTTP
......@@ -46,8 +46,9 @@ where
onDisconnect = onDisconnect}
onDisconnect (Left acc) _
# r = deserializeFromBase64 (concat acc) symbols
= (Ok (Right r), Nothing)
# textResponse = concat acc
| size textResponse < 1 = (Error ("queueSDSRequest: Server " +++ host +++ " disconnected without responding"), Nothing)
= (Ok $ Right $ deserializeFromBase64 textResponse symbols, Nothing)
queueModifyRequest :: !(SDSRequest p r w) !String !Int !TaskId !{#Symbol} !*IWorld -> (!MaybeError TaskException ConnectionId, !*IWorld) | TC r & TC w
queueModifyRequest req=:(SDSModifyRequest p r w) host port taskId symbols env = case addConnection taskId host port connectionTask env of
......@@ -63,9 +64,9 @@ where
onDisconnect = onDisconnect}
onDisconnect (Left acc) _
# rawResponse = concat acc
# r = deserializeFromBase64 rawResponse symbols
= (Ok (Right r), Nothing)
# textResponse = concat acc
| size textResponse == 0 = (Error ("queueModifyRequest: Server" +++ host +++ " disconnected without responding"), Nothing)
= (Ok $ Right $ deserializeFromBase64 textResponse symbols, Nothing)
queueServiceRequest :: !(SDSRemoteService p r w) p !TaskId !*IWorld -> (!MaybeError TaskException ConnectionId, !*IWorld) | gText{|*|} p & TC p & TC r
queueServiceRequest (SDSRemoteService (HttpShareOptions req parse)) p taskId env = case addConnection taskId req.server_name req.server_port connectionTask env of
......@@ -86,9 +87,10 @@ where
onShareChange acc _ = (Ok acc, Nothing, [], False)
onDisconnect (Left acc) _
# rawResponse = concat acc
= case parseResponse rawResponse of
Nothing = (Error ("Unable to parse HTTP response, got: " +++ rawResponse), Nothing)
# textResponse = concat acc
| size textResponse == 0 = (Error ("queueServiceRequest: Server" +++ req.server_name +++ ":" +++ toString req.server_port +++ " disconnected without responding"), Nothing)
= case parseResponse textResponse of
Nothing = (Error ("Unable to parse HTTP response, got: " +++ textResponse), Nothing)
(Just parsed) = case parse parsed of
(Left error) = (Error error, Nothing)
(Right a) = (Ok (Right a), Nothing)
......@@ -171,7 +173,6 @@ getAsyncModifyValue _ taskId connectionId ioStates = case 'DM'.get taskId ioSta
(IOException exc) = Left exc
(IOActive connectionMap) = getValue connectionId connectionMap
(IODestroyed connectionMap) = getValue connectionId connectionMap
Nothing = Right Nothing
where
getValue connectionId connectionMap
= case 'DM'.get connectionId connectionMap of
......
......@@ -4,6 +4,7 @@ import iTasks
from Data.Func import $
from StdMisc import abort
import StdArray
import iTasks.Extensions.Distributed._Formatter
import iTasks.SDS.Definition
......@@ -37,7 +38,7 @@ where
# (symbols, iworld) = case read symbolsShare EmptyContext iworld of
(Ok (ReadResult symbols _), iworld) = (readSymbols symbols, iworld)
# (mbError, iworld) = addListener taskId port True (wrapIWorldConnectionTask (handlers symbols taskId) share) iworld
| mbError=:(Error _) = (ExceptionResult (fromError mbError), iworld)
| mbError=:(Error _) = showException "initialization" (fromError mbError) iworld
# iworld = iShow ["SDS server listening on " +++ toString port] iworld
= (ValueResult (Value () False) {TaskEvalInfo|lastEvent=ts,removedTasks=[],refreshSensitive=True} (ReplaceUI (ui UIEmpty)) (TCBasic taskId ts (DeferredJSONNode JSONNull) False), iworld)
......@@ -46,14 +47,18 @@ where
# (symbols, iworld) = case read symbolsShare EmptyContext iworld of
(Ok (ReadResult symbols _), iworld) = (readSymbols symbols, iworld)
# (readResult, iworld) = read share EmptyContext iworld
| readResult=:(Error _) = (ExceptionResult (fromError readResult), iworld)
| readResult=:(Error _) = showException "read from symbols share" (fromError readResult) iworld
# shareValue = 'Map'.toList (directResult (fromOk readResult))
# (results, iworld) = reevaluateShares symbols taskId shareValue iworld
| results=:(Error _) = (ExceptionResult (exception (fromError results)), iworld)
| results=:(Error _) = showException "re-evaluating share values" (exception (fromError results)) iworld
# (writeResult, iworld) = write ('Map'.fromList (fromOk results)) share EmptyContext iworld
| writeResult=:(Error _) = (ExceptionResult (fromError writeResult), iworld)
| writeResult=:(Error _) = showException "writing result share values" (fromError writeResult) iworld
= (ValueResult (Value () False) {TaskEvalInfo|lastEvent=ts,removedTasks=[],refreshSensitive=True} NoChange tree, iworld)
showException base taskException=:(_, str) iworld
# iworld = iShow ["SDSService exception during " +++ base +++ ": " +++ str] iworld
= (ExceptionResult taskException, iworld)
handlers symbols taskId = {ConnectionHandlersIWorld|onConnect = onConnect
, onData = onData symbols taskId
, onShareChange = onShareChange
......@@ -104,7 +109,8 @@ where
// Right: Still need to do work..
performRequest :: !{#Symbol} !TaskId !String !String !*IWorld -> !(MaybeErrorString !(Either !String !String), !*IWorld)
performRequest symbols taskId host request iworld
| newlines (fromString request) > 1 = abort ("Multiple requests: " +++ request)
| newlines (fromString request) > 1 = (Error ("Received multiple requests (only one is allowed): " +++ request), iworld)
| size request == 0 = (Error "Received empty request", iworld)
= case deserializeFromBase64 request symbols of
(SDSReadRequest sds p) = case readSDS sds p (TaskContext taskId) Nothing (sdsIdentity sds) iworld of
(Error (_, e), iworld) = (Error e, iworld)
......
......@@ -14,6 +14,7 @@ import iTasks.Engine, iTasks.Internal.IWorld, iTasks.Internal.TaskEval, iTasks.I
import iTasks.Internal.IWorld
import iTasks.Internal.Task
import iTasks.Internal.TaskEval
import iTasks.Internal.Util
from iTasks.Internal.TaskStore import queueRefresh
import iTasks.WF.Tasks.IO
import iTasks.SDS.Combinators.Common
......@@ -403,6 +404,7 @@ where
*(!.ioChannels, !*IWorld)
-> *IWorld
taskStateException mbTaskState instanceNo ioStates closeIO (ioChannels, iworld)
# iworld = iShow ["Exception in TaskServer: taskStateException: " +++ fromError mbTaskState] iworld
# iworld = if (instanceNo > 0) (queueRefresh [(taskId, "Exception for " <+++ instanceNo)] iworld) iworld
# ioStates = 'DM'.put taskId (IOException (fromError mbTaskState)) ioStates
= closeIO (ioChannels, {iworld & ioStates = ioStates})
......@@ -414,6 +416,7 @@ where
*(!.ioChannels, !*IWorld)
-> *IWorld
sdsException mbSdsErr instanceNo ioStates closeIO (ioChannels, iworld)
# iworld = iShow ["Exception in TaskServer: sdsException: " +++ snd (fromError mbSdsErr)] iworld
# iworld = if (instanceNo > 0) (queueRefresh [(taskId, "Exception for " <+++ instanceNo)] iworld) iworld
# ioStates = 'DM'.put taskId (IOException (snd (fromError mbSdsErr))) ioStates
= closeIO (ioChannels, {iworld & ioStates = ioStates})
......
......@@ -11,7 +11,7 @@ from System.FilePath import :: FilePath
from System.Process import :: ProcessPtyOptions
from Data.Error import :: MaybeError, :: MaybeErrorString
:: ConnectionHandlers l r w =
:: ConnectionHandlers l r w =
{ onConnect :: !(ConnectionId String r -> (!MaybeErrorString l, Maybe w, ![String], !Bool))
, onData :: !( String l r -> (!MaybeErrorString l, Maybe w, ![String], !Bool))
, onShareChange :: !( l r -> (!MaybeErrorString l, Maybe w, ![String], !Bool))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment