# Ported from https://github.com/openai/whisper/blob/main/whisper/utils.py import json import os import re import sys import zlib from typing import Callable, List, Optional, TextIO, Union, Dict, Tuple from datetime import datetime from modules.whisper.data_classes import Segment, Word from .files_manager import read_file def format_timestamp( seconds: float, always_include_hours: bool = True, decimal_marker: str = "," ) -> str: assert seconds >= 0, "non-negative timestamp expected" milliseconds = round(seconds * 1000.0) hours = milliseconds // 3_600_000 milliseconds -= hours * 3_600_000 minutes = milliseconds // 60_000 milliseconds -= minutes * 60_000 seconds = milliseconds // 1_000 milliseconds -= seconds * 1_000 hours_marker = f"{hours:02d}:" if always_include_hours or hours > 0 else "" return ( f"{hours_marker}{minutes:02d}:{seconds:02d}{decimal_marker}{milliseconds:03d}" ) def time_str_to_seconds(time_str: str, decimal_marker: str = ",") -> float: times = time_str.split(":") if len(times) == 3: hours, minutes, rest = times hours = int(hours) else: hours = 0 minutes, rest = times seconds, fractional = rest.split(decimal_marker) minutes = int(minutes) seconds = int(seconds) fractional_seconds = float("0." + fractional) return hours * 3600 + minutes * 60 + seconds + fractional_seconds def get_start(segments: List[dict]) -> Optional[float]: return next( (w["start"] for s in segments for w in s["words"]), segments[0]["start"] if segments else None, ) def get_end(segments: List[dict]) -> Optional[float]: return next( (w["end"] for s in reversed(segments) for w in reversed(s["words"])), segments[-1]["end"] if segments else None, ) class ResultWriter: extension: str def __init__(self, output_dir: str): self.output_dir = output_dir def __call__( self, result: Union[dict, List[Segment]], output_file_name: str, options: Optional[dict] = None, **kwargs ): if isinstance(result, List) and result and isinstance(result[0], Segment): result = {"segments": [seg.model_dump() for seg in result]} output_path = os.path.join( self.output_dir, output_file_name + "." + self.extension ) with open(output_path, "w", encoding="utf-8") as f: self.write_result(result, file=f, options=options, **kwargs) def write_result( self, result: dict, file: TextIO, options: Optional[dict] = None, **kwargs ): raise NotImplementedError class WriteTXT(ResultWriter): extension: str = "txt" def write_result( self, result: Union[Dict, List[Segment]], file: TextIO, options: Optional[dict] = None, **kwargs ): for segment in result["segments"]: print(segment["text"].strip(), file=file, flush=True) class SubtitlesWriter(ResultWriter): always_include_hours: bool decimal_marker: str def iterate_result( self, result: dict, options: Optional[dict] = None, *, max_line_width: Optional[int] = None, max_line_count: Optional[int] = None, highlight_words: bool = False, align_lrc_words: bool = False, max_words_per_line: Optional[int] = None, ): options = options or {} max_line_width = max_line_width or options.get("max_line_width") max_line_count = max_line_count or options.get("max_line_count") highlight_words = highlight_words or options.get("highlight_words", False) align_lrc_words = align_lrc_words or options.get("align_lrc_words", False) max_words_per_line = max_words_per_line or options.get("max_words_per_line") preserve_segments = max_line_count is None or max_line_width is None max_line_width = max_line_width or 1000 max_words_per_line = max_words_per_line or 1000 def iterate_subtitles(): line_len = 0 line_count = 1 # the next subtitle to yield (a list of word timings with whitespace) subtitle: List[dict] = [] last: float = get_start(result["segments"]) or 0.0 for segment in result["segments"]: chunk_index = 0 words_count = max_words_per_line while chunk_index < len(segment["words"]): remaining_words = len(segment["words"]) - chunk_index if max_words_per_line > len(segment["words"]) - chunk_index: words_count = remaining_words for i, original_timing in enumerate( segment["words"][chunk_index : chunk_index + words_count] ): timing = original_timing.copy() long_pause = ( not preserve_segments and timing["start"] - last > 3.0 ) has_room = line_len + len(timing["word"]) <= max_line_width seg_break = i == 0 and len(subtitle) > 0 and preserve_segments if ( line_len > 0 and has_room and not long_pause and not seg_break ): # line continuation line_len += len(timing["word"]) else: # new line timing["word"] = timing["word"].strip() if ( len(subtitle) > 0 and max_line_count is not None and (long_pause or line_count >= max_line_count) or seg_break ): # subtitle break yield subtitle subtitle = [] line_count = 1 elif line_len > 0: # line break line_count += 1 timing["word"] = "\n" + timing["word"] line_len = len(timing["word"].strip()) subtitle.append(timing) last = timing["start"] chunk_index += max_words_per_line if len(subtitle) > 0: yield subtitle if len(result["segments"]) > 0 and "words" in result["segments"][0] and result["segments"][0]["words"]: for subtitle in iterate_subtitles(): subtitle_start = self.format_timestamp(subtitle[0]["start"]) subtitle_end = self.format_timestamp(subtitle[-1]["end"]) subtitle_text = "".join([word["word"] for word in subtitle]) if highlight_words: last = subtitle_start all_words = [timing["word"] for timing in subtitle] for i, this_word in enumerate(subtitle): start = self.format_timestamp(this_word["start"]) end = self.format_timestamp(this_word["end"]) if last != start: yield last, start, subtitle_text yield start, end, "".join( [ re.sub(r"^(\s*)(.*)$", r"\1\2", word) if j == i else word for j, word in enumerate(all_words) ] ) last = end if align_lrc_words: lrc_aligned_words = [f"[{self.format_timestamp(sub['start'])}]{sub['word']}" for sub in subtitle] l_start, l_end = self.format_timestamp(subtitle[-1]['start']), self.format_timestamp(subtitle[-1]['end']) lrc_aligned_words[-1] = f"[{l_start}]{subtitle[-1]['word']}[{l_end}]" lrc_aligned_words = ' '.join(lrc_aligned_words) yield None, None, lrc_aligned_words else: yield subtitle_start, subtitle_end, subtitle_text else: for segment in result["segments"]: segment_start = self.format_timestamp(segment["start"]) segment_end = self.format_timestamp(segment["end"]) segment_text = segment["text"].strip().replace("-->", "->") yield segment_start, segment_end, segment_text def format_timestamp(self, seconds: float): return format_timestamp( seconds=seconds, always_include_hours=self.always_include_hours, decimal_marker=self.decimal_marker, ) class WriteVTT(SubtitlesWriter): extension: str = "vtt" always_include_hours: bool = False decimal_marker: str = "." def write_result( self, result: dict, file: TextIO, options: Optional[dict] = None, **kwargs ): print("WEBVTT\n", file=file) for start, end, text in self.iterate_result(result, options, **kwargs): print(f"{start} --> {end}\n{text}\n", file=file, flush=True) def to_segments(self, file_path: str) -> List[Segment]: segments = [] blocks = read_file(file_path).split('\n\n') for block in blocks: if block.strip() != '' and not block.strip().startswith("WEBVTT"): lines = block.strip().split('\n') time_line = lines[0].split(" --> ") start, end = time_str_to_seconds(time_line[0], self.decimal_marker), time_str_to_seconds(time_line[1], self.decimal_marker) sentence = ' '.join(lines[1:]) segments.append(Segment( start=start, end=end, text=sentence )) return segments class WriteSRT(SubtitlesWriter): extension: str = "srt" always_include_hours: bool = True decimal_marker: str = "," def write_result( self, result: dict, file: TextIO, options: Optional[dict] = None, **kwargs ): for i, (start, end, text) in enumerate( self.iterate_result(result, options, **kwargs), start=1 ): print(f"{i}\n{start} --> {end}\n{text}\n", file=file, flush=True) def to_segments(self, file_path: str) -> List[Segment]: segments = [] blocks = read_file(file_path).split('\n\n') for block in blocks: if block.strip() != '': lines = block.strip().split('\n') index = lines[0] time_line = lines[1].split(" --> ") start, end = time_str_to_seconds(time_line[0], self.decimal_marker), time_str_to_seconds(time_line[1], self.decimal_marker) sentence = ' '.join(lines[2:]) segments.append(Segment( start=start, end=end, text=sentence )) return segments class WriteLRC(SubtitlesWriter): extension: str = "lrc" always_include_hours: bool = False decimal_marker: str = "." def write_result( self, result: dict, file: TextIO, options: Optional[dict] = None, **kwargs ): for i, (start, end, text) in enumerate( self.iterate_result(result, options, **kwargs), start=1 ): if "align_lrc_words" in kwargs and kwargs["align_lrc_words"]: print(f"{text}\n", file=file, flush=True) else: print(f"[{start}]{text}[{end}]\n", file=file, flush=True) def to_segments(self, file_path: str) -> List[Segment]: segments = [] blocks = read_file(file_path).split('\n') for block in blocks: if block.strip() != '': lines = block.strip() pattern = r'(\[.*?\])' parts = re.split(pattern, lines) parts = [part.strip() for part in parts if part] for i, part in enumerate(parts): sentence_i = i%2 if sentence_i == 1: start_str, text, end_str = parts[sentence_i-1], parts[sentence_i], parts[sentence_i+1] start_str, end_str = start_str.replace("[", "").replace("]", ""), end_str.replace("[", "").replace("]", "") start, end = time_str_to_seconds(start_str, self.decimal_marker), time_str_to_seconds(end_str, self.decimal_marker) segments.append(Segment( start=start, end=end, text=text, )) return segments class WriteTSV(ResultWriter): """ Write a transcript to a file in TSV (tab-separated values) format containing lines like: \t\t Using integer milliseconds as start and end times means there's no chance of interference from an environment setting a language encoding that causes the decimal in a floating point number to appear as a comma; also is faster and more efficient to parse & store, e.g., in C++. """ extension: str = "tsv" def write_result( self, result: dict, file: TextIO, options: Optional[dict] = None, **kwargs ): print("start", "end", "text", sep="\t", file=file) for segment in result["segments"]: print(round(1000 * segment["start"]), file=file, end="\t") print(round(1000 * segment["end"]), file=file, end="\t") print(segment["text"].strip().replace("\t", " "), file=file, flush=True) class WriteJSON(ResultWriter): extension: str = "json" def write_result( self, result: dict, file: TextIO, options: Optional[dict] = None, **kwargs ): json.dump(result, file) def get_writer( output_format: str, output_dir: str ) -> Callable[[dict, TextIO, dict], None]: output_format = output_format.strip().lower().replace(".", "") writers = { "txt": WriteTXT, "vtt": WriteVTT, "srt": WriteSRT, "tsv": WriteTSV, "json": WriteJSON, "lrc": WriteLRC } if output_format == "all": all_writers = [writer(output_dir) for writer in writers.values()] def write_all( result: dict, file: TextIO, options: Optional[dict] = None, **kwargs ): for writer in all_writers: writer(result, file, options, **kwargs) return write_all return writers[output_format](output_dir) def generate_file( output_format: str, output_dir: str, result: Union[dict, List[Segment]], output_file_name: str, add_timestamp: bool = True, **kwargs ) -> Tuple[str, str]: output_format = output_format.strip().lower().replace(".", "") output_format = "vtt" if output_format == "webvtt" else output_format if add_timestamp: timestamp = datetime.now().strftime("%m%d%H%M%S") output_file_name += f"-{timestamp}" file_path = os.path.join(output_dir, f"{output_file_name}.{output_format}") file_writer = get_writer(output_format=output_format, output_dir=output_dir) if isinstance(file_writer, WriteLRC) and kwargs.get("highlight_words", False): kwargs["highlight_words"], kwargs["align_lrc_words"] = False, True file_writer(result=result, output_file_name=output_file_name, **kwargs) content = read_file(file_path) return content, file_path def safe_filename(name): INVALID_FILENAME_CHARS = r'[<>:"/\\|?*\x00-\x1f]' safe_name = re.sub(INVALID_FILENAME_CHARS, '_', name) # Truncate the filename if it exceeds the max_length (20) if len(safe_name) > 20: file_extension = safe_name.split('.')[-1] if len(file_extension) + 1 < 20: truncated_name = safe_name[:20 - len(file_extension) - 1] safe_name = truncated_name + '.' + file_extension else: safe_name = safe_name[:20] return safe_name