|
import os |
|
import numpy as np |
|
import pandas as pd |
|
from datetime import datetime, timedelta |
|
import requests |
|
from geopy.geocoders import Nominatim |
|
from geopy.exc import GeocoderTimedOut |
|
from scipy import stats |
|
|
|
|
|
OPENWEATHER_API_KEY = os.getenv('OPENWEATHER_API_KEY', 'default_key') |
|
|
|
class TobaccoAnalyzer: |
|
def __init__(self): |
|
self.api_key = OPENWEATHER_API_KEY |
|
self.optimal_conditions = { |
|
'temperature': {'min': 20, 'max': 30}, |
|
'humidity': {'min': 60, 'max': 80}, |
|
'rainfall': {'min': 500/365, 'max': 1200/365}, |
|
'ndvi': {'min': 0.3, 'max': 0.8} |
|
} |
|
self.geolocator = Nominatim(user_agent="tobacco_analyzer") |
|
self.tanzania_seasons = { |
|
1: 'Main', 2: 'Main', 3: 'Main', |
|
4: 'Late', 5: 'Late', 6: 'Dry', |
|
7: 'Dry', 8: 'Dry', 9: 'Early', |
|
10: 'Early', 11: 'Early', 12: 'Main' |
|
} |
|
|
|
def geocode_location(self, location_name): |
|
"""Convert location name to coordinates""" |
|
try: |
|
location = self.geolocator.geocode(location_name) |
|
if location: |
|
return { |
|
'lat': location.latitude, |
|
'lon': location.longitude, |
|
'address': location.address, |
|
'region': self.get_tanzania_region(location.address) |
|
} |
|
return None |
|
except GeocoderTimedOut: |
|
return None |
|
|
|
def get_tanzania_region(self, address): |
|
"""Extract Tanzania region from address""" |
|
if address: |
|
address_parts = address.lower().split(',') |
|
tanzania_regions = ['tabora', 'urambo', 'sikonge', 'nzega'] |
|
for part in address_parts: |
|
if any(region in part.strip() for region in tanzania_regions): |
|
return part.strip() |
|
return None |
|
|
|
def get_weather_data(self, lat, lon, historical_days=90, forecast_days=90): |
|
"""Get historical and forecast weather data""" |
|
historical_data = [] |
|
|
|
|
|
for day in range(historical_days): |
|
date = datetime.now() - timedelta(days=day) |
|
url = f"https://api.openweathermap.org/data/2.5/weather?lat={lat}&lon={lon}&appid={self.api_key}&units=metric&dt={int(date.timestamp())}" |
|
try: |
|
response = requests.get(url) |
|
if response.status_code == 200: |
|
data = response.json() |
|
weather_data = { |
|
'date': date, |
|
'temperature': float(data['main']['temp']), |
|
'humidity': float(data['main']['humidity']), |
|
'rainfall': float(data.get('rain', {}).get('1h', 0)) * 24, |
|
'type': 'historical', |
|
'description': data['weather'][0]['description'], |
|
'temp_min': float(data['main']['temp_min']), |
|
'temp_max': float(data['main']['temp_max']) |
|
} |
|
historical_data.append(weather_data) |
|
except Exception as e: |
|
print(f"Error fetching historical data: {e}") |
|
|
|
|
|
forecast_data = [] |
|
try: |
|
forecast_url = f"https://api.openweathermap.org/data/2.5/forecast?lat={lat}&lon={lon}&appid={self.api_key}&units=metric" |
|
response = requests.get(forecast_url) |
|
if response.status_code == 200: |
|
data = response.json() |
|
for item in data['list']: |
|
date = datetime.fromtimestamp(item['dt']) |
|
forecast = { |
|
'date': date, |
|
'temperature': float(item['main']['temp']), |
|
'humidity': float(item['main']['humidity']), |
|
'rainfall': float(item.get('rain', {}).get('3h', 0)) * 8, |
|
'type': 'forecast', |
|
'description': item['weather'][0]['description'], |
|
'temp_min': float(item['main']['temp_min']), |
|
'temp_max': float(item['main']['temp_max']) |
|
} |
|
forecast_data.append(forecast) |
|
|
|
except Exception as e: |
|
print(f"Error fetching forecast data: {e}") |
|
|
|
|
|
all_data = pd.DataFrame(historical_data + forecast_data) |
|
|
|
if not all_data.empty: |
|
|
|
all_data = all_data.sort_values('date') |
|
|
|
|
|
all_data['month'] = all_data['date'].dt.month |
|
all_data['season'] = all_data['month'].map(self.tanzania_seasons) |
|
|
|
|
|
all_data['temp_range'] = all_data['temp_max'] - all_data['temp_min'] |
|
|
|
|
|
all_data['temp_7day_avg'] = all_data['temperature'].rolling(window=7, min_periods=1).mean() |
|
all_data['humidity_7day_avg'] = all_data['humidity'].rolling(window=7, min_periods=1).mean() |
|
all_data['rainfall_7day_avg'] = all_data['rainfall'].rolling(window=7, min_periods=1).mean() |
|
|
|
|
|
all_data['daily_suitability'] = self.calculate_daily_suitability(all_data) |
|
all_data['estimated_ndvi'] = self.estimate_ndvi(all_data) |
|
|
|
return all_data |
|
|
|
return pd.DataFrame() |
|
|
|
def analyze_trends(self, df): |
|
"""Analyze weather trends and patterns""" |
|
try: |
|
historical = df[df['type'] == 'historical'] |
|
forecast = df[df['type'].isin(['forecast', 'forecast_extended'])] |
|
|
|
if len(historical) < 2: |
|
return None |
|
|
|
|
|
historical['days'] = (historical['date'] - historical['date'].min()).dt.total_seconds() / (24*60*60) |
|
|
|
|
|
temp_trend = stats.linregress(historical['days'], historical['temperature']) |
|
humidity_trend = stats.linregress(historical['days'], historical['humidity']) |
|
rainfall_trend = stats.linregress(historical['days'], historical['rainfall']) |
|
ndvi_trend = stats.linregress(historical['days'], historical['estimated_ndvi']) |
|
|
|
analysis = { |
|
'historical': { |
|
'temperature': { |
|
'mean': historical['temperature'].mean(), |
|
'std': historical['temperature'].std(), |
|
'trend': temp_trend.slope, |
|
'trend_r2': temp_trend.rvalue**2 |
|
}, |
|
'humidity': { |
|
'mean': historical['humidity'].mean(), |
|
'std': historical['humidity'].std(), |
|
'trend': humidity_trend.slope, |
|
'trend_r2': humidity_trend.rvalue**2 |
|
}, |
|
'rainfall': { |
|
'mean': historical['rainfall'].mean(), |
|
'std': historical['rainfall'].std(), |
|
'trend': rainfall_trend.slope, |
|
'trend_r2': rainfall_trend.rvalue**2 |
|
}, |
|
'ndvi': { |
|
'mean': historical['estimated_ndvi'].mean(), |
|
'std': historical['estimated_ndvi'].std(), |
|
'trend': ndvi_trend.slope, |
|
'trend_r2': ndvi_trend.rvalue**2 |
|
} |
|
} |
|
} |
|
|
|
if not forecast.empty: |
|
analysis['forecast'] = { |
|
'temperature': { |
|
'mean': forecast['temperature'].mean(), |
|
'std': forecast['temperature'].std() |
|
}, |
|
'humidity': { |
|
'mean': forecast['humidity'].mean(), |
|
'std': forecast['humidity'].std() |
|
}, |
|
'rainfall': { |
|
'mean': forecast['rainfall'].mean(), |
|
'std': forecast['rainfall'].std() |
|
}, |
|
'ndvi': { |
|
'mean': forecast['estimated_ndvi'].mean(), |
|
'std': forecast['estimated_ndvi'].std() |
|
} |
|
} |
|
|
|
return analysis |
|
|
|
except Exception as e: |
|
print(f"Error in trend analysis: {e}") |
|
return None |
|
|
|
def calculate_daily_suitability(self, df): |
|
"""Calculate daily growing suitability""" |
|
try: |
|
|
|
temp_suit = 1 - np.clip(abs(df['temperature'] - 25) / 10, 0, 1) |
|
|
|
|
|
temp_range_suit = 1 - np.clip(df['temp_range'] / 15, 0, 1) |
|
|
|
|
|
humidity_suit = 1 - np.clip(abs(df['humidity'] - 70) / 30, 0, 1) |
|
|
|
|
|
daily_rainfall_target = (self.optimal_conditions['rainfall']['min'] + |
|
self.optimal_conditions['rainfall']['max']) / 2 |
|
rainfall_suit = 1 - np.clip(abs(df['rainfall'] - daily_rainfall_target) / |
|
daily_rainfall_target, 0, 1) |
|
|
|
|
|
suitability = ( |
|
0.35 * temp_suit + |
|
0.15 * temp_range_suit + |
|
0.25 * humidity_suit + |
|
0.25 * rainfall_suit |
|
) |
|
|
|
return np.clip(suitability, 0, 1) |
|
|
|
except Exception as e: |
|
print(f"Error calculating suitability: {e}") |
|
return pd.Series(0.5, index=df.index) |
|
|
|
def estimate_ndvi(self, weather_data): |
|
"""Estimate NDVI based on weather conditions""" |
|
try: |
|
|
|
normalized_temp = (weather_data['temperature'] - 15) / (30 - 15) |
|
normalized_humidity = (weather_data['humidity'] - 50) / (80 - 50) |
|
normalized_rainfall = weather_data['rainfall'] / 5 |
|
|
|
|
|
season_factors = { |
|
'Main': 1.0, |
|
'Early': 0.8, |
|
'Late': 0.7, |
|
'Dry': 0.5 |
|
} |
|
|
|
|
|
season_multiplier = weather_data['season'].map(season_factors) |
|
|
|
|
|
estimated_ndvi = ( |
|
0.4 * normalized_temp + |
|
0.3 * normalized_humidity + |
|
0.3 * normalized_rainfall |
|
) * season_multiplier |
|
|
|
return np.clip(estimated_ndvi, -1, 1) |
|
|
|
except Exception as e: |
|
print(f"Error estimating NDVI: {e}") |
|
return pd.Series(0, index=weather_data.index) |