python

Subprocess - flask에 학습 기능 추가

Hongma 2022. 1. 20. 11:19

 

pytorch에서 학습을 돌릴 때 data_loader는 num_workers의 값에 따라서 cpu core를 사용하게 됩니다.

요즘 사용하는 대부분의 컴퓨터는 코어가 여러개 달려있고, data load의 속도가 학습 속도에 상당히 영향을 많이 미치기 때문에 worker의 숫자를 잘 설정해야 합니다.

 

flask에서 웹을 실행하면 app.run을 가동하는 프로세스 안에서 함수들이 실행되기 때문에 pytorch가 여러 개의 코어를 활성화 할 수 없습니다.

일반적인 model serving이 목적이신 분들은 bentoML을 사용하거나, ubuntu container 환경에서 gunicorn을 사용할 수도 있습니다.

 

그러나 저는 추론 기능은 필요 없고, 웹에 학습 기능만을 추가하려고 합니다.

때문에 이 글은 웹 전체를 멀티스레딩 하는 방법보단 학습 기능에 새로운 프로세스를 할당하는 방법입니다.

저는 subprocess라는 패키지를 이용하여 코어를 따로 할당하는 방법으로 학습 기능을 추가하려고 합니다.

 

우선 학습코드가 train.py라는 이름으로 있다고 하겠습니다.

import subprocess

# subprocess 활성화
train_process = subprocess.Popen(['python', 'train.py', "--num_workers", "2"], shell=True)

# pid
train_process.pid

# 프로세스 상황 (return이 없으면 진행중, return이 숫자면 종료상태)
train_process.poll()

popen을 실행하면 "python train.py --num_workers 2"라는 명령어를 새로운 cmd창에서 실행하게 됩니다.

Popen 옵션에 shell=True로 설정을 해놓으면 위의 명령어를 수행하는 cmd창을 만들게 됩니다.

따라서 train_process에는 cmd창을 실행하고 있는 process 객체가 담겨있습니다.

그리고 train_process의 자식프로세스에 train.py를 실행하는 프로세스가 속하게 됩니다.

train_process 객체를 활용해서 pid나 진행상황 등 프로세스 정보들을 확인 가능합니다.

 

(기타 Popen의 다른 옵션들로 가상환경을 지정하거나, input output을 조정하는 등의 기능을 구현할 수 있습니다.)

(https://docs.python.org/3/library/subprocess.html)

 

결과적으로 새로운 프로세스에서 python 파일을 실행시키기 때문에 pytorch에 있는 multiprocessing의 기능이 사용 가능해집니다.

문제는 train_process.kill() 과 같은 방식으로 프로세스를 종료하게 되면 cmd창만 종료가 되고 그 자식 프로세스들은 전부 남아있게 됩니다.

따라서 train_process의 자식 프로세스를 다 확인해서 종료를 해줘야 합니다.

popen 옵션에서 preexec_fn를 사용해서 자식 프로세스를 파악하는 방법도 있지만 저는 psutil 패키지를 사용했습니다.

 

import psutil

# terminate 알리미
def on_terminate(proc):
    print("process {} terminated with exit code {}".format(proc, proc.returncode))


# train_process의 return이 없으면 종료 시작.
if not isinstance(train_process.poll(), int):
    
    process = psutil.Process(train_process.pid).children(recursive=True) # 모든 자식 프로세스
    
    for p in process:
        p.terminate()	# terminate : 정상종료
        
    # 10초간 terminate를 기다리기.
    gone, alive = psutil.wait_procs(process, timeout=10, callback=on_terminate) 
    
    for p in alive:	# 10초 기다려도 살아있는 프로세스들
        p.kill()	# kill : 강제종료
    
    # psutil.Process(train_process.pid).kill() # cmd 창 종료

on_terminate 함수는 terminate 결과를 print하는 함수입니다.

 

if문을 통해 process가 진행중일 때 (poll값이 int가 아닐 때) process 종료입니다.

terminate(정상 종료) 실행 후 종료되지 않은 프로세스 는 kill(강제 종료)로 종료하는 코드입니다.

(cmd 창 자체는 kill 하지 않는데 제 경우에는 알아서 종료가 되어서 그냥 주석으로 달아놨습니다.)

 

 

이제 flask에 적용하면 됩니다.

train process는 global로 선언하여 언제든 stop이 가능하도록 코딩해봤습니다.

학습 프로세스는 하나만 실행되는게 이상적인 상황이기도 하구요.

 

최종 형태

import os
import subprocess
import psutil
from flask import Flask

app = Flask(__name__)

BASE_PATH = os.getcwd()
TRAIN_PATH = os.path.join(BASE_PATH, "train.py")

train_proc = subprocess.Popen(['echo', 'Server Start!'], shell=True) # global object init
train_proc.terminate()


def on_terminate(proc):
    print("process {} terminated with exit code {}".format(proc, proc.returncode))


@app.route('/')
def hello_world():
    return 'Hello, World!'


@app.route('/train', methods=["POST", "GET"])
def train():
    global train_proc
    
    try:
        if not train_proc:
            train_proc = subprocess.Popen(['python', TRAIN_PATH, "--num_workers", "2"], shell=True)
            train_proc.communicate()
    
        else:
            if isinstance(train_proc.poll(), int):
                train_proc = subprocess.Popen(['python', TRAIN_PATH, "--num_workers", "2"], shell=True)
                train_proc.communicate()
            else:
                print("Training Process is already started")
    except:
        stop()
        raise ValueError("Training has stopped.")

    return "Training"


@app.route('/stop', methods=["POST", "GET"])
def stop():
    global train_proc

    while not isinstance(train_proc.poll(), int):
        process = psutil.Process(train_proc.pid).children(recursive=True)
        for p in process:
            p.terminate()
        psutil.Process(train_proc.pid).terminate()

        gone, alive = psutil.wait_procs(process, timeout=10, callback=on_terminate)
        for p in alive:
            p.kill()

    return "Stop"

 

이렇게 기본적인 형태가 완성이 되었습니다.

후에 웹에 할당하는 cpu와 학습에 할당하는 cpu 개수를 조절하는 등 필요한 서비스에 맞추어 기능을 추가해나가면 될 것 같습니다.