Spaces:
Sleeping
Sleeping
from smolagents import CodeAgent,DuckDuckGoSearchTool, HfApiModel,load_tool,tool | |
import datetime | |
import requests | |
import pytz | |
import yaml | |
from tools.final_answer import FinalAnswerTool | |
import os | |
from Gradio_UI import GradioUI | |
from stocksymbol import StockSymbol | |
from polygon import RESTClient | |
from typing import Union, Iterator, List | |
from polygon.rest.aggs import Agg | |
import time | |
import difflib | |
from datetime import datetime | |
from dateutil.relativedelta import relativedelta | |
# API | |
symbol_api_key = os.getenv("STOCK_SYMBOL") | |
polygon_api = os.getenv('POLYGON_API') | |
# Get the us market stock name | |
ss = StockSymbol(symbol_api_key) | |
symbol_list_us = ss.get_symbol_list(market="US") | |
# Create stock price retriever | |
client = RESTClient(api_key=polygon_api) | |
# utility function | |
def convert_timestamp_to_date(timestamp): | |
# Convert milliseconds to seconds | |
timestamp_seconds = timestamp / 1000.0 | |
# Create a datetime object | |
date_time = datetime.fromtimestamp(timestamp_seconds) | |
# Format the date to a readable string | |
return date_time.strftime('%B %d, %Y') | |
def find_stock_symbol(company_name, stock_list): | |
""" | |
Find the stock symbol based on the closest matching company name. | |
Args: | |
company_name (str): The name of the company to search for. | |
stock_list (list): A list of dictionaries containing stock information. | |
Returns: | |
str: The stock symbol of the closest matching company name or None if no match is found. | |
""" | |
# Extract long names from the stock list | |
long_names = [stock['longName'] for stock in stock_list] | |
# Find the closest match | |
closest_match = difflib.get_close_matches(company_name, long_names, n=1) | |
if closest_match: | |
# Get the index of the closest match | |
match_index = long_names.index(closest_match[0]) | |
# Return the corresponding symbol | |
return stock_list[match_index]['symbol'] | |
else: | |
return None | |
def get_dates(): | |
# Get current date | |
current_date = datetime.now() | |
# Get the date 5 months before | |
three_months_ago = current_date - relativedelta(months=3) | |
one_months_ago = current_date - relativedelta(months=1) | |
# Format dates as YYYY-MM-DD | |
one_months_ago_formatted = one_months_ago.strftime('%Y-%m-%d') | |
three_months_ago_formatted = three_months_ago.strftime('%Y-%m-%d') | |
return one_months_ago_formatted, three_months_ago_formatted | |
### Finally my tool | |
def get_stock_price(name_of_company: str) -> List[Agg]: | |
""" | |
Retrive the stock price of the company for the last 3 months. | |
Args: | |
name_of_company: the name of the company | |
Returns: | |
List: a list of aggregate stock price data. Each agg contains the following | |
Each Agg entry contains the following fields: | |
open: The price at which the stock opened at the beginning of the trading period. | |
high: The highest price reached by the stock during the trading period. | |
low: The lowest price reached by the stock during the trading period. | |
close: The price at which the stock closed at the end of the trading period. | |
volume: The total number of shares traded during the period. | |
vwap: The Volume Weighted Average Price, which gives an average price of the stock weighted by volume traded. | |
timestamp: The time at which the data point was recorded, represented as a Unix timestamp (milliseconds since January 1, 1970). | |
transactions: The total number of transactions that occurred during the trading period. | |
otc: Indicates if the trade was over-the-counter (OTC), which is typically not applicable for major stocks and is None here. | |
""" | |
symbol = find_stock_symbol(name_of_company, symbol_list_us) | |
# The dates | |
current_date, three_months_ago = get_dates() | |
ticker = symbol | |
# List Aggregates (Bars) | |
aggs = [] | |
for a in client.list_aggs(ticker=ticker, multiplier=1, timespan="month", from_=three_months_ago, to=current_date, limit=1): | |
aggs.append(a) | |
for agg in aggs: | |
agg.timestamp = convert_timestamp_to_date(agg.timestamp) | |
return aggs | |
# Below is an example of a tool that does nothing. Amaze us with your creativity ! | |
image_generation_tool = load_tool("agents-course/text-to-image", trust_remote_code=True) | |
def generate_pictures_with_watermelon_in_it(scene:str)-> str: #it's import to specify the return type | |
"""A tool that Generates an image with a watermelon included in it based on the provided prompt. | |
The watermelon is placed creatively and appropriately in the scene. | |
Args: | |
scene: describe the image scene setting | |
""" | |
#Keep this format for the description / args / args description but feel free to modify the tool | |
# Import tool from Hub | |
modified_prompt = ( | |
f"This is the provided scene: {scene}. Include a watermelon in a creative and appropriate way, " | |
"such as a watermelon-shaped object, a watermelon in the foreground, or a watermelon-themed scene." | |
) | |
result = image_generation_tool(modified_prompt) | |
return result | |
def my_custom_tool(arg1:str, arg2:int)-> str: #it's import to specify the return type | |
#Keep this format for the description / args / args description but feel free to modify the tool | |
"""A tool that does nothing yet | |
Args: | |
arg1: the first argument | |
arg2: the second argument | |
""" | |
return "What magic will you build ?" | |
def get_current_time_in_timezone(timezone: str) -> str: | |
"""A tool that fetches the current local time in a specified timezone. | |
Args: | |
timezone: A string representing a valid timezone (e.g., 'America/New_York'). | |
""" | |
try: | |
# Create timezone object | |
tz = pytz.timezone(timezone) | |
# Get current time in that timezone | |
local_time = datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S") | |
return f"The current local time in {timezone} is: {local_time}" | |
except Exception as e: | |
return f"Error fetching time for timezone '{timezone}': {str(e)}" | |
final_answer = FinalAnswerTool() | |
# If the agent does not answer, the model is overloaded, please use another model or the following Hugging Face Endpoint that also contains qwen2.5 coder: | |
# model_id='https://pflgm2locj2t89co.us-east-1.aws.endpoints.huggingface.cloud' | |
model = HfApiModel( | |
max_tokens=2096, | |
temperature=0.5, | |
model_id='Qwen/Qwen2.5-Coder-32B-Instruct',# it is possible that this model may be overloaded | |
custom_role_conversions=None, | |
) | |
# Import tool from Hub | |
image_generation_tool = load_tool("agents-course/text-to-image", trust_remote_code=True) | |
with open("prompts.yaml", 'r') as stream: | |
prompt_templates = yaml.safe_load(stream) | |
agent = CodeAgent( | |
model=model, | |
tools=[final_answer, get_stock_price], ## add your tools here (don't remove final answer) | |
max_steps=6, | |
verbosity_level=1, | |
grammar=None, | |
planning_interval=None, | |
name=None, | |
description=None, | |
prompt_templates=prompt_templates | |
) | |
GradioUI(agent).launch() |