Source code for hycon.replay

#!/usr/bin/env python3
#
# Copyright (c) 2020 anabrid GmbH
# Contact: https://www.anabrid.com/licensing/
#
# This file is part of the HyCon module of the PyAnalog toolkit.
#
# ANABRID_BEGIN_LICENSE:GPL
# Commercial License Usage
# Licensees holding valid commercial anabrid licenses may use this file in
# accordance with the commercial license agreement provided with the
# Software or, alternatively, in accordance with the terms contained in
# a written agreement between you and Anabrid GmbH. For licensing terms
# and conditions see https://www.anabrid.com/licensing. For further
# information use the contact form at https://www.anabrid.com/contact.
# 
# GNU General Public License Usage
# Alternatively, this file may be used under the terms of the GNU 
# General Public License version 3 as published by the Free Software
# Foundation and appearing in the file LICENSE.GPL3 included in the
# packaging of this file. Please review the following information to
# ensure the GNU General Public License version 3 requirements
# will be met: https://www.gnu.org/licenses/gpl-3.0.html.
# For Germany, additional rules exist. Please consult /LICENSE.DE
# for further agreements.
# ANABRID_END_LICENSE
#

"""
A HyCon command stream interpreter.

Will spill out LISP-like commands which can be fed into the hycon
again. This allows for replaying, which is helpful for a number of 
special scenarios such as:

* Offline-validating a HyCon instruction stream
* Man-in-the-middle inspecting an HyCon instruction stream
* Validating the correctness of high-level HyCon instructions
  (such as emitted by the PyFPAA or autosetup codes)

The code basically implements a character-by-character tokenizer/parser.
It is built based on a simple mapping datastructure which assigns each
one-letter command the respective PyHyCon method name. Furthermore,
method arguments can be read and converted.

It would be nice to join HyCon.py and replay.py to a single file which
translates between the serial protocol and the OOP API calls. The transformation
is quite trivial, but now we have a lot of code doing nothing of 
bigger interest.

The ordering follows the AVR Ino code.
"""

import sys, io, functools
identity = lambda x:x

from .HyCon import HyCon


[docs]def delayed(static_method): """ This is a decorator for a static method in a class. It will make the function body "delayed", i.e. when calling the function, a future/promise/delay/deferred element is returned. Some parameter bounding (closure) happens: The decorated arguments are evaluated early while the later execution expects only a single reader argument. Example to follow the logic: >>> f = lambda a,b,c,d: print(a,b,c,d) >>> g = delayed(f) >>> h = g(1,2,3) >>> h(4) 4 1 2 3 """ @functools.wraps(static_method) def decorated(*w,**kw): def deferred(reader): return static_method(reader, *w, **kw) #deferred.__name__ = f"{static_method.__name__}({str(w)[1:-1]}{',' if kw else ''}{str(kw)[1:-1]})" return deferred return decorated
[docs]class consume: """ Consume is an ugly namespace and not a class, actually. The basic idea of these (static!) functions is to be called delayedly with a function as it's argument which acts like the io.IOBase.read() function, i.e. advances an internal cursor (side effect) and returns *n* characters from the stream. Crude Example: >>> tokenizer = [consume.exact("test"), consume.decimals(3), consume.exact("foo"), consume.hex(2)] >>> test = io.StringIO("test123fooAA") >>> reader = test.read >>> [ token(reader) for token in tokenizer ] ['test', 123, 'foo', 170] """ @delayed def exact(reader, text): proof = reader(len(text)) if proof != text: raise ValueError(f"Expected '{text}' but got '{proof}'.") return text
[docs] @delayed def number(reader, digits, base, multiply=1): """ Reads a number with #digit digits in some base. Can perform a multiplication afterwards. >>> consume.number(8,16)(io.StringIO("deadbeef").read) 3735928559 >>> consume.number(2,10,multiply=2)(io.StringIO("42").read) 84 """ text = reader(digits) if not text: raise ValueError(f"Encountered end of input but expected {digits} digits in base {base}") try: return int(text,base) * multiply except ValueError as e: raise ValueError(f"Could not cast '{text}' as a base-{base} number.")
@delayed def decimals(reader, number_of_digits): return consume.number(number_of_digits, 10)(reader) @delayed def hex(reader, number_of_digits): return consume.number(number_of_digits, 16)(reader)
[docs] @delayed def list(reader, split, end, digits, base): """ Reads a list of numbers. Limitations: * Always expects end token to come * All numbers must have same number of digits (and same base) * Cannot handle empty list or and won't accept end-of-file before end token. Examples: >>> consume.list(split=",",digits=1,base=10,end=".")(io.StringIO("1,5,2,3,9.").read) [1, 5, 2, 3, 9] >>> consume.list(split=":",digits=2,base=16,end=";")(io.StringIO("5a:88:ff:ff;").read) [90, 136, 255, 255] """ assert len(split) == len(end) retlst = [] while True: retlst.append( consume.number(digits, base)(reader) ) #print(f"List: {retlst}") sep = reader(len(split)) #print(f"Sep: '{sep}'") if sep == end: break elif sep != split: raise ValueError(f"Unexpected character(s) or end of stream: '{sep}'") return retlst
not_implemented = lambda comment: ("NOT_IMPLEMENTED", comment) mapping = { "a": "disable_ovl_halt", "A": "enable_ovl_halt", "b": "disable_ext_halt", "B": "enable_ext_halt", "c": ("set_op_time", consume.decimals(6)), "C": ("set_ic_time", consume.decimals(6)), "d": ("digital_output", consume.decimals(1), False), "D": ("digital_output", consume.decimals(1), True), "e": "repetitive_run", "E": "single_run", "F": "single_run_sync", "f": "read_ro_group", "G": ("set_ro_group", consume.list(split=";", digits=4, base=16, end=".")), "g": ("read_element_by_address", consume.hex(4)), "h": "halt", "i": "ic", "L": not_implemented("Locate a computing element"), "l": "get_data", "o": "op", "P": ("set_pt", consume.hex(4), consume.hex(2), consume.number(digits=4, base=10, multiply=1./HyCon.DPT_MAX_INT_VALUE)), "q": "read_dpts", # is not_implemented() "R": "read_digital", "s": "get_status", "S": "pot_set", "t": "get_op_time", "x": "reset", "X": ("set_xbar", consume.hex(4), consume.hex(HyCon.XBAR_CONFIG_BYTES*2)), "?": not_implemented("Prints help") }
[docs]class HyConRequestReader: """ Converts HyCon "configuration strings" to high level API calls. This can be seen as the inverse operation to calling the HyCon. Instances of this class act as iterator. It will consume the incoming stream character by character (or the whole string, if given as a string). Example: >>> instructions = 'C000100c015000P0200000204P0300030000G0362;0363;0220;0221;0222;0223.' >>> commands = list(HyConRequestReader(instructions)) >>> for c in commands: print(c) ('set_ic_time', 100) ('set_op_time', 15000) ('set_pt', 512, 0, 0.19941348973607037) ('set_pt', 768, 3, 0.0) ('set_ro_group', [866, 867, 544, 545, 546, 547]) >>> replayed = io.StringIO() >>> hc = HyCon(replayed, unidirectional=True) >>> replay(hc, commands) >>> replayed.getvalue() == instructions True """ def __init__(self, stream_or_string, mapping=mapping): self.subject = io.StringIO(stream_or_string) if isinstance(stream_or_string, str) else stream_or_string self.shadow = io.StringIO() self.mapping = mapping def read(self, size=-1): ret = self.subject.read(size) self.shadow.write(ret) return ret def debugline(self): read_in = self.shadow.getvalue() return f"Encountered after having read {len(read_in)} characters '{read_in}'" def read_arguments(self, rhs): if isinstance(rhs, tuple): return tuple(map(self.read_arguments, rhs)) if callable(rhs): res = rhs(self.read) return res return rhs def read_command(self): command = self.read(1) if not command: raise StopIteration # end of input if command not in self.mapping: raise ValueError(f"Command '{command}' not part of the valid HyCon command characters. {self.debugline()}") return self.read_arguments(self.mapping[command]) # act as an iterator __iter__ = identity __next__ = read_command
[docs]def replay(hycon, commands): """ Given commands a list of tuples, this will mostly act like operator.methodcaller on them. If no arguments are given, the tuple can be omitted. Basic example: >>> class WannaBeHyCon: ... def toot(self,x): print("too(%s)ooot" % x) ... def bar(self): print("this is bar") ... def buz(self,a,b,c): print(f"a*b = {a*b} but what is {c}") >>> replay(WannaBeHyCon(), [ "bar", ("toot", "fuz"), ("buz", 1,2,3) ]) this is bar too(fuz)ooot a*b = 2 but what is 3 The command format is produced by the HyConRequestReader and thus can be fed into this replay function: >>> replay(HyCon(sys.stdout, unidirectional=True), HyConRequestReader("xiohaARt")) xiohaARt This works for almost any useful instruction stream. """ for command in commands: if not isinstance(command, tuple): command = (command,) method,*args = command # LISP-like unpacking (command, *args) getattr(hycon, method)(*args) # calling