Commit a290e9ff authored by Haye Böhm's avatar Haye Böhm

Rewrite share communication to use TCP instead of HTTP

parent 844e6c44
Pipeline #13878 passed with stage
in 17 minutes and 7 seconds
......@@ -14,24 +14,24 @@ derive class iTask TestRecord
:: TestRecord = {number :: Int, numbers :: [Int], text :: String, texts :: [String]}
testShare = sharedStore "sharedStoreNamebla" {number = 37, numbers = [1, 2, 3], text = "Test", texts = ["een", "twee", "drie", "vier"]}
remoteTestShare = remoteShare testShare {domain = "TEST", port = 8080}
remoteTestShare = remoteShare testShare {domain = "TEST", port = 9999}
leftShare = sharedStore "leftShare" (1, 2, 3)
rightShare = sharedStore "rightShare" (10, 20, 30)
parallelShare = leftShare >*< rightShare
remoteParallelShare = remoteShare parallelShare {domain = "TEST", port = 8080}
remoteParallelShare = remoteShare parallelShare {domain = "TEST", port = 9999}
parallelWithLeftRemote = (remoteShare leftShare {domain = "TEST", port = 8080}) >*< rightShare
parallelWithRightRemote = leftShare >*< (remoteShare rightShare {domain = "TEST", port = 8080})
parallelWithLeftRemote = (remoteShare leftShare {domain = "TEST", port = 9999}) >*< rightShare
parallelWithRightRemote = leftShare >*< (remoteShare rightShare {domain = "TEST", port = 9999})
intShare = sharedStore "intShare" 15
simpleShare = remoteShare intShare {domain="TEST", port=8080}
simpleShare = remoteShare intShare {domain="TEST", port=9999}
projectedRemote = sdsProject (SDSLensRead (\r. Ok (r + 2))) (SDSLensWrite (\_ r. Ok (Just (r - 2)))) (Just \_ ws. Ok (ws + 2)) simpleShare
projectedLocal = sdsProject (SDSLensRead (\r. Ok (r + 2))) (SDSLensWrite (\_ r. Ok (Just (r - 2)))) (Just \_ ws. Ok (ws + 2)) intShare
selectShare = sdsSelect "testSelect" param (SDSNotifyConst (\_ _ _ _-> False)) (SDSNotifyConst (\_ _ _ _-> False))
(remoteShare leftShare {domain="TEST", port=8080}) rightShare
(remoteShare leftShare {domain="TEST", port=9999}) rightShare
where
param i
| i == 0 = Left ()
......@@ -50,7 +50,8 @@ where
, publish "/SDSRemoteService" (const sdsRemoteServiceTest)
, publish "/SDSSelect" (const sdsSelectTest)
, publish "/SDSSelectRemote" (const sdsSelectRemoteTest)
, publish "/all" (\_. viewAll)]
, publish "/all" (\_. viewAll)
, publish "/host" (const hostShares)]
sdsSelectRemoteTest = ((enterInformation "Enter the value to be SET for SDSSelect" [] >>= \v. set v (sdsFocus 0 selectShare))
-&&-
......@@ -144,6 +145,9 @@ where
-&&- viewSharedInformation "Value of intShare" [] intShare)
@! ())
hostShares = enterInformation "Please enter the share host port" []
>>= \port. sdsServiceTask port
// ======= Definitions required for defining a remote service =======
// TODO: Create HTTP request by focussing the parameter
......
......@@ -57,6 +57,8 @@ import
//JSON(En|De)code for Dynamic and (->)
from iTasks.Internal.Serialization import generic JSONEncode, generic JSONDecode
from iTasks.Internal.SDSService import sdsServiceTask
import iTasks.Internal.SDS
from StdFunc import id, const, o
from Data.List import instance Functor []
......@@ -47,7 +47,6 @@ derive class iTask EngineOptions
doTasks :: a !*World -> *World | Startable a
doTasks startable world = doTasksWithOptions defaultEngineCLIOptions startable world
import StdDebug,StdMisc
doTasksWithOptions :: ([String] EngineOptions -> MaybeError [String] EngineOptions) a !*World -> *World | Startable a
doTasksWithOptions initFun startable world
# (cli,world) = getCommandLine world
......@@ -94,7 +93,6 @@ where
engineWebService webtasks =
[taskUIService webtasks
,documentService
,sdsService
,staticResourceService [path \\ {WebTask|path} <- webtasks]
]
......
......@@ -24,7 +24,9 @@ derive JSONEncode SDSNotifyRequest, RemoteNotifyOptions
createRequestString req = serializeToBase64 req
onConnect reqq _ _ = (Ok (Left []), Nothing, [createRequestString reqq], False)
onConnect reqq _ _
# rs = createRequestString reqq
= (Ok (Left []), Nothing, [ rs +++ "\n"], False)
onData data (Left acc) _ = (Ok (Left (acc ++ [data])), Nothing, [], False)
......@@ -45,10 +47,7 @@ where
onDisconnect (Left acc) _
# rawResponse = concat acc
= case parseResponse rawResponse of
Nothing = (Error ("Unable to parse HTTP response, got: " +++ rawResponse), Nothing)
(Just parsed)
# r = deserializeFromBase64 parsed.rsp_data symbols
# r = deserializeFromBase64 rawResponse symbols
= (Ok (Right r), Nothing)
queueModifyRequest :: !(SDSRequest p r w) !String !Int !TaskId !{#Symbol} !*IWorld -> (!MaybeError TaskException ConnectionId, !*IWorld) | TC r & TC w
......@@ -66,10 +65,7 @@ where
onDisconnect (Left acc) _
# rawResponse = concat acc
= case parseResponse rawResponse of
Nothing = (Error ("Unable to parse HTTP response, got: " +++ rawResponse), Nothing)
(Just parsed)
# r = deserializeFromBase64 parsed.rsp_data symbols
# r = deserializeFromBase64 rawResponse symbols
= (Ok (Right r), Nothing)
queueServiceRequest :: !(SDSRemoteService p r w) p !TaskId !*IWorld -> (!MaybeError TaskException ConnectionId, !*IWorld) | gText{|*|} p & TC p & TC r
......
......@@ -2,4 +2,4 @@ definition module iTasks.Internal.SDSService
import iTasks.Internal.WebService
sdsService :: WebService a a
sdsServiceTask :: Int -> Task ()
......@@ -18,7 +18,7 @@ import StdString, StdList, StdArray
import qualified Data.Map as DM
import Data.Maybe, Data.Error
import Text.GenJSON, Text.URI
import StdMisc, graph_to_sapl_string
import StdMisc, StdDebug
import Data.Queue, Data.Functor
import iTasks.Extensions.Distributed._Formatter
......@@ -26,57 +26,58 @@ import iTasks.SDS.Definition
import iTasks.Internal.Distributed.Symbols
from iTasks.Internal.TaskStore import queueRefresh
import StdDebug
import iTasks.Internal.TaskState
import iTasks.Internal.Task
import iTasks.Internal.TaskServer
import iTasks.Internal.TaskEval
import qualified Data.Set as Set
import Text.GenPrint
derive gPrint HTTPRequest, Map, HTTPUpload, HTTPMethod, HTTPProtocol
sdsServiceTask :: Int -> Task ()
sdsServiceTask port = Task eval
where
eval event evalOpts tree=:(TCInit taskId ts) iworld
# (symbols, iworld) = case read symbolsShare EmptyContext iworld of
(Ok (ReadResult symbols _), iworld) = (readSymbols symbols, iworld)
# (mbError, iworld) = addListener taskId port True (wrapIWorldConnectionTask (handlers symbols) (sharedStore "sdsServiceTaskShare" "empty")) iworld
| mbError=:(Error _) = (ExceptionResult (fromError mbError), iworld)
= (ValueResult (Value () False) {TaskEvalInfo|lastEvent=ts,removedTasks=[],refreshSensitive=True} (ReplaceUI (ui UIEmpty)) (TCBasic taskId ts (DeferredJSONNode JSONNull) False), iworld)
derive JSONEncode SDSNotifyRequest, RemoteNotifyOptions
eval event evalOpts (TCBasic taskId ts data bla) iworld = (ValueResult (Value () False) {TaskEvalInfo|lastEvent=ts,removedTasks=[],refreshSensitive=True} (ReplaceUI (ui UIEmpty)) (TCBasic taskId ts data bla), iworld)
sdsService :: WebService a a
sdsService = { urlMatchPred = matchFun
, completeRequest = True
, onNewReq = reqFun
, onData = dataFun
handlers symbols = {ConnectionHandlersIWorld|onConnect = onConnect
, onData = onData symbols
, onShareChange = onShareChange
, onTick = onTick
, onDisconnect = disconnectFun
, onDisconnect = onDisconnect
}
where
matchFun :: String -> Bool
matchFun reqUrl = case pathToSegments reqUrl of
["","sds",_] = True
= False
reqFun :: !HTTPRequest a !*IWorld -> *(!HTTPResponse, !Maybe ConnectionState, !Maybe a, !*IWorld)
reqFun req=:{req_data, server_name} _ iworld
# (symbols, iworld) = case read symbolsShare EmptyContext iworld of
(Ok (ReadResult symbols _), iworld) = (readSymbols symbols, iworld)
= case deserializeFromBase64 req_data symbols of
onConnect :: String String *IWorld -> *(!MaybeErrorString String, Maybe w, ![String], !Bool, !*IWorld)
onConnect clientName sdsValue iworld = (Ok clientName, Nothing, [], False, iworld)
onData :: {#Symbol} !String String r *IWorld -> *(!MaybeErrorString String, Maybe w, ![String], !Bool, !*IWorld)
onData symbols receivedData state sdsValue iworld
= case deserializeFromBase64 receivedData symbols of
(SDSReadRequest sds p) = case readSDS sds p EmptyContext Nothing (sdsIdentity sds) iworld of
(Error (_, e), iworld) = (errorResponse e, Nothing, Nothing, iworld)
(Ok (ReadResult v _), iworld) = trace_n ("Got read") (base64Response (serializeToBase64 v), Nothing, Nothing, iworld)
(SDSRegisterRequest sds p reqSDSId taskId port) = case readSDS sds p (RemoteTaskContext taskId server_name port) (Just taskId) reqSDSId iworld of
(Error (_, e), iworld) = (errorResponse e, Nothing, Nothing, iworld)
(Ok (ReadResult v _), iworld) = trace_n ("Got register") (base64Response (serializeToBase64 v), Nothing, Nothing, iworld)
(Error (_, e), iworld) = (Error e, Nothing, [], True, iworld)
(Ok (ReadResult v _), iworld) = trace_n "Got read" (Ok state, Nothing, [serializeToBase64 v], True, iworld)
(SDSRegisterRequest sds p reqSDSId taskId port) = case readSDS sds p (RemoteTaskContext taskId "test" port) (Just taskId) reqSDSId iworld of
(Error (_, e), iworld) = (Error e, Nothing, [], True, iworld)
(Ok (ReadResult v _), iworld) = trace_n "Got register" (Ok state, Nothing, [serializeToBase64 v], True, iworld)
(SDSWriteRequest sds p val) = case writeSDS sds p EmptyContext val iworld of
(Error (_, e), iworld) = (errorResponse e, Nothing, Nothing, iworld)
(Ok (WriteResult notify _), iworld) = trace_n "Got write" (base64Response (serializeToBase64 ()), Nothing, Nothing, queueNotifyEvents (sdsIdentity sds) notify iworld)
(Error (_, e), iworld) = (Error e, Nothing, [], True, iworld)
(Ok (WriteResult notify _), iworld) = trace_n "Got write" (Ok state, Nothing, [serializeToBase64 ()], True, queueNotifyEvents (sdsIdentity sds) notify iworld)
(SDSModifyRequest sds p f) = case modifySDS f sds p EmptyContext iworld of
(Error (_, e), iworld) = (errorResponse e, Nothing, Nothing, iworld)
(Ok (ModifyResult r w _), iworld) = trace_n ("Got modify") (base64Response (serializeToBase64 (r,w)), Nothing, Nothing, iworld)
(Error (_, e), iworld) = (Error e, Nothing, [], True, iworld)
(Ok (ModifyResult r w _), iworld) = trace_n "Got modify" (Ok state, Nothing, [serializeToBase64 (r,w)], True, iworld)
(SDSRefreshRequest taskId sdsId)
# iworld = (queueRefresh [(taskId, "Notification for remote write of " +++ sdsId)] iworld)
= (plainResponse "Refresh queued", Nothing, Nothing, iworld)
plainResponse string
= {okResponse & rsp_headers = [("Content-Type","text/plain"), ("Content-Length", toString (size string))], rsp_data = string}
= (Ok state, Nothing, ["Refresh queued"], True, iworld)
base64Response string = {okResponse & rsp_headers = [("Content-Type","text/plain;base64"), ("Content-Length", toString (size string))], rsp_data = string}
onShareChange :: ! String r *IWorld -> *(!MaybeErrorString String, Maybe w, ![String], !Bool, !*IWorld)
onShareChange state sdsValue iworld = (Ok state, Nothing, [], False, iworld)
dataFun req _ data instanceNo iworld = ([], True, instanceNo, Nothing, iworld)
onTick :: ! String r *IWorld -> *(!MaybeErrorString String, Maybe w, ![String], !Bool, !*IWorld)
onTick state sdsValue iworld = (Ok state, Nothing, [], False,iworld)
onShareChange _ _ s iworld = ([], True, s, Nothing, iworld)
onTick _ _ instanceNo iworld =([], True, instanceNo, Nothing, iworld)
disconnectFun _ _ _ iworld = (Nothing,iworld)
onDisconnect :: ! String r *IWorld -> *(!MaybeErrorString String, Maybe w, !*IWorld)
onDisconnect state sdsValue iworld = (Ok state, Nothing, iworld)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment