|
import argparse
|
|
import os
|
|
import subprocess
|
|
import time
|
|
from inspect import currentframe, getframeinfo
|
|
from pathlib import Path
|
|
|
|
import ffmpeg_utils
|
|
import file_utils
|
|
import translate_utils
|
|
from utils import time_task, audio_extensions, video_extensions, check_other_extensions
|
|
|
|
# Release tag shown in the startup banner.
version = "v0.16"

# ANSI escape sequences (ESC [ 1 ; <code> m) used to colour console output.
# Codes, in order: 0 -> default, 37 -> gray, 36 -> wblue, 34 -> blue,
# 33 -> yellow, 32 -> green, 31 -> red.
default, gray, wblue, blue, yellow, green, red = (
    f"\033[1;{sgr_code}m" for sgr_code in (0, 37, 36, 34, 33, 32, 31)
)
|
|
|
|
# Startup banner: prints the coloured ASCII-art logo followed by the LeGen
# version, the project URL and the version string of the running Python
# interpreter (looked up via sys.version without a top-level import).
# NOTE(review): the text between the triple quotes is emitted verbatim at
# runtime, so it is intentionally left untouched here.
print(f"""
|
|
{blue}888 {gray} .d8888b.
|
|
{blue}888 {gray}d88P Y88b
|
|
{blue}888 {gray}888 888
|
|
{blue}888 .d88b. {gray}888 .d88b. 88888b.
|
|
{blue}888 d8P Y8b {gray}888 88888 d8P Y8b 888 "88b
|
|
{blue}888 88888888 {gray}888 888 88888888 888 888
|
|
{blue}888 Y8b. {gray}Y88b d88P Y8b. 888 888
|
|
{blue}88888888 "Y8888 {gray} "Y8888P88 "Y8888 888 888
|
|
|
|
legen {version} - github.com/matheusbach/legen{default}
|
|
python {__import__('sys').version}
|
|
""")
|
|
# Pause for 1.5 seconds after the banner before continuing with start-up.
time.sleep(1.5)
|
|
|
|
|
|
# Command-line interface definition. Only -i/--input_path is mandatory; every
# other option carries an explicit default.
parser = argparse.ArgumentParser(
    prog="LeGen",
    description="Uses AI to locally transcribes speech from media files, generating subtitle files, translates the generated subtitles, inserts them into the mp4 container, and burns them directly into video",
    argument_default=True,
    allow_abbrev=True,
    add_help=True,
    usage='LeGen -i INPUT_PATH [other options]',
)

# --- input -------------------------------------------------------------------
parser.add_argument(
    "-i", "--input_path",
    type=Path,
    required=True,
    help="Path to media files. Can be a folder containing files or an individual file",
)
parser.add_argument(
    "--norm",
    action="store_true",
    default=False,
    help="Normalize folder times and run vidqa on input_path before starting processing files",
)

# --- transcription -----------------------------------------------------------
parser.add_argument(
    "-ts:e", "--transcription_engine",
    type=str,
    default="whisperx",
    help="Transcription engine. Possible values: whisperx (default), whisper",
)
parser.add_argument(
    "-ts:m", "--transcription_model",
    type=str,
    default="medium",
    help="Path or name of the Whisper transcription model. A larger model will consume more resources and be slower, but with better transcription quality. Possible values: tiny, base, small, medium (default), large, ...",
)
parser.add_argument(
    "-ts:d", "--transcription_device",
    type=str,
    default="auto",
    help="Device to run the transcription through Whisper. Possible values: auto (default), cpu, cuda",
)
parser.add_argument(
    "-ts:c", "--transcription_compute_type",
    type=str,
    default="auto",
    help="Quantization for the neural network. Possible values: auto (default), int8, int8_float32, int8_float16, int8_bfloat16, int16, float16, bfloat16, float32",
)
parser.add_argument(
    "-ts:b", "--transcription_batch",
    type=int,
    default=4,
    help="Number of simultaneous segments being transcribed. Higher values will speed up processing. If you have low RAM/VRAM, long duration media files or have buggy subtitles, reduce this value to avoid issues. Only works using transcription_engine whisperx. (default: 4)",
)

# --- languages ---------------------------------------------------------------
parser.add_argument(
    "--translate",
    type=str,
    default="none",
    help="Translate subtitles to language code if not the same as origin. (default: don't translate)",
)
parser.add_argument(
    "--input_lang",
    type=str,
    default="auto",
    help="Indicates (forces) the language of the voice in the input media (default: auto)",
)

# --- encoding ----------------------------------------------------------------
parser.add_argument(
    "-c:v", "--codec_video",
    type=str,
    default="h264",
    metavar="VIDEO_CODEC",
    help="Target video codec. Can be used to set acceleration via GPU or another video API [codec_api], if supported (ffmpeg -encoders). Ex: h264, libx264, h264_vaapi, h264_nvenc, hevc, libx265 hevc_vaapi, hevc_nvenc, hevc_cuvid, hevc_qsv, hevc_amf (default: h264)",
)
parser.add_argument(
    "-c:a", "--codec_audio",
    type=str,
    default="aac",
    metavar="AUDIO_CODEC",
    help="Target audio codec. (default: aac). Ex: aac, libopus, mp3, vorbis",
)

# --- outputs -----------------------------------------------------------------
parser.add_argument(
    "-o:s", "--output_softsubs",
    type=Path,
    default=None,
    help="Path to the folder or output file for the video files with embedded softsub (embedded in the mp4 container and .srt files). (default: softsubs_ + input_path)",
)
parser.add_argument(
    "-o:h", "--output_hardsubs",
    type=Path,
    default=None,
    help="Output folder path for video files with burned-in captions and embedded in the mp4 container. (default: hardsubs_ + input_path)",
)
parser.add_argument(
    "--overwrite",
    action="store_true",
    default=False,
    help="Overwrite existing files in output directories",
)
parser.add_argument(
    "--disable_srt",
    action="store_true",
    default=False,
    help="Disable .srt file generation and don't insert subtitles in mp4 container of output_softsubs",
)
parser.add_argument(
    "--disable_softsubs",
    action="store_true",
    default=False,
    help="Don't insert subtitles in mp4 container of output_softsubs. This option continues generating .srt files",
)
parser.add_argument(
    "--disable_hardsubs",
    action="store_true",
    default=False,
    help="Disable subtitle burn in output_hardsubs",
)
parser.add_argument(
    "--copy_files",
    action="store_true",
    default=False,
    help="Copy other (non-video) files present in input directory to output directories. Only generate the subtitles and videos",
)

# Parse sys.argv; argparse exits with a usage message on invalid input.
args = parser.parse_args()
|
|
|
|
# Fill in the output folders when they were not given and the input is not a
# single file. A folder using the older "legen_srt_" / "legen_burned_" prefix
# is reused if it already exists; otherwise the "softsubs_" / "hardsubs_"
# prefix is used next to the input folder.
if not (args.output_softsubs or args.input_path.is_file()):
    compatibility_path = Path(args.input_path.parent, "legen_srt_" + args.input_path.name)
    if compatibility_path.exists():
        args.output_softsubs = compatibility_path
    else:
        args.output_softsubs = Path(args.input_path.parent, "softsubs_" + args.input_path.name)

if not (args.output_hardsubs or args.input_path.is_file()):
    compatibility_path = Path(args.input_path.parent, "legen_burned_" + args.input_path.name)
    if compatibility_path.exists():
        args.output_hardsubs = compatibility_path
    else:
        args.output_hardsubs = Path(args.input_path.parent, "hardsubs_" + args.input_path.name)

# Resolve the device: "auto" picks CUDA when torch reports it as available.
# torch is imported only on this path.
if args.transcription_device == "auto":
    import torch
    torch_device = "cuda" if torch.cuda.is_available() else "cpu"
else:
    torch_device = args.transcription_device.lower()

# Resolve the compute type: "default" maps to float32 on CPU and float16 on
# any other device; every other value is passed through unchanged.
# NOTE(review): the CLI default is "auto", so this mapping only applies when
# the user passes "default" explicitly - confirm that is intended.
if args.transcription_compute_type != "default":
    transcription_compute_type = args.transcription_compute_type
elif torch_device == "cpu":
    transcription_compute_type = "float32"
else:
    transcription_compute_type = "float16"

# "large" is an alias for the "large-v3" model.
if args.transcription_model == "large":
    args.transcription_model = "large-v3"
|
|
|
|
|
|
|
|
# Optional pre-processing step (--norm): run the external "vidqa" tool over
# the input path, then refresh folder times through file_utils.
if args.norm:
    with time_task(message_start=f"Running {wblue}vidqa{default} and updating folder modifiation times in {gray}{args.input_path}{default}", end="\n"):
        # vidqa's data folder lives next to this script.
        vidqa_data_dir = Path(
            Path(getframeinfo(currentframe()).filename).resolve().parent,
            "vidqa_data",
        )
        subprocess.run(
            ["vidqa", "-i", args.input_path, "-m", "unique", "-fd", vidqa_data_dir]
        )

        file_utils.update_folder_times(args.input_path)
|
|
|
|
|
|
# Load the speech-to-text model once, up front. The engine packages are
# imported lazily so that only the selected engine has to be installed.
with time_task(message_start=f"\nLoading {args.transcription_engine} model: {wblue}{args.transcription_model}{default} ({transcription_compute_type}) on {wblue}{torch_device}{default}", end="\n"):
    # Reject unknown engines before attempting any import.
    if args.transcription_engine not in ('whisperx', 'whisper'):
        raise ValueError(f'Unsupported transcription engine {args.transcription_engine}. Supported values: whisperx, whisper')

    if args.transcription_engine == 'whisperx':
        import whisperx
        import whisperx_utils

        whisper_model = whisperx.load_model(
            whisper_arch=args.transcription_model,
            device=torch_device,
            compute_type=transcription_compute_type,
            asr_options={
                "repetition_penalty": 1,
                "prompt_reset_on_temperature": 0.5,
                "no_repeat_ngram_size": 2,
            },
        )
    else:
        import whisper

        import whisper_utils

        whisper_model = whisper.load_model(
            name=args.transcription_model,
            device=torch_device,
            in_memory=True,
        )
|
|
|
|
# Main pipeline. Every regular file found under the input path is visited,
# shallowest folders first and, within one depth, oldest modification time
# first. Audio and video files go through:
#   1. language detection (unless --input_lang forces a language),
#   2. transcription to an .srt file,
#   3. optional translation of that .srt,
#   4. muxing the subtitles into an mp4 (softsubs) and/or burning them in
#      (hardsubs).
# Any other file is optionally copied to the output folders (--copy_files).
# A failure on one file is reported, appended to legen-errors.txt next to this
# script, and processing continues with the next file.
with time_task(message="⌛ Processing files for"):
    path: Path
    # One stable sort on (depth, mtime) gives the same order as sorting by
    # mtime and then re-sorting by depth.
    candidates = sorted(
        Path(args.input_path).rglob('*'),
        key=lambda entry: (len(entry.parts), entry.stat().st_mtime),
    )
    for path in candidates:
        if not path.is_file():
            continue

        rel_path = path.relative_to(args.input_path)
        with time_task(message_start=f"\nProcessing {yellow}{rel_path.as_posix()}{default}", end="\n", message="⌚ Done in"):
            try:
                # Classify the file by its (case-insensitive) extension.
                suffix = path.suffix.lower()
                if suffix in video_extensions:
                    file_type = "video"
                elif suffix in audio_extensions:
                    file_type = "audio"
                else:
                    file_type = "other"

                if file_type in ("video", "audio"):
                    origin_media_path = path

                    # If more than one media file is reported for this name,
                    # the extension is appended to the output stem so the
                    # outputs do not collide.
                    # NOTE(review): presumably check_other_extensions returns
                    # the files sharing this stem - confirm in utils.
                    dupe_filename = len(check_other_extensions(path, list(video_extensions | audio_extensions))) > 1
                    posfix_extension = suffix.replace('.', '_') if dupe_filename else ''
                    output_stem = rel_path.stem + posfix_extension

                    # Output folders mirror the sub-folder layout of the input.
                    softsub_video_dir = Path(args.output_softsubs, rel_path.parent)
                    burned_video_dir = Path(args.output_hardsubs, rel_path.parent)

                    # Both videos go into their mirrored sub-folder. The
                    # softsub video used to be written to the output root,
                    # which flattened the tree and let same-named files from
                    # different folders collide.
                    softsub_video_path = Path(softsub_video_dir, output_stem + ".mp4")
                    hardsub_video_path = Path(burned_video_dir, output_stem + ".mp4")
                    subtitle_translated_path = Path(
                        softsub_video_dir, output_stem + f"_{args.translate}.srt")
                    # Subtitle files handed to FFmpeg; a translation, when
                    # present, is inserted at the front.
                    subtitles_path = []

                    if args.input_lang == "auto":
                        # Detect the spoken language from a short audio excerpt.
                        audio_short_extracted = file_utils.TempFile(
                            None, file_ext=".wav")
                        ffmpeg_utils.extract_short_wav(
                            origin_media_path, audio_short_extracted.getpath())

                        print("Detecting audio language: ", end='', flush=True)
                        if args.transcription_engine == 'whisperx':
                            audio_language = whisperx_utils.detect_language(
                                whisper_model, audio_short_extracted.getpath())
                        elif args.transcription_engine == 'whisper':
                            audio_language = whisper_utils.detect_language(
                                whisper_model, audio_short_extracted.getpath())
                        print(f"{gray}{audio_language}{default}")

                        audio_short_extracted.destroy()
                    else:
                        audio_language = args.input_lang
                        print(f"Forced input audio language: {gray}{audio_language}{default}")

                    subtitle_transcribed_path = Path(
                        softsub_video_dir, output_stem + f"_{audio_language}.srt")

                    transcribed_srt_temp = file_utils.TempFile(
                        subtitle_transcribed_path, file_ext=".srt")

                    # Skip transcription when the .srt already exists, or when
                    # its result would not be used and --overwrite is off.
                    # NOTE(review): an existing .srt is reused even with
                    # --overwrite (operator precedence kept from the original
                    # expression) - confirm that is intended.
                    if file_utils.file_is_valid(subtitle_transcribed_path) or (
                        (args.disable_hardsubs or file_utils.file_is_valid(hardsub_video_path))
                        and (args.disable_srt or file_utils.file_is_valid(subtitle_transcribed_path))
                        and not args.overwrite
                    ):
                        print("Transcription is unnecessary. Skipping.")
                    else:
                        # Extract the full audio track and transcribe it into
                        # the temporary .srt file.
                        audio_extracted = file_utils.TempFile(None, file_ext=".wav")
                        ffmpeg_utils.extract_audio_wav(
                            origin_media_path, audio_extracted.getpath())

                        if args.transcription_engine == 'whisperx':
                            print(f"{wblue}Transcribing{default} with {gray}WhisperX{default}")
                            whisperx_utils.transcribe_audio(
                                whisper_model, audio_extracted.getpath(), transcribed_srt_temp.getpath(), audio_language, device=torch_device, batch_size=args.transcription_batch)
                        elif args.transcription_engine == 'whisper':
                            print(f"{wblue}Transcribing{default} with {gray}Whisper{default}")
                            whisper_utils.transcribe_audio(
                                model=whisper_model,
                                audio_path=audio_extracted.getpath(),
                                srt_path=transcribed_srt_temp.getpath(),
                                lang=audio_language,
                                # fp16 is enabled only for float16 / fp16.
                                disable_fp16=transcription_compute_type not in ("float16", "fp16"))

                        audio_extracted.destroy()

                        # Persist the .srt to the output folder unless disabled.
                        if not args.disable_srt:
                            transcribed_srt_temp.save()

                    # NOTE(review): getvalidpath() presumably yields the saved
                    # file when it exists and the temp file otherwise -
                    # confirm in file_utils.TempFile.
                    subtitles_path.append(transcribed_srt_temp.getvalidpath())

                    # Optional translation of the transcribed subtitles.
                    if args.translate == "none":
                        pass
                    elif args.translate == audio_language:
                        print("Translation is unnecessary because input and output language are the same. Skipping.")
                    elif (
                        (args.disable_hardsubs or file_utils.file_is_valid(hardsub_video_path))
                        and (args.disable_srt or (file_utils.file_is_valid(subtitle_translated_path) and file_utils.file_is_valid(subtitle_transcribed_path)))
                        and not args.overwrite
                    ):
                        print("Translation is unnecessary. Skipping.")
                        subtitles_path.insert(0, subtitle_translated_path)
                    elif file_utils.file_is_valid(subtitle_translated_path):
                        print("Translated file found. Skipping translation.")
                        subtitles_path.insert(0, subtitle_translated_path)
                    elif transcribed_srt_temp.getvalidpath():
                        translated_srt_temp = file_utils.TempFile(
                            subtitle_translated_path, file_ext=".srt")

                        print(f"{wblue}Translating{default} with {gray}Google Translate{default}")
                        translate_utils.translate_srt_file(
                            transcribed_srt_temp.getvalidpath(), translated_srt_temp.getpath(), args.translate)
                        if not args.disable_srt:
                            translated_srt_temp.save()

                        subtitles_path.insert(0, translated_srt_temp.getvalidpath())

                    # Softsubs: mux the subtitle tracks into an mp4 container.
                    if not args.disable_softsubs:
                        if file_utils.file_is_valid(softsub_video_path) and not args.overwrite:
                            print(f"Existing video file {gray}{softsub_video_path}{default}. Skipping subtitle insert")
                        else:
                            video_softsubs_temp = file_utils.TempFile(
                                softsub_video_path, file_ext=".mp4")

                            print(f"{wblue}Inserting subtitle{default} in mp4 container using {gray}FFmpeg{default}")
                            ffmpeg_utils.insert_subtitle(input_media_path=origin_media_path, subtitles_path=subtitles_path,
                                                         burn_subtitles=False, output_video_path=video_softsubs_temp.getpath(),
                                                         codec_video=args.codec_video, codec_audio=args.codec_audio)
                            video_softsubs_temp.save()

                    # Hardsubs: same call with burn_subtitles=True.
                    if not args.disable_hardsubs:
                        if file_utils.file_is_valid(hardsub_video_path) and not args.overwrite:
                            print(f"Existing video file {gray}{hardsub_video_path}{default}. Skipping subtitle burn")
                        else:
                            video_hardsubs_temp = file_utils.TempFile(
                                hardsub_video_path, file_ext=".mp4")

                            print(f"{wblue}Inserting subtitle{default} in mp4 container and {wblue}burning{default} using {gray}FFmpeg{default}")
                            ffmpeg_utils.insert_subtitle(input_media_path=origin_media_path, subtitles_path=subtitles_path,
                                                         burn_subtitles=True, output_video_path=video_hardsubs_temp.getpath(),
                                                         codec_video=args.codec_video, codec_audio=args.codec_audio)
                            video_hardsubs_temp.save()
                else:
                    print("not a video file")
                    # Optionally mirror non-media files into each enabled
                    # output tree.
                    if args.copy_files:
                        if not args.disable_srt:
                            file_utils.copy_file_if_different(path, Path(
                                args.output_softsubs, rel_path))
                        if not args.disable_hardsubs:
                            file_utils.copy_file_if_different(path, Path(
                                args.output_hardsubs, rel_path))
            except Exception as e:
                # Per-file boundary: report, log and carry on with the next
                # file instead of aborting the whole run.
                file = path.as_posix()
                print(f"{red}ERROR !!!{default} {file}")
                print(f"{yellow}check legen-errors.txt for details{default}")

                current_time = time.strftime("%y/%m/%d %H:%M:%S", time.localtime())

                error_message = f"[{current_time}] {file}: {type(e).__name__}: {str(e)}"

                # The log lives next to this script. UTF-8 is forced so that a
                # file name with non-ASCII characters cannot make the handler
                # itself fail on platforms with a narrower default encoding.
                errors_log_path = Path(Path(getframeinfo(currentframe()).filename).resolve().parent, "legen-errors.txt")
                with open(errors_log_path, "a", encoding="utf-8") as f:
                    f.write(error_message + "\n")
|
|
|
|
# Final clean-up: remove the "temp" folder that sits next to this script and
# report completion.
print("Deleting temp folder")
legen_temp_dir = Path(getframeinfo(currentframe()).filename).resolve().parent / "temp"
file_utils.delete_folder(legen_temp_dir)

print(f"{green}Tasks done!{default}")
|
|
|