Spaces:
Runtime error
Runtime error
import itertools | |
import os | |
from typing import List, Optional | |
import torch | |
import numpy as np | |
import tempfile | |
from collections import OrderedDict | |
from PIL import Image | |
from tabulate import tabulate | |
import json | |
import contextlib | |
import detectron2.utils.comm as comm | |
from detectron2.utils.file_io import PathManager | |
from detectron2.data import MetadataCatalog | |
from detectron2.evaluation import COCOPanopticEvaluator | |
from detectron2.utils.visualizer import ColorMode, Visualizer | |
from detectron2.data import MetadataCatalog | |
import io | |
import math | |
from PIL import Image | |
from detectron2.solver.lr_scheduler import _get_warmup_factor_at_iter | |
import logging | |
logger = logging.getLogger(__name__) | |
class TF2WarmupPolyLR(torch.optim.lr_scheduler._LRScheduler):
    """
    Poly learning rate schedule used in TF DeepLab2.
    Reference: https://github.com/google-research/deeplab2/blob/main/trainer/trainer_utils.py#L23

    The multiplier applied to every base learning rate is chosen from
    ``self.last_epoch`` (used here as the iteration counter):

    * while ``last_epoch < warmup_iters``: the factor returned by
      ``_get_warmup_factor_at_iter``;
    * afterwards: ``(1 - last_epoch / max_iters) ** power``;
    * if ``constant_ending > 0`` and the warmup factor equals ``1.0``: the
      multiplier is held at ``constant_ending`` as soon as the polynomial
      factor drops below it.

    Args:
        optimizer: wrapped optimizer.
        max_iters: iteration at which the polynomial factor reaches zero.
        warmup_factor: forwarded to ``_get_warmup_factor_at_iter``.
        warmup_iters: number of iterations that use the warmup factor.
        warmup_method: forwarded to ``_get_warmup_factor_at_iter``.
        last_epoch: forwarded to the base scheduler.
        power: exponent of the polynomial decay.
        constant_ending: lower bound of the multiplier after warmup; values
            ``<= 0`` disable the constant ending.
    """

    def __init__(
        self,
        optimizer: torch.optim.Optimizer,
        max_iters: int,
        warmup_factor: float = 0.001,
        warmup_iters: int = 1000,
        warmup_method: str = "linear",
        last_epoch: int = -1,
        power: float = 0.9,
        constant_ending: float = 0.0,
    ):
        # The attributes must exist before the base constructor runs, because
        # it may already call ``get_lr`` to initialise the learning rates.
        self.max_iters = max_iters
        self.warmup_factor = warmup_factor
        self.warmup_iters = warmup_iters
        self.warmup_method = warmup_method
        self.power = power
        self.constant_ending = constant_ending
        super().__init__(optimizer, last_epoch)

    def _poly_factor(self) -> float:
        """Return the polynomial decay multiplier for the current iteration."""
        # Clamp at zero: once ``last_epoch`` exceeds ``max_iters`` the base is
        # negative, and ``math.pow`` raises ``ValueError`` for a negative base
        # with a non-integer exponent (or yields a growing value for an even
        # integer exponent).
        remaining = max(0.0, 1.0 - self.last_epoch / self.max_iters)
        return math.pow(remaining, self.power)

    def get_lr(self) -> List[float]:
        """Return one learning rate per parameter group for the current iteration."""
        warmup_factor = _get_warmup_factor_at_iter(
            self.warmup_method, self.last_epoch, self.warmup_iters, self.warmup_factor
        )
        if self.constant_ending > 0 and warmup_factor == 1.0:
            # Constant ending lr.
            if self._poly_factor() < self.constant_ending:
                return [base_lr * self.constant_ending for base_lr in self.base_lrs]
        if self.last_epoch < self.warmup_iters:
            scale = warmup_factor
        else:
            scale = self._poly_factor()
        return [base_lr * scale for base_lr in self.base_lrs]

    def _compute_values(self) -> List[float]:
        # The new interface
        return self.get_lr()
class COCOPanopticEvaluatorwithVis(COCOPanopticEvaluator):
    """
    COCO Panoptic Evaluator that supports saving visualizations.

    Besides collecting panoptic predictions for PQ evaluation, ``process`` can
    render up to ``save_vis_num`` predictions per call and save them as PNG
    files under ``<output_dir>/vis``.

    TODO(qihangyu): Note that original implementation will also write all predictions to a tmp folder
    and then run official evaluation script, we may also check how to copy from the tmp folder for visualization.
    """

    def __init__(self, dataset_name: str, output_dir: Optional[str] = None, save_vis_num=0):
        """
        Args:
            dataset_name: name of the dataset, forwarded to the base evaluator.
            output_dir: directory forwarded to the base evaluator; visualizations
                are written to its ``vis`` sub-directory.
            save_vis_num: maximum number of visualizations saved per ``process`` call.
        """
        super().__init__(dataset_name=dataset_name, output_dir=output_dir)
        # NOTE(review): the visualization metadata is hard-coded and does not
        # follow ``dataset_name`` - confirm this is intended for other datasets.
        self.metadata = MetadataCatalog.get("coco_2017_val_panoptic_with_sem_seg")
        self.output_dir = output_dir
        self.save_vis_num = save_vis_num
        if output_dir is None and save_vis_num > 0:
            # Without a target directory there is nowhere to write the images;
            # say so once instead of failing inside ``process``.
            logger.warning(
                "save_vis_num=%s but output_dir is None; visualizations will not be saved.",
                save_vis_num,
            )

    def process(self, inputs, outputs):
        """
        Store the panoptic prediction of every sample and optionally save a visualization.

        Args:
            inputs: per-sample dicts; ``"file_name"`` and ``"image_id"`` are read.
            outputs: per-sample dicts; ``"panoptic_seg"`` is read as a
                ``(panoptic_img, segments_info)`` pair, and ``"original_image"``
                is read only when a visualization is saved.
        """
        from panopticapi.utils import id2rgb

        vis_dir = None
        if self.output_dir is not None:
            vis_dir = os.path.join(self.output_dir, "vis")
        # The counter restarts on every call, so the budget applies per call.
        num_saved = 0
        for sample, output in zip(inputs, outputs):
            panoptic_img, segments_info = output["panoptic_seg"]
            panoptic_seg = panoptic_img.cpu()
            panoptic_img = panoptic_seg.numpy()
            file_name = os.path.basename(sample["file_name"])
            file_name_png = os.path.splitext(file_name)[0] + ".png"

            # Draw before the "+ 1" shift below is applied to ``panoptic_img``.
            if vis_dir is not None and num_saved < self.save_vis_num:
                image = output["original_image"]
                # Move axis 0 to the end, presumably CHW -> HWC - TODO confirm.
                image = image.permute(1, 2, 0).cpu().numpy()
                visualizer = Visualizer(image, self.metadata, instance_mode=ColorMode.IMAGE)
                vis_output = visualizer.draw_panoptic_seg_predictions(
                    panoptic_seg, segments_info
                )
                # ``exist_ok`` avoids the check-then-create race when several
                # workers create the directory at the same time.
                os.makedirs(vis_dir, exist_ok=True)
                vis_output.save(os.path.join(vis_dir, file_name_png))
                num_saved += 1

            if segments_info is None:
                # If "segments_info" is None, we assume "panoptic_img" is a
                # H*W int32 image storing the panoptic_id in the format of
                # category_id * label_divisor + instance_id. We reserve -1 for
                # VOID label, and add 1 to panoptic_img since the official
                # evaluation script uses 0 for VOID label.
                label_divisor = self._metadata.label_divisor
                # Built once per image instead of once per label.
                thing_ids = set(self._metadata.thing_dataset_id_to_contiguous_id.values())
                segments_info = []
                for panoptic_label in np.unique(panoptic_img):
                    if panoptic_label == -1:
                        # VOID region.
                        continue
                    pred_class = panoptic_label // label_divisor
                    segments_info.append(
                        {
                            "id": int(panoptic_label) + 1,
                            "category_id": int(pred_class),
                            "isthing": bool(pred_class in thing_ids),
                        }
                    )
                # Official evaluation script uses 0 for VOID label.
                panoptic_img += 1

            with io.BytesIO() as out:
                Image.fromarray(id2rgb(panoptic_img)).save(out, format="PNG")
                segments_info = [self._convert_category_id(x) for x in segments_info]
                self._predictions.append(
                    {
                        "image_id": sample["image_id"],
                        "file_name": file_name_png,
                        "png_string": out.getvalue(),
                        "segments_info": segments_info,
                    }
                )

    def evaluate(self):
        """
        Gather predictions from all processes and compute panoptic quality.

        Returns:
            ``None`` on non-main processes. On the main process, an
            ``OrderedDict`` with key ``"panoptic_seg"`` mapping to PQ/SQ/RQ
            (overall, ``_th`` and ``_st`` variants) scaled by 100.
        """
        comm.synchronize()

        self._predictions = comm.gather(self._predictions)
        self._predictions = list(itertools.chain(*self._predictions))
        if not comm.is_main_process():
            return

        # PanopticApi requires local files
        gt_json = PathManager.get_local_path(self._metadata.panoptic_json)
        gt_folder = PathManager.get_local_path(self._metadata.panoptic_root)

        with tempfile.TemporaryDirectory(prefix="panoptic_eval") as pred_dir:
            logger.info("Writing all panoptic predictions to %s ...", pred_dir)
            for p in self._predictions:
                # Popping the PNG bytes leaves the entry JSON-serializable.
                with open(os.path.join(pred_dir, p["file_name"]), "wb") as f:
                    f.write(p.pop("png_string"))

            with open(gt_json, "r") as f:
                json_data = json.load(f)
            json_data["annotations"] = self._predictions

            output_dir = self._output_dir or pred_dir
            predictions_json = os.path.join(output_dir, "predictions.json")
            with PathManager.open(predictions_json, "w") as f:
                f.write(json.dumps(json_data))

            from kmax_deeplab.evaluation.panoptic_evaluation import pq_compute

            # Swallow whatever the evaluation routine prints to stdout.
            with contextlib.redirect_stdout(io.StringIO()):
                pq_res = pq_compute(
                    gt_json,
                    PathManager.get_local_path(predictions_json),
                    gt_folder=gt_folder,
                    pred_folder=pred_dir,
                )

        # Key order: PQ, SQ, RQ, then the "_th" and "_st" variants.
        res = {}
        for suffix, group in (("", "All"), ("_th", "Things"), ("_st", "Stuff")):
            for metric in ("pq", "sq", "rq"):
                res[metric.upper() + suffix] = 100 * pq_res[group][metric]

        results = OrderedDict({"panoptic_seg": res})
        _print_panoptic_results(pq_res)

        return results
def _print_panoptic_results(pq_res):
    """
    Log a table with PQ, SQ and RQ (scaled by 100) and the category count
    for the "All", "Things" and "Stuff" entries of ``pq_res``.
    """
    metric_keys = ("pq", "sq", "rq")
    rows = []
    for group in ("All", "Things", "Stuff"):
        stats = pq_res[group]
        scores = [stats[key] * 100 for key in metric_keys]
        rows.append([group, *scores, stats["n"]])

    rendered = tabulate(
        rows,
        headers=["", "PQ", "SQ", "RQ", "#categories"],
        tablefmt="pipe",
        floatfmt=".3f",
        stralign="center",
        numalign="center",
    )
    logger.info("Panoptic Evaluation Results:\n" + rendered)