Spaces:
Running
Running
File size: 5,183 Bytes
d81c653 3480103 d81c653 3480103 d81c653 3480103 d81c653 4ac6a43 3480103 4ac6a43 3480103 d81c653 4ac6a43 d81c653 4ac6a43 3480103 4ac6a43 d81c653 4ac6a43 d81c653 3480103 d81c653 3480103 d81c653 3480103 d81c653 4ac6a43 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
import gradio as gr
import pandas as pd
import yfinance as yf
from datetime import datetime, timedelta
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
from pattern_finder import score_downward_trend, score_candle, calculate_risk_reward
import urllib3
# Silence urllib3's InsecureRequestWarning process-wide. Presumably needed
# because load_sp500_tickers() issues its request with verify=False.
# NOTE(review): this hides the warning for every request in the process.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def load_sp500_tickers():
    """Load S&P 500 tickers from Wikipedia.

    Reads the first cell of every data row in the table with
    ``id="constituents"`` on the "List of S&P 500 companies" page.

    Returns:
        list: Ticker symbols in table order. Dots are replaced by hyphens
        (``BRK.B`` -> ``BRK-B``) because the symbols are later passed to
        Yahoo Finance, which uses the hyphenated form for share classes.
        The list is empty when the table is not present in the response.

    Raises:
        requests.exceptions.RequestException: If the HTTP request fails or
            does not complete within the timeout.
    """
    url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
    # NOTE(review): TLS certificate verification is disabled here. It is kept
    # as found because it may be a deliberate workaround for the deployment
    # environment, but it should be re-enabled if at all possible.
    # The timeout prevents a stalled connection from hanging the whole scan.
    response = requests.get(url, verify=False, timeout=30)
    soup = BeautifulSoup(response.content, 'html.parser')
    table = soup.find('table', {'id': 'constituents'})
    if table is None:
        return []

    tickers = []
    # Skip the header row; rows without <td> cells carry no ticker.
    for row in table.find_all('tr')[1:]:
        cells = row.find_all('td')
        if cells:
            symbol = cells[0].text.strip()
            tickers.append(symbol.replace('.', '-'))
    return tickers
def load_data(ticker):
    """Download roughly the last year of price history for *ticker*.

    The requested range runs from 365 calendar days ago up to the current
    moment, and the result of ``yf.download`` is returned unchanged.
    """
    range_end = datetime.today()
    range_start = range_end - timedelta(days=365)
    return yf.download(ticker, start=range_start, end=range_end)
def calculate_sma(data, window):
    """Return the simple moving average of the ``Close`` column.

    The result is a rolling mean over *window* rows; positions that do not
    yet have a full window are NaN.
    """
    closes = data['Close']
    rolling_view = closes.rolling(window=window)
    return rolling_view.mean()
def calculate_ema(data, window):
    """Return the exponential moving average of the ``Close`` column.

    Uses a span of *window* with ``adjust=False`` (the recursive form), so
    the first output value equals the first close.
    """
    closes = data['Close']
    weighted = closes.ewm(span=window, adjust=False)
    return weighted.mean()
def average_downtrend(data, method, window=4):
    """Return the mean row-to-row change of column *method* when it is negative.

    The mean is taken over the last *window* entries of the column's
    first difference (NaN entries are skipped by the mean). ``0.0`` is
    returned when *data* has fewer than *window* rows or when the mean
    change is not below zero.
    """
    if len(data) < window:
        return 0.0

    recent_changes = data[method].diff().iloc[-window:]
    mean_change = recent_changes.mean()
    if mean_change < 0:
        return mean_change
    return 0.0
def score_candle(candle, prev_candle, trend_strength):
    """Score a single candle based on its characteristics and previous candle.

    NOTE(review): this definition is a placeholder. The scoring logic is
    elided, and because it is defined after the
    ``from pattern_finder import ... score_candle ...`` line, it rebinds
    that imported name at module level.
    """
    # [Keep the existing scoring logic]
    # ...
    # NOTE(review): `score` is never assigned in this body, so calling the
    # function as written raises NameError. Either restore the real logic
    # or remove this stub so the pattern_finder import is used.
    return score
def score_today_candle(data, window=4, long_window=7):
    """Score today's candle based on the downtrend of the preceding days.

    The downtrend strength blends a short lookback (*window* candles) and a
    long lookback (*long_window* candles), both ending at yesterday's
    candle, over the ``High`` and ``Close`` columns.

    Args:
        data: Price history with at least ``High`` and ``Close`` columns,
            oldest row first; the last row is treated as today's candle.
        window: Number of candles in the short lookback.
        long_window: Number of candles in the long lookback.

    Returns:
        ``0`` when *data* has fewer than ``window + 1`` rows; ``-1`` when no
        downtrend is detected or today's close is below any of EMA(10),
        SMA(20), SMA(50) or SMA(200); otherwise the value returned by
        ``score_candle`` for today's candle.
    """
    if len(data) < window + 1:
        return 0  # Not enough data

    today_candle = data.iloc[-1]
    prev_candle = data.iloc[-2]
    close_price = today_candle['Close']

    # Each lookback needs its own slice. The previous code reused the short
    # slice for the long lookback, which made average_downtrend() bail out
    # with 0.0 every time, so the long-term component never contributed.
    short_history = data.iloc[-(window + 1):-1]
    long_history = data.iloc[-(long_window + 1):-1]
    has_long_history = len(long_history) >= long_window

    def blended_downtrend(column):
        short_term = average_downtrend(short_history, method=column, window=window)
        if not has_long_history:
            # Too little history for the long lookback: use the short one
            # alone rather than averaging it with a meaningless zero.
            return short_term
        long_term = average_downtrend(long_history, method=column, window=long_window)
        # Parenthesised so that both terms are averaged; previously only the
        # second term was divided by two.
        return (short_term + long_term) / 2

    down_high = blended_downtrend("High")
    down_close = blended_downtrend("Close")
    avg_downtrend = (down_high + down_close) / 2
    if avg_downtrend == 0.0:
        return -1

    sma_50 = calculate_sma(data, window=50).iloc[-1]
    sma_200 = calculate_sma(data, window=200).iloc[-1]
    sma_20 = calculate_sma(data, window=20).iloc[-1]
    ema_10 = calculate_ema(data, window=10).iloc[-1]
    # Reject the candle if the close sits below any of the moving averages.
    if any(close_price < level for level in (ema_10, sma_20, sma_50, sma_200)):
        return -1

    return score_candle(today_candle, prev_candle, abs(avg_downtrend))
def scan_ticker(ticker):
    """Download *ticker* and score its latest candle.

    Returns a ``(ticker, score)`` tuple when data was returned and the score
    is strictly positive; otherwise ``None``.
    """
    history = load_data(ticker)
    if history.empty:
        return None

    candle_score = score_today_candle(history)
    return (ticker, candle_score) if candle_score > 0 else None
def scan_sp500(top_n=25, progress=gr.Progress()):
    """Score every S&P 500 ticker (plus QQQ) and return the best *top_n*.

    Tickers are scanned concurrently on a pool of 10 worker threads, and
    *progress* is called with the completed fraction after each ticker
    finishes. The result is a list of ``(ticker, score)`` tuples sorted by
    score, highest first.
    """
    symbols = load_sp500_tickers()
    symbols.append("QQQ")
    total = len(symbols)

    hits = []
    with ThreadPoolExecutor(max_workers=10) as pool:
        pending = [pool.submit(scan_ticker, symbol) for symbol in symbols]
        for done_count, finished in enumerate(as_completed(pending), 1):
            outcome = finished.result()
            if outcome:
                hits.append(outcome)
            progress(done_count / total, desc="Processing Tickers")

    hits.sort(key=lambda pair: pair[1], reverse=True)
    return hits[:top_n]
def _next_business_day(day):
    """Return the first Monday-Friday date strictly after *day*.

    Only weekends are skipped; market holidays are not taken into account.
    """
    candidate = day + timedelta(days=1)
    while candidate.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        candidate += timedelta(days=1)
    return candidate


def gradio_scan_sp500(top_n, progress=gr.Progress()):
    """Gradio callback: run the scan and format the results as plain text.

    Args:
        top_n: Maximum number of tickers to list.
        progress: Gradio progress tracker, forwarded to ``scan_sp500``.

    Returns:
        str: A report headed by the next business day after the most recent
        candle of the top-ranked ticker, followed by one line per ticker. A
        short notice is returned instead when no ticker scored above zero.
    """
    # scan_sp500() fetches the ticker list itself, so it is not fetched here
    # as well (the earlier duplicate download was never used).
    progress(0, desc="Downloading Data")
    results = scan_sp500(top_n, progress)

    if not results:
        # Nothing scored above zero, so there is no top ticker to date the
        # report from.
        return "No stocks matched the pattern in this scan.\n"

    # Date the report from the latest candle of the best-ranked ticker.
    last_data = load_data(results[0][0])
    last_date = last_data.index[-1].date()
    date_created = _next_business_day(last_date).strftime("%Y-%m-%d")

    lines = [
        f"Scan Results for Market Open on: {date_created}\n",
        "Top {} stocks based on pattern finder score:\n".format(top_n),
    ]
    lines.extend(
        "{}: Total Score = {:.2f}".format(ticker, score)
        for ticker, score in results
    )
    return "\n".join(lines) + "\n"
# Gradio UI: one slider selecting how many tickers to list, plain-text output.
iface = gr.Interface(
    fn=gradio_scan_sp500,
    inputs=gr.Slider(
        label="Number of top stocks to display",
        minimum=1,
        maximum=100,
        step=1,
        value=25,
    ),
    outputs="text",
    title="S&P 500 Stock Scanner",
    description="Scan S&P 500 stocks and display top N stocks based on today's candle score.",
    allow_flagging="never",
)

if __name__ == "__main__":
    # Listen on all interfaces on port 7860.
    iface.launch(server_port=7860, server_name="0.0.0.0")
|