opdmulti-demo / app.py
atwang's picture
update app to improve error handling and allow for simultaneous usage
92d915f
raw
history blame
6.04 kB
import os
import re
import shutil
import time
from types import SimpleNamespace
from typing import Any
import gradio as gr
import numpy as np
from detectron2 import engine
from PIL import Image
from inference import main, setup_cfg
# internal settings
NUM_PROCESSES = 1
CROP = False
SCORE_THRESHOLD = 0.8
MAX_PARTS = 5
ARGS = SimpleNamespace(
config_file="configs/coco/instance-segmentation/swin/opd_v1_real.yaml",
model=".data/models/motion_state_pred_opdformerp_rgb.pth",
input_format="RGB",
output=".output",
cpu=True,
)
NUM_SAMPLES = 10
outputs = {}
def predict(rgb_image: str, depth_image: str, intrinsics: np.ndarray, num_samples: int) -> list[Any]:
global outputs
def find_images(path: str) -> dict[str, list[str]]:
"""Scrape folders for all generated gif files."""
images = {}
for file in os.listdir(path):
sub_path = os.path.join(path, file)
if os.path.isdir(sub_path):
images[file] = []
for image_file in sorted(os.listdir(sub_path)):
if re.match(r".*\.png$", image_file):
images[file].append(os.path.join(sub_path, image_file))
return images
# clear old predictions
os.makedirs(ARGS.output, exist_ok=True)
for path in os.listdir(ARGS.output):
full_path = os.path.join(ARGS.output, path)
if os.path.isdir(full_path):
shutil.rmtree(full_path)
else:
os.remove(full_path)
if not rgb_image:
gr.Error("You must provide an RGB image before running the model.")
return [None] * 5
if not depth_image:
gr.Error("You must provide a depth image before running the model.")
return [None] * 5
cfg = setup_cfg(ARGS)
engine.launch(
main,
NUM_PROCESSES,
args=(
cfg,
rgb_image,
depth_image,
intrinsics,
num_samples,
CROP,
SCORE_THRESHOLD,
),
)
# process output
# TODO: may want to select these in decreasing order of score
outputs[rgb_image] = []
image_files = find_images(ARGS.output)
for count, part in enumerate(image_files):
if count < MAX_PARTS:
outputs[rgb_image].append([Image.open(im) for im in image_files[part]])
return [
*[gr.update(value=out[0], visible=True) for out in outputs[rgb_image]],
*[gr.update(visible=False) for _ in range(MAX_PARTS - len(outputs))],
]
def get_trigger(idx: int, fps: int = 40, oscillate: bool = True):
def iter_images(rgb_image: str):
if not rgb_image or rgb_image not in outputs:
gr.Warning("You must upload an image and run the model before you can view the output.")
elif idx < len(outputs[rgb_image]):
for im in outputs[rgb_image][idx]:
time.sleep(1.0 / fps)
yield im
if oscillate:
for im in reversed(outputs[rgb_image][idx]):
time.sleep(1.0 / fps)
yield im
else:
gr.Error("Could not find any images to load into this module.")
return iter_images
def clear_outputs():
return [gr.update(value=None, visible=(idx == 0)) for idx in range(MAX_PARTS)]
with gr.Blocks() as demo:
gr.Markdown(
"""
# OPDMulti Demo
Upload an image to see its range of motion.
"""
)
# TODO: add gr.Examples
with gr.Row():
rgb_image = gr.Image(
image_mode="RGB", source="upload", type="filepath", label="RGB Image", show_label=True, interactive=True
)
depth_image = gr.Image(
image_mode="I;16", source="upload", type="filepath", label="Depth Image", show_label=True, interactive=True
)
intrinsics = gr.Dataframe(
value=[
[
214.85935872395834,
0.0,
125.90160319010417,
],
[
0.0,
214.85935872395834,
95.13726399739583,
],
[
0.0,
0.0,
1.0,
],
],
row_count=(3, "fixed"),
col_count=(3, "fixed"),
datatype="number",
type="numpy",
label="Intrinsics matrix",
show_label=True,
interactive=True,
)
num_samples = gr.Number(
value=NUM_SAMPLES,
label="Number of samples",
show_label=True,
interactive=True,
precision=0,
minimum=3,
maximum=20,
)
examples = gr.Examples(
examples=[
["examples/59-4860.png", "examples/59-4860_d.png"],
["examples/174-8460.png", "examples/174-8460_d.png"],
["examples/187-0.png", "examples/187-0_d.png"],
["examples/187-23040.png", "examples/187-23040_d.png"],
],
inputs=[rgb_image, depth_image],
api_name=False,
examples_per_page=2,
)
submit_btn = gr.Button("Run model")
explanation = gr.Markdown(value="# Output\nClick on an image to see an animation of the part motion.")
# TODO: do we want to set a maximum limit on how many parts we render? We could also show the number of components
# identified.
images = [
gr.Image(type="pil", label=f"Part {idx + 1}", show_download_button=False, visible=(idx == 0))
for idx in range(MAX_PARTS)
]
for idx, image_comp in enumerate(images):
image_comp.select(get_trigger(idx), inputs=rgb_image, outputs=image_comp, api_name=False)
# if user changes input, clear output images
rgb_image.change(clear_outputs, inputs=[], outputs=images, api_name=False)
depth_image.change(clear_outputs, inputs=[], outputs=images, api_name=False)
submit_btn.click(
fn=predict, inputs=[rgb_image, depth_image, intrinsics, num_samples], outputs=images, api_name=False
)
demo.queue(api_open=False)
demo.launch()