We can develop a group chat client and server with asyncio.
A chat room allows multiple clients to connect to the same server and chat together. Each client message is transmitted to all connected clients.
It is a great exercise in network programming and a good way to showcase asynchronous programming with asyncio and non-blocking TCP streams.
In this tutorial, you will discover how to develop a group chat room using asyncio.
After completing this tutorial, you will know:
- What a multi-user chat room is and how we might develop a chat room client and server in asyncio.
- How to implement a specific chatroom server and client.
- How to test the developed chatroom programs with one and multiple connected clients.
Let’s get started.
Develop Group Chat Client and Server
A group chat program allows multiple clients to connect and communicate via text in real-time.
Each client is able to send a message to the server and all clients are shown the message, allowing a group of clients to chat at the same time.
The primary use of a chat room is to share information via text with a group of other users. Generally speaking, the ability to converse with multiple people in the same conversation differentiates chat rooms from instant messaging programs, which are more typically designed for one-to-one communication.
— Chat room, Wikipedia.
Developing a chat client is an excellent case study for network programming and asyncio in particular as it requires that both the client program and the server program be event-driven, responding to messages on demand.
Specifically:
- The client program must display messages as they are received from the server and must transmit messages as they are typed in on standard input.
- The server must accept and manage new client connections and must broadcast messages from each client to all clients.
How can we develop a chat room for group chat using asyncio?
How to Develop Asyncio Chat Server and Client
Developing an asyncio chat room involves developing client and server programs.
Let’s take a closer look at each in turn.
Develop Asyncio Chat Server
An asyncio group chat server has several requirements:
- Accept connections from new clients.
- Ask the client for their name and announce it to all other clients.
- Read messages from each client and broadcast them to all other clients.
- Determine when a client wants to exit and broadcast their disconnection.
There are several ways that we could implement these aspects.
We will explore perhaps the simplest approach and ignore error handling.
Firstly, we must create the server and accept incoming client connections.
This can be achieved using the asyncio.start_server() function, specifying the coroutine to call to handle each incoming connection along with the hostname and port on which to accept connections.
For example:

    ...
    # define the local host
    host_address, host_port = '127.0.0.1', 8888
    # create the server
    server = await asyncio.start_server(handle_chat_client, host_address, host_port)

We can then use the context manager on the asyncio.Server object instance and call the serve_forever() method, allowing the server to accept connections until the program is terminated.

    ...
    # run the server
    async with server:
        # accept connections
        await server.serve_forever()

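To see both steps working end to end without leaving a server running, here is a minimal sketch. It assumes a hypothetical handle_echo() handler and binds to port 0 so that the operating system picks a free port; neither is part of the chat server developed below.

```python
import asyncio

# hypothetical handler used only in this sketch: echo one line back in upper case
async def handle_echo(reader, writer):
    line = await reader.readline()
    writer.write(line.upper())
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def demo():
    # port 0 asks the operating system for any free port (an assumption for the demo)
    server = await asyncio.start_server(handle_echo, '127.0.0.1', 0)
    host, port = server.sockets[0].getsockname()[:2]
    async with server:
        # connect a client to the running server and exchange one line
        reader, writer = await asyncio.open_connection(host, port)
        writer.write(b'hello\n')
        await writer.drain()
        reply = await reader.readline()
        writer.close()
        await writer.wait_closed()
    # leaving the "async with" block closes the server
    return reply

reply = asyncio.run(demo())
print(reply)
```

The server starts accepting connections as soon as asyncio.start_server() returns, which is why this sketch can connect without calling serve_forever(). A long-running server such as the chat server still calls it to keep the program alive.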
Next, we can define the handle_chat_client() coroutine for handling new connections.
This coroutine must take a StreamReader and StreamWriter as arguments.
Firstly, it must retrieve a username from the client and store it for use when broadcasting all messages from the client.
We can send a message asking for the name. We will read and write each message as a line terminated with a new line character (‘\n’).

    ...
    writer.write('Enter your name:\n'.encode())
    await writer.drain()

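The newline-terminated framing can be captured in two small helpers. This is only a sketch: encode_line() and decode_line() are hypothetical names that the tutorial code does not use, and replacing embedded newlines with spaces is one possible policy among several.

```python
# hypothetical helpers for the newline-delimited framing described above

def encode_line(text):
    # replace embedded line breaks so that exactly one message is framed
    clean = text.replace('\r', ' ').replace('\n', ' ').strip()
    return (clean + '\n').encode()

def decode_line(data):
    # reverse of encode_line(): bytes from readline() back to a clean string
    return data.decode().strip()

print(encode_line('Enter your name:'))
print(decode_line(b'Jason\n'))
```

Keeping each message on one line matters because the receiver uses readline(), which treats every newline as the end of a message.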
Next, the user name can be read and used as a key in a global variable dict that stores a tuple of the client StreamReader and StreamWriter against the client name.

    ...
    # ask the user for their name
    name_bytes = await reader.readline()
    # convert name to string
    name = name_bytes.decode().strip()
    # store the user details
    global ALL_USERS
    ALL_USERS[name] = (reader, writer)

We can then announce that a client with this name has connected.
This can be achieved by defining a coroutine that takes a string message and loops over all clients in the ALL_USERS dict and shares the message with each in turn.
The broadcast_message() coroutine below implements this.

    # send a message to all connected users
    async def broadcast_message(message):
        # report locally
        print(f'Broadcast: {message.strip()}')
        # convert to bytes
        msg_bytes = message.encode()
        # enumerate all users and broadcast the message
        global ALL_USERS
        for name, (reader, writer) in ALL_USERS.items():
            # write message to this user
            writer.write(msg_bytes)
            # wait for the buffer to empty
            await writer.drain()

We can use this coroutine to broadcast the connection of the new client.

    ...
    # announce the user
    await broadcast_message(f'{name} has connected\n')

Next, we can loop forever, reading messages from the client and broadcasting them to all other clients.

    ...
    # read messages from the user
    while True:
        # read a line of data
        line_bytes = await reader.readline()
        # convert to string
        line = line_bytes.decode().strip()
        # check for exit
        if line == 'QUIT':
            break
        # broadcast message
        await broadcast_message(f'{name}: {line}\n')

If the user sends a special message, e.g. “QUIT”, the loop exits, signaling that the user has chosen to disconnect.
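Because this tutorial ignores error handling, the loop only ends on “QUIT”. A client that drops its connection never sends it, and readline() then returns empty bytes at end-of-file. The sketch below shows one way the loop could also stop in that case. The read_until_quit() helper is hypothetical, and the in-memory StreamReader is used only to simulate a dropped client.

```python
import asyncio

# hypothetical helper: collect lines until 'QUIT' or until the peer closes the stream
async def read_until_quit(reader):
    lines = []
    while True:
        line_bytes = await reader.readline()
        # readline() returns b'' once the stream has reached end-of-file
        if not line_bytes:
            break
        line = line_bytes.decode().strip()
        if line == 'QUIT':
            break
        lines.append(line)
    return lines

async def demo():
    # an in-memory StreamReader stands in for a client that drops without sending QUIT
    reader = asyncio.StreamReader()
    reader.feed_data(b'hello\nworld\n')
    reader.feed_eof()
    return await read_until_quit(reader)

received = asyncio.run(demo())
print(received)
```

The same empty-bytes check could be added to the server loop before the line is decoded.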
To disconnect the user, we can close the StreamWriter and broadcast a message to all other clients that the user has disconnected.
For example:

    ...
    # close the user's connection
    writer.close()
    await writer.wait_closed()
    # remove from the dict of all users
    global ALL_USERS
    del ALL_USERS[name]
    # broadcast the user has left
    await broadcast_message(f'{name} has disconnected\n')

That covers the main functionality of the server. Next, let’s look at the client.
Develop Asyncio Chat Client
An asyncio chat client has a number of requirements:
- Open a connection to the server.
- Read and report messages from the server.
- Read and transmit messages from the user.
- Close the connection to the server.
The first step is to open a connection to the server.
This can be achieved via the asyncio.open_connection() function and specifying the hostname and port number. The function then returns a StreamReader and a StreamWriter for interacting with the server.

    ...
    # open a connection to the server
    reader, writer = await asyncio.open_connection(server_address, server_port)

Next, we can loop forever and read messages from the server and report them on standard output.

    ...
    # read messages from the server and print to user
    while True:
        # read a message
        result_bytes = await reader.readline()
        # decode response
        response = result_bytes.decode()
        # report the response
        print(response.strip())

This can be wrapped in a task and executed for the duration of the program. It ensures that all messages from the server, including broadcasts from other clients, are received and reported on demand.
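As a rough illustration of running such a loop as a background task, the sketch below schedules a read loop with asyncio.create_task(), lets it process one line, and then cancels it. The report_lines() coroutine and the in-memory StreamReader are stand-ins assumed for the demo; the real client reads from the server connection.

```python
import asyncio

# hypothetical stand-in for the server read loop: record each line as it arrives
async def report_lines(reader, seen):
    while True:
        data = await reader.readline()
        if not data:
            break
        seen.append(data.decode().strip())

async def demo():
    seen = []
    reader = asyncio.StreamReader()
    # schedule the loop as a background task; it runs whenever this coroutine awaits
    task = asyncio.create_task(report_lines(reader, seen))
    reader.feed_data(b'Tom has connected\n')
    # yield to the event loop so the background task can process the line
    await asyncio.sleep(0.01)
    # stop the background task once it is no longer needed
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return seen, task.cancelled()

seen, was_cancelled = asyncio.run(demo())
print(seen, was_cancelled)
```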
Next, we need to read messages from the user and transmit them to the server.
This may be the user name at the beginning of the session or chat messages during the session.
We can implement this with a second loop that reads lines of text from standard input (stdin) and transmits them to the server.
If the user enters a special command, e.g. “QUIT”, the loop is exited.
Reading lines of text from stdin is a blocking activity, which should not be executed in the asyncio event loop.
For example:

    ...
    # read from stdin
    message = sys.stdin.readline()

If executed, it will stop all activity in the program, preventing any messages from being read from the server and printed.
Instead, we must perform the blocking read from standard input in a separate thread and await the result in the asyncio event loop.
This can be achieved via the asyncio.to_thread() function.
For example:

    ...
    # read from stdin
    message = await asyncio.to_thread(sys.stdin.readline)

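The effect can be checked with a small experiment. In this sketch a hypothetical blocking_read() function sleeps for a moment to stand in for sys.stdin.readline, while a ticker() coroutine counts how often the event loop gets to run in the meantime.

```python
import asyncio
import time

# stand-in for a blocking call such as sys.stdin.readline (assumption for the demo)
def blocking_read():
    time.sleep(0.2)
    return 'typed line\n'

# counts how many times the event loop gets to run while the thread is blocked
async def ticker(counter):
    while True:
        counter.append(1)
        await asyncio.sleep(0.01)

async def demo():
    counter = []
    tick_task = asyncio.create_task(ticker(counter))
    # the blocking call runs in a worker thread; the event loop keeps running
    message = await asyncio.to_thread(blocking_read)
    tick_task.cancel()
    return message, len(counter)

message, ticks = asyncio.run(demo())
print(message.strip(), ticks)
```

If blocking_read() were called directly in the coroutine instead, the ticker would be frozen for the whole duration of the call. Note that asyncio.to_thread() requires Python 3.9 or later.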
Using this, we can then define the loop that reads messages from the user and transmits them to the server.

    ...
    # read messages from the user and transmit to server
    while True:
        # read from stdin
        message = await asyncio.to_thread(sys.stdin.readline)
        # send message to the server
        msg_bytes = message.encode()
        writer.write(msg_bytes)
        await writer.drain()
        # check for stop
        if message.strip() == 'QUIT':
            break

Finally, we can disconnect the client from the server.

    ...
    # close the connection
    writer.close()
    await writer.wait_closed()

Now that we know how to develop a chat client and server, let’s look at a worked example.
Example Asyncio Chat Server
We can explore how to develop an asyncio group chat server.
This program has six main parts:
- A coroutine to broadcast to all users.
- A coroutine for managing a new client connection.
- A coroutine for managing a client disconnection.
- A coroutine for managing the client connection overall.
- A coroutine for driving the server.
- The start of the event loop.
Let’s dive in.
Broadcast to All Clients
Firstly, we can define a coroutine that broadcasts a message to all connected clients.
We will maintain a global variable dict of user name mapped to a tuple of StreamReader and StreamWriter. Therefore, to broadcast a message to all clients means traversing this dict and writing a message to each client.
The broadcast_message() coroutine below implements this, taking the string message as an argument.

    # send a message to all connected users
    async def broadcast_message(message):
        # report locally
        print(f'Broadcast: {message.strip()}')
        # convert to bytes
        msg_bytes = message.encode()
        # enumerate all users and broadcast the message
        global ALL_USERS
        for _, (_, writer) in ALL_USERS.items():
            # write message to this user
            writer.write(msg_bytes)
            # wait for the buffer to empty
            await writer.drain()

We could improve this coroutine in a few ways.
Firstly, we could protect the ALL_USERS dict from concurrent updates while transmitting messages, in case a user disconnects while we are broadcasting.
Secondly, we could transmit the messages concurrently rather than sequentially through all clients.
These are left as an extension.
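For the first improvement, one lightweight option (a sketch, not the only approach) is to iterate over a copy of the dict values, so that a user being removed while a broadcast is suspended in drain() cannot disturb the loop. The FakeWriter class below is a hypothetical stand-in for a StreamWriter so the idea can be run without a network connection.

```python
import asyncio

ALL_USERS = {}

# minimal stand-in for a StreamWriter (demo only); drain() yields to the event loop
class FakeWriter:
    def __init__(self):
        self.sent = []
    def write(self, data):
        self.sent.append(data)
    async def drain(self):
        await asyncio.sleep(0)

async def broadcast_message(message):
    msg_bytes = message.encode()
    # copy the values first so the loop is unaffected if ALL_USERS changes mid-broadcast
    for _, writer in list(ALL_USERS.values()):
        writer.write(msg_bytes)
        await writer.drain()

async def demo():
    ALL_USERS['Jason'] = (None, FakeWriter())
    ALL_USERS['Tom'] = (None, FakeWriter())
    # start a broadcast, then remove a user while it is still in progress
    task = asyncio.create_task(broadcast_message('hello\n'))
    await asyncio.sleep(0)
    ALL_USERS.pop('Tom')
    await task
    return ALL_USERS['Jason'][1].sent

jason_sent = asyncio.run(demo())
print(jason_sent)
```

Iterating over ALL_USERS.items() directly in the same situation raises a RuntimeError because the dict changes size during iteration. A real server would also need to decide what to do when a write to a departed client fails.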
Connect A New Client
Next, we need to manage the connection of a new client.
This involves first transmitting a welcome message and asking the client for their name. The name is then read and used to update the ALL_USERS dict, storing the connection details for broadcasting later.
A broadcast message is then reported to all clients that a new client with a given name has arrived and a direct message is then sent to the client mentioning the command to terminate the chat session.
The connect_user() coroutine below implements this.

    # connect a user
    async def connect_user(reader, writer):
        # get name message
        writer.write('Asycio Chat Server\n'.encode())
        writer.write('Enter your name:\n'.encode())
        await writer.drain()
        # ask the user for their name
        name_bytes = await reader.readline()
        # convert name to string
        name = name_bytes.decode().strip()
        # store the user details
        global ALL_USERS
        ALL_USERS[name] = (reader, writer)
        # announce the user
        await broadcast_message(f'{name} has connected\n')
        # welcome message
        welcome = f'Welcome {name}. Send QUIT to disconnect.\n'
        writer.write(welcome.encode())
        await writer.drain()
        return name

Disconnect a Client
Next, we can define a coroutine to handle the disconnection of a client.
This involves closing the StreamWriter, and then deleting the user from the ALL_USERS dict.
Finally, a broadcast message can be sent to all remaining clients that the client with the given name has disconnected.
The disconnect_user() coroutine below implements this.

    # disconnect a user
    async def disconnect_user(name, writer):
        # close the user's connection
        writer.close()
        await writer.wait_closed()
        # remove from the dict of all users
        global ALL_USERS
        del ALL_USERS[name]
        # broadcast the user has left
        await broadcast_message(f'{name} has disconnected\n')

Manage a Client
Next, we need to define a coroutine to manage a single client connection.
This includes calling connect_user() to formally connect the client, reading all messages from the client, and then disconnecting the client when they are done.
The core of this is a loop that reads messages transmitted from the client and broadcasts them to all other clients.
The handle_chat_client() coroutine below implements this.

    # handle a chat client
    async def handle_chat_client(reader, writer):
        print('Client connecting...')
        # connect the user
        name = await connect_user(reader, writer)
        try:
            # read messages from the user
            while True:
                # read a line of data
                line_bytes = await reader.readline()
                # convert to string
                line = line_bytes.decode().strip()
                # check for exit
                if line == 'QUIT':
                    break
                # broadcast message
                await broadcast_message(f'{name}: {line}\n')
        finally:
            # disconnect the user
            await disconnect_user(name, writer)

A limitation of this implementation is that all reads from the client are blocked until the broadcast to all other clients is done.
This may limit the scalability of the server and its responsiveness to rapid messages from a client.
We can improve the design by decoupling the broadcasting of messages from the client management. One approach would be to have the client put the broadcast message on a queue and have another coroutine read messages from the queue and transmit them to all clients.
This is left as an exercise.
Drive The Server
Next, we can create a coroutine to drive the main server.
This will define the details of the host, such as its port number, create the server and serve forever until the program is terminated.
The main() coroutine below implements this.

    # chat server
    async def main():
        # define the local host
        host_address, host_port = '127.0.0.1', 8888
        # create the server
        server = await asyncio.start_server(handle_chat_client, host_address, host_port)
        # run the server
        async with server:
            # report message
            print('Chat Server Running\nWaiting for chat clients...')
            # accept connections
            await server.serve_forever()

Start The Event Loop
Finally, we can define the global variable that holds all client connections and start the event loop.

    ...
    # dict of all current users
    ALL_USERS = {}
    # start the event loop
    asyncio.run(main())

Asyncio Chat Server Complete Example
Tying this together, the complete example is listed below.

    # SuperFastPython.com
    # example of a chat server using streams
    import asyncio

    # send a message to all connected users
    async def broadcast_message(message):
        # report locally
        print(f'Broadcast: {message.strip()}')
        # convert to bytes
        msg_bytes = message.encode()
        # enumerate all users and broadcast the message
        global ALL_USERS
        for _, (_, writer) in ALL_USERS.items():
            # write message to this user
            writer.write(msg_bytes)
            # wait for the buffer to empty
            await writer.drain()

    # connect a user
    async def connect_user(reader, writer):
        # get name message
        writer.write('Asycio Chat Server\n'.encode())
        writer.write('Enter your name:\n'.encode())
        await writer.drain()
        # ask the user for their name
        name_bytes = await reader.readline()
        # convert name to string
        name = name_bytes.decode().strip()
        # store the user details
        global ALL_USERS
        ALL_USERS[name] = (reader, writer)
        # announce the user
        await broadcast_message(f'{name} has connected\n')
        # welcome message
        welcome = f'Welcome {name}. Send QUIT to disconnect.\n'
        writer.write(welcome.encode())
        await writer.drain()
        return name

    # disconnect a user
    async def disconnect_user(name, writer):
        # close the user's connection
        writer.close()
        await writer.wait_closed()
        # remove from the dict of all users
        global ALL_USERS
        del ALL_USERS[name]
        # broadcast the user has left
        await broadcast_message(f'{name} has disconnected\n')

    # handle a chat client
    async def handle_chat_client(reader, writer):
        print('Client connecting...')
        # connect the user
        name = await connect_user(reader, writer)
        try:
            # read messages from the user
            while True:
                # read a line of data
                line_bytes = await reader.readline()
                # convert to string
                line = line_bytes.decode().strip()
                # check for exit
                if line == 'QUIT':
                    break
                # broadcast message
                await broadcast_message(f'{name}: {line}\n')
        finally:
            # disconnect the user
            await disconnect_user(name, writer)

    # chat server
    async def main():
        # define the local host
        host_address, host_port = '127.0.0.1', 8888
        # create the server
        server = await asyncio.start_server(handle_chat_client, host_address, host_port)
        # run the server
        async with server:
            # report message
            print('Chat Server Running\nWaiting for chat clients...')
            # accept connections
            await server.serve_forever()

    # dict of all current users
    ALL_USERS = {}
    # start the event loop
    asyncio.run(main())

Next, let’s explore an example of a chat client.
Example Asyncio Chat Client
We can explore how to develop an asyncio group chat client.
This is simpler than the server we developed in the previous section.
This program has three main parts:
- A coroutine to read messages from the user and transmit to the server.
- A coroutine to read messages from the server and report to the user.
- A main coroutine to drive the connection to the server.
Let’s dive in.
Read and Transmit User Messages
Firstly, we can develop a coroutine that reads messages from standard input and transmits them to the server.
Reads from standard input are blocking, therefore we must perform them in a separate thread via the asyncio.to_thread() function.
This coroutine loops until the user enters the message “QUIT”, at which point the loop exits.
The write_messages() coroutine below implements this.

    # send message to server
    async def write_messages(writer):
        # read messages from the user and transmit to server
        while True:
            # read from stdin
            message = await asyncio.to_thread(sys.stdin.readline)
            # encode the string message to bytes
            msg_bytes = message.encode()
            # transmit the message to the server
            writer.write(msg_bytes)
            # wait for the buffer to be empty
            await writer.drain()
            # check if the user wants to quit the program
            if message.strip() == 'QUIT':
                # exit the loop
                break
        # report that the program is terminating
        print('Quitting...')

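One case the loop above does not cover is standard input being closed (for example with Ctrl-D, or when input is piped from a file), where readline() returns an empty string. The sketch below is one possible variant, not the tutorial's version: it takes a hypothetical read_line parameter so the behavior can be exercised with scripted input, and it uses a FakeWriter stand-in instead of a real StreamWriter.

```python
import asyncio

# minimal stand-in for a StreamWriter that records what was written (demo only)
class FakeWriter:
    def __init__(self):
        self.sent = []
    def write(self, data):
        self.sent.append(data)
    async def drain(self):
        pass

# hypothetical variant of write_messages() that takes the line reader as a parameter
async def write_messages_until_eof(writer, read_line):
    while True:
        # run the (possibly blocking) read in a worker thread
        message = await asyncio.to_thread(read_line)
        # an empty string means the input stream was closed
        if not message:
            # tell the server we are leaving, then stop
            writer.write(b'QUIT\n')
            await writer.drain()
            break
        writer.write(message.encode())
        await writer.drain()
        if message.strip() == 'QUIT':
            break

async def demo():
    # scripted input: one chat line, then end-of-input
    scripted = iter(['Hello there!\n', ''])
    writer = FakeWriter()
    await write_messages_until_eof(writer, lambda: next(scripted))
    return writer.sent

sent = asyncio.run(demo())
print(sent)
```

In the real client, sys.stdin.readline would be passed as read_line.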
Read and Report Server Messages
Next, we need to develop a coroutine that will read messages from the server and report them to the user.
This task operates in a loop, forever reading messages and printing them to standard output.
The read_messages() coroutine below implements this.

    # read messages from the server
    async def read_messages(reader):
        # read messages from the server and print to user
        while True:
            # read a message from the server
            result_bytes = await reader.readline()
            # decode response from bytes to a string
            response = result_bytes.decode()
            # report the response to the user
            print(response.strip())

Drive Connection With Server
Next, we need to drive the connection with the server.
This involves first opening a connection to the server on the required host and port number.
Next, we can create the read_messages() coroutine and schedule it as a background task, forever reading and reporting messages from the server.
We can then create and await the write_messages() that will read input from the user and transmit it to the server. This coroutine will return when the user decides to quit.
The read_messages() task can then be canceled and we can then close the connection to the server.
The main() coroutine below implements this.

    # chat client
    async def main():
        # define the server details
        server_address, server_port = '127.0.0.1', 8888
        # report progress to the user
        print(f'Connecting to {server_address}:{server_port}...')
        # open a connection to the server
        reader, writer = await asyncio.open_connection(server_address, server_port)
        # report progress to the user
        print('Connected.')
        # read and report messages from the server
        read_task = asyncio.create_task(read_messages(reader))
        # write messages to the server
        await write_messages(writer)
        # cancel the read messages task
        read_task.cancel()
        # report progress to the user
        print('Disconnecting from server...')
        # close the stream writer
        writer.close()
        # wait for the tcp connection to close
        await writer.wait_closed()
        # report progress to the user
        print('Done.')

Finally, we can start the event loop.

    ...
    # run the event loop
    asyncio.run(main())

Asyncio Chat Client Complete Example
Tying this together, the complete example is listed below.

    # SuperFastPython.com
    # example of a chat client using streams
    import sys
    import asyncio

    # send message to server
    async def write_messages(writer):
        # read messages from the user and transmit to server
        while True:
            # read from stdin
            message = await asyncio.to_thread(sys.stdin.readline)
            # encode the string message to bytes
            msg_bytes = message.encode()
            # transmit the message to the server
            writer.write(msg_bytes)
            # wait for the buffer to be empty
            await writer.drain()
            # check if the user wants to quit the program
            if message.strip() == 'QUIT':
                # exit the loop
                break
        # report that the program is terminating
        print('Quitting...')

    # read messages from the server
    async def read_messages(reader):
        # read messages from the server and print to user
        while True:
            # read a message from the server
            result_bytes = await reader.readline()
            # decode response from bytes to a string
            response = result_bytes.decode()
            # report the response to the user
            print(response.strip())

    # chat client
    async def main():
        # define the server details
        server_address, server_port = '127.0.0.1', 8888
        # report progress to the user
        print(f'Connecting to {server_address}:{server_port}...')
        # open a connection to the server
        reader, writer = await asyncio.open_connection(server_address, server_port)
        # report progress to the user
        print('Connected.')
        # read and report messages from the server
        read_task = asyncio.create_task(read_messages(reader))
        # write messages to the server
        await write_messages(writer)
        # cancel the read messages task
        read_task.cancel()
        # report progress to the user
        print('Disconnecting from server...')
        # close the stream writer
        writer.close()
        # wait for the tcp connection to close
        await writer.wait_closed()
        # report progress to the user
        print('Done.')

    # run the event loop
    asyncio.run(main())

Next, let’s explore how to use our chat server and clients.
Example Chat Session
Now that we have developed our chat server and client, let’s look at how we can use them with an example chat session.
Each program must be run as a separate program and the client programs require user input.
I recommend saving each program as a separate file and running them on the command line (terminal or prompt).
Firstly, we can run the server.
Save the chat server to a file like server.py and then run it on the command line to start the server.
For example:

    python server.py

Next, save the chat client program to a file like client.py and then run it on the command line to start the client. For example:

    python client.py

The client must then enter their name and can begin chatting.
For example:

    Connecting to 127.0.0.1:8888...
    Connected.
    Asycio Chat Server
    Enter your name:
    Jason
    Jason has connected
    Welcome Jason. Send QUIT to disconnect.
    Hello there!
    Jason: Hello there!

On the server, we can see the client connect and each broadcast message being sent.

    Chat Server Running
    Waiting for chat clients...
    Client connecting...
    Broadcast: Jason has connected
    Broadcast: Jason: Hello there!

Next, open another window and start a second chat client.
For example:

    python client.py

Enter a different name and begin chatting.
For example:

    Connecting to 127.0.0.1:8888...
    Connected.
    Asycio Chat Server
    Enter your name:
    Tom
    Tom has connected
    Welcome Tom. Send QUIT to disconnect.
    Hi there, this is very cool!
    Tom: Hi there, this is very cool!

Now check the first chat client. You will see that the second client was announced and their chat messages were broadcast.

    ...
    Tom has connected
    Tom: Hi there, this is very cool!

Similarly, the server sees the second client.

    ...
    Client connecting...
    Broadcast: Tom has connected
    Broadcast: Tom: Hi there, this is very cool!

Carry on and test the clients.
Each client can then terminate their session by sending the message: QUIT
For example:

    ...
    QUIT
    Quitting...
    Disconnecting from server...
    Done.

The first client will see the second client disconnect.
For example:

    ...
    Tom has disconnected

The first client can then disconnect in the same manner.
For example:

    ...
    QUIT
    Quitting...
    Disconnecting from server...
    Done.

The server will see and report both client disconnections.

    ...
    Broadcast: Tom has disconnected
    Broadcast: Jason has disconnected

Finally, the server process can be forcefully terminated.
This highlights how we can use our group chat or chat room program by running separate server and client programs.
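The manual session above can also be approximated by a scripted check. The sketch below is an assumption-laden condensation rather than the tutorial's exact programs: the handler omits the banner and welcome message and also stops on end-of-file, the server binds to port 0 so any free port is used, and join() and leave() are hypothetical helpers that play the part of the interactive clients.

```python
import asyncio

ALL_USERS = {}

# send a message to every connected user (iterating over a copy of the dict)
async def broadcast_message(message):
    for _, writer in list(ALL_USERS.values()):
        writer.write(message.encode())
        await writer.drain()

# condensed version of the tutorial's handler: no banner or welcome message
async def handle_chat_client(reader, writer):
    writer.write(b'Enter your name:\n')
    await writer.drain()
    name = (await reader.readline()).decode().strip()
    ALL_USERS[name] = (reader, writer)
    await broadcast_message(f'{name} has connected\n')
    while True:
        data = await reader.readline()
        # stop on QUIT, or on b'' which means the client closed the connection
        if not data or data.decode().strip() == 'QUIT':
            break
        await broadcast_message(f'{name}: {data.decode().strip()}\n')
    writer.close()
    await writer.wait_closed()
    del ALL_USERS[name]
    await broadcast_message(f'{name} has disconnected\n')

# hypothetical scripted client: connect, answer the name prompt, skip the announcement
async def join(host, port, name):
    reader, writer = await asyncio.open_connection(host, port)
    await reader.readline()  # the name prompt
    writer.write(f'{name}\n'.encode())
    await writer.drain()
    await reader.readline()  # the announcement of this client's own connection
    return reader, writer

# send QUIT and close the client side of the connection
async def leave(writer):
    writer.write(b'QUIT\n')
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def next_line(reader):
    return (await reader.readline()).decode().strip()

async def session():
    # port 0 lets the operating system choose a free port for the test
    server = await asyncio.start_server(handle_chat_client, '127.0.0.1', 0)
    host, port = server.sockets[0].getsockname()[:2]
    seen = []
    async with server:
        jason_reader, jason_writer = await join(host, port, 'Jason')
        tom_reader, tom_writer = await join(host, port, 'Tom')
        # Jason is told that Tom has joined
        seen.append(await next_line(jason_reader))
        # Tom chats; Jason and Tom both receive the broadcast
        tom_writer.write(b'Hi there\n')
        await tom_writer.drain()
        seen.append(await next_line(jason_reader))
        tom_echo = await next_line(tom_reader)
        # Tom leaves and Jason is told, then Jason leaves
        await leave(tom_writer)
        seen.append(await next_line(jason_reader))
        await leave(jason_writer)
    return seen, tom_echo

seen, tom_echo = asyncio.run(session())
print(seen, tom_echo)
```

The list collected for the first client mirrors what appeared in the first terminal window during the manual session.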
Extension: Concurrent Broadcasts on Server
We can explore an update to the server program that broadcasts to all clients concurrently, rather than sequentially.
This can be achieved by first defining a new coroutine that writes a byte message to a single StreamWriter.
The write_message() coroutine below implements this.

    # write a message to a stream writer
    async def write_message(writer, msg_bytes):
        # write message to this user
        writer.write(msg_bytes)
        # wait for the buffer to empty
        await writer.drain()

Next, the broadcast_message coroutine can be updated to call the write_message() coroutine concurrently.
This can be achieved by creating one asyncio.Task for each client in the ALL_USERS dict, then awaiting all tasks via asyncio.wait().
For example:

    ...
    # create a task for each write to client
    tasks = [asyncio.create_task(write_message(w, msg_bytes)) for _,(_,w) in ALL_USERS.items()]
    # wait for all writes to complete (asyncio.wait() raises ValueError if given no tasks)
    if tasks:
        _ = await asyncio.wait(tasks)

The updated version of the broadcast_message coroutine with this change is listed below.

    # send a message to all connected users
    async def broadcast_message(message):
        # report locally
        print(f'Broadcast: {message.strip()}')
        # convert to bytes
        msg_bytes = message.encode()
        # enumerate all users and broadcast the message
        global ALL_USERS
        # create a task for each write to client
        tasks = [asyncio.create_task(write_message(w, msg_bytes)) for _,(_,w) in ALL_USERS.items()]
        # wait for all writes to complete (asyncio.wait() raises ValueError if given no tasks,
        # which happens when the last user disconnects)
        if tasks:
            _ = await asyncio.wait(tasks)

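asyncio.wait() is not the only way to await a group of writes. As an alternative sketch, asyncio.gather() accepts zero awaitables, so an empty room needs no special case, and return_exceptions=True lets one failed write be counted rather than interrupt the others. The broadcast_bytes() function and the FakeWriter class are hypothetical names used only for this illustration.

```python
import asyncio

# minimal stand-in for a StreamWriter (demo only); can simulate a dropped client
class FakeWriter:
    def __init__(self, broken=False):
        self.sent = []
        self.broken = broken
    def write(self, data):
        self.sent.append(data)
    async def drain(self):
        if self.broken:
            raise ConnectionResetError('client went away')

# write a message to a single writer
async def write_message(writer, msg_bytes):
    writer.write(msg_bytes)
    await writer.drain()

# hypothetical variant of the concurrent broadcast using asyncio.gather()
async def broadcast_bytes(writers, msg_bytes):
    # run all writes concurrently and collect exceptions instead of raising them
    results = await asyncio.gather(
        *(write_message(w, msg_bytes) for w in writers),
        return_exceptions=True)
    # report how many writes failed
    return sum(isinstance(r, Exception) for r in results)

async def demo():
    writers = [FakeWriter(), FakeWriter(broken=True)]
    failed = await broadcast_bytes(writers, b'hello\n')
    failed_empty = await broadcast_bytes([], b'hello\n')
    return failed, failed_empty, writers[0].sent

failed, failed_empty, sent = asyncio.run(demo())
print(failed, failed_empty, sent)
```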
Tying this together, the complete updated server program is listed below.

    # SuperFastPython.com
    # example of a chat server using streams
    import asyncio

    # write a message to a stream writer
    async def write_message(writer, msg_bytes):
        # write message to this user
        writer.write(msg_bytes)
        # wait for the buffer to empty
        await writer.drain()

    # send a message to all connected users
    async def broadcast_message(message):
        # report locally
        print(f'Broadcast: {message.strip()}')
        # convert to bytes
        msg_bytes = message.encode()
        # enumerate all users and broadcast the message
        global ALL_USERS
        # create a task for each write to client
        tasks = [asyncio.create_task(write_message(w, msg_bytes)) for _,(_,w) in ALL_USERS.items()]
        # wait for all writes to complete (asyncio.wait() raises ValueError if given no tasks,
        # which happens when the last user disconnects)
        if tasks:
            _ = await asyncio.wait(tasks)

    # connect a user
    async def connect_user(reader, writer):
        # get name message
        writer.write('Asycio Chat Server\n'.encode())
        writer.write('Enter your name:\n'.encode())
        await writer.drain()
        # ask the user for their name
        name_bytes = await reader.readline()
        # convert name to string
        name = name_bytes.decode().strip()
        # store the user details
        global ALL_USERS
        ALL_USERS[name] = (reader, writer)
        # announce the user
        await broadcast_message(f'{name} has connected\n')
        # welcome message
        welcome = f'Welcome {name}. Send QUIT to disconnect.\n'
        writer.write(welcome.encode())
        await writer.drain()
        return name

    # disconnect a user
    async def disconnect_user(name, writer):
        # close the user's connection
        writer.close()
        await writer.wait_closed()
        # remove from the dict of all users
        global ALL_USERS
        del ALL_USERS[name]
        # broadcast the user has left
        await broadcast_message(f'{name} has disconnected\n')

    # handle a chat client
    async def handle_chat_client(reader, writer):
        print('Client connecting...')
        # connect the user
        name = await connect_user(reader, writer)
        try:
            # read messages from the user
            while True:
                # read a line of data
                line_bytes = await reader.readline()
                # convert to string
                line = line_bytes.decode().strip()
                # check for exit
                if line == 'QUIT':
                    break
                # broadcast message
                await broadcast_message(f'{name}: {line}\n')
        finally:
            # disconnect the user
            await disconnect_user(name, writer)

    # chat server
    async def main():
        # define the local host
        host_address, host_port = '127.0.0.1', 8888
        # create the server
        server = await asyncio.start_server(handle_chat_client, host_address, host_port)
        # run the server
        async with server:
            # report message
            print('Chat Server Running\nWaiting for chat clients...')
            # accept connections
            await server.serve_forever()

    # dict of all current users
    ALL_USERS = {}
    # start the event loop
    asyncio.run(main())

Extension: Broadcast Via Queue in Server
We can explore an extension in which the server places broadcast messages from clients on a queue so that they can be transmitted at a later time.
This can make the server more responsive to client messages, especially if there are a large number of clients that may slow down the broadcasting.
Firstly, we can create an asyncio.Queue as a global variable that will be used for broadcast messages.

    ...
    # queue for broadcast messages
    queue = asyncio.Queue()

Next, we can create a new coroutine that loops forever and reads messages from the queue and broadcasts them to all clients.
The broadcaster() coroutine below implements this.

    # read messages from the queue and transmit them to all clients
    async def broadcaster():
        global queue
        while True:
            # read a message from the queue
            message = await queue.get()
            # report locally
            print(f'Broadcast: {message.strip()}')
            # convert to bytes
            msg_bytes = message.encode()
            # enumerate all users and broadcast the message
            global ALL_USERS
            # create a task for each write to client
            tasks = [asyncio.create_task(write_message(w, msg_bytes)) for _,(_,w) in ALL_USERS.items()]
            # wait for all writes to complete (asyncio.wait() raises ValueError if given no tasks,
            # which would otherwise stop this loop when the last user disconnects)
            if tasks:
                _ = await asyncio.wait(tasks)

Next, we can update our old broadcast_message() coroutine to simply put messages on the queue.
# send a message to all connected users
async def broadcast_message(message):
    global queue
    # put the message in the queue
    await queue.put(message)
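If the put/get pattern is unfamiliar, the following is a minimal standalone sketch of it, separate from the chat server. The names producer(), consumer(), and demo() are hypothetical, and the None sentinel used to stop the consumer is an assumption added so the example terminates; the broadcaster() in this tutorial loops forever instead.

```python
# minimal sketch of putting and getting on an asyncio.Queue
# (hypothetical helper names; the None sentinel is an assumption for this demo)
import asyncio

# consumer: take messages off the queue until the sentinel arrives
async def consumer(queue, received):
    while True:
        # suspend until a message is available
        message = await queue.get()
        # stop when the producer signals it is done
        if message is None:
            break
        received.append(message)

# producer: put a few messages on the queue, then the sentinel
async def producer(queue):
    for i in range(3):
        await queue.put(f'message {i}\n')
    await queue.put(None)

# run the consumer in the background while the producer runs
async def demo():
    queue = asyncio.Queue()
    received = []
    consumer_task = asyncio.create_task(consumer(queue, received))
    await producer(queue)
    # wait for the consumer to see the sentinel and finish
    await consumer_task
    return received

if __name__ == '__main__':
    print(asyncio.run(demo()))
```

The producer never waits for the consumer to handle a message, which is the same decoupling the chat server gains between client handlers and the broadcaster.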
Finally, we can update the main() coroutine to start the broadcaster() coroutine as a background task.
...
# start the broadcaster as a background task
broadcaster_task = asyncio.create_task(broadcaster())
The updated main() coroutine with this change is listed below.
# chat server
async def main():
    # start the broadcaster as a background task
    broadcaster_task = asyncio.create_task(broadcaster())
    # define the local host
    host_address, host_port = '127.0.0.1', 8888
    # create the server
    server = await asyncio.start_server(handle_chat_client, host_address, host_port)
    # run the server
    async with server:
        # report message
        print('Chat Server Running\nWaiting for chat clients...')
        # accept connections
        await server.serve_forever()
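The main() coroutine above keeps a reference to the background task but never stops it, which is fine for a server that runs until it is killed. If a clean shutdown is wanted, one possible approach is sketched below in isolation: wait for the queue to drain, then cancel the task and await it. The names worker() and run_with_background_worker() are hypothetical and stand in for broadcaster() and main(); this is a sketch under those assumptions, not part of the tutorial's server.

```python
# sketch: start a queue-reading background task, then shut it down cleanly
# (worker and run_with_background_worker are hypothetical names)
import asyncio

# background task: loop forever handling items from the queue
async def worker(queue, handled):
    while True:
        item = await queue.get()
        handled.append(item)
        # mark the item as processed so queue.join() can return
        queue.task_done()

async def run_with_background_worker(items):
    queue = asyncio.Queue()
    handled = []
    # keep a reference so the task can be cancelled later
    task = asyncio.create_task(worker(queue, handled))
    try:
        for item in items:
            await queue.put(item)
        # wait until every queued item has been processed
        await queue.join()
    finally:
        # request cancellation and wait for the task to finish
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
    return handled, task.cancelled()

if __name__ == '__main__':
    print(asyncio.run(run_with_background_worker(['a', 'b'])))
```

Holding the task in a variable also prevents it from being garbage collected while it is still running.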
And that’s it.
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of a chat server using streams
import asyncio

# write a message to a stream writer
async def write_message(writer, msg_bytes):
    # write message to this user
    writer.write(msg_bytes)
    # wait for the buffer to empty
    await writer.drain()

# read messages from the queue and transmit them to all clients
async def broadcaster():
    global queue
    while True:
        # read a message from the queue
        message = await queue.get()
        # report locally
        print(f'Broadcast: {message.strip()}')
        # convert to bytes
        msg_bytes = message.encode()
        # enumerate all users and broadcast the message
        global ALL_USERS
        # create a task for each write to client
        tasks = [asyncio.create_task(write_message(w, msg_bytes)) for _,(_,w) in ALL_USERS.items()]
        # wait for all writes to complete
        # (asyncio.wait() raises ValueError if no users are connected)
        if tasks:
            _ = await asyncio.wait(tasks)

# send a message to all connected users
async def broadcast_message(message):
    global queue
    # put the message in the queue
    await queue.put(message)

# connect a user
async def connect_user(reader, writer):
    # get name message
    writer.write('Asyncio Chat Server\n'.encode())
    writer.write('Enter your name:\n'.encode())
    await writer.drain()
    # ask the user for their name
    name_bytes = await reader.readline()
    # convert name to string
    name = name_bytes.decode().strip()
    # store the user details
    global ALL_USERS
    ALL_USERS[name] = (reader, writer)
    # announce the user
    await broadcast_message(f'{name} has connected\n')
    # welcome message
    welcome = f'Welcome {name}. Send QUIT to disconnect.\n'
    writer.write(welcome.encode())
    await writer.drain()
    return name

# disconnect a user
async def disconnect_user(name, writer):
    # close the user's connection
    writer.close()
    await writer.wait_closed()
    # remove from the dict of all users
    global ALL_USERS
    del ALL_USERS[name]
    # broadcast the user has left
    await broadcast_message(f'{name} has disconnected\n')

# handle a chat client
async def handle_chat_client(reader, writer):
    print('Client connecting...')
    # connect the user
    name = await connect_user(reader, writer)
    try:
        # read messages from the user
        while True:
            # read a line of data
            line_bytes = await reader.readline()
            # empty bytes means the client closed the connection
            if not line_bytes:
                break
            # convert to string
            line = line_bytes.decode().strip()
            # check for exit
            if line == 'QUIT':
                break
            # broadcast message
            await broadcast_message(f'{name}: {line}\n')
    finally:
        # disconnect the user
        await disconnect_user(name, writer)

# chat server
async def main():
    # start the broadcaster as a background task
    broadcaster_task = asyncio.create_task(broadcaster())
    # define the local host
    host_address, host_port = '127.0.0.1', 8888
    # create the server
    server = await asyncio.start_server(handle_chat_client, host_address, host_port)
    # run the server
    async with server:
        # report message
        print('Chat Server Running\nWaiting for chat clients...')
        # accept connections
        await server.serve_forever()

# dict of all current users
ALL_USERS = {}
# queue for broadcast messages
queue = asyncio.Queue()
# start the event loop
asyncio.run(main())
Extensions
This section lists some extensions you may wish to explore.
- Use Queue in the Server. Each client handler could put its client's messages on a queue. A broadcast coroutine could loop forever, reading messages from the queue and broadcasting them. This separation of concerns may be more efficient (implemented, see above).
- Server Broadcast Concurrently. The broadcasts triggered by each client on the server are executed sequentially in a loop. Update the loop so that all broadcasts are performed concurrently (implemented, see above).
- Add Coroutine Safety. Protect all changes to ALL_USERS from race conditions caused by concurrent updates, using a mutex lock.
- Add Error Handling. Check for and handle common errors, such as server already running, unable to connect to server, client failing to connect properly, client disconnected unexpectedly, etc.
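As a starting point for the coroutine safety and error handling extensions, the sketch below wraps the user dict in a small class guarded by an asyncio.Lock. UserRegistry and its methods are hypothetical names, not part of the server above, and rejecting duplicate names and tolerating a repeated disconnect are assumptions about how errors might be handled. The registry is created inside a running coroutine, which should avoid event-loop binding surprises with the lock on older Python versions.

```python
# sketch: a lock-protected replacement for the ALL_USERS dict
# (UserRegistry is a hypothetical class; the policies shown are assumptions)
import asyncio

class UserRegistry:
    def __init__(self):
        self._users = {}
        # mutex lock guarding every read and write of the dict
        self._lock = asyncio.Lock()

    # register a user, refusing a name that is already taken
    async def add(self, name, writer):
        async with self._lock:
            if name in self._users:
                return False
            self._users[name] = writer
            return True

    # remove a user, tolerating a name that is already gone
    async def remove(self, name):
        async with self._lock:
            return self._users.pop(name, None) is not None

    # return a snapshot list so callers can iterate without holding the lock
    async def writers(self):
        async with self._lock:
            return list(self._users.values())

async def demo():
    registry = UserRegistry()
    # object() stands in for a real StreamWriter in this demo
    first = await registry.add('alice', object())
    duplicate = await registry.add('alice', object())
    await registry.add('bob', object())
    removed = await registry.remove('alice')
    removed_again = await registry.remove('alice')
    remaining = len(await registry.writers())
    return first, duplicate, removed, removed_again, remaining

if __name__ == '__main__':
    print(asyncio.run(demo()))
```

Returning a snapshot from writers() means a broadcast can await slow writes without blocking clients that are connecting or disconnecting at the same time.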
If you explore any of these extensions, share your code and findings in the comments below.
I’d love to see what you come up with.
Further Reading
This section provides additional resources that you may find helpful.
Python Asyncio Books
- Python Asyncio Mastery, Jason Brownlee (my book!)
- Python Asyncio Jump-Start, Jason Brownlee.
- Python Asyncio Interview Questions, Jason Brownlee.
- Asyncio Module API Cheat Sheet
I also recommend the following books:
- Python Concurrency with asyncio, Matthew Fowler, 2022.
- Using Asyncio in Python, Caleb Hattingh, 2020.
- asyncio Recipes, Mohamed Mustapha Tahrioui, 2019.
APIs
- asyncio — Asynchronous I/O
- Asyncio Coroutines and Tasks
- Asyncio Streams
- Asyncio Subprocesses
- Asyncio Queues
- Asyncio Synchronization Primitives
Takeaways
You now know how to develop a group chat room using asyncio in Python.
Did I make a mistake? See a typo?
I’m a simple humble human. Correct me, please!
Do you have any additional tips?
I’d love to hear about them!
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.