Asyncio Chat Client and Server
We can develop a group chat client and server with asyncio.
A chat room allows multiple clients to connect to the same server and chat together. Each client message is transmitted to all connected clients.
It is a great exercise in network programming and a great way to showcase asynchronous programming with asyncio with non-blocking TCP streams.
In this tutorial, you will discover how to develop a group chat room using asyncio.
After completing this tutorial, you will know:
- What is a multi-user chatroom and how we might develop a chat room client and server in asyncio.
- How to implement a specific chatroom server and client.
- How to test the developed chatroom programs with one and multiple connected clients.
Let's get started.
Develop Group Chat Client and Server
A group chat program allows multiple clients to connect and communicate via text in real-time.
Each client is able to send a message to the server and all clients are shown the message, allowing a group of clients to chat at the same time.
The primary use of a chat room is to share information via text with a group of other users. Generally speaking, the ability to converse with multiple people in the same conversation differentiates chat rooms from instant messaging programs, which are more typically designed for one-to-one communication.
-- Chat room, Wikipedia.
Developing a chat client is an excellent case study for network programming and asyncio in particular as it requires that both the client program and the server program be event-driven, responding to messages on demand.
Specifically:
- The client program must display messages as they are received from the server and must transmit messages as they are typed in on standard input.
- The server must accept and manage new client connections and must broadcast messages from each client to all clients.
How can we develop a chat room for group chat using asyncio?
How to Develop Asyncio Chat Server and Client
Developing an asyncio chat room involves developing client and server programs.
Let's take a closer look at each in turn.
Develop Asyncio Chat Server
An asyncio group chat server has several requirements, they are:
- Accept connections from new clients.
- Ask the client for their name and announce it to all other clients.
- Read messages from each client and broadcast them to all other clients.
- Determine when a client wants to exit and broadcast their disconnection.
There are several ways that we could implement these aspects.
We will explore perhaps the simplest approach and ignore error handling.
Firstly, we must create the server and accept incoming client connections.
This can be achieved using the asyncio.start_server() function and specify the coroutine to call to handle each incoming connection and the hostname and port on which to handle connections.
For example:
...
# define the local host
host_address, host_port = '127.0.0.1', 8888
# create the server
server = await asyncio.start_server(handle_chat_client, host_address, host_port)
We can then use the context manager on the asyncio.Server object instance and call the serve_forever() method, allowing the server to accept connections until the program is terminated.
...
# run the server
async with server:
# accept connections
await server.serve_forever()
Next, we can define the handle_chat_client() coroutine for handling new connections.
This coroutine must take a StreamReader and StreamWriter as arguments.
Firstly, it must retrieve a username from the client and store it for use when broadcasting all messages from the client.
We can send a message asking for the name. We will read and write each message as a line terminated with a new line character ('\n').
...
writer.write('Enter your name:\n'.encode())
await writer.drain()
Next, the user name can be read and used as a key in a global variable dict that stores a tuple of the client StreamReader and StreamWriter against the client name.
...
# ask the user for their name
name_bytes = await reader.readline()
# convert name to string
name = name_bytes.decode().strip()
# store the user details
global ALL_USERS
ALL_USERS = (reader, writer)
We can then announce that a client with this name has connected.
This can be achieved by defining a coroutine that takes a string message and loops over all clients in the ALL_USERS dict and shares the message with each in turn.
The broadcast_message() coroutine below implements this.
# send a message to all connected users
async def broadcast_message(message):
# report locally
print(f'Broadcast: {message.strip()}')
# convert to bytes
msg_bytes = message.encode()
# enumerate all users and broadcast the message
global ALL_USERS
for name, (reader, writer) in ALL_USERS.items():
# write message to this user
writer.write(msg_bytes)
# wait for the buffer to empty
await writer.drain()
We can use this coroutine to broadcast the connection of the new client.
...
# announce the user
await broadcast_message(f'{name} has connected\n')
You can learn more about reading and writing non-blocking TCP streams in the tutorial:
Next, we can loop forever, reading messages from the client and broadcasting them to all other clients.
...
# read messages from the user
while True:
# read a line of data
line_bytes = await reader.readline()
# convert to string
line = line_bytes.decode().strip()
# check for exit
if line == 'QUIT':
break
# broadcast message
await broadcast_message(f'{name}: {line}\n')
If the user sends a special message, e.g. "QUIT", the loop is broken and the user is choosing to disconnect.
To disconnect the user, we can close the StreamWriter and broadcast a message to all other clients that the user has disconnected.
For example:
...
# close the user's connection
writer.close()
await writer.wait_closed()
# remove from the dict of all users
global ALL_USERS
del ALL_USERS
# broadcast the user has left
await broadcast_message(f'{name} has disconnected\n')
That covers the main functionality of the server, next, let's look at the client.
Develop Asyncio Chat Client
An asyncio chat client has a number of requirements, they are:
- Open a connection to the server.
- Read and report messages from the server.
- Read and transmit messages from the user.
- Close the connection to the server.
The first step is to open a connection to the server.
This can be achieved via the asyncio.open_connection() function and specifying the hostname and port number. The function then returns a StreamReader and a StreamWriter for interacting with the server.
...
# open a connection to the server
reader, writer = await asyncio.open_connection(server_address, server_port)
Next, we can loop forever and read messages from the server and report them on standard output.
...
# read messages from the server and print to user
while True:
# read a message
result_bytes = await reader.readline()
# decode response
response = result_bytes.decode()
# report the response
print(response.strip())
This can be wrapped in a task and executed for the duration of the report. It ensures that all messages from the server, including broadcasts from other clients, are received and reported on demand.
Next, we need to read messages from the user and transmit them to the server.
This may be the user name at the beginning of the session or chat messages during the session.
We can implement this with a second loop that reads lines of text from standard input (stdin) and transmits them to the server.
If the user enters a special command, e.g. "QUIT", the loop is exited.
Reading lines of text from stdin is a blocking activity, which should not be executed in the asyncio event loop.
For example:
...
# read from stdin
message = sys.stdin.readline()
If executed, it will stop all activity in the program, preventing any messages from being read from the server and printed.
Instead, we must read from standard input and block in a thread and then read the result in the asyncio event loop when it is available.
This can be achieved via the asyncio.to_thread() function.
For example:
...
# read from stdin
message = await asyncio.to_thread(sys.stdin.readline)
You can learn more about executing blocking tasks in a new thread in the tutorial:
Using this, we can then define the loop that reads messages from the user and transmits them to the server.
...
# read messages from the user and transmit to server
while True:
# read from stdin
message = await asyncio.to_thread(sys.stdin.readline)
# send message to the server
msg_bytes = message.encode()
writer.write(msg_bytes)
await writer.drain()
# check for stop
if message.strip() == 'QUIT':
break
Finally, we can disconnect the client from the server.
...
# close the connection
writer.close()
await writer.wait_closed()
Now that we know how to develop a chat client and server, let's look at a worked example.
Example Asyncio Chat Server
We can explore how to develop an asyncio group chat server.
This program has 6 main parts, they are:
- A coroutine to broadcast to all users
- A coroutine for managing a new client connection.
- A coroutine for managing a client disconnection.
- A coroutine for managing the client connection overall.
- A coroutine for driving the server.
- The start of the event loop.
Let's dive in.
Broadcast to All Clients
Firstly, we can define a coroutine that broadcasts a message to all connected clients.
We will maintain a global variable dict of user name mapped to a tuple of StreamReader and StreamWriter. Therefore, to broadcast a message to all clients means traversing this dict and writing a message to each client.
The broadcast_message() coroutine below implements this, taking the string message as an argument.
# send a message to all connected users
async def broadcast_message(message):
# report locally
print(f'Broadcast: {message.strip()}')
# convert to bytes
msg_bytes = message.encode()
# enumerate all users and broadcast the message
global ALL_USERS
for _, (_, writer) in ALL_USERS.items():
# write message to this user
writer.write(msg_bytes)
# wait for the buffer to empty
await writer.drain()
We could improve this coroutine in a few ways.
Firstly, we could protect the ALL_USERS dict from concurrent updates while transmitting messages, in case a user disconnects while we are broadcasting.
Secondly, we could transmit the messages concurrently rather than sequentially through all clients.
These are left as an extension.
Connect A New Client
Next, we need to manage the connection of a new client.
This involves first transmitting a welcome message and asking the client for their name. The name is then read and used to update the ALL_USERS dict, storing the connection details for broadcasting later.
A broadcast message is then reported to all clients that a new client with a given name has arrived and a direct message is then sent to the client mentioning the command to terminate the chat session.
The connect_user() coroutine below implements this.
# connect a user
async def connect_user(reader, writer):
# get name message
writer.write('Asycio Chat Server\n'.encode())
writer.write('Enter your name:\n'.encode())
await writer.drain()
# ask the user for their name
name_bytes = await reader.readline()
# convert name to string
name = name_bytes.decode().strip()
# store the user details
global ALL_USERS
ALL_USERS = (reader, writer)
# announce the user
await broadcast_message(f'{name} has connected\n')
# welcome message
welcome = f'Welcome {name}. Send QUIT to disconnect.\n'
writer.write(welcome.encode())
await writer.drain()
return name
Disconnect a Client
Next, we can define a coroutine to handle the disconnection of a client.
This involves closing the StreamWriter, and then deleting the user from the ALL_USERS dict.
Finally, a broadcast message can be sent to all remaining clients that the client with the given name has disconnected.
The disconnect_user() coroutine below implements this.
# disconnect a user
async def disconnect_user(name, writer):
# close the user's connection
writer.close()
await writer.wait_closed()
# remove from the dict of all users
global ALL_USERS
del ALL_USERS
# broadcast the user has left
await broadcast_message(f'{name} has disconnected\n')
Manage a Client
Next, we need to define a coroutine to manage a single client connection.
This includes calling the connect_user() to formally connect the client, read all messages from the client, and then disconnect the client when they are done.
The core of this is a loop that reads messages transmitted from the client and broadcasts them to all other clients.
The handle_chat_client() coroutine below implements this.
# handle a chat client
async def handle_chat_client(reader, writer):
print('Client connecting...')
# connect the user
name = await connect_user(reader, writer)
try:
# read messages from the user
while True:
# read a line of data
line_bytes = await reader.readline()
# convert to string
line = line_bytes.decode().strip()
# check for exit
if line == 'QUIT':
break
# broadcast message
await broadcast_message(f'{name}: {line}\n')
finally:
# disconnect the user
await disconnect_user(name, writer)
A limitation of this implementation is that all reads from the client are blocked until the broadcast to all other clients is done.
This may limit the scalability and responsiveness of the server to rapid messages from a client,
We can improve the design by decoupling the broadcasting of messages from the client management. One approach would be to have the client put the broadcast message on a queue and have another coroutine read messages from the queue and transmit them to all clients.
This is left as an exercise.
Drive The Server
Next, we can create a coroutine to drive the main server.
This will define the details of the host, such as its port number, create the server and serve forever until the program is terminated.
The main() coroutine below implements this.
# chat server
async def main():
# define the local host
host_address, host_port = '127.0.0.1', 8888
# create the server
server = await asyncio.start_server(handle_chat_client, host_address, host_port)
# run the server
async with server:
# report message
print('Chat Server Running\nWaiting for chat clients...')
# accept connections
await server.serve_forever()
Start The Event Loop
Finally, we can define the global variable that holds all client connections and start the event loop.
...
# dict of all current users
ALL_USERS = {}
# start the event loop
asyncio.run(main())
Asyncio Chat Server Complete Example
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of a chat server using streams
import asyncio
# send a message to all connected users
async def broadcast_message(message):
# report locally
print(f'Broadcast: {message.strip()}')
# convert to bytes
msg_bytes = message.encode()
# enumerate all users and broadcast the message
global ALL_USERS
for _, (_, writer) in ALL_USERS.items():
# write message to this user
writer.write(msg_bytes)
# wait for the buffer to empty
await writer.drain()
# connect a user
async def connect_user(reader, writer):
# get name message
writer.write('Asycio Chat Server\n'.encode())
writer.write('Enter your name:\n'.encode())
await writer.drain()
# ask the user for their name
name_bytes = await reader.readline()
# convert name to string
name = name_bytes.decode().strip()
# store the user details
global ALL_USERS
ALL_USERS = (reader, writer)
# announce the user
await broadcast_message(f'{name} has connected\n')
# welcome message
welcome = f'Welcome {name}. Send QUIT to disconnect.\n'
writer.write(welcome.encode())
await writer.drain()
return name
# disconnect a user
async def disconnect_user(name, writer):
# close the user's connection
writer.close()
await writer.wait_closed()
# remove from the dict of all users
global ALL_USERS
del ALL_USERS
# broadcast the user has left
await broadcast_message(f'{name} has disconnected\n')
# handle a chat client
async def handle_chat_client(reader, writer):
print('Client connecting...')
# connect the user
name = await connect_user(reader, writer)
try:
# read messages from the user
while True:
# read a line of data
line_bytes = await reader.readline()
# convert to string
line = line_bytes.decode().strip()
# check for exit
if line == 'QUIT':
break
# broadcast message
await broadcast_message(f'{name}: {line}\n')
finally:
# disconnect the user
await disconnect_user(name, writer)
# chat server
async def main():
# define the local host
host_address, host_port = '127.0.0.1', 8888
# create the server
server = await asyncio.start_server(handle_chat_client, host_address, host_port)
# run the server
async with server:
# report message
print('Chat Server Running\nWaiting for chat clients...')
# accept connections
await server.serve_forever()
# dict of all current users
ALL_USERS = {}
# start the event loop
asyncio.run(main())
Next, let's explore an example of a chat client.
Example Asyncio Chat Client
We can explore how to develop an asyncio group chat client.
This is simpler than the server we developed in the previous section.
This program has 3 main parts, they are:
- A coroutine to read messages from the user and transmit to the server.
- A coroutine to read messages from the server and report to the user.
- A main coroutine to drive the connection to the server.
Let's dive in.
Read and Transmit User Messages
Firstly, we can develop a coroutine that reads messages from standard input and transmits them to the server.
Reads from standard input are blocking, therefore we must perform them in a separate thread via the asyncio.to_thread() function.
This coroutine loops until the user enters a message "QUIT", which then exits the loop.
The write_messages() coroutine below implements this.
# send message to server
async def write_messages(writer):
# read messages from the user and transmit to server
while True:
# read from stdin
message = await asyncio.to_thread(sys.stdin.readline)
# encode the string message to bytes
msg_bytes = message.encode()
# transmit the message to the server
writer.write(msg_bytes)
# wait for the buffer to be empty
await writer.drain()
# check if the user wants to quit the program
if message.strip() == 'QUIT':
# exit the loop
break
# report that the program is terminating
print('Quitting...')
Read and Report Server Messages
Next, we need to develop a coroutine that will read messages from the server and report them to the user.
This task operates in a loop, forever reading messages and printing them to standard output.
The read_messages() coroutine below implements this.
# read messages from the server
async def read_messages(reader):
# read messages from the server and print to user
while True:
# read a message from the server
result_bytes = await reader.readline()
# decode response from bytes to a string
response = result_bytes.decode()
# report the response to the user
print(response.strip())
Drive Connection With Server
Next, we need to drive the connection with the server.
This involves first opening a connection to the server on the required host and port number.
Next, we can create the read_messages() coroutine and schedule it as a background task, forever reading and reporting messages from the server.
We can then create and await the write_messages() that will read input from the user and transmit it to the server. This coroutine will return when the user decides to quit.
The read_messages() task can then be canceled and we can then close the connection to the server.
The main() coroutine below implements this.
# echo client
async def main():
# define the server details
server_address, server_port = '127.0.0.1', 8888
# report progress to the user
print(f'Connecting to {server_address}:{server_port}...')
# open a connection to the server
reader, writer = await asyncio.open_connection(server_address, server_port)
# report progress to the user
print('Connected.')
# read and report messages from the server
read_task = asyncio.create_task(read_messages(reader))
# write messages to the server
await write_messages(writer)
# cancel the read messages task
read_task.cancel()
# report progress to the user
print('Disconnecting from server...')
# close the stream writer
writer.close()
# wait for the tcp connection to close
await writer.wait_closed()
# report progress to the user
print('Done.')
Finally, we can start the event loop.
...
# run the event loop
asyncio.run(main())
Asyncio Chat Client Complete Example
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of a chat client using streams
import sys
import asyncio
# send message to server
async def write_messages(writer):
# read messages from the user and transmit to server
while True:
# read from stdin
message = await asyncio.to_thread(sys.stdin.readline)
# encode the string message to bytes
msg_bytes = message.encode()
# transmit the message to the server
writer.write(msg_bytes)
# wait for the buffer to be empty
await writer.drain()
# check if the user wants to quit the program
if message.strip() == 'QUIT':
# exit the loop
break
# report that the program is terminating
print('Quitting...')
# read messages from the server
async def read_messages(reader):
# read messages from the server and print to user
while True:
# read a message from the server
result_bytes = await reader.readline()
# decode response from bytes to a string
response = result_bytes.decode()
# report the response to the user
print(response.strip())
# echo client
async def main():
# define the server details
server_address, server_port = '127.0.0.1', 8888
# report progress to the user
print(f'Connecting to {server_address}:{server_port}...')
# open a connection to the server
reader, writer = await asyncio.open_connection(server_address, server_port)
# report progress to the user
print('Connected.')
# read and report messages from the server
read_task = asyncio.create_task(read_messages(reader))
# write messages to the server
await write_messages(writer)
# cancel the read messages task
read_task.cancel()
# report progress to the user
print('Disconnecting from server...')
# close the stream writer
writer.close()
# wait for the tcp connection to close
await writer.wait_closed()
# report progress to the user
print('Done.')
# run the event loop
asyncio.run(main())
Next, let's explore how to use our chat server and clients.
Example Chat Session
Now that we have developed our chat server and client, let's look at how we can use them with an example chat session.
Each program must be run as a separate program and the client programs require user input.
I recommend saving each program as a separate file and running them on the command line (terminal or prompt).
Firstly, we can run the server.
Save the chat server to a file like server.py and then run it on the command line to start the server.
For example:
python server.py
Next, save the chat client program in a file like client.py and then run it on the command line to start the client for example:
python client.py
The client must then enter their name and can begin chatting.
For example:
Connecting to 127.0.0.1:8888...
Connected.
Asycio Chat Server
Enter your name:
Jason
Jason has connected
Welcome Jason. Send QUIT to disconnect.
Hello there!
Jason: Hello there!
On the server, we can see the client connect and each broadcast message being sent.
Chat Server Running
Waiting for chat clients...
Client connecting...
Broadcast: Jason has connected
Broadcast: Jason: Hello there!
Next, open another window and start a second chat client.
For example:
python client.py
Enter a different name and begin chatting.
For example:
Connecting to 127.0.0.1:8888...
Connected.
Asycio Chat Server
Enter your name:
Tom
Tom has connected
Welcome Tom. Send QUIT to disconnect.
Hi there, this is very cool!
Tom: Hi there, this is very cool!
Now check the first chat client. You will see that the second client was announced and their chat messages were broadcast.
...
Tom has connected
Tom: Hi there, this is very cool!
Similarly, the server sees the second client.
...
Client connecting...
Broadcast: Tom has connected
Broadcast: Tom: Hi there, this is very cool!
Carry on and test the clients.
Each client can then terminate their session by sending the message: QUIT
For example:
...
QUIT
Quitting...
Disconnecting from server...
Done.
The first client will see the second client disconnect.
For example:
...
Tom has disconnected
The first client can then disconnect in the same manner.
For example:
...
QUIT
Quitting...
Disconnecting from server...
Done.
The server will see and report both client disconnections.
...
Broadcast: Tom has disconnected
Broadcast: Jason has disconnected
Finally, the server process can be forcefully terminated.
This highlights how we can use our group chat or chat room program by running separate server and client programs.
Extension: Concurrent Broadcasts on Server
We can explore an update to the server program that broadcasts to all clients concurrently, rather than sequentially.
This can be achieved by first defining a new coroutine that writes a byte message to a single StreamWriter.
The write_message() coroutine below implements this.
# write a message to a stream writer
async def write_message(writer, msg_bytes):
# write message to this user
writer.write(msg_bytes)
# wait for the buffer to empty
await writer.drain()
Next, the broadcast_message coroutine can be updated to call the write_message() coroutine concurrently.
This can be achieved by creating one asyncio.Task for each client in the ALL_USERS dict, then awaiting all tasks via asyncio.wait().
For example:
...
# create a task for each write to client
tasks = [asyncio.create_task(write_message(w, msg_bytes)) for _,(_,w) in ALL_USERS.items()]
# wait for all writes to complete
_ = await asyncio.wait(tasks)
The updated version of the broadcast_message coroutine with this change is listed below.
# send a message to all connected users
async def broadcast_message(message):
# report locally
print(f'Broadcast: {message.strip()}')
# convert to bytes
msg_bytes = message.encode()
# enumerate all users and broadcast the message
global ALL_USERS
# create a task for each write to client
tasks = [asyncio.create_task(write_message(w, msg_bytes)) for _,(_,w) in ALL_USERS.items()]
# wait for all writes to complete
_ = await asyncio.wait(tasks)
You can learn more about waiting for concurrent tasks in the tutorial:
Tying this together, the complete updated server program is listed below.
# SuperFastPython.com
# example of a chat server using streams
import asyncio
# write a message to a stream writer
async def write_message(writer, msg_bytes):
# write message to this user
writer.write(msg_bytes)
# wait for the buffer to empty
await writer.drain()
# send a message to all connected users
async def broadcast_message(message):
# report locally
print(f'Broadcast: {message.strip()}')
# convert to bytes
msg_bytes = message.encode()
# enumerate all users and broadcast the message
global ALL_USERS
# create a task for each write to client
tasks = [asyncio.create_task(write_message(w, msg_bytes)) for _,(_,w) in ALL_USERS.items()]
# wait for all writes to complete
_ = await asyncio.wait(tasks)
# connect a user
async def connect_user(reader, writer):
# get name message
writer.write('Asycio Chat Server\n'.encode())
writer.write('Enter your name:\n'.encode())
await writer.drain()
# ask the user for their name
name_bytes = await reader.readline()
# convert name to string
name = name_bytes.decode().strip()
# store the user details
global ALL_USERS
ALL_USERS = (reader, writer)
# announce the user
await broadcast_message(f'{name} has connected\n')
# welcome message
welcome = f'Welcome {name}. Send QUIT to disconnect.\n'
writer.write(welcome.encode())
await writer.drain()
return name
# disconnect a user
async def disconnect_user(name, writer):
# close the user's connection
writer.close()
await writer.wait_closed()
# remove from the dict of all users
global ALL_USERS
del ALL_USERS
# broadcast the user has left
await broadcast_message(f'{name} has disconnected\n')
# handle a chat client
async def handle_chat_client(reader, writer):
print('Client connecting...')
# connect the user
name = await connect_user(reader, writer)
try:
# read messages from the user
while True:
# read a line of data
line_bytes = await reader.readline()
# convert to string
line = line_bytes.decode().strip()
# check for exit
if line == 'QUIT':
break
# broadcast message
await broadcast_message(f'{name}: {line}\n')
finally:
# disconnect the user
await disconnect_user(name, writer)
# chat server
async def main():
# define the local host
host_address, host_port = '127.0.0.1', 8888
# create the server
server = await asyncio.start_server(handle_chat_client, host_address, host_port)
# run the server
async with server:
# report message
print('Chat Server Running\nWaiting for chat clients...')
# accept connections
await server.serve_forever()
# dict of all current users
ALL_USERS = {}
# start the event loop
asyncio.run(main())
Extension: Broadcast Via Queue in Server
We can explore an extension that uses a queue to dispatch broadcast messages from clients in the server that are transmitted at a later time.
This can make the server more responsive to client messages, especially if there are a large number of clients that may slow down the broadcasting.
Firstly, we can create an asyncio.Queue as a global variable that will be used for broadcast messages.
...
# queue for broadcast messages
queue = asyncio.Queue()
Next, we can create a new coroutine that loops forever and reads messages from the queue and broadcasts them to all clients.
The broadcaster() coroutine below implements this.
# read messages from the queue and transmit them to all clients
async def broadcaster():
global queue
while True:
# read a message from the queue
message = await queue.get()
# report locally
print(f'Broadcast: {message.strip()}')
# convert to bytes
msg_bytes = message.encode()
# enumerate all users and broadcast the message
global ALL_USERS
# create a task for each write to client
tasks = [asyncio.create_task(write_message(w, msg_bytes)) for _,(_,w) in ALL_USERS.items()]
# wait for all writes to complete
_ = await asyncio.wait(tasks)
Next, we can update our old broadcast_message() coroutine to simply put messages on the queue.
# send a message to all connected users
async def broadcast_message(message):
global queue
# put the message in the queue
await queue.put(message)
If you are new to putting and getting on an asyncio.Queue, see the tutorial:
Finally, we can update the main() to start the broadcaster() coroutine as a background task.
...
# start the broadcaster as a background task
broadcaster_task = asyncio.create_task(broadcaster())
The updated main() coroutine with this change is listed below.
# chat server
async def main():
# start the broadcaster as a background task
broadcaster_task = asyncio.create_task(broadcaster())
# define the local host
host_address, host_port = '127.0.0.1', 8888
# create the server
server = await asyncio.start_server(handle_chat_client, host_address, host_port)
# run the server
async with server:
# report message
print('Chat Server Running\nWaiting for chat clients...')
# accept connections
await server.serve_forever()
And that's it.
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of a chat server using streams
import asyncio
# write a message to a stream writer
async def write_message(writer, msg_bytes):
# write message to this user
writer.write(msg_bytes)
# wait for the buffer to empty
await writer.drain()
# read messages from the queue and transmit them to all clients
async def broadcaster():
global queue
while True:
# read a message from the queue
message = await queue.get()
# report locally
print(f'Broadcast: {message.strip()}')
# convert to bytes
msg_bytes = message.encode()
# enumerate all users and broadcast the message
global ALL_USERS
# create a task for each write to client
tasks = [asyncio.create_task(write_message(w, msg_bytes)) for _,(_,w) in ALL_USERS.items()]
# wait for all writes to complete
_ = await asyncio.wait(tasks)
# send a message to all connected users
async def broadcast_message(message):
global queue
# put the message in the queue
await queue.put(message)
# connect a user
async def connect_user(reader, writer):
# get name message
writer.write('Asycio Chat Server\n'.encode())
writer.write('Enter your name:\n'.encode())
await writer.drain()
# ask the user for their name
name_bytes = await reader.readline()
# convert name to string
name = name_bytes.decode().strip()
# store the user details
global ALL_USERS
ALL_USERS = (reader, writer)
# announce the user
await broadcast_message(f'{name} has connected\n')
# welcome message
welcome = f'Welcome {name}. Send QUIT to disconnect.\n'
writer.write(welcome.encode())
await writer.drain()
return name
# disconnect a user
async def disconnect_user(name, writer):
# close the user's connection
writer.close()
await writer.wait_closed()
# remove from the dict of all users
global ALL_USERS
del ALL_USERS
# broadcast the user has left
await broadcast_message(f'{name} has disconnected\n')
# handle a chat client
async def handle_chat_client(reader, writer):
print('Client connecting...')
# connect the user
name = await connect_user(reader, writer)
try:
# read messages from the user
while True:
# read a line of data
line_bytes = await reader.readline()
# convert to string
line = line_bytes.decode().strip()
# check for exit
if line == 'QUIT':
break
# broadcast message
await broadcast_message(f'{name}: {line}\n')
finally:
# disconnect the user
await disconnect_user(name, writer)
# chat server
async def main():
# start the broadcaster as a background task
broadcaster_task = asyncio.create_task(broadcaster())
# define the local host
host_address, host_port = '127.0.0.1', 8888
# create the server
server = await asyncio.start_server(handle_chat_client, host_address, host_port)
# run the server
async with server:
# report message
print('Chat Server Running\nWaiting for chat clients...')
# accept connections
await server.serve_forever()
# dict of all current users
ALL_USERS = {}
# queue for broadcast messages
queue = asyncio.Queue()
# start the event loop
asyncio.run(main())
Extensions
This section lists some extensions you may wish to explore.
- Use Queue in the Server. Each client could put messages from clients on a queue. A broadcast coroutine could loop forever and read messages from the queue and broadcast them. This separation of concerns may be more efficient (implemented, see above).
- Server Broadcast Concurrently. The broadcasts triggered by each client on the server are executed sequentially in a loop, updating the loop so that all broadcasts are performed concurrently (implemented, see above).
- Add Coroutine Safety. Protect all changes to the ALL_USERS from race conditions via concurrent updates using a mutex lock.
- Add Error Handling. Check for and handle common errors, such as server already running, unable to connect to server, client failing to connect properly, client disconnected unexpectedly, etc.
If you explore any of these extensions, share your code and findings in the comments below.
I'd love to see what you come up with.
Takeaways
You now know how to develop a group chat room using asyncio in Python.
If you enjoyed this tutorial, you will love my book: Python Asyncio Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.