Commit 149ee552 authored by Paul Fiterau Brostean's avatar Paul Fiterau Brostean
Browse files

Added chain and coloring test

parent 70be9b43
......@@ -7,11 +7,14 @@ from learn.algorithm import learn_mbt
from learn.fa import FALearner
from learn.ra import RALearner
from parse.importer import build_automaton_from_dot
from sut import SUTType
from sut import SUTType, MealyObservation, StatsSUT, RAObservation, IORAObservation
from sut.fifoset import FIFOSetClass
from sut.login import new_login_sut, LoginClass
from sut.simulation import MealyMachineSimulation
from test import IORATest
from sut.sut_cache import IOCache, CacheSUT, AcceptorCache
from test import IORATest, MealyTest
from test.chain import TestGeneratorChain
from test.coloring import ColoringTestGenerator
from test.rwalk import IORARWalkFromState, MealyRWalkFromState, DFARWalkFromState, RARWalkFromState
from test.yanna import YannakakisTestGenerator
from tests.iora_testscenario import *
......@@ -33,7 +36,7 @@ def scalable_learn_mbt_iora():
learner.set_timeout(10000)
sut = new_login_sut(1)
mbt = IORARWalkFromState(sut, 5, 0.2)
(model, statistics) = learn_mbt(learner, mbt, 10000)
(model, statistics) = learn_mbt(sut,learner, mbt, 10000)
print(model)
print(statistics)
......@@ -43,7 +46,7 @@ def scalable_learn_mbt_mealy():
login = LoginClass()
sut = login.new_sut(SUTType.Mealy, 2)
mbt = MealyRWalkFromState(sut, 5, 0.2)
(model, statistics) = learn_mbt(learner, mbt, 10000)
(model, statistics) = learn_mbt(sut,learner, mbt, 10000)
print(model)
print(statistics)
......@@ -53,7 +56,7 @@ def scalable_learn_mbt_dfa():
login = LoginClass()
sut = login.new_sut(SUTType.DFA, 2)
mbt = DFARWalkFromState(sut, 5, 0.2)
(model, statistics) = learn_mbt(learner, mbt, 10000)
(model, statistics) = learn_mbt(sut,learner, mbt, 10000)
print(model)
print(statistics)
......@@ -63,7 +66,7 @@ def scalable_learn_mbt_ra():
login = FIFOSetClass()
sut = login.new_sut(SUTType.RA, 1)
mbt = RARWalkFromState(sut, 5, 0.2)
(model, statistics) = learn_mbt(learner, mbt, 10000)
(model, statistics) = learn_mbt(sut,learner, mbt, 10000)
print(model)
print(statistics)
......@@ -73,7 +76,7 @@ def sim_learn_mbt_mealy():
maestro_aut = build_automaton_from_dot("MealyMachine", os.path.join("resources", "models", "bankcards", "MAESTRO.dot"))
maestro_sut = MealyMachineSimulation(maestro_aut)
mbt = MealyRWalkFromState(maestro_sut, 3, 0.2)
(model, statistics) = learn_mbt(learner, mbt, 10000)
(model, statistics) = learn_mbt(maestro_sut,learner, mbt, 10000)
print(model)
print(statistics)
......@@ -83,8 +86,36 @@ def sim_learn_mbt_yan_mealy(dot_path):
dot_aut = build_automaton_from_dot("MealyMachine", dot_path)
dot_sut = MealyMachineSimulation(dot_aut)
yan_cmd = os.path.join("resources", "binaries", "yannakakis.exe")
mbt = YannakakisTestGenerator(dot_sut, yan_cmd)
(model, statistics) = learn_mbt(learner, mbt, 10000)
mbt = YannakakisTestGenerator(dot_sut, yan_cmd, seed=1)
(model, statistics) = learn_mbt(dot_sut,learner, mbt, 10000)
print(model)
print(statistics)
def sim_learn_mbt_chainyan_mealy(dot_path):
learner = FALearner(MealyEncoder())
learner.set_timeout(10000)
dot_aut = build_automaton_from_dot("MealyMachine", dot_path)
dot_sut = MealyMachineSimulation(dot_aut)
stats_sut = StatsSUT(dot_sut)
cache = IOCache(MealyObservation)
cache_sut = CacheSUT(stats_sut, cache)
yan_cmd = os.path.join("resources", "binaries", "yannakakis.exe")
mbt1 = YannakakisTestGenerator(cache_sut, yan_cmd, seed=1)
mbt2 = ColoringTestGenerator(cache_sut, cache)
mbt = TestGeneratorChain([mbt1, mbt2])
(model, statistics) = learn_mbt(cache_sut,learner, mbt, 10000, stats_tracker=stats_sut.stats_tracker())
print(model)
print(statistics)
def scalable_learn_mbt_chainrw_iora():
learner = RALearner(IORAEncoder())
learner.set_timeout(600000)
login = FIFOSetClass()
sut = login.new_sut(SUTType.IORA, 1)
cache = IOCache(IORAObservation)
cache_sut = CacheSUT(sut, cache)
mbt = TestGeneratorChain([ColoringTestGenerator(cache_sut, cache), RARWalkFromState(sut, 5, 0.2)])
(model, statistics) = learn_mbt(cache_sut, learner, mbt, 10000)
print(model)
print(statistics)
......@@ -99,8 +130,8 @@ def visa_learn_mbt_yan_mealy():
def biometric_learn_mbt_yan_mealy():
sim_learn_mbt_yan_mealy(os.path.join("models_loc", "biometric.dot"))
visa_learn_mbt_yan_mealy()
scalable_learn_mbt_chainrw_iora()
#sim_learn_mbt_chainyan_mealy(os.path.join(models_loc, "bankcards", "VISA.dot"))
#visa_learn_mbt_yan_mealy()
#scalable_learn_mbt_mealy()
#scalable_learn_mbt_iora()
......@@ -18,6 +18,8 @@ from sut.fifoset import FIFOSetClass
from sut.stack import StackClass
from sut.sut_cache import AcceptorCache, IOCache, CacheSUT
from learn.algorithm import learn_mbt, Statistics
from test.chain import TestGeneratorChain
from test.coloring import ColoringTestGenerator
from test.rwalk import DFARWalkFromState, MealyRWalkFromState, RARWalkFromState, IORARWalkFromState
from statistics import stdev, median
......@@ -66,7 +68,7 @@ class Benchmark:
self.learn_setup[sut_type] = (sut_learner, sut_tester)
return self
def _run_benchmark(self, sut_class:ScalableSUTClass, sut_type:SUTType, test_desc:TestDesc, tout:int, max_size:int) \
def _run_benchmark(self, sut_class:ScalableSUTClass, sut_type:SUTType, test_desc:TestDesc, tout:int, max_size:int, use_coloring:bool) \
-> List[Tuple[SutDesc, ExperimentStats]]:
results = []
size = 1
......@@ -90,9 +92,12 @@ class Benchmark:
cache = IOCache(IORAObservation)
cache_sut = CacheSUT(stats_sut, cache)
learner,tester = get_learner_setup(cache_sut, sut_type, size, test_desc)
if use_coloring:
tester = TestGeneratorChain([ColoringTestGenerator(cache_sut, cache), tester])
learner.set_timeout(tout)
# ugly but there you go
(model, statistics) = learn_mbt(learner, tester, test_desc.max_tests, stats_tracker=sut_stats)
(model, statistics) = learn_mbt(cache_sut, learner, tester, test_desc.max_tests, stats_tracker=sut_stats)
if model is None:
break
else:
......@@ -111,10 +116,10 @@ class Benchmark:
max_ltime = max(statistics.learning_times)
return ExperimentStats(states=states, registers=registers, tests=learn_tests, inputs=learn_inputs, max_ltime=max_ltime, learn_time=learn_time)
def run_benchmarks(self, test_desc:TestDesc, timeout:int, max_size:int=None) -> List[Tuple[SutDesc, ExperimentStats]]:
def run_benchmarks(self, test_desc:TestDesc, timeout:int, max_size:int=None, use_coloring:bool=False) -> List[Tuple[SutDesc, ExperimentStats]]:
results = []
for sut_class, sut_type in self.suts:
res = self._run_benchmark(sut_class, sut_type, test_desc, timeout, max_size)
res = self._run_benchmark(sut_class, sut_type, test_desc, timeout, max_size, use_coloring)
results.extend(res)
return results
......@@ -172,9 +177,9 @@ b = Benchmark()
#b.add_setup(SUTType.IORA, RALearner(IORAEncoder()), IORARWalkFromState)
# add the sut classes we want to benchmark
b.add_sut(FIFOSetClass())
#b.add_sut(FIFOSetClass())
b.add_sut(LoginClass())
b.add_sut(StackClass())
#b.add_sut(StackClass())
#b.add_sut(FIFOSetClass(), SUTType.DFA)
......@@ -194,10 +199,13 @@ num_exp = 5
# up to what systems of what size do we want to run experiments (set to None if size is ignored as a stop condition)
max_size = None
# do you want to augment test generation by coloring (before the rwalk, we explore all uncolored transitions in the hyp)
use_coloring = True
# run the benchmark and collect results
results = []
for i in range(0, num_exp):
results += b.run_benchmarks(t_desc, timeout, max_size)
results += b.run_benchmarks(t_desc, timeout, max_size, use_coloring)
print("============================")
print_results(results)
sut_dict = dict()
......
......@@ -102,6 +102,6 @@ if __name__ == '__main__':
rand_test_length = args.rand_length
reset_prob = args.reset_prob
test_generator = aut2rwalkcls[aut_type](sut_to_learn, rand_test_length, reset_prob)
(automaton, statistics) = alg.learn_mbt(learner, test_generator, num_tests)
(automaton, statistics) = alg.learn_mbt(sut_to_learn, learner, test_generator, num_tests)
print("Learned\n", automaton, "\nWith stats\n", statistics)
\ No newline at end of file
......@@ -282,18 +282,6 @@ class IORAEncoder(Encoder):
if value is not None:
constraints.extend([
# If the transition is over a register, then the register is in use.
# z3.ForAll(
# [r],
# z3.Implies(
# z3.And(
# r != ra.fresh,
# ra.transition(mapper.map(n), l, r) == mapper.map(c)
# ),
# ra.used(mapper.map(n), r) == True
# )
# ),
# If a non-fresh register has changed, it must have been updated
z3.ForAll(
[r],
......
......@@ -3,7 +3,9 @@ from typing import List, Tuple, Union,cast
from model import Automaton
from learn import Learner
from sut import StatsTracker
from model.ra import Action
from sut import StatsTracker, SUT
from sut.scalable import ActionSignature
from test import TestGenerator, Test
import time
......@@ -78,9 +80,9 @@ def learn(learner:Learner, test_type:type, traces: List[object]) -> Tuple[Automa
statistics.set_suite_size(len(traces))
return (model, statistics)
def learn_mbt(learner:Learner, test_generator:TestGenerator, max_tests:int, stats_tracker:StatsTracker=None) -> Tuple[Union[Automaton, None], Statistics]:
def learn_mbt(sut:SUT, learner:Learner, test_generator:TestGenerator, max_tests:int, stats_tracker:StatsTracker=None) -> Tuple[Union[Automaton, None], Statistics]:
""" takes learner, a test generator, and bound on the number of tests and generates a model"""
next_test = test_generator.gen_test(None)
next_test = gen_blind_test(sut)
statistics = Statistics()
if next_test is None:
return (None, statistics)
......@@ -108,9 +110,7 @@ def learn_mbt(learner:Learner, test_generator:TestGenerator, max_tests:int, stat
for next_test in generated_tests:
ce = next_test.check(model)
if ce is not None:
#print("TEST: ", next_test.trace())
print("CE: ", ce)
#print(model)
learner.add(ce)
done = False
break
......@@ -131,6 +131,7 @@ def learn_mbt(learner:Learner, test_generator:TestGenerator, max_tests:int, stat
generated_tests.append(next_test)
ce = next_test.check(model)
if ce is not None:
print("CE: ", ce)
learner_tests.append(next_test)
learner.add(ce)
done = False
......@@ -143,3 +144,23 @@ def learn_mbt(learner:Learner, test_generator:TestGenerator, max_tests:int, stat
#print([str(test.trace() for test in learner_tests)])
return (model, statistics)
def gen_blind_test(sut:SUT):
"""generates a sequence covering all input elements in the sut interface"""
seq = []
for abs_inp in sut.input_interface():
cnt = 0
# if it's RA stuff
if isinstance(abs_inp, ActionSignature):
if abs_inp.num_params == 0:
val = None
else:
val = cnt
cnt += 1
seq.append(Action(abs_inp.label, val))
elif isinstance(abs_inp, str):
seq.append(abs_inp)
else:
raise Exception("Unrecognized type")
obs = sut.run(seq)
return obs.to_test()
\ No newline at end of file
......@@ -73,9 +73,9 @@ class Automaton(metaclass=ABCMeta):
if state is not None:
if len(self._acc_trans_seq) == 0:
raise Exception("Access sequences haven't been defined for this machine")
return self._seq(self._acc_trans_seq[state])
return self.trans_to_inputs(self._acc_trans_seq[state])
else:
return {state:self._seq(self._acc_trans_seq[state]) for state in self.states()}
return {state:self.trans_to_inputs(self._acc_trans_seq[state]) for state in self.states()}
def states(self):
return list(self._states)
......@@ -106,8 +106,13 @@ class Automaton(metaclass=ABCMeta):
return crt_state
@abstractmethod
def _seq(self, transitions:List[Transition])->List[object]:
"""returns the sequence of inputs generated by execution of these transitions"""
def trans_to_inputs(self, transitions:List[Transition])->List[object]:
"""returns a unique sequence of inputs generated by execution traversing these transitions"""
pass
@abstractmethod
def inputs_to_trans(self, seq: List[object])->List[Transition]:
"""returns a unique sequence of transitions generated by execution of these inputs"""
pass
def input_labels(self):
......
from abc import ABCMeta
from typing import List
from model import MutableAutomatonMixin, MutableAcceptorMixin, Transition, Acceptor, Transducer
from model import MutableAutomatonMixin, MutableAcceptorMixin, Transition, Acceptor, Transducer, Automaton
Symbol = str
State = str
......@@ -21,7 +21,20 @@ class IOTransition(Transition):
else:
return short
class DFA(Acceptor):
class NavigatorMixin(Automaton, metaclass=ABCMeta):
def trans_to_inputs(self, transitions: List[Transition]) -> List[Symbol]:
return [trans.start_label for trans in transitions]
def inputs_to_trans(self, seq: List[Symbol]) -> List[Transition]:
trans = []
state = self.start_state()
for inp in seq:
next_trans = self.transitions(state, inp)
trans.append(next_trans[0])
state = next_trans[0].end_state
return trans
class DFA(Acceptor, NavigatorMixin):
def __init__(self, states, state_to_trans, state_to_acc, acc_trans_seq={}):
super().__init__(states, state_to_trans, state_to_acc, acc_trans_seq)
......@@ -32,10 +45,7 @@ class DFA(Acceptor):
crt_state = super().state(trace)
return crt_state
def _seq(self, transitions: List[Transition]):
return [trans.start_label for trans in transitions]
class MutableDFA(DFA, MutableAcceptorMixin):
class MutableDFA(DFA, MutableAcceptorMixin, NavigatorMixin):
def __init__(self):
super().__init__([], {}, {})
......@@ -45,7 +55,7 @@ class MutableDFA(DFA, MutableAcceptorMixin):
def to_immutable(self) -> DFA:
return DFA(self._states, self._state_to_trans, self._state_to_acc, self.acc_trans_seq())
class MooreMachine(Transducer):
class MooreMachine(Transducer, NavigatorMixin):
def __init__(self, states, state_to_trans, state_to_out):
super().__init__(states, state_to_trans)
self.state_to_out = state_to_out
......@@ -61,11 +71,7 @@ class MooreMachine(Transducer):
crt_state = self.state(trace)
return self.state_to_out[crt_state]
def _seq(self, transitions:List[Transition]):
#trace = [(trans.start_label, self.state_to_out[trans.end_state]) for trans in transitions]
return [trans.start_label for trans in transitions]#trace
class MealyMachine(Transducer):
class MealyMachine(Transducer, NavigatorMixin):
def __init__(self, states, state_to_trans, acc_trans_seq={}):
super().__init__(states, state_to_trans, acc_trans_seq)
......@@ -76,10 +82,6 @@ class MealyMachine(Transducer):
crt_state = super().state(trace)
return crt_state
def _seq(self, transitions:List[IOTransition]):
#trace = [(trans.start_label, trans.output) for trans in transitions]
return [trans.start_label for trans in transitions]
def output(self, trace: List[Symbol]) -> Output:
if len(trace) == 0:
return None
......
......@@ -147,7 +147,23 @@ class RegisterAutomaton(Acceptor, RegisterMachine):
return crt_state
def _seq(self, transitions:List[RATransition]):
def inputs_to_trans(self, seq: List[Action]) -> List[RATransition]:
init = -1
reg_val = dict()
for reg in self.registers():
reg_val[reg] = init
fired_trans = []
crt_state = self.start_state()
for action in seq:
transitions = self.transitions(crt_state, action.label)
fired_transition = super()._fired_transition(transitions, reg_val, action)
reg_val = fired_transition.update(reg_val, action)
crt_state = fired_transition.end_state
fired_trans.append(fired_transition)
return fired_trans
def trans_to_inputs(self, transitions:List[RATransition]):
run = []
values = set()
reg_val = dict()
......@@ -158,6 +174,7 @@ class RegisterAutomaton(Acceptor, RegisterMachine):
inp_val = 0 if len(values) == 0 else max(values) + 1
values.add(inp_val)
inp = Action(trans.start_label, inp_val)
reg_val = trans.update(reg_val, inp)
run.append(inp)
return run
......@@ -219,7 +236,27 @@ class IORegisterAutomaton(Transducer, RegisterMachine):
output = fired_transition.output(valuation, values)
return output
def _seq(self, transitions: List[IORATransition]):
def inputs_to_trans(self, seq: List[Action]) -> List[IORATransition]:
init = -1
# maintains the set of values encountered
values = set()
reg_val = dict()
for reg in self.registers():
reg_val[reg] = init
fired_trans = []
crt_state = self.start_state()
for action in seq:
if action.value is not None:
values.add(action.value)
transitions = self.transitions(crt_state, action.label)
fired_transition = super()._fired_transition(transitions, reg_val, action)
reg_val = fired_transition.update(reg_val, action)
crt_state = fired_transition.end_state
fired_trans.append(fired_transition)
return fired_trans
def trans_to_inputs(self, transitions: List[IORATransition]) -> List[Action]:
seq = []
values = set()
reg_val = dict()
......
......@@ -4,6 +4,7 @@ from typing import List, Tuple
from model.fa import Symbol
from model.ra import Action
from test import MealyTest, AcceptorTest, IORATest
__all__ = [
"get_simulation",
......@@ -22,7 +23,12 @@ class Observation():
"""returns all the inputs from an observation"""
pass
class TransducerObservation(metaclass=ABCMeta):
@abstractmethod
def to_test(self):
"""returns a corresponding test for the observation"""
pass
class TransducerObservation(Observation, metaclass=ABCMeta):
def __init__(self, iotrace:List[Tuple[object,object]]):
self.iotrace = iotrace
......@@ -35,7 +41,7 @@ class TransducerObservation(metaclass=ABCMeta):
def __str__(self):
return "Obs" + str(self.trace()) + ""
class AcceptorObservation(metaclass=ABCMeta):
class AcceptorObservation(Observation, metaclass=ABCMeta):
def __init__(self, seq:List[object], acc:bool):
self.seq = seq
self.acc = acc
......@@ -46,6 +52,9 @@ class AcceptorObservation(metaclass=ABCMeta):
def inputs(self) -> List[object]:
return self.seq
def to_test(self):
return AcceptorTest(self.trace())
def __str__(self):
return "Obs" + str(self.trace())
......@@ -69,6 +78,9 @@ class MealyObservation(TransducerObservation):
def inputs(self) -> List[Symbol]:
return super().inputs()
def to_test(self):
return MealyTest(self.trace())
class RegisterMachineObservation(Observation):
@abstractmethod
......@@ -106,6 +118,9 @@ class IORAObservation(TransducerObservation, RegisterMachineObservation):
ov = [a.value for (_,a) in self.trace() if a.value is not None]
return set(iv+ov)
def to_test(self):
return IORATest(self.trace())
class SUT(metaclass=ABCMeta):
OK = "OK"
NOK = "NOK"
......
......@@ -17,6 +17,10 @@ class Cache(metaclass=ABCMeta):
def update_cache(self, obs):
pass
@abstractmethod
def obs_iterator(self):
pass
class IOCache(Cache):
def __init__(self, obs_gen):
self._tree = Tree(itertools.count(0))
......@@ -39,13 +43,32 @@ class IOCache(Cache):
to_add = list(itertools.chain(*map(iter, obs.trace())))
self._tree[to_add]
def obs_iterator(self):
next_seqs = [[]]
while len(next_seqs) > 0:
seq = next_seqs.pop()
obs = self.from_cache(seq)
if obs is None:
print(self._tree)
print(seq)
exit(0)
seq_node = self._tree[ list(itertools.chain(*map(iter, obs.trace())))]
if len(seq_node.children) > 0:
for inp in seq_node.children.keys():
next_seqs.append(seq + [inp])
else:
yield obs
class AcceptorCache(Cache):
def __init__(self, obs_gen):
self._tree = Tree(itertools.count(0))
self._obs_gen = obs_gen
self._acc = {}
def from_cache(self, seq):
def from_cache(self, seq) -> Observation:
node = self._tree[seq]
if node in self._acc:
return self._obs_gen(seq, self._acc[node])
......@@ -56,6 +79,23 @@ class AcceptorCache(Cache):
node = self._tree[obs.inputs()]
self._acc[node] = obs.acc
def obs_iterator(self):
next_seqs = [[]]
while len(next_seqs) > 0:
seq = next_seqs.pop()
seq_node = self._tree[seq]
if len(seq_node.children) > 0:
for inp in seq_node.children.keys():
next_seqs.append(seq + [inp])
else:
obs = self.from_cache(seq)
#if obs is None:
# print(seq)
# print(self._tree)
# exit(0)
yield obs
class CacheSUT(SUT):
def __init__(self, sut:SUT, cache:Cache):
self._sut = sut
......
......@@ -5,8 +5,6 @@ import itertools
from model import Automaton, Acceptor, Transducer
from model.fa import Symbol
from model.ra import IORegisterAutomaton, Action
from sut import SUT
from sut.scalable import ActionSignature
from utils import determinize
......@@ -66,25 +64,6 @@ class TestGenerator(metaclass=ABCMeta):
test = self.gen_test(model)
self.terminate()
def gen_blind_inp_seq(self, sut:SUT):
"""generates a sequence covering all input elements in the sut interface"""
seq = []
for abs_inp in self.sut.input_interface():
cnt = 0
# if it's RA stuff
if isinstance(abs_inp, ActionSignature):
if abs_inp.num_params == 0:
val = None
else:
val = cnt
cnt += 1
seq.append(Action(abs_inp.label, val))
elif isinstance(abs_inp, str):
seq.append(abs_inp)
else:
raise Exception("Unrecognized type")
return seq
def initialize(self, model: Automaton):
"""feeds the tests generator the supplied automaton in an initialization step"""
pass
......
from typing import List
from model import Automaton
from test import TestGenerator
"""Used to chain multiple test generators"""
class TestGeneratorChain(TestGenerator):
def __init__(self, test_gen:List[TestGenerator]):
self._generators = test_gen
self._test_iter = None
def initialize(self, model: Automaton):