Search

Python socketserver ThreadingTCPServer CPU 이슈

카테고리
Programming
태그
Python
AWS
게시일
2024/02/27
수정일
2024/03/05 08:44
시리즈
1 more property

1. Intro

Python을 이용하여 간단하게 TCP 서버를 개발하고, 약 3주 정도 시간을 가지고 stage 환경에서 테스트를 진행하였습니다. 그러던 중 instance의 사용률을 확인하던 도중 무서운 CPU 사용률 그래프를 확인하였습니다.
어느순간부터 평균 90%가 넘는 CPU 사용률을 보이고 있는 그래프 때문에 T3ACPUCredits를 어마어마하게 사용하여 예상보다 많은 비용을 지출하게 됐습니다. 슬픕니다.

CPU Burst

이슈를 발견하고 바로 어떤 상황인지 확인해보았,고 Docker 내의 Python script로 Thread로 인해 CPU 점유율은 그래프처럼 90%를 넘어선 상태였습니다. ThreadingTCPServer를 사용하여 생성된 Thread들은 꺼지지 않고 살아있었습니다. 이번 포스트는 해당 이슈를 해결하는 과정에서 리서치한 내용을 작성해보고자 합니다.

2. Server

실제 서비스를 위한 코드를 올릴 수 없기에 비슷하게 TCP 서버 코드를 작성하여 첨부하였습니다. 관련하여 TCP Server / Client 개발 내용은 링크를 참고하시면 좋습니다.

Server Spec

Ubuntu 22.04 x64 (intel) - AWS EC2 t3a.small
Python 3.11
poetry

Code

아래의 코드는 TCPServerHanlder의 handle() 함수에 처리로직이 들어가 있습니다. while True 반복문이 들어간 이유는 요구사항을 설명드려야 할 것 같습니다.
Embedded 장비 두 개가 2초 간격으로 지속적으로 데이터를 서버로 보냄
두 개의 장비는 특정 시간마다 통신이 연결(TCP-3Way Handshake)되었다가 끊어지고를 반복함
1.
특정 시간에 TCP 연결을 시작함
2.
데이터를 2초 간격으로 지속적으로 보냄
3.
연결 시간이 초과하면(장비를 끄면) 연결이 해제됨
4.
위의 1 ~ 3을 반복함
서버는 종료되면 안 됨
# server.py import os import threading import socketserver from datetime import datetime class TCPServerHandler(socketserver.BaseRequestHandler): def handle(self): while True: buff_size = 2048 try: raw_payload = self.request.recv(buff_size) payload = raw_payload.strip() except ConnectionResetError as e: print(e) break try: if payload != b'': # Do something. self.logging(payload) except ValueError as e: print(e) def logging(self, payload): # self.client_address : (BIND_IP, BIND_PORT) print(f"[Handle] PID : {os.getpid()}, THREAD : {threading.current_thread().native_id}") print(f"[{str(datetime.now())}] {self.client_address} - {payload.decode()}") if __name__ == "__main__": _server_ip = "0.0.0.0" _server_port = 12321 print(f"[ Main ] PID : {os.getpid()}, THREAD : {threading.current_thread().native_id}") server = socketserver.ThreadingTCPServer((_server_ip, _server_port), TCPServerHandler) server.serve_forever()
Python
복사

3. Client

클라이언트는 테스트를 위해 예시로 작성한 코드를 첨부하였습니다.

Code

# client.py import socket HOST, PORT = "127.0.0.1", 12321 data = [f"DATA{index:02d}" for index in range(10)] for d in data: # Create a socket (SOCK_STREAM means a TCP socket) with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: # Connect to server and send data sock.connect((HOST, PORT)) sock.sendall(bytes(d + "\n", "utf-8")) # Receive data from the server and shut down received = str(sock.recv(1024), "utf-8") print("Sent: {}".format(d))
Python
복사

4. Case study

위의 요구사항과 비슷하게 테스트를 진행했습니다. Docker로 Python script를 구동하거나, 로컬에서 Python script를 구동해보았을 때 동일한 현상임을 확인하였습니다.
1.
Server 코드를 구동함
2.
CPU 확인 → 0~2% 내외
3.
Client 코드를 구동함
4.
CPU 확인 → 통신 과정에서는 정상이나, 통신이 끊기게 될 경우 CPU가 증가함

Process check(htop)

연결, 통신, 해제 과정을 두 번 정도만 반복해도 아래와 같이 Thread가 생성되며, Thread 하나당 CPU를 100% 정도 사용하는 것을 확인하였습니다.

Cause

문제를 파악하기 위해 통신 과정이 끝나고, 이슈가 발생한다는 점을 눈여겨보았습니다. 그렇게 한 가지 간과했던 점을 확인할 수 있었습니다. 통신이 끊겼음에도 Thread가 종료되지 않고, while문이 계속 반복될 수 있다는 점이었습니다. 또한 while 문 내에 self.request.recv() 함수가 데이터를 받을 때까지 멈추는 것이 아니라는 부분을 간과하였습니다.
1.
Thread가 종료되지 않음
2.
self.request.recv() 함수에서 데이터를 b'' 빈 값으로 계속 받고 연산이 반복됨
위의 두 가지 이슈로 인해 CPU가 과부화될 수 있음을 확인하였습니다. 이를 개선하기 위해 무한반복문 내에 방어로직을 추가하는 것으로 이슈를 해결할 수 있었습니다.

Solve

TIME_WAIT 을 60초로 두고, 0.2초씩 sleep을 걸어줍니다. 데이터를 받게 되면 waiting 시간을 초기화 해주고, 만약 0.2초씩 체크하며 60초가 초과될 경우 Thread를 종료합니다. sleep할 동안 CPU의 점유가 발생하지 않기 때문에 burst의 걱정이 없습니다. 만약 sleep하는 동안 통신이 들어오게 되어도 recv 로직에는 문제가 없는 것을 확인하였습니다.
import os import time import threading import socketserver from datetime import datetime class TCPServerHandler(socketserver.BaseRequestHandler): TIME_WAIT = 60 INTERVAL = 0.2 def handle(self): waiting = 0.0 buff_size = 2048 while True: try: raw_payload = self.request.recv(buff_size) payload = raw_payload.strip() except ConnectionResetError as e: print(e) break try: if payload != b'': self.request.sendall(f"I GOT YOUR DATA : {payload.decode()}".encode('utf-8')) self.logging(payload) waiting = 0.0 else: if waiting >= self.TIME_WAIT: exit(0) time.sleep(self.INTERVAL) waiting += self.INTERVAL except ValueError as e: print(e) def logging(self, payload): # self.client_address : (BIND_IP, BIND_PORT) print(f"[Handle] PID : {os.getpid()}, THREAD : {threading.current_thread().native_id}") print(f"[{str(datetime.now())}] {self.client_address} - {payload.decode()}") if __name__ == "__main__": _server_ip = "0.0.0.0" _server_port = 12321 print(f"[ Main ] PID : {os.getpid()}, THREAD : {threading.current_thread().native_id}") server = socketserver.ThreadingTCPServer((_server_ip, _server_port), TCPServerHandler) server.serve_forever()
Python
복사

5. Conclusion

해당 코드를 반영하고 CPU 트래킹을 수행한 결과 이슈 없이 해결되었습니다.
연구를 하며 아쉬웠던 점은 socketserver를 이용하여 TCP Raw packet의 Flag를 확인할 수 없다는 점이 가장 아쉬웠습니다. FIN Flag를 받았을 때의 로직을 작성하고 싶었으나, 이는 socket 을 이용하여 개발하였을 때 가능하리라고 판단하고 있습니다.

References