Commit 8763c55c authored by Mart Lubbers's avatar Mart Lubbers

Merge branch 'optimisedSDS' into 'master'

Optimised SDS

See merge request !163
parents b8c197af 6bc2694a
Pipeline #12379 passed with stage
in 3 minutes
......@@ -143,7 +143,7 @@ createClientIWorld serverURL currentInstance
,attachmentChain = []
,nextTaskNo = 6666
}
,sdsNotifyRequests = []
,sdsNotifyRequests = 'Data.Map'.newMap
,memoryShares = 'Data.Map'.newMap
,readCache = 'Data.Map'.newMap
,writeCache = 'Data.Map'.newMap
......
implementation module iTasks.Internal.EngineTasks
import StdBool, StdOverloaded, StdList, StdOrdList
import qualified Data.Map as DM
import qualified Data.Set as DS
import Data.Functor, Data.Func
import iTasks.Engine
import iTasks.Internal.IWorld
import iTasks.WF.Definition
......@@ -24,8 +27,7 @@ timeout mt iworld = case read taskEvents iworld of
//No events
(Ok (Queue [] []),iworld=:{sdsNotifyRequests,world})
# (ts, world) = nsTime world
# to = minListBy lesser [mt:map (getTimoutFromClock ts) sdsNotifyRequests]
= ( minListBy lesser [mt:map (getTimoutFromClock ts) sdsNotifyRequests]
= ( minListBy lesser [mt:flatten $ map (getTimeoutFromClock ts) $ 'DM'.elems sdsNotifyRequests]
, {iworld & world = world})
(Ok _,iworld) = (Just 0,iworld) //There are still events, don't wait
(Error _,iworld) = (Just 500,iworld) //Keep retrying, but not too fast
......@@ -34,13 +36,16 @@ where
lesser (Just _) Nothing = True
lesser Nothing Nothing = False
getTimoutFromClock :: Timespec SDSNotifyRequest -> Maybe Int
getTimoutFromClock now snr=:{cmpParam=(ts :: ClockParameter Timespec)}
| startsWith "$IWorld:timespec$" snr.reqSDSId && ts.interval <> zero
# fire = iworldTimespecNextFire now snr.reqTimespec ts
= Just (max 0 (toMs fire - toMs now))
= mt
getTimoutFromClock _ _ = mt
getTimeoutFromClock :: Timespec (Map SDSNotifyRequest Timespec) -> [Maybe Timeout]
getTimeoutFromClock now requests = getTimeoutFromClock` <$> 'DM'.toList requests
where
getTimeoutFromClock` :: (!SDSNotifyRequest, !Timespec) -> Maybe Timeout
getTimeoutFromClock` (snr=:{cmpParam=(ts :: ClockParameter Timespec)}, reqTimespec)
| startsWith "$IWorld:timespec$" snr.reqSDSId && ts.interval <> zero
# fire = iworldTimespecNextFire now reqTimespec ts
= Just (max 0 (toMs fire - toMs now))
= mt
getTimeoutFromClock` _ = mt
toMs x = x.tv_sec * 1000 + x.tv_nsec / 1000000
......
......@@ -18,7 +18,7 @@ from iTasks.Internal.TaskEval import :: TaskTime
from iTasks.WF.Definition import :: TaskValue, :: Event, :: TaskId, :: InstanceNo, :: TaskNo
from iTasks.WF.Combinators.Core import :: ParallelTaskType, :: TaskListItem
from iTasks.SDS.Definition import :: SDS, :: RWShared, :: ReadWriteShared, :: Shared, :: ReadOnlyShared
from iTasks.Internal.SDS import :: SDSNotifyRequest, :: JSONShared, :: DeferredWrite
from iTasks.Internal.SDS import :: SDSNotifyRequest, :: JSONShared, :: DeferredWrite, :: SDSIdentity
from iTasks.Extensions.DateTime import :: Time, :: Date, :: DateTime
from Sapl.Linker.LazyLinker import :: LoaderState
......@@ -29,23 +29,23 @@ from TCPIP import :: TCP_Listener, :: TCP_Listener_, :: TCP_RChannel_, :: TCP_SC
CLEAN_HOME_VAR :== "CLEAN_HOME"
:: *IWorld = { options :: !EngineOptions // Engine configuration
, clock :: !Timespec // Server side clock
, current :: !TaskEvalState // Shared state during task evaluation
:: *IWorld = { options :: !EngineOptions // Engine configuration
, clock :: !Timespec // Server side clock
, current :: !TaskEvalState // Shared state during task evaluation
, random :: [Int] // Infinite random stream
, random :: [Int] // Infinite random stream
, sdsNotifyRequests :: ![SDSNotifyRequest] // Notification requests from previously read sds's
, memoryShares :: !Map String Dynamic // Run-time memory shares
, readCache :: !Map (String,String) Dynamic // Cached share reads
, writeCache :: !Map (String,String) (Dynamic,DeferredWrite) // Cached deferred writes
, exposedShares :: !Map String (Dynamic, JSONShared) // Shared source
, jsCompilerState :: !Maybe JSCompilerState // Sapl to Javascript compiler state
, sdsNotifyRequests :: !Map SDSIdentity (Map SDSNotifyRequest Timespec) // Notification requests from previously read sds's
, memoryShares :: !Map String Dynamic // Run-time memory shares
, readCache :: !Map (String,String) Dynamic // Cached share reads
, writeCache :: !Map (String,String) (Dynamic,DeferredWrite) // Cached deferred writes
, exposedShares :: !Map String (Dynamic, JSONShared) // Shared source
, jsCompilerState :: !Maybe JSCompilerState // Sapl to Javascript compiler state
, ioTasks :: !*IOTasks // The low-level input/output tasks
, ioStates :: !IOStates // Results of low-level io tasks, indexed by the high-level taskid that it is linked to
, ioTasks :: !*IOTasks // The low-level input/output tasks
, ioStates :: !IOStates // Results of low-level io tasks, indexed by the high-level taskid that it is linked to
, world :: !*World // The outside world
, world :: !*World // The outside world
//Experimental database connection cache
, resources :: *[*Resource]
......
......@@ -71,7 +71,7 @@ createIWorld options world
,attachmentChain = []
,nextTaskNo = 0
}
,sdsNotifyRequests = []
,sdsNotifyRequests = 'DM'.newMap
,memoryShares = 'DM'.newMap
,readCache = 'DM'.newMap
,writeCache = 'DM'.newMap
......
......@@ -13,16 +13,16 @@ import iTasks.SDS.Definition
//Notification requests are stored in the IWorld
:: SDSNotifyRequest =
{ reqTaskId :: TaskId //Id of the task that read the SDS. This Id also connects a chain of notify requests that were registered together
, reqSDSId :: SDSIdentity //Id of the actual SDS used to create this request (may be a derived one)
, reqTimespec :: Timespec
{ reqTaskId :: !TaskId //Id of the task that read the SDS. This Id also connects a chain of notify requests that were registered together
, reqSDSId :: !SDSIdentity //Id of the actual SDS used to create this request (may be a derived one)
, cmpSDSId :: SDSIdentity //Id of the SDS we are saving for comparison
, cmpParam :: Dynamic //Parameter we are saving for comparison
, cmpParamText :: String //String version of comparison parameter for tracing
, cmpParam :: !Dynamic //Parameter we are saving for comparison
, cmpParamText :: !String //String version of comparison parameter for tracing
}
:: SDSIdentity :== String
instance < SDSNotifyRequest
:: DeferredWrite = E. p r w: DeferredWrite !p !w !(SDS p r w) & iTask p & TC r & TC w
//Internal creation functions:
......
implementation module iTasks.Internal.SDS
from StdFunc import const
import StdString, StdTuple, StdMisc, StdList, StdBool
import StdString, StdTuple, StdMisc, StdList, StdBool, StdFunc
from Data.Map import :: Map
import qualified Data.Map as DM
import Data.Error, Data.Func, Data.Tuple, System.OS, System.Time, Text, Text.GenJSON
......@@ -77,8 +77,13 @@ mbRegister :: !p !(RWShared p r w) !(Maybe TaskId) !SDSIdentity !*IWorld -> *IWo
mbRegister p sds Nothing reqSDSId iworld = iworld
mbRegister p sds (Just taskId) reqSDSId iworld=:{IWorld|sdsNotifyRequests,world}
# (ts, world) = nsTime world
# req = {SDSNotifyRequest|reqTimespec=ts,reqTaskId=taskId,reqSDSId=reqSDSId,cmpSDSId=sdsIdentity sds,cmpParam=dynamic p,cmpParamText=toSingleLineText p}
= {iworld & world=world, sdsNotifyRequests = [req:sdsNotifyRequests]}
# req = {SDSNotifyRequest|reqTaskId=taskId,reqSDSId=reqSDSId,cmpParam=dynamic p,cmpParamText=toSingleLineText p}
= { iworld
& world = world
, sdsNotifyRequests = 'DM'.alter (Just o maybe ('DM'.singleton req ts) ('DM'.put req ts))
(sdsIdentity sds)
sdsNotifyRequests
}
read` :: !p !(Maybe TaskId) !SDSIdentity !(RWShared p r w) !*IWorld -> (!MaybeError TaskException r, !*IWorld) | iTask p & TC r
read` p mbNotify reqSDSId sds=:(SDSSource {SDSSource|read}) env
......@@ -340,13 +345,14 @@ checkRegistrations sdsId pred iworld
= (match,nomatch,iworld)
where
//Find all notify requests for the given share id
lookupRegistrations sdsId iworld=:{sdsNotifyRequests}
= ([reg \\ reg=:{SDSNotifyRequest|cmpSDSId} <- sdsNotifyRequests | cmpSDSId == sdsId],iworld)
lookupRegistrations :: String !*IWorld -> (![(!SDSNotifyRequest, !Timespec)], !*IWorld)
lookupRegistrations sdsId iworld=:{sdsNotifyRequests} =
('DM'.toList $ 'DM'.findWithDefault 'DM'.newMap sdsId sdsNotifyRequests, iworld)
//Match the notify requests against the predicate to determine two sets:
//The registrations that matched the predicate, and those that did not match the predicate
matchRegistrations pred [] = ('Set'.newSet,'Set'.newSet)
matchRegistrations pred [{SDSNotifyRequest|reqTimespec,reqTaskId,cmpParam}:regs]
matchRegistrations pred [({SDSNotifyRequest|reqTaskId,cmpParam}, reqTimespec):regs]
# (match,nomatch) = matchRegistrations pred regs
= case cmpParam of
(p :: p^) = if (pred reqTimespec p)
......@@ -371,13 +377,25 @@ queueNotifyEvents sdsId notify iworld
clearTaskSDSRegistrations :: !(Set TaskId) !*IWorld -> *IWorld
clearTaskSDSRegistrations taskIds iworld=:{IWorld|sdsNotifyRequests}
= {iworld & sdsNotifyRequests = [r \\ r=:{SDSNotifyRequest|reqTaskId} <- sdsNotifyRequests | not ('Set'.member reqTaskId taskIds)]}
= {iworld & sdsNotifyRequests = 'DM'.foldlWithKey clearRegistrationRequests 'DM'.newMap sdsNotifyRequests}
where
clearRegistrationRequests :: (Map SDSIdentity (Map SDSNotifyRequest Timespec))
SDSIdentity
(Map SDSNotifyRequest Timespec)
-> Map SDSIdentity (Map SDSNotifyRequest Timespec)
clearRegistrationRequests notifyRequests sdsIdentity requests
| 'DM'.null filteredRequests = notifyRequests
| otherwise = 'DM'.put sdsIdentity filteredRequests notifyRequests
where
filteredRequests = 'DM'.filterWithKey (\req _ -> not $ 'Set'.member req.reqTaskId taskIds) requests
listAllSDSRegistrations :: *IWorld -> (![(InstanceNo,[(TaskId,SDSIdentity)])],!*IWorld)
listAllSDSRegistrations iworld=:{IWorld|sdsNotifyRequests} = ('DM'.toList (foldr addReg 'DM'.newMap sdsNotifyRequests),iworld)
listAllSDSRegistrations iworld=:{IWorld|sdsNotifyRequests} = ('DM'.toList ('DM'.foldrWithKey addRegs 'DM'.newMap sdsNotifyRequests),iworld)
where
addReg {SDSNotifyRequest|reqTaskId=reqTaskId=:(TaskId taskInstance _),cmpSDSId} list
= 'DM'.put taskInstance [(reqTaskId,cmpSDSId):fromMaybe [] ('DM'.get taskInstance list)] list
addRegs cmpSDSId reqs list = 'DM'.foldlWithKey addReg list reqs
where
addReg list {SDSNotifyRequest|reqTaskId=reqTaskId=:(TaskId taskInstance _)} _
= 'DM'.put taskInstance [(reqTaskId,cmpSDSId):fromMaybe [] ('DM'.get taskInstance list)] list
formatSDSRegistrationsList :: [(InstanceNo,[(TaskId,SDSIdentity)])] -> String
formatSDSRegistrationsList list
......@@ -433,5 +451,8 @@ newURL iworld=:{IWorld|options={serverUrl},random}
// TODO: different URL for clients
getURLbyId :: !String !*IWorld -> (!String, !*IWorld)
getURLbyId sdsId iworld=:{IWorld|options={serverUrl},random}
= ("sds:" +++ serverUrl +++ "/" +++ sdsId, iworld)
= ("sds:" +++ serverUrl +++ "/" +++ sdsId, iworld)
// some efficient order to be able to put notify requests in sets
instance < SDSNotifyRequest where
< x y = (x.reqTaskId, x.reqSDSId, x.cmpParamText) < (y.reqTaskId, y.reqSDSId, y.cmpParamText)
......@@ -21,9 +21,10 @@ import iTasks.WF.Tasks.System
import StdList, StdBool, StdTuple, StdString, Data.Maybe
from StdFunc import o
import qualified Data.Map as DM
import qualified Data.Set as DS
import qualified Data.Queue as DQ
import Data.Maybe, Data.Either, Data.Error
import Data.Maybe, Data.Either, Data.Error, Data.Func
import Text.GenJSON
from Data.Functor import <$>, class Functor(fmap)
......@@ -302,7 +303,8 @@ where
# curEnabledActions = [actionId action \\ action <- actions | isEnabled action]
= (ValueResult value evalInfo rep (TCParallel taskId ts taskTrees curEnabledActions),iworld)
//Cleanup
eval event evalOpts (TCDestroy (TCParallel taskId ts taskTrees _)) iworld=:{current}
eval event evalOpts ttree=:(TCDestroy (TCParallel taskId ts taskTrees _)) iworld=:{current}
# iworld = clearTaskSDSRegistrations ('DS'.singleton $ fromOk $ taskIdFromTaskTree ttree) iworld
//Mark all tasks as deleted and use the standar evaluation function to clean up
# taskListFilter = {TaskListFilter|onlyIndex=Nothing,onlyTaskId=Nothing,onlySelf=False,includeValue=False,includeAttributes=False,includeProgress=False}
# (mbError,iworld) = modify (\ptss -> ((),map (\pts -> {ParallelTaskState|pts & change=Just RemoveParallelTask}) ptss)) (sdsFocus (taskId,taskListFilter) taskInstanceParallelTaskList) iworld
......@@ -791,7 +793,8 @@ where
# stable = (curStatus =: ASDeleted) || (curStatus =: ASExcepted)
= (ValueResult (Value curStatus stable) {TaskEvalInfo|lastEvent=ts,removedTasks=[],refreshSensitive=False} change (TCAttach taskId ts curStatus build instanceKey), iworld)
eval event evalOpts (TCDestroy (TCAttach taskId _ _ _ _)) iworld
eval event evalOpts ttree=:(TCDestroy (TCAttach taskId _ _ _ _)) iworld
# iworld = clearTaskSDSRegistrations ('DS'.singleton $ fromOk $ taskIdFromTaskTree ttree) iworld
# (_,iworld) = modify (\p -> ((),release p)) (sdsFocus instanceNo taskInstanceProgress) iworld
= (DestroyedResult,iworld)
where
......
......@@ -10,7 +10,7 @@ import iTasks.Internal.TaskEval
import iTasks.Internal.IWorld
import qualified iTasks.Internal.SDS as SDS
import Data.Error, Data.Maybe
import Data.Error, Data.Maybe, Data.Func
import Text.GenJSON
import StdString, StdBool
import qualified Data.Set as DS
......@@ -56,7 +56,9 @@ where
interact :: !d !EditMode !(SDS () r w) (InteractionHandlers l r w v) (Editor v) -> Task (l,v) | toPrompt d & iTask l & iTask r & iTask v & TC w
interact prompt mode shared {onInit,onEdit,onRefresh} editor = Task eval
where
eval event evalOpts (TCDestroy _) iworld = (DestroyedResult,iworld)
eval event evalOpts tt=:(TCDestroy _) iworld
# iworld = 'SDS'.clearTaskSDSRegistrations ('DS'.singleton $ fromOk $ taskIdFromTaskTree tt) iworld
= (DestroyedResult, iworld)
eval event evalOpts tree iworld=:{current={taskTime}}
//Decode or initialize state
......
......@@ -81,14 +81,16 @@ where
= (ValueResult (Value i True) (info ts) (rep event) tree, iworld)
//Destroyed while the process was still running
eval event evalOpts (TCDestroy (TCBasic taskId ts jsonph _)) iworld
eval event evalOpts tree=:(TCDestroy (TCBasic taskId ts jsonph _)) iworld
# iworld = clearTaskSDSRegistrations ('DS'.singleton $ fromOk $ taskIdFromTaskTree tree) iworld
= apIWTransformer iworld
$ tuple (fjson jsonph)
>-= \(ph, _)->liftOSErr (terminateProcess ph)
>-= \_ ->tuple (Ok DestroyedResult)
//Destroyed when the task was already stable
eval event evalOpts (TCDestroy _) iworld
eval event evalOpts tree=:(TCDestroy _) iworld
# iworld = clearTaskSDSRegistrations ('DS'.singleton $ fromOk $ taskIdFromTaskTree tree) iworld
= (DestroyedResult, iworld)
info ts = {TaskEvalInfo|lastEvent=ts,removedTasks=[],refreshSensitive=True}
......
......@@ -8,7 +8,8 @@ import iTasks.Internal.Task
import iTasks.Internal.TaskState
import iTasks.Internal.TaskEval
import qualified iTasks.Internal.SDS as SDS
import StdString
import StdString, Data.Func, Data.Error
import qualified Data.Set as DS
instance toString SharedException
where
......@@ -57,10 +58,9 @@ where
Ok val = ValueResult (Value val False) {TaskEvalInfo|lastEvent=ts,removedTasks=[],refreshSensitive=True} (rep event) (TCInit taskId ts)
Error e = ExceptionResult e
= (res,iworld)
eval event repAs (TCDestroy _) iworld = (DestroyedResult,iworld)
eval event repAs ttree=:(TCDestroy _) iworld
# iworld = 'SDS'.clearTaskSDSRegistrations ('DS'.singleton $ fromOk $ taskIdFromTaskTree ttree) iworld
= (DestroyedResult,iworld)
rep ResetEvent = ReplaceUI (ui UIEmpty)
rep _ = NoChange
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment