본문 바로가기

Python/FastAPI

간단한 웹소켓 부하 테스트 비교 FastAPI vs Golang

728x90

서론

현재 근무 중인 회사에서 수행하는 프로젝트로 LAN을 구축한 후 셀러론 N4505 CPU로 구동되는 서버로부터 클라이언트와 웹소켓 통신을 하는 서비스가 있다.

 

이 서비스의 가장 중요한 목표는 서버에서 브로드캐스트를 진행할 때 모든 클라이언트가 싱크를 맞춰 "동시에" 반응해야 한다는 것이다.

 

이를 위해 FastAPI를 사용해 웹소켓 연결을 관리했었는데 문제는 필드 테스트에서 싱크가 깨지는 것으로 체감이 되었기 때문이다.

 

환경이 제한적이라 현장에서는 모니터링을 제대로 할 수 없었는데, 혹시 파이썬 자체의 한계일까 싶어서 go와 비교를 하기로 했다.

스크립트 준비

웹서버 부하를 위해선 많은 도구들이 제공되고 있지만 이 포스트에서는 도구 없이 스크립팅으로 측정을 해볼 것이다.

FastAPI 코드

먼저 이 포스트의 작성 원인을 제공해준 FastAPI를 테스트하기 위해 아래와 같은 코드를 짰다.

 

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import asyncio
import time


app = FastAPI()


class con_manager:
    def __init__(self):
        self.connections = []
        self.lock = asyncio.Lock()

    async def add_connection(self, websocket: WebSocket):
        await websocket.accept()
        async with self.lock:
            self.connections.append(websocket)

    async def remove_connection(self, websocket: WebSocket):
        async with self.lock:
            if websocket in self.connections:
                self.connections.remove(websocket)

    async def broadcast(self, msg: str):
        start_time = time.perf_counter()
        send_times = await asyncio.gather(
            *[self.send_msg(client, msg) for client in self.connections],
            return_exceptions=True
        )
        end_time = time.perf_counter()

        if send_times:
            print(f"Fastest: {min(send_times):.6f}s, Slowest: {max(send_times):.6f}s")
        print(f"Total Broadcast Time: {end_time - start_time:.6f}s")

    async def send_msg(self, websocket: WebSocket, msg: str):
        start_time = time.perf_counter()
        try:
            await websocket.send_text(msg)
            return time.perf_counter() - start_time
        except Exception as e:
            print(f'Unexpected Exception: {e}')
            self.remove_connection(websocket)
            return None


mgr = con_manager()


@app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):
    await mgr.add_connection(websocket)
    while True:
        try:
            data = await websocket.receive_text()
        except WebSocketDisconnect:
            await mgr.remove_connection(websocket)
        except:
            await mgr.remove_connection(websocket)


@app.get('/start_test')
async def stress_test():
    msg = """Lorem ipsum dolor sit amet,
    consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.
    Ut enim ad minim veniam,
    quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.
    Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore
    eu fugiat nulla pariatur.
    Excepteur sint occaecat cupidatat non proident,
    sunt in culpa qui officia deserunt mollit anim id est laborum."""

    await mgr.broadcast(msg)


@app.get('/test')
def test():
    return 'Hello World!'
    

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=9999)

 

웹소켓으로 로렘 입숨을 보낸다. 이정도면 실제 프로젝트에서 보내는 메시지 길이와 비슷하다.

Golang 코드 - gorilla/websocket

Golang에서는 gorilla/websocket으로 웹소켓 통신을 구현하도록 한다.

 

package main

import (
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/mux"
	"github.com/gorilla/websocket"
)

type ConnectionManager struct {
	connections []*websocket.Conn
	lock        sync.Mutex
}

func (mgr *ConnectionManager) AddConnection(conn *websocket.Conn) {
	mgr.lock.Lock()
	defer mgr.lock.Unlock()
	mgr.connections = append(mgr.connections, conn)
}

func (mgr *ConnectionManager) RemoveConnection(conn *websocket.Conn) {
	mgr.lock.Lock()
	defer mgr.lock.Unlock()
	for i, c := range mgr.connections {
		if c == conn {
			mgr.connections = append(mgr.connections[:i], mgr.connections[i+1:]...)
			break
		}
	}
}

func (mgr *ConnectionManager) Broadcast(message string) {
	mgr.lock.Lock()
	defer mgr.lock.Unlock()

	startTime := time.Now()
	var wg sync.WaitGroup
	responseTimes := make([]float64, 0)

	for _, conn := range mgr.connections {
		wg.Add(1)
		go func(c *websocket.Conn) {
			defer wg.Done()
			start := time.Now()
			err := c.WriteMessage(websocket.TextMessage, []byte(message))
			if err != nil {
				log.Println("Error sending message:", err)
				mgr.RemoveConnection(c)
				return
			}
			duration := time.Since(start).Seconds()
			responseTimes = append(responseTimes, duration)
		}(conn)
	}

	wg.Wait()
	totalTime := time.Since(startTime).Seconds()

	// Analyze response times
	if len(responseTimes) > 0 {
		minTime, maxTime := responseTimes[0], responseTimes[0]
		for _, t := range responseTimes {
			if t < minTime {
				minTime = t
			}
			if t > maxTime {
				maxTime = t
			}
		}
		fmt.Printf("Fastest: %.6fs, Slowest: %.6fs\n", minTime, maxTime)
	} else {
		fmt.Println("No valid response times recorded.")
	}
	fmt.Printf("Total Broadcast Time: %.6fs\n", totalTime)
}

var mgr = &ConnectionManager{}

var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		return true
	},
}

func websocketHandler(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println("Error upgrading connection:", err)
		return
	}
	defer conn.Close()

	mgr.AddConnection(conn)
	defer mgr.RemoveConnection(conn)

	for {
		_, _, err := conn.ReadMessage()
		if err != nil {
			break
		}
	}
}

func startTestHandler(w http.ResponseWriter, r *http.Request) {
	message := `Lorem ipsum dolor sit amet,
    consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.
    Ut enim ad minim veniam,
    quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.
    Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore
    eu fugiat nulla pariatur.
    Excepteur sint occaecat cupidatat non proident,
    sunt in culpa qui officia deserunt mollit anim id est laborum.`

	mgr.Broadcast(message)
	fmt.Fprintln(w, "Broadcast test completed.")
}

func testHandler(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintln(w, "Hello World!")
}

func main() {
	r := mux.NewRouter()

	r.HandleFunc("/ws", websocketHandler)
	r.HandleFunc("/start_test", startTestHandler)
	r.HandleFunc("/test", testHandler)

	log.Println("Starting server on :9999")
	log.Fatal(http.ListenAndServe(":9999", r))
}

부하 측정 스크립트 준비

다수의 웹소켓 클라이언트를 연결하고 메시지를 받는 코드를 파이썬으로 구현한다.

 

import websockets
import requests
import time


class WebSocketClient:
    def __init__(self, uri):
        self.uri = uri
        self.messages_received = 0
        self.response_times = []

    async def connect(self):
        self.connection = await websockets.connect(self.uri)

    async def listen(self):
        try:
            while True:
                start_time = time.time()
                message = await self.connection.recv()  # 메시지 수신
                end_time = time.time()
                self.messages_received += 1
                self.response_times.append((end_time - start_time) * 1000)  # ms 단위 저장
        except websockets.exceptions.ConnectionClosed:
            print("WebSocket connection closed.")
        except Exception as e:
            print(f"Error while receiving message: {e}")

    async def close(self):
        await self.connection.close()


# 전체 WebSocket 및 HTTP 테스트 관리
async def websocket_stress_test(uri, client_count, http_url):
    clients = []

    # 1. WebSocket 클라이언트 연결 생성
    print(f"Connecting {client_count} WebSocket clients...")
    for _ in range(client_count):
        client = WebSocketClient(uri)
        await client.connect()
        clients.append(client)

    # 각 클라이언트의 메시지 수신 태스크 생성
    listen_tasks = [asyncio.create_task(client.listen()) for client in clients]

    # 2. HTTP GET 요청을 트리거하여 WebSocket 브로드캐스트 시작
    print(f"Triggering broadcast via HTTP GET: {http_url}")
    start_broadcast_time = time.time()
    response = requests.get(http_url)
    end_broadcast_time = time.time()

    if response.status_code == 200:
        print(f"Broadcast triggered successfully. HTTP Response time: {(end_broadcast_time - start_broadcast_time) * 1000:.2f} ms")
    else:
        print(f"Failed to trigger broadcast. HTTP Status code: {response.status_code}")
        return

    # 3. 5초 동안 수신 대기
    print("Listening for 5 seconds to collect messages...")
    await asyncio.sleep(5)

    # 4. WebSocket 클라이언트 정리
    for client in clients:
        await client.close()

    # 5. 결과 정리
    all_response_times = [time for client in clients for time in client.response_times]
    if all_response_times:
        print(f"Messages received: {len(all_response_times)}")
        print(f"Fastest response: {min(all_response_times):.2f} ms")
        print(f"Slowest response: {max(all_response_times):.2f} ms")
        print(f"Average response: {sum(all_response_times) / len(all_response_times):.2f} ms")
    else:
        print("No messages received!")

# 메인 실행
if __name__ == "__main__":
    websocket_uri = "ws://[address]/ws"  # WebSocket 엔드포인트
    http_trigger_url = "http://[address]/start_test"  # HTTP GET 트리거 엔드포인트
    num_clients = 5  # WebSocket 클라이언트 수

    asyncio.run(websocket_stress_test(websocket_uri, num_clients, http_trigger_url))

 

서버를 구동한 후 부하 측정 스크립트를 실행하여 시간을 측정하자.

서버 준비

원래는 테스트할 서버로 GCP e2 micro를 사용하려 했으나 테스트 당시 연결에 지속적인 문제가 발생하여 구름의 Micro 스펙으로 구동했다.

 

 

저사양 서버를 테스트하는 목적에서 부합하는 사양이 될 것이다.

측정

각각 웹소켓 클라이언트가 5, 1000, 10000일 때의 처리 시간을 측정했다.

Golang - 5

Fastest: 0.000010s, Slowest: 0.000064s
Total Broadcast Time: 0.000147s

Golang - 1000

Fastest: 0.000011s, Slowest: 0.001807s
Total Broadcast Time: 0.004888s

Golang - 10000

Fastest:0.000007s, Slowest: 0.182480s
Total Broadcast Time: 0.387423s

FastAPI - 5

Fastest:0.000030s, Slowest: 0.000089s
Total Broadcast Time: 0.000340s

FastAPI - 1000

Fastest:0.000033s, Slowest: 0.000108s
Total Broadcast Time: 0.050357s

FastAPI - 10000

설정 제한으로 인해 구동하지 못함

 

각각의 측정에서는 FastAPI의 slowest가 Golang보다 우수했지만 최종 결과인 Total Broadcast Time에서는 golang이 아주 우수한 퍼포먼스를 보여주었다.

 

그리고 연결된 웹소켓의 객체가 많을수록 그 차이는 더 확실하게 나타난다.

 

적은 수의 연결에서는 차이가 크지 않지만 무엇보다도 속도가 중요시해지는 환경에서는 프레임워크 사용에 신중해야 함을 알 수 있다.

728x90