Skip to content

Interpreter

Module interpreter

This module contains the implementation of the Interpreter protocol, which is responsible for processing state-forwarding models and handling their execution flow. It uses asynchronous I/O for its operations, leveraging Python's asyncio library. The module depends on several other components like Queue, Clock, and Future to manage the scheduling of tasks and the progression of time within the simulation.

Classes

  • InterpreterStep: An Enum representing the state of a step within the Interpreter. Possible values are complete, incomplete, and deferred.

  • Interpreter: A protocol that defines the interface for an interpreter instance. These instances are responsible for orchestrating elements of the state-forwarding model. They manage a queue, a clock, a stack of futures, and a logging instance.

Interpreter Protocol Methods

  • __init__: Constructor for the interpreter. It takes a Queue instance and an optional logging.Logger instance as parameters.

  • send: Takes an event of type model.Element and schedules it for processing, returning a Future representing the eventual result of processing this event.

  • start: Initiates the interpreter loop which can be provided through the loop parameter; if None, the default event loop is used.

  • wait: Waits on a collection of tasks, which can be either asyncio.Task or asyncio.Future objects. It accepts an optional name to identify the wait operation and a return_when strategy that governs when the wait should return.

  • run: An asynchronous method that runs the interpreter to completion.

  • step: An abstract asynchronous method that should be implemented to perform a single step of the interpreter's operation.

  • is_active: Checks whether the provided elements are active within the current state model context.

  • push: Associates an element with a future object, pushing it onto the interpreter's stack.

  • pop: Removes an element from the interpreter's stack, providing a result to the associated future.

  • terminate: Cleans up and terminates the interpreter's operation.

Attributes

  • model: An attribute of the generic type T, which is bound to model.Model and represents the model instance that the interpreter is operating on.

Interpreter

Bases: Protocol[T]

A protocol class that defines the interface for an interpreter capable of handling events, managing an event loop, and interacting with a queue and a clock system.

Attributes:

Name Type Description
queue Queue

An instance of Queue where the interpreter manages scheduled events.

clock Clock

An instance of Clock that provides timing functionality.

stack dict[Element, Future]

A dictionary mapping model components to their corresponding futures.

log Logger

A Logger instance where the interpreter can log messages.

Methods:

Name Description
__init__

Constructor that initializes the interpreter with a given queue and an optional logger.

send

Sends an event to the interpreter for processing and returns a Future.

start

Initiates the event loop for the interpreter with an optional loop argument.

wait

Waits for the completion of the given tasks or futures and returns a collected Task.

run

Asynchronous method that starts running the interpreter event loop.

step

Asynchronous method that performs a single step in the interpreter processing.

is_active

Checks if any of the provided elements are currently active within the interpreter.

push

Associates an element with a future or task in the interpreter's stack.

pop

Removes an element from the stack and handles its corresponding result.

terminate

Ends the interpreter's execution and cleans up resources.

model

A generic type placeholder for the model handled by the interpreter.

Source code in stateforward/protocols/interpreter.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
class Interpreter(typing.Protocol[T]):
    """
    A protocol class that defines the interface for an interpreter capable of handling events,
    managing an event loop, and interacting with a queue and a clock system.

    Attributes:
        queue:
             An instance of Queue where the interpreter manages scheduled events.
        clock:
             An instance of Clock that provides timing functionality.
        stack:
             A dictionary mapping model components to their corresponding futures.
        log:
             A Logger instance where the interpreter can log messages.

    Methods:
        __init__:
             Constructor that initializes the interpreter with a given queue and an optional logger.
        send:
             Sends an event to the interpreter for processing and returns a Future.
        start:
             Initiates the event loop for the interpreter with an optional loop argument.
        wait:
             Waits for the completion of the given tasks or futures and returns a collected Task.
        run:
             Asynchronous method that starts running the interpreter event loop.
        step:
             Asynchronous method that performs a single step in the interpreter processing.
        is_active:
             Checks if any of the provided elements are currently active within the interpreter.
        push:
             Associates an element with a future or task in the interpreter's stack.
        pop:
             Removes an element from the stack and handles its corresponding result.
        terminate:
             Ends the interpreter's execution and cleans up resources.
        model:
             A generic type placeholder for the model handled by the interpreter.

    """

    queue: Queue
    clock: Clock
    stack: dict[model.Element, Future]
    log: logging.Logger

    def __init__(self, queue: Queue, log: logging.Logger = None):
        """
        Initializes the instance with a Queue and an optional Logger object.

        Args:
            queue (Queue):
                 The queue to be used by the instance for storing or processing tasks.
            log (logging.Logger, optional):
                 The logger to be used for logging information, warnings, errors, etc. Defaults to None, in which case no logging will be performed.

        """
        ...

    def send(self, event: model.Element) -> Future:
        """
        Sends an event to be processed asynchronously.
        This method accepts an event object, wraps it in a Future, and schedules it for asynchronous processing.
        The method returns a Future object that can be used to retrieve the result of the event processing at a later time.

        Args:
            event (model.Element):
                 The event to send for processing.

        Returns:
            Future:
                 The future object representing the asynchronous operation of the event processing.

        """
        ...

    def start(
        self,
        loop: asyncio.AbstractEventLoop = None,
    ):
        """
        Starts the asynchronous event loop for the instance.

        Args:
            loop (asyncio.AbstractEventLoop, optional):
                 The event loop to run the instance on. If None is provided, the default event loop is used.

        """
        ...

    def wait(
        self,
        *tasks: typing.Union[asyncio.Task, asyncio.Future],
        name: str = None,
        return_when: str = asyncio.FIRST_COMPLETED,
    ) -> asyncio.Task:
        """
        Waits for the completion of given tasks or futures until a condition is met.
        This method is used to asynchronously wait for either any or all of the specified tasks or futures to complete, depending on the `return_when` parameter. It can be used to orchestrate the execution of different asynchronous operations in a non-blocking manner.

        Args:
            *tasks (typing.Union[asyncio.Task, asyncio.Future]):
                 An unpacked tuple of tasks or futures.
                These are the asynchronous operations that `wait` will wait on.
            name (str, optional):
                 A name for the group of tasks being waited on. This name is not
                directly used by the `wait` function but can be useful for logging and debugging purposes.
            return_when (str):
                 A string that specifies when the function should return. The default
                value is `asyncio.FIRST_COMPLETED`, which means the function will return as soon as any task or future is done.
                Other possible values are `asyncio.FIRST_EXCEPTION`, which returns when any task or future raises an exception,
                and `asyncio.ALL_COMPLETED`, which returns only when all tasks or futures are completed.

        Returns:
            asyncio.Task:
                 A single asyncio.Task instance that can be awaited. This task completes when
                the condition specified in `return_when` is met.

        """
        ...

    async def run(self) -> None:
        """
        Asynchronously executes the function's main logic.
        This function is designed to be called within an asynchronous context. It runs
        the primary task or series of tasks that the class instance is responsible
        for managing. As it is an async function, it should be awaited when called
        to ensure proper execution and handling of the event loop.

        Returns:
            None:
                 This function does not return any value.

        """
        ...

    async def step(self) -> None:
        """
        Performs an asynchronous step operation for the object. This function is intended to be overridden by subclasses to implement specific asynchronous behavior. The default implementation does nothing and is meant to be a placeholder.

        Returns:

        """
        pass

    def is_active(self, *elements: model.Element) -> bool:
        """
        Determines if the given elements are active.
        This method checks if the specified elements within the model are currently active. 'Active' in this context refers to the
        elements being in a state where they are in use or in operation within the model. Each element provided as an argument
        to the function is checked, and the function returns True only if all elements are active, otherwise False.

        Args:
            *elements (model.Element):
                 Variable number of Element objects to check for active status.

        Returns:
            bool:
                 True if all elements are active, False otherwise.

        """
        ...

    def push(self, element: model.Element, future: typing.Union[Future, asyncio.Task]):
        """
        Adds a new element to a collection with an optional future or task associated with it.

        Args:
            element (model.Element):
                 The element to be added to the collection.
            future (typing.Union[Future, asyncio.Task]):
                 A future or task that is
                associated with the element being added. This is optional and can be
                used to track the completion or result of an asynchronous operation
                related to the element.

        Raises:
            TypeError:
                 If the provided future is neither a Future nor an asyncio.Task instance.

        """
        ...

    def pop(self, element: model.Element, result: typing.Any):
        """
        Pops an element from the given structure and returns the result.
        This method is designed to remove the specified element from the structure it is called upon,
        and optionally returns the result of this operation.

        Args:
            element (model.Element):
                 The element to remove from the structure.
            result (typing.Any):
                 The result to return after the element has been popped.

        Returns:
            typing.Any:
                 The specified return result after the popping operation.

        """
        ...

    def terminate(self):
        """
        Terminates the current process or operation.
        This method provides the functionality to cease the operation for the associated object. It is meant to be implemented as a cleanup action to safely shut down or close resources such as file handlers, network connections, or database connections before the termination of the process.

        Raises:
            NotImplementedError:
                 If the method has not been implemented by the subclass.

        """
        ...

    model: T = None

__init__(queue, log=None)

Parameters:

Name Type Description Default
queue Queue

The queue to be used by the instance for storing or processing tasks.

required
log Logger

The logger to be used for logging information, warnings, errors, etc. Defaults to None, in which case no logging will be performed.

None
Source code in stateforward/protocols/interpreter.py
118
119
120
121
122
123
124
125
126
127
128
129
def __init__(self, queue: Queue, log: logging.Logger = None):
    """
    Initializes the instance with a Queue and an optional Logger object.

    Args:
        queue (Queue):
             The queue to be used by the instance for storing or processing tasks.
        log (logging.Logger, optional):
             The logger to be used for logging information, warnings, errors, etc. Defaults to None, in which case no logging will be performed.

    """
    ...

is_active(*elements)

Determines if the given elements are active. This method checks if the specified elements within the model are currently active. 'Active' in this context refers to the elements being in a state where they are in use or in operation within the model. Each element provided as an argument to the function is checked, and the function returns True only if all elements are active, otherwise False.

Parameters:

Name Type Description Default
*elements Element

Variable number of Element objects to check for active status.

()

Returns:

Name Type Description
bool bool

True if all elements are active, False otherwise.

Source code in stateforward/protocols/interpreter.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def is_active(self, *elements: model.Element) -> bool:
    """
    Determines if the given elements are active.
    This method checks if the specified elements within the model are currently active. 'Active' in this context refers to the
    elements being in a state where they are in use or in operation within the model. Each element provided as an argument
    to the function is checked, and the function returns True only if all elements are active, otherwise False.

    Args:
        *elements (model.Element):
             Variable number of Element objects to check for active status.

    Returns:
        bool:
             True if all elements are active, False otherwise.

    """
    ...

pop(element, result)

Pops an element from the given structure and returns the result. This method is designed to remove the specified element from the structure it is called upon, and optionally returns the result of this operation.

Parameters:

Name Type Description Default
element Element

The element to remove from the structure.

required
result Any

The result to return after the element has been popped.

required

Returns:

Type Description

typing.Any: The specified return result after the popping operation.

Source code in stateforward/protocols/interpreter.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
def pop(self, element: model.Element, result: typing.Any):
    """
    Pops an element from the given structure and returns the result.
    This method is designed to remove the specified element from the structure it is called upon,
    and optionally returns the result of this operation.

    Args:
        element (model.Element):
             The element to remove from the structure.
        result (typing.Any):
             The result to return after the element has been popped.

    Returns:
        typing.Any:
             The specified return result after the popping operation.

    """
    ...

push(element, future)

Adds a new element to a collection with an optional future or task associated with it.

Parameters:

Name Type Description Default
element Element

The element to be added to the collection.

required
future Union[Future, Task]

A future or task that is associated with the element being added. This is optional and can be used to track the completion or result of an asynchronous operation related to the element.

required

Raises:

Type Description
TypeError

If the provided future is neither a Future nor an asyncio.Task instance.

Source code in stateforward/protocols/interpreter.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
def push(self, element: model.Element, future: typing.Union[Future, asyncio.Task]):
    """
    Adds a new element to a collection with an optional future or task associated with it.

    Args:
        element (model.Element):
             The element to be added to the collection.
        future (typing.Union[Future, asyncio.Task]):
             A future or task that is
            associated with the element being added. This is optional and can be
            used to track the completion or result of an asynchronous operation
            related to the element.

    Raises:
        TypeError:
             If the provided future is neither a Future nor an asyncio.Task instance.

    """
    ...

run() async

Asynchronously executes the function's main logic. This function is designed to be called within an asynchronous context. It runs the primary task or series of tasks that the class instance is responsible for managing. As it is an async function, it should be awaited when called to ensure proper execution and handling of the event loop.

Returns:

Name Type Description
None None

This function does not return any value.

Source code in stateforward/protocols/interpreter.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
async def run(self) -> None:
    """
    Asynchronously executes the function's main logic.
    This function is designed to be called within an asynchronous context. It runs
    the primary task or series of tasks that the class instance is responsible
    for managing. As it is an async function, it should be awaited when called
    to ensure proper execution and handling of the event loop.

    Returns:
        None:
             This function does not return any value.

    """
    ...

send(event)

Sends an event to be processed asynchronously. This method accepts an event object, wraps it in a Future, and schedules it for asynchronous processing. The method returns a Future object that can be used to retrieve the result of the event processing at a later time.

Parameters:

Name Type Description Default
event Element

The event to send for processing.

required

Returns:

Name Type Description
Future Future

The future object representing the asynchronous operation of the event processing.

Source code in stateforward/protocols/interpreter.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def send(self, event: model.Element) -> Future:
    """
    Sends an event to be processed asynchronously.
    This method accepts an event object, wraps it in a Future, and schedules it for asynchronous processing.
    The method returns a Future object that can be used to retrieve the result of the event processing at a later time.

    Args:
        event (model.Element):
             The event to send for processing.

    Returns:
        Future:
             The future object representing the asynchronous operation of the event processing.

    """
    ...

start(loop=None)

Starts the asynchronous event loop for the instance.

Parameters:

Name Type Description Default
loop AbstractEventLoop

The event loop to run the instance on. If None is provided, the default event loop is used.

None
Source code in stateforward/protocols/interpreter.py
148
149
150
151
152
153
154
155
156
157
158
159
160
def start(
    self,
    loop: asyncio.AbstractEventLoop = None,
):
    """
    Starts the asynchronous event loop for the instance.

    Args:
        loop (asyncio.AbstractEventLoop, optional):
             The event loop to run the instance on. If None is provided, the default event loop is used.

    """
    ...

step() async

Performs an asynchronous step operation for the object. This function is intended to be overridden by subclasses to implement specific asynchronous behavior. The default implementation does nothing and is meant to be a placeholder.

Returns:

Source code in stateforward/protocols/interpreter.py
208
209
210
211
212
213
214
215
async def step(self) -> None:
    """
    Performs an asynchronous step operation for the object. This function is intended to be overridden by subclasses to implement specific asynchronous behavior. The default implementation does nothing and is meant to be a placeholder.

    Returns:

    """
    pass

terminate()

Terminates the current process or operation. This method provides the functionality to cease the operation for the associated object. It is meant to be implemented as a cleanup action to safely shut down or close resources such as file handlers, network connections, or database connections before the termination of the process.

Raises:

Type Description
NotImplementedError

If the method has not been implemented by the subclass.

Source code in stateforward/protocols/interpreter.py
274
275
276
277
278
279
280
281
282
283
284
def terminate(self):
    """
    Terminates the current process or operation.
    This method provides the functionality to cease the operation for the associated object. It is meant to be implemented as a cleanup action to safely shut down or close resources such as file handlers, network connections, or database connections before the termination of the process.

    Raises:
        NotImplementedError:
             If the method has not been implemented by the subclass.

    """
    ...

wait(*tasks, name=None, return_when=asyncio.FIRST_COMPLETED)

Waits for the completion of given tasks or futures until a condition is met. This method is used to asynchronously wait for either any or all of the specified tasks or futures to complete, depending on the return_when parameter. It can be used to orchestrate the execution of different asynchronous operations in a non-blocking manner.

Parameters:

Name Type Description Default
*tasks Union[Task, Future]

An unpacked tuple of tasks or futures. These are the asynchronous operations that wait will wait on.

()
name str

A name for the group of tasks being waited on. This name is not directly used by the wait function but can be useful for logging and debugging purposes.

None
return_when str

A string that specifies when the function should return. The default value is asyncio.FIRST_COMPLETED, which means the function will return as soon as any task or future is done. Other possible values are asyncio.FIRST_EXCEPTION, which returns when any task or future raises an exception, and asyncio.ALL_COMPLETED, which returns only when all tasks or futures are completed.

FIRST_COMPLETED

Returns:

Type Description
Task

asyncio.Task: A single asyncio.Task instance that can be awaited. This task completes when the condition specified in return_when is met.

Source code in stateforward/protocols/interpreter.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def wait(
    self,
    *tasks: typing.Union[asyncio.Task, asyncio.Future],
    name: str = None,
    return_when: str = asyncio.FIRST_COMPLETED,
) -> asyncio.Task:
    """
    Waits for the completion of given tasks or futures until a condition is met.
    This method is used to asynchronously wait for either any or all of the specified tasks or futures to complete, depending on the `return_when` parameter. It can be used to orchestrate the execution of different asynchronous operations in a non-blocking manner.

    Args:
        *tasks (typing.Union[asyncio.Task, asyncio.Future]):
             An unpacked tuple of tasks or futures.
            These are the asynchronous operations that `wait` will wait on.
        name (str, optional):
             A name for the group of tasks being waited on. This name is not
            directly used by the `wait` function but can be useful for logging and debugging purposes.
        return_when (str):
             A string that specifies when the function should return. The default
            value is `asyncio.FIRST_COMPLETED`, which means the function will return as soon as any task or future is done.
            Other possible values are `asyncio.FIRST_EXCEPTION`, which returns when any task or future raises an exception,
            and `asyncio.ALL_COMPLETED`, which returns only when all tasks or futures are completed.

    Returns:
        asyncio.Task:
             A single asyncio.Task instance that can be awaited. This task completes when
            the condition specified in `return_when` is met.

    """
    ...

InterpreterStep

Bases: Enum

An enumeration to represent the possible states of an interpreter step. This enum classifies the status of a step within an interpreter's execution process. Each member of the enumeration represents a distinct state that indicates the completeness of the current step being processed.

Attributes:

Name Type Description
complete str

A member indicating the interpreter step has been successfully completed without any pending actions.

incomplete str

A member indicating the interpreter step is not yet finished and may require further processing.

deferred str

A member indicating the interpreter step's execution has been postponed and will be revisited later.

Source code in stateforward/protocols/interpreter.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class InterpreterStep(Enum):
    """
    An enumeration to represent the possible states of an interpreter step.
    This enum classifies the status of a step within an interpreter's execution process. Each member of the enumeration represents a distinct state that indicates the completeness of the current step being processed.

    Attributes:
        complete (str):
             A member indicating the interpreter step has been successfully completed without any pending actions.
        incomplete (str):
             A member indicating the interpreter step is not yet finished and may require further processing.
        deferred (str):
             A member indicating the interpreter step's execution has been postponed and will be revisited later.

    """

    complete = "complete"
    incomplete = "incomplete"
    deferred = "deferred"