Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Rick Smetsers
z3gi
Commits
e71fcef7
Commit
e71fcef7
authored
Oct 06, 2017
by
Paul Fiterau Brostean
Browse files
Added cache.
parent
ca6f7661
Changes
7
Hide whitespace changes
Inline
Side-by-side
z3gi/learn/algorithm.py
View file @
e71fcef7
import
copy
from
typing
import
List
,
Tuple
,
Union
,
cast
from
model
import
Automaton
...
...
@@ -14,36 +15,29 @@ __all__ = [
class
Statistics
():
"""We only refe"""
def
__init__
(
self
):
self
.
num_learner_tests
=
0
self
.
num_learner_inputs
=
0
self
.
suite_size
=
0
self
.
learning_times
=
[]
self
.
model_stats
=
None
def
add_learner_test
(
self
,
test
:
Test
):
""" updates the stats with relevant information from the added test"""
self
.
num_learner_inputs
+=
test
.
size
()
self
.
num_learner_tests
+=
1
pass
def
set_suite_size
(
self
,
num
):
self
.
suite_size
=
num
self
.
resets
=
0
self
.
inputs
=
0
def
add_learning_time
(
self
,
time
):
self
.
learning_times
.
append
(
time
)
def
__str__
(
self
):
return
"Total number of tests used in learning/testing: {0}
\n
"
\
"Total number of inputs used in learning/testing: {1}
\n
"
\
"Total number of hypotheses used in learning/testing: {2}
\n
"
\
"Test suite size: {3}
\n
"
\
"Average learner test size: {4}
\n
"
\
"Learning time for each model: {5}
\n
"
\
"Total learning time: {6} "
.
format
(
self
.
num_learner_tests
,
self
.
num_learner_inputs
,
def
set_num_resets
(
self
,
test_num
):
self
.
resets
=
test_num
def
set_num_inputs
(
self
,
inp_num
):
self
.
inputs
=
inp_num
def
__str__
(
self
):
return
\
"Tests until last hyp: {}
\n
"
\
"Inputs until last hyp: {}
\n
"
\
"Hypotheses used in learning: {}
\n
"
\
"Learning time for each model: {}
\n
"
\
"Total learning time: {} "
.
format
(
self
.
resets
,
self
.
inputs
,
len
(
self
.
learning_times
),
self
.
suite_size
,
self
.
num_learner_inputs
/
self
.
num_learner_tests
,
self
.
learning_times
,
self
.
learning_times
,
sum
(
self
.
learning_times
))
...
...
@@ -92,7 +86,6 @@ def learn_mbt(learner:Learner, test_generator:TestGenerator, max_tests:int) -> T
else
:
definition
=
None
learner
.
add
(
next_test
.
trace
())
statistics
.
add_learner_test
(
next_test
)
done
=
False
learner_tests
=
[
next_test
]
generated_tests
=
[
next_test
]
...
...
@@ -107,9 +100,6 @@ def learn_mbt(learner:Learner, test_generator:TestGenerator, max_tests:int) -> T
if
ret
is
None
:
return
(
None
,
statistics
)
(
model
,
definition
)
=
ret
# for learner_test in learner_tests:
# if learner_test.check(model) is not None:
# raise Exception("Learner test doesn't pass "+str(learner_test.trace()))
end_time
=
int
(
time
.
time
()
*
1000
)
statistics
.
add_learning_time
(
end_time
-
start_time
)
done
=
True
...
...
@@ -117,13 +107,16 @@ def learn_mbt(learner:Learner, test_generator:TestGenerator, max_tests:int) -> T
for
next_test
in
generated_tests
:
ce
=
next_test
.
check
(
model
)
if
ce
is
not
None
:
#print("TEST: ", next_test.trace())
print
(
"CE: "
,
ce
)
#print(model)
learner
.
add
(
ce
)
done
=
False
break
if
not
done
:
continue
test_generator
.
initialize
(
model
)
# we then generate and check new tests, until either we find a CE,
# or the suite is exhausted or we have run the intended number of tests
for
i
in
range
(
0
,
max_tests
):
...
...
@@ -135,11 +128,9 @@ def learn_mbt(learner:Learner, test_generator:TestGenerator, max_tests:int) -> T
if
ce
is
not
None
:
learner_tests
.
append
(
next_test
)
learner
.
add
(
ce
)
statistics
.
add_learner_test
(
next_test
)
done
=
False
break
test_generator
.
terminate
()
#print([str(test.trace() for test in learner_tests)])
statistics
.
set_suite_size
(
len
(
generated_tests
))
return
(
model
,
statistics
)
z3gi/sut/__init__.py
View file @
e71fcef7
...
...
@@ -140,3 +140,22 @@ from sut.scalable import scalable_sut_classes, get_scalable_sut
from
sut.simulation
import
get_simulation
class
StatsSUT
(
SUT
):
def
__init__
(
self
,
sut
:
SUT
):
self
.
_sut
=
sut
self
.
_inputs
=
0
self
.
_resets
=
0
def
input_interface
(
self
):
return
self
.
_sut
.
input_interface
()
def
run
(
self
,
seq
:
List
[
object
]):
self
.
_inputs
+=
len
(
seq
)
self
.
_resets
+=
1
return
self
.
_sut
.
run
(
seq
)
def
inputs
(
self
):
return
self
.
_inputs
def
resets
(
self
):
return
self
.
_resets
z3gi/sut/sut_cache.py
0 → 100644
View file @
e71fcef7
from
abc
import
abstractmethod
,
ABCMeta
from
typing
import
List
,
Type
import
itertools
from
sut
import
SUT
,
TransducerObservation
,
Observation
,
MealyObservation
from
utils
import
Tree
class
Cache
(
metaclass
=
ABCMeta
):
@
abstractmethod
def
from_cache
(
self
,
seq
)
->
Observation
:
pass
@
abstractmethod
def
update_cache
(
self
,
obs
):
pass
class
IOCache
(
Cache
):
def
__init__
(
self
,
obs_gen
):
self
.
_tree
=
Tree
(
itertools
.
count
(
0
))
self
.
_obs_gen
=
obs_gen
def
from_cache
(
self
,
seq
):
node
=
self
.
_tree
outs
=
[]
for
inp
in
seq
:
if
inp
in
node
.
children
:
out_node
=
node
[[
inp
]]
(
out
,
node
)
=
next
(
iter
(
out_node
.
children
.
items
()))
outs
.
append
(
out
)
else
:
return
None
trace
=
list
(
zip
(
seq
,
outs
))
return
self
.
_obs_gen
(
trace
)
def
update_cache
(
self
,
obs
:
TransducerObservation
):
to_add
=
list
(
itertools
.
chain
(
*
map
(
iter
,
obs
.
trace
())))
self
.
_tree
[
to_add
]
class
CacheSUT
(
SUT
):
def
__init__
(
self
,
sut
:
SUT
,
cache
:
Cache
):
self
.
_sut
=
sut
self
.
_cache
=
cache
def
run
(
self
,
seq
:
List
[
object
]):
obs
=
self
.
_cache
.
from_cache
(
seq
)
if
obs
is
None
:
obs
=
self
.
_sut
.
run
(
seq
)
self
.
_cache
.
update_cache
(
obs
)
obs_t
=
self
.
_cache
.
from_cache
(
seq
)
if
obs_t
is
None
or
obs_t
.
trace
()
!=
obs
.
trace
():
print
(
obs
.
trace
())
print
(
self
.
_cache
.
_tree
)
print
(
obs_t
.
trace
())
exit
(
0
)
return
obs
def
input_interface
(
self
):
return
self
.
_sut
.
input_interface
()
#c = IOCache(MealyObservation)
#c.update_cache(MealyObservation([("a","oa"), ("a","ob")]))
#print(c._tree)
#c.update_cache(MealyObservation([("a","oa"), ("b","ob")]))
#print(c._tree)
#c.update_cache(MealyObservation([("b","ob"), ("b","ob")]))
#print(c._tree)
#
#print(c.from_cache(["b", "b"]))
\ No newline at end of file
z3gi/test/__init__.py
View file @
e71fcef7
...
...
@@ -34,6 +34,10 @@ class Test(metaclass=ABCMeta):
"""returns the size (in terms of inputs) of the test"""
pass
@
abstractmethod
def
covers
(
self
,
test
)
->
bool
:
pass
class
EqualTestsMixin
(
metaclass
=
ABCMeta
):
"""doesn't work unfortunately"""
def
__eq__
(
self
,
other
):
...
...
@@ -148,6 +152,15 @@ class TransducerTest(Test):
def
size
(
self
):
return
len
(
self
.
tr
)
def
covers
(
self
,
test
):
if
type
(
test
)
is
type
(
self
)
and
len
(
test
.
trace
())
<=
len
(
self
.
trace
()):
for
((
inp
,
_
),(
inp2
,
_
))
in
zip
(
self
.
trace
(),
test
.
trace
()):
if
inp
!=
inp2
:
return
False
return
True
return
False
class
MealyTest
(
TransducerTest
):
def
__init__
(
self
,
trace
:
List
[
Tuple
[
Symbol
,
Symbol
]]):
super
().
__init__
(
trace
)
...
...
@@ -203,3 +216,11 @@ class AcceptorTest(Test):
def
__ne__
(
self
,
other
):
return
not
self
.
__eq__
(
other
)
def
covers
(
self
,
test
):
if
type
(
test
)
is
type
(
self
)
and
self
.
size
()
<=
test
.
size
():
(
seq
,
_
)
=
self
.
trace
()
(
seq2
,
_
)
=
test
.
trace
()
return
seq
==
seq2
return
False
z3gi/test/yanna.py
View file @
e71fcef7
...
...
@@ -22,6 +22,7 @@ class YannakakisTestGenerator(TestGenerator):
def
initialize
(
self
,
model
:
Automaton
):
dot_content
=
parse
.
exporter
.
to_dot
(
model
)
#print(" ".join([self.yan_path, "=", self.mode, str(self.max_k), str(self.r_length)]))
self
.
yan_proc
=
subprocess
.
Popen
([
self
.
yan_path
,
"="
,
self
.
mode
,
str
(
self
.
max_k
),
str
(
self
.
r_length
)],
stdin
=
subprocess
.
PIPE
,
stdout
=
subprocess
.
PIPE
,
bufsize
=
10
,
universal_newlines
=
True
)
self
.
yan_proc
.
stdin
.
write
(
dot_content
)
...
...
z3gi/utils.py
View file @
e71fcef7
...
...
@@ -51,6 +51,7 @@ class Tree(object):
return
seq
def
determinize
(
seq
):
neat
=
{}
neat
[
None
]
=
None
...
...
@@ -59,4 +60,12 @@ def determinize(seq):
if
value
not
in
neat
:
neat
[
value
]
=
i
i
=
i
+
1
return
[(
label
,
neat
[
value
])
for
label
,
value
in
seq
]
\ No newline at end of file
return
[(
label
,
neat
[
value
])
for
label
,
value
in
seq
]
tree
=
Tree
(
itertools
.
count
(
0
))
tree
[[
"ia"
,
"oa"
]]
tree
[[
"ia"
,
"oa"
,
"ia"
,
"oa"
]]
tree
[[
"ia"
,
"oa"
,
"ib"
,
"ob"
]]
tree
[[
"ib"
,
"ob"
,
"ib"
,
"ob"
]]
print
(
"ia"
in
tree
.
children
)
print
(
tree
)
\ No newline at end of file
z3gi/yan_benchmark.py
0 → 100644
View file @
e71fcef7
from
abc
import
ABCMeta
from
typing
import
Tuple
,
List
,
Dict
import
collections
from
encode.fa
import
DFAEncoder
,
MealyEncoder
from
learn
import
Learner
from
learn.fa
import
FALearner
from
model
import
Automaton
from
model.ra
import
RegisterMachine
from
parse.importer
import
build_automaton_from_dot
from
sut
import
SUTType
,
get_simulation
,
MealyObservation
,
StatsSUT
from
learn.algorithm
import
learn_mbt
,
Statistics
from
statistics
import
stdev
,
median
import
os.path
from
sut.sut_cache
import
CacheSUT
,
IOCache
from
test.yanna
import
YannakakisTestGenerator
TestDesc
=
collections
.
namedtuple
(
"TestDesc"
,
'max_tests max_k rand_length'
)
ExpDesc
=
collections
.
namedtuple
(
"ExpDesc"
,
'model_name num_states'
)
class
ExperimentStats
(
collections
.
namedtuple
(
"CollectedStats"
,
"states registers ces tests inputs learn_time"
)):
pass
class
CollatedStats
(
collections
.
namedtuple
(
"CollatedStats"
,
"exp_succ states consistent avg_tests std_tests avg_inputs std_inputs avg_ltime std_ltime"
)):
pass
ModelDesc
=
collections
.
namedtuple
(
"ModelDesc"
,
'name type path'
)
def
get_learner_setup
(
aut
:
Automaton
,
test_desc
:
TestDesc
):
sut
=
get_simulation
(
aut
)
stats_sut
=
StatsSUT
(
sut
)
sut
=
CacheSUT
(
stats_sut
,
IOCache
(
MealyObservation
))
learner
=
FALearner
(
MealyEncoder
())
tester
=
YannakakisTestGenerator
(
sut
,
max_k
=
test_desc
.
max_k
,
rand_length
=
test_desc
.
rand_length
)
return
(
learner
,
tester
,
stats_sut
)
class
Benchmark
:
def
__init__
(
self
):
self
.
model_desc
:
List
[
ModelDesc
]
=
[]
def
add_experiment
(
self
,
mod_desc
:
ModelDesc
):
self
.
model_desc
.
append
(
mod_desc
)
return
self
def
_run_benchmark
(
self
,
mod_desc
:
ModelDesc
,
test_desc
:
TestDesc
,
tout
:
int
)
\
->
List
[
Tuple
[
ExpDesc
,
ExperimentStats
]]:
results
=
[]
aut
=
build_automaton_from_dot
(
mod_desc
.
type
,
mod_desc
.
path
)
(
learner
,
tester
,
stats_sut
)
=
get_learner_setup
(
aut
,
test_desc
)
learner
.
set_timeout
(
tout
)
(
model
,
statistics
)
=
learn_mbt
(
learner
,
tester
,
test_desc
.
max_tests
)
exp_desc
=
ExpDesc
(
mod_desc
.
name
,
len
(
aut
.
states
()))
if
model
is
None
:
results
.
append
((
exp_desc
,
None
))
else
:
statistics
.
inputs
=
stats_sut
.
inputs
()
statistics
.
resets
=
stats_sut
.
resets
()
imp_stats
=
self
.
_collect_stats
(
model
,
statistics
)
results
.
append
(
(
exp_desc
,
imp_stats
))
return
results
def
_collect_stats
(
self
,
model
:
Automaton
,
stats
:
Statistics
)
->
ExperimentStats
:
states
=
len
(
model
.
states
())
registers
=
len
(
model
.
registers
())
if
isinstance
(
model
,
RegisterMachine
)
else
None
return
ExperimentStats
(
states
=
states
,
registers
=
registers
,
inputs
=
stats
.
inputs
,
tests
=
stats
.
resets
,
ces
=
len
(
stats
.
learning_times
),
learn_time
=
sum
(
stats
.
learning_times
))
def
run_benchmarks
(
self
,
test_desc
:
TestDesc
,
timeout
:
int
)
->
List
[
Tuple
[
ExpDesc
,
ExperimentStats
]]:
results
=
[]
for
mod_desc
in
self
.
model_desc
:
res
=
self
.
_run_benchmark
(
mod_desc
,
test_desc
,
timeout
)
results
.
extend
(
res
)
return
results
def
collate_stats
(
sut_desc
:
ExpDesc
,
exp_stats
:
List
[
ExperimentStats
]):
if
exp_stats
is
None
:
return
None
else
:
states
=
[
e
.
states
for
e
in
exp_stats
]
avg_states
=
median
(
states
)
consistent
=
len
(
set
(
states
))
==
1
exp_succ
=
len
(
exp_stats
)
tests
=
[
e
.
tests
for
e
in
exp_stats
]
inputs
=
[
e
.
inputs
for
e
in
exp_stats
]
time
=
[
e
.
learn_time
for
e
in
exp_stats
]
return
CollatedStats
(
exp_succ
=
exp_succ
,
states
=
avg_states
,
consistent
=
consistent
,
avg_tests
=
median
(
tests
),
std_tests
=
0
if
len
(
tests
)
==
1
else
stdev
(
tests
),
avg_inputs
=
median
(
inputs
),
std_inputs
=
0
if
len
(
inputs
)
==
1
else
stdev
(
inputs
),
avg_ltime
=
median
(
time
),
std_ltime
=
0
if
len
(
time
)
==
1
else
stdev
(
time
),
)
#"exp_succ states registers consistent "
# "avg_ltests std_ltests avg_inputs std_inputs "
# "avg_ltime std_ltime"
def
print_results
(
results
:
List
[
Tuple
[
ExpDesc
,
ExperimentStats
]]):
if
len
(
results
)
==
0
:
print
(
"No statistics to report on"
)
else
:
for
sut_desc
,
stats
in
results
:
print
(
sut_desc
,
" "
,
stats
)
b
=
Benchmark
()
# adding learning setups for each type of machine
# no longer used, built function which returns tuple
models_path
=
os
.
path
.
join
(
".."
,
"resources"
,
"models"
)
bankcards_path
=
os
.
path
.
join
(
models_path
,
"bankcards"
)
pdu_path
=
os
.
path
.
join
(
models_path
,
"pdu"
)
biometric
=
ModelDesc
(
"biometric"
,
"MealyMachine"
,
os
.
path
.
join
(
models_path
,
"biometric.dot"
))
bankard_names
=
[
"MAESTRO"
,
"MasterCard"
,
"PIN"
,
"SecureCode"
]
bankcards
=
[
ModelDesc
(
name
,
"MealyMachine"
,
os
.
path
.
join
(
bankcards_path
,
"{}.dot"
.
format
(
name
)))
for
name
in
bankard_names
]
pdus
=
[
ModelDesc
(
"pdu"
+
str
(
i
),
"MealyMachine"
,
os
.
path
.
join
(
pdu_path
,
"model{}.dot"
.
format
(
i
)))
for
i
in
range
(
1
,
7
)]
#b.add_experiment(biometric)
#for bankard in bankcards:
# b.add_experiment(bankard)
#for pdu in pdus:
# b.add_experiment(pdu)
b
.
add_experiment
(
bankcards
[
0
])
# create a test description
t_desc
=
TestDesc
(
max_tests
=
10000
,
max_k
=
3
,
rand_length
=
3
)
# give the smt timeout value (in ms)
timeout
=
60000
# how many times each experiment should be run
num_exp
=
1
# run the benchmark and collect results
results
=
[]
for
i
in
range
(
0
,
num_exp
):
results
+=
b
.
run_benchmarks
(
t_desc
,
timeout
)
print
(
"============================"
)
print_results
(
results
)
sut_dict
=
dict
()
for
sut_desc
,
exp
in
results
:
if
sut_desc
not
in
sut_dict
:
sut_dict
[
sut_desc
]
=
list
()
sut_dict
[
sut_desc
].
append
(
exp
)
# sort according to the sut_desc (the first element)
#results.sort()
# print results
print_results
(
results
)
sut_dict
=
dict
()
for
sut_desc
,
exp
in
results
:
if
sut_desc
not
in
sut_dict
:
sut_dict
[
sut_desc
]
=
list
()
sut_dict
[
sut_desc
].
append
(
exp
)
collated_stats
=
[(
sut_desc
,
collate_stats
(
sut_desc
,
experiments
))
for
sut_desc
,
experiments
in
sut_dict
.
items
()]
for
(
sut_desc
,
c_stat
)
in
collated_stats
:
print
(
sut_desc
,
" "
,
c_stat
)
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment