Commit ac2fc966 authored by Paul Fiterau Brostean's avatar Paul Fiterau Brostean
Browse files

Added a console and trace parsing functionality

parent 4314e699
......@@ -69,7 +69,7 @@ def sim_learn_mbt_ra():
learner = FALearner(MealyEncoder())
learner.set_timeout(10000)
import os.path
maestro_aut = build_automaton_from_dot("MealyMachine", os.path.join("resources", "bankcards", "MAESTRO.dot"))
maestro_aut = build_automaton_from_dot("MealyMachine", os.path.join("resources", "models", "bankcards", "MAESTRO.dot"))
print(maestro_aut)
#exit(2)
maestro_sut = MealyMachineSimulation(maestro_aut)
......
import unittest
from encode.fa import DFAEncoder
from tests.dfa_testscenario import *
from z3gi.learn.fa import FALearner
from tests.dfa_testscenario import sut_m1
from learn.fa import FALearner
num_exp = 1
......
......@@ -17,7 +17,7 @@ class MealyImporterTest(unittest.TestCase):
self.assertEqual(len(aut.transitions()), exp_num_trans, "Number of transitions not equal to expected")
def test_banking(self):
maestro_model_file = os.path.join("..","resources","MAESTRO.dot")
maestro_model_file = os.path.join("..","resources","models","MAESTRO.dot")
mealy = build_automaton_from_dot("MealyMachine", maestro_model_file)
self.check_aut(mealy, exp_num_states=6)
sim = MealyMachineSimulation(mealy)
......
"""A standalone for using the z3gi package from the command line."""
import argparse
import learn.algorithm as alg
import parse.importer as parse
import sut
import model.fa
from encode.fa import MealyEncoder, DFAEncoder
from encode.iora import IORAEncoder
from encode.ra import RAEncoder
from learn.fa import FALearner
from learn.ra import RALearner
from test import AcceptorTest, IORATest, MealyTest
from test.rwalk import RARWalkFromState
from test.rwalk import IORARWalkFromState
from test.rwalk import MealyRWalkFromState
from test.rwalk import DFARWalkFromState
"""some configuration mappings"""
aut2learner={
model.ra.RegisterAutomaton:RALearner(RAEncoder()),
model.ra.IORegisterAutomaton:RALearner(IORAEncoder()),
model.fa.MealyMachine:FALearner(MealyEncoder()),
model.fa.DFA:FALearner(DFAEncoder())
}
aut2rwalkcls={
model.ra.RegisterAutomaton:RARWalkFromState,
model.ra.IORegisterAutomaton:IORARWalkFromState,
model.fa.MealyMachine:MealyRWalkFromState,
model.fa.DFA:DFARWalkFromState
}
aut2testcls={
model.ra.RegisterAutomaton:AcceptorTest,
model.ra.IORegisterAutomaton:IORATest,
model.fa.MealyMachine:MealyTest,
model.fa.DFA:AcceptorTest
}
aut2suttype={
model.ra.RegisterAutomaton: sut.SUTType.RA,
model.ra.IORegisterAutomaton: sut.SUTType.IORA,
model.fa.MealyMachine:sut.SUTType.Mealy,
model.fa.DFA:sut.SUTType.DFA
}
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Inference tool which can:\n'
'(1) passively learn from traces file\n'
'(2) actively learn a system described by a .dot file\n'
'(3) actively learn a scalable SUT (system) of a given size')
parser.add_argument('-m', '--mode', type=str, choices=['traces', 'dot', 'scalable'], required=True)
parser.add_argument('-f', '--file', type=str, help='the file location to the dot/traces file')
parser.add_argument('-a', '--aut', type=str, choices=list(model.defined_formalisms().keys()), required=True,
help='the type of automaton (formalism) described in the file or '
'which characterizes the (scalable) SUT')
parser.add_argument('-sc', '--sut_class', type=str, choices=list(sut.scalable_sut_classes().keys()),
help='the class of the scalable SUT')
parser.add_argument('-s', '--size', type=int, help='the size of the scalable SUT')
# test parameters
parser.add_argument('-mt', '--max_tests', type=int, default=1000, help='the max number of test executed per hyp')
parser.add_argument('-rl', '--rand_length', type=int, default=5, help='the maximum length of the random sequence '
' used by rwalkfromstate')
parser.add_argument('-rp', '--reset_prob', type=float, default=0.2, help='the probability of reseting the SUT after '
'running each test input')
parser.add_argument('-y', '--yannakakis', action='store_true', help='use yannakakis instead of rwalkfromstate '
'(only supports Mealy Machines)')
args = parser.parse_args()
formalism = args.aut
formalisms = model.defined_formalisms()
aut_type = formalisms[formalism]
learner = aut2learner[aut_type]
if args.mode == 'traces':
trace_file = args.file
traces = parse.extract_traces_from_file(args.file, formalism)
(automaton, statistics) = alg.learn(learner, aut2testcls[aut_type], traces)
else:
if args.mode == 'dot':
dot_file = args.file
aut_to_learn = parse.build_automaton_from_dot(formalism, dot_file)
sut_to_learn = sut.get_simulation(aut_to_learn)
elif args.mode == 'scalable':
sut_class_name = args.sut_class
sut_size = args.sut_size
sut_to_learn = sut.get_scalable_sut(sut_class_name, aut2suttype[aut_type], sut_size)
else:
print("Invalid mode ", args.mode)
exit(1)
num_tests = args.max_tests
rand_test_length = args.rand_length
reset_prob = args.reset_prob
test_generator = aut2rwalkcls[aut_type](rand_test_length, reset_prob)
(automaton, statistics) = alg.learn_mbt(learner, test_generator, num_tests)
print("Learned\n", automaton, "\nWith stats\n", statistics)
\ No newline at end of file
......@@ -8,11 +8,11 @@ import define.ra
class RALearner(Learner):
def __init__(self, encoder, solver=None, verbose=False):
super().__init__()
if encoder is None:
raise Exception("RAEncoder has to be supplied")
if not solver:
solver = z3.Solver()
print(z3.describe_tactics())
#solver = z3.Z3_mk_simple_solver("smt")
# example of tactics
#solver = z3.Then('simplify',
......
......@@ -34,16 +34,17 @@ class MultipleTransitionsFired(Exception):
def defined_formalisms():
"""retrieves a dictionary containing the formalisms implemented and their respective classes"""
import inspect
sc = dict()
crt = Automaton
to_visit = set(crt.__subclasses__())
while len(to_visit) > 0:
subclass = to_visit.pop()
if not inspect.isabstract(subclass):
if not inspect.isabstract(subclass) and not subclass.__name__.startswith("Mutable"):
sc[subclass.__name__] = subclass
else:
to_visit.add(subclass)
to_visit = to_visit.union(to_visit, set(subclass.__subclasses__()))
return sc
......
import sys
from model import Automaton, Acceptor, Transition
from model.fa import IOTransition
from model.ra import RATransition, IORATransition
__all__ = [
"to_dot",
]
sep = "_"
def w_state(automaton : Automaton, state):
if isinstance(automaton, Acceptor):
return sep.join(state, str(automaton.is_accepting(state)))
return state
def w_trans(automaton:Automaton, trans : Transition):
input_elements = [str(trans.start_label)]
if isinstance(trans, RATransition):
input_elements.extend([str(trans.guard), str(trans.assignment)])
input_label = sep.join(input_elements)
output_elements = []
if isinstance(automaton, Acceptor):
output_elements.append(str(automaton.is_accepting(trans.end_state)))
elif isinstance(trans, IOTransition):
output_elements.append(trans.output)
elif isinstance(trans, IORATransition):
output_elements.extend(
map(lambda x:str(x), [trans.output_label, trans.output_mapping, trans.output_assignment]))
output_label = sep.join(output_elements)
return '{0} -> {1} [label="{2}/{3}"]'.format( w_state(automaton, trans.start_state),
w_state(automaton, trans.end_state),
input_label,
output_label)
def to_dot (automaton:Automaton, file_name:str):
"""For an automaton produces a .dot representation"""
f = open(file_name, 'w') if file_name is not None else sys.stdout
print('digraph g {\n', file=f)
for state in automaton.states():
print('\t%s;' % w_state(automaton,state), file=f)
for state in automaton.states():
for trans in automaton.transitions(state):
print("\t%s;" % w_trans(automaton, trans), file=f)
print('}', file=f)
if file_name is not None:
f.close()
\ No newline at end of file
from abc import ABCMeta, abstractmethod
from typing import Type, Dict, List, Union
import sys
import inspect
import re
from model import Automaton, Acceptor, Transition
from model.fa import IOTransition, MutableMealyMachine
from model.ra import RATransition, IORATransition, Action
__all__ = [
"extract_traces_from_file",
"build_automaton_from_dot"
]
def extract_traces_from_file(file_name:str, aut_type:Type[Union[Automaton,str]]) -> List[object]:
"""Parses file with traces that is formatted in an adapted Abbadingo format, and returns an accepting/rejecting or an
input/output trace (depending on the line's format).
The format of the file is:
trace1
trace2
...
Depending on the formalism(aut_type), the format of a trace is one of the following:
acc_value[ label]*
acc_value[ label(?value)]
input/output[ input/output]*
input(?value)/output(?value)[ input(?value)/output(?value)]*
"""
line_stream = file_stream(file_name)
if isinstance(aut_type, Automaton):
formalism = aut_type.__name__
else:
formalism = aut_type
formalism=formalism.replace("RegisterAutomaton", "RA")
crt = sys.modules[__name__]
parse_fun = crt.__dict__[formalism+"_parse"]
if parse_fun is None:
sup_form = [a.replace("_parse","") for a in crt.__dict__.keys() if a.endswith("parse") and isinstance(crt.__dict__[a], function)]
print("Formalism not supported")
print("Supported formalisms: ", sup_form)
exit(0)
traces = [parse_fun(line) for line in line_stream]
return traces
def act_string_to_act(act_string):
act_label = act_string[:act_string.index("(")]
param_val = int(act_string[act_string.index("(") + 1:act_string.index(")")]) \
if act_string.index(")") > act_string.index("(") + 1 else None
return Action(act_label, param_val)
def DFA_parse(line):
fields = line.split()
acc = bool(int(fields[0]))
labels = fields[1:]
return (labels, acc)
def RA_parse(line):
fields = line.split()
acc = bool(int(fields[0]))
act_strings = fields[1:]
actions = list(map(act_string_to_act, act_strings))
return actions
def MealyMachine_parse(line):
fields = line.split()
trace = []
for field in fields:
split = field.split('/')
trace.append((split[0], split[1]))
return trace
def IORA_parse(line):
fields = line.split()
trace = []
for field in fields:
split = field.split('/')
trace.append((act_string_to_act(split[0]), act_string_to_act(split[1])))
return trace
def build_automaton_from_dot (aut_type:str, file_name:str) -> Automaton:
"""
Builds an automaton from a .dot model
:param aut_type: the type of automaton described in the .dot file
:param file_name: path/name of the .dot file
:return: The constructed automaton or None
"""
crt = sys.modules[__name__]
importer_cls = getattr(crt, aut_type + "DotImporter")
if importer_cls is None:
print("DotImporter not available for automaton ", aut_type)
print("Available DotImporters: ",
[a.replace("DotImporter","") for a in crt.__dict__.keys() if a.endswith("DotImporter") and isinstance(crt.__dict__[a], type)
and not inspect.isabstract(crt.__dict__[a])])
exit(1)
"""From .dot representation produces an automata"""
dot_stream = file_stream(file_name)
importer = importer_cls()
importer.parse_dot(dot_stream)
automata = importer.build_aut()
return automata
def file_stream(file_name, from_line=0, line_processor=None):
f = open(file_name, 'r')
_ = [f.readline() for i in range(from_line)]
for line in f:
yield line if line_processor is None else line_processor(line)
f.close()
# a simple dot parser using REGEX, in particular, we use labelled capturing groups
# e.g. (?<allones>1*) captures the regex 1*. In case of a capture (or match), the matching string will be stored
# in a group indexed by "allones"
# note, this parser ABSOLUTELY does not implement thorough grammar checks
class DotImporter(metaclass=ABCMeta):
# some patterns
PAT_NO_NEWLINE = "[^(\r\n|[\r\n])]"
PAT_NEWLINE = "(\r\n|[\r\n])"
PAT_STATE = "s[0-9]*"
PAT_STATE_STR = "(?P<state>"+PAT_STATE+")" + "(?P<content>[^>\[]*)"+"(\[.*\])?;?"
PAT_TRANS_LABEL = "\[label=\"(?P<label>.*)\"\];?"
PAT_TRANS_MOVE = "(?P<src>" + PAT_STATE + ")\s*->\s*" + "(?P<dst>"+ PAT_STATE + ")"
PAT_TRANSITION = PAT_TRANS_MOVE+ "\s*" + PAT_TRANS_LABEL
PAT_START = "digraph.*"
PAT_ENDS_ON_LABEL = "(?= " + PAT_TRANS_LABEL + ")"
@abstractmethod
def _visit_state(self, state_str:str, state_content:str):
pass
@abstractmethod
def _visit_transition(self, from_state:str, to_state:str, trans_label:str):
pass
@abstractmethod
def build_aut(self):
pass
def _matching_lines(self, stream, pat, expected=True, stop_at_mismatch=True):
matches = []
for line in stream:
line = line.strip()
print(line)
# if the line only contained spaces continue
if len(line) == 0:
continue
match = re.fullmatch(pat, line)
if match is not None:
matches.append(match)
else:
if stop_at_mismatch is True:
break
if len(matches) == 0:
print("Wrong format, cannot find match for ",pat)
print("Expected format (empty lines/whitespaces ignored): \n"+
self.PAT_START+"\n" +
self.PAT_STATE_STR+"\n" +
"..." +
self.PAT_TRANSITION +
"...")
exit(1)
return matches
def parse_dot(self, line_stream:str):
patterns = dict()
_ = self._matching_lines(line_stream, self.PAT_START, patterns)
state_matches = self._matching_lines(line_stream, self.PAT_STATE_STR, patterns)
for state_match in state_matches:
self._visit_state(state_match.group("state"), state_match.group("content"))
trans_matches = self._matching_lines(line_stream, self.PAT_TRANSITION, patterns, stop_at_mismatch=False)
for trans_match in trans_matches:
self._visit_transition(trans_match.group("src"), trans_match.group("dst"), trans_match.group("label"))
class TransducerDotImporter(DotImporter, metaclass=ABCMeta):
PAT_IOTRANSITION = "(?P<input>[^/]*)/(?P<output>.*)"
@abstractmethod
def _visit_iotransition(self, from_state:str, to_state:str, input_label:str, output_label):
pass
def _visit_transition(self, from_state:str, to_state:str, trans_label:str):
match = re.match(self.PAT_IOTRANSITION, trans_label)
if match is None:
print("Invalid transition label " + trans_label)
print("Expected transition label pattern " + self.PAT_IOTRANSITION)
exit(1)
inp_label = match.group("input")
out_label = match.group("output")
self._visit_iotransition(from_state, to_state, inp_label, out_label)
class MealyMachineDotImporter(TransducerDotImporter):
def __init__(self):
super().__init__()
self._mut_mealy = MutableMealyMachine()
def _visit_state(self, state_str:str, state_content:str):
self._mut_mealy.add_state(state_str)
def _visit_iotransition(self, from_state:str, to_state:str, input_label:str, output_label):
self._mut_mealy.add_transition(from_state, IOTransition(from_state, input_label, output_label, to_state))
def build_aut(self):
return self._mut_mealy.to_immutable()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment