Async Interpreter
The `async_interpreter` module provides an asynchronous execution framework for state-based systems, built on the asynchronous I/O of Python's `asyncio` library. It coordinates the models, events, queues, and clocks of a state machine so that sequences of operations execute without blocking. Its key components are `InterpreterStep`, `AsyncInterpreter`, and `Null`, together with the interpreter's controllable constructs: `queue`, `clock`, `stack`, and `log`. The module defines how an asynchronous interpreter drives state machine workflows, with methods for event handling, concurrency management, and execution control. The `AsyncInterpreter` class inherits from `model.Element` and is generic over the model type, so it can work with different state machine models. The module suits scenarios that need asynchronous event handling and state management, and it relies on the `asyncio` event loop and futures for its operation.
AsyncInterpreter
Bases: `Element`, `Generic[T]`
An asynchronous interpreter designed to operate with the provided state machine. This interpreter runs within the asyncio event loop and handles asynchronous task management and event processing for the state machine. It manages a queue of events and a stack to maintain the state of in-flight operations.
Attributes:
Name | Type | Description |
---|---|---|
`queue` | `Queue` | The queue for managing incoming events. |
`clock` | `Clock` | An object managing the clock speed for the interpreter's operations. |
`stack` | `dict[Element, Future]` | A dictionary mapping state machine elements to their associated futures/tasks. |
`loop` | `AbstractEventLoop` | The event loop in which this interpreter operates. |
`log` | `Logger` | Logger for the interpreter to output its activity. |
`running` | `Event` | An event signaling whether the interpreter is currently running. |
`stepping` | `Lock` | A lock to ensure step execution is done atomically. |
`model` | `T` | A generic type parameter representing the state machine model. |
Methods:
Name | Description |
---|---|
`__init__` | `(queue: Queue, log: logging.Logger=None)`: Initializes the AsyncInterpreter with a queue and an optional logger. |
`send` | `(event: model.Element)`: Sends an event to be processed by the state machine. |
`start` | `(loop: asyncio.AbstractEventLoop=None)`: Starts the interpreter within the given or default event loop. |
`wait` | `(*tasks: typing.Union[asyncio.Task, asyncio.Future], name: str=None, return_when: str=asyncio.FIRST_COMPLETED) -> asyncio.Task`: Waits for the given tasks to complete, returning an asyncio.Task wrapping the wait operation. |
`run` | The coroutine that runs the main event processing loop of the interpreter. |
`step` | Processes the next set of tasks from the stack. |
`is_active` | `(*elements: model.Element) -> bool`: Checks if the given elements are active within the current stack. |
`push` | `(element: model.Element, future: typing.Union[Future, asyncio.Task]=NULL)`: Pushes a new element and associated future/task onto the stack. |
`pop` | `(element: model.Element, *, result: typing.Any=NULL)`: Pops an element and its associated task from the stack, handling its result. |
`terminate` | Terminates the interpreter, cleaning up and cancelling tasks as necessary. |
Source code in stateforward/state_machine/interpreters/asynchronous/async_interpreter.py
__init__(queue, log=None)
Initializes an instance. It assigns the queue to the instance, initializes the stack as a dictionary, prepares an asyncio event to track the running state, and sets up logging.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`queue` | `Queue` | A queue object used for inter-thread or inter-process communication. | required |
`log` | `Logger` | A logger instance for logging messages. If not provided, it falls back to a default logger. | `None` |
Attributes:
Name | Type | Description |
---|---|---|
`stack` | `dict` | A dictionary mapping state machine elements to their associated futures or tasks. |
`queue` | `Queue` | The queue object passed during initialization. |
`running` | `Event` | An event to indicate whether the instance is running. |
`log` | `Logger` | A logger instance for outputting logs. |
is_active(*elements)
Checks if all specified elements are active within the current context. This method determines if all elements provided as arguments exist in the context's active stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`*elements` | `Element` | Variable number of elements to check for activeness within the stack. | `()` |
Returns:
Name | Type | Description |
---|---|---|
`bool` | `bool` | True if all specified elements are active (i.e., they exist in the stack); False otherwise. |
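The membership check described above can be sketched with a plain dictionary standing in for the interpreter's stack. This is an illustrative stand-in, not the library's code: the free function `is_active` and its `stack` argument are assumptions made so the example is self-contained.

```python
from typing import Any, Hashable


def is_active(stack: dict[Hashable, Any], *elements: Hashable) -> bool:
    """Return True only if every given element is a key of the stack."""
    # all() over an empty argument list is True, so in this sketch
    # calling is_active(stack) with no elements reports True.
    return all(element in stack for element in elements)
```

In this sketch a single missing element is enough to make the whole check fail, which matches the "all specified elements" wording of the description.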
pop(element, *, result=NULL)
Pops an element from the stack and sets the result if specified.
This method retrieves the future associated with a stack element, removes the element from the stack, and optionally sets a result on the future. If the future is already done and contains an exception, the exception is raised. If `result` is provided and not NULL, it is used to set the future's result.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`element` | `Element` | The element to be popped from the stack. | required |
`result` | `Any` | The result to set on the future if not NULL. | `NULL` |
Returns:
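Type | Description |
---|---|
`Future` | The future associated with the popped element. |
Raises:
Type | Description |
---|---|
`Exception` | The exception stored on the future, if the future is already done and failed. |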
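A minimal sketch of the pop behaviour described above, using a plain dictionary of `asyncio` futures. The `_UNSET` sentinel is a hypothetical stand-in for the module's `NULL` default, and applying the result only to a still-pending future is an assumption of this sketch, not something the documentation states.

```python
import asyncio

_UNSET = object()  # hypothetical stand-in for the module's NULL default


def pop(stack: dict, element, *, result=_UNSET) -> asyncio.Future:
    """Remove `element` from `stack` and settle its future."""
    future = stack.pop(element)  # KeyError if the element was never pushed
    if future.done():
        # Re-raise a stored failure instead of silently discarding it.
        if not future.cancelled() and future.exception() is not None:
            raise future.exception()
    elif result is not _UNSET:
        # Assumption: a result is only applied to a future that is still pending.
        future.set_result(result)
    return future
```

Returning the future lets the caller await it or inspect its state after the element has left the stack.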
push(element, future=NULL)
Pushes an element onto the stack with an associated future or task, ensuring uniqueness. This method adds an element to the internal stack and associates it with a future or an asyncio task that represents the element's processing state. If the element is already on the stack, a ValueError is raised to avoid duplication, so each element has exactly one corresponding future or task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`element` | `Element` | The element to push onto the stack. | required |
`future` | `Union[Future, Task]` | A future or task to associate with the element. If not provided, the default value defined by NULL will be used. | `NULL` |
Returns:
Type | Description |
---|---|
`Future` | The future or task associated with the element. |
Raises:
Type | Description |
---|---|
`ValueError` | If the element is already present in the stack. |
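The uniqueness rule above can be sketched as follows. The free function `push` and the plain `dict` stack are assumptions for illustration, and the sketch requires an explicit future instead of modelling the `NULL` default.

```python
import asyncio


def push(stack: dict, element, future: asyncio.Future) -> asyncio.Future:
    """Associate `future` with `element`, refusing duplicates."""
    if element in stack:
        # One future per element: a second push is treated as an error.
        raise ValueError(f"{element!r} is already on the stack")
    stack[element] = future
    return future
```

Raising on a duplicate, as opposed to overwriting, keeps an in-flight future from being orphaned while something may still be awaiting it.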
run()
async
Asynchronously runs the process associated with the class instance. This coroutine repeatedly executes the `step` method and then sleeps for an interval based on `clock.multiplier`. Before each iteration it checks that the instance is active and that the `running` flag is set. If the coroutine is cancelled, it logs a debug message indicating the cancellation. If the `running` flag is still set after an interruption, it ensures that the `terminate` method is called.
Raises:
Type | Description |
---|---|
`CancelledError` | If the coroutine is cancelled during its execution. |
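The step-then-sleep loop described above might look roughly like the sketch below. `MiniRunner`, its `interval` attribute (standing in for the clock-derived sleep interval), and the `steps` counter are hypothetical names, and the cleanup is simplified to clearing the flag where the documentation describes a call to `terminate`.

```python
import asyncio


class MiniRunner:
    """Hypothetical, stripped-down runner; not the library's class."""

    def __init__(self, interval: float = 0.001):
        self.interval = interval        # stand-in for the clock-based interval
        self.running = asyncio.Event()  # set while the loop should keep going
        self.steps = 0

    async def step(self) -> None:
        self.steps += 1                 # placeholder for real step work

    async def run(self) -> None:
        self.running.set()
        try:
            while self.running.is_set():
                await self.step()
                await asyncio.sleep(self.interval)
        except asyncio.CancelledError:
            # A real interpreter would log the cancellation here.
            raise
        finally:
            # Simplified cleanup: clear the flag instead of calling terminate().
            self.running.clear()
```

Re-raising `CancelledError` after the cleanup keeps the task's cancelled state visible to whoever awaits it.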
send(event)
Sends an event to be processed by the state machine. This method logs the receipt of the event, pushes it to the stack along with a new Future object, adds the event to a queue, and then awaits the processing of the event. The method returns the result of waiting for the future associated with the event to be completed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`event` | `Element` | The event to be sent to the state machine for processing. | required |
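The send flow described above (register a future for the event, enqueue the event, then wait for the future) can be sketched with a plain `asyncio.Queue`. The free functions `send`, `consume_one`, and `demo` are illustrative assumptions; in particular, whether the real method is a coroutine or returns an awaitable is not stated here, so this sketch simply returns the future.

```python
import asyncio


def send(stack: dict, queue: asyncio.Queue, event) -> asyncio.Future:
    """Register a future for `event`, enqueue the event, return the future."""
    future = asyncio.get_running_loop().create_future()
    stack[event] = future    # the "push" step
    queue.put_nowait(event)  # hand the event to whoever drains the queue
    return future


async def consume_one(stack: dict, queue: asyncio.Queue) -> None:
    """Hypothetical consumer: process one event and resolve its future."""
    event = await queue.get()
    stack.pop(event).set_result(f"handled {event}")


async def demo() -> str:
    stack, queue = {}, asyncio.Queue()
    pending = send(stack, queue, "ping")
    await consume_one(stack, queue)
    return await pending     # resolves once the consumer has finished
```

Pairing each event with its own future is what lets the sender wait for that specific event to be processed without blocking the rest of the loop.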
start(loop=None)
Starts an asynchronous task to run the state machine in an event loop.
This method initializes the event loop for the state machine, logs the state machine's qualified name, and then creates and starts an asynchronous task for the state machine's `run` method. It pushes the running task into a tracking structure for managing tasks and then waits for the initialization to complete before proceeding.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`loop` | `AbstractEventLoop` | The event loop in which the state machine will be run. If not provided, the current event loop will be used. | `None` |
Returns:
Type | Description |
---|---|
A |
step()
async
Performs an asynchronous iteration step over a collection of futures stored in an instance's stack. This method iterates through the values of the instance's stack attribute, which is expected to be a mapping of futures. It checks if any of the futures have raised an exception. If an exception is encountered in any future, it is immediately raised, halting the iteration.
Raises:
Type | Description |
---|---|
`Exception` | Any exception raised by a future in the stack. |
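As a rough illustration of the scan described above, the sketch below walks a dictionary of futures and re-raises the first stored exception it finds. The free function `step` and the `dict` stack are assumptions; skipping cancelled futures is a choice of this sketch.

```python
import asyncio


async def step(stack: dict) -> None:
    """Raise the first exception stored on any finished future in the stack."""
    # Iterate over a snapshot so the stack may change while we scan.
    for future in list(stack.values()):
        if future.done() and not future.cancelled():
            exc = future.exception()
            if exc is not None:
                raise exc  # halt the scan at the first failure
```

Surfacing failures at every step means an error in any in-flight operation stops the run loop promptly instead of staying hidden on an unawaited future.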
terminate()
Cancels the asynchronous task associated with the current instance if it is still running.
This method checks whether the `running` attribute, an `asyncio.Event`, is set. If it is, the method clears `running` to stop the task. It then retrieves the current task by calling `pop` and inspects it. If the task has not yet completed, the method cancels it by calling its `cancel` method. The task, now potentially cancelled, is then cast to `asyncio.Task` and returned.
Returns:
Type | Description |
---|---|
`Task` | The task associated with this instance after attempting to cancel it if necessary. |
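The shutdown sequence described above can be sketched as a small helper. The free function `terminate`, its explicit `running`, `stack`, and `key` arguments, and the use of a plain `dict` are all assumptions made to keep the example self-contained.

```python
import asyncio


def terminate(running: asyncio.Event, stack: dict, key) -> asyncio.Task:
    """Clear the running flag, remove the tracked task, cancel it if pending."""
    if running.is_set():
        running.clear()      # tells a run loop to stop iterating
    task = stack.pop(key)    # the task that was tracked under `key`
    if not task.done():
        task.cancel()        # request cancellation; it takes effect when awaited
    return task
```

Note that `cancel()` only requests cancellation: the task is reported as cancelled once the event loop has had a chance to deliver the `CancelledError`.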
wait(*tasks, name=None, return_when=asyncio.FIRST_COMPLETED)
Waits for the completion of one or more asyncio.Task or asyncio.Future objects. This method accepts any number of asyncio.Task or asyncio.Future objects and an optional name for the underlying task that waits for them to complete. It schedules the wait on the event loop and returns once the condition specified by return_when is met. return_when can request waiting for the first task to complete (asyncio.FIRST_COMPLETED), for all tasks to complete (asyncio.ALL_COMPLETED), or for the first task to raise an exception (asyncio.FIRST_EXCEPTION, which behaves like asyncio.ALL_COMPLETED if no task raises). The method returns the task that was created to perform the wait operation. If any of the awaited tasks or futures raise an exception during execution, the exception will be propagated.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`*tasks` | `Union[Task, Future]` | An arbitrary number of asyncio.Task or asyncio.Future objects to be awaited. | `()` |
`name` | `str` | An optional name for the asyncio.Task that will be created to perform the waiting operation. If not provided, a name is generated by concatenating the names of the tasks with 'and'. | `None` |
`return_when` | `str` | The condition that determines when the wait operation should return. Must be one of asyncio.FIRST_COMPLETED, asyncio.ALL_COMPLETED, or asyncio.FIRST_EXCEPTION. | `FIRST_COMPLETED` |
Returns:
Type | Description |
---|---|
`Task` | The task that was created to wait for the provided tasks or futures. |
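A minimal sketch of wrapping `asyncio.wait` in a named task, along the lines described above. The free function `wait` and the `_label` helper are hypothetical; the exact name format (`" and "` as the separator) is an assumption, and this sketch does not re-raise exceptions from the awaited tasks.

```python
import asyncio


def _label(awaitable) -> str:
    # Tasks carry a name; plain futures do not, so fall back to repr().
    get_name = getattr(awaitable, "get_name", None)
    return get_name() if get_name is not None else repr(awaitable)


def wait(*tasks, name=None, return_when=asyncio.FIRST_COMPLETED) -> asyncio.Task:
    """Return a task that completes when `return_when` is satisfied."""
    if name is None:
        name = " and ".join(_label(t) for t in tasks)

    async def _wait():
        # asyncio.wait returns two sets: (done, pending).
        return await asyncio.wait(tasks, return_when=return_when)

    return asyncio.create_task(_wait(), name=name)
```

Awaiting the returned task yields the `(done, pending)` pair from `asyncio.wait`, so with `FIRST_COMPLETED` the caller can decide what to do with whatever is still pending.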
InterpreterStep
Bases: Enum
An enumeration to represent the status of an interpretation step. This class is an enumeration (Enum) which categorizes the state of an interpretation step into one of three possible statuses:
Attributes:
Name | Type | Description |
---|---|---|
`complete` | `str` | A status indicating that the interpretation step is complete and no further action is required. |
`incomplete` | `str` | A status indicating that the interpretation step is not fully resolved and may require additional information or action. |
`deferred` | `str` | A status that denotes a delay in the completion or evaluation of the interpretation step, potentially awaiting external input or another event. |
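A three-member enumeration of this shape could be sketched as below. The class name `StepStatus`, the string values, and the `needs_more_work` helper are assumptions for illustration; only the three member names come from the table above.

```python
from enum import Enum


class StepStatus(Enum):
    """Hypothetical mirror of the three statuses listed above."""

    complete = "complete"      # nothing further to do
    incomplete = "incomplete"  # more information or action is needed
    deferred = "deferred"      # postponed until another event or input arrives


def needs_more_work(status: StepStatus) -> bool:
    """Illustrative helper: anything other than `complete` is unfinished."""
    return status is not StepStatus.complete
```

Using an enumeration instead of bare strings lets callers compare statuses by identity and makes a misspelled status fail loudly at lookup time.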
Null
Bases: `Future`
A placeholder class designed to represent a Future with no value.
This class is a subclass of `asyncio.Future` and is intended for scenarios where a Future-like object is required, but no actual result is expected. Upon initialization, it immediately sets its own result to `None`.
Attributes:
Name | Type | Description |
---|---|---|
None |
This class does not have any public attributes besides those provided by its superclass. |
Methods:
Name | Description |
---|---|
__init__ |
Constructs a new |
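The idea of a future that is already resolved at construction can be sketched as follows. `ResolvedFuture` and `demo` are hypothetical names, not the module's `Null` class; the sketch assumes the object is created while an event loop is running.

```python
import asyncio


class ResolvedFuture(asyncio.Future):
    """Hypothetical stand-in: a future that is resolved to None on creation."""

    def __init__(self):
        super().__init__()     # binds to the current event loop
        self.set_result(None)  # done immediately, with no meaningful value


async def demo():
    placeholder = ResolvedFuture()
    # Awaiting an already-resolved future returns its result right away.
    return placeholder.done(), await placeholder
```

A placeholder like this lets code that always expects a future treat the "nothing to wait for" case uniformly, without special-casing `None`.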
__init__()