MLOps

MLOps E2E - 5. Storage : minio

Hongma 2022. 5. 10. 21:27

 

kubeflow는 클라우드를 대상으로 만들어진 시스템이기 때문에 기본으로 저장하는 방식도 단순한 마운트 방식은 아니다.

나중에 모델 서빙을 용이하게 할 필요도 있기 때문에 오픈 소스로 된 오브젝트 스토리지를 사용하는게 기본인듯 하다.

minio는 kubernetes에서 사용하는 기본적인 오브젝트 스토리지로 Open Source Object Storage 이다.

또한 minio는 s3와 호환이 됨.

스토리지 클래스에 대한 자세한 내용은 좀 더 많은 공부가 필요해 보인다.

 

우선 나는 training에서 나온 산출물들 중 log와 checkpoint 폴더를 minio에 저장했다.

이를 위해 input path를 받으면 minio에 올리는 기능의 component를 하나 더 만들었다.

먼저 minio에 올리는 파이썬 파일을 만들고, yaml파일을 통해 component로 만든다.

 

 

Storage/minio_upload.py

from minio import Minio
import os
from argparse import ArgumentParser

def upload_artifacts_to_minio(
        client: Minio,
        source: str,
        destination: str,
        bucket_name: str,
        output_dict: dict,
):
    import urllib3
    """Uploads artifacts to minio server.

    Args:
        client : Result client
        source : source path of artifacts.
        destination : destination path of artifacts
        bucket_name : minio bucket name.
        output_dict : dict of output containing destination paths,
                      source and bucket names
    Raises:
        Exception : on MaxRetryError, NewConnectionError,
                    ConnectionError.
    Returns:
        output_dict : dict of output containing destination paths,
                      source and bucket names
    """
    print(f"source {source} destination {destination}")
    try:
        client.fput_object(
            bucket_name=bucket_name,
            file_path=source,
            object_name=destination,
        )
        output_dict[destination] = {
            "bucket_name": bucket_name,
            "source": source,
        }
    except (
            urllib3.exceptions.MaxRetryError,
            urllib3.exceptions.NewConnectionError,
            urllib3.exceptions.ConnectionError,
            RuntimeError,
    ) as expection_raised:
        print(str(expection_raised))
        raise Exception(expection_raised)  # pylint: disable=raise-missing-from

    return output_dict


if __name__ == "__main__":
    parser = ArgumentParser()

    parser.add_argument(
        "--bucket_name",
        type=str,
        default="mlpipeline",
        help="Minio bucket name",
    )

    parser.add_argument(
        "--folder_name",
        type=str,
        default="test",
        help="Path to destination folder",
    )

    parser.add_argument(
        "--input_path",
        type=str,
        default="../results",
        help="Input path of the file or folder to upload",
    )

    parser.add_argument(
        "--filename",
        type=str,
        help="Input path of the file or folder to upload",
    )


    args = parser.parse_args()

    bucket_name = args.bucket_name
    input_path = args.input_path
    folder_name = args.folder_name
    filename = args.filename

    if filename:
        input_path = os.path.join(input_path, filename)


    endpoint = os.environ["MINIO_ENDPOINT"]
    access_key = os.environ["AWS_ACCESS_KEY_ID"]
    secret_key = os.environ["AWS_SECRET_ACCESS_KEY"]
    output_dict = {}
    client = Minio(
        endpoint,
        access_key=access_key,
        secret_key=secret_key,
        secure=False,
    )

    if os.path.isdir(input_path):
        for root, dirs, files in os.walk(input_path):  # pylint: disable=unused-variable
            for file in files:
                source = os.path.join(root, file)
                artifact_name = '/'.join(source.split("/")[5:])
                destination = os.path.join(folder_name, artifact_name)
                upload_artifacts_to_minio(
                    client=client,
                    source=source,
                    destination=destination,
                    bucket_name=bucket_name,
                    output_dict=output_dict,
                )
    else:
        artifact_name = input_path.split("/")[-1]
        destination = os.path.join(folder_name, artifact_name)
        upload_artifacts_to_minio(
            client=client,
            source=input_path,
            destination=destination,
            bucket_name=bucket_name,
            output_dict=output_dict,
        )

minio 패키지 안에 Minio 클래스를 이용하여 minio client를 선언한다.

여기에 access key id / secret access key가 필요한데 kubeflow를 설치하면 default 값은 minio / minio123 이다.

이는 파이프라인에서 환경변수로 넘겨주면 되기 때문에 환경변수로 불러온다.

endpoint도 마찬가지이다.

 

파일을 올리는 함수는 위에 선언된 upload_artifacts_to_minio() 함수 안에 있으며 Minio 클래스에 들어 있는 fput_object() 이다.

 

 

Components/minio_component.yaml

name: Minio Upload
description: |
  Minio Upload
inputs:
  - {name: bucket_name, description: 'Minio Bucket name'}
  - {name: folder_name, description: 'Minio folder name to upload the files'}
  - {name: input_path, description: 'Input file/folder name'}
  - {name: filename, description: 'Input file name'}


implementation:
  container:
    image: nhm0819/kfp-pl:latest # public.ecr.aws/pytorch-samples/kfp_samples:latest
    command: ["python3", "Storage/minio_upload.py"]
    args:
      - --bucket_name
      - {inputValue: bucket_name}
      - --folder_name
      - {inputValue: folder_name}
      - --input_path
      - {inputPath: input_path}
      - --filename
      - {inputValue: filename}

component로 만드는 yaml 파일은 다음과 같다.

딱히 설명할게 없다.

 

 

pipeline.py

from kfp import components
from kfp import dsl
from kfp import compiler
import kubernetes as k8s
from kubernetes.client.models import V1EnvVar, V1EnvVarSource, V1SecretKeySelector
import os


# load components
data_op = components.load_component_from_file("Components/data_component.yaml")
train_op = components.load_component_from_file("Components/train_component.yaml")
minio_op = components.load_component_from_file("Components/minio_component.yaml")


# pipeline args
NAMESPACE = "kubeflow-user-example-com"
SECRET_NAME = "mlpipeline-minio-artifact"
MINIO_ENDPOINT = "minio-service.kubeflow:9000"
BUCKET_NAME = "mlpipeline"
LOG_DIR = f"logs/{dsl.RUN_ID_PLACEHOLDER}"
CHECKPOINT_DIR = f"ckpt/{dsl.RUN_ID_PLACEHOLDER}"


@dsl.pipeline(
    name="Pytorch Lightning Training pipeline", description="Cifar 10 dataset pipeline"
)
def pl_pipeline():

	...
    
    # Minio Upload
    (
        minio_op(
            bucket_name=BUCKET_NAME,
            folder_name=LOG_DIR,
            input_path=train_task.outputs["log_dir"],
            filename="",
        ).after(train_task).set_display_name("training logs Pusher")
            .add_env_variable(V1EnvVar(name="MINIO_ENDPOINT", value=MINIO_ENDPOINT))
            .add_env_variable(
            V1EnvVar(
                name="AWS_ACCESS_KEY_ID",
                value_from=V1EnvVarSource(
                    secret_key_ref=V1SecretKeySelector(
                        name=SECRET_NAME, key="accesskey"
                    )
                ),
            )
        )
            .add_env_variable(
            V1EnvVar(
                name="AWS_SECRET_ACCESS_KEY",
                value_from=V1EnvVarSource(
                    secret_key_ref=V1SecretKeySelector(
                        name=SECRET_NAME, key="secretkey"
                    )
                ),
            )
        )
            .add_env_variable(V1EnvVar(name="S3_USE_HTTPS", value="0"))
            .add_env_variable(V1EnvVar(name="S3_VERIFY_SSL", value="0"))
    )

    (
        minio_op(
            bucket_name=BUCKET_NAME,
            folder_name=CHECKPOINT_DIR,
            input_path=train_task.outputs["checkpoint_dir"],
            filename="",
        ).after(train_task).set_display_name("checkpoint results Pusher")
            .add_env_variable(V1EnvVar(name="MINIO_ENDPOINT", value=MINIO_ENDPOINT))
            .add_env_variable(
            V1EnvVar(
                name="AWS_ACCESS_KEY_ID",
                value_from=V1EnvVarSource(
                    secret_key_ref=V1SecretKeySelector(
                        name=SECRET_NAME, key="accesskey"
                    )
                ),
            )
        )
            .add_env_variable(
            V1EnvVar(
                name="AWS_SECRET_ACCESS_KEY",
                value_from=V1EnvVarSource(
                    secret_key_ref=V1SecretKeySelector(
                        name=SECRET_NAME, key="secretkey"
                    )
                ),
            )
        )
            .add_env_variable(V1EnvVar(name="S3_USE_HTTPS", value="0"))
            .add_env_variable(V1EnvVar(name="S3_VERIFY_SSL", value="0"))
    )



if __name__ == "__main__":
    compiler.Compiler().compile(pl_pipeline, package_path="pytorch_lightning.yaml")

우선 위에 pipeline args 부분을 보면 6가지 변수를 선언한다.

위의 4가지 [NAMESPACE, SECRET_NAME, MINIO_ENDPOINT, BUCKET_NAME] 는 kubeflow를 설치할 때 기본적으로 설정되는 default 값들을 넣었다.

```

"kubectl get profile -A" : 등록되어 있는 profile(namespace)을 확인할 수 있다.

"kubectl -n kubeflow get cm/workflow-controller-configmap -o yaml" : endpoint, 어떤 secret을 사용하는지 나온다.

"kubectl -n kubeflow port-forward svc/minio-service 9000:9000" : minio 포트포워딩.

```

log_dir이나 checkpoint_dir 은 자기가 원하는데로 설정하면 된다.

 

 

환경변수 추가 하는 부분중 다음과 같은 부분이 있다.

.add_env_variable(
            V1EnvVar(
                name="AWS_ACCESS_KEY_ID",
                value_from=V1EnvVarSource(
                    secret_key_ref=V1SecretKeySelector(
                        name=SECRET_NAME, key="accesskey"
                    )
                ),
            )
        )
        .add_env_variable(
            V1EnvVar(
                name="AWS_SECRET_ACCESS_KEY",
                value_from=V1EnvVarSource(
                    secret_key_ref=V1SecretKeySelector(
                        name=SECRET_NAME, key="secretkey"
                    )
                ),
            )
        )

SECRET_NAME 이름을 가진 secret에서 accesskey와 secretkey를 가져와서 AWS_ACCESS_KEY_ID와 AWS_SECRET_ACCESS_KEY로 환경 변수를 등록하는 방법이다.

(대부분의 예제들이 앞에 AWS를 붙인 방식으로 이름을 짓기 때문에 같은 방식으로 이름을 지었다.)

 

accesskey / secreckey 의 default 값은 minio / minio123 이다.

"kubectl get secret -n kubeflow-user-example-com mlpipeline-minio-artifact -o yaml" 를 통해 키 값을 볼 수 있다.

yaml파일 안에는 다음과 같이 base64로 인코딩 되어 들어있다.

"echo minio | base64" --> bWluaW8K

"echo minio123 | base64" --> bWluaW8xMjMK

 

 

 

 

 

 

마지막으로 파이프라인을 컴파일 하여 쿠베플로우에서 실행하였을 때 모습.

 

Kubeflow Dashboard

 

Minio UI

경로 설정은 minio_upload.py 에서 더 깔끔하게 설정해야 한다.