MLOps

MLOps E2E - 3. Pipeline

Hongma 2022. 4. 29. 21:06

 

 

이전에 만든 data preprocess와 training을 차례로 실행시킬 수 있는 pipeline을 만들어야 한다.

pipeline을 만드는 방법은 다양하지만, 나는 가장 기본이 되는 방법을 쓰려고 한다.

먼저 kfp 패키지를 이용하여 pipeline 코드를 짜고, yaml 파일로 compile 한 뒤에 kubeflow dashboard에서 실행하는 것이다.

로컬에서 바로 kubeflow pipeline에 연결해서 실행하는 방법은 추후에 kubeflow에 대한 구조를 더 익히고나서 할 예정이다.

 

pipeline 코드를 작성하는 방법도 여러가지가 있다.

나는 pytorch-pipeline 예제에서 나온 방법대로 기존에 작성한 코드를 가지고 component를 구성하는 yaml파일을 만드는 방법을 사용할 것이다.

component는 하나의 pod으로 보면 될 것 같다.

 

먼저 작성한 python 코드를 component로 만들 수 있게 yaml파일을 작성해야한다.

크게 name, inputs, outputs, implementation으로 4가지 부분을 작성하면 된다.

 

- name : 당연히 component의 이름

- inputs : components에 들어갈 inputs 정의

- outputs : components에서 나올 outputs 정의

- implementation : container에 대한 정의(image, command, args 등)

 

Components/data_component.yaml

name: PreProcessData
description: |
  Prepare data for PyTorch training.

outputs:
  - {name: output_data, description: 'The path to the input datasets'}

implementation:
  container:
    image: nhm0819/kfp-pl:latest
    command: ['python3', 'Dataload/data_load.py']
    args:
    - --output_path
    - {outputPath: output_data}

 

Components/train_component.yaml

name: Training
description: |
  Pytorch training
inputs:
  - {name: dataset_path, description: 'Input dataset path'}
  - {name: model, description: 'model structure name in timm package'}
  - {name: gpus, description: 'number of using gpus'}
  - {name: max_epochs, description: 'training epochs'}
  - {name: num_classes, description: 'number of dataset classes'}
  - {name: train_batch_size, description: 'train dataset batch size'}
  - {name: train_num_workers, description: 'train dataloader num workers'}
  - {name: val_batch_size, description: 'val dataset batch size' }
  - {name: val_num_workers, description: 'val dataloader num workers' }
  - {name: lr, description: 'learning rate'}

outputs:
  - {name: checkpoint_dir, description: "Model checkpoint output"}

implementation:
  container:
    image: nhm0819/kfp-pl:latest
    command: ['python3', 'Training/pl_train.py']
    args:
      - --dataset_path
      - {inputPath: dataset_path}
      - --model
      - {inputValue: model}
      - --gpus
      - {inputValue: gpus}
      - --max_epochs
      - {inputValue: max_epochs}
      - --num_classes
      - {inputValue: num_classes}
      - --train_batch_size
      - {inputValue: train_batch_size}
      - --train_num_workers
      - {inputValue: train_num_workers}
      - --val_batch_size
      - {inputValue: val_batch_size}
      - --val_num_workers
      - {inputValue: val_num_workers }
      - --lr
      - {inputValue: lr}
      - --checkpoint_dir
      - {outputPath: checkpoint_dir}

implementation에서 {inputValue: model}을 사용하면 component에서 받은 model 값을 인자로 사용할 수 있다.

 

이렇게 두 개의 yaml파일을 만들면 이를 통해 pipeline을 구성할 수 있다.

pipeline 코드는 다음과 같다.

 

pipeline.py

from kfp import components
from kfp import dsl
from kfp import compiler
import kubernetes as k8s


# load components
data_op = components.load_component_from_file("Components/data_component.yaml")
train_op = components.load_component_from_file("Components/train_component.yaml")


# training args
MODEL = "resnet50"
GPUS = "-1"
MAX_EPOCHS = "30"
NUM_CLASSES = "10"
TRAIN_BATCH_SIZE = "256"
TRAIN_NUM_WORKERS = "8"
VAL_BATCH_SIZE = "256"
VAL_NUM_WORKERS = "8"
LR = "0.001"


# def pipeline
@dsl.pipeline(
    name="Pytorch Lightning Training pipeline", description="Cifar 10 dataset pipeline"
)
def pl_pipeline():
	
    # Dataload/data_load.py -> data_op
    data_task = data_op().set_display_name("Data Preprocess")
	
    # pytorch needs shared memory
    shm_volume = dsl.PipelineVolume(
        volume=k8s.client.V1Volume(
            name="shm", empty_dir=k8s.client.V1EmptyDirVolumeSource(medium="Memory")
        )
    )
    
    # Training/pl_train.py -> train_op
    train_task = (
        train_op(
            dataset_path=data_task.outputs["output_data"],
            model=MODEL,
            gpus=GPUS,
            max_epochs=MAX_EPOCHS,
            num_classes=NUM_CLASSES,
            train_batch_size=TRAIN_BATCH_SIZE,
            train_num_workers=TRAIN_NUM_WORKERS,
            val_batch_size=VAL_BATCH_SIZE,
            val_num_workers=VAL_NUM_WORKERS,
            lr=LR,
        )
        .after(data_task)
        .set_display_name("Training")
        .add_pvolumes({"/dev/shm": shm_volume})
    ).set_gpu_limit(1)


if __name__ == "__main__":
	# compile pipeline to yaml
    compiler.Compiler().compile(pl_pipeline, package_path="pytorch_lightning.yaml")

먼저 앞서 작성한 yaml 파일들을 통해 component를 load한다.

 

data_task = data_op()를 통해 인스턴스를 생성할 수 있다.

 

pytorch에서 multi processing을 하기 위해선 shared memory가 필요하다.

kubernetes에는 기본적으로 shared memory가 적용되지 않기 때문에 직접 볼륨을 만들어서 마운트 해야한다.

shared memory는 재사용할 필요가 없으므로 empty_dir의 속성을 가진 볼륨으로 선택한다.

마운트 위치는 "/dev/shm" 으로 해야한다.

 

이제 만들어서 kubeflow 에서 실행해보면 된다.

 

30 epochs 실행

이제 로그 저장, 그래프 저장, 모델 저장하는 방법 들을 추가하고 hyperparameter tuning을 추가하면 될 것 같다.