import os import time import tempfile from math import floor from typing import Optional, List, Dict, Any import torch import gradio as gr import yt_dlp as youtube_dl import numpy as np from transformers import pipeline from transformers.pipelines.audio_utils import ffmpeg_read from punctuators.models import PunctCapSegModelONNX from stable_whisper import WhisperResult # configuration MODEL_NAME = "kotoba-tech/kotoba-whisper-v1.0" BATCH_SIZE = 16 CHUNK_LENGTH_S = 15 FILE_LIMIT_MB = 1000 YT_LENGTH_LIMIT_S = 3600 # limit to 1 hour YouTube files # device setting if torch.cuda.is_available(): torch_dtype = torch.bfloat16 device = "cuda:0" model_kwargs = {'attn_implementation': 'sdpa'} else: torch_dtype = torch.float32 device = "cpu" model_kwargs = {} # define the pipeline pipe = pipeline( task="automatic-speech-recognition", model=MODEL_NAME, chunk_length_s=CHUNK_LENGTH_S, batch_size=BATCH_SIZE, torch_dtype=torch_dtype, device=device, model_kwargs=model_kwargs ) class Punctuator: ja_punctuations = ["!", "?", "、", "。"] def __init__(self, model: str = "pcs_47lang"): self.punctuation_model = PunctCapSegModelONNX.from_pretrained(model) def punctuate(self, pipeline_chunk: List[Dict[str, Any]]) -> List[Dict[str, Any]]: def validate_punctuation(raw: str, punctuated: str): if 'unk' in punctuated: return raw if punctuated.count("。") > 1: ind = punctuated.rfind("。") punctuated = punctuated.replace("。", "") punctuated = punctuated[:ind] + "。" + punctuated[ind:] return punctuated text_edit = self.punctuation_model.infer([c['text'] for c in pipeline_chunk]) return [ { 'timestamp': c['timestamp'], 'text': validate_punctuation(c['text'], "".join(e)) } for c, e in zip(pipeline_chunk, text_edit) ] PUNCTUATOR = Punctuator() def _fix_timestamp(sample_rate: int, result: List[Dict[str, Any]], audio: np.ndarray) -> WhisperResult or None: def replace_none_ts(parts): total_dur = round(audio.shape[-1] / sample_rate, 3) _medium_dur = _ts_nonzero_mask = None def ts_nonzero_mask() -> np.ndarray: nonlocal _ts_nonzero_mask if _ts_nonzero_mask is None: _ts_nonzero_mask = np.array([(p['end'] or p['start']) is not None for p in parts]) return _ts_nonzero_mask def medium_dur() -> float: nonlocal _medium_dur if _medium_dur is None: nonzero_dus = [p['end'] - p['start'] for p in parts if None not in (p['end'], p['start'])] nonzero_durs = np.array(nonzero_dus) _medium_dur = np.median(nonzero_durs) * 2 if len(nonzero_durs) else 2.0 return _medium_dur def _curr_max_end(start: float, next_idx: float) -> float: max_end = total_dur if next_idx != len(parts): mask = np.flatnonzero(ts_nonzero_mask()[next_idx:]) if len(mask): _part = parts[mask[0]+next_idx] max_end = _part['start'] or _part['end'] new_end = round(start + medium_dur(), 3) if new_end > max_end: return max_end return new_end for i, part in enumerate(parts, 1): if part['start'] is None: is_first = i == 1 if is_first: new_start = round((part['end'] or 0) - medium_dur(), 3) part['start'] = max(new_start, 0.0) else: part['start'] = parts[i - 2]['end'] if part['end'] is None: no_next_start = i == len(parts) or parts[i]['start'] is None part['end'] = _curr_max_end(part['start'], i) if no_next_start else parts[i]['start'] words = [dict(start=word['timestamp'][0], end=word['timestamp'][1], word=word['text']) for word in result] replace_none_ts(words) return WhisperResult([words], force_order=True, check_sorted=True) def fix_timestamp(pipeline_output: List[Dict[str, Any]], audio: np.ndarray, sample_rate: int) -> List[Dict[str, Any]]: result = _fix_timestamp(sample_rate=sample_rate, audio=audio, result=pipeline_output) result.adjust_by_silence( audio, q_levels=20, k_size=5, sample_rate=sample_rate, min_word_dur=None, word_level=True, verbose=True, nonspeech_error=0.1, use_word_position=True ) if result.has_words: result.regroup(True) return [{"timestamp": [s.start, s.end], "text": s.text} for s in result.segments] def format_time(start: Optional[float], end: Optional[float]): def _format_time(seconds: Optional[float]): if seconds is None: return "complete " minutes = floor(seconds / 60) hours = floor(seconds / 3600) seconds = seconds - hours * 3600 - minutes * 60 m_seconds = floor(round(seconds - floor(seconds), 3) * 10 ** 3) seconds = floor(seconds) return f'{hours:02}:{minutes:02}:{seconds:02}.{m_seconds:03}' return f"[{_format_time(start)}-> {_format_time(end)}]:" def get_prediction(inputs, prompt: Optional[str], punctuate_text: bool = True, stabilize_timestamp: bool = True): generate_kwargs = {"language": "japanese", "task": "transcribe"} if prompt: generate_kwargs['prompt_ids'] = pipe.tokenizer.get_prompt_ids(prompt, return_tensors='pt').to(device) prediction = pipe(inputs, return_timestamps=True, generate_kwargs=generate_kwargs) if stabilize_timestamp: prediction['chunks'] = fix_timestamp(pipeline_output=prediction['chunks'], audio=inputs["array"], sample_rate=inputs["sampling_rate"] ) if punctuate_text: prediction['chunks'] = PUNCTUATOR.punctuate(prediction['chunks']) text = "".join([c['text'] for c in prediction['chunks']]) text_timestamped = "\n".join([ f"{format_time(*c['timestamp'])} {c['text']}" for c in prediction['chunks'] ]) return text, text_timestamped def transcribe(inputs, prompt, punctuate_text, stabilize_timestamp): if inputs is None: raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.") inputs = ffmpeg_read(inputs, pipe.feature_extractor.sampling_rate) inputs = {"array": inputs, "sampling_rate": pipe.feature_extractor.sampling_rate} return get_prediction(inputs, prompt, punctuate_text, stabilize_timestamp) def _return_yt_html_embed(yt_url): video_id = yt_url.split("?v=")[-1] return f'