Arkm20 commited on
Commit
82dd88a
1 Parent(s): 0c52e07

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +66 -32
app.py CHANGED
@@ -5,8 +5,8 @@ from datetime import datetime, timedelta
5
  import requests
6
  from bs4 import BeautifulSoup
7
  from pattern_finder import score_downward_trend, score_candle, calculate_risk_reward
8
- from concurrent.futures import ThreadPoolExecutor, as_completed
9
  import urllib3
 
10
  urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
11
 
12
 
@@ -26,24 +26,24 @@ def load_sp500_tickers():
26
  return tickers
27
 
28
 
 
29
  def load_data(ticker):
30
  """Load stock data using yfinance."""
31
  end_date = datetime.today()
32
- start_date = end_date - timedelta(days=365)
33
  data = yf.download(ticker, start=start_date, end=end_date)
34
  return data
35
 
36
 
 
37
  def calculate_sma(data, window):
38
  """Calculate the Simple Moving Average (SMA) for a given window."""
39
  return data['Close'].rolling(window=window).mean()
40
 
41
-
42
  def calculate_ema(data, window):
43
  """Calculate the Exponential Moving Average (EMA) for a given window."""
44
  return data['Close'].ewm(span=window, adjust=False).mean()
45
 
46
-
47
  def average_downtrend(data, method, window=4):
48
  """Calculate the average difference between consecutive prices for the last 'window' candles."""
49
  if len(data) < window:
@@ -53,6 +53,44 @@ def average_downtrend(data, method, window=4):
53
  return avg_diff if avg_diff < 0 else 0.0
54
 
55
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
56
  def score_today_candle(data, window=4):
57
  """Score today's candle based on the downtrend from the past 'window' days."""
58
  if len(data) < window + 1:
@@ -62,51 +100,46 @@ def score_today_candle(data, window=4):
62
  prev_candle = data.iloc[-2]
63
 
64
  close_price = today_candle['Close']
 
 
65
  previous_data = data.iloc[-(window+1):-1]
66
-
67
- down_High = average_downtrend(previous_data, method="High", window=window) + average_downtrend(previous_data, method="High", window=7) / 2
68
- down_Close = average_downtrend(previous_data, method="Close", window=window) + average_downtrend(previous_data, method="Close", window=7) / 2
 
69
  avg_downtrend = (down_High + down_Close) / 2
70
 
71
  if avg_downtrend == 0.0:
72
  return -1
73
 
 
74
  sma_50 = calculate_sma(data, window=50).iloc[-1]
75
  sma_200 = calculate_sma(data, window=200).iloc[-1]
76
  sma_20 = calculate_sma(data, window=20).iloc[-1]
 
77
  ema_10 = calculate_ema(data, window=10).iloc[-1]
78
 
79
- if (close_price < ema_10) or (close_price < sma_20) or (close_price < sma_50) or (close_price < sma_200):
80
  return -1
 
81
 
82
  return score_candle(today_candle, prev_candle, abs(avg_downtrend))
83
 
84
-
85
  def scan_sp500(top_n=25, progress=gr.Progress()):
86
  tickers = load_sp500_tickers()
87
  scores = []
88
  tickers.append("QQQ")
89
 
90
- with ThreadPoolExecutor(max_workers=10) as executor:
91
- futures = {executor.submit(load_data, ticker): ticker for ticker in tickers}
92
- total_tickers = len(futures)
93
-
94
- for i, future in enumerate(as_completed(futures)):
95
- ticker = futures[future]
96
- data = future.result()
97
-
98
- if not data.empty:
99
- score = score_today_candle(data)
100
- if score > 0:
101
- scores.append((ticker, score))
102
-
103
- # Update progress after each ticker is processed
104
- progress(i / total_tickers, desc=f"Processing {ticker}")
105
 
106
  scores = sorted(scores, key=lambda x: x[1], reverse=True)
107
  return scores[:top_n]
108
 
109
-
110
  def next_business_day(date):
111
  next_day = date + timedelta(days=1)
112
  while next_day.weekday() >= 5: # 5 = Saturday, 6 = Sunday
@@ -114,26 +147,27 @@ def next_business_day(date):
114
  return next_day
115
 
116
 
 
117
  def gradio_scan_sp500(top_n, progress=gr.Progress()):
118
- progress(0, desc="Loading S&P 500 Tickers")
119
  tickers = load_sp500_tickers()
120
  tickers.append("QQQ")
121
 
122
- progress(0.2, desc="Running Stock Scanning")
123
  results = scan_sp500(top_n, progress)
124
 
125
- last_data = load_data(results[0][0])
 
126
  last_date = last_data.index[-1].date()
127
  next_market_day = next_business_day(last_date)
128
  date_created = next_market_day.strftime("%Y-%m-%d")
129
 
130
  output = f"Scan Results for Market Open on: {date_created}\n\n"
131
- output += f"Top {top_n} stocks based on pattern finder score:\n\n"
132
  for ticker, score in results:
133
- output += f"{ticker}: Total Score = {score:.2f}\n"
134
  return output
135
 
136
-
137
  iface = gr.Interface(
138
  fn=gradio_scan_sp500,
139
  inputs=gr.Slider(minimum=1, maximum=100, step=1, label="Number of top stocks to display", value=25),
@@ -144,4 +178,4 @@ iface = gr.Interface(
144
  )
145
 
146
  if __name__ == "__main__":
147
- iface.launch(server_name="0.0.0.0", server_port=7860)
 
5
  import requests
6
  from bs4 import BeautifulSoup
7
  from pattern_finder import score_downward_trend, score_candle, calculate_risk_reward
 
8
  import urllib3
9
+ from datetime import datetime, timedelta
10
  urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
11
 
12
 
 
26
  return tickers
27
 
28
 
29
+
30
  def load_data(ticker):
31
  """Load stock data using yfinance."""
32
  end_date = datetime.today()
33
+ start_date = end_date - timedelta(days=365) # Get 1 year of data
34
  data = yf.download(ticker, start=start_date, end=end_date)
35
  return data
36
 
37
 
38
+
39
  def calculate_sma(data, window):
40
  """Calculate the Simple Moving Average (SMA) for a given window."""
41
  return data['Close'].rolling(window=window).mean()
42
 
 
43
  def calculate_ema(data, window):
44
  """Calculate the Exponential Moving Average (EMA) for a given window."""
45
  return data['Close'].ewm(span=window, adjust=False).mean()
46
 
 
47
  def average_downtrend(data, method, window=4):
48
  """Calculate the average difference between consecutive prices for the last 'window' candles."""
49
  if len(data) < window:
 
53
  return avg_diff if avg_diff < 0 else 0.0
54
 
55
 
56
+ def score_candle(candle, prev_candle, trend_strength):
57
+ """Score a single candle based on its characteristics and previous candle."""
58
+ open_price = candle['Open']
59
+ close_price = candle['Close']
60
+ low_price = candle['Low']
61
+ high_price = candle['High']
62
+ prev_close = prev_candle['Close']
63
+
64
+ # Bottom and top wick lengths
65
+ bottom_wick_length = min(open_price, close_price) - low_price
66
+ top_wick_length = high_price - max(open_price, close_price)
67
+
68
+ # Initial score based on trend strength
69
+ score = trend_strength * 2
70
+
71
+ # Doji: Open and Close are almost the same (small body)
72
+ if abs(open_price - close_price) <= 0.1 * (high_price - low_price): # Adjust tolerance if needed
73
+ score += 5 # Bonus points for doji candles
74
+
75
+ # Hammer: Small body at the top, long bottom wick (typical reversal candle)
76
+ if close_price < open_price and bottom_wick_length > 2 * (open_price - close_price):
77
+ score += 7 # Extra points for hammer-like candles
78
+
79
+ # Bottom Tailing Wick: Long bottom wick compared to the overall range
80
+ if bottom_wick_length > 0.5 * (high_price - low_price):
81
+ score += 6 # Extra points for bottom tailing wick
82
+
83
+ # Additional logic: Boost red candles with long bottom wicks following a downtrend
84
+ if close_price < open_price and bottom_wick_length > 0.5 * (open_price - close_price):
85
+ score += 3 # Boost for red candle with long bottom wick
86
+
87
+ # Penalize if the current close is higher than the previous close
88
+ if close_price > prev_close:
89
+ score -= ((close_price - prev_close) / prev_close) * 100
90
+
91
+
92
+ return score
93
+
94
  def score_today_candle(data, window=4):
95
  """Score today's candle based on the downtrend from the past 'window' days."""
96
  if len(data) < window + 1:
 
100
  prev_candle = data.iloc[-2]
101
 
102
  close_price = today_candle['Close']
103
+
104
+
105
  previous_data = data.iloc[-(window+1):-1]
106
+ down_High = average_downtrend(previous_data, method="High",window=window) + average_downtrend(previous_data, method="High",window=7) / 2
107
+ down_Close = average_downtrend(previous_data, method="Close",window=window) + average_downtrend(previous_data, method="Close",window=7) / 2
108
+
109
+
110
  avg_downtrend = (down_High + down_Close) / 2
111
 
112
  if avg_downtrend == 0.0:
113
  return -1
114
 
115
+ # Calculate SMAs for the last row
116
  sma_50 = calculate_sma(data, window=50).iloc[-1]
117
  sma_200 = calculate_sma(data, window=200).iloc[-1]
118
  sma_20 = calculate_sma(data, window=20).iloc[-1]
119
+
120
  ema_10 = calculate_ema(data, window=10).iloc[-1]
121
 
122
+ if (close_price < ema_10) or (close_price < sma_20) or (close_price < sma_50) or (close_price < sma_200):
123
  return -1
124
+
125
 
126
  return score_candle(today_candle, prev_candle, abs(avg_downtrend))
127
 
 
128
  def scan_sp500(top_n=25, progress=gr.Progress()):
129
  tickers = load_sp500_tickers()
130
  scores = []
131
  tickers.append("QQQ")
132
 
133
+ for i, ticker in enumerate(progress.tqdm(tickers)):
134
+ data = load_data(ticker)
135
+ if not data.empty:
136
+ score = score_today_candle(data)
137
+ if score > 0:
138
+ scores.append((ticker, score))
 
 
 
 
 
 
 
 
 
139
 
140
  scores = sorted(scores, key=lambda x: x[1], reverse=True)
141
  return scores[:top_n]
142
 
 
143
  def next_business_day(date):
144
  next_day = date + timedelta(days=1)
145
  while next_day.weekday() >= 5: # 5 = Saturday, 6 = Sunday
 
147
  return next_day
148
 
149
 
150
+
151
  def gradio_scan_sp500(top_n, progress=gr.Progress()):
152
+ progress(0, desc="Downloading Data")
153
  tickers = load_sp500_tickers()
154
  tickers.append("QQQ")
155
 
156
+ progress(0.3, desc="Running Scanner")
157
  results = scan_sp500(top_n, progress)
158
 
159
+ # Get the last date of the data and find the next business day
160
+ last_data = load_data(results[0][0]) # Load data for the first ticker in results
161
  last_date = last_data.index[-1].date()
162
  next_market_day = next_business_day(last_date)
163
  date_created = next_market_day.strftime("%Y-%m-%d")
164
 
165
  output = f"Scan Results for Market Open on: {date_created}\n\n"
166
+ output += "Top {} stocks based on pattern finder score:\n\n".format(top_n)
167
  for ticker, score in results:
168
+ output += "{}: Total Score = {:.2f}\n".format(ticker, score)
169
  return output
170
 
 
171
  iface = gr.Interface(
172
  fn=gradio_scan_sp500,
173
  inputs=gr.Slider(minimum=1, maximum=100, step=1, label="Number of top stocks to display", value=25),
 
178
  )
179
 
180
  if __name__ == "__main__":
181
+ iface.launch(server_name="0.0.0.0", server_port=7860)