Commit 810b5324 authored by Michele

Merge branch 'development'

parents f7f05033 a01c49a9
Learning Labelled Transition Systems
==================
This project learns Labelled Transition Systems using concepts from the
Model-Based Testing framework known as **ioco**.
It is an implementation of the algorithm described in the paper
"Active Learning of Nondeterministic Systems from an **ioco** Perspective"
(see http://dx.doi.org/10.1007/978-3-662-45234-9_16) and the follow-up paper
"Approximate Active Learning of Nondeterministic Input Output Transition
Systems".
*Abstract from the first paper*:
Model-based testing allows the creation of test cases from a model of the
system under test. Often, such models are difficult to obtain, or not
available at all. Automata learning helps infer the model of a system by
observing its behaviour. The model can be employed for many purposes, such as
testing other implementations, regression testing, or model checking. We
present an algorithm for active learning of nondeterministic, input-enabled,
labelled transition systems, based on Angluin's well-known L* algorithm.
Under some assumptions for dealing with nondeterminism, input-enabledness, and
equivalence checking, we prove that the algorithm produces a model whose
behaviour is equivalent to the one under learning. We define new properties
for the structure used in the algorithm, derived from the semantics of labelled
transition systems. Such properties help the learning process by avoiding
unnecessary queries to the system under learning.
*Abstract from the second paper*:
Constructing a model of a system for model-based testing,
simulation, or model checking can be cumbersome for existing, third-party,
or legacy components. Active automata learning, a form of black-box
reverse engineering, and in particular Angluin’s L*
algorithm, supports the automatic inference of a model from a System Under
Learning (SUL) through observations and tests.
Most of the algorithms based on L*, however, deal with complete learning of
deterministic models: they are unable to cope with nondeterministic SULs,
and they always learn a complete and correct model because they are based on
equivalence between the SUL and the model. We present an adaptation of
Angluin’s algorithm for active learning of nondeterministic, input-enabled,
input-output transition systems. It enables dealing with nondeterministic
SULs, and it allows constructing partial, or approximate, models by expressing
the relation between the SUL and the learned model as a refinement relation,
not necessarily an equivalence. Thus, we can reason about learned
models being more, or less, precise than others. Approximate learning
has benefits in model-based regression testing: we need not wait until
a complete model has been learned; with an approximate model, **ioco**-based
regression testing can start.
## Synopsis
The active-learning-nondeterministic-systems project is an implementation of an
adaptation of
[L*](http://www.cs.berkeley.edu/~dawnsong/teaching/s10/papers/angluin87.pdf) to
nondeterministic systems. The code is based on these scientific papers:
* [Active Learning of Nondeterministic Systems from an ioco Perspective](http://link.springer.com/chapter/10.1007%2F978-3-662-45234-9_16)
* [Approximate Active Learning of Nondeterministic Input Output Transition Systems](http://www.italia.cs.ru.nl/html/papers/VT15.pdf)
The goal is to construct a model of a system for model-based testing,
simulation, or model checking.
## Included Libraries
[NumPy](https://github.com/numpy/numpy)
## Code Example
Check [the examples folder](examples/) for how to use it.
## Motivation
This code exists as a supporting implementation for the papers mentioned above.
## Installation
Clone the repository. Then you can modify any file in [examples](examples/),
or create your own.
If you are using a real system under test (an actual running black-box
piece of software), you need to write your own adapters to connect it to the
learning tool. The adapters should inherit from `AbstractTeacher` in
[baseteacher.py](teachers/baseteacher.py) and from `AbstractOracle` in
[baseoracle.py](teachers/baseoracle.py).
In particular, the adapters should implement the abstract methods in
`AbstractTeacher` and `AbstractOracle`. Those methods are used by the
learner to ask so-called **output** and **observation** queries, as
sketched below.
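The exact abstract method set is defined in
[baseteacher.py](teachers/baseteacher.py) and
[baseoracle.py](teachers/baseoracle.py); judging from how the learner uses
the teacher and oracle in `learning/learning.py`, at least the alphabets,
the quiescence label, an output query, and an observation query are needed.
A minimal sketch, with the actual SUL communication left as a placeholder:

```python
from teachers.baseteacher import AbstractTeacher
from teachers.baseoracle import AbstractOracle

class YourOwnAdapterTeacher(AbstractTeacher):
    def getInputAlphabet(self):
        return {'a', 'b'}            # inputs your SUL accepts (placeholder)

    def getOutputAlphabet(self):
        return {'x', 'y'}            # outputs your SUL can produce (placeholder)

    def getQuiescence(self):
        return 'delta'               # label used for observed quiescence

    def process(self, trace):
        # Output query: drive the running black box along `trace` and
        # return one observed output, or None when the trace could not
        # be executed (the learner retries in that case).
        raise NotImplementedError

class YourOwnAdapterOracle(AbstractOracle):
    def observation(self, trace, observedOutputs):
        # Observation query: judge whether `observedOutputs` covers all
        # outputs the SUL can produce after `trace`.
        raise NotImplementedError
```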
Then you can start learning your system:
```python
teacher = YourOwnAdapterTeacher()
oracle = YourOwnAdapterOracle()
learner = LearningAlgorithm(teacher, oracle, maxLoops=10,
                            tablePreciseness=10000, modelPreciseness=0.1)
underModel, overModel = learner.run()
```
where `underModel` and `overModel` are the under- and over-approximation
of your system, respectively; `maxLoops` is the maximum number of learning
loops in which the learned models may stay unchanged; and `tablePreciseness`
and `modelPreciseness` are the preciseness levels you would like to reach
before stopping.
The learning process stops either when the learned model has not changed for
`maxLoops` loops, or when both preciseness levels are met.
## Contributors
If you want to contribute, or if you have questions, you can contact me by
checking [my contact details](https://gitlab.science.ru.nl/u/mvolpato).
## License
I wrote this code. You can use it, modify it, and include it in other projects.
I take no responsibility if something goes wrong; I take compliments if
everything goes well.
I would just like to know if you are using this project (or modifying it, or
including it in other projects), so please let me know.
import os, sys, inspect

# Include project dir in path
currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(currentdir)
sys.path.append(parentdir)
print(parentdir)

from systems.implementations import InputOutputLTS

# A small two-state input-output LTS
inputs = set([1, 2])
outputs = set(['a', 'b'])
quiescence = 'delta'
I3 = InputOutputLTS(2, inputs, outputs, quiescence)
I3.addTransition(1, 1, 0)
I3.addTransition(1, 2, 0)
I3.addTransition(0, 1, 0)
I3.addTransition(0, 2, 1)

# A three-state LTS without any transitions
inputs = set([1, 'c'])
outputs = set(['a', 'b'])
quiescence = 'delta'
I4 = InputOutputLTS(3, inputs, outputs, quiescence)
print(I4.isInputEnabled())
I4.move(2)
print(I3.isInputEnabled())
# A larger example: 8 states declared, 5 of them used below
inputs = set(['a', 'b'])
outputs = set(['x', 'y'])
quiescence = 'delta'
I1 = InputOutputLTS(8, inputs, outputs, quiescence)
I1.addTransition(0, 'a', 1)
I1.addTransition(0, 'a', 2)   # nondeterministic choice on input 'a'
I1.addTransition(0, 'b', 0)
I1.addTransition(1, 'a', 0)
I1.addTransition(1, 'b', 3)
I1.addTransition(2, 'a', 0)
I1.addTransition(2, 'b', 3)
I1.addTransition(2, 'x', 4)
I1.addTransition(3, 'a', 0)
I1.addTransition(3, 'x', 4)
I1.addTransition(3, 'b', 3)
I1.addTransition(4, 'y', 2)
I1.addTransition(4, 'b', 4)
I1.addTransition(4, 'a', 0)

I1.move('a')
print(I1.currentState())
print(I1.isInputEnabled())
import os, inspect, sys

# Include project dir in path
currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(currentdir)
sys.path.append(parentdir)

from learning.learning import LearningAlgorithm
from teachers.ltsteachers import InputOutputTeacher
from systems.implementations import InputOutputLTS
from teachers.ltsoracles import InputOutputPowerOracle
import logging
import helpers.bisimulation as bi

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

# The five-state system under learning
inputs = set(['a', 'b'])
outputs = set(['x', 'y'])
quiescence = 'delta'
I1 = InputOutputLTS(5, inputs, outputs, quiescence)
I1.addTransition(0, 'a', 1)
I1.addTransition(0, 'a', 2)   # nondeterministic choice on input 'a'
I1.addTransition(0, 'b', 0)
I1.addTransition(1, 'a', 0)
I1.addTransition(1, 'b', 3)
I1.addTransition(2, 'a', 0)
I1.addTransition(2, 'b', 3)
I1.addTransition(2, 'x', 4)
I1.addTransition(3, 'a', 0)
I1.addTransition(3, 'x', 4)
I1.addTransition(3, 'b', 3)
I1.addTransition(4, 'y', 2)
I1.addTransition(4, 'b', 4)
I1.addTransition(4, 'a', 0)
I1.makeInputEnabled()

# Teacher and oracle wrapping the system under learning
T1 = InputOutputTeacher(I1)
O1 = InputOutputPowerOracle(I1)

currentdir = os.path.dirname(os.path.abspath(
    inspect.getfile(inspect.currentframe())))
path = os.path.join(currentdir, "dotFiles")

print("Starting learning...")
# change printPath=None to printPath=path for dot files
L2 = LearningAlgorithm(T1, O1, printPath=None, maxLoops=2, logger=logger)
minus, plus = L2.run()

print("Models learned. Checking bisimilarity with the target...")
print("hMinus bisimilar to target: " + str(bi.bisimilar(I1, minus)))
print("hPlus bisimilar to target: " + str(bi.bisimilar(I1, plus)))
def bisimilar(system1, system2):
    # Start from the initial states (state sets are represented as tuples)
    state1 = (0,)
    state2 = (0,)
    past = set()
    wait = set()
    trace = ()
    wait.add((state1, state2, trace))
    while wait:
        current = wait.pop()
        past.add((current[0], current[1]))
        system1Labels = system1.getInputs().union(system1.getOutputs())
        system1Labels.add(system1.getQuiescence())
        system2Labels = system2.getInputs().union(system2.getOutputs())
        system2Labels.add(system2.getQuiescence())
        enabledLabels_1 = set()
        for state in current[0]:
            enabledLabels_1 = enabledLabels_1.union(system1.outputs(state))
            enabledLabels_1 = enabledLabels_1.union(system1.inputs(state))
        enabledLabels_2 = set()
        for state in current[1]:
            enabledLabels_2 = enabledLabels_2.union(system2.outputs(state))
            enabledLabels_2 = enabledLabels_2.union(system2.inputs(state))
        if enabledLabels_1 != enabledLabels_2:
            # Proved not bisimilar. Instead of False, return a witness
            # (the distinguishing state sets, their enabled labels, and the
            # trace leading there). Note the tuple is truthy, so callers
            # should compare the result to True explicitly.
            return (current[0], current[1], enabledLabels_1,
                    enabledLabels_2, current[2])
        else:
            # Add new pairs of state sets for further bisimilarity checking
            transitions_1 = system1.getTransitions()
            transitions_2 = system2.getTransitions()
            for label in enabledLabels_1:
                newStates1 = set()
                newStates2 = set()
                for state in current[0]:
                    key = (state, label)
                    if key not in transitions_1:
                        if (label == system1.getQuiescence() and
                                label in system1.outputs(state)):
                            # Quiescence without an explicit delta
                            # transition: stay in this state
                            newStates1.add(state)
                        continue
                    for nextState in transitions_1[key]:
                        newStates1.add(nextState)
                tupledStates1 = tuple(sorted(newStates1))
                for state in current[1]:
                    key = (state, label)
                    if key not in transitions_2:
                        if (label == system2.getQuiescence() and
                                label in system2.outputs(state)):
                            newStates2.add(state)
                        continue
                    for nextState in transitions_2[key]:
                        newStates2.add(nextState)
                tupledStates2 = tuple(sorted(newStates2))
                if (tupledStates1, tupledStates2) not in past:
                    wait.add((tupledStates1, tupledStates2,
                              current[2] + (label,)))
    return True
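# Usage note (illustrative, not part of the original module): on success
# bisimilar() returns True, while on failure it returns a witness tuple
# rather than False, so callers can inspect why the two systems differ:
#
#     result = bisimilar(system1, system2)
#     if result is True:
#         print("bisimilar")
#     else:
#         states1, states2, labels1, labels2, trace = result
#         print("distinguished after trace", trace)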
import graphviz as gv


def add_nodes(graph, nodes):
    # Nodes are either plain names or (name, attribute-dict) tuples
    for n in nodes:
        if isinstance(n, tuple):
            graph.node(n[0], **n[1])
        else:
            graph.node(n)
    return graph


def add_edges(graph, edges):
    # Edges are either (tail, head) tuples or ((tail, head), attribute-dict)
    for e in edges:
        if isinstance(e[0], tuple):
            graph.edge(*e[0], **e[1])
        else:
            graph.edge(*e)
    return graph
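# Illustrative usage of the helpers above (hypothetical example, not from
# the repository): nodes and edges may be plain names or tuples carrying a
# Graphviz attribute dictionary.
#
#     g = gv.Digraph(format='svg')
#     g = add_nodes(g, ['0', ('1', {'label': 'one'})])
#     g = add_edges(g, [('0', '1'), (('1', '0'), {'label': 'back'})])
#     g.render(filename='tiny_example')   # writes tiny_example.svg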
def createDOTFile(lts, path, format='svg'):
    # Note: format must be passed as a keyword; the first positional
    # argument of Digraph is the graph name
    g1 = gv.Digraph(format=format)
    states = list(range(lts.getNumStates()))
    nodes = []
    # Create fake initial state
    nodes.append(('_nil', {'style': 'invis'}))
    try:
        if lts.isChaotic():
            # Add chaotic states, useful for suspension automata
            nodes.append((str(-1), {'label': u"\u03C7"}))
            nodes.append((str(-2), {'label': u"\u03B4"}))
    except AttributeError:
        pass
    for x in states:
        if x == 0:
            # State 0 is the initial state
            nodes.append((str(x), {'label': '', 'initial': 'initial'}))
        else:
            # From numbers to strings
            nodes.append((str(x), {'label': ''}))
    edges = []
    # Create transition from the fake initial state
    edges.append((('_nil', '0'), {'label': ''}))
    transitions = lts.getTransitions()
    keys = transitions.keys()
    for (state1, label) in keys:
        for state2 in transitions[(state1, label)]:
            edges.append(((str(state1), str(state2)), {'label': str(label)}))
    add_edges(add_nodes(g1, nodes), edges)
    g1.render(filename=path)
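# Example call (illustrative): render any LTS exposing getNumStates() and
# getTransitions(), such as a learned hypothesis, to an SVG file:
#
#     createDOTFile(I1, os.path.join(currentdir, "dotFiles", "I1"), 'svg')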
from .observationtable import Table
import random
from systems.implementations import SuspensionAutomaton
import os, inspect
import helpers.graphhelper as gh
import logging
class LearningAlgorithm:
    def __init__(self, teacher, oracle, tablePreciseness=1000,
                 modelPreciseness=0.1, closeStrategy=None,
                 printPath=None, maxLoops=10, logger=None):
        self._logger = logger or logging.getLogger(__name__)
        self._teacher = teacher
        self._oracle = oracle
        self._tablePreciseness = tablePreciseness
        self._modelPreciseness = modelPreciseness
        self._table = Table(self._teacher.getInputAlphabet().copy(),
                            self._teacher.getOutputAlphabet().copy(),
                            self._teacher.getQuiescence(),
                            closeStrategy, logger=self._logger)
        # Maximum number of loops with no effect on the hPlus model
        self._noEffectLimit = maxLoops
        # Current number of loops
        self._currentLoop = 0
        self._hMinus = SuspensionAutomaton(
            1,
            self._teacher.getInputAlphabet().copy(),
            self._teacher.getOutputAlphabet().copy(),
            self._teacher.getQuiescence())
        self._hPlus = SuspensionAutomaton(
            1,
            self._teacher.getInputAlphabet().copy(),
            self._teacher.getOutputAlphabet().copy(),
            self._teacher.getQuiescence(),
            chaos=True)
        self._printPath = printPath
    def updateTable(self):
        # For all traces for which we have not yet observed all possible
        # outputs.
        # TODO: we repeat the update multiple times because otherwise it is
        # very easy to get a non-closed/non-consistent table, resulting in
        # adding many rows to the top part.
        # This value should not be hardcoded.
        for c in range(500):
            for trace in self._table.getObservableTraces():
                observedOutputs = self._table.getOutputs(trace)
                output = self._teacher.process(trace)
                for i in range(10):
                    # Try again if retrieving an output was unsuccessful
                    if output != None:
                        break
                    output = self._teacher.process(trace)
                if output != None:
                    # Traces we could not process entirely are skipped here.
                    # TODO: we need a clever way of handling this; for
                    # instance, we could organize getObservableTraces in a
                    # tree, then follow the tree for outputs. I expect that
                    # in this way the number of traces we are not able to
                    # process is smaller.
                    # Only if the trace is a prefix in S, add
                    # trace + output to the rows (S cdot L_delta)
                    if self._table.isInS(trace):
                        self._table.addOneLetterExtension(trace, output)
                    # Update the set of outputs for traces where deltas are removed
                    for deltaTrace in self._table.getDeltaTraces(trace):
                        self._table.updateEntry(deltaTrace, output)
                    # Add this output to the set of outputs observed after trace
                    observedOutputs.add(output)
                    observation = self._oracle.observation(trace, observedOutputs)
                    self._table.updateEntry(trace, output, observation)
    def stabilizeTable(self):
        # Until nothing changes any more, keep closing the table and
        # making it consistent
        closingRows = self._table.isNotGloballyClosed()
        consistentCheck = self._table.isNotGloballyConsistent()
        while closingRows or consistentCheck:
            while closingRows:
                self._logger.info("Closing table")
                self._logger.debug(closingRows)
                self._table.promote(closingRows)
                # After promoting, check whether some one-letter
                # extensions should also be added
                if self._table.addOneLetterExtensions(closingRows):
                    self._logger.info("something changed")
                    if self._logger.isEnabledFor(logging.DEBUG):
                        self._table.printTable(prefix="_c_")
                    self.updateTable()
                closingRows = self._table.isNotGloballyClosed()
                consistentCheck = self._table.isNotGloballyConsistent()
            # Table is closed, check for consistency
            if consistentCheck:
                self._logger.info("Consistency check")
                self._logger.debug(consistentCheck)
                if self._table.addColumn(consistentCheck, force=True):
                    self._logger.info("something changed")
                    if self._logger.isEnabledFor(logging.DEBUG):
                        self._table.printTable(prefix="_i_")
                    # TODO: is an update needed here? In theory, when I
                    # encounter an inconsistency, adding a column makes the
                    # interesting row immediately not closed, so no update
                    # should be needed, right?
                    #self.updateTable()
                closingRows = self._table.isNotGloballyClosed()
                consistentCheck = self._table.isNotGloballyConsistent()
    def getHypothesis(self, chaos=False):
        # If the table is not closed, ERROR
        if self._table.isNotGloballyClosed():
            self._logger.error("Tried to get hypotheses with table not "
                               "closed or not consistent")
            return None
        # Get equivalence classes
        rows = self._table.getEquivalenceClasses(chaos)
        hyp = SuspensionAutomaton(len(rows),
                                  self._teacher.getInputAlphabet().copy(),
                                  self._teacher.getOutputAlphabet().copy(),
                                  self._teacher.getQuiescence(),
                                  chaos)
        # Assign a state number to each equivalence class,
        # starting with the class of the empty trace as state 0
        assignments = {(): 0}
        count = 1
        for row in rows:
            if row != ():
                assignments[row] = count
                count = count + 1
        allLabels = self._teacher.getInputAlphabet().union(
            self._teacher.getOutputAlphabet(),
            set((self._teacher.getQuiescence(),)))
        # Add transitions
        for row in rows:
            for label in allLabels:
                # Create the one-letter extension and look it up in the table
                extension = row + (label,)
                if self._table.isInRows(extension):
                    found = False
                    for target in rows:
                        # TODO: the Table method called on the next line is
                        # private. Change to public, or add a public version.
                        if self._table._rowEquality(extension, target, chaos):
                            hyp.addTransition(assignments[row], label,
                                              assignments[target])
                            found = True
                            break
                    if not found:
                        # Either the table is not closed, or the first
                        # column of the extension has an empty set.
                        # If label is an input, then send it to ChaosDelta;
                        # if it is an output, then send it to Chaos.
                        # It cannot be quiescence.
                        if label in self._teacher.getInputAlphabet():
                            hyp.addTransition(assignments[row], label,
                                              hyp.getChaosDelta())
                        else:
                            hyp.addTransition(assignments[row], label,
                                              hyp.getChaos())
                elif (chaos and label in self._teacher.getOutputAlphabet()):
                    # Add transitions to the chaotic states if necessary
                    if row in self._table.getObservableTraces():
                        if label != hyp.getQuiescence():
                            hyp.addTransition(assignments[row], label,
                                              hyp.getChaos())
                        else:
                            hyp.addTransition(assignments[row], label,
                                              hyp.getChaosDelta())
        return hyp
    # Generate DOT files for the hypotheses. hyp = hMinus|hPlus|both
    def generateDOT(self, path=None, hyp="both", format="pdf"):
        if path == None:
            path = os.path.dirname(os.path.abspath(
                inspect.getfile(inspect.currentframe())))
        pathHminus = os.path.join(path, "hypotheses", "loop_" +
                                  str(self._currentLoop), "hMinus")
        pathHplus = os.path.join(path, "hypotheses", "loop_" +
                                 str(self._currentLoop), "hPlus")
        if hyp != None:
            if hyp == "hMinus" or hyp == "both":
                gh.createDOTFile(self._hMinus, pathHminus, format)
            if hyp == "hPlus" or hyp == "both":
                gh.createDOTFile(self._hPlus, pathHplus, format)
    # Start the learning process
    def run(self):
        noEffectLoops = 0
        oldTablePreciseness = self._table.preciseness()
        oldModelPreciseness = 0
        # While we have not reached our limit of loops with no effect on
        # the table
        while noEffectLoops < self._noEffectLimit:
            self._currentLoop = self._currentLoop + 1
            self._logger.info("Learning loop number " + str(self._currentLoop))
            # Fill the table and make it closed and consistent
            self.updateTable()
            self.stabilizeTable()
            # Is the table quiescence reducible? If not, make it so, then
            # fill it again and make it closed and consistent
            while (self._table.isNotQuiescenceReducible() or
                   not self._table.isStable()):
                if self._table.isNotQuiescenceReducible():
                    self._logger.info("Quiescence reducible")
                    self._logger.debug(self._table.isNotQuiescenceReducible())
                    if self._logger.isEnabledFor(logging.DEBUG):
                        self._table.printTable(prefix="_Q_")
                    if self._table.addColumn(self._table.isNotQuiescenceReducible(),
                                             force=True):
                        self._logger.info("something changed")
                        if self._logger.isEnabledFor(logging.DEBUG):
                            self._table.printTable(prefix="_Qafter_")
                self.updateTable()
                self.stabilizeTable()
            self._hMinus = self.getHypothesis()
            self._hPlus = self.getHypothesis(chaos=True)
            if self._printPath != None:
                self.generateDOT(path=self._printPath)
                self._table.printTable(self._printPath,
                                       prefix="loop_" + str(self._currentLoop))
            # Table preciseness increases easily: it does not depend on
            # nondeterministic choices, so adding rows and columns to the
            # table is enough to increase it.
            # On the contrary, model preciseness increases only if
            # something really changed in the behaviour of the system.
            # If model preciseness remains the same for too long, then we
            # might want to stop learning.
            currentModelPreciseness = self._hPlus.preciseness()
            if oldModelPreciseness == currentModelPreciseness:
                # TODO: problem here. These two values could be the same even
                # if the behaviour of the system changed. This might happen in
                # rare cases. For the moment I consider those cases so rare
                # that I do not handle them.
                noEffectLoops = noEffectLoops + 1
            else:
                oldModelPreciseness = currentModelPreciseness
                noEffectLoops = 0
            if (self._table.preciseness() < self._tablePreciseness or
                    currentModelPreciseness < self._modelPreciseness):
                self._logger.info("Requested table preciseness: " +
                                  str(self._tablePreciseness) + ". Actual: " +
                                  str(self._table.preciseness()))
                self._logger.info("Requested model preciseness: " +
                                  str(self._modelPreciseness) + ". Actual: " +
                                  str(currentModelPreciseness))
                # TODO: improve preciseness.
                # For the moment, select a random way to do it:
                # add a random row or a random column, or update the table.
                choice = random.sample(['extendTable', 'update',
                                        'testing'], 1)[0]
                if choice == 'extendTable':
                    self._logger.info("Improve preciseness method: extend the table")
                    # TODO: this actually does nothing
                    self._table.extend()
                elif choice == 'update':
                    self._logger.info("Improve preciseness method: update the table")