## Get environment variables

In [1]:
import os
from dotenv import find_dotenv, dotenv_values

# Read every entry of the nearest .env file. `keys` keeps its original shape
# (a list of (name, value) pairs) for anything that still inspects it.
keys = list(dotenv_values(find_dotenv('.env')).items())
_dotenv_by_name = dict(keys)


def _require_setting(name: str) -> str:
    """Return the setting called `name` and export it to ``os.environ``.

    The value is looked up BY NAME in the parsed .env file (falling back to a
    variable that is already present in the process environment). Looking the
    value up by name instead of by position means that reordering, adding or
    removing lines in .env can no longer silently assign one service's key to
    another service.

    Raises:
        KeyError: if the setting is missing or empty in both places.
    """
    value = _dotenv_by_name.get(name) or os.environ.get(name)
    if not value:
        raise KeyError(f"{name} is not set in .env or in the process environment")
    os.environ[name] = value
    return value


OPENAI_API_KEY = _require_setting('OPENAI_API_KEY')
LANGCHAIN_API_KEY = _require_setting('LANGCHAIN_API_KEY')
POLYGON_API_KEY = _require_setting('POLYGON_API_KEY')
EMAIL = _require_setting('EMAIL')  # TODO: make this a user entry

## Install Required Libraries

In [2]:
!pip install langchain_core langchain_openai langchain_community langsmith openai polygon-api-client pypdf yfinance -qU


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.1.2[0m[39;49m -> [0m[32;49m24.1.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


## Create Tools

The following tools were copied directly from the LangChain source code and modified to remove the requirement for current data from the API.

In [3]:
"""
Util that calls several of Polygon's stock market REST APIs.
Docs: https://polygon.io/docs/stocks/getting-started
"""

import json
from typing import Any, Dict, Optional

import requests
from langchain_core.pydantic_v1 import BaseModel, root_validator
from langchain_core.utils import get_from_dict_or_env

POLYGON_BASE_URL = "https://api.polygon.io/"

class PolygonAPIWrapper(BaseModel):
    """Wrapper for Polygon API.

    Builds the request URL for a handful of Polygon stock-market REST
    endpoints, performs the GET request and returns the ``results`` member of
    the JSON response. ``run`` is the string-mode entry point used by the
    tools and returns the result serialized as JSON text.
    """

    polygon_api_key: Optional[str] = None

    @root_validator(pre=True)
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that api key in environment."""
        polygon_api_key = get_from_dict_or_env(
            values, "polygon_api_key", "POLYGON_API_KEY"
        )
        values["polygon_api_key"] = polygon_api_key

        return values

    def _fetch_results(
        self, url: str, require_ok_status: bool = True, timeout: float = 30.0
    ) -> Optional[Any]:
        """GET `url`, decode the JSON body and return its ``results`` entry.

        Args:
            url: Fully built request URL (including the ``apiKey`` parameter).
            require_ok_status: When True, a response whose ``status`` is not
                ``"OK"`` raises ``ValueError``. When False the status is not
                inspected at all.
            timeout: Seconds to wait for the server. Without a timeout
                ``requests.get`` can block forever on a stalled connection.

        Returns:
            The ``results`` value of the response, or None when it is absent.

        Raises:
            ValueError: if `require_ok_status` is True and the status is not "OK".
        """
        response = requests.get(url, timeout=timeout)
        data = response.json()

        if require_ok_status and data.get("status", None) != "OK":
            raise ValueError(f"API Error: {data}")

        return data.get("results", None)

    def get_financials(self, ticker: str) -> Optional[dict]:
        """
        Get fundamental financial data, which is found in balance sheets,
        income statements, and cash flow statements for a given ticker.

        /vX/reference/financials
        """
        url = (
            f"{POLYGON_BASE_URL}vX/reference/financials?"
            f"ticker={ticker}&"
            f"apiKey={self.polygon_api_key}"
        )
        return self._fetch_results(url)

    def get_last_quote(self, ticker: str) -> Optional[dict]:
        """
        Get the most recent National Best Bid and Offer (Quote) for a ticker.

        /v2/last/nbbo/{ticker}
        """
        url = f"{POLYGON_BASE_URL}v2/last/nbbo/{ticker}?apiKey={self.polygon_api_key}"
        return self._fetch_results(url)

    def get_ticker_news(self, ticker: str) -> Optional[dict]:
        """
        Get the most recent news articles relating to a stock ticker symbol,
        including a summary of the article and a link to the original source.

        /v2/reference/news
        """
        url = (
            f"{POLYGON_BASE_URL}v2/reference/news?"
            f"ticker={ticker}&"
            f"apiKey={self.polygon_api_key}"
        )
        return self._fetch_results(url)

    def get_aggregates(self, ticker: str, **kwargs: Any) -> Optional[dict]:
        """
        Get aggregate bars for a stock over a given date range
        in custom time window sizes.

        Recognized keyword arguments: ``timespan`` (default "day"),
        ``timespan_multiplier`` (default 1), ``from_date``, ``to_date``,
        ``adjusted`` (default True) and ``sort`` (default "asc").

        Unlike the other endpoints, the response status is deliberately NOT
        checked here, so a non-"OK" response yields whatever ``results`` it
        carries (None when there is none) instead of raising.

        /v2/aggs/ticker/{ticker}/range/{multiplier}/{timespan}/{from_date}/{to_date}
        """
        timespan = kwargs.get("timespan", "day")
        multiplier = kwargs.get("timespan_multiplier", 1)
        from_date = kwargs.get("from_date", None)
        to_date = kwargs.get("to_date", None)
        adjusted = kwargs.get("adjusted", True)
        sort = kwargs.get("sort", "asc")

        url = (
            f"{POLYGON_BASE_URL}v2/aggs"
            f"/ticker/{ticker}"
            f"/range/{multiplier}"
            f"/{timespan}"
            f"/{from_date}"
            f"/{to_date}"
            f"?apiKey={self.polygon_api_key}"
            f"&adjusted={adjusted}"
            f"&sort={sort}"
        )
        return self._fetch_results(url, require_ok_status=False)

    def run(self, mode: str, ticker: str, **kwargs: Any) -> str:
        """Dispatch on `mode` and return the endpoint result as a JSON string.

        Only ``get_aggregates`` receives the extra keyword arguments.

        Raises:
            ValueError: if `mode` is not one of the four supported modes.
        """
        if mode == "get_financials":
            return json.dumps(self.get_financials(ticker))
        elif mode == "get_last_quote":
            return json.dumps(self.get_last_quote(ticker))
        elif mode == "get_ticker_news":
            return json.dumps(self.get_ticker_news(ticker))
        elif mode == "get_aggregates":
            return json.dumps(self.get_aggregates(ticker, **kwargs))
        else:
            raise ValueError(f"Invalid mode {mode} for Polygon API.")


In [4]:
from typing import Optional, Type

from langchain_core.callbacks import CallbackManagerForToolRun
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.tools import BaseTool

class PolygonAggregatesSchema(BaseModel):
    """Input for PolygonAggregates."""

    # Argument schema attached to the PolygonAggregates tool via `args_schema`.
    # NOTE(review): the descriptions of `timespan` and `timespan_multiplier`
    # mention defaults ('day' and 1), but no default value is passed to
    # Field(...) here, so these fields look required — confirm this is intended.

    ticker: str = Field(
        description="The ticker symbol to fetch aggregates for.",
    )
    timespan: str = Field(
        description="The size of the time window. "
        "Possible values are: "
        "second, minute, hour, day, week, month, quarter, year. "
        "Default is 'day'",
    )
    timespan_multiplier: int = Field(
        description="The number of timespans to aggregate. "
        "For example, if timespan is 'day' and "
        "timespan_multiplier is 1, the result will be daily bars. "
        "If timespan is 'day' and timespan_multiplier is 5, "
        "the result will be weekly bars.  "
        "Default is 1.",
    )
    from_date: str = Field(
        description="The start of the aggregate time window. "
        "Either a date with the format YYYY-MM-DD or "
        "a millisecond timestamp.",
    )
    to_date: str = Field(
        description="The end of the aggregate time window. "
        "Either a date with the format YYYY-MM-DD or "
        "a millisecond timestamp.",
    )


class PolygonAggregates(BaseTool):
    """
    Tool returning aggregate bars (stock prices) for one ticker over a
    date range, fetched from Polygon through the shared API wrapper.
    """

    mode: str = "get_aggregates"
    name: str = "polygon_aggregates"
    description: str = (
        "A wrapper around Polygon's Aggregates API. "
        "This tool is useful for fetching aggregate bars (stock prices) for a ticker. "
        "Input should be the ticker, date range, timespan, and timespan multiplier"
        " that you want to get the aggregate bars for. This should be done when "
        "attempting to retreive the current stock price for any valuation calculation."
    )
    args_schema: Type[PolygonAggregatesSchema] = PolygonAggregatesSchema

    # Wrapper that performs the HTTP request; supplied by the caller.
    api_wrapper: PolygonAPIWrapper

    def _run(
        self,
        ticker: str,
        timespan: str,
        timespan_multiplier: int,
        from_date: str,
        to_date: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        """Forward the validated arguments to the wrapper's `run` method.

        `run_manager` is accepted for interface compatibility and not used.
        """
        window = {
            "timespan": timespan,
            "timespan_multiplier": timespan_multiplier,
            "from_date": from_date,
            "to_date": to_date,
        }
        return self.api_wrapper.run(mode=self.mode, ticker=ticker, **window)


In [5]:
from typing import Optional, Type

from langchain_core.callbacks import CallbackManagerForToolRun
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.tools import BaseTool

class Inputs(BaseModel):
    """Inputs for Polygon's Financials API"""

    # Single free-text argument; PolygonFinancials passes it to the wrapper
    # as the ticker.
    # NOTE: a later cell defines another class that is also named `Inputs`.
    query: str


class PolygonFinancials(BaseTool):
    """Tool that fetches a ticker's fundamental financials from Polygon."""

    mode: str = "get_financials"
    name: str = "polygon_financials"
    description: str = (
        "A wrapper around Polygon's Stock Financials API. "
        "This tool is useful for fetching fundamental financials from "
        "balance sheets, income statements, and cash flow statements "
        "for a stock ticker. The input should be the ticker that you want "
        "to get the latest fundamental financial data for. "
        "If a duckduckgo_search call returns a list of stocks, "
        "The tickers for those stocks can be passed to this function to "
        "retreive financial data and provide an accurate response "
        "to the user."
    )
    args_schema: Type[BaseModel] = Inputs

    # Wrapper that performs the HTTP request; supplied by the caller.
    api_wrapper: PolygonAPIWrapper

    def _run(
        self,
        query: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        """Treat `query` as the ticker and delegate to the API wrapper.

        `run_manager` is accepted for interface compatibility and not used.
        """
        wrapper = self.api_wrapper
        result = wrapper.run(self.mode, ticker=query)
        return result


In [6]:
from typing import Optional, Type

from langchain_core.callbacks import CallbackManagerForToolRun
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.tools import BaseTool

class Inputs(BaseModel):
    """Inputs for Polygon's Ticker News API"""

    # Single free-text argument; PolygonTickerNews passes it to the wrapper
    # as the ticker.
    # NOTE: this re-uses the class name `Inputs` from an earlier cell, so from
    # here on the module-level name `Inputs` refers to this class.
    query: str


class PolygonTickerNews(BaseTool):
    """Tool that fetches the latest Polygon news items for a ticker."""

    mode: str = "get_ticker_news"
    name: str = "polygon_ticker_news"
    description: str = (
        "A wrapper around Polygon's Ticker News API. "
        "This tool is useful for fetching the latest news for a stock. "
        "Input should be the ticker that you want to get the latest news for."
    )
    args_schema: Type[BaseModel] = Inputs

    # Wrapper that performs the HTTP request; supplied by the caller.
    api_wrapper: PolygonAPIWrapper

    def _run(
        self,
        query: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        """Treat `query` as the ticker and delegate to the API wrapper.

        `run_manager` is accepted for interface compatibility and not used.
        """
        wrapper = self.api_wrapper
        result = wrapper.run(self.mode, ticker=query)
        return result


In [7]:
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS

# Load the persisted FAISS vectorstore from ./data/vectorstore using the same
# OpenAI embedding model name for query-time embeddings.
# SECURITY NOTE(review): allow_dangerous_deserialization=True opts in to
# deserializing the stored index; per the flag's name this is unsafe for
# untrusted files, so only load a vectorstore you created yourself.
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
db = FAISS.load_local("./data/vectorstore", embeddings, allow_dangerous_deserialization=True)

class RAGInput(BaseModel):
    """Input for the local data retrieval tool."""

    # Free-text query handed to the retriever by RAGAgent._run.
    query: str = Field(description="retreive data from vectorstore")

class RAGAgent(BaseTool):
    """Retrieval tool backed by the module-level FAISS vectorstore `db`.

    The store holds financial filings (11-K, 10-K, 10-Q, 8-K and SD) of the
    S&P 500 companies as of July 18th, 2024. For queries about specific
    financial figures, prefer the Condensed Consolidated Statements of Cash
    Flows sections of those documents.
    """

    name: str = "vectorstore_retrieval"
    description: str = (
        """Tool that retrieves information from a local vectorstore of financial reports. These reports include 
    11-K, 10-K, 10-Q, and SD (Special disclosure) filings from all of the S&P 500 companies, as of July 18th, 2024. The information in this 
    vector store only pertains to fiscal years 2023 and 2024. Whenever specific financial 
    information is requested in the query, ensure to use the phrase 'Financial Information'. 
    If necessary, modify the user query to contain this phrase."""
    )
    args_schema: Type[BaseModel] = RAGInput

    def _run(
        self,
        query: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        """Run `query` through a retriever built from `db` and return its output.

        `run_manager` is accepted for interface compatibility and not used.
        """
        return db.as_retriever().invoke(query)


In [432]:
from langchain.tools import tool
import datetime
import yfinance as yf
import pandas as pd

@tool
def get_datetime() -> str:
    """Get the current date and time in YYYY-MM-DD HH:MM:SS format."""
    # str(datetime.now()) appends fractional seconds ("....123456"), which does
    # not match the format promised to the model; strftime pins it exactly.
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

@tool
def get_date() -> str:
    """Get the current date in YYYY-MM-DD format. Also useful when determining the current quarter."""
    # The ISO form of the date part is the text before the space in str(now).
    today = datetime.datetime.now().date()
    return today.isoformat()

@tool
def get_time() -> str:
    """Get the current time in HH:MM:SS format."""
    # Splitting str(datetime.now()) leaves the microseconds attached
    # ("12:34:56.123456"); strftime yields exactly HH:MM:SS as advertised.
    return datetime.datetime.now().strftime("%H:%M:%S")

@tool
def get_quarter(date:str) -> str:
    """This tool takes a date in YYYY-MM-DD format as an argument and returns the quarter and year in the format 'QQ YYYY'."""
    # Parse the month numerically so that "2024-7-18" works as well as
    # "2024-07-18" (the former lookup table only knew zero-padded months).
    parts = date.strip().split("-")
    if len(parts) < 2:
        raise ValueError(f"Expected a date in YYYY-MM-DD format, got {date!r}")
    year, month = parts[0], int(parts[1])
    if not 1 <= month <= 12:
        raise ValueError(f"Month must be between 1 and 12, got {date!r}")
    # Months 1-3 -> Q1, 4-6 -> Q2, 7-9 -> Q3, 10-12 -> Q4.
    quarter = (month - 1) // 3 + 1
    return f"Q{quarter} {year}"

@tool
def calculate_percent_valuation(intrinsic_value:float, current_stock_price:float):
    """This tool can be used to calculate how overvalued or undervalued a stock is. It takes the calculated intrinsic value and the current stock price as arguments and returns the valuation percentage, in a format similar to '0.50' for 50%. The
    math performed by this function is (intrinsic_value-current_stock_price)/abs(intrinsic_value). The current stock price must be retrieved using the 'get_date' tool (to get the current date) and then using that date to access the 'polygon_aggregates' tool. 
    A positive percentage indicates an undervalued stock (the intrinsic value is above the current price) and a negative percentage indicates an overvalued stock."""
    # The description above is what the model reads. It previously stated the
    # sign convention backwards: with this formula a price BELOW the intrinsic
    # value gives a positive number, i.e. the stock is undervalued.
    if intrinsic_value == 0:
        # Avoid an opaque ZeroDivisionError; the ratio is undefined here.
        raise ValueError("intrinsic_value must be non-zero to compute a valuation percentage")
    return (intrinsic_value-current_stock_price)/abs(intrinsic_value)

# NOTE(review): this tool looks unfinished. It calls `calculate_wacc` with only
# the ticker although that tool is declared with six parameters, it never uses
# `average_growth_rate`, and it returns nothing. It is also not part of
# `tool_belt`. Finish or remove it.
@tool
def calculate_intrinsic_value(ticker, average_growth_rate):
    """This tool is helpful for calculating the intrinsic value of a stock. It takes the stock ticker, the average growth rate based on revenue (retrieved from financial reports or with the polygon API. This should be capped at plus or minus 300% per year.)"""
    # TODO: compute and return the intrinsic value; `wacc` is currently unused.
    wacc = calculate_wacc(ticker)

@tool
def calculate_wacc(
    ticker: str,
    market_cap: float,
    interest_expense: float,
    tax_expense: float,
    pre_tax_income: float,
    long_term_debt: float,
):
    """This tool is used to determine the weighted average cost of capital (WACC) when performing a DCF analysis. It takes the following arguments:

    ticker
    market capitalization - The market capitalization should be retrieved using the duckduckgo_search tool. Explicitly state 'nvidia market cap today'
    interest expense - trailing twelve month interest expense calculated from the response of the polygon_financials tool.
    tax expense - trailing twelve month tax expense calculated from the response of the polygon_financials tool.
    pre-tax income -  trailing twelve month pre-tax income calculated from the response of the polygon_financials tool.
    long term debt - long term debt calculated from the response of the polygon_financials tool.

    WACC is returned as a percentage in the format '0.057'."""
    # Method reference: https://www.gurufocus.com/term/wacc/SOFI

    # Risk-free rate: previous close of the ^TNX quote, divided by 100 and
    # rounded to two decimals.
    tnx_quote = yf.Ticker("^TNX")
    risk_free_rate = round(tnx_quote.info['regularMarketPreviousClose'] / 100, 2)

    # Fixed assumption for the expected market return.
    expected_market_return = 0.10
    beta = yf.Ticker(f"{ticker}").info["beta"]

    # Cost of equity = risk-free rate + beta * (market return - risk-free rate).
    equity_risk_premium = expected_market_return - risk_free_rate
    cost_of_equity = round(risk_free_rate + beta * equity_risk_premium, 2)

    weight_of_equity, weight_of_debt = get_weights(market_cap, long_term_debt)
    cost_of_debt = get_cost_of_debt(interest_expense, long_term_debt)
    tax_rate = get_tax_rate(tax_expense, pre_tax_income)

    equity_component = weight_of_equity * cost_of_equity
    debt_component = (weight_of_debt * cost_of_debt) * (1 - tax_rate)
    return round(equity_component + debt_component, 3)

def get_weights(market_cap, long_term_debt):
    """Return ``(weight_of_equity, weight_of_debt)``.

    Each weight is that component's share of the total
    ``market_cap + long_term_debt``.
    """
    total_capital = market_cap + long_term_debt
    return market_cap / total_capital, long_term_debt / total_capital

def get_cost_of_debt(interest_expense, long_term_debt) -> float:
    """Return the cost of debt as ``interest_expense / long_term_debt``.

    A company without long-term debt (``long_term_debt == 0``) yields 0.0
    instead of raising ZeroDivisionError; its debt weight is zero as well, so
    the debt term of the WACC vanishes either way.
    """
    if long_term_debt == 0:
        return 0.0
    return interest_expense/long_term_debt

def get_tax_rate(tax_expense, pre_tax_income):
    """Return the effective tax rate, clamped to the range [0, 1].

    The rate is ``tax_expense / pre_tax_income``. Ratios above 1 are reported
    as 1.00 and negative ratios as 0.00. When ``pre_tax_income`` is 0 the
    ratio is undefined; 0.00 is returned instead of raising
    ZeroDivisionError (NOTE(review): treating "no pre-tax income" as a zero
    tax rate is an assumption — confirm it suits the analysis).
    """
    if pre_tax_income == 0:
        return 0.00
    return min(max(tax_expense / pre_tax_income, 0.00), 1.00)

def get_wacc(ticker):
    """Unfinished draft of a WACC calculation driven only by the ticker.

    NOTE(review): the function computes a cost of equity and reads the balance
    sheet but has no return statement, so it returns None. It is not
    referenced by `tool_belt`. Complete or remove it.
    """
    # Risk-free rate from the ^TNX quote; note this draft reads
    # 'regularMarketPrice', whereas calculate_wacc reads
    # 'regularMarketPreviousClose'.
    treasury_yield10 = yf.Ticker("^TNX") 
    risk_free_rate = round(treasury_yield10.info['regularMarketPrice']/100,2)
    sp500_teturn = 0.10
    stock = yf.Ticker(f"{ticker}")
    beta = stock.info["beta"]
    cost_of_equity = round(risk_free_rate + beta*(sp500_teturn - risk_free_rate),2)
    # Fetched but never used below.
    stock_bal = stock.balance_sheet



## Set up tool belt

In [433]:
from langchain_community.tools.ddg_search import DuckDuckGoSearchRun

# One shared Polygon client, reused by the three Polygon tools below.
api_wrapper = PolygonAPIWrapper(polygon_api_key=POLYGON_API_KEY)

# Every tool the agent may call. `calculate_intrinsic_value` is defined above
# but intentionally (or not yet) left out of this list.
tool_belt = [
    get_datetime,
    get_date,
    get_time,
    get_quarter,
    calculate_percent_valuation,
    calculate_wacc,
    RAGAgent(),
    DuckDuckGoSearchRun(),
    PolygonAggregates(api_wrapper=api_wrapper),
    PolygonFinancials(api_wrapper=api_wrapper),
    PolygonTickerNews(api_wrapper=api_wrapper),
]

## Set up tool executor

In [434]:
from langgraph.prebuilt import ToolExecutor

# Executor used by `call_tool` to run the tool named in a function call.
tool_executor = ToolExecutor(tool_belt)

## Set up model

In [435]:
from langchain_openai import ChatOpenAI

# Chat model driving the agent; temperature=0 is set explicitly.
model = ChatOpenAI(model="gpt-4o", temperature=0)

## Set up function calling

In [436]:
from langchain_core.utils.function_calling import convert_to_openai_function

# Describe every tool in OpenAI function-calling format and attach the
# descriptions to the model.
functions = [convert_to_openai_function(t) for t in tool_belt]
# NOTE(review): this rebinds the name `model`, so re-running this cell without
# re-running the cell that creates the model operates on the already-bound
# object. Re-run the model cell first when iterating.
model = model.bind_functions(functions)

## Set up agent state

In [437]:
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
import operator
from langchain_core.messages import BaseMessage

class AgentState(TypedDict):
  """State dictionary passed between the nodes of the workflow graph."""
  # Conversation so far. The `add_messages` annotation comes from
  # langgraph.graph.message; presumably it merges each node's returned
  # messages into this list — confirm against the langgraph documentation.
  messages: Annotated[list, add_messages]

## Create nodes

In [438]:
import json
from operator import itemgetter
from langgraph.prebuilt import ToolInvocation
from langchain_core.messages import FunctionMessage
from langchain_core.prompts import PromptTemplate

def call_model(state):
    """Agent node: send the conversation to the model and return its reply.

    The reply is wrapped in a one-element list under the "messages" key.
    """
    reply = model.invoke(state["messages"])
    return {"messages": [reply]}

def call_tool(state):
    """Tool node: execute the function call requested by the last message.

    Reads the tool name and the JSON-encoded arguments from the last message's
    ``additional_kwargs["function_call"]``, runs the tool through
    `tool_executor`, and returns the stringified result as a FunctionMessage
    under the "messages" key.
    """
    requested_call = state["messages"][-1].additional_kwargs["function_call"]

    action = ToolInvocation(
        tool=requested_call["name"],
        tool_input=json.loads(requested_call["arguments"]),
    )
    tool_output = tool_executor.invoke(action)

    return {
        "messages": [FunctionMessage(content=str(tool_output), name=action.tool)]
    }



In [439]:
from langgraph.graph import StateGraph, END

# Build the graph: "agent" calls the model, "use tool" runs the requested tool.
workflow = StateGraph(AgentState)

workflow.add_node("agent", call_model)
workflow.add_node("use tool", call_tool)

In [440]:
# Every run starts at the model node.
workflow.set_entry_point("agent")

In [441]:
def should_continue(state):
    """Routing decision taken after the agent node.

    Returns "continue" when the last message carries a "function_call" entry
    in its ``additional_kwargs`` and "end" otherwise.
    """
    latest = state["messages"][-1]
    wants_tool = "function_call" in latest.additional_kwargs
    return "continue" if wants_tool else "end"

# After the agent node, route on the value returned by `should_continue`:
# "continue" runs the requested tool, "end" finishes the run.
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "continue" : "use tool",
        "end" : END
    }
)

In [442]:
# A tool result always goes back to the model node.
workflow.add_edge("use tool", "agent")

In [443]:
# Compile the graph definition into the object invoked below.
app = workflow.compile()

In [444]:
def print_messages(messages):
    """Print a readable transcript of a run and save it to ``response.md``.

    Args:
        messages: Mapping with a "messages" list. Each item must expose
            ``content`` and ``additional_kwargs``.

    Behavior:
        * A message with a "function_call" entry is reported as a tool call.
        * The message right after a tool call is treated as the tool response;
          it is printed but NOT written to the file.
        * The first remaining message is labelled the initial query, every
          later one an agent response.

    The file is overwritten on every call and written as UTF-8 so that
    non-ASCII model output cannot fail on platforms with a different default
    encoding.
    """
    awaiting_tool_response = False
    is_initial_query = True
    with open("response.md", "w", encoding="utf-8") as f:
        for message in messages["messages"]:
            if "function_call" in message.additional_kwargs:
                call = message.additional_kwargs["function_call"]
                line = f'Tool Call - Name: {call["name"]} + Query: {call["arguments"]}'
                f.write("\n")
                f.write(line)
                print()
                print(line)
                awaiting_tool_response = True
                continue
            if awaiting_tool_response:
                # Tool output can be very long, so it only goes to stdout.
                print(f"Tool Response: {message.content}")
                awaiting_tool_response = False
                continue
            if is_initial_query:
                line = f"Initial Query: {message.content}"
                f.write(line)
                f.write("\n")
                print(line)
                print()
                is_initial_query = False
                continue
            line = f"Agent Response: {message.content}"
            f.write("\n")
            f.write(line)
            print()
            print(line)

In [445]:
from langchain_core.messages import HumanMessage

# Seed the graph with a single user question.
inputs = {"messages" : [HumanMessage(content="Can you perform a DCF analysis of Nvidia stock? Please list out each step of the calculation.")]}

# Run the agent loop until the model stops requesting tools.
messages = app.invoke(inputs)

# Show the transcript and save it to response.md.
print_messages(messages)


Initial Query: Can you perform a DCF analysis of Nvidia stock? Please list out each step of the calculation.


Tool Call - Name: duckduckgo_search + Query: {"query":"Nvidia market cap today"}
Tool Response: Get the latest stock price, news, and analysis for NVIDIA, a leading semiconductor company. See market cap, earnings, dividends, and other metrics for NVDA on Nasdaq. NVIDIA has a market cap of $3.22 trillion as of June 20, 2024, up 229.90% from a year ago. See the historical chart, ranking, and statistics of NVIDIA's market cap since 1999. Today's Open. 121.35. Previous Close. 126.36. July 17, 2024 04:00 PM Pricing delayed by 20 minutes. Stock Chart. Quick Links. Email Alerts; RSS; Request Printed Materials; Download Library; Email Alerts. ... We intend to use our @NVIDIA Twitter account, NVIDIA Facebook page, ... Stock analysis for NVIDIA Corp (NVDA:NASDAQ GS) including stock price, stock chart, company news, key statistics, fundamentals and company profile. Nvidia cofounder and C