본문 바로가기

Python/FastAPI

[FastAPI] 웹소켓을 다룰 때 주의점 - 공식 예제 개선하기

728x90

이 포스트에서는 FastAPI 공식 문서에서 보여주는 웹소켓 연결 예제를 보고 이를 개선하도록 한다.

 

이 세상 어딘가에서는 분명 FastAPI로 여러 클라이언트로부터 웹소켓 연결을 하려는 경우가 있을 것이다.

 

FastAPI 공식 문서에서는 웹소켓 연결 방법에 대해 이런 예제가 우릴 반겨준다.

 

 

그리고 여러 웹소켓 연결을 관리하기 위해서 아래와 같이 웹소켓 연결을 모아두는 ConnectionManager를 정의하고 있다.

 

 

하지만 이 코드에는 여러 문제가 있다. 문제가 있는 코드를 공식 문서에 걸어놓다니 참 야속할 따름이다. 어떤 문제가 있는지 보고 개선해 보도록 하자.

Race Condition이 발생함

내가 생각하는 ConnectionManager의 가장 큰 문제점이다.

 

클래스에서는 active_connections로 웹소켓 연결을 관리하지만 connect()나 disconnect()에서 active_connections를 조작할 때 아무런 mutual exclusion이 없다.

 

이런 경우 동시에 다수 연결이 발생할 때 몇몇 웹소켓은 리스트에 들어가지 않아 메시지를 보내는 게 불가능한 상황이 발생한다. (실제로 겪었다.)

 

따라서 active_connections에 접근하기 전에 lock을 걸도록 해야 한다. FastAPI가 ASGI인 uvicorn을 사용하기에 비동기 lock을 걸어야 한다. 다행히 파이썬에서는 asynio 모듈에서 사용할 수 있는 lock이 있으므로 이걸로 임계 영역을 감싸면 된다.

 

import asyncio

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []
        self.lock = asyncio.Lock()

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        async with self.lock:
            self.active_connections.append(websocket)

    async def disconnect(self, websocket: WebSocket):
        async with self.lock:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        async with self.lock:
            for connection in self.active_connections[:]:
                await connection.send_text(message)

 

이러면 여러 연결이 동시에 발생해도 레이스 컨디션이 발생하지 않기 때문에 괜찮다.

Disconnect 할 때 대상 웹소켓이 없을 수 있음

disconnect()를 호출할 때 여러 가지 이유로 대상 웹소켓이 active_connections에 없는 경우가 있어 이를 확인하는 것이 좋다.

 

이 포스트에서는 간단히 in 연산자를 사용해서 처리한다.

async def disconnect(self, websocket: WebSocket):
    async with self.lock:
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

메시지를 보낼 때 웹소켓이 없어지거나 연결이 해제됨

위와 비슷한 경우로 메시지를 보내는 도중 대상 웹소켓이 잘못되는 경우이다.

 

try - exception 문을 넣어 웹소켓에 문제가 생겼을 때의 처리를 추가하자.

async def broadcast(self, message: str):
    async with self.lock:
        for connection in self.active_connections[:]:
            try:
                await connection.send_text(message)
            except Exception:
                self.active_connections.remove(connection)

충분히 많은 웹소켓에게 동시에 메시지를 발생할 때 비동기적으로 보내지 않음

예제 코드의 broadcast()는 for 문을 돌면서 메시지를 보내주는데 연결된 웹소켓의 수가 많으면 전송을 기다리느라 느려질 수 있다.

 

이를 원치 않는다면 각 전송을 태스크로 만들어야 한다.

async def broadcast(self, message: str):
    failed_socket = []
    tasks = []
    for connection in self.active_connections:
        tasks.append(connection.send_text(message))
    results = await asyncio.gather(*tasks, return_exceptions=True)

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            failed_socket.append(self.active_connections[i])

    for connection in failed_socket:
        self.active_connections.remove(connection)

 

추가로 예외가 발생 여부를 확인하는 코드도 있다.

 

이렇게 FastAPI 공식 문서에 있는 웹소켓 예제에 있는 문제점을 살펴보고 이를 해결해 보았다. 공식 문서에서 이런 코드를 예제로 걸어놓다니 너무 아쉬워서 개선을 해줬으면 좋겠다.

728x90