# Copyright (c) 2015 Michele Volpato # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
import inspect
import logging
import os
import random

import helpers.graphhelper as gh
import helpers.traces as th
from systems.implementations import SuspensionAutomaton
from systems.iopurpose import InputPurpose, OutputPurpose

from .observationtable import Table


class LearningAlgorithm:
    """Learn two hypotheses (hMinus and hPlus) of a system from a teacher.

    The learner keeps an observation table, fills it with outputs obtained
    from the teacher and with answers of the oracle, closes it and makes it
    consistent, and builds a SuspensionAutomaton from it (once without and
    once with chaotic states).  See run() for the main loop.
    """

    # Number of sampling rounds granted per observable trace in updateTable.
    # TODO (from the original code): derive it from the number of outputs,
    # i.e. len(self._teacher.getOutputAlphabet()) + 1, instead of 35.
    _TRIES_PER_TRACE = 35

    def __init__(self, teacher, oracle, tester, tablePreciseness = 1000,
                 modelPreciseness = 0.1, closeStrategy = None,
                 printPath = None, maxLoops=10, logger=None,
                 outputPurpose=None, inputPurpose=None):
        """Store the collaborators and create the table and the hypotheses.

        teacher          -- provides the alphabets, the quiescence label and
                            the outputs of the system under learning
        oracle           -- answers observation queries
        tester           -- searches counterexamples for a hypothesis
        tablePreciseness -- requested preciseness of the table
        modelPreciseness -- requested preciseness of the hPlus model
        closeStrategy    -- forwarded as-is to the Table constructor
        printPath        -- if not None, DOT files and tables are written
                            there after every learning loop
        maxLoops         -- maximum number of consecutive loops with no
                            effect on the hPlus model
        logger           -- logger to use; defaults to the module logger
        outputPurpose    -- defaults to one enabling all outputs and
                            quiescence
        inputPurpose     -- defaults to one enabling all inputs
        """
        self._logger = logger or logging.getLogger(__name__)

        # If input (output) purpose is not defined, then add one that
        # returns always all inputs (outputs)
        if inputPurpose is None:
            inputPurpose = InputPurpose(teacher.getInputAlphabet().copy())
        if outputPurpose is None:
            outputPurpose = OutputPurpose(
                teacher.getOutputAlphabet().union(
                    set((teacher.getQuiescence(),))))
        self._inputPurpose = inputPurpose
        self._outputPurpose = outputPurpose

        self._teacher = teacher
        self._oracle = oracle
        self.tester = tester
        self._tablePreciseness = tablePreciseness
        self._modelPreciseness = modelPreciseness
        # The table receives the logger argument exactly as it was passed
        # in (possibly None), not the fallback stored in self._logger.
        self._table = Table(teacher.getInputAlphabet().copy(),
                            teacher.getOutputAlphabet().copy(),
                            teacher.getQuiescence(),
                            closeStrategy,
                            logger=logger,
                            outputPurpose=self._outputPurpose,
                            inputPurpose=self._inputPurpose)

        # Maximum number of loops with no effect on hPlus model
        self._noEffectLimit = maxLoops
        # Current number of loops
        self._currentLoop = 0

        self._hMinus = SuspensionAutomaton(
            1,
            self._teacher.getInputAlphabet().copy(),
            self._teacher.getOutputAlphabet().copy(),
            self._teacher.getQuiescence())
        self._hPlus = SuspensionAutomaton(
            1,
            self._teacher.getInputAlphabet().copy(),
            self._teacher.getOutputAlphabet().copy(),
            self._teacher.getQuiescence(),
            chaos = True)
        self._printPath = printPath

    def updateTable(self):
        """Fill the table with outputs observed on the teacher.

        This update uses a realistic teacher: if an output is needed it
        cannot be forced to happen, so the observable traces are sampled
        repeatedly and whatever the teacher produces is recorded.
        """
        # First, try to avoid impossible traces: ask observation query
        self._askObservationQueries()

        # Try K times, where K is the number of observable traces times
        # _TRIES_PER_TRACE.
        K = len(self._table.getObservableTraces()) * self._TRIES_PER_TRACE
        for _ in range(K):
            oTraces = self._table.getObservableTraces()
            # if no trace is observable (best scenario)
            if len(oTraces) == 0:
                break
            self._storeObservations(self._sampleOutputs(oTraces))

        # Observation query: ask it for all entries because some
        # 'impossible' traces could have been added.
        # NOTE(review): the original indentation was lost; this query is
        # assumed to run once after all sampling rounds -- confirm.
        self._askObservationQueries()

    def _askObservationQueries(self):
        """Ask the oracle an observation query for each observable trace."""
        for trace in self._table.getObservableTraces():
            observedOutputs = self._table.getOutputs(trace)
            observation = self._oracle.observation(trace, observedOutputs)
            if observation:
                self._table.updateEntry(trace, observation=observation)

    def _nextOutput(self, pendingInputs):
        """Flush pendingInputs (if any) and return (output, accepted).

        accepted is False only when pending inputs were sent and
        _processInputs returned None (SUT not input enabled).
        """
        if pendingInputs != ():
            output = self._processInputs(pendingInputs)
            return output, output is not None
        # no input to process, ask an output
        return self._teacher.output(), True

    def _sampleOutputs(self, oTraces):
        """Walk the trie of oTraces on the teacher and collect outputs.

        Returns a dictionary mapping each visited observable trace to the
        set of outputs observed right after it.
        """
        trie = th.make_trie(oTraces)
        subtrie = trie
        inputAlphabet = self._teacher.getInputAlphabet()

        observations = {}       # Dictionary with obtained outputs
        consecutiveInputs = ()  # keep track of last inputs sequence
        currentTrace = ()       # keep track of current trace

        # We build a trace until we either
        # 1 - observe an output that makes the trace not a prefix
        # 2 - there is no continuation of that trace in prefixes
        # We stop when we observed at least an output for each observable
        while len(oTraces) > len(observations):
            # check if trie contains no traces (but still has a child)
            if not any(trie[child] != {} for child in trie.keys()):
                break

            # currentTrace is observable and was not processed already
            firstVisit = (currentTrace in oTraces
                          and currentTrace not in observations)

            if not firstVisit:
                # currentTrace not observable, or already observed:
                # get an input from the children of subtrie
                inputChildren = [x for x in subtrie.keys()
                                 if x in inputAlphabet]
                if len(inputChildren) > 0:
                    # process this input, add it to consecutiveInputs
                    # and navigate subtrie
                    chosenInput = random.sample(inputChildren, 1)[0]
                    consecutiveInputs = consecutiveInputs + (chosenInput,)
                    subtrie = subtrie[chosenInput]
                    currentTrace = currentTrace + (chosenInput,)
                    continue
                # no inputs available, wait for output

            # there might be some inputs waiting to be processed
            output, accepted = self._nextOutput(consecutiveInputs)
            # reset the inputs, because we processed them
            consecutiveInputs = ()
            if not accepted:
                # SUT not input enabled: reset
                currentTrace = ()
                subtrie = trie
                self._teacher.reset()
                continue

            if firstVisit:
                # this is the first output we observe for currentTrace
                observations[currentTrace] = {output}
                th.remove_from_trie(trie, currentTrace)
            elif currentTrace in oTraces:
                observations[currentTrace].add(output)
                # NOTE(review): original nesting lost; the guarded removal
                # is assumed to belong to this branch -- confirm.
                if th.in_trie(trie, currentTrace):
                    th.remove_from_trie(trie, currentTrace)

            # if that output is not a valid continuation
            if output not in subtrie.keys():
                # reset the process
                currentTrace = ()
                subtrie = trie
                self._teacher.reset()
                continue

            # navigate trie
            subtrie = subtrie[output]
            currentTrace = currentTrace + (output,)

        return observations

    def _storeObservations(self, observations):
        """Write the outputs collected by _sampleOutputs into the table."""
        for trace, outputs in observations.items():
            # Only if trace is a prefix in S, then
            # add trace + output to row (S cdot L_delta)
            if self._table.isInS(trace):
                for output in outputs:
                    self._table.addOneLetterExtension(trace, output)

            # Update set of outputs for traces where deltas are removed
            for deltaTrace in self._table.getDeltaTraces(trace):
                for output in outputs:
                    self._table.updateEntry(deltaTrace, output=output)

            for output in outputs:
                self._table.updateEntry(trace, output=output)

    # # this update function uses teacher.process(trace)
    # # in case a InputOutputTeacher is used, outputs in trace are forced to happen
    # # this is not realistic, but still useful at the moment.
    # def oldUpdateTable(self):
    #     temp = 0
    #     tot = 0
    #     for c in range(200):
    #         for trace in self._table.getObservableTraces():
    #             observedOutputs = self._table.getOutputs(trace)
    #             output = self._teacher.process(trace)
    #             for i in range(10):
    #                 # try again if retrieving output is unsuccesful
    #                 if output != None:
    #                     break
    #                 output = self._teacher.process(trace)
    #             tot += 1
    #             if output != None:
    #                 # Only if trace is a prefix in S, then
    #                 # add trace + output to row (S cdot L_delta)
    #                 if self._table.isInS(trace):
    #                     self._table.addOneLetterExtension(trace, output)
    #
    #                 # Update set of outputs for traces where deltas are removed
    #                 for deltaTrace in self._table.getDeltaTraces(trace):
    #                     self._table.updateEntry(deltaTrace, output)
    #
    #                 # Add this output to the set of outputs observed after trace
    #                 observedOutputs.add(output)
    #             else:
    #                 temp += 1
    #
    #             observation = self._oracle.observation(trace, observedOutputs)
    #             self._table.updateEntry(trace, output, observation)

    def _processInputs(self, consecutiveInputs):
        """Send consecutiveInputs to the teacher and return one output.

        With an empty tuple the teacher is simply asked for an output.
        Returns None (after logging a warning) when the teacher returns
        None for the given inputs.
        """
        if consecutiveInputs != ():
            output = self._teacher.oneOutput(consecutiveInputs)
            if output is None:
                # SUT did not accept an input.
                self._logger.warning("SUT did not accept input in %s",
                                     consecutiveInputs)
                return None
            return output
        return self._teacher.output()

    def stabilizeTable(self):
        """Close the table and make it consistent until both checks pass."""
        closingRows = self._table.isNotGloballyClosed()
        consistentCheck = self._table.isNotGloballyConsistent()

        while closingRows or consistentCheck:
            while closingRows:
                self._logger.debug("Table is not closed")
                self._logger.debug(closingRows)
                self._table.promote(closingRows)
                # After promoting one should check if some one letter
                # extensions should also be added
                self._table.addOneLetterExtensions(closingRows)
                if self._logger.isEnabledFor(logging.DEBUG):
                    self._table.printTable(prefix="_c_")
                self.updateTable()
                closingRows = self._table.isNotGloballyClosed()
                consistentCheck = self._table.isNotGloballyConsistent()

            # Table is closed, check for consistency
            if consistentCheck:
                self._logger.debug("Table is not consistent")
                self._logger.debug(consistentCheck)
                self._table.addColumn(consistentCheck, force=True)
                if self._logger.isEnabledFor(logging.DEBUG):
                    self._table.printTable(prefix="_i_")
                # TODO: is an update needed here? in theory, when I encounter
                # an inconsistency, by adding a column, the interesting row
                # will immediately make the table not closed, no need of
                # update, right?
                #self.updateTable()
                closingRows = self._table.isNotGloballyClosed()
                consistentCheck = self._table.isNotGloballyConsistent()

    def getHypothesis(self, chaos=False):
        """Build a SuspensionAutomaton from the current table.

        chaos is forwarded to the table queries and to the automaton
        constructor.  Returns the automaton.

        NOTE(review): when the table is not closed an error is logged and
        the tuple (None, None) is returned, which does not match the
        single-object return of the normal path; kept for compatibility.
        """
        # If table is not closed, ERROR
        if self._table.isNotGloballyClosed():
            self._logger.error("Tried to get hypotheses with table not "
                               "closed or not consistent")
            return None, None

        # Get equivalence classes
        rows = self._table.getEquivalenceClasses(chaos)

        hyp = SuspensionAutomaton(len(rows),
                                  self._teacher.getInputAlphabet().copy(),
                                  self._teacher.getOutputAlphabet().copy(),
                                  self._teacher.getQuiescence(),
                                  chaos)

        # assign to each equivalence class a state number
        # start with equivalence class of empty trace to 0
        assignments = {(): 0}
        count = 1
        for row in rows:
            if row != ():
                assignments[row] = count
                count = count + 1

        # add transitions
        for row in rows:
            # TODO reduce allLabels set, use outputExpert
            enabledOutputs = self._table.getOutputs(row)
            allLabels = self._getAllLabels(row, enabledOutputs)
            source = assignments[row]

            for label in allLabels:
                # create row and search it in the table
                extension = row + (label,)
                if self._table.isInRows(extension):
                    for target in rows:
                        # TODO: the method of table called at next line is
                        # private. Change to public, or add a public version
                        if self._table._moreSpecificRow(extension, target,
                                                        chaos):
                            hyp.addTransition(source, label,
                                              assignments[target])
                            break
                    else:
                        # no target row matched the extension
                        self._logger.warning("Chaotic behaviour")
                        # Either the table is not closed, or
                        # First column of extension has an empty set.
                        # If label is an input, then send it to ChaosDelta
                        # - unless it is not enabled.
                        # If it is an output, then send it to Chaos
                        # It cannot be quiescence
                        if label in self._teacher.getInputAlphabet():
                            hyp.addTransition(source, label,
                                              hyp.getChaosDelta())
                        elif label in self._table.getPossibleOutputs(row):
                            hyp.addTransition(source, label, hyp.getChaos())
                elif (len(row) > 0 and
                      row[-1] == hyp.getQuiescence() and
                      label == hyp.getQuiescence()):
                    # fixing issue #2: non existing row because it ends with
                    # a sequence of quiescence
                    hyp.addTransition(source, label, source)
                elif chaos and label in self._table.getPossibleOutputs(row):
                    self._logger.warning("Chaotic behaviour 2")
                    # Add transitions to chaotic state if necessary
                    if row in self._table.getObservableTraces():
                        if label != hyp.getQuiescence():
                            hyp.addTransition(source, label, hyp.getChaos())
                        else:
                            hyp.addTransition(source, label,
                                              hyp.getChaosDelta())
        return hyp

    def _getAllLabels(self, row, outputs):
        """Return the labels enabled after row, always including quiescence.

        Inputs and outputs come from the input/output purposes when they
        are set, otherwise from the alphabets of the teacher.
        """
        enabledInputs = self._teacher.getInputAlphabet()
        enabledOutputs = self._teacher.getOutputAlphabet()
        if self._inputPurpose is not None:
            enabledInputs = self._inputPurpose.getEnabled(row, outputs)
        if self._outputPurpose is not None:
            enabledOutputs = self._outputPurpose.getEnabled(row, outputs)

        return enabledInputs.union(enabledOutputs,
                                   set((self._teacher.getQuiescence(),)))

    def generateDOT(self, path=None, hyp="both", format="pdf"):
        """Generate DOT files for hypotheses.

        path   -- base directory; defaults to the directory of this file
        hyp    -- "hMinus", "hPlus" or "both"; None generates nothing
        format -- forwarded to gh.createDOTFile
        """
        if path is None:
            path = os.path.dirname(os.path.abspath(
                inspect.getfile(inspect.currentframe())))

        loopDir = "loop_" + str(self._currentLoop)
        pathHminus = os.path.join(path, "hypotheses", loopDir, "hMinus")
        pathHplus = os.path.join(path, "hypotheses", loopDir, "hPlus")

        if hyp is not None:
            if hyp == "hMinus" or hyp == "both":
                gh.createDOTFile(self._hMinus, pathHminus, format)
            if hyp == "hPlus" or hyp == "both":
                gh.createDOTFile(self._hPlus, pathHplus, format)

    def run(self):
        """Start the learning process and return (hMinus, hPlus).

        The loop ends when the requested table and model preciseness are
        both reached, or when the preciseness of hPlus did not change for
        maxLoops consecutive loops.
        """
        noEffectLoops = 0
        oldModelPreciseness = 0

        # While we have not reached our limit for loops with no effect on
        # the table
        while noEffectLoops < self._noEffectLimit:
            self._currentLoop = self._currentLoop + 1
            self._logger.info("Learning loop number %s", self._currentLoop)

            # Fill the table and make it closed and consistent
            self.updateTable()
            # TODO learning sometimes stops here!
            self.stabilizeTable()

            # Is the table quiescence reducible? If not make it so and
            # then fill it again, make it closed and consistent
            newSuffixes = self._table.isNotQuiescenceReducible()
            while newSuffixes or not self._table.isStable():
                if newSuffixes:
                    self._logger.debug("Table is not quiescence reducible")
                    self._logger.debug(newSuffixes)
                    if self._table.addColumn(newSuffixes, force=True):
                        if self._logger.isEnabledFor(logging.DEBUG):
                            self._table.printTable(prefix="_Q_")
                # NOTE(review): original nesting lost; update/stabilize are
                # assumed to run on every iteration of this loop -- confirm.
                self.updateTable()
                self.stabilizeTable()
                newSuffixes = self._table.isNotQuiescenceReducible()

            self._hMinus = self.getHypothesis()
            self._hPlus = self.getHypothesis(chaos=True)

            if self._printPath is not None:
                self.generateDOT(path=self._printPath)
                self._table.printTable(
                    self._printPath,
                    prefix="loop_" + str(self._currentLoop))

            # Table preciseness will increase easily, it does not depend
            # on nondeterministic choices. Adding rows and columns to the
            # table is enough to increase it.
            # On the contrary, model preciseness will increase only if
            # something really changed in the behaviour of the system.
            # If model preciseness remain the same for too long, then we
            # might want to stop learning.
            currentModelPreciseness = self._hPlus.preciseness()
            if oldModelPreciseness == currentModelPreciseness:
                # TODO: problem here. These two values could be the same even
                # if the behaviour of the system changed. This might happen in
                # rare cases. For the moment I consider those cases so rare
                # that I do not handle them.
                noEffectLoops = noEffectLoops + 1
            else:
                oldModelPreciseness = currentModelPreciseness
                noEffectLoops = 0

            tablePreciseness = self._table.preciseness()
            self._logger.info("Requested table preciseness: %s. Actual: %s",
                              self._tablePreciseness, tablePreciseness)
            self._logger.info("Requested model preciseness: %s. Actual: %s",
                              self._modelPreciseness,
                              currentModelPreciseness)

            if not (tablePreciseness < self._tablePreciseness or
                    currentModelPreciseness < self._modelPreciseness):
                self._logger.info("Stop learning")
                # Exit while loop, return hMinus and hPlus
                break

            # A list, not a set: random.sample() on a set raises TypeError
            # since Python 3.11.
            samples = ['extendTable', 'testing']
            # if model preciseness is 1 (maximum), update does not make sense
            if currentModelPreciseness < 1:
                samples.append('update')

            choice = random.choice(samples)
            # TODO: forcing testing: remove!!
            choice = 'testing'

            if choice == 'extendTable':
                self._logger.info(
                    "Improve preciseness method: extend the table")
                # create a set of suffix closed suffixes and add them
                # to the table
                # TODO: are there clever ways to create it?
                columns = set()
                # Get maximum length in order to be sure to modify the Table
                length = self._table.getLengthColumns() + 1
                # we want to avoid quiescence for the moment TODO
                inAndOut = tuple(self._teacher.getInputAlphabet().union(
                    self._teacher.getOutputAlphabet()))
                newColumn = ()
                for _ in range(length):
                    label = random.choice(inAndOut)
                    newColumn = (label,) + newColumn
                    columns.add(newColumn)
                self._logger.debug(newColumn)
                self._table.addColumn(columns, True)

                # TODO: I do not like the next part very much, commented
                # for the moment. Maybe just increasing columns (as we
                # would have found a counterexample) is enough
                # promote 10% of random rows
                # rows = [x for x in self._table._rows if \
                #         not self._table.isInS(x) and self._table.isDefined(x)]
                # randomRows = random.sample(rows, int(len(rows)*0.1))
                # self._logger.debug(randomRows)
                # self._table.promote(randomRows)
                # self._table.addOneLetterExtensions(randomRows)
            elif choice == 'update':
                self._logger.info(
                    "Improve preciseness method: update the table")
                # TODO: consider to remove this function call and just
                # add a continue statement, because it is the first
                # thing to do in the main while loop. Keep for now
                # for clarity.
                self.updateTable()
            elif choice == 'testing':
                self._logger.info("Improve preciseness method: testing")
                ce, output = self.tester.findCounterexample(self._hMinus)
                # Handle counterexample
                if ce:
                    if not self._table.handleCounterexample(ce, output):
                        self._logger.info("Handling counterexample: failed")
                    else:
                        self._logger.info(
                            "Handling counterexample: succeded")
                else:
                    self._logger.info("No counterexample found, proceed")

        if self._logger.isEnabledFor(logging.DEBUG):
            self._table.printTable(prefix="_final_")

        return (self._hMinus, self._hPlus)