Source code for dda.scipy

#
# Copyright (c) 2020 anabrid GmbH
# Contact: https://www.anabrid.com/licensing/
#
# This file is part of the DDA module of the PyAnalog toolkit.
#
# ANABRID_BEGIN_LICENSE:GPL
# Commercial License Usage
# Licensees holding valid commercial anabrid licenses may use this file in
# accordance with the commercial license agreement provided with the
# Software or, alternatively, in accordance with the terms contained in
# a written agreement between you and Anabrid GmbH. For licensing terms
# and conditions see https://www.anabrid.com/licensing. For further
# information use the contact form at https://www.anabrid.com/contact.
# 
# GNU General Public License Usage
# Alternatively, this file may be used under the terms of the GNU 
# General Public License version 3 as published by the Free Software
# Foundation and appearing in the file LICENSE.GPL3 included in the
# packaging of this file. Please review the following information to
# ensure the GNU General Public License version 3 requirements
# will be met: https://www.gnu.org/licenses/gpl-3.0.html.
# For Germany, additional rules exist. Please consult /LICENSE.DE
# for further agreements.
# ANABRID_END_LICENSE
#
 
"""
The ``dda.scipy`` module allows in-python evaluation of DDA systems
as well as their solution with ODE integrators in Python, such as
`scipy.integrate <https://docs.scipy.org/doc/scipy/reference/tutorial/integrate.html>`_.
For usage and examples, see the main class :class:`to_scipy`.

.. note::

   In order to run this code, you obviously need `SciPy <https://www.scipy.org/>`_
   in addition to `NumPy <https://numpy.org/>`_.
   
.. warning::

   This module exposes a solver for DDA systems which is quite different from the
   :mod:`cpp_exporter`. In particular:
   
   * The solver must be told the solution time span or final time in time
     units, not in iteration indices.
   * The solver only outputs the evolved (integrated) quantities, not any
     derived quantities. You can recompute them at any timestep, but there is
     currently no code helping you to achieve this. This can result in confusion
     when you cannot query the fields you asked for (in particular in the CLI frontend).
   * The SciPy time integrator tries to find optimal (and as few as possible)
     time steps, which yields a quite "rough" solution. You can turn on *dense
     output* in order to tell the SciPy solver to interpolate between these time
     steps, yielding a smoother output with more data points.
"""

from . import State, Symbol, symbols, clean
import math, functools, itertools, operator, copy, argparse, inspect, sys # builtins
import numpy as np # externals
from builtins import sum # just to be explicit, sum is not overwritten
from collections import namedtuple, OrderedDict

is_number = lambda obj: isinstance(obj, float) or isinstance(obj, int)
first = lambda iterable: next(x for x in iterable if x)  # first truthy element of iterator/list

dda2python = {  #: Mapping dictionary of Symbol heads to their pure-python implementation.
    "const": lambda x: x,
    "neg":   lambda x: -x,
    "div":   lambda x,y: x/y,
    "Int":   lambda *x: - sum(x), # mangled integral
    "sum":   lambda *x: - sum(x),
    "mult":  lambda x,y: x*y,
    "sqrt":  lambda x: math.sqrt(x),
    "abs":   lambda x: math.abs(x),
    "exp":   lambda x: math.exp(x),
    "floor": lambda x: math.floor(x),
}
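
# The mapping above is open for extension: a new computing element only needs
# another head -> callable entry, for example (hypothetical element):
#
#   dda2python["sin"] = math.sin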

def evaluate_values(smbl, values):
    "Evaluate a symbol within the context of a given dictionary of already evaluated values."
    if isinstance(smbl, float) or isinstance(smbl, int):
        return smbl  # a plain number is already a value
    if not isinstance(smbl, Symbol):
        raise TypeError(f"Expecting symbol, got {smbl}")
    if smbl.is_variable():
        return values[smbl.head]  # can raise KeyError if variable not found.
    else:  # smbl.is_term()
        if smbl.head in dda2python:
            return dda2python[smbl.head](*(evaluate_values(t, values) for t in smbl.tail))
        else:
            raise ValueError(f"DDA Symbol {smbl.head} in expression {smbl} not (yet) implemented.")
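
# Illustrative sketch of evaluate_values (assumes the Symbol term-building API
# as used in the doctests below; not executed on import):
#
#   >>> x = Symbol("x")
#   >>> evaluate_values(Symbol("mult")(2, Symbol("neg")(x)), {"x": 3.0})
#   -6.0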
class to_scipy:
    """
    The SciPy exporter.

    When initializing this class with your DDA system, it will set up a function
    ``f(y)`` which can be evaluated as any right hand side in the ordinary
    differential equation (ODE) system ``dy/dt = f(y)``. Here, ``y`` are the
    evolution quantities, i.e. a vector which is composed automatically from the
    linearized DDA system (see :meth:`ast.State.name_computing_elements` and
    :meth:`ast.State.variable_ordering`). Furthermore, this class prepares the
    initial values ``y0`` for the integration.

    You can evaluate these quantities in any python context, i.e. with any
    scientific python ODE solver library. For the time being, this class provides
    a convenience method :meth:`solve` which calls
    `scipy.integrate.solve_ivp <https://docs.scipy.org/doc/scipy/reference/generated/scipy.integrate.solve_ivp.html>`_.
    There is no other scipy dependence in this code.

    Usage example:

    >>> from dda.computing_elements import neg,int,mult
    >>> from dda import State, symbols
    >>> x, y, z = symbols("x, y, z")
    >>> dda_state = State({x: int(y, 1, 1), y: mult(2,x), z: neg(int(z, 1, -1)) })
    >>> py_state = to_scipy(dda_state)
    >>> py_state.state              # state has been linearized before processing
    State({'int_1': Int(z), 'x': Int(y), 'y': mult(2, x), 'z': neg(int_1)})
    >>> py_state.vars.evolved       # evolved variables are therefore not [x,z] but [int_1,x]
    ['int_1', 'x']
    >>> py_state.y0                 # initial values
    array([-1, 1])
    >>> py_state.dt                 # same timestep for all integrals (but see note below)
    1
    >>> py_state.rhs(py_state.y0)   # evaluation of f(y) on y0
    array([-1, -2])
    >>> y1 = py_state.rhs(py_state.y0) * py_state.dt  # a single Euler integration timestep, for instance
    >>> y1
    array([-1, -2])
    >>> sol = py_state.solve(10)    # ODE integration with SciPy
    >>> sol.t   # integration went from 0->10 with 17 timesteps  # doctest:+ELLIPSIS
    array([ 0. , ..., 10. ])
    >>> sol.y[:,-1]  # the first solution is ~exp(+t)->inf, the second exp(-t)->0
    array([-2.20269685e+04, 1.94334984e-08])
    >>> from pylab import plot, legend, show   # plotting example  # doctest:+SKIP
    >>> for i,fieldname in enumerate(py_state.vars.evolved):  # doctest:+SKIP
    ...     plot(sol.t, sol.y[i], label=fieldname)
    >>> legend(); show()  # doctest:+SKIP

    .. warning::

       Due to the way widespread ODE integrators work, the per-integral step size
       ``dt`` is required to be the same for every integration which is part of
       the DDA system. That is, the following generates an error:

       >>> from dda import dda, symbols
       >>> a,b=symbols("a,b")
       >>> state = State({ a: dda.int(a, 0.2, 0), b: dda.int(b, 0.5, 0) })
       >>> to_scipy(state)
       Traceback (most recent call last):
       ...
       ValueError: Scipy requires all timesteps to be the same, however dt_(['a', 'b']) = [0.2 0.5]

       Most high-level integrators available in scientific Python toolkits (such
       as scipy) assume the overall system to have a single timestep size ``Δt``
       (which is also quite natural from a mathematical perspective). The
       signature ``dda.int(f,dt,ic)`` is thus quirky from a mathematical or
       numerical viewpoint. It is written this way because analog computing
       integrators have a tunable time scale ``k_0 ~= 1/dt`` which can, however,
       also be absorbed into the integrand itself:
       ``dda.int(f,1/k_0,ic) == dda.int(f/k_0,1,ic)``. Furthermore, most
       high-level integrators do adaptive timestepping anyway. Fine-tuning of
       timestep sizes is only paid respect to in the :mod:`cpp_exporter` module.
    """

    def __init__(self, state):
        "state needs to be an instance of :class:`ast.State`"
        self.state = clean(state, target="python").name_computing_elements()
        self.vars = self.state.variable_ordering()

        if not self.vars.evolved:
            raise ValueError("Nothing to evolve. Lacking some dda.int(f,dt,ic) integrators")

        # by intention, dt and initial condition are ignored
        # Extract int(..., timestep, initial_data) and mark integrator as visited (mangled)
        timesteps = {}
        initial_data = {}
        def map_and_treat_integrals(var):
            if not var in self.vars.evolved:
                return self.state[var]
            tail = self.state[var].tail
            if not len(tail) >= 3:
                raise ValueError("int(...) requires at least int(value, dt, ic)")
            timesteps[var] = self.evaluate_const(tail[-2])
            initial_data[var] = self.evaluate_const(tail[-1])
            return Symbol("Int", *tail[0:len(tail)-2])
        self.state = State({ var: map_and_treat_integrals(var) for var in self.state })

        # Scipy needs numpy arrays for further processing
        timesteps = np.array([timesteps[k] for k in self.vars.evolved])
        self.y0 = np.array([initial_data[k] for k in self.vars.evolved])

        # The Scipy integrate methods cannot treat different timestep sizes.
        self.dt = timesteps[0]
        if not np.all(timesteps == self.dt):
            raise ValueError(f"Scipy requires all timesteps to be the same, however dt_({self.vars.evolved}) = {timesteps}")

        # Set starting values for each call of f(y). This dictionary can be
        # mutated over repeated calls, but if you want to be sure, make a
        # deep copy at every call.
        self.evaluation_default_values = { k: np.nan for k in self.state.keys() }
        for var in self.vars.explicit_constants:
            self.evaluation_default_values[var] = self.state[var].tail[0]

        self.debug = False
    def evaluate_const(self, var):
        """
        Translate ``const(foo)`` by stripping foo, or some bar by looking up
        whether it is an explicit constant. Dynamical variables are not allowed
        here. This is somewhat similar to, but different from,
        ``cpp_exporter.lookup_const(var)``.
        """
        if isinstance(var, Symbol):
            if var.head == "const":
                var = var.tail[0]  # continue below with the stripped value
            elif var.is_variable():
                if not var.head in self.vars.explicit_constants:
                    raise ValueError(f"Only constants allowed in this context. {var} however refers to {var.head}.")
                return self.evaluate_const(self.state[var.head])
            else:  # remaining case: var.is_term()
                raise ValueError(f"Was expecting const(foo) or so, but got term {var}.")
        if not is_number(var):
            raise ValueError(f"Got a weird {type(var)} in a constant context: {var}")
        return var
    def evaluate_state(self, evolution_vector, copy=False):
        """
        Recomputes the full state from the evolution state vector. Returns a
        dictionary with the same keys as ``self.state`` and scalars (floats) as
        values. This will especially compute the aux variables, while for the
        evolved variables the RHS of ``dy/dt = f(y)`` is computed.

        .. note::

           As a user, you most likely want to call :meth:`reconstruct_state` or
           :meth:`rhs` instead of this function.

        For optimization purposes, numerical state evaluation is always carried
        out on the ``evaluation_default_values`` member (which also holds the
        initial values for the first ``rhs`` evaluation). If you set
        ``copy=True``, a shallow copy (which is equal to a deep copy for a dict
        holding floats) is returned. In external calls, you should probably
        always set ``copy=True``.

        .. note::

           The implementation of this function currently evaluates the (prepared)
           DDA system by recursive calls with the help of a variable assignment
           dictionary. This is basically a *run-time interpretation* (a
           tree-walking evaluator) in pure python. Needless to say, this won't
           give a great performance!

           There are plenty of low-hanging fruits to provide optimized versions
           of this code: One could call the efficient (but still scalar) C++
           implementation which :mod:`cpp_exporter` provides by
           `methods provided by Cython <https://cython.readthedocs.io/en/latest/src/userguide/external_C_code.html>`_.
           One could also map the DDA abstract syntax tree (AST) to the python
           one and use some unparser code to evaluate the DDA code as pure python
           (see for instance `Python: Modify AST and write back python code
           <https://stackoverflow.com/questions/768634/parse-a-py-file-read-the-ast-modify-it-then-write-back-the-modified-source-c>`_).
           For the time being, this code remains pure demonstration code.

           Thanks to using the linearized state, there should be no trouble with
           call stack overflows; however, cyclic dependencies may not be properly
           resolved and can result in infinite recursion (stack overflow).
        """
        values = self.evaluation_default_values
        # A shallow copy should be fine, since self.evaluation_default_values
        # only holds immutable data types (floats) and never mutable ones
        # (such as lists). We allow for *not* doing this copy only for performance
        # reasons, as calls to self.rhs() basically overwrite the values dict
        # completely and thus don't need to reallocate the whole dict on every call.
        if copy or self.debug:
            values = values.copy()  # shallow copy suffices, see comment above

        for i, var in enumerate(self.vars.evolved):
            values[var] = evolution_vector[i]  # scatter

        # TODO: md3.py doesn't work when vars.aux.unneeded is skipped, because
        #       Int(...) depends on such an unneeded guy. This is wrong.
        for var in itertools.chain(self.vars.aux.sorted, self.vars.aux.cyclic, self.vars.aux.unneeded):
            values[var] = evaluate_values(self.state[var], values)

        nanmask = np.isnan(list(values.values()))
        if any(nanmask):
            nankeys = np.array(list(values.keys()))[nanmask]
            print(f"to_scipy(state).evaluate_state({evolution_vector}): NaN detected in evaluation of {nankeys}")
            print("Here is a PDB shell for debugging:")
            from pdb import set_trace as bp
            bp()

        return values
    def reconstruct_state(self, evolution_vector, copy=True):
        """
        Given the evolution vector, this computes the full state. That is, this
        function differs from :meth:`evaluate_state` in that, for all evolved
        quantities, the values of the evolution vector itself are put in place
        (instead of their derivatives).
        """
        values = self.evaluate_state(evolution_vector, copy=copy)
        for i, var in enumerate(self.vars.evolved):
            values[var] = evolution_vector[i]
        return values
    def rhs(self, evolution_vector):
        """
        The ODE *right hand side* in ``dy/dt = f(y)``. ``y`` is a numpy vector,
        and ``f(y)`` returns a similarly sized (numpy) vector which we call
        ``rhs`` here:

        >>> ode = to_scipy(State({ Symbol("x"): Symbol("int")(Symbol("x"),0.1,1) }))
        >>> y1 = ode.y0 + ode.rhs(ode.y0) * ode.dt  # perform some Euler integration step
        >>> y1
        array([0.9])

        Usually, you want to pass this function to some scipy integrator. See
        also :meth:`rhst`.
        """
        values = self.evaluate_state(evolution_vector)
        dqdt = [ evaluate_values(self.state[var], values) for var in self.vars.evolved ]  # gather
        if np.any(np.isnan(np.array(dqdt))):
            from pdb import set_trace as bp
            bp()
        return np.array(dqdt)
    def rhst(self, t, evolution_vector):
        """Syntactic sugar for scipy integrators which want a signature
        ``rhst(t,y)``. Will just call ``rhs(y)`` instead."""
        return self.rhs(evolution_vector)
    def solve(self, tfinal, **kwargs):
        """
        Basically passes all arguments to ``scipy.integrate.solve_ivp``. See the
        documentation of :class:`to_scipy` for a usage example.

        Currently, the integration time span is hardcoded as ``[0, tfinal]``.
        All other (keyword) arguments are passed to ``solve_ivp``.
        """
        from scipy.integrate import solve_ivp
        return solve_ivp(self.rhst, [0, tfinal], self.y0, **kwargs)
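
# The following is an illustrative, hypothetical helper (not part of the
# original API): it shows how the derived (non-integrated) quantities, which
# solve() does not output (see the module-level warning), can be recovered
# from a solution by calling reconstruct_state() once per timestep.
def _reconstruct_solution_demo(scipy_state, sol):
    "Return a dict mapping every state variable name to its time series."
    steps = [scipy_state.reconstruct_state(sol.y[:, i]) for i in range(sol.y.shape[1])]
    return {var: np.array([step[var] for step in steps]) for var in scipy_state.state}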
def cli_scipy():
    """
    A Command Line Interface (CLI) for :mod:`dda.scipy`.

    This CLI API basically solves a DDA file (see :mod:`dda.dsl` for the
    syntax). This is a different approach than using the :mod:`dda.cpp_exporter`:
    Instead of code generation (and the need for a C++ compiler), this evaluates
    the DDA file within python. The disadvantage is that this is quite slow; the
    advantage is that the time integrators are much better than the self-made
    one in the :mod:`dda.cpp_exporter` module. And there is no need for a C++
    compiler at all; everything is (more or less) pure python.

    Invocation is like ``python -m dda.scipy --help`` anywhere on the system.
    Run this to explore the usage of this command line interface.

    The output will be CSV (file or stdout), with one line per integration step
    (called *dense solution* in scipy ODESolver language).
    """
    from .dsl import read_traditional_dda

    parser = argparse.ArgumentParser(description="PyDDA's scipy interface simulation runner", epilog=inspect.getdoc(cli_scipy))
    parser.add_argument("circuit_file", nargs='?', type=argparse.FileType('r'), default=sys.stdin,
        help="DDA setup (traditional file). Default is stdin.")
    parser.add_argument("-o", "--output", nargs='?', type=argparse.FileType('w'), default=sys.stdout,
        help="Where to write output CSV to. Default is stdout.")
    parser.add_argument("-q", "--query-fields", nargs='*',
        help="List of fields to plot. Just pass whitespace separated (i.e. -q a b c). Also add 't' if you want to have the solution time (recommended).")

    g = parser.add_argument_group(title="Arguments passed to scipy.integrate.solve_ivp")
    g.add_argument("-t", "--tfinal", required=True, type=float,
        help="Time (in simulation units) to run up to. Do not confuse this with some iteration counter.")
    g.add_argument("-m", "--method", nargs="?", default=False, help="Integration method to use")
    g.add_argument("-d", "--dense-output", action='store_true', help="Dense Output (default is not dense)")
    # add whatever you want to expose and add it to the passing list below:
    scipy_arg_names = [ "tfinal", "method", "dense_output" ]
    # FIXME: Dense output does not yet seem to work.

    arg = parser.parse_args()

    dda_text = arg.circuit_file.read()
    dda_state = read_traditional_dda(dda_text)
    scipy_state = to_scipy(dda_state)
    # Only pass arguments which were actually given; e.g. leaving --method out
    # lets solve_ivp fall back to its default method.
    scipy_args = { k: vars(arg)[k] for k in scipy_arg_names if vars(arg)[k] }
    sol = scipy_state.solve(**scipy_args)

    named_sol = OrderedDict()
    named_sol["t"] = sol.t
    for i, fieldname in enumerate(sorted(scipy_state.vars.evolved)):
        named_sol[fieldname] = sol.y[i]

    if arg.query_fields:
        try:
            writeout = OrderedDict((q, named_sol[q]) for q in arg.query_fields)
        except KeyError as e:
            raise KeyError(f"Make sure that your query fields {arg.query_fields} are within the available keys {list(named_sol)}.") from e
    else:
        writeout = named_sol

    sep = "\t"  # not using np.savetxt(header=...) because it prepends a comment sign "# "
    print(sep.join(writeout.keys()), file=arg.output)
    np.savetxt(arg.output, np.array(list(writeout.values())).T, delimiter=sep)
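
# Example invocation (illustrative; the file name circuit.dda is hypothetical):
#
#   python -m dda.scipy --tfinal 10 -q t x y -o solution.csv circuit.dda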
if __name__ == "__main__": cli_scipy()