Commit 89dd8f56 authored by Bas Lijnse's avatar Bas Lijnse
Browse files

MAJOR CHANGE

Completely changed the way the store works. It is now fully built on top of the core SDS sources and combinators. Before the store existed as part of the engine although you could access it using some SDS's.
To make this possible I added a generic caching combinator for SDS's such that caching is no longer limited to files in the store, but can be applied to any shared source.

This change removes *a lot* of unnecessary I/O and makes iTask apps feel more responsive.
parent bdbc3f34
......@@ -69,13 +69,13 @@ where
moduleDefinition :: SDS (FilePath,ModuleName) [String] [String]
moduleDefinition = mapReadWrite mapToLines (sdsTranslate "moduleDefinition" (\(p,m) -> modulePath p m "dcl") externalFile)
moduleDefinition = mapReadWrite mapToLines (sdsTranslate "moduleDefinition" (\(p,m) -> modulePath p m "dcl") (removeMaybe (Just "") fileShare))
moduleImplementation :: SDS (FilePath,ModuleName) [String] [String]
moduleImplementation = mapReadWrite mapToLines (sdsTranslate "moduleImplementation" (\(p,m) -> modulePath p m "icl") externalFile)
moduleImplementation = mapReadWrite mapToLines (sdsTranslate "moduleImplementation" (\(p,m) -> modulePath p m "icl") (removeMaybe (Just "") fileShare))
moduleDocumentation :: SDS (FilePath,ModuleName) [String] [String]
moduleDocumentation = mapReadWrite mapToLines (sdsTranslate "moduleDocumentation" (\(p,m) -> modulePath p m "md") externalFile)
moduleDocumentation = mapReadWrite mapToLines (sdsTranslate "moduleDocumentation" (\(p,m) -> modulePath p m "md") (removeMaybe (Just "") fileShare))
mapToLines = (split "\n",\w _ -> Just (join "\n" w))
......
implementation module iTasks.Extensions.User
import iTasks
import Text
import Data.Functor
import Data.Functor, Data.Either
import qualified Data.Map as DM
import iTasks.UI.Definition, iTasks.UI.Editor, iTasks.UI.Editor.Builtin, iTasks.UI.Editor.Combinators
import iTasks.UI.Layout.Default
......@@ -186,7 +186,11 @@ where
taskInstancesForCurrentUser :: ROShared () [TaskInstance]
taskInstancesForCurrentUser
= sdsSequence "taskInstancesForCurrentUser" (\() u -> u) snd (SDSWriteConst (\_ _ -> Ok Nothing)) (SDSWriteConst (\_ _ -> Ok Nothing)) currentUser taskInstancesForUser
= sdsSequence "taskInstancesForCurrentUser"
id
(\() u -> u)
(\_ _ -> Right snd)
(SDSWriteConst (\_ _ -> Ok Nothing)) (SDSWriteConst (\_ _ -> Ok Nothing)) currentUser taskInstancesForUser
workOn :: !t -> Task AttachmentStatus | toInstanceNo t
workOn t
......
......@@ -148,7 +148,8 @@ createClientIWorld serverURL currentInstance
}
,sdsNotifyRequests = []
,memoryShares = 'Data.Map'.newMap
,cachedShares = 'Data.Map'.newMap
,readCache = 'Data.Map'.newMap
,writeCache = 'Data.Map'.newMap
,exposedShares = 'Data.Map'.newMap
,jsCompilerState = locundef "jsCompilerState"
,shutdown = Nothing
......
......@@ -119,7 +119,8 @@ where
systemTasks =
[BackgroundTask updateClocks
,BackgroundTask (processEvents MAX_EVENTS)
,BackgroundTask removeOutdatedSessions]
,BackgroundTask removeOutdatedSessions
,BackgroundTask flushWritesWhenIdle]
runTasks :: a !*World -> *World | Runnable a
runTasks tasks world
......@@ -214,6 +215,14 @@ where
where
(Timestamp tNow) = timestamp
//When the event queue is empty, write deferred SDS's
flushWritesWhenIdle:: !*IWorld -> (!MaybeError TaskException (), !*IWorld)
flushWritesWhenIdle iworld = case read taskEvents iworld of
(Error e,iworld) = (Error e,iworld)
(Ok (Queue [] []),iworld) = flushDeferredSDSWrites iworld
(Ok _,iworld) = (Ok (),iworld)
//When we don't run the built-in HTTP server we don't want to loop forever so we stop the loop
//once all tasks are stable
stopOnStable :: !*IWorld -> *(!MaybeError TaskException (), !*IWorld)
......
......@@ -5,6 +5,7 @@ from Data.Map import :: Map
from Data.Maybe import :: Maybe
from Data.Error import :: MaybeError(..), :: MaybeErrorString(..)
from Data.Set import :: Set
from Data.Queue import :: Queue
from StdFile import class FileSystem
from System.Time import :: Timestamp
from Text.JSON import :: JSONNode
......@@ -17,7 +18,7 @@ from iTasks.Internal.TaskEval import :: TaskTime
from iTasks.WF.Definition import :: TaskValue, :: Event, :: TaskId, :: InstanceNo, :: TaskNo
from iTasks.WF.Combinators.Core import :: ParallelTaskType, :: TaskListItem
from iTasks.SDS.Definition import :: SDS, :: RWShared, :: ReadWriteShared, :: Shared
from iTasks.Internal.SDS import :: SDSNotifyRequest, :: JSONShared
from iTasks.Internal.SDS import :: SDSNotifyRequest, :: JSONShared, :: DeferredWrite
from iTasks.Extensions.DateTime import :: Time, :: Date, :: DateTime
from Sapl.Linker.LazyLinker import :: LoaderState
......@@ -28,23 +29,24 @@ from TCPIP import :: TCP_Listener, :: TCP_Listener_, :: TCP_RChannel_, :: TCP_SC
CLEAN_HOME_VAR :== "CLEAN_HOME"
:: *IWorld = { server :: !ServerInfo // Static server info, initialized at startup
, config :: !Config // Server configuration
, clocks :: !SystemClocks // Server side clocks
, current :: !TaskEvalState // Shared state during task evaluation
:: *IWorld = { server :: !ServerInfo // Static server info, initialized at startup
, config :: !Config // Server configuration
, clocks :: !SystemClocks // Server side clocks
, current :: !TaskEvalState // Shared state during task evaluation
, random :: [Int] // Infinite random stream
, random :: [Int] // Infinite random stream
, sdsNotifyRequests :: ![SDSNotifyRequest] // Notification requests from previously read sds's
, memoryShares :: !Map (String,String) Dynamic // Run-time memory shares
, cachedShares :: !ShareCache // Cached json file shares
, exposedShares :: !Map String (Dynamic, JSONShared) // Shared source
, jsCompilerState :: !Maybe JSCompilerState // Sapl to Javascript compiler state
, sdsNotifyRequests :: ![SDSNotifyRequest] // Notification requests from previously read sds's
, memoryShares :: !Map String Dynamic // Run-time memory shares
, readCache :: !Map (String,String) Dynamic // Cached share reads
, writeCache :: !Map (String,String) (Dynamic,DeferredWrite) // Cached deferred writes
, exposedShares :: !Map String (Dynamic, JSONShared) // Shared source
, jsCompilerState :: !Maybe JSCompilerState // Sapl to Javascript compiler state
, ioTasks :: !*IOTasks // The low-level input/output tasks
, ioStates :: !IOStates // Results of low-level io tasks, indexed by the high-level taskid that it is linked to
, ioTasks :: !*IOTasks // The low-level input/output tasks
, ioStates :: !IOStates // Results of low-level io tasks, indexed by the high-level taskid that it is linked to
, world :: !*World // The outside world
, world :: !*World // The outside world
//Experimental database connection cache
, resources :: !*(Maybe *Resource)
......@@ -55,7 +57,6 @@ CLEAN_HOME_VAR :== "CLEAN_HOME"
:: Config =
{ sessionTime :: !Int //* Time (in seconds) before inactive sessions are garbage collected. Default is 3600 (one hour).
, smtpServer :: !String //* The smtp server to use for sending e-mails
, persistTasks :: !Bool //* Persist the task state to disk
}
......@@ -81,10 +82,6 @@ CLEAN_HOME_VAR :== "CLEAN_HOME"
, utcTime :: !Time
}
// share cached used for json stores (Store.cachedJSONFileStore) and dynamic string stores (Store.cachedDynamicStringFileStore)
:: ShareCache :== Map (String, String) (Dynamic, Bool, Maybe CachedValue)
:: CachedValue = CachedJSONValue DeferredJSON | CachedDynamicValue
:: JSCompilerState =
{ loaderState :: !LoaderState // State of the lazy loader
, functionMap :: !FuncTypeMap // Function name -> source code mapping
......
......@@ -68,11 +68,8 @@ createIWorld appName appPath persistTasks mbWebdirPath mbStorePath mbSaplPath wo
# build = strfTime "%Y%m%d-%H%M%S" tm
# (local,world) = currentLocalDateTimeWorld world
# (utc,world) = currentUTCDateTimeWorld world
# (_,world) = ensureDir "data" dataDir world
# tmpDir = dataDir </> "tmp"
# (_,world) = ensureDir "tmp" tmpDir world
# storeDir = dataDir </> "stores"
# (exists,world) = ensureDir "stores" storeDir world
# (timestamp=:(Timestamp seed), world) = time world
= {IWorld
|server =
......@@ -105,7 +102,8 @@ createIWorld appName appPath persistTasks mbWebdirPath mbStorePath mbSaplPath wo
}
,sdsNotifyRequests = []
,memoryShares = 'DM'.newMap
,cachedShares = 'DM'.newMap
,readCache = 'DM'.newMap
,writeCache = 'DM'.newMap
,exposedShares = 'DM'.newMap
,jsCompilerState = Nothing
,shutdown = Nothing
......@@ -123,14 +121,6 @@ where
, persistTasks = persistTasks
}
ensureDir :: !String !FilePath *World -> (!Bool,!*World)
ensureDir name path world
# (exists, world) = fileExists path world
| exists = (True,world)
# (res, world) = createDirectory path world
| isError res = abort ("Cannot create " +++ name +++ " directory" +++ path +++ " : " +++ snd (fromError res))
= (False,world)
//Temporary fallback to use "sapl" instead of "<Application name>-sapl".
//Once everybody uses an upgraded sapl-collector-linker that creates the proper
//directory name it can be removed
......
......@@ -21,7 +21,7 @@ import iTasks.SDS.Definition
}
:: SDSIdentity :== String
//:: WriteShare p = E.r w: Write !w !(RWShared p r w)
:: DeferredWrite = E. p r w: DeferredWrite !p !w !(SDS p r w) & iTask p & TC r & TC w
//Internal creation functions:
......@@ -46,13 +46,13 @@ createReadOnlySDSError ::
//Internal access functions
//Just read an SDS
read :: !(RWShared () r w) !*IWorld -> (!MaybeError TaskException r, !*IWorld)
read :: !(RWShared () r w) !*IWorld -> (!MaybeError TaskException r, !*IWorld) | TC r
//Read an SDS and register a taskId to be notified when it is written
readRegister :: !TaskId !(RWShared () r w) !*IWorld -> (!MaybeError TaskException r, !*IWorld)
readRegister :: !TaskId !(RWShared () r w) !*IWorld -> (!MaybeError TaskException r, !*IWorld) | TC r
//Write an SDS (and queue evaluation of those task instances which contained tasks that registered for notification)
write :: !w !(RWShared () r w) !*IWorld -> (!MaybeError TaskException (), !*IWorld)
write :: !w !(RWShared () r w) !*IWorld -> (!MaybeError TaskException (), !*IWorld) | TC r & TC w
//Read followed by write. The 'a' typed value is a result that is returned
modify :: !(r -> (!a,!w)) !(RWShared () r w) !*IWorld -> (!MaybeError TaskException a, !*IWorld)
modify :: !(r -> (!a,!w)) !(RWShared () r w) !*IWorld -> (!MaybeError TaskException a, !*IWorld) | TC r & TC w
//Force notify (queue evaluation of task instances that registered for notification)
notify :: !(RWShared () r w) !*IWorld -> (!MaybeError TaskException (), !*IWorld)
......@@ -66,10 +66,13 @@ clearInstanceSDSRegistrations :: ![InstanceNo] !*IWorld -> *IWorld
listAllSDSRegistrations :: *IWorld -> (![(InstanceNo,[(TaskId,SDSIdentity)])],!*IWorld)
formatSDSRegistrationsList :: [(InstanceNo,[(TaskId,SDSIdentity)])] -> String
//Flush all deffered/cached writes of
flushDeferredSDSWrites :: !*IWorld -> (!MaybeError TaskException (), !*IWorld)
:: JSONShared :== RWShared JSONNode JSONNode JSONNode
//Exposing shares for external nodes
toJSONShared :: (RWShared p r w) -> JSONShared | JSONDecode{|*|} p & JSONEncode{|*|} r & JSONDecode{|*|} w & iTask p
toJSONShared :: (RWShared p r w) -> JSONShared | JSONDecode{|*|} p & JSONEncode{|*|} r & JSONDecode{|*|} w & iTask p & TC r & TC w
fromJSONShared :: JSONShared -> RWShared p r w | JSONEncode{|*|} p & JSONDecode{|*|} r & JSONEncode{|*|} w
newURL :: !*IWorld -> (!String, !*IWorld)
getURLbyId :: !String !*IWorld -> (!String, !*IWorld)
......
......@@ -9,8 +9,6 @@ import qualified Data.Set as Set
import iTasks.Internal.IWorld
import iTasks.Internal.Task, iTasks.Internal.TaskStore, iTasks.Internal.TaskEval
:: SDSWriteNotifyFun :== (!SDSIdentity,!Dynamic) //Dynamic contains SDSNotifyPred function
createReadWriteSDS ::
!String
!String
......@@ -56,15 +54,16 @@ sdsIdentity (SDSLens sds {SDSLens|name}) = sdsIdentity sds +++"/["+++name+++"]"
sdsIdentity (SDSSelect sds1 sds2 {SDSSelect|name}) = "{"+++name+++ sdsIdentity sds1 +++ ","+++ sdsIdentity sds2 +++"}"
sdsIdentity (SDSParallel sds1 sds2 {SDSParallel|name}) = "|"+++name+++ sdsIdentity sds1 +++ ","+++ sdsIdentity sds2 +++"|"
sdsIdentity (SDSSequence sds1 sds2 {SDSSequence|name}) = "<"+++name+++ sdsIdentity sds1 +++ ","+++ sdsIdentity sds2 +++">"
sdsIdentity (SDSCache sds _) = sdsIdentity sds
sdsIdentity (SDSDynamic f) = "SDSDYNAMIC" //TODO: Figure out how to determine the identity of the wrapped sds
iworldNotifyPred :: !(p -> Bool) !p !*IWorld -> (!Bool,!*IWorld)
iworldNotifyPred npred p env = (npred p, env)
read :: !(RWShared () r w) !*IWorld -> (!MaybeError TaskException r, !*IWorld)
read :: !(RWShared () r w) !*IWorld -> (!MaybeError TaskException r, !*IWorld) | TC r
read sds env = read` () Nothing (sdsIdentity sds) sds env
readRegister :: !TaskId !(RWShared () r w) !*IWorld -> (!MaybeError TaskException r, !*IWorld)
readRegister :: !TaskId !(RWShared () r w) !*IWorld -> (!MaybeError TaskException r, !*IWorld) | TC r
readRegister taskId sds env = read` () (Just taskId) (sdsIdentity sds) sds env
mbRegister :: !p !(RWShared p r w) !(Maybe TaskId) !SDSIdentity !*IWorld -> *IWorld | iTask p
......@@ -73,7 +72,7 @@ mbRegister p sds (Just taskId) reqSDSId iworld=:{IWorld|sdsNotifyRequests}
# req = {SDSNotifyRequest|reqTaskId=taskId,reqSDSId=reqSDSId,cmpSDSId=sdsIdentity sds,cmpParam=dynamic p,cmpParamText=toSingleLineText p}
= {iworld & sdsNotifyRequests = [req:sdsNotifyRequests]}
read` :: !p !(Maybe TaskId) !SDSIdentity !(RWShared p r w) !*IWorld -> (!MaybeError TaskException r, !*IWorld) | iTask p
read` :: !p !(Maybe TaskId) !SDSIdentity !(RWShared p r w) !*IWorld -> (!MaybeError TaskException r, !*IWorld) | iTask p & TC r
read` p mbNotify reqSDSId sds=:(SDSSource {SDSSource|read}) env
//New registration
# env = mbRegister p sds mbNotify reqSDSId env
......@@ -105,16 +104,31 @@ read` p mbNotify reqSDSId sds=:(SDSParallel sds1 sds2 {SDSParallel|param,read})
= (liftError res2, env)
= (Ok (read (fromOk res1, fromOk res2)), env)
read` p mbNotify reqSDSId sds=:(SDSSequence sds1 sds2 {SDSSequence|param,read}) env
read` p mbNotify reqSDSId sds=:(SDSSequence sds1 sds2 {SDSSequence|paraml,paramr,read}) env
# env = mbRegister p sds mbNotify reqSDSId env
# (res1,env) = read` p mbNotify reqSDSId sds1 env
# (res1,env) = read` (paraml p) mbNotify reqSDSId sds1 env
| res1 =:(Error _)
= (liftError res1,env)
# r1 = fromOk res1
# (res2,env) = read` (param p r1) mbNotify reqSDSId sds2 env
| res2 =:(Error _)
= (liftError res2,env)
= (Ok (read (r1,fromOk res2)),env)
= case read p r1 of
Left r = (Ok r,env)
Right read2
# (res2,env) = read` (paramr p r1) mbNotify reqSDSId sds2 env
| res2 =:(Error _)
= (liftError res2,env)
= (Ok (read2 (r1,fromOk res2)),env)
read` p mbNotify reqSDSId sds=:(SDSCache sds1 _) env=:{IWorld|readCache}
# env = mbRegister p sds mbNotify reqSDSId env
# key = (sdsIdentity sds1,toSingleLineText p)
//First check cache
= case 'DM'.get key readCache of
Just (val :: r^) = (Ok val,env)
Just _ = (Error (exception "Cached value of wrong type"), env)
Nothing = case read` p mbNotify reqSDSId sds1 env of
(Error e,env) = (Error e, env)
//Read and add to cache
(Ok val,env) = (Ok val, {env & readCache = 'DM'.put key (dynamic val :: r^) env.readCache})
read` p mbNotify reqSDSId sds=:(SDSDynamic f) env
# env = mbRegister p sds mbNotify reqSDSId env
......@@ -123,16 +137,13 @@ read` p mbNotify reqSDSId sds=:(SDSDynamic f) env
(Error e) = (Error e, env)
(Ok sds) = read` p mbNotify reqSDSId sds env
write :: !w !(RWShared () r w) !*IWorld -> (!MaybeError TaskException (), !*IWorld)
write :: !w !(RWShared () r w) !*IWorld -> (!MaybeError TaskException (), !*IWorld) | TC r & TC w
write w sds iworld
= case write` () w sds iworld of
(Ok notify, iworld)
# instanceNos = [no \\ (TaskId no _) <- 'Set'.toList notify]
# iworld = queueRefresh [(i,"Notification for write of " +++ sdsIdentity sds) \\ i <- instanceNos] iworld
# iworld = clearInstanceSDSRegistrations instanceNos iworld
= (Ok (), iworld)
(Ok notify, iworld) = (Ok (), queueNotifyEvents (sdsIdentity sds) notify iworld)
(Error e,iworld) = (Error e,iworld)
write` :: !p !w !(RWShared p r w) !*IWorld -> (!MaybeError TaskException (Set TaskId), !*IWorld) | iTask p
write` :: !p !w !(RWShared p r w) !*IWorld -> (!MaybeError TaskException (Set TaskId), !*IWorld) | iTask p & TC r & TC w
write` p w sds=:(SDSSource {SDSSource|name,write}) env
= case write p w env of
(Error e, env) = (Error e, env)
......@@ -185,22 +196,40 @@ write` p w sds=:(SDSLens sds1 {SDSLens|param,write,notify}) env
write` p w sds=:(SDSSelect sds1 sds2 {SDSSelect|select,notifyl,notifyr}) env
= case select p of
Left p1 = case read` p1 Nothing (sdsIdentity sds1) sds1 env of
(Error e, env) = (Error e, env)
(Ok r1, env) = case write` p1 w sds1 env of
(Error e, env) = (Error e, env)
(Ok notify, env)
# npred = (\pq -> case select pq of Right p2 = notifyl p1 r1 w p2; _ = True)
Left p1 = case notifyl of
(SDSNotify f) = case read` p1 Nothing (sdsIdentity sds1) sds1 env of
(Error e, env) = (Error e, env)
(Ok r1, env) = case write` p1 w sds1 env of
(Error e, env) = (Error e, env)
(Ok notify, env)
# npred = (\pq -> case select pq of Right p2 = f p1 r1 w p2; _ = False)
# (match,nomatch,env) = checkRegistrations (sdsIdentity sds) npred env
//Add the matching registrations for the 'other' SDS
# notify = 'Set'.union notify match
= (Ok notify, env)
(SDSNotifyConst f) = case write` p1 w sds1 env of
(Error e, env) = (Error e, env)
(Ok notify, env)
# npred = (\pq -> case select pq of Right p2 = f p1 w p2; _ = False)
# (match,nomatch,env) = checkRegistrations (sdsIdentity sds) npred env
//Add the matching registrations for the 'other' SDS
# notify = 'Set'.union notify match
= (Ok notify, env)
Right p2 = case read` p2 Nothing (sdsIdentity sds2) sds2 env of
(Error e, env) = (Error e, env)
(Ok r2, env) = case write` p2 w sds2 env of
(Error e, env) = (Error e,env)
(Ok notify, env)
# npred = (\pq -> case select pq of Left p1 = notifyr p2 r2 w p1 ; _ = True)
= (Ok notify, env)
Right p2 = case notifyr of
(SDSNotify f) = case read` p2 Nothing (sdsIdentity sds2) sds2 env of
(Error e, env) = (Error e, env)
(Ok r2, env) = case write` p2 w sds2 env of
(Error e, env) = (Error e,env)
(Ok notify, env)
# npred = (\pq -> case select pq of Left p1 = f p2 r2 w p1 ; _ = False)
# (match,nomatch,env) = checkRegistrations (sdsIdentity sds) npred env
//Add the matching registrations for the 'other' SDS
# notify = 'Set'.union notify match
= (Ok notify, env)
(SDSNotifyConst f) = case write` p2 w sds2 env of
(Error e, env) = (Error e,env)
(Ok notify, env)
# npred = (\pq -> case select pq of Left p1 = f p2 w p1 ; _ = False)
# (match,nomatch,env) = checkRegistrations (sdsIdentity sds) npred env
//Add the matching registrations for the 'other' SDS
# notify = 'Set'.union notify match
......@@ -236,8 +265,8 @@ write` p w sds=:(SDSParallel sds1 sds2 {SDSParallel|param,writel,writer}) env
| npreds2 =:(Error _) = (liftError npreds2, env)
= (Ok ('Set'.union (fromOk npreds1) (fromOk npreds2)), env)
write` p w sds=:(SDSSequence sds1 sds2 {SDSSequence|param,writel,writer}) env
= case read` p Nothing (sdsIdentity sds1) sds1 env of
write` p w sds=:(SDSSequence sds1 sds2 {SDSSequence|paraml,paramr,writel,writer}) env
= case read` (paraml p) Nothing (sdsIdentity sds1) sds1 env of
(Error e, env) = (Error e, env)
(Ok r1, env)
//Write sds1 if necessary
......@@ -245,27 +274,49 @@ write` p w sds=:(SDSSequence sds1 sds2 {SDSSequence|param,writel,writer}) env
(SDSWrite f) = case f p r1 w of
Error e = (Error e, env)
Ok (Nothing) = (Ok 'Set'.newSet, env)
Ok (Just w1) = write` p w1 sds1 env
Ok (Just w1) = write` (paraml p) w1 sds1 env
(SDSWriteConst f) = case f p w of
Error e = (Error e, env)
Ok (Nothing) = (Ok 'Set'.newSet, env)
Ok (Just w1) = write` p w1 sds1 env
Ok (Just w1) = write` (paraml p) w1 sds1 env
| npreds1 =:(Error _) = (liftError npreds1, env)
//Read/write sds2 if necessary
# (npreds2,env) = case writer of
(SDSWrite f) = case read` (param p r1) Nothing (sdsIdentity sds2) sds2 env of //Also read sds2
(SDSWrite f) = case read` (paramr p r1) Nothing (sdsIdentity sds2) sds2 env of //Also read sds2
(Error e, env) = (Error e, env)
(Ok r2,env) = case f p r2 w of
Error e = (Error e, env)
Ok (Nothing) = (Ok 'Set'.newSet, env)
Ok (Just w2) = write` (param p r1) w2 sds2 env
Ok (Just w2) = write` (paramr p r1) w2 sds2 env
(SDSWriteConst f) = case f p w of
Error e = (Error e, env)
Ok (Nothing) = (Ok 'Set'.newSet, env)
Ok (Just w2) = write` (param p r1) w2 sds2 env
Ok (Just w2) = write` (paramr p r1) w2 sds2 env
| npreds2 =:(Error _) = (liftError npreds2, env)
= (Ok ('Set'.union (fromOk npreds1) (fromOk npreds2)), env)
write` p w sds=:(SDSCache sds1 {SDSCache|write}) env=:{IWorld|readCache,writeCache}
# key = (sdsIdentity sds1,toSingleLineText p)
//Check cache
# mbr = case 'DM'.get key readCache of
Just (val :: r^) = Just val
_ = Nothing
# mbw = case 'DM'.get key writeCache of
Just (val :: w^,_) = Just val
_ = Nothing
//Determine what to do
# (mbr,policy) = write p mbr mbw w
//Update read cache
# readCache = case mbr of
Just r = 'DM'.put key (dynamic r :: r^) readCache
Nothing = 'DM'.del key readCache
= case policy of
NoWrite = (Ok 'Set'.newSet, {env & readCache = readCache})
WriteNow = write` p w sds1 {env & readCache = readCache}
WriteDelayed
# writeCache = 'DM'.put key (dynamic w :: w^, DeferredWrite p w sds1) writeCache
= (Ok 'Set'.newSet, {env & readCache = readCache, writeCache = writeCache})
write` p w sds=:(SDSDynamic f) env
# (mbsds, env) = f p env
= case mbsds of
......@@ -296,7 +347,7 @@ where
//In case of a type mismatch, just ignore (should not happen)
_ = (match,nomatch)
modify :: !(r -> (!a,!w)) !(RWShared () r w) !*IWorld -> (!MaybeError TaskException a, !*IWorld)
modify :: !(r -> (!a,!w)) !(RWShared () r w) !*IWorld -> (!MaybeError TaskException a, !*IWorld) | TC r & TC w
modify f sds iworld = case read sds iworld of
(Ok r,iworld) = let (a,w) = f r in case write w sds iworld of
(Ok (),iworld) = (Ok a,iworld)
......@@ -306,6 +357,14 @@ modify f sds iworld = case read sds iworld of
notify :: !(RWShared () r w) !*IWorld -> (!MaybeError TaskException (), !*IWorld)
notify sds iworld = (Ok (), iworld) //TODO
queueNotifyEvents :: !String (Set TaskId) *IWorld -> *IWorld
queueNotifyEvents sdsId notify iworld
# instanceNos = [no \\ (TaskId no _) <- 'Set'.toList notify]
# iworld = queueRefresh [(i,"Notification for write of " +++ sdsId) \\ i <- instanceNos] iworld
# iworld = clearInstanceSDSRegistrations instanceNos iworld
= iworld
clearInstanceSDSRegistrations :: ![InstanceNo] !*IWorld -> *IWorld
clearInstanceSDSRegistrations instanceNos iworld=:{IWorld|sdsNotifyRequests}
= {iworld & sdsNotifyRequests = [r \\ r=:{SDSNotifyRequest|reqTaskId} <- sdsNotifyRequests | keep reqTaskId instanceNos]}
......@@ -323,7 +382,23 @@ formatSDSRegistrationsList list
= join "\n" (flatten [["Task instance " +++ toString i +++ ":"
:["\t"+++toString taskId +++ "->"+++sdsId\\(taskId,sdsId) <- regs]] \\ (i,regs) <- list])
toJSONShared :: (RWShared p r w) -> JSONShared | JSONDecode{|*|} p & JSONEncode{|*|} r & JSONDecode{|*|} w & iTask p
flushDeferredSDSWrites :: !*IWorld -> (!MaybeError TaskException (), !*IWorld)
flushDeferredSDSWrites iworld=:{writeCache}
# (errors,iworld) = flushAll ('DM'.toList writeCache) iworld
| errors =: [] = (Ok (), {iworld & writeCache = 'DM'.newMap})
= (Error (exception "Could not flush all deferred SDS writes, some data may be lost"),{iworld & writeCache = 'DM'.newMap})
where
flushAll [] iworld = ([],iworld)
flushAll [(_,(_,DeferredWrite p w sds)):rest] iworld
= case write` p w sds iworld of
(Ok notify,iworld)
# iworld = queueNotifyEvents (sdsIdentity sds) notify iworld
= flushAll rest iworld
(Error e,iworld)
# (errors,iworld) = flushAll rest iworld
= ([e:errors],iworld)
toJSONShared :: (RWShared p r w) -> JSONShared | JSONDecode{|*|} p & JSONEncode{|*|} r & JSONDecode{|*|} w & iTask p & TC r & TC w
toJSONShared sds = SDSLens sds {SDSLens|name="toJSONShared",param=param,read=SDSRead read,write=SDSWriteConst write,notify=SDSNotifyConst notify}
where
param p = fromJust (fromJSON p)
......
......@@ -32,7 +32,6 @@ from GenEq import generic gEq
NS_TASK_INSTANCES :== "task-instances"
NS_DOCUMENT_CONTENT :== "document-data"
NS_APPLICATION_SHARES :== "application-data"
NS_JAVASCRIPT_CACHE :== "js-cache"
:: StoreReadError
= StoreReadMissingError !StoreName //When there is no file on disk for this
......@@ -43,41 +42,13 @@ NS_JAVASCRIPT_CACHE :== "js-cache"
instance toString StoreReadError
derive class iTask StoreReadError
//For system stores, the server configuration determines if and when data is written to disk
//This storage preference type is used to indicate
:: StoragePreference
= StoreInMemory //When the data is disposable. It will be gone when the application shuts down
| StoreInJSONFile //When the data should be persisted between different versions of an application
| StoreInDynamicFile //When the data contains functions, dynamics or otherwise
/**
* Creates a store in memory. Values in this store are lost when the server shuts down.
*
* @param The namespace in the store
* @param Optionally a default content to be used on first read. If nothing is given an error will occur when reading before writing.
*/
memoryStore :: !StoreNamespace !(Maybe a) -> RWShared StoreName a a | TC a