Commit 70be9b43, authored by Paul Fiterau Brostean.

Before modification

parent e9332f60
......@@ -11,12 +11,12 @@ from learn.fa import FALearner
from learn.ra import RALearner
from model import Automaton
from model.ra import RegisterMachine
from sut import SUTType
from sut import SUTType, StatsSUT, DFAObservation, RAObservation, MealyObservation, IORAObservation
from sut.login import LoginClass
from sut.scalable import ScalableSUTClass
from sut.fifoset import FIFOSetClass
from sut.login import LoginClass
from sut.stack import StackClass
from test import TestGenerator
from sut.sut_cache import AcceptorCache, IOCache, CacheSUT
from learn.algorithm import learn_mbt, Statistics
from test.rwalk import DFARWalkFromState, MealyRWalkFromState, RARWalkFromState, IORARWalkFromState
from statistics import stdev, median
......@@ -27,22 +27,23 @@ class SutDesc(collections.namedtuple("SutDesc", 'sut_class type size')):
return str(self.type).replace("SUTType.","") + "_" + str(self.sut_class.__class__.__name__).replace("Class", "") + "(" + str(self.size) + ")"
# Description of a test run: test budget, random-walk length and reset probability.
TestDesc = collections.namedtuple("TestDesc", 'max_tests rand_length prop_reset')

# Statistics gathered for a single learning experiment.
# NOTE: diff residue removed here — the old and new namedtuple definitions were
# interleaved; this keeps the post-commit field set.
class ExperimentStats(collections.namedtuple("CollectedStats", "states registers tests "
                                                               "inputs max_ltime learn_time")):
    pass

# Statistics aggregated over several repetitions of the same experiment.
class CollatedStats(collections.namedtuple("CollatedStats", "exp_succ states registers consistent avg_tests std_tests avg_inputs std_inputs max_ltime avg_ltime std_ltime")):
    pass
def get_learner_setup(sut, sut_type:SUTType, size, test_desc:TestDesc):
    """Return a (learner, tester) pair configured for the given SUT type.

    Diff residue removed: the old variant (returning uninstantiated tester
    classes) and the new variant (returning testers built over *sut*) were
    interleaved; this keeps the new behavior.

    The random-walk length scales with the system size so larger systems
    get proportionally longer test walks.
    """
    args = (sut, test_desc.rand_length + size, test_desc.prop_reset)
    if sut_type is SUTType.DFA:
        return (FALearner(DFAEncoder()), DFARWalkFromState(*args))
    elif sut_type is SUTType.Mealy:
        return (FALearner(MealyEncoder()), MealyRWalkFromState(*args))
    elif sut_type is SUTType.RA:
        return (RALearner(RAEncoder()), RARWalkFromState(*args))
    elif sut_type is SUTType.IORA:
        return (RALearner(IORAEncoder()), IORARWalkFromState(*args))
    raise Exception("Invalid setup")
class Benchmark:
......@@ -72,14 +73,26 @@ class Benchmark:
while True:
if max_size is not None and size > max_size:
break
print("Learning ", size)
sut = sut_class.new_sut(sut_type, size)
learner,test_gen = get_learner_setup(sut_type)
stats_sut = StatsSUT(sut)
sut_stats = stats_sut.stats_tracker()
if sut_type.is_acceptor():
if sut_type is SUTType.DFA:
cache = AcceptorCache(DFAObservation)
else:
cache = AcceptorCache(RAObservation)
else:
if sut_type is SUTType.Mealy:
cache = IOCache(MealyObservation)
else:
cache = IOCache(IORAObservation)
cache_sut = CacheSUT(stats_sut, cache)
learner,tester = get_learner_setup(cache_sut, sut_type, size, test_desc)
learner.set_timeout(tout)
# ugly but there you go
rand_length = size + test_desc.rand_length
prop_reset = 1/rand_length
tester = test_gen(sut, rand_length, prop_reset)
(model, statistics) = learn_mbt(learner, tester, test_desc.max_tests)
(model, statistics) = learn_mbt(learner, tester, test_desc.max_tests, stats_tracker=sut_stats)
if model is None:
break
else:
......@@ -92,12 +105,11 @@ class Benchmark:
def _collect_stats(self, model:Automaton, statistics:Statistics) -> ExperimentStats:
    """Summarize a single learning run into an ExperimentStats record.

    Diff residue removed: the old field names (num_learner_tests, suite_size)
    were interleaved with the new ones (resets, inputs); this keeps the new
    statistics contract.
    """
    states = len(model.states())
    # only register machines expose registers; report None for plain FSMs
    registers = len(model.registers()) if isinstance(model, RegisterMachine) else None
    learn_tests = statistics.resets
    learn_inputs = statistics.inputs
    learn_time = sum(statistics.learning_times)
    # the longest single learning round, useful for spotting solver blow-ups
    max_ltime = max(statistics.learning_times)
    return ExperimentStats(states=states, registers=registers, tests=learn_tests,
                           inputs=learn_inputs, max_ltime=max_ltime, learn_time=learn_time)
def run_benchmarks(self, test_desc:TestDesc, timeout:int, max_size:int=None) -> List[Tuple[SutDesc, ExperimentStats]]:
results = []
......@@ -110,7 +122,6 @@ def collate_stats(sut_desc: SutDesc, exp_stats:List[ExperimentStats]):
if exp_stats is None:
return None
else:
states = [e.states for e in exp_stats]
avg_states = median(states)
regs = [e.registers for e in exp_stats]
......@@ -121,18 +132,20 @@ def collate_stats(sut_desc: SutDesc, exp_stats:List[ExperimentStats]):
consistent = len(set(states)) == 1 and \
(not sut_desc.type.has_registers() or len(set(regs)) == 1)
exp_succ = len(exp_stats)
ltests = [e.learn_tests for e in exp_stats]
linputs = [e.learn_inputs for e in exp_stats]
ltests = [e.tests for e in exp_stats]
linputs = [e.inputs for e in exp_stats]
ltime = [e.learn_time for e in exp_stats]
maxltimes = [e.max_ltime for e in exp_stats]
return CollatedStats(
exp_succ=exp_succ,
states=avg_states,
registers=avg_reg,
consistent=consistent,
avg_ltests=median(ltests),
std_ltests=0 if len(ltests) == 1 else stdev(ltests),
avg_tests=median(ltests),
std_tests=0 if len(ltests) == 1 else stdev(ltests),
avg_inputs=median(linputs),
std_inputs=0 if len(linputs) == 1 else stdev(linputs),
max_ltime=max(maxltimes),
avg_ltime=median(ltime),
std_ltime=0 if len(ltime) == 1 else stdev(ltime),
)
......@@ -159,27 +172,27 @@ b = Benchmark()
#b.add_setup(SUTType.IORA, RALearner(IORAEncoder()), IORARWalkFromState)
# add the sut classes we want to benchmark
# Register the SUT classes we want to benchmark.
# Diff residue removed: old (commented) and new registration lines and old/new
# config values were interleaved; this keeps the post-commit configuration.
b.add_sut(FIFOSetClass())
b.add_sut(LoginClass())
b.add_sut(StackClass())
b.add_sut(FIFOSetClass(), SUTType.Mealy)
#b.add_sut(FIFOSetClass(), SUTType.DFA)
#b.add_sut(FIFOSetClass(), SUTType.IORA)
#b.add_sut(LoginClass(), SUTType.IORA)
#b.add_sut(StackClass(), SUTType.IORA)
# create a test description
t_desc = TestDesc(max_tests=10000, prop_reset=0.2, rand_length=3)
# give an smt timeout value (in ms)
timeout = 150000
# how many times each experiment should be run
num_exp = 5
# up to what systems of what size do we want to run experiments
# (set to None if size is ignored as a stop condition)
max_size = None
# run the benchmark and collect results
results = []
......
......@@ -2,14 +2,17 @@ from abc import ABCMeta, abstractmethod
import z3
def dt_enum(name, elements):
    """Build an enumeration as a z3 Datatype sort.

    Returns (sort, values) where values[i] is the z3 constant for elements[i].
    Diff residue removed: the old `enum` def line duplicated this definition.
    """
    d = z3.Datatype(name)
    for element in elements:
        d.declare(element)
    d = d.create()
    # getattr is the idiomatic form of d.__getattribute__(element)
    return d, [getattr(d, element) for element in elements]

def int_enum(name, elements):
    """Encode an enumeration over the integer sort.

    Returns (IntSort, values) where the elements map to IntVal 1..len(elements).
    The *name* parameter is unused but kept for interface parity with dt_enum.
    """
    d = z3.IntSort()
    return d, [z3.IntVal(i) for i in range(1, len(elements) + 1)]
class Automaton(metaclass=ABCMeta):
@abstractmethod
def export(self, model):
......
......@@ -2,21 +2,21 @@ from abc import ABCMeta, abstractmethod
import z3
import model.fa
from define import enum, Automaton
from define import Automaton, dt_enum
from model import Transition
class FSM(Automaton, metaclass=ABCMeta):
    """Base for finite-state machine definitions over z3 sorts.

    Diff residue removed: old (fixed dt_enum) and new (pluggable state_enum)
    __init__ variants were interleaved; this keeps the new one.
    """
    @abstractmethod
    def __init__(self, num_states, state_enum=dt_enum):
        # state_enum lets subclasses pick the z3 encoding of states
        # (datatype enum by default, integer encoding via int_enum)
        self.State, self.states = state_enum('State', ['state{0}'.format(i) for i in range(num_states)])
        self.start = self.states[0]
class DFA(FSM):
    """DFA definition: labelled transitions plus a boolean output per state."""
    def __init__(self, labels, num_states):
        super().__init__(num_states)
        labels = list(labels)
        # diff residue removed: keep the renamed dt_enum helper
        self.Label, elements = dt_enum('Label', labels)
        # map concrete label -> its z3 datatype value
        self.labels = {labels[i]: elements[i] for i in range(len(labels))}
        self.transition = z3.Function('transition', self.State, self.Label, self.State)
        self.output = z3.Function('output', self.State, z3.BoolSort())
......@@ -28,11 +28,11 @@ class DFA(FSM):
class MealyMachine(FSM):
    """Mealy machine definition: outputs are produced per (state, input).

    Diff residue removed: old and new __init__ variants were interleaved;
    this keeps the new one with pluggable state/label encodings.
    """
    def __init__(self, input_labels, output_labels, num_states, state_enum=dt_enum, label_enum=dt_enum):
        super().__init__(num_states, state_enum=state_enum)
        self.Input, elements = label_enum('Input', input_labels)
        self.inputs = {input_labels[i]: elements[i] for i in range(len(input_labels))}
        self.Output, elements = label_enum('Output', output_labels)
        self.outputs = {output_labels[i]: elements[i] for i in range(len(output_labels))}
        self.transition = z3.Function('transition', self.State, self.Input, self.State)
        self.output = z3.Function('output', self.State, self.Input, self.Output)
......
import z3
from model.ra import *
from define import enum, Automaton
from define import dt_enum, Automaton
class RegisterMachine(Automaton):
    """Base for register machine definitions (locations + registers).

    Diff residue removed: old `enum` calls were interleaved with the renamed
    `dt_enum` calls; this keeps the new names.
    """
    def __init__(self, num_locations, num_registers, param_size):
        self.Location, self.locations = dt_enum('Location', ['location{0}'.format(i) for i in range(num_locations)])
        # 'fresh' is an extra pseudo-register denoting a fresh parameter value
        self.Register, self.registers = dt_enum('Register',
                ['register{0}'.format(i) for i in range(num_registers)] + ['fresh'])
        self.param_size = param_size
......@@ -14,7 +14,7 @@ class RegisterAutomaton(RegisterMachine):
def __init__(self, labels, param_size, num_locations, num_registers):
super().__init__(num_locations, num_registers, param_size)
self.Label, elements = enum('Label', labels)
self.Label, elements = dt_enum('Label', labels)
self.labels = {labels[i]: elements[i] for i in range(len(labels))}
self.start = self.locations[0]
self.fresh = self.registers[-1]
......@@ -33,7 +33,7 @@ class IORegisterAutomaton(RegisterMachine):
def __init__(self, input_labels, output_labels, param_size, num_locations, num_registers):
super().__init__(num_locations, num_registers, param_size)
labels = input_labels + output_labels
self.Label, elements = enum('Label', input_labels + output_labels)
self.Label, elements = dt_enum('Label', input_labels + output_labels)
self.labels = {labels[i]: elements[i] for i in range(len(labels))}
self._input_labels = [self.labels[lbl] for lbl in input_labels]
self.sink = self.locations[-1]
......@@ -55,10 +55,10 @@ class IORegisterAutomaton(RegisterMachine):
class IORegisterAutomaton2(Automaton):
def __init__(self, inputs, outputs, num_locations, num_registers):
self.Input, self.inputs = enum('Input', inputs)
self.Output, self.outputs = enum('Output', outputs)
self.Location, self.locations = enum('Location', ['location{0}'.format(i) for i in range(num_locations)])
self.Register, self.registers = enum('Register', ['register{0}'.format(i) for i in range(num_registers)] + ['fresh'])
self.Input, self.inputs = dt_enum('Input', inputs)
self.Output, self.outputs = dt_enum('Output', outputs)
self.Location, self.locations = dt_enum('Location', ['location{0}'.format(i) for i in range(num_locations)])
self.Register, self.registers = dt_enum('Register', ['register{0}'.format(i) for i in range(num_registers)] + ['fresh'])
self.start = self.locations[0]
self.fresh = self.registers[-1]
self.transition = z3.Function('transition', self.Location, self.Input, self.Register, self.Location)
......
......@@ -10,4 +10,20 @@ class Encoder(metaclass=ABCMeta):
@abstractmethod
def build(self, *args) -> (Automaton, z3.ExprRef):
pass
# an encoder which builds trace and automaton constraints independently from each other. This allows Learners to use incremental
# solving algorithms
# TODO build incremental encoders and learners
class IncrementalEncoder(metaclass=ABCMeta):
    """Encoder that builds trace and automaton constraints independently of
    each other, so learners can use incremental solving algorithms."""
    @abstractmethod
    def add(self, trace) -> None:
        """Register a new trace with the encoder."""
        pass
    @abstractmethod
    def transitions(self):
        # presumably returns the constraints derived from the traces added so
        # far — TODO confirm once a concrete implementation exists
        pass
    @abstractmethod
    def automaton(self, *args):
        # presumably returns the automaton definition/constraints — TODO confirm
        pass
\ No newline at end of file
import itertools
import z3
from define.fa import DFA, MealyMachine, Mapper
from encode import Encoder
from utils import Tree
......@@ -99,5 +100,4 @@ class MealyEncoder(Encoder):
i = mm.inputs[input]
c = mapper.element(child.id)
constraints.append(mm.transition(mapper.map(n), i) == mapper.map(c))
return constraints
return constraints
\ No newline at end of file
......@@ -32,7 +32,7 @@ class FALearner(Learner):
# to = fa
# return (None, to)
def _learn_model(self, min_states=1, max_states=20):
def _learn_model(self, min_states=1, max_states=10):
"""generates the definition and model for an fa whose traces include the traces added so far
In case of failure, the second argument of the tuple signals occurrence of a timeout"""
for num_states in range(min_states, max_states + 1):
......
......@@ -153,7 +153,6 @@ class DotImporter(metaclass=ABCMeta):
# the original stream is still updated, iterators constructed from other iterators share state
for line in stream:
print(line)
line = line.strip()
# if the line only contained spaces continue
if len(line) == 0 or line.startswith("__start0"):
......@@ -182,7 +181,6 @@ class DotImporter(metaclass=ABCMeta):
(state_matches, last_line) = self._matching_lines(line_stream, self.PAT_STATE_STR,
stop_pat=self.PAT_TRANSITION, prev_line=last_line)
for state_match in state_matches:
print(state_match, "state")
self._visit_state(state_match.group("state"), state_match.group("content"))
(trans_matches, last_line) = self._matching_lines(line_stream, self.PAT_TRANSITION,
......
......@@ -83,7 +83,7 @@ class RAObservation(AcceptorObservation, RegisterMachineObservation):
super().__init__(seq, acc)
def trace(self) -> Tuple[List[Action], bool]:
    """Return the (actions, accepted) trace of this observation.

    Must delegate to the superclass: the old `return self.trace()` recursed
    forever.
    """
    return super().trace()
def inputs(self) -> List[Action]:
    """Return the input actions of this observation, as defined by the superclass."""
    return super().inputs()
......@@ -127,7 +127,7 @@ class SUTType(Enum):
DFA = 5
def is_acceptor(self):
    """True for SUT types with accept/reject semantics (RA and DFA).

    The old line `self == SUTType.RA or self.DFA` was a truthiness bug:
    `self.DFA` is always truthy, so every type counted as an acceptor.
    """
    return self == SUTType.RA or self == SUTType.DFA
def has_registers(self):
    """True for the register-automaton SUT types (RA and IORA)."""
    return self in (SUTType.RA, SUTType.IORA)
......
......@@ -2,7 +2,7 @@ from abc import abstractmethod, ABCMeta
from typing import List, Type
import itertools
from sut import SUT, TransducerObservation, Observation, MealyObservation
from sut import SUT, TransducerObservation, Observation, MealyObservation, AcceptorObservation
from utils import Tree
......@@ -39,6 +39,23 @@ class IOCache(Cache):
to_add = list(itertools.chain(*map(iter, obs.trace())))
self._tree[to_add]
class AcceptorCache(Cache):
    """Cache of acceptance outcomes, keyed by input sequence via a trie."""

    def __init__(self, obs_gen):
        # each trie node receives a unique id from the counter;
        # _acc maps node id -> recorded acceptance outcome
        self._tree = Tree(itertools.count(0))
        self._obs_gen = obs_gen
        self._acc = {}

    def from_cache(self, seq):
        """Return a cached observation for seq, or None when not yet stored."""
        node = self._tree[seq]
        try:
            return self._obs_gen(seq, self._acc[node])
        except KeyError:
            return None

    def update_cache(self, obs: AcceptorObservation):
        """Record the acceptance outcome of obs under its input sequence."""
        self._acc[self._tree[obs.inputs()]] = obs.acc
class CacheSUT(SUT):
def __init__(self, sut:SUT, cache:Cache):
self._sut = sut
......
......@@ -183,7 +183,7 @@ class RARWalkFromState(RWalkFromState):
inp = Action(trans.start_label, inp_val)
reg_val = trans.update(reg_val, inp)
else:
raise Exception("Unkown guard")
raise Exception("Unknown guard")
else:
inp_val = None
inp = Action(trans.start_label, inp_val)
......
......@@ -11,7 +11,7 @@ class YannakakisTestGenerator(TestGenerator):
def __init__(self, sut,
yan_path=os.path.join(os.path.dirname(__file__), "..", "..","resources", "binaries", "yannakakis.exe"),
mode="random", max_k=3, rand_length=3):
mode="random", max_k=1, rand_length=1):
self.yan_path = yan_path
self.mode = mode
self.max_k = max_k
......@@ -22,13 +22,17 @@ class YannakakisTestGenerator(TestGenerator):
def initialize(self, model: Automaton):
    """Start the external yannakakis test generator for the given model.

    Exports the hypothesis to dot format and feeds it to the yannakakis
    subprocess over stdin; tests are then read from its stdout.
    """
    dot_content = parse.exporter.to_dot(model)
    #print(" ".join([self.yan_path, "=", self.mode, str(self.max_k), str(self.r_length)]))
    # universal_newlines=True gives text-mode pipes; bufsize keeps I/O snappy
    self.yan_proc = subprocess.Popen([self.yan_path, "=", self.mode, str(self.max_k), str(self.r_length)],
                                     stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=10, universal_newlines=True)
    self.yan_proc.stdin.write(dot_content)
    self.yan_proc.stdin.write("\n")
    self.yan_proc.stdin.flush()
    #to use exhaustive yanna (
    #self.yan_proc = subprocess.Popen([self.yan_path, os.path.join("..", "resources", "models","bankcards", "VISA.dot"),
    #                                  "fixed", "1", "1"],
    #                                  stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=10, universal_newlines=True)
def gen_test(self, model: Automaton):
if model is None:
seq = self.gen_blind_inp_seq(self.sut)
......
from abc import ABCMeta
from typing import Tuple, List, Dict
import collections
from encode.fa import DFAEncoder, MealyEncoder
from learn import Learner
from encode.fa import MealyEncoder
from learn.fa import FALearner
from model import Automaton
from model.ra import RegisterMachine
......@@ -119,7 +117,7 @@ bankcards_path = os.path.join(models_path, "bankcards")
pdu_path = os.path.join(models_path, "pdu")
# Model descriptors for the benchmark dot files.
# Diff residue removed: the old bankard_names list (without VISA) was
# interleaved with the new one; this keeps the new list.
biometric = ModelDesc("biometric", "MealyMachine", os.path.join(models_path, "biometric.dot"))
bankard_names = ["MAESTRO", "MasterCard", "PIN", "SecureCode", "VISA"]
bankcards = [ModelDesc(name, "MealyMachine", os.path.join(bankcards_path, "{}.dot".format(name))) for name in bankard_names]
pdus = [ModelDesc("pdu" + str(i), "MealyMachine",
......@@ -128,18 +126,18 @@ pdus = [ModelDesc("pdu" + str(i), "MealyMachine",
# Select the experiments to run.
# Diff residue removed: old and new config values were interleaved; this
# keeps the post-commit configuration.
#b.add_experiment(biometric)
#for bankard in bankcards:
#    b.add_experiment(bankard)
#for pdu in pdus[:-2]:
#    b.add_experiment(pdu)
b.add_experiment(pdus[5])
b.add_experiment(bankcards[1])
# create a test description
t_desc = TestDesc(max_tests=10000, max_k=3, rand_length=3)
# give the smt timeout value (in ms)
timeout = 150000
# how many times each experiment should be run
num_exp = 5
# run the benchmark and collect results
results = []
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment