Commit 733be5e5 authored by Paul Fiterau Brostean's avatar Paul Fiterau Brostean
Browse files

Added random walk from state test generation algorithms. Nothing yet tested.

parent 272c8c8f
......@@ -72,7 +72,7 @@ class MealyMachineBuilder(object):
tr.z3_to_label(inp),
tr.z3_to_label(output),
tr.z3_to_state(to_state))
mut_mm.generate_acc_seq()
return mut_mm.to_immutable()
class DFABuilder(object):
......@@ -94,7 +94,7 @@ class DFABuilder(object):
tr.z3_to_label(labels),
tr.z3_to_state(to_state))
mut_dfa.add_transition(tr.z3_to_state(state), trans)
mut_dfa.generate_acc_seq()
return mut_dfa.to_immutable()
class FATranslator(object):
......
......@@ -102,6 +102,7 @@ class RegisterAutomatonBuilder(object):
self._add_state(model, translator, mut_ra, z3state)
for z3label in self.ra.labels.values():
self._add_transitions(model, translator, mut_ra, z3state, z3label)
mut_ra.generate_acc_seq()
return mut_ra.to_immutable()
def _add_state(self, model, translator, mut_ra, z3state):
......@@ -176,6 +177,7 @@ class IORegisterAutomatonBuilder(object):
self._add_state(translator, mut_ra, z3state)
for z3label in z3input_labels:
self._add_transitions(model, translator, mut_ra, z3state, z3label, z3output_labels)
mut_ra.generate_acc_seq()
return mut_ra.to_immutable()
def _add_state(self, translator, mut_ra, z3state):
......
......@@ -249,6 +249,18 @@ class IORAEncoder(Encoder):
ra.transition(q, ra.labels[l], r) == ra.transition(q, ra.labels[l], ra.fresh)
)
)
for l in self.output_labels:
if self.param_size[l] == 0:
print("Added for ", l)
axioms.append(
z3.ForAll(
[q,r],
z3.Implies(
r != ra.fresh,
ra.transition(q, ra.labels[l], r) == ra.sink
)
)
)
return axioms
......
......@@ -11,7 +11,7 @@ class Learner(metaclass=ABCMeta):
pass
@abstractmethod
def model(self, old_definition:define.Automaton=None) -> Tuple[model.Automaton, define.Automaton]:
def model(self, old_definition:define.Automaton=None, old_model:model.Automaton=None) -> Tuple[model.Automaton, define.Automaton]:
""""Infers a minimal model whose behavior corresponds to the traces added so far.
Returns None if no model could be obtained."""
pass
......
......@@ -20,8 +20,9 @@ class RALearner(Learner):
def add(self, trace):
self.encoder.add(trace)
def model(self, min_locations=1, max_locations=20, min_registers=0, max_registers=10,
old_definition:define.ra.RegisterMachine = None) -> Tuple[model.ra.RegisterMachine, define.ra.RegisterMachine]:
def model(self, min_locations=1, max_locations=20, min_registers=0, max_registers=3,
old_definition:define.ra.RegisterMachine = None, old_model:model.ra.RegisterMachine = None) -> \
Tuple[model.ra.RegisterMachine, define.ra.RegisterMachine]:
if old_definition is not None:
min_locations = len(old_definition.locations)
min_registers = len(old_definition.registers) - 1
......
......@@ -15,17 +15,18 @@ from test.generation import ExhaustiveRAGenerator
# (model, stats) = learn(learner, IORATest, obs)
# print(model, "\n \n", stats)
# set_sut = new_fifoset_sut(2)
# expensive peace of work
# set_sut = new_fifoset_sut(3)
# gen = ExhaustiveRAGenerator(set_sut)
# obs = gen.generate_observations(6, max_registers=2)
# obs = gen.generate_observations(8, max_registers=3)
# print("\n".join( [str(obs) for obs in obs]))
# learner = RALearner(IORAEncoder())
# (model, stats) = learn(learner, IORATest, obs)
# print(model, "\n \n", stats)
login_sut = new_login_sut(2)
login_sut = new_login_sut(1)
gen = ExhaustiveRAGenerator(login_sut)
obs = gen.generate_observations(8, max_registers=2)
obs = gen.generate_observations(6, max_registers=1)
print("\n".join( [str(obs) for obs in obs]))
learner = RALearner(IORAEncoder())
(model, stats) = learn(learner, IORATest, obs)
......
......@@ -45,8 +45,19 @@ class Automaton(metaclass=ABCMeta):
def start_state(self):
return self._states[0]
def acc_seq(self, state):
return self._acc_seq[state]
def acc_trans_seq(self, state=None) -> List[Transition]:
"""returns the access sequence to a state in the form of transitions"""
if state is not None:
return list(self._acc_seq[state])
else:
return dict(self._acc_seq)
def acc_seq(self, state=None):
"""returns the access sequence to a state in the form of sequences of inputs"""
if state is not None:
return self._seq(self._acc_seq[state])
else:
return {state:self._seq(self._acc_seq[state]) for state in self.states()}
def states(self):
return list(self._states)
......@@ -76,6 +87,11 @@ class Automaton(metaclass=ABCMeta):
return crt_state
@abstractmethod
def _seq(self, transitions:List[Transition])->List[object]:
"""returns the sequence of inputs generated by execution of these transitions"""
pass
def input_labels(self):
return set([trans.start_label for trans in self.transitions(self.start_state())])
......@@ -86,6 +102,7 @@ class Automaton(metaclass=ABCMeta):
# Basic __str__ function which works for most FSMs.
def __str__(self):
str_rep = ""
str_rep += str(self._acc_seq) + "\n"
for state in self.states():
str_rep += str(state) + "\n"
for tran in self.transitions(state):
......@@ -94,17 +111,26 @@ class Automaton(metaclass=ABCMeta):
return str_rep
class MutableAutomatonMixin(metaclass=ABCMeta):
def add_state(self, state):
def add_state(self:Automaton, state):
if state not in self._states:
self._states.append(state)
def add_transition(self, state, transition):
def add_transition(self:Automaton, state, transition):
if state not in self._state_to_trans:
self._state_to_trans[state] = []
self._state_to_trans[state].append(transition)
def add_acc_seq(self, state, trace):
self._acc_seq[state] = trace
def generate_acc_seq(self:Automaton):
"""generates access sequences for an automaton. It optionally takes in old access sequences, which are """
new_acc_seq = dict()
ptree = get_prefix_tree(self)
for state in self.states():
pred = lambda x: (x.state == state)
node = ptree.find_node(pred)
if node is None:
raise Exception("Could not find state {0} in tree {1}".format(state, ptree))
new_acc_seq[state] = node.path()
self._acc_seq = new_acc_seq
@abstractmethod
def to_immutable(self) -> Automaton:
......@@ -156,14 +182,7 @@ class MutableAcceptorMixin(MutableAutomatonMixin, metaclass=ABCMeta):
self._state_to_acc[state] = accepts
def get_acc_seq(aut : Automaton, runner, old_acc_seq = dict()):
new_acc_seq = {aut.state(acc_seq):acc_seq for acc_seq in old_acc_seq.values()}
not_covered = [state for state in aut.states() if state not in new_acc_seq.keys()]
ptree = get_prefix_tree(aut)
for state in not_covered:
trace = runner(ptree.path(state))
new_acc_seq[state] = trace
return new_acc_seq
def get_prefix_tree(aut : Automaton):
......@@ -179,27 +198,45 @@ def get_prefix_tree(aut : Automaton):
if trans.end_state not in visited:
child_node = PrefixTree(trans.end_state)
crt_node.add_child(trans, child_node)
to_visit.add(child_node)
return root
class PrefixTree():
def __init__(self, state):
self.state = state
self.tr_tree:dict = {}
self.parent = None
self.parent:PrefixTree = None
def add_child(self, trans, tree):
self.tr_tree[trans] = tree
self.tr_tree[trans].parent = self
def path(self, state) -> List[Transition]:
if len(self.tr_tree):
def path(self) -> List[Transition]:
if self.parent is None:
return []
else:
for (tr, node) in self.parent.tr_tree.items():
if node is self:
return self.parent.path()+[tr]
raise Exception("Miscontructed tree")
def find_node(self, predicate):
if predicate(self):
return self
elif len(self.tr_tree) == 0:
return None
else:
for (tran, child) in self.tr_tree.items():
path = child.path(state)
if path is not None:
return [tran] + path
return None
\ No newline at end of file
node = child.find_node(predicate)
if node is not None:
return node
return None
def __str__(self, tabs=0):
space = "".join(["\t" for _ in range(0, tabs)])
tree = "(n_{0}".format(self.state)
for (tr, node) in self.tr_tree.items():
tree += "\n" + space + " {0} -> {1}".format(tr, node.__str__(tabs=tabs + 1))
tree += ")"
return tree
\ No newline at end of file
......@@ -32,10 +32,16 @@ class DFA(Acceptor):
# print(tr_str)
return crt_state
def _seq(self, transitions: List[Transition]):
return [trans.start_label for trans in transitions]
class MutableDFA(DFA, MutableAcceptorMixin):
def __init__(self):
super().__init__([], {}, {})
def _runner(self):
return None
def to_immutable(self) -> DFA:
return DFA(self._states, self._state_to_trans, self._state_to_acc)
......@@ -55,6 +61,10 @@ class MooreMachine(Transducer):
crt_state = self.state(trace)
return self.state_to_out[crt_state]
def _seq(self, transitions:List[Transition]):
#trace = [(trans.start_label, self.state_to_out[trans.end_state]) for trans in transitions]
return [trans.start_label for trans in transitions]#trace
class MealyMachine(Transducer):
def __init__(self, states, state_to_trans):
super().__init__(states, state_to_trans)
......@@ -66,6 +76,10 @@ class MealyMachine(Transducer):
crt_state = super().state(trace)
return crt_state
def _seq(self, transitions:List[IOTransition]):
#trace = [(trans.start_label, trans.output) for trans in transitions]
return [trans.start_label for trans in transitions]
def output(self, trace: List[Symbol]) -> Output:
if len(trace) == 0:
return None
......
......@@ -137,6 +137,20 @@ class RegisterAutomaton(Acceptor, RegisterMachine):
return crt_state
def _seq(self, transitions:List[RATransition]):
run = []
values = set()
reg_val = dict()
for trans in transitions:
if isinstance(trans.guard, EqualityGuard) or isinstance(trans.guard, OrGuard):
inp_val = reg_val[trans.guard.registers()[0]]
else:
inp_val = 0 if len(values) == 0 else max(values) + 1
values.add(inp_val)
inp = Action(trans.start_label, inp_val)
run.append(inp)
return run
class IORegisterAutomaton(Transducer, RegisterMachine):
def __init__(self, locations, loc_to_trans, registers):
......@@ -194,6 +208,28 @@ class IORegisterAutomaton(Transducer, RegisterMachine):
output = fired_transition.output(valuation, values)
return output
def _seq(self, transitions: List[IORATransition]):
seq = []
values = set()
reg_val = dict()
for trans in transitions:
if isinstance(trans.guard, EqualityGuard) or isinstance(trans.guard, OrGuard):
inp_val = reg_val[trans.guard.registers()[0]]
else:
inp_val = 0 if len(values) == 0 else max(values) + 1
values.add(inp_val)
inp = Action(trans.start_label, inp_val)
reg_val = trans.update(reg_val, inp)
if isinstance(trans.output_mapping, Register):
out_val = reg_val[trans.output_mapping]
else:
out_val = 0 if len(values) == 0 else max(values) + 1
values.add(out_val)
out = Action(trans.output_label, out_val)
reg_val = trans.output_update(reg_val, out)
seq.append(inp)
return seq
class MutableRegisterAutomaton(RegisterAutomaton, MutableAcceptorMixin):
def __init__(self):
super().__init__([], dict(), dict(), [])
......
......@@ -3,6 +3,7 @@ from typing import List
import collections
from model.fa import Symbol
from model.ra import Action
from enum import Enum
......@@ -11,7 +12,7 @@ class SUT(metaclass=ABCMeta):
OK = "OK"
NOK = "NOK"
@abstractmethod
def run(self, seq:List[object]):
def run(self, seq:List[object]) -> Observation:
"""Runs a sequence of inputs on the SUT and returns an observation"""
pass
......@@ -59,13 +60,35 @@ class RASUT(metaclass=ABCMeta):
class Observation():
@abstractmethod
def trace(self):
"""returns the trace to be added to the solver"""
pass
@abstractmethod
def inputs(self):
"""returns all the values in the trace"""
"""returns all the inputs from an observation"""
pass
class DFAObservation():
def __init__(self, seq, acc):
self.seq = seq
self.acc = acc
def trace(self):
return (self.seq, self.acc)
def inputs(self) -> List[Symbol]:
return self.seq
class MealyObservation():
def __init__(self, trace):
self.tr = trace
def trace(self):
return self.tr
def inputs(self) -> List[Symbol]:
return [a for (a,_) in self.tr]
class RegisterMachineObservation(Observation):
@abstractmethod
......@@ -75,34 +98,37 @@ class RegisterMachineObservation(Observation):
# class RAObservation(Observation):
# def __init__(self, trace):
#
#
#
# def trace(self):
# return self.trace
#
# def values(self):
# return
class RAObservation(RegisterMachineObservation):
def __init__(self, seq, acc):
self.seq = seq
self.acc = acc
def trace(self):
return (self.trace, self.acc)
def inputs(self):
return self.seq
def values(self):
return set([a.value for a in self.seq if a.value is not None])
class IORAObservation(RegisterMachineObservation):
def __init__(self, trace):
self.trace = trace
self.tr = trace
def trace(self):
return self.trace
return self.tr
def inputs(self):
return [a for (a,_) in self.trace]
return [a for (a,_) in self.tr]
def values(self):
iv = [a.value for (a,_) in self.trace if a.value is not None]
ov = [a.value for (_,a) in self.trace if a.value is not None]
iv = [a.value for (a,_) in self.tr if a.value is not None]
ov = [a.value for (_,a) in self.tr if a.value is not None]
return set(iv+ov)
def __str__(self):
return "Obs: " + str(self.trace)
return "Obs: " + str(self.tr)
......
from sut import SUT, ObjectSUT, ActionSignature, SUTType
class Login():
INTERFACE = [ActionSignature("register", 0), ActionSignature("login", 1), ActionSignature("logout", 1)]
def __init__(self, size):
super()
self.size = size
self.logged = {}
def register(self):
if len(self.logged) == self.size:
return SUT.NOK
else:
new_id = 100 if len(self.logged) == 0 else max(self.logged.keys()) + 100
self.logged[new_id] = False
return ("OREG", new_id)
def login(self, val):
if val not in self.logged or self.logged[val]:
return SUT.NOK
else:
self.logged[val] = True
return SUT.OK
def logout(self, val):
if val not in self.logged or not self.logged[val]:
return SUT.NOK
else:
self.logged[val] = False
return SUT.OK
def new_login_sut(size, sut_type = SUTType.IORA):
return ObjectSUT(Login.INTERFACE, lambda : Login(size))
......@@ -25,10 +25,9 @@ class ExhaustiveRAGenerator(ObservationGeneration):
raise Exception("This generator assumes at most one parameter per action")
def generate_observations(self, max_depth, max_registers=3) -> List[Tuple[Action, Action]]:
obss = self._generate_observations([IORAObservation([])], 0, max_depth, max_registers+1)
print("\n".join([str(obs) for obs in obss]))
obs_traces = [obs.trace() for obs in obss]
#print(obs_traces)
observations = self._generate_observations([IORAObservation([])], 0, max_depth, max_registers+1)
print("\n".join([str(obs) for obs in observations]))
obs_traces = [obs.trace() for obs in observations]
return obs_traces
......@@ -40,11 +39,11 @@ class ExhaustiveRAGenerator(ObservationGeneration):
else:
new_obs = []
for obs in prev_obs:
num_val = max(obs.values()) + 1 if len(obs.values()) > 0 else 1
num_val = max(obs.values()) + 1 if len(obs.values()) > 0 else 0
for act_sig in self.act_sigs:
label = act_sig.label
if act_sig.num_params == 1:
for i in range(0, min(num_val, max_values)):
for i in range(0, min(num_val+1, max_values)):
seq = obs.inputs()
seq.append(Action(label, i))
new_obs.append(self.sut.run(seq))
......
from abc import ABCMeta, abstractmethod
from typing import List
import collections
import itertools
from model import Automaton, Transition
from model.ra import IORATransition, IORegisterAutomaton, EqualityGuard, OrGuard, Action, Register
from sut import SUT
from test import TestGenerator, Test, AcceptorTest, MealyTest, IORATest
import random
rand = random.Random(0)
class RWalkFromState(TestGenerator, metaclass=ABCMeta):
def __init__(self, sut:SUT, test_gen, rand_length, rand_start_state=True):
self.rand_length = rand_length
self.rand_start_state = rand_start_state
self.sut = sut
self.test_gen = test_gen
def gen_test(self, model: Automaton) -> Test:
"""generates a test comprising an access sequence and a random sequence"""
if self.rand_start_state:
crt_state = model.states()[rand.randint(0, len(model.states()) - 1)]
else:
crt_state = model.start_state()
trans_path = list(model.acc_trans_seq(crt_state))
for _ in range(0, self.rand_length):
transitions = model.transitions(crt_state)
r_trans = transitions[rand.randint(0, len(transitions)-1)]
crt_state = r_trans.end_state
trans_path.append(r_trans)
seq = self._generate_seq(model, trans_path)
obs = self.sut.run(seq)
test = self.test_gen(obs.trace())
return test
@abstractmethod
def _generate_seq(self, model: Automaton, trans_path:List[Transition]):
"""generates a sequence of inputs for the randomly chosen transition path"""
pass
# FSM versions of the algorithm
class FSMRWalkFromState(RWalkFromState, metaclass=ABCMeta):
def __init__(self, sut: SUT, test_gen, rand_length, rand_start_state=True):
super().__init__(sut, test_gen, rand_length, rand_start_state)
def _generate_seq(self, model: Automaton, trans_path:List[Transition]):
return [trans.start_label for trans in trans_path]
class DFARWalkFromState(RWalkFromState):
def __init__(self, sut:SUT, rand_length, rand_start_state=True):
super().__init__(self, sut, AcceptorTest, rand_length, rand_start_state)
class MealyRWalkFromState(RWalkFromState):
def __init__(self, sut:SUT, rand_length, rand_start_state=True):
super().__init__(self, sut, MealyTest, rand_length, rand_start_state)
class ValueProb(collections.nametuple("ValueProb", ("history", "register", "fresh"))):
def select(self, reg_vals:List[int], his_vals:List[int], fresh_value):
pick = rand.random()
if pick < self.register and len(reg_vals) > 0:
return reg_vals[rand.randint(0, len(reg_vals)-1)]
elif pick >= self.register and pick < self.register + self.history and len(his_vals) > 0:
return reg_vals[rand.randint(0, len(his_vals) - 1)]
else:
return fresh_value
class IORARWalkFromState(RWalkFromState):
def __init__(self, sut: SUT, rand_length, prob = ValueProb(0.4, 0.4, 0.2), rand_start_state=True):
super().__init__(sut, IORATest, rand_length, rand_start_state)
self.prob = prob
def _generate_seq(self, model: IORegisterAutomaton, transitions:List[IORATransition]):
"""generates a sequence of inputs for the randomly chosen transition path"""
seq = []
values = set() # we may consider using lists to preserve order (sets cause randomness)
reg_val = dict()
for trans in transitions:
if isinstance(trans.guard, EqualityGuard) or isinstance(trans.guard, OrGuard):
inp_val = reg_val[trans.guard.registers()[0]]
else:
# we have a fresh transition, which means we can either pick a fresh value or any past value
# as long as it is not stored in one of the guarded registers in this location
fresh_val = 0 if len(values) == 0 else max(values) + 1
active_regs = set(itertools.chain([tr.guard.registers() for tr in
model.transitions(trans.start_state) if not tr is not trans]))
active_reg_vals = [reg_val[reg] for reg in active_regs]
selectable_reg_vals = [val for val in reg_val.values() if val not in active_reg_vals]
selectable_his_vals = [val for val in values if val not in active_reg_vals
and val not in selectable_reg_vals]
inp_val = self.prob.select(selectable_reg_vals, selectable_his_vals, fresh_val)
values.add(inp_val)