Commit 545fb840 authored by Paul Fiterau Brostean

Removed old test files, added benchmark modules

parent 56db3d15
from unittest import TestCase
import z3
from z3gi_old import define
class TestDefine(TestCase):
def test_state_type(self):
self.assertTrue(define.State.eq(z3.DeclareSort('STATE')))
def test_input_type(self):
self.assertTrue(define.Input.eq(z3.DeclareSort('INPUT')))
def test_output_type(self):
self.assertTrue(define.Output.eq(z3.DeclareSort('OUTPUT')))
def test_element_type(self):
self.assertTrue(define.Element.eq(z3.DeclareSort('ELEMENT')))
# ...
from unittest import TestCase
import z3
from z3gi_old import define, encode
fsms = [define.MooreMachine(), define.MealyMachine()]
# We're only testing type safety here, not the actual implementation
class TestFSMEncoder(TestCase):
def test_state(self):
for fsm in fsms:
encoder = encode.NestingFSMEncoder(fsm)
try:
self.assertTrue(encoder.state('').sort().eq(define.State))
self.assertTrue(encoder.state((1, 2, 3)).sort().eq(define.State))
self.assertTrue(encoder.state([i for i in range(5)]).sort().eq(define.State))
self.assertTrue(encoder.state('').eq(encoder.state([])))
except encode.EncodeError:
self.fail()
with self.assertRaises(encode.EncodeError):
encoder.state(1)
with self.assertRaises(encode.EncodeError):
encoder.state(True)
def test_transition(self):
for fsm in fsms:
encoder = encode.NestingFSMEncoder(fsm)
try:
self.assertTrue(encoder.transition(encoder.state('abc'), define.input()).sort().eq(define.State))
self.assertTrue(encoder.transition(define.state(), define.input()).sort().eq(define.State))
except encode.EncodeError:
self.fail()
with self.assertRaises(encode.EncodeError):
encoder.transition('state', 'input')
with self.assertRaises(encode.EncodeError):
encoder.transition(None, None)
def test_output(self):
encoder = encode.NestingFSMEncoder(define.MooreMachine())
try:
self.assertTrue(encoder.output(encoder.state('abc')).sort().eq(define.Output))
self.assertTrue(encoder.output(define.state()).sort().eq(define.Output))
except encode.EncodeError:
self.fail()
with self.assertRaises(encode.EncodeError):
encoder.output('state')
with self.assertRaises(encode.EncodeError):
encoder.output(None)
with self.assertRaises(encode.EncodeError):
encoder.output(define.state(), define.input())
encoder = encode.NestingFSMEncoder(define.MealyMachine())
try:
self.assertTrue(encoder.output(encoder.state('abc'), define.input()).sort().eq(define.Output))
self.assertTrue(encoder.output(define.state(), define.input()).sort().eq(define.Output))
except encode.EncodeError:
self.fail()
with self.assertRaises(encode.EncodeError):
encoder.output('state', 'input')
with self.assertRaises(encode.EncodeError):
encoder.output(None)
with self.assertRaises(encode.EncodeError):
encoder.output(define.state())
def test_states(self):
for fsm in fsms:
encoder = encode.NestingFSMEncoder(fsm)
try:
self.assertTrue(encoder.states(range(10)).sort().eq(z3.BoolSort()))
self.assertTrue(encoder.states(['a']).sort().eq(z3.BoolSort()))
except encode.EncodeError:
self.fail()
with self.assertRaises(encode.EncodeError):
encoder.states([])
with self.assertRaises(encode.EncodeError):
encoder.states(1)
def test_outputs(self):
for fsm in fsms:
encoder = encode.NestingFSMEncoder(fsm)
try:
self.assertTrue(encoder.outputs(range(10)).sort().eq(z3.BoolSort()))
self.assertTrue(encoder.outputs(['a']).sort().eq(z3.BoolSort()))
self.assertTrue(encoder.outputs('ab').sort().eq(z3.BoolSort()))
self.assertTrue(encoder.outputs({'a', 'b', 1}).sort().eq(z3.BoolSort()))
except encode.EncodeError:
self.fail()
with self.assertRaises(encode.EncodeError):
encoder.outputs([])
with self.assertRaises(encode.EncodeError):
encoder.outputs(1)
from unittest import TestCase
import z3
from z3gi_old import define, encode, learn
class TestFSMLearner(TestCase):
def test_add(self):
fsm = define.MooreMachine()
for encoder in [encode.NestingFSMEncoder, encode.MappingFSMEncoder]:
learner = learn.FSMLearner(fsm, encoder)
try:
learner.add('', 0)
learner.add('', '0')
learner.add('abc', '0123')
learner.add(['a', 'b', 'c'], (0, 1, 2, 3))
learner.add('abc', 3)
except (encode.EncodeError, learn.NonDeterminismError) as e:
self.fail("Unexpected exception: {0}".format(e))
with self.assertRaises(encode.EncodeError):
learner.add('', '')
with self.assertRaises(encode.EncodeError):
learner.add('', '00')
with self.assertRaises(encode.EncodeError):
learner.add('a', '')
with self.assertRaises(encode.EncodeError):
learner.add(True, True)
with self.assertRaises(learn.NonDeterminismError):
learner.add('', 1)
with self.assertRaises(learn.NonDeterminismError):
learner.add('abc', '1123')
self.assertEqual(learner.outputs, {'0', '1', '2', '3'})
fsm = define.MealyMachine()
for encoder in [encode.NestingFSMEncoder, encode.MappingFSMEncoder]:
learner = learn.FSMLearner(fsm, encoder)
try:
learner.add('a', 0)
learner.add('a', '0')
learner.add('abc', '012')
learner.add(['a', 'b', 'c'], (0, 1, 2))
learner.add('abc', 2)
learner.add('abc', '2')
except (encode.EncodeError, learn.NonDeterminismError) as e:
self.fail("Unexpected exception: {0}".format(e))
with self.assertRaises(encode.EncodeError):
learner.add('', '')
with self.assertRaises(encode.EncodeError):
learner.add('', '0')
with self.assertRaises(encode.EncodeError):
learner.add('a', '')
with self.assertRaises(encode.EncodeError):
learner.add(True, True)
with self.assertRaises(learn.NonDeterminismError):
learner.add('a', 1)
with self.assertRaises(learn.NonDeterminismError):
learner.add('abc', '022')
self.assertEqual(learner.outputs, {'0', '1', '2'})
def test_model(self):
fsm = define.MooreMachine()
for encoder in [encode.NestingFSMEncoder, encode.MappingFSMEncoder]:
learner = learn.FSMLearner(fsm, encoder)
learner.add('aaaa', '10010')
with self.assertRaises(learn.LearnError):
learner.model(max_states=2)
model = learner.model()
self.assertTrue(model.eval(z3.Int('n') == 3))
from unittest import TestCase
from z3gi_old import define, encode
class TestMappingEncoder(TestCase):
def test_state(self):
for fsm in [define.MooreMachine(), define.MealyMachine()]:
encoder = encode.MappingFSMEncoder(fsm)
tests = {'': encoder._statemap(define.element(0)),
'ab': encoder._statemap(define.element(2)),
(1, 2): encoder._statemap(define.element(4)),
'a': encoder._statemap(define.element(1)),
'1': encoder._statemap(define.element(3))}
for key in ['', 'ab', (1, 2), 'a', '1']:
state = encoder.state(key)
self.assertTrue(state.sort().eq(define.State))
self.assertEqual(state, tests[key])
def test_transition(self):
for fsm in [define.MooreMachine(), define.MealyMachine()]:
encoder = encode.MappingFSMEncoder(fsm)
tests = {'ab': fsm.transition(encoder.state('a'), define.input('b')),
(1, 2): fsm.transition(encoder.state((1,)), define.input(2))}
encoder = encode.MappingFSMEncoder(fsm)
for key, value in tests.items():
self.assertTrue(encoder.transition(encoder.state(key[:-1]), define.input(key[-1])).eq(tests[key]))
def test_output(self):
fsm = define.MooreMachine()
encoder = encode.MappingFSMEncoder(fsm)
tests = {'': fsm.output(encoder.state('')),
'ab': fsm.output(encoder.state('ab')),
(1, 2): fsm.output(encoder.state((1, 2)))}
for key, value in tests.items():
self.assertTrue(encoder.output(encoder.state(key)).eq(tests[key]))
self.assertTrue(tests[key].sort().eq(define.Output))
fsm = define.MealyMachine()
encoder = encode.MappingFSMEncoder(fsm)
tests = {'a': fsm.output(encoder.state(''), define.input('a')),
'ab': fsm.output(encoder.state(('a',)), define.input('b')),
(1, 2): fsm.output(encoder.state('1'), define.input(2))}
for key, value in tests.items():
self.assertTrue(encoder.output(encoder.state(key[:-1]), define.input(key[-1])).eq(tests[key]))
def test_states(self):
# How can we properly test this at this level?
# Simply copying the code is cheating...
# We might as well test this from a functional perspective in `learn`.
pass
def test_outputs(self):
# testing this would also be a matter of copying the code,
# and is therefore not very useful.
pass
def test_node_iter(self):
encoder = encode.MappingFSMEncoder(define.MooreMachine())
encoder.state('ab')
encoder.state('aa')
nodes = {0, 1, 2, 3}
for node in encoder._trie:
self.assertTrue(node.id in nodes)
transitions = {(0, 'a', 1), (1, 'b', 2), (1, 'a', 3)}
for node, input, child in encoder._trie.transitions():
self.assertTrue((node.id, input, child.id) in transitions)
from unittest import TestCase
from z3gi_old import define, encode
class TestNestingEncoder(TestCase):
def test_state(self):
for fsm in [define.MooreMachine(), define.MealyMachine()]:
tests = {'': fsm.start,
'ab': fsm.transition(fsm.transition(fsm.start, define.input('a')), define.input('b')),
(1, 2): fsm.transition(fsm.transition(fsm.start, define.input(1)), define.input(2))}
encoder = encode.NestingFSMEncoder(fsm)
for key, value in tests.items():
self.assertTrue(encoder.state(key).eq(tests[key]))
def test_transition(self):
for fsm in [define.MooreMachine(), define.MealyMachine()]:
tests = {'ab': fsm.transition(fsm.transition(fsm.start, define.input('a')), define.input('b')),
(1, 2): fsm.transition(fsm.transition(fsm.start, define.input(1)), define.input(2))}
encoder = encode.NestingFSMEncoder(fsm)
for key, value in tests.items():
self.assertTrue(encoder.transition(encoder.state(key[:-1]), define.input(key[-1])).eq(tests[key]))
def test_output(self):
fsm = define.MooreMachine()
tests = {'': fsm.output(fsm.start),
'ab': fsm.output(fsm.transition(fsm.transition(fsm.start, define.input('a')), define.input('b'))),
(1, 2): fsm.output(fsm.transition(fsm.transition(fsm.start, define.input(1)), define.input(2)))}
encoder = encode.NestingFSMEncoder(fsm)
for key, value in tests.items():
self.assertTrue(encoder.output(encoder.state(key)).eq(tests[key]))
fsm = define.MealyMachine()
tests = {'a': fsm.output(fsm.start, define.input('a')),
'ab': fsm.output(fsm.transition(fsm.start, define.input('a')), define.input('b')),
(1, 2): fsm.output(fsm.transition(fsm.start, define.input(1)), define.input(2))}
encoder = encode.NestingFSMEncoder(fsm)
for key, value in tests.items():
self.assertTrue(encoder.output(encoder.state(key[:-1]), define.input(key[-1])).eq(tests[key]))
def test_states(self):
# How can we properly test this at this level?
# Simply copying the code is cheating...
# We might as well test this from a functional perspective in `learn`.
pass
def test_outputs(self):
# testing this would also be a matter of copying the code,
# and is therefore not very useful.
pass
import os.path
from encode.fa import MealyEncoder, DFAEncoder
from encode.incremental.fa import MealyIEncoder, MealyTreeIEncoder
from encode.ra import RAEncoder
from learn.algorithm import learn
from learn.algorithm import learn_mbt
from learn.fa import FALearner
from learn.incremental.fa import FAILearner
from learn.ra import RALearner
from parse.importer import build_automaton_from_dot
from sut import SUTType, MealyObservation, StatsSUT, RAObservation, IORAObservation
from sut.fifoset import FIFOSetClass
from sut.login import new_login_sut, LoginClass
from sut.simulation import MealyMachineSimulation
from sut.sut_cache import IOCache, CacheSUT, AcceptorCache
from test import IORATest, MealyTest
from test.chain import TestGeneratorChain
from test.coloring import ColoringTestGenerator
from test.rwalk import IORARWalkFromState, MealyRWalkFromState, DFARWalkFromState, RARWalkFromState
from test.yanna import YannakakisTestGenerator
from tests.iora_testscenario import *
from encode.iora import IORAEncoder
# this module gives examples of simple routines for learning models actively (via model-based testing) or passively from traces
# some paths
yan_cmd = os.path.join("..", "resources", "binaries", "yannakakis.exe")
models_loc = os.path.join("..", "resources", "models")
maestro = os.path.join(models_loc, "bankcards", "MAESTRO.dot")
visa = os.path.join(models_loc, "bankcards", "VISA.dot")
biometric = os.path.join(models_loc, "biometric.dot")
# some example runs
def scalable_learn_iora():
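# learns an IORA model from the traces of a predefined scenario (sut_m5, from the iora_testscenario import)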
learner = RALearner(IORAEncoder())
test_type = IORATest
exp = sut_m5
(model, statistics) = learn(learner, test_type, exp.traces)
print(model)
print(statistics)
def scalable_learn_mbt_iora():
learner = RALearner(IORAEncoder())
learner.set_timeout(10000)
sut = new_login_sut(1)
mbt = IORARWalkFromState(sut, 5, 0.2)
(model, statistics) = learn_mbt(sut, learner, mbt, 10000)
print(model)
print(statistics)
def scalable_learn_mbt_mealy():
learner = FALearner(MealyEncoder())
learner.set_timeout(1000)
login = LoginClass()
sut = login.new_sut(SUTType.Mealy, 2)
mbt = MealyRWalkFromState(sut, 5, 0.2)
(model, statistics) = learn_mbt(sut, learner, mbt, 10000)
print(model)
print(statistics)
def scalable_learn_mbt_dfa():
learner = FALearner(DFAEncoder())
learner.set_timeout(100000)
login = LoginClass()
sut = login.new_sut(SUTType.DFA, 2)
mbt = DFARWalkFromState(sut, 5, 0.2)
(model, statistics) = learn_mbt(sut, learner, mbt, 10000)
print(model)
print(statistics)
def scalable_learn_mbt_ra():
learner = RALearner(RAEncoder())
learner.set_timeout(600000)
login = FIFOSetClass()
sut = login.new_sut(SUTType.RA, 1)
mbt = RARWalkFromState(sut, 5, 0.2)
(model, statistics) = learn_mbt(sut, learner, mbt, 10000)
print(model)
print(statistics)
def sim_learn_mbt_yan_mealy(dot_path):
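# learns a Mealy machine from a simulation of the given .dot model, generating tests with the external Yannakakis tool through an I/O cache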
learner = FALearner(MealyEncoder())
learner.set_timeout(10000)
dot_aut = build_automaton_from_dot("MealyMachine", dot_path)
dot_sut = MealyMachineSimulation(dot_aut)
stats_sut = StatsSUT(dot_sut)
cache = IOCache(MealyObservation)
cache_sut = CacheSUT(stats_sut, cache)
mbt = YannakakisTestGenerator(cache_sut, yan_cmd, seed=1)
(model, statistics) = learn_mbt(cache_sut, learner, mbt, 10000, stats_tracker=stats_sut.stats_tracker())
print(model)
print(statistics)
def sim_learn_mbt_coloryan_mealy(dot_path):
learner = FALearner(MealyEncoder())
learner.set_timeout(10000)
dot_aut = build_automaton_from_dot("MealyMachine", dot_path)
dot_sut = MealyMachineSimulation(dot_aut)
stats_sut = StatsSUT(dot_sut)
cache = IOCache(MealyObservation)
cache_sut = CacheSUT(stats_sut, cache)
mbt1 = ColoringTestGenerator(cache_sut, cache)
mbt2 = YannakakisTestGenerator(cache_sut, yan_cmd, seed=1)
mbt = TestGeneratorChain([mbt1, mbt2])
(model, statistics) = learn_mbt(cache_sut, learner, mbt, 10000, stats_tracker=stats_sut.stats_tracker())
print(model)
print(statistics)
def sim_inc_learn_mbt_coloryan_mealy(dot_path):
learner = FAILearner(MealyTreeIEncoder())
learner.set_timeout(200000)
dot_aut = build_automaton_from_dot("MealyMachine", dot_path)
dot_sut = MealyMachineSimulation(dot_aut)
stats_sut = StatsSUT(dot_sut)
cache = IOCache(MealyObservation)
cache_sut = CacheSUT(stats_sut, cache)
mbt1 = ColoringTestGenerator(cache_sut, cache)
mbt2 = YannakakisTestGenerator(cache_sut, yan_cmd, seed=1)
mbt = TestGeneratorChain([mbt1, mbt2])
(model, statistics) = learn_mbt(cache_sut, learner, mbt, 10000, stats_tracker=stats_sut.stats_tracker())
print(model)
print(statistics)
def scalable_learn_mbt_chainrw_iora():
learner = RALearner(IORAEncoder())
learner.set_timeout(600000)
login = FIFOSetClass()
sut = login.new_sut(SUTType.IORA, 1)
cache = IOCache(IORAObservation)
cache_sut = CacheSUT(sut, cache)
mbt = TestGeneratorChain([ColoringTestGenerator(cache_sut, cache), RARWalkFromState(sut, 5, 0.2)])
(model, statistics) = learn_mbt(cache_sut, learner, mbt, 10000)
print(model)
print(statistics)
#scalable_learn_mbt_mealy()
#scalable_learn_mbt_iora()
sim_learn_mbt_yan_mealy(maestro)
#sim_learn_mbt_mealy()
import functools
from abc import ABCMeta
from typing import Tuple, List, Dict
import collections
from encode.fa import MealyEncoder
from learn import Learner
from learn.fa import FALearner
from model import Automaton
from model.ra import RegisterMachine
from sut import SUTType
from sut import get_no_rst_simulation
from sut.scalable import ScalableSUTClass
from sut.fifoset import FIFOSetClass
from sut.stack import StackClass
from learn.algorithm import Statistics
from statistics import stdev, median
import learn.algorithm as alg
import model.gen as gen
# benchmark module for learning systems without resets (currently only Mealy machines)
class SutDesc(collections.namedtuple("SutDesc", 'type size')):
def __str__(self):
return "{0}_({1})".format(str(self.type).replace("SUTType.", ""), self.size)
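# e.g. str(SutDesc(SUTType.Mealy, 2)) renders as "Mealy_(2)" (assuming SUTType is a standard Enum)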
TestDesc = collections.namedtuple("TestDesc", 'max_inputs')
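# max_inputs is the budget handed to learn_no_reset below (presumably a cap on the total number of inputs sent to the SUT)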
class ExperimentStats(collections.namedtuple("ExperimentStats", "states registers inputs ces max_ltime learn_time")):
pass
class CollatedStats(collections.namedtuple("CollatedStats", "exp_succ states registers avg_inputs std_inputs max_ltime avg_ces avg_ltime std_ltime")):
pass
class Benchmark:
def __init__(self):
self.suts = list()
def add_sut(self, sut_desc):
self.suts.append(sut_desc)
def _run_benchmark(self, sut_desc:SutDesc, test_desc:TestDesc, tout:int,\
min_size:int, max_size:int) -> List[Tuple[SutDesc, ExperimentStats]]:
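# generates progressively larger random Mealy machines (simulated without resets) and learns each one,
# stopping when learning fails or max_size is exceeded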
results = []
size = min_size
while True:
if max_size is not None and size > max_size:
break
sut_type = sut_desc.type
if sut_type is not SUTType.Mealy:
print("No-reset learning is only supported for Mealy machines")
break
aut = gen.random_mealy(2, 2, size)
sut = get_no_rst_simulation(aut)
learner = FALearner(MealyEncoder())
learner.set_timeout(tout)
(model, statistics) = alg.learn_no_reset(sut, learner, test_desc.max_inputs)
if model is None:
break
else:
new_sut_desc = SutDesc(sut_desc.type, size)
imp_stats = self._collect_stats(new_sut_desc, model, statistics)
results.append((new_sut_desc, imp_stats))
size += 1
return results
def _collect_stats(self, sut_desc:SutDesc, model:Automaton, statistics:Statistics) -> ExperimentStats:
learn_inputs = statistics.inputs
learn_time = sum(statistics.learning_times)
max_ltime = max(statistics.learning_times)
ces = len(statistics.learning_times)
return ExperimentStats(states=sut_desc.size, registers=None, inputs=learn_inputs, ces=ces, max_ltime=max_ltime, learn_time=learn_time)
def run_benchmarks(self, test_desc:TestDesc, timeout:int, min_size:int=1, max_size:int=1) -> List[Tuple[SutDesc, ExperimentStats]]:
results = []
for sut_desc in self.suts:
res = self._run_benchmark(sut_desc, test_desc, timeout, min_size, max_size)
results.extend(res)
return results
def collate_stats(sut_desc: SutDesc, exp_stats:List[ExperimentStats]):
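# aggregates the per-run statistics gathered for one SUT description into medians, maxima and standard deviations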
if exp_stats is None:
return None
else:
states = [e.states for e in exp_stats]
avg_states = median(states)
regs = [e.registers for e in exp_stats]
if sut_desc.type.has_registers():
avg_reg = median(regs)
else:
avg_reg = None
# sanity check: repeated experiments should agree on the register count
assert not sut_desc.type.has_registers() or len(set(regs)) == 1
exp_succ = len(exp_stats)
linputs = [e.inputs for e in exp_stats]
ltime = [e.learn_time for e in exp_stats]
maxltimes = [e.max_ltime for e in exp_stats]
ces = [e.ces for e in exp_stats]
return CollatedStats(
exp_succ=exp_succ,
states=avg_states,
registers=avg_reg,
avg_inputs=median(linputs),
avg_ces=median(ces),
std_inputs=0 if len(linputs) == 1 else stdev(linputs),
max_ltime=max(maxltimes),
avg_ltime=median(ltime),
std_ltime=0 if len(ltime) == 1 else stdev(ltime),
)
def print_results(results: List[Tuple[SutDesc, ExperimentStats]]):
if len(results) == 0:
print("No statistics to report on")
else:
for sut_desc, stats in results:
print(sut_desc, " ", stats)
b = Benchmark()
# the SUT is currently hard-coded
b.add_sut(SutDesc(type=SUTType.Mealy, size=0))
# create a test description
t_desc = TestDesc(max_inputs=2000)
# give an smt timeout value (in ms)
timeout = 30000
# how many times each experiment should be run
num_exp = 20
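# Sketch of one possible driver for the configuration above: repeat the benchmark
# num_exp times, print the raw results, then collate the repeated runs per SUT
# description using only the functions defined in this module.
if __name__ == "__main__":
    all_results = []
    for _ in range(num_exp):
        all_results.extend(b.run_benchmarks(t_desc, timeout))
    print_results(all_results)
    # group the repeated runs by SUT description before collating
    grouped = collections.defaultdict(list)
    for sut_desc, stats in all_results:
        grouped[sut_desc].append(stats)
    for sut_desc, stats_list in grouped.items():
        print(sut_desc, " ", collate_stats(sut_desc, stats_list))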