# Stock-Market-Scanner / pattern_finder.py
# Arkm20's picture
# Main Commit
# d81c653 verified
import pandas as pd
import numpy as np
def load_data(ticker):
    """
    Read the price history for ``ticker`` from ``tickers/<ticker>.csv``.

    The 'Date' column is parsed as dates and used as the index, and the
    rows are returned sorted by that index.

    Returns:
        The loaded DataFrame, or ``None`` (after printing a message) when
        the CSV file does not exist.
    """
    csv_path = f'tickers/{ticker}.csv'
    try:
        frame = pd.read_csv(csv_path, index_col="Date", parse_dates=['Date'])
    except FileNotFoundError:
        print(f"Data for {ticker} not found.")
        return None
    # Ensure chronological order regardless of how the file was written
    return frame.sort_index()
def sma(data, period):
    """Return the simple moving average of the 'Close' column over ``period`` rows."""
    closes = data['Close']
    rolling_window = closes.rolling(window=period)
    return rolling_window.mean()
def ema(data, period):
    """Return the exponential moving average of 'Close' with span ``period`` (non-adjusted weights)."""
    closes = data['Close']
    weighted = closes.ewm(span=period, adjust=False)
    return weighted.mean()
def score_downward_trend(data, window=4):
    """
    Rate how strongly the first ``window`` rows of ``data`` trend downward.

    For every consecutive pair of rows, one point is added for a lower
    'High', one point for a lower 'Close', and half a point when 'Volume'
    rose compared with the previous row.

    Returns:
        The accumulated score, or 0 when ``data`` has fewer than
        ``window`` rows.
    """
    if len(data) < window:
        return 0  # Not enough data

    def pair(column, idx):
        # Value at row idx together with the value at the row before it
        series = data[column]
        return series.iloc[idx], series.iloc[idx - 1]

    total = 0
    for idx in range(1, window):
        # Lower highs and lower closes each count as one point
        for column in ('High', 'Close'):
            current, earlier = pair(column, idx)
            if current < earlier:
                total += 1
        # Rising volume during the window counts as half a point
        current, earlier = pair('Volume', idx)
        if current > earlier:
            total += 0.5
    return total
def score_candle(candle, data, i):
    """
    Rate a single candle as a reversal candidate.

    Points are awarded for the candle's shape (green candle gapping above
    the previous high, long lower wick, small body, lower wick longer than
    upper wick), for volume above the 20-row rolling mean, for its position
    relative to the 20-EMA, 50-SMA and 200-SMA, and for an RSI(14) below 30.
    A penalty is then applied when three or more "already higher than the
    previous candle" checks are met.

    Args:
        candle: Row holding 'Open', 'High', 'Low', 'Close' and 'Volume'.
        data: Frame the candle is scored against.
        i: Positional index of the candle within ``data``.

    Returns:
        The total score (bonuses plus the non-positive penalty).
    """
    opened, closed = candle['Open'], candle['Close']
    lowest, highest = candle['Low'], candle['High']
    previous = data.iloc[i - 1]
    history = data.iloc[:i + 1]  # rows up to and including this candle

    real_body = abs(closed - opened)
    lower_wick = min(opened, closed) - lowest
    upper_wick = highest - max(opened, closed)

    # Indicator readings at the candle's position
    average_volume = data['Volume'].rolling(window=20).mean().iloc[i]
    ema_20 = ema(history, 20).iloc[-1]
    sma_50 = sma(history, 50).iloc[-1]
    sma_200 = sma(history, 200).iloc[-1]
    rsi = calculate_rsi(history, period=14).iloc[-1]

    # (condition, points) pairs; every satisfied condition adds its points
    bonuses = (
        (closed > opened and lowest > previous['High'], 16),  # green candle with a gap up
        (lower_wick > 2 * real_body, 10),
        (real_body < (0.1 * (highest - lowest)), 15),
        (lower_wick > upper_wick, 8),
        (candle['Volume'] > average_volume, 5),
        (closed > ema_20 and opened < ema_20, 5),  # body crosses up through the 20-EMA
        (closed > sma_50, 3),
        (closed > sma_200, 2),
        (rsi < 30, 5),
    )
    total = sum(points for satisfied, points in bonuses if satisfied)

    candle_mean = (opened + closed + highest + lowest) / 4
    previous_mean = (previous['Open'] + previous['Close'] + previous['High'] + previous['Low']) / 4
    higher_checks = (
        highest > previous['High'],
        lowest > previous['High'],
        closed > max(previous['Close'], previous['Open']),
        opened > max(previous['Open'], previous['Close']),
        candle_mean > previous_mean,
    )
    checks_met = sum(1 for passed in higher_checks if passed)

    # Fewer than three checks met carries no penalty
    penalty_by_count = {3: -10, 4: -12, 5: -17}
    total += penalty_by_count.get(checks_met, 0)
    return total
def calculate_rsi(data, period=14):
    """
    Compute a Relative Strength Index series from the 'Close' column.

    Gains and losses are averaged with a plain rolling mean over ``period``
    rows. The undefined first price change is counted as zero for both
    gains and losses.

    Returns:
        A Series of ``100 - 100 / (1 + avg_gain / avg_loss)`` values; the
        first ``period - 1`` entries are NaN.
    """
    change = data['Close'].diff()
    ups = change.where(change > 0, 0)
    downs = -change.where(change < 0, 0)
    average_up = ups.rolling(window=period).mean()
    average_down = downs.rolling(window=period).mean()
    relative_strength = average_up / average_down
    return 100 - (100 / (1 + relative_strength))
def calculate_risk_reward(data, entry_index, stop_loss_percent=0.02, target_percent=0.06):
    """
    Return the reward-to-risk ratio for entering at a given close.

    The stop and target are fixed fractions of the entry close, so apart
    from floating-point rounding the ratio works out to
    ``target_percent / stop_loss_percent``.

    Args:
        data: Frame with a 'Close' column.
        entry_index: Positional index of the entry row.
        stop_loss_percent: Fraction below the entry price where the stop sits.
        target_percent: Fraction above the entry price where the target sits.
    """
    entry = data['Close'].iloc[entry_index]
    downside = entry - entry * (1 - stop_loss_percent)
    upside = entry * (1 + target_percent) - entry
    return upside / downside
def find_reversal_patterns(data, window=4, candle_score_threshold=20, trend_score_threshold=5, risk_reward_threshold=2):
    """
    Scan ``data`` for candles that follow a downtrend and look like reversals.

    For each position ``i`` from ``window`` onward, the ``window`` rows
    before it are scored with ``score_downward_trend``, the candle itself
    with ``score_candle``, and the entry with ``calculate_risk_reward``. A
    position is kept only when all three values reach their thresholds.

    Args:
        data: Price frame; its index entries must support ``strftime``.
        window: Number of preceding rows used for the trend score.
        candle_score_threshold: Minimum candle score.
        trend_score_threshold: Minimum trend score.
        risk_reward_threshold: Minimum reward-to-risk ratio.

    Returns:
        A list of ``(date, trend_score, candle_score, risk_reward)`` tuples,
        with ``date`` formatted as ``YYYY-MM-DD``.
    """
    patterns = []
    for i in range(window, len(data)):
        trend_score = score_downward_trend(data.iloc[i - window:i], window=window)
        # `not (x >= t)` rather than `x < t` so a NaN value is rejected too
        if not (trend_score >= trend_score_threshold):
            continue
        candle_score = score_candle(data.iloc[i], data, i)
        if not (candle_score >= candle_score_threshold):
            continue
        risk_reward = calculate_risk_reward(data, i)
        if not (risk_reward >= risk_reward_threshold):
            continue
        format_date = data.index[i].strftime('%Y-%m-%d')
        patterns.append((format_date, trend_score, candle_score, risk_reward))
    return patterns


def back_reversal_finder(data, window=4, candle_score_threshold=20, trend_score_threshold=4.5, risk_reward_threshold=1.5):
    """
    Return only the dates of reversal patterns, using looser default thresholds.

    Runs the same scan as ``find_reversal_patterns`` (delegating to it so
    the two cannot drift apart) and drops the scores.

    Returns:
        A list of ``YYYY-MM-DD`` date strings.
    """
    matches = find_reversal_patterns(
        data,
        window=window,
        candle_score_threshold=candle_score_threshold,
        trend_score_threshold=trend_score_threshold,
        risk_reward_threshold=risk_reward_threshold,
    )
    return [match[0] for match in matches]
def check_for_reversal_patterns(ticker, window=4, candle_score_threshold=20, trend_score_threshold=5, risk_reward_threshold=2):
    """
    Load ``ticker``'s data, scan it for reversal patterns and print the result.

    Nothing is returned. When the data cannot be loaded the function exits
    without printing a result line of its own.
    """
    frame = load_data(ticker)
    if frame is None:
        return

    found = find_reversal_patterns(
        frame,
        window=window,
        candle_score_threshold=candle_score_threshold,
        trend_score_threshold=trend_score_threshold,
        risk_reward_threshold=risk_reward_threshold,
    )
    if not found:
        print(f"{ticker}: No clear reversal patterns detected.")
        return

    print(f"{ticker}: Potential reversal patterns found:")
    for date, trend_score, candle_score, risk_reward in found:
        print(f"Date: {date}, Trend Score: {trend_score:.2f}, Candle Score: {candle_score:.2f}, Risk-Reward: {risk_reward:.2f}")
def main():
    """Prompt for a ticker symbol, upper-case it and report its reversal patterns."""
    symbol = input("Enter Ticker: ")
    check_for_reversal_patterns(symbol.upper())


if __name__ == '__main__':
    main()