SDS.icl 52.3 KB
Newer Older
1
implementation module iTasks.Internal.SDS
Jurriën Stutterheim's avatar
CRLF  
Jurriën Stutterheim committed
2

3
import StdString, StdTuple, StdMisc, StdBool, StdInt, StdChar, StdFunctions, StdArray
4
from StdList import flatten, map, take, drop, instance toString [a], instance length []
5 6
from Text import class Text, instance Text String
import qualified Text
Jurriën Stutterheim's avatar
CRLF  
Jurriën Stutterheim committed
7 8
from Data.Map import :: Map
import qualified Data.Map as DM
9 10
import Data.Error, Data.Func, Data.Tuple, System.OS, System.Time, Text.GenJSON, Data.Foldable
from Data.Set import instance Foldable Set, instance < (Set a)
Jurriën Stutterheim's avatar
CRLF  
Jurriën Stutterheim committed
11
import qualified Data.Set as Set
12
import iTasks.Engine
13 14
import iTasks.Internal.IWorld
import iTasks.Internal.Task, iTasks.Internal.TaskStore, iTasks.Internal.TaskEval
Jurriën Stutterheim's avatar
CRLF  
Jurriën Stutterheim committed
15

16 17 18 19
import iTasks.SDS.Sources.Core
import iTasks.WF.Tasks.IO
import Text.GenJSON
import iTasks.Internal.AsyncSDS
Haye Böhm's avatar
Haye Böhm committed
20 21
import iTasks.Internal.Util
from Text import instance + String
22

Jurriën Stutterheim's avatar
CRLF  
Jurriën Stutterheim committed
23 24 25
// Build a readable and writable SDS source from a namespace, an identifier and
// a read/write function pair. All arguments are forwarded to createSDS as-is.
createReadWriteSDS ::
	!String
	!String
	!(p *IWorld -> *(MaybeError TaskException r, *IWorld))
	!(p w *IWorld -> *(MaybeError TaskException (SDSNotifyPred p), *IWorld))
	->
	SDSSource p r w
createReadWriteSDS namespace ident readFun writeFun
	= createSDS namespace ident readFun writeFun

// Build a read-only SDS source from a read function that cannot fail.
// The result of the read function is wrapped in Ok before it is handed to
// createReadOnlySDSError.
createReadOnlySDS ::
	!(p *IWorld -> *(r, *IWorld))
	->
	SDSSource p r ()
createReadOnlySDS readFun = createReadOnlySDSError readOk
where
	readOk param iworld
		# (val, iworld) = readFun param iworld
		= (Ok val, iworld)
39

Jurriën Stutterheim's avatar
CRLF  
Jurriën Stutterheim committed
40
// Build a read-only SDS source from a read function that may fail.
// The source is created under the fixed namespace/identifier "readonly".
createReadOnlySDSError ::
	!(p *IWorld -> *(MaybeError TaskException r, *IWorld))
	->
	SDSSource p r ()
createReadOnlySDSError readFun = createSDS "readonly" "readonly" readFun ignoreWrite
where
	// The write function leaves the world untouched and yields a predicate
	// that holds for every registration.
	ignoreWrite _ _ iworld = (Ok alwaysNotify, iworld)
	alwaysNotify _ _ = True
Jurriën Stutterheim's avatar
CRLF  
Jurriën Stutterheim committed
46 47 48

// Build an SDS source. The name of the source is "<namespace>:<identifier>";
// the read and write functions are stored unchanged in the source options.
createSDS ::
	!String
	!String
	!(p *IWorld -> *(MaybeError TaskException r, *IWorld))
	!(p w *IWorld -> *(MaybeError TaskException (SDSNotifyPred p), *IWorld))
	->
	SDSSource p r w
createSDS namespace ident readFun writeFun = SDSSource options
where
	options =
		{ SDSSourceOptions
		| name  = namespace +++ ":" +++ ident
		, read  = readFun
		, write = writeFun
		}

// Compute the identity of an SDS: the name fragments produced by nameSDS
// (starting from an empty accumulator) are concatenated into one string.
sdsIdentity :: !(sds p r w) -> SDSIdentity | Identifiable sds
sdsIdentity sds = 'Text'.concat nameParts
where
	nameParts = nameSDS sds []
Jurriën Stutterheim's avatar
CRLF  
Jurriën Stutterheim committed
64 65

// Evaluate a pure predicate on a parameter and thread the IWorld through unchanged.
iworldNotifyPred :: !(p -> Bool) !p !*IWorld -> (!Bool,!*IWorld)
iworldNotifyPred predicate param iworld
	# holds = predicate param
	= (holds, iworld)
Jurriën Stutterheim's avatar
CRLF  
Jurriën Stutterheim committed
67

68
// Read an SDS with unit parameter in the given task context.
// A completed read yields ReadingDone with the value; an asynchronous read
// yields Reading with the SDS to continue with; errors are passed on.
read            :: !(sds () r w) !TaskContext !*IWorld
	-> (!MaybeError TaskException (AsyncRead r w), !*IWorld) | TC r & TC w & Readable sds
read sds context iworld
	# (mbResult, iworld) = readSDS sds () context iworld
	= case mbResult of
		Error e                = (Error e, iworld)
		Ok (ReadResult val _)  = (Ok (ReadingDone val), iworld)
		Ok (AsyncRead pending) = (Ok (Reading pending), iworld)
Jurriën Stutterheim's avatar
CRLF  
Jurriën Stutterheim committed
74

75
// Read an SDS with unit parameter and register the given task for notifications.
// The read runs in the TaskContext of taskId and the registration is recorded
// under the identity of the SDS that is being read.
readRegister    :: !TaskId !(sds () r w) !*IWorld
	-> (!MaybeError TaskException (AsyncRead r w), !*IWorld) | TC r & TC w & Readable, Registrable sds
readRegister taskId sds iworld
	# (mbResult, iworld) = readRegisterSDS sds () (TaskContext taskId) taskId (sdsIdentity sds) iworld
	= case mbResult of
		Error e                = (Error e, iworld)
		Ok (ReadResult val _)  = (Ok (ReadingDone val), iworld)
		Ok (AsyncRead pending) = (Ok (Reading pending), iworld)
81

82
// Record a notification request for an SDS, if one was asked for.
// With Nothing as request the IWorld is returned unchanged. Otherwise the
// request is stored (with the current time stamp) under the identity of the
// SDS, and for non-remote contexts the SDS identity is also added to the set
// of SDSs the task is registered for.
mbRegister :: !p (sds p r w) !(Maybe (!TaskId, !SDSIdentity)) !TaskContext !*IWorld -> *IWorld | gText{|*|} p & TC p & Identifiable sds
// When a remote requests a register, we do not have a local task id rather a remote task context which we use to record the request.
mbRegister _ _ Nothing _ iworld = iworld
mbRegister p sds (Just (taskId, reqSDSId)) context iworld=:{IWorld|sdsNotifyRequests, sdsNotifyReqsByTask, world}
	# (ts, world) = nsTime world
	# request     = buildRequest context taskId reqSDSId p
	# sdsId       = sdsIdentity sds
	# requests    = 'DM'.alter (addRequest request ts) sdsId sdsNotifyRequests
	# byTask      = case context of
		// We do not store remote requests in the tasks map, the task ID's are not local to this instance.
		(RemoteTaskContext _ _ _ _ _) = sdsNotifyReqsByTask
		_                             = 'DM'.alter (addSdsId sdsId) taskId sdsNotifyReqsByTask
	= { iworld
	  & world               = world
	  , sdsNotifyRequests   = requests
	  , sdsNotifyReqsByTask = byTask
	  }
where
	// Insert the request into the existing request map, or start a new map
	addRequest request ts mbRequests
		= Just (maybe ('DM'.singleton request ts) ('DM'.put request ts) mbRequests)

	// Insert the SDS identity into the existing set, or start a new set
	addSdsId sdsId mbSdsIds
		= Just (maybe ('Set'.singleton sdsId) ('Set'.insert sdsId) mbSdsIds)

	// Remote contexts carry their own requesting task id and the address to notify;
	// a TaskContext supplies its own task id; for EmptyContext the given task id is used.
	buildRequest (RemoteTaskContext reqTaskId currTaskId remoteSDSId host port) _ reqSDSId p
		= buildRequest` reqTaskId reqSDSId p (Just {hostToNotify=host, portToNotify=port, remoteSdsId=remoteSDSId})
	buildRequest (TaskContext contextTaskId) _ reqSDSId p
		= buildRequest` contextTaskId reqSDSId p Nothing
	buildRequest EmptyContext givenTaskId reqSDSId p
		= buildRequest` givenTaskId reqSDSId p Nothing

	buildRequest` taskId reqSDSId p mbRemoteOptions =
		{ reqTaskId     = taskId
		, reqSDSId      = reqSDSId
		, cmpParam      = dynamic p
		, cmpParamText  = toSingleLineText p
		, remoteOptions = mbRemoteOptions
		}
Haye Böhm's avatar
Haye Böhm committed
113

114
// Write a value to an SDS with unit parameter in the given task context.
// When the write completes directly, notification events are queued for the
// registrations reported by the write and WritingDone is returned; an
// asynchronous write yields Writing with the SDS to continue with.
write :: !w !(sds () r w) !TaskContext !*IWorld -> (!MaybeError TaskException (AsyncWrite r w), !*IWorld) | TC r & TC w & Writeable sds
write w sds context iworld
	# (mbResult, iworld) = writeSDS sds () context w iworld
	= case mbResult of
		Error e                 = (Error e, iworld)
		Ok (AsyncWrite pending) = (Ok (Writing pending), iworld)
		Ok (WriteResult notify _)
			# iworld = queueNotifyEvents (sdsIdentity sds) notify iworld
			= (Ok WritingDone, iworld)
Bas Lijnse's avatar
Bas Lijnse committed
121

Haye Böhm's avatar
Haye Böhm committed
122 123
// Extract the value of a read that has completed.
// Aborts the program for any result other than ReadingDone.
directResult :: (AsyncRead r w) -> r
directResult result = case result of
	ReadingDone val = val
	_               = abort "No direct result!"

Jurriën Stutterheim's avatar
CRLF  
Jurriën Stutterheim committed
126 127
// Partition the registrations of one SDS by the given notify predicate.
// Returns the set of (task id, remote options) pairs for which the predicate
// holds, followed by the set for which it does not hold.
checkRegistrations :: !SDSIdentity !(SDSNotifyPred p) !*IWorld
                   -> (!Set (!TaskId, !Maybe RemoteNotifyOptions), !Set (!TaskId, !Maybe RemoteNotifyOptions), !*IWorld)
                    | TC p
checkRegistrations sdsId pred iworld
	# (registrations, iworld) = lookupRegistrations sdsId iworld
	# (match, nomatch)        = matchRegistrations pred registrations
	= (match, nomatch, iworld)
where
	// All notify requests (with their time stamps) stored for the given share id;
	// an SDS without stored requests yields the empty list.
	lookupRegistrations :: String !*IWorld -> (![(SDSNotifyRequest, Timespec)], !*IWorld)
	lookupRegistrations sdsId iworld=:{sdsNotifyRequests}
		# requestsForSDS = 'DM'.findWithDefault 'DM'.newMap sdsId sdsNotifyRequests
		= ('DM'.toList requestsForSDS, iworld)

	// Split the requests into those matching the predicate and those that do not.
	matchRegistrations _ [] = ('Set'.newSet, 'Set'.newSet)
	matchRegistrations pred [(req=:{SDSNotifyRequest|cmpParam}, registeredAt):rest]
		# (match, nomatch) = matchRegistrations pred rest
		# subscriber       = (req.reqTaskId, req.remoteOptions)
		= case cmpParam of
			(param :: p^)
				| pred registeredAt param = ('Set'.insert subscriber match, nomatch)
				| otherwise               = (match, 'Set'.insert subscriber nomatch)
			// A stored parameter of a different type should not happen; the program aborts.
			_ = abort "Not matching!"
152

Haye Böhm's avatar
Haye Böhm committed
153
// Atomically update an SDS with unit parameter using a total function.
// When the modification completes directly, notification events are queued
// and ModifyingDone carries the written value; an asynchronous modification
// yields Modifying with the SDS to continue with and the original function.
modify :: !(r -> w)          !(sds () r w) !TaskContext !*IWorld -> (!MaybeError TaskException (AsyncModify r w), !*IWorld) | TC r & TC w & Modifiable sds
modify f sds context iworld
	# (mbResult, iworld) = modifySDS update sds () context iworld
	= case mbResult of
		Error e                    = (Error e, iworld)
		Ok (AsyncModify pending _) = (Ok (Modifying pending f), iworld)
		Ok (ModifyResult notify _ w _)
			# iworld = queueNotifyEvents (sdsIdentity sds) notify iworld
			= (Ok (ModifyingDone w), iworld)
where
	// modifySDS expects a function that may fail; f never does
	update r = Ok (f r)
Jurriën Stutterheim's avatar
CRLF  
Jurriën Stutterheim committed
161

162
// Queue refresh events for all registrations in the given set.
// Entries without remote options are refreshed locally with a reason that
// names the SDS; entries with remote options are passed to queueRemoteRefresh.
queueNotifyEvents :: !String !(Set (!TaskId, !Maybe RemoteNotifyOptions)) !*IWorld -> *IWorld
queueNotifyEvents sdsId notify iworld
	# iworld = queueRefresh localRefreshes iworld
	= queueRemoteRefresh remoteRefreshes iworld
where
	requests        = 'Set'.toList notify
	reason          = "Notification for write of " +++ sdsId
	localRefreshes  = [(taskId, reason) \\ (taskId, Nothing) <- requests]
	remoteRefreshes = [(taskId, opts) \\ (taskId, Just opts) <- requests]
168

169
// Remove all SDS notification registrations of the given tasks.
// First the identities of all SDSs any of the tasks is registered for are
// collected from sdsNotifyReqsByTask; then the requests of those tasks are
// removed from sdsNotifyRequests and the tasks themselves from
// sdsNotifyReqsByTask.
clearTaskSDSRegistrations :: !(Set TaskId) !*IWorld -> *IWorld
clearTaskSDSRegistrations taskIds iworld=:{IWorld|sdsNotifyRequests, sdsNotifyReqsByTask}
	# sdsIdsToClear = foldl
		(\sdsIdsToClear taskId -> 'Set'.union ('DM'.findWithDefault 'Set'.newSet taskId sdsNotifyReqsByTask) sdsIdsToClear)
		'Set'.newSet
		taskIds
	= { iworld
	  & sdsNotifyRequests   = foldl clearRegistrationRequests sdsNotifyRequests sdsIdsToClear
	  , sdsNotifyReqsByTask = foldl (flip 'DM'.del) sdsNotifyReqsByTask taskIds
	  }
where
	// Drop the requests of the cleared tasks for one SDS; the entry for the SDS
	// is removed altogether when no requests remain.
	clearRegistrationRequests :: (Map SDSIdentity (Map SDSNotifyRequest Timespec))
								 SDSIdentity
							  -> Map SDSIdentity (Map SDSNotifyRequest Timespec)
	clearRegistrationRequests requests sdsId
		| 'DM'.null filteredReqsForSdsId = 'DM'.del sdsId requests
		| otherwise                      = 'DM'.put sdsId filteredReqsForSdsId requests
	where
		// An SDS id without an entry in the request map is treated as having no
		// requests, instead of aborting on a failed lookup.
		reqsForSdsId         = 'DM'.findWithDefault 'DM'.newMap sdsId requests
		filteredReqsForSdsId = 'DM'.filterWithKey (\req _ -> not $ 'Set'.member req.reqTaskId taskIds) reqsForSdsId
Jurriën Stutterheim's avatar
CRLF  
Jurriën Stutterheim committed
189 190

// List all registered notification requests, grouped by the instance number
// of the requesting task. Each entry pairs the task id with the identity of
// the SDS the request is stored under.
listAllSDSRegistrations :: *IWorld -> (![(InstanceNo,[(TaskId,SDSIdentity)])],!*IWorld)
listAllSDSRegistrations iworld=:{IWorld|sdsNotifyRequests}
	# perInstance = 'DM'.foldrWithKey collectForSDS 'DM'.newMap sdsNotifyRequests
	= ('DM'.toList perInstance, iworld)
where
	collectForSDS sdsId requests acc = 'DM'.foldlWithKey collect acc requests
	where
		// Prepend the request to the list kept for its task instance
		collect acc {SDSNotifyRequest|reqTaskId=taskId=:(TaskId instanceNo _)} _
			# known = 'DM'.findWithDefault [] instanceNo acc
			= 'DM'.put instanceNo [(taskId, sdsId):known] acc
Jurriën Stutterheim's avatar
CRLF  
Jurriën Stutterheim committed
197

Haye Böhm's avatar
Haye Böhm committed
198 199 200
// Render notification requests as text, one request per line in the form
// "Task id <task>: <sds> (<parameter text>)".
formatSDSRegistrationsList :: [SDSNotifyRequest] -> String
formatSDSRegistrationsList list = 'Text'.join "\n" (map formatRequest list)
where
	formatRequest {SDSNotifyRequest|reqTaskId, reqSDSId, cmpParamText}
		= 'Text'.concat ["Task id ", toString reqTaskId, ": ", reqSDSId, " (", cmpParamText, ")"]
Haye Böhm's avatar
Haye Böhm committed
202 203 204 205 206 207 208

// Render registrations grouped per instance as text. Every instance starts a
// new line "<instance> -> " followed by its "<task>:<sds>" entries, which are
// separated by a newline and a tab.
formatRegistrations :: [(InstanceNo,[(TaskId,SDSIdentity)])] -> String
formatRegistrations list
	= 'Text'.join "\n"
		[ 'Text'.concat
			[ toString instanceNo
			, " -> "
			, 'Text'.join "\n\t" ['Text'.concat [toString taskId, ":", sdsId] \\ (taskId, sdsId) <- requests]
			]
		\\ (instanceNo, requests) <- list
		]
Jurriën Stutterheim's avatar
CRLF  
Jurriën Stutterheim committed
209

Bas Lijnse's avatar
Bas Lijnse committed
210 211 212 213
// Perform all writes that were deferred in the write cache.
// Every deferred write is attempted, also after an earlier one failed. The
// write cache is emptied afterwards in all cases. If any write failed, a
// single exception is returned whose message lists the individual errors.
flushDeferredSDSWrites :: !*IWorld -> (!MaybeError TaskException (), !*IWorld)
flushDeferredSDSWrites iworld=:{writeCache}
	# (errors,iworld) = flushAll ('DM'.toList writeCache) iworld
	| errors =: [] = (Ok (), {iworld & writeCache = 'DM'.newMap})
	# msg = 'Text'.join OS_NEWLINE ["Could not flush all deferred SDS writes, some data may be lost":map snd errors]
	= (Error (exception msg),{iworld & writeCache = 'DM'.newMap})
where
	// Write the cached entries one by one, collecting the errors that occur
	flushAll [] iworld = ([],iworld)
	flushAll [(_,(_,DeferredWrite p w sds)):rest] iworld
		= case writeSDS sds p EmptyContext w iworld of
			(Ok (WriteResult notify _),iworld)
				# iworld = queueNotifyEvents (sdsIdentity sds) notify iworld
				= flushAll rest iworld
			// A write that did not complete directly cannot be followed up here;
			// report it instead of failing on an unmatched result.
			(Ok (AsyncWrite _),iworld)
				# (errors,iworld) = flushAll rest iworld
				= ([exception ("Deferred write to " +++ sdsIdentity sds +++ " did not complete synchronously"):errors],iworld)
			(Error e,iworld)
				# (errors,iworld) = flushAll rest iworld
				= ([e:errors],iworld)

227 228
// A source is named by its own name between "$" markers; a wrapped value
// takes the name of the source it wraps.
instance Identifiable SDSSource
where
	nameSDS source acc = case source of
		SDSSource {SDSSourceOptions|name} = ["$":[name:["$":acc]]]
		SDSValue _ _ inner                = nameSDS inner acc
231 232 233

// Plain read: readSDSSource without a notification request.
instance Readable SDSSource
where
	readSDS source param context iworld
		= readSDSSource source param context Nothing iworld
Jurriën Stutterheim's avatar
CRLF  
Jurriën Stutterheim committed
235

236
instance Writeable SDSSource
where
	// Plain source: run the write function and report the registrations that
	// match the notify predicate it returns.
	writeSDS sds=:(SDSSource {SDSSourceOptions|write}) p _ w iworld
		# (mbPred, iworld) = write p w iworld
		= case mbPred of
			Error e = (Error e, iworld)
			Ok npred
				# (match, _, iworld) = checkRegistrations (sdsIdentity sds) npred iworld
				= (Ok (WriteResult match sds), iworld)

	// Wrapped source with flag False: write to the wrapped source. The flag
	// stays False for an asynchronous write and becomes True once a write result is in.
	writeSDS (SDSValue False val sds) p c w iworld
		# (mbResult, iworld) = writeSDS sds p c w iworld
		= case mbResult of
			Error e                       = (Error e, iworld)
			Ok (AsyncWrite inner)         = (Ok (AsyncWrite (SDSValue False val inner)), iworld)
			Ok (WriteResult notify inner) = (Ok (WriteResult notify (SDSValue True val inner)), iworld)

	// Wrapped source with flag True: nothing is written and nobody is notified
	writeSDS (SDSValue True _ sds) _ _ _ iworld
		= (Ok (WriteResult 'Set'.newSet sds), iworld)
251

252
instance Modifiable SDSSource where
	// Plain source: read the current value, apply f and write the outcome back
	modifySDS f sds=:(SDSSource _) p context iworld
		# (mbRead, iworld) = readSDS sds p context iworld
		= case mbRead of
			Error e = (Error e, iworld)
			Ok (ReadResult r ssds) = case f r of
				Error e = (Error e, iworld)
				Ok w
					# (mbWrite, iworld) = writeSDS ssds p context w iworld
					= case mbWrite of
						Error e                      = (Error e, iworld)
						Ok (WriteResult notify ssds) = (Ok (ModifyResult notify r w ssds), iworld)

	// Wrapped source with flag False: modify the wrapped source.
	// NOTE(review): here the flag stays False after a direct result and becomes
	// True for an asynchronous one, the opposite of writeSDS for SDSSource;
	// confirm this is intended.
	modifySDS f (SDSValue False v sds) p c iworld
		# (mbModify, iworld) = modifySDS f sds p c iworld
		= case mbModify of
			Error e                           = (Error e, iworld)
			Ok (ModifyResult notify r w ssds) = (Ok (ModifyResult notify r w (SDSValue False v ssds)), iworld)
			Ok (AsyncModify ssds g)           = (Ok (AsyncModify (SDSValue True v ssds) g), iworld)

	// Wrapped source with flag True: apply f to the stored value; nobody is notified
	modifySDS f done=:(SDSValue True r _) _ _ iworld = case f r of
		Error e = (Error e, iworld)
		Ok w    = (Ok (ModifyResult 'Set'.newSet r w done), iworld)
270

271 272
// Registering read: readSDSSource with the task id and requested SDS identity.
instance Registrable SDSSource
where
	readRegisterSDS source param context taskId reqSDSId iworld
		= readSDSSource source param context (Just (taskId, reqSDSId)) iworld
274

275
// Read an SDS source, registering a notification request first if one is given.
// A plain source runs its read function; a wrapped value returns the stored
// value without registering or reading anything.
readSDSSource :: !(SDSSource p r w) !p !TaskContext !(Maybe (!TaskId, !SDSIdentity)) !*IWorld
              -> *(!MaybeError TaskException (ReadResult p r w), !*IWorld) | gText{|*|} p & TC p & TC r & TC w
readSDSSource sds=:(SDSSource {SDSSourceOptions|read}) p c mbNotify iworld
	# iworld          = mbRegister p sds mbNotify c iworld
	# (mbVal, iworld) = read p iworld
	= case mbVal of
		Error e = (Error e, iworld)
		Ok val  = (Ok (ReadResult val sds), iworld)
readSDSSource sds=:(SDSValue _ val _) _ _ _ iworld
	= (Ok (ReadResult val sds), iworld)
284 285 286

// A lens is named by the name of its base SDS followed by "/[<lens name>]".
instance Identifiable SDSLens
where
	nameSDS (SDSLens base {SDSLensOptions|name}) acc = nameSDS base suffix
	where
		suffix = ["/[", name, "]":acc]
288 289 290

// Plain read: readSDSLens without a notification request.
instance Readable SDSLens
where
	readSDS lens param context iworld
		= readSDSLens lens param context Nothing iworld
292

293
// Writing through a lens. The lens parameter p is translated to the parameter
// of the base SDS (ps) with the lens' param function. The write and notify
// options of the lens determine what, if anything, is written to the base SDS
// and which registrations of this lens are reported for notification.
instance Writeable SDSLens
294
where
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314
	writeSDS sds=:(SDSLens sds1 opts=:{SDSLensOptions|param,write,notify,name}) p c w iworld
	# ps = param p
	= case (write,notify) of
		//Special case: we don't need to read the base SDS
		(SDSWriteConst writef,SDSNotifyConst notifyf)
			//Check which registrations the current parameter matches
			# (match,nomatch,iworld) = checkRegistrations (sdsIdentity sds) (notifyf p w) iworld
			= case writef p w of
				Error e = (Error e, iworld)
				Ok Nothing
					//We need to decide based on the current parameter if we need to notify or not
					= (Ok (WriteResult match sds), iworld)
				Ok (Just ws) = case writeSDS sds1 ps c ws iworld of
					(Error e, iworld) = (Error e, iworld)
					// An asynchronous write of the base is returned as an asynchronous write of the lens
					(Ok (AsyncWrite sds), iworld) = (Ok (AsyncWrite (SDSLens sds opts)), iworld)
					(Ok (WriteResult notify ssds), iworld)
						//Remove the registrations that we can eliminate based on the current parameter
						// (those that are in nomatch but not in match)
						# notify = 'Set'.difference notify ('Set'.difference nomatch match)
						= (Ok (WriteResult notify (SDSLens ssds opts)), iworld)
		//General case: read base SDS before writing
315
		_ = case readSDS sds1 ps c iworld of
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
				(Error e, iworld) = (Error e, iworld)
				// An asynchronous read of the base is returned as an asynchronous write of the lens
				(Ok (AsyncRead sds), iworld) = (Ok (AsyncWrite (SDSLens sds opts)), iworld)
				(Ok (ReadResult rs ssds), iworld)
					// Value to write to the base: the non-const variant also receives the value just read
					# ws = case write of
						SDSWrite writef = writef p rs w
						SDSWriteConst writef = writef p w
					// Notify predicate: the non-const variant also receives the value just read
					# notifyf = case notify of
						SDSNotify notifyf = notifyf p rs w
						SDSNotifyConst notifyf = notifyf p w
					//Check which registrations the current parameter matches
					# (match,nomatch,iworld) = checkRegistrations (sdsIdentity sds) notifyf iworld
					= case ws of
						Error e         = (Error e, iworld)
						// Nothing to write to the base: only the matching registrations of this lens are reported
						Ok Nothing      = (Ok (WriteResult match (SDSLens ssds opts)), iworld)
						Ok (Just ws) = case writeSDS ssds ps c ws iworld of
							(Error e, iworld) = (Error e, iworld)
							(Ok (AsyncWrite sds), iworld) = (Ok (AsyncWrite (SDSLens sds opts)), iworld)
							(Ok (WriteResult notify ssds), iworld)
								//Remove the registrations that we can eliminate based on the current parameter
335
								# notify = 'Set'.difference notify ('Set'.difference nomatch match)
336
								= (Ok (WriteResult notify (SDSLens ssds opts)), iworld)
337 338

// Modifying through a lens.
// Without a reducer the lens itself is read, f is applied to the value and the
// result is written back through the lens. With a reducer the modification is
// pushed down to the base SDS: the base is modified with a function (sf) that
// reads through the lens, applies f and translates the result back to a base
// value; the reducer then recovers the lens-level written value from the value
// written to the base.
instance Modifiable SDSLens where
	modifySDS f sds=:(SDSLens sds1 opts=:{SDSLensOptions|param, read, write, reducer, notify, name}) p context iworld
	= case reducer of
		Nothing = case readSDS sds p context iworld of
			(Error e, iworld)               = (Error e, iworld)
			(Ok (AsyncRead sds), iworld)    = (Ok (AsyncModify sds f), iworld)
			(Ok (ReadResult r ssds), iworld)    = case f r of
				Error e                             = (Error e, iworld)
				Ok w                                = case writeSDS ssds p context w iworld of
					(Error e, iworld)                        = (Error e, iworld)
					(Ok (AsyncWrite sds), iworld)            = (Ok (AsyncModify sds f), iworld)
					(Ok (WriteResult notify ssds), iworld)
					= (Ok (ModifyResult notify r w ssds), iworld)

		Just reducer = case modifySDS sf sds1 (param p) context iworld of
			(Error e, iworld)                           = (Error e, iworld)
			(Ok (AsyncModify sds _), iworld)            = (Ok (AsyncModify (SDSLens sds opts) f), iworld)
			(Ok (ModifyResult toNotify rs ws ssds), iworld) = case reducer p ws of
				Error e                                     = (Error e, iworld)
				Ok w
				// Notify predicate for the registrations of this lens
				# notf = case notify of
					SDSNotify f         = f p rs w
					SDSNotifyConst f    = f p w
				// The lens-level read value is derived from the value read from the base
				= case doRead read p rs of
					Error e = (Error e, iworld)
					Ok r
					# (match, nomatch, iworld) = checkRegistrations (sdsIdentity sds) notf iworld
					// Drop the registrations that are in nomatch but not in match
					# notify = 'Set'.difference toNotify ('Set'.difference nomatch match)
					= (Ok (ModifyResult notify r w (SDSLens ssds opts)), iworld)
		where
			// Update function for the base SDS: read through the lens, apply f,
			// and translate the result to the value to write to the base.
			sf rs
			# readV = doRead read p rs
			= case readV of
				(Error e) = (Error e)
				(Ok r) = case f r of
					(Error e) = (Error e)
					(Ok w) = case doWrite write p rs w of
						Error e = Error e
						Ok (Just ws) = Ok ws
						// With a reducer present the write function has to produce a value
						_ = abort "Contract not satisfied: write yields Nothing while there is a reducer"

			doRead readf p rs = case readf of
				(SDSRead rf) = rf p rs
				(SDSReadConst rf) = Ok (rf p)

			doWrite writef p rs w = case writef of
				(SDSWrite wf) = wf p rs w
				(SDSWriteConst wf) = wf p w
386

387 388
// Registering read: readSDSLens with the task id and requested SDS identity.
instance Registrable SDSLens
where
	readRegisterSDS lens param context taskId reqSDSId iworld
		= readSDSLens lens param context (Just (taskId, reqSDSId)) iworld
390

391
// Read through a lens, registering a notification request first if one is given.
// A constant read function is applied to the parameter only; otherwise the
// base SDS is read (and registered) with the translated parameter and the
// read function is applied to the parameter and the base value.
readSDSLens :: !(SDSLens p r w) !p !TaskContext !(Maybe (!TaskId, !SDSIdentity)) !*IWorld
            -> *(!MaybeError TaskException (ReadResult p r w), !*IWorld) | gText{|*|} p & TC p & TC r & TC w
readSDSLens sds=:(SDSLens sds1 opts=:{SDSLensOptions|param,read}) p c mbNotify iworld
	# iworld = mbRegister p sds mbNotify c iworld
	= case read of
		SDSReadConst f = (Ok (ReadResult (f p) sds), iworld)
		SDSRead f
			# (mbBase, iworld) = readAndMbRegisterSDS sds1 (param p) c mbNotify iworld
			= case mbBase of
				Error e                      = (Error e, iworld)
				Ok (AsyncRead pending)       = (Ok (AsyncRead (SDSLens pending opts)), iworld)
				Ok (ReadResult baseVal base) = case f p baseVal of
					Error e = (Error e, iworld)
					Ok val  = (Ok (ReadResult val (SDSLens base opts)), iworld)
403 404 405

// SDSCache
// A cache is named by the name of the SDS it caches, between "$" markers.
instance Identifiable SDSCache where
	nameSDS (SDSCache cached _) acc = ["$":cachedName]
	where
		cachedName = nameSDS cached ["$":acc]
407 408

// Plain read: readSDSCache without a notification request.
instance Readable SDSCache where
	readSDS cache param context iworld
		= readSDSCache cache param context Nothing iworld
410 411

// Writing through a cache. The cache entry is keyed by the identity of the
// cache SDS together with the textual form of the parameter. The write option
// of the cache receives the currently cached read and write values (if any, and
// of the expected type) and decides the new cached read value and whether the
// underlying SDS is not written, written immediately, or written later.
instance Writeable SDSCache where
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427
	writeSDS sds=:(SDSCache sds1 opts=:{SDSCacheOptions|write}) p c w iworld=:{IWorld|readCache,writeCache}
	# key = (sdsIdentity sds, toSingleLineText p)
	//Check cache
	// (a cached value of a different type is treated like a missing value)
	# mbr = case 'DM'.get key readCache of
		Just (val :: r^) = Just val
		_                = Nothing
	# mbw = case 'DM'.get key writeCache of
		Just (val :: w^,_) = Just val
		_                  = Nothing
	//Determine what to do
	# (mbr,policy) = write p mbr mbw w
	//Update read cache
	# readCache = case mbr of
		Just r = 'DM'.put key (dynamic r :: r^) readCache
		Nothing  = 'DM'.del key readCache
	= case policy of
428
		// Only the read cache is updated; nobody is notified
		NoWrite = (Ok (WriteResult 'Set'.newSet sds), {iworld & readCache = readCache})
429 430
		// Write to the underlying SDS right away
		// NOTE(review): there is no alternative for an AsyncWrite result here — confirm the underlying SDS cannot produce one
		WriteNow = case writeSDS sds1 p c w {iworld & readCache = readCache} of
			(Error e, iworld) = (Error e, iworld)
Haye Böhm's avatar
Haye Böhm committed
431
			(Ok (WriteResult r ssds), iworld) = (Ok (WriteResult r sds), iworld)
432
		// Store the write in the write cache; it is performed when the deferred writes are flushed
		WriteDelayed
433
			//FIXME: Even though write is delayed, the notification should still happen
434 435
			# writeCache = 'DM'.put key (dynamic w :: w^, DeferredWrite p w sds1) writeCache
			= (Ok (WriteResult 'Set'.newSet sds), {iworld & readCache = readCache, writeCache = writeCache})
436 437

// Modifying a cache: read through the cache, apply f, and write the result
// back through the SDS returned by the read. An asynchronous read is turned
// into an asynchronous modification.
instance Modifiable SDSCache where
	modifySDS f sds=:(SDSCache _ _) p context iworld
		# (mbRead, iworld) = readSDS sds p context iworld
		= case mbRead of
			Error e                   = (Error e, iworld)
			Ok (AsyncRead pending)    = (Ok (AsyncModify pending f), iworld)
			Ok (ReadResult r current) = case f r of
				Error e = (Error e, iworld)
				Ok w
					# (mbWrite, iworld) = writeSDS current p context w iworld
					= case mbWrite of
						Error e                   = (Error e, iworld)
						Ok (WriteResult notify _) = (Ok (ModifyResult notify r w sds), iworld)
447 448

// Registering read: readSDSCache with the task id and requested SDS identity.
instance Registrable SDSCache where
	readRegisterSDS cache param context taskId reqSDSId iworld
		= readSDSCache cache param context (Just (taskId, reqSDSId)) iworld
450

451
// Read through a cache, registering a notification request first if one is given.
// The cache entry is keyed by the identity of the cache SDS and the textual
// form of the parameter. On a hit the cached value is returned (the request is
// also registered on the underlying SDS); on a miss the underlying SDS is read
// and the value is stored in the read cache.
readSDSCache :: !(SDSCache p r w) !p !TaskContext !(Maybe (!TaskId, !SDSIdentity)) !*IWorld
             -> *(!MaybeError TaskException (ReadResult p r w), !*IWorld) | gText{|*|} p & TC p & TC r & TC w
readSDSCache sds=:(SDSCache sds1 _) p c mbNotify iworld=:{readCache}
	# iworld   = mbRegister p sds mbNotify c iworld
	# cacheKey = (sdsIdentity sds, toSingleLineText p)
	= case 'DM'.get cacheKey readCache of
		// Miss: read the underlying SDS and add the value to the cache
		Nothing
			# (mbRead, iworld) = readAndMbRegisterSDS sds1 p c mbNotify iworld
			= case mbRead of
				Error e               = (Error e, iworld)
				Ok (ReadResult val _) = (Ok (ReadResult val sds), {iworld & readCache = 'DM'.put cacheKey (dynamic val :: r^) iworld.readCache})
		// Hit with a value of the expected type
		Just (val :: r^)
			# iworld = mbRegister p sds1 mbNotify c iworld
			= (Ok (ReadResult val sds), iworld)
		// Hit with a value of another type
		Just _ = (Error (exception "Cached value of wrong type"), iworld)
466 467 468

// SDSSequence
// A sequence is named "<" name, left name "," right name ">".
instance Identifiable SDSSequence where
	nameSDS (SDSSequence left right {SDSSequenceOptions|name}) acc = ["<":[name:leftName]]
	where
		leftName  = nameSDS left [",":rightName]
		rightName = nameSDS right [">":acc]
470 471

// Plain read: readSDSSequence without a notification request.
instance Readable SDSSequence where
	readSDS sequence param context iworld
		= readSDSSequence sequence param context Nothing iworld
473

474
// Writing to a sequence. The left SDS is always read first, because its value
// (r1) is needed to compute the parameter of the right SDS. The writel and
// writer options then decide what, if anything, is written to the left and
// right SDS; the notifications of both writes are combined.
// NOTE(review): readCache and writeCache are bound in the pattern but not used in the body.
instance Writeable SDSSequence where
475
	writeSDS sds=:(SDSSequence sds1 sds2 opts=:{SDSSequenceOptions|paraml,paramr,writel,writer,name}) p c w iworld=:{IWorld|readCache,writeCache}
476
	= case readSDS sds1 (paraml p) c iworld of
477 478 479 480 481 482 483
		(Error e, iworld)  = (Error e, iworld)
		// An asynchronous read of the left SDS is returned as an asynchronous write of the sequence
		(Ok (AsyncRead asds), iworld)  = (Ok (AsyncWrite (SDSSequence asds sds2 opts)), iworld)
		(Ok (ReadResult r1 ssds), iworld)
			//Write sds1 if necessary
			# (npreds1,iworld) = case writel of
				(SDSWrite f)  = case f p r1 w of
					Error e             = (Error e, iworld)
Haye Böhm's avatar
Haye Böhm committed
484 485
					Ok Nothing   		= (Ok (WriteResult 'Set'.newSet ssds), iworld)
					Ok (Just w1)     	= writeSDS ssds (paraml p) c w1 iworld
486 487
				(SDSWriteConst f) = case f p w of
					Error e             = (Error e, iworld)
Haye Böhm's avatar
Haye Böhm committed
488 489
					Ok Nothing   		= (Ok (WriteResult 'Set'.newSet ssds), iworld)
					Ok (Just w1)     	= writeSDS ssds (paraml p) c w1 iworld
490 491 492
			| npreds1 =:(Error _) = (liftError npreds1, iworld)
			//Read/write sds2 if necessary
			# (npreds2,iworld) = case writer of
493
				// NOTE(review): there is no alternative for an AsyncRead result of sds2 below
				(SDSWrite f)                    = case readSDS sds2 (paramr p r1) c iworld of //Also read sds2
494 495
					(Error e, iworld)               = (Error e, iworld)
					(Ok (ReadResult r2 ssds),iworld)     = case f p r2 w of
Haye Böhm's avatar
Haye Böhm committed
496 497 498
						Error e                         	= (Error e, iworld)
						Ok Nothing               			= (Ok (WriteResult 'Set'.newSet ssds), iworld)
						Ok (Just w2)                 		= writeSDS sds2 (paramr p r1) c w2 iworld
499 500
				(SDSWriteConst f)               = case f p w of
					Error e                         = (Error e, iworld)
Haye Böhm's avatar
Haye Böhm committed
501 502
					Ok Nothing               		= (Ok (WriteResult 'Set'.newSet sds2), iworld)
					Ok (Just w2)                 	= writeSDS sds2 (paramr p r1) c w2 iworld
503 504 505 506
			| npreds2 =:(Error _) = (liftError npreds2, iworld)
			// NOTE(review): only the combinations with a direct result for the left write are covered;
			// an AsyncWrite as left result has no alternative. The second alternative rebuilds the
			// sequence with ssds (the SDS returned by the read) rather than ssds1 — confirm intent.
			= case (npreds1, npreds2) of
				(Ok (WriteResult notify1 ssds1), Ok (WriteResult notify2 ssds2))        = (Ok (WriteResult ('Set'.union notify1 notify2) (SDSSequence ssds1 ssds2 opts)), iworld)
				(Ok (WriteResult notify1 ssds1), Ok (AsyncWrite sds2))            = (Ok (AsyncWrite (SDSSequence ssds sds2 opts)), queueNotifyEvents (sdsIdentity sds1) notify1 iworld)
507 508

// Modifying a sequence: read the sequence, apply f, and write the result to
// the sequence. Asynchronous reads and writes are not supported here and are
// reported as errors.
instance Modifiable SDSSequence where
	modifySDS f sds p context iworld
		# (mbRead, iworld) = readSDS sds p context iworld
		= case mbRead of
			Error e             = (Error e, iworld)
			Ok (AsyncRead _)    = (Error (exception "SDSSequence cannot be modified asynchronously in the left SDS."), iworld)
			Ok (ReadResult r _) = case f r of
				Error e = (Error e, iworld)
				Ok w
					# (mbWrite, iworld) = writeSDS sds p context w iworld
					= case mbWrite of
						Error e                   = (Error e, iworld)
						Ok (AsyncWrite _)         = (Error (exception "SDSSequence cannot be modified asynchronously"), iworld)
						Ok (WriteResult notify _) = (Ok (ModifyResult notify r w sds), iworld)
519 520

// Registering read: readSDSSequence with the task id and requested SDS identity.
instance Registrable SDSSequence where
	readRegisterSDS sequence param context taskId reqSDSId iworld
		= readSDSSequence sequence param context (Just (taskId, reqSDSId)) iworld
522

523
// Read a sequence, registering a notification request first if one is given.
// The left SDS is read first. The read option then either yields the final
// value directly (Left), or a function to combine the left value with the
// value of the right SDS (Right), which is read with a parameter computed
// from the left value.
readSDSSequence :: !(SDSSequence p r w) !p !TaskContext !(Maybe (!TaskId, !SDSIdentity)) !*IWorld
                -> *(!MaybeError TaskException (ReadResult p r w), !*IWorld) | gText{|*|} p & TC p & TC r & TC w
readSDSSequence sds=:(SDSSequence sds1 sds2 opts=:{SDSSequenceOptions|paraml,paramr,read,name}) p c mbNotify iworld
	# iworld           = mbRegister p sds mbNotify c iworld
	# (mbLeft, iworld) = readAndMbRegisterSDS sds1 (paraml p) c mbNotify iworld
	= case mbLeft of
		Error e                 = (Error e, iworld)
		Ok (AsyncRead pending)  = (Ok (AsyncRead (SDSSequence pending sds2 opts)), iworld)
		Ok (ReadResult r1 left) = case read p r1 of
			Left r = (Ok (ReadResult r (SDSSequence left sds2 opts)), iworld)
			Right combine
				# (mbRight, iworld) = readAndMbRegisterSDS sds2 (paramr p r1) c mbNotify iworld
				= case mbRight of
					Error e                  = (Error e, iworld)
					Ok (ReadResult r2 right) = (Ok (ReadResult (combine (r1,r2)) (SDSSequence left right opts)), iworld)
					Ok (AsyncRead pending)   = (Ok (AsyncRead (SDSSequence left pending opts)), iworld)
536 537 538

// SDSSelect
// Name of a select SDS, prepended to `acc` as a list of string fragments:
// "{", the name from the options, the name of sds1, ",", the name of sds2, "}".
instance Identifiable SDSSelect where
	nameSDS (SDSSelect sds1 sds2 {SDSSelectOptions|name}) acc = ["{", name:nameSDS sds1 [",":nameSDS sds2 ["}":acc]]]
540 541

// Plain (non-registering) read of a select SDS: readSDSSelect is called with
// Nothing as the optional registration argument.
instance Readable SDSSelect where
	readSDS sds p c iworld = readSDSSelect sds p c Nothing iworld
543 544

// Write to a select SDS.
//
// `select p` picks the side to write to: Left p1 writes to sds1, Right p2
// writes to sds2. After a completed write, the registrations on this select
// SDS are checked with a predicate that only accepts parameters that `select`
// maps to the *other* side; the matches are added to the notify set returned
// by the underlying write.
//
// The notify option of the written side determines what the predicate needs:
//   SDSNotify f      - f also takes the value currently stored, so the side is
//                      read first and then written;
//   SDSNotifyConst f - f does not take the stored value, so no read is done.
//
// Errors are returned unchanged. An asynchronous read or write is propagated
// as an AsyncWrite of a rebuilt SDSSelect.
instance Writeable SDSSelect where
	writeSDS sds=:(SDSSelect sds1 sds2 opts=:{SDSSelectOptions|select,notifyl,notifyr}) p c w iworld
	= case select p of
		Left p1 = case notifyl of
			(SDSNotify f) = case readSDS sds1 p1 c iworld of
				(Error e, iworld) = (Error e, iworld)
				(Ok (AsyncRead ssds), iworld) = (Ok (AsyncWrite (SDSSelect ssds sds2 opts)), iworld)
				(Ok (ReadResult r1 ssds), iworld) = case writeSDS ssds p1 c w iworld of
					(Error e, iworld) = (Error e, iworld)
					(Ok (AsyncWrite ssds), iworld) = (Ok (AsyncWrite (SDSSelect ssds sds2 opts)), iworld)
					(Ok (WriteResult notify ssds), iworld)
						// Only registrations whose parameter selects the right side can match
						# npred = (\ts pq -> case select pq of Right p2 = f p1 r1 w ts p2; _ = False)
						# (match,_,iworld) = checkRegistrations (sdsIdentity sds) npred iworld
						//Add the matching registrations for the 'other' SDS
						# notify = 'Set'.union notify match
						= (Ok (WriteResult notify (SDSSelect ssds sds2 opts)), iworld)
			(SDSNotifyConst f) = case writeSDS sds1 p1 c w iworld of
				(Error e, iworld) = (Error e, iworld)
				(Ok (AsyncWrite ssds), iworld) = (Ok (AsyncWrite (SDSSelect ssds sds2 opts)), iworld)
				(Ok (WriteResult notify ssds), iworld)
					// Only registrations whose parameter selects the right side can match
					# npred = (\ts pq -> case select pq of Right p2 = f p1 w ts p2; _ = False)
					# (match,_,iworld) = checkRegistrations (sdsIdentity sds) npred iworld
					//Add the matching registrations for the 'other' SDS
					# notify = 'Set'.union notify match
					= (Ok (WriteResult notify (SDSSelect ssds sds2 opts)), iworld)
		Right p2 = case notifyr of
			(SDSNotify f) = case readSDS sds2 p2 c iworld of
				(Error e, iworld) = (Error e, iworld)
				(Ok (AsyncRead ssds), iworld) = (Ok (AsyncWrite (SDSSelect sds1 ssds opts)), iworld)
				(Ok (ReadResult r2 ssds), iworld) = case writeSDS ssds p2 c w iworld of
					(Error e, iworld) = (Error e,iworld)
					(Ok (AsyncWrite ssds), iworld) = (Ok (AsyncWrite (SDSSelect sds1 ssds opts)), iworld)
					(Ok (WriteResult notify ssds), iworld)
						// Only registrations whose parameter selects the left side can match
						# npred = (\ts pq -> case select pq of Left p1 = f p2 r2 w ts p1 ; _ = False)
						# (match,_,iworld) = checkRegistrations (sdsIdentity sds) npred iworld
						//Add the matching registrations for the 'other' SDS
						# notify = 'Set'.union notify match
						= (Ok (WriteResult notify (SDSSelect sds1 ssds opts)), iworld)
			(SDSNotifyConst f) = case writeSDS sds2 p2 c w iworld of
				(Error e, iworld) = (Error e,iworld)
				(Ok (AsyncWrite ssds), iworld) = (Ok (AsyncWrite (SDSSelect sds1 ssds opts)), iworld)
				(Ok (WriteResult notify ssds), iworld)
					// Only registrations whose parameter selects the left side can match
					# npred = (\ts pq -> case select pq of Left p1 = f p2 w ts p1 ; _ = False)
					# (match,_,iworld) = checkRegistrations (sdsIdentity sds) npred iworld
					//Add the matching registrations for the 'other' SDS
					# notify = 'Set'.union notify match
					= (Ok (WriteResult notify (SDSSelect sds1 ssds opts)), iworld)
590 591

// Modify a select SDS.
//
// `select p` picks the side; the modification is delegated entirely to
// modifySDS of that side and its result is re-wrapped in an SDSSelect so the
// caller gets back an SDS of the same shape. The notify set is passed through
// as returned by the selected side: unlike writeSDS for SDSSelect, the
// notifyl/notifyr options are not consulted here (see the TODOs).
instance Modifiable SDSSelect where
	modifySDS f sds=:(SDSSelect sds1 sds2 opts=:{select}) p context iworld
	= case select p of
		(Left p1) = case modifySDS f sds1 p1 context iworld of
			(Error e, iworld) = (Error e, iworld)
			(Ok (AsyncModify sds f), iworld) = (Ok (AsyncModify (SDSSelect sds sds2 opts) f), iworld)
			// TODO: Use applicable notify function.
			(Ok (ModifyResult notify r w ssds), iworld) = (Ok (ModifyResult notify r w (SDSSelect ssds sds2 opts)), iworld)
		(Right p2) = case modifySDS f sds2 p2 context iworld of
			(Error e, iworld) = (Error e, iworld)
			(Ok (AsyncModify sds f), iworld) = (Ok (AsyncModify (SDSSelect sds1 sds opts) f), iworld)
			// TODO: Use applicable notify function.
			(Ok (ModifyResult notify r w ssds), iworld) = (Ok (ModifyResult notify r w (SDSSelect sds1 ssds opts)), iworld)
604 605

// Registering read for a select SDS.
// Delegates to readSDSSelect, passing `Just (taskId, reqSDSId)` as the
// optional registration argument (a plain read passes Nothing instead).
instance Registrable SDSSelect where
	readRegisterSDS sds p c taskId reqSDSId iworld = readSDSSelect sds p c (Just (taskId, reqSDSId)) iworld
607

608
// Reads a select SDS, optionally registering the reader.
//
// mbRegister is first called for the select SDS itself with the given
// mbNotify. Then `select p` picks the side: Left p1 reads sds1, Right p2 reads
// sds2 (both through readAndMbRegisterSDS). The result of the chosen side is
// re-wrapped in an SDSSelect; the side that was not chosen is kept as it was.
// Errors are returned unchanged.
readSDSSelect :: !(SDSSelect p r w) !p !TaskContext !(Maybe (!TaskId, !SDSIdentity)) !*IWorld
              -> *(!MaybeError TaskException (ReadResult p r w), !*IWorld) | gText{|*|} p & TC p & TC r & TC w
readSDSSelect sds=:(SDSSelect sds1 sds2 opts=:{SDSSelectOptions|select}) p c mbNotify iworld
	# iworld = mbRegister p sds mbNotify c iworld
	= case select p of
		Left p1 = case readAndMbRegisterSDS sds1 p1 c mbNotify iworld of
			(Error e, iworld) = (Error e, iworld)
			(Ok (ReadResult r ssds), iworld) = (Ok (ReadResult r (SDSSelect ssds sds2 opts)), iworld)
			(Ok (AsyncRead ssds), iworld) = (Ok (AsyncRead (SDSSelect ssds sds2 opts)), iworld)
		Right p2 = case readAndMbRegisterSDS sds2 p2 c mbNotify iworld of
			(Error e, iworld) = (Error e, iworld)
			(Ok (ReadResult r ssds), iworld) = (Ok (ReadResult r (SDSSelect sds1 ssds opts)), iworld)
			(Ok (AsyncRead ssds), iworld) = (Ok (AsyncRead (SDSSelect sds1 ssds opts)), iworld)
621 622 623

// SDSParallel
// Name of a parallel SDS, prepended to `acc` as a list of string fragments:
// "|", the name from the options, the name of sds1, ",", the name of sds2, "|".
// All four constructors are named the same way.
instance Identifiable SDSParallel where
	nameSDS sds acc = case sds of
		SDSParallel           sds1 sds2 opts = parallelName sds1 sds2 opts
		SDSParallelWriteLeft  sds1 sds2 opts = parallelName sds1 sds2 opts
		SDSParallelWriteRight sds1 sds2 opts = parallelName sds1 sds2 opts
		SDSParallelWriteNone  sds1 sds2 opts = parallelName sds1 sds2 opts
	where
		parallelName sds1 sds2 opts = ["|",opts.SDSParallelOptions.name:nameSDS sds1 [",":nameSDS sds2 ["|":acc]]]
631 632

// Plain (non-registering) read of a parallel SDS: readSDSParallel is called
// with Nothing as the optional registration argument.
instance Readable SDSParallel where
	readSDS sds p c iworld = readSDSParallel sds p c Nothing iworld
634

635
// Write to a parallel SDS.
//
// `param p` splits the parameter into (p1,p2) for sds1 and sds2. For each side
// that is writable, the corresponding option (writel / writer) turns the value
// `w` into an optional value for that side:
//   SDSWrite f      - the side is read first and `f p r w` is evaluated;
//   SDSWriteConst f - `f p w` is evaluated without reading.
// In both cases the outcome is handled the same way:
//   Error e     - the error is returned;
//   Ok Nothing  - nothing is written, the notify set for that side is empty;
//   Ok (Just x) - x is written to that side.
//
// Which sides are written depends on the constructor:
//   SDSParallel           - both sides (left first, then right);
//   SDSParallelWriteLeft  - only sds1;
//   SDSParallelWriteRight - only sds2;
//   SDSParallelWriteNone  - neither; an empty notify set is returned.
instance Writeable SDSParallel where
	writeSDS sds=:(SDSParallel sds1 sds2 opts=:{SDSParallelOptions|param,writel,writer}) p c w iworld
	# (p1,p2) = param p
	//Read/write sds1
	# (npreds1,iworld) = case writel of
		(SDSWrite f) = case readSDS sds1 p1 c iworld of
			(Error e, iworld) = (Error e, iworld)
			(Ok (AsyncRead ssds), iworld) = (Ok (AsyncWrite ssds), iworld)
			(Ok (ReadResult r1 ssds),iworld) = case f p r1 w of
				Error e = (Error e, iworld)
				Ok Nothing = (Ok (WriteResult 'Set'.newSet ssds), iworld)
				Ok (Just w1) = writeSDS ssds p1 c w1 iworld
		(SDSWriteConst f) = case f p w of
				Error e = (Error e,iworld)
				Ok Nothing = (Ok (WriteResult 'Set'.newSet sds1),iworld)
				Ok (Just w1) = writeSDS sds1 p1 c w1 iworld
	| npreds1 =:(Error _) = (liftError npreds1, iworld)
	//Read/write sds2
	# (npreds2,iworld) = case writer of
		(SDSWrite f) = case readSDS sds2 p2 c iworld of
			(Error e, iworld) = (Error e, iworld)
			(Ok (AsyncRead ssds), iworld) = (Ok (AsyncWrite ssds), iworld)
			(Ok (ReadResult r2 ssds),iworld) = case f p r2 w of
				Error e = (Error e, iworld)
				Ok Nothing = (Ok (WriteResult 'Set'.newSet ssds), iworld)
				Ok (Just w2) = writeSDS ssds p2 c w2 iworld
		(SDSWriteConst f) = case f p w of
				Error e = (Error e,iworld)
				Ok Nothing = (Ok (WriteResult 'Set'.newSet sds2), iworld)
				Ok (Just w2) = writeSDS sds2 p2 c w2 iworld
	| npreds2 =:(Error _) = (liftError npreds2, iworld)
	// Combine both sides. If exactly one side completed while the other is still
	// asynchronous, the overall result is an AsyncWrite, so the notify set of the
	// completed side is handed to queueNotifyEvents here instead of being returned.
	= case (npreds1, npreds2) of
		(Ok (WriteResult n1 ssds1), Ok (WriteResult n2 ssds2)) = (Ok (WriteResult ('Set'.union n1 n2) (SDSParallel ssds1 ssds2 opts)), iworld)
		(Ok (WriteResult n1 ssds1), Ok (AsyncWrite sds2)) = (Ok (AsyncWrite (SDSParallel ssds1 sds2 opts)), queueNotifyEvents (sdsIdentity sds1) n1 iworld)
		(Ok (AsyncWrite sds1), Ok (WriteResult n2 ssds2)) = (Ok (AsyncWrite (SDSParallel sds1 ssds2 opts)), queueNotifyEvents (sdsIdentity sds2) n2 iworld)
		(Ok (AsyncWrite sds1), Ok (AsyncWrite sds2)) = (Ok (AsyncWrite (SDSParallel sds1 sds2 opts)), iworld)

	writeSDS sds=:(SDSParallelWriteLeft sds1 sds2 opts=:{SDSParallelOptions|param,writel}) p c w iworld
	# p1 = fst (param p)
	//Read/write sds1
	# (npreds1,iworld) = case writel of
		(SDSWrite f) = case readSDS sds1 p1 c iworld of
			(Error e, iworld) = (Error e, iworld)
			(Ok (AsyncRead ssds), iworld) = (Ok (AsyncWrite ssds), iworld)
			(Ok (ReadResult r1 ssds),iworld) = case f p r1 w of
				Error e = (Error e, iworld)
				Ok Nothing = (Ok (WriteResult 'Set'.newSet ssds), iworld)
				Ok (Just w1) = writeSDS ssds p1 c w1 iworld
		(SDSWriteConst f) = case f p w of
				Error e = (Error e,iworld)
				Ok Nothing = (Ok (WriteResult 'Set'.newSet sds1),iworld)
				Ok (Just w1) = writeSDS sds1 p1 c w1 iworld
	// Re-wrap the left result; the right side is kept as it was
	= case npreds1 of
		Error e = (Error e, iworld)
		Ok (WriteResult n1 ssds1) = (Ok (WriteResult n1 (SDSParallelWriteLeft ssds1 sds2 opts)), iworld)
		Ok (AsyncWrite sds1) = (Ok (AsyncWrite (SDSParallelWriteLeft sds1 sds2 opts)), iworld)

	writeSDS sds=:(SDSParallelWriteRight sds1 sds2 opts=:{SDSParallelOptions|param,writer}) p c w iworld
	# p2 = snd (param p)
	//Read/write sds2
	# (npreds2,iworld) = case writer of
		(SDSWrite f) = case readSDS sds2 p2 c iworld of
			(Error e, iworld) = (Error e, iworld)
			(Ok (AsyncRead ssds), iworld) = (Ok (AsyncWrite ssds), iworld)
			(Ok (ReadResult r2 ssds),iworld) = case f p r2 w of
				Error e = (Error e, iworld)
				Ok Nothing = (Ok (WriteResult 'Set'.newSet ssds), iworld)
				Ok (Just w2) = writeSDS ssds p2 c w2 iworld
		(SDSWriteConst f) = case f p w of
				Error e = (Error e,iworld)
				Ok Nothing = (Ok (WriteResult 'Set'.newSet sds2),iworld)
				Ok (Just w2) = writeSDS sds2 p2 c w2 iworld
	// Re-wrap the right result; the left side is kept as it was
	= case npreds2 of
		Error e = (Error e, iworld)
		Ok (WriteResult n2 ssds2) = (Ok (WriteResult n2 (SDSParallelWriteRight sds1 ssds2 opts)), iworld)
		Ok (AsyncWrite sds2) = (Ok (AsyncWrite (SDSParallelWriteRight sds1 sds2 opts)), iworld)

	// Neither side is writable: succeed without writing and without notifications
	writeSDS sds=:(SDSParallelWriteNone _ _ _) p c w iworld
	= (Ok (WriteResult 'Set'.newSet sds), iworld)

715
// Modify a parallel SDS as read, apply `f`, write.
//
// The combined value is read with readSDS, `f` is applied to it, and the
// result is written back with writeSDS to the SDS returned by the read.
// If either the read or the write turns out to be asynchronous, an AsyncModify
// carrying the returned SDS and the same `f` is returned instead.
// Errors from the read, from `f`, or from the write are returned unchanged.
instance Modifiable SDSParallel where
	modifySDS f sds p context iworld
	= case readSDS sds p context iworld of
		(Error e, iworld) = (Error e, iworld)
		(Ok (AsyncRead sds), iworld) = (Ok (AsyncModify sds f), iworld)
		(Ok (ReadResult r ssds), iworld) = case f r of
			Error e = (Error e, iworld)
			Ok w = case writeSDS ssds p context w iworld of
				(Error e, iworld) = (Error e, iworld)
				(Ok (AsyncWrite sds), iworld) = (Ok (AsyncModify sds f), iworld)
				(Ok (WriteResult notify ssds), iworld) = (Ok (ModifyResult notify r w ssds), iworld)
726 727

// Registering read for a parallel SDS.
// Delegates to readSDSParallel, passing `Just (taskId, reqSDSId)` as the
// optional registration argument (a plain read passes Nothing instead).
instance Registrable SDSParallel where
	readRegisterSDS sds p c taskId reqSDSId iworld = readSDSParallel sds p c (Just (taskId, reqSDSId)) iworld
729

730
readSDSParallel :: !(SDSParallel p r w) !p !TaskContext !(Maybe (!TaskId, !SDSIdentity)) !*IWorld
731
                -> *(!MaybeError TaskException (ReadResult p r w), !*IWorld) | gText{|*|} p & TC p & TC r & TC w
732 733
readSDSParallel sds=:(SDSParallel sds1 sds2 opts=:{SDSParallelOptions|param,read,name}) p c mbNotify iworld
	# iworld = mbRegister p sds mbNotify c iworld
734
	# (p1,p2) = param p
735
	# (res1, iworld) = readAndMbRegisterSDS sds1 p1 c mbNotify iworld
736 737
	| res1 =:(Error _)
		= (liftError res1, iworld)
738
	# (res2, iworld) = readAndMbRegisterSDS sds2 p2 c mbNotify iworld
739 740 741 742 743 744 745 746
	| res2 =:(Error _)
		= (liftError res2, iworld)
	= case (fromOk res1, fromOk res2) of
		(ReadResult r1 ssds1, ReadResult r2 ssds2) 	= (Ok (ReadResult (read (r1, r2)) (SDSParallel ssds1 ssds2 opts)), iworld)
		(AsyncRead sds1, ReadResult r2 ssds2) 		= (Ok (AsyncRead (SDSParallel sds1 ssds2 opts)), iworld)
		(ReadResult r1 ssds1, AsyncRead sds2) 		= (Ok (AsyncRead (SDSParallel ssds1 sds2 opts)), iworld)
		(AsyncRead sds1, AsyncRead sds2) 			= (Ok (AsyncRead (SDSParallel sds1 sds2 opts)), iworld)

747 748
readSDSParallel sds=:(SDSParallelWriteLeft sds1 sds2 opts=:{SDSParallelOptions|param,read,name}) p c mbNotify iworld
	# iworld = mbRegister p sds mbNotify c iworld
749
	# (p1,p2) = param p
750
	# (res1, iworld) = readAndMbRegisterSDS sds1 p1 c mbNotify iworld
751 752
	| res1 =:(Error _)
		= (liftError res1, iworld)
753
	# (res2, iworld) = readAndMbRegisterSDS sds2 p2 c mbNotify iworld
754 755 756 757 758 759 760 761
	| res2 =:(Error _)
		= (liftError res2, iworld)
	= case (fromOk res1, fromOk res2) of
		(ReadResult r1 ssds1, ReadResult r2 ssds2) 	= (Ok (ReadResult (read (r1, r2)) (SDSParallelWriteLeft ssds1 ssds2 opts)), iworld)
		(AsyncRead sds1, ReadResult r2 ssds2) 		= (Ok (AsyncRead (SDSParallelWriteLeft sds1 ssds2 opts)), iworld)
		(ReadResult r1 ssds1, AsyncRead sds2) 		= (Ok (AsyncRead (SDSParallelWriteLeft ssds1 sds2 opts)), iworld)
		(AsyncRead sds1, AsyncRead sds2) 			= (Ok (AsyncRead (SDSParallelWriteLeft sds1 sds2 opts)), iworld)

762 763
readSDSParallel sds=:(SDSParallelWriteRight sds1 sds2 opts=:{SDSParallelOptions|param,read,name}) p c mbNotify iworld
	# iworld = mbRegister p sds mbNotify c iworld
764
	# (p1,p2) = param p
765
	# (res1, iworld) = readAndMbRegisterSDS sds1 p1 c mbNotify iworld
766 767
	| res1 =:(Error _)
		= (liftError res1, iworld)
768
	# (res2, iworld) = readAndMbRegisterSDS sds2 p2 c mbNotify iworld
769 770 771 772 773 774 775 776
	| res2 =:(Error _)
		= (liftError res2, iworld)
	= case (fromOk res1, fromOk res2) of
		(ReadResult r1 ssds1, ReadResult r2 ssds2) 	= (Ok (ReadResult (read (r1, r2)) (SDSParallelWriteRight ssds1 ssds2 opts)), iworld)
		(AsyncRead sds1, ReadResult r2 ssds2) 		= (Ok (AsyncRead (SDSParallelWriteRight sds1 ssds2 opts)), iworld)
		(ReadResult r1 ssds1, AsyncRead sds2) 		= (Ok (AsyncRead (SDSParallelWriteRight ssds1 sds2 opts)), iworld)
		(AsyncRead sds1, AsyncRead sds2) 			= (Ok (AsyncRead (SDSParallelWriteRight sds1 sds2 opts)), iworld)

777 778
readSDSParallel sds=:(SDSParallelWriteNone sds1 sds2 opts=:{SDSParallelOptions|param,read,name}) p c mbNotify iworld
	# iworld = mbRegister p sds mbNotify c iworld
779
	# (p1,p2) = param p
780
	# (res1, iworld) = readAndMbRegisterSDS sds1 p1 c mbNotify iworld
781 782
	| res1 =:(Error _)
		= (liftError res1, iworld)
783
	# (res2, iworld) = readAndMbRegisterSDS sds2 p2 c mbNotify iworld
784 785 786 787 788 789 790
	| res2 =:(Error _)
		= (liftError res2, iworld)
	= case (fromOk res1, fromOk res2) of
		(ReadResult r1 ssds1, ReadResult r2 ssds2) 	= (Ok (ReadResult (read (r1, r2)) (SDSParallelWriteNone ssds1 ssds2 opts)), iworld)
		(AsyncRead sds1, ReadResult r2 ssds2) 		= (Ok (AsyncRead (SDSParallelWriteNone sds1 ssds2</