Last Updated on November 14, 2023
A queue is a data structure onto which items can be added and from which items can be removed.
Queues are generally used in concurrent programs to connect tasks, such as between threads and between processes.
Asyncio also provides the asyncio.Queue, specifically tailored to share data between coroutines and tasks, where blocking operations such as placing items on the queue and removing them can be awaited, suspending the caller.
In this tutorial, you will discover how to share data between coroutines using queues.
After completing this tutorial, you will know:
- How to create and use a queue to connect coroutines and tasks in asyncio.
- How to use a queue without blocking and with timeouts.
- How to join a queue and mark tasks as done, and how to limit the overall capacity of the queue.
Let’s get started.
What is an Asyncio Queue
The asyncio.Queue provides a FIFO queue for use with coroutines.
Before we dive into the details of the asyncio.Queue, let’s take a quick look at queues more generally in Python.
Queue
A queue is a data structure on which items can be added by a call to put() and from which items can be retrieved by a call to get().
Python provides a thread-safe queue via the queue.Queue class that allows threads to share objects with each other.
The queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module implements all the required locking semantics.
— Queue – A synchronized queue class
You can learn more about the thread-safe queue in the tutorial:
A process-safe queue is provided via the multiprocessing.Queue class. This queue is both thread-safe and process-safe and allows processes to share Python objects.
You can learn more about the multiprocessing queue in the tutorial:
Both queues manage data in a first-in, first-out (FIFO) manner, which means that items are retrieved from the queue in the order they were added. The first items added to the queue will be the first items retrieved. This is opposed to other queue types, such as last-in, first-out (LIFO) and priority queues.
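As a quick illustration of FIFO ordering, here is a minimal sketch using the standard queue.Queue; the item values are arbitrary and just for demonstration.

from queue import Queue

# create a thread-safe FIFO queue
queue = Queue()
# add items in order
queue.put('first')
queue.put('second')
# items come back in the same order they were added
print(queue.get())  # first
print(queue.get())  # second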
Next, let’s take a look at the asyncio queue.
Asyncio Queue
The asyncio.Queue class provides a queue that allows coroutines to share data.
asyncio queues are designed to be similar to classes of the queue module. Although asyncio queues are not thread-safe, they are designed to be used specifically in async/await code.
— Asyncio Queues
This queue is not thread-safe nor process-safe.
This means that it cannot be used by threads or processes to share Python objects and is only intended for use by coroutines within one Python thread, e.g. one event loop.
Like the queue.Queue and multiprocessing.Queue classes, the asyncio.Queue is a FIFO queue. This means that data is managed and retrieved in a first-in, first-out manner.
The asyncio.Queue is intended for use within asyncio programs by coroutines. As such, some methods on the class are in fact coroutines and must be awaited.
This means that the asyncio.Queue cannot be used outside of an asyncio program.
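For example, a minimal sketch (with an arbitrary item value) of putting and getting a single item on an asyncio.Queue inside an asyncio program might look like the following:

import asyncio

# minimal sketch: put and get a single item on an asyncio.Queue
async def main():
    # create the shared queue
    queue = asyncio.Queue()
    # put() is a coroutine and must be awaited
    await queue.put('hello')
    # get() is a coroutine and must be awaited
    item = await queue.get()
    print(item)

# start the asyncio program, which runs the event loop
asyncio.run(main())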
Now that we know about the asyncio.Queue, let’s look at how we might use it.
How to Use an Asyncio Queue
In this section, we will explore how to use the asyncio.Queue class, including how to create and configure an instance, how to add and remove items, query the properties of the queue and manage tasks.
Create an Asyncio Queue
We can create an asyncio.Queue by creating an instance of the class.
By default, the queue will not be limited in capacity.
For example:
...
# create a queue with no size limit
queue = asyncio.Queue()
The asyncio.Queue takes one constructor argument, “maxsize”, which is set to zero (no limit) by default.
For example:
...
# create a queue with no size limit
queue = asyncio.Queue(maxsize=0)
We can set a size limit on the queue.
A size limit means that when the queue is full, coroutines attempting to add an item will block until space becomes available, or fail if a non-blocking method is used.
For example:
...
# create a queue with a size limit
queue = asyncio.Queue(maxsize=100)
Add Items to Asyncio Queue
Python objects can be added to a queue via the put() method.
This is in fact a coroutine that must be awaited. The reason is that the calling coroutine may block if the queue is full.
For example:
...
# add an object to the queue
await queue.put(item)
An item can also be added to the queue without blocking via the put_nowait() method.
This method is not a coroutine and will either add the item and return immediately, or fail with an asyncio.QueueFull exception if the queue is full and the item cannot be added.
For example:
...
try:
    # attempt to add an item
    queue.put_nowait(item)
except asyncio.QueueFull:
    # ...
Because the “maxsize” argument is the first positional argument, we don’t need to specify it by name.
For example:
...
# create a queue with a size limit
queue = asyncio.Queue(100)
Get Items from Asyncio Queue
Items can be retrieved from the queue by calling the get() method.
This is in fact a coroutine that must be awaited. The reason is that the queue may not have any items to retrieve at the time, and the calling coroutine may need to block until an item becomes available.
For example:
...
# retrieve an item from the queue
item = await queue.get()
The item retrieved will be the oldest item added, i.e. FIFO ordering.
An item can be retrieved from the queue without blocking via the get_nowait() method.
This method is not a coroutine and will return an item immediately if one is available, otherwise it will fail with an asyncio.QueueEmpty exception.
For example:
...
try:
    # attempt to retrieve an item
    item = queue.get_nowait()
except asyncio.QueueEmpty:
    # ...
Query Asyncio Queue Size
We can retrieve the maximum capacity of the queue via the “maxsize” property, which will be zero if the queue is unbounded.
For example:
...
# report the maximum capacity of the queue
print(queue.maxsize)
We can check if the queue is empty via the empty() method, which returns True if the queue contains no items or False otherwise.
For example:
...
# check if the queue is empty
if queue.empty():
    # ...
We can also check if the queue is full via the full() method that returns True if the queue is at capacity or False otherwise.
For example:
...
# check if the queue is full
if queue.full():
    # ...
Asyncio Queue Join and Task Done
Items on the queue can be treated as tasks that can be marked as processed by consumer coroutines.
This can be achieved by consumer coroutines retrieving items from the queue via get() or get_nowait() and, once processed, marking them via the task_done() method.
For example:
...
# retrieve an item from the queue
item = await queue.get()
# process the item
# ...
# mark the item as processed
queue.task_done()
Other coroutines may be interested to know when all items added to the queue have been retrieved and marked as done.
This can be achieved by the coroutine awaiting the join() coroutine on the queue.
The join() coroutine will not return until all items added to the queue prior to the call have been marked as done.
For example:
...
# wait for all items on the queue to be marked as done
await queue.join()
If the queue is empty or all items have already been marked as done, then the join() coroutine will return immediately.
Now that we know how to use the asyncio.Queue, let’s look at some worked examples.
Example of Asyncio Queue
We can explore how to use the asyncio.Queue class with a worked example.
In this example, we will create a producer coroutine that will generate ten random numbers and put them on the queue. We will also create a consumer coroutine that will get numbers from the queue and report their values.
The asyncio.Queue provides a way to allow these producer and consumer coroutines to communicate data with each other.
First, we can define the function to be executed by the producer coroutine.
The task will iterate ten times in a loop. Each iteration will generate a new random value between 0 and 1 via the random.random() function. It will then sleep for that fraction of a second to simulate work, then put the value on the queue.
Once the task is complete it will put the value None on the queue to signal to the consumer coroutine that there is no further work.
The producer() coroutine below implements this by taking the queue instance as an argument.
# coroutine to generate work
async def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block to simulate work
        await asyncio.sleep(value)
        # add to the queue
        await queue.put(value)
    # send an all done signal
    await queue.put(None)
    print('Producer: Done')
Next, we can define the function to be executed by the consumer coroutine.
The task will loop forever. Each iteration, it will get an item from the queue and block if there is no item yet available.
If the item retrieved from the queue is the value None, then the task will break the loop and terminate the coroutine. Otherwise, the value is reported.
The consumer() coroutine below implements this and takes the queue instance as an argument.
# coroutine to consume work
async def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        item = await queue.get()
        # check for stop signal
        if item is None:
            break
        # report
        print(f'>got {item}')
    # all done
    print('Consumer: Done')
Finally, in the main coroutine, we can create the shared queue instance.
We can then create and run the producer and consumer coroutines and wait for them both to complete via the asyncio.gather() function.
# entry point coroutine
async def main():
    # create the shared queue
    queue = asyncio.Queue()
    # run the producer and consumers
    await asyncio.gather(producer(queue), consumer(queue))
You can learn more about asyncio.gather() in the tutorial:
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using an asyncio queue
from random import random
import asyncio

# coroutine to generate work
async def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block to simulate work
        await asyncio.sleep(value)
        # add to the queue
        await queue.put(value)
    # send an all done signal
    await queue.put(None)
    print('Producer: Done')

# coroutine to consume work
async def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        item = await queue.get()
        # check for stop signal
        if item is None:
            break
        # report
        print(f'>got {item}')
    # all done
    print('Consumer: Done')

# entry point coroutine
async def main():
    # create the shared queue
    queue = asyncio.Queue()
    # run the producer and consumers
    await asyncio.gather(producer(queue), consumer(queue))

# start the asyncio program
asyncio.run(main())
Running the example first creates the shared asyncio.Queue instance.
Next, the producer coroutine is created and passed the queue instance. Then the consumer coroutine is started and the main coroutine blocks until both coroutines terminate.
The producer coroutine generates a new random value for each iteration of the task, blocks, and adds it to the queue. The consumer coroutine waits on the queue for items to arrive, then consumes them one at a time, reporting their value.
Finally, the producer task finishes, a None value is put on the queue and the coroutine terminates. The consumer coroutine gets the None value, breaks its loop, and also terminates.
This highlights how the asyncio.Queue can be used to share data easily between producer and consumer coroutines.
Note that the program output will differ each time it is run given the use of random numbers.
Producer: Running
Consumer: Running
>got 0.7559246569022605
>got 0.965203750033905
>got 0.49834912260024233
>got 0.22783211775499135
>got 0.07775542407106295
>got 0.5997647474647314
>got 0.7236540952500915
>got 0.7956407178426339
>got 0.11256095725867177
Producer: Done
>got 0.9095338767572713
Consumer: Done
Next, let’s look at how we might get values from the queue without blocking.
Example of Asyncio Queue Without Blocking
We can get values from the asyncio.Queue without blocking.
This might be useful if we wish to use busy waiting in the consumer coroutine to check other state or perform other tasks while waiting for data to arrive on the queue.
You can learn more about busy waiting in the tutorial:
We can update the example from the previous section to get items from the queue without blocking.
This can be achieved by calling the get_nowait() method.
The get_nowait() method will return immediately.
If there is a value in the queue to retrieve, then it is returned. Otherwise, if the queue is empty, then an asyncio.QueueEmpty exception is raised, which can be handled.
In this case, if there is no value to get from the queue, we report a message and sleep for a fraction of a second. We will then continue which will jump back to the start of the consumer busy waiting loop.
# coroutine to consume work
async def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work without blocking
        try:
            item = queue.get_nowait()
        except asyncio.QueueEmpty:
            print('Consumer: got nothing, waiting a while...')
            await asyncio.sleep(0.5)
            continue
        # check for stop
        if item is None:
            break
        # report
        print(f'>got {item}')
    # all done
    print('Consumer: Done')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using an asyncio queue without blocking
from random import random
import asyncio

# coroutine to generate work
async def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block to simulate work
        await asyncio.sleep(value)
        # add to the queue
        await queue.put(value)
    # send an all done signal
    await queue.put(None)
    print('Producer: Done')

# coroutine to consume work
async def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work without blocking
        try:
            item = queue.get_nowait()
        except asyncio.QueueEmpty:
            print('Consumer: got nothing, waiting a while...')
            await asyncio.sleep(0.5)
            continue
        # check for stop
        if item is None:
            break
        # report
        print(f'>got {item}')
    # all done
    print('Consumer: Done')

# entry point coroutine
async def main():
    # create the shared queue
    queue = asyncio.Queue()
    # run the producer and consumers
    await asyncio.gather(producer(queue), consumer(queue))

# start the asyncio program
asyncio.run(main())
Running the example creates the shared asyncio.Queue, then starts the consumer and producer coroutines, as before.
The producer coroutine generates, blocks, and adds items to the queue.
The consumer coroutine attempts to get a value from the queue. If there is no value to retrieve, an asyncio.QueueEmpty exception is raised and handled by reporting a message, sleeping for a fraction of a second then starting the busy wait loop again.
Otherwise, if there is a value in the queue, the consumer will retrieve it and report it as normal.
We can see the messages from the consumer coroutine busy waiting for new data to arrive in the queue.
This highlights how to get items from the asyncio.Queue without blocking.
Note that the program output will differ each time it is run given the use of random numbers.
Producer: Running
Consumer: Running
Consumer: got nothing, waiting a while...
Consumer: got nothing, waiting a while...
>got 0.896558357626797
Consumer: got nothing, waiting a while...
Consumer: got nothing, waiting a while...
>got 0.6498874449486562
>got 0.14862534743361389
Consumer: got nothing, waiting a while...
Consumer: got nothing, waiting a while...
>got 0.9271724543351715
Consumer: got nothing, waiting a while...
>got 0.6659822945662333
>got 0.11205862071348183
Consumer: got nothing, waiting a while...
Consumer: got nothing, waiting a while...
>got 0.9490125408623084
Consumer: got nothing, waiting a while...
>got 0.150509682492045
>got 0.23281901173320807
Consumer: got nothing, waiting a while...
Consumer: got nothing, waiting a while...
Producer: Done
>got 0.8999468879239988
Consumer: Done
Next, let’s look at how we might get values from the queue with a timeout.
Example of Asyncio Queue With Timeout
We can get values from the asyncio.Queue with a blocking call that is limited by a timeout.
This allows a consumer coroutine to block while waiting for values to arrive in the queue, but also to execute other tasks periodically if no value arrives before the timeout. It may be more efficient than busy waiting without any blocking calls.
We can update the above example to use a timeout when getting items from the queue in the consumer coroutine.
This can be achieved by calling the get() coroutine as normal.
Unlike the queue.Queue, the asyncio.Queue does not support a timeout directly.
Instead, we can wrap the get() coroutine in a wait_for() coroutine that supports a timeout. If the timeout elapses before the get() coroutine completes, an asyncio.TimeoutError exception is raised and can be handled.
For example:
# consume work
async def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        try:
            # retrieve the get() awaitable
            get_await = queue.get()
            # await the awaitable with a timeout
            item = await asyncio.wait_for(get_await, 0.5)
        except asyncio.TimeoutError:
            print('Consumer: gave up waiting...')
            continue
        # check for stop
        if item is None:
            break
        # report
        print(f'>got {item}')
    # all done
    print('Consumer: Done')
You can learn more about the asyncio.wait_for() function in the tutorial:
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using an asyncio queue with a timeout
from random import random
import asyncio

# coroutine to generate work
async def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block to simulate work
        await asyncio.sleep(value)
        # add to the queue
        await queue.put(value)
    # send an all done signal
    await queue.put(None)
    print('Producer: Done')

# consume work
async def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        try:
            # retrieve the get() awaitable
            get_await = queue.get()
            # await the awaitable with a timeout
            item = await asyncio.wait_for(get_await, 0.5)
        except asyncio.TimeoutError:
            print('Consumer: gave up waiting...')
            continue
        # check for stop
        if item is None:
            break
        # report
        print(f'>got {item}')
    # all done
    print('Consumer: Done')

# entry point coroutine
async def main():
    # create the shared queue
    queue = asyncio.Queue()
    # run the producer and consumers
    await asyncio.gather(producer(queue), consumer(queue))

# start the asyncio program
asyncio.run(main())
Running the example creates the shared asyncio.Queue, then starts the consumer and producer coroutines, as before.
The producer coroutine will generate, block, and add items to the queue.
The consumer coroutine will attempt to get a value from the queue. The call will block for up to the timeout.
If no value is available before the timeout expires, then an asyncio.TimeoutError exception is raised and handled by reporting a message and then starting the busy wait loop again. Otherwise, if there is a value in the queue, the consumer will retrieve it and report it as normal.
We can see the messages from the consumer coroutine busy waiting for new data to arrive in the queue.
This highlights how to get items from the asyncio.Queue by blocking with a timeout.
Note that the program output will differ each time it is run given the use of random numbers.
Producer: Running
Consumer: Running
Consumer: gave up waiting...
>got 0.8506665865206575
Consumer: gave up waiting...
>got 0.851355213428328
>got 0.3050736798012632
Consumer: gave up waiting...
>got 0.7019959682053681
Consumer: gave up waiting...
>got 0.9753069917130328
Consumer: gave up waiting...
>got 0.7813291071437218
Consumer: gave up waiting...
>got 0.7831885826899522
Consumer: gave up waiting...
>got 0.8001066750131507
Consumer: gave up waiting...
>got 0.9564293628868409
Producer: Done
>got 0.41507431394001704
Consumer: Done
Next, let’s look at how we might wait on the queue and mark tasks as completed in the queue.
Example of Asyncio Queue Join and Task Done
In the previous examples, we have sent a special message (None) into the queue to indicate that all tasks are done.
An alternative approach is to have coroutines wait on the queue directly and to have the consumer coroutine mark tasks as done.
This can be achieved via the join() and task_done() functions on the asyncio.Queue.
The producer coroutine can be updated to no longer send a None value into the queue to indicate no further tasks.
The updated version of the producer() function with this change is listed below.
# coroutine to generate work
async def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block to simulate work
        await asyncio.sleep(value)
        # add to the queue
        await queue.put(value)
    print('Producer: Done')
The consumer coroutine can be updated to no longer check for None messages, and to mark each task as completed via a call to task_done().
The updated version of the consumer() function with these changes is listed below.
# coroutine to consume work
async def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        item = await queue.get()
        # report
        print(f'>got {item}')
        # block while processing
        if item:
            await asyncio.sleep(item)
        # mark the task as done
        queue.task_done()
The producer coroutine will run until there are no longer any tasks to add to the queue and will terminate. The consumer coroutine will now run forever.
We will start the consumer coroutine first as a separate standalone task.
Next, we will execute and await the producer coroutine until it is done and all items have been added to the queue.
Finally, we will wait for all items added to the queue to be marked as done.
The updated main() coroutine is listed below.
# entry point coroutine
async def main():
    # create the shared queue
    queue = asyncio.Queue()
    # start the consumer
    _ = asyncio.create_task(consumer(queue))
    # start the producer and wait for it to finish
    await asyncio.create_task(producer(queue))
    # wait for all items to be processed
    await queue.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using join and task_done with an asyncio queue
from random import random
import asyncio

# coroutine to generate work
async def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block to simulate work
        await asyncio.sleep(value)
        # add to the queue
        await queue.put(value)
    print('Producer: Done')

# coroutine to consume work
async def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        item = await queue.get()
        # report
        print(f'>got {item}')
        # block while processing
        if item:
            await asyncio.sleep(item)
        # mark the task as done
        queue.task_done()

# entry point coroutine
async def main():
    # create the shared queue
    queue = asyncio.Queue()
    # start the consumer
    _ = asyncio.create_task(consumer(queue))
    # start the producer and wait for it to finish
    await asyncio.create_task(producer(queue))
    # wait for all items to be processed
    await queue.join()

# start the asyncio program
asyncio.run(main())
Running the example creates the shared asyncio.Queue, then starts the consumer and producer coroutines, as before.
The producer coroutine generates, blocks, and adds items to the queue.
The consumer coroutine attempts to get values from the queue.
Once the producer coroutine has added all of its ten items, it will terminate. The consumer coroutine will run forever in the background.
Once the producer coroutine has terminated, the main coroutine will then block on the queue.
Once the consumer coroutine has processed all items that were added to the queue, the main coroutine will then resume and terminate.
The asyncio program then terminates with the consumer coroutine still running in the background, but with no work to process (a sketch of how to cancel it explicitly is shown after the example output below).
This highlights how to mark tasks as completed in the queue and how to wait on the queue for all work to be completed.
Note that the program output will differ each time it is run given the use of random numbers.
Consumer: Running
Producer: Running
>got 0.98439852757525
>got 0.31319007221013795
>got 0.9398085059848861
>got 0.14351842921376057
>got 0.24629462902135835
>got 0.4488704344186214
>got 0.19476785739518376
>got 0.8393990524378161
>got 0.3269099694795079
Producer: Done
>got 0.8274430954459486
Next, let’s look at how we might work with a limited-sized queue.
Example of Asyncio Queue With Limited Size
We can limit the capacity of the queue.
This can be helpful if we have a large number of producers or slow consumers. It allows us to limit the number of tasks that may be in memory at any one time, limiting the overall memory usage of the application.
This can be achieved by setting the “maxsize” argument in the constructor to the queue when configuring it. Because this is the first and only positional argument, we can omit the name of the argument to the constructor.
When the queue is full, calls to put() will block until a position becomes available to place another item on the queue.
We can demonstrate this by updating the example in the previous section to limit the size of the queue to a small number, like 2 items, and have a modest number of producers, e.g. 5, attempt to populate the queue. This will keep the queue full most of the time forcing the producers to block when adding items.
Firstly, we can specify the capacity of the queue when it is constructed.
...
# create the shared queue
queue = asyncio.Queue(2)
Next, we can create five producer coroutines that will all attempt to add items to the queue at the same time.
This can be done in a list comprehension which can then be unpacked into separate expressions using the star operator in a call to asyncio.gather().
After all of the producers are finished, the main() coroutine can join() the queue and wait for the consumer to mark all items as done.
...
# create many producers
producers = [producer(queue) for _ in range(5)]
# run and wait for the producers to finish
await asyncio.gather(*producers)
# wait for the consumer to process all items
await queue.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using an asyncio queue with a limited capacity
from random import random
import asyncio

# coroutine to generate work
async def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block to simulate work
        await asyncio.sleep(value)
        # add to the queue, may block
        await queue.put(value)
    print('Producer: Done')

# coroutine to consume work
async def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        item = await queue.get()
        # report
        print(f'>got {item}')
        # block while processing
        if item:
            await asyncio.sleep(item)
        # mark as completed
        queue.task_done()
    # all done
    print('Consumer: Done')

# entry point coroutine
async def main():
    # create the shared queue
    queue = asyncio.Queue(2)
    # start the consumer
    _ = asyncio.create_task(consumer(queue))
    # create many producers
    producers = [producer(queue) for _ in range(5)]
    # run and wait for the producers to finish
    await asyncio.gather(*producers)
    # wait for the consumer to process all items
    await queue.join()

# start the asyncio program
asyncio.run(main())
Running the example first starts the consumer coroutine, then starts all five producer coroutines.
The main coroutine then blocks until the producer coroutines finish.
Each of the five producer coroutines then attempts to add ten items to the queue as fast as they are able. The capacity of the queue is so limited that these coroutines will block most of the time on their calls to put() until a position becomes available to add a new item to the queue.
The consumer coroutine will consume items from the queue as fast as it can, report their values and mark them as done. The coroutine will run forever as a background task.
Once the producer coroutines finish, the main coroutine resumes and then blocks on the queue, waiting for all remaining tasks to be consumed and marked done by the consumer coroutine.
This highlights how to use a limited capacity asyncio.Queue and how to combine this with marking tasks as completed and joining the queue.
Note that the program output will differ each time it is run given the use of random numbers.
Consumer: Running
Producer: Running
Producer: Running
Producer: Running
Producer: Running
Producer: Running
>got 0.0798149651109541
>got 0.5513864113584395
>got 0.8149184098780632
>got 0.8561030038666221
>got 0.8225047439580798
>got 0.992630421268497
>got 0.27449486943860757
>got 0.10489939965437134
>got 0.9004478449122744
>got 0.9442262069705694
>got 0.9517905758143422
>got 0.38578513687892313
>got 0.21314357809327322
>got 0.006412317984848315
>got 0.522391949578982
>got 0.4289851852631642
>got 0.5237185610606917
>got 0.7128146789112292
>got 0.2424277811353306
>got 0.44543328087703804
>got 0.36961101864563994
>got 0.46362053301168127
>got 0.853341848695711
>got 0.5234863755930941
>got 0.04593820030932505
>got 0.0554357759717663
>got 0.008185842872241
>got 0.9700101228192052
>got 0.8048086100285801
>got 0.689831779214825
>got 0.3245915440087028
>got 0.21373695813973959
>got 0.9315929425005609
>got 0.9382045140049264
>got 0.925811547635268
>got 0.6079025826247971
>got 0.1675603246130124
>got 0.8861271320774468
>got 0.5610211824876841
>got 0.6335242295962565
Producer: Done
>got 0.5251525663901687
>got 0.8263850076196841
>got 0.06117578863178552
>got 0.7066342593552792
Producer: Done
>got 0.883204743564828
Producer: Done
>got 0.06293969547023037
Producer: Done
>got 0.5876241223957309
>got 0.7631673862150006
Producer: Done
>got 0.07354652534254391
>got 0.25988256916156316
Further Reading
This section provides additional resources that you may find helpful.
Python Asyncio Books
- Python Asyncio Mastery, Jason Brownlee (my book!)
- Python Asyncio Jump-Start, Jason Brownlee.
- Python Asyncio Interview Questions, Jason Brownlee.
- Asyncio Module API Cheat Sheet
I also recommend the following books:
- Python Concurrency with asyncio, Matthew Fowler, 2022.
- Using Asyncio in Python, Caleb Hattingh, 2020.
- asyncio Recipes, Mohamed Mustapha Tahrioui, 2019.
Guides
APIs
- asyncio — Asynchronous I/O
- Asyncio Coroutines and Tasks
- Asyncio Streams
- Asyncio Subprocesses
- Asyncio Queues
- Asyncio Synchronization Primitives
References
Takeaways
You now know how to share data between coroutines using queues in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Kelvin Quarles on Unsplash
Dmitry says
Thank you for the best tutorial!
However, in your last example you don’t cancel the consumer task and it is still running.
This can be seen if you add print(asyncio.all_tasks()) after asyncio.run(main()).
Is it always necessary to cancel tasks running in an endless loop?
According to https://docs.python.org/3/library/asyncio-queue.html the endless tasks are cancelled:
# Cancel our worker tasks
for task in tasks:
    task.cancel()
Moreover, they wait until the tasks are cancelled (I think this is optional):
# Wait until all worker tasks are cancelled
await asyncio.gather(*tasks, return_exceptions=True)
Jason Brownlee says
There is no need to cancel the consumer task. Once the main coroutine is done, the event loop will cancel any remaining tasks.