Spaces:
Build error
Build error
File size: 12,401 Bytes
2e0131e 6fd7ef3 2e0131e 6fd7ef3 f1f3adb 6fd7ef3 cb23f19 6fd7ef3 7db5fdc 352eb01 6fd7ef3 352eb01 42c4598 352eb01 f1f3adb cb23f19 f1f3adb cb23f19 6fd7ef3 5556030 074b5e7 3b1eecc 6fd7ef3 7de403b 6fd7ef3 cb23f19 6fd7ef3 cb23f19 6fd7ef3 cb23f19 6fd7ef3 cb23f19 2e0131e 6fd7ef3 cb23f19 2e0131e cb23f19 2e0131e cb23f19 6fd7ef3 96d2396 6fd7ef3 6e73d37 805009b 6fd7ef3 b21ecef 6fd7ef3 2e0131e 7db5fdc 2e0131e 7db5fdc 2e0131e 5efed34 2e0131e 7db5fdc 2e0131e 7db5fdc 2e0131e 7db5fdc 2e0131e a72265c 2e0131e a72265c 70bd663 a72265c fc7ccd8 a72265c 70bd663 a72265c 2e0131e a72265c fc7ccd8 31bc5a1 7db5fdc 6fd7ef3 2e0131e 6fd7ef3 7db5fdc 6fd7ef3 2e0131e 6fd7ef3 7db5fdc 2e0131e 6fd7ef3 7db5fdc 2e0131e 6fd7ef3 7db5fdc 2e0131e cb23f19 7db5fdc 352a44b 7db5fdc cb23f19 7db5fdc cb23f19 a72265c cb23f19 7db5fdc cb23f19 7db5fdc 5efed34 7de403b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 |
import shutil
import sys
import time
from pathlib import Path
import anvil.server
import anvil.media
from whisper.utils import write_srt, write_vtt
from yt_dlp import YoutubeDL
from yt_dlp.utils import DownloadError
import os
import tempfile
import json
import argparse
import whisper
from whisper.tokenizer import LANGUAGES, TO_LANGUAGE_CODE
import ffmpeg
from utils.subs import bake_subs, get_srt
from utils.utils import get_args
# Working directory at import time; the error handlers below chdir back to it.
original_dir = os.getcwd()

# Root folder under which every processed video gets its own sub-directory.
output_dir = Path('output')

# Runtime options from utils.utils.get_args(); used below through ``.get``.
args = get_args()

# Whisper model name: the "model" option wins, then the WHISPER_MODEL
# environment variable, then "large".
model_size: str = args.get("model", os.getenv("WHISPER_MODEL", "large"))

# When truthy the model is loaded once, right here at import time.
preload_model: bool = args.get("preload")

if preload_model:
    print("Preloading model")
    model = whisper.load_model(model_size)
def download_generator(url, translate_action=True, source_language='Autodetect', corrected_subtitles=None):
    """Download a video, subtitle it and burn the subtitles into a new video.

    This is a generator. Every step yields a dict that always has a
    ``"message"`` key and, depending on the step, extra keys such as
    ``"meta"``, ``"video"``, ``"audio"``, ``"whisper_result"``, ``"srt_path"``,
    ``"vtt_path"`` and ``"sub_video"``.

    :param url: Address handed to ``check_download`` / ``download``.
    :param translate_action: Forwarded to ``transcribe`` and ``bake_subs``.
    :param source_language: Forwarded to ``transcribe`` as the language.
    :param corrected_subtitles: Optional SRT text. When truthy it is written
        to the ``.srt`` file and the whisper step is skipped.
    :raises Exception: Failures in steps 2-4 are first yielded as a message
        and then re-raised. A failure in step 1 is only yielded, after which
        the generator ends.
    """
    # Step 1 : check if video is available
    yield {"message": f"Checking {url} for videos"}
    try:
        meta = check_download(url)
        # print(json.dumps(meta, indent=2))
        # if(meta['duration'] > 159) :
        #     raise Exception("Video is too long, please use videos less than 159 seconds")
        yield {"message": f"Found video with {meta['duration']} seconds duration from {meta['extractor']}", "meta": meta}
        tempdir = output_dir / f"{meta['id']}"
    except Exception as e:
        yield {"message": f"{e}"}
        return

    # Step 2 : Download video and extract audio
    try:
        cached_video = tempdir / f"{meta['id']}.{meta['ext']}"
        cached_audio = tempdir / f"{meta['id']}.mp3"
        # Reuse the folder when both main files are already present.
        if tempdir.is_dir() and cached_video.is_file() and cached_audio.is_file():
            yield {"message": "Using cached files"}
            video = str(cached_video.resolve())
            audio = str(cached_audio.resolve())
        else:
            yield {"message": f"Starting download with URL {url}, this may take a while"}
            meta, video, audio = download(url, tempdir)
        yield {"message": "Downloaded video and extracted audio", "video": video, "audio": audio, "meta": meta}
    except Exception as e:
        os.chdir(original_dir)
        yield {"message": f"{e}"}
        raise e

    srt_path = tempdir / f"{meta['id']}.srt"
    vtt_path = tempdir / f"{meta['id']}.vtt"

    if not corrected_subtitles:
        ### Step 3 : Transcribe with whisper
        yield {"message": f"[PLEASE WAIT] Starting whisper transcribe with {meta['id']}.mp3"}
        try:
            whisper_result = transcribe(audio, translate_action, source_language)
            with open(srt_path, "w", encoding="utf-8") as srt:
                write_srt(whisper_result["segments"], file=srt)
            with open(vtt_path, "w", encoding="utf-8") as vtt:
                write_vtt(whisper_result["segments"], file=vtt)
            # Read back with the same encoding the files were written in;
            # the platform default is not UTF-8 everywhere.
            whisper_result["srt"] = srt_path.read_text(encoding="utf-8")
            whisper_result["vtt"] = vtt_path.read_text(encoding="utf-8")
            yield {"message": "Transcribe successful", "whisper_result": whisper_result, "meta": meta, "srt_path": srt_path, "vtt_path": vtt_path}
        except Exception as e:
            os.chdir(original_dir)
            yield {"message": f"{e}"}
            raise e
    else:
        ### step 3.5 : use corrected subtitles
        yield {"message": "Using corrected subtitles"}
        with open(srt_path, "w", encoding="utf-8") as srt:
            srt.write(corrected_subtitles)
        yield {"message": "Transcribe successful", "srt_path": srt_path, "meta": meta}

    ### Step 4 : Bake subtitles into video with ffmpeg
    yield {"message": "[PLEASE WAIT] baking subtitles into video"}
    try:
        print('Stating to bake subtitles')
        subbed_video_path = tempdir / f"{meta['id']}_translated.mp4"
        fontsdir = Path('fonts')
        bake_subs(video, subbed_video_path.absolute(), srt_path.absolute(), fontsdir, translate_action)
        yield {"message": "Subtitled video ready!", "sub_video": str(subbed_video_path.absolute()), "meta": meta, "vtt_path": vtt_path}
    except Exception as e:
        # Only ffmpeg.Error carries captured stdout/stderr. Reading those
        # attributes blindly on other exceptions would raise AttributeError
        # and hide the real failure.
        for stream_name in ("stdout", "stderr"):
            captured = getattr(e, stream_name, None)
            if isinstance(captured, bytes):
                print(f'{stream_name}:', captured.decode('utf8', errors='replace'))
        os.chdir(original_dir)
        print('error', file=sys.stderr)
        # Report the failure to the consumer before propagating it, as
        # steps 2 and 3 do.
        yield {"message": f"{e}"}
        raise e
def user_uploaded_video_generator(video, translate_action=True, source_language='Autodetect', corrected_subtitles=None):
    """Copy an uploaded video into the output folder, extract audio and run whisper.

    This is a generator; each step yields a dict with a ``"message"`` key and,
    for some steps, ``"video"`` / ``"audio"`` keys.

    :param video: Path of the uploaded video file.
    :param translate_action: Currently unused by this function.
    :param source_language: Currently unused by this function.
    :param corrected_subtitles: Currently unused by this function.
    """
    source_video = Path(video)
    video_name = source_video.stem

    # Per-video working directory under the shared output folder.
    tempdir = output_dir / video_name
    tempdir.mkdir(parents=True, exist_ok=True)

    # Copy the upload next to the files derived from it. Skip the copy when
    # the upload already is that file, otherwise shutil raises SameFileError.
    video_path = tempdir / source_video.name
    if source_video.resolve() != video_path.resolve():
        shutil.copy2(video, video_path)
    yield {"message": f"Extracting audio from {video_name}", "video": video_path}

    output_audio = tempdir / f"{video_name}.mp3"
    # overwrite_output avoids ffmpeg's "file exists" prompt when the same
    # video is processed a second time.
    ffmpeg.input(str(video_path)).output(filename=str(output_audio)).run(overwrite_output=True)
    yield {"message": f"Got audio from {video_name}", "video": video, "audio": output_audio}

    # Run whisper on the audio: first transcribe with auto-detection, then
    # translate using the detected language.
    try:
        # whisper only loads audio from ``str`` paths, so do not pass a Path.
        audio_file = str(output_audio)
        print(f"Starting whisper transcribe with {output_audio}")
        transcribe_whisper_result = transcribe(audio_file, translate_action=False, language='Autodetect', override_model_size=model_size)
        yield {"message": f"Finished transcription, starting translation to {transcribe_whisper_result['language']}"}
        detected_language = LANGUAGES[transcribe_whisper_result["language"]]
        # NOTE(review): the translated segments are computed but not written
        # anywhere yet; only the original-language VTT is saved below.
        translate_whisper_result = transcribe(audio_file, translate_action=True, language=detected_language, override_model_size=model_size)
        yield {"message": "Finished translation to English, preparing subtitle files"}
        with open(tempdir / f"{video_name}.vtt", "w", encoding="utf-8") as vtt:
            write_vtt(transcribe_whisper_result['segments'], file=vtt)
    except Exception as e:
        print(f"Could not transcribe file: {e}")
        # Surface the failure to the consumer as well, not only to stdout.
        yield {"message": f"Could not transcribe file: {e}"}
        return
def caption_generator(social_media_url,uid, language="Autodetect", model_size=model_size):
    """Download the audio behind a URL and return WebVTT captions for it.

    The audio is transcribed in its original language. If that language is
    not English, it is also translated to English.

    :param social_media_url: Address handed to ``download_audio``.
    :param uid: Identifier used as the downloaded file's name.
    :param language: Language passed to ``transcribe`` for the first pass.
    :param model_size: Whisper model size passed to ``transcribe``.
    :return: Tuple ``('success', captions, detected_language)``. ``captions``
        is a list of ``{"language_tag": ..., "vtt_text": ...}`` dicts and
        ``detected_language`` is the language name looked up in ``LANGUAGES``.
    :raises Exception: Download and transcription errors are printed and
        re-raised.
    """
    with tempfile.TemporaryDirectory() as tempdir:
        tempdir = Path(tempdir)

        try:
            print(f"Starting audio only download with URL {social_media_url}, this may take a while")
            meta, audio = download_audio(social_media_url, tempdir, id=uid)
            print("Downloaded video and extracted audio")
        except Exception as e:
            print(f"Could not download file: {e}")
            raise

        # Run whisper on the audio with language unless auto
        try:
            print(f"Starting whisper transcribe with {uid}.mp3")
            transcribe_whisper_result = transcribe(audio, translate_action=False, language=language, override_model_size=model_size)
            language_code = transcribe_whisper_result["language"]
            detected_language = LANGUAGES[language_code]
            print("Transcribe successful!, writing files")

            vtt_path = tempdir / f"{language_code}.vtt"
            with open(vtt_path.resolve(), "w", encoding="utf-8") as vtt:
                write_vtt(transcribe_whisper_result["segments"], file=vtt)
            whisper_result_captions = [
                {
                    "language_tag": language_code,
                    "vtt_text": vtt_path.read_text(encoding="utf-8"),
                },
            ]

            # Compare the language *code*. ``detected_language`` is a full
            # name such as "english", so testing it against "en" was always
            # true and English audio got a redundant second "en" caption.
            if language_code != "en":
                print("Transcribe successful! Starting translation to English")
                translate_whisper_result = transcribe(audio, translate_action=True, language=detected_language, override_model_size=model_size)
                en_vtt_path = tempdir / "en.vtt"
                with open(en_vtt_path.resolve(), "w", encoding="utf-8") as en_vtt:
                    write_vtt(translate_whisper_result["segments"], file=en_vtt)
                print("Finished translation to English, preparing subtitle files")
                whisper_result_captions.append(
                    {
                        "language_tag": "en",
                        "vtt_text": en_vtt_path.read_text(encoding="utf-8"),
                    }
                )
        except Exception as e:
            print(f"Could not transcribe file: {e}")
            raise

        print(f"Finished processing {uid} file, returning results")
        print(whisper_result_captions)
        return 'success', whisper_result_captions, detected_language
# Run whisper with translation task enabled (and save to different srt file)
# Call anvil background task with both files, and both the plain texts
def progress_hook(d):
    """Print download progress for a yt-dlp progress-hook status dict.

    The previous version used ``yield``, which made it a generator function.
    yt-dlp calls the hook and discards the return value, so the body never
    ran and nothing was printed. The work now happens eagerly. The messages
    that used to be yielded are returned as an iterator, so code that
    iterates the result keeps working.

    :param d: Status dict. Reads ``status``, ``downloaded_bytes``,
        ``total_bytes`` (falling back to ``total_bytes_estimate``),
        ``_percent_str`` and ``filename``.
    :return: Iterator over the progress messages for this status.
    """
    messages = []
    status = d.get('status')

    if status == 'downloading':
        # The exact total may be missing or None; use the estimate if present.
        total = d.get('total_bytes') or d.get('total_bytes_estimate')
        downloaded = d.get('downloaded_bytes')
        percent = None
        if total and downloaded is not None:
            percent = round(float(downloaded) / float(total) * 100, 1)
            print("downloading " + str(percent) + "%")
        else:
            print("downloading")

        percent_text = d.get('_percent_str')
        if percent_text is None:
            percent_text = f"{percent}%" if percent is not None else "Unknown %"
        messages.append(f"{percent_text} downloaded")
    elif status == 'finished':
        filename = d.get('filename')
        print(filename)
        messages.append(f"Downloaded {filename}")

    return iter(messages)
def download(url, tempdir, format="bestvideo[ext=mp4]+bestaudio/best", verbose=False, keepVideo=True, filename="%(id)s.%(ext)s"):
    """Download a video with yt-dlp and extract its audio track as MP3.

    :param url: Address of the video to download.
    :param tempdir: Target directory (``str`` or ``Path``).
    :param format: yt-dlp format selector.
    :param verbose: Passed through as yt-dlp's ``verbose`` option.
    :param keepVideo: Keep the video file after audio extraction. When false,
        the returned video path is ``None``.
    :param filename: yt-dlp output template, relative to ``tempdir``.
    :return: Tuple ``(meta, video_path, audio_path)`` with absolute string
        paths. ``meta`` is whatever ``extract_info`` returned.
    :raises DownloadError: Propagated unchanged from yt-dlp.
    """
    # Accept plain strings too; the path arithmetic below needs a Path.
    tempdir = Path(tempdir)

    ydl_opts = {
        "format": format,
        "keepvideo": keepVideo,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        "skip_download": False,
        "outtmpl": f"{tempdir}/{filename}",
        "noplaylist": True,
        "verbose": verbose,
        "quiet": False,
        "progress_hooks": [progress_hook],
    }

    # The context manager closes the downloader when the download is done.
    with YoutubeDL(ydl_opts) as ydl:
        meta = ydl.extract_info(
            url,
            download=True,
        )

    # NOTE(review): these paths assume the default "%(id)s.%(ext)s" template;
    # a custom ``filename`` would store the files under a different name.
    audio = tempdir / f"{meta['id']}.mp3"
    if not keepVideo:
        return meta, None, str(audio.resolve())
    video = tempdir / f"{meta['id']}.{meta['ext']}"
    return meta, str(video.resolve()), str(audio.resolve())
def download_audio(url, tempdir, format="bestaudio/best", verbose=False, id=None):
    """Download only the audio of a URL with yt-dlp and convert it to MP3.

    :param url: Address of the media to download.
    :param tempdir: Target directory (``str`` or ``Path``).
    :param format: yt-dlp format selector.
    :param verbose: Passed through as yt-dlp's ``verbose`` option.
    :param id: File name stem for the download. When ``None``, the id from
        the extracted metadata is used instead of the literal text "None".
    :return: Tuple ``(meta, audio_path)`` with an absolute string path.
    :raises DownloadError: Propagated unchanged from yt-dlp.
    """
    # Accept plain strings too; the path arithmetic below needs a Path.
    tempdir = Path(tempdir)

    if id is None:
        stem_template = "%(id)s"
    else:
        # A literal '%' must be doubled inside a yt-dlp output template.
        stem_template = str(id).replace("%", "%%")
    filename = f"{stem_template}.%(ext)s"

    ydl_opts = {
        "format": format,
        "keepvideo": False,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        "skip_download": False,
        "outtmpl": f"{tempdir}/{filename}",
        "noplaylist": True,
        "verbose": verbose,
        "quiet": False,
        "progress_hooks": [progress_hook],
    }

    # The context manager closes the downloader when the download is done.
    with YoutubeDL(ydl_opts) as ydl:
        meta = ydl.extract_info(
            url,
            download=True,
        )

    stem = meta['id'] if id is None else id
    audio = tempdir / f"{stem}.mp3"
    return meta, str(audio.resolve())
def check_download(url):
    """Look up a URL's metadata with yt-dlp without downloading anything.

    :param url: Address to inspect.
    :return: The value returned by ``YoutubeDL.extract_info``.
    :raises DownloadError: Propagated unchanged from yt-dlp.
    """
    probe_options = {
        "format": "bestvideo[ext=mp4]+bestaudio/best",
        "skip_download": True,
        "verbose": False,
    }
    downloader = YoutubeDL(probe_options)
    # Any DownloadError simply propagates to the caller.
    return downloader.extract_info(url, download=False)
def transcribe(audio, translate_action=True, language='Autodetect', override_model_size=''):
    """
    Transcribe (or translate) an audio file with whisper.

    :param audio: The audio to process. Path-like objects are converted to
        ``str``; other values are passed to whisper unchanged.
    :param translate_action: Bool - True runs whisper's "translate" task,
        False runs "transcribe".
    :param language: String - Language name or code of the audio. The default
        'Autodetect' (or an empty value) lets whisper detect it.
    :param override_model_size: String - Model size to use instead of the
        module-level ``model_size``; empty means "use the module default".
    :return: Whisper's result dict plus a ``requested_language`` key.
    """
    task = "translate" if translate_action else "transcribe"
    model_size_to_load = override_model_size if override_model_size else model_size
    print(f'Starting {task} with whisper size {model_size_to_load} on {audio}')

    # Track which size the global ``model`` holds and reload only when a
    # different one is requested. The old check compared ``model_size`` with
    # the raw override, which is '' by default, so even a preloaded model
    # was reloaded on every call.
    global model, _loaded_model_size
    module_state = globals()
    if "_loaded_model_size" not in module_state:
        preloaded = bool(preload_model) and "model" in module_state
        _loaded_model_size = model_size if preloaded else None
    if _loaded_model_size != model_size_to_load:
        model = whisper.load_model(model_size_to_load)
        _loaded_model_size = model_size_to_load

    # whisper only loads audio from ``str`` paths, so convert Path objects.
    if isinstance(audio, os.PathLike):
        audio = os.fspath(audio)

    props = {
        "task": task,
    }
    if language and language != 'Autodetect':
        language_key = language.lower()
        if language_key in LANGUAGES:
            # Already a whisper language code, including 3-letter ones.
            props["language"] = language_key
        else:
            props["language"] = TO_LANGUAGE_CODE[language_key] if len(language) > 2 else language

    output = model.transcribe(audio, verbose=True, **props)
    output['requested_language'] = (language or 'Autodetect').lower()
    print(f'Finished transcribe from {LANGUAGES[output["language"]].capitalize()}', output["text"])
    return output
|