"""Flask app that transcribes the audio track of an uploaded video.

An uploaded video is saved to disk, its audio track is extracted to a WAV
file, the WAV is split into short chunks, and each chunk is sent to the
Hugging Face Inference API (Whisper) for transcription.
"""

import html
import os
import time

import requests
from flask import Flask, request, render_template, redirect, url_for
from moviepy.editor import VideoFileClip
from pydub import AudioSegment
from pydub.utils import make_chunks

app = Flask(__name__)

# Configure the maximum content length for uploads (500 MB)
app.config['MAX_CONTENT_LENGTH'] = 1024 * 1024 * 500  # 500 MB limit

# Hugging Face API endpoint
API_URL = "https://api-inference.huggingface.co/models/openai/whisper-large"

# Read the API token from the environment variable
API_TOKEN = os.getenv('HF_API_TOKEN')

# Directory (relative to the working directory) where uploads are stored
UPLOAD_DIR = 'uploads'

# Upper bound, in seconds, for a single transcription HTTP request so that a
# stalled connection cannot block a worker forever
REQUEST_TIMEOUT = 300


@app.route('/')
def index():
    """Render the upload form."""
    return render_template('index.html')


@app.route('/upload', methods=['POST'])
def upload_video():
    """Handle a video upload and render its transcript.

    Redirects back to the index page when no usable file was submitted.
    On a processing failure, responds with HTTP 500 and an escaped error
    message.
    """
    if 'video' not in request.files:
        return redirect(url_for('index'))

    video_file = request.files['video']
    if video_file.filename == '':
        return redirect(url_for('index'))

    # The filename is client-controlled: keep only its final path component
    # so the file cannot be written outside the upload directory.
    filename = _safe_filename(video_file.filename)
    if not filename:
        return redirect(url_for('index'))

    # Save the video file
    os.makedirs(UPLOAD_DIR, exist_ok=True)
    video_path = os.path.join(UPLOAD_DIR, filename)
    video_file.save(video_path)

    try:
        # Extract audio from the video
        audio_path = extract_audio(video_path)

        # Split and transcribe the audio
        transcript = split_and_transcribe_audio(audio_path)
    except Exception as e:
        # The message may contain the user-supplied filename and the response
        # is served as HTML, so escape it before sending it back.
        return f"Error: {html.escape(str(e))}", 500

    return render_template('result.html', transcript=transcript)


def _safe_filename(filename):
    """Return the final path component of *filename*, or '' if unusable.

    Both '/' and '\\' are treated as separators, and NUL characters are
    removed. The special names '.' and '..' are rejected.
    """
    name = os.path.basename(filename.replace('\\', '/')).replace('\x00', '')
    if name in ('', '.', '..'):
        return ''
    return name


def extract_audio(video_path):
    """Write the audio track of *video_path* to a WAV file next to it.

    Args:
        video_path: Path of the video file to read.

    Returns:
        The path of the WAV file that was written.

    Raises:
        RuntimeError: If the video cannot be read, has no audio track, or
            the audio cannot be written.
    """
    base, ext = os.path.splitext(video_path)
    audio_path = base + ".wav"
    if ext.lower() == ".wav":
        # Avoid writing the output over the file that is being read.
        audio_path = base + "_audio.wav"

    try:
        # The context manager closes the clip's reader when done.
        with VideoFileClip(video_path) as video:
            if video.audio is None:
                raise ValueError("the video has no audio track")
            video.audio.write_audiofile(audio_path)
    except Exception as e:
        raise RuntimeError(f"Error extracting audio: {e}") from e

    return audio_path


def _model_loading_wait(response):
    """Return the seconds to wait if *response* says the model is loading.

    Returns None unless the response is a 503 whose JSON body is an object
    with a numeric 'estimated_time' field.
    """
    if response.status_code != 503:
        return None
    try:
        body = response.json()
    except ValueError:
        return None
    if not isinstance(body, dict) or 'estimated_time' not in body:
        return None
    try:
        return max(0.0, float(body['estimated_time']))
    except (TypeError, ValueError):
        return None


def transcribe_audio_chunk(audio_chunk_path, retries=5, wait_time=60):
    """Transcribe one audio file through the Hugging Face Inference API.

    Args:
        audio_chunk_path: Path of the audio file to send.
        retries: Maximum number of requests to attempt.
        wait_time: Seconds to sleep after a failed attempt before retrying.

    Returns:
        The value of the "text" field of the API response, or "" when the
        field is absent.

    Raises:
        RuntimeError: If the API token is not configured, the file cannot
            be read, or no attempt produced a successful response.
    """
    if not API_TOKEN:
        # Fail fast instead of sending "Bearer None" and retrying for minutes.
        raise RuntimeError("HF_API_TOKEN environment variable is not set")

    headers = {"Authorization": f"Bearer {API_TOKEN}"}

    # Read the file once; a read failure will not fix itself by retrying.
    try:
        with open(audio_chunk_path, "rb") as audio_chunk:
            payload = audio_chunk.read()
    except OSError as e:
        raise RuntimeError(f"Error reading audio chunk: {e}") from e

    for attempt in range(retries):
        is_last_attempt = attempt == retries - 1
        try:
            # The audio bytes are sent as the raw request body, as shown in
            # the Hugging Face Inference API examples (not multipart).
            response = requests.post(
                API_URL,
                headers=headers,
                data=payload,
                timeout=REQUEST_TIMEOUT,
            )
            if response.status_code == 200:
                return response.json().get("text", "")

            loading_wait = _model_loading_wait(response)
            if loading_wait is None:
                response.raise_for_status()
                # A non-200 status that requests does not treat as an error.
                raise RuntimeError(
                    f"unexpected HTTP status {response.status_code}"
                )
        except Exception as e:
            if is_last_attempt:
                raise RuntimeError(
                    f"Error during transcription after {retries} attempts: {e}"
                ) from e
            time.sleep(wait_time)
        else:
            # If the model is loading, wait the time the API suggests and
            # retry (no point sleeping when no attempt is left).
            if not is_last_attempt:
                time.sleep(loading_wait)

    # Every attempt ended with "model is loading": report it rather than
    # silently returning an empty transcription.
    raise RuntimeError(
        f"Error during transcription after {retries} attempts: "
        "the model did not become available"
    )


def split_and_transcribe_audio(audio_path, chunk_length_ms=15000):
    """Split a WAV file into chunks and transcribe them in order.

    Each chunk is exported to a temporary WAV file next to *audio_path*,
    transcribed, and then deleted.

    Args:
        audio_path: Path of the WAV file to transcribe.
        chunk_length_ms: Length of each chunk in milliseconds
            (15 seconds by default).

    Returns:
        The chunk transcriptions joined by single spaces, with leading and
        trailing whitespace removed.

    Raises:
        FileNotFoundError: If *audio_path* does not exist.
        RuntimeError: If loading, exporting, or transcribing fails.
    """
    if not os.path.exists(audio_path):
        raise FileNotFoundError(f"Audio file not found at {audio_path}")

    try:
        audio = AudioSegment.from_wav(audio_path)
        chunks = make_chunks(audio, chunk_length_ms)
        chunk_base = os.path.splitext(audio_path)[0]

        texts = []
        for i, chunk in enumerate(chunks):
            chunk_path = f"{chunk_base}_chunk{i}.wav"
            try:
                # export() returns the open output file; close it so the
                # chunk can be removed afterwards.
                chunk.export(chunk_path, format="wav").close()
                texts.append(transcribe_audio_chunk(chunk_path))
            finally:
                # Best-effort cleanup: a leftover chunk file must not mask
                # the real outcome of the transcription.
                try:
                    os.remove(chunk_path)
                except OSError:
                    pass

        return " ".join(texts).strip()
    except Exception as e:
        raise RuntimeError(f"Error during transcription: {e}") from e


if __name__ == '__main__':
    app.run(debug=False, host='0.0.0.0', port=7860)