Skip to content

Queue

The queue module provides a Queue class as a protocol with type variable T, which represents the generic type of items that the queue can contain. The Queue class defines several methods to interact with the queue in a thread-safe manner, making it suitable for concurrent programming.

  • task_done(): This method indicates that a formerly enqueued task is complete. It is typically used by queue consumers after an item has been processed.

  • join(): This method blocks until all items in the queue have been processed. It is used to wait for all queued tasks to be completed.

  • qsize(): Returns the approximate size of the queue. It may not be accurate in multithreaded contexts due to the nature of queue operations.

  • empty(): Returns True if the queue is empty, otherwise False. This is a snapshot of the state of the queue and may not be reliable in multithreaded environments.

  • full(): Determines if the queue is full. Similar to empty(), it's a snapshot and may not be accurate in the presence of multiple threads.

  • put(item, block, timeout): Adds an item to the queue. If block is True (the default), the method blocks until a free slot is available if the queue is full, or until an optional timeout is reached. If block is False, the item is put on the queue if a free slot is immediately available, otherwise, the method raises a Full exception.

  • get(block, timeout): Removes and returns an item from the queue. If block is True (the default), the method blocks until an item is available, or until an optional timeout is reached. If block is False, the method returns an item if one is immediately available, or raises an Empty exception otherwise.

  • put_nowait(item): Equivalent to put with block set to False. It enqueues an item without blocking.

  • get_nowait(): Equivalent to get with block set to False. It attempts to immediately remove and return an item from the queue without blocking.

This module is designed to facilitate communication between producer and consumer threads without the need for explicit locking. It abstracts away the complexities of thread synchronization while providing a clear protocol for queuing and dequeueing items.

Queue

Bases: Protocol[T]

A protocol that defines the expected methods and behaviors of a queue data structure. The Queue protocol specifies interface contracts for queue operations. Any queue implementation conforming to this protocol must provide these methods.

Methods:

Name Description
task_done

Indicate that a formerly enqueued task is complete. Used by queue consumers to signal the task is finished. The semantics of task completion depend on the concrete implementation of the queue.

join

Block until all items in the queue have been received and processed. This method is used to wait for the completion of all the tasks in the queue.

qsize

Return the approximate size of the queue. Note that the size is approximate because there may be pending add or get operations that are not yet reflected in the size.

empty

Check if the queue is empty. Returns True if the queue is empty, False otherwise.

full

Check if the queue is full. Returns True if the queue is full, False otherwise.

put

T, block: Optional[bool]=True, timeout: Optional[float]=None): Put an item into the queue.

Parameters:

Name Type Description Default
item T

The item of type T to be put into the queue.

required
block Optional[bool]

Whether to block if the queue is full (defaults to True).

required
timeout Optional[float]

The maximum time to block for (in seconds), or None for no timeout (defaults to None).

required
get(block

Optional[bool]=True, timeout: Optional[float]=None): Remove and return an item from the queue.

required

Parameters:

Name Type Description Default
block Optional[bool]

Whether to block if the queue is empty (defaults to True).

required
timeout Optional[float]

The maximum time to block for, or None for no timeout (defaults to None).

required

Returns:

Name Type Description
put_nowait item

Equivalent to put(item, False). A non-blocking variant of put method.

Parameters:

Name Type Description Default
item

The item to put into the queue without blocking.

required
get_nowait()

Equivalent to get(False). A non-blocking variant of get method.

required

Returns:

Source code in stateforward/protocols/queue.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
class Queue(typing.Protocol[T]):
    """
    A protocol that defines the expected methods and behaviors of a queue data structure.
    The Queue protocol specifies interface contracts for queue operations. Any queue implementation
    conforming to this protocol must provide these methods.

    Methods:
        task_done():
             Indicate that a formerly enqueued task is complete.
            Used by queue consumers to signal the task is finished. The semantics of task completion
            depend on the concrete implementation of the queue.
        join():
             Block until all items in the queue have been received and processed.
            This method is used to wait for the completion of all the tasks in the queue.
        qsize():
             Return the approximate size of the queue.
            Note that the size is approximate because there may be pending add or get
            operations that are not yet reflected in the size.
        empty():
             Check if the queue is empty.
            Returns True if the queue is empty, False otherwise.
        full():
             Check if the queue is full.
            Returns True if the queue is full, False otherwise.
        put(item:
             T, block: Optional[bool]=True, timeout: Optional[float]=None): Put an item into the queue.

    Args:
        item (T):
             The item of type T to be put into the queue.
        block (Optional[bool]):
             Whether to block if the queue is full (defaults to True).
        timeout (Optional[float]):
             The maximum time to block for (in seconds),
            or None for no timeout (defaults to None).
        get(block:
             Optional[bool]=True, timeout: Optional[float]=None): Remove and return an item from the queue.

    Args:
        block (Optional[bool]):
             Whether to block if the queue is empty (defaults to True).
        timeout (Optional[float]):
             The maximum time to block for, or None for
            no timeout (defaults to None).

    Returns:
        put_nowait(item):
             Equivalent to put(item, False).
            A non-blocking variant of put method.

    Args:
        item:
             The item to put into the queue without blocking.
        get_nowait():
             Equivalent to get(False).
            A non-blocking variant of get method.

    Returns:

    """

    def task_done(self) -> None:
        """
        Marks a task as successfully completed.
        This method should be called once a task is finished and its
        outcome is successful. It is responsible for performing any cleanup,
        logging, or notification activities associated with task completion.

        Returns:
            None:
                 This method does not return a value and only has side effects.

        """
        ...

    def join(self) -> None:
        """
        Joins a collection of elements into a single string with a specified separator.
        This method takes all elements of a collection, converts them into strings (if they are not already), and concatenates them into a single string, separating each element by a predefined separator. If the collection is empty, the resulting string will be empty as well. The object itself should hold the collection and the separator as instance variables.

        Returns:
            None:
                 This method does not return anything. It acts on the object directly by modifying its state, potentially setting an instance variable with the concatenated string.

        """
        ...

    def qsize(self) -> int:
        """

        Returns the number of items in the queue.

        Returns:
            int:
                 The current number of items in the queue.

        """
        ...

    def empty(self) -> bool:
        """
        Checks if the current data structure is empty.
        This method evaluates whether the data structure (such as a list, queue, stack, etc.) that it is a member of contains any elements. If there are no elements present, it returns True; otherwise, it returns False.

        Returns:
            bool:
                 True if the data structure is empty, False otherwise.

        """
        ...

    def full(self) -> bool:
        """
        Checks if a structure is full.
        This method evaluates whether a particular structure (e.g., a data container or a buffer) is
        complete or has reached its capacity. It does not take any parameters and returns a boolean value
        indicating the fullness of the structure.

        Returns:
            bool:
                 True if the structure is full, False otherwise.

        """
        ...

    def put(
        self,
        item: T,
        block: typing.Optional[bool] = True,
        timeout: typing.Optional[float] = None,
    ) -> None:
        """
        Puts an item into the queue.
        If the queue is full, the method will either block until a free slot is available or until the optional timeout occurs. An item can be any type.

        Args:
            item (T):
                 The item to be put into the queue.
            block (Optional[bool]):
                 True if the method should block until a slot is free; False to have it raise the Full exception if the queue is currently full (defaults to True).
            timeout (Optional[float]):
                 The number of seconds to wait for a free slot before raising the Full exception if the queue is full and block is True. A 'None' timeout indicates an infinite wait (defaults to None).

        Raises:
            Full:
                 If the queue is full and the 'block' is set to False, or if 'block' is set to True and the 'timeout' period is exceeded.

        Returns:

        """
        ...

    def get(
        self,
        block: typing.Optional[bool] = True,
        timeout: typing.Optional[float] = None,
    ) -> T:
        """
        Fetches an item from a data source with optional blocking and timeout.

        Args:
            block (typing.Optional[bool], optional):
                 A flag to determine if the operation should wait until an item is available before returning. Defaults to True, meaning the function will block.
            timeout (typing.Optional[float], optional):
                 The maximum amount of time (in seconds) to wait for an item to become available. If 'None', the function may wait indefinitely. Only effective when 'block' is True. Defaults to None.

        Returns:
            T:
                 The item fetched from the data source. The type 'T' is a placeholder for the actual data type returned by the function.

        Raises:
            TimeoutError:
                 If a timeout is set by providing a 'timeout' argument and the operation times out before an item becomes available.
            SomeOtherException:
                 If the function encounters an issue specific to the implementation details (replace 'SomeOtherException' with actual exceptions specific to the context).

        """
        ...

    def put_nowait(self, item) -> None:
        """
        Puts an item into the queue without blocking.
        This method adds an item to the queue without waiting for a free slot to be available if the queue is already full.
        If the queue is full, the method will raise the `QueueFull` exception immediately.

        Args:
            item:
                 The item to be added to the queue.

        Raises:
            QueueFull:
                 If the queue is full and the item cannot be added without waiting.

        Returns:

        """
        ...

    def get_nowait(self) -> T:
        """
        Retrieves and returns an item from the queue without blocking.
        This method attempts to immediately retrieve an item from the queue. If no item
        is available, it will raise an exception rather than waiting for an item to become available.

        Returns:
            T:
                 The next item from the queue, if available.

        Raises:
            QueueEmptyError:
                 If there is no item available in the queue at the time of the call.

        """
        ...

empty()

Checks if the current data structure is empty. This method evaluates whether the data structure (such as a list, queue, stack, etc.) that it is a member of contains any elements. If there are no elements present, it returns True; otherwise, it returns False.

Returns:

Name Type Description
bool bool

True if the data structure is empty, False otherwise.

Source code in stateforward/protocols/queue.py
129
130
131
132
133
134
135
136
137
138
139
def empty(self) -> bool:
    """
    Checks if the current data structure is empty.
    This method evaluates whether the data structure (such as a list, queue, stack, etc.) that it is a member of contains any elements. If there are no elements present, it returns True; otherwise, it returns False.

    Returns:
        bool:
             True if the data structure is empty, False otherwise.

    """
    ...

full()

Checks if a structure is full. This method evaluates whether a particular structure (e.g., a data container or a buffer) is complete or has reached its capacity. It does not take any parameters and returns a boolean value indicating the fullness of the structure.

Returns:

Name Type Description
bool bool

True if the structure is full, False otherwise.

Source code in stateforward/protocols/queue.py
141
142
143
144
145
146
147
148
149
150
151
152
153
def full(self) -> bool:
    """
    Checks if a structure is full.
    This method evaluates whether a particular structure (e.g., a data container or a buffer) is
    complete or has reached its capacity. It does not take any parameters and returns a boolean value
    indicating the fullness of the structure.

    Returns:
        bool:
             True if the structure is full, False otherwise.

    """
    ...

get(block=True, timeout=None)

Fetches an item from a data source with optional blocking and timeout.

Parameters:

Name Type Description Default
block Optional[bool]

A flag to determine if the operation should wait until an item is available before returning. Defaults to True, meaning the function will block.

True
timeout Optional[float]

The maximum amount of time (in seconds) to wait for an item to become available. If 'None', the function may wait indefinitely. Only effective when 'block' is True. Defaults to None.

None

Returns:

Name Type Description
T T

The item fetched from the data source. The type 'T' is a placeholder for the actual data type returned by the function.

Raises:

Type Description
TimeoutError

If a timeout is set by providing a 'timeout' argument and the operation times out before an item becomes available.

SomeOtherException

If the function encounters an issue specific to the implementation details (replace 'SomeOtherException' with actual exceptions specific to the context).

Source code in stateforward/protocols/queue.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def get(
    self,
    block: typing.Optional[bool] = True,
    timeout: typing.Optional[float] = None,
) -> T:
    """
    Fetches an item from a data source with optional blocking and timeout.

    Args:
        block (typing.Optional[bool], optional):
             A flag to determine if the operation should wait until an item is available before returning. Defaults to True, meaning the function will block.
        timeout (typing.Optional[float], optional):
             The maximum amount of time (in seconds) to wait for an item to become available. If 'None', the function may wait indefinitely. Only effective when 'block' is True. Defaults to None.

    Returns:
        T:
             The item fetched from the data source. The type 'T' is a placeholder for the actual data type returned by the function.

    Raises:
        TimeoutError:
             If a timeout is set by providing a 'timeout' argument and the operation times out before an item becomes available.
        SomeOtherException:
             If the function encounters an issue specific to the implementation details (replace 'SomeOtherException' with actual exceptions specific to the context).

    """
    ...

get_nowait()

Retrieves and returns an item from the queue without blocking. This method attempts to immediately retrieve an item from the queue. If no item is available, it will raise an exception rather than waiting for an item to become available.

Returns:

Name Type Description
T T

The next item from the queue, if available.

Raises:

Type Description
QueueEmptyError

If there is no item available in the queue at the time of the call.

Source code in stateforward/protocols/queue.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def get_nowait(self) -> T:
    """
    Retrieves and returns an item from the queue without blocking.
    This method attempts to immediately retrieve an item from the queue. If no item
    is available, it will raise an exception rather than waiting for an item to become available.

    Returns:
        T:
             The next item from the queue, if available.

    Raises:
        QueueEmptyError:
             If there is no item available in the queue at the time of the call.

    """
    ...

join()

Joins a collection of elements into a single string with a specified separator. This method takes all elements of a collection, converts them into strings (if they are not already), and concatenates them into a single string, separating each element by a predefined separator. If the collection is empty, the resulting string will be empty as well. The object itself should hold the collection and the separator as instance variables.

Returns:

Name Type Description
None None

This method does not return anything. It acts on the object directly by modifying its state, potentially setting an instance variable with the concatenated string.

Source code in stateforward/protocols/queue.py
105
106
107
108
109
110
111
112
113
114
115
def join(self) -> None:
    """
    Joins a collection of elements into a single string with a specified separator.
    This method takes all elements of a collection, converts them into strings (if they are not already), and concatenates them into a single string, separating each element by a predefined separator. If the collection is empty, the resulting string will be empty as well. The object itself should hold the collection and the separator as instance variables.

    Returns:
        None:
             This method does not return anything. It acts on the object directly by modifying its state, potentially setting an instance variable with the concatenated string.

    """
    ...

put(item, block=True, timeout=None)

Puts an item into the queue. If the queue is full, the method will either block until a free slot is available or until the optional timeout occurs. An item can be any type.

Parameters:

Name Type Description Default
item T

The item to be put into the queue.

required
block Optional[bool]

True if the method should block until a slot is free; False to have it raise the Full exception if the queue is currently full (defaults to True).

True
timeout Optional[float]

The number of seconds to wait for a free slot before raising the Full exception if the queue is full and block is True. A 'None' timeout indicates an infinite wait (defaults to None).

None

Raises:

Type Description
Full

If the queue is full and the 'block' is set to False, or if 'block' is set to True and the 'timeout' period is exceeded.

Returns:

Source code in stateforward/protocols/queue.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def put(
    self,
    item: T,
    block: typing.Optional[bool] = True,
    timeout: typing.Optional[float] = None,
) -> None:
    """
    Puts an item into the queue.
    If the queue is full, the method will either block until a free slot is available or until the optional timeout occurs. An item can be any type.

    Args:
        item (T):
             The item to be put into the queue.
        block (Optional[bool]):
             True if the method should block until a slot is free; False to have it raise the Full exception if the queue is currently full (defaults to True).
        timeout (Optional[float]):
             The number of seconds to wait for a free slot before raising the Full exception if the queue is full and block is True. A 'None' timeout indicates an infinite wait (defaults to None).

    Raises:
        Full:
             If the queue is full and the 'block' is set to False, or if 'block' is set to True and the 'timeout' period is exceeded.

    Returns:

    """
    ...

put_nowait(item)

Puts an item into the queue without blocking. This method adds an item to the queue without waiting for a free slot to be available if the queue is already full. If the queue is full, the method will raise the QueueFull exception immediately.

Parameters:

Name Type Description Default
item

The item to be added to the queue.

required

Raises:

Type Description
QueueFull

If the queue is full and the item cannot be added without waiting.

Returns:

Source code in stateforward/protocols/queue.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def put_nowait(self, item) -> None:
    """
    Puts an item into the queue without blocking.
    This method adds an item to the queue without waiting for a free slot to be available if the queue is already full.
    If the queue is full, the method will raise the `QueueFull` exception immediately.

    Args:
        item:
             The item to be added to the queue.

    Raises:
        QueueFull:
             If the queue is full and the item cannot be added without waiting.

    Returns:

    """
    ...

qsize()

Returns the number of items in the queue.

Returns:

Name Type Description
int int

The current number of items in the queue.

Source code in stateforward/protocols/queue.py
117
118
119
120
121
122
123
124
125
126
127
def qsize(self) -> int:
    """

    Returns the number of items in the queue.

    Returns:
        int:
             The current number of items in the queue.

    """
    ...

task_done()

Marks a task as successfully completed. This method should be called once a task is finished and its outcome is successful. It is responsible for performing any cleanup, logging, or notification activities associated with task completion.

Returns:

Name Type Description
None None

This method does not return a value and only has side effects.

Source code in stateforward/protocols/queue.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def task_done(self) -> None:
    """
    Marks a task as successfully completed.
    This method should be called once a task is finished and its
    outcome is successful. It is responsible for performing any cleanup,
    logging, or notification activities associated with task completion.

    Returns:
        None:
             This method does not return a value and only has side effects.

    """
    ...