__init__.py 8.63 KB
Newer Older
Paul Fiterau Brostean's avatar
Paul Fiterau Brostean committed
1
from abc import ABCMeta, abstractmethod
2
from typing import List, Tuple, Dict
Paul Fiterau Brostean's avatar
Paul Fiterau Brostean committed
3
4
5

import collections

6
from model.fa import Symbol
Paul Fiterau Brostean's avatar
Paul Fiterau Brostean committed
7
from model.ra import Action
8
from enum import Enum
Paul Fiterau Brostean's avatar
Paul Fiterau Brostean committed
9

10
11
12
13
14
15
16
17
18
19
class Observation():
    @abstractmethod
    def trace(self):
        """returns the trace to be added to the solver"""
        pass

    @abstractmethod
    def inputs(self):
        """returns all the inputs from an observation"""
        pass
Paul Fiterau Brostean's avatar
Paul Fiterau Brostean committed
20

21

22

23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class DFAObservation():
    def __init__(self, seq, acc):
        self.seq = seq
        self.acc = acc

    def trace(self):
        return (self.seq, self.acc)

    def inputs(self) -> List[Symbol]:
        return self.seq

class MealyObservation():
    def __init__(self, trace):
        self.tr = trace

    def trace(self):
        return self.tr

    def inputs(self) -> List[Symbol]:
        return [a for (a,_) in self.tr]

44
45
46
47
48
49
50
51
52
class RegisterMachineObservation(Observation):

    @abstractmethod
    def values(self):
        """returns all the values in the trace"""
        pass



53
54
55
56
57
58
class RAObservation(RegisterMachineObservation):
    def __init__(self, seq, acc):
        self.seq = seq
        self.acc = acc

    def trace(self):
59
        return (self.seq, self.acc)
60
61
62
63
64
65

    def inputs(self):
        return self.seq

    def values(self):
        return set([a.value for a in self.seq if a.value is not None])
66
67
68

class IORAObservation(RegisterMachineObservation):
    def __init__(self, trace):
69
        self.tr = trace
70

71
    def trace(self) -> List[Tuple[Action, Action]]:
72
        return self.tr
73
74

    def inputs(self):
75
        return [a for (a,_) in self.tr]
76
77

    def values(self):
78
79
        iv = [a.value for (a,_) in self.tr if a.value is not None]
        ov = [a.value for (_,a) in self.tr if a.value is not None]
80
81
82
        return set(iv+ov)

    def __str__(self):
83
        return "Obs: " + str(self.tr)
84

85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
class SUT(metaclass=ABCMeta):
    OK = "OK"
    NOK = "NOK"
    @abstractmethod
    def run(self, seq:List[object]) -> Observation:
        """Runs a sequence of inputs on the SUT and returns an observation"""
        pass

    @abstractmethod
    def input_interface(self) -> List[object]:
        """Runs the list of inputs or input signatures comprising the input interface"""
        pass


class SUTType(Enum):
    IORA = 1
    RA = 2
    Mealy = 3
    Moore = 4
    DFA = 5

    def is_acceptor(self):
        return  self == SUTType.RA or self.DFA

109
110
111
    def has_registers(self):
        return self == SUTType.RA or self == SUTType.IORA

112
113
114
115
116
    def is_transducer(self):
        return  not self.is_acceptor()


class SUTClass(metaclass=ABCMeta):
117
    """for a class of systems (say stacks, or logins) provides means of instantiating SUTs of different types"""
118

119
120
121
122
    @abstractmethod
    def new_sut(self, sut_type : SUTType) -> SUT:
        """ builds a new SUT of the specified type. Returns None is no such SUT can be generated"""
        pass
123

124
125
126
127
128
129
130
131
132
class ScalableSUTClass(SUTClass, metaclass=ABCMeta):
    """provides instantiation for scalable SUTs. Scalable SUTs are classes whose constructor take a size >0 integer
    as an argument. This class also adds wrappers corresponding to the type (all SUTs are assumed to be ObjectSULs)."""

    def __init__(self, sut_type_dict:Dict[SUTType,type]):
        self.sut_type_dict = sut_type_dict

    def new_sut(self, sut_type : SUTType, size :int):
        if sut_type in self.sut_type_dict:
133
            sut_obj = ObjectSUT(self.sut_type_dict[sut_type].INTERFACE, lambda : self.sut_type_dict[sut_type](size))
134
135
136
137
138
139
140
141
            sut = sut_obj if sut_type is SUTType.IORA else \
                RAWrapper(sut_obj) if sut_type is SUTType.RA else \
                MealyWrapper(sut_obj) if sut_type is SUTType.Mealy else \
                DFAWrapper(MealyWrapper(sut_obj)) if sut_type is SUTType.DFA else \
                    None # no support for Moore Machines (yet)
            return sut
        else:
            return None
142
143
144
145
146
147
148
149
150
151
152

ActionSignature = collections.namedtuple("ActionSignature", ('label', 'num_params'))
class RASUT(metaclass=ABCMeta):
    @abstractmethod
    def input_interface(self) -> List[ActionSignature]:
        pass

    @abstractmethod
    def run(self, seq:List[Action]):
        """Runs a sequence of inputs on the SUT and returns an observation"""
        pass
153

154
class ObjectSUT(RASUT):
155
156
    """Wraps around an object and calls methods on it corresponding to the Actions.
        IORA is the natural formalism for describing practical SUTs. Depending on the SUT characteristics,
157
        we can also describe them using less expressing formalisms."""
Paul Fiterau Brostean's avatar
Paul Fiterau Brostean committed
158
159
160
161
    def __init__(self, act_sigs, obj_gen):
        self.obj_gen = obj_gen
        self.acts = {act_sig.label:act_sig for act_sig in act_sigs}

162
    def run(self, seq:List[Action]) -> IORAObservation:
Paul Fiterau Brostean's avatar
Paul Fiterau Brostean committed
163
        obj = self.obj_gen()
164
        values = dict()
Paul Fiterau Brostean's avatar
Paul Fiterau Brostean committed
165
166
167
168
169
170
        out_seq = []
        for (label, val) in seq:
            meth = obj.__getattribute__(label)
            if self.acts[label].num_params == 0:
                outp = meth()
            else:
171
                (values, val) = self._res_ival(values, val)
Paul Fiterau Brostean's avatar
Paul Fiterau Brostean committed
172
                outp = meth(val)
173
174
175
176
            (out_label, out_val) = self.parse_out(outp)
            if out_val is not None:
                (values, out_val) = self._res_oval(values, out_val)
            outp_action = Action(out_label, out_val)
177
178
            out_seq.append(outp_action)
        obs = list(zip(seq, out_seq))
179
180
181
182
183
184
185
186
187
188
        return IORAObservation(obs)

    def _res_ival(self, val_dict, val):
        l = [a for a in val_dict.keys() if val_dict[a] == val]
        if len(l) > 1:
            raise Exception("Collision")
        if len(l) == 1:
            return (val_dict, l[0])
        val_dict[val] = val
        return (val_dict, val)
Paul Fiterau Brostean's avatar
Paul Fiterau Brostean committed
189

190
191
192
193
194
195
    def _res_oval(self, val_dict, val):
        if val in val_dict:
            return (val_dict, val_dict[val])
        else:
            val_dict[val] = max(val_dict.values()) + 1 if len(val_dict) > 0 else 0
            return (val_dict, val_dict[val])
Paul Fiterau Brostean's avatar
Paul Fiterau Brostean committed
196
197
198
199

    def parse_out(self, outp) -> Action:
        fresh = None
        if isinstance(outp, bool):
200
            return Action(SUT.OK if outp else SUT.NOK, fresh)
Paul Fiterau Brostean's avatar
Paul Fiterau Brostean committed
201
202
203
        if isinstance(outp, str):
            return Action(outp, fresh)
        if isinstance(outp, int):
204
            return  Action("int", outp)
Paul Fiterau Brostean's avatar
Paul Fiterau Brostean committed
205
206
207
        if isinstance(outp, tuple) and len(outp) == 2:
            (lbl, val) = outp
            if isinstance(val, int) and isinstance(lbl, str):
208
                return Action(lbl, val)
Paul Fiterau Brostean's avatar
Paul Fiterau Brostean committed
209
210
211
        raise Exception("Cannot process output")

    def input_interface(self) -> List[ActionSignature]:
212
213
        return list(self.acts.values())

214
215
216
217
218
219
220
221
222
223
224

class RAWrapper(RASUT):
    """Wraps around a Object SUT and creates an RA view of it. Generates accepting observations only for IORA
    observations ending in SUT.OK outputs or for the empty sequence of inputs. """
    def __init__(self, sut:ObjectSUT):
        self.sut = sut

    def run(self, seq: List[Action]):
        if len(seq) == 0:
            return RAObservation(seq, True)
        else:
225
            iora_obs = self.sut.run(seq)
226
227
228
229
            responses = set([out.label for (_, out) in iora_obs.trace()])
            acc = responses == set([SUT.OK])
            #(_, out) = iora_obs.trace()[-1]
            #acc = out.label is SUT.OK
230
            return RAObservation(seq, acc)
231
232
    def input_interface(self):
        return self.sut.input_interface()
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262

class MealyWrapper(SUT):
    """Wraps around an Object SUT and creates an Mealy view of it. Values in output actions are ignored
    and only the labels are returned. """

    def __init__(self, sut: ObjectSUT):
        self.sut = sut

    def run(self, seq: List[Symbol]) -> MealyObservation:
        iora_seq = [Action(inp, None) for inp in seq]
        iora_obs = self.sut.run(iora_seq)
        mealy_trace = [(inp_act.label, out_act.label) for (inp_act, out_act) in iora_obs.trace()]
        return MealyObservation(mealy_trace)

    def input_interface(self) -> List[Symbol]:
        labels = [action.label for action in self.sut.input_interface()]
        return labels

class DFAWrapper(SUT):
    """Wraps around a Mealy SUT and creates DFA view of it. Traces ending with the SUT.OK output generate accepting
    observations, as does the empty trace. All others generate rejecting observations"""

    def __init__(self, sut: MealyWrapper):
        self.sut = sut

    def run(self, seq: List[Symbol]) -> DFAObservation:
        if len(seq) == 0:
            return DFAObservation(seq, True)
        else:
            mealy_obs = self.sut.run(seq)
263
264
265
266
267
268
269
270
            responses = set([out for (_, out) in mealy_obs.trace()])
            acc =  responses == set([SUT.OK])
            #_,out) = mealy_obs.trace()[-1]
            #acc = out is SUT.OK
            return DFAObservation(seq, acc)

    def input_interface(self) -> List[Symbol]:
        return self.sut.input_interface()