import os |
import numpy as np |
import pandas as pd |
from datetime import datetime, timedelta |
import requests |
from geopy.geocoders import Nominatim |
from geopy.exc import GeocoderTimedOut |
from scipy import stats |
# API key used for the OpenWeatherMap requests made by TobaccoAnalyzer.
# Read from the OPENWEATHER_API_KEY environment variable; when the variable
# is not set, the placeholder string 'default_key' is used instead.
OPENWEATHER_API_KEY = os.getenv('OPENWEATHER_API_KEY', 'default_key')
class TobaccoAnalyzer:
    """Fetch weather data for a location and score it for tobacco growing.

    The class geocodes place names, downloads weather observations and
    forecasts over HTTP, derives per-row suitability and NDVI estimates,
    and summarises trends with linear regressions.
    """

    # Seconds to wait for an HTTP response before abandoning a request.
    REQUEST_TIMEOUT = 10

    # Lower-case names searched for inside a geocoded address.
    TANZANIA_REGIONS = ('tabora', 'urambo', 'sikonge', 'nzega')

    def __init__(self):
        """Set up the API key, target growing conditions and geocoder."""
        self.api_key = OPENWEATHER_API_KEY
        # Target ranges per variable. The rainfall bounds are written as
        # 500/365 and 1200/365, i.e. an annual figure divided by 365.
        self.optimal_conditions = {
            'temperature': {'min': 20, 'max': 30},
            'humidity': {'min': 60, 'max': 80},
            'rainfall': {'min': 500/365, 'max': 1200/365},
            'ndvi': {'min': 0.3, 'max': 0.8}
        }
        self.geolocator = Nominatim(user_agent="tobacco_analyzer")
        # Calendar month (1-12) -> season label used by estimate_ndvi().
        self.tanzania_seasons = {
            1: 'Main', 2: 'Main', 3: 'Main',
            4: 'Late', 5: 'Late', 6: 'Dry',
            7: 'Dry', 8: 'Dry', 9: 'Early',
            10: 'Early', 11: 'Early', 12: 'Main'
        }

    def geocode_location(self, location_name):
        """Convert a location name to coordinates.

        Args:
            location_name: Free-text place name passed to the geocoder.

        Returns:
            A dict with keys 'lat', 'lon', 'address' and 'region', or None
            when the geocoder finds nothing or times out.
        """
        try:
            location = self.geolocator.geocode(location_name)
        except GeocoderTimedOut:
            return None
        if not location:
            return None
        return {
            'lat': location.latitude,
            'lon': location.longitude,
            'address': location.address,
            'region': self.get_tanzania_region(location.address)
        }

    def get_tanzania_region(self, address):
        """Extract a known region name from a comma-separated address.

        Args:
            address: Address string, or None/empty.

        Returns:
            The first comma-separated component (lower-cased and stripped)
            that contains one of TANZANIA_REGIONS, or None when the address
            is empty or no component matches.
        """
        if not address:
            return None
        for part in address.lower().split(','):
            candidate = part.strip()
            if any(region in candidate for region in self.TANZANIA_REGIONS):
                return candidate
        return None

    def _fetch_historical(self, lat, lon, historical_days):
        """Return one weather record per requested past day (best effort)."""
        records = []
        # Taken once so consecutive records are exactly one day apart.
        now = datetime.now()
        for day in range(historical_days):
            date = now - timedelta(days=day)
            # NOTE(review): this URL is the /data/2.5/weather endpoint with a
            # `dt` query parameter. Confirm the endpoint honours `dt`; if it
            # does not, every "historical" row repeats the present reading.
            url = (
                f"https://api.openweathermap.org/data/2.5/weather"
                f"?lat={lat}&lon={lon}&appid={self.api_key}"
                f"&units=metric&dt={int(date.timestamp())}"
            )
            try:
                response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
                # Non-200 responses are skipped without a record.
                if response.status_code == 200:
                    data = response.json()
                    records.append({
                        'date': date,
                        'temperature': float(data['main']['temp']),
                        'humidity': float(data['main']['humidity']),
                        # The 1-hour rain figure is scaled by 24
                        # (presumably to approximate a daily total).
                        'rainfall': float(data.get('rain', {}).get('1h', 0)) * 24,
                        'type': 'historical',
                        'description': data['weather'][0]['description'],
                        'temp_min': float(data['main']['temp_min']),
                        'temp_max': float(data['main']['temp_max'])
                    })
            except Exception as e:
                # Best effort: a failed day is reported and skipped.
                print(f"Error fetching historical data: {e}")
        return records

    def _fetch_forecast(self, lat, lon):
        """Return one weather record per entry of the forecast response."""
        records = []
        try:
            forecast_url = (
                f"https://api.openweathermap.org/data/2.5/forecast"
                f"?lat={lat}&lon={lon}&appid={self.api_key}&units=metric"
            )
            response = requests.get(forecast_url, timeout=self.REQUEST_TIMEOUT)
            if response.status_code == 200:
                data = response.json()
                for item in data['list']:
                    records.append({
                        'date': datetime.fromtimestamp(item['dt']),
                        'temperature': float(item['main']['temp']),
                        'humidity': float(item['main']['humidity']),
                        # The 3-hour rain figure is scaled by 8
                        # (presumably to approximate a daily total).
                        'rainfall': float(item.get('rain', {}).get('3h', 0)) * 8,
                        'type': 'forecast',
                        'description': item['weather'][0]['description'],
                        'temp_min': float(item['main']['temp_min']),
                        'temp_max': float(item['main']['temp_max'])
                    })
        except Exception as e:
            # Records parsed before the failure are kept.
            print(f"Error fetching forecast data: {e}")
        return records

    def get_weather_data(self, lat, lon, historical_days=90, forecast_days=90):
        """Get historical and forecast weather data.

        Args:
            lat: Latitude inserted into the request URLs.
            lon: Longitude inserted into the request URLs.
            historical_days: Number of past days to request, one HTTP call
                per day.
            forecast_days: Accepted for interface compatibility; not used by
                this method.

        Returns:
            A DataFrame sorted by 'date' holding the fetched rows plus the
            derived columns 'month', 'season', 'temp_range',
            'temp_7day_avg', 'humidity_7day_avg', 'rainfall_7day_avg',
            'daily_suitability' and 'estimated_ndvi'. An empty DataFrame is
            returned when nothing could be fetched.
        """
        historical_data = self._fetch_historical(lat, lon, historical_days)
        forecast_data = self._fetch_forecast(lat, lon)

        all_data = pd.DataFrame(historical_data + forecast_data)
        if all_data.empty:
            return pd.DataFrame()

        all_data = all_data.sort_values('date')
        all_data['month'] = all_data['date'].dt.month
        all_data['season'] = all_data['month'].map(self.tanzania_seasons)
        all_data['temp_range'] = all_data['temp_max'] - all_data['temp_min']
        # Integer windows count rows, not calendar days, so these "7day"
        # averages span the previous seven rows of the combined frame.
        all_data['temp_7day_avg'] = all_data['temperature'].rolling(window=7, min_periods=1).mean()
        all_data['humidity_7day_avg'] = all_data['humidity'].rolling(window=7, min_periods=1).mean()
        all_data['rainfall_7day_avg'] = all_data['rainfall'].rolling(window=7, min_periods=1).mean()
        all_data['daily_suitability'] = self.calculate_daily_suitability(all_data)
        all_data['estimated_ndvi'] = self.estimate_ndvi(all_data)
        return all_data

    def analyze_trends(self, df):
        """Analyze weather trends and patterns.

        Args:
            df: DataFrame with at least the columns 'type', 'date',
                'temperature', 'humidity', 'rainfall' and 'estimated_ndvi'.
                It is not modified.

        Returns:
            A dict with a 'historical' entry giving, for each of
            'temperature', 'humidity', 'rainfall' and 'ndvi', the 'mean',
            'std', regression slope ('trend') and squared correlation
            ('trend_r2') against elapsed days. When forecast rows exist a
            'forecast' entry with 'mean' and 'std' per variable is added.
            Returns None when the input is empty, when fewer than two
            historical rows exist, or when any error occurs.
        """
        try:
            if df is None or df.empty:
                return None

            # Copy so the helper 'days' column is never written onto a
            # slice of the caller's frame.
            historical = df[df['type'] == 'historical'].copy()
            # 'forecast_extended' is accepted here although no code in this
            # class produces rows of that type.
            forecast = df[df['type'].isin(['forecast', 'forecast_extended'])]

            if len(historical) < 2:
                return None

            # Elapsed time since the earliest historical row, in days.
            elapsed = historical['date'] - historical['date'].min()
            historical['days'] = elapsed.dt.total_seconds() / (24*60*60)

            def summarise(column):
                # Descriptive statistics plus a linear fit against 'days'.
                fit = stats.linregress(historical['days'], historical[column])
                return {
                    'mean': historical[column].mean(),
                    'std': historical[column].std(),
                    'trend': fit.slope,
                    'trend_r2': fit.rvalue**2
                }

            # Output key -> source column.
            columns = {
                'temperature': 'temperature',
                'humidity': 'humidity',
                'rainfall': 'rainfall',
                'ndvi': 'estimated_ndvi'
            }

            analysis = {
                'historical': {
                    key: summarise(column) for key, column in columns.items()
                }
            }

            if not forecast.empty:
                analysis['forecast'] = {
                    key: {
                        'mean': forecast[column].mean(),
                        'std': forecast[column].std()
                    }
                    for key, column in columns.items()
                }

            return analysis
        except Exception as e:
            # Deliberate best effort: callers receive None on any failure.
            print(f"Error in trend analysis: {e}")
            return None

    def calculate_daily_suitability(self, df):
        """Calculate daily growing suitability.

        Each factor scores 1 at its target and falls linearly to 0:
        temperature around 25 (zero at a distance of 10), temperature range
        (zero at 15), humidity around 70 (zero at a distance of 30) and
        rainfall around the midpoint of the configured rainfall bounds.

        Args:
            df: DataFrame with columns 'temperature', 'temp_range',
                'humidity' and 'rainfall'.

        Returns:
            A Series of scores in [0, 1] aligned with df. If the calculation
            fails, a Series filled with 0.5 is returned instead.
        """
        try:
            temp_suit = 1 - np.clip(abs(df['temperature'] - 25) / 10, 0, 1)
            temp_range_suit = 1 - np.clip(df['temp_range'] / 15, 0, 1)
            humidity_suit = 1 - np.clip(abs(df['humidity'] - 70) / 30, 0, 1)

            rainfall_bounds = self.optimal_conditions['rainfall']
            daily_rainfall_target = (rainfall_bounds['min'] + rainfall_bounds['max']) / 2
            rainfall_suit = 1 - np.clip(
                abs(df['rainfall'] - daily_rainfall_target) / daily_rainfall_target,
                0, 1)

            # Weights sum to 1.
            suitability = (
                0.35 * temp_suit +
                0.15 * temp_range_suit +
                0.25 * humidity_suit +
                0.25 * rainfall_suit
            )
            return np.clip(suitability, 0, 1)
        except Exception as e:
            print(f"Error calculating suitability: {e}")
            return pd.Series(0.5, index=df.index)

    def estimate_ndvi(self, weather_data):
        """Estimate NDVI based on weather conditions.

        Temperature, humidity and rainfall are rescaled, combined with
        weights 0.4 / 0.3 / 0.3, multiplied by a per-season factor and
        clipped to [-1, 1].

        Args:
            weather_data: DataFrame with columns 'temperature', 'humidity',
                'rainfall' and 'season' (one of 'Main', 'Early', 'Late',
                'Dry'; other labels map to NaN).

        Returns:
            A Series of estimates aligned with weather_data. If the
            calculation fails, a Series filled with 0 is returned instead.
        """
        try:
            normalized_temp = (weather_data['temperature'] - 15) / (30 - 15)
            normalized_humidity = (weather_data['humidity'] - 50) / (80 - 50)
            normalized_rainfall = weather_data['rainfall'] / 5

            season_factors = {
                'Main': 1.0,
                'Early': 0.8,
                'Late': 0.7,
                'Dry': 0.5
            }
            season_multiplier = weather_data['season'].map(season_factors)

            estimated_ndvi = (
                0.4 * normalized_temp +
                0.3 * normalized_humidity +
                0.3 * normalized_rainfall
            ) * season_multiplier
            return np.clip(estimated_ndvi, -1, 1)
        except Exception as e:
            print(f"Error estimating NDVI: {e}")
            return pd.Series(0, index=weather_data.index)