__init__.py 8.55 KB
Newer Older
Paul Fiterau Brostean's avatar
Paul Fiterau Brostean committed
1
from abc import ABCMeta, abstractmethod
2
from typing import List, Tuple, Dict
Paul Fiterau Brostean's avatar
Paul Fiterau Brostean committed
3
4
5

import collections

6
from model.fa import Symbol
Paul Fiterau Brostean's avatar
Paul Fiterau Brostean committed
7
from model.ra import Action
8
from enum import Enum
Paul Fiterau Brostean's avatar
Paul Fiterau Brostean committed
9

10
11
12
13
14
15
16
17
18
19
class Observation:
    """Common interface of the observations produced by running a SUT.

    NOTE(review): the methods are tagged with ``@abstractmethod`` but the class
    declares no ABC metaclass, so the decorator is not enforced: the class can
    still be instantiated and the un-overridden methods just return ``None``.
    """

    @abstractmethod
    def trace(self):
        """returns the trace to be added to the solver"""

    @abstractmethod
    def inputs(self):
        """returns all the inputs from an observation"""
Paul Fiterau Brostean's avatar
Paul Fiterau Brostean committed
20

21

22

23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class DFAObservation:
    """Observation made of an input sequence (``seq``) and a flag (``acc``)."""

    def __init__(self, seq, acc):
        # Both values are stored as given; no copy is made.
        self.seq, self.acc = seq, acc

    def trace(self):
        """Return the ``(seq, acc)`` pair stored at construction."""
        pair = (self.seq, self.acc)
        return pair

    def inputs(self) -> List[Symbol]:
        """Return the stored input sequence itself (not a copy)."""
        return self.seq

class MealyObservation:
    """Observation stored as a sequence of ``(input, output)`` pairs."""

    def __init__(self, trace):
        self.tr = trace

    def trace(self):
        """Return the stored sequence of pairs itself (not a copy)."""
        return self.tr

    def inputs(self) -> List[Symbol]:
        """Return a new list holding the first component of every pair."""
        symbols = []
        for inp, _out in self.tr:
            symbols.append(inp)
        return symbols

44
45
46
47
48
49
50
51
52
class RegisterMachineObservation(Observation):
    """Observation whose trace also carries data values, exposed through ``values``."""

    @abstractmethod
    def values(self):
        """returns all the values in the trace"""



53
54
55
56
57
58
class RAObservation(RegisterMachineObservation):
    """Observation made of a sequence of actions (``seq``) and a flag (``acc``).

    The elements of ``seq`` are expected to expose a ``value`` attribute
    (read by ``values``).
    """

    def __init__(self, seq, acc):
        self.seq = seq
        self.acc = acc

    def trace(self):
        """Return the ``(seq, acc)`` pair stored at construction."""
        return (self.seq, self.acc)

    def inputs(self):
        """Return the stored action sequence itself (not a copy)."""
        return self.seq

    def values(self):
        """Return the set of non-``None`` values carried by the actions in ``seq``."""
        return {a.value for a in self.seq if a.value is not None}
66
67
68

class IORAObservation(RegisterMachineObservation):
    """Observation stored as a sequence of ``(input action, output action)`` pairs.

    Every action is expected to expose a ``value`` attribute (read by ``values``).
    """

    def __init__(self, trace):
        self.tr = trace

    def trace(self) -> List[Tuple[Action, Action]]:
        """Return the stored sequence of pairs itself (not a copy)."""
        return self.tr

    def inputs(self):
        """Return a new list holding the input action of every pair."""
        return [a for (a, _) in self.tr]

    def values(self):
        """Return the set of non-``None`` values found in input and output actions."""
        found = set()
        for (inp, out) in self.tr:
            for act in (inp, out):
                if act.value is not None:
                    found.add(act.value)
        return found

    def __str__(self):
        return "Obs: " + str(self.tr)
84

85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class SUT(metaclass=ABCMeta):
    """Abstract system under test: runs input sequences and exposes its input interface."""

    # Output labels; elsewhere in this module boolean outputs are mapped to
    # these and the acceptor wrappers compare output labels against OK.
    OK, NOK = "OK", "NOK"

    @abstractmethod
    def run(self, seq: List[object]) -> Observation:
        """Runs a sequence of inputs on the SUT and returns an observation"""

    @abstractmethod
    def input_interface(self) -> List[object]:
        """Returns the list of inputs or input signatures comprising the input interface"""


class SUTType(Enum):
    """Formalisms in which a SUT can be viewed.

    RA and DFA are treated as acceptors; every other member is treated as a
    transducer.
    """

    IORA = 1
    RA = 2
    Mealy = 3
    Moore = 4
    DFA = 5

    def is_acceptor(self):
        """Return True only for the acceptor formalisms (RA and DFA)."""
        # The previous `self == SUTType.RA or self.DFA` was always truthy,
        # because `self.DFA` evaluates to the (truthy) DFA member itself.
        return self in (SUTType.RA, SUTType.DFA)

    def is_transducer(self):
        """Return True for every member that is not an acceptor."""
        return not self.is_acceptor()


class SUTClass(metaclass=ABCMeta):
    """for a class of systems (say stacks, or logins) provides means of instantiating SUTs of different types"""

    @abstractmethod
    def new_sut(self, sut_type: SUTType) -> SUT:
        """Builds a new SUT of the specified type. Returns None if no such SUT can be generated"""
        pass
120

121
122
123
124
125
126
127
128
129
class ScalableSUTClass(SUTClass, metaclass=ABCMeta):
    """provides instantiation for scalable SUTs. Scalable SUTs are classes whose constructor take a size >0 integer
    as an argument. This class also adds wrappers corresponding to the type (all SUTs are assumed to be ObjectSULs)."""

    def __init__(self, sut_type_dict: Dict[SUTType, type]):
        # Maps each supported SUTType to the class implementing the system.
        # Each class must expose an INTERFACE attribute (passed to ObjectSUT
        # as its action signatures) and accept the size as its sole argument.
        self.sut_type_dict = sut_type_dict

    def new_sut(self, sut_type: SUTType, size: int):
        """Builds a SUT of the given type and size, wrapped to match the type.

        Note that, unlike the inherited abstract method, this takes an
        additional ``size`` argument, which is forwarded to the constructor
        of the registered class each time the SUT is run.

        Returns None when the type is not registered in ``sut_type_dict`` or
        when no wrapper exists for it (Moore machines are not supported yet).
        """
        if sut_type not in self.sut_type_dict:
            return None

        sut_cls = self.sut_type_dict[sut_type]
        # The generator defers construction: ObjectSUT calls it at the start
        # of every run to obtain a fresh object.
        sut_obj = ObjectSUT(sut_cls.INTERFACE, lambda: sut_cls(size))

        if sut_type is SUTType.IORA:
            return sut_obj
        if sut_type is SUTType.RA:
            return RAWrapper(sut_obj)
        if sut_type is SUTType.Mealy:
            return MealyWrapper(sut_obj)
        if sut_type is SUTType.DFA:
            return DFAWrapper(MealyWrapper(sut_obj))
        return None  # no support for Moore Machines (yet)
139
140
141
142
143
144
145
146
147
148
149

# Signature of an input action: its label and the number of parameters it takes.
ActionSignature = collections.namedtuple("ActionSignature", ("label", "num_params"))


class RASUT(metaclass=ABCMeta):
    """Abstract SUT whose input interface is a list of action signatures and whose inputs are Actions."""

    @abstractmethod
    def input_interface(self) -> List[ActionSignature]:
        """Returns the action signatures comprising the input interface"""

    @abstractmethod
    def run(self, seq: List[Action]):
        """Runs a sequence of inputs on the SUT and returns an observation"""
150

151
class ObjectSUT(RASUT):
    """Wraps around an object and calls methods on it corresponding to the Actions.
        IORA is the natural formalism for describing practical SUTs. Depending on the SUT characteristics,
        we can also describe them using less expressive formalisms."""

    def __init__(self, act_sigs, obj_gen):
        """
        :param act_sigs: iterable of action signatures (objects with ``label`` and ``num_params``)
        :param obj_gen: zero-argument callable returning the object to run inputs on;
            it is called once at the start of every ``run``
        """
        self.obj_gen = obj_gen
        # Signatures indexed by label, so run() can look up the arity of each input.
        self.acts = {act_sig.label: act_sig for act_sig in act_sigs}

    def run(self, seq: List[Action]) -> IORAObservation:
        """Runs the input actions on a freshly generated object and returns the observed trace.

        For each ``(label, value)`` input, the method named ``label`` is looked up on the
        object and called, without arguments if the signature declares 0 parameters and
        with the (resolved) value otherwise. The result is converted to an output Action.

        :raises AttributeError: if the object has no attribute named after an input label
        :raises KeyError: if an input label is not part of the interface
        :raises Exception: if an output cannot be converted (see ``parse_out``) or on a
            value collision (see ``_res_ival``)
        """
        obj = self.obj_gen()
        # Value bookkeeping shared by all steps of this run; see _res_ival/_res_oval.
        values = dict()
        out_seq = []
        for (label, val) in seq:
            # getattr (rather than calling __getattribute__ directly) also honours
            # objects that resolve methods through __getattr__.
            meth = getattr(obj, label)
            if self.acts[label].num_params == 0:
                outp = meth()
            else:
                (values, val) = self._res_ival(values, val)
                outp = meth(val)
            (out_label, out_val) = self.parse_out(outp)
            if out_val is not None:
                (values, out_val) = self._res_oval(values, out_val)
            out_seq.append(Action(out_label, out_val))
        # The observation pairs the inputs exactly as they were given with the outputs.
        return IORAObservation(list(zip(seq, out_seq)))

    def _res_ival(self, val_dict, val):
        """Resolves an input value against the bookkeeping dictionary.

        Looks for the keys of ``val_dict`` whose stored entry equals ``val``:
        exactly one match returns that key; no match registers ``val`` under itself
        and returns it unchanged; more than one match raises.

        :return: the (mutated in place) dictionary and the value to pass to the object
        :raises Exception: ("Collision") if several keys map to ``val``
        """
        matches = [key for (key, mapped) in val_dict.items() if mapped == val]
        if len(matches) > 1:
            raise Exception("Collision")
        if len(matches) == 1:
            return (val_dict, matches[0])
        val_dict[val] = val
        return (val_dict, val)

    def _res_oval(self, val_dict, val):
        """Resolves an output value against the bookkeeping dictionary.

        A value already present as a key yields its stored entry. A new value is
        registered with one more than the largest stored entry, or with 0 if the
        dictionary is empty.

        :return: the (mutated in place) dictionary and the entry stored for ``val``
        """
        if val not in val_dict:
            val_dict[val] = max(val_dict.values()) + 1 if len(val_dict) > 0 else 0
        return (val_dict, val_dict[val])

    def parse_out(self, outp) -> Action:
        """Converts the result of a method call to an output Action.

        - ``bool``: SUT.OK for True, SUT.NOK for False, no value
        - ``str``: the string as label, no value
        - ``int``: label "int" with the integer as value
        - 2-tuple of (``str``, ``int``): label and value

        :raises Exception: ("Cannot process output") for anything else
        """
        fresh = None
        # bool has to be tested before int, since bool is a subclass of int.
        if isinstance(outp, bool):
            return Action(SUT.OK if outp else SUT.NOK, fresh)
        if isinstance(outp, str):
            return Action(outp, fresh)
        if isinstance(outp, int):
            return Action("int", outp)
        if isinstance(outp, tuple) and len(outp) == 2:
            (lbl, val) = outp
            if isinstance(val, int) and isinstance(lbl, str):
                return Action(lbl, val)
        raise Exception("Cannot process output")

    def input_interface(self) -> List[ActionSignature]:
        """Returns the action signatures this SUT was constructed with."""
        return list(self.acts.values())

211
212
213
214
215
216
217
218
219
220
221

class RAWrapper(RASUT):
    """Wraps around an Object SUT and creates an RA view of it. Generates accepting observations only for
    the empty sequence of inputs, or when every output of the underlying IORA observation is labelled SUT.OK."""

    def __init__(self, sut: ObjectSUT):
        self.sut = sut

    def run(self, seq: List[Action]):
        """Runs the actions on the wrapped SUT and returns an RAObservation for them."""
        if len(seq) == 0:
            return RAObservation(seq, True)

        iora_obs = self.sut.run(seq)
        # Accept only if the set of output labels is exactly {OK}, i.e. all outputs are OK.
        # NOTE: an earlier variant only checked that the last output was OK.
        responses = {out.label for (_, out) in iora_obs.trace()}
        acc = responses == {SUT.OK}
        return RAObservation(seq, acc)

    def input_interface(self):
        """Returns the input interface of the wrapped SUT unchanged."""
        return self.sut.input_interface()
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259

class MealyWrapper(SUT):
    """Mealy view of an Object SUT: inputs are plain labels, and of each output
    action only the label is kept (its value is dropped)."""

    def __init__(self, sut: ObjectSUT):
        self.sut = sut

    def run(self, seq: List[Symbol]) -> MealyObservation:
        """Runs the labels as value-less actions and returns the (input label, output label) trace."""
        actions = []
        for symbol in seq:
            # Mealy inputs carry no data, hence the None value.
            actions.append(Action(symbol, None))
        observation = self.sut.run(actions)
        label_pairs = [
            (inp_act.label, out_act.label)
            for (inp_act, out_act) in observation.trace()
        ]
        return MealyObservation(label_pairs)

    def input_interface(self) -> List[Symbol]:
        """Returns the labels of the wrapped SUT's action signatures."""
        return [signature.label for signature in self.sut.input_interface()]

class DFAWrapper(SUT):
    """Wraps around a Mealy SUT and creates DFA view of it. A sequence generates an accepting observation if it
    is empty, or if every output of the underlying Mealy trace is SUT.OK. All others generate rejecting
    observations"""

    def __init__(self, sut: MealyWrapper):
        self.sut = sut

    def run(self, seq: List[Symbol]) -> DFAObservation:
        """Runs the symbols on the wrapped SUT and returns a DFAObservation for them."""
        if len(seq) == 0:
            return DFAObservation(seq, True)

        mealy_obs = self.sut.run(seq)
        # Accept only if the set of outputs is exactly {OK}, i.e. all outputs are OK.
        # NOTE: an earlier variant only checked that the last output was OK.
        responses = {out for (_, out) in mealy_obs.trace()}
        acc = responses == {SUT.OK}
        return DFAObservation(seq, acc)

    def input_interface(self) -> List[Symbol]:
        """Returns the input interface of the wrapped SUT unchanged."""
        return self.sut.input_interface()