# Hugging Face Spaces status banner captured with this source: "Spaces: Running on Zero".
import html
import os
import tempfile
import time
from math import floor
from typing import Any, Dict, List, Optional
from urllib.parse import parse_qs, urlparse

import gradio as gr
import numpy as np
import torch
import yt_dlp as youtube_dl
from punctuators.models import PunctCapSegModelONNX
from stable_whisper import WhisperResult
from transformers import pipeline
from transformers.pipelines.audio_utils import ffmpeg_read
# --- Configuration ---------------------------------------------------------
MODEL_NAME = "kotoba-tech/kotoba-whisper-v1.0"  # checkpoint passed to the ASR pipeline
BATCH_SIZE = 16  # batch size passed to the pipeline
CHUNK_LENGTH_S = 15  # chunk length (seconds) passed to the pipeline
FILE_LIMIT_MB = 1000
YT_LENGTH_LIMIT_S = 3600  # limit to 1 hour YouTube files

# --- Device selection ------------------------------------------------------
# With CUDA: bfloat16 weights on the first GPU and SDPA attention.
# Without CUDA: float32 on CPU and no extra model keyword arguments.
_use_cuda = torch.cuda.is_available()
torch_dtype = torch.bfloat16 if _use_cuda else torch.float32
device = "cuda:0" if _use_cuda else "cpu"
model_kwargs = {'attn_implementation': 'sdpa'} if _use_cuda else {}

# --- Speech recognition pipeline -------------------------------------------
# Built once at import time and shared by every request handler below.
pipe = pipeline(
    task="automatic-speech-recognition",
    model=MODEL_NAME,
    chunk_length_s=CHUNK_LENGTH_S,
    batch_size=BATCH_SIZE,
    torch_dtype=torch_dtype,
    device=device,
    model_kwargs=model_kwargs,
)
class Punctuator:
    """Add punctuation to transcribed chunks with a `PunctCapSegModelONNX` model."""

    # Punctuation marks of interest for Japanese text (ASCII "!" / "?" plus
    # the Japanese comma and full stop).
    ja_punctuations = ["!", "?", "、", "。"]

    def __init__(self, model: str = "pcs_47lang"):
        """Load the pretrained punctuation model identified by ``model``."""
        self.punctuation_model = PunctCapSegModelONNX.from_pretrained(model)

    def punctuate(self, pipeline_chunk: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return new chunks whose ``'text'`` has been punctuated.

        Args:
            pipeline_chunk: chunks with ``'timestamp'`` and ``'text'`` keys.

        Returns:
            A new list of ``{'timestamp', 'text'}`` dicts, one per input chunk,
            in the same order. Timestamps are passed through untouched.
        """

        def validate_punctuation(raw: str, punctuated: str) -> str:
            # If the model output contains an 'unk' marker, fall back to the
            # unpunctuated text.
            if 'unk' in punctuated:
                return raw
            # Keep only the LAST full stop of a chunk. Earlier full stops are
            # stripped from the prefix only, so the surviving one stays at its
            # true position (removing them from the whole string first would
            # shift the insertion index to the right).
            if punctuated.count("。") > 1:
                ind = punctuated.rfind("。")
                punctuated = punctuated[:ind].replace("。", "") + punctuated[ind:]
            return punctuated

        if not pipeline_chunk:
            # Nothing to punctuate; avoid calling the model with an empty batch.
            return []

        text_edit = self.punctuation_model.infer([c['text'] for c in pipeline_chunk])
        return [
            {
                'timestamp': c['timestamp'],
                # The model output for one text is joined back into one string.
                'text': validate_punctuation(c['text'], "".join(e)),
            }
            for c, e in zip(pipeline_chunk, text_edit)
        ]
# Shared punctuator instance, created once at import time (this loads the
# pretrained punctuation model) and reused by get_prediction for every request.
PUNCTUATOR = Punctuator()
def _fix_timestamp(sample_rate: int, result: List[Dict[str, Any]], audio: np.ndarray) -> WhisperResult:
    """Fill in missing chunk timestamps and wrap the chunks in a `WhisperResult`.

    Args:
        sample_rate: sampling rate of ``audio``; used to derive its duration.
        result: pipeline chunks, each with ``'timestamp'`` (start, end) and
            ``'text'``. Either timestamp may be ``None``.
        audio: waveform; its last axis is taken as the sample axis.

    Returns:
        A `WhisperResult` built from a single segment holding all chunks as words.
    """

    def replace_none_ts(parts):
        # Replace every None 'start' / 'end' in `parts` IN PLACE.
        total_dur = round(audio.shape[-1] / sample_rate, 3)  # audio length in seconds
        # Lazily computed caches for the two helpers below.
        _medium_dur = _ts_nonzero_mask = None

        def ts_nonzero_mask() -> np.ndarray:
            # Boolean mask: True where a part has a usable 'end' or 'start'.
            # Computed on first use only, so it reflects `parts` at that moment.
            nonlocal _ts_nonzero_mask
            if _ts_nonzero_mask is None:
                _ts_nonzero_mask = np.array([(p['end'] or p['start']) is not None for p in parts])
            return _ts_nonzero_mask

        def medium_dur() -> float:
            # Fallback duration for parts lacking a timestamp: twice the median
            # duration of the fully timestamped parts, or 2.0 if there are none.
            nonlocal _medium_dur
            if _medium_dur is None:
                nonzero_dus = [p['end'] - p['start'] for p in parts if None not in (p['end'], p['start'])]
                nonzero_durs = np.array(nonzero_dus)
                _medium_dur = np.median(nonzero_durs) * 2 if len(nonzero_durs) else 2.0
            return _medium_dur

        def _curr_max_end(start: float, next_idx: int) -> float:
            # Estimate an end time as start + fallback duration, capped at the
            # next known timestamp (or at the audio duration if there is none).
            max_end = total_dur
            if next_idx != len(parts):
                mask = np.flatnonzero(ts_nonzero_mask()[next_idx:])
                if len(mask):
                    # First later part that has any timestamp.
                    _part = parts[mask[0]+next_idx]
                    max_end = _part['start'] or _part['end']
            new_end = round(start + medium_dur(), 3)
            if new_end > max_end:
                return max_end
            return new_end

        # `i` is 1-based: parts[i] is the NEXT part and parts[i - 2] the PREVIOUS one.
        for i, part in enumerate(parts, 1):
            if part['start'] is None:
                is_first = i == 1
                if is_first:
                    # No previous part: back off from the end, clamped at 0.
                    new_start = round((part['end'] or 0) - medium_dur(), 3)
                    part['start'] = max(new_start, 0.0)
                else:
                    # Start where the previous part ended (already filled in).
                    part['start'] = parts[i - 2]['end']
            if part['end'] is None:
                no_next_start = i == len(parts) or parts[i]['start'] is None
                part['end'] = _curr_max_end(part['start'], i) if no_next_start else parts[i]['start']

    # Convert pipeline chunks to the word-dict layout handed to WhisperResult.
    words = [dict(start=word['timestamp'][0], end=word['timestamp'][1], word=word['text']) for word in result]
    replace_none_ts(words)
    # NOTE(review): force_order / check_sorted presumably make stable-ts
    # enforce chronological ordering - confirm against the stable-ts docs.
    return WhisperResult([words], force_order=True, check_sorted=True)
def fix_timestamp(pipeline_output: List[Dict[str, Any]], audio: np.ndarray, sample_rate: int) -> List[Dict[str, Any]]:
    """Stabilize chunk timestamps and return them in pipeline chunk format.

    Missing timestamps are filled by ``_fix_timestamp``. The result is then
    adjusted with ``adjust_by_silence`` and, when it has words, regrouped.

    Args:
        pipeline_output: chunks with ``'timestamp'`` and ``'text'`` keys.
        audio: waveform that produced ``pipeline_output``.
        sample_rate: sampling rate of ``audio``.

    Returns:
        One ``{"timestamp": [start, end], "text": ...}`` dict per resulting segment.
    """
    stabilized = _fix_timestamp(sample_rate=sample_rate, audio=audio, result=pipeline_output)

    silence_options = dict(
        q_levels=20,
        k_size=5,
        sample_rate=sample_rate,
        min_word_dur=None,
        word_level=True,
        verbose=True,
        nonspeech_error=0.1,
        use_word_position=True,
    )
    stabilized.adjust_by_silence(audio, **silence_options)

    if stabilized.has_words:
        stabilized.regroup(True)

    chunks = []
    for segment in stabilized.segments:
        chunks.append({"timestamp": [segment.start, segment.end], "text": segment.text})
    return chunks
def format_time(start: Optional[float], end: Optional[float]):
    """Render a ``[HH:MM:SS.mmm-> HH:MM:SS.mmm]:`` prefix for a chunk.

    Args:
        start: chunk start in seconds, or ``None``.
        end: chunk end in seconds, or ``None``.

    Returns:
        The formatted range. A ``None`` bound is rendered as ``"complete "``.
    """

    def _format_time(seconds: Optional[float]):
        if seconds is None:
            return "complete "
        # Work in whole milliseconds so that rounding can never yield a
        # 4-digit millisecond field, then split with divmod so minutes are
        # taken from what remains after the hours (not from the total).
        total_ms = int(round(seconds * 1000))
        total_seconds, m_seconds = divmod(total_ms, 1000)
        total_minutes, secs = divmod(total_seconds, 60)
        hours, minutes = divmod(total_minutes, 60)
        return f'{hours:02}:{minutes:02}:{secs:02}.{m_seconds:03}'

    return f"[{_format_time(start)}-> {_format_time(end)}]:"
def get_prediction(inputs, prompt: Optional[str], punctuate_text: bool = True, stabilize_timestamp: bool = True):
    """Transcribe Japanese audio and return plain and timestamped text.

    Args:
        inputs: dict with the waveform under ``"array"`` and its rate under
            ``"sampling_rate"``.
        prompt: optional text prompt; when non-empty it is tokenized and
            passed to generation as ``prompt_ids``.
        punctuate_text: run the shared punctuator over the chunks.
        stabilize_timestamp: run ``fix_timestamp`` over the chunks.

    Returns:
        ``(text, text_timestamped)``: the concatenated chunk texts, and one
        line per chunk prefixed with its formatted time range.
    """
    generate_kwargs = {"language": "japanese", "task": "transcribe"}
    if prompt:
        prompt_ids = pipe.tokenizer.get_prompt_ids(prompt, return_tensors='pt')
        generate_kwargs['prompt_ids'] = prompt_ids.to(device)

    # Grab the waveform and rate BEFORE invoking the pipeline.
    # NOTE(review): presumably the pipeline may consume keys of `inputs` - confirm.
    array, sr = inputs["array"], inputs["sampling_rate"]

    prediction = pipe(inputs, return_timestamps=True, generate_kwargs=generate_kwargs)
    chunks = prediction['chunks']
    if stabilize_timestamp:
        chunks = fix_timestamp(pipeline_output=chunks, audio=array, sample_rate=sr)
    if punctuate_text:
        chunks = PUNCTUATOR.punctuate(chunks)
    prediction['chunks'] = chunks

    text = "".join(chunk['text'] for chunk in chunks)
    timestamped_lines = [f"{format_time(*chunk['timestamp'])} {chunk['text']}" for chunk in chunks]
    text_timestamped = "\n".join(timestamped_lines)
    return text, text_timestamped
def transcribe(inputs: str, prompt, punctuate_text, stabilize_timestamp):
    """Transcribe an audio file given by path.

    Args:
        inputs: path to the audio file, or ``None`` if nothing was submitted.
        prompt: optional text prompt forwarded to ``get_prediction``.
        punctuate_text: forwarded to ``get_prediction``.
        stabilize_timestamp: forwarded to ``get_prediction``.

    Returns:
        The ``(text, text_timestamped)`` pair from ``get_prediction``.

    Raises:
        gr.Error: if ``inputs`` is ``None``.
    """
    if inputs is None:
        raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.")

    sampling_rate = pipe.feature_extractor.sampling_rate
    with open(inputs, "rb") as audio_file:
        raw_bytes = audio_file.read()

    # Decode the file bytes into a waveform at the feature extractor's rate.
    waveform = ffmpeg_read(raw_bytes, sampling_rate)
    model_inputs = {"array": waveform, "sampling_rate": sampling_rate}
    return get_prediction(model_inputs, prompt, punctuate_text, stabilize_timestamp)
def _return_yt_html_embed(yt_url): | |
video_id = yt_url.split("?v=")[-1] | |
return f'<center> <iframe width="500" height="320" src="https://www.youtube.com/embed/{video_id}"> </iframe> </center>' | |
def download_yt_audio(yt_url, filename):
    """Download a YouTube video to ``filename`` after checking its length.

    Args:
        yt_url: URL of the video to download.
        filename: output path template handed to the downloader (``outtmpl``).

    Raises:
        gr.Error: if metadata extraction or the download fails, or if the
            video is longer than ``YT_LENGTH_LIMIT_S`` seconds.
    """
    # Fetch metadata only; the context manager releases the downloader.
    try:
        with youtube_dl.YoutubeDL() as info_loader:
            info = info_loader.extract_info(yt_url, download=False)
    except youtube_dl.utils.DownloadError as err:
        raise gr.Error(str(err)) from err

    # "duration_string" looks like "SS", "MM:SS" or "HH:MM:SS"; left-pad with
    # zeros so the three leading fields are always hours, minutes, seconds.
    file_h_m_s = [int(sub_length) for sub_length in info["duration_string"].split(":")]
    file_h_m_s = [0] * (3 - len(file_h_m_s)) + file_h_m_s
    file_length_s = file_h_m_s[0] * 3600 + file_h_m_s[1] * 60 + file_h_m_s[2]

    if file_length_s > YT_LENGTH_LIMIT_S:
        yt_length_limit_hms = time.strftime("%HH:%MM:%SS", time.gmtime(YT_LENGTH_LIMIT_S))
        file_length_hms = time.strftime("%HH:%MM:%SS", time.gmtime(file_length_s))
        raise gr.Error(f"Maximum YouTube length is {yt_length_limit_hms}, got {file_length_hms} YouTube video.")

    ydl_opts = {"outtmpl": filename, "format": "worstvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"}
    with youtube_dl.YoutubeDL(ydl_opts) as ydl:
        try:
            ydl.download([yt_url])
        # NOTE(review): download failures are expected to surface as
        # DownloadError (see the yt-dlp embedding docs); ExtractorError is
        # still caught so existing behaviour is kept.
        except (youtube_dl.utils.DownloadError, youtube_dl.utils.ExtractorError) as err:
            raise gr.Error(str(err)) from err
def yt_transcribe(yt_url, prompt, punctuate_text: bool = True, stabilize_timestamp: bool = True):
    """Download a YouTube video and transcribe its audio.

    Args:
        yt_url: URL of the YouTube video.
        prompt: optional text prompt forwarded to ``get_prediction``.
        punctuate_text: forwarded to ``get_prediction``.
        stabilize_timestamp: forwarded to ``get_prediction``.

    Returns:
        ``(html_embed_str, text, text_timestamped)``: the player embed HTML
        followed by the two transcription strings.
    """
    html_embed_str = _return_yt_html_embed(yt_url)
    sampling_rate = pipe.feature_extractor.sampling_rate

    # The download lives in a temporary directory; only its bytes are kept.
    with tempfile.TemporaryDirectory() as tmpdirname:
        filepath = os.path.join(tmpdirname, "video.mp4")
        download_yt_audio(yt_url, filepath)
        with open(filepath, "rb") as media_file:
            raw_bytes = media_file.read()

    waveform = ffmpeg_read(raw_bytes, sampling_rate)
    model_inputs = {"array": waveform, "sampling_rate": sampling_rate}
    text, text_timestamped = get_prediction(model_inputs, prompt, punctuate_text, stabilize_timestamp)
    return html_embed_str, text, text_timestamped
# --- Gradio UI --------------------------------------------------------------
# Three interfaces (microphone, file upload, YouTube) shown as tabs of one
# Blocks app. All share the same prompt / punctuation / stabilization inputs.
demo = gr.Blocks()

# Tab 1: record from the microphone.
mf_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.inputs.Audio(source="microphone", type="filepath", optional=True),
        gr.inputs.Textbox(lines=1, placeholder="Prompt", optional=True),
        gr.inputs.Checkbox(default=True, label="Add punctuations"),
        gr.inputs.Checkbox(default=True, label="Stabilize timestamp"),
    ],
    outputs=["text", "text"],
    layout="horizontal",
    theme="huggingface",
    title=f"Transcribe Audio with {os.path.basename(MODEL_NAME)}",
    description=f"Transcribe long-form microphone or audio inputs with the click of a button! Demo uses the Kotoba-Whisper checkpoint [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) and 🤗 Transformers to transcribe audio files of arbitrary length.",
    allow_flagging="never",
)

# Tab 2: upload an audio file.
file_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.inputs.Audio(source="upload", type="filepath", optional=True, label="Audio file"),
        gr.inputs.Textbox(lines=1, placeholder="Prompt", optional=True),
        gr.inputs.Checkbox(default=True, label="Add punctuations"),
        gr.inputs.Checkbox(default=True, label="Stabilize timestamp"),
    ],
    outputs=["text", "text"],
    layout="horizontal",
    theme="huggingface",
    title=f"Transcribe Audio with {os.path.basename(MODEL_NAME)}",
    description=f"Transcribe long-form microphone or audio inputs with the click of a button! Demo uses Kotoba-Whisper checkpoint [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) and 🤗 Transformers to transcribe audio files of arbitrary length.",
    allow_flagging="never",
)

# Tab 3: transcribe a YouTube video.
# The name `yt_transcribe` is rebound from the handler function to the
# Interface; `fn=yt_transcribe` is evaluated first, so the Interface still
# wraps the function.
yt_transcribe = gr.Interface(
    fn=yt_transcribe,
    inputs=[
        gr.inputs.Textbox(lines=1, placeholder="Paste the URL to a YouTube video here", label="YouTube URL"),
        gr.inputs.Textbox(lines=1, placeholder="Prompt", optional=True),
        gr.inputs.Checkbox(default=True, label="Add punctuations"),
        gr.inputs.Checkbox(default=True, label="Stabilize timestamp"),
    ],
    outputs=["html", "text", "text"],
    layout="horizontal",
    theme="huggingface",
    title=f"Transcribe YouTube with {os.path.basename(MODEL_NAME)}",
    description=f"Transcribe long-form YouTube videos with the click of a button! Demo uses Kotoba-Whisper checkpoint [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) and 🤗 Transformers to transcribe video files of arbitrary length.",
    allow_flagging="never",
)

with demo:
    gr.TabbedInterface([mf_transcribe, file_transcribe, yt_transcribe], ["Microphone", "Audio file", "YouTube"])

demo.launch(enable_queue=True)