File size: 4,159 Bytes
d81c653
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8158bd4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import gradio as gr
import pandas as pd
import yfinance as yf
from datetime import datetime, timedelta
import requests
from bs4 import BeautifulSoup
from pattern_finder import score_downward_trend, score_candle, calculate_risk_reward
import urllib3
from datetime import datetime, timedelta
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


def load_sp500_tickers():
    """Load S&P 500 tickers from Wikipedia."""
    url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
    response = requests.get(url, verify=False)
    soup = BeautifulSoup(response.content, 'html.parser')
    table = soup.find('table', {'id': 'constituents'})
    tickers = []
    if table:
        for row in table.find_all('tr')[1:]:
            cells = row.find_all('td')
            if cells:
                ticker = cells[0].text.strip()
                tickers.append(ticker)
    return tickers



def load_data(ticker):
    """Load stock data using yfinance."""
    end_date = datetime.today()
    start_date = end_date - timedelta(days=365)  # Get 1 year of data
    data = yf.download(ticker, start=start_date, end=end_date)
    return data



def calculate_sma(data, window):
    """Calculate the Simple Moving Average (SMA) for a given window."""
    return data['Close'].rolling(window=window).mean()

def calculate_ema(data, window):
    """Calculate the Exponential Moving Average (EMA) for a given window."""
    return data['Close'].ewm(span=window, adjust=False).mean()

def average_downtrend(data, method, window=4):
    """Calculate the average difference between consecutive prices for the last 'window' candles."""
    if len(data) < window:
        return 0.0
    price_diffs = data[method].diff().iloc[-window:]
    avg_diff = price_diffs.mean()
    return avg_diff if avg_diff < 0 else 0.0


def score_today_candle(data, window=4):
    if len(data) < window + 1:
        return 0

    trend_score = score_downward_trend(data.iloc[-window:], window=window)
    candle_score = score_candle(data.iloc[-1], data, len(data) - 1)
    risk_reward = calculate_risk_reward(data, len(data) - 1)

    # Combine scores (you can adjust the weights as needed)
    total_score = trend_score + candle_score + (risk_reward * 10)

    return total_score

def scan_sp500(top_n=25, progress=gr.Progress()):
    tickers = load_sp500_tickers()
    scores = []
    tickers.append("QQQ")

    for i, ticker in enumerate(progress.tqdm(tickers)):
        data = load_data(ticker)
        if not data.empty:
            score = score_today_candle(data)
            if score > 0:
                scores.append((ticker, score))
    
    scores = sorted(scores, key=lambda x: x[1], reverse=True)
    return scores[:top_n]

def next_business_day(date):
    next_day = date + timedelta(days=1)
    while next_day.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        next_day += timedelta(days=1)
    return next_day



def gradio_scan_sp500(top_n, progress=gr.Progress()):
    progress(0, desc="Downloading Data")
    tickers = load_sp500_tickers()
    tickers.append("QQQ")
    
    progress(0.3, desc="Running Scanner")
    results = scan_sp500(top_n, progress)
    
    # Get the last date of the data and find the next business day
    last_data = load_data(results[0][0])  # Load data for the first ticker in results
    last_date = last_data.index[-1].date()
    next_market_day = next_business_day(last_date)
    date_created = next_market_day.strftime("%Y-%m-%d")
    
    output = f"Scan Results for Market Open on: {date_created}\n\n"
    output += "Top {} stocks based on pattern finder score:\n\n".format(top_n)
    for ticker, score in results:
        output += "{}: Total Score = {:.2f}\n".format(ticker, score)
    return output

iface = gr.Interface(
    fn=gradio_scan_sp500,
    inputs=gr.Slider(minimum=1, maximum=100, step=1, label="Number of top stocks to display", value=25),
    outputs="text",
    title="S&P 500 Stock Scanner",
    description="Scan S&P 500 stocks and display top N stocks based on today's candle score.",
    allow_flagging="never",
)

if __name__ == "__main__":
    iface.launch(server_name="0.0.0.0", server_port=7860)