"""Gradio app that scans S&P 500 stocks and ranks them by today's candle score."""

from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta

import gradio as gr
import pandas as pd
import requests
import urllib3
import yfinance as yf
from bs4 import BeautifulSoup

from pattern_finder import score_downward_trend, score_candle, calculate_risk_reward

# load_sp500_tickers() requests Wikipedia with verify=False, which would
# otherwise emit an InsecureRequestWarning on every call.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


def load_sp500_tickers():
    """Load S&P 500 tickers from Wikipedia.

    Returns:
        A list of ticker symbols taken from the first cell of each row of the
        ``constituents`` table. Dots are replaced with dashes (``BRK.B`` ->
        ``BRK-B``) because that is the share-class notation Yahoo Finance
        expects. An empty list is returned when the table is not found.
    """
    url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
    # NOTE(review): verify=False disables TLS certificate verification. It is
    # kept because the original code relied on it; remove it if the deployment
    # environment has a working CA bundle.
    response = requests.get(url, verify=False, timeout=30)
    soup = BeautifulSoup(response.content, 'html.parser')
    table = soup.find('table', {'id': 'constituents'})
    if table is None:
        return []

    tickers = []
    # Skip the header row; rows without <td> cells carry no ticker.
    for row in table.find_all('tr')[1:]:
        cells = row.find_all('td')
        if cells:
            tickers.append(cells[0].text.strip().replace('.', '-'))
    return tickers


def load_data(ticker):
    """Load one year of stock data for ``ticker`` using yfinance.

    Returns:
        The downloaded DataFrame with flat column names, or an empty DataFrame
        when yfinance returns nothing.
    """
    end_date = datetime.today()
    start_date = end_date - timedelta(days=365)  # Get 1 year of data
    data = yf.download(ticker, start=start_date, end=end_date)
    if data is None:
        return pd.DataFrame()
    # Newer yfinance releases return (field, ticker) MultiIndex columns even
    # for a single ticker; flatten so data['Close'] is a Series again.
    if isinstance(data.columns, pd.MultiIndex):
        data.columns = data.columns.get_level_values(0)
    return data


def calculate_sma(data, window):
    """Calculate the Simple Moving Average (SMA) of 'Close' for a given window."""
    return data['Close'].rolling(window=window).mean()


def calculate_ema(data, window):
    """Calculate the Exponential Moving Average (EMA) of 'Close' for a given window."""
    return data['Close'].ewm(span=window, adjust=False).mean()


def average_downtrend(data, method, window=4):
    """Return the mean consecutive price change over the last ``window`` rows.

    Args:
        data: DataFrame containing the column named by ``method``.
        method: Column to measure (for example ``"High"`` or ``"Close"``).
        window: Number of trailing rows to average over.

    Returns:
        The mean difference when it is negative, otherwise ``0.0``. Also
        ``0.0`` when ``data`` has fewer than ``window`` rows.
    """
    if len(data) < window:
        return 0.0
    price_diffs = data[method].diff().iloc[-window:]
    avg_diff = price_diffs.mean()
    return avg_diff if avg_diff < 0 else 0.0


# NOTE: a local placeholder `score_candle(candle, prev_candle, trend_strength)`
# used to be defined here. Its body was only `return score` with `score`
# undefined, so it shadowed the real implementation imported from
# pattern_finder and raised NameError on every call. It has been removed so
# the imported function is used.


def score_today_candle(data, window=4):
    """Score today's candle based on the downtrend from the past ``window`` days.

    Returns:
        ``0`` when there is not enough data, ``-1`` when no downtrend is
        detected or the close is below any of EMA-10 / SMA-20 / SMA-50 /
        SMA-200, otherwise the value returned by ``score_candle``.
    """
    if len(data) < window + 1:
        return 0  # Not enough data

    today_candle = data.iloc[-1]
    prev_candle = data.iloc[-2]
    close_price = today_candle['Close']
    previous_data = data.iloc[-(window + 1):-1]

    # NOTE(review): operator precedence halves only the 7-period term, and
    # previous_data holds just `window` rows, so that term is 0.0 whenever
    # window < 7. Left as-is to preserve existing scores; confirm the intent.
    down_high = (
        average_downtrend(previous_data, method="High", window=window)
        + average_downtrend(previous_data, method="High", window=7) / 2
    )
    down_close = (
        average_downtrend(previous_data, method="Close", window=window)
        + average_downtrend(previous_data, method="Close", window=7) / 2
    )
    avg_downtrend = (down_high + down_close) / 2
    if avg_downtrend == 0.0:
        return -1

    sma_50 = calculate_sma(data, window=50).iloc[-1]
    sma_200 = calculate_sma(data, window=200).iloc[-1]
    sma_20 = calculate_sma(data, window=20).iloc[-1]
    ema_10 = calculate_ema(data, window=10).iloc[-1]
    if (close_price < ema_10) or (close_price < sma_20) or (close_price < sma_50) or (close_price < sma_200):
        return -1

    return score_candle(today_candle, prev_candle, abs(avg_downtrend))


def scan_ticker(ticker):
    """Load data for a ticker and calculate its score.

    Returns:
        ``(ticker, score)`` when data was downloaded and the score is
        positive, otherwise ``None``.
    """
    data = load_data(ticker)
    if not data.empty:
        score = score_today_candle(data)
        if score > 0:
            return ticker, score
    return None


def scan_sp500(top_n=25, progress=gr.Progress()):
    """Score every S&P 500 ticker plus QQQ and return the best ``top_n``.

    Args:
        top_n: Maximum number of results to return.
        progress: Gradio progress tracker updated after each finished ticker.

    Returns:
        A list of ``(ticker, score)`` tuples sorted by score, highest first.
    """
    tickers = load_sp500_tickers()
    tickers.append("QQQ")
    total = len(tickers)

    scores = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(scan_ticker, ticker): ticker for ticker in tickers}
        for i, future in enumerate(as_completed(futures), 1):
            result = future.result()
            if result:
                scores.append(result)
            progress(i / total, desc="Processing Tickers")

    scores.sort(key=lambda item: item[1], reverse=True)
    # The slider may deliver a float; slicing requires an int.
    return scores[:int(top_n)]


def next_business_day(day):
    """Return the first weekday (Mon-Fri) strictly after ``day``.

    Market holidays are not taken into account.
    """
    candidate = day + timedelta(days=1)
    while candidate.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        candidate += timedelta(days=1)
    return candidate


def gradio_scan_sp500(top_n, progress=gr.Progress()):
    """Run the scan and format the results as text for the Gradio UI.

    Args:
        top_n: Number of top stocks to display.
        progress: Gradio progress tracker.

    Returns:
        A report listing each selected ticker with its score, or a short
        message when no ticker produced a positive score.
    """
    top_n = int(top_n)
    progress(0, desc="Downloading Data")
    # scan_sp500 fetches the ticker list itself, so it is not loaded here.
    results = scan_sp500(top_n, progress)
    if not results:
        return "No stocks produced a positive pattern finder score.\n"

    # Use the most recent bar of the best ticker to work out which market
    # open the scan applies to; fall back to today if the download is empty.
    last_data = load_data(results[0][0])
    if last_data.empty:
        last_date = datetime.today().date()
    else:
        last_date = last_data.index[-1].date()
    next_market_day = next_business_day(last_date)
    date_created = next_market_day.strftime("%Y-%m-%d")

    output = f"Scan Results for Market Open on: {date_created}\n\n"
    output += "Top {} stocks based on pattern finder score:\n\n".format(top_n)
    output += "".join(
        "{}: Total Score = {:.2f}\n".format(ticker, score) for ticker, score in results
    )
    return output


iface = gr.Interface(
    fn=gradio_scan_sp500,
    inputs=gr.Slider(minimum=1, maximum=100, step=1, label="Number of top stocks to display", value=25),
    outputs="text",
    title="S&P 500 Stock Scanner",
    description="Scan S&P 500 stocks and display top N stocks based on today's candle score.",
    allow_flagging="never",
)

if __name__ == "__main__":
    iface.launch(server_name="0.0.0.0", server_port=7860)