ra.py 11.4 KB
Newer Older
1
2
from z3gi.model import Transition, Acceptor, NoTransitionFired, MultipleTransitionsFired, Transducer, \
    MutableAcceptorMixin, MutableAutomatonMixin
3
from abc import ABCMeta, abstractmethod
4
import itertools
5
import collections
6
from typing import List
7
8

Action = collections.namedtuple('Action', ('label', 'value'))
9

10
11
class RATransition(Transition):
    def __init__(self, start_state, start_label, guard, assignment, end_state):
12
        super().__init__(start_state, start_label, end_state)
13
14
15
        self.guard = guard
        self.assignment = assignment

16
17
18
    def is_enabled(self, valuation, action):
        if action.label == self.start_label:
            satisfied = self.guard.is_satisfied(valuation, action.value)
19
20
21
            return satisfied
        return False

22
    def update(self, valuation, action):
23
24
        return self.assignment.update(valuation, action.value)

25
26
27
28
29
30
31
32
    def __str__(self, shortened = False):
        if not shortened:
            return "{0} {1}({2}) -> {3} {4}".format(self.start_state, self.start_label, self.guard,
                                               self.assignment, self.end_state)
        else:
            return "{0}({1}) -> {2} {3}".format(self.start_label, self.guard,
                                                    self.assignment, self.end_state)

33

34
35
36
class SymbolicValue(metaclass=ABCMeta):
    """Symbolic values can be used to symbolically express registers, constants and parameters."""
    def __init__(self, index):
37
        super().__init__()
38
39
        self.index = index

40
41
42
43
44
45
46
47
48
49
class Fresh(SymbolicValue):
    def __init__(self, index):
        super().__init__(index)

    def __str__(self):
        return "f" + str(self.index)

    def __repr__(self):
        return self.__str__()

50
51
class Register(SymbolicValue):
    def __init__(self, index):
52
        super().__init__(index)
53
54
55
56

    def __str__(self):
        return "r" + str(self.index)

57
58
59
    def __repr__(self):
        return self.__str__()

60

61
62
63
64
65
66
67
68
69
70
71
72

class IORATransition(RATransition):
    def __init__(self, start_state, start_label, guard, assignment, output_label, output_mapping, output_assignment, end_state):
        super().__init__(start_state, start_label, guard, assignment, end_state)
        self.guard = guard
        self.assignment = assignment
        self.output_label = output_label
        self.output_mapping = output_mapping
        self.output_assignment =output_assignment

    def output(self, valuation, values):
        if type(self.output_mapping) == Fresh:
73
            return Action(self.output_label, max(values) + 1 if len(values) > 0 else 0)
74
75
76
77
78
79
80
        else:
            return Action(self.output_label, valuation[self.output_mapping])

    def output_update(self, valuation, action):
        return self.output_assignment.update(valuation, action.value)

    def __str__(self, shortened=False):
81
        trans_str = "{0}({1}) {2} / {3}({4}) {5}  {6}"\
82
            .format(self.start_label, self.guard, self.assignment, self.output_label,\
83
                    self.output_mapping, self.output_assignment, self.end_state)
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103

        if not shortened:
            trans_str = self.start_label + "  " + trans_str
        return trans_str

# some methods shared between the different register automatas
class RegisterMachine():
    def _fired_transition(self, transitions, reg_val, action):
        """Retrieves the transition fired based on the action and valuation"""
        fired_transitions = [trans for trans in transitions if trans.is_enabled(reg_val, action)]

        # the number of fired transitions can be more than one since we could have multiple equalities
        # todo (merge into or guard?)
        if len(fired_transitions) == 0:
            raise NoTransitionFired

        if len(fired_transitions) > 1:
            raise MultipleTransitionsFired
        return fired_transitions[0]

104

105
class RegisterAutomaton(Acceptor, RegisterMachine):
106
    def __init__(self, locations, loc_to_acc, loc_to_trans, registers):
107
      super().__init__(locations, loc_to_trans, loc_to_acc)
108
109
      self._registers = registers

110
    def registers(self) -> List[Register]:
111
        return self._registers
112

113
114
    def transitions(self, state: str, label:str = None) -> List[RATransition]:
        return super().transitions(state, label)
115

116
    def state(self, trace: List[Action]) -> str:
117
118
        init = -1
        reg_val = dict()
119
        for reg in self.registers():
120
121
122
            reg_val[reg] = init

        crt_state = self.start_state()
123
        tr_str = "({0}:{1})".format(crt_state, reg_val)
124
125
        for action in trace:
            transitions = self.transitions(crt_state, action.label)
126
            fired_transition = super()._fired_transition(transitions, reg_val, action)
127
128
            reg_val = fired_transition.update(reg_val, action)
            crt_state = fired_transition.end_state
129
            tr_str += " {0} ({1}:{2})".format(action, crt_state, reg_val)
130

131
        # print(tr_str)
132
133
        return crt_state

134
135

class IORegisterAutomaton(Transducer, RegisterMachine):
136
137
    def __init__(self, locations, loc_to_trans, registers):
      super().__init__(locations, loc_to_trans)
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
      self._registers = registers

    def registers(self) -> List[Register]:
        return self._registers

    def transitions(self, state: str, label:str = None) -> List[IORATransition]:
        return super().transitions(state, label)

    def state(self, trace: List[Action]) -> str:
        (reached_state, valuation, values) = self._machine_state(trace)
        return reached_state

    def _machine_state(self, trace: List[Action]) -> (str, dict, set):
        init = -1
        # maintains the set of values encountered
        values = set()
        reg_val = dict()
        for reg in self.registers():
            reg_val[reg] = init

        crt_state = self.start_state()
        for action in trace:
            values.add(action.value)
            transitions = self.transitions(crt_state, action.label)
            fired_transition = super()._fired_transition(transitions, reg_val, action)
            reg_val = fired_transition.update(reg_val, action)

            # the transition should give us the output action
            output_action = fired_transition.output(reg_val, values)
            # based on this output, the transition should update the set of registers
            reg_val = fired_transition.output_update(reg_val, output_action)
169
            values.add(output_action.value)
170
171
172
173
174
175
176
177
178
179
180
181

            crt_state = fired_transition.end_state
        return (crt_state, reg_val, values)

    def output(self, trace: List[Action]) -> Action:
        if len(trace) == 0:
            return None
        else:
            (reached_state, valuation, values) = self._machine_state(trace[:-1])
            action = trace[-1]
            transitions = super().transitions(reached_state, action.label)
            fired_transition = super()._fired_transition(transitions, valuation, action)
182
            values.add(action.value)
183
184
185
            output = fired_transition.output(valuation, values)
            return output

186
187
188
189
190
191
192
193
194
    def compress(self):
        mutable_ra = MutableIORegisterAutomaton()
        [mutable_ra.add_state(state) for state in self.states()]
        for state in self.states():
            transitions = self.transitions()




195
class MutableRegisterAutomaton(RegisterAutomaton, MutableAcceptorMixin):
196
    def __init__(self):
197
        super().__init__([], dict(), dict(), [])
198

199
200
    def add_transition(self, state:str, transition:RATransition):
        super().add_transition(state, transition)
201
        for reg in transition.guard.registers():
202
203
204
            if reg not in self._registers:
                self._registers.append(reg)

205
    def to_immutable(self) -> RegisterAutomaton:
206
207
        return RegisterAutomaton(self._states, self._state_to_acc, self._state_to_trans, self._registers)

208
209
210
211
212
213
214
215
216
217
218
219
220
221
class MutableIORegisterAutomaton(IORegisterAutomaton, MutableAutomatonMixin):
    def __init__(self):
        super().__init__([], dict(), [])

    def add_transition(self, state:str, transition:IORATransition):
        super().add_transition(state, transition)
        for reg in transition.guard.registers():
            if reg not in self._registers:
                self._registers.append(reg)

    def to_immutable(self):
        return IORegisterAutomaton(self._states, self._state_to_trans, self._registers)


222
class Guard(metaclass=ABCMeta):
223
    """A guard with is_satisfied implements a predicate over the current register valuation and the parameter value. """
224
225
226
    def __init__(self):
        pass

227

228
    @abstractmethod
229
    def registers(self):
230
        """Returns the registers/constants over which the guard is formed"""
231
232
        pass

233
    # to make this more abstract, value would have to be replaced by param valuation
234
235
236
237
    @abstractmethod
    def is_satisfied(self, valuation, value):
        pass

238
class EqualityGuard(Guard):
239
    """An equality guard holds iff. the parameter value is equal to the value assigned to its register."""
240
    def __init__(self, register):
241
        super().__init__()
242
        self._register = register
243
244

    def is_satisfied(self, valuation, value):
245
        return valuation[self._register] == value
246

247
248
    def registers(self):
        return [self._register]
249

250
    def __str__(self):
251
        return "p=={0}".format(str(self._register))
252

253
254
255
256
257
258
259
260
261
262
class OrGuard(Guard):
    def __init__(self, guards):
        self.guards = guards

    def is_satisfied(self, valuation, value):
        for guard in self.guards:
            if guard.is_satisfied(valuation, value):
                return True
        return False

263
    def registers(self):
264
        regs_of_guards = [guard.registers() for guard in self.guards]
265
266
267
268
269
        regs = itertools.chain.from_iterable(regs_of_guards)
        seen = set()
        distinct_regs = [x for x in regs if x not in seen and not seen.add(x)]
        return list(distinct_regs)

270
271
    def __str__(self):
        all_guards = [str(guard) for guard in self.guards]
272
        return " \\/ ".join(all_guards)
273

274

275
class FreshGuard(Guard):
276
    """An fresh guard holds if the parameter value is different from the value assigned to any of its registers."""
277
    def __init__(self, guarded_registers = []):
278
        super().__init__()
279
        self._registers = guarded_registers
280
281

    def is_satisfied(self, valuation, value):
282
        for register in self._registers:
283
284
285
286
            if valuation[register] == value:
                return False
        return True

287
288
    def registers(self):
        return self._registers
289

290
    def __str__(self):
291
        if len(self._registers) == 0:
292
293
            return "True"
        else:
294
295
            all_deq = ["p!={0}".format(str(reg)) for reg in self._registers]
            return " /\\ ".join(all_deq)
296

297

298
class Assignment(metaclass=ABCMeta):
299
    """An assignment updates the valuation of registers using the old valuation and the current parameter value"""
300
301
302
    def __init__(self):
        pass

303
    # to make this more general, value would have to be replaced by variable(reg/param) mapping and par valuation
304
305
306
307
308
309
310
    @abstractmethod
    def update(self, valuation, value):
        pass


class RegisterAssignment(Assignment):
    def __init__(self, register):
311
        super().__init__()
312
        self._register = register
313
314

    def update(self, valuation, value):
315
        new_valuation = dict(valuation)
316
        new_valuation[self._register] = value
317
        return new_valuation
318

319
    def __str__(self):
320
        return "{0}:=p".format(str(self._register))
321

322

323
324
class NoAssignment(Assignment):
    def __init__(self):
325
        super().__init__()
326

327
    def update(self, valuation, value):
328
329
        return dict(valuation)

330
331
    def __str__(self):
        return "r:=r"
332