Commit 6bc2694a authored by Steffen Michels's avatar Steffen Michels

store SDS notify requests such that there cannot be any request which only differ in the timespec

parent 8fc83891
Pipeline #12338 passed with stage
in 2 minutes and 31 seconds
......@@ -36,13 +36,13 @@ where
lesser (Just _) Nothing = True
lesser Nothing Nothing = False
getTimeoutFromClock :: Timespec (Set SDSNotifyRequest) -> [Maybe Int]
getTimeoutFromClock now requests = getTimeoutFromClock` <$> 'DS'.toList requests
getTimeoutFromClock :: Timespec (Map SDSNotifyRequest Timespec) -> [Maybe Timeout]
getTimeoutFromClock now requests = getTimeoutFromClock` <$> 'DM'.toList requests
where
getTimeoutFromClock` :: SDSNotifyRequest -> Maybe Int
getTimeoutFromClock` snr=:{cmpParam=(ts :: ClockParameter Timespec)}
getTimeoutFromClock` :: (!SDSNotifyRequest, !Timespec) -> Maybe Timeout
getTimeoutFromClock` (snr=:{cmpParam=(ts :: ClockParameter Timespec)}, reqTimespec)
| startsWith "$IWorld:timespec$" snr.reqSDSId && ts.interval <> zero
# fire = iworldTimespecNextFire now snr.reqTimespec ts
# fire = iworldTimespecNextFire now reqTimespec ts
= Just (max 0 (toMs fire - toMs now))
= mt
getTimeoutFromClock` _ = mt
......
......@@ -29,23 +29,23 @@ from TCPIP import :: TCP_Listener, :: TCP_Listener_, :: TCP_RChannel_, :: TCP_SC
CLEAN_HOME_VAR :== "CLEAN_HOME"
:: *IWorld = { options :: !EngineOptions // Engine configuration
, clock :: !Timespec // Server side clock
, current :: !TaskEvalState // Shared state during task evaluation
:: *IWorld = { options :: !EngineOptions // Engine configuration
, clock :: !Timespec // Server side clock
, current :: !TaskEvalState // Shared state during task evaluation
, random :: [Int] // Infinite random stream
, random :: [Int] // Infinite random stream
, sdsNotifyRequests :: !Map SDSIdentity (Set SDSNotifyRequest) // Notification requests from previously read sds's
, memoryShares :: !Map String Dynamic // Run-time memory shares
, readCache :: !Map (String,String) Dynamic // Cached share reads
, writeCache :: !Map (String,String) (Dynamic,DeferredWrite) // Cached deferred writes
, exposedShares :: !Map String (Dynamic, JSONShared) // Shared source
, jsCompilerState :: !Maybe JSCompilerState // Sapl to Javascript compiler state
, sdsNotifyRequests :: !Map SDSIdentity (Map SDSNotifyRequest Timespec) // Notification requests from previously read sds's
, memoryShares :: !Map String Dynamic // Run-time memory shares
, readCache :: !Map (String,String) Dynamic // Cached share reads
, writeCache :: !Map (String,String) (Dynamic,DeferredWrite) // Cached deferred writes
, exposedShares :: !Map String (Dynamic, JSONShared) // Shared source
, jsCompilerState :: !Maybe JSCompilerState // Sapl to Javascript compiler state
, ioTasks :: !*IOTasks // The low-level input/output tasks
, ioStates :: !IOStates // Results of low-level io tasks, indexed by the high-level taskid that it is linked to
, ioTasks :: !*IOTasks // The low-level input/output tasks
, ioStates :: !IOStates // Results of low-level io tasks, indexed by the high-level taskid that it is linked to
, world :: !*World // The outside world
, world :: !*World // The outside world
//Experimental database connection cache
, resources :: *[*Resource]
......
......@@ -15,7 +15,6 @@ import iTasks.SDS.Definition
:: SDSNotifyRequest =
{ reqTaskId :: !TaskId //Id of the task that read the SDS. This Id also connects a chain of notify requests that were registered together
, reqSDSId :: !SDSIdentity //Id of the actual SDS used to create this request (may be a derived one)
, reqTimespec :: !Timespec
, cmpParam :: !Dynamic //Parameter we are saving for comparison
, cmpParamText :: !String //String version of comparison parameter for tracing
......
implementation module iTasks.Internal.SDS
from StdFunc import const
import StdString, StdTuple, StdMisc, StdBool, StdFunc, StdInt, StdChar, dynamic_string
from StdList import flatten, map, take, drop, instance toString [a]
from Text import class Text(join), instance Text String
import qualified Text
import StdString, StdTuple, StdMisc, StdList, StdBool, StdFunc
from Data.Map import :: Map
import qualified Data.Map as DM
import Data.Error, Data.Func, Data.Tuple, System.OS, System.Time, Text.GenJSON, Data.Foldable
from Data.Set import instance Foldable Set
import Data.Error, Data.Func, Data.Tuple, System.OS, System.Time, Text, Text.GenJSON
import qualified Data.Set as Set
import iTasks.Engine
import iTasks.Internal.IWorld
......@@ -54,7 +50,7 @@ createSDS ns id read write = SDSSource
//Construct the identity of an sds
sdsIdentity :: !(RWShared p r w) -> SDSIdentity
sdsIdentity s = 'Text'.concat (sdsIdentity` s [])
sdsIdentity s = concat (sdsIdentity` s [])
where
sdsIdentity` :: !(RWShared p r w) [String] -> [String]
sdsIdentity` (SDSSource {SDSSource|name}) acc = ["$", name, "$":acc]
......@@ -81,10 +77,10 @@ mbRegister :: !p !(RWShared p r w) !(Maybe TaskId) !SDSIdentity !*IWorld -> *IWo
mbRegister p sds Nothing reqSDSId iworld = iworld
mbRegister p sds (Just taskId) reqSDSId iworld=:{IWorld|sdsNotifyRequests,world}
# (ts, world) = nsTime world
# req = {SDSNotifyRequest|reqTimespec=ts,reqTaskId=taskId,reqSDSId=reqSDSId,cmpParam=dynamic p,cmpParamText=toSingleLineText p}
# req = {SDSNotifyRequest|reqTaskId=taskId,reqSDSId=reqSDSId,cmpParam=dynamic p,cmpParamText=toSingleLineText p}
= { iworld
& world = world
, sdsNotifyRequests = 'DM'.alter (Just o maybe ('Set'.singleton req) ('Set'.insert req))
, sdsNotifyRequests = 'DM'.alter (Just o maybe ('DM'.singleton req ts) ('DM'.put req ts))
(sdsIdentity sds)
sdsNotifyRequests
}
......@@ -349,14 +345,14 @@ checkRegistrations sdsId pred iworld
= (match,nomatch,iworld)
where
//Find all notify requests for the given share id
lookupRegistrations :: String !*IWorld -> (![SDSNotifyRequest], !*IWorld)
lookupRegistrations :: String !*IWorld -> (![(!SDSNotifyRequest, !Timespec)], !*IWorld)
lookupRegistrations sdsId iworld=:{sdsNotifyRequests} =
('Set'.toList $ 'DM'.findWithDefault 'Set'.newSet sdsId sdsNotifyRequests, iworld)
('DM'.toList $ 'DM'.findWithDefault 'DM'.newMap sdsId sdsNotifyRequests, iworld)
//Match the notify requests against the predicate to determine two sets:
//The registrations that matched the predicate, and those that did not match the predicate
matchRegistrations pred [] = ('Set'.newSet,'Set'.newSet)
matchRegistrations pred [{SDSNotifyRequest|reqTimespec,reqTaskId,cmpParam}:regs]
matchRegistrations pred [({SDSNotifyRequest|reqTaskId,cmpParam}, reqTimespec):regs]
# (match,nomatch) = matchRegistrations pred regs
= case cmpParam of
(p :: p^) = if (pred reqTimespec p)
......@@ -383,19 +379,22 @@ clearTaskSDSRegistrations :: !(Set TaskId) !*IWorld -> *IWorld
clearTaskSDSRegistrations taskIds iworld=:{IWorld|sdsNotifyRequests}
= {iworld & sdsNotifyRequests = 'DM'.foldlWithKey clearRegistrationRequests 'DM'.newMap sdsNotifyRequests}
where
clearRegistrationRequests :: (Map SDSIdentity (Set SDSNotifyRequest)) SDSIdentity (Set SDSNotifyRequest) -> Map SDSIdentity (Set SDSNotifyRequest)
clearRegistrationRequests :: (Map SDSIdentity (Map SDSNotifyRequest Timespec))
SDSIdentity
(Map SDSNotifyRequest Timespec)
-> Map SDSIdentity (Map SDSNotifyRequest Timespec)
clearRegistrationRequests notifyRequests sdsIdentity requests
| 'Set'.null filteredRequests = notifyRequests
| otherwise = 'DM'.put sdsIdentity filteredRequests notifyRequests
| 'DM'.null filteredRequests = notifyRequests
| otherwise = 'DM'.put sdsIdentity filteredRequests notifyRequests
where
filteredRequests = 'Set'.filter (\req -> not $ 'Set'.member req.reqTaskId taskIds) requests
filteredRequests = 'DM'.filterWithKey (\req _ -> not $ 'Set'.member req.reqTaskId taskIds) requests
listAllSDSRegistrations :: *IWorld -> (![(InstanceNo,[(TaskId,SDSIdentity)])],!*IWorld)
listAllSDSRegistrations iworld=:{IWorld|sdsNotifyRequests} = ('DM'.toList ('DM'.foldrWithKey addRegs 'DM'.newMap sdsNotifyRequests),iworld)
where
addRegs cmpSDSId reqs list = foldr addReg list reqs
addRegs cmpSDSId reqs list = 'DM'.foldlWithKey addReg list reqs
where
addReg {SDSNotifyRequest|reqTaskId=reqTaskId=:(TaskId taskInstance _)} list
addReg list {SDSNotifyRequest|reqTaskId=reqTaskId=:(TaskId taskInstance _)} _
= 'DM'.put taskInstance [(reqTaskId,cmpSDSId):fromMaybe [] ('DM'.get taskInstance list)] list
formatSDSRegistrationsList :: [(InstanceNo,[(TaskId,SDSIdentity)])] -> String
......@@ -456,4 +455,4 @@ getURLbyId sdsId iworld=:{IWorld|options={serverUrl},random}
// some efficient order to be able to put notify requests in sets
instance < SDSNotifyRequest where
< x y = copy_to_string x < copy_to_string y
< x y = (x.reqTaskId, x.reqSDSId, x.cmpParamText) < (y.reqTaskId, y.reqSDSId, y.cmpParamText)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment