이전에 만든 data preprocess와 training을 차례로 실행시킬 수 있는 pipeline을 만들어야 한다.
pipeline을 만드는 방법은 다양하지만, 나는 가장 기본이 되는 방법을 쓰려고 한다.
먼저 kfp 패키지를 이용하여 pipeline 코드를 짜고, yaml 파일로 compile 한 뒤에 kubeflow dashboard에서 실행하는 것이다.
로컬에서 바로 kubeflow pipeline에 연결해서 실행하는 방법은 추후에 kubeflow에 대한 구조를 더 익히고나서 할 예정이다.
pipeline 코드를 작성하는 방법도 여러가지가 있다.
나는 pytorch-pipeline 예제에서 나온 방법대로 기존에 작성한 코드를 가지고 component를 구성하는 yaml파일을 만드는 방법을 사용할 것이다.
component는 하나의 pod으로 보면 될 것 같다.
먼저 작성한 python 코드를 component로 만들 수 있게 yaml파일을 작성해야한다.
크게 name, inputs, outputs, implementation으로 4가지 부분을 작성하면 된다.
- name : 당연히 component의 이름
- inputs : components에 들어갈 inputs 정의
- outputs : components에서 나올 outputs 정의
- implementation : container에 대한 정의(image, command, args 등)
Components/data_component.yaml
name: PreProcessData
description: |
Prepare data for PyTorch training.
outputs:
- {name: output_data, description: 'The path to the input datasets'}
implementation:
container:
image: nhm0819/kfp-pl:latest
command: ['python3', 'Dataload/data_load.py']
args:
- --output_path
- {outputPath: output_data}
Components/train_component.yaml
name: Training
description: |
Pytorch training
inputs:
- {name: dataset_path, description: 'Input dataset path'}
- {name: model, description: 'model structure name in timm package'}
- {name: gpus, description: 'number of using gpus'}
- {name: max_epochs, description: 'training epochs'}
- {name: num_classes, description: 'number of dataset classes'}
- {name: train_batch_size, description: 'train dataset batch size'}
- {name: train_num_workers, description: 'train dataloader num workers'}
- {name: val_batch_size, description: 'val dataset batch size' }
- {name: val_num_workers, description: 'val dataloader num workers' }
- {name: lr, description: 'learning rate'}
outputs:
- {name: checkpoint_dir, description: "Model checkpoint output"}
implementation:
container:
image: nhm0819/kfp-pl:latest
command: ['python3', 'Training/pl_train.py']
args:
- --dataset_path
- {inputPath: dataset_path}
- --model
- {inputValue: model}
- --gpus
- {inputValue: gpus}
- --max_epochs
- {inputValue: max_epochs}
- --num_classes
- {inputValue: num_classes}
- --train_batch_size
- {inputValue: train_batch_size}
- --train_num_workers
- {inputValue: train_num_workers}
- --val_batch_size
- {inputValue: val_batch_size}
- --val_num_workers
- {inputValue: val_num_workers }
- --lr
- {inputValue: lr}
- --checkpoint_dir
- {outputPath: checkpoint_dir}
implementation에서 {inputValue: model}을 사용하면 component에서 받은 model 값을 인자로 사용할 수 있다.
이렇게 두 개의 yaml파일을 만들면 이를 통해 pipeline을 구성할 수 있다.
pipeline 코드는 다음과 같다.
pipeline.py
from kfp import components
from kfp import dsl
from kfp import compiler
import kubernetes as k8s
# load components
data_op = components.load_component_from_file("Components/data_component.yaml")
train_op = components.load_component_from_file("Components/train_component.yaml")
# training args
MODEL = "resnet50"
GPUS = "-1"
MAX_EPOCHS = "30"
NUM_CLASSES = "10"
TRAIN_BATCH_SIZE = "256"
TRAIN_NUM_WORKERS = "8"
VAL_BATCH_SIZE = "256"
VAL_NUM_WORKERS = "8"
LR = "0.001"
# def pipeline
@dsl.pipeline(
name="Pytorch Lightning Training pipeline", description="Cifar 10 dataset pipeline"
)
def pl_pipeline():
# Dataload/data_load.py -> data_op
data_task = data_op().set_display_name("Data Preprocess")
# pytorch needs shared memory
shm_volume = dsl.PipelineVolume(
volume=k8s.client.V1Volume(
name="shm", empty_dir=k8s.client.V1EmptyDirVolumeSource(medium="Memory")
)
)
# Training/pl_train.py -> train_op
train_task = (
train_op(
dataset_path=data_task.outputs["output_data"],
model=MODEL,
gpus=GPUS,
max_epochs=MAX_EPOCHS,
num_classes=NUM_CLASSES,
train_batch_size=TRAIN_BATCH_SIZE,
train_num_workers=TRAIN_NUM_WORKERS,
val_batch_size=VAL_BATCH_SIZE,
val_num_workers=VAL_NUM_WORKERS,
lr=LR,
)
.after(data_task)
.set_display_name("Training")
.add_pvolumes({"/dev/shm": shm_volume})
).set_gpu_limit(1)
if __name__ == "__main__":
# compile pipeline to yaml
compiler.Compiler().compile(pl_pipeline, package_path="pytorch_lightning.yaml")
먼저 앞서 작성한 yaml 파일들을 통해 component를 load한다.
data_task = data_op()를 통해 인스턴스를 생성할 수 있다.
pytorch에서 multi processing을 하기 위해선 shared memory가 필요하다.
kubernetes에는 기본적으로 shared memory가 적용되지 않기 때문에 직접 볼륨을 만들어서 마운트 해야한다.
shared memory는 재사용할 필요가 없으므로 empty_dir의 속성을 가진 볼륨으로 선택한다.
마운트 위치는 "/dev/shm" 으로 해야한다.
이제 만들어서 kubeflow 에서 실행해보면 된다.
30 epochs 실행
이제 로그 저장, 그래프 저장, 모델 저장하는 방법 들을 추가하고 hyperparameter tuning을 추가하면 될 것 같다.
'MLOps' 카테고리의 다른 글
MLOps E2E - 5. Storage : minio (0) | 2022.05.10 |
---|---|
MLOps E2E - 4. Logging : wandb (0) | 2022.05.04 |
MLOps E2E - 2-2. CT : Training (kubeflow pipeline) (0) | 2022.04.28 |
MLOps E2E - 2-1. CT : Data Load (kubeflow pipeline) (0) | 2022.04.27 |
MLOps E2E - 1. CI / CD : Github Actions (0) | 2022.04.26 |