|
import asyncio
|
|
import os
|
|
from pathlib import Path
|
|
|
|
import deep_translator
|
|
import pysrt
|
|
import tqdm.asyncio
|
|
import subtitle_utils
|
|
from utils import format_time
|
|
|
|
|
|
# Characters that may legitimately end a sentence: Latin punctuation plus
# Japanese sentence-final particles and punctuation. Used by join_sentences
# to decide where a translation chunk can safely be closed.
sentence_endings = ['.', '!', '?', ')', 'よ', 'ね',
'の', 'さ', 'ぞ', 'な', 'か', '!', '。', '」', '…']

# Marker inserted between subtitle lines before translation so the
# translated text can be split back into the same lines afterwards.
separator = " ◌ "

# Separator with the surrounding spaces removed — used when splitting the
# translated text, since the translator may alter spacing around the marker.
separator_unjoin = separator.replace(' ', '')

# Maximum characters per translation request
# (presumably just under Google Translate's 5000-char limit — TODO confirm).
chunk_max_chars = 4999
|
|
|
|
|
|
def translate_srt_file(srt_file_path: Path, translated_subtitle_path: Path, target_lang):
    """Translate an SRT subtitle file and save the translated copy.

    Reads ``srt_file_path``, joins the subtitle texts into large chunks,
    translates the chunks concurrently (at most 7 requests in flight),
    splits the translated text back into one entry per subtitle and
    writes the result to ``translated_subtitle_path``.

    Args:
        srt_file_path: Path of the source .srt file (UTF-8).
        translated_subtitle_path: Destination path; parent directories
            are created if missing.
        target_lang: Target language code handed to the translator.

    Returns:
        The translated ``pysrt`` subtitle object (also saved to disk).
    """
    subs = pysrt.open(srt_file_path, encoding='utf-8')

    # Flatten each subtitle's (possibly multi-line) text onto one line.
    sub_content = [' '.join(sub.text.strip().splitlines()) for sub in subs]

    # Group the lines into chunks that stay below the per-request limit.
    chunks = join_sentences(sub_content, chunk_max_chars) or []

    translated_chunks = [None] * len(chunks)
    tasks = []
    # Cap the number of concurrent translation requests.
    semaphore = asyncio.Semaphore(7)

    async def translate_async():
        async def run_translate(index, chunk, lang):
            # Best-effort retry loop: keep trying this chunk until it
            # translates (translate_chunk also retries internally).
            while True:
                try:
                    async with semaphore:
                        result = await asyncio.wait_for(translate_chunk(index, chunk, lang), 120)
                        translated_chunks[index] = result
                        break
                except Exception:
                    await asyncio.sleep(3)

        for index, chunk in enumerate(chunks):
            task = asyncio.create_task(
                run_translate(index, chunk, target_lang))
            tasks.append(task)

        for tsk in tqdm.asyncio.tqdm_asyncio.as_completed(tasks, total=len(tasks), desc="Translating", unit="chunks", unit_scale=False, leave=True, bar_format="{desc} {percentage:3.0f}% | {n_fmt}/{total_fmt} | ETA: {remaining} | ⏱: {elapsed}"):
            await tsk

    # BUG FIX: asyncio.get_event_loop() + run_until_complete is deprecated
    # (and fails on modern Python when no loop is set for the thread);
    # asyncio.run creates, runs and closes a fresh event loop.
    asyncio.run(translate_async())

    print('Processing translation...', end='')

    # Split every translated chunk back into one text per subtitle.
    unjoined_texts = [unjoin_sentences(
        chunk, translated_chunks[i], separator_unjoin) or "" for i, chunk in enumerate(chunks)]
    unjoined_texts = [text for sublist in unjoined_texts for text in sublist]

    # Re-wrap each text onto the same number of lines the original had.
    for i, segment in enumerate(unjoined_texts):
        unjoined_texts[i] = "\n".join(subtitle_utils.split_string_to_max_lines(
            text=segment, max_width=0, max_lines=len(subs[i].text.splitlines())))

    for i, sub in enumerate(subs):
        sub.text = unjoined_texts[i]

    os.makedirs(translated_subtitle_path.parent, exist_ok=True)
    subs.save(translated_subtitle_path, encoding='utf-8')

    # Erase the "Processing translation..." status line.
    print('\r                                 ', end='\r')

    return subs
|
|
|
|
|
|
|
|
|
|
async def translate_chunk(index, chunk, target_lang):
    """Translate a single text chunk, retrying forever on failure.

    Args:
        index: Position of the chunk (used only in error messages).
        chunk: Text to translate.
        target_lang: Target language code for GoogleTranslator.

    Returns:
        The translated text, or the original ``chunk`` when the
        translator returns nothing usable.
    """
    while True:
        try:
            # A fresh translator is built on every attempt so a broken
            # session from a failed attempt is never reused.
            translator = deep_translator.google.GoogleTranslator(
                source='auto', target=target_lang)
            # Run the blocking HTTP call in a worker thread; give up on
            # this attempt after 30 seconds.
            translated_chunk: str = await asyncio.wait_for(
                asyncio.get_running_loop().run_in_executor(
                    None, translator.translate, chunk), 30)
            await asyncio.sleep(0)  # yield control to other tasks

            # An empty or separator-only response means the translation
            # silently failed; fall back to the untranslated text.
            if translated_chunk is None or len(translated_chunk.replace(separator.strip(), '').split()) == 0:
                return chunk

            return translated_chunk
        except Exception as e:
            # BUG FIX: the old code did `del translator` here, which raised
            # NameError when the constructor itself failed (translator never
            # bound); the translator is rebound on every retry anyway.
            print(
                f"\r[chunk {index}]: Exception: {e.__doc__} Retrying in 30 seconds...", flush=True)
            await asyncio.sleep(30)
|
|
|
|
|
|
def join_sentences(lines, max_chars):
    """
    Joins the given list of strings in a way that each part ends with a sentence ending.
    Adds a separator to all lines in the chunk.

    Args:
        lines: Subtitle texts; empty/None entries are replaced by an
            invisible placeholder so their position survives translation.
        max_chars: Maximum length of each returned chunk.

    Returns:
        List of chunks, each a run of lines joined with ``separator``.
    """
    joined_lines = []
    current_chunk = ""

    for line in lines:
        if not line:
            # Placeholder (U+3164) keeps the line count stable through
            # translation. (`not line` already covers None.)
            line = 'ㅤ'

        if len(current_chunk) + len(line) + len(separator) <= max_chars:
            current_chunk += line + separator
            # Close the chunk at a sentence boundary so the translator
            # always sees complete sentences.
            if any(line.endswith(ending) for ending in sentence_endings):
                joined_lines.append(current_chunk)
                current_chunk = ""
        else:
            # Chunk is full: flush it, then place the line in a new chunk.
            if current_chunk:
                joined_lines.append(current_chunk)
                current_chunk = ""
            if len(current_chunk) + len(line) + len(separator) <= max_chars:
                current_chunk += line + separator
            else:
                # A single line longer than max_chars: truncate at the
                # last space that still fits, or hard-cut if there is none.
                end_index = line.rfind(
                    ' ', 0, max_chars - (1 + len(separator)))

                # BUG FIX: str.rfind returns -1 when no space is found;
                # the old check compared against -(1 + len(separator)) and
                # never matched, so line[:-1] silently dropped a character.
                if end_index == -1:
                    end_index = max_chars - (1 + len(separator))

                joined_lines.append(
                    (line[:end_index] + '…' + separator)[:max_chars])

    # Flush whatever is left after the last line.
    if current_chunk:
        joined_lines.append(current_chunk)

    return joined_lines
|
|
|
|
|
|
def unjoin_sentences(original_sentence: str, modified_sentence: str, separator: str):
    """
    Splits the original and modified sentences into lines based on the separator.
    Tries to match the number of lines between the original and modified sentences.

    Args:
        original_sentence: The pre-translation chunk (lines joined with
            the separator).
        modified_sentence: The translated chunk, or None when the
            translation failed.
        separator: Marker used to split the original sentence.

    Returns:
        A list of translated lines aligned 1:1 with the original lines,
        or a plain-string fallback when the texts cannot be aligned.
    """
    if original_sentence is None:
        return ' '

    # Split the original chunk and normalise each line: strip whitespace
    # and drop leading punctuation (keep the raw line if that empties it).
    original_lines = original_sentence.split(separator)
    original_lines = [s.strip().replace(' ', ' ').lstrip(" ,.:;)") if s.strip().replace(' ', ' ').lstrip(" ,.:;)") else s
                      for s in original_lines if s.strip()]
    original_lines = [s for s in original_lines if s]
    original_lines = [s for s in original_lines if s.strip()]

    if modified_sentence is None:
        return original_lines or ' '

    # BUG FIX: str.replace returns a new string; the old code discarded the
    # result, so the separator spacing/punctuation was never normalised.
    modified_sentence = modified_sentence.replace(f"{separator_unjoin} ", f"{separator_unjoin}").replace(f" {separator_unjoin}", f"{separator_unjoin}").replace(
        f"{separator_unjoin}.", f".{separator_unjoin}").replace(f"{separator_unjoin},", f",{separator_unjoin}")

    # Same normalisation for the translated lines.
    modified_lines = modified_sentence.split(separator_unjoin)
    modified_lines = [s.strip().replace(' ', ' ').lstrip(" ,.:;)") if s.strip().replace(' ', ' ').lstrip(" ,.:;)") else s
                      for s in modified_lines if s.strip()]
    modified_lines = [s for s in modified_lines if s]
    modified_lines = [s for s in modified_lines if s.strip()]

    # BUG FIX: the old code compared the *list* original_lines against a
    # string (always False); an ellipsis-only chunk is returned unchanged.
    if original_sentence == "..." or original_sentence == "…":
        return original_lines

    # Line counts already match: nothing to redistribute.
    if len(original_lines) == len(modified_lines):
        return modified_lines

    original_word_count = sum(len(line.strip().split())
                              for line in original_lines)
    modified_word_count = len(' '.join(modified_lines).strip().split())
    if original_word_count == 0 or modified_word_count == 0:
        # Nothing alignable: return the original chunk as plain text.
        return original_sentence.replace(separator, ' ').replace(' ', ' ')

    # Redistribute the translated words over the original line count,
    # proportionally to each original line's word count.
    modified_words_proportion = modified_word_count / original_word_count

    modified_words = ' '.join(modified_lines).replace(separator, "").replace(
        separator_unjoin, "").replace(" ", " ").strip().split(' ')

    new_modified_lines = []
    current_index = 0

    for i in range(len(original_lines)):

        num_words = int(
            round(len(original_lines[i].strip().split()) * modified_words_proportion))

        generated_line = ' '.join(
            modified_words[current_index:current_index + num_words])

        current_index += num_words

        # BUG FIX: the old code built this joined string and discarded it;
        # words left over by rounding now land on the last line.
        if i == len(original_lines) - 1:
            generated_line = ' '.join([generated_line, ' '.join(
                modified_words[current_index:])])

        new_modified_lines.append(generated_line.replace(" ", " ").strip())

    # Defensive padding in case fewer lines were produced than expected.
    while len(new_modified_lines) < len(original_lines):
        new_modified_lines.append(new_modified_lines[-1])

    return new_modified_lines or original_lines or ' '
|
|
|