Commit 2d24010b authored by Paul Fiterau Brostean's avatar Paul Fiterau Brostean
Browse files

A lot of changes, added mealy/dfa definitions, active learner framework and other small changes.

parent d6a6a528
......@@ -5,6 +5,7 @@ from utils import determinize
from z3gi.learn.ra import RALearner
from tests.iora_testscenario import *
from encode.iora import IORAEncoder
from model.parsing import to_dot
def determinize_act_io(tuple_seq):
seq = list(itertools.chain.from_iterable(tuple_seq))
......@@ -21,6 +22,7 @@ def check_ra_against_obs(learner: RALearner, learned_ra:IORegisterAutomaton, m,
for (inp_act, out_act) in trace:
inp_trace.append(inp_act)
output = learned_ra.output(inp_trace)
if output != out_act:
print("Register automaton: \n {0} \n with model: \n {1} \n responds to {2} "
"with {3} instead of {4}".format(learned_ra, m, inp_trace, output, out_act))
......@@ -37,5 +39,6 @@ for i in range(1,2):
model = ra.export(m)
print(model)
check_ra_against_obs(learner, model, m, exp)
to_dot(model, None)
else:
print("unsat")
\ No newline at end of file
from abc import ABCMeta, abstractmethod
import z3
def enum(name, elements):
d = z3.Datatype(name)
for element in elements:
d.declare(element)
d = d.create()
return d, [d.__getattribute__(element) for element in elements]
class Automaton(metaclass=ABCMeta):
@abstractmethod
def export(self, model):
"""Returns a z3gi.model for this automaton."""
pass
pass
\ No newline at end of file
import collections
from abc import ABCMeta, abstractmethod
import z3
from define import enum, Automaton
import model.fa
def FSM(Automaton,metaclass=ABCMeta):
@abstractmethod
def __init__(self, num_states):
self.State, self.states = enum('State', ['state{0}'.format(i) for i in range(num_states)])
def DFA(FSM):
def __init__(self, labels, num_states):
super.__init__(num_states)
self.Label, elements = enum('Label', labels)
self.labels = {labels[i]: elements[i] for i in range(len(labels))}
self.transition = z3.Function('transition', self.State, self.Label, self.State)
self.output = z3.Function('output', self.State, z3.BoolSort())
def export(self, model : z3.ModelRef) -> model.fa.DFA:
pass
#builder = DFABuilder(self)
#dfa = builder.build_dfa(self)
#return dfa
def MealyMachine(FSM):
def __init__(self, input_labels, output_labels, num_states):
super.__init__(num_states)
self.Input, elements = enum('Input', input_labels)
self.inputs = {input_labels[i]: elements[i] for i in range(len(input_labels))}
self.Output, elements = enum('Output', output_labels)
self.outputs = {output_labels[i]: elements[i] for i in range(len(output_labels))}
self.transition = z3.Function('transition', self.State, self.Input, self.State)
self.output = z3.Function('output', self.State, self.Input, self.Output)
def export(self, model : z3.ModelRef) -> model.fa.MealyMachine:
builder = MealyMachineBuilder(self)
mealy = builder.build_mealy(model)
return mealy
class MealyMachineBuilder(object):
def __init__(self, mm : MealyMachine):
super().__init__()
self.mm = mm
def build_mealy(self, model : z3.ModelRef) -> model.fa.MealyMachine:
tr = FATranslator()
mut_mm = MutableMealyMachine()
def _add_state(self, model, translator, mut_ra, z3state):
z3acc = model.eval(model.eval(self.ra.output(z3state)))
mut_ra.add_state(translator.z3_to_state(z3state), translator.z3_to_bool(z3acc) )
def _add_transitions(self, model, translator, mut_ra, z3state, z3label):
z3end_state_to_guards = dict()
enabled_z3guards = [guard for guard in self.ra.registers if
translator.z3_to_bool(model.eval(self.ra.used(z3state, guard))) or
guard is self.ra.fresh
]
if self.ra.fresh not in enabled_z3guards:
enabled_z3guards.append(self.ra.fresh)
for z3guard in enabled_z3guards:
z3end_state = model.eval(self.ra.transition(z3state, z3label, z3guard))
if z3end_state not in z3end_state_to_guards:
z3end_state_to_guards[z3end_state] = []
z3end_state_to_guards[z3end_state].append(z3guard)
update = model.eval(self.ra.update(z3state, z3label))
used_z3regs = [reg for reg in enabled_z3guards if reg is not self.ra.fresh]
for (z3end_state, z3guards) in z3end_state_to_guards.items():
# a transition which makes an assignment is never merged
if self.ra.fresh in z3guards and not translator.z3_to_bool(model.eval(update == self.ra.fresh)):
self._add_transition(translator, mut_ra, z3state, z3label,
[self.ra.fresh], update, z3end_state, used_z3regs)
z3guards.remove(self.ra.fresh)
if len(z3guards) > 0:
self._add_transition(translator, mut_ra, z3state, z3label,
z3guards, None, z3end_state, used_z3regs)
def _add_transition(self, translator, mut_ra, z3start_state, z3label, z3guards, z3update, z3end_state, used_z3regs):
transition = RATransition(translator.z3_to_state(z3start_state), # start state
translator.z3_to_label(z3label), # label
translator.z3_to_guard(z3guards, used_z3regs), # guard
translator.z3_to_assignment(z3update), # assignment
translator.z3_to_state(z3end_state) # end state
)
mut_ra.add_transition(translator.z3_to_state(z3start_state),transition)
pass
class FATranslator(object):
"""Provides translation from z3 constants to RA elements. """
def __init__(self, fa:FSM ):
self.fa = fa
def z3_to_bool(self, z3bool):
return str(z3bool) == "True"
def z3_to_state(self, z3state):
return "q"+str(self.fa.states.index(z3state))
def z3_to_label(self, z3label):
return str(z3label)
......@@ -2,6 +2,7 @@ from collections import namedtuple
import z3
from model.ra import *
from define import enum
from define import Automaton
......@@ -46,7 +47,7 @@ class IORegisterAutomaton(Automaton):
self.loctype = z3.Function('loctype', self.Location, z3.BoolSort())
def export(self, model : z3.ModelRef) -> RegisterAutomaton:
def export(self, model : z3.ModelRef) -> IORegisterAutomaton:
builder = IORegisterAutomatonBuilder(self)
ra = builder.build_ra(model)
return ra
......@@ -77,7 +78,7 @@ class RegisterAutomatonBuilder(object):
def build_ra(self, model):
mut_ra = MutableRegisterAutomaton()
translator = Translator(self.ra)
translator = RATranslator(self.ra)
for z3state in self.ra.locations:
self._add_state(model, translator, mut_ra, z3state)
for z3label in self.ra.labels.values():
......@@ -135,7 +136,7 @@ class IORegisterAutomatonBuilder(object):
def build_ra(self, model):
mut_ra = MutableIORegisterAutomaton()
translator = Translator(self.ra)
translator = RATranslator(self.ra)
z3input_states = [z3state for z3state in self.ra.locations if
translator.z3_to_bool(model.eval(
......@@ -219,7 +220,7 @@ class IORegisterAutomatonBuilder(object):
)
mut_ra.add_transition(translator.z3_to_state(z3start_state), transition)
class Translator(object):
class RATranslator(object):
"""Provides translation from z3 constants to RA elements. """
def __init__(self, ra):
self.reg_context = dict()
......@@ -267,11 +268,3 @@ class Translator(object):
if str(z3register) not in self.reg_context:
self.reg_context[str(z3register)] = Register(self.ra.registers.index(z3register))
return self.reg_context[str(z3register)]
def enum(name, elements):
d = z3.Datatype(name)
for element in elements:
d.declare(element)
d = d.create()
return d, [d.__getattribute__(element) for element in elements]
import itertools
from encode import Encoder
from utils import Tree
from model.fa import MealyMachine
import z3
class MealyEncoder(Encoder):
def __init__(self, input_labels, output_labels):
self.tree = Tree(itertools.count(0))
self.cache = {}
self.input_labels = set()
self.output_labels = set()
def add(self, trace):
_ = self.tree[trace]
self.input_labels.update([input_label for input_label in [i for (i, _) in trace]])
self.output_labels.update([output_label for output_label in [o for (_, o) in trace]])
def build(self, num_states) -> (MealyMachine, z3.ExprRef):
return None
......@@ -2,7 +2,7 @@ import itertools
import z3
from encode import Encoder
from define.ra import RegisterAutomaton, IORegisterAutomaton, Mapper
from define.ra import RegisterAutomaton, Mapper
from utils import Tree, determinize
......
from abc import ABCMeta, abstractmethod
from model import Automaton
class Learner(metaclass=ABCMeta):
@abstractmethod
def add(self, trace):
pass
@abstractmethod
def model(self):
def model(self) -> Automaton:
""""Infers a minimal model whose behavior corresponds to the traces added so far.
Returns None if no model could be obtained."""
pass
\ No newline at end of file
pass
from abc import abstractmethod
from typing import List
from learn import Learner
from learn.ra import RALearner
from model import Automaton
from model.ra import IORegisterAutomaton, MutableIORegisterAutomaton, IORATransition, FreshGuard, NoAssignment, Fresh
from test import Tester
undefined = "undef"
class ActiveLearner():
def __init__(self, model_learner:Learner, model_tester:Tester):
self.learner = model_learner
self.tester = model_tester
def learn(self, inputs:List[str]):
model = None
while True:
model = self._learn_new(self.learner, model)
trace = self.tester.find_ce(model)
if trace is not None:
self.learner.add(trace)
else:
break
@abstractmethod
def _epsilon(self, inputs) -> Automaton:
"""Returns an epsilon initial model"""
pass
@abstractmethod
def _learn_new(self, learner, old_model):
"""Learns a new model having recently invalidated an old model"""
pass
class IORAActiveLearner(ActiveLearner):
def __init__(self, model_learner:RALearner, model_tester:Tester):
super().__init__(model_learner, model_tester)
def _learn_new(self, learner: RALearner, old_model: IORegisterAutomaton):
return learner.model(min_locations=len(old_model.states()), min_registers=len(old_model.registers()) )
def _epsilon(self, inputs):
mut_iora = MutableIORegisterAutomaton()
init_loc = "q0"
mut_iora.add_state(init_loc)
for inp in inputs:
mut_iora.add_transition(init_loc, IORATransition(
init_loc, init_loc, FreshGuard(), NoAssignment(), undefined, Fresh(0), NoAssignment(), init_loc
))
return mut_iora.to_immutable()
import z3
from encode.ra import RAEncoder
from learn import Learner
import model.fa
class MealyLearner(Learner):
def __init__(self, labels, io=False, outputs=None, encoder=None, solver=None, verbose=False):
if not encoder:
encoder = RAEncoder()
if not solver:
solver = z3.Solver()
if outputs:
self.outputs = outputs
self.labels = labels
self.encoder = encoder
self.solver = solver
self.verbose = verbose
self.io = io
def add(self, trace):
self.encoder.add(trace)
def model(self, min_states=1, max_states=20) -> model.fa.MealyMachine:
(succ, ra_def, m) = self._learn_model(min_states=min_states,
max_states=max_states)
if succ:
return ra_def.export(m)
return None
def _learn_model(self, min_states=1, max_states=20):
"""generates the definition and model for an ra whose traces include the traces added so far"""
num_values = len(self.encoder.values)
for num_locations in range(min_states, max_states + 1):
ra, constraints = self.encoder.build(num_locations, num_registers)
self.solver.add(constraints)
result = self.solver.check()
if self.verbose:
print("Learning with {0} states. Result: {1}"
.format(num_locations, result))
if result == z3.sat:
model = self.solver.model()
self.solver.reset()
return (True, ra, model)
else:
self.solver.reset()
# TODO: process the unsat core?
pass
return (False, None, None)
\ No newline at end of file
......@@ -16,9 +16,6 @@ class RALearner(Learner):
self.labels = labels
self.encoder = encoder
self.solver = solver
self.previous_model = None
self.num_locations = None
self.num_registers = None
self.verbose = verbose
self.io = io
......@@ -32,12 +29,12 @@ class RALearner(Learner):
return ra_def.export(m)
return None
def _learn_model(self, min_locations=1, max_locations=20, max_registers=10):
def _learn_model(self, min_locations=1, max_locations=20, min_registers=0, max_registers=10):
"""generates the definition and model for an ra whose traces include the traces added so far"""
num_values = len(self.encoder.values)
for num_locations in range(min_locations, max_locations + 1):
# TODO: calculate max registers based on repeating values
for num_registers in range(0, min(max_registers, num_locations) + 1):
for num_registers in range(min_registers, min(max_registers, num_locations) + 1):
ra, constraints = self.encoder.build(num_locations, num_registers)
self.solver.add(constraints)
print(num_locations, num_registers)
......@@ -47,9 +44,6 @@ class RALearner(Learner):
.format(num_locations, num_registers, result))
if result == z3.sat:
model = self.solver.model()
self.num_locations = num_locations
self.num_registers = num_registers
self.previous_model = model
self.solver.reset()
return (True, ra, model)
else:
......
from abc import ABCMeta, abstractmethod
from typing import List
"""The most basic transition class available"""
class Transition():
def __init__(self, start_state, start_label, end_state):
self.start_state = start_state
self.start_label = start_label
self.end_state = end_state
def __str__(self, shortened=False):
if not shortened:
return "{0} {1} -> {2}".format(self.start_state, self.start_label, self.end_state)
else:
"{1} -> {2}".format(self.start_label, self.end_state)
"""Exception raised when no transition can be fired"""
class NoTransitionFired(Exception):
pass
"""Exception raised when several transitions can be fired in a deterministic machine"""
class MultipleTransitionsFired(Exception):
pass
"""A basic abstract automaton model"""
class Automaton(metaclass=ABCMeta):
......@@ -13,7 +37,7 @@ class Automaton(metaclass=ABCMeta):
def states(self):
return list(self._states)
def transitions(self, state, label=None):
def transitions(self, state, label=None) -> List[Transition]:
if label is None:
return list(self._state_to_trans[state])
else:
......@@ -38,6 +62,11 @@ class Automaton(metaclass=ABCMeta):
return crt_state
@abstractmethod
def output(self, trace):
pass
# Basic __str__ function which works for most FSMs.
def __str__(self):
str_rep = ""
......@@ -85,6 +114,9 @@ class Acceptor(Automaton, metaclass=ABCMeta):
is_acc = self.is_accepting(state)
return is_acc
def output(self, trace):
self.accepts(trace)
def __str__(self):
return str(self._state_to_acc) + "\n" + super().__str__()
......@@ -93,25 +125,3 @@ class MutableAcceptorMixin(MutableAutomatonMixin, metaclass=ABCMeta):
super().add_state(state)
self._state_to_acc[state] = accepts
"""The most basic transition class available"""
class Transition():
def __init__(self, start_state, start_label, end_state):
self.start_state = start_state
self.start_label = start_label
self.end_state = end_state
def __str__(self, shortened=False):
if not shortened:
return "{0} {1} -> {2}".format(self.start_state, self.start_label, self.end_state)
else:
"{1} -> {2}".format(self.start_label, self.end_state)
"""Exception raised when no transition can be fired"""
class NoTransitionFired(Exception):
pass
"""Exception raised when several transitions can be fired in a deterministic machine"""
class MultipleTransitionsFired(Exception):
pass
\ No newline at end of file
from z3gi.model import Transition, Acceptor, Transducer, NoTransitionFired, MultipleTransitionsFired, Automaton
from abc import ABCMeta, abstractmethod
import itertools
import collections
from z3gi.model import Transition, Acceptor, Transducer
from model import MutableAutomatonMixin, MutableAcceptorMixin
from typing import List
Symbol = str
......@@ -9,6 +7,12 @@ State = str
Label = str
Output = str
class IOTransition(Transition):
def __init__(self, start_state, start_label, output, end_state):
super().__init__(start_state, start_label, end_state)
self.output = output
class DFA(Acceptor):
def __init__(self, states, state_to_trans, state_to_acc):
super().__init__(states, state_to_trans, state_to_acc)
......@@ -38,9 +42,8 @@ class MooreMachine(Transducer):
return self.state_to_out[crt_state]
class MealyMachine(Transducer):
def __init__(self, states, state_to_trans, state_to_out):
def __init__(self, states, state_to_trans):
super().__init__(states, state_to_trans)
self.state_to_out = state_to_out
def transitions(self, state: State, label: Label = None) -> List[IOTransition]:
return super().transitions(state, label)
......@@ -57,8 +60,7 @@ class MealyMachine(Transducer):
trans = self.transitions(state_before, trace[-1])
return trans.output
class IOTransition(Transition):
def __init__(self, start_state, start_label, output, end_state):
super().__init__(start_state, start_label, end_state)
self.output = output
class MutableMealyMachine(MealyMachine, MutableAutomatonMixin):
def __init__(self):
super().__init__([], {})
import sys
from xml.etree import ElementTree
from model import *
from model.fa import IOTransition
......@@ -52,5 +53,105 @@ def to_dot (automaton:Automaton, file_name:str):
f.close()
def xml_to_iora (file_name:str):
"""Takes a .xml register automata and produces a iora"""
xmlrootelem = ElementTree.ElementTree(file=file_name).getroot()
# see doc at : http://docs.python.org/2/library/xml.etree.elementtree.html
# print getPrettyPrintedString(xmlrootnode)
# <symbol name="OFrame">
# <param type="int" name="p0"/>
# <param type="int" name="p1"/>
# </symbol>
def get_labels(xmlElem):
symbols = []
if not xmlElem:
return []
for symbol in xmlElem:
#params = []
#for param in symbol:
# params += [(param.attrib["type"], param.attrib["name"])]
# note parameter names not important for symbol type declaration in alphabet
#symbols += [symbol.attrib["name"], params)]
symbols += symbol.attrib["name"]
return symbols
elemAlphabet = xmlrootelem.find("alphabet")
inputs = get_labels(elemAlphabet.find("inputs"))
outputs = get_labels(elemAlphabet.find("outputs"))
symbolname2params = {}
for symbol in inputs + outputs:
symbolname2params[symbol.getName()] = symbol.getParamNames()
initLocation = None
locations = []
id2location = {}
for locationxml in xmlrootelem.iter("location"):
name = locationxml.attrib["name"]
if locationxml.attrib.has_key("initial") and locationxml.attrib["initial"] == "true":
location = Location(name, initial=True)
initLocation = location
else:
location = Location(name, initial=False)
# print location.to_xml()
locations += [location]
id2location[location.getId()] = location
transitions = []
for transitionxml in xmlrootelem.iter("transition"):
tr_from = transitionxml.attrib["from"]
tr_to = transitionxml.attrib["to"]
tr_symbol = transitionxml.attrib["symbol"]
tr_param_names = []
if transitionxml.attrib.has_key("params"):
paramsstr = transitionxml.attrib["params"]
tr_param_names = [param.strip() for param in paramsstr.split(',')]
else:
# default param names from alphabet definition
if symbolname2params.has_key(tr_symbol):
tr_param_names = symbolname2params[tr_symbol]
guardElem = transitionxml.find("guard")
tr_guard = ""
if guardElem is not None:
tr_guard = guardElem.text
assigns = []
for assignxml in transitionxml.iter("assign"):
assign_to = assignxml.attrib["to"]
assign_value = assignxml.text