AsyncSDS.icl 13.8 KB
Newer Older
1 2
implementation module iTasks.Internal.AsyncSDS

3
import Data.Maybe, Data.Either, Data.List, Data.Func
4
import Text, Text.GenJSON
5
import StdMisc, StdArray, StdBool
6 7 8 9 10 11 12 13 14 15 16 17 18 19
import Internet.HTTP

import iTasks.Engine
import iTasks.Internal.Distributed.Symbols
import iTasks.Internal.IWorld
import iTasks.Internal.SDS
import iTasks.Internal.Task
import iTasks.SDS.Definition
import iTasks.WF.Tasks.IO

import iTasks.Extensions.Distributed._Formatter

from iTasks.Internal.TaskServer import addConnection
from iTasks.SDS.Sources.Core import unitShare
20
import iTasks.Internal.SDSService
21 22 23 24 25

import qualified Data.Map as DM

derive JSONEncode SDSNotifyRequest, RemoteNotifyOptions

Haye Böhm's avatar
Haye Böhm committed
26
createRequestString req = serializeToBase64 req
Haye Böhm's avatar
Haye Böhm committed
27

Haye Böhm's avatar
Haye Böhm committed
28
onConnect reqq connId _ _ = (Ok (Left []), Nothing, [createRequestString reqq +++ "\n"], False)
Haye Böhm's avatar
Haye Böhm committed
29

Haye Böhm's avatar
Haye Böhm committed
30
onData data (Left acc) _ = (Ok (Left (acc ++ [data])), Nothing, [], False)
Haye Böhm's avatar
Haye Böhm committed
31 32 33

onShareChange acc _ = (Ok acc, Nothing, [], False)

34
queueSDSRequest :: !(SDSRequest p r w) !String !Int !TaskId !{#Symbol} !*IWorld -> (!MaybeError TaskException ConnectionId, !*IWorld) | TC r
35 36
queueSDSRequest req host port taskId symbols env
= case addConnection taskId host port connectionTask env of
Haye Böhm's avatar
Haye Böhm committed
37 38
	(Error e, env)  		= (Error e, env)
	(Ok (id, _), env)     	= (Ok id, env)
39
where
40
	connectionTask = wrapConnectionTask (handlers req) unitShare
41

42
	handlers :: (SDSRequest p r w) -> ConnectionHandlers (Either [String] (MaybeError TaskException r)) () () | TC r
43 44 45 46
	handlers _ = {ConnectionHandlers| onConnect = onConnect req,
		onData = onData,
		onShareChange = onShareChange,
		onDisconnect = onDisconnect}
47

48
	onDisconnect (Left acc) _
49 50 51
	# textResponse = concat acc
	| size textResponse < 1 = (Error ("queueSDSRequest: Server " +++ host +++ " disconnected without responding"), Nothing)
	= (Ok $ Right $ deserializeFromBase64 textResponse symbols, Nothing)
Haye Böhm's avatar
Haye Böhm committed
52 53 54

queueModifyRequest :: !(SDSRequest p r w) !String !Int !TaskId !{#Symbol} !*IWorld -> (!MaybeError TaskException ConnectionId, !*IWorld) | TC r & TC w
queueModifyRequest req=:(SDSModifyRequest p r w) host port taskId symbols env = case addConnection taskId host port connectionTask env of
55 56
	(Error e, env)          = (Error e, env)
	(Ok (id, _), env)       = (Ok id, env)
Haye Böhm's avatar
Haye Böhm committed
57
where
58
	connectionTask = wrapConnectionTask (handlers req) unitShare
59

60
	handlers :: (SDSRequest p r w) -> ConnectionHandlers (Either [String] (MaybeError TaskException (r, w))) () () | TC r & TC w
61 62 63 64
	handlers _ = {ConnectionHandlers| onConnect = onConnect req,
		onData = onData,
		onShareChange = onShareChange,
		onDisconnect = onDisconnect}
65

66
	onDisconnect (Left acc) _
67 68 69
	# textResponse = concat acc
	| size textResponse == 0 = (Error ("queueModifyRequest: Server" +++ host +++ " disconnected without responding"), Nothing)
	= (Ok $ Right $ deserializeFromBase64 textResponse symbols, Nothing)
70

71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
queueWriteRequest :: !(SDSRequest p r w) !String !Int !TaskId !{#Symbol} !*IWorld ->  (!MaybeError TaskException ConnectionId, !*IWorld) | TC r & TC w
queueWriteRequest req=:(SDSWriteRequest sds p w) host port taskId symbols env = case addConnection taskId host port connectionTask env of
	(Error e, env)          = (Error e, env)
	(Ok (id, _), env)       = (Ok id, env)
where
	connectionTask = wrapConnectionTask (handlers req) unitShare

	handlers :: (SDSRequest p r w) -> ConnectionHandlers (Either [String] (MaybeError TaskException ())) () () | TC r & TC w
	handlers req = {ConnectionHandlers| onConnect = onConnect req,
		onData = onData,
		onShareChange = onShareChange,
		onDisconnect = onDisconnect}

	onDisconnect (Left acc) _
	# textResponse = concat acc
	| size textResponse == 0 = (Error ("queueWriteRequest: Server" +++ host +++ " disconnected without responding"), Nothing)
	= (Ok $ Right $ deserializeFromBase64 textResponse symbols, Nothing)

Haye Böhm's avatar
Haye Böhm committed
89 90 91
queueServiceRequest :: !(SDSRemoteService p r w) p !TaskId !Bool !*IWorld -> (!MaybeError TaskException ConnectionId, !*IWorld) | gText{|*|} p & TC p & TC r
queueServiceRequest service=:(SDSRemoteService (HTTPShareOptions {host, port, createRequest, fromResponse})) p taskId _ env
= case addConnection taskId host port connectionTask env of
92 93
	(Error e, env) = (Error e, env)
	(Ok (id, _), env) = (Ok id, env)
94
where
Haye Böhm's avatar
Haye Böhm committed
95
	connectionTask = wrapConnectionTask (handlers service) unitShare
96

Haye Böhm's avatar
Haye Böhm committed
97
	handlers req = {ConnectionHandlers| onConnect = onConnect,
98 99 100
		onData = onData,
		onShareChange = onShareChange,
		onDisconnect = onDisconnect}
101

Haye Böhm's avatar
Haye Böhm committed
102 103 104 105
	onConnect _ _ _
	# req = createRequest p
	# sreq = toString {HTTPRequest|req & req_headers = 'DM'.put "Connection" "Close" req.HTTPRequest.req_headers}
	= (Ok (Left []), Nothing, [sreq], False)
106

107
	onData data (Left acc) _ = (Ok (Left (acc ++ [data])), Nothing, [], False)
108

109
	onShareChange acc _ = (Ok acc, Nothing, [], False)
110

Haye Böhm's avatar
Haye Böhm committed
111
	onDisconnect (Left []) _ = (Error ("queueServiceRequest: Server" +++ host +++ ":" +++ toString port +++ " disconnected without responding"), Nothing)
112
	onDisconnect (Left acc) _
113 114 115
	# textResponse = concat acc
	= case parseResponse textResponse of
		Nothing = (Error ("Unable to parse HTTP response, got: " +++ textResponse), Nothing)
Haye Böhm's avatar
Haye Böhm committed
116 117 118 119
		(Just parsed) = case fromResponse parsed p of
			(Error error) = (Error error, Nothing)
			(Ok a) = (Ok (Right a), Nothing)

120
queueServiceRequest service=:(SDSRemoteService (TCPShareOptions {host, port, createMessage, fromTextResponse})) p taskId register env
Haye Böhm's avatar
Haye Böhm committed
121 122 123 124 125 126 127 128 129 130
= case addConnection taskId host port connectionTask env of
	(Error e, env) = (Error e, env)
	(Ok (id, _), env) = (Ok id, env)
where
	connectionTask = wrapConnectionTask (handlers service) unitShare
	handlers req = {ConnectionHandlers| onConnect = onConnect,
		onData = onData,
		onShareChange = onShareChange,
		onDisconnect = onDisconnect}

131
	onConnect connId _ _	= (Ok (Nothing, []), Nothing, [createMessage p +++ "\n"], False)
Haye Böhm's avatar
Haye Böhm committed
132 133 134

	onData data (previous, acc) _
	# newacc = acc ++ [data]
135 136
	// If already a result, and we are registering, then we have received a refresh notification from the server.
	| register && isJust previous = (Ok (previous, newacc), Nothing, [], True)
137
	= case fromTextResponse (concat newacc) p register of
Haye Böhm's avatar
Haye Böhm committed
138
		Error e = (Error e, Nothing, [], True)
139 140 141 142 143
		// No full response yet, keep the old value.
		Ok (Nothing,response) 	= (Ok (previous, newacc), Nothing, maybe [] (\resp. [resp +++ "\n"]) response, False)
		Ok (Just r, Just resp) 	= (Ok (Just r, []), Nothing, [resp +++ "\n"], False)
		// Only close the connection when we have a value and when we are not registering.
		Ok (Just r, Nothing) 	= (Ok (Just r, []), Nothing, [], not register)
Haye Böhm's avatar
Haye Böhm committed
144 145 146

	onShareChange state _ = (Ok state, Nothing, [], False)
	onDisconnect state _ = (Ok state, Nothing)
147

148 149
queueRead :: !(SDSRemoteSource p r w) p !TaskId !Bool !SDSIdentity !*IWorld -> (!MaybeError TaskException ConnectionId, !*IWorld) | gText{|*|} p & TC p & TC r & TC w
queueRead rsds=:(SDSRemoteSource sds {SDSShareOptions|domain, port}) p taskId register reqSDSId env
150
# (symbols, env) = case read symbolsShare EmptyContext env of
Haye Böhm's avatar
Haye Böhm committed
151
	(Ok (ReadingDone r), env) = (readSymbols r, env)
152
	_ = abort "Reading symbols failed!"
153
# (request, env) = buildRequest register env
154 155
= queueSDSRequest request domain port taskId symbols env
where
156 157
	buildRequest True env=:{options}= (SDSRegisterRequest sds p reqSDSId (sdsIdentity rsds) taskId options.sdsPort, env)
	buildRequest False env = (SDSReadRequest sds p, env)
158

159 160
queueRemoteRefresh :: ![SDSNotifyRequest] !*IWorld -> *IWorld
queueRemoteRefresh [] iworld = iworld
161
queueRemoteRefresh [notifyRequest : reqs] iworld=:{options}
162
# (symbols, iworld) = case read symbolsShare EmptyContext iworld of
Haye Böhm's avatar
Haye Böhm committed
163
	(Ok (ReadingDone r), iworld) = (readSymbols r, iworld)
164 165
# (host, port, sdsId) = case notifyRequest.remoteOptions of
	(Just {hostToNotify, portToNotify, remoteSdsId}) = (hostToNotify, portToNotify, remoteSdsId)
166
# request = reqq notifyRequest.reqTaskId sdsId
167
= case queueSDSRequest request host port SDSSERVICE_TASK_ID symbols iworld of
168
	(_, iworld) = queueRemoteRefresh reqs iworld
169
where
170 171 172
	// Hack to get it to compile. The Refresh Request alternative does not use any of the parameters.
	reqq :: !TaskId !SDSIdentity -> SDSRequest () String ()
	reqq taskId sdsId = SDSRefreshRequest taskId sdsId
173

174 175
queueWrite :: !w !(SDSRemoteSource p r w) p !TaskId !*IWorld -> (!MaybeError TaskException ConnectionId, !*IWorld) | gText{|*|} p & TC p & TC r & TC w
queueWrite w rsds=:(SDSRemoteSource sds share=:{SDSShareOptions|domain, port}) p taskId env
176
# (symbols, env) = case read symbolsShare EmptyContext env of
Haye Böhm's avatar
Haye Böhm committed
177
	(Ok (ReadingDone r), env) = (readSymbols r, env)
178
# request = SDSWriteRequest sds p w
179
= queueWriteRequest request domain port taskId symbols env
180

181 182
queueModify :: !(r -> MaybeError TaskException w) !(SDSRemoteSource p r w) p !TaskId !*IWorld -> (!MaybeError TaskException ConnectionId, !*IWorld) | gText{|*|} p & TC p & TC r & TC w
queueModify f rsds=:(SDSRemoteSource sds share=:{SDSShareOptions|domain, port}) p taskId env
183
# (symbols, env) = case read symbolsShare EmptyContext env of
Haye Böhm's avatar
Haye Böhm committed
184
	(Ok (ReadingDone r), env) = (readSymbols r, env)
185
# request = SDSModifyRequest sds p f
Haye Böhm's avatar
Haye Böhm committed
186
= queueModifyRequest request domain port taskId symbols env
187

Haye Böhm's avatar
Haye Böhm committed
188 189
getAsyncServiceValue :: !(SDSRemoteService p r w) !TaskId !ConnectionId IOStates -> MaybeError TaskException (Maybe r) | TC r & TC w & TC p
getAsyncServiceValue service taskId connectionId ioStates
Haye Böhm's avatar
Haye Böhm committed
190
# getValue = case service of
191 192 193 194
	SDSRemoteService (HTTPShareOptions _) = getValueHttp
	SDSRemoteServiceQueued _ _ (HTTPShareOptions _) = getValueHttp
	SDSRemoteService (TCPShareOptions _) = getValueTCP
	SDSRemoteServiceQueued _ _ (TCPShareOptions _) = getValueTCP
Haye Böhm's avatar
Haye Böhm committed
195 196
=  case 'DM'.get taskId ioStates of
		Nothing                             = Error (exception "No iostate for this task")
197 198 199 200
		Just ioState                        = case ioState of
			IOException exc                   = Error (exception exc)
			IOActive connectionMap            = getValue connectionId connectionMap
			IODestroyed connectionMap         = getValue connectionId connectionMap
Haye Böhm's avatar
Haye Böhm committed
201
where
Haye Böhm's avatar
Haye Böhm committed
202
	getValueHttp connectionId connectionMap = case 'DM'.get connectionId connectionMap of
203
		Just (value :: Either [String] r^, _) = case value of
Haye Böhm's avatar
Haye Böhm committed
204 205
			(Left _)                                = Ok Nothing
			(Right val)                     		= Ok (Just val)
206 207 208 209 210 211
		Just (dyn, _)
			# message = "Dynamic not of the correct service type, got: "
				+++ toString (typeCodeOfDynamic dyn)
				+++ ", required: "
				+++ toString (typeCodeOfDynamic (dynamic service))
			= Error (exception message)
Haye Böhm's avatar
Haye Böhm committed
212 213
		Nothing                             	= Ok Nothing

Haye Böhm's avatar
Haye Böhm committed
214 215
	getValueTCP connectionId connectionMap
	= case 'DM'.get connectionId connectionMap of
216 217 218
		Just (value :: (Maybe r^, [String]), _) = case value of
				(Nothing, _)                        = Ok Nothing
				(Just r,_)                     		= Ok (Just r)
219 220 221 222 223 224
		Just (dyn, _)
			# message = "Dynamic not of the correct service type, got: "
				+++ toString (typeCodeOfDynamic dyn)
				+++ ", required: "
				+++ toString (typeCodeOfDynamic (dynamic service))
			= Error (exception message)
Haye Böhm's avatar
Haye Böhm committed
225 226
		Nothing                             	= Ok Nothing

227
getAsyncReadValue :: !(sds p r w) !TaskId !ConnectionId IOStates -> MaybeError TaskException (Maybe r) | TC r
228 229
getAsyncReadValue _ taskId connectionId ioStates
=  case 'DM'.get taskId ioStates of
230
		Nothing                             = Error (exception "No iostate for this task")
231
		(Just ioState)                      = case ioState of
232
			(IOException exc)                   = Error (exception exc)
233 234
			(IOActive connectionMap)            = getValue connectionId connectionMap
			(IODestroyed connectionMap)         = getValue connectionId connectionMap
235
where
236
	getValue connectionId connectionMap = case 'DM'.get connectionId connectionMap of
237 238 239 240
		(Just (value :: Either [String] (MaybeError TaskException r^), _)) = case value of
			(Left _)                                = Ok Nothing
			(Right (Ok val))                        = Ok (Just val)
			(Right (Error e))						= Error e
Haye Böhm's avatar
Haye Böhm committed
241
		(Just (dyn, _))							= Error (exception ("Dynamic not of the correct read type, got" +++ toString (typeCodeOfDynamic dyn)))
242 243
		Nothing                             	= Ok Nothing

244
getAsyncWriteValue :: !(sds p r w) !TaskId !ConnectionId IOStates -> MaybeError TaskException (Maybe ()) | TC w
245
getAsyncWriteValue _ taskId connectionId ioStates =  case 'DM'.get taskId ioStates of
246
		Nothing                             = Error (exception "No iostate for this task")
247
		(Just ioState)                      = case ioState of
248
			(IOException exc)                   = Error (exception exc)
249 250
			(IOActive connectionMap)            = getValue connectionId connectionMap
			(IODestroyed connectionMap)         = getValue connectionId connectionMap
251
where
252
	getValue connectionId connectionMap = case 'DM'.get connectionId connectionMap of
253
		(Just (value :: Either [String] (MaybeError TaskException ()), _)) = case value of
254 255 256 257 258 259 260
			(Left _)                                    = Ok Nothing
			(Right (Ok val))                            = Ok (Just val)
			(Right (Error e))							= Error e
		(Just (dyn, _))						= Error (exception ("Dynamic not of the correct write type, got" +++ toString (typeCodeOfDynamic dyn)))
		Nothing                             = Ok Nothing

getAsyncModifyValue :: !(sds p r w) !TaskId !ConnectionId IOStates -> MaybeError TaskException (Maybe (r,w)) | TC w & TC r
261
getAsyncModifyValue _ taskId connectionId ioStates =  case 'DM'.get taskId ioStates of
262
		Nothing                             = Error (exception "No iostate for this task")
263
		(Just ioState)                      = case ioState of
264
			(IOException exc)                   = Error (exception exc)
265 266
			(IOActive connectionMap)            = getValue connectionId connectionMap
			(IODestroyed connectionMap)         = getValue connectionId connectionMap
267
where
268 269
	getValue connectionId connectionMap
	= case 'DM'.get connectionId connectionMap of
270 271 272 273 274 275
		(Just (value :: Either [String] (MaybeError TaskException (r^, w^)), _)) = case value of
			(Left _)						= Ok Nothing
			(Right (Ok val))				= Ok (Just val)
			(Right (Error e))				= Error e
		(Just (dyn, _))					= Error (exception ("Dynamic not of the correct modify type, got " +++ toString (typeCodeOfDynamic dyn)))
		Nothing 						= Ok Nothing