import gradio as gr from download import check_download, download, download_generator import anvil.media import os import pathlib from pathlib import Path from shutil import rmtree import dotenv dotenv.load_dotenv() anvil.server.connect(os.environ.get('ANVIL_UPLINK_KEY')) queue_placeholder = None url_input = gr.Textbox(label="Youtube/Twitter/etc video URL (supports many services)", value='https://twitter.com/starsonxh/status/1552945347194142720', lines=1, elem_id="url_input") download_status = gr.Textbox(label="Status:", value='', lines=1, elem_id="download_status") init_video = gr.Video(label="Downloaded video", visible=False) init_audio = gr.Audio(label="Downloaded audio", visible=False) output_text = gr.Textbox(label="Output text", lines=5, visible=False, max_lines=10, interactive=True) sub_video = gr.Video(label="Subbed video", visible=False) @anvil.server.callable def cleanup_output_dir(): #make sure we're in the main directory os.chdir(pathlib.Path(__file__).parent.absolute()) #delete the output directory contents for path in Path("output").glob("**/*"): if path.is_file(): path.unlink() elif path.is_dir(): rmtree(path) @anvil.server.callable def download_api(url): print(f'Request from Anvil with URL {url}') final_response = '' #TODO: figure out how to push an incoming event to the queue #THIS DOESN'T WORK queue_placeholder.push_event('download', url) #TODO: handle errors for response in download_generator(url): final_response = response['message'] print(final_response) return final_response def predownload(url): for response in download_generator(url): updates_object = {} updates_object[download_status] = response.get('message', '') meta = response.get('meta') if 'video' in response: updates_object[init_video] = gr.update(visible=True, value=response["video"], label=f"Init Video: {meta['id']}.{meta['ext']}") updates_object[init_audio] = gr.update(visible=True, value=response["audio"], label=f"Extracted audio : {meta['id']}.mp3") if 'whisper_result' in response: updates_object[output_text] = gr.update(value=response['whisper_result'].get('srt'), visible=True, label=f"Subtitles translated from {response['whisper_result'].get('language')} (detected language)") if 'sub_video' in response: updates_object[sub_video] = gr.update(visible=True, value=response["sub_video"], label=f"Subbed video: {meta['id']}_translated.mp4") yield updates_object def rebake(output_text, sub_video): raise NotImplementedError("Rebake not implemented yet") subtitled_video = False css = """ #submit{ position: absolute; flex:0 !important; width: 120px; right: 13px; top: 40px; } #url_input{ font-size: 40px !important; } #download_status{ font-size: 40px !important; } #input_row{ position: relative; } .gradio-interface #submit{ } """ with gr.Blocks(css=css+"") as demo: gr.Markdown('# Vid Translator 0.1 - get english subtitles for videos in any language') gr.Markdown('### Link to a tweet, youtube or other video and get a translated video with @openAi #whisper, built by [@altryne](https://twitter.com/altryne/)') gr.Markdown('### This is used as the backend for [@vidtranslator](https://twitter.com/vidtranslator/)') with gr.Row(elem_id="input_row"): with gr.Group() as group: url_input.render() greet_btn = gr.Button("Translate", elem_id='submit', variant='primary') pause_for_editing = gr.Checkbox(label="Pause for editing") drowpdown = gr.PlayableVideo with gr.Row(): with gr.Column(): download_status.render() init_video.render() init_audio.render() with gr.Column(): with gr.Group(): sub_video.render() output_text.render() gr.Button("Download srt file") rebake = gr.Button("Edit subtitles on video") greet_btn.click(fn=predownload, inputs=[url_input], outputs=[download_status, init_video, init_audio, output_text, sub_video], api_name='predownload') url_input.submit(fn=predownload, inputs=[url_input], outputs=[download_status, init_video, init_audio, output_text, sub_video]) rebake.click(fn=rebake, inputs=[output_text, sub_video], outputs=[download_status, output_text, sub_video]) api_button = gr.Button("API", variant='primary', visible=False).click(fn=cleanup_output_dir, inputs=[], outputs=[], api_name='cleanup_output_dir') queue_placeholder = demo.queue() if __name__ == "__main__": demo.launch(show_error=True, debug=True)