Commit 56db3d15 authored by Paul Fiterau Brostean's avatar Paul Fiterau Brostean
Browse files

Updated files, in preparation for new version

parent f7a2fb10
import itertools
from model.ra import IORegisterAutomaton
from utils import determinize
from z3gi.learn.ra import RALearner
from tests.iora_testscenario import *
from encode.iora import IORAEncoder
def determinize_act_io(tuple_seq):
    """Determinize a sequence of (input, output) action pairs.

    The pairs are flattened into one (label, value) sequence, determinized
    as a whole, rebuilt into Action objects and re-grouped into
    (input action, output action) duples.
    """
    flat_seq = list(itertools.chain.from_iterable(tuple_seq))
    det_actions = [Action(label, value) for (label, value) in determinize(flat_seq)]
    # re-pair consecutive actions: even index = input, odd index = output
    det_pairs = [(det_actions[idx], det_actions[idx + 1])
                 for idx in range(0, len(det_actions), 2)]
    return det_pairs
def check_ra_against_obs(learned_ra:IORegisterAutomaton, m, test_scenario: RaTestScenario):
    """Checks if the learned RA corresponds to the scenario observations"""
    for trace in test_scenario.traces:
        # grow the input prefix one action at a time and compare the RA's
        # response against the observed output at every step
        prefix = []
        for (inp_act, out_act) in determinize_act_io(trace):
            prefix.append(inp_act)
            response = learned_ra.output(prefix)
            if response == out_act:
                continue
            print("Register automaton: \n {0} \n with model: \n {1} \n responds to {2} "
                  "with {3} instead of {4}".format(learned_ra, m, prefix, response, out_act))
            return
# run the learning experiment (a single iteration for now) on scenario sut_m6
for experiment_no in range(1, 2):
    print("Experiment ", experiment_no)
    scenario = sut_m6
    ra_learner = RALearner(IORAEncoder(), verbose=True)
    for obs_trace in scenario.traces:
        ra_learner.add(obs_trace)
    # search for a model within the expected location/register bounds
    (_, ra, m) = ra_learner._learn_model(scenario.nr_locations - 1,
                                         scenario.nr_locations + 1,
                                         scenario.nr_registers)
    if m is None:
        print("unsat")
    else:
        print(m)
        model = ra.export(m)
        print(model)
        check_ra_against_obs(model, m, scenario)
\ No newline at end of file
import os.path
from enum import Enum
from encode.fa import MealyEncoder, DFAEncoder
from encode.incremental.fa import MealyIEncoder, MealyTreeIEncoder
from encode.ra import RAEncoder
from learn.algorithm import learn
from learn.algorithm import learn_mbt
from learn.fa import FALearner
from learn.incremental.fa import FAILearner
from learn.ra import RALearner
from parse.importer import build_automaton_from_dot
from sut import SUTType, MealyObservation, StatsSUT, RAObservation, IORAObservation
from sut.fifoset import FIFOSetClass
from sut.login import new_login_sut, LoginClass
from sut.simulation import MealyMachineSimulation
from sut.sut_cache import IOCache, CacheSUT, AcceptorCache
from test import IORATest, MealyTest
from test.chain import TestGeneratorChain
from test.coloring import ColoringTestGenerator
from test.rwalk import IORARWalkFromState, MealyRWalkFromState, DFARWalkFromState, RARWalkFromState
from test.yanna import YannakakisTestGenerator
from tests.iora_testscenario import *
from encode.iora import IORAEncoder
# some paths
# external Yannakakis test-generation binary invoked by YannakakisTestGenerator
yan_cmd = os.path.join("resources", "binaries", "yannakakis.exe")
# directory holding the benchmark .dot models
models_loc = os.path.join("resources", "models")
# individual simulation models used by the example runs below
maestro = os.path.join(models_loc, "bankcards", "MAESTRO.dot")
visa = os.path.join(models_loc, "bankcards", "VISA.dot")
biometric = os.path.join(models_loc, "biometric.dot")
# some example runs
def scalable_learn_iora():
    """Passively learn an IORA model from the traces of scenario sut_m5."""
    scenario = sut_m5
    ra_learner = RALearner(IORAEncoder())
    (model, statistics) = learn(ra_learner, IORATest, scenario.traces)
    print(model)
    print(statistics)
def scalable_learn_mbt_iora():
    """Learn a size-1 login SUT as an IORA via random-walk model-based testing."""
    sut = new_login_sut(1)
    ra_learner = RALearner(IORAEncoder())
    ra_learner.set_timeout(10000)
    tester = IORARWalkFromState(sut, 5, 0.2)
    model, statistics = learn_mbt(sut, ra_learner, tester, 10000)
    print(model)
    print(statistics)
def scalable_learn_mbt_mealy():
    """Learn a size-2 login SUT as a Mealy machine via random-walk model-based testing."""
    sut = LoginClass().new_sut(SUTType.Mealy, 2)
    fa_learner = FALearner(MealyEncoder())
    fa_learner.set_timeout(1000)
    tester = MealyRWalkFromState(sut, 5, 0.2)
    model, statistics = learn_mbt(sut, fa_learner, tester, 10000)
    print(model)
    print(statistics)
def scalable_learn_mbt_dfa():
    """Learn a size-2 login SUT as a DFA via random-walk model-based testing."""
    sut = LoginClass().new_sut(SUTType.DFA, 2)
    fa_learner = FALearner(DFAEncoder())
    fa_learner.set_timeout(100000)
    tester = DFARWalkFromState(sut, 5, 0.2)
    model, statistics = learn_mbt(sut, fa_learner, tester, 10000)
    print(model)
    print(statistics)
def scalable_learn_mbt_ra():
    """Learn a size-1 FIFO-set SUT as a register automaton via random-walk MBT."""
    sut = FIFOSetClass().new_sut(SUTType.RA, 1)
    ra_learner = RALearner(RAEncoder())
    ra_learner.set_timeout(600000)
    tester = RARWalkFromState(sut, 5, 0.2)
    model, statistics = learn_mbt(sut, ra_learner, tester, 10000)
    print(model)
    print(statistics)
def sim_learn_mbt_yan_mealy(dot_path):
    """Learn a simulated Mealy machine (loaded from a .dot file) using the
    Yannakakis test generator, with caching and I/O statistics tracking."""
    dot_model = build_automaton_from_dot("MealyMachine", dot_path)
    stats_sut = StatsSUT(MealyMachineSimulation(dot_model))
    cache = IOCache(MealyObservation)
    cached_sut = CacheSUT(stats_sut, cache)
    fa_learner = FALearner(MealyEncoder())
    fa_learner.set_timeout(10000)
    tester = YannakakisTestGenerator(cached_sut, yan_cmd, seed=1)
    model, statistics = learn_mbt(cached_sut, fa_learner, tester, 10000,
                                  stats_tracker=stats_sut.stats_tracker())
    print(model)
    print(statistics)
def sim_learn_mbt_coloryan_mealy(dot_path):
    """Like sim_learn_mbt_yan_mealy, but chains a coloring-based test generator
    in front of the Yannakakis one."""
    dot_model = build_automaton_from_dot("MealyMachine", dot_path)
    stats_sut = StatsSUT(MealyMachineSimulation(dot_model))
    cache = IOCache(MealyObservation)
    cached_sut = CacheSUT(stats_sut, cache)
    fa_learner = FALearner(MealyEncoder())
    fa_learner.set_timeout(10000)
    tester = TestGeneratorChain([ColoringTestGenerator(cached_sut, cache),
                                 YannakakisTestGenerator(cached_sut, yan_cmd, seed=1)])
    model, statistics = learn_mbt(cached_sut, fa_learner, tester, 10000,
                                  stats_tracker=stats_sut.stats_tracker())
    print(model)
    print(statistics)
def sim_inc_learn_mbt_coloryan_mealy(dot_path):
    """Incremental-learner variant of sim_learn_mbt_coloryan_mealy, using the
    tree-based incremental Mealy encoder."""
    dot_model = build_automaton_from_dot("MealyMachine", dot_path)
    stats_sut = StatsSUT(MealyMachineSimulation(dot_model))
    cache = IOCache(MealyObservation)
    cached_sut = CacheSUT(stats_sut, cache)
    inc_learner = FAILearner(MealyTreeIEncoder())
    inc_learner.set_timeout(200000)
    tester = TestGeneratorChain([ColoringTestGenerator(cached_sut, cache),
                                 YannakakisTestGenerator(cached_sut, yan_cmd, seed=1)])
    model, statistics = learn_mbt(cached_sut, inc_learner, tester, 10000,
                                  stats_tracker=stats_sut.stats_tracker())
    print(model)
    print(statistics)
def scalable_learn_mbt_chainrw_iora():
    """Learn a size-1 FIFO-set SUT as an IORA, chaining a coloring test
    generator before the random walk."""
    sut = FIFOSetClass().new_sut(SUTType.IORA, 1)
    cache = IOCache(IORAObservation)
    cached_sut = CacheSUT(sut, cache)
    ra_learner = RALearner(IORAEncoder())
    ra_learner.set_timeout(600000)
    tester = TestGeneratorChain([ColoringTestGenerator(cached_sut, cache),
                                 RARWalkFromState(sut, 5, 0.2)])
    model, statistics = learn_mbt(cached_sut, ra_learner, tester, 10000)
    print(model)
    print(statistics)
# entry point: runs the Yannakakis-based Mealy experiment on the MAESTRO model;
# the commented calls are alternative experiments
#scalable_learn_mbt_mealy()
#scalable_learn_mbt_iora()
sim_learn_mbt_yan_mealy(maestro)
#sim_learn_mbt_mealy()
from z3gi.learn.ra import RALearner
from tests.ra_testscenario import *
from encode.ra import RAEncoder
def check_ra_against_obs(learner, learned_ra, m, test_scenario):
    """Checks if the learned RA corresponds to the scenario observations"""
    for (trace, acc) in test_scenario.traces:
        # stop and report on the first observation the automaton disagrees with
        if learned_ra.accepts(trace) == acc:
            continue
        print(learner.encoder.tree)
        print("Register automaton {0} \n\n with model {1} \n doesn't correspond to the observation {2}"
              .format(learned_ra, m, str((trace, acc))))
        return
# run the learning experiment several times on scenario sut_m3
for i in range(1,10):
    print("Experiment ",i)
    # NOTE(review): `labels` is not defined in this file — presumably star-imported
    # from tests.ra_testscenario; confirm.
    learner = RALearner(labels, encoder=RAEncoder())
    exp = sut_m3
    for trace in exp.traces:
        learner.add(trace)
    # search for a model with between nr_locations and nr_locations+1 locations
    (_, ra, m) = learner._learn_model(exp.nr_locations,exp.nr_locations+1,exp.nr_registers)
    if m is not None:
        model = ra.export(m)
        # NOTE(review): loc/lbl/fresh appear unused below — likely debugging leftovers
        loc = ra.locations
        lbl = ra.labels[labels[0]]
        fresh = ra.fresh
        check_ra_against_obs(learner, model, m, exp)
        print("Learning successful")
    else:
        print("unsat")
\ No newline at end of file
import functools
from abc import ABCMeta
from typing import Tuple, List, Dict
import collections
from encode.fa import DFAEncoder, MealyEncoder
from encode.iora import IORAEncoder, IORAQREncoder
from encode.ra import RAEncoder, RAQREncoder
from learn import Learner
from learn.fa import FALearner
from learn.ra import RALearner
from model import Automaton
from model.ra import RegisterMachine
from sut import SUTType, StatsSUT, DFAObservation, RAObservation, MealyObservation, IORAObservation
from sut.login import LoginClass
from sut.scalable import ScalableSUTClass
from sut.fifoset import FIFOSetClass
from sut.stack import StackClass
from sut.sut_cache import AcceptorCache, IOCache, CacheSUT
from learn.algorithm import learn_mbt, Statistics
from test.chain import TestGeneratorChain
from test.coloring import ColoringTestGenerator
from test.rwalk import DFARWalkFromState, MealyRWalkFromState, RARWalkFromState, IORARWalkFromState
from statistics import stdev, median
class SutDesc(collections.namedtuple("SutDesc", 'sut_class type size')):
    """Describes a benchmark subject: the SUT class object, its SUTType and its size."""
    def __str__(self):
        type_name = str(self.type).replace("SUTType.", "")
        class_name = str(self.sut_class.__class__.__name__).replace("Class", "")
        return "{0}_{1}({2})".format(type_name, class_name, self.size)
# TestDesc: max_tests = testing budget, rand_length = base random-walk length
# (size is added on top), prop_reset = probability of resetting during a walk
TestDesc = collections.namedtuple("TestDesc", 'max_tests rand_length prop_reset')
class ExperimentStats(collections.namedtuple("ExperimentStats", "states minimal registers tests "
                                                                "inputs max_ltime learn_time")):
    """Statistics collected for a single learning experiment.

    Fields: learned state count, whether the model is minimal (or None if
    unknown), register count (None for non-register machines), number of
    tests/inputs used, and maximum/total learning time.
    """
    # Fix: the namedtuple typename used to be the copy-pasted "CollectedStats",
    # which produced a misleading repr and breaks pickling (pickle looks the
    # typename up as a module attribute). It must match the class name.
    pass
class CollatedStats(collections.namedtuple("CollatedStats", "exp_succ states registers minimal consistent avg_tests std_tests avg_inputs std_inputs max_ltime avg_ltime std_ltime")):
    """Aggregate statistics over repeated experiments on the same SUT:
    success count, median states/registers, minimality, run-to-run consistency,
    and median/stdev of tests, inputs and learning times."""
    pass
def get_learner_setup(sut, sut_type:SUTType, size, test_desc:TestDesc):
    """Builds the (learner, random-walk tester) pair matching the SUT type.

    The walk length grows with the SUT size; RA/IORA learners are told the
    expected number of registers up front.
    """
    walk_args = (sut, test_desc.rand_length + size, test_desc.prop_reset)
    if sut_type is SUTType.DFA:
        return (FALearner(DFAEncoder()), DFARWalkFromState(*walk_args))
    if sut_type is SUTType.Mealy:
        return (FALearner(MealyEncoder()), MealyRWalkFromState(*walk_args))
    if sut_type is SUTType.RA:
        reg_learner = RALearner(RAEncoder())
        reg_learner.set_num_reg(size)
        return (reg_learner, RARWalkFromState(*walk_args))
    if sut_type is SUTType.IORA:
        reg_learner = RALearner(IORAEncoder())
        reg_learner.set_num_reg(size)
        return (reg_learner, IORARWalkFromState(*walk_args))
    raise Exception("Invalid setup")
class Benchmark:
    """Runs learning experiments over registered (scalable SUT class, SUT type) pairs,
    growing the SUT size until learning fails or a size cap is reached."""
    def __init__(self):
        # the (sut class, sut type) combinations to benchmark
        self.suts:List[Tuple[ScalableSUTClass, SUTType]] = []
        # kept for the explicit setup registration API (see add_setup); no longer used
        self.learn_setup: Dict[SUTType, Tuple[Learner, type]] = {}
    def add_sut(self, sut_class:ScalableSUTClass, sut_type=None):
        # When no type is given, register every SUTType the class can actually build
        # (probed by constructing a size-1 SUT).
        if sut_type is None:
            for st in list(SUTType):
                if sut_class.new_sut(st, 1) is not None:
                    self.suts.append((sut_class, st))
        else:
            if sut_class.new_sut(sut_type, 1) is None:
                raise Exception(" Added type not implemented by the sut class")
            self.suts.append((sut_class, sut_type))
        return self
    def add_setup(self, sut_type, sut_learner, sut_tester):
        # registers an explicit (learner, tester) pair for a SUT type; superseded
        # by get_learner_setup but kept for backwards compatibility
        self.learn_setup[sut_type] = (sut_learner, sut_tester)
        return self
    def _run_benchmark(self, sut_class:ScalableSUTClass, sut_type:SUTType, test_desc:TestDesc, tout:int,\
                       min_size:int, max_size:int, use_coloring:bool) -> List[Tuple[SutDesc, ExperimentStats]]:
        """Learns increasingly larger instances of one SUT class/type; stops when
        learning fails (model is None) or size exceeds max_size (if given)."""
        results = []
        size = min_size
        while True:
            if max_size is not None and size > max_size:
                break
            print("Learning ", size)
            sut = sut_class.new_sut(sut_type, size)
            stats_sut = StatsSUT(sut)
            sut_stats = stats_sut.stats_tracker()
            # pick the cache type matching the observation kind of this SUT type
            if sut_type.is_acceptor():
                if sut_type is SUTType.DFA:
                    cache = AcceptorCache(DFAObservation)
                else:
                    cache = AcceptorCache(RAObservation)
            else:
                if sut_type is SUTType.Mealy:
                    cache = IOCache(MealyObservation)
                else:
                    cache = IOCache(IORAObservation)
            cache_sut = CacheSUT(stats_sut, cache)
            learner,tester = get_learner_setup(cache_sut, sut_type, size, test_desc)
            if use_coloring:
                # explore uncolored hypothesis transitions before random walking
                tester = TestGeneratorChain([ColoringTestGenerator(cache_sut, cache), tester])
            learner.set_timeout(tout)
            # ugly but there you go
            (model, statistics) = learn_mbt(cache_sut, learner, tester, test_desc.max_tests, stats_tracker=sut_stats)
            if model is None:
                break
            else:
                sut_desc = SutDesc(sut_class, sut_type, size)
                imp_stats = self._collect_stats(sut_desc, model, statistics)
                results.append( (sut_desc, imp_stats))
                size += 1
        return results
    def _collect_stats(self, sut_desc:SutDesc, model:Automaton, statistics:Statistics) -> ExperimentStats:
        """Derives per-experiment statistics from the learned model and raw Statistics."""
        states = len(model.states())
        registers = len(model.registers()) if isinstance(model, RegisterMachine) else None
        exp_size = sut_desc.sut_class.num_states(sut_desc.type, sut_desc.size)
        # minimal == learned state count matches the known expected size
        # (None when the expected size is unknown)
        minimal = None if exp_size is None else exp_size == len(model.states())
        learn_tests = statistics.resets
        learn_inputs = statistics.inputs
        learn_time = sum(statistics.learning_times)
        max_ltime = max(statistics.learning_times)
        return ExperimentStats(states=states, minimal=minimal, registers=registers, tests=learn_tests, inputs=learn_inputs, max_ltime=max_ltime, learn_time=learn_time)
    def run_benchmarks(self, test_desc:TestDesc, timeout:int, min_size:int=1, max_size:int=1, use_coloring:bool=False) -> List[Tuple[SutDesc, ExperimentStats]]:
        """Runs _run_benchmark for every registered (class, type) pair and
        concatenates the per-size results."""
        results = []
        for sut_class, sut_type in self.suts:
            res = self._run_benchmark(sut_class, sut_type, test_desc, timeout, min_size, max_size, use_coloring)
            results.extend(res)
        return results
def collate_stats(sut_desc: SutDesc, exp_stats:List[ExperimentStats]):
    """Aggregates repeated runs on the same SUT into one CollatedStats.

    Returns None when no statistics were collected.
    """
    if exp_stats is None:
        return None
    else:
        states = [e.states for e in exp_stats]
        avg_states = median(states)
        regs = [e.registers for e in exp_stats]
        # register averages only make sense for register machines
        if sut_desc.type.has_registers():
            avg_reg = median(regs)
        else:
            avg_reg = None
        if sut_desc.sut_class.num_states(sut_desc.type, sut_desc.size) is None:
            minimal = None
        else:
            # minimal only if every single experiment produced a minimal model
            minimal = functools.reduce(lambda x,y: x & y, [e.minimal for e in exp_stats])
        # consistent: all runs agree on state count (and register count, where applicable)
        consistent = len(set(states)) == 1 and \
                     (not sut_desc.type.has_registers() or len(set(regs)) == 1)
        exp_succ = len(exp_stats)
        ltests = [e.tests for e in exp_stats]
        linputs = [e.inputs for e in exp_stats]
        ltime = [e.learn_time for e in exp_stats]
        maxltimes = [e.max_ltime for e in exp_stats]
        # stdev needs at least two data points, hence the length guards below
        return CollatedStats(
            exp_succ=exp_succ,
            states=avg_states,
            registers=avg_reg,
            minimal=minimal,
            consistent=consistent,
            avg_tests=median(ltests),
            std_tests=0 if len(ltests) == 1 else stdev(ltests),
            avg_inputs=median(linputs),
            std_inputs=0 if len(linputs) == 1 else stdev(linputs),
            max_ltime=max(maxltimes),
            avg_ltime=median(ltime),
            std_ltime=0 if len(ltime) == 1 else stdev(ltime),
        )
#"exp_succ states registers consistent "
# "avg_ltests std_ltests avg_inputs std_inputs "
# "avg_ltime std_ltime"
def print_results(results: 'List[Tuple[SutDesc, ExperimentStats]]'):
    """Prints one (sut description, stats) line per experiment, or a notice when empty."""
    if not results:
        print("No statistics to report on")
        return
    for sut_desc, stats in results:
        print(sut_desc, " ", stats)
b = Benchmark()
# adding learning setups for each type of machine
# no longer used, built function which returns tuple
#b.add_setup(SUTType.DFA, FALearner(DFAEncoder()), DFARWalkFromState)
#b.add_setup(SUTType.Mealy, FALearner(MealyEncoder()), MealyRWalkFromState)
#b.add_setup(SUTType.RA, RALearner(RAEncoder()), RARWalkFromState)
#b.add_setup(SUTType.IORA, RALearner(IORAEncoder()), IORARWalkFromState)
# add the sut classes we want to benchmark
b.add_sut(FIFOSetClass(), SUTType.DFA)
b.add_sut(LoginClass(), SUTType.DFA)
b.add_sut(FIFOSetClass(), SUTType.Mealy)
b.add_sut(LoginClass(), SUTType.Mealy)
#b.add_sut(StackClass(), SUTType.DFA)
#b.add_sut(FIFOSetClass(), SUTType.DFA)
#b.add_sut(FIFOSetClass(), SUTType.RA)
#b.add_sut(LoginClass(), SUTType.Mealy)
#b.add_sut(StackClass(), SUTType.IORA)
# create a test description
t_desc = TestDesc(max_tests=10000, prop_reset=0.2, rand_length=3)
# give an smt timeout value (in ms)
timeout = 10000
# how many times each experiment should be run
num_exp = 5
# the start size
min_size = 1
# up to what systems of what size do we want to run experiments (set to None if size is ignored as a stop condition)
max_size = None
# do you want to augment test generation by coloring (before the rwalk, we explore all uncolored transitions in the hyp)
use_coloring = False
# run the benchmark and collect results
results = []
for i in range(0, num_exp):
    results += b.run_benchmarks(t_desc, timeout, min_size, max_size, use_coloring)
    print("============================")
# report per-experiment results, then group by SUT description and collate
print_results(results)
sut_dict = dict()
for sut_desc,exp in results:
    if sut_desc not in sut_dict:
        sut_dict[sut_desc] = list()
    sut_dict[sut_desc].append(exp)
collated_stats = [(sut_desc, collate_stats(sut_desc, experiments)) for sut_desc, experiments in sut_dict.items()]
for (sut_desc, c_stat) in collated_stats:
    print(sut_desc, " ", c_stat)
# sort according to the sut_desc (the first element)
#results.sort()
# print results
# NOTE(review): the block below repeats the reporting done just above verbatim —
# presumably a copy-paste leftover; confirm before removing.
print_results(results)
sut_dict = dict()
for sut_desc,exp in results:
    if sut_desc not in sut_dict:
        sut_dict[sut_desc] = list()
    sut_dict[sut_desc].append(exp)
collated_stats = [(sut_desc, collate_stats(sut_desc, experiments)) for sut_desc, experiments in sut_dict.items()]
for (sut_desc, c_stat) in collated_stats:
    print(sut_desc, " ", c_stat)
......@@ -9,7 +9,7 @@ import model.gen as gen
from encode.fa import MealyEncoder, DFAEncoder
from encode.iora import IORAEncoder
from encode.ra import RAEncoder, RAQREncoder
from encode.ra import RAQREncoder
from learn.fa import FALearner
from learn.ra import RALearner
from test import AcceptorTest, IORATest, MealyTest
......@@ -48,6 +48,7 @@ aut2suttype={
model.fa.DFA:sut.SUTType.DFA
}
# the interface is a bit convoluted, but it does give access to all the functionality available
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Inference tool which can:\n'
'(1) passively learn from traces file\n'
......
......@@ -11,7 +11,7 @@ def dt_enum(name, elements):
def int_enum(name, elements):
d = z3.IntSort()
return d, [z3.IntVal(i) for i in range(1, len(elements) + 1)]
return d, [z3.IntVal(i) for i in range(0, len(elements))]
def declsort_enum(name, elements):
d = z3.DeclareSort(name)
......
......@@ -52,6 +52,19 @@ class Mapper(object):
def element(self, name):
return z3.Const("n"+str(name), self.Element)
class IntMapper(object):
    """Maps observation-tree node names to z3 integer constants; node 0 is the
    designated start element. `map` is the uninterpreted function taking an
    element to an automaton state."""
    def __init__(self, fa):
        self.Element = z3.IntSort()
        self.map = z3.Function('map', self.Element, fa.State)
        self.start = z3.IntVal(0)
        # element 0 is always the start element
        self._elements = {0: self.start}
    def element(self, name):
        # hand out consecutive integer values, reusing previously issued ones
        try:
            return self._elements[name]
        except KeyError:
            fresh = z3.IntVal(len(self._elements))
            self._elements[name] = fresh
            return fresh
class MealyMachineBuilder(object):
......@@ -120,7 +133,8 @@ class MealyTranslator(object):
return self._z3_to_out[self._model.eval(z3label)]
class DFATranslator(object):
"""Provides translation from z3 constants to DFA elements. """
"""Provides translation from z3 constants to DFA elements. It evaluates everything (s.t. variables are resolved
to their actual values), enabling it to translate both variables and values."""
def __init__(self, model : z3.ModelRef, fa:DFA):
self._fa = fa
self._z3_to_label = {model.eval(fa.labels[inp]): inp for inp in fa.labels.keys()}
......
......@@ -3,7 +3,7 @@ import itertools
import z3
from define import dt_enum
from define.fa import DFA, MealyMachine, Mapper
from define.fa import DFA, MealyMachine, Mapper, IntMapper
from encode import Encoder
from utils import Tree
......@@ -57,6 +57,62 @@ class DFAEncoder(Encoder):
constraints.append(dfa.transition(mapper.map(n), l) == mapper.map(c))
return constraints
# an integer based encoding
class IntDFAEncoder(Encoder):
    """DFA encoder representing states and observation-tree nodes as z3 integers
    (via int_enum / IntMapper) rather than datatype enumerations."""
    def __init__(self, use_ineq=False):
        # observation tree of all added traces
        self.tree = Tree(itertools.count(0))
        # tree node -> recorded acceptance verdict
        self.cache = {}
        self.labels = set()
        # when True, transitions are bounded with integer inequalities;
        # otherwise an explicit disjunction over all states is emitted
        self.use_ineq = use_ineq
    def add(self, trace):
        # trace is a (label sequence, accept) pair
        seq, accept = trace
        node = self.tree[seq]
        self.cache[node] = accept
        self.labels.update(seq)
    def build(self, num_states):
        # constructs the symbolic DFA and the full constraint list for the solver
        from define import int_enum
        dfa = DFA(list(self.labels), num_states, state_enum=int_enum, label_enum=int_enum)
        mapper = IntMapper(dfa)
        constraints = self.axioms(dfa, mapper)
        constraints += self.node_constraints(dfa, mapper)
        constraints += self.transition_constraints(dfa, mapper)
        return dfa, constraints
    def axioms(self, dfa: DFA, mapper: Mapper):
        if self.use_ineq:
            # distinct node elements, plus every transition target confined
            # to the integer interval [start, num_states)
            return [
                z3.Distinct([mapper.element(node.id) for node in self.cache]),
                z3.And([z3.And([z3.And([dfa.transition(state, label) < z3.IntVal(len(dfa.states)),
                                        dfa.transition(state, label) >= dfa.start])
                                for state in dfa.states])
                        for label in dfa.labels.values()]),
            ]
        else:
            # every transition target must equal one of the declared states
            return [
                z3.And([z3.And([z3.Or([dfa.transition(state, label) == to_state
                                       for to_state in dfa.states]) for state in dfa.states])
                        for label in dfa.labels.values()]),
            ]
    def node_constraints(self, dfa, mapper):
        # each observed node's mapped state must reproduce the recorded verdict
        constraints = []
        for node in self.cache:
            accept = self.cache[node]
            n = mapper.element(node.id)
            constraints.append(dfa.output(mapper.map(n)) == accept)
        return constraints
    def transition_constraints(self, dfa, mapper):
        # the tree root maps to the DFA start state, and every tree edge must
        # agree with the DFA's transition function
        constraints = [dfa.start == mapper.map(mapper.start)]
        for node, label, child in self.tree.transitions():
            n = mapper.element(node.id)
            l = dfa.labels[label]
            c = mapper.element(child.id)
            constraints.append(dfa.transition(mapper.map(n), l) == mapper.map(c))
        return constraints
class MealyEncoder(Encoder):
def __init__(self):
self.tree = Tree(itertools.count(0))
......