# (Page-scrape residue, not program code) Spaces status: Running
import pandas as pd | |
import numpy as np | |
def load_data(ticker):
    """
    Read the price history for ``ticker`` from ``tickers/<ticker>.csv``.

    The ``Date`` column is parsed as dates and used as the index, and the
    rows are returned sorted by that index.

    Returns the DataFrame, or ``None`` (after printing a notice) when the
    CSV file does not exist.
    """
    csv_path = f'tickers/{ticker}.csv'
    try:
        frame = pd.read_csv(csv_path, index_col="Date", parse_dates=['Date'])
    except FileNotFoundError:
        print(f"Data for {ticker} not found.")
        return None
    # The scoring functions in this file address rows by position, so make
    # sure they are in date order.
    return frame.sort_index()
def sma(data, period):
    """
    Return the simple moving average of ``data['Close']`` over ``period`` rows.

    The result is aligned with ``data``; positions that do not yet have a
    full window are NaN.
    """
    closes = data['Close']
    windowed = closes.rolling(window=period)
    return windowed.mean()
def ema(data, period):
    """
    Return the exponential moving average of ``data['Close']``.

    Uses ``span=period`` with the recursive (``adjust=False``) weighting.
    """
    smoother = data['Close'].ewm(span=period, adjust=False)
    return smoother.mean()
def score_downward_trend(data, window=4):
    """
    Score how strongly the first ``window`` rows of ``data`` trend downward.

    Every row after the first is compared with the row before it:
    +1 for a lower high, +1 for a lower close and +0.5 for a higher volume.
    The three checks are evaluated independently of one another.

    Returns 0 when ``data`` has fewer than ``window`` rows.
    """
    if len(data) < window:
        return 0  # Not enough data

    # (column, True -> reward a decrease / False -> reward an increase, points)
    rules = (
        ('High', True, 1),
        ('Close', True, 1),
        ('Volume', False, 0.5),
    )

    total = 0
    for pos in range(1, window):
        for column, reward_decrease, points in rules:
            now = data[column].iloc[pos]
            before = data[column].iloc[pos - 1]
            hit = (now < before) if reward_decrease else (now > before)
            if hit:
                total += points
    return total
def _prior_candle_penalty(candle, prev_candle):
    """Return the (zero or negative) penalty for a candle that already sits above the previous one."""
    current_avg = (candle['Open'] + candle['Close'] + candle['High'] + candle['Low']) / 4
    prev_avg = (prev_candle['Open'] + prev_candle['Close'] + prev_candle['High'] + prev_candle['Low']) / 4
    checks = (
        candle['High'] > prev_candle['High'],
        candle['Low'] > prev_candle['High'],
        candle['Close'] > max(prev_candle['Close'], prev_candle['Open']),
        candle['Open'] > max(prev_candle['Open'], prev_candle['Close']),
        current_avg > prev_avg,
    )
    conditions_met = sum(1 for passed in checks if passed)
    # Fewer than three satisfied conditions carry no penalty.
    return {3: -10, 4: -12, 5: -17}.get(conditions_met, 0)


def score_candle(candle, data, i):
    """
    Score the candle based on its pattern, volume, and position relative to moving averages.

    Points are added for the candle's shape, its volume versus the 20-row
    average, its position relative to the 20-EMA / 50-SMA / 200-SMA and a
    low RSI; a penalty is then applied when the candle already sits above
    the previous candle on several measures.

    Args:
        candle: Row being scored; must provide 'Open', 'High', 'Low',
            'Close' and 'Volume'.
        data: Price DataFrame that ``candle`` belongs to.
        i: Positional index of ``candle`` within ``data``.

    Returns:
        The score as a number. It can be negative when the penalty
        outweighs the points earned.

    Note:
        Indicators that are still NaN (not enough history) compare as False
        and therefore contribute no points.
    """
    open_price, close_price = candle['Open'], candle['Close']
    low_price, high_price = candle['Low'], candle['High']

    # At i == 0 there is no earlier candle. ``data.iloc[-1]`` would silently
    # wrap around to the LAST row and compare against future data, so the
    # previous-candle checks are skipped in that case.
    prev_candle = data.iloc[i - 1] if i != 0 else None

    score = 0
    body = abs(close_price - open_price)
    bottom_wick_length = min(open_price, close_price) - low_price
    top_wick_length = high_price - max(open_price, close_price)

    # Add 16 points if candle is green and gaps entirely above the previous high
    if prev_candle is not None and close_price > open_price and low_price > prev_candle['High']:
        score += 16

    # Candle-shape checks
    if bottom_wick_length > 2 * body:
        score += 10  # long lower wick relative to the body
    if body < (0.1 * (high_price - low_price)):
        score += 15  # body is under 10% of the full range
    if bottom_wick_length > top_wick_length:
        score += 8

    # Volume analysis: above the trailing 20-row average
    if candle['Volume'] > data['Volume'].rolling(window=20).mean().iloc[i]:
        score += 5

    # Moving average analysis, using only rows up to and including this candle
    history = data.iloc[:i + 1]
    ema_20 = ema(history, 20).iloc[-1]
    sma_50 = sma(history, 50).iloc[-1]
    sma_200 = sma(history, 200).iloc[-1]
    if close_price > ema_20 and open_price < ema_20:
        score += 5  # candle opened below and closed above the 20-EMA
    if close_price > sma_50:
        score += 3
    if close_price > sma_200:
        score += 2

    # Momentum indicator
    rsi = calculate_rsi(history, period=14).iloc[-1]
    if rsi < 30:
        score += 5

    if prev_candle is not None:
        score += _prior_candle_penalty(candle, prev_candle)
    return score
def calculate_rsi(data, period=14):
    """
    Compute a Relative Strength Index series from ``data['Close']``.

    Gains and losses are taken from the row-to-row change in close and
    averaged with a plain rolling mean over ``period`` rows. The result is
    aligned with ``data`` and is NaN until a full window is available.
    """
    change = data['Close'].diff()
    # Non-qualifying entries (including the leading NaN) are replaced by 0.
    gains = change.where(change > 0, 0)
    losses = -change.where(change < 0, 0)
    avg_gain = gains.rolling(window=period).mean()
    avg_loss = losses.rolling(window=period).mean()
    relative_strength = avg_gain / avg_loss
    return 100 - (100 / (1 + relative_strength))
def calculate_risk_reward(data, entry_index, stop_loss_percent=0.02, target_percent=0.06):
    """
    Return the reward-to-risk ratio for an entry at ``data['Close']`` at
    position ``entry_index``.

    The stop is placed ``stop_loss_percent`` below the entry close and the
    target ``target_percent`` above it; the result is
    (target - entry) / (entry - stop).
    """
    entry = data['Close'].iloc[entry_index]
    downside = entry - entry * (1 - stop_loss_percent)
    upside = entry * (1 + target_percent) - entry
    return upside / downside
def find_reversal_patterns(data, window=4, candle_score_threshold=20, trend_score_threshold=5, risk_reward_threshold=2):
    """
    Scan ``data`` for candles that look like a reversal after a downtrend.

    For each position from ``window`` onward, the preceding ``window`` rows
    are scored with ``score_downward_trend``; if that passes, the candle is
    scored with ``score_candle``; if that passes, its risk/reward ratio is
    checked. A candle is reported only when all three meet their thresholds.

    Returns a list of ``(date, trend_score, candle_score, risk_reward)``
    tuples, where ``date`` is the index value formatted as ``YYYY-MM-DD``.
    """
    matches = []
    for pos in range(window, len(data)):
        lookback = data.iloc[pos - window:pos]
        trend = score_downward_trend(lookback, window=window)
        # ``not (x >= t)`` rather than ``x < t`` so NaN values are rejected.
        if not (trend >= trend_score_threshold):
            continue

        candle_points = score_candle(data.iloc[pos], data, pos)
        if not (candle_points >= candle_score_threshold):
            continue

        ratio = calculate_risk_reward(data, pos)
        if not (ratio >= risk_reward_threshold):
            continue

        day = data.index[pos].strftime('%Y-%m-%d')
        matches.append((day, trend, candle_points, ratio))
    return matches
def back_reversal_finder(data, window=4, candle_score_threshold=20, trend_score_threshold=4.5, risk_reward_threshold=1.5):
    """
    Return only the dates of detected reversal candles.

    Runs the same scan as ``find_reversal_patterns`` (delegating to it
    rather than duplicating the loop), but with this function's own,
    looser default thresholds for the trend score and risk/reward ratio.

    Returns a list of dates formatted as ``YYYY-MM-DD``, in scan order.
    """
    matches = find_reversal_patterns(
        data,
        window=window,
        candle_score_threshold=candle_score_threshold,
        trend_score_threshold=trend_score_threshold,
        risk_reward_threshold=risk_reward_threshold,
    )
    # Each match is (date, trend_score, candle_score, risk_reward); keep the date.
    return [match[0] for match in matches]
def check_for_reversal_patterns(ticker, window=4, candle_score_threshold=20, trend_score_threshold=5, risk_reward_threshold=2):
    """
    Load the data for ``ticker``, scan it for reversal patterns and print the result.

    Prints one line per detected pattern, or a "no patterns" message when
    the scan finds nothing. Returns ``None``; if the data cannot be loaded
    the function returns without printing anything further.
    """
    data = load_data(ticker)
    if data is None:
        return

    found = find_reversal_patterns(
        data,
        window=window,
        candle_score_threshold=candle_score_threshold,
        trend_score_threshold=trend_score_threshold,
        risk_reward_threshold=risk_reward_threshold,
    )
    if not found:
        print(f"{ticker}: No clear reversal patterns detected.")
        return

    print(f"{ticker}: Potential reversal patterns found:")
    for date, trend_score, candle_score, risk_reward in found:
        print(f"Date: {date}, Trend Score: {trend_score:.2f}, Candle Score: {candle_score:.2f}, Risk-Reward: {risk_reward:.2f}")
def main():
    """Prompt for a ticker symbol, upper-case it and report its reversal patterns."""
    check_for_reversal_patterns(input("Enter Ticker: ").upper())
# Start the interactive prompt only when run as a script, not when imported.
if __name__ == "__main__":
    main()