An error came up during a competition with my game. One of the 80 players got stuck. Like really stuck: a breaking defect! The error should not have happened, could not have happened, yet it did. The only possible culprit was the WebSocket stack I use. It turns out that layer didn’t work as intended, and there was no way to fix it. So I sought an alternative solution.
Here I describe what I came up with: a new, more focused stack, using Python websockets. Lots of coroutines, asyncio, and queues. The complete working example is at the end of this article. This is likely the last state where it can stand alone; I’ll be changing it to fit even tighter with my game code.
Logical Structure and the Problem
My game is a multiplayer puzzle game coordinated by messages. I segregate each instance of the game from the others. For messages, I do this with the classic “rooms” concept. It’s the name that Flask-SocketIO uses as well, and that’s what my first implementation used.
Mostly, messages in the game don’t need a defined total order. They can arrive in a different order to the different clients. There are a few situations where this isn’t true however, places where I need some messages to have a defined total order. There aren’t many of them, and that’s likely why I didn’t notice the defect earlier.
When I first started the project I asked whether the library, if used on a single system and echoing messages in order, could maintain that order to the clients. The answer was yes, but the truth is no. Since the load is mostly network bound, it usually maintains the order. Only under stress, or through dumb luck, does it send messages out of order.
Fine, I can probably put a queue around it. Ugh, the throughput drops to an abysmal 70 msgs/s. Without the queue it was already a slow 1200 msgs/s, but that was enough for my game. After a bit of back-and-forth, the library author and I disagreed on what is acceptable throughput.
So I grabbed the websockets library instead, whipped together a proof of concept, and got 12,000 msgs/s. Yeah, that’s more like what I’d expect.
Actually, I’d expect even more. And long term, if I get enough traffic, I’ll rewrite this in C++. The throughput should be entirely network bound, but it’s still CPU bound on the server. I’ve done enough low-level networking to know I can push it higher, but for my needs now, 12K/s is way more than enough. I’d likely scale the number of servers before worrying about optimizing one of them.
On to the code!
A Python websockets Messaging server
The “websockets” module is a minimal implementation of WebSockets. That sounded like what I wanted. I didn’t want to go to the low-level of handling the protocol. This left me writing all my high-level logic, in particular the client rooms.
The library is easy to use. I got a basic example working with little effort. Of course, then there are lots of details to cover. Here’s a quick list of things I needed to support, features first:
-Handle an incoming client where “handle” is mainly done by the library
-Allow a client to join a game room (each client can only join one room in my system, simplifying the code)
-Allow another client to join the same game room
-Allow other clients to join other rooms
-Allow a client to send a message
-Provide a total order to the message, with a message id
-Dispatch that message to all clients in the room
In my final code I’ll persist the messages to a Redis store, then eventually to a MongoDB. That is not part of my example code.
And there are several situations, or errors, that I’d have to deal with.
-A client disconnects cleanly or abruptly
-The client sends crap
-The client is slow
-Clean up a room if there are no more clients in it
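For the “client sends crap” case, one option is to validate each incoming frame before acting on it. The following is a minimal sketch, not what the server at the end of this article does (that code catches the exception and drops the client). The name decode_or_none is hypothetical, and the rule that every message must be a JSON object with a string 'type' is an assumption based on the messages used in this article.

```python
import json
from typing import Dict, Optional


def decode_or_none(text: str) -> Optional[Dict]:
    """Return the decoded message, or None if it is not usable."""
    try:
        msg = json.loads(text)
    except json.JSONDecodeError:
        # Not JSON at all
        return None
    # Assumed rule: a message is an object carrying a string 'type'
    if not isinstance(msg, dict) or not isinstance(msg.get('type'), str):
        return None
    return msg


if __name__ == '__main__':
    print(decode_or_none('{"type": "ping", "seq": 1}'))
    print(decode_or_none('not json'))
```

Whether a bad message should be ignored or should end the connection is a policy decision; returning None leaves that choice to the caller.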
Structure
My server maintains a list of clients in a list of rooms:
@dataclass
class Client:
    socket: Any  # What type?
    id: int
    disconnected: bool = False

@dataclass
class Room:
    key: str
    clients: Dict[int,Client] = field(default_factory=dict)
    new_clients: List[Client] = field(default_factory=list)
    msg_id: int = 0
    event_queue: asyncio.Queue = field(default_factory=asyncio.Queue)
    listening: bool = False
    future: Any = None  # What Type?
I use type annotations for my Python, along with MyPy to check the types. Alas, for several library classes I’m unsure of types. Since many of them are created automatically, or are returned from other functions, it’s difficult to determine the type. I will eventually find out all the types.
In these data types, the socket is the only part directly connected to the “websockets” module. It tracks the incoming connection and is used to send and receive data.
In brief, the listen_socket function handles the incoming client connections. It pushes all messages onto the event_queue of the Room. The listen_room function listens to this queue and sends messages to all clients in the room.
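To make that flow concrete, here is a minimal sketch of the same shape without any sockets: several producers push onto one asyncio.Queue and a single consumer stamps each message with an id. The names producer, consumer, and run_demo are invented for this illustration and are not part of the server code.

```python
import asyncio
from typing import Dict, List


async def producer(queue: asyncio.Queue, client_id: int, n: int) -> None:
    # Stands in for listen_socket: push incoming messages onto the room queue
    for seq in range(n):
        await queue.put({'type': 'ping', 'client_id': client_id, 'seq': seq})
        await asyncio.sleep(0)  # yield so the producers interleave


async def consumer(queue: asyncio.Queue, out: List[Dict]) -> None:
    # Stands in for listen_room: the only place a msg_id is assigned
    msg_id = 0
    while True:
        msg = await queue.get()
        if msg is None:  # shutdown sentinel
            break
        msg_id += 1
        msg['msg_id'] = msg_id
        out.append(msg)


async def run_demo(clients: int = 3, per_client: int = 4) -> List[Dict]:
    queue: asyncio.Queue = asyncio.Queue()
    out: List[Dict] = []
    task = asyncio.ensure_future(consumer(queue, out))
    await asyncio.gather(*(producer(queue, c, per_client) for c in range(clients)))
    await queue.put(None)
    await task
    return out


if __name__ == '__main__':
    for m in asyncio.run(run_demo()):
        print(m)
```

Because only one coroutine reads the queue, the ids form a single total order, and each producer's own messages keep the order in which they were queued.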
One listener per room
I initially had a single listening queue that handled all the rooms. When I eventually write the lower-level server, like in C++, I’d keep this structure. When you get low-enough level, you can control a lot more details, removing the need for coroutines entirely.
But in Python there are a few reasons I’m using one listener per room:
-not overhead
-Redis
-bad clients
Overhead is all my Python code, and the library code, surrounding the writing to the clients. It’s not a lot, but it can add up with a lot of activity. I suspect the JSON parsing and formatting is the biggest part of it. But this is not a reason I have one listener per room. Since the Python code is running as a single real thread, it is irrelevant whether this code happens in one listener, or many listeners. It’s all unavoidable computational load.
The first real reason, Redis, is the well-behaved motivator. For each outgoing message I have to create a unique message id. In the sample code I track this in Python, in the Room class. On my final server, I’ll track this in a Redis integer key. Additionally, I’ll store all messages in a Redis list. A separate process will clear this regularly and persist the messages to a MongoDB. The calls to Redis take time, time that the server could instead process messages for other rooms. Thus I want to segregate the rooms. While one room waits on Redis, the others can continue processing.
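The effect of per-room listeners on that waiting time can be sketched without Redis. In the example below, slow_store_incr is a hypothetical stand-in for a remote counter, and STORE_DELAY is an assumed latency chosen only to make the effect visible. Nothing here talks to a real Redis server.

```python
import asyncio
import time
from typing import Dict, List, Tuple

STORE_DELAY = 0.05  # assumed latency of one store round trip, illustration only


async def slow_store_incr(counters: Dict[str, int], key: str) -> int:
    # Hypothetical stand-in for a remote counter; the sleep models the network wait
    await asyncio.sleep(STORE_DELAY)
    counters[key] = counters.get(key, 0) + 1
    return counters[key]


async def room_listener(key: str, queue: asyncio.Queue,
                        counters: Dict[str, int], sent: List[dict]) -> None:
    while True:
        msg = await queue.get()
        if msg is None:
            break
        # Only this room waits here; the other room listeners keep running
        msg['msg_id'] = await slow_store_incr(counters, key)
        msg['room'] = key
        sent.append(msg)


async def run_rooms(room_keys: List[str], per_room: int) -> Tuple[float, List[dict]]:
    counters: Dict[str, int] = {}
    sent: List[dict] = []
    queues = {k: asyncio.Queue() for k in room_keys}
    tasks = [asyncio.ensure_future(room_listener(k, q, counters, sent))
             for k, q in queues.items()]
    start = time.monotonic()
    for q in queues.values():
        for i in range(per_room):
            q.put_nowait({'type': 'ping', 'seq': i})
        q.put_nowait(None)
    await asyncio.gather(*tasks)
    return time.monotonic() - start, sent


if __name__ == '__main__':
    elapsed, sent = asyncio.run(run_rooms(['a', 'b', 'c'], 4))
    print(f'{len(sent)} messages in {elapsed:.2f}s')
```

With three rooms and four messages each, the total time should land near four store delays rather than twelve, since the waits overlap across rooms.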
The second reason, bad clients, is an unfortunate need. It’s possible that a client gets disconnected, or fails to process messages quickly enough. For the most part, this is handled by buffers. The calls to socket.send are effectively asynchronous, at least until the queue fills up. When that happens, send will wait until there is space in the queue. With a single listener, all the other rooms would stall during that wait, being unable to send any messages. By having one queue per room, I limit the damage of a client to that room only.
This is unlikely to happen. First off, the websockets library has a timeout feature. Unresponsive clients will be disconnected long before the outgoing socket buffers fill up. My game simply doesn’t generate enough messages to ever fill the buffers. Extrapolating from my stress test, with an estimated average message size, there is room for 25K game messages in the standard buffers. And a typical run-through of my game, with a team, generates only 3 to 4 thousand messages.
In any case, it’s good protection to have.
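A different kind of protection, not used in the code in this article, is to put a deadline on each send so that a stalled client cannot hold up even its own room for long. The sketch below uses asyncio.wait_for for that. FakeSocket is a hypothetical test double that only mimics an awaitable send(), and send_with_deadline is an invented helper name.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List


class FakeSocket:
    # Hypothetical test double; only mimics an awaitable send()
    def __init__(self, delay: float) -> None:
        self.delay = delay
        self.received: List[str] = []

    async def send(self, text: str) -> None:
        await asyncio.sleep(self.delay)
        self.received.append(text)


@dataclass
class Client:
    socket: Any
    id: int
    disconnected: bool = False


async def send_with_deadline(client: Client, text: str, timeout: float) -> bool:
    # Returns True if sent; marks the client disconnected if the send stalls
    try:
        await asyncio.wait_for(client.socket.send(text), timeout)
        return True
    except asyncio.TimeoutError:
        client.disconnected = True
        return False


async def demo() -> List[bool]:
    fast = Client(socket=FakeSocket(0.0), id=1)
    slow = Client(socket=FakeSocket(1.0), id=2)
    return [await send_with_deadline(c, 'hello', 0.1) for c in (fast, slow)]


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

The trade-off is that a timed-out send is cancelled partway, so a client treated this way should be dropped rather than sent further messages.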
clients, new_clients and memory
One advantage of having a single real thread is not needing to worry about actual data races. They simply don’t happen as they would in a multi-threaded application. Yay! No memory corruption is possible.
It doesn’t mean that race conditions, something different, don’t happen. The logical concerns of concurrency still exist, though to a lesser degree. Thanks, cooperative threading! The most significant concern in my code is with the clients object. The queue listener iterates over the clients. If the dictionary is modified during the iteration, Python raises a RuntimeError (“dictionary changed size during iteration”). That is a strict no-no, as the iterator has no idea what it should do.
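The failure is easy to reproduce in isolation. This small sketch (the function name is made up for the illustration) deletes from a dict while iterating over it and returns the error text.

```python
from typing import Dict


def remove_while_iterating(clients: Dict[int, str]) -> str:
    """Delete entries during iteration and report what Python does."""
    try:
        for client_id in clients:
            del clients[client_id]
    except RuntimeError as e:
        return str(e)
    return 'no error'


if __name__ == '__main__':
    print(remove_while_iterating({1: 'a', 2: 'b'}))
```

In a coroutine-based server the modification would come from another coroutine that runs while the loop body is suspended at an await, but the resulting exception is the same.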
There are three cases where the list needs to be modified:
-when a client disconnects in listen_socket
-when a client disconnects in listen_room
-when a new client joins the room
At first I handled disconnect in the listen_socket function, but through testing noticed it can be a socket.send() call that detects the disconnect first. Thus the disconnect happens in multiple places. In both cases, I merely mark the client as disconnected in the Client structure. The listen_room function skips disconnected clients while sending messages. It tracks them and safely removes them from the room after the iteration loop.
When a new client joins the room, listen_socket adds it to the new_clients list. listen_room will then add new clients prior to each message loop. It does this just after retrieving a message to ensure that all new clients get the message. This means that the room messages can arrive at a client prior to the “joined” response from joining the room. In my game, getting this ordering, along with sending old messages, is important for clients getting a consistent game state. I’ll likely have to adjust this code a bit.
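The join and disconnect handling boils down to one pattern: merge pending joins before the loop, only collect ids during the loop, and remove after the loop. Here is a stripped-down, synchronous sketch of that pattern. The Client and Room classes are reduced versions for illustration, and dispatch_once is a hypothetical name, not a function from the server.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Client:
    id: int
    disconnected: bool = False


@dataclass
class Room:
    clients: Dict[int, Client] = field(default_factory=dict)
    new_clients: List[Client] = field(default_factory=list)


def dispatch_once(room: Room) -> List[int]:
    # 1. Merge pending joins before iterating
    for client in room.new_clients:
        room.clients[client.id] = client
    room.new_clients = []

    # 2. Iterate without touching the dict; only collect ids to drop
    delivered: List[int] = []
    gone: List[int] = []
    for client in room.clients.values():
        if client.disconnected:
            gone.append(client.id)
            continue
        delivered.append(client.id)  # a real server would await socket.send here

    # 3. Remove after the loop, when no iterator is active
    for client_id in gone:
        room.clients.pop(client_id, None)
    return delivered


if __name__ == '__main__':
    room = Room()
    room.new_clients += [Client(1), Client(2)]
    print(dispatch_once(room))
```

Other coroutines only ever append to new_clients or set the disconnected flag, so the dict itself is modified from one place.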
At no point does listen_socket know if it’s safe to work with clients, since it can’t tell if listen_room is inside or outside of the loop. A lock isn’t a bad idea, but it introduces an avoidable delay on the incoming listening side, and delays in the room listener. Why lock when I don’t have to?
In retrospect, it might be a disadvantage that much of the coroutine parallelism is implicit, especially if using something like eventlet. As a programmer, it’s less apparent where the logical thread switching happens. You just have to know that every await, including each awaited asyncio call and each awaited websocket call, is a potential location for a thread switch. It’d be nice to say you should assume a switch is possible at any time, but then I couldn’t rely on it not switching in some places and would require a bunch of locks.
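A tiny experiment shows why the position of an await matters. In this sketch (all names are invented for the illustration), ten coroutines each increment a shared counter once. One version awaits between reading and writing, the other does not.

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def bump_with_await(state: Dict[str, int]) -> None:
    current = state['count']
    await asyncio.sleep(0)        # switch point: other coroutines run here
    state['count'] = current + 1  # may overwrite someone else's update


async def bump_without_await(state: Dict[str, int]) -> None:
    # No await between read and write, so nothing can interleave
    state['count'] = state['count'] + 1


async def run_bumps(bump: Callable[[Dict[str, int]], Awaitable[None]],
                    tasks: int) -> int:
    state = {'count': 0}
    await asyncio.gather(*(bump(state) for _ in range(tasks)))
    return state['count']


if __name__ == '__main__':
    print('without await:', asyncio.run(run_bumps(bump_without_await, 10)))
    print('with await:   ', asyncio.run(run_bumps(bump_with_await, 10)))
```

The version without an await always counts every increment. The version with an await loses updates, because each coroutine writes back a value it read before the others ran.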
Use locks if you aren’t sure. Performance is irrelevant if your game breaks. grumble
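For comparison, this is roughly what the lock-based alternative could look like with asyncio.Lock. It is a sketch only: LockedRoom is a hypothetical class, clients are reduced to names, and the asyncio.sleep(0) stands in for an awaited socket send.

```python
import asyncio
from typing import Dict, List, Tuple


class LockedRoom:
    # Hypothetical variant of a room that guards its clients with a lock
    def __init__(self) -> None:
        self.clients: Dict[int, str] = {}
        self.lock = asyncio.Lock()

    async def join(self, client_id: int, name: str) -> None:
        async with self.lock:
            self.clients[client_id] = name

    async def leave(self, client_id: int) -> None:
        async with self.lock:
            self.clients.pop(client_id, None)

    async def broadcast(self, text: str) -> List[str]:
        delivered: List[str] = []
        async with self.lock:
            for name in self.clients.values():
                await asyncio.sleep(0)  # stands in for await socket.send(...)
                delivered.append(f'{name}:{text}')
        return delivered


async def demo() -> Tuple[List[str], List[int]]:
    room = LockedRoom()  # created inside the running loop
    await room.join(1, 'a')
    await room.join(2, 'b')
    # leave() starts while broadcast() holds the lock, so it has to wait
    delivered, _ = await asyncio.gather(room.broadcast('hi'), room.leave(1))
    return delivered, sorted(room.clients)


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

The cost is visible in the demo: leave() cannot complete until the whole broadcast has finished, which is the delay on the incoming side described above.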
Stats and throughput
I added a simple Stats class to track throughput on the server. It emits timings for all the incoming and outgoing messages per room. The 12K/s is what happens if I have multiple clients connected to unique rooms. My machine hits that 12K limit with the server process pegged at 100% CPU use.
Unfortunately, I must adjust my number down to 10K. Once I moved the rooms to individual listeners, I hit far more overhead. I’m not entirely sure why — I can’t imagine it’s the extra number of coroutines. Likely there are some tweaks in the async stuff to improve it, but it’s still fast enough that I’m not concerned.
As a curiosity, I measured a single client connected to the server. It’s getting slightly over 5K msgs/s. Since this is a client and server, I have two processes. They are both at 58% CPU use. Ideally they should be at 50% CPU use, since they send a message back and forth. That extra 8% is processing spent doing stuff other than handling the message. Perhaps if I wrote the system in C++ it’d get closer to 50%, but never reach there completely. The throughput however should go up.
When I say C++ would be faster, it’s because of my experience. I have better control of what happens and know how to use that control. It’s easy to get it wrong and end up with a steaming pile that is worse than the clean Python version. Server code is hard!
The stats don’t directly measure response time. But knowing the ping pong nature of the tests, I can calculate roughly what that’d be. At fractional milliseconds per message, it’ll be noise compared to the true network overhead when deployed.
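As a rough worked example of that calculation: in a ping-pong test the client sends the next message only after receiving the previous one, so the time per message is the inverse of the rate. Using the single-client figure of about 5K msgs/s from above (the function name is made up for the illustration):

```python
def per_message_ms(msgs_per_second: float) -> float:
    # One message per round trip in a ping-pong test,
    # so the time per message is the inverse of the rate
    return 1000.0 / msgs_per_second


if __name__ == '__main__':
    print(f'{per_message_ms(5000):.2f} ms')  # prints "0.20 ms"
```

This is an average over the whole test on one machine. It says nothing about variance, and real network latency will dominate it once deployed.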
The statistics I calculate here aren’t great. If I were trying to write a truly high-performance server, I’d track averages, standard deviations, extremes, and record it all better. The numbers it’s showing are so over-powered for my game though that there’s no need for more.
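A sketch of what slightly richer tracking could look like, using only the standard statistics module. SampleStats is a hypothetical class, not the Stats class from the server, and it keeps every sample in memory, which would need rethinking for a long-running process.

```python
import statistics
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class SampleStats:
    name: str
    samples: List[float] = field(default_factory=list)

    def add(self, value: float) -> None:
        self.samples.append(value)

    def summary(self) -> Dict[str, float]:
        # Requires at least one sample; stdev needs at least two
        return {
            'count': float(len(self.samples)),
            'mean': statistics.mean(self.samples),
            'stdev': statistics.stdev(self.samples) if len(self.samples) > 1 else 0.0,
            'min': min(self.samples),
            'max': max(self.samples),
        }


if __name__ == '__main__':
    stats = SampleStats('round trip ms')
    for value in (0.18, 0.21, 0.20, 0.35):
        stats.add(value)
    print(stats.name, stats.summary())
```

The sample values in the usage lines are placeholders, not measurements from the server.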
Next steps
Now I need to get this integrated with my existing server. I think I lose the ability to use a single port and single Python instance. That’s not a big loss, it’s something I was intending on doing at some point, anyway. The game server shouldn’t be the same as the web server. This is both for performance and stability. Eventually, should I have enough load, I must run multiple game servers (multiple servers handling web socket connections). I have a plan to scale that direction, but it’ll be a long time before I get there.
Code
Below is the code for the server, the Python client, and a sample client for the browser. While the details may change slightly, the structure will stay close to this. Given the size of the code, it’s better not to spin this into a library. I’ll adapt it directly for my game server.
This code is fairly stable and should work as a starting point for your own needs. Though of course, I can’t make any promises of that. I’ll likely discover things in my application that require fixes.
ws_server.py
from typing import *
from dataclasses import dataclass, field
import asyncio, websockets, json, time
from collections import defaultdict

#sys.path.append('../server')
#from escape.live_game_state import LiveGameState

def encode_msg(msg: Dict) -> str:
    return json.dumps(msg, ensure_ascii=False)

def decode_msg(text: str) -> Dict:
    return json.loads(text)

@dataclass
class Client:
    socket: Any  # What type?
    id: int
    disconnected: bool = False

@dataclass
class Room:
    key: str
    clients: Dict[int,Client] = field(default_factory=dict)
    new_clients: List[Client] = field(default_factory=list)
    msg_id: int = 0
    event_queue: asyncio.Queue = field(default_factory=asyncio.Queue)
    listening: bool = False
    future: Any = None  # What Type?

    def client_count(self) -> int:
        return len([c.id for c in self.clients.values() if not c.disconnected])

client_id_count = 0
rooms: Dict[str, Room] = {}

# Used to get a basic idea of throughput
class Stats:
    def __init__(self, name):
        self._name = name
        self._count = 0
        self._time = time.monotonic()

    def incr(self, amount = 1):
        self._count += amount
        if self._count > 5000:
            end_time = time.monotonic()
            print( f'{self._name} {self._count / (end_time-self._time)}/s' )
            self._count = 0
            self._time = end_time

async def listen_room(room):
    if room.listening:
        raise Exception(f'Already listening to {room.key}')

    room.listening = True
    print(f'Listen Room {room.key}')
    stats = Stats(f'Outgoing {room.key}')
    while True:
        qevent = await room.event_queue.get()
        if qevent is None:
            break

        # Add any new clients that have shown up, this handler must control this to avoid it
        # happening inside the loop below
        if len(room.new_clients) > 0:
            for client in room.new_clients:
                room.clients[client.id] = client
            room.new_clients = []

        # In my game I'll track IDs in Redis, to survive unexpected failures.
        # The messages will also be pushed there, to be picked up by another process for DB storage
        room.msg_id += 1
        qevent['msg_id'] = room.msg_id

        count = 0
        disconnected: List[int] = []
        for client in room.clients.values():
            if client.disconnected:
                disconnected.append(client.id)
                continue
            count += 1

            # There's likely some asyncio technique to do this in parallel
            try:
                await client.socket.send(encode_msg(qevent))
            except websockets.ConnectionClosed:
                print("Lost client in send")
                client.disconnected = True
                # Hoping incoming will detect disconnected as well

        stats.incr(count)

        # Remove clients that aren't there anymore. I don't really need this in my game, but it's
        # good to not let long-lived rooms build-up cruft.
        for d in disconnected:
            # Check again since they may have reconnected in other loop
            if d in room.clients:
                del room.clients[d]

    print(f'Unlisten Room {room.key}')
    room.listening = False

async def listen_socket(websocket, path):
    global rooms, client_id_count
    print("connect", path)
    client_id_count += 1
    room: Optional[Room] = None
    client = Client(id=client_id_count, socket=websocket)
    stats = Stats('Incoming')
    try:
        async for message_raw in websocket:
            message = decode_msg(message_raw)
            if message['type'] == 'join':
                # Get/create room
                room_key = message['room']
                if not room_key in rooms:
                    room = Room(key=room_key)
                    rooms[room_key] = room
                    room.future = asyncio.ensure_future(listen_room(room))
                else:
                    room = rooms[room_key]

                # Add client to the room
                room.new_clients.append(client)

                # Tell the client which id they are.
                await websocket.send(encode_msg({
                    'type': 'joined',
                    'client_id': client.id
                }))
            elif room:
                # Identify message and pass it off to the room queue
                message['client_id'] = client.id
                await room.event_queue.put(message)
            else:
                # Behave as trivial echo server if not in room (will be removed in my final version)
                await websocket.send(encode_msg(message))
            stats.incr()
    except websockets.ConnectionClosed:
        pass
    except Exception as e:
        # In case something else happens we want to ditch this client. This won't come from
        # websockets, but likely the code above, like having a broken JSON message
        print(e)
        pass

    # Only mark disconnected, so the queue loop over clients isn't broken
    client.disconnected = True
    if room is not None:
        # Though if zero we can kill the listener and clean up fully
        if room.client_count() == 0:
            await room.event_queue.put(None)
            del rooms[room.key]
            await room.future
            print(f"Cleaned Room {room.key}")
    print("disconnect", rooms)

def main() -> None:
    start_server = websockets.serve(listen_socket, "localhost", 8765, ping_interval=5, ping_timeout=5)
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()

main()
ws_client.py
A simple client that validates the correct ordering of messages. Provide a room on the command line.
There’s an option to slow this client which forces the server to disconnect it when the buffers fill up.
You’ll note my client test code is rougher than my server code, and lacking many type definitions. This code will not be used long-term, but the server code will be.
from typing import *
import asyncio, json, websockets, time, sys

if len(sys.argv) < 2:
    print(f"Syntax {sys.argv[0]} room (delay)" )
    sys.exit(-1)

room = sys.argv[1]
# A non-zero slow creates a client that can't keep up. If there are other clients in the room
# it will end up breaking, causing the server to disconnect it.
slow = 0.0
if len(sys.argv) > 2:
    slow = float(sys.argv[2])

def encode_msg(msg: Dict) -> str:
    return json.dumps(msg, ensure_ascii=False)

def decode_msg(text: str) -> Dict:
    return json.loads(text)

# An even simpler stats tracker than the server
trigger_count = 5000.0
if slow > 0:
    trigger_count /= (1+slow) * 100

async def reader(websocket):
    count = 0
    seq = 0
    last_time = time.monotonic()
    client_id = None
    last_msg_id = None
    async for message_raw in websocket:
        count += 1
        msg = decode_msg(message_raw)
        if msg['type'] == 'joined':
            client_id = msg['client_id']
        else:
            # Ensure the messages have a single total order
            msg_id = msg['msg_id']
            if last_msg_id is not None and msg_id != (last_msg_id+1):
                print(last_msg_id, msg_id)
                raise Exception("bad msg sequence")
            # Remember the id so the next message can be checked against it
            last_msg_id = msg_id

        if msg['type'] == 'ping' and client_id == msg['client_id']:
            # Ensure our own measures retain the order we sent them
            if msg['seq'] != seq:
                print(seq, message_raw)
                raise Exception("bad message seq")

        # Track rough throughput
        if count >= trigger_count:
            next_time = time.monotonic()
            print( f'{count /(next_time - last_time)}/s {room}' )
            last_time = time.monotonic()
            count = 0

        # Reply to our own messages (including the initial "joined") to keep the ping-pong going
        if client_id == msg['client_id']:
            seq += 1
            await websocket.send(encode_msg({'type': 'ping', 'seq': seq }))
            if slow > 0:
                await asyncio.sleep(slow)

async def hello():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        print("Connect")
        await websocket.send( encode_msg({ 'type': 'join', 'room': room }) )
        consumer_task = asyncio.ensure_future(
            reader(websocket))
        done = await asyncio.wait(
            [consumer_task],
            return_when=asyncio.FIRST_COMPLETED,
        )

asyncio.get_event_loop().run_until_complete(hello())
ws_client.html
A simple web browser client, to prove that it’s working where I need it.
<html>
<script>
function loaded() {
    console.log("Loaded")
    const socket = new WebSocket('ws://localhost:8765');
    socket.addEventListener('open', function (event) {
        socket.send(encode_msg({
            type: 'join',
            room: 'js-room',
        }));
        console.log("Opened")
    });
    socket.addEventListener('message', function (event) {
        console.log('Message from server ', event.data);
    });
    window.addEventListener('beforeunload', () => {
        console.log("UNLOAD")
        socket.close()
    })
    console.log("LOADED")
    let count = 0
    setInterval(() => {
        count = count+1
        socket.send(encode_msg({
            type: 'ping',
            seq: count,
        }))
    }, 500)
}
function encode_msg(msg) {
    return JSON.stringify(msg)
}
function decode_msg(text) {
    return JSON.parse(text)
}
</script>
<body onload="loaded()">
<p>Text</p>
</body>
</html>
Original article: High-Throughput Game Message Server with Python websockets