"""Gradio app that scans S&P 500 stocks for reversal candles after a downtrend."""

import gradio as gr
import pandas as pd
import yfinance as yf
from datetime import datetime, timedelta
import requests
from bs4 import BeautifulSoup
# NOTE(review): `score_candle` is imported here but redefined further down in
# this module; the local definition is the one that actually runs.
from pattern_finder import score_downward_trend, score_candle, calculate_risk_reward
import urllib3

# The Wikipedia request below is made with verify=False; silence the warning
# urllib3 would otherwise emit for it.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Default amount of history requested for every ticker.
DEFAULT_LOOKBACK_DAYS = 365

# Yahoo Finance refuses long ranges for 1-minute bars (roughly one week per
# request), so a one-year request for "1m" always comes back empty.
# NOTE(review): confirm the exact limit against the yfinance version in use.
_MAX_LOOKBACK_DAYS_BY_INTERVAL = {"1m": 7}


def load_sp500_tickers():
    """Load S&P 500 tickers from Wikipedia.

    Returns:
        A list of ticker symbols in Yahoo Finance notation (class shares use a
        dash, e.g. ``BRK-B``). Empty if the constituents table is not found.
    """
    url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
    # NOTE(review): verify=False disables TLS certificate validation. It is
    # kept because the original code relied on it, but it should be removed
    # once the environment has a working CA bundle.
    # The timeout prevents the UI from hanging forever on a stalled request.
    response = requests.get(url, verify=False, timeout=30)
    soup = BeautifulSoup(response.content, 'html.parser')
    table = soup.find('table', {'id': 'constituents'})
    if table is None:
        return []

    tickers = []
    for row in table.find_all('tr')[1:]:  # skip the header row
        cells = row.find_all('td')
        if cells:
            # Wikipedia writes share classes with a dot (BRK.B); Yahoo Finance
            # expects a dash (BRK-B), otherwise the download returns no data.
            tickers.append(cells[0].text.strip().replace('.', '-'))
    return tickers


def load_data(ticker, interval="1d"):
    """Load stock data using yfinance with a specified interval.

    Args:
        ticker: Yahoo Finance symbol.
        interval: Bar size passed to ``yf.download`` (e.g. "1d", "1wk").

    Returns:
        The DataFrame returned by ``yf.download`` with flat column names; it
        is empty when no data could be downloaded.
    """
    lookback_days = min(
        DEFAULT_LOOKBACK_DAYS,
        _MAX_LOOKBACK_DAYS_BY_INTERVAL.get(interval, DEFAULT_LOOKBACK_DAYS),
    )
    end_date = datetime.today()
    start_date = end_date - timedelta(days=lookback_days)
    data = yf.download(
        ticker,
        start=start_date,
        end=end_date,
        interval=interval,
        progress=False,  # avoid one console progress bar per ticker
    )
    # Some yfinance releases return (field, ticker) MultiIndex columns even
    # for a single ticker; the scoring code expects plain 'Open'/'Close'/...
    if isinstance(data.columns, pd.MultiIndex):
        data.columns = data.columns.get_level_values(0)
    return data


def calculate_sma(data, window):
    """Calculate the Simple Moving Average (SMA) of 'Close' for a given window."""
    return data['Close'].rolling(window=window).mean()


def calculate_ema(data, window):
    """Calculate the Exponential Moving Average (EMA) of 'Close' for a given window."""
    return data['Close'].ewm(span=window, adjust=False).mean()


def average_downtrend(data, method, window=4):
    """Return the mean candle-to-candle change of a column if it is negative.

    Args:
        data: DataFrame containing the column named by ``method``.
        method: Column name to measure (e.g. "High" or "Close").
        window: Number of trailing rows whose differences are averaged.

    Returns:
        The mean difference over the last ``window`` rows when it is negative;
        0.0 when it is not negative or when ``data`` has fewer than ``window``
        rows.
    """
    if len(data) < window:
        return 0.0
    price_diffs = data[method].diff().iloc[-window:]
    avg_diff = price_diffs.mean()
    return avg_diff if avg_diff < 0 else 0.0


def score_candle(candle, prev_candle, trend_strength):
    """Score a single candle based on its shape and the previous close.

    Args:
        candle: Row with 'Open', 'Close', 'Low' and 'High'.
        prev_candle: Row for the preceding candle; only 'Close' is read.
        trend_strength: Non-negative magnitude of the preceding downtrend.

    Returns:
        A numeric score; higher means a stronger reversal-style candle.
    """
    open_price = candle['Open']
    close_price = candle['Close']
    low_price = candle['Low']
    high_price = candle['High']
    prev_close = prev_candle['Close']

    candle_range = high_price - low_price
    body = open_price - close_price  # positive for a red (down) candle
    is_red = close_price < open_price
    bottom_wick_length = min(open_price, close_price) - low_price

    # Initial score based on trend strength
    score = trend_strength * 2

    # Doji: open and close are almost the same (small body)
    if abs(body) <= 0.1 * candle_range:
        score += 5

    # Hammer-like: red candle whose bottom wick is more than twice its body
    if is_red and bottom_wick_length > 2 * body:
        score += 7

    # Bottom tailing wick: wick is more than half of the whole range
    if bottom_wick_length > 0.5 * candle_range:
        score += 6

    # Smaller boost for red candles whose bottom wick exceeds half the body
    if is_red and bottom_wick_length > 0.5 * body:
        score += 3

    # Penalize a close above the previous close by the percentage gain
    if close_price > prev_close:
        score -= ((close_price - prev_close) / prev_close) * 100

    return score


def score_today_candle(data, window=4):
    """Score the latest candle based on the downtrend of the prior candles.

    Args:
        data: OHLC DataFrame ordered oldest to newest.
        window: Number of candles before the latest one used for the trend.

    Returns:
        0 when there are fewer than ``window + 1`` rows, -1 when no downtrend
        is detected or the close is below any of EMA10/SMA20/SMA50/SMA200,
        otherwise the value of ``score_candle`` for the latest candle.
    """
    if len(data) < window + 1:
        return 0  # Not enough data

    today_candle = data.iloc[-1]
    prev_candle = data.iloc[-2]
    close_price = today_candle['Close']
    previous_data = data.iloc[-(window + 1):-1]

    # NOTE(review): `previous_data` holds only `window` rows, so with the
    # default window of 4 the window=7 term is always 0.0, and because of
    # operator precedence only that term is halved. The expression is kept
    # as written so scores do not change; confirm the intended blend.
    down_high = (
        average_downtrend(previous_data, method="High", window=window)
        + average_downtrend(previous_data, method="High", window=7) / 2
    )
    down_close = (
        average_downtrend(previous_data, method="Close", window=window)
        + average_downtrend(previous_data, method="Close", window=7) / 2
    )
    avg_downtrend = (down_high + down_close) / 2
    if avg_downtrend == 0.0:
        return -1

    # Latest value of each moving average. An average without enough history
    # is NaN, and a comparison against NaN is False, so it never rejects.
    moving_averages = (
        calculate_ema(data, window=10).iloc[-1],
        calculate_sma(data, window=20).iloc[-1],
        calculate_sma(data, window=50).iloc[-1],
        calculate_sma(data, window=200).iloc[-1],
    )
    if any(close_price < level for level in moving_averages):
        return -1

    return score_candle(today_candle, prev_candle, abs(avg_downtrend))


def scan_sp500(top_n=25, interval="1d", progress=gr.Progress()):
    """Score every S&P 500 ticker (plus QQQ) and return the best ones.

    Args:
        top_n: Maximum number of results to return.
        interval: Bar size forwarded to ``load_data``.
        progress: Gradio progress tracker used to wrap the ticker loop.

    Returns:
        A list of ``(ticker, score)`` tuples with score > 0, sorted by score
        in descending order and truncated to ``top_n`` entries.
    """
    tickers = load_sp500_tickers()
    tickers.append("QQQ")

    scores = []
    for ticker in progress.tqdm(tickers):
        data = load_data(ticker, interval)
        if data.empty:
            continue
        score = score_today_candle(data)
        if score > 0:
            scores.append((ticker, score))

    scores.sort(key=lambda item: item[1], reverse=True)
    # The slider may deliver the count as a float; slicing needs an int.
    return scores[:int(top_n)]


def next_business_day(date):
    """Return the first weekday (Mon-Fri) strictly after ``date``.

    Market holidays are not taken into account.
    """
    next_day = date + timedelta(days=1)
    while next_day.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        next_day += timedelta(days=1)
    return next_day


def gradio_scan_sp500(top_n, interval, progress=gr.Progress()):
    """Run the scan and format the results as text for the Gradio UI.

    Args:
        top_n: Number of top stocks to display.
        interval: Bar size forwarded to the scanner.
        progress: Gradio progress tracker.

    Returns:
        A human-readable report, or a short message when no ticker scored
        above zero.
    """
    top_n = int(top_n)
    progress(0, desc="Running Scanner")
    # scan_sp500 loads the ticker list itself, so it is not fetched here too.
    results = scan_sp500(top_n, interval, progress)
    if not results:
        return "No stocks matched the pattern finder criteria for this scan."

    # Use the last bar of the best-scoring ticker as the scan's data date and
    # report the following business day as the target market open.
    last_data = load_data(results[0][0], interval)
    if last_data.empty:
        # Re-download failed; fall back to today's date rather than crashing.
        last_date = datetime.today().date()
    else:
        last_date = last_data.index[-1].date()
    date_created = next_business_day(last_date).strftime("%Y-%m-%d")

    parts = [
        f"Scan Results for Market Open on: {date_created}\n\n",
        "Top {} stocks based on pattern finder score:\n\n".format(top_n),
    ]
    parts.extend(
        "{}: Total Score = {:.2f}\n".format(ticker, score)
        for ticker, score in results
    )
    return "".join(parts)


iface = gr.Interface(
    fn=gradio_scan_sp500,
    inputs=[
        gr.Slider(minimum=1, maximum=100, step=1, label="Number of top stocks to display", value=25),
        gr.Dropdown(choices=["1m", "1d", "1wk", "1mo"], label="Data Interval", value="1d"),
    ],
    outputs="text",
    title="S&P 500 Stock Scanner",
    description="Scan S&P 500 stocks and display top N stocks based on today's candle score.",
    allow_flagging="never",
)

if __name__ == "__main__":
    iface.launch(server_name="0.0.0.0", server_port=7860)