Commit 272c8c8f authored by Paul Fiterau Brostean's avatar Paul Fiterau Brostean
Browse files

Added second type of algorithm one which uses a (model based) test generation algorithm.

parent 062c64e4
......@@ -3,7 +3,7 @@ from typing import cast
from model import Automaton
from learn import Learner
from test import TestTemplate
from test import TestTemplate, TestGenerator
import time
class Statistics():
......@@ -37,6 +37,7 @@ class Statistics():
def learn(learner:Learner, test_type:type, traces: List[object]) -> Tuple[Automaton, Statistics]:
""" takes learner and a list of traces and generates a model"""
statistics = Statistics()
if len(traces) == 0:
return (None, statistics)
......@@ -70,4 +71,48 @@ def learn(learner:Learner, test_type:type, traces: List[object]) -> Tuple[Automa
statistics.add_inputs(test.size())
done = False
break
return (model, statistics)
\ No newline at end of file
return (model, statistics)
def learn_mbt(learner:Learner, test_generator:TestGenerator) -> Tuple[Automaton, Statistics]:
""" takes learner and a test generator, and generates a model"""
next_test = test_generator.gen_test(None)
statistics = Statistics()
if next_test is None:
return (None, statistics)
else:
definition = None
learner.add(next_test.trace())
statistics.add_tests(1)
statistics.add_inputs(next_test.size())
done = False
learner_tests = [next_test]
generated_tests = [next_test]
while not done:
start_time = int(time.time() * 1000)
(model, definition) = learner.model(old_definition=definition)
end_time = int(time.time() * 1000)
statistics.add_learning_time(end_time - start_time)
done = True
for next_test in generated_tests:
ce = next_test.check(model)
if ce is not None:
learner_tests.append(ce)
learner.add(ce)
statistics.add_tests(1)
statistics.add_inputs(next_test.size())
done = False
break
if not done:
continue
for next_test in test_generator.gen_test_iter(model):
generated_tests.append(next_test)
ce = next_test.check(model)
if ce is not None:
learner_tests.append(ce)
learner.add(ce)
statistics.add_tests(1)
statistics.add_inputs(next_test.size())
done = False
break
statistics.set_suite_size(len(generated_tests))
return (model, statistics)
from encode.iora import IORAEncoder
from learn.algorithm import learn
from learn.ra import RALearner
from sut.login import new_login_sut
from sut.stack import new_stack_sut
from sut.fifoset import new_fifoset_sut
from test import IORATest
......@@ -14,9 +15,17 @@ from test.generation import ExhaustiveRAGenerator
# (model, stats) = learn(learner, IORATest, obs)
# print(model, "\n \n", stats)
set_sut = new_fifoset_sut(2)
gen = ExhaustiveRAGenerator(set_sut)
obs = gen.generate_observations(6, max_registers=2)
# set_sut = new_fifoset_sut(2)
# gen = ExhaustiveRAGenerator(set_sut)
# obs = gen.generate_observations(6, max_registers=2)
# print("\n".join( [str(obs) for obs in obs]))
# learner = RALearner(IORAEncoder())
# (model, stats) = learn(learner, IORATest, obs)
# print(model, "\n \n", stats)
login_sut = new_login_sut(2)
gen = ExhaustiveRAGenerator(login_sut)
obs = gen.generate_observations(8, max_registers=2)
print("\n".join( [str(obs) for obs in obs]))
learner = RALearner(IORAEncoder())
(model, stats) = learn(learner, IORATest, obs)
......
......@@ -40,10 +40,14 @@ class Automaton(metaclass=ABCMeta):
super().__init__()
self._states = states
self._state_to_trans = state_to_trans
self._acc_seq = {}
def start_state(self):
return self._states[0]
def acc_seq(self, state):
return self._acc_seq[state]
def states(self):
return list(self._states)
......@@ -89,7 +93,6 @@ class Automaton(metaclass=ABCMeta):
return str_rep
class MutableAutomatonMixin(metaclass=ABCMeta):
def add_state(self, state):
if state not in self._states:
......@@ -100,6 +103,9 @@ class MutableAutomatonMixin(metaclass=ABCMeta):
self._state_to_trans[state] = []
self._state_to_trans[state].append(transition)
def add_acc_seq(self, state, trace):
self._acc_seq[state] = trace
@abstractmethod
def to_immutable(self) -> Automaton:
pass
......@@ -148,3 +154,52 @@ class MutableAcceptorMixin(MutableAutomatonMixin, metaclass=ABCMeta):
def add_state(self, state, accepts):
super().add_state(state)
self._state_to_acc[state] = accepts
def get_acc_seq(aut : Automaton, runner, old_acc_seq = dict()):
new_acc_seq = {aut.state(acc_seq):acc_seq for acc_seq in old_acc_seq.values()}
not_covered = [state for state in aut.states() if state not in new_acc_seq.keys()]
ptree = get_prefix_tree(aut)
for state in not_covered:
trace = runner(ptree.path(state))
new_acc_seq[state] = trace
return new_acc_seq
def get_prefix_tree(aut : Automaton):
visited = set()
to_visit = set()
root = PrefixTree(aut.start_state())
to_visit.add(root)
while len(to_visit) > 0:
crt_node = to_visit.pop()
visited.add(crt_node.state)
transitions = aut.transitions(crt_node.state)
for trans in transitions:
if trans.end_state not in visited:
child_node = PrefixTree(trans.end_state)
crt_node.add_child(trans, child_node)
return root
class PrefixTree():
def __init__(self, state):
self.state = state
self.tr_tree:dict = {}
self.parent = None
def add_child(self, trans, tree):
self.tr_tree[trans] = tree
self.tr_tree[trans].parent = self
def path(self, state) -> List[Transition]:
if len(self.tr_tree):
return None
else:
for (tran, child) in self.tr_tree.items():
path = child.path(state)
if path is not None:
return [tran] + path
return None
\ No newline at end of file
......@@ -111,7 +111,6 @@ class RegisterMachine(Automaton):
def registers(self) -> List[Register]:
pass
class RegisterAutomaton(Acceptor, RegisterMachine):
def __init__(self, locations, loc_to_acc, loc_to_trans, registers):
super().__init__(locations, loc_to_trans, loc_to_acc)
......@@ -332,3 +331,38 @@ class NoAssignment(Assignment):
def __str__(self):
return "r:=r"
def ra_run(transitions:List[RATransition]):
run = []
values = set()
reg_val = dict()
for trans in transitions:
if isinstance(trans.guard, EqualityGuard) or isinstance(trans.guard, OrGuard):
inp_val = reg_val[trans.guard.registers()[0]]
else:
inp_val = 0 if len(values) == 0 else max(values) + 1
values.add(inp_val)
inp = Action(trans.start_label, inp_val)
run.append(inp)
return run
def iora_run(transitions:List[IORATransition]):
run = []
values = set()
reg_val = dict()
for trans in transitions:
if isinstance(trans.guard, EqualityGuard) or isinstance(trans.guard, OrGuard):
inp_val = reg_val[trans.guard.registers()[0]]
else:
inp_val = 0 if len(values) == 0 else max(values) + 1
values.add(inp_val)
inp = Action(trans.start_label, inp_val)
reg_val = trans.update(reg_val, inp)
if isinstance(trans.output_mapping, Register):
out_val = reg_val[trans.output_mapping]
else:
out_val = 0 if len(values) == 0 else max(values) + 1
values.add(out_val)
out = Action(trans.output_label, out_val)
reg_val = trans.output_update(reg_val, out)
run.append((inp, out))
return run
......@@ -4,6 +4,7 @@ from typing import List
import collections
from model.ra import Action
from enum import Enum
class SUT(metaclass=ABCMeta):
......@@ -20,6 +21,30 @@ class SUT(metaclass=ABCMeta):
pass
class SUTType(Enum):
IORA = 1
RA = 2
Mealy = 3
Moore = 4
DFA = 5
def is_acceptor(self):
return self == SUTType.RA or self.DFA
def is_transducer(self):
return not self.is_acceptor()
class SUTClass(metaclass=ABCMeta):
def __init__(self, sut_dict):
self.sut_dict = sut_dict
def get_sut(self, sut_type : SUTType) -> SUT:
return self.sut_dict[sut_type]
def has_sut(self, sut_type : SUTType) -> bool:
return sut_type in self.sut_dict
ActionSignature = collections.namedtuple("ActionSignature", ('label', 'num_params'))
class RASUT(metaclass=ABCMeta):
@abstractmethod
......@@ -31,6 +56,57 @@ class RASUT(metaclass=ABCMeta):
"""Runs a sequence of inputs on the SUT and returns an observation"""
pass
class Observation():
@abstractmethod
def trace(self):
pass
@abstractmethod
def inputs(self):
"""returns all the values in the trace"""
pass
class RegisterMachineObservation(Observation):
@abstractmethod
def values(self):
"""returns all the values in the trace"""
pass
# class RAObservation(Observation):
# def __init__(self, trace):
#
#
#
# def trace(self):
# return self.trace
#
# def values(self):
# return
class IORAObservation(RegisterMachineObservation):
def __init__(self, trace):
self.trace = trace
def trace(self):
return self.trace
def inputs(self):
return [a for (a,_) in self.trace]
def values(self):
iv = [a.value for (a,_) in self.trace if a.value is not None]
ov = [a.value for (_,a) in self.trace if a.value is not None]
return set(iv+ov)
def __str__(self):
return "Obs: " + str(self.trace)
class ObjectSUT(RASUT):
"""Wraps around an object and calls methods on it corresponding to the Actions"""
def __init__(self, act_sigs, obj_gen):
......@@ -39,21 +115,38 @@ class ObjectSUT(RASUT):
def run(self, seq:List[Action]):
obj = self.obj_gen()
values = set()
values = dict()
out_seq = []
for (label, val) in seq:
meth = obj.__getattribute__(label)
if self.acts[label].num_params == 0:
outp = meth()
else:
values.add(val)
(values, val) = self._res_ival(values, val)
outp = meth(val)
outp_action = self.parse_out(outp)
values.add(outp_action.value)
(out_label, out_val) = self.parse_out(outp)
if out_val is not None:
(values, out_val) = self._res_oval(values, out_val)
outp_action = Action(out_label, out_val)
out_seq.append(outp_action)
obs = list(zip(seq, out_seq))
return obs
return IORAObservation(obs)
def _res_ival(self, val_dict, val):
l = [a for a in val_dict.keys() if val_dict[a] == val]
if len(l) > 1:
raise Exception("Collision")
if len(l) == 1:
return (val_dict, l[0])
val_dict[val] = val
return (val_dict, val)
def _res_oval(self, val_dict, val):
if val in val_dict:
return (val_dict, val_dict[val])
else:
val_dict[val] = max(val_dict.values()) + 1 if len(val_dict) > 0 else 0
return (val_dict, val_dict[val])
def parse_out(self, outp) -> Action:
fresh = None
......
import itertools
from abc import ABCMeta, abstractmethod
from typing import List
from types import GeneratorType
from typing import List, Generator, Iterable
from model import Automaton, Acceptor
from model.fa import MealyMachine
......@@ -16,6 +17,16 @@ class Test(metaclass=ABCMeta):
On success it return None"""
pass
@abstractmethod
def trace(self):
pass
@abstractmethod
def size(self) -> int:
"""Returns the size (in terms of inputs) of the test"""
pass
class TestTemplate(metaclass=ABCMeta):
def __init__(self, trace):
......@@ -24,15 +35,11 @@ class TestTemplate(metaclass=ABCMeta):
def check(self, model: Automaton):
return self._check_trace(model, self.trace)
@abstractmethod
def _check_trace(self, model, trace):
pass
def trace(self):
return self.trace
@abstractmethod
def size(self) -> int:
"""Returns the size (in terms of inputs) of the test"""
def _check_trace(self, model, trace):
pass
......@@ -42,6 +49,11 @@ class TestGenerator(metaclass=ABCMeta):
"""generates a Test. Returns None if the test suite is exhausted"""
pass
def gen_test_iter(self, model: Automaton) -> Iterable[Test]:
test = self.gen_test(model)
while test is not None:
yield test
test = self.gen_test(model)
class TracesGenerator(metaclass=ABCMeta):
def __init__(self, traces = list()):
......
......@@ -5,7 +5,7 @@ from encode.iora import IORAEncoder
from learn.algorithm import learn
from learn.ra import RALearner
from model.ra import Action
from sut import RASUT
from sut import RASUT, RegisterMachineObservation, IORAObservation
from sut.stack import new_stack_sut
from test import IORATest
......@@ -25,29 +25,33 @@ class ExhaustiveRAGenerator(ObservationGeneration):
raise Exception("This generator assumes at most one parameter per action")
def generate_observations(self, max_depth, max_registers=3) -> List[Tuple[Action, Action]]:
val_obs = self._generate_observations([(0,[])], 0, max_depth, max_registers)
obs = [obs for (_, obs) in val_obs]
return obs
obss = self._generate_observations([IORAObservation([])], 0, max_depth, max_registers+1)
print("\n".join([str(obs) for obs in obss]))
obs_traces = [obs.trace() for obs in obss]
#print(obs_traces)
return obs_traces
def _generate_observations(self, prev_obs, crt_depth, max_depth, max_values):
def _generate_observations(self, prev_obs:List[RegisterMachineObservation], crt_depth, max_depth, max_values) \
-> List[RegisterMachineObservation]:
if crt_depth > max_depth:
return []
else:
new_obs = []
for (num_val, obs) in prev_obs:
for obs in prev_obs:
num_val = max(obs.values()) + 1 if len(obs.values()) > 0 else 1
for act_sig in self.act_sigs:
label = act_sig.label
if act_sig.num_params == 1:
for i in range(0, min(num_val+1, max_values)):
seq = [inp for (inp, _) in obs]
for i in range(0, min(num_val, max_values)):
seq = obs.inputs()
seq.append(Action(label, i))
new_obs.append((max(num_val+1, i), self.sut.run(seq)))
new_obs.append(self.sut.run(seq))
else:
seq = [inp for (inp, _) in obs]
seq = obs.inputs()
seq.append(Action(label, None))
new_obs.append((num_val, self.sut.run(seq)))
new_obs.append(self.sut.run(seq))
if crt_depth < max_depth:
extended_obs = self._generate_observations(new_obs, crt_depth + 1, max_depth, max_values)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment