# Audio_Transcription_Lib.py
#########################################
# Transcription Library
# This library is used to perform transcription of audio files.
# Currently, uses faster_whisper for transcription.
#
####
import configparser
####################
# Function List
#
# 1. convert_to_wav(video_file_path, offset=0, overwrite=False)
# 2. speech_to_text(audio_file_path, selected_source_lang='en', whisper_model='medium.en', vad_filter=False,
#                   diarize=False)
#
####################
#
# Import necessary libraries to run solo for testing
import json
import logging
import os
import sys
import subprocess
import time

# Import Local

#
#######################################################################################################################
# Function Definitions
#


def _format_ffmpeg_offset(offset):
    """Return *offset* as a position string suitable for ffmpeg's ``-ss`` option.

    Numbers are interpreted as seconds and rendered as ``HH:MM:SS`` (with a
    ``.mmm`` suffix only when there is a fractional part); negative numbers are
    clamped to zero. Non-empty strings are passed through unchanged so callers
    may supply their own ffmpeg time specification. ``None`` and the empty
    string mean "start of file".
    """
    if offset is None or offset == "":
        return "00:00:00"
    if isinstance(offset, str):
        return offset
    total_millis = max(0, round(float(offset) * 1000))
    whole_seconds, millis = divmod(total_millis, 1000)
    minutes, seconds = divmod(whole_seconds, 60)
    hours, minutes = divmod(minutes, 60)
    stamp = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
    return f"{stamp}.{millis:03d}" if millis else stamp


# Convert video .m4a into .wav using ffmpeg
#   ffmpeg -i "example.mp4" -ar 16000 -ac 1 -c:a pcm_s16le "output.wav"
#       https://www.gyan.dev/ffmpeg/builds/
#
def convert_to_wav(video_file_path, offset=0, overwrite=False):
    """Convert a media file to a 16 kHz, mono, 16-bit PCM WAV file using ffmpeg.

    The output is written next to the input, with the extension replaced by
    ``.wav``. On Windows the binary ``.\\Bin\\ffmpeg.exe`` (relative to the
    current working directory) is used; on POSIX systems ``ffmpeg`` is looked
    up on PATH.

    Args:
        video_file_path: Path of the media file to convert.
        offset: Start position passed to ffmpeg's ``-ss``; a number of seconds
            or an ffmpeg time string. The default (0) starts at the beginning.
        overwrite: When False and the output file already exists, the
            conversion is skipped and the existing path is returned.

    Returns:
        The output WAV path on success (or when the conversion is skipped).
        On any failure a dict ``{"error": <message>}`` is returned instead of
        raising; this return shape is kept for backward compatibility.
    """
    out_path = os.path.splitext(video_file_path)[0] + ".wav"
    if os.path.exists(out_path) and not overwrite:
        print(f"File '{out_path}' already exists. Skipping conversion.")
        logging.info("Skipping conversion as file already exists: %s", out_path)
        return out_path

    print("Starting conversion process of .m4a to .WAV")
    try:
        if os.name == "nt":
            logging.debug("ffmpeg being ran on windows")
            # Assumes the working directory is the one that contains .\Bin
            ffmpeg_cmd = ".\\Bin\\ffmpeg.exe"
            logging.debug("ffmpeg_cmd: %s", ffmpeg_cmd)
        elif os.name == "posix":
            ffmpeg_cmd = "ffmpeg"  # Assume 'ffmpeg' is in PATH for non-Windows systems
        else:
            raise RuntimeError("Unsupported operating system")

        command = [
            ffmpeg_cmd,
            # Reaching this point means the output is absent or overwrite was
            # requested; without -y ffmpeg would prompt and stall/fail.
            "-y",
            "-ss", _format_ffmpeg_offset(offset),  # Start position in the input
            "-i", video_file_path,
            "-ar", "16000",  # Audio sample rate
            "-ac", "1",  # Number of audio channels
            "-c:a", "pcm_s16le",  # Audio codec
            out_path,
        ]
        # An argument list (no shell) keeps odd filenames safe; stdin is the
        # null device so ffmpeg never waits for interactive input.
        result = subprocess.run(
            command,
            stdin=subprocess.DEVNULL,
            text=True,
            errors="replace",
            capture_output=True,
        )
        if result.returncode != 0:
            logging.error("Error in running FFmpeg")
            logging.error("FFmpeg stderr: %s", result.stderr)
            raise RuntimeError(f"FFmpeg error: {result.stderr}")
        logging.info("FFmpeg executed successfully")
        logging.debug("FFmpeg output: %s", result.stdout)
        logging.info("Conversion to WAV completed: %s", out_path)
    except Exception as e:
        logging.error("convert_to_wav: Error converting file to WAV: %s", str(e))
        return {"error": str(e)}
    return out_path


def _transcription_error(exc):
    """Log *exc* and build the RuntimeError that speech_to_text raises for it."""
    logging.error("speech-to-text: Error transcribing audio: %s", str(exc))
    return RuntimeError("speech-to-text: Error transcribing audio")


# Transcribe .wav into .segments.json
def speech_to_text(audio_file_path, selected_source_lang='en', whisper_model='medium.en', vad_filter=False,
                   diarize=False):
    """Transcribe an audio file with faster_whisper and cache the result as JSON.

    Two files are written next to the audio file: ``<name>.segments.json``
    (compact) and ``<name>.segments_pretty.json`` (indented), each holding
    ``{"segments": [...]}``. If the compact file already exists it is loaded
    and returned without loading the model or transcribing again.

    The compute device is read from the ``processing_choice`` option in the
    ``[Processing]`` section of ``config.txt`` (default ``cpu``).

    Args:
        audio_file_path: Path of the audio file to transcribe.
        selected_source_lang: Language code passed to the model.
        whisper_model: Name of the faster_whisper model to load.
        vad_filter: Forwarded to ``WhisperModel.transcribe``.
        diarize: Accepted for interface compatibility; not used by this function.

    Returns:
        A list of dicts with the keys ``Time_Start``, ``Time_End`` and ``Text``.

    Raises:
        ValueError: If ``audio_file_path`` is None.
        RuntimeError: If reading the cache, transcribing, or saving fails, or
            if the transcription produced no segments.
    """
    if audio_file_path is None:
        raise ValueError("speech-to-text: No audio file provided")
    logging.info("speech-to-text: Audio file path: %s", audio_file_path)

    try:
        # Derive the output names from the path stem; replacing the extension
        # text would also rewrite matching text elsewhere in the path.
        base_path = os.path.splitext(audio_file_path)[0]
        out_file = base_path + ".segments.json"
        prettified_out_file = base_path + ".segments_pretty.json"

        if os.path.exists(out_file):
            logging.info("speech-to-text: Segments file already exists: %s", out_file)
            with open(out_file, encoding="utf-8") as f:
                cached = json.load(f)
            # Saved files wrap the list in {'segments': [...]}; unwrap it so a
            # cache hit returns the same shape as a fresh transcription.
            if isinstance(cached, dict) and "segments" in cached:
                return cached["segments"]
            return cached
    except Exception as e:
        raise _transcription_error(e) from e

    # Only load the (expensive) model once we know a transcription is needed.
    logging.info('speech-to-text: Loading faster_whisper model: %s', whisper_model)
    from faster_whisper import WhisperModel
    # Retrieve processing choice from the configuration file
    config = configparser.ConfigParser()
    config.read('config.txt')
    processing_choice = config.get('Processing', 'processing_choice', fallback='cpu')
    model = WhisperModel(whisper_model, device=f"{processing_choice}")
    time_start = time.time()

    try:
        logging.info('speech-to-text: Starting transcription...')
        options = dict(language=selected_source_lang, beam_size=5, best_of=5, vad_filter=vad_filter)
        transcribe_options = dict(task="transcribe", **options)
        segments_raw, info = model.transcribe(audio_file_path, **transcribe_options)

        segments = []
        for segment_chunk in segments_raw:
            chunk = {
                "Time_Start": segment_chunk.start,
                "Time_End": segment_chunk.end,
                "Text": segment_chunk.text
            }
            logging.debug("Segment: %s", chunk)
            segments.append(chunk)

        if not segments:
            raise RuntimeError("No transcription produced. The audio file may be invalid or empty.")
        logging.info("speech-to-text: Transcription completed in %.2f seconds", time.time() - time_start)

        # Create a dictionary with the 'segments' key
        output_data = {'segments': segments}

        # Save prettified JSON
        logging.info("speech-to-text: Saving prettified JSON to %s", prettified_out_file)
        with open(prettified_out_file, 'w', encoding="utf-8") as f:
            json.dump(output_data, f, indent=2)

        # Save non-prettified JSON
        logging.info("speech-to-text: Saving JSON to %s", out_file)
        with open(out_file, 'w', encoding="utf-8") as f:
            json.dump(output_data, f)
    except Exception as e:
        raise _transcription_error(e) from e
    return segments

#
#
#######################################################################################################################