Commit a83b2801 authored by Paul Fiterau Brostean's avatar Paul Fiterau Brostean
Browse files

Removed an unnecessary abstraction layer in the Test hierarchy. Fixed Mealy Machine exporter class.

parent fcbf13f8
......@@ -20,7 +20,7 @@ def scrap_learn_iora():
def scrap_learn_mbt_iora():
learner = RALearner(IORAEncoder())
sut = new_login_sut(1)
mbt = IORARWalkFromState(sut, 10)
mbt = IORARWalkFromState(sut, 5, 0.2)
(model, statistics) = learn_mbt(learner, mbt, 1000)
print(model)
print(statistics)
......
import unittest
from encode.fa import MealyEncoder
from test import MealyTest
from tests.mealy_testscenario import *
from z3gi.learn.fa import FALearner
......@@ -23,8 +24,6 @@ class MealyLearnerTest(unittest.TestCase):
self.assertTrue(succ, msg="Automaton could not be inferred")
self.assertEqual(len(fa.states), test_scenario.nr_states,
"Wrong number of states.")
self.assertEqual(len(fa.states), test_scenario.nr_states,
"Wrong number of registers.")
exported = fa.export(model)
print("Learned model: \n", exported)
self.assertEqual(len(exported.states()), test_scenario.nr_states,
......@@ -33,9 +32,11 @@ class MealyLearnerTest(unittest.TestCase):
def check_against_obs(self, learned_fa, test_scenario):
"""Checks if the learned RA corresponds to the scenario observations"""
for trace, acc in test_scenario.traces:
self.assertEqual(learned_fa.accepts(trace), acc,
"Register automaton output doesn't correspond to observation {0}".format(str(trace)))
for iotrace in test_scenario.traces:
test = MealyTest(iotrace)
res = test.check(learned_fa)
self.assertIsNone(res, "Register automaton output doesn't correspond to the observation {0}"
.format(test.trace()))
def learn_model(self, test_scenario):
# inputs = set()
......
......@@ -59,12 +59,12 @@ class MealyMachineBuilder(object):
self.mm = mm
def build_mealy(self, m : z3.ModelRef) -> model.fa.MealyMachine:
tr = FATranslator()
tr = FATranslator(self.mm)
mut_mm = model.fa.MutableMealyMachine()
for state in self.mm.states:
mut_mm.add_state(tr.z3_to_state(state))
for state in self.mm.states:
for inp in self.mm.inputs:
for inp in self.mm.inputs.values():
output = m.eval(self.mm.output(state, inp))
to_state = m.eval(self.mm.transition(state, inp))
trans = model.fa.IOTransition(
......@@ -72,6 +72,7 @@ class MealyMachineBuilder(object):
tr.z3_to_label(inp),
tr.z3_to_label(output),
tr.z3_to_state(to_state))
mut_mm.add_transition(tr.z3_to_state(state), trans)
mut_mm.generate_acc_seq()
return mut_mm.to_immutable()
......
......@@ -3,7 +3,7 @@ from typing import cast
from model import Automaton
from learn import Learner
from test import TestTemplate, TestGenerator
from test import TraceTest, TestGenerator, Test
import time
__all__ = [
......@@ -12,17 +12,18 @@ __all__ = [
]
class Statistics():
"""We only refe"""
def __init__(self):
self.num_tests = 0
self.num_inputs = 0
self.num_learner_tests = 0
self.num_learner_inputs = 0
self.suite_size = 0
self.learning_times = []
def add_inputs(self, num):
self.num_inputs += num
def add_tests(self, num):
self.num_tests += num
def add_learner_test(self, test:Test):
""" updates the stats with relevant information from the added test"""
self.num_learner_inputs += test.size()
self.num_learner_tests += 1
pass
def set_suite_size(self, num):
self.suite_size = num
......@@ -31,14 +32,16 @@ class Statistics():
self.learning_times.append(time)
def __str__(self): return "Total number tests used in learning: {0} \n" \
"Total number inputs used in learning: {1} \n " \
"Test suite size: {2} \n " \
"Learning time for each model: {3} \n " \
"Total learning time: {4} ".format(self.num_tests,
self.num_inputs,
self.suite_size,
self.learning_times,
sum(self.learning_times))
"Total number inputs used in learning: {1} \n " \
"Test suite size: {2} \n " \
"Average learner test size: {3} \n " \
"Learning time for each model: {4} \n " \
"Total learning time: {5} ".format(self.num_learner_tests,
self.num_learner_inputs,
self.suite_size,
self.num_learner_inputs / self.num_learner_tests,
self.learning_times,
sum(self.learning_times))
def learn(learner:Learner, test_type:type, traces: List[object]) -> Tuple[Automaton, Statistics]:
......@@ -48,11 +51,10 @@ def learn(learner:Learner, test_type:type, traces: List[object]) -> Tuple[Automa
return (None, statistics)
else:
statistics.set_suite_size(len(traces))
test = cast(TestTemplate, test_type(traces.pop(0)))
test = cast(TraceTest, test_type(traces.pop(0)))
definition = None
learner.add(test.tr)
statistics.add_tests(1)
statistics.add_inputs(test.size())
statistics.add_learner_test(test)
done = False
model = None
learn_traces = [test.tr]
......@@ -63,7 +65,7 @@ def learn(learner:Learner, test_type:type, traces: List[object]) -> Tuple[Automa
statistics.add_learning_time(end_time-start_time)
done = True
for trace in traces:
test = cast(TestTemplate, test_type(trace))
test = cast(TraceTest, test_type(trace))
ce = test.check(model)
if ce is not None:
if ce not in learn_traces:
......@@ -72,10 +74,10 @@ def learn(learner:Learner, test_type:type, traces: List[object]) -> Tuple[Automa
raise Exception("The CE {0} has already been processed yet it "
"is still a CE. \n CEs: {1} \n Model: {2}".format(ce, learn_traces, model))
learner.add(ce)
statistics.add_tests(1)
statistics.add_inputs(test.size())
statistics.add_learner_test(test)
done = False
break
statistics.set_suite_size(len(traces))
return (model, statistics)
def learn_mbt(learner:Learner, test_generator:TestGenerator, max_tests:int) -> Tuple[Automaton, Statistics]:
......@@ -87,8 +89,7 @@ def learn_mbt(learner:Learner, test_generator:TestGenerator, max_tests:int) -> T
else:
definition = None
learner.add(next_test.trace())
statistics.add_tests(1)
statistics.add_inputs(next_test.size())
statistics.add_learner_test(next_test)
done = False
learner_tests = [next_test]
generated_tests = [next_test]
......@@ -103,9 +104,6 @@ def learn_mbt(learner:Learner, test_generator:TestGenerator, max_tests:int) -> T
ce = next_test.check(model)
if ce is not None:
learner_tests.append(ce)
learner.add(ce)
statistics.add_tests(1)
statistics.add_inputs(next_test.size())
done = False
break
if not done:
......@@ -121,8 +119,7 @@ def learn_mbt(learner:Learner, test_generator:TestGenerator, max_tests:int) -> T
if ce is not None:
learner_tests.append(ce)
learner.add(ce)
statistics.add_tests(1)
statistics.add_inputs(next_test.size())
statistics.add_learner_test(next_test)
done = False
break
statistics.set_suite_size(len(generated_tests))
......
import pprint
from abc import ABCMeta, abstractmethod
from typing import List
......@@ -134,7 +133,6 @@ class MutableAutomatonMixin(metaclass=ABCMeta):
raise Exception("Could not find state {0} in tree {1}".format(state, ptree))
new_acc_seq[state] = node.path()
assert(len(new_acc_seq) == len(self.states()))
pprint.pprint(new_acc_seq)
self._acc_trans_seq = new_acc_seq
@abstractmethod
......
......@@ -29,7 +29,6 @@ class DFA(Acceptor):
def state(self, trace: List[Symbol]) -> State:
crt_state = super().state(trace)
# print(tr_str)
return crt_state
def _seq(self, transitions: List[Transition]):
......@@ -86,7 +85,8 @@ class MealyMachine(Transducer):
else:
state_before = self.state(trace[:-1])
trans = self.transitions(state_before, trace[-1])
return trans.output
assert(len(trans) == 1)
return trans[0].output
class MutableMealyMachine(MealyMachine, MutableAutomatonMixin):
def __init__(self):
......
import itertools
from abc import ABCMeta, abstractmethod
from types import GeneratorType
from typing import List, Generator, Iterable
from typing import List, Generator, Iterable, Tuple
from model import Automaton, Acceptor
from model.fa import MealyMachine
from model import Automaton, Acceptor, Transducer
from model.fa import MealyMachine, Symbol
from model.ra import IORegisterAutomaton, Action
from utils import determinize
class Test(metaclass=ABCMeta):
"""A test is based on a trace/observation on the actual SUT. The test verifies that
the model exhibits behavior corresponding to this observation."""
def __init__(self, trace):
self.tr = trace
@abstractmethod
def check(self, model:Automaton):
"""checks if the hyp passes the test. On failure, it returns the trace to be added by the learner.\
"""checks if the hyp passes the test. On failure, it returns a minimal trace to be added by the learner.
On success it return None"""
pass
@abstractmethod
def trace(self):
pass
return self._check_trace(model, self.tr)
@abstractmethod
def size(self) -> int:
"""Returns the size (in terms of inputs) of the test"""
def _check_trace(self, model, trace):
pass
class TestTemplate(metaclass=ABCMeta):
def __init__(self, trace):
self.tr = trace
def check(self, model: Automaton):
return self._check_trace(model, self.tr)
def trace(self):
"""returns the trace/observation of the SUT the Test is based on"""
return self.tr
@abstractmethod
def _check_trace(self, model, trace):
def size(self) -> int:
"""returns the size (in terms of inputs) of the test"""
pass
class TestGenerator(metaclass=ABCMeta):
@abstractmethod
def gen_test(self, model: Automaton) -> Test:
......@@ -78,30 +68,6 @@ class Tester(metaclass=ABCMeta):
if trace is not None:
return trace
# IORA Test traces are sequences of input and output action tuples
# (Action("in", 10), Action("out", 11))
class IORATest(TestTemplate):
def __init__(self, trace:List[tuple]):
super().__init__(trace)
def _check_trace(self, model: IORegisterAutomaton, trace:List[tuple]):
test_trace = []
trace = determinize_act_io(trace)
for (inp_act, out_act) in trace:
test_trace.append((inp_act, out_act))
if inp_act.label not in model.input_labels():
return test_trace
output = model.output([inp for (inp, _) in test_trace])
if output != out_act:
# print("Expected: ", out_act, " Got: ", output, "\n Trace: ", trace)
return test_trace
return None
def size(self):
return len(self.tr)
def determinize_act_io(tuple_seq):
seq = list(itertools.chain.from_iterable(tuple_seq))
......@@ -110,12 +76,11 @@ def determinize_act_io(tuple_seq):
det_duple_seq = [(det_act_seq[i], det_act_seq[i + 1]) for i in range(0, len(det_act_seq), 2)]
return det_duple_seq
class MealyTest(TestTemplate):
def __init__(self, trace:List[tuple]):
class TransducerTest(Test):
def __init__(self, trace:List[Tuple[object, object]]):
super().__init__(trace)
def _check_trace(self, model: MealyMachine, trace:List[tuple]):
def _check_trace(self, model: Transducer, trace:List[Tuple[object, object]]):
test_trace = []
for (inp_act, out_act) in trace:
test_trace.append((inp_act, out_act))
......@@ -127,9 +92,23 @@ class MealyTest(TestTemplate):
def size(self):
return len(self.tr)
class MealyTest(TransducerTest):
def __init__(self, trace:List[Tuple[Symbol, Symbol]]):
super().__init__(trace)
class IORATest(TransducerTest):
def __init__(self, trace: List[Tuple[Action, Action]]):
super().__init__(trace)
def _check_trace(self, model: IORegisterAutomaton, trace: List[Tuple[Action, Action]]):
# any observation trace has to be first determinized
trace = determinize_act_io(trace)
return super()._check_trace(model, trace)
# Acceptor Test observations are tuples comprising sequences of Actions/Symbols joined by an accept/reject booleans
class AcceptorTest(TestTemplate):
def __init__(self, trace):
class AcceptorTest(Test):
def __init__(self, trace:Tuple[List[object], bool]):
super().__init__(trace)
def _check_trace(self, model: Acceptor, trace):
......
......@@ -6,7 +6,8 @@ import collections
import itertools
from model import Automaton, Transition
from model.ra import IORATransition, IORegisterAutomaton, EqualityGuard, OrGuard, Action, Register, FreshGuard, Fresh
from model.ra import IORATransition, IORegisterAutomaton, EqualityGuard, OrGuard, Action, Register, FreshGuard, Fresh, \
RegisterAutomaton, RATransition
from sut import SUT, ActionSignature, RASUT
from test import TestGenerator, Test, AcceptorTest, MealyTest, IORATest
import random
......@@ -18,7 +19,7 @@ def rand_sel(l:List):
return l[rand.randint(0, len(l)-1)]
class RWalkFromState(TestGenerator, metaclass=ABCMeta):
def __init__(self, sut:SUT, test_gen, rand_length, prob_reset=0.2, rand_start_state=True):
def __init__(self, sut:SUT, test_gen, rand_length, prob_reset, rand_start_state=True):
# the probability after each input added, that we stop adding further inputs to the random sequence
# hence the rand_length is only the maximum length of the random sequence
self.prob_reset = prob_reset
......@@ -49,6 +50,8 @@ class RWalkFromState(TestGenerator, metaclass=ABCMeta):
r_trans = transitions[rand.randint(0, len(transitions)-1)]
crt_state = r_trans.end_state
trans_path.append(r_trans)
if rand.random()> self.prob_reset:
break
# instantiate the access and random sequences by extracting the sequence of inputs to be executed on the sut
# for FSMs every sequence has a unique instantiation
......@@ -86,20 +89,20 @@ class RWalkFromState(TestGenerator, metaclass=ABCMeta):
# FSM versions of the algorithm
class FSMRWalkFromState(RWalkFromState, metaclass=ABCMeta):
def __init__(self, sut: SUT, test_gen, rand_length, rand_start_state=True):
super().__init__(sut, test_gen, rand_length, rand_start_state)
def __init__(self, sut: SUT, test_gen, rand_length, prob_reset, rand_start_state=True):
super().__init__(sut, test_gen, rand_length, prob_reset, rand_start_state)
def _generate_seq(self, model: Automaton, trans_path:List[Transition]):
return [trans.start_label for trans in trans_path]
class DFARWalkFromState(RWalkFromState):
def __init__(self, sut:SUT, rand_length, rand_start_state=True):
super().__init__(self, sut, AcceptorTest, rand_length, rand_start_state)
def __init__(self, sut:SUT, rand_length, prob_reset, rand_start_state=True):
super().__init__(self, sut, AcceptorTest, rand_length, prob_reset, rand_start_state)
class MealyRWalkFromState(RWalkFromState):
def __init__(self, sut:SUT, rand_length, rand_start_state=True):
super().__init__(self, sut, MealyTest, rand_length, rand_start_state)
def __init__(self, sut:SUT, rand_length, prob_reset, rand_start_state=True):
super().__init__(self, sut, MealyTest, rand_length, prob_reset, rand_start_state)
class ValueProb(collections.namedtuple("ValueProb", ("history", "register", "fresh"))):
def select(self, reg_vals:List[int], his_vals:List[int], fresh_value):
......@@ -113,8 +116,8 @@ class ValueProb(collections.namedtuple("ValueProb", ("history", "register", "fre
class IORARWalkFromState(RWalkFromState):
def __init__(self, sut: RASUT, rand_length, prob = ValueProb(0.4, 0.4, 0.2), rand_start_state=True):
super().__init__(sut, IORATest, rand_length, rand_start_state)
def __init__(self, sut: RASUT, rand_length, prob_reset, prob = ValueProb(0.4, 0.4, 0.2), rand_start_state=True):
super().__init__(sut, IORATest, rand_length, prob_reset, rand_start_state)
self.prob = prob
def _generate_seq(self, model: IORegisterAutomaton, transitions:List[IORATransition]) -> List[Action]:
......@@ -164,5 +167,46 @@ class IORARWalkFromState(RWalkFromState):
# TODO needs refactoring, we shouldn't need duplication between RA and IORA random walks
class RARWalkFromState(RWalkFromState):
def __init__(self, sut: RASUT, rand_length, prob_reset, prob = ValueProb(0.4, 0.4, 0.2), rand_start_state=True):
super().__init__(sut, AcceptorTest, rand_length, prob_reset, rand_start_state)
self.prob = prob
def _generate_seq(self, model: RegisterAutomaton, transitions:List[RATransition]) -> List[Action]:
"""generates a sequence of inputs for the randomly chosen transition path"""
act_arity = model.act_arity()
seq = []
values = list() # we use a list to preserve order/eliminate potential non-det
reg_val = dict()
for trans in transitions:
if act_arity[trans.start_label] == 1:
if isinstance(trans.guard, EqualityGuard) or isinstance(trans.guard, OrGuard):
inp_val = reg_val[trans.guard.registers()[0]]
elif isinstance(trans.guard, FreshGuard):
# we have a fresh transition, which means we can either pick a fresh value or any past value
# as long as it is not stored in one of the guarded registers in this location
fresh_val = 0 if len(values) == 0 else max(values) + 1
active_regs = list() if len(model.registers()) == 0 or len(trans.guard.registers()) == 0 else \
list(itertools.chain.from_iterable(
[tr.guard.registers() for tr in model.transitions(trans.start_state, trans.start_label)
if tr is not trans]))
active_reg_vals = [reg_val[reg] for reg in active_regs]
selectable_reg_vals = [val for val in reg_val.values() if val not in active_reg_vals]
selectable_his_vals = [val for val in values if val not in active_reg_vals
and val not in selectable_reg_vals]
inp_val = self.prob.select(selectable_reg_vals, selectable_his_vals, fresh_val)
if inp_val not in values:
values.append(inp_val)
inp = Action(trans.start_label, inp_val)
reg_val = trans.update(reg_val, inp)
else:
raise Exception("Unkown guard")
else:
inp_val = None
inp = Action(trans.start_label, inp_val)
seq.append(inp)
return seq
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment