|
import gradio as gr |
|
import json |
|
from difflib import Differ |
|
import ffmpeg |
|
import os |
|
from pathlib import Path |
|
import time |
|
import aiohttp |
|
import asyncio |
|
|
|
|
|
|
|
# Backend selector: when true, the first branch below prepares the credentials
# and URL for the hosted Inference API; otherwise a local pipeline is built.
API_BACKEND = True

# Checkpoint identifier; interpolated into API_URL and handed to the pipeline.
MODEL = "facebook/wav2vec2-base-960h"

if API_BACKEND:
    import asyncio
    import base64

    from dotenv import load_dotenv

    # NOTE(review): presumably makes the variables of a local .env file
    # available through os.environ before HF_TOKEN is read - confirm.
    load_dotenv(Path(".env"))

    # A missing HF_TOKEN raises KeyError while the module is being imported.
    HF_TOKEN = os.environ["HF_TOKEN"]
    headers = {"Authorization": f"Bearer {HF_TOKEN}"}
    API_URL = f'https://api-inference.huggingface.co/models/{MODEL}'
else:
    import torch
    from transformers import pipeline

    # `cuda` is the torch device object, `device` the index given to pipeline().
    if torch.cuda.is_available():
        cuda = torch.device('cuda:0')
        device = 0
    else:
        cuda = torch.device('cpu')
        device = -1

    speech_recognizer = pipeline(
        task="automatic-speech-recognition",
        model=MODEL,
        tokenizer=MODEL,
        framework="pt",
        device=device,
    )
|
|
|
# Directory that receives the cut videos; created as soon as the module loads.
videos_out_path = Path("./videos_out")
videos_out_path.mkdir(parents=True, exist_ok=True)

# Every examples/*.json file is parsed into one sample, in sorted path order.
samples_data = sorted(Path('examples').glob('*.json'))
SAMPLES = []
for file in samples_data:
    # Decode explicitly as UTF-8 instead of relying on the platform locale.
    with open(file, encoding="utf-8") as f:
        sample = json.load(f)
    SAMPLES.append(sample)

# One single-element row per sample, holding that sample's 'video' entry.
VIDEOS = [[sample['video']] for sample in SAMPLES]

# Usage counters bumped by the request handlers; the start values are fixed.
total_inferences_since_reboot = 415
total_cuts_since_reboot = 1539
|
|
|
|
|
async def speech_to_text(video_file_path):
    """
    Takes a video path to convert to audio, transcribe audio channel to text and char timestamps

    Using https://huggingface.co/tasks/automatic-speech-recognition pipeline

    The audio track is converted by ffmpeg into an in-memory mono 16 kHz WAV
    and transcribed either through the hosted Inference API (``API_BACKEND``
    true) or through the local ``speech_recognizer`` pipeline.

    Args:
        video_file_path: Location of the input video; ``None`` is rejected.

    Returns:
        ``(transcription, transcription, timestamps)``. ``transcription`` is
        the lower-cased text, returned twice. ``timestamps`` holds one
        ``[text, start, end]`` entry per chunk reported by the backend.

    Raises:
        ValueError: If ``video_file_path`` is ``None``.
        RuntimeError: If the audio conversion or the transcription fails.
    """
    global total_inferences_since_reboot
    if video_file_path is None:
        raise ValueError("Error no video input")

    video_path = Path(video_file_path)
    try:
        # '-' makes ffmpeg write the WAV to stdout, which run() captures.
        audio_memory, _ = ffmpeg.input(video_path).output(
            '-', format="wav", ac=1, ar='16k').overwrite_output().global_args('-loglevel', 'quiet').run(capture_stdout=True)
    except Exception as e:
        raise RuntimeError("Error converting video to audio") from e

    ping("speech_to_text")
    if API_BACKEND:
        transcription, timestamps = await _transcribe_with_api(audio_memory)
    else:
        transcription, timestamps = _transcribe_with_local_model(audio_memory)

    total_inferences_since_reboot += 1
    print("\n\ntotal_inferences_since_reboot: ",
          total_inferences_since_reboot, "\n\n")
    return (transcription, transcription, timestamps)


async def _transcribe_with_api(audio_memory):
    """Transcribe through query_api, waiting while the model reports loading."""
    inference_reponse = None
    # Up to 10 rounds of 4 attempts each. A new round is only started after a
    # payload that is neither a transcription nor an error report.
    for _ in range(10):
        for tries in range(4):
            print(f'Transcribing from API attempt {tries}')
            try:
                inference_reponse = await query_api(audio_memory)
            except Exception as e:
                # The request itself failed, so there is no payload to inspect.
                raise RuntimeError("Error Fetching API") from e
            print(inference_reponse)

            try:
                transcription = inference_reponse["text"].lower()
                timestamps = [[chunk["text"].lower(), chunk["timestamp"][0], chunk["timestamp"][1]]
                              for chunk in inference_reponse['chunks']]
            except (AttributeError, IndexError, KeyError, TypeError) as e:
                print(e)
            else:
                return transcription, timestamps

            # query_api may also hand back bytes or str; only a dict can carry
            # the 'error' / 'estimated_time' fields inspected below.
            if not isinstance(inference_reponse, dict) or 'error' not in inference_reponse:
                break
            if 'estimated_time' not in inference_reponse:
                raise RuntimeError("Error Fetching API",
                                   inference_reponse['error'])

            wait_time = inference_reponse['estimated_time']
            print("Waiting for model to load....", wait_time)
            # Sleep five seconds longer than the estimate before retrying.
            await asyncio.sleep(wait_time + 5.0)
        else:
            # All four attempts of this round ended in a loading wait.
            raise RuntimeError(inference_reponse, "Error Fetching API")

    # Every round ended with an unusable payload; fail instead of returning None.
    raise RuntimeError(inference_reponse, "Error Fetching API")


def _transcribe_with_local_model(audio_memory):
    """Transcribe with the local speech_recognizer pipeline."""
    try:
        print('Transcribing via local model')
        output = speech_recognizer(
            audio_memory, return_timestamps="char", chunk_length_s=10, stride_length_s=(4, 2))
        transcription = output["text"].lower()
        # .tolist() is applied to each timestamp bound before it is stored.
        timestamps = [[chunk["text"].lower(), chunk["timestamp"][0].tolist(), chunk["timestamp"][1].tolist()]
                      for chunk in output['chunks']]
    except Exception as e:
        raise RuntimeError("Error Running inference with local model", e) from e
    return transcription, timestamps
|
|
|
|
|
async def cut_timestamps_to_video(video_in, transcription, text_in, timestamps):
    """
    Given original video input, text transcript + timestamps,
    and edited text, cuts the kept video segments into a single video

    Args:
        video_in: Path of the source video.
        transcription: Transcript the timestamps were produced for.
        text_in: Edited transcript; characters missing from it are cut.
        timestamps: ``[text, start, end]`` entries, paired in order with the
            characters of ``transcription``.

    Returns:
        ``(tokens, output_video)``. ``tokens`` is a list of ``(char, label)``
        pairs where ``label`` is ``'-'`` for a removed character and ``None``
        for a kept one. ``output_video`` is the path of the cut video, or
        ``video_in`` unchanged when no segment is kept.

    Raises:
        ValueError: If any of the inputs is ``None``.
    """
    global total_cuts_since_reboot

    # Validate first: building a Path from None would raise TypeError instead.
    if video_in is None or text_in is None or transcription is None or timestamps is None:
        raise ValueError("Inputs undefined")

    video_file_name = Path(video_in).stem

    # Character-level diff. '+' entries are additions typed by the user and
    # have no counterpart in the recording, so they are dropped.
    filtered = [entry for entry in Differ().compare(transcription, text_in)
                if entry[0] != '+']

    timestamps_to_cut = _kept_time_ranges(filtered, timestamps)

    if timestamps_to_cut:
        # One between(t,start,end) term per kept range, summed into a single
        # select expression shared by the video and audio filters.
        between_str = '+'.join(
            f'between(t,{start},{end})' for start, end in timestamps_to_cut)

        video_file = ffmpeg.input(video_in)
        video = video_file.video.filter(
            "select", f'({between_str})').filter("setpts", "N/FRAME_RATE/TB")
        audio = video_file.audio.filter(
            "aselect", f'({between_str})').filter("asetpts", "N/SR/TB")

        output_video = f'./videos_out/{video_file_name}.mp4'
        ffmpeg.concat(video, audio, v=1, a=1).output(
            output_video).overwrite_output().global_args('-loglevel', 'quiet').run()
    else:
        output_video = video_in

    # Diff entries look like '<marker> <char>'; a blank marker becomes None.
    tokens = [(token[2:], token[0] if token[0] != " " else None)
              for token in filtered]

    total_cuts_since_reboot += 1
    ping("video_cuts")
    print("\n\ntotal_cuts_since_reboot: ", total_cuts_since_reboot, "\n\n")
    return (tokens, output_video)


def _kept_time_ranges(filtered, timestamps):
    """Collapse each run of consecutive kept characters into [start, end]."""
    ranges = []
    current = None
    for entry, stamp in zip(filtered, timestamps):
        if entry[0] == '-':
            # A removed character closes the run that is currently open.
            current = None
        elif current is None:
            current = [stamp[1], stamp[2]]
            ranges.append(current)
        else:
            current[1] = stamp[2]
    return ranges
|
|
|
|
|
async def query_api(audio_bytes: bytes):
    """
    Query for Huggingface Inference API for Automatic Speech Recognition task

    The audio is base64-encoded into a JSON body and POSTed to ``API_URL``
    with the module-level ``headers``.

    Args:
        audio_bytes: Raw audio to transcribe.

    Returns:
        The decoded JSON for an ``application/json`` reply, ``bytes`` for
        ``application/octet-stream`` and ``str`` for ``text/plain``.

    Raises:
        RuntimeError: If the reply has any other content type.
    """
    payload = json.dumps({
        "inputs": base64.b64encode(audio_bytes).decode("utf-8"),
        "parameters": {
            "return_timestamps": "char",
            "chunk_length_s": 10,
            "stride_length_s": [4, 2]
        },
        "options": {"use_gpu": False}
    }).encode("utf-8")

    async with aiohttp.ClientSession() as session:
        async with session.post(API_URL, headers=headers, data=payload) as response:
            print("API Response: ", response.status)
            # content_type is the media type without parameters such as
            # "; charset=utf-8", and it does not raise when the header is
            # absent (aiohttp then reports application/octet-stream).
            content_type = response.content_type
            if content_type == 'application/json':
                return await response.json()
            if content_type == 'application/octet-stream':
                return await response.read()
            if content_type == 'text/plain':
                return await response.text()
            raise RuntimeError("Error Fetching API")
|
|
|
|
|
# Strong references to telemetry tasks that are still running. The event loop
# only keeps weak references to tasks, so an otherwise unreferenced task could
# be garbage-collected before it finishes.
_ping_tasks = set()


def ping(name):
    """
    Fire a telemetry GET request for ``name`` without waiting for the reply.

    The request is scheduled as a task on the running event loop, so this must
    be called from code that runs inside that loop.

    Args:
        name: Event name appended to the telemetry URL.
    """
    url = f'https://huggingface.co/api/telemetry/spaces/radames/edit-video-by-editing-text/{name}'
    print("ping: ", url)

    async def req():
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                print("pong: ", response.status)

    task = asyncio.create_task(req())
    _ping_tasks.add(task)
    # Drop the reference again once the request has completed.
    task.add_done_callback(_ping_tasks.discard)
|
|
|
|
|
|
|
# Components are instantiated up front and placed into the page later through
# their .render() calls. The construction order is kept exactly as it was.
video_in = gr.Video(
    label="Video file",
    elem_id="video-container",
)
text_in = gr.Textbox(
    label="Transcription",
    lines=10,
    interactive=True,
)
video_out = gr.Video(
    label="Video Out",
)
diff_out = gr.HighlightedText(
    label="Cuts Diffs",
    combine_adjacent=True,
)
examples = gr.Dataset(
    components=[video_in],
    samples=VIDEOS,
    type="index",
)

# Stylesheet handed to gr.Blocks(css=...).
css = """
#cut_btn, #reset_btn { align-self:stretch; }
#\\31 3 { max-width: 540px; }
.output-markdown {max-width: 65ch !important;}
#video-container{
    max-width: 40rem;
}
"""
|
with gr.Blocks(css=css) as demo:
    # Per-session values shared between the handlers: the last transcription
    # and its timestamps.
    transcription_var = gr.State()
    timestamps_var = gr.State()

    # Introduction.
    with gr.Row():
        with gr.Column():
            gr.Markdown("""
            # Edit Video By Editing Text
            This project is a quick proof of concept of a simple video editor where the edits
            are made by editing the audio transcription.
            Using the [Huggingface Automatic Speech Recognition Pipeline](https://huggingface.co/tasks/automatic-speech-recognition)
            with a fine tuned [Wav2Vec2 model using Connectionist Temporal Classification (CTC)](https://huggingface.co/facebook/wav2vec2-large-960h-lv60-self)
            you can predict not only the text transcription but also the [character or word base timestamps](https://huggingface.co/docs/transformers/v4.19.2/en/main_classes/pipelines#transformers.AutomaticSpeechRecognitionPipeline.__call__.return_timestamps)
            """)

    # Example picker: selecting a sample fills in the video, the text box and
    # both state values from the pre-computed sample data.
    with gr.Row():

        examples.render()

        def load_example(sample_index):
            """Return the video, transcription (twice) and timestamps of a sample."""
            chosen = SAMPLES[sample_index]
            video = chosen['video']
            transcription = chosen['transcription'].lower()
            timestamps = chosen['timestamps']

            return (video, transcription, transcription, timestamps)

        examples.click(
            load_example,
            inputs=[examples],
            outputs=[video_in, text_in, transcription_var, timestamps_var],
            queue=False)

    # Video input and transcription trigger.
    with gr.Row():
        with gr.Column():
            video_in.render()
            transcribe_btn = gr.Button("Transcribe Audio")
            transcribe_btn.click(speech_to_text, [video_in], [
                text_in, transcription_var, timestamps_var])

    with gr.Row():
        gr.Markdown("""
        ### Now edit as text
        After running the video transcription, you can make cuts to the text below (only cuts, not additions!)""")

    # Editable transcript with its buttons on the left, results on the right.
    with gr.Row():
        with gr.Column():
            text_in.render()
            with gr.Row():
                cut_btn = gr.Button("Cut to video", elem_id="cut_btn")

                # The handler receives the video path, the stored transcription,
                # the edited text and the stored timestamps.
                cut_btn.click(cut_timestamps_to_video, [
                    video_in, transcription_var, text_in, timestamps_var], [diff_out, video_out])

                reset_transcription = gr.Button(
                    "Reset to last transcription", elem_id="reset_btn")
                # Copy the stored transcription back into the text box.
                reset_transcription.click(
                    lambda x: x, transcription_var, text_in)
        with gr.Column():
            video_out.render()
            diff_out.render()

    with gr.Row():
        gr.Markdown("""
        #### Video Credits

        1. [Cooking](https://vimeo.com/573792389)
        1. [Shia LaBeouf "Just Do It"](https://www.youtube.com/watch?v=n2lTxIk_Dr0)
        1. [Mark Zuckerberg & Yuval Noah Harari in Conversation](https://www.youtube.com/watch?v=Boj9eD0Wug8)
        """)

demo.queue()
if __name__ == "__main__":
    demo.launch(debug=True)
|
|