Commit e9332f60 authored by Paul Fiterau Brostean's avatar Paul Fiterau Brostean
Browse files

Updated files

parent e71fcef7
......@@ -3,6 +3,7 @@ from typing import List, Tuple, Union,cast
from model import Automaton
from learn import Learner
from sut import StatsTracker
from test import TestGenerator, Test
import time
......@@ -77,7 +78,7 @@ def learn(learner:Learner, test_type:type, traces: List[object]) -> Tuple[Automa
statistics.set_suite_size(len(traces))
return (model, statistics)
def learn_mbt(learner:Learner, test_generator:TestGenerator, max_tests:int) -> Tuple[Union[Automaton,None], Statistics]:
def learn_mbt(learner:Learner, test_generator:TestGenerator, max_tests:int, stats_tracker:StatsTracker=None) -> Tuple[Union[Automaton, None], Statistics]:
""" takes learner, a test generator, and bound on the number of tests and generates a model"""
next_test = test_generator.gen_test(None)
statistics = Statistics()
......@@ -117,6 +118,10 @@ def learn_mbt(learner:Learner, test_generator:TestGenerator, max_tests:int) -> T
continue
test_generator.initialize(model)
if stats_tracker is not None:
old_rsts = stats_tracker.resets()
old_inputs = stats_tracker.inputs()
# we then generate and check new tests, until either we find a CE,
# or the suite is exhausted or we have run the intended number of tests
for i in range(0, max_tests):
......@@ -132,5 +137,9 @@ def learn_mbt(learner:Learner, test_generator:TestGenerator, max_tests:int) -> T
break
test_generator.terminate()
if stats_tracker is not None:
statistics.resets = old_rsts
statistics.inputs = old_inputs
#print([str(test.trace() for test in learner_tests)])
return (model, statistics)
......@@ -5,6 +5,8 @@ import sys
import inspect
import re
import itertools
from model import Automaton, Acceptor, Transition
from model.fa import IOTransition, MutableMealyMachine
from model.ra import RATransition, IORATransition, Action
......@@ -120,6 +122,8 @@ class DotImporter(metaclass=ABCMeta):
PAT_NO_NEWLINE = "[^(\r\n|[\r\n])]"
PAT_NEWLINE = "(\r\n|[\r\n])"
PAT_STATE = "s[0-9]*"
PAT_STATE_IGNORE = "__start0.*"
#(?P < content > [ ^ >\[] *)"+"(\[. *\])?;?"
PAT_STATE_STR = "(?P<state>"+PAT_STATE+")" + "(?P<content>[^>\[]*)"+"(\[.*\])?;?"
PAT_TRANS_LABEL = "\[label=\"(?P<label>.*)\"\];?"
PAT_TRANS_MOVE = "(?P<src>" + PAT_STATE + ")\s*->\s*" + "(?P<dst>"+ PAT_STATE + ")"
......@@ -140,22 +144,29 @@ class DotImporter(metaclass=ABCMeta):
def build_aut(self):
pass
def _matching_lines(self, stream, pat, expected=True, stop_at_mismatch=True):
def _matching_lines(self, stream, match_pat, stop_pat=None, prev_line:str=None, expect_match=True):
"""iterates as long as the lines of a stream match a pattern and returns the matches together with
the non-matching line"""
matches = []
if prev_line is not None:
stream = itertools.chain(iter([prev_line]), stream)
# the original stream is still updated, iterators constructed from other iterators share state
for line in stream:
print(line)
line = line.strip()
# if the line only contained spaces continue
if len(line) == 0:
if len(line) == 0 or line.startswith("__start0"):
continue
match = re.fullmatch(pat, line)
match = re.fullmatch(match_pat, line)
if match is not None:
matches.append(match)
else:
if stop_at_mismatch is True:
if stop_pat is not None and re.fullmatch(stop_pat, line) is not None:
break
if len(matches) == 0:
print("Wrong format, cannot find match for ",pat)
if expect_match and len(matches) == 0:
print("Wrong format, cannot find match for ", match_pat)
print("Expected format (empty lines/whitespaces ignored): \n"+
self.PAT_START+"\n" +
self.PAT_STATE_STR+"\n" +
......@@ -164,16 +175,18 @@ class DotImporter(metaclass=ABCMeta):
"...")
exit(1)
return matches
return (matches, line)
def parse_dot(self, line_stream:str):
patterns = dict()
_ = self._matching_lines(line_stream, self.PAT_START, patterns)
state_matches = self._matching_lines(line_stream, self.PAT_STATE_STR, patterns)
(_, last_line) = self._matching_lines(line_stream, self.PAT_START, stop_pat=self.PAT_STATE_STR)
(state_matches, last_line) = self._matching_lines(line_stream, self.PAT_STATE_STR,
stop_pat=self.PAT_TRANSITION, prev_line=last_line)
for state_match in state_matches:
print(state_match, "state")
self._visit_state(state_match.group("state"), state_match.group("content"))
trans_matches = self._matching_lines(line_stream, self.PAT_TRANSITION, patterns, stop_at_mismatch=False)
(trans_matches, last_line) = self._matching_lines(line_stream, self.PAT_TRANSITION,
prev_line=last_line)
for trans_match in trans_matches:
self._visit_transition(trans_match.group("src"), trans_match.group("dst"), trans_match.group("label"))
......@@ -192,7 +205,7 @@ class TransducerDotImporter(DotImporter, metaclass=ABCMeta):
exit(1)
inp_label = match.group("input")
out_label = match.group("output")
self._visit_iotransition(from_state, to_state, inp_label, out_label)
self._visit_iotransition(from_state, to_state, inp_label.strip(), out_label.strip())
class MealyMachineDotImporter(TransducerDotImporter):
def __init__(self):
......
......@@ -140,6 +140,16 @@ from sut.scalable import scalable_sut_classes, get_scalable_sut
from sut.simulation import get_simulation
class StatsTracker(object):
def __init__(self, sut):
self.__sut = sut
def inputs(self):
return self.__sut._inputs
def resets(self):
return self.__sut._resets
class StatsSUT(SUT):
def __init__(self, sut:SUT):
self._sut = sut
......@@ -154,8 +164,8 @@ class StatsSUT(SUT):
self._resets += 1
return self._sut.run(seq)
def inputs(self):
return self._inputs
def stats_tracker(self):
return StatsTracker(self)
def resets(self):
return self._resets
......@@ -9,7 +9,7 @@ from learn.fa import FALearner
from model import Automaton
from model.ra import RegisterMachine
from parse.importer import build_automaton_from_dot
from sut import SUTType, get_simulation, MealyObservation, StatsSUT
from sut import SUTType, get_simulation, MealyObservation, StatsSUT, StatsTracker
from learn.algorithm import learn_mbt, Statistics
from statistics import stdev, median
import os.path
......@@ -30,10 +30,11 @@ ModelDesc = collections.namedtuple("ModelDesc", 'name type path')
def get_learner_setup(aut:Automaton, test_desc:TestDesc):
sut = get_simulation(aut)
stats_sut = StatsSUT(sut)
sut_stats = stats_sut.stats_tracker()
sut = CacheSUT(stats_sut, IOCache( MealyObservation))
learner = FALearner(MealyEncoder())
tester = YannakakisTestGenerator(sut, max_k=test_desc.max_k, rand_length=test_desc.rand_length)
return (learner, tester, stats_sut)
return (learner, tester, sut, sut_stats)
class Benchmark:
def __init__(self):
......@@ -48,15 +49,11 @@ class Benchmark:
-> List[Tuple[ExpDesc, ExperimentStats]]:
results = []
aut = build_automaton_from_dot(mod_desc.type, mod_desc.path)
(learner, tester, stats_sut) = get_learner_setup(aut, test_desc)
(learner, tester, sut, sut_stats) = get_learner_setup(aut, test_desc)
learner.set_timeout(tout)
(model, statistics) = learn_mbt(learner, tester, test_desc.max_tests)
(model, statistics) = learn_mbt(learner, tester, test_desc.max_tests, stats_tracker=sut_stats)
exp_desc = ExpDesc(mod_desc.name, len(aut.states()))
if model is None:
results.append((exp_desc, None))
else:
statistics.inputs = stats_sut.inputs()
statistics.resets = stats_sut.resets()
if model is not None:
imp_stats = self._collect_stats(model, statistics)
results.append( (exp_desc, imp_stats))
return results
......@@ -82,7 +79,6 @@ def collate_stats(sut_desc: ExpDesc, exp_stats:List[ExperimentStats]):
if exp_stats is None:
return None
else:
states = [e.states for e in exp_stats]
avg_states = median(states)
consistent = len(set(states)) == 1
......@@ -135,12 +131,12 @@ pdus = [ModelDesc("pdu" + str(i), "MealyMachine",
#for pdu in pdus:
# b.add_experiment(pdu)
b.add_experiment(bankcards[0])
b.add_experiment(pdus[5])
# create a test description
t_desc = TestDesc(max_tests=10000, max_k=3, rand_length=3)
# give the smt timeout value (in ms)
timeout = 60000
timeout = 60
# how many times each experiment should be run
num_exp = 1
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment