File size: 20,214 Bytes
857dbaf
c0e82c0
857dbaf
c0e82c0
 
 
 
857dbaf
 
dd41bc7
 
c0e82c0
 
 
 
 
 
dd41bc7
c0e82c0
 
857dbaf
 
 
 
dd41bc7
c0e82c0
05116ba
857dbaf
 
 
 
 
 
c0e82c0
857dbaf
 
 
 
 
 
6a369e2
 
05116ba
9d6a9e2
857dbaf
c0e82c0
 
857dbaf
c0e82c0
 
 
 
 
 
857dbaf
dd41bc7
c0e82c0
dd41bc7
c0e82c0
dd41bc7
c0e82c0
dd41bc7
 
857dbaf
c0e82c0
dd41bc7
c0e82c0
857dbaf
dd41bc7
 
857dbaf
dd41bc7
857dbaf
dd41bc7
 
857dbaf
 
dd41bc7
 
 
c0e82c0
dd41bc7
 
 
857dbaf
 
 
dd41bc7
857dbaf
dd41bc7
857dbaf
 
 
 
 
dd41bc7
857dbaf
 
dd41bc7
 
 
 
 
 
 
 
 
857dbaf
c0e82c0
857dbaf
 
dd41bc7
857dbaf
05116ba
857dbaf
 
 
 
 
 
 
 
 
 
 
05116ba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
857dbaf
 
 
 
 
dd41bc7
857dbaf
 
c0e82c0
 
dd41bc7
857dbaf
 
c0e82c0
857dbaf
 
 
c0e82c0
857dbaf
9d6a9e2
dd41bc7
 
c0e82c0
 
dd41bc7
 
 
 
 
 
 
 
857dbaf
c0e82c0
 
 
 
 
 
 
 
 
 
dd41bc7
857dbaf
 
 
dd41bc7
857dbaf
c0e82c0
 
 
857dbaf
 
 
 
 
dd41bc7
857dbaf
dd41bc7
857dbaf
dd41bc7
857dbaf
 
dd41bc7
 
 
857dbaf
 
dd41bc7
857dbaf
 
 
 
 
 
 
 
dd41bc7
 
 
c0e82c0
dd41bc7
 
 
 
c0e82c0
dd41bc7
857dbaf
 
c0e82c0
 
 
 
 
dd41bc7
c0e82c0
 
 
 
 
 
 
 
 
 
 
 
 
ec3110b
c0e82c0
 
 
 
 
 
 
 
 
 
 
 
 
 
dd41bc7
c0e82c0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
05116ba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c0e82c0
 
 
 
 
 
 
 
 
 
 
 
 
dd41bc7
c0e82c0
 
 
9d6a9e2
c0e82c0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dd41bc7
 
c0e82c0
dd41bc7
 
c0e82c0
dd41bc7
 
 
c0e82c0
dd41bc7
 
 
 
 
c0e82c0
9d6a9e2
dd41bc7
c0e82c0
dd41bc7
 
 
 
 
 
 
 
 
 
 
 
857dbaf
 
 
 
 
 
 
 
 
 
c0e82c0
857dbaf
dd41bc7
 
857dbaf
c0e82c0
ecd85b6
 
dd41bc7
 
 
 
 
 
ecd85b6
c0e82c0
857dbaf
c0e82c0
9d6a9e2
c0e82c0
857dbaf
 
 
 
 
 
c0e82c0
857dbaf
 
 
c0e82c0
857dbaf
 
c0e82c0
857dbaf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c0e82c0
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
"""
YouTube Video Analysis and Interaction Module

This module provides a set of tools for analyzing YouTube videos, extracting
information, and answering questions based on video content. It uses the
LangChain library for language-model tasks and the "youtube-media-downloader"
service on RapidAPI for fetching video subtitles (transcripts).

Classes:
    YouTubeTranscriptPointsExtractor: 
        Extracts and formats comments with clickable timestamps from a YouTube video transcript.
    QuestionAnswerExtractor: 
        Processes user questions and extracts answers from video transcripts.
    YouTubeAgent: 
        Manages the overall agent setup for interacting with YouTube videos and processing user queries.

Key Features:
    - Main points formatted as a YouTube comment with clickable timestamps
    - Question answering based on video content
    - Flexible AI agent for handling various YouTube video-related tasks
"""

import os
import openai
import json
from typing import List, Dict, Any, Union, Type
import requests
from youtube_transcript_api import YouTubeTranscriptApi
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langchain.schema.runnable import RunnableLambda, RunnablePassthrough
from langchain.agents import tool, AgentExecutor
from langchain.output_parsers.openai_functions import JsonKeyOutputFunctionsParser, JsonOutputFunctionsParser
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.utils.function_calling import convert_to_openai_function
from langchain.agents.output_parsers import OpenAIFunctionsAgentOutputParser
from langchain.agents.format_scratchpad import format_to_openai_functions
from langchain.memory import ConversationBufferWindowMemory

# Credentials are read from the environment at import time; os.getenv yields
# None when a variable is unset. Loading a local .env file via
# load_dotenv(find_dotenv()) is currently disabled.
openai.api_key = os.getenv('OPENAI_API_KEY')
# Sent as the "x-rapidapi-key" header by the transcript-fetching helpers below.
rapid_api_key = os.getenv('RAPID_API_KEY')
# Chat model name passed to every ChatOpenAI instance created in this file.
model_name="gpt-4o-mini"

# Sampling temperature handed to the ChatOpenAI instances created in this file.
_temperature = 0  # Default value


def get_temperature() -> float:
    """Return the sampling temperature currently configured for this module.

    Returns:
        The value most recently passed to ``set_temperature``, or 0 if it has
        never been called.
    """
    return _temperature


def set_temperature(new_temperature: float) -> None:
    """Set the value returned by ``get_temperature``.

    The value lives in a module-level variable instead of being captured by a
    replacement ``get_temperature`` function. Rebinding the function would
    leave every reference obtained earlier (for example through
    ``from module import get_temperature``) returning the old default.

    Args:
        new_temperature: Temperature used by models constructed after this call.
    """
    global _temperature
    _temperature = new_temperature

class TimestampedPoint_1(BaseModel):
    """Pydantic model for representing extracted points from Youtube-Transcript"""
    # One extracted point in the "style 1" schema. PointsCollection_1 wraps a list
    # of these and is converted with convert_to_openai_function for the extraction call.
    # NOTE(review): the docstring and the Field descriptions are presumably embedded
    # in that function schema (i.e. read by the model) - confirm before rewording them.
    # Compared with TimestampedPoint_2 the timestamp is declared first and the
    # description wording differs.
    timestamp: float = Field(description="The timestamp (in floating-point number) of when main points are discussed in the video.")
    main_point: str = Field(description="A title for Main point.")
    summary: str = Field(description="A summary of main points discussed at that timestamp.")
    emoji: str = Field(description="An emoji that matches the summary.")
    
class TimestampedPoint_2(BaseModel):
    """Pydantic model for representing extracted points."""
    # One extracted point in the "style 2" schema. PointsCollection_2 wraps a list
    # of these and is converted with convert_to_openai_function for the extraction call.
    # NOTE(review): the docstring and the Field descriptions are presumably embedded
    # in that function schema (i.e. read by the model) - confirm before rewording them.
    # Compared with TimestampedPoint_1 the main point is declared first and the
    # description wording differs.
    main_point: str = Field(description="The main topic, theme, or subject extracted from the subtitle.")
    timestamp: float = Field(description="The timestamp (in floating-point number) from the video where the main point is mentioned.")
    summary: str = Field(description="The context or brief explanation of the main point.")
    emoji: str = Field(description="An emoji that represents or summarizes the main point.")
    
class YouTubeTranscriptPointsExtractor:
    """
    A tool for extracting and formatting main points with clickable timestamps from YouTube video transcripts.

    The public entry point is the ``extract_clickable_points`` tool. It fetches
    the subtitles of a video, asks the chat model to extract main points with
    two different schemas, and renders each result as YouTube-comment text.
    """

    # NOTE(review): the docstrings of the two schema classes below are presumably
    # embedded in the function schema sent to the model - confirm before rewording.
    class PointsCollection_1(BaseModel):
        """Pydantic model for representing a collection of timestamped points."""
        points: List[TimestampedPoint_1]

    class PointsCollection_2(BaseModel):
        """Pydantic model for representing a collection of timestamped points."""
        points: List[TimestampedPoint_2]

    @staticmethod
    @tool(return_direct=True)
    def extract_clickable_points(youtube_video_id: str) -> str:
        """
        Extracts and formats comments with clickable timestamps from a YouTube video transcript.

        Args:
            youtube_video_id (str): The ID of the YouTube video.

        Returns:
            str: Formatted string of main points with clickable timestamps, ready for use in YouTube comments.
        """
        # NOTE(review): the @tool decorator is expected to use the docstring above
        # as the tool description given to the model, so it is kept verbatim.
        # Errors are not handled here; they propagate to the caller unchanged.
        extractor = YouTubeTranscriptPointsExtractor
        transcript = extractor._fetch_transcript(youtube_video_id)

        # Each schema is extracted separately and rendered twice:
        # detailed (emoji + summary) and brief (timestamp + title only).
        points_1 = extractor._process_transcript(transcript, extractor.PointsCollection_1)
        detailed_1 = extractor._format_for_youtube_comment(points_1, True)
        brief_1 = extractor._format_for_youtube_comment(points_1, False)

        points_2 = extractor._process_transcript(transcript, extractor.PointsCollection_2)
        detailed_2 = extractor._format_for_youtube_comment(points_2, True)
        brief_2 = extractor._format_for_youtube_comment(points_2, False)

        # The third section used to be labelled "Output_Style_2a" as well, which
        # made the two style-2 variants indistinguishable in the output.
        return (
            f"Main points extracted from YouTube video (ID: {youtube_video_id})\n"
            f"Output_style_1:\n```\n{detailed_1}\n```\n"
            f"Output_Style_1a:\n```\n{brief_1}\n```\n"
            f"Output_Style_2:\n```\n{detailed_2}\n```\n"
            f"Output_Style_2a:\n```\n{brief_2}\n```\n"
            "Choose the style that best suits your needs for presenting the main points of the video."
        )

    @staticmethod
    def _fetch_transcript(youtube_video_id: str, timeout: float = 30.0) -> str:
        """
        Fetches the transcript for a YouTube video using a third-party API.

        Two requests are made to the "youtube-media-downloader" RapidAPI host:
        one for the video details (to find the first subtitle track) and one
        to download that track as JSON.

        Args:
            youtube_video_id (str): The ID of the YouTube video.
            timeout (float): Seconds to wait for each HTTP request.

        Returns:
            str: The transcript as one string of "<start seconds>: <text> "
                entries concatenated in the order returned by the API.

        Raises:
            requests.RequestException: If a request fails, times out or
                returns an HTTP error status.
            ValueError: If the video details contain no subtitle track.
        """
        api_host = "youtube-media-downloader.p.rapidapi.com"
        details_url = f"https://{api_host}/v2/video/details"
        subtitles_url = f"https://{api_host}/v2/video/subtitles"
        headers = {
            "x-rapidapi-key": rapid_api_key,
            "x-rapidapi-host": api_host,
        }

        details_response = requests.get(
            details_url,
            headers=headers,
            params={"videoId": youtube_video_id},
            timeout=timeout,
        )
        # Fail on HTTP errors here instead of on a confusing KeyError below.
        details_response.raise_for_status()

        try:
            sub_url = details_response.json()['subtitles']['items'][0]['url']
        except (KeyError, IndexError, TypeError) as err:
            raise ValueError(
                f"No subtitles available for YouTube video '{youtube_video_id}'."
            ) from err

        subtitles_response = requests.get(
            subtitles_url,
            headers=headers,
            params={"subtitleUrl": sub_url, "format": "json"},
            timeout=timeout,
        )
        subtitles_response.raise_for_status()

        # startMs is converted from milliseconds to seconds with two decimals.
        return "".join(
            f"{entry['startMs']/1000:.2f}: {entry['text']} "
            for entry in subtitles_response.json()
        )

    @staticmethod
    def _process_transcript(transcript: str, info_model: Union[Type[PointsCollection_1], Type[PointsCollection_2]]) -> List[Dict[str, Any]]:
        """
        Extracts main points from the transcript with the chat model.

        The transcript is split into chunks, every chunk is sent through the
        same extraction chain, and the per-chunk results are flattened into a
        single list. Each call is independent; nothing is remembered between
        chunks or between invocations.

        Args:
            transcript (str): The full transcript of the video.
            info_model: Collection schema (PointsCollection_1 or
                PointsCollection_2) the model is forced to fill in.

        Returns:
            List[Dict[str, Any]]: The "points" entries parsed from the model's
                function-call output, in chunk order.
        """
        main_points_extraction_function = [convert_to_openai_function(info_model)]

        model = ChatOpenAI(temperature=get_temperature(), model=model_name)

        # function_call pins the model to the schema so every reply is parseable.
        extraction_model = model.bind(functions=main_points_extraction_function, function_call={"name": info_model.__name__})

        system_message = """
        You are an AI assistant that extracts essential info from video transcripts.
        You have the authority to make improvements as you see fit.
        
        Rules To Follow:
        - Refining the summaries for clarity and conciseness.
        - Adjusting emoji choices to better represent the content.
        - Removing redundant information.
        - Grouping two points into a single point if the timestamps are close enough.
        
        Your goal is to produce a refined and accurate representation of the main points from the video transcript. Use your judgment to balance adherence to the specific rules with overall improvement of the extracted information.
        """

        prompt = ChatPromptTemplate.from_messages([
            ("system", system_message),
            ("human", "{input}")
        ])

        extraction_chain = prompt | extraction_model | JsonKeyOutputFunctionsParser(key_name="points")

        # Separators are a space followed by a digit 1-9 - presumably so that a
        # chunk boundary falls just before a "<seconds>: " timestamp prefix.
        text_splitter = RecursiveCharacterTextSplitter(chunk_overlap=0, chunk_size=16000, separators=[f" {char}" for char in "123456789"])

        prep = RunnableLambda(lambda x: [{"input": doc} for doc in text_splitter.split_text(x)])

        chain = prep | extraction_chain.map() | YouTubeTranscriptPointsExtractor._flatten

        return chain.invoke(transcript)

    @staticmethod
    def _flatten(matrix):
        """Flattens a 2D list into a 1D list."""
        return [item for row in matrix for item in row]

    @staticmethod
    def _format_for_youtube_comment(points: List[Dict[str, Any]], detailed: bool = True) -> str:
        """
        Formats extracted main points into a YouTube-style comment with clickable timestamps.

        Args:
            points (List[Dict[str, Any]]): List of dictionaries containing main points with timestamps.
            detailed (bool): If True, returns a detailed format with emojis and summaries.
                             If False, returns a simpler format with just timestamps and main points.

        Returns:
            str: One line per point, each starting with an HH:MM:SS timestamp;
                an empty string when ``points`` is empty.
        """
        def _format_timestamp(seconds):
            # The points come from unvalidated model JSON, so the timestamp may
            # arrive as a numeric string; coerce it and clamp negatives to zero.
            total_seconds = max(0, int(float(seconds)))
            hours, remainder = divmod(total_seconds, 3600)
            minutes, secs = divmod(remainder, 60)
            return f"{hours:02}:{minutes:02}:{secs:02}"

        lines = []
        for point in points:
            timestamp = _format_timestamp(point['timestamp'])
            # A trailing period would read oddly before the ": summary" suffix.
            main_point = point['main_point'].rstrip('.')

            if detailed:
                lines.append(f"{timestamp} {point['emoji']} {main_point}: {point['summary']}")
            else:
                lines.append(f"{timestamp} {main_point}")

        return "\n".join(lines).strip()

class Answer(BaseModel):
    """Pydantic model for representing an answer to a question."""
    # One answer candidate produced for a transcript chunk; QuestionAnswerExtractor.Info
    # wraps a list of these and is converted with convert_to_openai_function.
    # NOTE(review): the docstring and the Field descriptions are presumably embedded
    # in that function schema (i.e. read by the model) - confirm before rewording them.
    answer: str = Field(description="The answer to the user's question based on the video transcript.")
    # QuestionAnswerExtractor._extract_answer keeps only candidates whose confidence is above 0.4.
    confidence: float = Field(description="A confidence score between 0 and 1 indicating how certain the model is about the answer.")
    
class QuestionAnswerExtractor:
    """
    A tool for answering questions about YouTube videos based on their transcripts.

    The public entry point is the ``get_answer`` tool. It fetches the
    transcript, collects per-chunk answer candidates from the chat model,
    drops low-confidence candidates and asks the model to consolidate the rest.
    """

    # NOTE(review): this docstring is presumably embedded in the function schema
    # sent to the model - confirm before rewording.
    class Info(BaseModel):
        """Pydantic model for representing a collection of answers."""
        answers: List[Answer]

    @staticmethod
    @tool(return_direct=False)
    def get_answer(youtube_video_id: str, question: str) -> str:
        """
        Answers a question about a YouTube video based on its transcript.
        
        Args:
            youtube_video_id (str): The ID of the YouTube video.
            question (str): The user's question about the video.

        Returns:
            str: Formatted string containing the answer to the user's question.
        """
        # NOTE(review): the @tool decorator is expected to use the docstring above
        # as the tool description given to the model, so it is kept verbatim.
        try:
            transcript = QuestionAnswerExtractor._get_youtube_video_transcript(youtube_video_id)
            return QuestionAnswerExtractor._extract_answer(transcript, question)
        except Exception as e:
            # Deliberately best-effort: the failure is reported back as the tool's
            # text output instead of being raised.
            return f"Error answering question: {str(e)}"

    @staticmethod
    def _get_youtube_video_transcript(youtube_video_id: str) -> str:
        """
        Fetches the transcript for a YouTube video.

        Delegates to ``YouTubeTranscriptPointsExtractor._fetch_transcript``
        instead of keeping a second copy of the same request code.

        Args:
            youtube_video_id (str): The ID of the YouTube video.

        Returns:
            str: The full transcript of the video.

        Raises:
            Exception: Whatever the shared fetch helper raises when the
                transcript cannot be retrieved.
        """
        return YouTubeTranscriptPointsExtractor._fetch_transcript(youtube_video_id)

    @staticmethod
    def _extract_answer(transcript: str, question: str) -> str:
        """
        Extracts an answer to the user's question from the YouTube video transcript.

        The transcript is split into chunks and the model produces answer
        candidates with confidence scores for each chunk. Candidates with a
        confidence of 0.4 or lower are discarded, and the remainder are passed
        to a second model call that writes one consolidated answer.

        Args:
            transcript (str): The full transcript of the video.
            question (str): The user's question about the video.

        Returns:
            str: The consolidated answer text, or a fixed "couldn't find a
                reliable answer" message when no candidate passes the filter.
        """
        min_confidence = 0.4

        def _is_confident(candidate):
            # Candidates are unvalidated model JSON: the score may be missing
            # or non-numeric, in which case the candidate is treated as unreliable.
            try:
                return float(candidate.get('confidence', 0)) > min_confidence
            except (TypeError, ValueError):
                return False

        answer_extraction_function = [convert_to_openai_function(QuestionAnswerExtractor.Info)]

        model = ChatOpenAI(temperature=get_temperature(), model=model_name)
        # function_call pins the model to the Info schema so every reply is parseable.
        extraction_model = model.bind(functions=answer_extraction_function, function_call={"name": "Info"})

        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are an AI assistant tasked with answering questions about a video based on its transcript."),
            ("human", "Transcript: {transcript}\n\nQuestion: {question}\n\nProvide an answer to the question based on the transcript, along with a confidence score.")
        ])

        extraction_chain = prompt | extraction_model | JsonKeyOutputFunctionsParser(key_name="answers")

        # Separators are a space followed by an uppercase letter - presumably
        # an approximation of sentence starts.
        text_splitter = RecursiveCharacterTextSplitter(chunk_overlap=192, chunk_size=8000, separators=[f" {char}" for char in "ABCDEFGHIJKLMNOPQRSTUVWXYZ"])

        def prepare_input(x):
            # Pair every transcript chunk with the same question.
            chunks = text_splitter.split_text(x['transcript'])
            return [{"transcript": chunk, "question": x['question']} for chunk in chunks]

        prep = RunnableLambda(prepare_input)

        chain = prep | extraction_chain.map() | QuestionAnswerExtractor._flatten

        # Get partial answers
        partial_answers = chain.invoke({"transcript": transcript, "question": question})

        # Filter out low-confidence answers
        filtered_answers = [answer for answer in partial_answers if _is_confident(answer)]

        # If all answers were filtered out, return a "no answer" response
        if not filtered_answers:
            return "I couldn't find a reliable answer to your question based on the video transcript."

        # Consolidate filtered partial answers.
        # NOTE(review): the prompt tells the model to ignore confidence "0.5 or
        # below" while the filter above uses 0.4 - confirm which is intended.
        consolidation_prompt = ChatPromptTemplate.from_messages([
            ("system", "You are an AI assistant tasked with consolidating multiple partial answers into a comprehensive final answer."),
            ("human", "Question: {question}\n\nPartial Answers: {partial_answers}\n\nPlease provide a consolidated, comprehensive answer to the question based on these partial answers. Ignore any information from answers with low confidence (0.5 or below).")
        ])

        consolidation_model = ChatOpenAI(temperature=get_temperature(), model=model_name)
        consolidation_chain = consolidation_prompt | consolidation_model

        final_answer = consolidation_chain.invoke({
            "question": question,
            "partial_answers": json.dumps(filtered_answers, indent=2)
        })

        return final_answer.content

    @staticmethod
    def _flatten(matrix):
        """Flattens a 2D list into a 1D list."""
        return [item for row in matrix for item in row]
    
class YouTubeAgent:
    """
    Conversational agent that answers user queries about YouTube videos.

    Wires the two video tools, a function-calling chat model, a prompt with
    conversation history and a windowed memory into an ``AgentExecutor``.
    """

    def __init__(self):
        """Builds the tools, model, prompt, agent chain, memory and executor."""

        self.sys_message = """You are a helpful assistant.
        
        Important instructions:
        1. Only use the 'extract_clickable_points' tool when the user explicitly asks for clickable points or timestamps from a video.
        2. For all other queries, including general questions about video content, use the 'get_answer' tool.
        3. If the user's query is unclear, ask for clarification before using any tools.
        4. Always provide concise and relevant responses based on the tool outputs.

        Remember to interpret the user's intent carefully and use the appropriate tools."""

        # The same tool list is exposed to the model (as function schemas) and
        # to the executor (as callables).
        self.tools = [
            QuestionAnswerExtractor.get_answer,
            YouTubeTranscriptPointsExtractor.extract_clickable_points,
        ]
        self.functions = list(map(convert_to_openai_function, self.tools))

        chat_model = ChatOpenAI(temperature=get_temperature(), model=model_name)
        self.model = chat_model.bind(functions=self.functions)

        prompt_messages = [
            ("system", self.sys_message),
            MessagesPlaceholder(variable_name="history"),
            ("user", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
        self.prompt = ChatPromptTemplate.from_messages(prompt_messages)

        def _scratchpad_from_steps(state):
            # Render the intermediate tool calls into the scratchpad slot of the prompt.
            return format_to_openai_functions(state["intermediate_steps"])

        with_scratchpad = RunnablePassthrough.assign(agent_scratchpad=_scratchpad_from_steps)
        self.agent_chain = with_scratchpad | self.prompt | self.model | OpenAIFunctionsAgentOutputParser()

        # memory_key matches the "history" placeholder in the prompt above.
        self.memory = ConversationBufferWindowMemory(k=3, return_messages=True, memory_key="history")
        self.agent_executor = AgentExecutor(agent=self.agent_chain, tools=self.tools, memory=self.memory)

    def invoke(self, input_text: str) -> str:
        """
        Runs one user query through the agent executor.

        Args:
            input_text (str): The user's input query.

        Returns:
            str: The executor's ``output`` value, or an "An error occurred: ..."
                message if anything raised while producing it.
        """
        try:
            outcome = self.agent_executor.invoke({"input": input_text})
            # Kept inside the try so a missing "output" key is also reported as text.
            return outcome['output']
        except Exception as exc:
            return f"An error occurred: {str(exc)}"

# Example usage:
# youtube_agent = YouTubeAgent()
# video_link = "https://www.youtube.com/watch?v=-OSxeoIAs2w"
# answer = youtube_agent.invoke(f"The race involves which challenges in the following video {video_link}")