sut_cache.py 1.94 KB
Newer Older
Paul Fiterau Brostean's avatar
Paul Fiterau Brostean committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from abc import abstractmethod, ABCMeta
from typing import List, Type
import itertools

from sut import SUT, TransducerObservation, Observation, MealyObservation
from utils import Tree



class Cache(metaclass=ABCMeta):
    """Abstract interface for a store of observations keyed by input sequence."""

    @abstractmethod
    def from_cache(self, seq) -> Observation:
        """Return the observation stored for the input sequence ``seq``.

        Callers in this file treat a ``None`` result as a cache miss.
        """

    @abstractmethod
    def update_cache(self, obs):
        """Record the observation ``obs`` so later lookups can return it."""

class IOCache(Cache):
    """Cache that keeps input/output traces as paths in a ``Tree``.

    A trace ``[(i1, o1), (i2, o2), ...]`` is stored as the path
    ``i1, o1, i2, o2, ...``: input edges and output edges alternate.
    """

    def __init__(self, obs_gen):
        """Create an empty cache.

        :param obs_gen: callable that turns a list of ``(input, output)``
            pairs into the observation object returned by ``from_cache``.
        """
        self._obs_gen = obs_gen
        # The counter is handed to Tree as-is (presumably used to number
        # nodes -- confirm against utils.Tree).
        self._tree = Tree(itertools.count(0))

    def from_cache(self, seq):
        """Rebuild the observation for ``seq`` from the tree.

        :param seq: the sequence of inputs to look up.
        :return: ``obs_gen`` applied to the cached ``(input, output)`` pairs,
            or ``None`` as soon as an input has no edge from the current node.
        """
        current = self._tree
        outputs = []
        for symbol in seq:
            if symbol not in current.children:
                # Unknown continuation: the sequence is not (fully) cached.
                return None
            after_input = current[[symbol]]
            # The first child of the input node supplies both the output
            # label and the node from which the walk continues.
            output, current = next(iter(after_input.children.items()))
            outputs.append(output)
        return self._obs_gen(list(zip(seq, outputs)))

    def update_cache(self, obs: TransducerObservation):
        """Store the trace of ``obs`` in the tree.

        :param obs: observation whose ``trace()`` yields ``(input, output)``
            pairs.
        """
        # Flatten [(i1, o1), (i2, o2), ...] into [i1, o1, i2, o2, ...].
        path = list(itertools.chain.from_iterable(obs.trace()))
        # Indexing is done only for its side effect; presumably
        # Tree.__getitem__ creates the missing nodes along the path
        # (confirm against utils.Tree).
        self._tree[path]

class CacheSUT(SUT):
    """SUT wrapper that answers from a cache and falls back to the real SUT."""

    def __init__(self, sut:SUT, cache:Cache):
        """Wrap ``sut`` so that its observations are stored in ``cache``.

        :param sut: the system under test that is queried on a cache miss.
        :param cache: the cache consulted before, and updated after, each run.
        """
        self._sut = sut
        self._cache = cache

    def run(self, seq:List[object]):
        """Return the observation for ``seq``, using the cache when possible.

        On a miss the wrapped SUT is run, the result is cached, and the cache
        is read back to check that it reproduces the trace just stored.

        :param seq: the input sequence to execute.
        :return: the cached observation on a hit, otherwise the observation
            produced by the wrapped SUT.
        :raises SystemExit: if the cache cannot reproduce the observation it
            was just updated with (exit status 1).
        """
        obs = self._cache.from_cache(seq)
        if obs is not None:
            return obs

        obs = self._sut.run(seq)
        self._cache.update_cache(obs)
        obs_t = self._cache.from_cache(seq)
        if obs_t is None or obs_t.trace() != obs.trace():
            # Dump what was stored, the cache contents and what came back.
            print(obs.trace())
            # Not every Cache implementation has a ``_tree``; fall back to
            # printing the cache object itself.
            print(getattr(self._cache, "_tree", self._cache))
            # obs_t may be None here, so do not call trace() on it blindly.
            print(None if obs_t is None else obs_t.trace())
            # A string argument makes the interpreter print the message to
            # stderr and exit with status 1, so the failure is not reported
            # as success and does not depend on the site-provided exit().
            raise SystemExit(
                "CacheSUT: cached observation does not match the SUT observation"
            )
        return obs

    def input_interface(self):
        """Return the input interface of the wrapped SUT."""
        return self._sut.input_interface()

#c = IOCache(MealyObservation)
#c.update_cache(MealyObservation([("a","oa"), ("a","ob")]))
#print(c._tree)
#c.update_cache(MealyObservation([("a","oa"), ("b","ob")]))
#print(c._tree)
#c.update_cache(MealyObservation([("b","ob"), ("b","ob")]))
#print(c._tree)
#
#print(c.from_cache(["b", "b"]))