Skip to content

Async Interpreter

The async_interpreter module provides an asynchronous execution framework specifically designed for state-based systems, utilizing asynchronous I/O provided by Python's asyncio library. It is built upon the concept of interacting with different models, events, queues, and clocks within state-based systems. This module contains several key classes and components, each working together to execute sequences of operations in a non-blocking manner. Key components include InterpreterStep, AsyncInterpreter, and Null, as well as crucial controllable constructs such as queue, clock, stack, and log. The module defines the functioning of an asynchronous interpreter for state machine workflows, providing methods for event handling, concurrency management, and execution control. Notably, the AsyncInterpreter class inherits from model.Element and is parameterized to work with various model types, enriching its capability to interface with different state machine elements. This module is essential for scenarios where asynchronous event handling and state management are paramount, and it relies heavily on the asyncio event loop and future constructs for its operation.

AsyncInterpreter

Bases: Element, Generic[T]

An asynchronous interpreter designed to operate with the provided state machine. This interpreter runs within the asyncio event loop and handles asynchronous task management and event processing for the state machine. It manages a queue of events and a stack to maintain the state of in-flight operations.

Attributes:

Name Type Description
queue Queue

The queue for managing incoming events.

clock Clock

An object managing the clock speed for the interpreter's operations.

stack dict[Element, Future]

A dictionary mapping state machine elements to their associated futures/tasks.

loop AbstractEventLoop

The event loop in which this interpreter operates.

log Logger

Logger for the interpreter to output its activity.

running Event

An event signaling whether the interpreter is currently running.

stepping Lock

A lock to ensure step execution is done atomically.

model T

A generic type parameter representing the state machine model.

Methods:

Name Description
__init__

Queue, log: logging.Logger=None): Initializes the AsyncInterpreter with a queue and an optional logger.

send

model.Element): Sends an event to be processed by the state machine.

start

asyncio.AbstractEventLoop=None): Starts the interpreter within the given or default event loop.

wait

typing.Union[asyncio.Task, asyncio.Future],

name

str=None, return_when: str=asyncio.FIRST_COMPLETED) -> asyncio.Task: Waits for the given tasks to complete, returning an asyncio.Task wrapping the wait operation.

run

The coroutine that runs the main event processing loop of the interpreter.

step

Processes the next set of tasks from the stack.

is_active

model.Element) -> bool: Checks if the given elements are active within the current stack.

push

model.Element,

future

typing.Union[Future, asyncio.Task]=NULL): Pushes a new element and associated future/task onto the stack.

pop

model.Element, *, result: typing.Any=NULL): Pops an element and its associated task from the stack, handling its result.

terminate

Terminates the interpreter, cleaning up and cancelling tasks as necessary.

Source code in stateforward/state_machine/interpreters/asynchronous/async_interpreter.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
class AsyncInterpreter(model.Element, typing.Generic[T]):
    """
    An asynchronous interpreter designed to operate with the provided state machine.
    This interpreter runs within the asyncio event loop and handles asynchronous task management
    and event processing for the state machine. It manages a queue of events and a stack to
    maintain the state of in-flight operations.

    Attributes:
        queue (Queue):
             The queue for managing incoming events.
        clock (Clock):
             An object managing the clock speed for the interpreter's operations.
        stack (dict[model.Element, asyncio.Future]):
             A dictionary mapping state machine elements
            to their associated futures/tasks.
        loop (asyncio.AbstractEventLoop):
             The event loop in which this interpreter operates.
        log (logging.Logger):
             Logger for the interpreter to output its activity.
        running (asyncio.Event):
             An event signaling whether the interpreter is currently running.
        stepping (asyncio.Lock):
             A lock to ensure step execution is done atomically.
        model (T):
             A generic type parameter representing the state machine model.

    Methods:
        __init__(self, queue:
             Queue, log: logging.Logger=None):
            Initializes the AsyncInterpreter with a queue and an optional logger.
        send(self, event:
             model.Element):
            Sends an event to be processed by the state machine.
        start(self, loop:
             asyncio.AbstractEventLoop=None):
            Starts the interpreter within the given or default event loop.
        wait(self, *tasks:
             typing.Union[asyncio.Task, asyncio.Future],
        name:
             str=None, return_when: str=asyncio.FIRST_COMPLETED) -> asyncio.Task:
            Waits for the given tasks to complete, returning an asyncio.Task wrapping the wait operation.
        run(self) -> None:
            The coroutine that runs the main event processing loop of the interpreter.
        step(self) -> None:
            Processes the next set of tasks from the stack.
        is_active(self, *elements:
             model.Element) -> bool:
            Checks if the given elements are active within the current stack.
        push(self, element:
             model.Element,
        future:
             typing.Union[Future, asyncio.Task]=NULL):
            Pushes a new element and associated future/task onto the stack.
        pop(self, element:
             model.Element, *, result: typing.Any=NULL):
            Pops an element and its associated task from the stack, handling its result.
        terminate(self) -> asyncio.Task:
            Terminates the interpreter, cleaning up and cancelling tasks as necessary.

    """

    queue: Queue = None
    clock: Clock
    stack: dict[model.Element, asyncio.Future] = None
    loop: asyncio.AbstractEventLoop = None
    log: logging.Logger = logging.getLogger(__name__)
    running: asyncio.Event = None
    stepping: asyncio.Lock = None

    def __init__(self, queue: Queue, log: logging.Logger = None):
        """
        Initializes the instance with a queue, an optional log, and internal attributes.
        This method sets up the data structures and synchronization primitives required for the operation
        of an instance. It assigns the queue to the instance, initializes a stack as a dictionary,
        prepares an asyncio event to manage the running state, and sets up logging.

        Args:
            queue (Queue):
                 A queue object used for inter-thread or inter-process communication.
            log (logging.Logger, optional):
                 A logger instance for logging messages. If not provided, it falls back to a default logger.

        Attributes:
            stack (dict):
                 A dictionary to hold instance-specific data.
            queue (Queue):
                 The queue object passed during initialization.
            running (asyncio.Event):
                 An event to indicate whether the instance is running.
            log (logging.Logger):
                 A logger instance for outputting logs.

        """
        self.stack = {}
        self.queue = queue
        self.running = asyncio.Event()
        self.log = log or self.log

    def send(self, event: model.Element):
        """
        Sends an event to be processed by the state machine.
        This method logs the receipt of the event, pushes it to the stack along with a new Future object, adds the event to a queue, and then awaits the processing of the event. The method returns the result of waiting for the future associated with the event to be completed.

        Args:
            event (model.Element):
                 The event to be sent to the state machine for processing.

        Returns:

        """
        self.log.debug(f"Received {model.qualified_name_of(event)}")
        # push the event onto the stack
        future = self.push(event, asyncio.Future())
        # add the event to the queue
        self.queue.put_nowait(event)
        return self.wait(
            future,
            self.stack.get(self),
            name=f"{model.qualified_name_of(event)}.sent",
        )

    def start(
        self,
        loop: asyncio.AbstractEventLoop = None,
    ):
        """
        Starts an asynchronous task to run the state machine in an event loop.
        This method initializes the event loop for the state machine, logs the state machine's qualified name,
        and then creates and starts an asynchronous task for the state machine's `run` method.
        It pushes the running task into a tracking structure for managing tasks and then waits for the
        initialization to complete before proceeding.

        Args:
            loop (asyncio.AbstractEventLoop, optional):
                 The event loop in which the state machine will
                be run. If not provided, the current event loop will be used.

        Returns:
            A `wait` wrapper that is used to wait for two events:
                 the task that runs the state machine
                to complete, and a separate task that signals the state machine is running.

        """
        qualified_name = model.qualified_name_of(self)
        self.log.debug(f"Starting {qualified_name}")
        loop = self.loop = loop or asyncio.get_event_loop()
        task = loop.create_task(self.run(), name=qualified_name)
        self.push(self, task)
        return self.wait(task, self.loop.create_task(self.running.wait()))

    def wait(
        self,
        *tasks: typing.Union[asyncio.Task, asyncio.Future],
        name: str = None,
        return_when: str = asyncio.FIRST_COMPLETED,
    ) -> asyncio.Task:
        """
        Waits for the completion of one or more asyncio.Task or asyncio.Future objects.
        This function is a coroutine that accepts any number of asyncio.Task or asyncio.Future
        objects and an optional name for the underlying task that will wait for the
        provided tasks or futures to complete. The function will schedule the execution
        of these tasks or futures on the event loop, and wait until the conditions specified
        by return_when are met. return_when can indicate waiting for the first task to
        complete (asyncio.FIRST_COMPLETED), all tasks to complete (asyncio.ALL_COMPLETED),
        or the first task to not be cancelled (asyncio.FIRST_EXCEPTION).
        Upon completing the wait condition, the function will return the task that was created
        to perform the wait operation. If any of the awaited tasks or futures raise an
        exception during execution, the exception will be propagated.

        Args:
            *tasks (Union[asyncio.Task, asyncio.Future]):
                 An arbitrary number of asyncio.Task
                or asyncio.Future objects to be awaited.
            name (str, optional):
                 An optional name for the asyncio.Task that will be created
                to perform the waiting operation. If not provided, a name is generated by
                concatenating the names of the tasks with '_and_'.
            return_when (str):
                 The condition that determines when the wait operation
                should return. Must be one of asyncio.FIRST_COMPLETED, asyncio.ALL_COMPLETED,
                or asyncio.FIRST_EXCEPTION.

        Returns:
            asyncio.Task:
                 The task that was created to wait for the provided tasks or futures.

        """

        async def wait_for_tasks(_tasks=tasks, _return_when=return_when):
            """
            Waits for the completion of tasks, possibly returning before all tasks are finished based on a condition.
            This function asynchronously waits for the `_tasks` iterable of tasks to reach a completion state, which is determined by the `_return_when` condition. Once the condition is met, it retrieves one of the completed tasks from `done` set. If that task raised an exception, the exception is propagated by awaiting on the task object.

            Args:
                _tasks (Iterable[Task]):
                     The collection of asyncio.Task objects to wait on. Defaults to the `tasks` variable in the current context.
                _return_when (str):
                     The condition upon which the function should return. This condition dictates whether the function waits for all tasks to complete, or returns earlier. Defaults to the `return_when` variable in the current context.

            Returns:
                Tuple[Set[Task], Set[Task]]:
                     A tuple containing two sets. The first set contains all tasks that are done, and the second set contains all tasks that are pending.

            Raises:
                Any exception raised by a task:
                     If any task among `_tasks` raises an exception, that exception is propagated.

            """
            done, pending = await asyncio.wait(_tasks, return_when=_return_when)
            task = done.pop()
            if task.exception() is not None:
                await task

        return self.loop.create_task(
            wait_for_tasks(),
            name=name or "_and_".join(task.get_name() for task in tasks),
        )

    async def run(self) -> None:
        """
        Asynchronously runs the process associated with the class instance.
        This coroutine continuously executes the 'step' method and awaits for a sleep interval based on the 'clock.multiplier'. It checks if the instance is active and if the 'running' flag is set before each iteration. If the 'running' flag is cancelled, it logs a debug message indicating the cancellation. If the 'running' flag is still set after an interruption, it ensures that the 'terminate' method is called.

        Raises:
            asyncio.CancelledError:
                 If the coroutine is cancelled during its execution.

        """
        self.log.debug(
            f"Running {model.qualified_name_of(self)} clock multiplier {self.clock.multiplier}"
        )
        if self.is_active(self):
            self.running.set()
            try:
                while self.running.is_set():
                    await self.step()
                    await asyncio.sleep(self.clock.multiplier)
            except asyncio.CancelledError:
                self.log.debug(f"Cancelled {model.qualified_name_of(self)}")
            if self.running.is_set():
                await self.terminate()

    async def step(self) -> None:
        """
        Performs an asynchronous iteration step over a collection of futures stored in an instance's stack.
        This method iterates through the values of the instance's stack attribute, which is expected to be a mapping of futures. It checks if any of the futures have raised an exception. If an exception is encountered in any future, it is immediately raised, halting the iteration.

        Raises:
            Exception:
                 Any exception raised by a future in the stack.

        Returns:

        """
        for future in self.stack.values():
            if exception := future.exception() is not None:
                raise exception

    def is_active(self, *elements: model.Element) -> bool:
        """
        Checks if all specified elements are active within the current context.
        This method determines if all elements provided as arguments exist in the context's active stack.

        Args:
            *elements (model.Element):
                 Variable number of elements to check for activeness within the stack.

        Returns:
            bool:
                 True if all specified elements are active (i.e., they exist in the stack); False otherwise.

        """
        return bool(self.stack) and all(element in self.stack for element in elements)

    def push(
        self, element: model.Element, future: typing.Union[Future, asyncio.Task] = NULL
    ):
        """
        Pushes an element onto the stack with an associated future or task, ensuring uniqueness.
        This method adds an element to an internal stack, associating it with a future or an asyncio task, which may represent the element's processing state.
        If the element already exists within the stack, a ValueError is thrown to avoid duplication.
        The method ensures that each element can have only one corresponding future or task.

        Args:
            element (model.Element):
                 The element to push onto the stack.
            future (typing.Union[Future, asyncio.Task], optional):
                 A future or task to associate with the element.
                If not provided, the default value defined by NULL will be used.

        Returns:
            Future:
                 The future or task associated with the element. This can be either the provided argument
                or the one already associated with the element in case of previously existing entry.

        Raises:
            ValueError:
                 If the element is already present in the stack.

        """
        if element in self.stack:
            raise ValueError(
                f"element {model.qualified_name_of(element)} already exists"
            )
        future = self.stack.setdefault(element, future)
        return typing.cast(Future, future)

    def pop(self, element: model.Element, *, result: typing.Any = NULL):
        """
        Pops an element from the stack and sets the result if specified.
        This method retrieves the future associated with a stack element, removes the element from the stack,
        and optionally sets a result on the future. If the future is already done and contains an exception,
        the exception is raised. If `result` is provided and not NULL, it is used to set the future's result.

        Args:
            element (model.Element):
                 The element to be popped from the stack.
            result (typing.Any, optional):
                 The result to set on the future if not NULL. Defaults to NULL.

        Returns:
            typing.cast(Future, future):
                 The future associated with the popped element.

        Raises:

        """
        future = self.stack.pop(element, NULL)
        if future.done():
            if future.exception() is not None:
                raise future.result()
            elif result is not NULL:
                future.set_result(result)
        return typing.cast(Future, future)

    def terminate(self) -> asyncio.Task:
        """
        Cancels the asynchronous task associated with the current instance if it is still running.
        This method checks if the `running` attribute, presumably an instance of `threading.Event` or similar,
        is set. If it is, it clears the `running` attribute to stop the task. It then retrieves the current
        task using a `pop` method and inspects it. If the task is not yet completed, it will attempt
        to cancel it by calling its `cancel` method. The task object, now potentially canceled, is
        then recast as an `asyncio.Task` and returned.

        Returns:
            asyncio.Task:
                 The task associated with this instance after attempting to cancel it if necessary.

        """
        if self.running.is_set():
            self.running.clear()
        task = self.pop(self)
        if not task.done():
            task.cancel()
        return typing.cast(asyncio.Task, task)

    model: T = None

__init__(queue, log=None)

of an instance. It assigns the queue to the instance, initializes a stack as a dictionary, prepares an asyncio event to manage the running state, and sets up logging.

Parameters:

Name Type Description Default
queue Queue

A queue object used for inter-thread or inter-process communication.

required
log Logger

A logger instance for logging messages. If not provided, it falls back to a default logger.

None

Attributes:

Name Type Description
stack dict

A dictionary to hold instance-specific data.

queue Queue

The queue object passed during initialization.

running Event

An event to indicate whether the instance is running.

log Logger

A logger instance for outputting logs.

Source code in stateforward/state_machine/interpreters/asynchronous/async_interpreter.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def __init__(self, queue: Queue, log: logging.Logger = None):
    """
    Initializes the instance with a queue, an optional log, and internal attributes.
    This method sets up the data structures and synchronization primitives required for the operation
    of an instance. It assigns the queue to the instance, initializes a stack as a dictionary,
    prepares an asyncio event to manage the running state, and sets up logging.

    Args:
        queue (Queue):
             A queue object used for inter-thread or inter-process communication.
        log (logging.Logger, optional):
             A logger instance for logging messages. If not provided, it falls back to a default logger.

    Attributes:
        stack (dict):
             A dictionary to hold instance-specific data.
        queue (Queue):
             The queue object passed during initialization.
        running (asyncio.Event):
             An event to indicate whether the instance is running.
        log (logging.Logger):
             A logger instance for outputting logs.

    """
    self.stack = {}
    self.queue = queue
    self.running = asyncio.Event()
    self.log = log or self.log

is_active(*elements)

Checks if all specified elements are active within the current context. This method determines if all elements provided as arguments exist in the context's active stack.

Parameters:

Name Type Description Default
*elements Element

Variable number of elements to check for activeness within the stack.

()

Returns:

Name Type Description
bool bool

True if all specified elements are active (i.e., they exist in the stack); False otherwise.

Source code in stateforward/state_machine/interpreters/asynchronous/async_interpreter.py
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
def is_active(self, *elements: model.Element) -> bool:
    """
    Checks if all specified elements are active within the current context.
    This method determines if all elements provided as arguments exist in the context's active stack.

    Args:
        *elements (model.Element):
             Variable number of elements to check for activeness within the stack.

    Returns:
        bool:
             True if all specified elements are active (i.e., they exist in the stack); False otherwise.

    """
    return bool(self.stack) and all(element in self.stack for element in elements)

pop(element, *, result=NULL)

Pops an element from the stack and sets the result if specified. This method retrieves the future associated with a stack element, removes the element from the stack, and optionally sets a result on the future. If the future is already done and contains an exception, the exception is raised. If result is provided and not NULL, it is used to set the future's result.

Parameters:

Name Type Description Default
element Element

The element to be popped from the stack.

required
result Any

The result to set on the future if not NULL. Defaults to NULL.

NULL

Returns:

Type Description

typing.cast(Future, future): The future associated with the popped element.

Raises:

Source code in stateforward/state_machine/interpreters/asynchronous/async_interpreter.py
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
def pop(self, element: model.Element, *, result: typing.Any = NULL):
    """
    Pops an element from the stack and sets the result if specified.
    This method retrieves the future associated with a stack element, removes the element from the stack,
    and optionally sets a result on the future. If the future is already done and contains an exception,
    the exception is raised. If `result` is provided and not NULL, it is used to set the future's result.

    Args:
        element (model.Element):
             The element to be popped from the stack.
        result (typing.Any, optional):
             The result to set on the future if not NULL. Defaults to NULL.

    Returns:
        typing.cast(Future, future):
             The future associated with the popped element.

    Raises:

    """
    future = self.stack.pop(element, NULL)
    if future.done():
        if future.exception() is not None:
            raise future.result()
        elif result is not NULL:
            future.set_result(result)
    return typing.cast(Future, future)

push(element, future=NULL)

Pushes an element onto the stack with an associated future or task, ensuring uniqueness. This method adds an element to an internal stack, associating it with a future or an asyncio task, which may represent the element's processing state. If the element already exists within the stack, a ValueError is thrown to avoid duplication. The method ensures that each element can have only one corresponding future or task.

Parameters:

Name Type Description Default
element Element

The element to push onto the stack.

required
future Union[Future, Task]

A future or task to associate with the element. If not provided, the default value defined by NULL will be used.

NULL

Returns:

Name Type Description
Future

The future or task associated with the element. This can be either the provided argument or the one already associated with the element in case of previously existing entry.

Raises:

Type Description
ValueError

If the element is already present in the stack.

Source code in stateforward/state_machine/interpreters/asynchronous/async_interpreter.py
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
def push(
    self, element: model.Element, future: typing.Union[Future, asyncio.Task] = NULL
):
    """
    Pushes an element onto the stack with an associated future or task, ensuring uniqueness.
    This method adds an element to an internal stack, associating it with a future or an asyncio task, which may represent the element's processing state.
    If the element already exists within the stack, a ValueError is thrown to avoid duplication.
    The method ensures that each element can have only one corresponding future or task.

    Args:
        element (model.Element):
             The element to push onto the stack.
        future (typing.Union[Future, asyncio.Task], optional):
             A future or task to associate with the element.
            If not provided, the default value defined by NULL will be used.

    Returns:
        Future:
             The future or task associated with the element. This can be either the provided argument
            or the one already associated with the element in case of previously existing entry.

    Raises:
        ValueError:
             If the element is already present in the stack.

    """
    if element in self.stack:
        raise ValueError(
            f"element {model.qualified_name_of(element)} already exists"
        )
    future = self.stack.setdefault(element, future)
    return typing.cast(Future, future)

run() async

Asynchronously runs the process associated with the class instance. This coroutine continuously executes the 'step' method and awaits for a sleep interval based on the 'clock.multiplier'. It checks if the instance is active and if the 'running' flag is set before each iteration. If the 'running' flag is cancelled, it logs a debug message indicating the cancellation. If the 'running' flag is still set after an interruption, it ensures that the 'terminate' method is called.

Raises:

Type Description
CancelledError

If the coroutine is cancelled during its execution.

Source code in stateforward/state_machine/interpreters/asynchronous/async_interpreter.py
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
async def run(self) -> None:
    """
    Asynchronously runs the process associated with the class instance.
    This coroutine continuously executes the 'step' method and awaits for a sleep interval based on the 'clock.multiplier'. It checks if the instance is active and if the 'running' flag is set before each iteration. If the 'running' flag is cancelled, it logs a debug message indicating the cancellation. If the 'running' flag is still set after an interruption, it ensures that the 'terminate' method is called.

    Raises:
        asyncio.CancelledError:
             If the coroutine is cancelled during its execution.

    """
    self.log.debug(
        f"Running {model.qualified_name_of(self)} clock multiplier {self.clock.multiplier}"
    )
    if self.is_active(self):
        self.running.set()
        try:
            while self.running.is_set():
                await self.step()
                await asyncio.sleep(self.clock.multiplier)
        except asyncio.CancelledError:
            self.log.debug(f"Cancelled {model.qualified_name_of(self)}")
        if self.running.is_set():
            await self.terminate()

send(event)

Sends an event to be processed by the state machine. This method logs the receipt of the event, pushes it to the stack along with a new Future object, adds the event to a queue, and then awaits the processing of the event. The method returns the result of waiting for the future associated with the event to be completed.

Parameters:

Name Type Description Default
event Element

The event to be sent to the state machine for processing.

required

Returns:

Source code in stateforward/state_machine/interpreters/asynchronous/async_interpreter.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def send(self, event: model.Element):
    """
    Sends an event to be processed by the state machine.
    This method logs the receipt of the event, pushes it to the stack along with a new Future object, adds the event to a queue, and then awaits the processing of the event. The method returns the result of waiting for the future associated with the event to be completed.

    Args:
        event (model.Element):
             The event to be sent to the state machine for processing.

    Returns:

    """
    self.log.debug(f"Received {model.qualified_name_of(event)}")
    # push the event onto the stack
    future = self.push(event, asyncio.Future())
    # add the event to the queue
    self.queue.put_nowait(event)
    return self.wait(
        future,
        self.stack.get(self),
        name=f"{model.qualified_name_of(event)}.sent",
    )

start(loop=None)

Starts an asynchronous task to run the state machine in an event loop. This method initializes the event loop for the state machine, logs the state machine's qualified name, and then creates and starts an asynchronous task for the state machine's run method. It pushes the running task into a tracking structure for managing tasks and then waits for the initialization to complete before proceeding.

Parameters:

Name Type Description Default
loop AbstractEventLoop

The event loop in which the state machine will be run. If not provided, the current event loop will be used.

None

Returns:

Type Description

A wait wrapper that is used to wait for two events: the task that runs the state machine to complete, and a separate task that signals the state machine is running.

Source code in stateforward/state_machine/interpreters/asynchronous/async_interpreter.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def start(
    self,
    loop: asyncio.AbstractEventLoop = None,
):
    """
    Starts an asynchronous task to run the state machine in an event loop.
    This method initializes the event loop for the state machine, logs the state machine's qualified name,
    and then creates and starts an asynchronous task for the state machine's `run` method.
    It pushes the running task into a tracking structure for managing tasks and then waits for the
    initialization to complete before proceeding.

    Args:
        loop (asyncio.AbstractEventLoop, optional):
             The event loop in which the state machine will
            be run. If not provided, the current event loop will be used.

    Returns:
        A `wait` wrapper that is used to wait for two events:
             the task that runs the state machine
            to complete, and a separate task that signals the state machine is running.

    """
    qualified_name = model.qualified_name_of(self)
    self.log.debug(f"Starting {qualified_name}")
    loop = self.loop = loop or asyncio.get_event_loop()
    task = loop.create_task(self.run(), name=qualified_name)
    self.push(self, task)
    return self.wait(task, self.loop.create_task(self.running.wait()))

step() async

Performs an asynchronous iteration step over a collection of futures stored in an instance's stack. This method iterates through the values of the instance's stack attribute, which is expected to be a mapping of futures. It checks if any of the futures have raised an exception. If an exception is encountered in any future, it is immediately raised, halting the iteration.

Raises:

Type Description
Exception

Any exception raised by a future in the stack.

Returns:

Source code in stateforward/state_machine/interpreters/asynchronous/async_interpreter.py
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
async def step(self) -> None:
    """
    Performs an asynchronous iteration step over a collection of futures stored in an instance's stack.
    This method iterates through the values of the instance's stack attribute, which is expected to be a mapping of futures. It checks if any of the futures have raised an exception. If an exception is encountered in any future, it is immediately raised, halting the iteration.

    Raises:
        Exception:
             Any exception raised by a future in the stack.

    Returns:

    """
    for future in self.stack.values():
        if exception := future.exception() is not None:
            raise exception

terminate()

Cancels the asynchronous task associated with the current instance if it is still running. This method checks if the running attribute, presumably an instance of threading.Event or similar, is set. If it is, it clears the running attribute to stop the task. It then retrieves the current task using a pop method and inspects it. If the task is not yet completed, it will attempt to cancel it by calling its cancel method. The task object, now potentially canceled, is then recast as an asyncio.Task and returned.

Returns:

Type Description
Task

asyncio.Task: The task associated with this instance after attempting to cancel it if necessary.

Source code in stateforward/state_machine/interpreters/asynchronous/async_interpreter.py
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
def terminate(self) -> asyncio.Task:
    """
    Cancels the asynchronous task associated with the current instance if it is still running.
    This method checks if the `running` attribute, presumably an instance of `threading.Event` or similar,
    is set. If it is, it clears the `running` attribute to stop the task. It then retrieves the current
    task using a `pop` method and inspects it. If the task is not yet completed, it will attempt
    to cancel it by calling its `cancel` method. The task object, now potentially canceled, is
    then recast as an `asyncio.Task` and returned.

    Returns:
        asyncio.Task:
             The task associated with this instance after attempting to cancel it if necessary.

    """
    if self.running.is_set():
        self.running.clear()
    task = self.pop(self)
    if not task.done():
        task.cancel()
    return typing.cast(asyncio.Task, task)

wait(*tasks, name=None, return_when=asyncio.FIRST_COMPLETED)

Waits for the completion of one or more asyncio.Task or asyncio.Future objects. This function is a coroutine that accepts any number of asyncio.Task or asyncio.Future objects and an optional name for the underlying task that will wait for the provided tasks or futures to complete. The function will schedule the execution of these tasks or futures on the event loop, and wait until the conditions specified by return_when are met. return_when can indicate waiting for the first task to complete (asyncio.FIRST_COMPLETED), all tasks to complete (asyncio.ALL_COMPLETED), or the first task to not be cancelled (asyncio.FIRST_EXCEPTION). Upon completing the wait condition, the function will return the task that was created to perform the wait operation. If any of the awaited tasks or futures raise an exception during execution, the exception will be propagated.

Parameters:

Name Type Description Default
*tasks Union[Task, Future]

An arbitrary number of asyncio.Task or asyncio.Future objects to be awaited.

()
name str

An optional name for the asyncio.Task that will be created to perform the waiting operation. If not provided, a name is generated by concatenating the names of the tasks with 'and'.

None
return_when str

The condition that determines when the wait operation should return. Must be one of asyncio.FIRST_COMPLETED, asyncio.ALL_COMPLETED, or asyncio.FIRST_EXCEPTION.

FIRST_COMPLETED

Returns:

Type Description
Task

asyncio.Task: The task that was created to wait for the provided tasks or futures.

Source code in stateforward/state_machine/interpreters/asynchronous/async_interpreter.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def wait(
    self,
    *tasks: typing.Union[asyncio.Task, asyncio.Future],
    name: str = None,
    return_when: str = asyncio.FIRST_COMPLETED,
) -> asyncio.Task:
    """
    Waits for the completion of one or more asyncio.Task or asyncio.Future objects.
    This function is a coroutine that accepts any number of asyncio.Task or asyncio.Future
    objects and an optional name for the underlying task that will wait for the
    provided tasks or futures to complete. The function will schedule the execution
    of these tasks or futures on the event loop, and wait until the conditions specified
    by return_when are met. return_when can indicate waiting for the first task to
    complete (asyncio.FIRST_COMPLETED), all tasks to complete (asyncio.ALL_COMPLETED),
    or the first task to not be cancelled (asyncio.FIRST_EXCEPTION).
    Upon completing the wait condition, the function will return the task that was created
    to perform the wait operation. If any of the awaited tasks or futures raise an
    exception during execution, the exception will be propagated.

    Args:
        *tasks (Union[asyncio.Task, asyncio.Future]):
             An arbitrary number of asyncio.Task
            or asyncio.Future objects to be awaited.
        name (str, optional):
             An optional name for the asyncio.Task that will be created
            to perform the waiting operation. If not provided, a name is generated by
            concatenating the names of the tasks with '_and_'.
        return_when (str):
             The condition that determines when the wait operation
            should return. Must be one of asyncio.FIRST_COMPLETED, asyncio.ALL_COMPLETED,
            or asyncio.FIRST_EXCEPTION.

    Returns:
        asyncio.Task:
             The task that was created to wait for the provided tasks or futures.

    """

    async def wait_for_tasks(_tasks=tasks, _return_when=return_when):
        """
        Waits for the completion of tasks, possibly returning before all tasks are finished based on a condition.
        This function asynchronously waits for the `_tasks` iterable of tasks to reach a completion state, which is determined by the `_return_when` condition. Once the condition is met, it retrieves one of the completed tasks from `done` set. If that task raised an exception, the exception is propagated by awaiting on the task object.

        Args:
            _tasks (Iterable[Task]):
                 The collection of asyncio.Task objects to wait on. Defaults to the `tasks` variable in the current context.
            _return_when (str):
                 The condition upon which the function should return. This condition dictates whether the function waits for all tasks to complete, or returns earlier. Defaults to the `return_when` variable in the current context.

        Returns:
            Tuple[Set[Task], Set[Task]]:
                 A tuple containing two sets. The first set contains all tasks that are done, and the second set contains all tasks that are pending.

        Raises:
            Any exception raised by a task:
                 If any task among `_tasks` raises an exception, that exception is propagated.

        """
        done, pending = await asyncio.wait(_tasks, return_when=_return_when)
        task = done.pop()
        if task.exception() is not None:
            await task

    return self.loop.create_task(
        wait_for_tasks(),
        name=name or "_and_".join(task.get_name() for task in tasks),
    )

InterpreterStep

Bases: Enum

An enumeration to represent the status of an interpretation step. This class is an enumeration (Enum) which categorizes the state of an interpretation step into one of three possible statuses:

Attributes:

Name Type Description
complete str

A status indicating that the interpretation step is complete and no further action is required.

incomplete str

A status indicating that the interpretation step is not fully resolved and may require additional information or action.

deferred str

A status that denotes a delay in the completion or evaluation of the interpretation step, potentially awaiting external input or another event.

Source code in stateforward/state_machine/interpreters/asynchronous/async_interpreter.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class InterpreterStep(Enum):
    """
    An enumeration to represent the status of an interpretation step.
    This class is an enumeration (Enum) which categorizes the state of an interpretation step into one of three possible statuses:

    Attributes:
        complete (str):
             A status indicating that the interpretation step is complete and no further action is required.
        incomplete (str):
             A status indicating that the interpretation step is not fully resolved and may require additional information or action.
        deferred (str):
             A status that denotes a delay in the completion or evaluation of the interpretation step, potentially awaiting external input or another event.

    """

    complete = "complete"
    incomplete = "incomplete"
    deferred = "deferred"

Null

Bases: Future

A placeholder class designed to represent a Future with no value. This class is a subclass of asyncio.Future and is intended for scenarios where a Future-like object is required, but no actual result is expected. Upon initialization, it immediately sets its own result to None.

Attributes:

Name Type Description
None

This class does not have any public attributes besides those provided by its superclass.

Methods:

Name Description
__init__

Constructs a new Null object and sets its result to None. The Null class does not have its own methods, but inherits all methods from asyncio.Future.

Source code in stateforward/state_machine/interpreters/asynchronous/async_interpreter.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class Null(asyncio.Future):
    """
    A placeholder class designed to represent a Future with no value.
    This class is a subclass of `asyncio.Future` and is intended for scenarios where a Future-like object is required, but no actual result is expected. Upon initialization, it immediately sets its own result to `None`.

    Attributes:
        None:
             This class does not have any public attributes besides those provided by its superclass.

    Methods:
        __init__:
             Constructs a new `Null` object and sets its result to `None`.
            The `Null` class does not have its own methods, but inherits all methods from `asyncio.Future`.

    """

    def __init__(self):
        """
        Initializes a new instance of the class.
        This constructor method sets up the initial state of the object by calling its superclass initializer and setting the result attribute to None.

        """
        super().__init__()
        self.set_result(None)

__init__()

Source code in stateforward/state_machine/interpreters/asynchronous/async_interpreter.py
34
35
36
37
38
39
40
41
def __init__(self):
    """
    Initializes a new instance of the class.
    This constructor method sets up the initial state of the object by calling its superclass initializer and setting the result attribute to None.

    """
    super().__init__()
    self.set_result(None)