Commit f321c0d1 authored by Rick Smetsers's avatar Rick Smetsers
Browse files

Tree improvement and IORA

parent 56e6339e
from z3gi.learn.ra import RALearner
from tests.iora_testscenario import *
from encode.ra import IORAEncoder
def check_ra_against_obs(learner, learned_ra, m, test_scenario):
"""Checks if the learned RA corresponds to the scenario observations"""
for trace, acc in test_scenario.traces:
if learned_ra.accepts(trace) != acc:
learner.print_tree()
print("Register automaton {0} \n\n with model {1} \n doesn't correspond to the observation {2}"
.format(learned_ra, m, str((trace, acc))))
return
for i in range(1,10):
print("Experiment ",i)
learner = RALearner(labels, encoder=IORAEncoder())
exp = sut_m2
for trace in exp.traces:
learner.add(trace)
(_, ra, m) = learner._learn_model(exp.nr_locations,exp.nr_locations+1,exp.nr_registers)
#print(m)
if m is not None:
print(m)
# model = ra.export(m)
# loc = ra.locations
# lbl = ra.labels[labels[0]]
# fresh = ra.fresh
# check_ra_against_obs(learner, model, m, exp)
else:
print("unsat")
\ No newline at end of file
......@@ -25,11 +25,18 @@ RaTestScenario = collections.namedtuple('RaTestCase',
# Define data
sut_m1 = RaTestScenario("Accept everything", [
[(act(0, 'in'), act(100, 'OK'))],
[(act(0, 'in'), act(101, 'OK')), (act(0, 'in'), act(102, 'OK'))]
],
4, 1
)
# IO
sut_m1 = RaTestScenario( "Store value and produce OK output if you get that same value", [
sut_m2 = RaTestScenario( "Store value and produce OK output if you get that same value", [
[(act(0, 'in'), act(100, 'OK')), (act(0, 'in'), act(101, 'OK')), (act(0, 'in'), act(102, 'OK')), (act(0, 'in'), act(103, 'OK'))],
[(act(0, 'in'), act(100, 'OK')), (act(0, 'in'), act(101, 'OK')), (act(0, 'in'), act(102, 'OK')), (act(1, 'in'), act(103, 'NOK'))],
[(act(0, 'in'), act(100, 'OK')), (act(0, 'in'), act(101, 'OK')), (act(1, 'in'), act(102, 'NOK')), (act(0, 'in'), act(103, 'OK'))],
[(act(0, 'in'), act(100, 'OK')), (act(1, 'in'), act(101, 'NOK')), (act(0, 'in'), act(102, 'OK')), (act(0, 'in'), act(103, 'OK'))],
[(act(1, 'in'), act(100, 'NOK')), (act(0, 'in'), act(101, 'OK')), (act(0, 'in'), act(102, 'OK')), (act(0, 'in'), act(103, 'OK'))]],
5, 1)
20, 2)
# TODO make standalone
from define import Automaton
class IORegisterAutomaton(Automaton):
def __init__(self, inputs, outputs, num_locations, num_registers):
super().__init__(inputs + outputs, num_locations + 1, num_registers)
del self.output
self.sink = self.locations[-1]
self.statetype = z3.Function('statetype', self.Location, z3.BoolSort())
def export(self, model):
raise NotImplementedError()
......@@ -27,6 +27,28 @@ class RegisterAutomaton(Automaton):
return ra
class IORegisterAutomaton(Automaton):
def __init__(self, input_labels, output_labels, num_locations, num_registers):
labels = input_labels + output_labels
self.Label, elements = enum('Label', input_labels + output_labels)
self.labels = {labels[i]: elements[i] for i in range(len(labels))}
self.Location, self.locations = enum('Location', ['location{0}'.format(i) for i in range(num_locations + 1)])
self.sink = self.locations[-1]
self.Register, self.registers = enum('Register', ['register{0}'.format(i) for i in range(num_registers)] + ['fresh'])
self.start = self.locations[0]
self.fresh = self.registers[-1]
self.transition = z3.Function('transition', self.Location, self.Label, self.Register, self.Location)
self.output = z3.Function('output', self.Location, z3.BoolSort())
self.used = z3.Function('used', self.Location, self.Register, z3.BoolSort())
self.update = z3.Function('update', self.Location, self.Label, self.Register)
self.loctype = z3.Function('loctype', self.Location, z3.BoolSort())
def export(self, model : z3.ModelRef) -> RegisterAutomaton:
raise NotImplementedError()
class Mapper(object):
def __init__(self, ra):
self.Value = z3.DeclareSort('Value')
......
......@@ -2,7 +2,7 @@ import itertools
import z3
from encode import Encoder
from define.ra import RegisterAutomaton, Mapper
from define.ra import RegisterAutomaton, IORegisterAutomaton, Mapper
from utils import Tree, determinize
......@@ -24,13 +24,11 @@ class RAEncoder(Encoder):
ra = RegisterAutomaton(list(self.labels), num_locations, num_registers)
mapper = Mapper(ra)
constraints = self.axioms(ra, mapper) + \
self.output_constraints(ra, mapper) + \
self.node_constraints(ra, mapper) + \
self.transition_constraints(ra, mapper)
return ra, constraints
@staticmethod
def axioms(ra : RegisterAutomaton, mapper):
def axioms(self, ra: RegisterAutomaton, mapper: Mapper):
l = z3.Const('l', ra.Label)
q, qp = z3.Consts('q qp', ra.Location)
r, rp = z3.Consts('r rp', ra.Register)
......@@ -100,7 +98,7 @@ class RAEncoder(Encoder):
return axioms
def output_constraints(self, ra, mapper):
def node_constraints(self, ra, mapper):
constraints = []
for node, accept in self.cache.items():
n = mapper.element(node.id)
......@@ -113,13 +111,15 @@ class RAEncoder(Encoder):
def transition_constraints(self, ra, mapper):
constraints = [ra.start == mapper.map(mapper.start)]
values = {mapper.init}
for node, label, value, child in self.tree.transitions():
for node, (label, value), child in self.tree.transitions():
n = mapper.element(node.id)
l = ra.labels[label]
v = mapper.value(value)
c = mapper.element(child.id)
r, rp = z3.Consts('r rp', ra.Register)
values.add(v)
constraints.extend([
# If the transition is over a register, then the register is in use.
z3.ForAll(
......@@ -206,7 +206,385 @@ class RAEncoder(Encoder):
),
ra.transition(mapper.map(n), l, ra.fresh) == mapper.map(c)),
])
constraints.append(z3.Distinct(list(values)))
return constraints
class IORAEncoder(Encoder):
def __init__(self):
self.tree = Tree(itertools.count(0))
self.values = set()
self.input_labels = set()
self.output_labels = set()
def add(self, trace):
seq = list(itertools.chain(*map(iter, trace)))
_ = self.tree[determinize(seq)]
self.values.update([action.value for action in seq])
self.input_labels.update([action.label for action in [i for (i, o) in trace]])
self.output_labels.update([action.label for action in [o for (i, o) in trace]])
def build(self, num_locations, num_registers):
ra = IORegisterAutomaton(list(self.input_labels), list(self.output_labels), num_locations, num_registers)
mapper = Mapper(ra)
constraints = []
constraints.extend(self.axioms(ra, mapper))
constraints.extend(self.node_constraints(ra, mapper))
constraints.extend(self.transition_constraints(ra, mapper))
return ra, constraints
def axioms(self, ra: IORegisterAutomaton, mapper: Mapper):
l, lp = z3.Consts('l lp', ra.Label)
q, qp = z3.Consts('q qp', ra.Location)
r, rp = z3.Consts('r rp', ra.Register)
axioms = [
# In the start state of the mapper,
# all registers contain an uninitialized value.
z3.ForAll(
[r],
mapper.valuation(mapper.start, r) == mapper.init
),
# # If two locations are connected with both register and fresh transitions,
# # then you have to do an update on a different register (otherwise you should merge the two transitions)
z3.ForAll(
[q, l, r],
z3.Implies(
z3.And(
r != ra.fresh,
ra.transition(q, l, ra.fresh) == ra.transition(q, l, r),
ra.transition(q, l, ra.fresh) != ra.sink
),
z3.And(
ra.update(q, l) != ra.fresh,
ra.update(q, l) != r
)
)
),
# The fresh register is never used
z3.ForAll(
[q],
z3.Implies(
q != ra.sink,
ra.used(q, ra.fresh) == False
)
),
# If a variable is used after a transition, it means it was either used before, or it was assigned
z3.ForAll(
[q, l, r, rp],
z3.Implies(
z3.And(
ra.used(ra.transition(q, l, rp), r) == True,
q != ra.sink
),
z3.Or(
ra.used(q, r) == True,
z3.And(
ra.update(q, l) == r,
rp == ra.fresh
),
)
)
),
# If a variable is updated, then it should have been used.
z3.ForAll(
[q, l, r],
z3.Implies(
z3.And(
r != ra.fresh,
ra.update(q, l) == r,
q != ra.sink
),
ra.used(ra.transition(q, l, ra.fresh), r) == True
)
),
# Registers are not used in the start state
z3.ForAll(
[r],
ra.used(ra.start, r) == False
)
]
# # IO axioms
axioms.extend([
# The sink is labeled as an input location
ra.loctype(ra.sink) == True,
# Alternating input and output locations
z3.ForAll(
[q, l, r],
z3.If(
ra.loctype(q) == True,
z3.Or(
ra.loctype(ra.transition(q, l, r)) == False,
ra.transition(q, l, r) == ra.sink
),
ra.loctype(ra.transition(q, l, r)) == True
)
),
# The sink is the only rejecting state
z3.ForAll(
[q],
z3.If(
q == ra.sink,
ra.output(q) == False,
ra.output(q) == True
)
),
# The start location is an input location
ra.loctype(ra.start) == True,
# In output locations, there's only one transition possible
z3.ForAll(
[q, l, lp, r, rp],
z3.Implies(
z3.And(
q != ra.sink,
ra.loctype(q) == False,
ra.transition(q, l, r) != ra.sink,
z3.Or(
r != rp,
l != lp
)
),
ra.transition(q, lp, rp) == ra.sink
)
),
# The sink is a sink
z3.ForAll(
[l, r],
ra.transition(ra.sink, l, r) == ra.sink
),
# The sink doesn't update registers
z3.ForAll(
[l],
ra.update(ra.sink, l) == ra.fresh
)
])
# From input locations, all output transitions go to sink.
axioms.extend([
z3.ForAll(
[q, r],
z3.Implies(
ra.loctype(q) == True,
ra.transition(q, ra.labels[l], r) == ra.sink
)
)
for l in self.output_labels])
# From output locations, all input transitions go to sink.
axioms.extend([
z3.ForAll(
[q, r],
z3.Implies(
ra.loctype(q) == False,
ra.transition(q, ra. labels[l], r) == ra.sink
)
)
for l in self.input_labels])
# # Input enabled
# axioms.extend([
# z3.ForAll(
# [q, r],
# z3.Implies(
# z3.And(
# ra.loctype(q) == True,
# q != ra.sink
# ),
# ra.transition(q, ra.labels[l], r) != ra.sink
# )
# )
# for l in self.input_labels])
return axioms
def node_constraints(self, ra, mapper):
constraints = []
for node in self.tree:
n = mapper.element(node.id)
constraints.append(mapper.map(n) != ra.sink)
return constraints
def transition_constraints(self, ra, mapper):
constraints = [ra.start == mapper.map(mapper.start), ra.sink != ra.start]
values = {mapper.init}
for node, (label, value), child in self.tree.transitions():
n = mapper.element(node.id)
l = ra.labels[label]
v = mapper.value(value)
c = mapper.element(child.id)
r, rp = z3.Consts('r rp', ra.Register)
constraints.extend([
# If the transition is over a register, then the register is in use.
z3.ForAll(
[r],
z3.Implies(
z3.And(
r != ra.fresh,
ra.transition(mapper.map(n), l, r) == mapper.map(c)
),
ra.used(mapper.map(n), r) == True
)
),
# If a non-fresh register has changed, it must have been updated
z3.ForAll(
[r],
z3.Implies(
z3.And(
r != ra.fresh,
mapper.valuation(c, r) != mapper.valuation(n, r)
),
ra.update(mapper.map(n), l) == r
)
),
# If a used, non-fresh register keeps its value, then it was not updated.
z3.ForAll(
[r],
z3.Implies(
z3.And(
r != ra.fresh,
mapper.valuation(c, r) == mapper.valuation(n, r),
ra.used(mapper.map(n), r) == True
),
ra.update(mapper.map(n), l) != r
)
),
# If a non-fresh register is updated, and c and n are connect by fresh,
# then in c there is a register whose value is v,
# else the valuation is maintained.
z3.ForAll(
[r],
z3.If(
z3.And(
r != ra.fresh,
ra.update(mapper.map(n), l) == r,
ra.transition(mapper.map(n), l, ra.fresh) == mapper.map(c)
),
z3.Exists(
[rp],
z3.And(
rp != ra.fresh,
mapper.valuation(c, rp) == v
)
),
mapper.valuation(c, r) == mapper.valuation(n, r)
)
)
])
# Map to the right transition
path = [v for (l, v) in node.path()]
print(path)
if value in path:
if label in self.input_labels:
constraints.append(
z3.If(
z3.Exists(
[r],
z3.And(
r != ra.fresh,
mapper.valuation(n, r) == v
)
),
z3.ForAll(
[r],
z3.Implies(
z3.And(
r != ra.fresh,
mapper.valuation(n, r) == v
),
z3.If(
ra.used(mapper.map(n), r) == True,
# it might not keep the valuation
ra.transition(mapper.map(n), l, r) == mapper.map(c),
ra.transition(mapper.map(n), l, ra.fresh) == mapper.map(c)
)
)
),
ra.transition(mapper.map(n), l, ra.fresh) == mapper.map(c)
)
)
elif label in self.output_labels:
constraints.append(
z3.If(
z3.Exists(
[r],
z3.And(
r != ra.fresh,
mapper.valuation(n, r) == v
)
),
z3.ForAll(
[r],
z3.Implies(
z3.And(
r != ra.fresh,
mapper.valuation(n, r) == v
),
z3.And(
ra.used(mapper.map(n), r) == True,
ra.transition(mapper.map(n), l, r) == mapper.map(c),
)
)
),
False
)
)
else:
raise Exception("We did something wrong")
else:
# Take the fresh transition
constraints.append(ra.transition(mapper.map(n), l, ra.fresh) == mapper.map(c))
# # Map to the right transition
# z3.If(
# z3.Exists(
# [r],
# z3.And(
# r != ra.fresh,
# mapper.valuation(n, r) == v
# )
# ),
# z3.ForAll(
# [r],
# z3.Implies(
# z3.And(
# r != ra.fresh,
# mapper.valuation(n, r) == v
# ),
# z3.If(
# ra.used(mapper.map(n), r) == True,
# # it might not keep the valuation
# ra.transition(mapper.map(n), l, r) == mapper.map(c),
# ra.transition(mapper.map(n), l, ra.fresh) == mapper.map(c)
# )
# )
# ),
# ra.transition(mapper.map(n), l, ra.fresh) == mapper.map(c)
# )
values.add(v)
constraints.append(z3.Distinct(list(values)))
# Do we need to make labels distinct as well?
return constraints
......@@ -35,21 +35,19 @@ class RALearner(Learner):
def print_tree(self):
self.encoder.print_tree()
def _learn_model(self, min_locations, max_locations, num_registers):
def _learn_model(self, min_locations=1, max_locations=20, max_registers=10):
"""generates the definition and model for an ra whose traces include the traces added so far"""
if self.num_locations is None:
self.num_locations = min_locations
if self.num_registers is None:
self.num_registers = num_registers
num_values = len(self.encoder.values)
for num_locations in range(max(self.num_locations, min_locations), max_locations + 1):
for num_registers in range(self.num_registers, max(self.num_registers, min(num_values, num_locations))):
for num_locations in range(min_locations, max_locations + 1):
# TODO: calculate max registers based on repeating values
for num_registers in range(0, min(max_registers, num_locations) + 1):
ra, constraints = self.encoder.build(num_locations, num_registers)
self.solver.add(constraints)
print(num_locations, num_registers)
result = self.solver.check()
if self.verbose:
print("Learning with {0} locations and {1} registers. Result: {2}"
.format(num_locations, num_registers, result))
.format(num_locations, num_registers, result))
if result == z3.sat:
model = self.solver.model()
self.num_locations = num_locations
......
......@@ -6,12 +6,15 @@ class Tree(object):
self.id = next(counter)
self.counter = counter
self.children = {}
self.parent = None
def __getitem__(self, seq):
trie = self
for action in seq:
if action not in trie.children:
trie.children[action] = Tree(self.counter)
child = Tree(self.counter)
child.parent = trie
trie.children[action] = child
trie = trie.children[action]
return trie
......@@ -22,8 +25,8 @@ class Tree(object):
def transitions(self):
for node in self:
for label, value in node.children:
yield node, label, value, node.children[(label, value)]
for action in node.children:
yield node, action, node.children[action]
def __str__(self, tabs=0):
space = "".join(["\t" for _ in range(0, tabs)])
......@@ -33,6 +36,20 @@ class Tree(object):
tree += ")"
return tree
def path(self):
seq = []
node = self
parent = node.parent
while parent:
for action in parent.children:
if parent.children[action] == node:
seq.append(action)
break
node = parent