Commit 714ad307 authored by Paul Fiterau Brostean's avatar Paul Fiterau Brostean
Browse files

Added experimental incremental classes (no improvement achieved)

parent 149ee552
import os.path
from enum import Enum
from encode.fa import MealyEncoder, DFAEncoder
from encode.incremental.fa import MealyIEncoder, MealyTreeIEncoder
from encode.ra import RAEncoder
from learn.algorithm import learn
from learn.algorithm import learn_mbt
from learn.fa import FALearner
from learn.incremental.fa import FAILearner
from learn.ra import RALearner
from parse.importer import build_automaton_from_dot
from sut import SUTType, MealyObservation, StatsSUT, RAObservation, IORAObservation
......@@ -21,8 +24,14 @@ from tests.iora_testscenario import *
from encode.iora import IORAEncoder
# some example runs
# some paths
yan_cmd = os.path.join("resources", "binaries", "yannakakis.exe")
models_loc = os.path.join("resources", "models")
maestro = os.path.join(models_loc, "bankcards", "MAESTRO.dot")
visa = os.path.join(models_loc, "bankcards", "VISA.dot")
biometric = os.path.join(models_loc, "biometric.dot")
# some example runs
def scalable_learn_iora():
learner = RALearner(IORAEncoder())
test_type = IORATest
......@@ -70,38 +79,44 @@ def scalable_learn_mbt_ra():
print(model)
print(statistics)
def sim_learn_mbt_mealy():
learner = FALearner(MealyEncoder())
def sim_learn_mbt_yan_mealy(dot_path):
learner = FAILearner(MealyTreeIEncoder())# FALearner(MealyEncoder())
learner.set_timeout(10000)
maestro_aut = build_automaton_from_dot("MealyMachine", os.path.join("resources", "models", "bankcards", "MAESTRO.dot"))
maestro_sut = MealyMachineSimulation(maestro_aut)
mbt = MealyRWalkFromState(maestro_sut, 3, 0.2)
(model, statistics) = learn_mbt(maestro_sut,learner, mbt, 10000)
dot_aut = build_automaton_from_dot("MealyMachine", dot_path)
dot_sut = MealyMachineSimulation(dot_aut)
stats_sut = StatsSUT(dot_sut)
cache = IOCache(MealyObservation)
cache_sut = CacheSUT(stats_sut, cache)
mbt = YannakakisTestGenerator(cache_sut, yan_cmd, seed=1)
(model, statistics) = learn_mbt(cache_sut, learner, mbt, 10000, stats_tracker=stats_sut.stats_tracker())
print(model)
print(statistics)
def sim_learn_mbt_yan_mealy(dot_path):
def sim_learn_mbt_coloryan_mealy(dot_path):
learner = FALearner(MealyEncoder())
learner.set_timeout(10000)
dot_aut = build_automaton_from_dot("MealyMachine", dot_path)
dot_sut = MealyMachineSimulation(dot_aut)
yan_cmd = os.path.join("resources", "binaries", "yannakakis.exe")
mbt = YannakakisTestGenerator(dot_sut, yan_cmd, seed=1)
(model, statistics) = learn_mbt(dot_sut,learner, mbt, 10000)
stats_sut = StatsSUT(dot_sut)
cache = IOCache(MealyObservation)
cache_sut = CacheSUT(stats_sut, cache)
mbt1 = ColoringTestGenerator(cache_sut, cache)
mbt2 = YannakakisTestGenerator(cache_sut, yan_cmd, seed=1)
mbt = TestGeneratorChain([mbt1, mbt2])
(model, statistics) = learn_mbt(cache_sut,learner, mbt, 10000, stats_tracker=stats_sut.stats_tracker())
print(model)
print(statistics)
def sim_learn_mbt_chainyan_mealy(dot_path):
learner = FALearner(MealyEncoder())
learner.set_timeout(10000)
def sim_inc_learn_mbt_coloryan_mealy(dot_path):
learner = FAILearner(MealyTreeIEncoder())
learner.set_timeout(200000)
dot_aut = build_automaton_from_dot("MealyMachine", dot_path)
dot_sut = MealyMachineSimulation(dot_aut)
stats_sut = StatsSUT(dot_sut)
cache = IOCache(MealyObservation)
cache_sut = CacheSUT(stats_sut, cache)
yan_cmd = os.path.join("resources", "binaries", "yannakakis.exe")
mbt1 = YannakakisTestGenerator(cache_sut, yan_cmd, seed=1)
mbt2 = ColoringTestGenerator(cache_sut, cache)
mbt1 = ColoringTestGenerator(cache_sut, cache)
mbt2 = YannakakisTestGenerator(cache_sut, yan_cmd, seed=1)
mbt = TestGeneratorChain([mbt1, mbt2])
(model, statistics) = learn_mbt(cache_sut,learner, mbt, 10000, stats_tracker=stats_sut.stats_tracker())
print(model)
......@@ -119,19 +134,7 @@ def scalable_learn_mbt_chainrw_iora():
print(model)
print(statistics)
models_loc = os.path.join("resources", "models")
def maestro_learn_mbt_yan_mealy():
sim_learn_mbt_yan_mealy(os.path.join(models_loc, "bankcards", "MAESTRO.dot"))
def visa_learn_mbt_yan_mealy():
sim_learn_mbt_yan_mealy(os.path.join(models_loc, "bankcards", "VISA.dot"))
def biometric_learn_mbt_yan_mealy():
sim_learn_mbt_yan_mealy(os.path.join("models_loc", "biometric.dot"))
scalable_learn_mbt_chainrw_iora()
#sim_learn_mbt_chainyan_mealy(os.path.join(models_loc, "bankcards", "VISA.dot"))
#visa_learn_mbt_yan_mealy()
#scalable_learn_mbt_mealy()
#scalable_learn_mbt_iora()
sim_learn_mbt_yan_mealy(maestro)
#sim_learn_mbt_mealy()
......@@ -13,6 +13,10 @@ def int_enum(name, elements):
d = z3.IntSort()
return d, [z3.IntVal(i) for i in range(1, len(elements) + 1)]
def declsort_enum(name, elements):
d = z3.DeclareSort(name)
return d, [z3.Const(element) for element in elements]
class Automaton(metaclass=ABCMeta):
@abstractmethod
def export(self, model):
......
......@@ -52,13 +52,14 @@ class Mapper(object):
return z3.Const("n"+str(name), self.Element)
class MealyMachineBuilder(object):
def __init__(self, mm : MealyMachine):
super().__init__()
self.mm = mm
def build_mealy(self, m : z3.ModelRef) -> model.fa.MealyMachine:
tr = FATranslator(self.mm)
tr = MealyTranslator(m, self.mm)
mut_mm = model.fa.MutableMealyMachine()
for state in self.mm.states:
mut_mm.add_state(tr.z3_to_state(state))
......@@ -68,8 +69,8 @@ class MealyMachineBuilder(object):
to_state = m.eval(self.mm.transition(state, inp))
trans = model.fa.IOTransition(
tr.z3_to_state(state),
tr.z3_to_label(inp),
tr.z3_to_label(output),
tr.z3_to_input(inp),
tr.z3_to_output(output),
tr.z3_to_state(to_state))
mut_mm.add_transition(tr.z3_to_state(state), trans)
mut_mm.generate_acc_seq()
......@@ -97,11 +98,43 @@ class DFABuilder(object):
mut_dfa.generate_acc_seq()
return mut_dfa.to_immutable()
class MealyTranslator(object):
"""Provides translation from z3 constants to RA elements. """
def __init__(self, model, mm: MealyMachine):
self._model = model
self._mm = mm
self._z3_to_inp = dict(map(reversed, mm.inputs.items()))
self._z3_to_out = dict(map(reversed, mm.outputs.items()))
def z3_to_state(self, z3state):
if z3state in self._mm.states:
return "q" + str(self._mm.states.index(z3state))
else:
return "q" + str(list(map(self._model.eval, self._mm.states)).index(z3state))
def z3_to_input(self, z3label):
if z3label in self._z3_to_inp:
return self._z3_to_inp[z3label]
else:
for inp in self._z3_to_inp:
if self._model.eval(inp) == self._model.eval(z3label):
return self._z3_to_inp[inp]
def z3_to_output(self, z3label):
if z3label in self._z3_to_out:
return self._z3_to_out[z3label]
else:
for out in self._z3_to_out:
if self._model.eval(out) == self._model.eval(z3label):
return self._z3_to_out[out]
class FATranslator(object):
"""Provides translation from z3 constants to RA elements. """
def __init__(self, fa:FSM ):
def __init__(self, fa:FSM):
self.fa = fa
def z3_to_bool(self, z3bool):
return str(z3bool) == "True"
......
......@@ -3,7 +3,9 @@ from abc import ABCMeta, abstractmethod
import z3
from define import Automaton
# an encoder which builds all constraints necessary.
class Encoder(metaclass=ABCMeta):
@abstractmethod
def add(self, trace) -> None:
pass
......@@ -12,18 +14,15 @@ class Encoder(metaclass=ABCMeta):
def build(self, *args) -> (Automaton, z3.ExprRef):
pass
# an encoder which builds trace and automaton constraints independently from each other. This allows Learners to use incremental
# solving algorithms
# an encoder which builds trace and automaton constraints separately. This allows Learners to use incremental
# solving algorithms by pushing/popping automaton constraints.
# TODO build incremental encoders and learners
class IncrementalEncoder(metaclass=ABCMeta):
@abstractmethod
def add(self, trace) -> None:
pass
@abstractmethod
def transitions(self):
def trace(self, trace) -> z3.ExprRef:
pass
@abstractmethod
def automaton(self, *args):
def automaton(self, *args) -> z3.ExprRef:
pass
\ No newline at end of file
import itertools
from z3 import z3
from define import int_enum
from define.fa import MealyMachine
from encode import IncrementalEncoder
from utils import Tree
# very ugly incremental tree encoder, just for experimentation
class MealyTreeIEncoder(IncrementalEncoder):
def __init__(self):
self.tree = Tree(itertools.count(0))
self.cache = {}
self.inputs = dict()
self.outputs = dict()
self.State = z3.DeclareSort("State")
self.Element = z3.DeclareSort('Element')
self.start_node = self.element(0)
self.map = z3.Function('map', self.Element, self.State)
self.Input = z3.DeclareSort("Input")
self.Output = z3.DeclareSort("Output")
self.start = z3.Const("start", self.State)
self.transition = z3.Function('transition', self.State, self.Input, self.State)
self.output = z3.Function('output', self.State, self.Input, self.Output)
def trace(self, trace):
self._update_labels(trace)
first = len(self.cache) == 0
inputs = []
new_nodes = []
for i,o in trace:
inputs.append(i)
node = self.tree[inputs]
if node not in self.cache:
new_nodes.append(node)
self.cache[node] = o
trace_constraints = self._trace_constraint(new_nodes)
if first:
trace_constraints = z3.And(trace_constraints, self.map(self.start_node) == self.start)
return z3.And(trace_constraints)
def _trace_constraint(self, new_nodes):
transition_constraints = []
node_constraints = []
for node in new_nodes:
parent = node.parent
input = None
for label in parent.children:
if parent.children[label] is node:
input = label
break
output = self.cache[node]
n = self.element(parent.id)
i = self.inputs[input]
o = self.outputs[output]
node_constraints.append(self.output(self.map(n), i) == o)
c = self.element(node.id)
transition_constraints.append(self.transition(self.map(n), i) == self.map(c))
return z3.And(node_constraints + transition_constraints)
def element(self, name):
return z3.Const("n"+str(name), self.Element)
def _update_labels(self, trace):
input_sequence = [input for (input, _) in trace]
output_sequence = [output for (_, output) in trace]
for input in input_sequence:
if input not in self.inputs:
self.inputs[input] = z3.Const(input, self.Input)
for output in set(output_sequence):
if output not in self.outputs:
self.outputs[output] = z3.Const(output, self.Output)
def automaton(self, num_states):
states = [self.start] + [z3.Const("q"+str(i), self.State) for i in range(1,num_states)]
cst = [z3.And([
z3.And([
z3.Or([
self.transition(state, inp) == to_state for to_state in states])
for inp in iter(self.inputs.values())]) for state in states])]
cst.extend(
[ z3.Distinct(states), z3.Distinct(list(self.inputs.values())), z3.Distinct(list(self.outputs.values()))]
)
mm_def = MealyMachine(list(self.inputs.keys()), list(self.outputs.keys()), num_states,
state_enum=int_enum, label_enum=int_enum)
mm_def.transition = self.transition
mm_def.output = self.output
mm_def.inputs = self.inputs
mm_def.outputs = self.outputs
mm_def.states = states
mm_def.start = self.start
return (z3.And(cst), mm_def)
class MealyIEncoder(IncrementalEncoder):
def __init__(self):
self.inputs = dict()
self.outputs = dict()
self.State = z3.DeclareSort("State")
self.Input = z3.DeclareSort("Input")
self.Output = z3.DeclareSort("Output")
self.start = z3.Const("start", self.State)
self.transition = z3.Function('transition', self.State, self.Input, self.State)
self.output = z3.Function('output', self.State, self.Input, self.Output)
def trace(self, trace):
self._update_labels(trace)
trace_const = self._trace_constraint(trace)
return trace_const
def _trace_constraint(self, trace):
crt_state = self.start
trace_constraints = []
for (input, output) in trace:
z3_inp, z3_out = self.inputs[input], self.outputs[output]
trace_constraints.append(self.output(crt_state, z3_inp) == z3_out)
crt_state = self.transition(crt_state, z3_inp)
return z3.And(trace_constraints)
def _update_labels(self, trace):
input_sequence = [input for (input, _) in trace]
output_sequence = [output for (_, output) in trace]
for input in input_sequence:
if input not in self.inputs:
self.inputs[input] = z3.Const(input, self.Input)
for output in set(output_sequence):
if output not in self.outputs:
self.outputs[output] = z3.Const(output, self.Output)
def automaton(self, num_states):
states = [self.start] + [z3.Const("q"+str(i), self.State) for i in range(1,num_states)]
cst = [z3.And([
z3.And([
z3.Or([
self.transition(state, inp) == to_state for to_state in states])
for inp in iter(self.inputs.values())]) for state in states])]
cst.extend(
[ z3.Distinct(states), z3.Distinct(list(self.inputs.values())), z3.Distinct(list(self.outputs.values()))]
)
mm_def = MealyMachine(list(self.inputs.keys()), list(self.outputs.keys()), num_states,
state_enum=int_enum, label_enum=int_enum)
mm_def.transition = self.transition
mm_def.output = self.output
mm_def.inputs = self.inputs
mm_def.outputs = self.outputs
mm_def.states = states
mm_def.start = self.start
return (z3.And(cst), mm_def)
......@@ -28,8 +28,6 @@ class YannakakisTestGenerator(TestGenerator):
self.yan_proc.stdin.write(dot_content)
self.yan_proc.stdin.write("\n")
self.yan_proc.stdin.flush()
#to use exhaustive yanna (
#self.yan_proc = subprocess.Popen([self.yan_path, os.path.join("..", "resources", "models","bankcards", "VISA.dot"),
# "fixed", "1", "1"],
# stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=10, universal_newlines=True)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment