"""
YouTube Video Analysis Module

This module provides tools for analyzing YouTube videos, including transcript
extraction and main points summarization. It uses the LangChain library for
natural language processing tasks and the YouTube Transcript API for fetching
video transcripts.

Classes:
    YouTubeTranscriptTool: Handles fetching and processing of YouTube video transcripts.
    MainPointsExtractor: Extracts and formats main points from YouTube video transcripts.
    SummaryExtractor: Extracts summaries from YouTube video transcripts.
    YouTubeAgent: Manages the overall agent setup for interacting with YouTube videos.

Usage:
    youtube_agent = YouTubeAgent()
    video_link = "https://www.youtube.com/watch?v=VIDEO_ID"
    response = youtube_agent.invoke(f"Can you get summary of the following video {video_link}")
"""

import functools
import logging
import os
import traceback
from functools import wraps
from typing import List, Dict, Any

import openai
from youtube_transcript_api import YouTubeTranscriptApi
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.utils.function_calling import convert_to_openai_function
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langchain.schema.runnable import RunnableLambda, RunnablePassthrough
from langchain.agents import tool, AgentExecutor
from langchain.agents.output_parsers import OpenAIFunctionsAgentOutputParser
from langchain.agents.format_scratchpad import format_to_openai_functions
from langchain.output_parsers.openai_functions import JsonKeyOutputFunctionsParser
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.memory import ConversationBufferWindowMemory

# Set up logging with more detailed format
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(name)s - %(filename)s:%(lineno)d - %(message)s',
)
logger = logging.getLogger(__name__)


def log_errors(func):
    """
    Decorator that logs any exception raised by ``func`` and re-raises it.

    Two ERROR records are emitted per failure: the exception message and the
    formatted traceback. The exception itself is never swallowed.

    Args:
        func: The callable to wrap.

    Returns:
        The wrapped callable; ``functools.wraps`` keeps the original name,
        docstring and signature metadata.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # Lazy %-style arguments; the emitted text is unchanged.
            logger.error("Error in %s: %s", func.__name__, e)
            logger.error("Traceback: %s", traceback.format_exc())
            raise
    return wrapper


class YouTubeTranscriptTool:
    """
    A tool for fetching and processing YouTube video transcripts.

    This class provides methods to retrieve transcripts with or without timestamps,
    and to split transcripts into manageable chunks.
    """

    # NOTE: the docstrings of the @tool functions below double as the tool
    # descriptions handed to the model, so their wording is runtime text.

    @staticmethod
    @tool(return_direct=True)
    def get_transcript_with_timestamps(youtube_video_id: str, chunk_number: int = 0) -> str:
        """
        Retrieves a YouTube video transcript with timestamps.

        Args:
            youtube_video_id (str): The ID of the YouTube video.
            chunk_number (int): The index of the transcript chunk to retrieve.

        Returns:
            str: The requested transcript chunk with timestamps.
        """
        return YouTubeTranscriptTool._get_transcript(youtube_video_id, chunk_number, include_timestamps=True)

    @staticmethod
    @tool(return_direct=True)
    def get_transcript_without_timestamps(youtube_video_id: str, chunk_number: int = 0) -> str:
        """
        Retrieves a YouTube video transcript without timestamps.

        Args:
            youtube_video_id (str): The ID of the YouTube video.
            chunk_number (int): The index of the transcript chunk to retrieve.

        Returns:
            str: The requested transcript chunk without timestamps.
        """
        return YouTubeTranscriptTool._get_transcript(youtube_video_id, chunk_number, include_timestamps=False)

    @staticmethod
    @log_errors
    def _get_transcript(youtube_video_id: str, chunk_number: int, include_timestamps: bool) -> str:
        """
        Internal method to fetch and process the transcript.

        The transcript is fetched, joined into one string, split into chunks of
        at most 8192 characters, and the requested chunk is returned wrapped in
        a short note (see ``_format_response``).

        Args:
            youtube_video_id (str): The ID of the YouTube video.
            chunk_number (int): The zero-based index of the transcript chunk to retrieve.
            include_timestamps (bool): Whether to include timestamps in the transcript.

        Returns:
            str: The processed transcript chunk, or a string starting with
            "Error fetching transcript: " if anything fails. This includes an
            out-of-range ``chunk_number``; no exception propagates to the caller.
        """
        try:
            transcript_json = YouTubeTranscriptApi.get_transcript(youtube_video_id)
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=8192,
                chunk_overlap=0,
                separators=[f" {char}" for char in "ABCDEFGHIJKLMNOPQRSTUVWXYZ"],
            )
            if include_timestamps:
                transcript_data = [f"{entry['start']:.2f}: {entry['text']} " for entry in transcript_json]
            else:
                transcript_data = [entry['text'] for entry in transcript_json]
            transcript_text = " ".join(transcript_data)
            transcript_splits = text_splitter.split_text(transcript_text)

            # Reject negative indices too: Python would otherwise wrap them
            # around to the end of the list and return the wrong chunk.
            if not 0 <= chunk_number < len(transcript_splits):
                raise ValueError(
                    f"Chunk number {chunk_number} is out of range. Total chunks: {len(transcript_splits)}"
                )

            chunked_text = transcript_splits[chunk_number]
            return YouTubeTranscriptTool._format_response(transcript_splits, chunk_number, chunked_text)
        except Exception as e:
            # Tool boundary: report the failure as text instead of raising.
            logger.error(f"Error in _get_transcript: {str(e)}")
            return f"Error fetching transcript: {str(e)}"

    @staticmethod
    def _format_response(transcript_splits: List[str], chunk_number: int, chunked_text: str) -> str:
        """
        Formats the transcript chunk response.

        Args:
            transcript_splits (List[str]): All transcript chunks.
            chunk_number (int): The index of the current chunk.
            chunked_text (str): The text of the current chunk.

        Returns:
            str: The chunk text prefixed with a note saying whether it is the
            complete transcript, the last chunk, or a partial chunk (with the
            index of the next chunk to request).
        """
        total_chunks = len(transcript_splits)
        if total_chunks == 1:
            return f"Note: Complete subtitles returned.\n\nSubtitles:{chunked_text}"
        if chunk_number == total_chunks - 1:
            return f"Note: Last chunk of subtitles returned.\n\nSubtitles:{chunked_text}"
        return (
            f"Note: Partial subtitles returned. To get the next chunk, "
            f"use chunk_number = {chunk_number + 1}.\n\nSubtitles:{chunked_text}"
        )


# NOTE: model docstrings and Field descriptions are part of the function schema
# produced by convert_to_openai_function, so they are runtime text.
class Points(BaseModel):
    """Pydantic model for representing extracted points."""
    point: str = Field(description="The main topic, theme, or subject extracted from the subtitle.")
    context: str = Field(description="The context or brief explanation of the main point.")
    emoji: str = Field(description="An emoji that represents or summarizes the main point.")
    timestamp: float = Field(description="The timestamp (in floating-point number) from the video where the main point is mentioned.")


class MainPointsExtractor:
    """
    A tool for extracting and formatting main points from YouTube video transcripts.

    This class provides methods to process transcripts and identify key points
    using natural language processing techniques.
    """

    class Info(BaseModel):
        """Pydantic model for representing a collection of points."""
        points: List[Points]

    @staticmethod
    @tool(return_direct=True)
    @log_errors
    def get_youtube_video_main_points(youtube_video_id: str) -> str:
        """
        Extracts and formats main points from a YouTube video transcript.

        Args:
            youtube_video_id (str): The ID of the YouTube video.

        Returns:
            str: Formatted string of main points extracted from the video.
        """
        try:
            transcript = MainPointsExtractor._get_youtube_video_transcript(youtube_video_id)
            main_points = MainPointsExtractor._extract_main_points(transcript)
            return MainPointsExtractor._format_youtube_comment(main_points)
        except Exception as e:
            # Tool boundary: report the failure as text instead of raising.
            logger.error(f"Error in get_youtube_video_main_points: {str(e)}")
            return f"Error extracting main points: {str(e)}"

    @staticmethod
    @log_errors
    def _get_youtube_video_transcript(youtube_video_id: str) -> str:
        """
        Fetches the timestamped transcript for a YouTube video.

        Each entry is rendered as "<start seconds, 2 decimals>: <text> " and
        the entries are concatenated without an extra separator.

        Args:
            youtube_video_id (str): The ID of the YouTube video.

        Returns:
            str: The full transcript of the video, with timestamps.

        Raises:
            Exception: If there's an error fetching the transcript (logged, then re-raised).
        """
        try:
            transcript_json = YouTubeTranscriptApi.get_transcript(youtube_video_id)
            transcript_data = [f"{entry['start']:.2f}: {entry['text']} " for entry in transcript_json]
            return "".join(transcript_data)
        except Exception as e:
            logger.error(f"Error fetching transcript: {str(e)}")
            raise

    @staticmethod
    @functools.lru_cache(maxsize=16)
    def _extract_main_points(transcript: str) -> List[Dict[str, Any]]:
        """
        Extracts main points from the transcript using NLP techniques.

        The transcript is split into chunks of at most 8192 characters, each
        chunk is sent through the function-calling extraction chain, and the
        per-chunk results are flattened into a single list.

        This method is cached (keyed on the transcript text) to improve
        performance for repeated calls. The cached list object is returned
        as-is, so callers should not mutate it.

        Args:
            transcript (str): The full transcript of the video.

        Returns:
            List[Dict[str, Any]]: A list of dictionaries containing extracted main points.
        """
        main_points_extraction_function = [convert_to_openai_function(MainPointsExtractor.Info)]
        model = ChatOpenAI(temperature=0)
        extraction_model = model.bind(functions=main_points_extraction_function, function_call={"name": "Info"})
        prompt = ChatPromptTemplate.from_messages([("human", "{input}")])
        extraction_chain = prompt | extraction_model | JsonKeyOutputFunctionsParser(key_name="points")
        # Separators are " 1".." 9": a space followed by a digit, which is how
        # a new "<timestamp>: <text> " entry usually begins in the transcript.
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_overlap=0,
            chunk_size=8192,
            separators=[f" {char}" for char in "123456789"],
        )
        prep = RunnableLambda(lambda x: [{"input": doc} for doc in text_splitter.split_text(x)])
        chain = prep | extraction_chain.map() | MainPointsExtractor._flatten
        return chain.invoke(transcript)

    @staticmethod
    @log_errors
    def _flatten(matrix):
        """Flattens a 2D list into a 1D list."""
        return [item for row in matrix for item in row]

    @staticmethod
    @log_errors
    def _format_youtube_comment(json_data: List[Dict[str, Any]]) -> str:
        """
        Formats extracted main points into a YouTube-style comment.

        Args:
            json_data (List[Dict[str, Any]]): List of dictionaries containing main
                points; each must provide the keys 'timestamp', 'emoji', 'point'
                and 'context'.

        Returns:
            str: One "HH:MM:SS <emoji> <point>: <context>" line per main point,
            joined by newlines, with leading/trailing whitespace stripped.
        """
        def _format_timestamp(seconds):
            """Render a number of seconds as zero-padded HH:MM:SS."""
            hours = int(seconds // 3600)
            minutes = int((seconds % 3600) // 60)
            secs = int(seconds % 60)
            return f"{hours:02}:{minutes:02}:{secs:02}"

        lines = [
            f"{_format_timestamp(entry['timestamp'])} {entry['emoji']} {entry['point']}: {entry['context']}"
            for entry in json_data
        ]
        return "\n".join(lines).strip()


class Summary(BaseModel):
    """Pydantic model for representing extracted summary."""
    summary: str = Field(description="Extract detailed information from the content.")


class SummaryExtractor:
    """
    A tool for extracting and formatting summaries from YouTube video transcripts.

    This class provides methods to process transcripts and generate concise
    summaries using natural language processing techniques.
    """

    class Info(BaseModel):
        """Pydantic model for representing a collection of summaries."""
        summary: List[Summary]

    @staticmethod
    @tool(return_direct=False)
    @log_errors
    def get_youtube_video_summary(youtube_video_id: str) -> str:
        """
        Extracts and formats a summary from a YouTube video transcript.

        Args:
            youtube_video_id (str): The ID of the YouTube video.

        Returns:
            str: Formatted string of the summary extracted from the video.
        """
        try:
            transcript = SummaryExtractor._get_youtube_video_transcript(youtube_video_id)
            summary = SummaryExtractor._extract_summary(transcript)
            return SummaryExtractor._format_summary(summary)
        except Exception as e:
            # Tool boundary: report the failure as text instead of raising.
            logger.error(f"Error in get_youtube_video_summary: {str(e)}")
            return f"Error extracting summary: {str(e)}"

    @staticmethod
    @log_errors
    def _get_youtube_video_transcript(youtube_video_id: str) -> str:
        """
        Fetches the plain-text transcript (no timestamps) for a YouTube video.

        Args:
            youtube_video_id (str): The ID of the YouTube video.

        Returns:
            str: The full transcript of the video, entries joined by single spaces.

        Raises:
            Exception: If there's an error fetching the transcript (logged, then re-raised).
        """
        try:
            transcript_json = YouTubeTranscriptApi.get_transcript(youtube_video_id)
            transcript_data = [entry['text'] for entry in transcript_json]
            return " ".join(transcript_data)
        except Exception as e:
            logger.error(f"Error fetching transcript: {str(e)}")
            raise

    @staticmethod
    @functools.lru_cache(maxsize=16)
    def _extract_summary(transcript: str) -> List[Dict[str, Any]]:
        """
        Extracts summaries from a YouTube video transcript.

        The transcript is split into chunks of at most 8192 characters, each
        chunk is sent through the function-calling extraction chain, and the
        per-chunk results are flattened into a single list. Results are cached
        keyed on the transcript text; callers should not mutate the returned list.

        Args:
            transcript (str): The full transcript of the video.

        Returns:
            List[Dict[str, Any]]: A list of dictionaries, each expected to carry
            a "summary" key (this is what ``_format_summary`` reads).
        """
        summary_extraction_function = [convert_to_openai_function(SummaryExtractor.Info)]
        model = ChatOpenAI(temperature=0)
        extraction_model = model.bind(functions=summary_extraction_function, function_call={"name": "Info"})
        prompt = ChatPromptTemplate.from_messages([("human", "{input}")])
        extraction_chain = prompt | extraction_model | JsonKeyOutputFunctionsParser(key_name="summary")
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_overlap=0,
            chunk_size=8192,
            separators=[f" {char}" for char in "ABCDEFGHIJKLMNOPQRSTUVWXYZ"],
        )
        prep = RunnableLambda(lambda x: [{"input": doc} for doc in text_splitter.split_text(x)])
        # Reuses the flatten helper defined on MainPointsExtractor.
        chain = prep | extraction_chain.map() | MainPointsExtractor._flatten
        return chain.invoke(transcript)

    @staticmethod
    def _format_summary(summaries: List[Dict[str, Any]]) -> str:
        """
        Formats the list of summaries into a single string.

        Args:
            summaries (List[Dict[str, Any]]): List of dictionaries, each with a
                "summary" key.

        Returns:
            str: All summary texts joined by blank lines.
        """
        return "\n\n".join(s["summary"] for s in summaries)


class YouTubeAgent:
    """
    An agent for interacting with YouTube videos and processing user queries.

    This class sets up the necessary components for an AI agent that can
    understand and respond to user queries about YouTube videos.
    """

    def __init__(self):
        """Initializes the YouTubeAgent with necessary tools and components."""
        self.tools = [
            YouTubeTranscriptTool.get_transcript_with_timestamps,
            YouTubeTranscriptTool.get_transcript_without_timestamps,
            MainPointsExtractor.get_youtube_video_main_points,
            SummaryExtractor.get_youtube_video_summary,
        ]
        self.sys_message = "You are a helpful assistant."
        self.functions = [convert_to_openai_function(f) for f in self.tools]
        self.model = ChatOpenAI(temperature=0).bind(functions=self.functions)
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", self.sys_message),
            MessagesPlaceholder(variable_name="history"),
            ("user", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        # The scratchpad is rebuilt from the executor's intermediate steps on
        # every turn before the prompt is rendered.
        self.agent_chain = RunnablePassthrough.assign(
            agent_scratchpad=lambda x: format_to_openai_functions(x["intermediate_steps"])
        ) | self.prompt | self.model | OpenAIFunctionsAgentOutputParser()
        # Windowed memory with k=3, exposed to the prompt as "history".
        self.memory = ConversationBufferWindowMemory(k=3, return_messages=True, memory_key="history")
        self.agent_executor = AgentExecutor(agent=self.agent_chain, tools=self.tools, memory=self.memory)

    @log_errors
    def invoke(self, input_text: str) -> str:
        """
        Processes a user input and returns the agent's response.

        Args:
            input_text (str): The user's input query.

        Returns:
            str: The agent's response to the user's query, or a string starting
            with "An error occurred: " if the executor raises.
        """
        try:
            result = self.agent_executor.invoke({"input": input_text})
            return result['output']
        except Exception as e:
            logger.error(f"Error in YouTubeAgent.invoke: {str(e)}")
            return f"An error occurred: {str(e)}"


# # Usage example
# if __name__ == "__main__":
#     youtube_agent = YouTubeAgent()
#     video_link = "https://www.youtube.com/watch?v=dZxbVGhpEkI"
#     try:
#         main_points = youtube_agent.invoke(f"Can you get summary of the following video {video_link}")
#     except Exception as e:
#         logger.error(f"An error occurred during processing: {str(e)}")
#         print(f"An error occurred: {str(e)}")