Spaces:
Running
Running
import gradio as gr | |
import pandas as pd | |
import yfinance as yf | |
from datetime import datetime, timedelta | |
import requests | |
from bs4 import BeautifulSoup | |
from pattern_finder import score_downward_trend, score_candle, calculate_risk_reward | |
import urllib3 | |
from datetime import datetime, timedelta | |
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | |
def load_sp500_tickers():
    """Load S&P 500 tickers from Wikipedia.

    Scrapes the first cell of every data row in the table whose id is
    ``constituents`` on the "List of S&P 500 companies" page.

    Returns:
        list[str]: Ticker symbols in table order. Dots are replaced with
        dashes (``BRK.B`` -> ``BRK-B``) because the symbols are passed to
        yfinance, which uses the dashed form for share classes. The list is
        empty when the constituents table is not present in the page.

    Raises:
        requests.RequestException: If the request times out, the connection
            fails, or the server answers with an HTTP error status.
    """
    url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
    # NOTE(review): verify=False disables TLS certificate verification, so the
    # page content is not authenticated. It is kept because the file
    # deliberately silences the matching urllib3 warning; confirm whether the
    # deployment environment really needs it and drop it if not.
    # The timeout prevents the scan from hanging forever on a stalled socket.
    response = requests.get(url, verify=False, timeout=30)
    # Fail loudly on 4xx/5xx instead of parsing an error page into "no tickers".
    response.raise_for_status()

    soup = BeautifulSoup(response.content, 'html.parser')
    table = soup.find('table', {'id': 'constituents'})
    if not table:
        return []

    tickers = []
    # Skip the first <tr>, which is the header row.
    for row in table.find_all('tr')[1:]:
        cells = row.find_all('td')
        if cells:
            tickers.append(cells[0].text.strip().replace('.', '-'))
    return tickers
def load_data(ticker):
    """Fetch price history for ``ticker`` through yfinance.

    The requested range ends at the current local time and starts 365 days
    earlier. The result of ``yf.download`` is returned unchanged.
    """
    window_end = datetime.today()
    window_start = window_end - timedelta(days=365)  # one year look-back
    return yf.download(ticker, start=window_start, end=window_end)
def calculate_sma(data, window):
    """Return the simple moving average of ``data['Close']``.

    Each value is the mean of the trailing ``window`` closes; positions with
    fewer than ``window`` observations are NaN.
    """
    closes = data['Close']
    rolling_view = closes.rolling(window=window)
    return rolling_view.mean()
def calculate_ema(data, window):
    """Return the exponential moving average of ``data['Close']``.

    Uses a span of ``window`` with ``adjust=False``, i.e. the recursive form
    seeded with the first close.
    """
    closes = data['Close']
    weighted = closes.ewm(span=window, adjust=False)
    return weighted.mean()
def average_downtrend(data, method, window=4):
    """Return the mean price change over the last ``window`` rows when negative.

    ``method`` names the column to difference. The result is the mean of the
    trailing ``window`` consecutive differences when that mean is below zero;
    in every other case (too few rows, flat or rising prices) it is ``0.0``.
    """
    if len(data) >= window:
        recent_changes = data[method].diff().iloc[-window:]
        mean_change = recent_changes.mean()
        if mean_change < 0:
            return mean_change
    return 0.0
def score_today_candle(data, window=4):
    """Score the most recent candle in ``data``.

    Returns ``0`` when ``data`` has fewer than ``window + 1`` rows. Otherwise
    combines three pattern_finder results: the downward-trend score of the
    last ``window`` rows, the candle score of the last row, and the
    risk/reward value of the last row weighted by 10.
    """
    enough_history = len(data) >= window + 1
    if not enough_history:
        return 0

    last_index = len(data) - 1
    trend_part = score_downward_trend(data.iloc[-window:], window=window)
    candle_part = score_candle(data.iloc[-1], data, last_index)
    reward_part = calculate_risk_reward(data, last_index)

    # Weights are adjustable; risk/reward currently counts ten-fold.
    return trend_part + candle_part + (reward_part * 10)
def scan_sp500(top_n=25, progress=gr.Progress()):
    """Score every S&P 500 ticker plus QQQ and return the best ``top_n``.

    Tickers whose download is empty or whose score is not positive are left
    out. The result is a list of ``(ticker, score)`` tuples ordered from the
    highest score to the lowest.
    """
    universe = load_sp500_tickers()
    universe.append("QQQ")

    ranked = []
    for symbol in progress.tqdm(universe):
        history = load_data(symbol)
        if history.empty:
            continue
        value = score_today_candle(history)
        if value > 0:
            ranked.append((symbol, value))

    ranked.sort(key=lambda pair: pair[1], reverse=True)
    return ranked[:top_n]
def next_business_day(date):
    """Return the first weekday (Mon-Fri) strictly after ``date``.

    Only weekends are skipped; holidays are not considered.
    """
    # Friday jumps three days and Saturday two; every other day advances by one.
    days_ahead = {4: 3, 5: 2}.get(date.weekday(), 1)
    return date + timedelta(days=days_ahead)
def gradio_scan_sp500(top_n, progress=gr.Progress()):
    """Run the S&P 500 scan and format the result as text for the Gradio UI.

    Args:
        top_n: Maximum number of tickers to list. Coerced to ``int`` so that a
            float delivered by the UI can still be used as a slice bound.
        progress: Gradio progress tracker, also forwarded to ``scan_sp500``.

    Returns:
        str: A header naming the next market day after the latest available
        candle, followed by one ``TICKER: Total Score = X.XX`` line per
        result. When no ticker scores above zero, a short message saying so.
    """
    top_n = int(top_n)

    progress(0, desc="Downloading Data")
    # scan_sp500 fetches the ticker list and the price data itself, so the
    # list is not downloaded a second time here.
    progress(0.3, desc="Running Scanner")
    results = scan_sp500(top_n, progress)

    # Nothing scored above zero: there is no top ticker to read a date from.
    if not results:
        return "No stocks produced a positive pattern finder score.\n"

    # Use the top result's latest candle to find the session being targeted.
    last_data = load_data(results[0][0])
    if last_data.empty:
        # The re-download can come back empty; fall back to today's date
        # instead of failing on index[-1].
        last_date = datetime.today().date()
    else:
        last_date = last_data.index[-1].date()
    date_created = next_business_day(last_date).strftime("%Y-%m-%d")

    parts = [
        f"Scan Results for Market Open on: {date_created}\n\n",
        f"Top {top_n} stocks based on pattern finder score:\n\n",
    ]
    parts.extend(
        f"{ticker}: Total Score = {score:.2f}\n" for ticker, score in results
    )
    return "".join(parts)
# Gradio UI: a single slider chooses how many ranked tickers to show, and the
# scan report is rendered as plain text.
iface = gr.Interface(
    title="S&P 500 Stock Scanner",
    description="Scan S&P 500 stocks and display top N stocks based on today's candle score.",
    fn=gradio_scan_sp500,
    inputs=gr.Slider(
        label="Number of top stocks to display",
        minimum=1,
        maximum=100,
        step=1,
        value=25,
    ),
    outputs="text",
    allow_flagging="never",
)

# Start the web server only when the file is executed as a script.
if __name__ == "__main__":
    iface.launch()