Async Behavior Interpreter
The async_behavior_interpreter
module is designed to provide an asynchronous interpreter for state machines, specifically tailored for handling behaviors encoded into event-driven systems. It extends functionality from a base AsyncInterpreter
, and is meant to work within a broader framework that manages state transitions and event processing called stateforward
. This interpreter deals with asynchronous event queues and utilizes Python's asyncio
library to manage concurrent execution of behaviors associated with state machine events.
The primary class provided by this module is AsyncBehaviorInterpreter
, which inherits from the AsyncInterpreter
class and introduces additional mechanisms to manage deferred events within the state machine's execution flow. Events are accumulated from various sources such as the internal model pool, an asynchronous queue, and a deferred list, ensuring that all events are processed in an ordered and non-repetitive fashion.
The AsyncBehaviorInterpreter
class introduces two key asynchronous methods: step
and exec_event_processing
. The step
method orchestrates the processing of events by maintaining a stack for event execution, processing events from the queue, checking for deferred events, and tirelessly fetching events until all have been handled. The exec_event_processing
method, which is to be implemented, is responsible for handling the processing logic for each event encountered.
The AsyncBehaviorInterpreter
also provides a synchronous method compile
meant to be overridden by subclasses as a classmethod. This method's role is typically to compile or prepare the state machine for execution, though it is abstract in the context of the provided schema.
Additional utility methods such as exec_behavior
facilitate the execution of behaviors associated with events, handling both synchronous and asynchronous operations seamlessly.
The module is robust in its logging capabilities, ensuring that event processing is transparent and debuggable. A Logger
interface is expected for this purpose, and a default logger is created if none is provided upon instantiation of the interpreter.
The module leverages asyncio
for handling concurrency and asynchronous operations, and incorporates custom abstractions such as StateMachine
, Clock
, Queue
, and Logger
provided by the stateforward
framework. It is a key component within the framework for implementing asynchronous behavior-driven state machines.
AsyncBehaviorInterpreter
Bases: AsyncInterpreter
A class to interpret and manage asynchronous behaviors within an event-driven system. It utilizes a queue to process events and can defer processing of some events to a subsequent cycle.
Attributes:
Name | Type | Description |
---|---|---|
deferred |
list[Event]
|
A list to keep track of events that are deferred during processing. |
Inherits |
list[Event]
|
|
AsyncInterpreter |
list[Event]
|
Base class that provides the asynchronous event interpretation framework. |
Clock |
list[Event]
|
(Inherited via AsyncInterpreter) Represents a time-keeping component. |
Methods:
Name | Description |
---|---|
__init__ |
Queue=None, log: Logger=None): Initializes the AsyncBehaviorInterpreter instance. |
Parameters:
Name | Type | Description | Default |
---|---|---|---|
queue |
Queue
|
An asyncio Queue instance used to manage the event queue. Defaults to None, where an asyncio.Queue() will be created. |
None
|
log |
Logger
|
Logger instance for logging/debugging. Defaults to None, where a new logger is created based on the model's qualified name. |
None
|
step(self) |
Processes events in the queue, executing behaviors associated with each event and supporting deferral of events. Events are deduplicated and processed in order until the queue is emptied or all events are deferred. Results are collected and any completions are handled. |
required |
Returns:
Type | Description |
---|---|
exec_behavior(self, behavior: core.Behavior, event: typing.Optional[core.Event]): Executes the specified behavior with the associated event. |
Parameters:
Name | Type | Description | Default |
---|---|---|---|
behavior |
Behavior
|
The behavior object to execute. |
required |
event |
Optional[Event]
|
The event related to the behavior, if any. |
required |
Returns:
Type | Description |
---|---|
exec_event_processing(self, event: core.Event) -> InterpreterStep: Method stub for processing an individual event. |
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event
|
The event to be processed. |
required |
Returns:
Name | Type | Description |
---|---|---|
InterpreterStep |
The outcome of processing the event, indicating if the event is complete, pending, or deferred. |
Source code in stateforward/state_machine/interpreters/asynchronous/async_behavior_interpreter.py
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 |
|
__init__(queue=None, log=None)
This method sets up the class instance with a specified or default asynchronous queue and logging system. If no queue is provided, an asyncio Queue instance is created. If no logger is provided, a new logger is created based on the qualified name of the class instance via the create_logger function. The method also initializes a list to keep track of deferred tasks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
queue |
Queue
|
An instance of a queue for task management. Defaults to None, in which case a new asyncio Queue is created. |
None
|
log |
Logger
|
A logging instance to log messages and errors. Defaults to None, in which case a new logger is created based on the qualified name of the class instance. |
None
|
Attributes:
Name | Type | Description |
---|---|---|
deferred |
List
|
A list to store deferred tasks or activities that should be executed later. |
Source code in stateforward/state_machine/interpreters/asynchronous/async_behavior_interpreter.py
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
|
exec_behavior(behavior, event)
async
Asynchronously executes a behavior in response to an event. This function takes a behavior object and optionally an event object. It begins by logging the execution of the behavior using its qualified name. Afterwards, it executes the activity associated with the behavior, potentially waiting for the result if the activity is async (a future or coroutine). Finally, it returns the result of the behavior's activity.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
behavior |
Behavior
|
The behavior object to execute. |
required |
event |
Optional[Event]
|
The event that triggers the execution of the behavior, if any. |
required |
Returns:
Source code in stateforward/state_machine/interpreters/asynchronous/async_behavior_interpreter.py
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
|
exec_event_processing(event)
async
Processes a given event asynchronously within the system's event processing pipeline. This function serves as an asynchronous handler, tasked with interpreting and processing an event object. It forms an integral part of the event-driven architecture and is expected to be invoked with an event instance that it shall interpret, potentially yielding changes in system behavior or state as a result. Upon successful execution, it returns an instance of InterpreterStep, which encapsulates the outcome of processing the event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event
|
An event instance that contains data and information to be processed by the system. |
required |
Returns:
Name | Type | Description |
---|---|---|
InterpreterStep |
InterpreterStep
|
An object representing the result of the event processing. It includes information about what actions should be taken next within the system based on the interpretation of the event. |
Raises:
Type | Description |
---|---|
Exception
|
If the event processing fails or encounters an unexpected error, an exception may be thrown indicating the nature of the failure. |
Source code in stateforward/state_machine/interpreters/asynchronous/async_behavior_interpreter.py
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 |
|
step()
async
Performs a single step in the event processing cycle of the current state machine asynchronously. This async method processes events that are pending in the machine's queue, handling each event according to the state machine's logic. During the step, it ensures that each event is either processed completely or deferred for later processing. The state machine evaluates events from its internal queue, deferred events from the previous step, and new events that arrive during the cycle. Duplicate events are filtered out, ensuring that each unique event is processed only once per iteration. Processed events are logged for debugging purposes, and any events that are determined to be owned by no one are stacked for result assignment. If an event signals the completion of the processing cycle, the method will clear the list of processed events and break out of the loop for the current step. The method uses an internal while loop that runs as long as there are events to process. Events that have already been processed are skipped in subsequent iterations. Once all events are processed, the loop will end, updating the deferred list with any events that were not processed and need to be revisited in the next step. The method concludes by setting the result for any futures in the stack that have been processed, thus completing the step.
Note that this method should be used within the context of an async function or coroutine due to its asynchronous nature.
Returns:
Name | Type | Description |
---|---|---|
None |
This method does not return a value, as its purpose is to update the state machine's internal state based on event processing. |
Source code in stateforward/state_machine/interpreters/asynchronous/async_behavior_interpreter.py
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
|