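"""S&P 500 candle scanner.

Downloads a year of price history for every S&P 500 constituent (plus QQQ),
scores the most recent candle for reversal characteristics (doji, hammer,
long bottom wick) following a short-term downtrend, and serves the ranked
results through a simple Gradio UI.
"""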
import gradio as gr
import pandas as pd
import yfinance as yf
from datetime import datetime, timedelta
import requests
from bs4 import BeautifulSoup
from pattern_finder import score_downward_trend, score_candle, calculate_risk_reward  # note: score_candle is redefined locally below
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


def load_sp500_tickers():
    """Load S&P 500 tickers from Wikipedia."""
    url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
    response = requests.get(url, verify=False)
    soup = BeautifulSoup(response.content, 'html.parser')
    table = soup.find('table', {'id': 'constituents'})
    tickers = []
    if table:
        for row in table.find_all('tr')[1:]:
            cells = row.find_all('td')
            if cells:
                # Yahoo Finance uses '-' where Wikipedia uses '.' (e.g. BRK.B -> BRK-B)
                ticker = cells[0].text.strip().replace('.', '-')
                tickers.append(ticker)
    return tickers



def load_data(ticker, interval="1d"):
    """Load stock data using yfinance with a specified interval."""
    end_date = datetime.today()
    start_date = end_date - timedelta(days=365)  # Get 1 year of data
    data = yf.download(ticker, start=start_date, end=end_date, interval=interval, progress=False)
    # Newer yfinance releases may return MultiIndex columns even for a single
    # ticker; flatten them so the rest of the code can use data['Close'] etc.
    if isinstance(data.columns, pd.MultiIndex):
        data.columns = data.columns.get_level_values(0)
    return data



def calculate_sma(data, window):
    """Calculate the Simple Moving Average (SMA) for a given window."""
    return data['Close'].rolling(window=window).mean()

def calculate_ema(data, window):
    """Calculate the Exponential Moving Average (EMA) for a given window."""
    return data['Close'].ewm(span=window, adjust=False).mean()

def average_downtrend(data, method, window=4):
    """Calculate the average difference between consecutive prices for the last 'window' candles."""
    if len(data) < window:
        return 0.0
    price_diffs = data[method].diff().iloc[-window:]
    avg_diff = price_diffs.mean()
    return avg_diff if avg_diff < 0 else 0.0
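# Example: for closes [10.0, 9.5, 9.2, 9.0, 8.8] and window=4, the last four
# differences are [-0.5, -0.3, -0.2, -0.2], so the function returns roughly -0.30.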


def score_candle(candle, prev_candle, trend_strength):
    """Score a single candle based on its characteristics and previous candle."""
    open_price = candle['Open']
    close_price = candle['Close']
    low_price = candle['Low']
    high_price = candle['High']
    prev_close = prev_candle['Close']

    # Bottom and top wick lengths
    bottom_wick_length = min(open_price, close_price) - low_price
    top_wick_length = high_price - max(open_price, close_price)

    # Initial score based on trend strength
    score = trend_strength * 2

    # Doji: Open and Close are almost the same (small body)
    if abs(open_price - close_price) <= 0.1 * (high_price - low_price):  # Adjust tolerance if needed
        score += 5  # Bonus points for doji candles

    # Hammer: Small body at the top, long bottom wick (typical reversal candle)
    if close_price < open_price and bottom_wick_length > 2 * (open_price - close_price):
        score += 7  # Extra points for hammer-like candles

    # Bottom Tailing Wick: Long bottom wick compared to the overall range
    if bottom_wick_length > 0.5 * (high_price - low_price):
        score += 6  # Extra points for bottom tailing wick

    # Additional logic: Boost red candles with long bottom wicks following a downtrend
    if close_price < open_price and bottom_wick_length > 0.5 * (open_price - close_price):
        score += 3  # Boost for red candle with long bottom wick

    # Penalize if the current close is higher than the previous close
    if close_price > prev_close:
        score -= ((close_price - prev_close) / prev_close) * 100


    return score

def score_today_candle(data, window=4):
    """Score today's candle based on the downtrend from the past 'window' days."""
    if len(data) < window + 1:
        return 0  # Not enough data

    today_candle = data.iloc[-1]
    prev_candle = data.iloc[-2]
    
    close_price = today_candle['Close']


    # Blend the short-window downtrend with a 7-candle downtrend, excluding today's candle.
    # The slice must cover the larger of the two windows, otherwise the 7-candle term is always 0.
    previous_data = data.iloc[-(max(window, 7) + 1):-1]
    down_High = (average_downtrend(previous_data, method="High", window=window)
                 + average_downtrend(previous_data, method="High", window=7)) / 2
    down_Close = (average_downtrend(previous_data, method="Close", window=window)
                  + average_downtrend(previous_data, method="Close", window=7)) / 2

    avg_downtrend = (down_High + down_Close) / 2
  
    if avg_downtrend == 0.0:
        return -1

    # Calculate SMAs for the last row
    sma_50 = calculate_sma(data, window=50).iloc[-1]
    sma_200 = calculate_sma(data, window=200).iloc[-1]
    sma_20 = calculate_sma(data, window=20).iloc[-1]
    
    ema_10 = calculate_ema(data, window=10).iloc[-1]
    
    # Only score stocks trading above all of their moving averages
    if (close_price < ema_10) or (close_price < sma_20) or (close_price < sma_50) or (close_price < sma_200):
        return -1

    
    return score_candle(today_candle, prev_candle, abs(avg_downtrend))

def scan_sp500(top_n=25, interval="1d", progress=gr.Progress()):
    """Score every S&P 500 ticker (plus QQQ) and return the top_n (ticker, score) pairs."""
    tickers = load_sp500_tickers()
    tickers.append("QQQ")
    scores = []

    for ticker in progress.tqdm(tickers):
        data = load_data(ticker, interval)
        if not data.empty:
            score = score_today_candle(data)
            if score > 0:
                scores.append((ticker, score))

    scores = sorted(scores, key=lambda x: x[1], reverse=True)
    return scores[:top_n]

def next_business_day(date):
    next_day = date + timedelta(days=1)
    while next_day.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        next_day += timedelta(days=1)
    return next_day
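# Example: next_business_day(date(2024, 3, 1)) is Monday 2024-03-04, since
# March 1, 2024 was a Friday. Exchange holidays are not accounted for.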



def gradio_scan_sp500(top_n, interval, progress=gr.Progress()):
    """Gradio handler: run the scanner and format the results as plain text."""
    progress(0, desc="Downloading Data")
    # scan_sp500 loads the ticker list itself, so there is no need to fetch it here.

    progress(0.3, desc="Running Scanner")
    results = scan_sp500(top_n, interval, progress)
    if not results:
        return "No stocks matched the scan criteria."

    # Get the last date of the data and find the next business day
    last_data = load_data(results[0][0], interval)  # Load data for the first ticker in results
    last_date = last_data.index[-1].date()
    next_market_day = next_business_day(last_date)
    date_created = next_market_day.strftime("%Y-%m-%d")

    output = f"Scan Results for Market Open on: {date_created}\n\n"
    output += f"Top {top_n} stocks based on pattern finder score:\n\n"
    for ticker, score in results:
        output += f"{ticker}: Total Score = {score:.2f}\n"
    return output

iface = gr.Interface(
    fn=gradio_scan_sp500,
    inputs=[
        gr.Slider(minimum=1, maximum=100, step=1, label="Number of top stocks to display", value=25),
        gr.Dropdown(choices=["1d", "1wk", "1mo"], label="Data Interval", value="1d"),
    ],
    outputs="text",
    title="S&P 500 Stock Scanner",
    description="Scan S&P 500 stocks and display top N stocks based on today's candle score.",
    allow_flagging="never",
)


if __name__ == "__main__":
    iface.launch(server_name="0.0.0.0", server_port=7860)