Spaces:
Running
Running
File size: 6,754 Bytes
d81c653 82dd88a d81c653 82dd88a 3785bd9 d81c653 82dd88a 3785bd9 d81c653 82dd88a d81c653 82dd88a d81c653 4ac6a43 d81c653 4ac6a43 82dd88a 4ac6a43 82dd88a 4ac6a43 d81c653 82dd88a 4ac6a43 82dd88a 4ac6a43 82dd88a 4ac6a43 82dd88a 0c52e07 4ac6a43 d81c653 dafe272 d81c653 0c52e07 d81c653 82dd88a dafe272 82dd88a d81c653 0c52e07 82dd88a 3785bd9 82dd88a d81c653 82dd88a 3785bd9 d81c653 82dd88a 3785bd9 d81c653 82dd88a d81c653 82dd88a d81c653 3785bd9 5bdbcdf 3785bd9 d81c653 3785bd9 d81c653 82dd88a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
import gradio as gr
import pandas as pd
import yfinance as yf
from datetime import datetime, timedelta
import requests
from bs4 import BeautifulSoup
from pattern_finder import score_downward_trend, score_candle, calculate_risk_reward
import urllib3
from datetime import datetime, timedelta
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def load_sp500_tickers():
    """Load S&P 500 tickers from Wikipedia.

    Fetches the "List of S&P 500 companies" page and collects the text of the
    first ``<td>`` cell of every row (after the header row) in the table whose
    id is ``constituents``.

    Returns:
        list[str]: Ticker symbols in table order, or an empty list when the
        constituents table is not found in the page.

    Raises:
        requests.exceptions.RequestException: If the request fails or does
            not complete within the timeout.
    """
    url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
    # NOTE(review): verify=False turns off TLS certificate validation. It is
    # preserved here because the original code relied on it; confirm whether
    # it is still required before keeping it long term.
    # The timeout prevents a stalled connection from hanging the whole scan.
    response = requests.get(url, verify=False, timeout=30)
    soup = BeautifulSoup(response.content, 'html.parser')
    table = soup.find('table', {'id': 'constituents'})
    if table is None:
        return []

    tickers = []
    # Skip the first row, which holds the column headers.
    for row in table.find_all('tr')[1:]:
        cells = row.find_all('td')
        if cells:
            # NOTE(review): symbols are returned exactly as the page lists
            # them; verify they match the format the data provider expects.
            tickers.append(cells[0].text.strip())
    return tickers
def load_data(ticker, interval="1d"):
    """Download the trailing 365 days of price history for one ticker.

    Args:
        ticker: Symbol passed straight through to ``yf.download``.
        interval: Bar interval string passed to ``yf.download``
            (defaults to ``"1d"``).

    Returns:
        Whatever ``yf.download`` returns for the requested range.
    """
    window_end = datetime.today()
    # Look back one calendar year from "now".
    window_start = window_end - timedelta(days=365)
    return yf.download(
        ticker,
        start=window_start,
        end=window_end,
        interval=interval,
    )
def calculate_sma(data, window):
    """Return the rolling mean of the ``Close`` column.

    Args:
        data: Table-like object exposing a ``'Close'`` column.
        window: Number of consecutive rows averaged for each output value.

    Returns:
        The result of ``rolling(window=window).mean()`` on the close prices.
    """
    closes = data['Close']
    rolling_view = closes.rolling(window=window)
    return rolling_view.mean()
def calculate_ema(data, window):
    """Return the exponentially weighted mean of the ``Close`` column.

    Args:
        data: Table-like object exposing a ``'Close'`` column.
        window: Value used as the ``span`` of the exponential weighting.

    Returns:
        The result of ``ewm(span=window, adjust=False).mean()`` on the close
        prices.
    """
    closes = data['Close']
    smoother = closes.ewm(span=window, adjust=False)
    return smoother.mean()
def average_downtrend(data, method, window=4):
    """Return the mean candle-to-candle change when it is negative.

    Args:
        data: Table-like object indexed by column name.
        method: Name of the column whose consecutive differences are averaged.
        window: How many trailing differences to average.

    Returns:
        The mean of the last ``window`` differences when that mean is below
        zero; otherwise ``0.0``. Also ``0.0`` when ``data`` has fewer than
        ``window`` rows.
    """
    if len(data) < window:
        return 0.0

    recent_changes = data[method].diff().iloc[-window:]
    mean_change = recent_changes.mean()
    # Only a falling average counts as a downtrend.
    if mean_change < 0:
        return mean_change
    return 0.0
def score_candle(candle, prev_candle, trend_strength):
    """Score one candle against the preceding candle.

    The score starts at twice ``trend_strength`` and collects fixed bonuses
    for several candle shapes, then is reduced when the candle closed above
    the previous close.

    Args:
        candle: Mapping with ``'Open'``, ``'Close'``, ``'Low'`` and ``'High'``.
        prev_candle: Mapping with at least ``'Close'``.
        trend_strength: Magnitude of the preceding trend.

    Returns:
        The accumulated numeric score.
    """
    opened = candle['Open']
    closed = candle['Close']
    low = candle['Low']
    high = candle['High']
    last_close = prev_candle['Close']

    lower_wick = min(opened, closed) - low
    full_range = high - low
    body_drop = opened - closed
    is_red = closed < opened

    # Each entry pairs a shape test with the bonus it earns; bonuses are
    # applied one by one in this order.
    bonuses = (
        # Doji: body no larger than a tenth of the full range.
        (abs(opened - closed) <= 0.1 * full_range, 5),
        # Hammer-like: red candle whose lower wick exceeds twice its body.
        (is_red and lower_wick > 2 * body_drop, 7),
        # Bottom tail: lower wick spans more than half of the full range.
        (lower_wick > 0.5 * full_range, 6),
        # Red candle whose lower wick exceeds half of its body.
        (is_red and lower_wick > 0.5 * body_drop, 3),
    )

    score = trend_strength * 2
    for applies, points in bonuses:
        if applies:
            score += points

    # A close above the previous close costs its percentage gain.
    if closed > last_close:
        score -= ((closed - last_close) / last_close) * 100
    return score
def score_today_candle(data, window=4):
    """Score the latest candle based on the downtrend that preceded it.

    The downtrend strength blends a short look-back (``window`` candles) with
    a longer 7-candle look-back, for both the ``High`` and ``Close`` columns.
    Both look-backs exclude the latest candle.

    Args:
        data: Price table with ``Open``, ``High``, ``Low`` and ``Close``
            columns, oldest row first.
        window: Number of candles in the short look-back.

    Returns:
        ``0`` when ``data`` has fewer than ``window + 1`` rows; ``-1`` when no
        downtrend is measured or the latest close sits below the 10-period
        EMA or the 20/50/200-period SMA; otherwise the value returned by
        ``score_candle`` for the latest candle.
    """
    long_window = 7  # longer look-back blended with the short one

    if len(data) < window + 1:
        return 0  # Not enough data

    today_candle = data.iloc[-1]
    prev_candle = data.iloc[-2]
    close_price = today_candle['Close']

    # Each look-back gets a slice of its own length. Previously the 7-candle
    # measurement ran on a slice of only ``window`` rows, so it always
    # returned 0.0 and never contributed.
    short_history = data.iloc[-(window + 1):-1]
    long_history = data.iloc[-(long_window + 1):-1]

    def blended_downtrend(column):
        short_term = average_downtrend(short_history, method=column, window=window)
        if len(long_history) < long_window:
            # Too little history for the long look-back: use the short one.
            return short_term
        long_term = average_downtrend(long_history, method=column, window=long_window)
        # Parenthesised so the two measurements are averaged; the original
        # ``a + b / 2`` halved only the second term.
        return (short_term + long_term) / 2

    avg_downtrend = (blended_downtrend("High") + blended_downtrend("Close")) / 2
    if avg_downtrend == 0.0:
        return -1

    # Moving averages evaluated at the latest row.
    sma_50 = calculate_sma(data, window=50).iloc[-1]
    sma_200 = calculate_sma(data, window=200).iloc[-1]
    sma_20 = calculate_sma(data, window=20).iloc[-1]
    ema_10 = calculate_ema(data, window=10).iloc[-1]

    # NOTE(review): an SMA is NaN when there are fewer rows than its window,
    # and a comparison against NaN is False, so such an average never rejects
    # the candle. Confirm this is intended for short histories.
    if (close_price < ema_10) or (close_price < sma_20) or (close_price < sma_50) or (close_price < sma_200):
        return -1

    return score_candle(today_candle, prev_candle, abs(avg_downtrend))
def scan_sp500(top_n=25, interval="1d", progress=gr.Progress()):
    """Score every S&P 500 ticker (plus QQQ) and return the best ones.

    Args:
        top_n: Maximum number of results to return.
        interval: Bar interval forwarded to ``load_data``. It now defaults to
            ``"1d"``; the original signature placed this parameter without a
            default after ``top_n=25``, which is a SyntaxError.
        progress: Gradio progress tracker used to wrap the ticker loop.

    Returns:
        list[tuple[str, float]]: ``(ticker, score)`` pairs with a score above
        zero, highest score first, truncated to ``top_n`` entries.
    """
    tickers = load_sp500_tickers()
    tickers.append("QQQ")

    scores = []
    for ticker in progress.tqdm(tickers):
        data = load_data(ticker, interval)
        if data.empty:
            continue
        score = score_today_candle(data)
        # Non-positive scores mark rejected or unscorable candles.
        if score > 0:
            scores.append((ticker, score))

    scores.sort(key=lambda item: item[1], reverse=True)
    # UI sliders may deliver the count as a float; slicing needs an int.
    return scores[:int(top_n)]
def next_business_day(date):
    """Return the first weekday strictly after ``date``.

    Saturdays and Sundays are skipped; holidays are not considered.

    Args:
        date: A ``date``/``datetime``-like object supporting ``timedelta``
            addition and ``weekday()``.

    Returns:
        The following Monday-to-Friday day, same type as ``date``.
    """
    one_day = timedelta(days=1)
    candidate = date + one_day
    # weekday(): 5 is Saturday, 6 is Sunday.
    while candidate.weekday() in (5, 6):
        candidate = candidate + one_day
    return candidate
def gradio_scan_sp500(top_n, interval, progress=gr.Progress()):
    """Run the scan and format the results as text for the Gradio UI.

    Args:
        top_n: Number of top-scoring stocks requested.
        interval: Bar interval forwarded to the scanner and to ``load_data``.
        progress: Gradio progress tracker.

    Returns:
        str: A report headed by the next business day after the latest
        available candle, followed by one ``TICKER: Total Score = x.xx`` line
        per result. When nothing scores above zero, a short message saying so.
    """
    # The scanner loads the ticker list itself, so it is not fetched here;
    # the original fetched it a second time and discarded the result.
    progress(0, desc="Running Scanner")
    results = scan_sp500(top_n, interval, progress)

    # Guard: indexing results[0] below would raise on an empty scan.
    if not results:
        return "No stocks matched the scan criteria."

    # Use the best-scoring ticker's latest candle to date the report.
    last_data = load_data(results[0][0], interval)
    if last_data.empty:
        # The re-download can come back empty; fall back to today's date.
        last_date = datetime.today().date()
    else:
        last_date = last_data.index[-1].date()
    date_created = next_business_day(last_date).strftime("%Y-%m-%d")

    header = (
        f"Scan Results for Market Open on: {date_created}\n\n"
        f"Top {top_n} stocks based on pattern finder score:\n\n"
    )
    body = "".join(
        f"{ticker}: Total Score = {score:.2f}\n" for ticker, score in results
    )
    return header + body
# Gradio front end: a slider for the result count and a dropdown for the bar
# interval feed gradio_scan_sp500, whose text report is shown as plain text.
iface = gr.Interface(
    fn=gradio_scan_sp500,
    inputs=[
        gr.Slider(minimum=1, maximum=100, step=1, label="Number of top stocks to display", value=25),
        gr.Dropdown(choices=["1m", "1d", "1wk", "1mo"], label="Data Interval", value="1d"),
    ],
    outputs="text",
    title="S&P 500 Stock Scanner",
    description="Scan S&P 500 stocks and display top N stocks based on today's candle score.",
    allow_flagging="never",
)

if __name__ == "__main__":
    # Listen on all network interfaces at port 7860.
    iface.launch(server_name="0.0.0.0", server_port=7860)