radames's picture
Update app.py
2918353
import gradio as gr
import json
from difflib import Differ
import ffmpeg
import os
from pathlib import Path
import time
import aiohttp
import asyncio
# Set true if you're using huggingface inference API API https://huggingface.co/inference-api
API_BACKEND = True
# MODEL = 'facebook/wav2vec2-large-960h-lv60-self'
# MODEL = "facebook/wav2vec2-large-960h"
MODEL = "facebook/wav2vec2-base-960h"
# MODEL = "patrickvonplaten/wav2vec2-large-960h-lv60-self-4-gram"
if API_BACKEND:
from dotenv import load_dotenv
import base64
import asyncio
load_dotenv(Path(".env"))
HF_TOKEN = os.environ["HF_TOKEN"]
headers = {"Authorization": f"Bearer {HF_TOKEN}"}
API_URL = f'https://api-inference.huggingface.co/models/{MODEL}'
else:
import torch
from transformers import pipeline
# is cuda available?
cuda = torch.device(
'cuda:0') if torch.cuda.is_available() else torch.device('cpu')
device = 0 if torch.cuda.is_available() else -1
speech_recognizer = pipeline(
task="automatic-speech-recognition",
model=f'{MODEL}',
tokenizer=f'{MODEL}',
framework="pt",
device=device,
)
videos_out_path = Path("./videos_out")
videos_out_path.mkdir(parents=True, exist_ok=True)
samples_data = sorted(Path('examples').glob('*.json'))
SAMPLES = []
for file in samples_data:
with open(file) as f:
sample = json.load(f)
SAMPLES.append(sample)
VIDEOS = list(map(lambda x: [x['video']], SAMPLES))
total_inferences_since_reboot = 415
total_cuts_since_reboot = 1539
async def speech_to_text(video_file_path):
"""
Takes a video path to convert to audio, transcribe audio channel to text and char timestamps
Using https://huggingface.co/tasks/automatic-speech-recognition pipeline
"""
global total_inferences_since_reboot
if (video_file_path == None):
raise ValueError("Error no video input")
video_path = Path(video_file_path)
try:
# convert video to audio 16k using PIPE to audio_memory
audio_memory, _ = ffmpeg.input(video_path).output(
'-', format="wav", ac=1, ar='16k').overwrite_output().global_args('-loglevel', 'quiet').run(capture_stdout=True)
except Exception as e:
raise RuntimeError("Error converting video to audio")
ping("speech_to_text")
last_time = time.time()
if API_BACKEND:
# Using Inference API https://huggingface.co/inference-api
# try twice, because the model must be loaded
for i in range(10):
for tries in range(4):
print(f'Transcribing from API attempt {tries}')
try:
inference_reponse = await query_api(audio_memory)
print(inference_reponse)
transcription = inference_reponse["text"].lower()
timestamps = [[chunk["text"].lower(), chunk["timestamp"][0], chunk["timestamp"][1]]
for chunk in inference_reponse['chunks']]
total_inferences_since_reboot += 1
print("\n\ntotal_inferences_since_reboot: ",
total_inferences_since_reboot, "\n\n")
return (transcription, transcription, timestamps)
except Exception as e:
print(e)
if 'error' in inference_reponse and 'estimated_time' in inference_reponse:
wait_time = inference_reponse['estimated_time']
print("Waiting for model to load....", wait_time)
# wait for loading model
# 5 seconds plus for certanty
await asyncio.sleep(wait_time + 5.0)
elif 'error' in inference_reponse:
raise RuntimeError("Error Fetching API",
inference_reponse['error'])
else:
break
else:
raise RuntimeError(inference_reponse, "Error Fetching API")
else:
try:
print(f'Transcribing via local model')
output = speech_recognizer(
audio_memory, return_timestamps="char", chunk_length_s=10, stride_length_s=(4, 2))
transcription = output["text"].lower()
timestamps = [[chunk["text"].lower(), chunk["timestamp"][0].tolist(), chunk["timestamp"][1].tolist()]
for chunk in output['chunks']]
total_inferences_since_reboot += 1
print("\n\ntotal_inferences_since_reboot: ",
total_inferences_since_reboot, "\n\n")
return (transcription, transcription, timestamps)
except Exception as e:
raise RuntimeError("Error Running inference with local model", e)
async def cut_timestamps_to_video(video_in, transcription, text_in, timestamps):
"""
Given original video input, text transcript + timestamps,
and edit ext cuts video segments into a single video
"""
global total_cuts_since_reboot
video_path = Path(video_in)
video_file_name = video_path.stem
if (video_in == None or text_in == None or transcription == None):
raise ValueError("Inputs undefined")
d = Differ()
# compare original transcription with edit text
diff_chars = d.compare(transcription, text_in)
# remove all text aditions from diff
filtered = list(filter(lambda x: x[0] != '+', diff_chars))
# filter timestamps to be removed
# timestamps_to_cut = [b for (a,b) in zip(filtered, timestamps_var) if a[0]== '-' ]
# return diff tokes and cutted video!!
# groupping character timestamps so there are less cuts
idx = 0
grouped = {}
for (a, b) in zip(filtered, timestamps):
if a[0] != '-':
if idx in grouped:
grouped[idx].append(b)
else:
grouped[idx] = []
grouped[idx].append(b)
else:
idx += 1
# after grouping, gets the lower and upter start and time for each group
timestamps_to_cut = [[v[0][1], v[-1][2]] for v in grouped.values()]
between_str = '+'.join(
map(lambda t: f'between(t,{t[0]},{t[1]})', timestamps_to_cut))
if timestamps_to_cut:
video_file = ffmpeg.input(video_in)
video = video_file.video.filter(
"select", f'({between_str})').filter("setpts", "N/FRAME_RATE/TB")
audio = video_file.audio.filter(
"aselect", f'({between_str})').filter("asetpts", "N/SR/TB")
output_video = f'./videos_out/{video_file_name}.mp4'
ffmpeg.concat(video, audio, v=1, a=1).output(
output_video).overwrite_output().global_args('-loglevel', 'quiet').run()
else:
output_video = video_in
tokens = [(token[2:], token[0] if token[0] != " " else None)
for token in filtered]
total_cuts_since_reboot += 1
ping("video_cuts")
print("\n\ntotal_cuts_since_reboot: ", total_cuts_since_reboot, "\n\n")
return (tokens, output_video)
async def query_api(audio_bytes: bytes):
"""
Query for Huggingface Inference API for Automatic Speech Recognition task
"""
payload = json.dumps({
"inputs": base64.b64encode(audio_bytes).decode("utf-8"),
"parameters": {
"return_timestamps": "char",
"chunk_length_s": 10,
"stride_length_s": [4, 2]
},
"options": {"use_gpu": False}
}).encode("utf-8")
async with aiohttp.ClientSession() as session:
async with session.post(API_URL, headers=headers, data=payload) as response:
print("API Response: ", response.status)
if response.headers['Content-Type'] == 'application/json':
return await response.json()
elif response.headers['Content-Type'] == 'application/octet-stream':
return await response.read()
elif response.headers['Content-Type'] == 'text/plain':
return await response.text()
else:
raise RuntimeError("Error Fetching API")
def ping(name):
url = f'https://huggingface.co/api/telemetry/spaces/radames/edit-video-by-editing-text/{name}'
print("ping: ", url)
async def req():
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
print("pong: ", response.status)
asyncio.create_task(req())
# ---- Gradio Layout -----
video_in = gr.Video(label="Video file", elem_id="video-container")
text_in = gr.Textbox(label="Transcription", lines=10, interactive=True)
video_out = gr.Video(label="Video Out")
diff_out = gr.HighlightedText(label="Cuts Diffs", combine_adjacent=True)
examples = gr.Dataset(components=[video_in], samples=VIDEOS, type="index")
css = """
#cut_btn, #reset_btn { align-self:stretch; }
#\\31 3 { max-width: 540px; }
.output-markdown {max-width: 65ch !important;}
#video-container{
max-width: 40rem;
}
"""
with gr.Blocks(css=css) as demo:
transcription_var = gr.State()
timestamps_var = gr.State()
with gr.Row():
with gr.Column():
gr.Markdown("""
# Edit Video By Editing Text
This project is a quick proof of concept of a simple video editor where the edits
are made by editing the audio transcription.
Using the [Huggingface Automatic Speech Recognition Pipeline](https://huggingface.co/tasks/automatic-speech-recognition)
with a fine tuned [Wav2Vec2 model using Connectionist Temporal Classification (CTC)](https://huggingface.co/facebook/wav2vec2-large-960h-lv60-self)
you can predict not only the text transcription but also the [character or word base timestamps](https://huggingface.co/docs/transformers/v4.19.2/en/main_classes/pipelines#transformers.AutomaticSpeechRecognitionPipeline.__call__.return_timestamps)
""")
with gr.Row():
examples.render()
def load_example(id):
video = SAMPLES[id]['video']
transcription = SAMPLES[id]['transcription'].lower()
timestamps = SAMPLES[id]['timestamps']
return (video, transcription, transcription, timestamps)
examples.click(
load_example,
inputs=[examples],
outputs=[video_in, text_in, transcription_var, timestamps_var],
queue=False)
with gr.Row():
with gr.Column():
video_in.render()
transcribe_btn = gr.Button("Transcribe Audio")
transcribe_btn.click(speech_to_text, [video_in], [
text_in, transcription_var, timestamps_var])
with gr.Row():
gr.Markdown("""
### Now edit as text
After running the video transcription, you can make cuts to the text below (only cuts, not additions!)""")
with gr.Row():
with gr.Column():
text_in.render()
with gr.Row():
cut_btn = gr.Button("Cut to video", elem_id="cut_btn")
# send audio path and hidden variables
cut_btn.click(cut_timestamps_to_video, [
video_in, transcription_var, text_in, timestamps_var], [diff_out, video_out])
reset_transcription = gr.Button(
"Reset to last trascription", elem_id="reset_btn")
reset_transcription.click(
lambda x: x, transcription_var, text_in)
with gr.Column():
video_out.render()
diff_out.render()
with gr.Row():
gr.Markdown("""
#### Video Credits
1. [Cooking](https://vimeo.com/573792389)
1. [Shia LaBeouf "Just Do It"](https://www.youtube.com/watch?v=n2lTxIk_Dr0)
1. [Mark Zuckerberg & Yuval Noah Harari in Conversation](https://www.youtube.com/watch?v=Boj9eD0Wug8)
""")
demo.queue()
if __name__ == "__main__":
demo.launch(debug=True)