detectron2 official example : https://github.com/facebookresearch/detectron2/tree/main/projects
간략한 학습 코드 : https://github.com/nhm0819/detectron2_training
이번에는 detectron2 튜토리얼에 있는 과정 말고, 조금 더 customize 하는 방법에 대해 소개하려고 합니다.
detectron2 tutorial에서 학습하는 코드를 보면 아래와 같이 되어있습니다.
from detectron2.config import get_cfg
from detectron2.engine import DefaultTrainer
cfg = get_cfg() # default configure
cfg.DATASETS.TRAIN = ...
...
trainer = DefaultTrainer(cfg) # default trainer
trainer.train() # 학습 시작
먼저 detectron2에 필요한 기본 형식의 configure를 가져옵니다.
그 다음 default trainer를 configure를 인자로 선언합니다.
그리고 trainer.train() 하면 학습이 시작됩니다.
tutorial에서 configure의 이런 저런 값들을 바꿨듯이,
DefaultTrainer를 수정하는 방식으로 detectron을 customize 할 수 있습니다.
다만 이제는 단순히 값을 바꾸는 것이 아니라 원하는 기능들을 함수로 추가해야 합니다.
그리고 마지막엔 multi gpu 사용 방법에 대해 설명드리겠습니다.
먼저 DefaultTrainer의 형식을 보겠습니다.
class DefaultTrainer(TrainerBase):
def __init__(self, cfg):
super().__init__()
logger = logging.getLogger("detectron2")
cfg = DefaultTrainer.auto_scale_workers(cfg, comm.get_world_size())
# Assume these objects must be constructed in this order.
model = self.build_model(cfg)
optimizer = self.build_optimizer(cfg, model)
data_loader = self.build_train_loader(cfg)
model = create_ddp_model(model, broadcast_buffers=False)
self._trainer = ...
self.scheduler = ...
self.checkpointer = ...
self.start_iter = 0
self.max_iter = cfg.SOLVER.MAX_ITER
self.cfg = cfg
self.register_hooks(self.build_hooks())
def resume_or_load(self, resume=True):
...
def build_hooks(self):
return ret
def build_writers(self):
return default_writers(self.cfg.OUTPUT_DIR, self.max_iter)
def train(self):
super().train(self.start_iter, self.max_iter)
if len(self.cfg.TEST.EXPECTED_RESULTS) and comm.is_main_process():
assert hasattr(
self, "_last_eval_results"
), "No evaluation results obtained during training!"
verify_results(self.cfg, self._last_eval_results)
return self._last_eval_results
def run_step(self):
self._trainer.iter = self.iter
self._trainer.run_step()
@classmethod
def build_model(cls, cfg):
model = build_model(cfg)
logger = logging.getLogger(__name__)
logger.info("Model:\n{}".format(model))
return model
@classmethod
def build_optimizer(cls, cfg, model):
return build_optimizer(cfg, model)
@classmethod
def build_lr_scheduler(cls, cfg, optimizer):
return build_lr_scheduler(cfg, optimizer)
@classmethod
def build_train_loader(cls, cfg):
return build_detection_train_loader(cfg)
@classmethod
def build_test_loader(cls, cfg, dataset_name):
return build_detection_test_loader(cfg, dataset_name)
@classmethod
def build_evaluator(cls, cfg, dataset_name):
raise NotImplementedError()
@classmethod
def test(cls, cfg, model, evaluators=None):
return results
@staticmethod
def auto_scale_workers(cfg, num_workers: int):
return cfg
함수 이름은 그냥 최대한 놔두면서 짤라왔습니다.
먼저 init 에선 ddp로 모델을 만드는게 보이네요.
data loader도 trainer 안에서 build 하고 있습니다.
logger도 기본적으로 선언되고, writer에 대한 함수도 있네요.
build_train_loader, build_test_loader 부분을 수정하면 loader를 customize 할 수 있습니다.
당연히 이 안에서 augmentation도 변경이 가능하구요.
또 중요한 것이 hooks 입니다.
hooks를 수정해야 validation을 원하는 방법과 시점에서 할 수 있습니다.
그 외에 writer를 수정해서 원하는 방법으로 log를 남기거나, lr_scheduler, optimizer를 customize 할 수도 있습니다.
이정도가 개요가 될 것 같네요.
이제 DefaultTrainer를 간단하게 customize 해보겠습니다.
Custom Trainer
from detectron2.engine import DefaultTrainer
from detectron2.evaluation import COCOEvaluator
from detectron2.data import build_detection_train_loader, build_detection_test_loader, DatasetMapper
import os
class CustomTrainer(DefaultTrainer):
@classmethod
def build_evaluator(cls, cfg, dataset_name, output_folder=None):
if output_folder is None:
output_folder = os.path.join(cfg.OUTPUT_DIR, "inference")
return COCOEvaluator(dataset_name, cfg, True, output_folder)
@classmethod
def build_train_loader(cls, cfg):
return build_detection_train_loader(cfg, mapper=custom_mapper)
먼저 detectron2에는 COCOEvaluator라는 클래스가 있습니다.
bbox AP, segm AP 등을 평가해줍니다. (coco dataset 형식이 아닐때도 잘 작동하는지는 확인을 못해봤습니다.)
간단하니 미리 선언해주겠습니다.
train_loader (preprocessing)
그 다음 아래에 build_train_loader()입니다.
여기서 mapper라는 클래스를 정의함에 따라 preprocessing을 바꿀 수 있습니다.
Augmentation에 대한 부분이라고 봐도 되겠네요.
mapper의 예시는 아래와 같습니다.
from detectron2.data import detection_utils as utils
import detectron2.data.transforms as T
import torch
def custom_mapper(dataset_dict):
# Implement a mapper, similar to the default DatasetMapper, but with your own customizations
dataset_dict = copy.deepcopy(dataset_dict) # it will be modified by code below
image = utils.read_image(dataset_dict["file_name"], format="BGR")
transform_list = [
# T.Resize((720, 1280)),
T.ResizeShortestEdge(short_edge_length=(640, 672, 704, 720), max_size=1280,
sample_style='choice'),
T.RandomFlip(prob=0.5, horizontal=False, vertical=True),
T.RandomFlip(prob=0.5, horizontal=True, vertical=False),
T.RandomBrightness(0.8, 1.2),
T.RandomSaturation(0.8, 1.2),
T.RandomContrast(0.8, 1.2),
T.RandomRotation([-15, 15]),
]
image, transforms = T.apply_transform_gens(transform_list, image)
dataset_dict["image"] = torch.as_tensor(image.transpose(2, 0, 1).astype("float32"))
annos = [
utils.transform_instance_annotations(obj, transforms, image.shape[:2])
for obj in dataset_dict.pop("annotations")
if obj.get("iscrowd", 0) == 0
]
instances = utils.annotations_to_instances(annos, image.shape[:2])
dataset_dict["instances"] = utils.filter_empty_instances(instances)
return dataset_dict
transform_list가 preprocessing에 필요한 list입니다.
여타 augmentation tool들과 비슷한 형태죠?
detectron2에 내장된 transform은 특히 torchvision의 transform과 상당히 유사한걸로 보이네요.
(필요에 따라 albumentations나 torchvision으로도 변경 가능합니다.)
detection에선 image만 바꾸는 것이 아니라 좌표값을 바꾸는 과정도 당연히 필요합니다.
image를 resize하면 당연히 좌표값도 변환을 해주어야 하니까요.
아래에 annos 를 보시면 annotations의 변환에 대한 코드가 또 작성되어 있습니다.
transform을 통째로 넣으면 알아서 좌표 변환까지 해줍니다.
(함수에 따라 좌표 변환이 적용 되는지 안되는지 찾아볼 필요도 있습니다.
물론 이미지 채도, 밝기 변경 같은 것들은 당연히 좌표 변환이 필요 없으니 신경 안쓰셔도 됩니다.)
hooks
def build_hooks(self):
hooks = super().build_hooks() # DefaultTrainer에서 hooks를 이어 받음.
hooks.insert(-1, LossEvalHook( # 맨 마지막에 LossEvalHook 추가
self.cfg.TEST.EVAL_PERIOD,
self.model,
build_detection_test_loader(
self.cfg,
self.cfg.DATASETS.TEST[0],
DatasetMapper(self.cfg, True)
)
))
hooks = hooks[:-2] + hooks[-2:][::-1]
return hooks
기본 Evaluator는 Loss를 산출하지 않습니다.
validation마다 Loss를 구하기 위해 따로 custom hook을 추가하는 과정입니다.
DefaultTrainer에서 정의한 build_hooks()를 가지고 와서 작업을 합니다.
Loss를 구하는 Hook 클래스를 정의하여 뒤에서 두 번째에 넣습니다.
LossEvalHook은 아래와 같습니다.
from detectron2.engine.hooks import HookBase
from detectron2.utils.logger import log_every_n_seconds
import detectron2.utils.comm as comm
import torch
import time
import datetime
import logging
import numpy as np
class LossEvalHook(HookBase):
def __init__(self, eval_period, model, data_loader):
self._model = model
self._period = eval_period
self._data_loader = data_loader
def _do_loss_eval(self):
# Copying inference_on_dataset from evaluator.py
total = len(self._data_loader)
num_warmup = min(5, total - 1)
start_time = time.perf_counter()
total_compute_time = 0
losses = []
for idx, inputs in enumerate(self._data_loader):
if idx == num_warmup:
start_time = time.perf_counter()
total_compute_time = 0
start_compute_time = time.perf_counter()
if torch.cuda.is_available():
torch.cuda.synchronize()
total_compute_time += time.perf_counter() - start_compute_time
iters_after_start = idx + 1 - num_warmup * int(idx >= num_warmup)
seconds_per_img = total_compute_time / iters_after_start
if idx >= num_warmup * 2 or seconds_per_img > 5:
total_seconds_per_img = (time.perf_counter() - start_time) / iters_after_start
eta = datetime.timedelta(seconds=int(total_seconds_per_img * (total - idx - 1)))
log_every_n_seconds(
logging.INFO,
"Loss on Validation done {}/{}. {:.4f} s / img. ETA={}".format(
idx + 1, total, seconds_per_img, str(eta)
),
n=5,
)
loss_batch = self._get_loss(inputs)
losses.append(loss_batch)
mean_loss = np.mean(losses)
self.trainer.storage.put_scalar('validation_loss', mean_loss)
comm.synchronize()
return losses
def _get_loss(self, data):
# How loss is calculated on train_loop
metrics_dict = self._model(data)
metrics_dict = {
k: v.detach().cpu().item() if isinstance(v, torch.Tensor) else float(v)
for k, v in metrics_dict.items()
}
total_losses_reduced = sum(loss for loss in metrics_dict.values())
return total_losses_reduced
def after_step(self):
next_iter = self.trainer.iter + 1
is_final = next_iter == self.trainer.max_iter
if is_final or (self._period > 0 and next_iter % self._period == 0):
self._do_loss_eval()
self.trainer.storage.put_scalars(timetest=12)
writer
def default_writers(output_dir: str, max_iter: Optional[int] = None):
"""
Build a list of :class:`EventWriter` to be used.
It now consists of a :class:`CommonMetricPrinter`,
:class:`TensorboardXWriter` and :class:`JSONWriter`.
Args:
output_dir: directory to store JSON metrics and tensorboard events
max_iter: the total number of iterations
Returns:
list[EventWriter]: a list of :class:`EventWriter` objects.
"""
return [
# It may not always print what you want to see, since it prints "common" metrics only.
CommonMetricPrinter(max_iter),
JSONWriter(os.path.join(output_dir, "metrics.json")),
TensorboardXWriter(output_dir),
]
writer는 기본이 이 3가지입니다.
위의 writer들을 참조해서 원하시는 대로 추가가 가능합니다.
아래처럼 EvenctWriter 상속 받아서 하시는게 기본 모양입니다.
from detectron2.utils.events import EventWriter, get_event_storage
from collections import defaultdict
from detectron2.utils.file_io import PathManager
import os
import json
class JSONWriter(EventWriter):
def __init__(self, json_file, window_size=20):
self._file_handle = PathManager.open(json_file, "a")
self._window_size = window_size
self._last_write = -1
def write(self):
storage = get_event_storage()
to_save = defaultdict(dict)
for k, (v, iter) in storage.latest_with_smoothing_hint(self._window_size).items():
# keep scalars that have not been written
if iter <= self._last_write:
continue
to_save[iter][k] = v
if len(to_save):
all_iters = sorted(to_save.keys())
self._last_write = max(all_iters)
for itr, scalars_per_iter in to_save.items():
scalars_per_iter["iteration"] = itr
self._file_handle.write(json.dumps(scalars_per_iter, sort_keys=True) + "\n")
self._file_handle.flush()
try:
os.fsync(self._file_handle.fileno())
except AttributeError:
pass
def close(self):
self._file_handle.close()
multi gpu
이제 multi gpu에 대한 내용입니다.
먼저 detectron2에 내장된 argparser를 사용해야 합니다.
기본 코드는 다음과 같습니다.
from detectron2.engine import default_argument_parser, launch
parser = default_argument_parser()
parser.add_argument('--num_workers', type=int, default=10, metavar='N',
help='num workers')
def train(args):
cfg = get_config()
...
trainer = MyTrainer(cfg)
trainer.resume_or_load(resume=False)
return trainer.train()
if __name__ == '__main__':
args = parser.parse_args()
launch(
train,
args.num_gpus,
# num_machines=args.num_machines,
# machine_rank=args.machine_rank,
# dist_url=args.dist_url,
args=(args,),
)
num_gpus, num_machines, machine_rank, dist_url 등은 default_argument_parser에서 추가되어 있습니다.
컴퓨터가 한 대라면 그냥 num_gpus 값만 파라미터로 넣으면 됩니다. 나머진 default로 정의되어 있습니다.
다음과 같이 실행하면 위의 DefaultTrainer에서 보셨듯이 DDP 모드로 학습이 실행됩니다.
참 쉽죠?
wandb (선택사항)
마지막으로 DDP를 사용할 때 wandb를 그대로 쓰면 gpu 개수만큼 wandb project가 생기게 됩니다.
모든 값은 main gpu에 모이기 때문에 main gpu에서만 wandb를 키면 깔끔하게 한 개의 project만 올릴 수 있습니다.
코드는 아래와 같습니다.
from detectron2.engine import default_argument_parser, launch
parser = default_argument_parser()
parser.add_argument('--num_workers', type=int, default=10, metavar='N',
help='num workers')
def train(args):
cfg = get_config()
...
if args.num_gpus > 1:
if comm.get_local_rank()==0:
wandb.init(project=args.wandb_project, name=str(args.wandb_name), config=cfg_wandb)
else:
wandb.init(project=args.wandb_project, name=str(args.wandb_name), config=cfg_wandb)
args.wandb_id = wandb.run.id
trainer = MyTrainer(cfg)
trainer.resume_or_load(resume=False)
return trainer.train()
if __name__ == '__main__':
args = parser.parse_args()
launch(
train,
args.num_gpus,
# num_machines=args.num_machines,
# machine_rank=args.machine_rank,
# dist_url=args.dist_url,
args=(args,),
)
마무리
이 정도까지가 제가 올릴 분량입니다.
더 심화된 응용을 하려면 아래의 링크를 참조하면 될 것 같습니다.
detectron2 official example : https://github.com/facebookresearch/detectron2/tree/main/projects
'Detection' 카테고리의 다른 글
Detectron2 - 설치, Training 기초 (0) | 2022.02.07 |
---|---|
Mask RCNN (Faster RCNN + segmentation) (0) | 2022.02.06 |