Spaces:
Running
on
Zero
Running
on
Zero
import face_detection | |
import numpy as np | |
import cv2 | |
from tqdm import tqdm | |
import torch | |
import glob | |
import os | |
from natsort import natsorted | |
# Run on the GPU when PyTorch can see a CUDA device; otherwise use the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
def get_squre_coords(coords, image, size=None, last_size=None):
    """Return a square box that shares its center with the given rectangle.

    Args:
        coords: Rectangle as ``(y1, y2, x1, x2)``.
        image: Unused; kept so existing call sites stay valid.
        size: Side length to use. When ``None`` the mean of the rectangle's
            width and height is used. Ignored when ``last_size`` is given.
        last_size: Side length from a previous call. When given, the new side
            moves one fifth of the way (floor division) from ``last_size``
            towards the mean of the current width and height.

    Returns:
        ``(size, [y1, y2, x1, x2])`` where the list describes the square.
        The square is not clamped to any image bounds here.
    """
    top, bottom, left, right = coords
    width = right - left
    height = bottom - top
    mean_side = (width + height) // 2

    center_x = left + width // 2
    center_y = top + height // 2

    if last_size is not None:
        # Damped update: only a fifth of the change is applied per call.
        size = (mean_side - last_size) // 5 + last_size
    elif size is None:
        size = mean_side

    half = size // 2
    sq_left = center_x - half
    sq_top = center_y - half
    return size, [sq_top, sq_top + size, sq_left, sq_left + size]
def get_smoothened_boxes(boxes, T):
    """Smooth a sequence of boxes in place with a forward moving average.

    Entry ``i`` is replaced by the mean of ``boxes[i:i + T]``. Near the end,
    where fewer than ``T`` entries remain, the mean of the last ``T`` entries
    is used instead. Those trailing windows can include entries that were
    already smoothed earlier in the same pass.

    Args:
        boxes: Indexable sequence of boxes, modified in place. When it is a
            NumPy array the assigned means are cast to the array's dtype.
        T: Window length in entries.

    Returns:
        The same ``boxes`` object that was passed in.
    """
    count = len(boxes)
    for i in range(count):
        if i + T > count:
            # Clamp at 0: when the sequence is shorter than T, a negative
            # start index would drop the leading boxes from the window.
            window = boxes[max(0, count - T):]
        else:
            window = boxes[i : i + T]
        boxes[i] = np.mean(window, axis=0)
    return boxes
def face_detect(images, pads):
    """Detect a face in every frame and return smoothed crop boxes.

    Each detection is padded by ``pads``, grown by half its height/width on
    every side, turned into a square with ``get_squre_coords``, clamped to
    the frame, and finally smoothed over time with ``get_smoothened_boxes``.

    Args:
        images: Sequence of frames; ``image.shape[0]`` / ``image.shape[1]``
            are used as the frame height / width.
        pads: ``(pady1, pady2, padx1, padx2)`` added to the top, bottom,
            left and right of each detected rectangle.

    Returns:
        Array with one ``[x1, y1, x2, y2]`` row per frame (shape ``(0, 4)``
        when ``images`` is empty).

    Raises:
        RuntimeError: Detection still fails with a batch size of 1.
        ValueError: The detector returned no face for some frame. That frame
            is written to ``.temp/faulty_frame.jpg`` for inspection.
    """
    detector = face_detection.FaceAlignment(
        face_detection.LandmarksType._2D, flip_input=False, device=device
    )
    batch_size = 32 if device == "cuda" else 4
    print("face detect batch size:", batch_size)

    # A RuntimeError is treated as out-of-memory: halve the batch and redo
    # the whole pass until it succeeds or the batch size cannot shrink further.
    while True:
        predictions = []
        try:
            for start in tqdm(range(0, len(images), batch_size)):
                batch = np.array(images[start : start + batch_size])
                predictions.extend(detector.get_detections_for_batch(batch))
        except RuntimeError:
            if batch_size == 1:
                raise RuntimeError("Image too big to run face detection on GPU. Please use the --resize_factor argument")
            batch_size //= 2
            print("Recovering from OOM error; New batch size: {}".format(batch_size))
            continue
        break

    results = []
    pady1, pady2, padx1, padx2 = pads
    for rect, image in zip(predictions, images):
        if rect is None:
            # cv2.imwrite does not create missing directories, so make sure
            # the debug frame actually lands on disk.
            os.makedirs(".temp", exist_ok=True)
            cv2.imwrite(".temp/faulty_frame.jpg", image)  # check this frame where the face was not detected.
            raise ValueError("Face not detected! Ensure the video contains a face in all the frames.")

        frame_h, frame_w = image.shape[0], image.shape[1]

        # Padded detection, clamped to the frame. rect is indexed as
        # (x1, y1, x2, y2).
        y1 = max(0, rect[1] - pady1)
        y2 = min(frame_h, rect[3] + pady2)
        x1 = max(0, rect[0] - padx1)
        x2 = min(frame_w, rect[2] + padx2)

        # Grow the box by half its size on every side before squaring it.
        y_gap, x_gap = (y2 - y1) // 2, (x2 - x1) // 2
        expanded = [y1 - y_gap, y2 + y_gap, x1 - x_gap, x2 + x_gap]
        _, (y1, y2, x1, x2) = get_squre_coords(expanded, image)

        # The squared box may extend past the frame; clamp it again.
        y1 = max(0, y1)
        y2 = min(frame_h, y2)
        x1 = max(0, x1)
        x2 = min(frame_w, x2)
        results.append([x1, y1, x2, y2])

    print("Number of frames cropped: {}".format(len(results)))
    if results:
        # Guarded so that an empty input does not raise IndexError.
        print("First coords: {}".format(results[0]))

    # reshape keeps an (N, 4) layout even when there are no frames.
    boxes = np.array(results).reshape(-1, 4)
    boxes = get_smoothened_boxes(boxes, T=25)

    del detector
    return boxes
def add_black(imgs):
    """Pad every frame with black rows: 100 on top and 20 at the bottom.

    The list is modified in place and also returned. Each frame is stacked
    vertically between two zero-filled 3-channel ``uint8`` bands of the
    frame's own width.
    """
    for idx, frame in enumerate(imgs):
        width = frame.shape[1]
        top_band = np.zeros((100, width, 3), dtype=np.uint8)
        bottom_band = np.zeros((20, width, 3), dtype=np.uint8)
        imgs[idx] = cv2.vconcat([top_band, frame, bottom_band])
    return imgs
def preprocess(video_dir="./assets/videos", save_dir="./assets/coords"):
    """Compute face boxes for every ``*.mp4`` in ``video_dir`` and cache them.

    For each video (in natural sort order) all frames are read, padded with
    ``add_black``, passed to ``face_detect`` and the resulting boxes are
    stored under the key ``coords`` in a compressed ``.npz`` file in
    ``save_dir``.

    Args:
        video_dir: Directory that is searched (non-recursively) for
            ``*.mp4`` files.
        save_dir: Directory the ``.npz`` files are written to; created if it
            does not exist.
    """
    # np.savez_compressed does not create missing parent directories.
    os.makedirs(save_dir, exist_ok=True)

    for video_path in natsorted(glob.glob(os.path.join(video_dir, "*.mp4"))):
        video_stream = cv2.VideoCapture(video_path)
        full_frames = []
        try:
            while True:
                still_reading, frame = video_stream.read()
                if not still_reading:
                    break
                full_frames.append(frame)
        finally:
            # Release the capture even if reading raised.
            video_stream.release()

        print("Number of frames available for inference: " + str(len(full_frames)))
        if not full_frames:
            # Nothing to detect on; skip instead of failing further down.
            print("Skipping {}: no frames could be read".format(video_path))
            continue

        full_frames = add_black(full_frames)
        coords = face_detect(full_frames, pads=(0, 0, 0, 0))

        # The output name is the file name up to its FIRST dot, so
        # "a.b.mp4" is stored as "a.npz" (NumPy appends the extension).
        video_name = os.path.basename(video_path).split(".")[0]
        np.savez_compressed(os.path.join(save_dir, video_name), coords=coords)
def load_from_npz(video_name, save_dir="./assets/coords"):
    """Load the cached ``coords`` array for one video.

    Args:
        video_name: File name without the ``.npz`` extension.
        save_dir: Directory that holds the ``.npz`` files.

    Returns:
        The array stored under the key ``coords``.
    """
    archive_path = os.path.join(save_dir, video_name + ".npz")
    # The context manager closes the underlying archive; the array is read
    # into memory when it is indexed, so it stays valid afterwards.
    with np.load(archive_path) as npz:
        return npz["coords"]
# Script entry point: build the coordinate cache using the default directories.
if __name__ == "__main__":
    preprocess()