Skip to content

Block library

collimator.library

Abs

Bases: FeedthroughBlock

Output the absolute value of the input signal.

Input ports

None

Output ports

(0) The absolute value of the input signal.

Events

An event is triggered when the output changes from positive to negative or vice versa.

Source code in collimator/library/primitives.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
class Abs(FeedthroughBlock):
    """Output the absolute value of the input signal.

    Input ports:
        None

    Output ports:
        (0) The absolute value of the input signal.

    Events:
        An event is triggered when the output changes from positive to negative
        or vice versa.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(cnp.abs, *args, **kwargs)

    def _zero_crossing(self, _time, _state, u):
        return u

    def initialize_static_data(self, context):
        # Add a zero-crossing event so ODE solvers can't try to integrate
        # through a discontinuity. For efficiency, only do this if the output is
        # fed to an ODE.
        if not self.has_zero_crossing_events and is_discontinuity(self.output_ports[0]):
            self.declare_zero_crossing(self._zero_crossing, direction="crosses_zero")

        return super().initialize_static_data(context)

Adder

Bases: ReduceBlock

Computes the sum/difference of the input.

The add/subtract operation can be switched by setting the operators parameter. For example, a 3-input block specified as Adder(3, operators="+-+") would add the first and third inputs and subtract the second input.

Input ports

(0..n_in-1) The input signals to add/subtract.

Output ports

(0) The sum/difference of the input signals.

Source code in collimator/library/primitives.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
class Adder(ReduceBlock):
    """Computes the sum/difference of the input.

    The add/subtract operation can be switched by setting the `operators` parameter.
    For example, a 3-input block specified as `Adder(3, operators="+-+")` would add
    the first and third inputs and subtract the second input.

    Input ports:
        (0..n_in-1) The input signals to add/subtract.

    Output ports:
        (0) The sum/difference of the input signals.
    """

    def __init__(self, n_in, *args, operators=None, **kwargs):
        if operators is None:
            _func = sum

        else:
            signs = [1 if op == "+" else -1 for op in operators]

            def _func(inputs):
                signed_inputs = [s * u for (s, u) in zip(signs, inputs)]
                return sum(signed_inputs)

        super().__init__(n_in, _func, *args, **kwargs)
        if operators is not None and any(char not in {"+", "-"} for char in operators):
            name = kwargs.get("name", self.__class__.__name__)
            raise BlockParameterError(
                message=f"Adder block {name} has invalid operators {operators}. Can only contain '+' and '-'",
                system=self,
                parameter_name="operators",
            )
        self.declare_configuration_parameters(operators=operators)

BatteryCell

Bases: LeafSystem

Dynamic electro-checmical Li-ion cell model.

Based on Tremblay and Dessaint (2009).

By using appropriate parameters, the cell model can be used to model a battery pack with the assumption that the cells of the pack behave as a single unit.

Parameters E0, K, A, below are abstract parameters used in the model presented in the reference paper. As described in the reference paper, these parameters can be extracted from typical cell manufacturer datasheets; see section 3. Section 3 also provides a table of example values for these parameters.

Input ports

(0) The current (A) flowing through the cell. Positive is discharge.

Output ports

(0) The voltage across the cell terminals (V) (1) The state of charge of the cell (normalized between 0 and 1)

Parameters:

Name Type Description Default
E0 float

described as "battery constant voltage (V)" by the reference paper.

3.366
K float

described as "polarization constant (V/Ah)" by the reference paper.

0.0076
Q float

battery capacity in Ah

2.3
R float

internal resistance (Ohms)

0.01
A float

described as "exponential zone amplitude (V)" by the reference paper.

0.26422
B float

described as "exponential zone time constant inverse (1/Ah)" by the reference paper.

26.5487
initial_SOC float

initial state of charge, normalized between 0 and 1.

1.0
Source code in collimator/library/battery_cell.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
class BatteryCell(LeafSystem):
    """Dynamic electro-checmical Li-ion cell model.

    Based on [Tremblay and Dessaint (2009)](https://doi.org/10.3390/wevj3020289).

    By using appropriate parameters, the cell model can be used to model a battery pack
    with the assumption that the cells of the pack behave as a single unit.

    Parameters E0, K, A, below are abstract parameters used in the model presented in
    the reference paper. As described in the reference paper, these parameters can be
    extracted from typical cell manufacturer datasheets; see section 3. Section 3 also
    provides a table of example values for these parameters.

    Input ports:
        (0) The current (A) flowing through the cell. Positive is discharge.

    Output ports:
        (0) The voltage across the cell terminals (V)
        (1) The state of charge of the cell (normalized between 0 and 1)

    Parameters:
        E0: described as "battery constant voltage (V)" by the reference paper.
        K: described as "polarization constant (V/Ah)" by the reference paper.
        Q: battery capacity in Ah
        R: internal resistance (Ohms)
        A: described as "exponential zone amplitude (V)" by the reference paper.
        B:
            described as "exponential zone time constant inverse (1/Ah)" by the
            reference paper.
        initial_SOC: initial state of charge, normalized between 0 and 1.
    """

    class BatteryStateType(NamedTuple):
        soc: float
        i_star: float
        i_lb: float

    class FirstOrderFilter(NamedTuple):
        A: float
        B: float
        C: float

    def __init__(
        self,
        E0: float = 3.366,
        K: float = 0.0076,
        Q: float = 2.3,
        R: float = 0.01,
        tau: float = 30.0,
        A: float = 0.26422,
        B: float = 26.5487,
        initial_SOC: float = 1.0,
        **kwargs,
    ):
        super().__init__(**kwargs)

        self.declare_parameter("E0", E0)
        self.declare_parameter("K", K)
        self.declare_parameter("Q", Q)
        self.declare_parameter("R", R)
        self.declare_parameter("tau", tau)
        self.declare_parameter("A", A)
        self.declare_parameter("B", B)
        self.declare_parameter("initial_SOC", initial_SOC)

        # Filter for input current
        self.current_filter = self.FirstOrderFilter(-0.05, 1.0, 0.05)

        # Filter for loop-breaker
        self.lb_filter = self.FirstOrderFilter(-10.0, 1.0, 10.0)

        initial_state = self.BatteryStateType(
            soc=initial_SOC,
            i_star=0.0,  # Filtered input current
            i_lb=0.0,  # Filtered current for loop-breaking
        )

        self.declare_input_port()  # Current flowing through the cell

        self.declare_continuous_state(
            default_value=initial_state,
            as_array=False,
            ode=self._ode,
        )

        self.declare_output_port(
            self._voltage_output,
            prerequisites_of_calc=[DependencyTicket.xc],
            requires_inputs=False,
            name="voltage",
        )

        self.declare_output_port(
            self._soc_output,
            prerequisites_of_calc=[DependencyTicket.xc],
            requires_inputs=False,
            name="soc",
        )

    def _ode(self, _time, state, *inputs, **parameters) -> BatteryStateType:
        xc = state.continuous_state
        Q = parameters["Q"]

        (u,) = inputs

        soc_der_unsat = -u / (Q * Ah_to_As)

        # SoC must be between 0 and 1
        llim_violation = (xc.soc <= 0.0) & (soc_der_unsat < 0.0)
        ulim_violation = (xc.soc >= 1.0) & (soc_der_unsat > 0.0)

        # Saturated time derivative
        soc_der = cnp.where(llim_violation | ulim_violation, 0.0, soc_der_unsat)

        # Derivative of istar, the filtered current signal
        i_star_der = self.current_filter.A * xc.i_star + self.current_filter.B * u

        # Derivative of ilb, the filtered current signal for loop-breaking
        i_lb_der = self.lb_filter.A * xc.i_lb + self.lb_filter.B * u

        return self.BatteryStateType(
            soc=soc_der,
            i_star=i_star_der,
            i_lb=i_lb_der,
        )

    def _voltage_output(self, _time, state, *_inputs, **parameters) -> Array:
        E0 = parameters["E0"]
        Q = parameters["Q"]
        K = parameters["K"]
        A = parameters["A"]
        B = parameters["B"]
        R = parameters["R"]
        xc = state.continuous_state

        # Filtered input current
        i_star = self.current_filter.C * xc.i_star

        # Loop-breaking current
        i_lb = self.lb_filter.C * xc.i_lb

        # Apply limits to state of charge
        soc = cnp.clip(xc.soc, 0.0, 1.0)

        # Undo normalization by Q - this is ∫i*dt, the integral of current
        i_int = Q * (1 - soc)

        chg_mode_Q_gain = 0.1
        vdyn_den = cnp.where(i_star >= 0, Q - i_int, i_int + chg_mode_Q_gain * Q)
        vdyn = i_star * K * Q / vdyn_den

        vbatt_ulim = 2 * E0  # Reasonable upper limit on battery voltage
        vbatt_presat = (
            E0 - R * i_lb - i_int * K * Q / (Q - i_int) + A * cnp.exp(-B * i_int) - vdyn
        )
        return cnp.clip(vbatt_presat, 0.0, vbatt_ulim)

    def _soc_output(self, _time, state, *_inputs, **_parameters) -> Array:
        return state.continuous_state.soc

Chirp

Bases: SourceBlock

Produces a signal like the linear method of

https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.chirp.html

Parameters:

Name Type Description Default
f0 float

Frequency (Hz) at time t=phi.

required
f1 float

Frequency (Hz) at time t=stop_time.

required
stop_time float

Time to end the signal (seconds).

required
phi float

Phase offset (radians).

0.0
Input ports

None

Output ports

(0) The chirp signal.

Source code in collimator/library/primitives.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
class Chirp(SourceBlock):
    """Produces a signal like the linear method of

    https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.chirp.html

    Parameters:
        f0 (float): Frequency (Hz) at time t=phi.
        f1 (float): Frequency (Hz) at time t=stop_time.
        stop_time (float): Time to end the signal (seconds).
        phi (float): Phase offset (radians).

    Input ports:
        None

    Output ports:
        (0) The chirp signal.
    """

    def __init__(self, f0, f1, stop_time, phi=0.0, **kwargs):
        # FIXME: There's an extra factor of 2 that doesn't seem like it's in the SciPy version.
        def _func(time, stop_time, f0, f1, phi):
            f = f0 + (f1 - f0) * time / (2 * stop_time)
            return cnp.cos(f * time + phi)

        super().__init__(_func, **kwargs)
        self.declare_parameter("stop_time", stop_time)
        self.declare_parameter("f0", f0)
        self.declare_parameter("f1", f1)
        self.declare_parameter("phi", phi)

Clock

Bases: SourceBlock

Source block returning simulation time.

Input ports

None

Output ports

(0) The simulation time.

Parameters:

Name Type Description Default
dtype

The data type of the output signal. The default is "None", which will default to the current default floating point precision

None
Source code in collimator/library/primitives.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
class Clock(SourceBlock):
    """Source block returning simulation time.

    Input ports:
        None

    Output ports:
        (0) The simulation time.

    Parameters:
        dtype:
            The data type of the output signal.  The default is "None", which will
            default to the current default floating point precision
    """

    def __init__(self, dtype=None, **kwargs):
        super().__init__(lambda t: cnp.array(t, dtype=dtype), **kwargs)

Comparator

Bases: LeafSystem

Compare two signals using typical relational operators.

When using == and != operators, the block uses tolerances to determine if the expression is true or false.

Parameters:

Name Type Description Default
operator

one of ("==", "!=", ">=", ">", ">=", "<")

None
atol

the absolute tolerance value used with "==" or "!="

1e-05
rtol

the relative tolerance value used with "==" or "!="

1e-08
Input Ports

(0) The left side operand (1) The right side operand

Output Ports

(0) The result of the comparison (boolean signal)

Events

An event is triggered when the output changes from true to false or vice versa.

Source code in collimator/library/primitives.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
class Comparator(LeafSystem):
    """Compare two signals using typical relational operators.

    When using == and != operators, the block uses tolerances to determine if the
    expression is true or false.

    Parameters:
        operator: one of ("==", "!=", ">=", ">", ">=", "<")
        atol: the absolute tolerance value used with "==" or "!="
        rtol: the relative tolerance value used with "==" or "!="

    Input Ports:
        (0) The left side operand
        (1) The right side operand

    Output Ports:
        (0) The result of the comparison (boolean signal)

    Events:
        An event is triggered when the output changes from true to false or vice versa.
    """

    def __init__(self, atol=1e-5, rtol=1e-8, operator=None, **kwargs):
        super().__init__(**kwargs)

        func_lookup = {
            ">": cnp.greater,
            ">=": cnp.greater_equal,
            "<": cnp.less,
            "<=": cnp.less_equal,
            "==": self._equal,
            "!=": self._ne,
        }

        if operator not in func_lookup:
            name = kwargs.get("name", self.__class__.__name__)
            message = (
                f"Comparator block '{name}' has invalid selection "
                + f"'{operator}' for parameter 'operator'. Valid options: "
                + ",".join([k for k in func_lookup.keys()])
            )
            raise BlockParameterError(
                message=message, system=self, parameter_name="operator"
            )

        self.declare_configuration_parameters(
            operator=operator,
            rtol=rtol,
            atol=atol,
        )
        self.rtol = rtol
        self.atol = atol

        self.declare_input_port()
        self.declare_input_port()

        compare = func_lookup[operator]

        def _compute_output(_time, _state, *inputs, **_params):
            return compare(*inputs)

        self.declare_output_port(
            _compute_output,
            prerequisites_of_calc=[port.ticket for port in self.input_ports],
        )

        self.evt_direction = self._process_operator(operator)

    def _equal(self, x, y):
        if cnp.issubdtype(x.dtype, cnp.floating):
            return cnp.isclose(x, y, self.rtol, self.atol)
        return x == y

    def _ne(self, x, y):
        if cnp.issubdtype(x.dtype, cnp.floating):
            return cnp.logical_not(cnp.isclose(x, y, self.rtol, self.atol))
        return x != y

    def _zero_crossing(self, _time, _state, *inputs, **_params):
        return inputs[0] - inputs[1]

    def _process_operator(self, operator):
        if operator in ["<", "<="]:
            return "positive_then_non_positive"
        if operator in [">", ">="]:
            return "negative_then_non_negative"
        return "crosses_zero"

    def initialize_static_data(self, context):
        # Add a zero-crossing event so ODE solvers can't try to integrate
        # through a discontinuity. For efficiency, only do this if the output is
        # fed to an ODE.
        if not self.has_zero_crossing_events and is_discontinuity(self.output_ports[0]):
            self.declare_zero_crossing(
                self._zero_crossing, direction=self.evt_direction
            )

        return super().initialize_static_data(context)

Constant

Bases: LeafSystem

A source block that emits a constant value.

Parameters:

Name Type Description Default
value

The constant value of the block.

required
Input ports

None

Output ports

(0) The constant value.

Source code in collimator/library/primitives.py
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
class Constant(LeafSystem):
    """A source block that emits a constant value.

    Parameters:
        value: The constant value of the block.

    Input ports:
        None

    Output ports:
        (0) The constant value.
    """

    def __init__(self, value, *args, **kwargs):
        super().__init__(**kwargs)
        self.declare_parameter("value", value)

        self.declare_output_port(
            self._func,
            name="out_0",
            prerequisites_of_calc=[DependencyTicket.nothing],
            requires_inputs=False,
        )

    def _func(self, time, state, *inputs, **parameters):
        return parameters["value"]

ContinuousTimeInfiniteHorizonKalmanFilter

Bases: LeafSystem

Continuous-time Infinite Horizon Kalman Filter for the following system:

dot_x =  A x + B u + G w
y   = C x + D u + v

E(w) = E(v) = 0
E(ww') = Q
E(vv') = R
E(wv') = N = 0
Input ports

(0) u : continuous-time control vector (1) y : continuous-time measurement vector

Output ports

(1) x_hat : continuous-time state vector estimate

Parameters:

Name Type Description Default
A

ndarray State transition matrix

required
B

ndarray Input matrix

required
C

ndarray Output matrix

required
D

ndarray Feedthrough matrix

required
G

ndarray Process noise matrix

required
Q

ndarray Process noise covariance matrix

required
R

ndarray Measurement noise covariance matrix

required
x_hat_0

ndarray Initial state estimate

required
Source code in collimator/library/state_estimators/continuous_time_infinite_horizon_kalman_filter.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
class ContinuousTimeInfiniteHorizonKalmanFilter(LeafSystem):
    """
    Continuous-time Infinite Horizon Kalman Filter for the following system:

    ```
    dot_x =  A x + B u + G w
    y   = C x + D u + v

    E(w) = E(v) = 0
    E(ww') = Q
    E(vv') = R
    E(wv') = N = 0
    ```

    Input ports:
        (0) u : continuous-time control vector
        (1) y : continuous-time measurement vector

    Output ports:
        (1) x_hat : continuous-time state vector estimate

    Parameters:
        A: ndarray
            State transition matrix
        B: ndarray
            Input matrix
        C: ndarray
            Output matrix
        D: ndarray
            Feedthrough matrix
        G: ndarray
            Process noise matrix
        Q: ndarray
            Process noise covariance matrix
        R: ndarray
            Measurement noise covariance matrix
        x_hat_0: ndarray
            Initial state estimate
    """

    def __init__(self, A, B, C, D, G, Q, R, x_hat_0, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.A = A
        self.B = B
        self.C = C
        self.D = D
        self.G = G
        self.Q = Q
        self.R = R

        self.nx, self.nu = B.shape
        self.ny = C.shape[0]

        L, P, E = control.lqe(A, G, C, Q, R)

        self.A_minus_LC = A - cnp.matmul(L, C)
        self.B_minus_LD = B - cnp.matmul(L, D)
        self.L = L

        self.declare_input_port()  # u
        self.declare_input_port()  # y

        self.declare_continuous_state(
            ode=self._ode, shape=x_hat_0.shape, default_value=x_hat_0, as_array=True
        )  # continuous state for x_hat

        self.declare_continuous_state_output()

    def _ode(self, time, state, *inputs, **params):
        x_hat = state.continuous_state

        u, y = inputs

        u = cnp.atleast_1d(u)
        y = cnp.atleast_1d(y)

        dot_x_hat = (
            cnp.dot(self.A_minus_LC, x_hat)
            + cnp.dot(self.B_minus_LD, u)
            + cnp.dot(self.L, y)
        )

        return dot_x_hat

    #######################################
    # Make filter for a continuous plant  #
    #######################################
    @staticmethod
    def for_continuous_plant(
        plant,
        x_eq,
        u_eq,
        Q,
        R,
        G=None,
        x_hat_bar_0=None,
        name=None,
    ):
        """
        Obtain a continuous-time Infinite Horizon Kalman Filter system for a
        continuous-time plant after linearization at equilibrium point (x_eq, u_eq)

        The input plant contains the deterministic forms of the forward and observation
        operators:

        ```
            dx/dt = f(x,u)
            y = g(x,u)
        ```

        Note: Only plants with one vector-valued input and one vector-valued output
        are currently supported. Furthermore, the plant LeafSystem/Diagram should have
        only one vector-valued integrator.

        A plant with disturbances of the following form is then considered
        following form:

        ```
            dx/dt = f(x,u) + G w
            y = g(x,u) +  v
        ```

        where:

            `w` represents the process noise,
            `v` represents the measurement noise,

        and

        ```
            E(w) = E(v) = 0
            E(ww') = Q
            E(vv') = R
            E(wv') = N = 0
        ```

        This plant with disturbances is linearized (only `f` and `q`) around the
        equilibrium point to obtain:

        ```
            d/dt (x_bar) = A x_bar + B u_bar + G w    --- (C1)
            y_bar = C x_bar + D u_bar + v             --- (C2)
        ```

        where,

        ```
            x_bar = x - x_eq
            u_bar = u - u_eq
            y_bar = y - y_bar
            y_eq = g(x_eq, u_eq)
        ```

        A continuous-time Kalman Filter estimator for the system of equations (C1) and
        (C2) is returned. This filter is in the `x_bar`, `u_bar`, and `y_bar`
        states.

        The returned system will have

        Input ports:
            (0) u_bar : continuous-time control vector relative to equilibrium point
            (1) y_bar : continuous-time measurement vector relative to equilibrium point

        Output ports:
            (1) x_hat_bar : continuous-time state vector estimate relative to
                            equilibrium point

        Parameters:
            plant : a `Plant` object which can be a LeafSystem or a Diagram.
            x_eq: ndarray
                Equilibrium state vector for discretization
            u_eq: ndarray
                Equilibrium control vector for discretization
            dt: float
                Time step for the discretization.
            Q: ndarray
                Process noise covariance matrix.
            R: ndarray
                Measurement noise covariance matrix.
            G: ndarray
                Process noise matrix. If `None`, `G=B` is assumed making disrurbances
                additive to control vector `u`, i.e. `u_disturbed = u_orig + w`.
            x_hat_bar_0: ndarray
                Initial state estimate relative to equilibrium point.
                If None, an identity matrix is assumed.
        """

        y_eq, linear_plant = linearize_plant(plant, x_eq, u_eq)

        A, B, C, D = linear_plant.A, linear_plant.B, linear_plant.C, linear_plant.D

        nx, nu = B.shape
        ny, _ = D.shape

        if G is None:
            G = B

        if x_hat_bar_0 is None:
            x_hat_bar_0 = cnp.zeros(nx)

        # Instantiate a Kalman Filter instance for the linearized plant
        kf = ContinuousTimeInfiniteHorizonKalmanFilter(
            A,
            B,
            C,
            D,
            G,
            Q,
            R,
            x_hat_bar_0,
            name=name,
        )

        return y_eq, kf

for_continuous_plant(plant, x_eq, u_eq, Q, R, G=None, x_hat_bar_0=None, name=None) staticmethod

Obtain a continuous-time Infinite Horizon Kalman Filter system for a continuous-time plant after linearization at equilibrium point (x_eq, u_eq)

The input plant contains the deterministic forms of the forward and observation operators:

    dx/dt = f(x,u)
    y = g(x,u)

Note: Only plants with one vector-valued input and one vector-valued output are currently supported. Furthermore, the plant LeafSystem/Diagram should have only one vector-valued integrator.

A plant with disturbances of the following form is then considered following form:

    dx/dt = f(x,u) + G w
    y = g(x,u) +  v

where:

`w` represents the process noise,
`v` represents the measurement noise,

and

    E(w) = E(v) = 0
    E(ww') = Q
    E(vv') = R
    E(wv') = N = 0

This plant with disturbances is linearized (only f and q) around the equilibrium point to obtain:

    d/dt (x_bar) = A x_bar + B u_bar + G w    --- (C1)
    y_bar = C x_bar + D u_bar + v             --- (C2)

where,

    x_bar = x - x_eq
    u_bar = u - u_eq
    y_bar = y - y_bar
    y_eq = g(x_eq, u_eq)

A continuous-time Kalman Filter estimator for the system of equations (C1) and (C2) is returned. This filter is in the x_bar, u_bar, and y_bar states.

The returned system will have

Input ports

(0) u_bar : continuous-time control vector relative to equilibrium point (1) y_bar : continuous-time measurement vector relative to equilibrium point

Output ports

(1) x_hat_bar : continuous-time state vector estimate relative to equilibrium point

Parameters:

Name Type Description Default
plant

a Plant object which can be a LeafSystem or a Diagram.

required
x_eq

ndarray Equilibrium state vector for discretization

required
u_eq

ndarray Equilibrium control vector for discretization

required
dt

float Time step for the discretization.

required
Q

ndarray Process noise covariance matrix.

required
R

ndarray Measurement noise covariance matrix.

required
G

ndarray Process noise matrix. If None, G=B is assumed making disrurbances additive to control vector u, i.e. u_disturbed = u_orig + w.

None
x_hat_bar_0

ndarray Initial state estimate relative to equilibrium point. If None, an identity matrix is assumed.

None
Source code in collimator/library/state_estimators/continuous_time_infinite_horizon_kalman_filter.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
@staticmethod
def for_continuous_plant(
    plant,
    x_eq,
    u_eq,
    Q,
    R,
    G=None,
    x_hat_bar_0=None,
    name=None,
):
    """
    Obtain a continuous-time Infinite Horizon Kalman Filter system for a
    continuous-time plant after linearization at equilibrium point (x_eq, u_eq)

    The input plant contains the deterministic forms of the forward and observation
    operators:

    ```
        dx/dt = f(x,u)
        y = g(x,u)
    ```

    Note: Only plants with one vector-valued input and one vector-valued output
    are currently supported. Furthermore, the plant LeafSystem/Diagram should have
    only one vector-valued integrator.

    A plant with disturbances of the following form is then considered
    following form:

    ```
        dx/dt = f(x,u) + G w
        y = g(x,u) +  v
    ```

    where:

        `w` represents the process noise,
        `v` represents the measurement noise,

    and

    ```
        E(w) = E(v) = 0
        E(ww') = Q
        E(vv') = R
        E(wv') = N = 0
    ```

    This plant with disturbances is linearized (only `f` and `q`) around the
    equilibrium point to obtain:

    ```
        d/dt (x_bar) = A x_bar + B u_bar + G w    --- (C1)
        y_bar = C x_bar + D u_bar + v             --- (C2)
    ```

    where,

    ```
        x_bar = x - x_eq
        u_bar = u - u_eq
        y_bar = y - y_bar
        y_eq = g(x_eq, u_eq)
    ```

    A continuous-time Kalman Filter estimator for the system of equations (C1) and
    (C2) is returned. This filter is in the `x_bar`, `u_bar`, and `y_bar`
    states.

    The returned system will have

    Input ports:
        (0) u_bar : continuous-time control vector relative to equilibrium point
        (1) y_bar : continuous-time measurement vector relative to equilibrium point

    Output ports:
        (1) x_hat_bar : continuous-time state vector estimate relative to
                        equilibrium point

    Parameters:
        plant : a `Plant` object which can be a LeafSystem or a Diagram.
        x_eq: ndarray
            Equilibrium state vector for discretization
        u_eq: ndarray
            Equilibrium control vector for discretization
        dt: float
            Time step for the discretization.
        Q: ndarray
            Process noise covariance matrix.
        R: ndarray
            Measurement noise covariance matrix.
        G: ndarray
            Process noise matrix. If `None`, `G=B` is assumed making disrurbances
            additive to control vector `u`, i.e. `u_disturbed = u_orig + w`.
        x_hat_bar_0: ndarray
            Initial state estimate relative to equilibrium point.
            If None, an identity matrix is assumed.
    """

    y_eq, linear_plant = linearize_plant(plant, x_eq, u_eq)

    A, B, C, D = linear_plant.A, linear_plant.B, linear_plant.C, linear_plant.D

    nx, nu = B.shape
    ny, _ = D.shape

    if G is None:
        G = B

    if x_hat_bar_0 is None:
        x_hat_bar_0 = cnp.zeros(nx)

    # Instantiate a Kalman Filter instance for the linearized plant
    kf = ContinuousTimeInfiniteHorizonKalmanFilter(
        A,
        B,
        C,
        D,
        G,
        Q,
        R,
        x_hat_bar_0,
        name=name,
    )

    return y_eq, kf

ContinuousTimeSindyWithControl

Bases: LeafSystem

Sindy with control (SINDyc) block: Continuous-time.

This class implements System Identification (SINDy) algorithm with control inputs.

Attributes:

Name Type Description
equations list of strings

The identified system equations.

base_feature_names list of strings

features x_i and u_i.

feature_names list of strings

Composed features with basis libraries.

coefficients ndarray

Coefficients of the identified model.

nx int

Number of states in the model.

Source code in collimator/library/sindy.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
class ContinuousTimeSindyWithControl(LeafSystem):
    """
    Sindy with control (SINDyc) block: Continuous-time.

    This class implements System Identification (SINDy) algorithm with control inputs.

    Attributes:
        equations (list of strings): The identified system equations.
        base_feature_names (list of strings): features x_i and u_i.
        feature_names (list of strings): Composed features with basis libraries.
        coefficients (ndarray): Coefficients of the identified model.
        nx (int): Number of states in the model.
    """

    def __init__(
        self,
        file_name,
        state_columns,
        control_input_columns,
        *args,
        dt=None,
        time_column=None,
        state_derivatives_columns=None,
        threshold=0.1,
        poly_order=2,
        fourier_n_frequencies=None,
        pretrained=False,
        pretrained_file_path=None,
        coefficients=None,
        feature_names=None,
        base_feature_names=None,
        equations=None,
        initial_state=None,
        **kwargs,
    ):
        """Initializes the LeafSystem.

        Args:
            file_name (str): Path to the CSV file containing the training data.

            state_columns: Column names (str or list[str])
                          or column indices (int or list[int]) for x_train.

            control_input_columns: Column names (str or list[str])
                          or column indices (int or list[int]) for u_train.

            dt: fixed dt (float)

            time_column : Column name (str) for column index (int) for time data.
                       If time_column is provided, then fixed `dt` above will be ignored.

            state_derivatives_columns (optional): Column names (str or list[str]) or column
                                         indices (int or list[int]) for x_dot_train.

            poly_order (int): Degree of polynomial features. Set to `None` to
                                     omit this library.

            fourier_n_frequencies (int): Number of Fourier frequencies. Set to `None`
                                         to omit this library.

            pretrained (bool): If True, use a pretrained model specified by the
                               `pretrained_file_path` argument.

            pretrained_file_path (str, optional): Path to the pretrained model file.

            initial_state: Initial state of the system for propagating the ODE forward
                           during inference.
        """
        super().__init__(*args, **kwargs)

        if (pretrained is True) and (pretrained_file_path is None):
            raise ValueError(
                "Please provide `pretrained_file_path` as boolean "
                "`pretrained` is set to True"
            )

        if (pretrained is False) and (pretrained_file_path is not None):
            raise ValueError(
                "Boolean `pretrained` is False but `pretrained_file_path` "
                "is provided."
            )

        if pretrained is False:
            if (dt is None) and (time_column is None):
                raise ValueError(
                    "Neither fixed dt nor column for time data are provided."
                )

        if (poly_order is None) and (fourier_n_frequencies is None):
            raise ValueError(
                "Please use atleast one of the Polynomial or Fourier libraries "
                "by setting `poly_order` and/or `fourier_n_frequencies` "
            )

        # validate training data from UI
        ui_pretrained_data = [coefficients, feature_names, base_feature_names]
        all_ui_pretrained_data_is_none = all(v is None for v in ui_pretrained_data)
        all_ui_pretrained_data_is_not_none = all(
            v is not None for v in ui_pretrained_data
        )

        if not (all_ui_pretrained_data_is_none or all_ui_pretrained_data_is_not_none):
            raise ValueError(
                f"Only some pretrained data from UI was provided for SINDY block: {self.name}. "
                f"{ui_pretrained_data}"
            )

        if all_ui_pretrained_data_is_not_none:
            self.base_feature_names = base_feature_names.tolist()
            self.feature_names = feature_names.tolist()
            self.coefficients = jnp.array(coefficients)

        elif pretrained:
            with open(pretrained_file_path, "r") as f:
                deserialized_model = json.load(f)

            self.equations = deserialized_model["equations"]
            self.base_feature_names = deserialized_model["base_feature_names"]
            self.feature_names = deserialized_model["feature_names"]
            self.coefficients = jnp.array(deserialized_model["coefficients"])
        else:
            df = _read_csv(file_name)

            if time_column:
                time_column_name = (
                    df.columns[time_column]
                    if isinstance(time_column, int)
                    else time_column
                )
                df = df.drop_duplicates(subset=time_column_name, keep="first")
                time = np.array(_extract_columns(df, time_column_name)[:, 0])
            else:
                time = dt

            x_train = _extract_columns(df, state_columns)
            u_train = _extract_columns(df, control_input_columns)
            x_dot_train = (
                _extract_columns(df, state_derivatives_columns)
                if state_derivatives_columns
                else None
            )

            (
                self.equations,
                self.base_feature_names,
                self.feature_names,
                self.coefficients,
            ) = train(
                x_train,
                u_train,
                x_dot_train,
                time,
                poly_order=poly_order,
                fourier_n_frequencies=fourier_n_frequencies,
                threshold=threshold,
            )

        _, self.nx = self.coefficients.shape
        self.declare_input_port()  # one vector valued input port for u

        if initial_state is None:
            initial_state = jnp.zeros(self.nx)

        self.declare_continuous_state(
            shape=(self.nx,), ode=self.ode, default_value=jnp.array(initial_state)
        )
        self.declare_continuous_state_output()  # output of the state in ODE

        # SymPy parsing to compute $\dot{x} = f(x,u)$
        sympy_base_features = sp.symbols(self.base_feature_names)

        # Convert feature names to sympy expressions
        sympy_feature_expressions = []
        for name in self.feature_names:
            # Replace spaces with multiplication
            name = name.replace(" ", "*")
            expr = sp.sympify(name)
            sympy_feature_expressions.append(expr)

        x_and_u_vec = sp.Matrix(sympy_base_features)
        self.features_func = sp.lambdify(
            (x_and_u_vec,), sympy_feature_expressions, modules="jax"
        )  # feature functions

        self.declare_continuous_state(
            shape=(self.nx,),
            ode=self.ode,
            default_value=jnp.array(initial_state),
        )

        self.declare_configuration_parameters(
            file_name=file_name,
            state_columns=state_columns,
            control_input_columns=control_input_columns,
            dt=dt,
            time_column=time_column,
            state_derivatives_columns=state_derivatives_columns,
            threshold=threshold,
            poly_order=poly_order,
            fourier_n_frequencies=fourier_n_frequencies,
            pretrained=pretrained,
            coefficients=self.coefficients.tolist(),
            feature_names=self.feature_names,
            base_feature_names=self.base_feature_names,
            equations=equations,
            initial_state=initial_state,
        )

        if pretrained_file_path:
            self.declare_configuration_parameters(
                pretrained_file_path=pretrained_file_path
            )

    def ode(self, time, state, inputs, **params):
        """
        The ODE system RHS. The RHS is given by
        coefficients @ features
        """
        x = state.continuous_state
        u = inputs
        x_and_u = jnp.hstack([x, u])
        features_evaluated = self.features_func(x_and_u)
        x_dot = jnp.matmul(self.coefficients, jnp.atleast_1d(features_evaluated))
        return x_dot

    def serialize(self, filename):
        """
        Save the relevant class attributes post training
        so that model state can be restored
        """
        sindy_model = {
            "equations": self.equations,
            "base_feature_names": self.base_feature_names,
            "feature_names": self.feature_names,
            "coefficients": self.coefficients.tolist(),  # Can't serialize numpy arrays
        }
        with open(filename, "w") as f:
            json.dump(sindy_model, f)

__init__(file_name, state_columns, control_input_columns, *args, dt=None, time_column=None, state_derivatives_columns=None, threshold=0.1, poly_order=2, fourier_n_frequencies=None, pretrained=False, pretrained_file_path=None, coefficients=None, feature_names=None, base_feature_names=None, equations=None, initial_state=None, **kwargs)

Initializes the LeafSystem.

Parameters:

Name Type Description Default
file_name str

Path to the CSV file containing the training data.

required
state_columns

Column names (str or list[str]) or column indices (int or list[int]) for x_train.

required
control_input_columns

Column names (str or list[str]) or column indices (int or list[int]) for u_train.

required
dt

fixed dt (float)

None
time_column

Column name (str) for column index (int) for time data. If time_column is provided, then fixed dt above will be ignored.

None
state_derivatives_columns optional

Column names (str or list[str]) or column indices (int or list[int]) for x_dot_train.

None
poly_order int

Degree of polynomial features. Set to None to omit this library.

2
fourier_n_frequencies int

Number of Fourier frequencies. Set to None to omit this library.

None
pretrained bool

If True, use a pretrained model specified by the pretrained_file_path argument.

False
pretrained_file_path str

Path to the pretrained model file.

None
initial_state

Initial state of the system for propagating the ODE forward during inference.

None
Source code in collimator/library/sindy.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
def __init__(
    self,
    file_name,
    state_columns,
    control_input_columns,
    *args,
    dt=None,
    time_column=None,
    state_derivatives_columns=None,
    threshold=0.1,
    poly_order=2,
    fourier_n_frequencies=None,
    pretrained=False,
    pretrained_file_path=None,
    coefficients=None,
    feature_names=None,
    base_feature_names=None,
    equations=None,
    initial_state=None,
    **kwargs,
):
    """Initializes the LeafSystem.

    Args:
        file_name (str): Path to the CSV file containing the training data.

        state_columns: Column names (str or list[str])
                      or column indices (int or list[int]) for x_train.

        control_input_columns: Column names (str or list[str])
                      or column indices (int or list[int]) for u_train.

        dt: fixed dt (float)

        time_column : Column name (str) for column index (int) for time data.
                   If time_column is provided, then fixed `dt` above will be ignored.

        state_derivatives_columns (optional): Column names (str or list[str]) or column
                                     indices (int or list[int]) for x_dot_train.

        poly_order (int): Degree of polynomial features. Set to `None` to
                                 omit this library.

        fourier_n_frequencies (int): Number of Fourier frequencies. Set to `None`
                                     to omit this library.

        pretrained (bool): If True, use a pretrained model specified by the
                           `pretrained_file_path` argument.

        pretrained_file_path (str, optional): Path to the pretrained model file.

        initial_state: Initial state of the system for propagating the ODE forward
                       during inference.
    """
    super().__init__(*args, **kwargs)

    if (pretrained is True) and (pretrained_file_path is None):
        raise ValueError(
            "Please provide `pretrained_file_path` as boolean "
            "`pretrained` is set to True"
        )

    if (pretrained is False) and (pretrained_file_path is not None):
        raise ValueError(
            "Boolean `pretrained` is False but `pretrained_file_path` "
            "is provided."
        )

    if pretrained is False:
        if (dt is None) and (time_column is None):
            raise ValueError(
                "Neither fixed dt nor column for time data are provided."
            )

    if (poly_order is None) and (fourier_n_frequencies is None):
        raise ValueError(
            "Please use atleast one of the Polynomial or Fourier libraries "
            "by setting `poly_order` and/or `fourier_n_frequencies` "
        )

    # validate training data from UI
    ui_pretrained_data = [coefficients, feature_names, base_feature_names]
    all_ui_pretrained_data_is_none = all(v is None for v in ui_pretrained_data)
    all_ui_pretrained_data_is_not_none = all(
        v is not None for v in ui_pretrained_data
    )

    if not (all_ui_pretrained_data_is_none or all_ui_pretrained_data_is_not_none):
        raise ValueError(
            f"Only some pretrained data from UI was provided for SINDY block: {self.name}. "
            f"{ui_pretrained_data}"
        )

    if all_ui_pretrained_data_is_not_none:
        self.base_feature_names = base_feature_names.tolist()
        self.feature_names = feature_names.tolist()
        self.coefficients = jnp.array(coefficients)

    elif pretrained:
        with open(pretrained_file_path, "r") as f:
            deserialized_model = json.load(f)

        self.equations = deserialized_model["equations"]
        self.base_feature_names = deserialized_model["base_feature_names"]
        self.feature_names = deserialized_model["feature_names"]
        self.coefficients = jnp.array(deserialized_model["coefficients"])
    else:
        df = _read_csv(file_name)

        if time_column:
            time_column_name = (
                df.columns[time_column]
                if isinstance(time_column, int)
                else time_column
            )
            df = df.drop_duplicates(subset=time_column_name, keep="first")
            time = np.array(_extract_columns(df, time_column_name)[:, 0])
        else:
            time = dt

        x_train = _extract_columns(df, state_columns)
        u_train = _extract_columns(df, control_input_columns)
        x_dot_train = (
            _extract_columns(df, state_derivatives_columns)
            if state_derivatives_columns
            else None
        )

        (
            self.equations,
            self.base_feature_names,
            self.feature_names,
            self.coefficients,
        ) = train(
            x_train,
            u_train,
            x_dot_train,
            time,
            poly_order=poly_order,
            fourier_n_frequencies=fourier_n_frequencies,
            threshold=threshold,
        )

    _, self.nx = self.coefficients.shape
    self.declare_input_port()  # one vector valued input port for u

    if initial_state is None:
        initial_state = jnp.zeros(self.nx)

    self.declare_continuous_state(
        shape=(self.nx,), ode=self.ode, default_value=jnp.array(initial_state)
    )
    self.declare_continuous_state_output()  # output of the state in ODE

    # SymPy parsing to compute $\dot{x} = f(x,u)$
    sympy_base_features = sp.symbols(self.base_feature_names)

    # Convert feature names to sympy expressions
    sympy_feature_expressions = []
    for name in self.feature_names:
        # Replace spaces with multiplication
        name = name.replace(" ", "*")
        expr = sp.sympify(name)
        sympy_feature_expressions.append(expr)

    x_and_u_vec = sp.Matrix(sympy_base_features)
    self.features_func = sp.lambdify(
        (x_and_u_vec,), sympy_feature_expressions, modules="jax"
    )  # feature functions

    self.declare_continuous_state(
        shape=(self.nx,),
        ode=self.ode,
        default_value=jnp.array(initial_state),
    )

    self.declare_configuration_parameters(
        file_name=file_name,
        state_columns=state_columns,
        control_input_columns=control_input_columns,
        dt=dt,
        time_column=time_column,
        state_derivatives_columns=state_derivatives_columns,
        threshold=threshold,
        poly_order=poly_order,
        fourier_n_frequencies=fourier_n_frequencies,
        pretrained=pretrained,
        coefficients=self.coefficients.tolist(),
        feature_names=self.feature_names,
        base_feature_names=self.base_feature_names,
        equations=equations,
        initial_state=initial_state,
    )

    if pretrained_file_path:
        self.declare_configuration_parameters(
            pretrained_file_path=pretrained_file_path
        )

ode(time, state, inputs, **params)

The ODE system RHS. The RHS is given by coefficients @ features

Source code in collimator/library/sindy.py
327
328
329
330
331
332
333
334
335
336
337
def ode(self, time, state, inputs, **params):
    """
    The ODE system RHS. The RHS is given by
    coefficients @ features
    """
    x = state.continuous_state
    u = inputs
    x_and_u = jnp.hstack([x, u])
    features_evaluated = self.features_func(x_and_u)
    x_dot = jnp.matmul(self.coefficients, jnp.atleast_1d(features_evaluated))
    return x_dot

serialize(filename)

Save the relevant class attributes post training so that model state can be restored

Source code in collimator/library/sindy.py
339
340
341
342
343
344
345
346
347
348
349
350
351
def serialize(self, filename):
    """
    Save the relevant class attributes post training
    so that model state can be restored
    """
    sindy_model = {
        "equations": self.equations,
        "base_feature_names": self.base_feature_names,
        "feature_names": self.feature_names,
        "coefficients": self.coefficients.tolist(),  # Can't serialize numpy arrays
    }
    with open(filename, "w") as f:
        json.dump(sindy_model, f)

CoordinateRotation

Bases: LeafSystem

Computes the rotation of a 3D vector between coordinate systems.

Given sufficient information to construct a rotation matrix C_AB from orthogonal coordinate system B to orthogonal coordinate system A, along with an input vector x_B expressed in B-axes, this block will compute the matrix-vector product x_A = C_AB @ x_B.

Note that depending on the type of rotation representation, this matrix may not be explicitly computed. The types of rotations supported are Quaternion, Euler Angles, and Direction Cosine Matrix (DCM).

By default, the rotations have the following convention:

  • Quaternion: The rotation is represented by a 4-component quaternion q. The rotation is carried out by the product p_A = q⁻¹ * p_B * q, where q⁻¹ is the quaternion inverse of q, * is the quaternion product, and p_A and p_B are the quaternion extensions of the vectors x_A and x_B, i.e. p_A = [0, x_A] and p_B = [0, x_B].

  • Roll-Pitch-Yaw (Euler Angles): The rotation is represented by the set of Euler angles ϕ (roll), θ (pitch), and ψ (yaw), in the "1-2-3" convention for intrinsic rotations. The resulting rotation matrix C_AB(ϕ, θ, ψ) is the same as the product of the three single-axis rotation matrices C_AB = Cz(ψ) * Cy(θ) * Cx(ϕ).

    For example, if B represents a fixed "world" frame with axes xyz and A is a body-fixed frame with axes XYZ, then C_AB represents a rotation from the world frame to the body frame, in the following sequence:

    1. Right-hand rotation about the world frame x-axis by ϕ (roll), resulting in the intermediate frame x'y'z' with x' = x.
    2. Right-hand rotation about the intermediate frame y'-axis by θ (pitch), resulting in the intermediate frame x''y''z'' with y'' = y'.
    3. Right-hand rotation about the intermediate frame z''-axis by ψ (yaw), resulting in the body frame XYZ with z = z''.
  • Direction Cosine Matrix: The rotation is directly represented as a 3x3 matrix C_AB. The rotation is carried out by the matrix-vector product x_A = C_AB @ x_B.

Input ports

(0): The input vector x_B expressed in the B-axes.

(1): (if enable_external_rotation_definition=True) The rotation representation (quaternion, Euler angles, or cosine matrix) that defines the rotation from B to A (or A to B if inverse=True).

Output ports

(0): The output vector x_A expressed in the A-axes.

Parameters:

Name Type Description Default
rotation_type str

The type of rotation representation to use. Must be one of ("quaternion", "roll_pitch_yaw", "dcm").

required
enable_external_rotation_definition

If True, the block will have one input port for the rotation representation (quaternion, Euler angles, or cosine matrix). Otherwise the rotation must be provided as a block parameter.

True
rotation

The constant value for the rotation matrix when enable_external_rotation_definition=False. Must not be None in that case

required
inverse

If True, the block will compute the inverse transformation, i.e. if the matrix representation of the rotation is C_AB from frame B to frame A, the block will compute the inverse transformation C_BA = C_AB⁻¹ = C_AB.T

False
Source code in collimator/library/rotations.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
class CoordinateRotation(LeafSystem):
    """Computes the rotation of a 3D vector between coordinate systems.

    Given sufficient information to construct a rotation matrix `C_AB` from orthogonal
    coordinate system `B` to orthogonal coordinate system `A`, along with an input
    vector `x_B` expressed in `B`-axes, this block will compute the matrix-vector
    product `x_A = C_AB @ x_B`.

    Note that depending on the type of rotation representation, this matrix may not be
    explicitly computed.  The types of rotations supported are Quaternion, Euler
    Angles, and Direction Cosine Matrix (DCM).

    By default, the rotations have the following convention:

    - __Quaternion:__ The rotation is represented by a 4-component quaternion `q`.
        The rotation is carried out by the product `p_A = q⁻¹ * p_B * q`, where
        `q⁻¹` is the quaternion inverse of `q`, `*` is the quaternion product, and
        `p_A` and `p_B` are the quaternion extensions of the vectors `x_A` and `x_B`,
        i.e. `p_A = [0, x_A]` and `p_B = [0, x_B]`.

    - __Roll-Pitch-Yaw (Euler Angles):__ The rotation is represented by the set of Euler angles
        ϕ (roll), θ (pitch), and ψ (yaw), in the "1-2-3" convention for intrinsic
        rotations. The resulting rotation matrix `C_AB(ϕ, θ, ψ)` is the same as the product of
        the three  single-axis rotation matrices `C_AB = Cz(ψ) * Cy(θ) * Cx(ϕ)`.

        For example, if `B` represents a fixed "world" frame with axes `xyz` and `A`
        is a body-fixed frame with axes `XYZ`, then `C_AB` represents a rotation from
        the world frame to the body frame, in the following sequence:

        1. Right-hand rotation about the world frame `x`-axis by `ϕ` (roll), resulting
            in the intermediate frame `x'y'z'` with `x' = x`.
        2. Right-hand rotation about the intermediate frame `y'`-axis by `θ` (pitch),
            resulting in the intermediate frame `x''y''z''` with `y'' = y'`.
        3. Right-hand rotation about the intermediate frame `z''`-axis by `ψ` (yaw),
            resulting in the body frame `XYZ` with `z = z''`.

    - __Direction Cosine Matrix:__ The rotation is directly represented as a
        3x3 matrix `C_AB`. The rotation is carried out by the matrix-vector product
        `x_A = C_AB @ x_B`.

    Input ports:
        (0): The input vector `x_B` expressed in the `B`-axes.

        (1): (if `enable_external_rotation_definition=True`) The rotation
            representation (quaternion, Euler angles, or cosine matrix) that defines
            the rotation from `B` to `A` (or `A` to `B` if `inverse=True`).

    Output ports:
        (0): The output vector `x_A` expressed in the `A`-axes.

    Parameters:
        rotation_type (str): The type of rotation representation to use. Must be one of
            ("quaternion", "roll_pitch_yaw", "dcm").
        enable_external_rotation_definition: If `True`, the block will have one
            input port for the rotation representation (quaternion, Euler angles, or
            cosine matrix).  Otherwise the rotation must be provided as a block
            parameter.
        rotation: The constant value for the rotation matrix when
            `enable_external_rotation_definition=False`.  Must not be None in that case
        inverse: If `True`, the block will compute the inverse transformation, i.e.
            if the matrix representation of the rotation is `C_AB` from frame `B` to
            frame `A`, the block will compute the inverse transformation
            `C_BA = C_AB⁻¹ = C_AB.T`
    """

    def __init__(
        self,
        rotation_type,
        enable_external_rotation_definition=True,
        quaternion=None,
        roll_pitch_yaw=None,
        direction_cosine_matrix=None,
        inverse=False,
        **kwargs,
    ):
        super().__init__(**kwargs)

        self.rotation_type = rotation_type
        self.external_rotation = enable_external_rotation_definition
        self.inverse = inverse

        # Note: all of the possible rotation specifications are passed as parameters
        # to make the serialization work, but only one is valid at a time. This makes
        # sense from the UI, but is a bit strange when working directly with the code.
        # In any case, the typical use case is to have the external rotation port
        # enabled, so all of these should usually be None.  If more than one is
        # provided (which can happen for instance via hidden parameters in the JSON)
        # then only the rotation corresponding to the `rotation_type` will be used, and
        # the rest will be ignored.
        rotation = self._check_config(
            rotation_type,
            quaternion,
            roll_pitch_yaw,
            direction_cosine_matrix,
        )

        self.vector_input_index = self.declare_input_port()

        if enable_external_rotation_definition:
            self.rotation_input_index = self.declare_input_port()

        else:
            # Store the static rotation as a parameter (will be None if external
            # rotation is enabled)
            self.declare_parameter("rotation", rotation)

        self.declare_output_port(
            self._output_func,
            prerequisites_of_calc=[port.ticket for port in self.input_ports],
        )

        # Serialization
        self.declare_configuration_parameters(
            quaternion=quaternion,
            roll_pitch_yaw=roll_pitch_yaw,
            direction_cosine_matrix=direction_cosine_matrix,
            rotation_type=rotation_type,
            enable_external_rotation_definition=enable_external_rotation_definition,
            inverse=inverse,
        )

    def _check_config(
        self, rotation_type, quaternion, roll_pitch_yaw, direction_cosine_matrix
    ):
        if rotation_type not in ("quaternion", "roll_pitch_yaw", "DCM"):
            message = f"Invalid rotation type: {rotation_type}."
            raise BlockParameterError(
                message=message, system=self, parameter_name="rotation_type"
            )

        if self.external_rotation:
            # Input type checking will be done by `check_types`
            return

        if rotation_type == "quaternion":
            if quaternion is None:
                message = (
                    "A static quaternion must be provided if external rotation "
                    + "definition is disabled."
                )
                raise BlockParameterError(
                    message=message, system=self, parameter_name="quaternion"
                )
            rotation = cnp.asarray(quaternion)
            if rotation.shape != (4,):
                message = (
                    "The quaternion must have shape (4,), but has shape "
                    + f"{rotation.shape}."
                )
                raise BlockParameterError(
                    message=message, system=self, parameter_name="quaternion"
                )

        elif rotation_type == "roll_pitch_yaw":
            if roll_pitch_yaw is None:
                message = (
                    "A static roll-pitch-yaw sequence must be provided if external "
                    + "rotation definition is disabled."
                )
                raise BlockParameterError(
                    message=message, system=self, parameter_name="roll_pitch_yaw"
                )
            rotation = cnp.asarray(roll_pitch_yaw)
            if rotation.shape != (3,):
                message = (
                    "The Euler angles must have shape (3,), but has shape "
                    + f"{rotation.shape}."
                )
                raise BlockParameterError(
                    message=message, system=self, parameter_name="roll_pitch_yaw"
                )

        elif rotation_type == "DCM":
            if direction_cosine_matrix is None:
                message = (
                    "A static direction cosine matrix must be provided if external "
                    + "rotation definition is disabled."
                )
                raise BlockParameterError(
                    message=message,
                    system=self,
                    parameter_name="direction_cosine_matrix",
                )
            rotation = cnp.asarray(direction_cosine_matrix)
            if rotation.shape != (3, 3):
                message = (
                    "The direction cosine matrix must have shape (3, 3), but has shape "
                    + f"{rotation.shape}."
                )
                raise BlockParameterError(
                    message=message,
                    system=self,
                    parameter_name="direction_cosine_matrix",
                )

        return rotation

    def _apply(self, rotation: Rotation, vector: Array) -> Array:
        rot = {
            "quaternion": Rotation.from_quat,
            "roll_pitch_yaw": partial(Rotation.from_euler, EULER_SEQ),
            "DCM": Rotation.from_matrix,
        }[self.rotation_type](rotation)

        if self.inverse:
            rot = rot.inv()

        return rot.apply(vector)

    def _output_func(self, _time, _state, *inputs, **parameters):
        vector = inputs[self.vector_input_index]
        if self.external_rotation:
            rotation = inputs[self.rotation_input_index]
        else:
            rotation = parameters["rotation"]
        return self._apply(rotation, vector)

    def check_types(
        self,
        context,
        error_collector: ErrorCollector = None,
    ):
        vec = self.input_ports[self.vector_input_index].eval(context)

        with ErrorCollector.context(error_collector):
            if vec.shape != (3,):
                raise ShapeMismatchError(
                    system=self,
                    expected_shape=(3,),
                    actual_shape=vec.shape,
                )

        if self.external_rotation:
            rot = self.input_ports[self.rotation_input_index].eval(context)

            with ErrorCollector.context(error_collector):
                if self.rotation_type == "quaternion" and rot.shape != (4,):
                    raise ShapeMismatchError(
                        system=self,
                        expected_shape=(4,),
                        actual_shape=rot.shape,
                    )
                elif self.rotation_type == "roll_pitch_yaw" and rot.shape != (3,):
                    raise ShapeMismatchError(
                        system=self,
                        expected_shape=(3,),
                        actual_shape=rot.shape,
                    )
                elif self.rotation_type == "DCM" and rot.shape != (3, 3):
                    raise ShapeMismatchError(
                        system=self,
                        expected_shape=(3, 3),
                        actual_shape=rot.shape,
                    )

CoordinateRotationConversion

Bases: LeafSystem

Converts between different representations of rotations.

See CoordinateRotation block documentation for descriptions of the different rotation representations supported. This block supports conversion between quaternion, roll-pitch-yaw (Euler angles), and direction cosine matrix (DCM).

Note that conversions are reversible in terms of the abstract rotation, although creating a quaternion from a direction cosine matrix (and therefore creating a quaternion from roll-pitch-yaw sequence) results in an arbitrary sign assignment.

Input ports

(0): The input rotation representation.

Output ports

(1): The output rotation representation.

Parameters:

Name Type Description Default
conversion_type str

The type of rotation conversion to perform. Must be one of ("quaternion_to_euler", "quaternion_to_dcm", "euler_to_quaternion", "euler_to_dcm", "dcm_to_quaternion", "dcm_to_euler")

required
Source code in collimator/library/rotations.py
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
class CoordinateRotationConversion(LeafSystem):
    """Converts between different representations of rotations.

    See CoordinateRotation block documentation for descriptions of the different
    rotation representations supported. This block supports conversion between
    quaternion, roll-pitch-yaw (Euler angles), and direction cosine matrix (DCM).

    Note that conversions are reversible in terms of the abstract rotation, although
    creating a quaternion from a direction cosine matrix (and therefore creating a
    quaternion from roll-pitch-yaw sequence) results in an arbitrary sign assignment.

    Input ports:
        (0): The input rotation representation.

    Output ports:
        (1): The output rotation representation.

    Parameters:
        conversion_type (str): The type of rotation conversion to perform.
            Must be one of ("quaternion_to_euler", "quaternion_to_dcm",
            "euler_to_quaternion", "euler_to_dcm", "dcm_to_quaternion", "dcm_to_euler")
    """

    def __init__(self, conversion_type, **kwargs):
        super().__init__(**kwargs)

        if conversion_type not in (
            "quaternion_to_RPY",
            "quaternion_to_DCM",
            "RPY_to_quaternion",
            "RPY_to_DCM",
            "DCM_to_quaternion",
            "DCM_to_RPY",
        ):
            message = f"Invalid rotation conversion type: {conversion_type}."
            raise BlockParameterError(
                message=message, system=self, parameter_name="conversion_type"
            )

        self.declare_input_port()

        _func = {
            "quaternion_to_RPY": quat_to_euler,
            "quaternion_to_DCM": quat_to_dcm,
            "RPY_to_quaternion": euler_to_quat,
            "RPY_to_DCM": euler_to_dcm,
            "DCM_to_quaternion": dcm_to_quat,
            "DCM_to_RPY": dcm_to_euler,
        }[conversion_type]

        def _output(_time, _state, *inputs, **_parameters):
            (u,) = inputs
            return _func(u)

        self.declare_output_port(
            _output,
            requires_inputs=True,
        )

        # Serialization
        self.conversion_type = conversion_type
        self.declare_configuration_parameters(conversion_type=conversion_type)

    def check_types(
        self,
        context,
        error_collector: ErrorCollector = None,
    ):
        rot = self.input_ports[0].eval(context)

        with ErrorCollector.context(error_collector):
            if self.conversion_type in (
                "quaternion_to_RPY",
                "quaternion_to_DCM",
            ) and rot.shape != (4,):
                raise ShapeMismatchError(
                    system=self,
                    expected_shape=(4,),
                    actual_shape=rot.shape,
                )
            elif self.conversion_type in (
                "RPY_to_quaternion",
                "RPY_to_DCM",
            ) and rot.shape != (3,):
                raise ShapeMismatchError(
                    system=self,
                    expected_shape=(3,),
                    actual_shape=rot.shape,
                )
            elif self.conversion_type in (
                "DCM_to_quaternion",
                "DCM_to_RPY",
            ) and rot.shape != (3, 3):
                raise ShapeMismatchError(
                    system=self,
                    expected_shape=(3, 3),
                    actual_shape=rot.shape,
                )

CrossProduct

Bases: ReduceBlock

Compute the cross product between the inputs.

See NumPy docs for details: https://numpy.org/doc/stable/reference/generated/numpy.cross.html

Input ports

(0) The first input vector. (1) The second input vector.

Output ports

(0) The cross product of the inputs.

Source code in collimator/library/primitives.py
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
class CrossProduct(ReduceBlock):
    """Compute the cross product between the inputs.

    See NumPy docs for details:
    https://numpy.org/doc/stable/reference/generated/numpy.cross.html

    Input ports:
        (0) The first input vector.
        (1) The second input vector.

    Output ports:
        (0) The cross product of the inputs.
    """

    def __init__(self, *args, **kwargs):
        def _cross(inputs):
            return cnp.cross(*inputs)

        super().__init__(2, _cross, *args, **kwargs)

CustomJaxBlock

Bases: LeafSystem

JAX implementation of the PythonScript block.

A few important notes and changes/limitations to this JAX implementation: - For this block all code must be written using the JAX-supported subset of Python: * Numerical operations should use jax.numpy = jnp instead of numpy = np * Standard control flow is not supported (if/else, for, while, etc.). Instead use lax.cond, lax.fori_loop, lax.while_loop, etc. https://jax.readthedocs.io/en/latest/notebooks/Common_Gotchas_in_JAX.html#structured-control-flow-primitives Where possible, NumPy-style operations like jnp.where or jnp.select should be preferred to lax control flow primitives. * Functions must be pure and arrays treated as immutable. https://jax.readthedocs.io/en/latest/notebooks/Common_Gotchas_in_JAX.html#in-place-updates Provided these assumptions hold, the code can be JIT compiled, differentiated, run on GPU, etc. - Variable scoping: the init_code and step_code are executed in the same scope, so variables declared in the init_code will be available in the step_code and can be modified in that scope. Internally, everything declared in init_code is treated as a single state-like cache entry. However, variables declared in the step_code will NOT persist between evaluations. Users should think of step_code as a normal Python function where locally declared variables will disappear on leaving the scope. - Persistent variables (outputs and anything declared in init_code) must have static shapes and dtypes. This means that you cannot declare x = 0.0 in init_code and then later assign x = jnp.zeros(4) in step_code.

These changes mean that many older PythonScript blocks may not be backwards compatible.

Input ports

Variable number of input ports, one for each input variable declared in inputs. The order of the input ports is the same as the order of the input variables.

Output ports

Variable number of output ports, one for each output variable declared in outputs. The order of the output ports is the same as the order of the output variables.

Parameters:

Name Type Description Default
dt float

The discrete time step of the block, or None if the block is in agnostic time mode.

None
init_script str

A string containing Python code that will be executed once when the block is initialized. This code can be used to declare persistent variables that will be available in the step_code.

''
user_statements str

A string containing Python code that will be executed once per time step (or per output port evaluation, in agnostic mode). This code can use the persistent variables declared in init_script and the block inputs.

''
finalize_script str

A string containing Python code that will be executed once when the block is finalized. This code can use the persistent variables declared in init_script and the block inputs. (Currently not yet supported).

''
accelerate_with_jax bool

If True, the block will be JIT compiled. If False, the block will be executed in pure Python. This parameter exists for compatibility with UI options; when creating pure Python blocks from code (e.g. for testing), explicitly create the CustomPythonBlock class.

True
time_mode str

One of "discrete" or "agnostic". If "discrete", the block step code will be evaluated at peridodic intervals specified by "dt". If "agnostic", the block step code will be evaluated once per output port evaluation, and the block will not have a discrete time step.

'discrete'
inputs List[str]

A list of input variable names. The order of the input ports is the same as the order of the input variables.

None
outputs Mapping[str, Tuple[DTypeLike, ShapeLike]]

A dictionary mapping output variable names to a tuple of dtype and shape. The order of the output ports is the same as the order of the output variables.

None
parameters Mapping[str, Array]

A dictionary mapping parameter names to values. Parameters are treated as immutable and cannot be modified in the step code. Parameters can be arrays or scalars, but must have static shapes and dtypes in order to support JIT compilation.

None
Source code in collimator/library/custom.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
class CustomJaxBlock(LeafSystem):
    """JAX implementation of the PythonScript block.

    A few important notes and changes/limitations to this JAX implementation:
    - For this block all code must be written using the JAX-supported subset of Python:
        * Numerical operations should use `jax.numpy = jnp` instead of `numpy = np`
        * Standard control flow is not supported (if/else, for, while, etc.). Instead
            use `lax.cond`, `lax.fori_loop`, `lax.while_loop`, etc.
            https://jax.readthedocs.io/en/latest/notebooks/Common_Gotchas_in_JAX.html#structured-control-flow-primitives
            Where possible, NumPy-style operations like `jnp.where` or `jnp.select` should
            be preferred to lax control flow primitives.
        * Functions must be pure and arrays treated as immutable.
            https://jax.readthedocs.io/en/latest/notebooks/Common_Gotchas_in_JAX.html#in-place-updates
        Provided these assumptions hold, the code can be JIT compiled, differentiated,
        run on GPU, etc.
    - Variable scoping: the `init_code` and `step_code` are executed in the same scope,
        so variables declared in the `init_code` will be available in the `step_code`
        and can be modified in that scope. Internally, everything declared in
        `init_code` is treated as a single state-like cache entry.
        However, variables declared in the `step_code` will NOT persist between
        evaluations. Users should think of `step_code` as a normal Python function
        where locally declared variables will disappear on leaving the scope.
    - Persistent variables (outputs and anything declared in `init_code`) must have
        static shapes and dtypes. This means that you cannot declare `x = 0.0` in
        `init_code` and then later assign `x = jnp.zeros(4)` in `step_code`.

    These changes mean that many older PythonScript blocks may not be backwards compatible.

    Input ports:
        Variable number of input ports, one for each input variable declared in `inputs`.
        The order of the input ports is the same as the order of the input variables.

    Output ports:
        Variable number of output ports, one for each output variable declared in `outputs`.
        The order of the output ports is the same as the order of the output variables.

    Parameters:
        dt (float): The discrete time step of the block, or None if the block is
            in agnostic time mode.
        init_script (str): A string containing Python code that will be executed
            once when the block is initialized. This code can be used to declare
            persistent variables that will be available in the `step_code`.
        user_statements (str): A string containing Python code that will be executed
            once per time step (or per output port evaluation, in agnostic mode).
            This code can use the persistent variables declared in `init_script` and
            the block inputs.
        finalize_script (str): A string containing Python code that will be executed
            once when the block is finalized. This code can use the persistent
            variables declared in `init_script` and the block inputs. (Currently not
            yet supported).
        accelerate_with_jax (bool): If True, the block will be JIT compiled. If False,
            the block will be executed in pure Python.  This parameter exists for
            compatibility with UI options; when creating pure Python blocks from code
            (e.g. for testing), explicitly create the CustomPythonBlock class.
        time_mode (str): One of "discrete" or "agnostic". If "discrete", the block
            step code will be evaluated at peridodic intervals specified by "dt".
            If "agnostic", the block step code will be evaluated once per output
            port evaluation, and the block will not have a discrete time step.
        inputs (List[str]): A list of input variable names. The order of the input
            ports is the same as the order of the input variables.
        outputs (Mapping[str, Tuple[DTypeLike, ShapeLike]]): A dictionary mapping
            output variable names to a tuple of dtype and shape. The order of the
            output ports is the same as the order of the output variables.
        parameters (Mapping[str, Array]): A dictionary mapping parameter names to
            values. Parameters are treated as immutable and cannot be modified in
            the step code. Parameters can be arrays or scalars, but must have static
            shapes and dtypes in order to support JIT compilation.
    """

    def __init__(
        self,
        dt: float = None,
        init_script: str = "",
        user_statements: str = "",
        finalize_script: str = "",  # presently ignored for JAX block
        accelerate_with_jax: bool = True,
        time_mode: str = "discrete",  # [discrete, agnostic]
        inputs: List[str] = None,  # [name]
        outputs: List[str] = None,
        parameters: Mapping[str, Array] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        parameters = parameters or {}

        if time_mode not in ["discrete", "agnostic"]:
            raise BlockInitializationError(
                f"Invalid time mode '{time_mode}' for PythonScript block", system=self
            )

        if time_mode == "discrete" and dt is None:
            raise BlockInitializationError(
                "When in discrete time mode, dt is required for block", system=self
            )

        self.time_mode = time_mode

        if inputs is None:
            inputs = []
        if outputs is None:
            outputs = []

        self.dt = dt

        # Note: 'optimize' level could be lowered in debug mode
        self.init_code = compile(
            init_script, filename="<init>", mode="exec", optimize=2
        )
        self.step_code = compile(
            user_statements, filename="<step>", mode="exec", optimize=2
        )

        if finalize_script != "" and not isinstance(self, CustomPythonBlock):
            raise BlockInitializationError(
                f"PythonScript block '{self.name_path_str}' has finalize_script "
                "but this is not supported at the moment.",
                system=self,
                parameter_name="finalize_script",
            )

        # Declare parameters
        for param_name, value in parameters.items():
            if isinstance(value, list):
                value = cnp.asarray(value)
            as_array = isinstance(value, cnp.ndarray) or cnp.isscalar(value)
            self.declare_parameter(param_name, value, as_array=as_array)

        # Run the init_script
        persistent_env = self.exec_init()

        # Declare an input port for each of the input variables
        self.input_names = inputs
        for name in inputs:
            self.declare_input_port(name)

        # Declare a cache component for each of the output variables
        self._create_cache_type(outputs)

        if time_mode == "discrete":
            self._configure_discrete(dt, outputs, persistent_env)
        else:
            self._configure_agnostic(outputs, persistent_env)

        self.declare_configuration_parameters(
            dt=dt,
            init_script=init_script,
            user_statements=user_statements,
            finalize_script=finalize_script,
            accelerate_with_jax=accelerate_with_jax,
            time_mode=time_mode,
        )

    def _initialize_outputs(self, outputs, persistent_env):
        default_outputs = {name: None for name in outputs}

        for name in outputs:
            # If the initial value is set explicitly in the init script,
            # override the default value.  We don't need to do this for
            # agnostic configuration since the outputs will be calculated
            # every evaluation anyway.
            if name in persistent_env:
                value = cnp.asarray(persistent_env[name])
                default_outputs[name] = value

                # Also update the persistent environment so that the data types
                # are consistent with the state.
                persistent_env[name] = value

            # Otherwise throw an error, since we don't know what the initial values
            # should be, or even what shape/dtype they should have.
            else:
                msg = (
                    f"Output variable '{name}' not explicitly initialized in "
                    "init_script for PythonScript block in 'Discrete' time mode. "
                    "Either initialize the variable as an array with the correct "
                    "shape and dtype, or make the block time mode 'Agnostic'."
                )
                raise PythonScriptError(message=msg, system=self)

        return self.CacheType(
            persistent_env=persistent_env,
            **default_outputs,
        )

    def _configure_discrete(self, dt, outputs, persistent_env):
        default_values = self._initialize_outputs(outputs, persistent_env)

        # The step function acts as a periodic update that will update all components
        # of the discrete state.
        self.step_callback_index = self.declare_cache(
            self.exec_step,
            period=dt,
            offset=dt,
            requires_inputs=True,
            default_value=default_values,
        )

        cache = self.callbacks[self.step_callback_index]

        # Get the index into the state cache (different in general from the index
        # into the callback list, since not all callbacks are cached).
        self.step_cache_index = cache.cache_index

        def _make_callback(o_port_name):
            def _output(time, state, *inputs, **parameters):
                return getattr(state.cache[self.step_cache_index], o_port_name)

            return _output

        # Declare output ports for each state variable
        for o_port_name in outputs:
            self.declare_output_port(
                _make_callback(o_port_name),
                name=o_port_name,
                prerequisites_of_calc=[cache.ticket],
                requires_inputs=False,
                period=dt,
                offset=0.0,
            )

    def _configure_agnostic(self, outputs, persistent_env):
        # Create a callback to evaluate the step code and extract the
        # output. Note that this is inefficient since the step code will
        # be evaluated once _for each output port_, but it's the only way
        # to do this unless (until) we implement some variety of block
        # or function pre-ordering.
        def _make_callback(o_port_name):
            def _output(time, state, *inputs, **parameters):
                xd = self.exec_step(time, state, *inputs, **parameters)
                return getattr(xd, o_port_name)

            return _output

        # Declare output ports for each state variable
        for o_port_name in outputs:
            self.declare_output_port(
                _make_callback(o_port_name),
                name=o_port_name,
                requires_inputs=True,
            )

        # This callback doesn't need to do anything since it's never
        # actually called - the cache here just stores the initial environment
        # and the output ports are evaluated directly.  This should be changed
        # to avoid re-evaluation with multiple output ports once we can do full
        # function ordering.
        def _cache_callback(time, state, *inputs, **parameters):
            return state.cache[self.step_cache_index]

        # Since this is the return type for `exec_step` we have to declare all
        # the output ports as entries in the namedtuple, even though those values
        # won't actually be cached in "agnostic" time mode.  This is just so that
        # both "discrete" and "agnostic" modes can share the same code.
        default_values = self.CacheType(
            persistent_env=persistent_env,
            **{o_port_name: None for o_port_name in outputs},
        )
        self.step_callback_index = self.declare_cache(
            _cache_callback,
            default_value=default_values,
            requires_inputs=False,
            prerequisites_of_calc=[inport.ticket for inport in self.input_ports],
        )

        cache = self.callbacks[self.step_callback_index]
        self.step_cache_index = cache.cache_index

    def _create_cache_type(self, outputs):
        # Store the output ports as a name for type inference and casting
        self.output_names = outputs

        # Also store the dictionary of local environment variables as a cache entry
        # This is the only persistent state of the system (besides outputs) - anything
        # declared in the "step" function will be forgotten at the end of the step

        self.CacheType = namedtuple("CacheType", self.output_names + ["persistent_env"])

    @property
    def local_env_base(self):
        # Define a starting point for the local code execution environment.
        # we have to inclide __main__ so that the code behaves like a module.
        # this allows for code like this:
        #   imports ...
        #   a = 1
        #   def f(b):
        #       return a+b
        #   out_0 = f(2)
        #
        # without getting a 'a not defined' error.
        return {
            "__main__": {},
        }

    def exec_init(self) -> dict[str, Array]:
        # Before executing the step code, we have to build up the local environment.
        # This includes specified modules, python block user defined parameters.

        default_parameters = {
            name: param.get() for name, param in self.default_parameters.items()
        }

        local_env = {
            **self.local_env_base,
            **default_parameters,
        }

        # similar to above where we included __main__ so the code behaves as a module,
        # here we have to pass the local_env with __main__ as 1] globals, since that
        # is what allow the code to be executed as a module. 2] local since that is where
        # the new bindings will be written, that we need to retain since the code in step_code
        # may depend on these bindings.
        exec(self.init_code, local_env, local_env)

        # persistent_env contains bindings for parameters and for values from init_script
        persistent_env, static_env = _filter_non_traceable(local_env)

        # Since this is called during block initialization and not any JIT-compiled code,
        # we can safely store any untraceable variables as block attributes.  For example,
        # this may contain custom functions, classes, etc.
        self.static_env = static_env

        return persistent_env

    def exec_step(self, time: float, state: LeafState, *inputs, **parameters):
        # Before executing the step code, we have to build up the local environment.
        # This includes the persistent variables (anything declared in `init_code`),
        # time, block inputs, user-defined parameters, and specified modules.

        # Retrieve the variables declared in `init_code` from the discrete state
        full_env = state.cache[self.step_cache_index]
        persistent_env = full_env.persistent_env

        # Inputs are in order of port declaration, so they match `self.input_names`
        input_env = dict(zip(self.input_names, inputs))

        # Create a dictionary of all the information that the step function will need
        base_copy = self.local_env_base.copy()
        local_env = {
            **self.static_env,
            **base_copy,
            **persistent_env,
            **input_env,
        }

        # Execute the step code in the local environment
        exec(self.step_code, local_env, local_env)

        # Updated state variables are stored in the local environment
        xd = {name: local_env[name] for name in self.output_names}

        # Store the persistent variables in the corresponding discrete state
        xd["persistent_env"] = {key: local_env[key] for key in persistent_env}

        # Make sure the results have a consistent data type
        for name in self.output_names:
            xd[name] = cnp.asarray(local_env[name])

            # Also make sure the value stored in the persistent environment
            # has the same data type
            if name in persistent_env:
                xd["persistent_env"][name] = xd[name]

        return self.CacheType(**xd)

    def check_types(
        self,
        context: ContextBase,
        error_collector: ErrorCollector = None,
    ):
        """Test-compile the init and step code to check for errors."""
        try:
            jit(self.wrap_callback(self.exec_step))(context)
        except BaseException as exc:
            with ErrorCollector.context(error_collector):
                name_error = _caused_by_nameerror(exc)
                if name_error and name_error.name == "time":
                    raise PythonScriptTimeNotSupportedError(system=self) from exc
                raise PythonScriptError(message=str(exc), system=self) from exc

check_types(context, error_collector=None)

Test-compile the init and step code to check for errors.

Source code in collimator/library/custom.py
493
494
495
496
497
498
499
500
501
502
503
504
505
506
def check_types(
    self,
    context: ContextBase,
    error_collector: ErrorCollector = None,
):
    """Test-compile the init and step code to check for errors."""
    try:
        jit(self.wrap_callback(self.exec_step))(context)
    except BaseException as exc:
        with ErrorCollector.context(error_collector):
            name_error = _caused_by_nameerror(exc)
            if name_error and name_error.name == "time":
                raise PythonScriptTimeNotSupportedError(system=self) from exc
            raise PythonScriptError(message=str(exc), system=self) from exc

CustomPythonBlock

Bases: CustomJaxBlock

Container for arbitrary user-defined Python code.

Implemented to support legacy PythonScript blocks.

Not traceable (no JIT compilation or autodiff). The internal implementation and behavior of this block differs vastly from the JAX-compatible block as this block stores state directly within the Python instance. Objects and modules can be kept as discrete state.

Note that in "agnostic" mode, the step code will be evaluated once per output port evaluation. Because locally defined environment variables (in the init script) are preserved between evaluations, any mutation of these variables will be preserved. This can lead to unexpected behavior and should be avoided. Stateful behavior should be implemented using discrete state variables instead.

Source code in collimator/library/custom.py
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
class CustomPythonBlock(CustomJaxBlock):
    """Container for arbitrary user-defined Python code.

    Implemented to support legacy PythonScript blocks.

    Not traceable (no JIT compilation or autodiff). The internal implementation
    and behavior of this block differs vastly from the JAX-compatible block as
    this block stores state directly within the Python instance. Objects
    and modules can be kept as discrete state.

    Note that in "agnostic" mode, the step code will be evaluated _once per
    output port evaluation_. Because locally defined environment variables
    (in the init script) are preserved between evaluations, any mutation of
    these variables will be preserved. This can lead to unexpected behavior
    and should be avoided. Stateful behavior should be implemented using
    discrete state variables instead.
    """

    __exec_fn = _default_exec

    def __init__(
        self,
        dt: float = None,
        init_script: str = "",
        user_statements: str = "",
        finalize_script: str = "",  # presently ignored
        inputs: List[str] = None,  # [name]
        outputs: List[str] = None,
        accelerate_with_jax: bool = False,
        time_mode: str = "discrete",
        parameters: Mapping[str, Array] = None,
        **kwargs,
    ):
        self._parameters = parameters or {}
        self._persistent_env = {}

        # Will populate return type information during static initialization
        self.result_shape_dtypes = None
        self.return_dtypes = None

        super().__init__(
            dt=dt,
            init_script=init_script,
            user_statements=user_statements,
            finalize_script=finalize_script,
            inputs=inputs,
            outputs=outputs,
            accelerate_with_jax=accelerate_with_jax,
            time_mode=time_mode,
            parameters=self._parameters,
            **kwargs,
        )

        if time_mode == "agnostic" and cnp.active_backend == "jax":
            logger.warning(
                "System %s is in agnostic time mode but is not traced with JAX. Be "
                "advised that the step code will be evaluated once per output port "
                "evaluation. Any mutation of the local environment should be strictly "
                "avoided as it will likely lead to unexpected behavior.",
                self.name_path_str,
            )

    @property
    def has_feedthrough_side_effects(self) -> bool:
        # See explanation in `SystemBase.has_ode_side_effects`.
        return self.time_mode == "agnostic"

    @staticmethod
    def set_exec_fn(exec_fn: callable):
        CustomPythonBlock.__exec_fn = exec_fn

    @property
    def local_env_base(self):
        # Define a starting point for the local code execution environment.
        # For now this is just numpy, but we could add other modules as well.
        # @jp FIXME? We probably should just have a clean env here. Importing
        # math, np and such should be done in the user code.
        return {
            "__main__": {},
            "true": True,
            "false": False,
        }

    def exec_init(self) -> None:

        default_parameters = {
            name: param.get() for name, param in self.default_parameters.items()
        }

        local_env = {
            **self.local_env_base,
            **self._parameters,
            **default_parameters,
        }

        exec_fn = functools.partial(
            CustomPythonBlock.__exec_fn,
            code=self.init_code,
            env=local_env,
            logger=logger,
            system=self,
            code_name="init_script",
        )

        try:
            io_callback(exec_fn, None)
        except KeyboardInterrupt as e:
            logger.error(
                "Python block '%s' init script execution was interrupted.", self.name
            )
            raise PythonScriptError(
                message="Python block init script execution was interrupted.",
                system=self,
            ) from e
        except BaseException as e:
            raise PythonScriptError(message=str(e), system=self) from e
        self._persistent_env = local_env

        return None

    def exec_step(self, time, state, *inputs, **parameters):
        logger.debug(
            "Executing step for %s with state=%s, inputs=%s",
            self.name,
            state,
            inputs,
        )

        # Inputs are in order of port declaration, so they match `self.input_names`
        input_env = dict(zip(self.input_names, inputs))

        base_copy = self.local_env_base.copy()
        local_env = {
            **base_copy,
            **self._persistent_env,
        }

        exec_fn = functools.partial(
            CustomPythonBlock.__exec_fn,
            code=self.step_code,
            env=local_env,
            logger=logger,
            return_vars=self.output_names,
            return_dtypes=self.return_dtypes,
            system=self,
            code_name="step_code",
        )

        try:
            return_vars = io_callback(
                exec_fn, self.result_shape_dtypes, inputs=input_env
            )
        except KeyboardInterrupt:
            logger.error(
                "Python block '%s' init script execution was interrupted.", self.name
            )
            raise
        except NameError as e:
            err_msg = (
                f"Python block '{self.name}' step script execution failed with a NameError on"
                + f" missing variable '{e.name}'."
                + " All names used in this script should be declared in the init script."
                + f" The execution environment contains the following names: {', '.join(list(local_env.keys()))}"
            )
            logger.error(err_msg)
            logger.error("NameError: %s", e)
            raise PythonScriptError(message=str(e), system=self) from e
        except BaseException as e:
            logger.error("%s: exec_step failed: %s.", self.name, e)
            raise PythonScriptError(message=str(e), system=self) from e

        # Keep local env for next step but only if defined in init_script
        # NOTE: If this restriction turns out to be counterproductive, we can
        # remove it and remove the NameError handling above as well. The thinking
        # here is that this could help avoiding stuff like `if time == 0: x = 0`
        # See https://collimator.atlassian.net/browse/WC-98
        self._persistent_env = {
            key: local_env[key] for key in self._persistent_env if key in local_env
        }

        # Updated state variables are stored in the local environment
        xd = {name: return_vars[i] for i, name in enumerate(self.output_names)}

        return self.CacheType(persistent_env=None, **xd)

    def _initialize_outputs(self, outputs, _persistent_env):
        # Override the base implemenetation since `persistent_env` will be None
        # in this case. Instead, pass the class attribute where the environment
        # is actually maintained.
        default_outputs = {name: None for name in outputs}
        default_values = self.CacheType(
            persistent_env=self._persistent_env,
            **default_outputs,
        )
        default_values = super()._initialize_outputs(outputs, self._persistent_env)
        default_outputs = default_values._asdict()
        self._persistent_env = default_outputs.pop("persistent_env")

        # Determine return data types
        self._initialize_result_shape_dtypes(
            [default_outputs[output] for output in outputs]
        )

        return self.CacheType(
            persistent_env=None,
            **default_outputs,
        )

    def _initialize_result_shape_dtypes(self, outputs):
        self.result_shape_dtypes = []
        self.return_dtypes = []
        for value in outputs:
            self.result_shape_dtypes.append(
                jax.ShapeDtypeStruct(value.shape, value.dtype)
            )
            self.return_dtypes.append(value.dtype)

    def initialize_static_data(self, context):
        # If in agnostic mode, call the step function once to determine the
        # data types and then store those in result_shape_dtype and return_dtypes.
        context = LeafSystem.initialize_static_data(self, context)

        if self.result_shape_dtypes is not None:
            # These data types are already known (block is in discrete mode)
            return context

        inputs = self.collect_inputs(context)
        input_env = dict(zip(self.input_names, inputs))

        base_copy = self.local_env_base.copy()
        local_env = {
            **base_copy,
            **self._persistent_env,
        }

        # Will not do any type conversion
        return_dtypes = [None for _ in self.output_names]

        exec_fn = functools.partial(
            CustomPythonBlock.__exec_fn,
            self.step_code,
            local_env,
            logger,
            return_vars=self.output_names,
            return_dtypes=return_dtypes,
            system=self,
            code_name="step_code",
        )

        return_vars = exec_fn(inputs=input_env)

        self._initialize_result_shape_dtypes(return_vars)
        return context

    def check_types(
        self,
        context: ContextBase,
        error_collector=None,
    ):
        pass

DataSource

Bases: SourceBlock

Produces outputs from an imported .csv file.

The block's output(s) must be synchronized with simulation time. This can be achieved by two mechanisms:

  1. Each data row in the file is accompanied by a time value. The time value for each row is provided as a column in the data file. For this option, the values in the time column must be strictly increasing, with no duplicates, from the first data row to the last. The block will check that this condition is satisfied at compile time. The column with the time values is identified by the column index. This option assumes the left most column is index 0, counting up to the right. to select this option, set Time samples as column to True, and provide the index of the column.

  2. The time value for each data row is defined using a fixed time step between each row. For this option, the Sampling parameter defines the time step. The block then computes the time values for each data row starting with zero for the first row. Note that by definition, this results in a strictly increasing set. To select this option, set time_samples_as_column to False, and provide the sampling_interval value.

When block output(s) are requested at a simulation time that falls between time values for adjacent data rows, there are two options for how the block should compute the interpolation:

  1. Zero Order Hold: the block returns data from the row with the lower time value.

  2. Linear: the block performs a linear interpolation between the lower and higher time value data rows.

There are several mechanism for selecting which data columns are included in the block output(s). All options are applied using the data_columns parameter:

  1. Column name: enter a string that matches a column name in the header. For this option, header_as_first_row must be set to True. For this option, it is only possible to select a single column for the output. The block will output a scalar.

  2. Column index: enter an integer index for the desired column. This option again assumes the left most column is index 0, counting up to the right. This option assumes the same column index regardless of of whether time_samples_as_column is True or False, therefore it is possible to select the same column for time and output. With this option, the block will output a scalar.

  3. Slice: enter a slice used to identify a set of sequential columns to be used as the desired data for output. The slice works like a NumPy slice. For example, if the file has 10 columns, 3:8 will results in the block returning a vector of length 5, containing, in order, columns 3,4,5,6,7. Note that like NumPy, the second integer in the slice is excluded in the set of indices. Only positive integers are allowed for the slice (e.g. 2:-1, -3:-1, and 3: are not allowed).

Presently, there is only one option for extrapolation beyond the end of data in the file. The block will have reached the end of data if the simulation time is greater than the time value for the last row of data. Once this occurs, the block output(s) will be the values in the last row of data.

Parameters:

Name Type Description Default
file_name str

The name of the imported file which contains the data.

required
header_as_first_row bool

Check this box if the first row is meant to be a header.

False
time_samples_as_column bool

Check this box to select a column form the file to use as the time values. Uncheck it to provide time as a fixed time step between rows.

False
time_column str

Only used when time_samples_as_column is True. This is the index of the column to be used as time.

'0'
sampling_interval float

only used when time_samples_as_column is False. Provide the fixed time step value here.

1.0
data_columns str

Enter name, index, or slice to select columns from the data file.

'1'
extrapolation str

the extrapolation method. One of "hold" or "zero".

'hold'
interpolation str

the interpolation method. One of "zero_order_hold" or "linear".

'zero_order_hold'
Source code in collimator/library/data_source.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
class DataSource(SourceBlock):
    """Produces outputs from an imported .csv file.

    The block's output(s) must be synchronized with simulation time. This can be
    achieved by two mechanisms:

    1. Each data row in the file is accompanied by a time value. The time value
        for each row is provided as a column in the data file. For this option,
        the values in the time column must be strictly increasing, with no duplicates,
        from the first data row to the last. The block will check that this condition
        is satisfied at compile time. The column with the time values is identified by
        the column index. This option assumes the left most column is index 0, counting
        up to the right. to select this option, set Time samples as column to True, and
        provide the index of the column.

    2. The time value for each data row is defined using a fixed time step between
        each row. For this option, the Sampling parameter defines the time step.
        The block then computes the time values for each data row starting with zero
        for the first row. Note that by definition, this results in a strictly
        increasing set. To select this option, set `time_samples_as_column` to False,
        and provide the `sampling_interval` value.

    When block output(s) are requested at a simulation time that falls between time
    values for adjacent data rows, there are two options for how the block should
    compute the interpolation:

    1. Zero Order Hold: the block returns data from the row with the lower time value.

    2. Linear: the block performs a linear interpolation between the lower and higher
        time value data rows.

    There are several mechanism for selecting which data columns are included in the
    block output(s). All options are applied using the `data_columns` parameter:

    1. Column name: enter a string that matches a column name in the header. For
        this option, `header_as_first_row` must be set to True. For this option, it
        is only possible to select a single column for the output. The block will
        output a scalar.

    2. Column index: enter an integer index for the desired column. This option
        again assumes the left most column is index 0, counting up to the right. This
        option assumes the same column index regardless of of whether
        `time_samples_as_column` is True or False, therefore it is possible to select
        the same column for time and output. With this option, the block will output
        a scalar.

    3. Slice: enter a slice used to identify a set of sequential columns to be used
        as the desired data for output. The slice works like a NumPy slice. For
        example, if the file has 10 columns, `3:8` will results in the block returning
        a vector of length 5, containing, in order, columns 3,4,5,6,7. Note that
        like NumPy, the second integer in the slice is excluded in the set of
        indices. Only positive integers are allowed for the slice (e.g. `2:-1`,
        `-3:-1`, and `3:` are not allowed).

    Presently, there is only one option for extrapolation beyond the end of data in
    the file. The block will have reached the end of data if the simulation time is
    greater than the time value for the last row of data. Once this occurs, the block
    output(s) will be the values in the last row of data.

    Parameters:
        file_name:
            The name of the imported file which contains the data.
        header_as_first_row:
            Check this box if the first row is meant to be a header.
        time_samples_as_column:
            Check this box to select a column form the file to use as the time values.
            Uncheck it to provide time as a fixed time step between rows.
        time_column:
            Only used when `time_samples_as_column` is True. This is the index of
            the column to be used as time.
        sampling_interval: only used when `time_samples_as_column` is False. Provide
            the fixed time step value here.
        data_columns:
            Enter name, index, or slice to select columns from the data file.
        extrapolation: the extrapolation method.  One of "hold" or "zero".
        interpolation: the interpolation method.  One of "zero_order_hold" or "linear".
    """

    def __init__(
        self,
        file_name: str,
        data_columns: str = "1",  # slice, e.g. 3:4
        extrapolation: str = "hold",
        header_as_first_row: bool = False,
        interpolation: str = "zero_order_hold",
        sampling_interval: float = 1.0,
        time_column: str = "0",  # @am. could be an int
        time_samples_as_column: bool = False,
        **kwargs,
    ):
        # FIXME: legacy compat
        if "data_integration_id" in kwargs:
            del kwargs["data_integration_id"]

        # name = kwargs.get("name", "no name")
        # print(f"DataSource.init(). name={name} fnam={file_name}")

        # Load and preprocess the csv
        if header_as_first_row:
            header = 0
        else:
            header = None
        # print(f"header={header}")
        usecols = self.str2colindices(data_columns)
        # print(f"usecols={usecols}")

        df = pd.read_csv(
            file_name, header=header, skipinitialspace=True, dtype=np.float64
        )

        # print(f"df after read_csv\n{df}")

        if time_samples_as_column:
            index = int(time_column)
            index = df.columns[index]
        else:
            index = self.make_time_col(0.0, sampling_interval, df.shape[0])
        # print(f"index={index}")
        # print(f"df.columns={df.columns}")
        # print(f"df.columns.isin(usecols)={df.columns.isin(usecols)}")

        # for all colmuns that are not requested by user for output,
        # create a filter that can be used to remove these from the dataframe.
        # this ensures we get the correct output dimension, with the correct values.
        # Note, if one of the columns is time samples, we compute this filter before
        # setting the time column as the index of the dataframe because the user
        # may have provided data_columns as indices, and these would be relative to
        # the file including the time column.
        # first check if usecols and df.columns are the same identifiers
        col_filter = df.columns.isin(usecols)
        if not any(col_filter):
            # if usecols is indices, and df.coliumn is names
            # we need to create a filter that first converts usecols
            # to a list of column names
            # but first ensure that usecols range doens't go beyond
            # the number of columns
            # print(f"len(df.columns)={len(df.columns)}")
            if len(usecols) == 1:
                if usecols[0] > len(df.columns) - 1:
                    usecols[0] = len(df.columns) - 1
            else:
                lwr = min(usecols)
                # add 1 here because max(range(1,4))=3, and range(1,3) would be incorrect.
                upr = max(usecols) + 1
                if upr > len(df.columns) + 1:
                    upr = len(df.columns) + 1
                usecols = range(lwr, upr)

            # print(f"usecols={usecols}")
            col_filter = df.columns.isin(df.columns[usecols])

        # print(f"col_filter={col_filter} before removing time col if its in there.")

        # now that we have computed the filter based on orignal col indices of the file
        # we can move the time col to the dataframe index if that is what was requested
        df.set_index(index, inplace=True)
        # print(f"df after set index\n{df}")

        if time_samples_as_column:
            # if the time col was in the file, and it was used as the dataframe index
            # it would have been part of the filter, but we have since removed the column,
            # this means we need to remove that element of the filter.
            # del col_filter[int(time_column)]
            col_filter = np.delete(col_filter, int(time_column))
            # print(f"col_filter={col_filter} after removing time")

        df = df.loc[:, col_filter]

        # print(f"df when done\n{df}")

        times = cnp.array(df.index.to_numpy())
        data = cnp.array(df.to_numpy())
        max_i_zoh = len(times) - 1
        max_i_interp = len(times) - 2
        output_dim = data.shape[1]
        self._scalar_output = output_dim == 1
        # print(f"self._scalar_output ={self._scalar_output }")

        def get_below_row_idx(time, max_i):
            """
            first we clip the value of 'time' so that it falls inside the
            range of 'times'. this ensures we dont get strange extrapolation behavior.
            then, find the index of 'times' row value that is largest but still smaller
            than 'time'. we use this to locate the rows in 'times' that bound 'time'.
            """
            time_clipped = cnp.clip(time, times[0], times[-1])
            i = cnp.argmin(time_clipped >= times) - 1
            return i, time_clipped

        def _func_zoh(time):
            i, time_clipped = get_below_row_idx(time, max_i_zoh)
            return cnp.where(
                extrapolation == "zero" and time > times[-1],
                cnp.zeros(output_dim),
                data[i, :],
            )

        def _func_interp(time):
            """
            the second lambda function does this:
            y = (yp2-yp1)/(xp2-xp1)*(x-xp1) + yp1
            but does so by operating on the arrays
            ap1 and ap2 which provide the yp1's and yp2's.
            the xp1's and xp2's are time values.
            """
            i, time_clipped = get_below_row_idx(time, max_i_interp)
            ap1 = data[i, :]
            ap2 = data[i + 1, :]

            return cnp.where(
                extrapolation == "zero" and time > times[-1],
                cnp.zeros(output_dim),
                (ap2 - ap1) / (times[i + 1] - times[i]) * (time_clipped - times[i])
                + ap1,
            )

        # wrap output function to return scalar when only one column selected.
        def _wrap_func(_func):
            def _wrapped_func(time):
                output = _func(time)
                return output[0]

            return _wrapped_func

        if interpolation == "zero_order_hold":
            _func = _func_zoh
        else:
            _func = _func_interp

        if self._scalar_output:
            _func = _wrap_func(_func)

        super().__init__(_func, **kwargs)

        if data.size == 0:
            raise ValueError(
                f"DataSource {self.name} did not succeed in getting the requested data columns."
            )

        self.declare_configuration_parameters(
            file_name=file_name,
            data_columns=data_columns,
            extrapolation=extrapolation,
            header_as_first_row=header_as_first_row,
            interpolation=interpolation,
            sampling_interval=sampling_interval,
            time_column=time_column,
            time_samples_as_column=time_samples_as_column,
        )

    def make_time_col(self, start, step, n):
        """
        this function computes the time vector from sample
        interval when this is requested by user
        """
        return np.array([i * step + start for i in range(n)])

    def str2colindices(self, s):
        """
        this function parses the data_columns arg and returns
        a range, list, as appropriate to filter for the requested
        columns.
        """
        slice_thing = s.split(":")
        list_thing = s.split(",")
        if s.isdigit():
            # user provided a single index as integer
            index = int(s)
            return [index]
        elif len(slice_thing) == 2:
            start = slice_thing[0]
            end = slice_thing[1]
            if start.isdigit() and end.isdigit():
                # user provided a slice, e.g. 2:4
                return range(int(start), int(end))
            else:
                raise ValueError(f"DataSource data_columns={s} is not a proper slice")
        elif len(list_thing) > 1:
            is_list_of_digits = [i.isdigit() for i in list_thing]
            if all(is_list_of_digits):
                # user prvied a list of indices as integers
                return [int(i) for i in list_thing]
            elif any(is_list_of_digits):
                raise ValueError(
                    f"DataSource data_columns={s} is not a list of either ints or col names. mixting not allowed."
                )
            else:
                # user maybe provided a list of column names
                # we cant know if any or none of the provided names
                # match until we load the csv
                return list_thing

        elif len(slice_thing) == 1 and len(list_thing) == 1:
            # might be a single col name, e.g. 'c1'
            # again, we dont know if it matches the column names
            # until we load the csv
            return [s]
        else:
            raise ValueError(f"DataSource data_columns={s} is not comprehensible")

__init__(file_name, data_columns='1', extrapolation='hold', header_as_first_row=False, interpolation='zero_order_hold', sampling_interval=1.0, time_column='0', time_samples_as_column=False, **kwargs)

Source code in collimator/library/data_source.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
def __init__(
    self,
    file_name: str,
    data_columns: str = "1",  # slice, e.g. 3:4
    extrapolation: str = "hold",
    header_as_first_row: bool = False,
    interpolation: str = "zero_order_hold",
    sampling_interval: float = 1.0,
    time_column: str = "0",  # @am. could be an int
    time_samples_as_column: bool = False,
    **kwargs,
):
    # FIXME: legacy compat
    if "data_integration_id" in kwargs:
        del kwargs["data_integration_id"]

    # name = kwargs.get("name", "no name")
    # print(f"DataSource.init(). name={name} fnam={file_name}")

    # Load and preprocess the csv
    if header_as_first_row:
        header = 0
    else:
        header = None
    # print(f"header={header}")
    usecols = self.str2colindices(data_columns)
    # print(f"usecols={usecols}")

    df = pd.read_csv(
        file_name, header=header, skipinitialspace=True, dtype=np.float64
    )

    # print(f"df after read_csv\n{df}")

    if time_samples_as_column:
        index = int(time_column)
        index = df.columns[index]
    else:
        index = self.make_time_col(0.0, sampling_interval, df.shape[0])
    # print(f"index={index}")
    # print(f"df.columns={df.columns}")
    # print(f"df.columns.isin(usecols)={df.columns.isin(usecols)}")

    # for all colmuns that are not requested by user for output,
    # create a filter that can be used to remove these from the dataframe.
    # this ensures we get the correct output dimension, with the correct values.
    # Note, if one of the columns is time samples, we compute this filter before
    # setting the time column as the index of the dataframe because the user
    # may have provided data_columns as indices, and these would be relative to
    # the file including the time column.
    # first check if usecols and df.columns are the same identifiers
    col_filter = df.columns.isin(usecols)
    if not any(col_filter):
        # if usecols is indices, and df.coliumn is names
        # we need to create a filter that first converts usecols
        # to a list of column names
        # but first ensure that usecols range doens't go beyond
        # the number of columns
        # print(f"len(df.columns)={len(df.columns)}")
        if len(usecols) == 1:
            if usecols[0] > len(df.columns) - 1:
                usecols[0] = len(df.columns) - 1
        else:
            lwr = min(usecols)
            # add 1 here because max(range(1,4))=3, and range(1,3) would be incorrect.
            upr = max(usecols) + 1
            if upr > len(df.columns) + 1:
                upr = len(df.columns) + 1
            usecols = range(lwr, upr)

        # print(f"usecols={usecols}")
        col_filter = df.columns.isin(df.columns[usecols])

    # print(f"col_filter={col_filter} before removing time col if its in there.")

    # now that we have computed the filter based on orignal col indices of the file
    # we can move the time col to the dataframe index if that is what was requested
    df.set_index(index, inplace=True)
    # print(f"df after set index\n{df}")

    if time_samples_as_column:
        # if the time col was in the file, and it was used as the dataframe index
        # it would have been part of the filter, but we have since removed the column,
        # this means we need to remove that element of the filter.
        # del col_filter[int(time_column)]
        col_filter = np.delete(col_filter, int(time_column))
        # print(f"col_filter={col_filter} after removing time")

    df = df.loc[:, col_filter]

    # print(f"df when done\n{df}")

    times = cnp.array(df.index.to_numpy())
    data = cnp.array(df.to_numpy())
    max_i_zoh = len(times) - 1
    max_i_interp = len(times) - 2
    output_dim = data.shape[1]
    self._scalar_output = output_dim == 1
    # print(f"self._scalar_output ={self._scalar_output }")

    def get_below_row_idx(time, max_i):
        """
        first we clip the value of 'time' so that it falls inside the
        range of 'times'. this ensures we dont get strange extrapolation behavior.
        then, find the index of 'times' row value that is largest but still smaller
        than 'time'. we use this to locate the rows in 'times' that bound 'time'.
        """
        time_clipped = cnp.clip(time, times[0], times[-1])
        i = cnp.argmin(time_clipped >= times) - 1
        return i, time_clipped

    def _func_zoh(time):
        i, time_clipped = get_below_row_idx(time, max_i_zoh)
        return cnp.where(
            extrapolation == "zero" and time > times[-1],
            cnp.zeros(output_dim),
            data[i, :],
        )

    def _func_interp(time):
        """
        the second lambda function does this:
        y = (yp2-yp1)/(xp2-xp1)*(x-xp1) + yp1
        but does so by operating on the arrays
        ap1 and ap2 which provide the yp1's and yp2's.
        the xp1's and xp2's are time values.
        """
        i, time_clipped = get_below_row_idx(time, max_i_interp)
        ap1 = data[i, :]
        ap2 = data[i + 1, :]

        return cnp.where(
            extrapolation == "zero" and time > times[-1],
            cnp.zeros(output_dim),
            (ap2 - ap1) / (times[i + 1] - times[i]) * (time_clipped - times[i])
            + ap1,
        )

    # wrap output function to return scalar when only one column selected.
    def _wrap_func(_func):
        def _wrapped_func(time):
            output = _func(time)
            return output[0]

        return _wrapped_func

    if interpolation == "zero_order_hold":
        _func = _func_zoh
    else:
        _func = _func_interp

    if self._scalar_output:
        _func = _wrap_func(_func)

    super().__init__(_func, **kwargs)

    if data.size == 0:
        raise ValueError(
            f"DataSource {self.name} did not succeed in getting the requested data columns."
        )

    self.declare_configuration_parameters(
        file_name=file_name,
        data_columns=data_columns,
        extrapolation=extrapolation,
        header_as_first_row=header_as_first_row,
        interpolation=interpolation,
        sampling_interval=sampling_interval,
        time_column=time_column,
        time_samples_as_column=time_samples_as_column,
    )

make_time_col(start, step, n)

this function computes the time vector from sample interval when this is requested by user

Source code in collimator/library/data_source.py
267
268
269
270
271
272
def make_time_col(self, start, step, n):
    """
    this function computes the time vector from sample
    interval when this is requested by user
    """
    return np.array([i * step + start for i in range(n)])

str2colindices(s)

this function parses the data_columns arg and returns a range, list, as appropriate to filter for the requested columns.

Source code in collimator/library/data_source.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
def str2colindices(self, s):
    """
    this function parses the data_columns arg and returns
    a range, list, as appropriate to filter for the requested
    columns.
    """
    slice_thing = s.split(":")
    list_thing = s.split(",")
    if s.isdigit():
        # user provided a single index as integer
        index = int(s)
        return [index]
    elif len(slice_thing) == 2:
        start = slice_thing[0]
        end = slice_thing[1]
        if start.isdigit() and end.isdigit():
            # user provided a slice, e.g. 2:4
            return range(int(start), int(end))
        else:
            raise ValueError(f"DataSource data_columns={s} is not a proper slice")
    elif len(list_thing) > 1:
        is_list_of_digits = [i.isdigit() for i in list_thing]
        if all(is_list_of_digits):
            # user prvied a list of indices as integers
            return [int(i) for i in list_thing]
        elif any(is_list_of_digits):
            raise ValueError(
                f"DataSource data_columns={s} is not a list of either ints or col names. mixting not allowed."
            )
        else:
            # user maybe provided a list of column names
            # we cant know if any or none of the provided names
            # match until we load the csv
            return list_thing

    elif len(slice_thing) == 1 and len(list_thing) == 1:
        # might be a single col name, e.g. 'c1'
        # again, we dont know if it matches the column names
        # until we load the csv
        return [s]
    else:
        raise ValueError(f"DataSource data_columns={s} is not comprehensible")

DeadZone

Bases: FeedthroughBlock

Generates zero output within a specified range.

Applies the following function:

         [ input,       input < -half_range
output = | 0,           -half_range <= input <= half_range
         [ input        input > half_range

Parameters:

Name Type Description Default
half_range

The range of the dead zone. Must be > 0.

1.0
Input ports

(0) The input signal.

Output ports

(0) The input signal modified by the dead zone.

Events

An event is triggered when the signal enters or exits the dead zone in either direction.

Source code in collimator/library/primitives.py
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
class DeadZone(FeedthroughBlock):
    """Generates zero output within a specified range.

    Applies the following function:
    ```
             [ input,       input < -half_range
    output = | 0,           -half_range <= input <= half_range
             [ input        input > half_range
    ```

    Parameters:
        half_range: The range of the dead zone.  Must be > 0.

    Input ports:
        (0) The input signal.

    Output ports:
        (0) The input signal modified by the dead zone.

    Events:
        An event is triggered when the signal enters or exits the dead zone
        in either direction.
    """

    def __init__(self, half_range=1.0, **kwargs):
        super().__init__(
            self._dead_zone, parameters={"half_range": half_range}, **kwargs
        )
        if half_range <= 0:
            name = kwargs.get("name", self.__class__.__name__)
            raise BlockParameterError(
                message=f"DeadZone block {name} has invalid half_range {half_range}. Must be > 0.",
                system=self,
                parameter_name="half_range",
            )

    def _dead_zone(self, x, **params):
        return cnp.where(abs(x) < params["half_range"], x * 0, x)

    def _lower_limit_event_value(self, _time, _state, *inputs, **params):
        (u,) = inputs
        return u + params["half_range"]

    def _upper_limit_event_value(self, _time, _state, *inputs, **params):
        (u,) = inputs
        return u - params["half_range"]

    def initialize_static_data(self, context):
        # Add zero-crossing events so ODE solvers can't try to integrate
        # through a discontinuity.
        if not self.has_zero_crossing_events and (self.output_ports[0]):
            self.declare_zero_crossing(
                self._lower_limit_event_value, direction="crosses_zero"
            )
            self.declare_zero_crossing(
                self._upper_limit_event_value, direction="crosses_zero"
            )

        return super().initialize_static_data(context)

Demultiplexer

Bases: LeafSystem

Split a vector signal into its components.

Input ports

(0) The vector signal to split.

Output ports

(0..n_out-1) The components of the input signal.

Source code in collimator/library/primitives.py
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
class Demultiplexer(LeafSystem):
    """Split a vector signal into its components.

    Input ports:
        (0) The vector signal to split.

    Output ports:
        (0..n_out-1) The components of the input signal.
    """

    def __init__(self, n_out, **kwargs):
        super().__init__(**kwargs)

        self.declare_input_port()

        # Need a helper function so that the lambda captures the correct value of i
        # and doesn't use something that ends up fixed in scope.
        def _declare_output(i):
            def _compute_output(_time, _state, *inputs, **_params):
                (input_vec,) = inputs
                return input_vec[i]

            self.declare_output_port(
                _compute_output,
                prerequisites_of_calc=[self.input_ports[0].ticket],
            )

        for i in cnp.arange(n_out):
            _declare_output(i)

Derivative

Bases: LTISystem

Causal estimate of the derivative of a signal in continuous time.

This is implemented as a state-space system with matrices (A, B, C, D), which are then used to create a (first-order) LTISystem. Note that this only supports single-input, single-output derivative blocks.

The derivative is implemented as a filter with a filter coefficient of N, which is used to construct the following proper transfer function:

    H(s) = Ns / (s + N)

As N -> ∞, the transfer function approaches a pure differentiator. However, this system becomes increasingly stiff and difficult to integrate, so it is recommended to select a value of N based on the time scales of the system.

From the transfer function, scipy.signal.tf2ss is used to convert to state-space form and create an LTISystem.

Input ports

(0) u: Input (scalar)

Output ports

(0) y: Output (scalar), estimating the time derivative du/dt

Source code in collimator/library/linear_system.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
class Derivative(LTISystem):
    """Causal estimate of the derivative of a signal in continuous time.

    This is implemented as a state-space system with matrices (A, B, C, D),
    which are then used to create a (first-order) LTISystem.  Note that this
    only supports single-input, single-output derivative blocks.

    The derivative is implemented as a filter with a filter coefficient of `N`,
    which is used to construct the following proper transfer function:
    ```
        H(s) = Ns / (s + N)
    ```
    As N -> ∞, the transfer function approaches a pure differentiator.  However,
    this system becomes increasingly stiff and difficult to integrate, so it is
    recommended to select a value of N based on the time scales of the system.

    From the transfer function, `scipy.signal.tf2ss` is used to convert to
    state-space form and create an LTISystem.

    Input ports:
        (0) u: Input (scalar)

    Output ports:
        (0) y: Output (scalar), estimating the time derivative du/dt
    """

    def __init__(self, filter_coefficient=100, *args, **kwargs):
        N = filter_coefficient
        num = [N, 0]
        den = [1, N]
        A, B, C, D = signal.tf2ss(num, den)
        super().__init__(A, B, C, D, *args, **kwargs)
        self.declare_parameter("filter_coefficient", filter_coefficient)

    def check_types(
        self,
        context,
        error_collector: ErrorCollector = None,
    ):
        inputs = self.collect_inputs(context)
        (u,) = inputs

        if not cnp.ndim(u) == 0:
            with ErrorCollector.context(error_collector):
                raise StaticError(
                    message="Derivative must have scalar input.",
                    system=self,
                )

DerivativeDiscrete

Bases: LeafSystem

Discrete approximation to the derivative of the input signal w.r.t. time.'

By default the block uses a simple backward difference approximation:

y[k] = (u[k] - u[k-1]) / dt

However, the block can also be configured to use a recursive filter for a better approximation. In this case the filter coefficients are determined by the filter_type and filter_coefficient parameters. The filter is a pair of two-element arrays a and b and the filter equation is:

a0*y[k] + a1*y[k-1] = b0*u[k] + b1*u[k-1]

Denoting the filter_coefficient parameter by N, the following filters are available: - "none": The default, a simple finite difference approximation. - "forward": A filtered forward Euler discretization. The filter is: a = [1, (N*dt - 1)] and b = [N, -N]. - "backward": A filtered backward Euler discretization. The filter is: a = [(1 + N*dt), -1] and b = [N, -N]. - "bilinear": A filtered bilinear transform discretization. The filter is: a = [(2 + N*dt), (-2 + N*dt)] and b = [2*N, -2*N].

Input ports

(0) The input signal.

Output ports

(0) The approximate derivative of the input signal.

Parameters:

Name Type Description Default
dt

The time step of the discrete approximation.

required
filter_type

One of "none", "forward", "backward", or "bilinear". This determines the type of filter used to approximate the derivative. The default is "none", corresponding to a simple backward difference approximation.

'none'
filter_coefficient

The coefficient in the filter (N in the equations above). This is only used if filter_type is not "none". The default is 1.0.

1.0
Source code in collimator/library/primitives.py
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
class DerivativeDiscrete(LeafSystem):
    """Discrete approximation to the derivative of the input signal w.r.t. time.'

    By default the block uses a simple backward difference approximation:
    ```
    y[k] = (u[k] - u[k-1]) / dt
    ```
    However, the block can also be configured to use a recursive filter for a
    better approximation. In this case the filter coefficients are determined
    by the `filter_type` and `filter_coefficient` parameters. The filter is
    a pair of two-element arrays `a` and `b` and the filter equation is:
    ```
    a0*y[k] + a1*y[k-1] = b0*u[k] + b1*u[k-1]
    ```

    Denoting the `filter_coefficient` parameter by `N`, the following filters are
    available:
    - "none": The default, a simple finite difference approximation.
    - "forward": A filtered forward Euler discretization. The filter is:
        `a = [1, (N*dt - 1)]` and `b = [N, -N]`.
    - "backward": A filtered backward Euler discretization. The filter is:
        `a = [(1 + N*dt), -1]` and `b = [N, -N]`.
    - "bilinear": A filtered bilinear transform discretization. The filter is:
        `a = [(2 + N*dt), (-2 + N*dt)]` and `b = [2*N, -2*N]`.

    Input ports:
        (0) The input signal.

    Output ports:
        (0) The approximate derivative of the input signal.

    Parameters:
        dt:
            The time step of the discrete approximation.
        filter_type:
            One of "none", "forward", "backward", or "bilinear". This determines the
            type of filter used to approximate the derivative. The default is "none",
            corresponding to a simple backward difference approximation.
        filter_coefficient:
            The coefficient in the filter (`N` in the equations above). This is only
            used if `filter_type` is not "none". The default is 1.0.
    """

    def __init__(self, dt, filter_type="none", filter_coefficient=1.0, **kwargs):
        super().__init__(**kwargs)
        self.dt = dt

        self.declare_configuration_parameters(
            filter_type=filter_type,
            filter_coefficient=filter_coefficient,
        )

        # Determine the coefficients of the filter, if applicable
        # The filter is a pair of two-element array and the filter
        # equation is:
        # a0*y[k] + a1*y[k-1] = b0*u[k] + b1*u[k-1]
        self.filter = derivative_filter(
            N=filter_coefficient, dt=dt, filter_type=filter_type
        )

        self.declare_input_port()

        self.declare_discrete_state(default_value=None, as_array=False)

        self.declare_periodic_update(
            self._update,
            period=dt,
            offset=0.0,
        )

        self.deriv_output = self.declare_output_port(
            self._output,
            period=dt,
            offset=0.0,
            prerequisites_of_calc=[self.input_ports[0].ticket],
        )

    def _output(self, _time, state, *inputs, **_params):
        # Compute the filtered derivative estimate
        (u,) = inputs
        b, a = self.filter
        y_prev = state.cache[self.deriv_output]
        u_prev = state.discrete_state
        y = (b[0] * u + b[1] * u_prev - a[1] * y_prev) / a[0]
        return y

    def _update(self, time, state, u, **params):
        # Every dt seconds, update the state to the current values
        return u

    def initialize_static_data(self, context):
        """Infer the size and dtype of the internal states"""
        # If building as part of a subsystem, this may not be fully connected yet.
        # That's fine, as long as it is connected by root context creation time.
        # This probably isn't a good long-term solution:
        #   see https://collimator.atlassian.net/browse/WC-51
        try:
            u = self.eval_input(context)
            self._default_discrete_state = u
            self._default_cache[self.deriv_output] = u
            local_context = context[self.system_id].with_discrete_state(u)
            local_context = local_context.with_cached_value(self.deriv_output, u)
            context = context.with_subcontext(self.system_id, local_context)

        except UpstreamEvalError:
            logger.debug(
                "DerivativeDiscrete.initialize_static_data: UpstreamEvalError. "
                "Continuing without default value initialization."
            )
        return super().initialize_static_data(context)

initialize_static_data(context)

Infer the size and dtype of the internal states

Source code in collimator/library/primitives.py
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
def initialize_static_data(self, context):
    """Infer the size and dtype of the internal states"""
    # If building as part of a subsystem, this may not be fully connected yet.
    # That's fine, as long as it is connected by root context creation time.
    # This probably isn't a good long-term solution:
    #   see https://collimator.atlassian.net/browse/WC-51
    try:
        u = self.eval_input(context)
        self._default_discrete_state = u
        self._default_cache[self.deriv_output] = u
        local_context = context[self.system_id].with_discrete_state(u)
        local_context = local_context.with_cached_value(self.deriv_output, u)
        context = context.with_subcontext(self.system_id, local_context)

    except UpstreamEvalError:
        logger.debug(
            "DerivativeDiscrete.initialize_static_data: UpstreamEvalError. "
            "Continuing without default value initialization."
        )
    return super().initialize_static_data(context)

DirectShootingNMPC

Bases: NonlinearMPCIpopt

Implementation of nonlinear MPC with a direct shooting transcription and IPOPT as the NLP solver.

Input ports

(0) x_0 : current state vector. (1) x_ref : reference state trajectory for the nonlinear MPC. (2) u_ref : reference input trajectory for the nonlinear MPC.

Output ports

(1) u_opt : the optimal control input to be applied at the current time step as determined by the nonlinear MPC.

Parameters:

Name Type Description Default
plant

LeafSystem or Diagram The plant to be controlled.

required
Q

Array State weighting matrix in the cost function.

required
QN

Array Terminal state weighting matrix in the cost function.

required
R

Array Control input weighting matrix in the cost function.

required
N

int The prediction horizon, an integer specifying the number of steps to predict. Note: prediction and control horizons are identical for now.

required
nh

int Number of minor steps to take within an RK4 major step.

required
dt

float: Major time step, a scalar indicating the increment in time for each step in the prediction and control horizons.

required
lb_u

Array Lower bound on the control input vector.

None
ub_u

Array Upper bound on the control input vector.

None
u_optvars_0

Array Initial guess for the control vector optimization variables in the NLP.

None
Source code in collimator/library/nmpc/direct_shooting_ipopt_nmpc.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
class DirectShootingNMPC(NonlinearMPCIpopt):
    """
    Implementation of nonlinear MPC with a direct shooting transcription and IPOPT as
    the NLP solver.

    Input ports:
        (0) x_0 : current state vector.
        (1) x_ref : reference state trajectory for the nonlinear MPC.
        (2) u_ref : reference input trajectory for the nonlinear MPC.

    Output ports:
        (1) u_opt : the optimal control input to be applied at the current time step
                    as determined by the nonlinear MPC.

    Parameters:
        plant: LeafSystem or Diagram
            The plant to be controlled.

        Q: Array
            State weighting matrix in the cost function.

        QN: Array
            Terminal state weighting matrix in the cost function.

        R: Array
            Control input weighting matrix in the cost function.

        N: int
            The prediction horizon, an integer specifying the number of steps to
            predict. Note: prediction and control horizons are identical for now.

        nh: int
            Number of minor steps to take within an RK4 major step.

        dt: float:
            Major time step, a scalar indicating the increment in time for each step in
            the prediction and control horizons.

        lb_u: Array
            Lower bound on the control input vector.

        ub_u: Array
            Upper bound on the control input vector.

        u_optvars_0: Array
            Initial guess for the control vector optimization variables in the NLP.
    """

    def __init__(
        self,
        plant,
        Q,
        QN,
        R,
        N,
        nh,
        dt,
        lb_u=None,
        ub_u=None,
        u_optvars_0=None,
        name=None,
    ):
        self.plant = plant

        self.Q = Q
        self.QN = QN
        self.R = R

        self.N = N
        self.nh = nh
        self.dt = dt

        self.lb_u = lb_u
        self.ub_u = ub_u

        self.nx = Q.shape[0]
        self.nu = R.shape[0]

        if lb_u is None:
            self.lb_u = -1e20 * jnp.ones(self.nu)

        if ub_u is None:
            self.ub_u = 1e20 * jnp.ones(self.nu)

        # Currently guesses are not taken into account
        self.u_optvars_0 = u_optvars_0  # Currently does nothing
        if u_optvars_0 is None:
            u_optvars_0 = jnp.zeros((N, self.nu))

        self.ode_rhs = make_ode_rhs(plant, self.nu)

        nlp_structure_ipopt = NMPCProblemStructure(
            self.num_optvars,
            self._objective,
        )

        super().__init__(
            dt,
            self.nu,
            self.num_optvars,
            nlp_structure_ipopt,
            name=name,
        )

    @property
    def num_optvars(self):
        return self.N * self.nu

    @property
    def num_constraints(self):
        return 0

    @property
    def bounds_optvars(self):
        lb = jnp.tile(self.lb_u, self.N)
        ub = jnp.tile(self.ub_u, self.N)
        return (lb, ub)

    @property
    def bounds_constraints(self):
        c_lb = []
        c_ub = []
        return (c_lb, c_ub)

    @partial(jax.jit, static_argnames=("self",))
    def _objective(self, optvars, t0, x0, x_ref, u_ref):
        u_flat = optvars
        u = jnp.array(u_flat.reshape((self.N, self.nu)))

        x = jnp.zeros((self.N + 1, x0.size))
        x = x.at[0].set(x0)

        def _update_function(idx, x):
            t_major_start = t0 + self.dt * idx
            x_current = x[idx]
            u_current = u[idx]
            x_next = rk4_major_step_constant_u(
                t_major_start,
                x_current,
                u_current,
                self.dt,
                self.nh,
                self.ode_rhs,
            )
            return x.at[idx + 1].set(x_next)

        x = jax.lax.fori_loop(0, self.N, _update_function, x)

        xdiff = x - x_ref
        udiff = u - u_ref

        # compute sum of quadratic products for x_0 to x_{N-1}
        A = jnp.dot(xdiff[:-1], self.Q)
        qp_x_sum = jnp.sum(xdiff[:-1] * A, axis=None)

        # Compute quadratic product for the x_N
        xN = xdiff[-1]
        qp_x_N = jnp.dot(xN, jnp.dot(self.QN, xN))

        # compute sum of quadratic products for u_0 to u_{N-1}
        B = jnp.dot(udiff, self.R)
        qp_u_sum = jnp.sum(udiff * B, axis=None)

        # Sum the quadratic products
        total_sum = qp_x_sum + qp_x_N + qp_u_sum
        return total_sum

DirectTranscriptionNMPC

Bases: NonlinearMPCIpopt

Implementation of nonlinear MPC with direct transcription and IPOPT as the NLP solver.

Input ports

(0) x_0 : current state vector. (1) x_ref : reference state trajectory for the nonlinear MPC. (2) u_ref : reference input trajectory for the nonlinear MPC.

Output ports

(1) u_opt : the optimal control input to be applied at the current time step as determined by the nonlinear MPC.

Parameters:

Name Type Description Default
plant

LeafSystem or Diagram The plant to be controlled.

required
Q

Array State weighting matrix in the cost function.

required
QN

Array Terminal state weighting matrix in the cost function.

required
R

Array Control input weighting matrix in the cost function.

required
N

int The prediction horizon, an integer specifying the number of steps to predict. Note: prediction and control horizons are identical for now.

required
nh

int Number of minor steps to take within an RK4 major step.

required
dt

float: Major time step, a scalar indicating the increment in time for each step in the prediction and control horizons.

required
lb_x

Array Lower bound on the state vector.

None
ub_x

Array Upper bound on the state vector.

None
lb_u

Array Lower bound on the control input vector.

None
ub_u

Array Upper bound on the control input vector.

None
x_optvars_0

Array Initial guess for the state vector optimization variables in the NLP.

None
u_optvars_0

Array Initial guess for the control vector optimization variables in the NLP.

None
Source code in collimator/library/nmpc/direct_transcription_ipopt_nmpc.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
class DirectTranscriptionNMPC(NonlinearMPCIpopt):
    """
    Implementation of nonlinear MPC with direct transcription and IPOPT as the NLP
    solver.

    Input ports:
        (0) x_0 : current state vector.
        (1) x_ref : reference state trajectory for the nonlinear MPC.
        (2) u_ref : reference input trajectory for the nonlinear MPC.

    Output ports:
        (1) u_opt : the optimal control input to be applied at the current time step
                    as determined by the nonlinear MPC.

    Parameters:
        plant: LeafSystem or Diagram
            The plant to be controlled.

        Q: Array
            State weighting matrix in the cost function.

        QN: Array
            Terminal state weighting matrix in the cost function.

        R: Array
            Control input weighting matrix in the cost function.

        N: int
            The prediction horizon, an integer specifying the number of steps to
            predict. Note: prediction and control horizons are identical for now.

        nh: int
            Number of minor steps to take within an RK4 major step.

        dt: float:
            Major time step, a scalar indicating the increment in time for each step in
            the prediction and control horizons.

        lb_x: Array
            Lower bound on the state vector.

        ub_x: Array
            Upper bound on the state vector.

        lb_u: Array
            Lower bound on the control input vector.

        ub_u: Array
            Upper bound on the control input vector.

        x_optvars_0: Array
            Initial guess for the state vector optimization variables in the NLP.

        u_optvars_0: Array
            Initial guess for the control vector optimization variables in the NLP.
    """

    def __init__(
        self,
        plant,
        Q,
        QN,
        R,
        N,
        nh,
        dt,
        lb_x=None,
        ub_x=None,
        lb_u=None,
        ub_u=None,
        x_optvars_0=None,
        u_optvars_0=None,
        name=None,
    ):
        self.plant = plant

        self.Q = Q
        self.QN = QN
        self.R = R

        self.N = N
        self.nh = nh
        self.dt = dt

        self.lb_x = lb_x
        self.ub_x = ub_x
        self.lb_u = lb_u
        self.ub_u = ub_u

        self.nx = Q.shape[0]
        self.nu = R.shape[0]

        if lb_x is None:
            self.lb_x = -1e20 * jnp.ones(self.nx)

        if ub_x is None:
            self.ub_x = 1e20 * jnp.ones(self.nx)

        if lb_u is None:
            self.lb_u = -1e20 * jnp.ones(self.nu)

        if ub_u is None:
            self.ub_u = 1e20 * jnp.ones(self.nu)

        # Currently guesses are not taken into account
        self.x_optvars_0 = x_optvars_0  # Currently does nothing
        self.u_optvars_0 = u_optvars_0  # Currently does nothing
        if x_optvars_0 is None:
            x_optvars_0 = jnp.zeros((N + 1, self.nx))
        if u_optvars_0 is None:
            u_optvars_0 = jnp.zeros((N, self.nu))

        self.ode_rhs = make_ode_rhs(plant, self.nu)

        nlp_structure_ipopt = NMPCProblemStructure(
            self.num_optvars,
            self._objective,
            self._constraints,
        )

        super().__init__(
            dt,
            self.nu,
            self.num_optvars,
            nlp_structure_ipopt,
            name=name,
        )

    @property
    def num_optvars(self):
        return (self.N + 1) * self.nx + self.N * self.nu

    @property
    def num_constraints(self):
        return (self.N + 1) * self.nx

    @property
    def bounds_optvars(self):
        lb = jnp.hstack([jnp.tile(self.lb_u, self.N), jnp.tile(self.lb_x, self.N + 1)])
        ub = jnp.hstack([jnp.tile(self.ub_u, self.N), jnp.tile(self.ub_x, self.N + 1)])
        return (lb, ub)

    @property
    def bounds_constraints(self):
        c_lb = jnp.zeros(self.num_constraints)
        c_ub = jnp.zeros(self.num_constraints)
        return (c_lb, c_ub)

    @partial(jax.jit, static_argnames=("self",))
    def _objective(self, optvars, t0, x0, x_ref, u_ref):
        u_and_x_flat = optvars

        u = u_and_x_flat[: self.nu * self.N].reshape((self.N, self.nu))
        x = u_and_x_flat[self.nu * self.N :].reshape((self.N + 1, self.nx))

        xdiff = x - x_ref
        udiff = u - u_ref

        # compute sum of quadratic products for x_0 to x_{N-1}
        A = jnp.dot(xdiff[:-1], self.Q)
        qp_x_sum = jnp.sum(xdiff[:-1] * A, axis=None)

        # Compute quadratic product for the x_N
        xN = xdiff[-1]
        qp_x_N = jnp.dot(xN, jnp.dot(self.QN, xN))

        # compute sum of quadratic products for u_0 to u_{N-1}
        B = jnp.dot(udiff, self.R)
        qp_u_sum = jnp.sum(udiff * B, axis=None)

        # Sum the quadratic products
        total_sum = qp_x_sum + qp_x_N + qp_u_sum
        return total_sum

    @partial(jax.jit, static_argnames=("self",))
    def _constraints(self, optvars, t0, x0, x_ref, u_ref):
        u_and_x_flat = optvars
        u = u_and_x_flat[: self.nu * self.N].reshape((self.N, self.nu))
        x = u_and_x_flat[self.nu * self.N :].reshape((self.N + 1, self.nx))

        x_sim = jnp.zeros((self.N, x0.size))

        def _update_function(idx, x_sim_l):
            t_major_start = t0 + self.dt * idx
            x_current = x[idx]
            u_current = u[idx]
            x_next = rk4_major_step_constant_u(
                t_major_start,
                x_current,
                u_current,
                self.dt,
                self.nh,
                self.ode_rhs,
            )
            return x_sim_l.at[idx].set(x_next)

        x_sim = jax.lax.fori_loop(0, self.N, _update_function, x_sim)  # x1, x2, ..., xN

        c0 = x0 - x[0]
        c_others = x[1:] - x_sim

        c_all = jnp.hstack([c0.ravel(), c_others.ravel()])

        return c_all

DiscreteClock

Bases: LeafSystem

Source block that produces the time sampled at a fixed rate.

The block maintains the most recently sampled time as a discrete state, provided to the output port during the following interval. Graphically, a discrete clock sampled at 100 Hz would have the following time series:

  x(t)                  ●━
    |                   ┆
.03 |              ●━━━━○
    |              ┆
.02 |         ●━━━━○
    |         ┆
.01 |    ●━━━━○
    |    ┆
  0 ●━━━━○----+----+----+-- t
    0   .01  .02  .03  .04

The recorded states are the closed circles, which should be interpreted at index n as the value seen by all other blocks on the interval (t[n], t[n+1]).

Input ports

None

Output ports

(0) The sampled time.

Parameters:

Name Type Description Default
dt

The sampling period of the clock.

required
start_time

The simulation time at which the clock starts. Defaults to 0.

0
Source code in collimator/library/primitives.py
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
class DiscreteClock(LeafSystem):
    """Source block that produces the time sampled at a fixed rate.

    The block maintains the most recently sampled time as a discrete state, provided
    to the output port during the following interval. Graphically, a discrete clock
    sampled at 100 Hz would have the following time series:

    ```
      x(t)                  ●━
        |                   ┆
    .03 |              ●━━━━○
        |              ┆
    .02 |         ●━━━━○
        |         ┆
    .01 |    ●━━━━○
        |    ┆
      0 ●━━━━○----+----+----+-- t
        0   .01  .02  .03  .04
    ```

    The recorded states are the closed circles, which should be interpreted at index
    `n` as the value seen by all other blocks on the interval `(t[n], t[n+1])`.

    Input ports:
        None

    Output ports:
        (0) The sampled time.

    Parameters:
        dt:
            The sampling period of the clock.
        start_time:
            The simulation time at which the clock starts. Defaults to 0.
    """

    def __init__(self, dt, dtype=None, start_time=0, **kwargs):
        super().__init__(**kwargs)
        self.dtype = dtype or float
        start_time = cnp.array(start_time, dtype=self.dtype)

        self.declare_output_port(
            self._output,
            period=dt,
            offset=0.0,
            requires_inputs=False,
            default_value=start_time,
            prerequisites_of_calc=[DependencyTicket.time],
        )

    def _output(self, time, _state, *_inputs, **_params):
        return cnp.array(time, dtype=self.dtype)

DiscreteInitializer

Bases: LeafSystem

Discrete Initializer.

Outputs True for first discrete step, then outputs False there after. Or, outputs False for first discrete step, then outputs True there after. Practical for cases where it is necessary to have some signal fed initially by some initialization, but then after from else in the model.

Input ports

None

Output ports

(0) The dot product of the inputs.

Source code in collimator/library/primitives.py
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
class DiscreteInitializer(LeafSystem):
    """Discrete Initializer.

    Outputs True for first discrete step, then outputs False there after.
    Or, outputs False for first discrete step, then outputs True there after.
    Practical for cases where it is necessary to have some signal fed initially
    by some initialization, but then after from else in the model.

    Input ports:
        None

    Output ports:
        (0) The dot product of the inputs.
    """

    def __init__(self, dt, initial_state=True, **kwargs):
        super().__init__(**kwargs)
        self.dt = dt

        self.declare_discrete_state(default_value=initial_state, dtype=cnp.bool_)
        self.declare_output_port(self._output)
        self.declare_parameter("initial_state", initial_state)
        self.declare_periodic_update(
            self._update,
            period=cnp.inf,
            offset=dt,
        )

    def _update(self, time, state, *_inputs, **_params):
        return cnp.logical_not(state.discrete_state)

    def _output(self, _time, state, *_inputs, **_params):
        return state.discrete_state

DiscreteTimeLinearQuadraticRegulator

Bases: LeafSystem

Linear Quadratic Regulator (LQR) for a discrete-time system: x[k+1] = A x[k] + B u[k]. Computes the optimal control input: u[k] = -K x[k], where u minimises the cost function over [0, ∞)]: J = ∑(x[k].T Q x[k] + u[k].T R u[k]).

Input ports

(0) x[k]: state vector of the system.

Output ports

(0) u[k]: optimal control vector.

Parameters:

Name Type Description Default
A

Array State matrix of the system.

required
B

Array Input matrix of the system.

required
Q

Array State cost matrix.

required
R

Array Input cost matrix.

required
dt

float Sampling period of the system.

required
Source code in collimator/library/lqr.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class DiscreteTimeLinearQuadraticRegulator(LeafSystem):
    """
    Linear Quadratic Regulator (LQR) for a discrete-time system:
            x[k+1] = A x[k] + B u[k].
    Computes the optimal control input:
            u[k] = -K x[k],
    where u minimises the cost function over [0, ∞)]:
            J = ∑(x[k].T Q x[k] + u[k].T R u[k]).

    Input ports:
        (0) x[k]: state vector of the system.

    Output ports:
        (0) u[k]: optimal control vector.

    Parameters:
        A: Array
            State matrix of the system.
        B: Array
            Input matrix of the system.
        Q: Array
            State cost matrix.
        R: Array
            Input cost matrix.
        dt: float
            Sampling period of the system.
    """

    def __init__(self, A, B, Q, R, dt, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.K, S, E = control.dlqr(A, B, Q, R)

        self.declare_input_port()  # for state x

        self.declare_output_port(
            self._get_opt_u,
            requires_inputs=True,
            period=dt,
            offset=0.0,
            default_value=jnp.zeros(B.shape[1]),
        )

    def _get_opt_u(self, time, state, x, **params):
        return jnp.matmul(-self.K, x)

DotProduct

Bases: ReduceBlock

Compute the dot product between the inputs.

This block dispatches to jax.numpy.dot, so the semantics, broadcasting rules, etc. are the same. See the JAX docs for details: https://jax.readthedocs.io/en/latest/_autosummary/jax.numpy.dot.html

Input ports

(0) The first input vector. (1) The second input vector.

Output ports

(0) The dot product of the inputs.

Source code in collimator/library/primitives.py
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
class DotProduct(ReduceBlock):
    """Compute the dot product between the inputs.

    This block dispatches to `jax.numpy.dot`, so the semantics, broadcasting rules,
    etc. are the same.  See the JAX docs for details:
        https://jax.readthedocs.io/en/latest/_autosummary/jax.numpy.dot.html

    Input ports:
        (0) The first input vector.
        (1) The second input vector.

    Output ports:
        (0) The dot product of the inputs.
    """

    def __init__(self, **kwargs):
        super().__init__(2, self._compute_output, **kwargs)

    def _compute_output(self, inputs):
        return cnp.dot(inputs[0], inputs[1])

EdgeDetection

Bases: LeafSystem

Output is true only when the input signal changes in a specified way.

The block updates at a discrete rate, checking the boolean- or binary-valued input signal for changes. Available edge detection modes are: - "rising": Output is true when the input changes from False (0) to True (1). - "falling": Output is true when the input changes from True (1) to False (0). - "either": Output is true when the input changes in either direction

Input ports

(0) The input signal. Must be boolean or binary-valued.

Output ports

(0) The edge detection output signal. Boolean-valued.

Parameters:

Name Type Description Default
dt

The sampling period of the block.

required
edge_detection

One of "rising", "falling", or "either". Determines the type of edge detection performed by the block.

required
initial_state

The initial value of the output signal.

False
Source code in collimator/library/primitives.py
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
class EdgeDetection(LeafSystem):
    """Output is true only when the input signal changes in a specified way.

    The block updates at a discrete rate, checking the boolean- or binary-valued input
    signal for changes.  Available edge detection modes are:
        - "rising": Output is true when the input changes from False (0) to True (1).
        - "falling": Output is true when the input changes from True (1) to False (0).
        - "either": Output is true when the input changes in either direction

    Input ports:
        (0) The input signal. Must be boolean or binary-valued.

    Output ports:
        (0) The edge detection output signal. Boolean-valued.

    Parameters:
        dt:
            The sampling period of the block.
        edge_detection:
            One of "rising", "falling", or "either". Determines the type of edge
            detection performed by the block.
        initial_state:
            The initial value of the output signal.
    """

    class DiscreteStateType(NamedTuple):
        prev_input: Array
        output: bool

    def __init__(self, dt, edge_detection, initial_state=False, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Determine the type of edge detection
        _detection_funcs = {
            "rising": self._detect_rising,
            "falling": self._detect_falling,
            "either": self._detect_either,
        }
        if edge_detection not in _detection_funcs:
            raise ValueError(
                f"EdgeDetection block {self.name} has invalid selection "
                f"{edge_detection} for 'edge_detection'"
            )
        self._detect_edge = _detection_funcs[edge_detection]

        self.declare_configuration_parameters(edge_detection=edge_detection)
        self.declare_parameter("initial_state", initial_state)

        self.declare_input_port()

        # The discrete state will contain the previous input value and the output
        self.declare_discrete_state(
            default_value=self.DiscreteStateType(
                prev_input=initial_state, output=False
            ),
            as_array=False,
        )

        # Declare the periodic update
        self.declare_periodic_update(
            self._update,
            period=dt,
            offset=0.0,
        )

        # Declare the output port
        self.declare_output_port(
            self._output,
            prerequisites_of_calc=[DependencyTicket.xd, self.input_ports[0].ticket],
            requires_inputs=False,
        )

    def _update(self, time, state, *inputs, **params):
        # Update the stored previous state
        # and the output as the result of the edge detection function
        (e,) = inputs
        return self.DiscreteStateType(
            prev_input=e,
            output=self._detect_edge(time, state, e, **params),
        )

    def _output(self, _time, state, *_inputs, **_params):
        return state.discrete_state.output

    def _detect_rising(self, _time, state, *inputs, **_params):
        (e,) = inputs
        e_prev = state.discrete_state.prev_input
        e_prev = cnp.array(e_prev)
        e = cnp.array(e)
        not_e_prev = cnp.logical_not(e_prev)
        return cnp.logical_and(not_e_prev, e)

    def _detect_falling(self, _time, state, *inputs, **_params):
        (e,) = inputs
        e_prev = state.discrete_state.prev_input
        e_prev = cnp.array(e_prev)
        e = cnp.array(e)
        not_e = cnp.logical_not(e)
        return cnp.logical_and(e_prev, not_e)

    def _detect_either(self, _time, state, *inputs, **_params):
        (e,) = inputs
        e_prev = state.discrete_state.prev_input
        e_prev = cnp.array(e_prev)
        e = cnp.array(e)
        not_e_prev = cnp.logical_not(e_prev)
        not_e = cnp.logical_not(e)
        rising = cnp.logical_and(not_e_prev, e)
        falling = cnp.logical_and(e_prev, not_e)
        return cnp.logical_or(rising, falling)

Exponent

Bases: FeedthroughBlock

Compute the exponential of the input signal.

Input ports

(0) The input signal.

Output ports

(0) The exponential of the input signal.

Parameters:

Name Type Description Default
base

One of "exp" or "2". Determines the base of the exponential function.

required
Source code in collimator/library/primitives.py
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
class Exponent(FeedthroughBlock):
    """Compute the exponential of the input signal.

    Input ports:
        (0) The input signal.

    Output ports:
        (0) The exponential of the input signal.

    Parameters:
        base:
            One of "exp" or "2". Determines the base of the exponential function.
    """

    def __init__(self, base, **kwargs):
        func_lookup = {"exp": cnp.exp, "2": cnp.exp2}
        if base not in func_lookup:
            name = kwargs.get("name", self.__class__.__name__)
            # cannot pass system=self because this error must be raised BEFORE calling super.__init__()
            # in the case of inheritting from FeedthroughBlock.
            # if we call super.__init__() first, we get missing key error for func_lookup[base].
            raise BlockParameterError(
                message=f"Exponent block {name} has invalid selection {base} for 'base'. Valid selections: "
                + ", ".join([k for k in func_lookup.keys()]),
                parameter_name="base",
            )
        super().__init__(func_lookup[base], **kwargs)
        self.declare_configuration_parameters(base=base)

ExtendedKalmanFilter

Bases: KalmanFilterBase

Extended Kalman Filter (EKF) for the following system:

```
x[n+1] = f(x[n], u[n]) + G(t[n]) w[n]
y[n]   = g(x[n], u[n]) + v[n]

E(w[n]) = E(v[n]) = 0
E(w[n]w'[n]) = Q(t[n], x[n], u[n])
E(v[n]v'[n] = R(t[n])
E(w[n]v'[n] = N(t[n]) = 0
```

f and g are discrete-time functions of state x[n] and control u[n], while RandGare discrete-time functions of timet[n].Qis a discrete-time function oft[n], x[n], u[n]`. This last aspect is included for zero-order-hold discretization of a continuous-time system

Input ports

(0) u[n] : control vector at timestep n (1) y[n] : measurement vector at timestep n

Output ports

(1) x_hat[n] : state vector estimate at timestep n

Parameters:

Name Type Description Default
dt

float Time step of the discrete-time system

required
forward

Callable A function with signature f(x[n], u[n]) -> x[n+1] that represents f in the above equations.

required
observation

Callable A function with signature g(x[n], u[n]) -> y[n] that represents g in the above equations.

required
G_func

Callable A function with signature G(t[n]) -> G[n] that represents G in the above equations.

required
Q_func

Callable A function with signature Q(t[n], x[n], u[n]) -> Q[n] that represents Q in the above equations.

required
R_func

Callable A function with signature R(t[n]) -> R[n] that represents R in the above equations.

required
x_hat_0

ndarray Initial state estimate

required
P_hat_0

ndarray Initial state covariance matrix estimate

required
Source code in collimator/library/state_estimators/extended_kalman_filter.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
class ExtendedKalmanFilter(KalmanFilterBase):
    """
    Extended Kalman Filter (EKF) for the following system:

        ```
        x[n+1] = f(x[n], u[n]) + G(t[n]) w[n]
        y[n]   = g(x[n], u[n]) + v[n]

        E(w[n]) = E(v[n]) = 0
        E(w[n]w'[n]) = Q(t[n], x[n], u[n])
        E(v[n]v'[n] = R(t[n])
        E(w[n]v'[n] = N(t[n]) = 0
        ```

    `f` and `g` are discrete-time functions of state `x[n]` and control `u[n]`,
    while R` and `G` are discrete-time functions of time `t[n]`. `Q` is a discrete-time
    function of `t[n], x[n], u[n]`. This last aspect is included for zero-order-hold
    discretization of a continuous-time system

    Input ports:
        (0) u[n] : control vector at timestep n
        (1) y[n] : measurement vector at timestep n

    Output ports:
        (1) x_hat[n] : state vector estimate at timestep n

    Parameters:
        dt: float
            Time step of the discrete-time system
        forward: Callable
            A function with signature f(x[n], u[n]) -> x[n+1] that represents `f` in
            the above equations.
        observation: Callable
            A function with signature g(x[n], u[n]) -> y[n] that represents `g` in
            the above equations.
        G_func: Callable
            A function with signature G(t[n]) -> G[n] that represents `G` in
            the above equations.
        Q_func: Callable
            A function with signature Q(t[n], x[n], u[n]) -> Q[n] that represents `Q`
            in the above equations.
        R_func: Callable
            A function with signature R(t[n]) -> R[n] that represents `R` in
            the above equations.
        x_hat_0: ndarray
            Initial state estimate
        P_hat_0: ndarray
            Initial state covariance matrix estimate
    """

    def __init__(
        self,
        dt,
        forward,
        observation,
        G_func,
        Q_func,
        R_func,
        x_hat_0,
        P_hat_0,
        name=None,
    ):
        super().__init__(dt, x_hat_0, P_hat_0, name)

        self.G_func = G_func
        self.Q_func = Q_func
        self.R_func = R_func

        self.nx = x_hat_0.size
        self.ny = self.R_func(0.0).shape[0]

        self.forward = forward
        self.observation = observation

        self.jac_forward = jax.jacfwd(forward)
        self.jac_observation = jax.jacfwd(observation)

        self.eye_x = jnp.eye(self.nx)

    def _correct(self, time, x_hat_minus, P_hat_minus, *inputs):
        u, y = inputs
        u = jnp.atleast_1d(u)
        y = jnp.atleast_1d(y)

        C = self.jac_observation(x_hat_minus, u).reshape((self.ny, self.nx))

        R = self.R_func(time)

        # TODO: improved numerics to avoud computing explicit inverse
        K = P_hat_minus @ C.T @ jnp.linalg.inv(C @ P_hat_minus @ C.T + R)

        x_hat_plus = x_hat_minus + jnp.dot(
            K, y - self.observation(x_hat_minus, u)
        )  # n|n

        P_hat_plus = jnp.matmul(self.eye_x - jnp.matmul(K, C), P_hat_minus)  # n|n

        return x_hat_plus, P_hat_plus

    def _propagate(self, time, x_hat_plus, P_hat_plus, *inputs):
        # Predict -- x_hat_plus of current step is propagated to be the
        # x_hat_minus of the next step
        # k+1|k in current step is n|n-1 for next step

        u, y = inputs
        u = jnp.atleast_1d(u)

        A = self.jac_forward(x_hat_plus, u).reshape((self.nx, self.nx))

        G = self.G_func(time)
        Q = self.Q_func(time, x_hat_plus, u)
        GQGT = G @ Q @ G.T

        x_hat_minus = self.forward(x_hat_plus, u)  # n+1|n
        P_hat_minus = A @ P_hat_plus @ A.T + GQGT  # n+1|n

        return x_hat_minus, P_hat_minus

    #######################################
    # Make filter for a continuous plant  #
    #######################################

    @staticmethod
    def for_continuous_plant(
        plant,
        dt,
        G_func,
        Q_func,
        R_func,
        x_hat_0,
        P_hat_0,
        discretization_method="euler",
        discretized_noise=False,
        name=None,
    ):
        """
        Extended Kalman Filter system for a continuous-time plant.

        The input plant contains the deterministic forms of the forward and observation
        operators:

        ```
            dx/dt = f(x,u)
            y = g(x,u)
        ```

        Note: (i) Only plants with one vector-valued input and one vector-valued output
        are currently supported. Furthermore, the plant LeafSystem/Diagram should have
        only one vector-valued integrator; (ii) the user may pass a plant with
        disturbances (not recommended) as the input plant. In this case, the forward
        and observation evaluations will be corrupted by noise.

        A plant with disturbances of the following form is then considered:

        ```
            dx/dt = f(x,u) + G(t) w         -- (C1)
            y = g(x,u) +  v                 -- (C2)
        ```

        where:

            `w` represents the process noise,
            `v` represents the measurement noise,

        and

        ```
            E(w) = E(v) = 0
            E(ww') = Q(t)
            E(vv') = R(t)
            E(wv') = N(t) = 0
        ```

        This plant is discretized to obtain the following form:

        ```
            x[n+1] = fd(x[n], u[n]) + Gd w[n]  -- (D1)
            y[n]   = gd(x[n], u[n]) + v[n]     -- (D2)

            E(w[n]) = E(v[n]) = 0
            E(w[n]w'[n]) = Qd
            E(v[n]v'[n] = Rd
            E(w[n]v'[n] = Nd = 0
        ```

        The above discretization is performed either via the `euler` or the `zoh`
        method, and an Extended Kalman Filter estimator for the system of equations
        (D1) and (D2) is returned.

        Note: If `discretized_noise` is True, then it is assumed that the user is
        directly providing Gd, Qd and Rd. If False, then Qd and Rd are computed from
        continuous-time Q, R, and G, and Gd is set to an Identity matrix.

        The returned system will have:

        Input ports:
            (0) u[n] : control vector at timestep n
            (1) y[n] : measurement vector at timestep n

        Output ports:
            (1) x_hat[n] : state vector estimate at timestep n

        Parameters:
            plant : a `Plant` object which can be a LeafSystem or a Diagram.
            dt: float
                Time step for the discretization.
            G_func: Callable
                A function with signature G(t) -> G that represents `G` in
                the continuous-time equations (C1) and (C2).
            Q_func: Callable
                A function with signature Q(t) -> Q that represents `Q` in
                the continuous-time equations (C1) and (C2).
            R_func: Callable
                A function with signature R(t) -> R that represents `R` in
                the continuous-time equations (C1) and (C2).
            x_hat_0: ndarray
                Initial state estimate
            P_hat_0: ndarray
                Initial state covariance matrix estimate. If `None`, an Identity
                matrix is assumed.
            discretization_method: str ("euler" or "zoh")
                Method to discretize the continuous-time plant. Default is "euler".
            discretized_noise: bool
                Whether the user is directly providing Gd, Qd and Rd. Default is False.
                If True, `G_func`, `Q_func`, and `R_func` provide Gd(t), Qd(t), and
                Rd(t), respectively.
        """

        (
            forward,
            observation,
            Gd_func,
            Qd_func,
            Rd_func,
        ) = prepare_continuous_plant_for_nonlinear_kalman_filter(
            plant,
            dt,
            G_func,
            Q_func,
            R_func,
            x_hat_0,
            discretization_method,
            discretized_noise,
        )

        nx = x_hat_0.size
        if P_hat_0 is None:
            P_hat_0 = jnp.eye(nx)

        # TODO: If Gd_func is None, compute Gd automatically with u = u + w

        ekf = ExtendedKalmanFilter(
            dt,
            forward,
            observation,
            Gd_func,
            Qd_func,
            Rd_func,
            x_hat_0,
            P_hat_0,
            name=name,
        )

        return ekf

    ###################################################################################
    # Make filter from direct specification of forward/observaton operators and noise #
    ###################################################################################

    @staticmethod
    def from_operators(
        dt,
        forward,
        observation,
        G_func,
        Q_func,
        R_func,
        x_hat_0,
        P_hat_0,
        name=None,
    ):
        """
        Extended Kalman Filter (UKF) for the following system:

        ```
            x[n+1] = f(x[n], u[n]) + G(t[n]) w[n]
            y[n]   = g(x[n], u[n]) + v[n]

            E(w[n]) = E(v[n]) = 0
            E(w[n]w'[n]) = Q(t[n], x[n], u[n])
            E(v[n]v'[n] = R(t[n])
            E(w[n]v'[n] = N(t[n]) = 0
        ```

        `f` and `g` are discrete-time functions of state `x[n]` and control `u[n]`,
        while `Q` and `R` and `G` are discrete-time functions of time `t[n]`.

        Input ports:
            (0) u[n] : control vector at timestep n
            (1) y[n] : measurement vector at timestep n

        Output ports:
            (1) x_hat[n] : state vector estimate at timestep n

        Parameters:
            dt: float
                Time step of the discrete-time system
            forward: Callable
                A function with signature f(x[n], u[n]) -> x[n+1] that represents `f`
                in the above equations.
            observation: Callable
                A function with signature g(x[n], u[n]) -> y[n] that represents `g` in
                the above equations.
            G_func: Callable
                A function with signature G(t[n]) -> G[n] that represents `G` in
                the above equations.
            Q_func: Callable
                A function with signature Q(t[n]) -> Q[n] that represents
                `Q` in the above equations.
            R_func: Callable
                A function with signature R(t[n]) -> R[n] that represents `R` in
                the above equations.
            x_hat_0: ndarray
                Initial state estimate
            P_hat_0: ndarray
                Initial state covariance matrix estimate
        """

        def Q_func_aug(t, x_k, u_k):
            return Q_func(t)

        ekf = ExtendedKalmanFilter(
            dt,
            forward,
            observation,
            G_func,
            Q_func_aug,
            R_func,
            x_hat_0,
            P_hat_0,
            name=name,
        )

        return ekf

for_continuous_plant(plant, dt, G_func, Q_func, R_func, x_hat_0, P_hat_0, discretization_method='euler', discretized_noise=False, name=None) staticmethod

Extended Kalman Filter system for a continuous-time plant.

The input plant contains the deterministic forms of the forward and observation operators:

    dx/dt = f(x,u)
    y = g(x,u)

Note: (i) Only plants with one vector-valued input and one vector-valued output are currently supported. Furthermore, the plant LeafSystem/Diagram should have only one vector-valued integrator; (ii) the user may pass a plant with disturbances (not recommended) as the input plant. In this case, the forward and observation evaluations will be corrupted by noise.

A plant with disturbances of the following form is then considered:

    dx/dt = f(x,u) + G(t) w         -- (C1)
    y = g(x,u) +  v                 -- (C2)

where:

`w` represents the process noise,
`v` represents the measurement noise,

and

    E(w) = E(v) = 0
    E(ww') = Q(t)
    E(vv') = R(t)
    E(wv') = N(t) = 0

This plant is discretized to obtain the following form:

    x[n+1] = fd(x[n], u[n]) + Gd w[n]  -- (D1)
    y[n]   = gd(x[n], u[n]) + v[n]     -- (D2)

    E(w[n]) = E(v[n]) = 0
    E(w[n]w'[n]) = Qd
    E(v[n]v'[n] = Rd
    E(w[n]v'[n] = Nd = 0

The above discretization is performed either via the euler or the zoh method, and an Extended Kalman Filter estimator for the system of equations (D1) and (D2) is returned.

Note: If discretized_noise is True, then it is assumed that the user is directly providing Gd, Qd and Rd. If False, then Qd and Rd are computed from continuous-time Q, R, and G, and Gd is set to an Identity matrix.

The returned system will have:

Input ports

(0) u[n] : control vector at timestep n (1) y[n] : measurement vector at timestep n

Output ports

(1) x_hat[n] : state vector estimate at timestep n

Parameters:

Name Type Description Default
plant

a Plant object which can be a LeafSystem or a Diagram.

required
dt

float Time step for the discretization.

required
G_func

Callable A function with signature G(t) -> G that represents G in the continuous-time equations (C1) and (C2).

required
Q_func

Callable A function with signature Q(t) -> Q that represents Q in the continuous-time equations (C1) and (C2).

required
R_func

Callable A function with signature R(t) -> R that represents R in the continuous-time equations (C1) and (C2).

required
x_hat_0

ndarray Initial state estimate

required
P_hat_0

ndarray Initial state covariance matrix estimate. If None, an Identity matrix is assumed.

required
discretization_method

str ("euler" or "zoh") Method to discretize the continuous-time plant. Default is "euler".

'euler'
discretized_noise

bool Whether the user is directly providing Gd, Qd and Rd. Default is False. If True, G_func, Q_func, and R_func provide Gd(t), Qd(t), and Rd(t), respectively.

False
Source code in collimator/library/state_estimators/extended_kalman_filter.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
@staticmethod
def for_continuous_plant(
    plant,
    dt,
    G_func,
    Q_func,
    R_func,
    x_hat_0,
    P_hat_0,
    discretization_method="euler",
    discretized_noise=False,
    name=None,
):
    """
    Extended Kalman Filter system for a continuous-time plant.

    The input plant contains the deterministic forms of the forward and observation
    operators:

    ```
        dx/dt = f(x,u)
        y = g(x,u)
    ```

    Note: (i) Only plants with one vector-valued input and one vector-valued output
    are currently supported. Furthermore, the plant LeafSystem/Diagram should have
    only one vector-valued integrator; (ii) the user may pass a plant with
    disturbances (not recommended) as the input plant. In this case, the forward
    and observation evaluations will be corrupted by noise.

    A plant with disturbances of the following form is then considered:

    ```
        dx/dt = f(x,u) + G(t) w         -- (C1)
        y = g(x,u) +  v                 -- (C2)
    ```

    where:

        `w` represents the process noise,
        `v` represents the measurement noise,

    and

    ```
        E(w) = E(v) = 0
        E(ww') = Q(t)
        E(vv') = R(t)
        E(wv') = N(t) = 0
    ```

    This plant is discretized to obtain the following form:

    ```
        x[n+1] = fd(x[n], u[n]) + Gd w[n]  -- (D1)
        y[n]   = gd(x[n], u[n]) + v[n]     -- (D2)

        E(w[n]) = E(v[n]) = 0
        E(w[n]w'[n]) = Qd
        E(v[n]v'[n] = Rd
        E(w[n]v'[n] = Nd = 0
    ```

    The above discretization is performed either via the `euler` or the `zoh`
    method, and an Extended Kalman Filter estimator for the system of equations
    (D1) and (D2) is returned.

    Note: If `discretized_noise` is True, then it is assumed that the user is
    directly providing Gd, Qd and Rd. If False, then Qd and Rd are computed from
    continuous-time Q, R, and G, and Gd is set to an Identity matrix.

    The returned system will have:

    Input ports:
        (0) u[n] : control vector at timestep n
        (1) y[n] : measurement vector at timestep n

    Output ports:
        (1) x_hat[n] : state vector estimate at timestep n

    Parameters:
        plant : a `Plant` object which can be a LeafSystem or a Diagram.
        dt: float
            Time step for the discretization.
        G_func: Callable
            A function with signature G(t) -> G that represents `G` in
            the continuous-time equations (C1) and (C2).
        Q_func: Callable
            A function with signature Q(t) -> Q that represents `Q` in
            the continuous-time equations (C1) and (C2).
        R_func: Callable
            A function with signature R(t) -> R that represents `R` in
            the continuous-time equations (C1) and (C2).
        x_hat_0: ndarray
            Initial state estimate
        P_hat_0: ndarray
            Initial state covariance matrix estimate. If `None`, an Identity
            matrix is assumed.
        discretization_method: str ("euler" or "zoh")
            Method to discretize the continuous-time plant. Default is "euler".
        discretized_noise: bool
            Whether the user is directly providing Gd, Qd and Rd. Default is False.
            If True, `G_func`, `Q_func`, and `R_func` provide Gd(t), Qd(t), and
            Rd(t), respectively.
    """

    (
        forward,
        observation,
        Gd_func,
        Qd_func,
        Rd_func,
    ) = prepare_continuous_plant_for_nonlinear_kalman_filter(
        plant,
        dt,
        G_func,
        Q_func,
        R_func,
        x_hat_0,
        discretization_method,
        discretized_noise,
    )

    nx = x_hat_0.size
    if P_hat_0 is None:
        P_hat_0 = jnp.eye(nx)

    # TODO: If Gd_func is None, compute Gd automatically with u = u + w

    ekf = ExtendedKalmanFilter(
        dt,
        forward,
        observation,
        Gd_func,
        Qd_func,
        Rd_func,
        x_hat_0,
        P_hat_0,
        name=name,
    )

    return ekf

from_operators(dt, forward, observation, G_func, Q_func, R_func, x_hat_0, P_hat_0, name=None) staticmethod

Extended Kalman Filter (UKF) for the following system:

    x[n+1] = f(x[n], u[n]) + G(t[n]) w[n]
    y[n]   = g(x[n], u[n]) + v[n]

    E(w[n]) = E(v[n]) = 0
    E(w[n]w'[n]) = Q(t[n], x[n], u[n])
    E(v[n]v'[n] = R(t[n])
    E(w[n]v'[n] = N(t[n]) = 0

f and g are discrete-time functions of state x[n] and control u[n], while Q and R and G are discrete-time functions of time t[n].

Input ports

(0) u[n] : control vector at timestep n (1) y[n] : measurement vector at timestep n

Output ports

(1) x_hat[n] : state vector estimate at timestep n

Parameters:

Name Type Description Default
dt

float Time step of the discrete-time system

required
forward

Callable A function with signature f(x[n], u[n]) -> x[n+1] that represents f in the above equations.

required
observation

Callable A function with signature g(x[n], u[n]) -> y[n] that represents g in the above equations.

required
G_func

Callable A function with signature G(t[n]) -> G[n] that represents G in the above equations.

required
Q_func

Callable A function with signature Q(t[n]) -> Q[n] that represents Q in the above equations.

required
R_func

Callable A function with signature R(t[n]) -> R[n] that represents R in the above equations.

required
x_hat_0

ndarray Initial state estimate

required
P_hat_0

ndarray Initial state covariance matrix estimate

required
Source code in collimator/library/state_estimators/extended_kalman_filter.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
@staticmethod
def from_operators(
    dt,
    forward,
    observation,
    G_func,
    Q_func,
    R_func,
    x_hat_0,
    P_hat_0,
    name=None,
):
    """
    Extended Kalman Filter (UKF) for the following system:

    ```
        x[n+1] = f(x[n], u[n]) + G(t[n]) w[n]
        y[n]   = g(x[n], u[n]) + v[n]

        E(w[n]) = E(v[n]) = 0
        E(w[n]w'[n]) = Q(t[n], x[n], u[n])
        E(v[n]v'[n] = R(t[n])
        E(w[n]v'[n] = N(t[n]) = 0
    ```

    `f` and `g` are discrete-time functions of state `x[n]` and control `u[n]`,
    while `Q` and `R` and `G` are discrete-time functions of time `t[n]`.

    Input ports:
        (0) u[n] : control vector at timestep n
        (1) y[n] : measurement vector at timestep n

    Output ports:
        (1) x_hat[n] : state vector estimate at timestep n

    Parameters:
        dt: float
            Time step of the discrete-time system
        forward: Callable
            A function with signature f(x[n], u[n]) -> x[n+1] that represents `f`
            in the above equations.
        observation: Callable
            A function with signature g(x[n], u[n]) -> y[n] that represents `g` in
            the above equations.
        G_func: Callable
            A function with signature G(t[n]) -> G[n] that represents `G` in
            the above equations.
        Q_func: Callable
            A function with signature Q(t[n]) -> Q[n] that represents
            `Q` in the above equations.
        R_func: Callable
            A function with signature R(t[n]) -> R[n] that represents `R` in
            the above equations.
        x_hat_0: ndarray
            Initial state estimate
        P_hat_0: ndarray
            Initial state covariance matrix estimate
    """

    def Q_func_aug(t, x_k, u_k):
        return Q_func(t)

    ekf = ExtendedKalmanFilter(
        dt,
        forward,
        observation,
        G_func,
        Q_func_aug,
        R_func,
        x_hat_0,
        P_hat_0,
        name=name,
    )

    return ekf

FeedthroughBlock

Bases: LeafSystem

Simple feedthrough blocks with a function of a single input

Source code in collimator/library/generic.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class FeedthroughBlock(LeafSystem):
    """Simple feedthrough blocks with a function of a single input"""

    def __init__(self, func, parameters={}, **kwargs):
        super().__init__(**kwargs)
        self.declare_input_port()

        _add_parameters(self, parameters)

        def _callback(time, state, *inputs, **parameters):
            return func(*inputs, **parameters)

        self.declare_output_port(
            _callback,
            prerequisites_of_calc=[self.input_ports[0].ticket],
            requires_inputs=True,
        )

FilterDiscrete

Bases: LeafSystem

Finite Impulse Response (FIR) filter.

Similar to https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.lfilter.html Note: does not implement the IIR filter.

Input ports

(0) The input signal.

Output ports

(0) The filtered signal.

Parameters:

Name Type Description Default
b_coefficients

Array of filter coefficients.

required
Source code in collimator/library/primitives.py
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
class FilterDiscrete(LeafSystem):
    """Finite Impulse Response (FIR) filter.

    Similar to https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.lfilter.html
    Note: does not implement the IIR filter.

    Input ports:
        (0) The input signal.

    Output ports:
        (0) The filtered signal.

    Parameters:
        b_coefficients:
            Array of filter coefficients.
    """

    def __init__(
        self,
        dt,
        b_coefficients,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.dt = dt

        self.declare_input_port()

        initial_state = cnp.zeros(len(b_coefficients) - 1)
        self.declare_discrete_state(default_value=initial_state)

        self.is_feedthrough = b_coefficients[0] != 0

        prerequisites_of_calc = []
        if self.is_feedthrough:
            prerequisites_of_calc.append(self.input_ports[0].ticket)

        self.declare_periodic_update(
            self._update,
            period=dt,
            offset=dt,
        )

        self.declare_output_port(
            self._output,
            period=dt,
            offset=dt,
            requires_inputs=self.is_feedthrough,
            prerequisites_of_calc=prerequisites_of_calc,
        )

        self.declare_parameter("b_coefficients", b_coefficients)

    def _update(self, _time, state, u, **_parameters):
        xd = state.discrete_state
        return cnp.concatenate([cnp.atleast_1d(u), xd[:-1]])

    def _output(self, time, state, *inputs, **parameters):
        xd = state.discrete_state
        b_coefficients = parameters["b_coefficients"]

        y = cnp.sum(cnp.dot(b_coefficients[1:], xd))

        if self.is_feedthrough:
            (u,) = inputs
            y += u * b_coefficients[0]

        return y

FiniteHorizonLinearQuadraticRegulator

Bases: LeafSystem

Finite Horizon Linear Quadratic Regulator (LQR) for a continuous-time system. Solves the Riccati Differential Equation (RDE) to compute the optimal control for the following finitie horizon cost function over [t0, tf]:

Minimise cost J:

J = [x(tf) - xd(tf)].T Qf [x(tf) - xd(tf)]
    + ∫[(x(t) - xd(t)].T Q [(x(t) - xd(t)] dt
    + ∫[(u(t) - ud(t)].T R [(u(t) - ud(t)] dt
    + 2 ∫[(x(t) - xd(t)].T N [(u(t) - ud(t)] dt

subject to the constraints:

dx(t)/dt - dx0(t)/dt = A [x(t)-x0(t)] + B [u(t)-u0(t)] - c(t),

where, x(t) is the state vector, u(t) is the control vector, xd(t) is the desired state vector, ud(t) is the desired control vector, x0(t) is the nominal state vector, u0(t) is the nominal control vector, Q, R, and N are the state, input, and cross cost matrices, Qf is the final state cost matrix,

and A, B, and c are computed from linearisation of the plant df/dx = f(x, u) around the nominal trajectory (x0(t), u0(t)).

A = df/dx(x0(t), u0(t), t)
B = df/du(x0(t), u0(t), t)
c = f(x0(t), u0(t), t) - dx0(t)/dt

The optimal control u obtained by the solution of the above problem is output.

See Section 8.5.1 of https://underactuated.csail.mit.edu/lqr.html#finite_horizon

Parameters:

Name Type Description Default
t0

float Initial time of the finite horizon.

required
tf

float Final time of the finite horizon.

required
plant

a Plant object which can be a LeafSystem or a Diagram. The plant to be controlled. This represents df/dx = f(x, u).

required
Qf

Array Final state cost matrix.

required
func_Q

Callable A function that returns the state cost matrix Q at time t: func_Q(t)->Q

required
func_R

Callable A function that returns the input cost matrix R at time t: func_R(t)->R

required
func_N

Callable A function that returns the cross cost matrix N at time t. func_N(t)->N

required
func_x_0

Callable A function that returns the nominal state vector x0 at time t. func_x_0(t)->x0

required
func_u_0

Callable A function that returns the nominal control vector u0 at time t. func_u_0(t)->u0

required
func_x_d

Callable A function that returns the desired state vector xd at time t. func_x_d(t)->xd

required
func_u_d

Callable A function that returns the desired control vector ud at time t. func_u_d(t)->ud

required
Source code in collimator/library/lqr.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
class FiniteHorizonLinearQuadraticRegulator(LeafSystem):
    """
    Finite Horizon Linear Quadratic Regulator (LQR) for a continuous-time system.
    Solves the Riccati Differential Equation (RDE) to compute the optimal control
    for the following finitie horizon cost function over [t0, tf]:

    Minimise cost J:

        J = [x(tf) - xd(tf)].T Qf [x(tf) - xd(tf)]
            + ∫[(x(t) - xd(t)].T Q [(x(t) - xd(t)] dt
            + ∫[(u(t) - ud(t)].T R [(u(t) - ud(t)] dt
            + 2 ∫[(x(t) - xd(t)].T N [(u(t) - ud(t)] dt

    subject to the constraints:

    dx(t)/dt - dx0(t)/dt = A [x(t)-x0(t)] + B [u(t)-u0(t)] - c(t),

    where,
        x(t) is the state vector,
        u(t) is the control vector,
        xd(t) is the desired state vector,
        ud(t) is the desired control vector,
        x0(t) is the nominal state vector,
        u0(t) is the nominal control vector,
        Q, R, and N are the state, input, and cross cost matrices,
        Qf is the final state cost matrix,

    and A, B, and c are computed from linearisation of the plant `df/dx = f(x, u)`
    around the nominal trajectory (x0(t), u0(t)).

        A = df/dx(x0(t), u0(t), t)
        B = df/du(x0(t), u0(t), t)
        c = f(x0(t), u0(t), t) - dx0(t)/dt

    The optimal control `u` obtained by the solution of the above problem is output.

    See Section 8.5.1 of https://underactuated.csail.mit.edu/lqr.html#finite_horizon

    Parameters:
        t0 : float
            Initial time of the finite horizon.
        tf : float
            Final time of the finite horizon.
        plant : a `Plant` object which can be a LeafSystem or a Diagram.
            The plant to be controlled. This represents `df/dx = f(x, u)`.
        Qf : Array
            Final state cost matrix.
        func_Q : Callable
            A function that returns the state cost matrix Q at time `t`: `func_Q(t)->Q`
        func_R : Callable
            A function that returns the input cost matrix R at time `t`: `func_R(t)->R`
        func_N : Callable
            A function that returns the cross cost matrix N at time `t`. `func_N(t)->N`
        func_x_0 : Callable
            A function that returns the nominal state vector `x0` at time `t`.
            func_x_0(t)->x0
        func_u_0 : Callable
            A function that returns the nominal control vector `u0` at time `t`.
            func_u_0(t)->u0
        func_x_d : Callable
            A function that returns the desired state vector `xd` at time `t`.
            func_x_d(t)->xd
        func_u_d : Callable
            A function that returns the desired control vector `ud` at time `t`.
            func_u_d(t)->ud
    """

    def __init__(
        self,
        t0,
        tf,
        plant,
        Qf,
        func_Q,
        func_R,
        func_N,
        func_x_0,
        func_u_0,
        func_x_d,
        func_u_d,
        name=None,
    ):
        super().__init__(name=name)

        self.t0 = t0
        self.tf = tf

        self.func_R = func_R
        self.func_N = func_N
        self.func_x_0 = func_x_0
        self.func_u_0 = func_u_0
        self.func_x_d = func_x_d
        self.func_u_d = func_u_d

        func_dot_x_0 = jax.jacfwd(func_x_0)
        nu = func_R(t0).shape[0]

        ode_rhs = make_ode_rhs(plant, nu)
        get_A = jax.jacfwd(ode_rhs, argnums=0)
        self.get_B = jax.jacfwd(ode_rhs, argnums=1)

        @jax.jit
        def rde(t, rde_state, args):
            t = -t
            Sxx, sx = rde_state

            Sxx = (Sxx + Sxx.T) / 2.0

            # Get nominal trajectories, desired trajectories, and cost matrices
            x_0 = func_x_0(t)
            u_0 = func_u_0(t)

            x_d = func_x_d(t)
            u_d = func_u_d(t)

            Q = func_Q(t)
            R = func_R(t)
            N = func_N(t)

            # Calculate dynamics mismatch due to nominal traj not satisfying dynamics
            dot_x_0 = func_dot_x_0(t)
            dot_x_0_eval = ode_rhs(x_0, u_0, t)
            c = dot_x_0_eval - dot_x_0

            #  Get linearisation around x_0, u_0
            A = get_A(x_0, u_0, t)
            B = self.get_B(x_0, u_0, t)

            #  Desired trajectories relative to nominal
            x_d_0 = x_d - x_0
            u_d_0 = u_d - u_0

            #  Compute RHS of RDE
            qx = -jnp.dot(Q, x_d_0) - jnp.dot(N, u_d_0)
            ru = -jnp.dot(R, u_d_0) - jnp.dot(N.T, x_d_0)

            N_plus_Sxx_B = N + jnp.matmul(Sxx, B)

            Rinv = jnp.linalg.inv(R)
            Sxx_A = jnp.matmul(Sxx, A)

            dot_Sxx = (
                Q
                - jnp.matmul(N_plus_Sxx_B, jnp.matmul(Rinv, N_plus_Sxx_B.T))
                + Sxx_A
                + Sxx_A.T
            )

            dot_sx = (
                qx
                - jnp.dot(N_plus_Sxx_B, jnp.dot(Rinv, ru + jnp.dot(B.T, sx)))
                + jnp.dot(A.T, sx)
                + jnp.dot(Sxx, c)
            )

            return (dot_Sxx, dot_sx)

        term = ODETerm(rde)
        solver = Tsit5()
        stepsize_controller = PIDController(rtol=1e-5, atol=1e-5, dtmax=0.1)
        saveat = SaveAt(dense=True)

        # TODO: Use utilities in ../simulation/ for reduced reliance on diffrax
        self.sol_rde = diffeqsolve(
            term,
            solver,
            -tf,
            -t0,
            y0=(Qf, -jnp.dot(Qf, func_x_d(tf) - func_x_0(tf))),
            dt0=0.0001,
            saveat=saveat,
            stepsize_controller=stepsize_controller,
        )

        # Input: current state (x)
        self.declare_input_port()

        # Output port: Optimal finite horizon LQR control
        self.declare_output_port(self._eval_output, default_value=jnp.zeros(nu))

    def _eval_output(self, time, state, x, **params):
        rde_time = jnp.clip(time, self.t0, self.tf)
        rde_time = -rde_time

        Sxx, sx = self.sol_rde.evaluate(rde_time)

        x_d = self.func_x_d(time)
        u_d = self.func_u_d(time)

        x_0 = self.func_x_0(time)
        u_0 = self.func_u_0(time)

        x_d_0 = x_d - x_0
        u_d_0 = u_d - u_0

        B = self.get_B(x_0, u_0, time)

        R = self.func_R(time)
        N = self.func_N(time)
        Rinv = jnp.linalg.inv(R)

        ru = -jnp.dot(R, u_d_0) - jnp.dot(N.T, x_d_0)

        Rinv = jnp.linalg.inv(R)
        N_plus_Sxx_B = N + jnp.matmul(Sxx, B)

        u = (
            u_0
            - jnp.dot(Rinv, jnp.dot(N_plus_Sxx_B.T, (x - x_0)))
            - jnp.dot(Rinv, ru + jnp.dot(B.T, sx))
        )

        return u

Gain

Bases: FeedthroughBlock

Multiply the input signal by a constant value.

Input ports

(0) The input signal.

Output ports

(0) The input signal multiplied by the gain: y = gain * u.

Parameters:

Name Type Description Default
gain

The value to scale the input signal by.

required
Source code in collimator/library/primitives.py
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
class Gain(FeedthroughBlock):
    """Multiply the input signal by a constant value.

    Input ports:
        (0) The input signal.

    Output ports:
        (0) The input signal multiplied by the gain: `y = gain * u`.

    Parameters:
        gain:
            The value to scale the input signal by.
    """

    def __init__(self, gain, *args, **kwargs):
        super().__init__(lambda x, gain: gain * x, *args, **kwargs)
        self.declare_parameter("gain", gain)

HermiteSimpsonNMPC

Bases: NonlinearMPCIpopt

Implementation of nonlinear MPC with Hermite-Simpson collocation and IPOPT as the NLP solver.

Input ports

(0) x_0 : current state vector. (1) x_ref : reference state trajectory for the nonlinear MPC. (2) u_ref : reference input trajectory for the nonlinear MPC.

Output ports

(1) u_opt : the optimal control input to be applied at the current time step as determined by the nonlinear MPC.

Parameters:

Name Type Description Default
plant

LeafSystem or Diagram The plant to be controlled.

required
Q

Array State weighting matrix in the cost function.

required
QN

Array Terminal state weighting matrix in the cost function.

required
R

Array Control input weighting matrix in the cost function.

required
N

int The prediction horizon, an integer specifying the number of steps to predict. Note: prediction and control horizons are identical for now.

required
dt

float: Major time step, a scalar indicating the increment in time for each step in the prediction and control horizons.

required
lb_x

Array Lower bound on the state vector.

None
ub_x

Array Upper bound on the state vector.

None
lb_u

Array Lower bound on the control input vector.

None
ub_u

Array Upper bound on the control input vector.

None
include_terminal_x_as_constraint

bool If True, the terminal state is included as a constraint in the NLP.

False
include_terminal_u_as_constraint

bool If True, the terminal control input is included as a constraint in the NLP.

False
x_optvars_0

Array Initial guess for the state vector optimization variables in the NLP.

None
u_optvars_0

Array Initial guess for the control vector optimization variables in the NLP.

None
Source code in collimator/library/nmpc/hermite_simpson_ipopt_nmpc.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
class HermiteSimpsonNMPC(NonlinearMPCIpopt):
    """
    Implementation of nonlinear MPC with Hermite-Simpson collocation and IPOPT as the
    NLP solver.

    Input ports:
        (0) x_0 : current state vector.
        (1) x_ref : reference state trajectory for the nonlinear MPC.
        (2) u_ref : reference input trajectory for the nonlinear MPC.

    Output ports:
        (1) u_opt : the optimal control input to be applied at the current time step
                    as determined by the nonlinear MPC.

    Parameters:
        plant: LeafSystem or Diagram
            The plant to be controlled.

        Q: Array
            State weighting matrix in the cost function.

        QN: Array
            Terminal state weighting matrix in the cost function.

        R: Array
            Control input weighting matrix in the cost function.

        N: int
            The prediction horizon, an integer specifying the number of steps to
            predict. Note: prediction and control horizons are identical for now.

        dt: float:
            Major time step, a scalar indicating the increment in time for each step in
            the prediction and control horizons.

        lb_x: Array
            Lower bound on the state vector.

        ub_x: Array
            Upper bound on the state vector.

        lb_u: Array
            Lower bound on the control input vector.

        ub_u: Array
            Upper bound on the control input vector.

        include_terminal_x_as_constraint: bool
            If True, the terminal state is included as a constraint in the NLP.

        include_terminal_u_as_constraint: bool
            If True, the terminal control input is included as a constraint in the NLP.

        x_optvars_0: Array
            Initial guess for the state vector optimization variables in the NLP.

        u_optvars_0: Array
            Initial guess for the control vector optimization variables in the NLP.
    """

    def __init__(
        self,
        plant,
        Q,
        QN,
        R,
        N,
        dt,
        lb_x=None,
        ub_x=None,
        lb_u=None,
        ub_u=None,
        include_terminal_x_as_constraint=False,
        include_terminal_u_as_constraint=False,
        x_optvars_0=None,
        u_optvars_0=None,
        name=None,
    ):
        self.Q = Q
        self.QN = QN
        self.R = R

        self.N = N
        self.dt = dt

        self.lb_x = lb_x
        self.ub_x = ub_x
        self.lb_u = lb_u
        self.ub_u = ub_u

        self.include_terminal_x_as_constraint = include_terminal_x_as_constraint
        self.include_terminal_u_as_constraint = include_terminal_u_as_constraint

        self.nx = Q.shape[0]
        self.nu = R.shape[0]

        if lb_x is None:
            self.lb_x = -1e20 * jnp.ones(self.nx)

        if ub_x is None:
            self.ub_x = 1e20 * jnp.ones(self.nx)

        if lb_u is None:
            self.lb_u = -1e20 * jnp.ones(self.nu)

        if ub_u is None:
            self.ub_u = 1e20 * jnp.ones(self.nu)

        # Currently guesses are not taken into account
        self.x_optvars_0 = x_optvars_0  # Currently does nothing
        self.u_optvars_0 = u_optvars_0  # Currently does nothing
        if x_optvars_0 is None:
            x_optvars_0 = jnp.zeros((N + 1, self.nx))
        if u_optvars_0 is None:
            u_optvars_0 = jnp.zeros((N + 1, self.nu))

        self.ode_rhs = make_ode_rhs(plant, self.nu)

        nlp_structure_ipopt = NMPCProblemStructure(
            self.num_optvars,
            self._objective,
            self._constraints,
        )

        super().__init__(
            dt,
            self.nu,
            self.num_optvars,
            nlp_structure_ipopt,
            name=name,
        )

    @property
    def num_optvars(self):
        return (self.N + 1) * (self.nx + self.nu)

    @property
    def num_constraints(self):
        # max size regardless of terminal constraints (for jit compilation)
        num_contraints = (self.N + 2) * self.nx + self.nu
        return num_contraints

    @property
    def bounds_optvars(self):
        lb = jnp.hstack(
            [jnp.tile(self.lb_u, self.N + 1), jnp.tile(self.lb_x, self.N + 1)]
        )
        ub = jnp.hstack(
            [jnp.tile(self.ub_u, self.N + 1), jnp.tile(self.ub_x, self.N + 1)]
        )
        return (lb, ub)

    @property
    def bounds_constraints(self):
        c_lb = jnp.zeros(self.num_constraints)
        c_ub = jnp.zeros(self.num_constraints)
        return (c_lb, c_ub)

    @partial(jax.jit, static_argnames=("self",))
    def _objective(self, optvars, t0, x0, x_ref, u_ref):
        u_and_x_flat = optvars

        u = u_and_x_flat[: self.nu * (self.N + 1)].reshape((self.N + 1, self.nu))
        x = u_and_x_flat[self.nu * (self.N + 1) :].reshape((self.N + 1, self.nx))

        xdiff = x - x_ref
        udiff = u - u_ref

        # compute sum of quadratic products for x_0 to x_{n-1}
        A = jnp.dot(xdiff[:-1], self.Q)
        qp_x_sum = jnp.sum(xdiff[:-1] * A, axis=None)

        # Compute quadratic product for the x_N
        xN = xdiff[-1]
        qp_x_N = jnp.dot(xN, jnp.dot(self.QN, xN))

        # compute sum of quadratic products for u_0 to u_{n-1}
        B = jnp.dot(udiff, self.R)
        qp_u_sum = jnp.sum(udiff * B, axis=None)

        # Sum the quadratic products
        total_sum = qp_x_sum + qp_x_N + qp_u_sum
        return total_sum

    @partial(jax.jit, static_argnames=("self",))
    def _constraints(self, optvars, t0, x0, x_ref, u_ref):
        u_and_x_flat = optvars

        u = u_and_x_flat[: self.nu * (self.N + 1)].reshape((self.N + 1, self.nu))
        x = u_and_x_flat[self.nu * (self.N + 1) :].reshape((self.N + 1, self.nx))

        h = self.dt
        t = t0 + h * jnp.arange(self.N + 1)

        dot_x = jnp.zeros((self.N + 1, self.nx))

        def loop_body_break(idx, dot_x):
            rhs = self.ode_rhs(x[idx], u[idx], t[idx])
            dot_x = dot_x.at[idx].set(rhs)
            return dot_x

        dot_x = jax.lax.fori_loop(0, self.N + 1, loop_body_break, dot_x)

        t = t0 + self.dt * jnp.arange(self.N + 1)
        t_c = 0.5 * (t[:-1] + t[1:])
        u_c = 0.5 * (u[:-1] + u[1:])
        x_c = 0.5 * (x[:-1] + x[1:]) + (h / 8.0) * (dot_x[:-1] - dot_x[1:])

        dot_x_c = (-3.0 / 2.0 / h) * (x[:-1] - x[1:]) - (1.0 / 4.0) * (
            dot_x[:-1] + dot_x[1:]
        )

        c0 = x0 - x[0]

        c_others = jnp.zeros((self.N, self.nx))

        def loop_body_colloc(idx, c_others):
            c_colocation = self.ode_rhs(x_c[idx], u_c[idx], t_c[idx]) - dot_x_c[idx]
            c_others = c_others.at[idx].set(c_colocation)
            return c_others

        c_others = jax.lax.fori_loop(0, self.N, loop_body_colloc, c_others)
        c_all = jnp.hstack([c0.ravel(), c_others.ravel()])

        c_terminal_x = x_ref[self.N] - x[self.N]
        c_terminal_u = u_ref[self.N] - u[self.N]

        c_all = cond(
            self.include_terminal_x_as_constraint,
            lambda c_all, c_terminal_x: jnp.hstack([c_all, c_terminal_x.ravel()]),
            lambda c_all, c_terminal_x: jnp.hstack([c_all, jnp.zeros(self.nx)]),
            c_all,
            c_terminal_x,
        )

        c_all = cond(
            self.include_terminal_u_as_constraint,
            lambda c_all, c_terminal_u: jnp.hstack([c_all, c_terminal_u.ravel()]),
            lambda c_all, c_terminal_x: jnp.hstack([c_all, jnp.zeros(self.nu)]),
            c_all,
            c_terminal_u,
        )

        return c_all

IOPort

Bases: FeedthroughBlock

Simple class for organizing input/output ports for groups/submodels.

Since these are treated as standalone blocks in the UI rather than specific input/output ports exported to the parent model, it is more straightforward to represent them that way here as well.

This class represents a simple one-input, one-output feedthrough block where the feedthrough function is an identity. The input (resp. output) port can then be exported to the parent model to create an Inport (resp. Outport).

Source code in collimator/library/primitives.py
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
class IOPort(FeedthroughBlock):
    """Simple class for organizing input/output ports for groups/submodels.

    Since these are treated as standalone blocks in the UI rather than specific
    input/output ports exported to the parent model, it is more straightforward
    to represent them that way here as well.

    This class represents a simple one-input, one-output feedthrough block where
    the feedthrough function is an identity.  The input (resp. output) port can then
    be exported to the parent model to create an Inport (resp. Outport).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(lambda x: x, *args, **kwargs)

IfThenElse

Bases: LeafSystem

Applies a conditional expression to the input signals.

Given inputs pred, true_val, and false_val, the block computes:

y = true_val if pred else false_val

The true and false values may be any arrays, but must have the same shape and dtype.

Input ports

(0) The boolean predicate. (1) The true value. (2) The false value.

Output ports

(0) The result of the conditional expression. Shape and dtype will match the true and false values.

Events

An event is triggered when the output changes from true to false or vice versa.

Source code in collimator/library/primitives.py
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
class IfThenElse(LeafSystem):
    """Applies a conditional expression to the input signals.

    Given inputs `pred`, `true_val`, and `false_val`, the block computes:
    ```
    y = true_val if pred else false_val
    ```

    The true and false values may be any arrays, but must have the same
    shape and dtype.

    Input ports:
        (0) The boolean predicate.
        (1) The true value.
        (2) The false value.

    Output ports:
        (0) The result of the conditional expression. Shape and dtype will match
            the true and false values.

    Events:
        An event is triggered when the output changes from true to false or vice versa.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        self.declare_input_port()  # pred
        self.declare_input_port()  # true_val
        self.declare_input_port()  # false_val

        def _compute_output(_time, _state, *inputs, **_params):
            return cnp.where(inputs[0], inputs[1], inputs[2])

        self.declare_output_port(
            _compute_output,
            prerequisites_of_calc=[port.ticket for port in self.input_ports],
        )

    def _edge_detection(self, _time, _state, *inputs, **_params):
        return cnp.where(inputs[0], 1.0, -1.0)

    def initialize_static_data(self, context):
        # Add a zero-crossing event so ODE solvers can't try to integrate
        # through a discontinuity. For efficiency, only do this if the output is
        # fed to an ODE.
        if not self.has_zero_crossing_events and is_discontinuity(self.output_ports[0]):
            self.declare_zero_crossing(self._edge_detection, direction="crosses_zero")

        return super().initialize_static_data(context)

InfiniteHorizonKalmanFilter

Bases: KalmanFilterBase

Infinite Horizon Kalman Filter for the following system:

x[n+1] = A x[n] + B u[n] + G w[n]
y[n]   = C x[n] + D u[n] + v[n]

E(w[n]) = E(v[n]) = 0
E(w[n]w'[n]) = Q
E(v[n]v'[n]) = R
E(w[n]v'[n]) = N = 0
Input ports

(0) u[n] : control vector at timestep n (1) y[n] : measurement vector at timestep n

Output ports

(1) x_hat[n] : state vector estimate at timestep n

Parameters:

Name Type Description Default
dt

float Time step of the discrete-time system

required
A

ndarray State transition matrix

required
B

ndarray Input matrix

required
C

ndarray Output matrix

required
D

ndarray Feedthrough matrix

required
G

ndarray Process noise matrix

required
Q

ndarray Process noise covariance matrix

required
R

ndarray Measurement noise covariance matrix

required
x_hat_0

ndarray Initial state estimate

required
Source code in collimator/library/state_estimators/infinite_horizon_kalman_filter.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
class InfiniteHorizonKalmanFilter(KalmanFilterBase):
    """
    Infinite Horizon Kalman Filter for the following system:

    ```
    x[n+1] = A x[n] + B u[n] + G w[n]
    y[n]   = C x[n] + D u[n] + v[n]

    E(w[n]) = E(v[n]) = 0
    E(w[n]w'[n]) = Q
    E(v[n]v'[n]) = R
    E(w[n]v'[n]) = N = 0
    ```

    Input ports:
        (0) u[n] : control vector at timestep n
        (1) y[n] : measurement vector at timestep n

    Output ports:
        (1) x_hat[n] : state vector estimate at timestep n

    Parameters:
        dt: float
            Time step of the discrete-time system
        A: ndarray
            State transition matrix
        B: ndarray
            Input matrix
        C: ndarray
            Output matrix
        D: ndarray
            Feedthrough matrix
        G: ndarray
            Process noise matrix
        Q: ndarray
            Process noise covariance matrix
        R: ndarray
            Measurement noise covariance matrix
        x_hat_0: ndarray
            Initial state estimate
    """

    def __init__(
        self,
        dt,
        A,
        B,
        C,
        D,
        G,
        Q,
        R,
        x_hat_0,
        name=None,
    ):
        # Note: This class inherits from KalmanFilterBase. Since the infinite horizon
        # kalman filter does not need P_hat and track it, a dummy_P_hat_0 is set as
        # Identity matrix of size 1, and used wherever KalmanFilterBase demands
        # P_hat-like matrices

        self.dummy_P_hat_0 = cnp.eye(1)
        super().__init__(dt, x_hat_0, self.dummy_P_hat_0, name)

        self.A = A
        self.B = B
        self.C = C
        self.D = D
        self.G = G
        self.Q = Q
        self.R = R

        self.nx, self.nu = B.shape
        self.ny = C.shape[0]

        L, P, E = control.dlqe(A, G, C, Q, R)

        self.K = np.linalg.solve(A, L)

        self.A_minus_LC = A - np.matmul(L, C)
        self.B_minus_LD = B - np.matmul(L, D)
        self.L = L

    def _correct(self, time, x_hat_minus, P_hat_minus, *inputs):
        u, y = inputs
        u = cnp.atleast_1d(u)
        y = cnp.atleast_1d(y)

        C, D = self.C, self.D

        x_hat_plus = x_hat_minus + cnp.dot(
            self.K, y - cnp.dot(C, x_hat_minus) - cnp.dot(D, u)
        )  # n|n

        return x_hat_plus, self.dummy_P_hat_0

    def _propagate(self, time, x_hat_plus, P_hat_plus, *inputs):
        u, y = inputs
        u = cnp.atleast_1d(u)

        x_hat_minus = (
            cnp.dot(self.A_minus_LC, x_hat_plus)
            + cnp.dot(self.B_minus_LD, u)
            + cnp.dot(self.L, y)
        )  # n+1|n

        return x_hat_minus, self.dummy_P_hat_0

    #######################################
    # Make filter for a continuous plant  #
    #######################################

    @staticmethod
    def for_continuous_plant(
        plant,
        x_eq,
        u_eq,
        dt,
        Q,
        R,
        G=None,
        x_hat_bar_0=None,
        discretization_method="zoh",
        discretized_noise=False,
        name=None,
    ):
        """
        Obtain an Infinite Horizon Kalman Filter system for a continuous-time plant
        after linearization at equilibrium point (x_eq, u_eq)

        The input plant contains the deterministic forms of the forward and observation
        operators:

        ```
            dx/dt = f(x,u)
            y = g(x,u)
        ```

        Note: (i) Only plants with one vector-valued input and one vector-valued output
        are currently supported. Furthermore, the plant LeafSystem/Diagram should have
        only one vector-valued integrator. (ii) the user may pass a plant with
        disturbances as the input plant. However, computation of `y_eq` will be fraught
        with disturbances.

        A plant with disturbances of the following form is then considered
        following form:

        ```
            dx/dt = f(x,u) + G w                        --- (C1)
            y = g(x,u) +  v                             --- (C2)
        ```

        where:

            `w` represents the process noise,
            `v` represents the measurement noise,

        and

        ```
            E(w) = E(v) = 0
            E(ww') = Q
            E(vv') = R
            E(wv') = N = 0
        ```

        This plant with disturbances is linearized (only `f` and `g`) around the
        equilibrium point to obtain:

        ```
            d/dt (x_bar) = A x_bar + B u_bar + G w
            y_bar = C x_bar + D u_bar + v
        ```

        where,

        ```
            x_bar = x - x_eq
            u_bar = u - u_eq
            y_bar = y - y_bar
            y_eq = g(x_eq, u_eq)
        ```

        The linearized plant is then discretized via `euler` or `zoh` method to obtain:

        ```
            x_bar[n] = Ad x_bar[n] + Bd u_bar[n] + Gd w[n]           --- (L1)
            y_bar[n] = Cd x_bar[n] + Dd u_bar[n] + v[n]              --- (L2)

            E(w[n]) = E(v[n]) = 0
            E(w[n]w'[n]) = Qd
            E(v[n]v'[n]) = Rd
            E(w[n]v'[n]) = Nd = 0
        ```

        Note: If `discretized_noise` is True, then it is assumed that the user is
        providing Gd, Qd and Rd. If False, then Qd and Rd are computed from
        continuous-time Q, R, and G, and Gd is set to Identity matrix.

        An Infinite Horizon Kalman Filter estimator for the system of equations (L1)
        and (L2) is returned. This filter is in the `x_bar`, `u_bar`, and `y_bar`
        states.

        This returned system will have

        Input ports:
            (0) u_bar[n] : control vector at timestep n, relative to equilibrium
            (1) y_bar[n] : measurement vector at timestep n, relative to equilibrium

        Output ports:
            (1) x_hat_bar[n] : state vector estimate at timestep n, relative to
                               equilibrium

        Parameters:
            plant : a `Plant` object which can be a LeafSystem or a Diagram.
            x_eq: ndarray
                Equilibrium state vector for discretization
            u_eq: ndarray
                Equilibrium control vector for discretization
            dt: float
                Time step for the discretization.
            Q: ndarray
                Process noise covariance matrix.
            R: ndarray
                Measurement noise covariance matrix.
            G: ndarray
                Process noise matrix. If `None`, `G=B` is assumed making disrurbances
                additive to control vector `u`, i.e. `u_disturbed = u_orig + w`.
            x_hat_bar_0: ndarray
                Initial state estimate relative to equilibrium.
                If None, an identity matrix is assumed.
            discretization_method: str ("euler" or "zoh")
                Method to discretize the continuous-time plant. Default is "euler".
            discretized_noise: bool
                Whether the user is directly providing Gd, Qd and Rd. Default is False.
                If True, `G`, `Q`, and `R` are assumed to be Gd, Qd, and Rd,
                respectively.
        """
        (
            y_eq,
            Ad,
            Bd,
            Cd,
            Dd,
            Gd,
            Qd,
            Rd,
        ) = linearize_and_discretize_continuous_plant(
            plant, x_eq, u_eq, dt, Q, R, G, discretization_method, discretized_noise
        )

        nx = x_eq.size

        if x_hat_bar_0 is None:
            x_hat_bar_0 = cnp.zeros(nx)

        # Instantiate an Infinite Horizon Kalman Filter for the linearized plant
        kf = InfiniteHorizonKalmanFilter(
            dt,
            Ad,
            Bd,
            Cd,
            Dd,
            Gd,
            Qd,
            Rd,
            x_hat_bar_0,
            name=name,
        )

        return y_eq, kf

for_continuous_plant(plant, x_eq, u_eq, dt, Q, R, G=None, x_hat_bar_0=None, discretization_method='zoh', discretized_noise=False, name=None) staticmethod

Obtain an Infinite Horizon Kalman Filter system for a continuous-time plant after linearization at equilibrium point (x_eq, u_eq)

The input plant contains the deterministic forms of the forward and observation operators:

    dx/dt = f(x,u)
    y = g(x,u)

Note: (i) Only plants with one vector-valued input and one vector-valued output are currently supported. Furthermore, the plant LeafSystem/Diagram should have only one vector-valued integrator. (ii) the user may pass a plant with disturbances as the input plant. However, computation of y_eq will be fraught with disturbances.

A plant with disturbances of the following form is then considered following form:

    dx/dt = f(x,u) + G w                        --- (C1)
    y = g(x,u) +  v                             --- (C2)

where:

`w` represents the process noise,
`v` represents the measurement noise,

and

    E(w) = E(v) = 0
    E(ww') = Q
    E(vv') = R
    E(wv') = N = 0

This plant with disturbances is linearized (only f and g) around the equilibrium point to obtain:

    d/dt (x_bar) = A x_bar + B u_bar + G w
    y_bar = C x_bar + D u_bar + v

where,

    x_bar = x - x_eq
    u_bar = u - u_eq
    y_bar = y - y_bar
    y_eq = g(x_eq, u_eq)

The linearized plant is then discretized via euler or zoh method to obtain:

    x_bar[n] = Ad x_bar[n] + Bd u_bar[n] + Gd w[n]           --- (L1)
    y_bar[n] = Cd x_bar[n] + Dd u_bar[n] + v[n]              --- (L2)

    E(w[n]) = E(v[n]) = 0
    E(w[n]w'[n]) = Qd
    E(v[n]v'[n]) = Rd
    E(w[n]v'[n]) = Nd = 0

Note: If discretized_noise is True, then it is assumed that the user is providing Gd, Qd and Rd. If False, then Qd and Rd are computed from continuous-time Q, R, and G, and Gd is set to Identity matrix.

An Infinite Horizon Kalman Filter estimator for the system of equations (L1) and (L2) is returned. This filter is in the x_bar, u_bar, and y_bar states.

This returned system will have

Input ports

(0) u_bar[n] : control vector at timestep n, relative to equilibrium (1) y_bar[n] : measurement vector at timestep n, relative to equilibrium

Output ports

(1) x_hat_bar[n] : state vector estimate at timestep n, relative to equilibrium

Parameters:

Name Type Description Default
plant

a Plant object which can be a LeafSystem or a Diagram.

required
x_eq

ndarray Equilibrium state vector for discretization

required
u_eq

ndarray Equilibrium control vector for discretization

required
dt

float Time step for the discretization.

required
Q

ndarray Process noise covariance matrix.

required
R

ndarray Measurement noise covariance matrix.

required
G

ndarray Process noise matrix. If None, G=B is assumed making disrurbances additive to control vector u, i.e. u_disturbed = u_orig + w.

None
x_hat_bar_0

ndarray Initial state estimate relative to equilibrium. If None, an identity matrix is assumed.

None
discretization_method

str ("euler" or "zoh") Method to discretize the continuous-time plant. Default is "euler".

'zoh'
discretized_noise

bool Whether the user is directly providing Gd, Qd and Rd. Default is False. If True, G, Q, and R are assumed to be Gd, Qd, and Rd, respectively.

False
Source code in collimator/library/state_estimators/infinite_horizon_kalman_filter.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
@staticmethod
def for_continuous_plant(
    plant,
    x_eq,
    u_eq,
    dt,
    Q,
    R,
    G=None,
    x_hat_bar_0=None,
    discretization_method="zoh",
    discretized_noise=False,
    name=None,
):
    """
    Obtain an Infinite Horizon Kalman Filter system for a continuous-time plant
    after linearization at equilibrium point (x_eq, u_eq)

    The input plant contains the deterministic forms of the forward and observation
    operators:

    ```
        dx/dt = f(x,u)
        y = g(x,u)
    ```

    Note: (i) Only plants with one vector-valued input and one vector-valued output
    are currently supported. Furthermore, the plant LeafSystem/Diagram should have
    only one vector-valued integrator. (ii) the user may pass a plant with
    disturbances as the input plant. However, computation of `y_eq` will be fraught
    with disturbances.

    A plant with disturbances of the following form is then considered
    following form:

    ```
        dx/dt = f(x,u) + G w                        --- (C1)
        y = g(x,u) +  v                             --- (C2)
    ```

    where:

        `w` represents the process noise,
        `v` represents the measurement noise,

    and

    ```
        E(w) = E(v) = 0
        E(ww') = Q
        E(vv') = R
        E(wv') = N = 0
    ```

    This plant with disturbances is linearized (only `f` and `g`) around the
    equilibrium point to obtain:

    ```
        d/dt (x_bar) = A x_bar + B u_bar + G w
        y_bar = C x_bar + D u_bar + v
    ```

    where,

    ```
        x_bar = x - x_eq
        u_bar = u - u_eq
        y_bar = y - y_bar
        y_eq = g(x_eq, u_eq)
    ```

    The linearized plant is then discretized via `euler` or `zoh` method to obtain:

    ```
        x_bar[n] = Ad x_bar[n] + Bd u_bar[n] + Gd w[n]           --- (L1)
        y_bar[n] = Cd x_bar[n] + Dd u_bar[n] + v[n]              --- (L2)

        E(w[n]) = E(v[n]) = 0
        E(w[n]w'[n]) = Qd
        E(v[n]v'[n]) = Rd
        E(w[n]v'[n]) = Nd = 0
    ```

    Note: If `discretized_noise` is True, then it is assumed that the user is
    providing Gd, Qd and Rd. If False, then Qd and Rd are computed from
    continuous-time Q, R, and G, and Gd is set to Identity matrix.

    An Infinite Horizon Kalman Filter estimator for the system of equations (L1)
    and (L2) is returned. This filter is in the `x_bar`, `u_bar`, and `y_bar`
    states.

    This returned system will have

    Input ports:
        (0) u_bar[n] : control vector at timestep n, relative to equilibrium
        (1) y_bar[n] : measurement vector at timestep n, relative to equilibrium

    Output ports:
        (1) x_hat_bar[n] : state vector estimate at timestep n, relative to
                           equilibrium

    Parameters:
        plant : a `Plant` object which can be a LeafSystem or a Diagram.
        x_eq: ndarray
            Equilibrium state vector for discretization
        u_eq: ndarray
            Equilibrium control vector for discretization
        dt: float
            Time step for the discretization.
        Q: ndarray
            Process noise covariance matrix.
        R: ndarray
            Measurement noise covariance matrix.
        G: ndarray
            Process noise matrix. If `None`, `G=B` is assumed making disrurbances
            additive to control vector `u`, i.e. `u_disturbed = u_orig + w`.
        x_hat_bar_0: ndarray
            Initial state estimate relative to equilibrium.
            If None, an identity matrix is assumed.
        discretization_method: str ("euler" or "zoh")
            Method to discretize the continuous-time plant. Default is "euler".
        discretized_noise: bool
            Whether the user is directly providing Gd, Qd and Rd. Default is False.
            If True, `G`, `Q`, and `R` are assumed to be Gd, Qd, and Rd,
            respectively.
    """
    (
        y_eq,
        Ad,
        Bd,
        Cd,
        Dd,
        Gd,
        Qd,
        Rd,
    ) = linearize_and_discretize_continuous_plant(
        plant, x_eq, u_eq, dt, Q, R, G, discretization_method, discretized_noise
    )

    nx = x_eq.size

    if x_hat_bar_0 is None:
        x_hat_bar_0 = cnp.zeros(nx)

    # Instantiate an Infinite Horizon Kalman Filter for the linearized plant
    kf = InfiniteHorizonKalmanFilter(
        dt,
        Ad,
        Bd,
        Cd,
        Dd,
        Gd,
        Qd,
        Rd,
        x_hat_bar_0,
        name=name,
    )

    return y_eq, kf

Integrator

Bases: LeafSystem

Integrate the input signal in time.

The Integrator block is the main primitive for building continuous-time models. It is a first-order integrator, implementing the following linear time-invariant ordinary differential equation for input values u and output values y:

    ẋ = u
    y = x

where x is the state of the integrator. The integrator is initialized with the value of the initial_state parameter.

Options

Reset: the integrator can be configured to reset its state on an input trigger. The reset value can be either the initial state of the integrator or an external value provided by an input port. Limits: the integrator can be configured such that the output and state are constrained by upper and lower limits. Hold: the integrator can be configured to hold integration based on an input trigger.

The Integrator block is also designed to detect "Zeno" behavior, where the reset events happen asymptotically closer together. This is a pathological case that can cause numerical issues in the simulation and should typically be avoided by introducing some physically realistic hysteresis into the model. However, in the event that Zeno behavior is unavoidable, the integrator will enter a "Zeno" state where the output is held constant until the trigger changes value to False. See the "bouncing ball" demo for a Zeno example.

Input ports

(0) The input signal. Must match the shape and dtype of the initial continuous state. (1) The reset trigger. Optional, only if enable_reset is True. (2) The reset value. Optional, only if enable_external_reset is True. (3) The hold trigger. Optional, only if 'enable_hold' is True.

Output ports

(0) The continuous state of the integrator.

Parameters:

Name Type Description Default
initial_state

The initial value of the integrator state. Can be any array, or even a nested structure of arrays, but the data type should be floating-point.

required
enable_reset

If True, the integrator will reset its state to the initial value when the reset trigger is True. Adds an additional input port for the reset trigger. This signal should be boolean- or binary-valued.

False
enable_external_reset

If True, the integrator will reset its state to the value provided by the reset value input port when the reset trigger is True. Otherwise, the integrator will reset to the initial value. Adds an additional input port for the reset value. This signal should match the shape and dtype of the initial continuous state.

False
enable_limits

If True, the integrator will constrain its state and output to within the upper and lower limits. Either limit may be disbale by setting its value to None.

False
ennable_hold

If True, the integrator will hold integration when the hold trigger is True.

required
reset_on_enter_zeno

If True, the integrator will reset its state to the initial value when the integrator enters the Zeno state. This option is ignored unless enable_reset is True.

False
zeno_tolerance

The tolerance used to determine if the integrator is in the Zeno state. If the time between events is less than this tolerance, then the integrator is in the Zeno state. This option is ignored unless enable_reset is True.

1e-06
Events

An event is triggered when the "reset" port changes.

An event is triggered when the state hit one of the limits.

An event is triggered when the "hold" port changes.

Another guard is conditionally active when the integrator is in the Zeno state, and is triggered when the "reset" port changes from True to False. This event is used to exit the Zeno state and resume normal integration.

Source code in collimator/library/primitives.py
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
class Integrator(LeafSystem):
    """Integrate the input signal in time.

    The Integrator block is the main primitive for building continuous-time
    models.  It is a first-order integrator, implementing the following linear
    time-invariant ordinary differential equation for input values `u` and output
    values `y`:
    ```
        ẋ = u
        y = x
    ```
    where `x` is the state of the integrator.  The integrator is initialized
    with the value of the `initial_state` parameter.

    Options:
        Reset: the integrator can be configured to reset its state on an input
            trigger.  The reset value can be either the initial state of the
            integrator or an external value provided by an input port.
        Limits: the integrator can be configured such that the output and state
            are constrained by upper and lower limits.
        Hold: the integrator can be configured to hold integration based on an
            input trigger.

    The Integrator block is also designed to detect "Zeno" behavior, where the
    reset events happen asymptotically closer together.  This is a pathological
    case that can cause numerical issues in the simulation and should typically be
    avoided by introducing some physically realistic hysteresis into the model.
    However, in the event that Zeno behavior is unavoidable, the integrator will
    enter a "Zeno" state where the output is held constant until the trigger
    changes value to False.  See the "bouncing ball" demo for a Zeno example.

    Input ports:
        (0) The input signal.  Must match the shape and dtype of the initial
            continuous state.
        (1) The reset trigger.  Optional, only if `enable_reset` is True.
        (2) The reset value.  Optional, only if `enable_external_reset` is True.
        (3) The hold trigger. Optional, only if 'enable_hold' is True.

    Output ports:
        (0) The continuous state of the integrator.

    Parameters:
        initial_state:
            The initial value of the integrator state.  Can be any array, or even
            a nested structure of arrays, but the data type should be floating-point.
        enable_reset:
            If True, the integrator will reset its state to the initial value
            when the reset trigger is True.  Adds an additional input port for
            the reset trigger.  This signal should be boolean- or binary-valued.
        enable_external_reset:
            If True, the integrator will reset its state to the value provided
            by the reset value input port when the reset trigger is True. Otherwise,
            the integrator will reset to the initial value.  Adds an additional
            input port for the reset value.  This signal should match the shape
            and dtype of the initial continuous state.
        enable_limits:
            If True, the integrator will constrain its state and output to within
            the upper and lower limits. Either limit may be disbale by setting its
            value to None.
        ennable_hold:
            If True, the integrator will hold integration when the hold trigger is
            True.
        reset_on_enter_zeno:
            If True, the integrator will reset its state to the initial value
            when the integrator enters the Zeno state.  This option is ignored unless
            `enable_reset` is True.
        zeno_tolerance:
            The tolerance used to determine if the integrator is in the Zeno state.
            If the time between events is less than this tolerance, then the
            integrator is in the Zeno state.  This option is ignored unless
            `enable_reset` is True.


    Events:
        An event is triggered when the "reset" port changes.

        An event is triggered when the state hit one of the limits.

        An event is triggered when the "hold" port changes.

        Another guard is conditionally active when the integrator is in the Zeno
        state, and is triggered when the "reset" port changes from True to False.
        This event is used to exit the Zeno state and resume normal integration.
    """

    def __init__(
        self,
        initial_state,
        enable_reset=False,
        enable_limits=False,
        lower_limit=None,
        upper_limit=None,
        enable_hold=False,
        enable_external_reset=False,
        zeno_tolerance=1e-6,
        reset_on_enter_zeno=False,
        dtype=None,
        **kwargs,
    ):
        super().__init__(**kwargs)

        self.declare_configuration_parameters(
            enable_reset=enable_reset,
            enable_external_reset=enable_external_reset,
            reset_on_enter_zeno=reset_on_enter_zeno,
            enable_limits=enable_limits,
            enable_hold=enable_hold,
        )
        self.declare_parameter("initial_state", initial_state)
        self.declare_parameter("zeno_tolerance", zeno_tolerance)

        # Default initial condition unless modified in context
        # FIXME: initial_state is not optimizable?
        if isinstance(initial_state, Parameter):
            initial_state = initial_state.get()
        self.x0 = cnp.array(initial_state, dtype=dtype)
        self.dtype = dtype if dtype is not None else self.x0.dtype

        self.xdot_index = self.declare_input_port(name="in_0")
        self.declare_continuous_state(
            default_value=self.x0,
            ode=self._ode,
            prerequisites_of_calc=[self.input_ports[self.xdot_index].ticket],
        )

        # minimum output prereqs
        prerequisites_of_calc = [DependencyTicket.xc]

        self.enable_reset = enable_reset
        self.enable_external_reset = enable_external_reset
        self.zeno_tolerance = zeno_tolerance
        self.reset_on_enter_zeno = reset_on_enter_zeno
        self.discrete_state_type = namedtuple(
            "IntegratorDiscreteState", ["zeno", "counter", "tprev"]
        )

        self.enable_limits = enable_limits
        self.has_lower_limit = lower_limit is not None
        self.has_upper_limit = upper_limit is not None

        self.enable_hold = enable_hold

        if enable_reset:
            # Boolean input for triggering reset
            self.reset_trigger_index = self.declare_input_port(name="reset_trigger")
            # prerequisites_of_calc.append(
            #     self.input_ports[self.reset_trigger_index].ticket
            # )

            # Declare a custom discrete state to track Zeno behavior
            self.declare_discrete_state(
                default_value=self.discrete_state_type(
                    zeno=False, counter=0, tprev=0.0
                ),
                as_array=False,
            )

            #
            # Declare reset event
            #
            # when reset is triggered, execute the reset map.
            self.declare_zero_crossing(
                guard=self._reset_guard,
                reset_map=self._reset,
                name="reset_on",
                direction="negative_then_non_negative",
            )
            # when reset is deasserted, do not change the state.
            self.declare_zero_crossing(
                guard=self._reset_guard,
                name="reset_off",
                direction="positive_then_non_positive",
            )

            self.declare_zero_crossing(
                guard=self._exit_zeno_guard,
                reset_map=self._exit_zeno,
                name="exit_zeno",
                direction="positive_then_non_positive",
            )

            # Optional: reset value defined by external signal
            if enable_external_reset:
                self.reset_value_index = self.declare_input_port(name="reset_value")
                # prerequisites_of_calc.append(
                #     self.input_ports[self.reset_value_index].ticket
                # )

        self.declare_output_port(
            self._output,
            name="out_0",
            prerequisites_of_calc=prerequisites_of_calc,
            requires_inputs=False,
        )

        if enable_hold:
            # Boolean input for triggering hold assert/deassert
            self.hold_trigger_index = self.declare_input_port(name="hold_trigger")

            def _hold_guard(_time, _state, *inputs, **_params):
                trigger = inputs[self.hold_trigger_index]
                return cnp.where(trigger, 1.0, -1.0)

            self.declare_zero_crossing(
                guard=_hold_guard,
                name="hold",
                direction="crosses_zero",
            )

        if enable_limits:
            if lower_limit is not None:
                self.declare_parameter("lower_limit", lower_limit)

                def _lower_limit_guard(_time, state, *_inputs, **params):
                    return state.continuous_state - params["lower_limit"]

                self.declare_zero_crossing(
                    guard=_lower_limit_guard,
                    name="lower_limit",
                    direction="positive_then_non_positive",
                )

            if upper_limit is not None:
                self.declare_parameter("upper_limit", upper_limit)

                def _upper_limit_guard(_time, state, *_inputs, **params):
                    return state.continuous_state - params["upper_limit"]

                self.declare_zero_crossing(
                    guard=_upper_limit_guard,
                    name="upper_limit",
                    direction="negative_then_non_negative",
                )

    def _ode(self, _time, state, *inputs, **params):
        # Normally, just integrate the input signal
        xdot = inputs[self.xdot_index]

        # However, if the reset trigger is high or the integrator is in the Zeno state,
        # then the integrator should hold
        if self.enable_reset:
            trigger = inputs[self.reset_trigger_index]
            in_zeno_state = state.discrete_state.zeno
            xdot = cnp.where((trigger | in_zeno_state), cnp.zeros_like(xdot), xdot)

        # Additionally, if the limits are enabled, the derivative is set to zero if
        # either limit is presnetly violated.
        if self.enable_limits:
            xc =