import gradio as gr
import requests
import os
from huggingface_hub import InferenceClient, HfApi
import random
import json
import datetime
import uuid
import yt_dlp
import cv2
import whisper

from agent import (
    PREFIX,
    COMPRESS_DATA_PROMPT,
    COMPRESS_DATA_PROMPT_SMALL,
    LOG_PROMPT,
    LOG_RESPONSE,
)

client = InferenceClient(
    "mistralai/Mixtral-8x7B-Instruct-v0.1"
)

reponame = "Omnibus/tmp"
save_data = f'https://huggingface.co/datasets/{reponame}/raw/main/'
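
# Hypothetical usage of `save_data` (this script never reads or writes the
# dataset itself): the raw endpoint serves files directly, so e.g.
#   requests.get(f'{save_data}tmp.json')
# would fetch a file named tmp.json from the repo above.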

# Whisper model sizes and language choices for the UI dropdowns
sizes = list(whisper._MODELS.keys())
langs = ["none"] + sorted(list(whisper.tokenizer.LANGUAGES.values()))
current_size = "base"
loaded_model = whisper.load_model(current_size)

VERBOSE = True
MAX_HISTORY = 100
MAX_DATA = 20000  # per-pass budget, in delimiter-counted "words", for summarization
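
# `sizes` comes from Whisper's model registry and typically includes
# "tiny", "base", "small", "medium", and "large" (plus ".en" variants);
# larger checkpoints transcribe more accurately but load and run slower.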


def dl(inp, img):
    # Download a video from a URL with yt-dlp, or pass an uploaded file through.
    uid = uuid.uuid4()
    fps = "Error"
    out = None
    if img is None and inp != "":
        try:
            # Build a filesystem-safe output name from the URL
            inp_out = inp.replace("https://", "")
            inp_out = inp_out.replace("/", "_").replace(".", "_").replace("=", "_").replace("?", "_")
            if "twitter" in inp:
                os.system(f'yt-dlp "{inp}" --extractor-arg "twitter:api=syndication" --trim-filenames 160 -o "{uid}/{inp_out}.mp4" -S res,mp4 --recode mp4')
            else:
                os.system(f'yt-dlp "{inp}" --trim-filenames 160 -o "{uid}/{inp_out}.mp4" -S res,mp4 --recode mp4')

            out = f"{uid}/{inp_out}.mp4"
            # Probe the file with OpenCV to confirm it is a readable video
            capture = cv2.VideoCapture(out)
            fps = capture.get(cv2.CAP_PROP_FPS)
            capture.release()
        except Exception as e:
            print(e)
            out = None
    elif img is not None and inp == "":
        capture = cv2.VideoCapture(img)
        fps = capture.get(cv2.CAP_PROP_FPS)
        capture.release()
        out = f"{img}"
    return out
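
# Example (hypothetical URL): dl("https://youtu.be/abc123", None) downloads
# the clip to "<uuid>/youtu_be_abc123.mp4" and returns that path, while
# dl("", "/tmp/upload.mp4") passes an uploaded file straight through.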


def csv(segments):
    output = ""
    for segment in segments:
        output += f"{segment['start']},{segment['end']},{segment['text']}\n"
    return output
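
# Example row for one Whisper segment:
#   0.0,4.2,Hello and welcome
# Commas inside the text are not escaped, so the output is CSV-like rather
# than strictly valid CSV.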


def transcribe(path, lang, size):
    global loaded_model, current_size
    yield (None, [("", "Transcribing Video...")])

    # Reload Whisper only when the requested model size changes
    if size != current_size:
        loaded_model = whisper.load_model(size)
        current_size = size
    # "none" in the dropdown means let Whisper auto-detect the language
    results = loaded_model.transcribe(path, language=None if lang == "none" else lang)
    # The subtitle format was hardcoded to ".csv", so always yield CSV rows
    yield csv(results["segments"]), [("", "Transcription Complete...")]
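
# `results["segments"]` is Whisper's list of dicts with "start", "end",
# and "text" keys, which csv() flattens into one row per segment.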


def format_prompt(message, history):
    prompt = "<s>"
    for user_prompt, bot_response in history:
        prompt += f"[INST] {user_prompt} [/INST]"
        prompt += f" {bot_response}</s> "
    prompt += f"[INST] {message} [/INST]"
    return prompt
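
# Example: format_prompt("Summarize", [("Hi", "Hello!")]) returns
#   "<s>[INST] Hi [/INST] Hello!</s> [INST] Summarize [/INST]"
# which follows the Mixtral-Instruct chat template.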


def run_gpt(
    prompt_template,
    stop_tokens,
    max_tokens,
    seed,
    **prompt_kwargs,
):
    print(seed)
    timestamp = datetime.datetime.now()

    generate_kwargs = dict(
        temperature=0.9,
        max_new_tokens=max_tokens,
        top_p=0.95,
        repetition_penalty=1.0,
        do_sample=True,
        seed=seed,
        # Pass the caller's stop tokens through to generation; previously
        # they were accepted but silently ignored.
        stop_sequences=stop_tokens,
    )

    content = PREFIX.format(
        timestamp=timestamp,
        purpose="Compile the provided data and complete the user's task",
    ) + prompt_template.format(**prompt_kwargs)
    if VERBOSE:
        print(LOG_PROMPT.format(content))

    # Stream tokens from the Inference API and accumulate them into one string
    stream = client.text_generation(content, **generate_kwargs, stream=True, details=True, return_full_text=False)
    resp = ""
    for response in stream:
        resp += response.token.text

    if VERBOSE:
        print(LOG_RESPONSE.format(resp))
    return resp
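
# Sketch of a call (keyword fields must match the chosen prompt template):
#   run_gpt(COMPRESS_DATA_PROMPT_SMALL, stop_tokens=[], max_tokens=512,
#           seed=1, direction="summarize", knowledge="", history="...")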


def compress_data(c, instruct, history):
    # Map step: summarize each slice of `history` independently.
    seed = random.randint(1, 1000000000)

    print(f'c:: {c}')

    # One pass per MAX_DATA delimiter-counted "words"...
    divr = int(c) / MAX_DATA
    divi = int(divr) + 1 if divr != int(divr) else int(divr)
    # ...but slice by the string's real length so the tail is not dropped
    # (the word count underestimates the character length).
    chunk = -(-len(history) // divi)  # ceiling division
    print(f'chunk:: {chunk}')
    print(f'divr:: {divr}')
    print(f'divi:: {divi}')
    out = []

    s = 0
    e = chunk
    print(f'e:: {e}')

    for _ in range(divi):
        print(f's:e :: {s}:{e}')
        hist = history[s:e]
        resp = run_gpt(
            COMPRESS_DATA_PROMPT_SMALL,
            stop_tokens=["observation:", "task:", "action:", "thought:"],
            max_tokens=8192,
            seed=seed,
            direction=instruct,
            knowledge="",
            history=hist,
        )
        out.append(resp)
        e = e + chunk
        s = s + chunk
    return out
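
# Worked example: c = 45,000 "words" gives divr = 2.25, so divi = 3 passes
# and chunk = ceil(len(history) / 3) characters per slice.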


def compress_data_og(c, instruct, history):
    # Reduce step: fold each slice into a rolling summary via `knowledge`.
    seed = random.randint(1, 1000000000)

    print(c)

    divr = int(c) / MAX_DATA
    divi = int(divr) + 1 if divr != int(divr) else int(divr)
    chunk = -(-len(history) // divi)  # ceiling division over the real string length
    print(f'chunk:: {chunk}')
    print(f'divr:: {divr}')
    print(f'divi:: {divi}')
    out = []

    s = 0
    e = chunk
    print(f'e:: {e}')
    new_history = ""

    for _ in range(divi):
        print(f's:e :: {s}:{e}')
        hist = history[s:e]
        resp = run_gpt(
            COMPRESS_DATA_PROMPT,
            stop_tokens=["observation:", "task:", "action:", "thought:"],
            max_tokens=8192,
            seed=seed,
            direction=instruct,
            knowledge=new_history,
            history=hist,
        )
        new_history = resp
        print(resp)
        out.append(resp)
        e = e + chunk
        s = s + chunk
    '''
    resp = run_gpt(
        COMPRESS_DATA_PROMPT,
        stop_tokens=["observation:", "task:", "action:", "thought:"],
        max_tokens=8192,
        seed=seed,
        direction=instruct,
        knowledge=new_history,
        history="All data has been received.",
    )'''
    print("final: " + resp)

    return resp
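
# Unlike compress_data(), which summarizes slices independently, this pass
# threads the previous response back in as `knowledge`, so the returned
# `resp` is a rolling summary of everything seen so far.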


def summarize(inp, history, mem_check, data=None):
    # mem_check is accepted from the UI but currently unused
    json_box = []
    error_box = ""
    json_out = {}
    rawp = "Error"
    if inp == "":
        inp = "Process this data"
    history.clear()
    history = [(inp, "Summarizing Transcription...")]
    yield "", history, error_box, json_box

    if data != "Error" and data != "" and data is not None:
        print(inp)
        out = str(data)
        rl = len(out)
        print(f'rl:: {rl}')
        # Rough size estimate: count whitespace/comma-delimited tokens
        c = 1
        for i in str(out):
            if i == " " or i == "," or i == "\n":
                c += 1
        print(f'c:: {c}')
        json_out = compress_data(c, inp, out)
        history = [(inp, "Generating Report...")]
        yield "", history, error_box, json_out

        # Second pass: fold the chunk summaries into a single report
        out = str(json_out)
        print(out)
        rl = len(out)
        print(f'rl:: {rl}')
        c = 1
        for i in str(out):
            if i == " " or i == "," or i == "\n":
                c += 1
        print(f'c2:: {c}')
        rawp = compress_data_og(c, inp, out)
        history.clear()
        history.append((inp, rawp))
        yield "", history, error_box, json_out
    else:
        rawp = "Provide a valid data source"
        history.clear()
        history.append((inp, rawp))
        yield "", history, error_box, json_out
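
# summarize() is effectively a two-pass map/reduce: compress_data() condenses
# the raw transcript slice by slice, then compress_data_og() folds those
# partial summaries into the single report shown in the chatbot.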


def clear_fn():
    return "", [(None, None)]


with gr.Blocks() as app:
    gr.HTML("""<center><h1>Video Summarizer</h1><h3>Mixtral 8x7B + Whisper</h3></center>""")
    with gr.Row():
        with gr.Column():
            with gr.Row():
                inp_url = gr.Textbox(label="Video URL")
                url_btn = gr.Button("Load Video")
            vid = gr.Video()

    trans = gr.Textbox(label="Transcription", interactive=True)
    chatbot = gr.Chatbot(label="Mixtral 8x7B Chatbot", show_copy_button=True)

    with gr.Row():
        with gr.Column(scale=3):
            prompt = gr.Textbox(label="Instructions (optional)")
        with gr.Column(scale=1):
            mem_check = gr.Checkbox(label="Memory", value=False)
            button = gr.Button()

    with gr.Row():
        stop_button = gr.Button("Stop")
        clear_btn = gr.Button("Clear")
    with gr.Row():
        sz = gr.Dropdown(label="Model Size", choices=sizes, value='base')
        # Whisper's language names are lowercase, so the default must be too
        lang = gr.Dropdown(label="Language (Optional)", choices=langs, value="english")
    json_out = gr.JSON()
    e_box = gr.Textbox(label="Errors")

    url_btn.click(dl, [inp_url, vid], vid)
    clear_btn.click(clear_fn, None, [prompt, chatbot])
    # Transcribe, then summarize; keep the event handle so Stop can cancel it
    go = button.click(transcribe, [vid, lang, sz], [trans, chatbot]).then(
        summarize, [prompt, chatbot, mem_check, trans], [prompt, chatbot, e_box, json_out]
    )
    stop_button.click(None, None, None, cancels=[go])

app.queue(default_concurrency_limit=20).launch(show_api=False)