import gradio as gr import json from difflib import Differ import ffmpeg import os from pathlib import Path import time import aiohttp import asyncio # Set true if you're using huggingface inference API API https://huggingface.co/inference-api API_BACKEND = True # MODEL = 'facebook/wav2vec2-large-960h-lv60-self' # MODEL = "facebook/wav2vec2-large-960h" MODEL = "facebook/wav2vec2-base-960h" # MODEL = "patrickvonplaten/wav2vec2-large-960h-lv60-self-4-gram" if API_BACKEND: from dotenv import load_dotenv import base64 import asyncio load_dotenv(Path(".env")) HF_TOKEN = os.environ["HF_TOKEN"] headers = {"Authorization": f"Bearer {HF_TOKEN}"} API_URL = f'https://api-inference.huggingface.co/models/{MODEL}' else: import torch from transformers import pipeline # is cuda available? cuda = torch.device( 'cuda:0') if torch.cuda.is_available() else torch.device('cpu') device = 0 if torch.cuda.is_available() else -1 speech_recognizer = pipeline( task="automatic-speech-recognition", model=f'{MODEL}', tokenizer=f'{MODEL}', framework="pt", device=device, ) videos_out_path = Path("./videos_out") videos_out_path.mkdir(parents=True, exist_ok=True) samples_data = sorted(Path('examples').glob('*.json')) SAMPLES = [] for file in samples_data: with open(file) as f: sample = json.load(f) SAMPLES.append(sample) VIDEOS = list(map(lambda x: [x['video']], SAMPLES)) total_inferences_since_reboot = 415 total_cuts_since_reboot = 1539 async def speech_to_text(video_file_path): """ Takes a video path to convert to audio, transcribe audio channel to text and char timestamps Using https://huggingface.co/tasks/automatic-speech-recognition pipeline """ global total_inferences_since_reboot if (video_file_path == None): raise ValueError("Error no video input") video_path = Path(video_file_path) try: # convert video to audio 16k using PIPE to audio_memory audio_memory, _ = ffmpeg.input(video_path).output( '-', format="wav", ac=1, ar='16k').overwrite_output().global_args('-loglevel', 'quiet').run(capture_stdout=True) except Exception as e: raise RuntimeError("Error converting video to audio") ping("speech_to_text") last_time = time.time() if API_BACKEND: # Using Inference API https://huggingface.co/inference-api # try twice, because the model must be loaded for i in range(10): for tries in range(4): print(f'Transcribing from API attempt {tries}') try: inference_reponse = await query_api(audio_memory) print(inference_reponse) transcription = inference_reponse["text"].lower() timestamps = [[chunk["text"].lower(), chunk["timestamp"][0], chunk["timestamp"][1]] for chunk in inference_reponse['chunks']] total_inferences_since_reboot += 1 print("\n\ntotal_inferences_since_reboot: ", total_inferences_since_reboot, "\n\n") return (transcription, transcription, timestamps) except Exception as e: print(e) if 'error' in inference_reponse and 'estimated_time' in inference_reponse: wait_time = inference_reponse['estimated_time'] print("Waiting for model to load....", wait_time) # wait for loading model # 5 seconds plus for certanty await asyncio.sleep(wait_time + 5.0) elif 'error' in inference_reponse: raise RuntimeError("Error Fetching API", inference_reponse['error']) else: break else: raise RuntimeError(inference_reponse, "Error Fetching API") else: try: print(f'Transcribing via local model') output = speech_recognizer( audio_memory, return_timestamps="char", chunk_length_s=10, stride_length_s=(4, 2)) transcription = output["text"].lower() timestamps = [[chunk["text"].lower(), chunk["timestamp"][0].tolist(), chunk["timestamp"][1].tolist()] for chunk in output['chunks']] total_inferences_since_reboot += 1 print("\n\ntotal_inferences_since_reboot: ", total_inferences_since_reboot, "\n\n") return (transcription, transcription, timestamps) except Exception as e: raise RuntimeError("Error Running inference with local model", e) async def cut_timestamps_to_video(video_in, transcription, text_in, timestamps): """ Given original video input, text transcript + timestamps, and edit ext cuts video segments into a single video """ global total_cuts_since_reboot video_path = Path(video_in) video_file_name = video_path.stem if (video_in == None or text_in == None or transcription == None): raise ValueError("Inputs undefined") d = Differ() # compare original transcription with edit text diff_chars = d.compare(transcription, text_in) # remove all text aditions from diff filtered = list(filter(lambda x: x[0] != '+', diff_chars)) # filter timestamps to be removed # timestamps_to_cut = [b for (a,b) in zip(filtered, timestamps_var) if a[0]== '-' ] # return diff tokes and cutted video!! # groupping character timestamps so there are less cuts idx = 0 grouped = {} for (a, b) in zip(filtered, timestamps): if a[0] != '-': if idx in grouped: grouped[idx].append(b) else: grouped[idx] = [] grouped[idx].append(b) else: idx += 1 # after grouping, gets the lower and upter start and time for each group timestamps_to_cut = [[v[0][1], v[-1][2]] for v in grouped.values()] between_str = '+'.join( map(lambda t: f'between(t,{t[0]},{t[1]})', timestamps_to_cut)) if timestamps_to_cut: video_file = ffmpeg.input(video_in) video = video_file.video.filter( "select", f'({between_str})').filter("setpts", "N/FRAME_RATE/TB") audio = video_file.audio.filter( "aselect", f'({between_str})').filter("asetpts", "N/SR/TB") output_video = f'./videos_out/{video_file_name}.mp4' ffmpeg.concat(video, audio, v=1, a=1).output( output_video).overwrite_output().global_args('-loglevel', 'quiet').run() else: output_video = video_in tokens = [(token[2:], token[0] if token[0] != " " else None) for token in filtered] total_cuts_since_reboot += 1 ping("video_cuts") print("\n\ntotal_cuts_since_reboot: ", total_cuts_since_reboot, "\n\n") return (tokens, output_video) async def query_api(audio_bytes: bytes): """ Query for Huggingface Inference API for Automatic Speech Recognition task """ payload = json.dumps({ "inputs": base64.b64encode(audio_bytes).decode("utf-8"), "parameters": { "return_timestamps": "char", "chunk_length_s": 10, "stride_length_s": [4, 2] }, "options": {"use_gpu": False} }).encode("utf-8") async with aiohttp.ClientSession() as session: async with session.post(API_URL, headers=headers, data=payload) as response: print("API Response: ", response.status) if response.headers['Content-Type'] == 'application/json': return await response.json() elif response.headers['Content-Type'] == 'application/octet-stream': return await response.read() elif response.headers['Content-Type'] == 'text/plain': return await response.text() else: raise RuntimeError("Error Fetching API") def ping(name): url = f'https://huggingface.co/api/telemetry/spaces/radames/edit-video-by-editing-text/{name}' print("ping: ", url) async def req(): async with aiohttp.ClientSession() as session: async with session.get(url) as response: print("pong: ", response.status) asyncio.create_task(req()) # ---- Gradio Layout ----- video_in = gr.Video(label="Video file") text_in = gr.Textbox(label="Transcription", lines=10, interactive=True) video_out = gr.Video(label="Video Out") diff_out = gr.HighlightedText(label="Cuts Diffs", combine_adjacent=True) examples = gr.Dataset(components=[video_in], samples=VIDEOS, type="index") css = """ #cut_btn, #reset_btn { align-self:stretch; } #\\31 3 { max-width: 540px; } .output-markdown {max-width: 65ch !important;} """ with gr.Blocks(css=css) as demo: transcription_var = gr.State() timestamps_var = gr.State() with gr.Row(): with gr.Column(): gr.Markdown(""" # Edit Video By Editing Text This project is a quick proof of concept of a simple video editor where the edits are made by editing the audio transcription. Using the [Huggingface Automatic Speech Recognition Pipeline](https://huggingface.co/tasks/automatic-speech-recognition) with a fine tuned [Wav2Vec2 model using Connectionist Temporal Classification (CTC)](https://huggingface.co/facebook/wav2vec2-large-960h-lv60-self) you can predict not only the text transcription but also the [character or word base timestamps](https://huggingface.co/docs/transformers/v4.19.2/en/main_classes/pipelines#transformers.AutomaticSpeechRecognitionPipeline.__call__.return_timestamps) """) with gr.Row(): examples.render() def load_example(id): video = SAMPLES[id]['video'] transcription = SAMPLES[id]['transcription'].lower() timestamps = SAMPLES[id]['timestamps'] return (video, transcription, transcription, timestamps) examples.click( load_example, inputs=[examples], outputs=[video_in, text_in, transcription_var, timestamps_var], queue=False) with gr.Row(): with gr.Column(): video_in.render() transcribe_btn = gr.Button("Transcribe Audio") transcribe_btn.click(speech_to_text, [video_in], [ text_in, transcription_var, timestamps_var]) with gr.Row(): gr.Markdown(""" ### Now edit as text After running the video transcription, you can make cuts to the text below (only cuts, not additions!)""") with gr.Row(): with gr.Column(): text_in.render() with gr.Row(): cut_btn = gr.Button("Cut to video", elem_id="cut_btn") # send audio path and hidden variables cut_btn.click(cut_timestamps_to_video, [ video_in, transcription_var, text_in, timestamps_var], [diff_out, video_out]) reset_transcription = gr.Button( "Reset to last trascription", elem_id="reset_btn") reset_transcription.click( lambda x: x, transcription_var, text_in) with gr.Column(): video_out.render() diff_out.render() with gr.Row(): gr.Markdown(""" #### Video Credits 1. [Cooking](https://vimeo.com/573792389) 1. [Shia LaBeouf "Just Do It"](https://www.youtube.com/watch?v=n2lTxIk_Dr0) 1. [Mark Zuckerberg & Yuval Noah Harari in Conversation](https://www.youtube.com/watch?v=Boj9eD0Wug8) """) demo.queue() if __name__ == "__main__": demo.launch(debug=True)