import os
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import requests
from geopy.exc import GeocoderTimedOut
from geopy.geocoders import Nominatim
from scipy import stats

# Get API key from environment variable
OPENWEATHER_API_KEY = os.getenv('OPENWEATHER_API_KEY', 'default_key')


class TobaccoAnalyzer:
    """Fetch weather data for a location and score it for tobacco growing.

    The analyzer geocodes a place name, pulls weather observations and
    forecasts from the OpenWeather HTTP API, and derives per-row suitability
    and estimated-NDVI columns plus summary trend statistics.
    """

    # OpenWeather endpoints used by get_weather_data().
    WEATHER_URL = "https://api.openweathermap.org/data/2.5/weather"
    FORECAST_URL = "https://api.openweathermap.org/data/2.5/forecast"

    # Seconds to wait on each HTTP call; without a timeout a single stalled
    # connection would block the whole (up to historical_days + 1 call) run.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        """Set up the API key, target growing ranges, geocoder and seasons."""
        self.api_key = OPENWEATHER_API_KEY
        # Target ranges; rainfall is an annual min/max spread evenly over
        # 365 days so it can be compared against per-day rainfall values.
        self.optimal_conditions = {
            'temperature': {'min': 20, 'max': 30},
            'humidity': {'min': 60, 'max': 80},
            'rainfall': {'min': 500/365, 'max': 1200/365},
            'ndvi': {'min': 0.3, 'max': 0.8}
        }
        self.geolocator = Nominatim(user_agent="tobacco_analyzer")
        # Calendar month (1-12) -> season label used by estimate_ndvi().
        self.tanzania_seasons = {
            1: 'Main', 2: 'Main', 3: 'Main',
            4: 'Late', 5: 'Late',
            6: 'Dry', 7: 'Dry', 8: 'Dry',
            9: 'Early', 10: 'Early', 11: 'Early',
            12: 'Main'
        }

    def geocode_location(self, location_name):
        """Convert location name to coordinates.

        Args:
            location_name: Free-text place name passed to the geocoder.

        Returns:
            A dict with 'lat', 'lon', 'address' and 'region' keys, or None
            when the geocoder finds nothing or times out.
        """
        try:
            location = self.geolocator.geocode(location_name)
            if location:
                return {
                    'lat': location.latitude,
                    'lon': location.longitude,
                    'address': location.address,
                    'region': self.get_tanzania_region(location.address)
                }
            return None
        except GeocoderTimedOut:
            return None

    def get_tanzania_region(self, address):
        """Extract Tanzania region from address.

        Args:
            address: Comma-separated address string (may be None or empty).

        Returns:
            The first lower-cased, stripped address component that contains
            one of the known region names, or None when nothing matches.
        """
        if address:
            address_parts = address.lower().split(',')
            tanzania_regions = ['tabora', 'urambo', 'sikonge', 'nzega']
            for part in address_parts:
                # Substring match: a component such as "tabora region"
                # is returned whole, not just the region name.
                if any(region in part.strip() for region in tanzania_regions):
                    return part.strip()
        return None

    @staticmethod
    def _build_record(date, payload, rain_key, periods_per_day, record_type):
        """Turn one OpenWeather JSON entry into a flat row dict.

        Args:
            date: Timestamp to store in the row.
            payload: Decoded JSON object holding 'main', 'weather' and an
                optional 'rain' section.
            rain_key: Key inside 'rain' to read ('1h' or '3h').
            periods_per_day: Multiplier that scales the rain reading up to
                a whole day (24 for hourly, 8 for three-hourly).
            record_type: Value stored in the row's 'type' column.

        Raises:
            KeyError, IndexError, TypeError, ValueError: If the payload lacks
                an expected field or holds a non-numeric value.
        """
        main = payload['main']
        return {
            'date': date,
            'temperature': float(main['temp']),
            'humidity': float(main['humidity']),
            # Missing 'rain' section means no rain was reported.
            'rainfall': float(payload.get('rain', {}).get(rain_key, 0)) * periods_per_day,
            'type': record_type,
            'description': payload['weather'][0]['description'],
            'temp_min': float(main['temp_min']),
            'temp_max': float(main['temp_max'])
        }

    def get_weather_data(self, lat, lon, historical_days=90, forecast_days=90):
        """Get historical and forecast weather data.

        Args:
            lat: Latitude sent to the API.
            lon: Longitude sent to the API.
            historical_days: Number of daily look-back requests to issue.
            forecast_days: Currently unused; retained so existing callers
                keep working. The forecast length is whatever the forecast
                endpoint returns.

        Returns:
            A DataFrame sorted by date with the raw fields plus 'month',
            'season', 'temp_range', 7-row rolling averages,
            'daily_suitability' and 'estimated_ndvi'. An empty DataFrame
            is returned when no rows could be fetched.
        """
        historical_data = []
        # Capture "now" once so every look-back offset shares one reference.
        now = datetime.now()

        # Get historical data
        # NOTE(review): this calls the current-weather endpoint with a `dt`
        # parameter; confirm against the OpenWeather docs that `dt` is
        # honoured there, otherwise every row holds the same reading.
        for day in range(historical_days):
            date = now - timedelta(days=day)
            params = {
                'lat': lat,
                'lon': lon,
                'appid': self.api_key,
                'units': 'metric',
                'dt': int(date.timestamp())
            }
            try:
                response = requests.get(
                    self.WEATHER_URL, params=params, timeout=self.REQUEST_TIMEOUT
                )
                # Non-200 responses are skipped silently (best effort).
                if response.status_code == 200:
                    historical_data.append(
                        self._build_record(
                            date, response.json(), '1h', 24, 'historical'
                        )
                    )
            except Exception as e:
                print(f"Error fetching historical data: {e}")

        # Get forecast data
        forecast_data = []
        try:
            params = {
                'lat': lat,
                'lon': lon,
                'appid': self.api_key,
                'units': 'metric'
            }
            response = requests.get(
                self.FORECAST_URL, params=params, timeout=self.REQUEST_TIMEOUT
            )
            if response.status_code == 200:
                for item in response.json()['list']:
                    date = datetime.fromtimestamp(item['dt'])
                    forecast_data.append(
                        self._build_record(date, item, '3h', 8, 'forecast')
                    )
        except Exception as e:
            print(f"Error fetching forecast data: {e}")

        # Combine and process all data
        all_data = pd.DataFrame(historical_data + forecast_data)
        if all_data.empty:
            return pd.DataFrame()

        # Sort by date so the rolling windows below run chronologically
        all_data = all_data.sort_values('date')

        # Add analysis columns
        all_data['month'] = all_data['date'].dt.month
        all_data['season'] = all_data['month'].map(self.tanzania_seasons)

        # Calculate temperature range
        all_data['temp_range'] = all_data['temp_max'] - all_data['temp_min']

        # Calculate rolling averages (window counts rows, not calendar days)
        for column, averaged in (
            ('temperature', 'temp_7day_avg'),
            ('humidity', 'humidity_7day_avg'),
            ('rainfall', 'rainfall_7day_avg'),
        ):
            all_data[averaged] = (
                all_data[column].rolling(window=7, min_periods=1).mean()
            )

        # Calculate suitability and NDVI
        all_data['daily_suitability'] = self.calculate_daily_suitability(all_data)
        all_data['estimated_ndvi'] = self.estimate_ndvi(all_data)

        return all_data

    def analyze_trends(self, df):
        """Analyze weather trends and patterns.

        Args:
            df: DataFrame shaped like the output of get_weather_data().
                It is not modified.

        Returns:
            A dict with a 'historical' section (mean, std, linear-regression
            slope per day and r^2 for temperature, humidity, rainfall and
            NDVI) and, when forecast rows exist, a 'forecast' section with
            mean and std. Returns None when df is None or empty, when fewer
            than two historical rows exist, or when the analysis fails.
        """
        # get_weather_data() yields an empty frame when nothing was fetched.
        if df is None or df.empty:
            return None

        try:
            # Copy so adding the 'days' column never touches the caller's
            # frame (and does not trigger SettingWithCopyWarning).
            historical = df[df['type'] == 'historical'].copy()
            forecast = df[df['type'].isin(['forecast', 'forecast_extended'])]

            if len(historical) < 2:
                return None

            # Create time index for trend calculation (fractional days
            # since the earliest historical row)
            elapsed = historical['date'] - historical['date'].min()
            historical['days'] = elapsed.dt.total_seconds() / (24*60*60)

            # Output key -> source column
            metrics = {
                'temperature': 'temperature',
                'humidity': 'humidity',
                'rainfall': 'rainfall',
                'ndvi': 'estimated_ndvi'
            }

            # Calculate trends
            historical_stats = {}
            for name, column in metrics.items():
                trend = stats.linregress(historical['days'], historical[column])
                historical_stats[name] = {
                    'mean': historical[column].mean(),
                    'std': historical[column].std(),
                    'trend': trend.slope,
                    'trend_r2': trend.rvalue**2
                }

            analysis = {'historical': historical_stats}

            if not forecast.empty:
                analysis['forecast'] = {
                    name: {
                        'mean': forecast[column].mean(),
                        'std': forecast[column].std()
                    }
                    for name, column in metrics.items()
                }

            return analysis

        except Exception as e:
            print(f"Error in trend analysis: {e}")
            return None

    def calculate_daily_suitability(self, df):
        """Calculate daily growing suitability.

        Each component score is 1 at its target and falls linearly to 0;
        the result is the weighted sum (35% temperature, 15% temperature
        range, 25% humidity, 25% rainfall) clipped to [0, 1].

        Args:
            df: DataFrame with 'temperature', 'temp_range', 'humidity' and
                'rainfall' columns.

        Returns:
            A Series of scores in [0, 1]; a constant 0.5 Series on error.
        """
        try:
            # Temperature suitability: best at 25, zero at 10 or more away
            temp_suit = 1 - np.clip(abs(df['temperature'] - 25) / 10, 0, 1)

            # Temperature range suitability: zero once the swing reaches 15
            temp_range_suit = 1 - np.clip(df['temp_range'] / 15, 0, 1)

            # Humidity suitability: best at 70, zero at 30 or more away
            humidity_suit = 1 - np.clip(abs(df['humidity'] - 70) / 30, 0, 1)

            # Rainfall suitability: target is the midpoint of the daily range
            rainfall_range = self.optimal_conditions['rainfall']
            daily_rainfall_target = (
                rainfall_range['min'] + rainfall_range['max']
            ) / 2
            rainfall_suit = 1 - np.clip(
                abs(df['rainfall'] - daily_rainfall_target) / daily_rainfall_target,
                0,
                1
            )

            # Combine scores with weights
            suitability = (
                0.35 * temp_suit +
                0.15 * temp_range_suit +
                0.25 * humidity_suit +
                0.25 * rainfall_suit
            )

            return np.clip(suitability, 0, 1)

        except Exception as e:
            print(f"Error calculating suitability: {e}")
            return pd.Series(0.5, index=df.index)

    def estimate_ndvi(self, weather_data):
        """Estimate NDVI based on weather conditions.

        This is a weather-derived proxy, not a satellite measurement: a
        weighted blend of normalized temperature, humidity and rainfall,
        scaled by a per-season factor and clipped to [-1, 1].

        Args:
            weather_data: DataFrame with 'temperature', 'humidity',
                'rainfall' and 'season' columns.

        Returns:
            A Series of estimated NDVI values; a constant 0 Series on error.
            Rows whose season is not in the factor table come out as NaN.
        """
        try:
            # Normalize weather parameters
            normalized_temp = (weather_data['temperature'] - 15) / (30 - 15)
            normalized_humidity = (weather_data['humidity'] - 50) / (80 - 50)
            normalized_rainfall = weather_data['rainfall'] / 5

            # Season adjustment factors
            season_factors = {
                'Main': 1.0,
                'Early': 0.8,
                'Late': 0.7,
                'Dry': 0.5
            }

            # Apply season adjustments
            season_multiplier = weather_data['season'].map(season_factors)

            # Calculate estimated NDVI
            estimated_ndvi = (
                0.4 * normalized_temp +
                0.3 * normalized_humidity +
                0.3 * normalized_rainfall
            ) * season_multiplier

            return np.clip(estimated_ndvi, -1, 1)

        except Exception as e:
            print(f"Error estimating NDVI: {e}")
            return pd.Series(0, index=weather_data.index)