Commit b05ad0c8 authored by Paul Fiterau Brostean's avatar Paul Fiterau Brostean
Browse files

All encodings/testers should work now

parent 91548627
import itertools
from encode.fa import MealyEncoder, DFAEncoder
from encode.ra import RAEncoder
from learn.algorithm import learn
from learn.algorithm import learn_mbt
from learn.fa import FALearner
from learn.ra import RALearner
from sut.login import new_login_sut
from test import IORATest
from test.rwalk import IORARWalkFromState
from sut import SUTType
from sut.login import new_login_sut, LoginClass
from test import IORATest, MealyTest
from test.rwalk import IORARWalkFromState, MealyRWalkFromState, DFARWalkFromState, RARWalkFromState
from tests.iora_testscenario import *
from encode.iora import IORAEncoder
......@@ -19,10 +23,43 @@ def scrap_learn_iora():
def scrap_learn_mbt_iora():
learner = RALearner(IORAEncoder())
learner.set_timeout(10)
sut = new_login_sut(1)
mbt = IORARWalkFromState(sut, 5, 0.2)
(model, statistics) = learn_mbt(learner, mbt, 1000)
print(model)
print(statistics)
scrap_learn_mbt_iora()
\ No newline at end of file
def scrap_learn_mbt_mealy():
learner = FALearner(MealyEncoder())
learner.set_timeout(1000)
login = LoginClass()
sut = login.new_sut(SUTType.Mealy, 2)
mbt = MealyRWalkFromState(sut, 5, 0.2)
(model, statistics) = learn_mbt(learner, mbt, 10000)
print(model)
print(statistics)
def scrap_learn_mbt_mealy():
learner = FALearner(DFAEncoder())
learner.set_timeout(100000)
login = LoginClass()
sut = login.new_sut(SUTType.DFA, 2)
mbt = DFARWalkFromState(sut, 5, 0.2)
(model, statistics) = learn_mbt(learner, mbt, 10000)
print(model)
print(statistics)
def scrap_learn_mbt_ra():
learner = RALearner(RAEncoder())
learner.set_timeout(100000)
login = LoginClass()
sut = login.new_sut(SUTType.RA, 1)
mbt = RARWalkFromState(sut, 5, 0.2)
(model, statistics) = learn_mbt(learner, mbt, 10000)
print(model)
print(statistics)
#scrap_learn_mbt_mealy()
#scrap_learn_mbt_mealy()
scrap_learn_mbt_ra()
......@@ -23,4 +23,8 @@ sut_m1 = MealyTestCase("output '1' if the length of the trace so far is even, el
(('a', '0'), ('b', '1'), ('b', '0'), ('b', '1')),
(('b', '0'), ('b', '1'), ('b', '0'), ('a', '1')),
(('b', '0'), ('b', '1'), ('b', '0'), ('b', '1')),
], 2)
\ No newline at end of file
], 2)
sut_m2 = MealyTestCase("login traces",
[[('register', 'OK'), ('login', 'OK'), ('logout', 'OK')],
[('login', 'NOK'), ('register', 'OK')], [('logout', 'NOK'), ('login', 'NOK')]], 2)
\ No newline at end of file
......@@ -13,7 +13,7 @@ class MealyLearnerTest(unittest.TestCase):
pass
def test_sut1(self):
self.check_scenario(sut_m1)
self.check_scenario(sut_m2)
def check_scenario(self, test_scenario):
print("Scenario " + test_scenario.description)
......@@ -52,4 +52,5 @@ class MealyLearnerTest(unittest.TestCase):
max_states = test_scenario.nr_states + 1
result = learner._learn_model(min_states, max_states) #
return result
\ No newline at end of file
......@@ -9,15 +9,16 @@ from define import Automaton
class RegisterMachine(Automaton):
def __init__(self, num_locations, num_registers):
def __init__(self, num_locations, num_registers, param_size):
self.Location, self.locations = enum('Location', ['location{0}'.format(i) for i in range(num_locations)])
self.Register, self.registers = enum('Register',
['register{0}'.format(i) for i in range(num_registers)] + ['fresh'])
self.param_size = param_size
class RegisterAutomaton(RegisterMachine):
def __init__(self, labels, num_locations, num_registers):
super().__init__(num_locations, num_registers)
def __init__(self, labels, param_size, num_locations, num_registers):
super().__init__(num_locations, num_registers, param_size)
self.Label, elements = enum('Label', labels)
self.labels = {labels[i]: elements[i] for i in range(len(labels))}
self.start = self.locations[0]
......@@ -35,12 +36,11 @@ class RegisterAutomaton(RegisterMachine):
class IORegisterAutomaton(RegisterMachine):
def __init__(self, input_labels, output_labels, param_size, num_locations, num_registers):
super().__init__(num_locations, num_registers)
super().__init__(num_locations, num_registers, param_size)
labels = input_labels + output_labels
self.Label, elements = enum('Label', input_labels + output_labels)
self.labels = {labels[i]: elements[i] for i in range(len(labels))}
self._input_labels = [self.labels[lbl] for lbl in input_labels]
self.param_size = param_size
self.sink = self.locations[-1]
self.start = self.locations[0]
self.fresh = self.registers[-1]
......
......@@ -12,6 +12,7 @@ class RAEncoder(Encoder):
self.cache = {}
self.values = set()
self.labels = set()
self.param_size = dict()
def add(self, trace):
seq, accept = trace
......@@ -19,9 +20,17 @@ class RAEncoder(Encoder):
self.cache[node] = accept
self.values.update([action.value for action in seq])
self.labels.update([action.label for action in seq])
for action in seq:
if action.label not in self.param_size:
self.param_size[action.label] = action.param_size()
else:
if self.param_size[action.label] != action.param_size():
raise Exception("It is not allowed to have actions with different param sizes."
"Problem action: "+str(action))
def build(self, num_locations, num_registers) -> (RegisterAutomaton, z3.ExprRef):
ra = RegisterAutomaton(list(self.labels), num_locations, num_registers)
ra = RegisterAutomaton(list(self.labels), self.param_size, num_locations, num_registers)
mapper = Mapper(ra)
constraints = self.axioms(ra, mapper) + \
self.node_constraints(ra, mapper) + \
......
......@@ -6,13 +6,21 @@ import define
class Learner(metaclass=ABCMeta):
def __init__(self):
self.timeout = None
@abstractmethod
def add(self, trace):
pass
@abstractmethod
def model(self, old_definition:define.Automaton=None, old_model:model.Automaton=None) -> Tuple[model.Automaton, define.Automaton]:
def model(self, old_definition:define.Automaton=None, old_model:model.Automaton=None) \
-> Tuple[model.Automaton, define.Automaton]:
""""Infers a minimal model whose behavior corresponds to the traces added so far.
Returns None if no model could be obtained."""
Returns (None, to) if no model could be obtained where to suggests that learning failed due to
evaluation timing out"""
pass
def set_timeout(self, to):
"""sets the amount of time the solver is given for constructing a hypothesis"""
self.timeout = to
\ No newline at end of file
......@@ -3,7 +3,7 @@ from typing import cast
from model import Automaton
from learn import Learner
from test import TraceTest, TestGenerator, Test
from test import TestGenerator, Test
import time
__all__ = [
......@@ -51,7 +51,7 @@ def learn(learner:Learner, test_type:type, traces: List[object]) -> Tuple[Automa
return (None, statistics)
else:
statistics.set_suite_size(len(traces))
test = cast(TraceTest, test_type(traces.pop(0)))
test = cast(Test, test_type(traces.pop(0)))
definition = None
learner.add(test.tr)
statistics.add_learner_test(test)
......@@ -65,7 +65,7 @@ def learn(learner:Learner, test_type:type, traces: List[object]) -> Tuple[Automa
statistics.add_learning_time(end_time-start_time)
done = True
for trace in traces:
test = cast(TraceTest, test_type(trace))
test = cast(Test, test_type(trace))
ce = test.check(model)
if ce is not None:
if ce not in learn_traces:
......@@ -88,14 +88,24 @@ def learn_mbt(learner:Learner, test_generator:TestGenerator, max_tests:int) -> T
return (None, statistics)
else:
definition = None
print("next test, ", next_test.trace())
learner.add(next_test.trace())
statistics.add_learner_test(next_test)
done = False
learner_tests = [next_test]
generated_tests = [next_test]
last_ce = None
ce = None
while not done:
if last_ce and ce == last_ce:
raise Exception("Counterexample ", ce, " was not correctly processed")
last_ce = ce
start_time = int(time.time() * 1000)
(model, definition) = learner.model(old_definition=definition)
ret = learner.model(old_definition=definition)
if ret is None:
return (None, statistics)
(model, definition) = ret
print(model)
end_time = int(time.time() * 1000)
statistics.add_learning_time(end_time - start_time)
done = True
......@@ -103,7 +113,8 @@ def learn_mbt(learner:Learner, test_generator:TestGenerator, max_tests:int) -> T
for next_test in generated_tests:
ce = next_test.check(model)
if ce is not None:
learner_tests.append(ce)
learner.add(ce)
learner_tests.append(next_test)
done = False
break
if not done:
......@@ -117,7 +128,7 @@ def learn_mbt(learner:Learner, test_generator:TestGenerator, max_tests:int) -> T
generated_tests.append(next_test)
ce = next_test.check(model)
if ce is not None:
learner_tests.append(ce)
learner_tests.append(next_test)
learner.add(ce)
statistics.add_learner_test(next_test)
done = False
......
......@@ -15,6 +15,7 @@ import define.fa
class FALearner(Learner):
def __init__(self, encoder, solver=None, verbose=False):
super().__init__()
if not solver:
solver = z3.Solver()
self.solver = solver
......@@ -30,24 +31,35 @@ class FALearner(Learner):
(succ, fa, m) = self._learn_model(min_states=min_states,
max_states=max_states)
if succ:
return (fa.export(m), fa)
return None
return fa.export(m), fa
else:
return None
# to = fa
# return (None, to)
def _learn_model(self, min_states=1, max_states=20):
"""generates the definition and model for an fa whose traces include the traces added so far"""
"""generates the definition and model for an fa whose traces include the traces added so far
In case of failure, the second argument of the tuple signals occurrence of a timeout"""
for num_states in range(min_states, max_states + 1):
print("Learning with ", min_states)
fa, constraints = self.encoder.build(num_states)
self.solver.add(constraints)
if self.timeout is not None:
self.solver.set("timeout", self.timeout)
result = self.solver.check()
if self.verbose:
print("Learning with {0} states. Result: {1}"
.format(num_states, result))
print("Result {0} ".format(result))
if result == z3.sat:
model = self.solver.model()
self.solver.reset()
return (True, fa, model)
else:
self.solver.reset()
# timeout
if result == z3.unknown:
return (False, True, None)
# TODO: process the unsat core?
pass
return (False, None, None)
\ No newline at end of file
return (False, False, None)
\ No newline at end of file
......@@ -30,9 +30,13 @@ class RALearner(Learner):
(succ, ra_def, m) = self._learn_model(min_locations=min_locations,
max_locations=max_locations, min_registers=min_registers,
max_registers=max_registers)
if succ:
return (ra_def.export(m), ra_def)
return None
return ra_def.export(m), ra_def
else:
return None
#to = ra_def
#return (None, to)
def _learn_model(self, min_locations=1, max_locations=20, min_registers=0, max_registers=10):
"""generates the definition and model for an ra whose traces include the traces added so far"""
......@@ -41,7 +45,8 @@ class RALearner(Learner):
for num_registers in range(min_registers, min(max_registers, num_locations) + 1):
ra, constraints = self.encoder.build(num_locations, num_registers)
self.solver.add(constraints)
print(num_locations, num_registers)
if self.timeout is not None:
self.solver.set("timeout", self.timeout)
result = self.solver.check()
if self.verbose:
print("Learning with {0} locations and {1} registers. Result: {2}"
......@@ -52,7 +57,10 @@ class RALearner(Learner):
return (True, ra, model)
else:
self.solver.reset()
if result == z3.unknown:
return (False, True, None)
# TODO: process the unsat core?
pass
return (False, None, None)
\ No newline at end of file
return (False, False, None)
\ No newline at end of file
from encode.iora import IORAEncoder
from learn.algorithm import learn
from learn.ra import RALearner
from sut.login import new_login_sut
from sut.stack import new_stack_sut
from sut.fifoset import new_fifoset_sut
from test import IORATest
from test.exhaustive import ExhaustiveRAGenerator
# stack_sut = new_stack_sut(2)
# gen = ExhaustiveRAGenerator(stack_sut)
# obs = gen.generate_observations(4, max_registers=2)
# print("\n".join( [str(obs) for obs in obs]))
# learner = RALearner(IORAEncoder())
# (model, stats) = learn(learner, IORATest, obs)
# print(model, "\n \n", stats)
# expensive peace of work
# set_sut = new_fifoset_sut(3)
# gen = ExhaustiveRAGenerator(set_sut)
# obs = gen.generate_observations(8, max_registers=3)
# print("\n".join( [str(obs) for obs in obs]))
# learner = RALearner(IORAEncoder())
# (model, stats) = learn(learner, IORATest, obs)
# print(model, "\n \n", stats)
login_sut = new_login_sut(1)
gen = ExhaustiveRAGenerator(login_sut)
obs = gen.generate_observations(6, max_registers=1)
print("\n".join( [str(obs) for obs in obs]))
learner = RALearner(IORAEncoder())
(model, stats) = learn(learner, IORATest, obs)
print(model, "\n \n", stats)
\ No newline at end of file
......@@ -21,8 +21,8 @@ class IOTransition(Transition):
return short
class DFA(Acceptor):
def __init__(self, states, state_to_trans, state_to_acc):
super().__init__(states, state_to_trans, state_to_acc)
def __init__(self, states, state_to_trans, state_to_acc, acc_trans_seq={}):
super().__init__(states, state_to_trans, state_to_acc, acc_trans_seq)
def transitions(self, state: State, label:Label = None) -> List[Transition]:
return super().transitions(state, label)
......@@ -42,7 +42,7 @@ class MutableDFA(DFA, MutableAcceptorMixin):
return None
def to_immutable(self) -> DFA:
return DFA(self._states, self._state_to_trans, self._state_to_acc, self.acc_seq())
return DFA(self._states, self._state_to_trans, self._state_to_acc, self.acc_trans_seq())
class MooreMachine(Transducer):
def __init__(self, states, state_to_trans, state_to_out):
......@@ -93,4 +93,4 @@ class MutableMealyMachine(MealyMachine, MutableAutomatonMixin):
super().__init__([], {})
def to_immutable(self) -> MealyMachine:
return MealyMachine(self._states, self._state_to_trans, self.acc_seq())
return MealyMachine(self._states, self._state_to_trans, self.acc_trans_seq())
......@@ -242,7 +242,7 @@ class IORegisterAutomaton(Transducer, RegisterMachine):
class MutableRegisterAutomaton(RegisterAutomaton, MutableAcceptorMixin):
def __init__(self):
super().__init__([], dict(), [], [])
super().__init__([], dict(), dict(), [], [])
def add_transition(self, state:str, transition:RATransition):
super().add_transition(state, transition)
......
......@@ -56,7 +56,7 @@ class RAObservation(RegisterMachineObservation):
self.acc = acc
def trace(self):
return (self.trace, self.acc)
return (self.seq, self.acc)
def inputs(self):
return self.seq
......@@ -127,7 +127,7 @@ class ScalableSUTClass(SUTClass, metaclass=ABCMeta):
def new_sut(self, sut_type : SUTType, size :int):
if sut_type in self.sut_type_dict:
sut_obj = self.sut_type_dict[sut_type](size)
sut_obj = ObjectSUT(self.sut_type_dict[sut_type].INTERFACE, lambda : self.sut_type_dict[sut_type](size))
sut = sut_obj if sut_type is SUTType.IORA else \
RAWrapper(sut_obj) if sut_type is SUTType.RA else \
MealyWrapper(sut_obj) if sut_type is SUTType.Mealy else \
......@@ -220,12 +220,13 @@ class RAWrapper(RASUT):
return RAObservation(seq, True)
else:
iora_obs = self.sut.run(seq)
seq = iora_obs.inputs()
trace = iora_obs.trace()
(_, out) = trace[-1]
acc = out.label is SUT.OK
responses = set([out.label for (_, out) in iora_obs.trace()])
acc = responses == set([SUT.OK])
#(_, out) = iora_obs.trace()[-1]
#acc = out.label is SUT.OK
return RAObservation(seq, acc)
def input_interface(self):
return self.sut.input_interface()
class MealyWrapper(SUT):
"""Wraps around an Object SUT and creates an Mealy view of it. Values in output actions are ignored
......@@ -256,6 +257,11 @@ class DFAWrapper(SUT):
return DFAObservation(seq, True)
else:
mealy_obs = self.sut.run(seq)
(_,out) = mealy_obs[-1]
acc = out is SUT.OK
return DFAObservation(seq, acc)
\ No newline at end of file
responses = set([out for (_, out) in mealy_obs.trace()])
acc = responses == set([SUT.OK])
#_,out) = mealy_obs.trace()[-1]
#acc = out is SUT.OK
return DFAObservation(seq, acc)
def input_interface(self) -> List[Symbol]:
return self.sut.input_interface()
\ No newline at end of file
......@@ -2,26 +2,6 @@ from sut import SUT, ObjectSUT, ActionSignature, ScalableSUTClass, SUTType
class FIFOSet():
INTERFACE = [ActionSignature("get", 0), ActionSignature("put", 1)]
def __init__(self, size):
super()
self.size = size
self.list = list()
def get(self):
if len(self.list) == 0:
return SUT.NOK
else:
return ("OGET", self.list.pop())
def put(self, val):
if len(self.list) < self.size and val not in self.list:
self.list.append(val)
return SUT.OK
else:
return SUT.NOK
class RAFIFOSet():
INTERFACE = [ActionSignature("get", 1), ActionSignature("put", 1)]
def __init__(self, size):
......@@ -30,14 +10,11 @@ class RAFIFOSet():
self.list = list()
def get(self, val):
if len(self.list) == 0:
if len(self.list) == 0 or self.list[0] != val:
return SUT.NOK
else:
if self.list[-1] == val:
self.list.pop()
return SUT.OK
else:
return SUT.NOK
self.list.pop(0)
return SUT.OK
def put(self, val):
if len(self.list) < self.size and val not in self.list:
......@@ -71,10 +48,7 @@ class FIFOSetClass(ScalableSUTClass):
def __init__(self):
super({
SUTType.IORA: FIFOSet,
SUTType.RA: RAFIFOSet,
SUTType.RA: FIFOSet,
SUTType.Mealy: MealyFIFOSet,
SUTType.DFA: MealyFIFOSet
})
def new_fifoset_sut(size):
return ObjectSUT(FIFOSet.INTERFACE, lambda : FIFOSet(size))
})
\ No newline at end of file
from sut import SUT, ObjectSUT, ActionSignature, SUTType
from sut import SUT, ObjectSUT, ActionSignature, SUTType, ScalableSUTClass
class Login():
INTERFACE = [ActionSignature("register", 0), ActionSignature("login", 1), ActionSignature("logout", 1)]
INTERFACE = [ActionSignature("register", 1), ActionSignature("login", 1), ActionSignature("logout", 1)]
def __init__(self, size):
super()
self.size = size
self.logged = {}
def register(self):
if len(self.logged) == self.size:
def register(self, val):
if len(self.logged) == self.size or val in self.logged:
return SUT.NOK
else:
new_id = 100 if len(self.logged) == 0 else max(self.logged.keys()) + 100
new_id = val
self.logged[new_id] = False
return ("OREG", new_id)
return SUT.OK
def login(self, val):
if val not in self.logged or self.logged[val]:
......@@ -31,5 +30,45 @@ class Login():
self.logged[val] = False
return SUT.OK
class FSMLogin():
INTERFACE = [ActionSignature("register", 0), ActionSignature("login", 0), ActionSignature("logout", 0)]
def __init__(self, size):
super()
self.size = size
self.registered = 0
self.logged = 0
def register(self):
if self.registered == self.size:
return SUT.NOK
else:
self.registered += 1
return SUT.OK
def login(self):
if self.logged < self.registered:
self.logged += 1
return SUT.OK
else:
return SUT.NOK
def logout(self):
if self.logged == 0:
return SUT.NOK
else:
self.logged -= 1
return SUT.OK
class LoginClass(ScalableSUTClass):
def __init__(self):
super().__init__({
SUTType.IORA: Login,
SUTType.RA: Login,
SUTType.Mealy: FSMLogin,
SUTType.DFA: FSMLogin
})
def new_login_sut(size, sut_type = SUTType.IORA):
return ObjectSUT(Login.INTERFACE, lambda : Login(size))
from sut import SUT, ObjectSUT, ActionSignature
from sut import SUT, ObjectSUT, ActionSignature, ScalableSUTClass, SUTType
class Stack():
INTERFACE = [ActionSignature("get", 0), ActionSignature("put", 1)]
def __init__(self, size):
super()
self.size = size
self.list = list()
def get(self):
if len(self.list) == 0:
return SUT.NOK
else:
return ("OGET", self.list.pop())
def put(self, val):
if len(self.list) < self.size<