import gradio as gr import pandas as pd import yfinance as yf from datetime import datetime, timedelta import requests from bs4 import BeautifulSoup from pattern_finder import score_downward_trend, score_candle, calculate_risk_reward import urllib3 from datetime import datetime, timedelta urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) def load_sp500_tickers(): """Load S&P 500 tickers from Wikipedia.""" url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies" response = requests.get(url, verify=False) soup = BeautifulSoup(response.content, 'html.parser') table = soup.find('table', {'id': 'constituents'}) tickers = [] if table: for row in table.find_all('tr')[1:]: cells = row.find_all('td') if cells: ticker = cells[0].text.strip() tickers.append(ticker) return tickers def load_data(ticker): """Load stock data using yfinance.""" end_date = datetime.today() start_date = end_date - timedelta(days=365) # Get 1 year of data data = yf.download(ticker, start=start_date, end=end_date) return data def calculate_sma(data, window): """Calculate the Simple Moving Average (SMA) for a given window.""" return data['Close'].rolling(window=window).mean() def calculate_ema(data, window): """Calculate the Exponential Moving Average (EMA) for a given window.""" return data['Close'].ewm(span=window, adjust=False).mean() def average_downtrend(data, method, window=4): """Calculate the average difference between consecutive prices for the last 'window' candles.""" if len(data) < window: return 0.0 price_diffs = data[method].diff().iloc[-window:] avg_diff = price_diffs.mean() return avg_diff if avg_diff < 0 else 0.0 def score_today_candle(data, window=4): if len(data) < window + 1: return 0 trend_score = score_downward_trend(data.iloc[-window:], window=window) candle_score = score_candle(data.iloc[-1], data, len(data) - 1) risk_reward = calculate_risk_reward(data, len(data) - 1) # Combine scores (you can adjust the weights as needed) total_score = trend_score + candle_score + (risk_reward * 10) return total_score def scan_sp500(top_n=25, progress=gr.Progress()): tickers = load_sp500_tickers() scores = [] tickers.append("QQQ") for i, ticker in enumerate(progress.tqdm(tickers)): data = load_data(ticker) if not data.empty: score = score_today_candle(data) if score > 0: scores.append((ticker, score)) scores = sorted(scores, key=lambda x: x[1], reverse=True) return scores[:top_n] def next_business_day(date): next_day = date + timedelta(days=1) while next_day.weekday() >= 5: # 5 = Saturday, 6 = Sunday next_day += timedelta(days=1) return next_day def gradio_scan_sp500(top_n, progress=gr.Progress()): progress(0, desc="Downloading Data") tickers = load_sp500_tickers() tickers.append("QQQ") progress(0.3, desc="Running Scanner") results = scan_sp500(top_n, progress) # Get the last date of the data and find the next business day last_data = load_data(results[0][0]) # Load data for the first ticker in results last_date = last_data.index[-1].date() next_market_day = next_business_day(last_date) date_created = next_market_day.strftime("%Y-%m-%d") output = f"Scan Results for Market Open on: {date_created}\n\n" output += "Top {} stocks based on pattern finder score:\n\n".format(top_n) for ticker, score in results: output += "{}: Total Score = {:.2f}\n".format(ticker, score) return output iface = gr.Interface( fn=gradio_scan_sp500, inputs=gr.Slider(minimum=1, maximum=100, step=1, label="Number of top stocks to display", value=25), outputs="text", title="S&P 500 Stock Scanner", description="Scan S&P 500 stocks and display top N stocks based on today's candle score.", allow_flagging="never", ) if __name__ == "__main__": iface.launch()