Commit 8fc83891 authored by Steffen Michels's avatar Steffen Michels

store SDS notify request such that duplicate requests are prevented

parent 007ba5dc
Pipeline #12335 passed with stage
in 2 minutes and 42 seconds
......@@ -143,7 +143,7 @@ createClientIWorld serverURL currentInstance
,attachmentChain = []
,nextTaskNo = 6666
}
,sdsNotifyRequests = []
,sdsNotifyRequests = 'Data.Map'.newMap
,memoryShares = 'Data.Map'.newMap
,readCache = 'Data.Map'.newMap
,writeCache = 'Data.Map'.newMap
......
implementation module iTasks.Internal.EngineTasks
import StdBool, StdOverloaded, StdList, StdOrdList
import qualified Data.Map as DM
import qualified Data.Set as DS
import Data.Functor, Data.Func
import iTasks.Engine
import iTasks.Internal.IWorld
import iTasks.WF.Definition
......@@ -24,8 +27,7 @@ timeout mt iworld = case read taskEvents iworld of
//No events
(Ok (Queue [] []),iworld=:{sdsNotifyRequests,world})
# (ts, world) = nsTime world
# to = minListBy lesser [mt:map (getTimoutFromClock ts) sdsNotifyRequests]
= ( minListBy lesser [mt:map (getTimoutFromClock ts) sdsNotifyRequests]
= ( minListBy lesser [mt:flatten $ map (getTimeoutFromClock ts) $ 'DM'.elems sdsNotifyRequests]
, {iworld & world = world})
(Ok _,iworld) = (Just 0,iworld) //There are still events, don't wait
(Error _,iworld) = (Just 500,iworld) //Keep retrying, but not too fast
......@@ -34,13 +36,16 @@ where
lesser (Just _) Nothing = True
lesser Nothing Nothing = False
getTimoutFromClock :: Timespec SDSNotifyRequest -> Maybe Int
getTimoutFromClock now snr=:{cmpParam=(ts :: ClockParameter Timespec)}
| startsWith "$IWorld:timespec$" snr.reqSDSId && ts.interval <> zero
# fire = iworldTimespecNextFire now snr.reqTimespec ts
= Just (max 0 (toMs fire - toMs now))
= mt
getTimoutFromClock _ _ = mt
getTimeoutFromClock :: Timespec (Set SDSNotifyRequest) -> [Maybe Int]
getTimeoutFromClock now requests = getTimeoutFromClock` <$> 'DS'.toList requests
where
getTimeoutFromClock` :: SDSNotifyRequest -> Maybe Int
getTimeoutFromClock` snr=:{cmpParam=(ts :: ClockParameter Timespec)}
| startsWith "$IWorld:timespec$" snr.reqSDSId && ts.interval <> zero
# fire = iworldTimespecNextFire now snr.reqTimespec ts
= Just (max 0 (toMs fire - toMs now))
= mt
getTimeoutFromClock` _ = mt
toMs x = x.tv_sec * 1000 + x.tv_nsec / 1000000
......
......@@ -18,7 +18,7 @@ from iTasks.Internal.TaskEval import :: TaskTime
from iTasks.WF.Definition import :: TaskValue, :: Event, :: TaskId, :: InstanceNo, :: TaskNo
from iTasks.WF.Combinators.Core import :: ParallelTaskType, :: TaskListItem
from iTasks.SDS.Definition import :: SDS, :: RWShared, :: ReadWriteShared, :: Shared, :: ReadOnlyShared
from iTasks.Internal.SDS import :: SDSNotifyRequest, :: JSONShared, :: DeferredWrite
from iTasks.Internal.SDS import :: SDSNotifyRequest, :: JSONShared, :: DeferredWrite, :: SDSIdentity
from iTasks.Extensions.DateTime import :: Time, :: Date, :: DateTime
from Sapl.Linker.LazyLinker import :: LoaderState
......@@ -35,7 +35,7 @@ CLEAN_HOME_VAR :== "CLEAN_HOME"
, random :: [Int] // Infinite random stream
, sdsNotifyRequests :: ![SDSNotifyRequest] // Notification requests from previously read sds's
, sdsNotifyRequests :: !Map SDSIdentity (Set SDSNotifyRequest) // Notification requests from previously read sds's
, memoryShares :: !Map String Dynamic // Run-time memory shares
, readCache :: !Map (String,String) Dynamic // Cached share reads
, writeCache :: !Map (String,String) (Dynamic,DeferredWrite) // Cached deferred writes
......
......@@ -71,7 +71,7 @@ createIWorld options world
,attachmentChain = []
,nextTaskNo = 0
}
,sdsNotifyRequests = []
,sdsNotifyRequests = 'DM'.newMap
,memoryShares = 'DM'.newMap
,readCache = 'DM'.newMap
,writeCache = 'DM'.newMap
......
......@@ -13,16 +13,17 @@ import iTasks.SDS.Definition
//Notification requests are stored in the IWorld
:: SDSNotifyRequest =
{ reqTaskId :: TaskId //Id of the task that read the SDS. This Id also connects a chain of notify requests that were registered together
, reqSDSId :: SDSIdentity //Id of the actual SDS used to create this request (may be a derived one)
, reqTimespec :: Timespec
{ reqTaskId :: !TaskId //Id of the task that read the SDS. This Id also connects a chain of notify requests that were registered together
, reqSDSId :: !SDSIdentity //Id of the actual SDS used to create this request (may be a derived one)
, reqTimespec :: !Timespec
, cmpSDSId :: SDSIdentity //Id of the SDS we are saving for comparison
, cmpParam :: Dynamic //Parameter we are saving for comparison
, cmpParamText :: String //String version of comparison parameter for tracing
, cmpParam :: !Dynamic //Parameter we are saving for comparison
, cmpParamText :: !String //String version of comparison parameter for tracing
}
:: SDSIdentity :== String
instance < SDSNotifyRequest
:: DeferredWrite = E. p r w: DeferredWrite !p !w !(SDS p r w) & iTask p & TC r & TC w
//Internal creation functions:
......
implementation module iTasks.Internal.SDS
from StdFunc import const
import StdString, StdTuple, StdMisc, StdList, StdBool
import StdString, StdTuple, StdMisc, StdBool, StdFunc, StdInt, StdChar, dynamic_string
from StdList import flatten, map, take, drop, instance toString [a]
from Text import class Text(join), instance Text String
import qualified Text
from Data.Map import :: Map
import qualified Data.Map as DM
import Data.Error, Data.Func, Data.Tuple, System.OS, System.Time, Text, Text.GenJSON
import Data.Error, Data.Func, Data.Tuple, System.OS, System.Time, Text.GenJSON, Data.Foldable
from Data.Set import instance Foldable Set
import qualified Data.Set as Set
import iTasks.Engine
import iTasks.Internal.IWorld
......@@ -50,7 +54,7 @@ createSDS ns id read write = SDSSource
//Construct the identity of an sds
sdsIdentity :: !(RWShared p r w) -> SDSIdentity
sdsIdentity s = concat (sdsIdentity` s [])
sdsIdentity s = 'Text'.concat (sdsIdentity` s [])
where
sdsIdentity` :: !(RWShared p r w) [String] -> [String]
sdsIdentity` (SDSSource {SDSSource|name}) acc = ["$", name, "$":acc]
......@@ -77,8 +81,13 @@ mbRegister :: !p !(RWShared p r w) !(Maybe TaskId) !SDSIdentity !*IWorld -> *IWo
mbRegister p sds Nothing reqSDSId iworld = iworld
mbRegister p sds (Just taskId) reqSDSId iworld=:{IWorld|sdsNotifyRequests,world}
# (ts, world) = nsTime world
# req = {SDSNotifyRequest|reqTimespec=ts,reqTaskId=taskId,reqSDSId=reqSDSId,cmpSDSId=sdsIdentity sds,cmpParam=dynamic p,cmpParamText=toSingleLineText p}
= {iworld & world=world, sdsNotifyRequests = [req:sdsNotifyRequests]}
# req = {SDSNotifyRequest|reqTimespec=ts,reqTaskId=taskId,reqSDSId=reqSDSId,cmpParam=dynamic p,cmpParamText=toSingleLineText p}
= { iworld
& world = world
, sdsNotifyRequests = 'DM'.alter (Just o maybe ('Set'.singleton req) ('Set'.insert req))
(sdsIdentity sds)
sdsNotifyRequests
}
read` :: !p !(Maybe TaskId) !SDSIdentity !(RWShared p r w) !*IWorld -> (!MaybeError TaskException r, !*IWorld) | iTask p & TC r
read` p mbNotify reqSDSId sds=:(SDSSource {SDSSource|read}) env
......@@ -340,8 +349,9 @@ checkRegistrations sdsId pred iworld
= (match,nomatch,iworld)
where
//Find all notify requests for the given share id
lookupRegistrations sdsId iworld=:{sdsNotifyRequests}
= ([reg \\ reg=:{SDSNotifyRequest|cmpSDSId} <- sdsNotifyRequests | cmpSDSId == sdsId],iworld)
lookupRegistrations :: String !*IWorld -> (![SDSNotifyRequest], !*IWorld)
lookupRegistrations sdsId iworld=:{sdsNotifyRequests} =
('Set'.toList $ 'DM'.findWithDefault 'Set'.newSet sdsId sdsNotifyRequests, iworld)
//Match the notify requests against the predicate to determine two sets:
//The registrations that matched the predicate, and those that did not match the predicate
......@@ -371,13 +381,22 @@ queueNotifyEvents sdsId notify iworld
clearTaskSDSRegistrations :: !(Set TaskId) !*IWorld -> *IWorld
clearTaskSDSRegistrations taskIds iworld=:{IWorld|sdsNotifyRequests}
= {iworld & sdsNotifyRequests = [r \\ r=:{SDSNotifyRequest|reqTaskId} <- sdsNotifyRequests | not ('Set'.member reqTaskId taskIds)]}
= {iworld & sdsNotifyRequests = 'DM'.foldlWithKey clearRegistrationRequests 'DM'.newMap sdsNotifyRequests}
where
clearRegistrationRequests :: (Map SDSIdentity (Set SDSNotifyRequest)) SDSIdentity (Set SDSNotifyRequest) -> Map SDSIdentity (Set SDSNotifyRequest)
clearRegistrationRequests notifyRequests sdsIdentity requests
| 'Set'.null filteredRequests = notifyRequests
| otherwise = 'DM'.put sdsIdentity filteredRequests notifyRequests
where
filteredRequests = 'Set'.filter (\req -> not $ 'Set'.member req.reqTaskId taskIds) requests
listAllSDSRegistrations :: *IWorld -> (![(InstanceNo,[(TaskId,SDSIdentity)])],!*IWorld)
listAllSDSRegistrations iworld=:{IWorld|sdsNotifyRequests} = ('DM'.toList (foldr addReg 'DM'.newMap sdsNotifyRequests),iworld)
listAllSDSRegistrations iworld=:{IWorld|sdsNotifyRequests} = ('DM'.toList ('DM'.foldrWithKey addRegs 'DM'.newMap sdsNotifyRequests),iworld)
where
addReg {SDSNotifyRequest|reqTaskId=reqTaskId=:(TaskId taskInstance _),cmpSDSId} list
= 'DM'.put taskInstance [(reqTaskId,cmpSDSId):fromMaybe [] ('DM'.get taskInstance list)] list
addRegs cmpSDSId reqs list = foldr addReg list reqs
where
addReg {SDSNotifyRequest|reqTaskId=reqTaskId=:(TaskId taskInstance _)} list
= 'DM'.put taskInstance [(reqTaskId,cmpSDSId):fromMaybe [] ('DM'.get taskInstance list)] list
formatSDSRegistrationsList :: [(InstanceNo,[(TaskId,SDSIdentity)])] -> String
formatSDSRegistrationsList list
......@@ -433,5 +452,8 @@ newURL iworld=:{IWorld|options={serverUrl},random}
// TODO: different URL for clients
getURLbyId :: !String !*IWorld -> (!String, !*IWorld)
getURLbyId sdsId iworld=:{IWorld|options={serverUrl},random}
= ("sds:" +++ serverUrl +++ "/" +++ sdsId, iworld)
= ("sds:" +++ serverUrl +++ "/" +++ sdsId, iworld)
// some efficient order to be able to put notify requests in sets
instance < SDSNotifyRequest where
< x y = copy_to_string x < copy_to_string y
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment