from youtube_transcript_api import YouTubeTranscriptApi as yta
from youtube_transcript_api import NoTranscriptFound, TranscriptsDisabled
import streamlit as st
from yt_stats import YTstats
from datetime import datetime
import isodate
import pandas as pd
import deeppunkt
import time
import lexrank
import mysheet
def time_it(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        elapsed = end - start
        #st.write(f"Elapsed time: {end - start}")
        st.write('Load time: ' + str(round(elapsed, 1)) + ' sec')
        return result
    return wrapper
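# time_it is a decorator: wrapping a function reports its wall-clock run time
# in the app via st.write. It is not applied anywhere in this file; a minimal
# usage sketch (hypothetical, slow_fetch is a made-up name):
#
#     @time_it
#     def slow_fetch(video_id):
#         return yta.list_transcripts(video_id)
#
#     slow_fetch('8uQDDUfGNPA')  # also prints 'Load time: ... sec'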
def reset_session():
    if 'punkt' in st.session_state:
        del st.session_state.punkt
    if 'extract' in st.session_state:
        del st.session_state.extract
    if 'channel_id' in st.session_state:
        del st.session_state.channel_id

def update_param_example():
    #st.session_state.url_vid = st.session_state.ex_vid
    video_id = get_id_from_link(st.session_state.ex_vid)
    st.experimental_set_query_params(vid=video_id)
    reset_session()

def update_param_textinput():
    #st.session_state.url_vid = st.session_state.ti_vid
    video_id = get_id_from_link(st.session_state.ti_vid)
    st.experimental_set_query_params(vid=video_id)
    reset_session()
def get_link_from_id(video_id):
    if "v=" not in video_id:
        return 'https://www.youtube.com/watch?v=' + video_id
    else:
        return video_id

def get_id_from_link(link):
    if "v=" in link:
        return link.split("v=")[1].split("&")[0]
    elif len(link) == 11:
        return link
    else:
        return "Error: Invalid Link."
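# Both helpers accept either form, so callers need not care which they hold.
# Example behaviour (YouTube video IDs are always 11 characters):
#
#     get_id_from_link('https://www.youtube.com/watch?v=8uQDDUfGNPA&t=42')
#     # -> '8uQDDUfGNPA'
#     get_link_from_id('8uQDDUfGNPA')
#     # -> 'https://www.youtube.com/watch?v=8uQDDUfGNPA'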
# @st.cache(allow_output_mutation=True, suppress_st_warning=True)
# def retry_access_yt_object(url, max_retries=5, interval_secs=5, on_progress_callback=None):
#     """
#     Retries creating a YouTube object with the given URL and accessing its title several times
#     with a given interval in seconds, until it succeeds or the maximum number of attempts is reached.
#     If the object still cannot be created or the title cannot be accessed after the maximum number
#     of attempts, the last exception is raised.
#     """
#     last_exception = None
#     for i in range(max_retries):
#         try:
#             yt = YouTube(url, on_progress_callback=on_progress_callback)
#             #title = yt.title  # Access the title of the YouTube object.
#             #views = yt.views
#             return yt  # Return the YouTube object if successful.
#         except Exception as err:
#             last_exception = err  # Keep track of the last exception raised.
#             st.write(f"Failed to create YouTube object or access title. Retrying... ({i+1}/{max_retries})")
#             time.sleep(interval_secs)  # Wait for the specified interval before retrying.
#     # If the YouTube object still cannot be created after the maximum number of attempts, raise the last exception.
#     raise last_exception
def get_video_data(yt, video_id):
    yt_url = get_link_from_id(video_id)
    yt_img = f'http://img.youtube.com/vi/{video_id}/mqdefault.jpg'
    yt_img_html = '<img src=' + yt_img + ' width="250" height="150" />'
    yt_img_html_link = '<a href=' + yt_url + '>' + yt_img_html + '</a>'
    snippet = yt._get_single_video_data(video_id, 'snippet')
    yt_publish_date = snippet['publishedAt']
    yt_title = snippet['title']
    yt_author = snippet['channelTitle']
    yt_channel_id = snippet['channelId']
    try:
        yt_keywords = snippet['tags']
    except KeyError:
        # not every video has author tags
        yt_keywords = []
    statistics = yt._get_single_video_data(video_id, 'statistics')
    yt_views = statistics['viewCount']
    contentDetails = yt._get_single_video_data(video_id, 'contentDetails')
    yt_length = contentDetails['duration']
    yt_length_isodate = isodate.parse_duration(yt_length)
    yt_length_isoformat = isodate.duration_isoformat(yt_length_isodate, "%H:%M:%S")[1:]
    data = {'Video': [yt_img_html_link],
            'Author': [yt_author],
            'Title': [yt_title],
            'Published': [datetime.strptime(yt_publish_date, '%Y-%m-%dT%H:%M:%SZ').strftime('%B %d, %Y')],
            'Views': [format(int(yt_views), ",").replace(",", "'")],
            'Length': [yt_length_isoformat]}
    return data, yt_keywords, yt_channel_id
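# Shape of the returned tuple (values illustrative, not real data):
#   data          -> {'Video': ['<a ...><img ...></a>'], 'Author': ['...'],
#                     'Title': ['...'], 'Published': ['March 01, 2023'],
#                     'Views': ["1'234'567"], 'Length': ['...']}  # H:M:S string
#   yt_keywords   -> ['tag1', 'tag2', ...]  (may be empty)
#   yt_channel_id -> 'UC...'
# One-element lists keep the dict directly convertible to a one-row DataFrame.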
def get_video_data_from_gsheed(df, video_id):
    row = df.loc[df["ID"] == video_id].iloc[0]
    yt_keywords = row['Keywords'].split(';')
    yt_channel_id = row['Channel']
    data = {'Video': [row['Video']],
            'Author': [row['Author']],
            'Title': [row['Title']],
            'Published': [row['Published']],
            'Views': [row['Views']],
            'Length': [row['Length']]}
    return data, yt_keywords, yt_channel_id
def get_punctuated_text(raw_text):
    response = deeppunkt.predict('sentences', raw_text)
    st.session_state['punkt'] = response

def get_punctuated_text_to_dict(raw_text):
    #st.session_state['punkt'] = {'data':[raw_text,0,0,0,0], 'duration':0}
    st.session_state['punkt'] = [raw_text, 0, 0, 0, 0]

def get_extracted_text(raw_text):
    response = lexrank.summarize(raw_text)
    st.session_state['extract'] = response

def get_extracted_text_to_dict(raw_text):
    st.session_state['extract'] = [raw_text, 0, 0, 0, 0]
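# Both session-state payloads share one layout, inferred from the metrics
# tables below: [text, words, sentences, characters, tokens]. The *_to_dict
# variants store the raw text with zeroed metrics (e.g. for transcripts that
# are already punctuated by the author), so the display code stays uniform.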
def get_videos_from_yt(yt, item_limit):
    vids_thumbnails = []
    vids_videoIds = []
    vids_titles = []
    vids_lengths = []
    vids_published = []
    vids_views = []
    item = 0
    for video in yt.video_data:
        if item == item_limit:
            break
        item += 1
        vids_video_id = video
        vids_url = 'https://www.youtube.com/watch?v=' + vids_video_id
        yt_img = f'http://img.youtube.com/vi/{vids_video_id}/mqdefault.jpg'
        yt_img_html = '<img src=' + yt_img + ' width="250" height="150" />'
        yt_img_html_link = '<a href=' + vids_url + '>' + yt_img_html + '</a>'
        vids_thumbnails.append(yt_img_html_link)
        vids_video_id_link = '<a target="_self" href="/?vid=' + vids_video_id + '">' + vids_video_id + '</a>'
        vids_videoIds.append(vids_video_id_link)
        vids_titles.append(yt.video_data[video]['title'])
        yt_length = yt.video_data[video]['duration']
        yt_length_isodate = isodate.parse_duration(yt_length)
        yt_length_isoformat = isodate.duration_isoformat(yt_length_isodate, "%H:%M:%S")[1:]
        vids_lengths.append(yt_length_isoformat)
        yt_publish_date = yt.video_data[video]['publishedAt']
        yt_publish_date_formatted = datetime.strptime(yt_publish_date, '%Y-%m-%dT%H:%M:%SZ').strftime('%B %d, %Y')
        vids_published.append(yt_publish_date_formatted)
        yt_views = yt.video_data[video]['viewCount']
        yt_views_formatted = format(int(yt_views), ",").replace(",", "'")
        vids_views.append(yt_views_formatted)
    df_videos = {'Video': vids_thumbnails,
                 'Video ID': vids_videoIds,
                 'Title': vids_titles,
                 'Published': vids_published,
                 'Views': vids_views,
                 'Length': vids_lengths}
    return df_videos
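# Each 'Video ID' cell is an <a target="_self" href="/?vid=..."> link, so
# clicking a row deep-links back into this app with the vid query parameter
# (picked up by st.experimental_get_query_params at startup).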
def get_transcript(video_id):
    # transcript_list = yta.list_transcripts(video_id)
    # # iterate over all available transcripts
    # for transcript in transcript_list:
    #     # the Transcript object provides metadata properties
    #     st.write(
    #         transcript.video_id,
    #         transcript.language,
    #         transcript.language_code,
    #         # whether it has been manually created or generated by YouTube
    #         transcript.is_generated,
    #         # whether this transcript can be translated or not
    #         transcript.is_translatable,
    #         # a list of languages the transcript can be translated to
    #         transcript.translation_languages,
    #     )
    try:
        transcript_list = yta.list_transcripts(video_id)
        transcript_item = transcript_list.find_transcript(['en'])
    except (NoTranscriptFound, TranscriptsDisabled):
        return 'No Transcript available.', False
    transcript_item_is_generated = transcript_item.is_generated
    transcript_raw = transcript_item.fetch()
    transcript_text = '\n'.join([i['text'].replace('\n', ' ') for i in transcript_raw])
    return transcript_text, transcript_item_is_generated
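# Callers always unpack two values, e.g.:
#     text, is_generated = get_transcript('8uQDDUfGNPA')
# is_generated distinguishes YouTube's auto-captions (which need punctuation
# restoration) from author-provided transcripts (which are used as-is).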
def get_meta_info(video_id, url):
    lextext = st.session_state.extract[0]
    # GPT columns are stored as placeholders here ('NA' when the extract is too short)
    gpt_sum = '0'
    gpt_title = '0'
    title_sim = '0'
    if len(lextext) < 10:
        gpt_sum = 'NA'
        gpt_title = 'NA'
        title_sim = 'NA'
    yt_img = f'http://img.youtube.com/vi/{video_id}/mqdefault.jpg'
    yt_img_html = '<img src=' + yt_img + ' width="250" height="150" />'
    yt_img_html_link = '<a href=' + url + '>' + yt_img_html + '</a>'
    video_info = {'ID': [video_id],
                  'Video': [yt_img_html_link],
                  'Author': [st.session_state["video_data"]["Author"][0]],
                  'Channel': [st.session_state["channel_id"]],
                  'Title': [st.session_state["video_data"]["Title"][0]],
                  'Published': [st.session_state["video_data"]["Published"][0]],
                  'Views': [st.session_state["video_data"]["Views"][0]],
                  'Length': [st.session_state["video_data"]["Length"][0]],
                  'Keywords': ['; '.join(st.session_state["keywords"])]}
    transcript_info = {'Words': [int(st.session_state.extract[1])],
                       'Sentences': [int(st.session_state.extract[2])],
                       'Characters': [int(st.session_state.extract[3])],
                       'Tokens': [int(st.session_state.extract[4])],
                       'Lextext': [st.session_state.extract[0]],
                       'GPTSummary': [gpt_sum],
                       'GPTTitle': [gpt_title],
                       'Titlesim': [title_sim]}
    df_current_ts = pd.DataFrame({**video_info, **transcript_info})
    return df_current_ts
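# get_meta_info returns a one-row DataFrame whose columns match the Google
# Sheet managed by mysheet, so appending is a plain concat, e.g.:
#     df_new_sheet = pd.concat([st.session_state.gsheed, get_meta_info(vid, url)])
#     mysheet.write_gspread(df_new_sheet)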
#######################################################################################
# Application Start
#######################################################################################

st.title("Transcriptifier")
st.subheader("YouTube Transcript Downloader")

example_urls = [
    'https://www.youtube.com/watch?v=8uQDDUfGNPA',  # blog
    'https://www.youtube.com/watch?v=ofZEo0Rzo5s',  # h-educate
    'https://www.youtube.com/watch?v=ReHGSGwV4-A',  # wholesale ted
    'https://www.youtube.com/watch?v=n8JHnLgodRI',  # kevindavid
    'https://www.youtube.com/watch?v=6MI0f6YjJIk',  # Nicholas
    'https://www.youtube.com/watch?v=nr4kmlTr9xw',  # Linus
    'https://www.youtube.com/watch?v=64Izfm24FKA',  # Yannic
    'https://www.youtube.com/watch?v=Mt1P7p9HmkU',  # Fogarty
    'https://www.youtube.com/watch?v=bj9snrsSook',  # Geldschnurrbart
    'https://www.youtube.com/watch?v=0kJz0q0pvgQ',  # fcc
    'https://www.youtube.com/watch?v=gNRGkMeITVU',  # iman
    'https://www.youtube.com/watch?v=vAuQuL8dlXo',  # ghiorghiu
    'https://www.youtube.com/watch?v=5scEDopRAi0',  # infohaus
    'https://www.youtube.com/watch?v=lCnHfTHkhbE',  # fcc tutorial
    'https://www.youtube.com/watch?v=QI2okshNv_4'
]
par_vid = st.experimental_get_query_params().get("vid")
par_url = par_vid[0] if par_vid else None

select_examples = st.selectbox(label="Choose an example", options=example_urls, key='ex_vid', on_change=update_param_example)
url = st.text_input("Or Enter the YouTube video URL or ID:", value=par_url or select_examples, key='ti_vid', on_change=update_param_textinput)
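# Deep-link flow: a ?vid=... query parameter (set by the on_change callbacks
# above or by the per-video links in the channel table) takes precedence over
# the example selectbox when prefilling the URL field.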
########################
# Load the data for a given video
########################

API_KEY = st.secrets["api_key"]
yt = YTstats(API_KEY)
#yt = retry_access_yt_object(get_link_from_id(url))
if url:
    video_id = get_id_from_link(url)
    if 'gsheed' not in st.session_state:
        df = mysheet.read_gspread()
        st.session_state.gsheed = df
        #st.write("reading spreadsheet")
    else:
        df = st.session_state.gsheed
        #st.write("getting spreadsheet from session_state")
    gslist = []
    try:
        gslist = df.ID.to_list()
    except AttributeError:
        st.write('no items available.')
    if video_id in gslist:
        #st.write(df.loc[df["ID"] == video_id])
        st.write("reading from sheet")
        #transcript_item_is_generated = False
        #transcript_text = df.loc[df["ID"] == video_id]['Punkttext'].to_list()[0]
        #get_punctuated_text_to_dict(transcript_text)
        extracted_text = df.loc[df["ID"] == video_id]['Lextext'].to_list()[0]
        get_extracted_text_to_dict(extracted_text)
        video_data, yt_keywords, yt_channel_id = get_video_data_from_gsheed(df, video_id)
    else:
        st.write("reading from api")
        video_data, yt_keywords, yt_channel_id = get_video_data(yt, video_id)
    st.session_state["video_data"] = video_data
    st.session_state["keywords"] = yt_keywords
    st.session_state["channel_id"] = yt_channel_id

    df = pd.DataFrame(st.session_state["video_data"])
    st.markdown(df.style.hide(axis="index").to_html(), unsafe_allow_html=True)
    st.write("")
    ###########################
    # Load Transcript
    ###########################

    transcript_text, transcript_item_is_generated = get_transcript(video_id)
    #if transcript_text is None:
    #    st.error("No transcript available.")
    #    st.stop()

    ########################
    # Load the author's keywords, which are not visible to viewers
    ########################

    keywords_data = {"Author's Keywords": yt_keywords}
    st.table(keywords_data)
    st.write("")

    # TODO
    # For this video (bj9snrsSook) transcripts are available in the following languages:
    # (MANUALLY CREATED)
    #   None
    # (GENERATED)
    #   - de ("Deutsch (automatisch erzeugt)") [TRANSLATABLE]
    # (TRANSLATION LANGUAGES)
    #   - af ("Afrikaans")
    ########################
    # Display the transcript along with the download button
    ########################

    with st.expander('Preview Transcript'):
        st.code(transcript_text, language=None)
    st.download_button('Download Transcript', transcript_text)

    ########################
    # API Call to deeppunkt-gr
    ########################

    st.subheader("Restore Punctuation of Transcript")
    if not transcript_item_is_generated:
        st.write("Transcript is punctuated by author.")

    # TODO: check if the transcript contains more than 5 sentences

    if st.button('Load Punctuated Transcript'):
        with st.spinner('Loading Punctuation...'):
            if 'punkt' not in st.session_state:
                # only auto-generated transcripts need punctuation restoration
                if transcript_item_is_generated:
                    get_punctuated_text(transcript_text)
                else:
                    get_punctuated_text_to_dict(transcript_text)
        #st.write('Load time: '+str(round(st.session_state.punkt['duration'],1))+' sec')
        metrics_data = {'Words': [int(st.session_state.punkt[1])],
                        'Sentences': [int(st.session_state.punkt[2])],
                        'Characters': [int(st.session_state.punkt[3])],
                        'Tokens': [int(st.session_state.punkt[4])]}
        df = pd.DataFrame(metrics_data)
        st.markdown(df.style.hide(axis="index").to_html(), unsafe_allow_html=True)
        st.write("")
        with st.expander('Preview Transcript'):
            st.code(st.session_state.punkt[0], language=None)
    ########################
    # Call to lexrank-gr
    ########################

    st.subheader("Extract Core Sentences from Transcript")
    if st.button('Extract Sentences'):
        # if no extract is available yet, the text has to be punctuated first
        with st.spinner('Loading Extractions ...'):
            if 'extract' not in st.session_state:
                with st.spinner('Loading Punctuation for Extraction ...'):
                    if 'punkt' not in st.session_state:
                        # only auto-generated transcripts need punctuation restoration
                        if transcript_item_is_generated:
                            get_punctuated_text(transcript_text)
                        else:
                            get_punctuated_text_to_dict(transcript_text)
                get_extracted_text(st.session_state.punkt[0])
        metrics_data = {'Words': [int(st.session_state.extract[1])],
                        'Sentences': [int(st.session_state.extract[2])],
                        'Characters': [int(st.session_state.extract[3])],
                        'Tokens': [int(st.session_state.extract[4])]}
        df = pd.DataFrame(metrics_data)
        st.markdown(df.style.hide(axis="index").to_html(), unsafe_allow_html=True)
        st.write("")
        with st.expander('Preview Transcript'):
            st.code(st.session_state.extract[0], language=None)
        ################
        # write the extraction to the gspread file
        ################
        if 'extract' not in st.session_state:
            st.error('Please run extraction first.', icon="🚨")
        else:
            df_current_ts = get_meta_info(video_id, url)
            # initial write:
            #df_new_sheet = pd.concat([df_current_ts])
            #mysheet.write_gspread(df_new_sheet)
            #st.write(video_info)
            if 'gsheed' not in st.session_state:
                df = mysheet.read_gspread()
                st.session_state.gsheed = df
            df_sheet = st.session_state.gsheed
            df_current_ts_id = list(df_current_ts.ID)[0]
            if df_current_ts_id not in list(df_sheet.ID):
                df_new_sheet = pd.concat([df_sheet, df_current_ts])
                mysheet.write_gspread(df_new_sheet)
                st.session_state.gsheed = df_new_sheet
                st.write('video added to sheet')
            #else:
            #    st.write('video already in sheet')
            #    st.write(df_sheet)
    #######################
    # Read the gspread file
    #######################

    if st.button('Read Spreadsheet'):
        if 'gsheed' not in st.session_state:
            df = mysheet.read_gspread()
            st.session_state.gsheed = df
        st.write(st.session_state.gsheed)

    #if st.button('Add to Spreadsheet'):
    #######################
    # API Call to summarymachine
    #######################

    # def get_summarized_text(raw_text):
    #     response = requests.post("https://wldmr-summarymachine.hf.space/run/predict", json={
    #         "data": [
    #             raw_text,
    #         ]})
    #     #response_id = response
    #     if response.status_code == 504:
    #         raise TimeoutError("Error: Request took too long (>60sec), please try a shorter text.")
    #     return response.json()

    # st.subheader("Summarize Extracted Sentences with Flan-T5-large")
    # if st.button('Summarize Sentences'):
    #     command = 'Summarize the transcript in one sentence:\n\n'
    #     with st.spinner('Loading Punctuation (Step 1/3)...'):
    #         if 'punkt' not in st.session_state:
    #             # only auto-generated transcripts need punctuation restoration
    #             if transcript_item.is_generated:
    #                 get_punctuated_text(transcript_text)
    #             else:
    #                 get_punctuated_text_to_dict(transcript_text)
    #     with st.spinner('Loading Extraction (Step 2/3)...'):
    #         if 'extract' not in st.session_state:
    #             get_extracted_text(st.session_state.punkt['data'][0])
    #     with st.spinner('Loading Summary (Step 3/3)...'):
    #         summary_text = get_summarized_text(command + st.session_state.extract['data'][0])
    #     st.write('Load time: ' + str(round(summary_text['duration'], 1)) + ' sec')
    #     with st.expander('Preview Transcript'):
    #         st.write(summary_text['data'][0], language=None)
    ########################
    # Channel
    ########################

    st.subheader("Other Videos of the Channel")
    #st.write(st.session_state["channel_id"])
    if 'channel_id' not in st.session_state:
        st.error('Channel ID not available.', icon="🚨")
    else:
        yt.get_channel_statistics(st.session_state["channel_id"])
        stats_data = {'Channel ID': [st.session_state["channel_id"]],
                      'Total Views': [format(int(yt.channel_statistics["viewCount"]), ",").replace(",", "'")],
                      'Total Subscribers': [format(int(yt.channel_statistics["subscriberCount"]), ",").replace(",", "'")],
                      'Total Videos': [format(int(yt.channel_statistics["videoCount"]), ",").replace(",", "'")],
                      }
        df = pd.DataFrame(stats_data)
        st.markdown(df.style.hide(axis="index").to_html(), unsafe_allow_html=True)
        st.write("")
        if st.button('Load Videos'):
            if 'gsheed' not in st.session_state:
                df = mysheet.read_gspread()
                st.session_state.gsheed = df
            progress_text = 'Loading...'
            loading_bar = st.progress(0, text=progress_text)
            item_limit = 3
            df = st.session_state.gsheed
            yt.get_channel_video_data(st.session_state["channel_id"], df, loading_bar, progress_text, item_limit)
            df_videos = get_videos_from_yt(yt, item_limit)
            dataset = pd.DataFrame(df_videos)
            st.markdown(dataset.style.hide(axis="index").to_html(), unsafe_allow_html=True)
    ########################
    # Sequence Loader
    ########################

    st.subheader("Sequence Loader")
    # the hash is entered as a secret access token
    input_hash = st.text_input("Enter Hash:")
    item_limit = st.number_input(label="Number of Videos", value=3)
    if st.button('Load Sequence'):
        HASH_KEY = st.secrets["hash_key"]
        if input_hash == HASH_KEY:
            st.write("Access granted")
            # read in the spreadsheet
            if 'gsheed' not in st.session_state:
                df = mysheet.read_gspread()
                st.session_state.gsheed = df
            progress_text = 'Loading...'
            loading_bar = st.progress(0, text=progress_text)
            df_sheet = st.session_state.gsheed
            yt.get_channel_video_data(st.session_state["channel_id"], df_sheet, loading_bar, progress_text, item_limit)
            df_videos = get_videos_from_yt(yt, item_limit)
            dataset = pd.DataFrame(df_videos)
            st.markdown(dataset.style.hide(axis="index").to_html(), unsafe_allow_html=True)
            for sng in dataset['Video ID']:
                # strip the video id out of its <a ...>id</a> link cell
                subsng = sng[sng.find('>')+1:sng.find('</')]
                st.write(subsng)
                transcript_text, transcript_item_is_generated = get_transcript(subsng)
                if transcript_item_is_generated:
                    get_punctuated_text(transcript_text)
                else:
                    get_punctuated_text_to_dict(transcript_text)
                get_extracted_text(st.session_state.punkt[0])
                video_data, yt_keywords, yt_channel_id = get_video_data(yt, subsng)
                st.session_state["video_data"] = video_data
                st.session_state["keywords"] = yt_keywords
                st.session_state["channel_id"] = yt_channel_id
                df_current_ts = get_meta_info(subsng, get_link_from_id(subsng))
                st.write(df_current_ts)
                df_sheet = st.session_state.gsheed
                df_new_sheet = pd.concat([df_sheet, df_current_ts])
                mysheet.write_gspread(df_new_sheet)
                st.session_state.gsheed = df_new_sheet
            st.write('done')
            st.write(st.session_state.gsheed)
        else:
            st.write("Access denied")
###############
# End of File #
###############

# hide_streamlit_style = """
#     <style>
#     #MainMenu {visibility: hidden;}
#     footer {visibility: hidden;}
#     </style>
#     """
# st.markdown(hide_streamlit_style, unsafe_allow_html=True)