# Page header captured along with the source (not part of the program):
# Space status: Running; file size: 4,159 bytes; ids shown: d81c653, 8158bd4.
import gradio as gr
import pandas as pd
import yfinance as yf
from datetime import datetime, timedelta
import requests
from bs4 import BeautifulSoup
from pattern_finder import score_downward_trend, score_candle, calculate_risk_reward
import urllib3
from datetime import datetime, timedelta
# Silence the InsecureRequestWarning that urllib3 emits for HTTPS requests made
# without certificate verification (load_sp500_tickers passes verify=False).
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def load_sp500_tickers():
    """Load S&P 500 tickers from Wikipedia.

    Fetches the "List of S&P 500 companies" page and collects the text of the
    first ``<td>`` cell of each row in the table whose id is ``constituents``.

    Returns:
        list: Ticker strings in table order. Empty when the ``constituents``
        table is not found in the response.

    Raises:
        requests.exceptions.RequestException: If the request fails or does
            not complete within the timeout.
    """
    url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
    # NOTE(review): verify=False turns off TLS certificate checks. It is kept
    # to preserve existing behaviour -- confirm whether it is still needed.
    # The timeout keeps a stalled connection from hanging the scan forever.
    response = requests.get(url, verify=False, timeout=30)
    soup = BeautifulSoup(response.content, 'html.parser')
    table = soup.find('table', {'id': 'constituents'})
    if not table:
        return []

    tickers = []
    # The first <tr> is skipped (presumably the header row); rows without any
    # <td> cells are ignored.
    for row in table.find_all('tr')[1:]:
        cells = row.find_all('td')
        if cells:
            tickers.append(cells[0].text.strip())
    return tickers
def load_data(ticker):
    """Download price history for ``ticker`` through yfinance.

    The requested range starts 365 days before the current local time and
    ends at the current local time.

    Returns:
        Whatever ``yf.download`` returns for that ticker and date range.
    """
    now = datetime.today()
    one_year_ago = now - timedelta(days=365)
    return yf.download(ticker, start=one_year_ago, end=now)
def calculate_sma(data, window):
    """Return the rolling mean of the ``'Close'`` column over ``window`` rows.

    Positions that do not yet have a full window are NaN.
    """
    closes = data['Close']
    rolling_view = closes.rolling(window=window)
    return rolling_view.mean()
def calculate_ema(data, window):
    """Return the exponentially weighted mean of the ``'Close'`` column.

    ``window`` is used as the EWM span and the recursive (``adjust=False``)
    form is used.
    """
    closes = data['Close']
    weighted = closes.ewm(span=window, adjust=False)
    return weighted.mean()
def average_downtrend(data, method, window=4):
    """Return the mean row-to-row change of ``data[method]`` when it is negative.

    The mean is taken over the last ``window`` entries of the column's
    first difference. Returns 0.0 when ``data`` has fewer than ``window``
    rows or when that mean is not below zero.
    """
    if len(data) < window:
        return 0.0

    recent_changes = data[method].diff().iloc[-window:]
    mean_change = recent_changes.mean()
    # Compare with "< 0" so that a NaN mean also falls through to 0.0.
    if mean_change < 0:
        return mean_change
    return 0.0
def score_today_candle(data, window=4):
    """Score the last row of ``data`` using the pattern_finder helpers.

    The result is the sum of ``score_downward_trend`` over the last
    ``window`` rows, ``score_candle`` for the last row, and ten times
    ``calculate_risk_reward`` at the last row's position. Returns 0 when
    ``data`` has fewer than ``window + 1`` rows.
    """
    if len(data) < window + 1:
        return 0

    last_pos = len(data) - 1
    trend_part = score_downward_trend(data.iloc[-window:], window=window)
    candle_part = score_candle(data.iloc[-1], data, last_pos)
    reward_part = calculate_risk_reward(data, last_pos)

    # Risk/reward is weighted by 10 before being added; weights are adjustable.
    return trend_part + candle_part + (reward_part * 10)
def scan_sp500(top_n=25, progress=gr.Progress()):
    """Score every ticker from ``load_sp500_tickers`` plus QQQ and rank them.

    Args:
        top_n: Maximum number of entries to return.
        progress: Gradio progress tracker whose ``tqdm`` wraps the ticker loop.

    Returns:
        list: ``(ticker, score)`` tuples with score > 0, ordered by descending
        score and truncated to ``top_n`` entries.
    """
    symbols = load_sp500_tickers()
    symbols.append("QQQ")

    ranked = []
    for symbol in progress.tqdm(symbols):
        frame = load_data(symbol)
        if frame.empty:
            continue
        value = score_today_candle(frame)
        if value > 0:
            ranked.append((symbol, value))

    ranked.sort(key=lambda pair: pair[1], reverse=True)
    return ranked[:top_n]
def next_business_day(date):
    """Return the first day after ``date`` that is not a Saturday or Sunday.

    Only weekends are skipped; no holiday calendar is consulted.
    """
    one_day = timedelta(days=1)
    candidate = date + one_day
    # weekday(): 5 = Saturday, 6 = Sunday
    while candidate.weekday() in (5, 6):
        candidate = candidate + one_day
    return candidate
def gradio_scan_sp500(top_n, progress=gr.Progress()):
    """Run the scan and format its results as text for the Gradio interface.

    Args:
        top_n: Maximum number of tickers to list.
        progress: Gradio progress tracker; updated here and handed on to
            ``scan_sp500``.

    Returns:
        str: A header naming the next weekday after the newest downloaded
        row of the top-ranked ticker, followed by one
        ``TICKER: Total Score = x.xx`` line per result. When no ticker
        scored above zero, a one-line notice is returned instead.
    """
    progress(0, desc="Downloading Data")
    # scan_sp500 loads the ticker list itself, so it is not fetched here as
    # well (the previous extra fetch was never used).
    progress(0.3, desc="Running Scanner")
    results = scan_sp500(top_n, progress)

    if not results:
        # Without this guard results[0] below raises IndexError.
        return "No stocks produced a positive pattern finder score.\n"

    # The header date is derived from the newest row of the top-ranked ticker.
    last_data = load_data(results[0][0])
    if last_data.empty:
        # The re-download returned nothing; fall back to today's date rather
        # than failing on index[-1].
        last_date = datetime.today().date()
    else:
        last_date = last_data.index[-1].date()
    date_created = next_business_day(last_date).strftime("%Y-%m-%d")

    header = (
        f"Scan Results for Market Open on: {date_created}\n\n"
        f"Top {top_n} stocks based on pattern finder score:\n\n"
    )
    rows = "".join(
        "{}: Total Score = {:.2f}\n".format(ticker, score)
        for ticker, score in results
    )
    return header + rows
# Gradio UI: one slider chooses how many tickers to list; the scan result is
# shown as plain text.
iface = gr.Interface(
    fn=gradio_scan_sp500,
    inputs=gr.Slider(
        label="Number of top stocks to display",
        minimum=1,
        maximum=100,
        step=1,
        value=25,
    ),
    outputs="text",
    title="S&P 500 Stock Scanner",
    description="Scan S&P 500 stocks and display top N stocks based on today's candle score.",
    allow_flagging="never",
)
if __name__ == "__main__":
    # Start the Gradio server only when run as a script; listen on all
    # network interfaces (0.0.0.0) at port 7860.
    iface.launch(server_name="0.0.0.0", server_port=7860)