Commit fefb4f67 authored by Steffen Michels's avatar Steffen Michels

avoid linear scan through all SDS notification request when clearing notifications

parent a3a930ba
Pipeline #12504 passed with stage
in 3 minutes and 14 seconds
......@@ -144,6 +144,7 @@ createClientIWorld serverURL currentInstance
,nextTaskNo = 6666
}
,sdsNotifyRequests = 'Data.Map'.newMap
,sdsNotifyReqsByTask = 'Data.Map'.newMap
,memoryShares = 'Data.Map'.newMap
,readCache = 'Data.Map'.newMap
,writeCache = 'Data.Map'.newMap
......
......@@ -36,6 +36,7 @@ CLEAN_HOME_VAR :== "CLEAN_HOME"
, random :: [Int] // Infinite random stream
, sdsNotifyRequests :: !Map SDSIdentity (Map SDSNotifyRequest Timespec) // Notification requests from previously read sds's
, sdsNotifyReqsByTask :: !Map TaskId (Set SDSIdentity) // Allows to efficiently find notification by taskID for clearing notifications
, memoryShares :: !Map String Dynamic // Run-time memory shares
, readCache :: !Map (String,String) Dynamic // Cached share reads
, writeCache :: !Map (String,String) (Dynamic,DeferredWrite) // Cached deferred writes
......
......@@ -72,6 +72,7 @@ createIWorld options world
,nextTaskNo = 0
}
,sdsNotifyRequests = 'DM'.newMap
,sdsNotifyReqsByTask = 'DM'.newMap
,memoryShares = 'DM'.newMap
,readCache = 'DM'.newMap
,writeCache = 'DM'.newMap
......
implementation module iTasks.Internal.SDS
from StdFunc import const
import StdString, StdTuple, StdMisc, StdList, StdBool, StdFunc
import StdString, StdTuple, StdMisc, StdBool, StdFunc, StdInt, StdChar
from StdList import flatten, map, take, drop, instance toString [a]
from Text import class Text, instance Text String
import qualified Text
from Data.Map import :: Map
import qualified Data.Map as DM
import Data.Error, Data.Func, Data.Tuple, System.OS, System.Time, Text, Text.GenJSON
import Data.Error, Data.Func, Data.Tuple, System.OS, System.Time, Text.GenJSON, Data.Foldable
from Data.Set import instance Foldable Set, instance < (Set a)
import qualified Data.Set as Set
import iTasks.Engine
import iTasks.Internal.IWorld
......@@ -50,7 +54,7 @@ createSDS ns id read write = SDSSource
//Construct the identity of an sds
sdsIdentity :: !(RWShared p r w) -> SDSIdentity
sdsIdentity s = concat (sdsIdentity` s [])
sdsIdentity s = 'Text'.concat (sdsIdentity` s [])
where
sdsIdentity` :: !(RWShared p r w) [String] -> [String]
sdsIdentity` (SDSSource {SDSSource|name}) acc = ["$", name, "$":acc]
......@@ -75,14 +79,16 @@ readRegister taskId sds env = read` () (Just taskId) (sdsIdentity sds) sds env
mbRegister :: !p !(RWShared p r w) !(Maybe TaskId) !SDSIdentity !*IWorld -> *IWorld | iTask p
mbRegister p sds Nothing reqSDSId iworld = iworld
mbRegister p sds (Just taskId) reqSDSId iworld=:{IWorld|sdsNotifyRequests,world}
mbRegister p sds (Just taskId) reqSDSId iworld=:{IWorld|sdsNotifyRequests, sdsNotifyReqsByTask, world}
# (ts, world) = nsTime world
# req = {SDSNotifyRequest|reqTaskId=taskId,reqSDSId=reqSDSId,cmpParam=dynamic p,cmpParamText=toSingleLineText p}
# sdsId = sdsIdentity sds
= { iworld
& world = world
, sdsNotifyRequests = 'DM'.alter (Just o maybe ('DM'.singleton req ts) ('DM'.put req ts))
(sdsIdentity sds)
sdsId
sdsNotifyRequests
, sdsNotifyReqsByTask = 'DM'.alter (Just o maybe ('Set'.singleton sdsId) ('Set'.insert sdsId)) taskId sdsNotifyReqsByTask
}
read` :: !p !(Maybe TaskId) !SDSIdentity !(RWShared p r w) !*IWorld -> (!MaybeError TaskException r, !*IWorld) | iTask p & TC r
......@@ -376,18 +382,25 @@ queueNotifyEvents sdsId notify iworld
= queueRefresh [(t,"Notification for write of " +++ sdsId) \\ t <- 'Set'.toList notify] iworld
clearTaskSDSRegistrations :: !(Set TaskId) !*IWorld -> *IWorld
clearTaskSDSRegistrations taskIds iworld=:{IWorld|sdsNotifyRequests}
= {iworld & sdsNotifyRequests = 'DM'.foldlWithKey clearRegistrationRequests 'DM'.newMap sdsNotifyRequests}
clearTaskSDSRegistrations taskIds iworld=:{IWorld|sdsNotifyRequests, sdsNotifyReqsByTask}
# sdsIdsToClear = foldl
(\sdsIdsToClear taskId -> 'Set'.union ('DM'.findWithDefault 'Set'.newSet taskId sdsNotifyReqsByTask) sdsIdsToClear)
'Set'.newSet
taskIds
= { iworld
& sdsNotifyRequests = foldl clearRegistrationRequests sdsNotifyRequests sdsIdsToClear
, sdsNotifyReqsByTask = foldl (flip 'DM'.del) sdsNotifyReqsByTask taskIds
}
where
clearRegistrationRequests :: (Map SDSIdentity (Map SDSNotifyRequest Timespec))
SDSIdentity
(Map SDSNotifyRequest Timespec)
-> Map SDSIdentity (Map SDSNotifyRequest Timespec)
clearRegistrationRequests notifyRequests sdsIdentity requests
| 'DM'.null filteredRequests = notifyRequests
| otherwise = 'DM'.put sdsIdentity filteredRequests notifyRequests
clearRegistrationRequests requests sdsId
| 'DM'.null filteredReqsForSdsId = 'DM'.del sdsId requests
| otherwise = 'DM'.put sdsId filteredReqsForSdsId requests
where
filteredRequests = 'DM'.filterWithKey (\req _ -> not $ 'Set'.member req.reqTaskId taskIds) requests
reqsForSdsId = fromJust $ 'DM'.get sdsId requests
filteredReqsForSdsId = 'DM'.filterWithKey (\req _ -> not $ 'Set'.member req.reqTaskId taskIds) reqsForSdsId
listAllSDSRegistrations :: *IWorld -> (![(InstanceNo,[(TaskId,SDSIdentity)])],!*IWorld)
listAllSDSRegistrations iworld=:{IWorld|sdsNotifyRequests} = ('DM'.toList ('DM'.foldrWithKey addRegs 'DM'.newMap sdsNotifyRequests),iworld)
......@@ -399,14 +412,16 @@ where
formatSDSRegistrationsList :: [(InstanceNo,[(TaskId,SDSIdentity)])] -> String
formatSDSRegistrationsList list
= join "\n" (flatten [["Task instance " +++ toString i +++ ":"
:["\t"+++toString taskId +++ "->"+++sdsId\\(taskId,sdsId) <- regs]] \\ (i,regs) <- list])
= 'Text'.join "\n" ( flatten [ [ "Task instance " +++ toString i +++ ":"
:["\t"+++toString taskId +++ "->"+++sdsId\\(taskId,sdsId) <- regs]] \\ (i,regs) <- list
]
)
flushDeferredSDSWrites :: !*IWorld -> (!MaybeError TaskException (), !*IWorld)
flushDeferredSDSWrites iworld=:{writeCache}
# (errors,iworld) = flushAll ('DM'.toList writeCache) iworld
| errors =: [] = (Ok (), {iworld & writeCache = 'DM'.newMap})
# msg = join OS_NEWLINE ["Could not flush all deferred SDS writes, some data may be lost":map snd errors]
# msg = 'Text'.join OS_NEWLINE ["Could not flush all deferred SDS writes, some data may be lost":map snd errors]
= (Error (exception msg),{iworld & writeCache = 'DM'.newMap})
where
flushAll [] iworld = ([],iworld)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment