# legen/legen.py
import argparse
import os
import subprocess
import time
from inspect import currentframe, getframeinfo
from pathlib import Path
import ffmpeg_utils
import file_utils
import translate_utils
from utils import time_task, audio_extensions, video_extensions, check_other_extensions
version = "v0.16"
# Terminal colors
default = "\033[1;0m"
gray = "\033[1;37m"
wblue = "\033[1;36m"
blue = "\033[1;34m"
yellow = "\033[1;33m"
green = "\033[1;32m"
red = "\033[1;31m"
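# Print the startup banner with the LeGen version and the running Python version, then pause briefly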
print(f"""
{blue}888 {gray} .d8888b.
{blue}888 {gray}d88P Y88b
{blue}888 {gray}888 888
{blue}888 .d88b. {gray}888 .d88b. 88888b.
{blue}888 d8P Y8b {gray}888 88888 d8P Y8b 888 "88b
{blue}888 88888888 {gray}888 888 88888888 888 888
{blue}888 Y8b. {gray}Y88b d88P Y8b. 888 888
{blue}88888888 "Y8888 {gray} "Y8888P88 "Y8888 888 888
legen {version} - github.com/matheusbach/legen{default}
python {__import__('sys').version}
""")
time.sleep(1.5)
# Define parameters and configurations
parser = argparse.ArgumentParser(prog="LeGen", description="Uses AI to locally transcribe speech from media files, generate subtitle files, translate the generated subtitles, insert them into the mp4 container, and burn them directly into the video",
argument_default=True, allow_abbrev=True, add_help=True, usage='LeGen -i INPUT_PATH [other options]')
parser.add_argument("-i", "--input_path",
help="Path to media files. Can be a folder containing files or an individual file", required=True, type=Path)
parser.add_argument("--norm", default=False, action="store_true",
help="Normalize folder times and run vidqa on input_path before starting processing files")
parser.add_argument("-ts:e", "--transcription_engine", type=str, default="whisperx",
help="Transcription engine. Possible values: whisperx (default), whisper")
parser.add_argument("-ts:m", "--transcription_model", type=str, default="medium",
help="Path or name of the Whisper transcription model. A larger model will consume more resources and be slower, but with better transcription quality. Possible values: tiny, base, small, medium (default), large, ...")
parser.add_argument("-ts:d", "--transcription_device", type=str, default="auto",
help="Device to run the transcription through Whisper. Possible values: auto (default), cpu, cuda")
parser.add_argument("-ts:c", "--transcription_compute_type", type=str, default="auto",
help="Quantization for the neural network. Possible values: auto (default), int8, int8_float32, int8_float16, int8_bfloat16, int16, float16, bfloat16, float32")
parser.add_argument("-ts:b", "--transcription_batch", type=int, default=4,
help="Number of simultaneous segments being transcribed. Higher values will speed up processing. If you have low RAM/VRAM, long duration media files or have buggy subtitles, reduce this value to avoid issues. Only works using transcription_engine whisperx. (default: 4)")
parser.add_argument("--translate", type=str, default="none",
help="Translate subtitles to language code if not the same as origin. (default: don't translate)")
parser.add_argument("--input_lang", type=str, default="auto",
help="Indicates (forces) the language of the voice in the input media (default: auto)")
parser.add_argument("-c:v", "--codec_video", type=str, default="h264", metavar="VIDEO_CODEC",
help="Target video codec. Can be used to set acceleration via GPU or another video API [codec_api], if supported (ffmpeg -encoders). Ex: h264, libx264, h264_vaapi, h264_nvenc, hevc, libx265 hevc_vaapi, hevc_nvenc, hevc_cuvid, hevc_qsv, hevc_amf (default: h264)")
parser.add_argument("-c:a", "--codec_audio", type=str, default="aac", metavar="AUDIO_CODEC",
help="Target audio codec. (default: aac). Ex: aac, libopus, mp3, vorbis")
parser.add_argument("-o:s", "--output_softsubs", default=None, type=Path,
help="Path to the folder or output file for the video files with embedded softsub (embedded in the mp4 container and .srt files). (default: softsubs_ + input_path)")
parser.add_argument("-o:h", "--output_hardsubs", default=None, type=Path,
help="Output folder path for video files with burned-in captions and embedded in the mp4 container. (default: hardsubs_ + input_path)")
parser.add_argument("--overwrite", default=False, action="store_true",
help="Overwrite existing files in output directories")
parser.add_argument("--disable_srt", default=False, action="store_true",
help="Disable .srt file generation and don't insert subtitles in mp4 container of output_softsubs")
parser.add_argument("--disable_softsubs", default=False, action="store_true",
help="Don't insert subtitles in mp4 container of output_softsubs. This option continues generating .srt files")
parser.add_argument("--disable_hardsubs", default=False, action="store_true",
help="Disable subtitle burn in output_hardsubs")
parser.add_argument("--copy_files", default=False, action="store_true",
help="Copy other (non-video) files present in input directory to output directories. Only generate the subtitles and videos")
args = parser.parse_args()
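# Derive default output directories next to the input folder, reusing the legacy legen_srt_/legen_burned_ names when they already exist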
if not args.output_softsubs and not args.input_path.is_file():
args.output_softsubs = compatibility_path if (compatibility_path := Path(args.input_path.parent, "legen_srt_" + args.input_path.name)).exists() else Path(args.input_path.parent, "softsubs_" + args.input_path.name)
if not args.output_hardsubs and not args.input_path.is_file():
args.output_hardsubs = compatibility_path if (compatibility_path := Path(args.input_path.parent, "legen_burned_" + args.input_path.name)).exists() else Path(args.input_path.parent, "hardsubs_" + args.input_path.name)
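# Choose the transcription device: with "auto", use CUDA when available, otherwise fall back to CPU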
if args.transcription_device == "auto":
import torch
torch_device = ("cuda" if torch.cuda.is_available() else "cpu")
else:
torch_device = str.lower(args.transcription_device)
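# Resolve the compute type (float16 on GPU, float32 on CPU when left on auto) and map the generic "large" model name to large-v3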
transcription_compute_type = args.transcription_compute_type if args.transcription_compute_type != "auto" else ("float16" if torch_device != "cpu" else "float32")
args.transcription_model = "large-v3" if args.transcription_model == "large" else args.transcription_model
# ----------------------------------------------------------------------------
if args.norm:
# normalize video using vidqa
with time_task(message_start=f"Running {wblue}vidqa{default} and updating folder modifiation times in {gray}{args.input_path}{default}", end="\n"):
subprocess.run(["vidqa", "-i", args.input_path, "-m", "unique", "-fd",
Path(Path(getframeinfo(currentframe()).filename).resolve().parent, "vidqa_data")])
# update folder time structure
file_utils.update_folder_times(args.input_path)
# load whisper model
with time_task(message_start=f"\nLoading {args.transcription_engine} model: {wblue}{args.transcription_model}{default} ({transcription_compute_type}) on {wblue}{torch_device}{default}", end="\n"):
if args.transcription_engine == 'whisperx':
import whisperx
import whisperx_utils
whisper_model = whisperx.load_model(
whisper_arch=args.transcription_model, device=torch_device, compute_type=transcription_compute_type, asr_options={"repetition_penalty": 1, "prompt_reset_on_temperature": 0.5, "no_repeat_ngram_size": 2,})
elif args.transcription_engine == 'whisper':
import whisper
import whisper_utils
whisper_model = whisper.load_model(
name=args.transcription_model, device=torch_device, in_memory=True)
else:
raise ValueError(f'Unsupported transcription engine {args.transcription_engine}. Supported values: whisperx, whisper')
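# Walk the input tree, processing shallower paths first and, within the same depth, older files first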
with time_task(message="⌛ Processing files for"):
path: Path
for path in (item for item in sorted(sorted(Path(args.input_path).rglob('*'), key=lambda x: x.stat().st_mtime), key=lambda x: len(x.parts)) if item.is_file()):
rel_path = path.relative_to(args.input_path)
with time_task(message_start=f"\nProcessing {yellow}{rel_path.as_posix()}{default}", end="\n", message="⌚ Done in"):
try:
# define file type by extensions
if path.suffix.lower() in video_extensions:
file_type = "video"
elif path.suffix.lower() in audio_extensions:
file_type = "audio"
else:
file_type = "other"
if file_type == "video" or file_type == "audio":
# define paths
origin_media_path = path
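# if the same stem exists with more than one media extension, append the extension to the output names to avoid collisions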
dupe_filename = len(check_other_extensions(path, list(video_extensions | audio_extensions))) > 1
posfix_extension = path.suffix.lower().replace('.', '_') if dupe_filename else ''
softsub_video_dir = Path(args.output_softsubs, rel_path.parent)
burned_video_dir = Path(args.output_hardsubs, rel_path.parent)
# output video extension will be changed to .mp4
softsub_video_path = Path(softsub_video_dir, rel_path.stem + posfix_extension + ".mp4")
hardsub_video_path = Path(burned_video_dir, rel_path.stem + posfix_extension + ".mp4")
subtitle_translated_path = Path(
softsub_video_dir, rel_path.stem + posfix_extension + f"_{args.translate}.srt")
subtitles_path = []
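# determine the audio language: detect it from a short extracted sample, or use the forced input language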
if args.input_lang == "auto":
# extract audio
audio_short_extracted = file_utils.TempFile(
None, file_ext=".wav")
ffmpeg_utils.extract_short_wav(
origin_media_path, audio_short_extracted.getpath())
# detect language
print("Detecting audio language: ", end='', flush=True)
if args.transcription_engine == 'whisperx':
audio_language = whisperx_utils.detect_language(
whisper_model, audio_short_extracted.getpath())
if args.transcription_engine == 'whisper':
audio_language = whisper_utils.detect_language(
whisper_model, audio_short_extracted.getpath())
print(f"{gray}{audio_language}{default}")
audio_short_extracted.destroy()
else:
audio_language = args.input_lang
print(f"Forced input audio language: {gray}{audio_language}{default}")
# set the subtitle path once the transcribed language is known
subtitle_transcribed_path = Path(
softsub_video_dir, rel_path.stem + posfix_extension + f"_{audio_language}.srt")
# create temp file for .srt
transcribed_srt_temp = file_utils.TempFile(
subtitle_transcribed_path, file_ext=".srt")
# skip transcription if a transcribed .srt for this language already exists (and overwrite is not enabled) or if it will not be used later in the LeGen process
if ((file_utils.file_is_valid(subtitle_transcribed_path)) or ((args.disable_hardsubs or file_utils.file_is_valid(hardsub_video_path)) and (args.disable_srt or file_utils.file_is_valid(subtitle_transcribed_path)))) and not args.overwrite:
print("Transcription is unnecessary. Skipping.")
else:
# extract audio
audio_extracted = file_utils.TempFile(None, file_ext=".wav")
ffmpeg_utils.extract_audio_wav(
origin_media_path, audio_extracted.getpath())
# transcribe saving subtitles to temp .srt file
if args.transcription_engine == 'whisperx':
print(f"{wblue}Transcribing{default} with {gray}WhisperX{default}")
whisperx_utils.transcribe_audio(
whisper_model, audio_extracted.getpath(), transcribed_srt_temp.getpath(), audio_language, device=torch_device, batch_size=args.transcription_batch)
if args.transcription_engine == 'whisper':
print(f"{wblue}Transcribing{default} with {gray}Whisper{default}")
whisper_utils.transcribe_audio(
model=whisper_model, audio_path=audio_extracted.getpath(), srt_path=transcribed_srt_temp.getpath(), lang=audio_language, disable_fp16=transcription_compute_type not in ("float16", "fp16"))
audio_extracted.destroy()
# if saving .srt is enabled, save it to the destination dir and record its path (with the language code)
if not args.disable_srt:
transcribed_srt_temp.save()
subtitles_path.append(transcribed_srt_temp.getvalidpath())
# translate the transcribed subtitle using Google Translate if the transcribed language is not equal to the target
# skip translation if it was not requested, if the source and target languages are the same, if the file already exists (and overwrite is not enabled), or if it will not be used later in the LeGen process
if args.translate == "none":
pass # translation not requested
elif args.translate == audio_language:
print("Translation is unnecessary because input and output language are the same. Skipping.")
elif (args.disable_hardsubs or file_utils.file_is_valid(hardsub_video_path)) and (args.disable_srt or (file_utils.file_is_valid(subtitle_translated_path) and file_utils.file_is_valid(subtitle_transcribed_path))) and not args.overwrite:
print("Translation is unnecessary. Skipping.")
subtitles_path.insert(0, subtitle_translated_path)
elif file_utils.file_is_valid(subtitle_translated_path):
print("Translated file found. Skipping translation.")
subtitles_path.insert(0, subtitle_translated_path)
elif transcribed_srt_temp.getvalidpath():
# create the temp .srt translated file
translated_srt_temp = file_utils.TempFile(
subtitle_translated_path, file_ext=".srt")
# translating with google translate public API
print(f"{wblue}Translating{default} with {gray}Google Translate{default}")
subs = translate_utils.translate_srt_file(
transcribed_srt_temp.getvalidpath(), translated_srt_temp.getpath(), args.translate)
if not args.disable_srt:
translated_srt_temp.save()
subtitles_path.insert(0, translated_srt_temp.getvalidpath())
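# softsubs: mux the subtitle tracks into an mp4 container, unless disabled or already present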
if not args.disable_softsubs:
if file_utils.file_is_valid(softsub_video_path) and not args.overwrite:
print(f"Existing video file {gray}{softsub_video_path}{default}. Skipping subtitle insert")
else:
# create the temp .mp4 with srt in video container
video_softsubs_temp = file_utils.TempFile(
softsub_video_path, file_ext=".mp4")
# insert subtitle into container using ffmpeg
print(f"{wblue}Inserting subtitle{default} in mp4 container using {gray}FFmpeg{default}")
ffmpeg_utils.insert_subtitle(input_media_path=origin_media_path, subtitles_path=subtitles_path,
burn_subtitles=False, output_video_path=video_softsubs_temp.getpath(),
codec_video=args.codec_video, codec_audio=args.codec_audio)
video_softsubs_temp.save()
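# hardsubs: re-encode the video with the subtitles burned into the image, unless disabled or already present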
if not args.disable_hardsubs:
if file_utils.file_is_valid(hardsub_video_path) and not args.overwrite:
print(f"Existing video file {gray}{hardsub_video_path}{default}. Skipping subtitle burn")
else:
# create the temp .mp4 for the video with burned-in subtitles
video_hardsubs_temp = file_utils.TempFile(
hardsub_video_path, file_ext=".mp4")
# insert subtitle into container and burn using ffmpeg
print(f"{wblue}Inserting subtitle{default} in mp4 container and {wblue}burning{default} using {gray}FFmpeg{default}")
ffmpeg_utils.insert_subtitle(input_media_path=origin_media_path, subtitles_path=subtitles_path,
burn_subtitles=True, output_video_path=video_hardsubs_temp.getpath(),
codec_video=args.codec_video, codec_audio=args.codec_audio)
video_hardsubs_temp.save()
else:
print("not a video file")
if args.copy_files:
if not args.disable_srt:
# copy the extra file to the folder that also contains the .srt files
file_utils.copy_file_if_different(path, Path(
args.output_softsubs, rel_path))
if not args.disable_hardsubs:
# copy the extra file to the folder that contains the burned (hardsub) videos
file_utils.copy_file_if_different(path, Path(
args.output_hardsubs, rel_path))
except Exception as e:
file = path.as_posix()
print(f"{red}ERROR !!!{default} {file}")
print(f"{yellow}check legen-errors.txt for details{default}")
# extract the relevant information from the exception object
current_time = time.strftime("%y/%m/%d %H:%M:%S", time.localtime())
error_message = f"[{current_time}] {file}: {type(e).__name__}: {str(e)}"
# write the error message to a file
with open(Path(Path(getframeinfo(currentframe()).filename).resolve().parent, "legen-errors.txt"), "a") as f:
f.write(error_message + "\n")
print("Deleting temp folder")
file_utils.delete_folder(
Path(Path(getframeinfo(currentframe()).filename).resolve().parent, "temp"))
print(f"{green}Tasks done!{default}")