Commit 311098de authored by Paul Fiterau Brostean's avatar Paul Fiterau Brostean
Browse files

Added learning algorithm. Extracted RegisterMachine definition. Slightly...

Added learning algorithm. Extracted RegisterMachine definition. Slightly modified learner interface (old_definition instead of old_model)
parent 9c09fa4c
......@@ -8,13 +8,18 @@ from define import enum
from define import Automaton
class RegisterAutomaton(Automaton):
class RegisterMachine(Automaton):
def __init__(self, num_locations, num_registers):
self.Location, self.locations = enum('Location', ['location{0}'.format(i) for i in range(num_locations)])
self.Register, self.registers = enum('Register',
['register{0}'.format(i) for i in range(num_registers)] + ['fresh'])
class RegisterAutomaton(RegisterMachine):
def __init__(self, labels, num_locations, num_registers):
super().__init__(num_locations, num_registers)
self.Label, elements = enum('Label', labels)
self.labels = {labels[i]: elements[i] for i in range(len(labels))}
self.Location, self.locations = enum('Location', ['location{0}'.format(i) for i in range(num_locations)])
self.Register, self.registers = enum('Register', ['register{0}'.format(i) for i in range(num_registers)] + ['fresh'])
self.start = self.locations[0]
self.fresh = self.registers[-1]
self.transition = z3.Function('transition', self.Location, self.Label, self.Register, self.Location)
......@@ -28,16 +33,14 @@ class RegisterAutomaton(Automaton):
return ra
class IORegisterAutomaton(Automaton):
class IORegisterAutomaton(RegisterMachine):
def __init__(self, input_labels, output_labels, num_locations, num_registers):
super().__init__(num_locations, num_registers)
labels = input_labels + output_labels
self.Label, elements = enum('Label', input_labels + output_labels)
self.labels = {labels[i]: elements[i] for i in range(len(labels))}
self._input_labels = [self.labels[lbl] for lbl in input_labels]
self.Location, self.locations = enum('Location', ['location{0}'.format(i) for i in range(num_locations + 1)])
self.sink = self.locations[-1]
self.Register, self.registers = enum('Register', ['register{0}'.format(i) for i in range(num_registers)] + ['fresh'])
self.start = self.locations[0]
self.fresh = self.registers[-1]
self.transition = z3.Function('transition', self.Location, self.Label, self.Register, self.Location)
......
from abc import ABCMeta, abstractmethod
from typing import Tuple
from model import Automaton
import model
import define
class Learner(metaclass=ABCMeta):
......@@ -9,7 +11,7 @@ class Learner(metaclass=ABCMeta):
pass
@abstractmethod
def model(self, old_model=None) -> Automaton:
def model(self, old_definition:define.Automaton=None) -> Tuple[model.Automaton, define.Automaton]:
""""Infers a minimal model whose behavior corresponds to the traces added so far.
Returns None if no model could be obtained."""
pass
......
from typing import Tuple
import z3
from encode.ra import RAEncoder
......@@ -9,6 +11,7 @@ import z3
from encode.fa import DFAEncoder, MealyEncoder
from learn import Learner
from model import Automaton
import define.fa
class FALearner(Learner):
def __init__(self, labels, encoder, solver=None, verbose=False):
......@@ -21,13 +24,13 @@ class FALearner(Learner):
def add(self, trace):
self.encoder.add(trace)
def model(self, min_states=1, max_states=20, old_model:Automaton=None) -> Automaton:
if old_model is not None:
min_states = len(old_model.states())
def model(self, min_states=1, max_states=20, old_definition:define.fa.FSM=None) -> Tuple[Automaton, define.fa.FSM]:
if old_definition is not None:
min_states = len(old_definition.states)
(succ, fa, m) = self._learn_model(min_states=min_states,
max_states=max_states)
if succ:
return fa.export(m)
return (fa.export(m), fa)
return None
def _learn_model(self, min_states=1, max_states=20):
......
from typing import Tuple
import z3
from encode.ra import RAEncoder
from learn import Learner
import model.ra
import define.ra
class RALearner(Learner):
def __init__(self, labels, io=False, outputs=None, encoder=None, solver=None, verbose=False):
if not encoder:
encoder = RAEncoder()
def __init__(self, encoder, solver=None, verbose=False):
if encoder is None:
raise Exception("RAEncoder has to be supplied")
if not solver:
solver = z3.Solver()
if outputs:
self.outputs = outputs
self.labels = labels
self.encoder = encoder
self.solver = solver
self.verbose = verbose
self.io = io
def add(self, trace):
self.encoder.add(trace)
def model(self, min_locations=1, max_locations=20, min_registers=0, max_registers=10,
old_model:model.ra.RegisterAutomaton = None) -> model.ra.RegisterAutomaton:
if old_model is not None:
min_locations = len(old_model.states())
min_registers = len(old_model.registers())
old_definition:define.ra.RegisterMachine = None) -> Tuple[model.ra.RegisterMachine, define.ra.RegisterMachine]:
if old_definition is not None:
min_locations = len(old_definition.locations)
min_registers = len(old_definition.registers) - 1
(succ, ra_def, m) = self._learn_model(min_locations=min_locations,
max_locations=max_locations, min_registers=min_registers,
max_registers=max_registers)
if succ:
return ra_def.export(m)
return (ra_def.export(m), ra_def)
return None
def _learn_model(self, min_locations=1, max_locations=20, min_registers=0, max_registers=10):
"""generates the definition and model for an ra whose traces include the traces added so far"""
num_values = len(self.encoder.values)
for num_locations in range(min_locations, max_locations + 1):
# TODO: calculate max registers based on repeating values
for num_registers in range(min_registers, min(max_registers, num_locations) + 1):
......
......@@ -72,6 +72,9 @@ class Automaton(metaclass=ABCMeta):
return crt_state
def input_labels(self):
return set([trans.start_label for trans in self.transitions(self.start_state())])
@abstractmethod
def output(self, trace):
pass
......@@ -113,6 +116,10 @@ class Transducer(Automaton, metaclass=ABCMeta):
def output(self, trace):
pass
def output_labels(self):
return set([trans.output for state in self.states() for trans in self.transitions(state)])
"""An automaton model whose states are accepting/rejecting"""
......
from z3gi.model import Transition, Acceptor, NoTransitionFired, MultipleTransitionsFired, Transducer, \
MutableAcceptorMixin, MutableAutomatonMixin
MutableAcceptorMixin, MutableAutomatonMixin, Automaton
from abc import ABCMeta, abstractmethod
import itertools
import collections
......@@ -87,7 +87,7 @@ class IORATransition(RATransition):
return trans_str
# some methods shared between the different register automatas
class RegisterMachine():
class RegisterMachine(Automaton):
def _fired_transition(self, transitions, reg_val, action):
"""Retrieves the transition fired based on the action and valuation"""
fired_transitions = [trans for trans in transitions if trans.is_enabled(reg_val, action)]
......@@ -101,6 +101,10 @@ class RegisterMachine():
raise MultipleTransitionsFired
return fired_transitions[0]
@abstractmethod
def registers(self) -> List[Register]:
pass
class RegisterAutomaton(Acceptor, RegisterMachine):
def __init__(self, locations, loc_to_acc, loc_to_trans, registers):
......@@ -203,7 +207,7 @@ class MutableIORegisterAutomaton(IORegisterAutomaton, MutableAutomatonMixin):
if reg not in self._registers:
self._registers.append(reg)
def to_immutable(self):
def to_immutable(self) -> IORegisterAutomaton:
return IORegisterAutomaton(self._states, self._state_to_trans, self._registers)
class Guard(metaclass=ABCMeta):
......
......@@ -29,6 +29,13 @@ class TestTemplate(metaclass=ABCMeta):
@abstractmethod
def _check_trace(self, model, trace):
pass
@abstractmethod
def size(self) -> int:
"""Returns the size (in terms of inputs) of the test"""
pass
......@@ -74,11 +81,17 @@ class IORATest(TestTemplate):
trace = determinize_act_io(trace)
for (inp_act, out_act) in trace:
test_trace.append((inp_act, out_act))
if inp_act.label not in model.input_labels():
return test_trace
output = model.output([inp for (inp, _) in test_trace])
if output != out_act:
return test_trace
return None
def size(self):
return len(self.trace)
def determinize_act_io(tuple_seq):
seq = list(itertools.chain.from_iterable(tuple_seq))
......@@ -101,6 +114,9 @@ class MealyTest(TestTemplate):
return test_trace
return None
def size(self):
return len(self.trace)
# Acceptor Test observations are tuples comprising sequences of Actions/Symbols joined by an accept/reject booleans
class AcceptorTest(TestTemplate):
def __init__(self, trace):
......@@ -111,4 +127,8 @@ class AcceptorTest(TestTemplate):
(seq, acc) = trace
if model.accepts(seq) is not acc:
return test_trace
return None
\ No newline at end of file
return None
def size(self):
(seq, acc) = self.trace
return len(seq)
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment