import numpy as np
import cv2
from segment_anything import sam_model_registry, SamPredictor
import matplotlib.pyplot as plt
from PIL import Image


# Policy template for each risk level produced by
# SAMAnalyzer.calculate_insurance_risk. Treated as read-only: callers receive
# copies, never these objects.
_BASE_RECOMMENDATIONS = {
    "Low": {
        "policy_type": "Standard Coverage",
        "premium_level": "Lower premiums likely",
        "coverage_options": [
            "• Basic crop insurance",
            "• Optional revenue protection",
            "• Minimal deductible options",
            "• Standard natural disaster coverage",
        ],
        "additional_notes": "Farm shows good health indicators; standard coverage should be sufficient",
    },
    "Moderate": {
        "policy_type": "Enhanced Coverage",
        "premium_level": "Moderate premiums",
        "coverage_options": [
            "• Enhanced crop insurance",
            "• Revenue protection recommended",
            "• Weather index insurance",
            "• Moderate deductible options",
            "• Extended natural disaster coverage",
        ],
        "additional_notes": "Consider additional coverage for specific risks",
    },
    "High": {
        "policy_type": "Comprehensive Coverage",
        "premium_level": "Higher premiums likely",
        "coverage_options": [
            "• Comprehensive crop insurance",
            "• Multi-peril crop insurance recommended",
            "• Weather index insurance strongly advised",
            "• Consider higher coverage limits",
            "• Full natural disaster coverage",
            "• Supplemental coverage option (SCO)",
        ],
        "additional_notes": "Risk mitigation strategies should be implemented alongside insurance",
    },
    "Very High": {
        "policy_type": "Maximum Coverage",
        "premium_level": "Significant premiums",
        "coverage_options": [
            "• Maximum coverage crop insurance",
            "• Multi-peril crop insurance required",
            "• Weather index insurance essential",
            "• Additional risk management tools needed",
            "• Maximum natural disaster coverage",
            "• Enhanced coverage option (ECO)",
            "• Consider crop diversification insurance",
        ],
        "additional_notes": "Immediate risk mitigation actions recommended before planting",
    },
}

# Extra coverage options appended according to the farm-size category.
_SIZE_RECOMMENDATIONS = {
    "small": [
        "• Consider cooperative insurance options",
        "• Micro-insurance options available",
    ],
    "medium": [
        "• Consider split coverage options",
        "• Zone-based coverage recommended",
    ],
    "large": [
        "• Consider zone-based coverage options",
        "• Enterprise unit structure recommended",
        "• Custom risk management solutions",
    ],
}


class SAMAnalyzer:
    """Segment farmland with Segment Anything (SAM) and derive crop-health
    and insurance-risk indicators from an RGB image.
    """

    # Area attributed to one mask pixel, in hectares. 0.0001 ha == 1 m^2;
    # this is a fixed approximation, not derived from image metadata.
    HECTARES_PER_PIXEL = 0.0001

    def __init__(self, model_path="sam_vit_h_4b8939.pth", model_type="vit_h"):
        """Load the SAM checkpoint and build a predictor.

        Args:
            model_path: Path to the SAM checkpoint file.
            model_type: Key into ``sam_model_registry`` selecting the
                architecture; must match the checkpoint.

        Raises:
            Exception: Whatever the model loading raises, after logging it.
        """
        self.model_path = model_path
        self.sam = None
        self.predictor = None

        try:
            print("Initializing SAM model...")
            self.sam = sam_model_registry[model_type](checkpoint=self.model_path)
            self.predictor = SamPredictor(self.sam)
            print("SAM model initialized successfully")
        except Exception as e:
            print(f"Error initializing SAM model: {e}")
            raise

    @staticmethod
    def _prepare_image(image):
        """Return ``image`` as a contiguous HxWx3 uint8 RGB array.

        Accepts a PIL image or a numpy array that is grayscale (HxW or
        HxWx1), RGB (HxWx3) or RGBA (HxWx4).

        Raises:
            ValueError: If the image is missing or has an unsupported type,
                shape or dtype.
        """
        if image is None:
            raise ValueError("No image provided")

        if isinstance(image, Image.Image):
            image = np.array(image.convert("RGB"))
        if not isinstance(image, np.ndarray):
            raise ValueError("Invalid image format")

        if image.ndim == 2:  # Grayscale
            image = np.stack((image,) * 3, axis=-1)
        elif image.ndim == 3 and image.shape[2] == 1:  # Single channel
            image = np.repeat(image, 3, axis=2)
        elif image.ndim == 3 and image.shape[2] == 4:  # RGBA: drop alpha
            image = image[:, :, :3]

        if image.ndim != 3 or image.shape[2] != 3:
            raise ValueError("Invalid image format")

        if np.issubdtype(image.dtype, np.floating):
            # Heuristic: float images whose maximum is <= 1 are taken to be
            # normalised to [0, 1]; anything else is assumed to be 0-255.
            scale = 255.0 if image.size and image.max() <= 1.0 else 1.0
            image = np.clip(np.rint(image * scale), 0, 255).astype(np.uint8)
        elif image.dtype != np.uint8:
            raise ValueError(f"Unsupported image dtype: {image.dtype}")

        return np.ascontiguousarray(image)

    def process_image(self, image):
        """Run the full pipeline on an uploaded image.

        Args:
            image: PIL image or numpy array (grayscale, RGB or RGBA).

        Returns:
            ``(veg_index, health_analysis, fig)`` on success, or
            ``(None, None, None)`` if any step fails (the error is printed).
        """
        try:
            print("Starting image processing...")
            image = self._prepare_image(image)

            print("Segmenting farmland...")
            farmland_mask = self.segment_farmland(image)

            print("Calculating vegetation index...")
            veg_index = self.calculate_vegetation_index(image, farmland_mask)

            print("Analyzing crop health...")
            health_analysis = self.analyze_crop_health(veg_index, farmland_mask)

            print("Creating visualization...")
            fig = self.create_visualization(image, farmland_mask, veg_index)

            return veg_index, health_analysis, fig
        except Exception as e:
            # Best-effort boundary: callers get a None triple, not a crash.
            print(f"Error in image processing: {e}")
            return None, None, None

    def segment_farmland(self, image):
        """Segment the region around the image centre using SAM.

        A single foreground point at the image centre is used as the prompt
        and the highest-scoring of the returned masks is selected.

        Args:
            image: HxWx3 uint8 RGB array.

        Returns:
            The best-scoring mask returned by the predictor.

        Raises:
            ValueError: If the predictor returns no masks.
        """
        try:
            self.predictor.set_image(image)

            h, w = image.shape[:2]
            input_point = np.array([[w // 2, h // 2]])  # Center point (x, y)
            input_label = np.array([1])  # Foreground

            masks, scores, _ = self.predictor.predict(
                point_coords=input_point,
                point_labels=input_label,
                multimask_output=True,
            )

            if len(masks) == 0:
                raise ValueError("No valid masks generated")
            return masks[scores.argmax()]
        except Exception as e:
            print(f"Error in farmland segmentation: {e}")
            raise

    def calculate_vegetation_index(self, image, mask):
        """Compute a visible-band vegetation index inside ``mask``.

        The index is ``(2G - R - B) / (2G + R + B)`` (the Green Leaf Index),
        rescaled from [-1, 1] to [0, 1] and multiplied by ``mask`` so pixels
        outside the mask are 0.

        Args:
            image: HxWx3 RGB array.
            mask: HxW mask; nonzero marks farmland.

        Returns:
            HxW float array of index values.
        """
        try:
            # Convert once to float so uint8 arithmetic cannot wrap around.
            rgb = image[:, :, :3].astype(float)
            r, g, b = rgb[:, :, 0], rgb[:, :, 1], rgb[:, :, 2]

            numerator = 2 * g - r - b
            denominator = 2 * g + r + b
            # Avoid division by zero on pure-black pixels.
            denominator[denominator == 0] = 1e-10

            veg_index = (numerator / denominator + 1) / 2  # Normalize to 0-1
            return veg_index * mask
        except Exception as e:
            print(f"Error calculating vegetation index: {e}")
            raise

    def analyze_crop_health(self, veg_index, mask):
        """Summarise crop health and insurance risk for the masked region.

        Args:
            veg_index: HxW vegetation index array (0-1).
            mask: HxW mask; values > 0 mark farmland.

        Returns:
            Dict with keys ``average_index``, ``health_distribution``,
            ``overall_health``, ``farm_size`` (hectares, approximate),
            ``vegetation_uniformity`` (standard deviation of the index),
            ``insurance_risk`` and ``insurance_recommendations``. When the
            mask is empty, ``insurance_recommendations`` is a plain message
            string instead of a recommendation dict.
        """
        try:
            valid_pixels = np.asarray(veg_index)[np.asarray(mask) > 0]
            pixel_count = valid_pixels.size

            if pixel_count == 0:
                return {
                    'average_index': 0,
                    'health_distribution': {
                        'low_vegetation': 0,
                        'moderate_vegetation': 0,
                        'high_vegetation': 0
                    },
                    'overall_health': 'No vegetation detected',
                    'farm_size': 0,
                    # Present so consumers see the same keys as the normal case.
                    'vegetation_uniformity': 0.0,
                    'insurance_risk': 'Very High',
                    'insurance_recommendations': 'Cannot assess insurance without vegetation data'
                }

            # Count the same pixels that were selected above, so the size
            # agrees with the statistics for any mask encoding.
            farm_size = float(pixel_count * self.HECTARES_PER_PIXEL)
            avg_index = float(valid_pixels.mean())

            is_low = valid_pixels <= 0.3
            is_high = valid_pixels > 0.6
            health_categories = {
                'low_vegetation': float(np.count_nonzero(is_low) / pixel_count),
                'moderate_vegetation': float(
                    np.count_nonzero(~is_low & ~is_high) / pixel_count
                ),
                'high_vegetation': float(np.count_nonzero(is_high) / pixel_count),
            }

            # Spread of the index; lower means more uniform vegetation.
            vegetation_uniformity = float(valid_pixels.std())

            insurance_risk = self.calculate_insurance_risk(
                avg_index,
                health_categories,
                vegetation_uniformity,
                farm_size
            )
            insurance_recommendations = self.get_insurance_recommendations(
                insurance_risk,
                health_categories,
                farm_size
            )

            return {
                'average_index': avg_index,
                'health_distribution': health_categories,
                'overall_health': 'Healthy' if avg_index > 0.5 else 'Needs attention',
                'farm_size': farm_size,
                'vegetation_uniformity': vegetation_uniformity,
                'insurance_risk': insurance_risk,
                'insurance_recommendations': insurance_recommendations
            }
        except Exception as e:
            print(f"Error analyzing crop health: {e}")
            raise

    def calculate_insurance_risk(self, avg_index, health_distribution, uniformity, farm_size):
        """Score the farm (0-100) and map the score to a risk level.

        A higher score means lower risk. Points come from average vegetation
        index (up to 40), vegetation distribution (up to 30), uniformity
        (up to 20) and farm size (up to 10).

        Returns:
            One of ``"Low"``, ``"Moderate"``, ``"High"``, ``"Very High"``.
        """
        risk_score = 0

        # Vegetation health (0-40 points)
        if avg_index >= 0.6:
            risk_score += 40
        elif avg_index >= 0.4:
            risk_score += 25
        elif avg_index >= 0.2:
            risk_score += 10

        # Vegetation distribution (0-30 points)
        if health_distribution['high_vegetation'] > 0.6:
            risk_score += 30
        elif health_distribution['high_vegetation'] > 0.4:
            risk_score += 20
        elif health_distribution['moderate_vegetation'] > 0.5:
            risk_score += 15

        # Uniformity (0-20 points); smaller spread scores higher
        if uniformity < 0.1:
            risk_score += 20
        elif uniformity < 0.2:
            risk_score += 15
        elif uniformity < 0.3:
            risk_score += 10

        # Farm size in hectares (0-10 points)
        if farm_size > 10:  # Large farm
            risk_score += 10
        elif farm_size > 5:  # Medium farm
            risk_score += 7
        elif farm_size > 2:  # Small farm
            risk_score += 5

        if risk_score >= 80:
            return "Low"
        elif risk_score >= 60:
            return "Moderate"
        elif risk_score >= 40:
            return "High"
        else:
            return "Very High"

    def get_insurance_recommendations(self, risk_level, health_distribution, farm_size):
        """Build insurance recommendations for a risk level and farm size.

        Args:
            risk_level: ``"Low"``, ``"Moderate"``, ``"High"`` or ``"Very High"``.
            health_distribution: Accepted for interface compatibility; not
                used by the current rules.
            farm_size: Farm size in hectares; < 5 is small, < 10 medium,
                otherwise large.

        Returns:
            A new dict with ``policy_type``, ``premium_level``,
            ``coverage_options`` (list of bullet strings) and
            ``additional_notes``.

        Raises:
            KeyError: If ``risk_level`` is not one of the known levels.
        """
        size_category = "small" if farm_size < 5 else "medium" if farm_size < 10 else "large"
        base = _BASE_RECOMMENDATIONS[risk_level]

        # Build fresh containers so the shared templates are never mutated.
        return {
            **base,
            "coverage_options": [
                *base["coverage_options"],
                *_SIZE_RECOMMENDATIONS[size_category],
            ],
        }

    def create_visualization(self, image, mask, veg_index):
        """Create a three-panel figure of the analysis.

        Panels: the image with the mask overlaid, the vegetation index
        heatmap, and the low/moderate/high classification. Pixels outside
        the mask are left blank in the last two panels so they are not
        mistaken for low vegetation.

        Returns:
            The matplotlib ``Figure``.
        """
        try:
            # Close figures left over from earlier calls so they don't pile up.
            plt.close('all')

            fig, (ax_seg, ax_index, ax_levels) = plt.subplots(1, 3, figsize=(15, 5))
            outside = ~(np.asarray(mask) > 0)

            # Original image with mask overlay
            ax_seg.imshow(image)
            ax_seg.imshow(mask, alpha=0.3, cmap='gray')
            ax_seg.set_title('Segmented Farmland')
            ax_seg.axis('off')

            # Vegetation index heatmap
            index_im = ax_index.imshow(
                np.ma.masked_where(outside, veg_index),
                cmap='RdYlGn', vmin=0, vmax=1,
            )
            fig.colorbar(index_im, ax=ax_index, label='Vegetation Index')
            ax_index.set_title('Vegetation Index')
            ax_index.axis('off')

            # Health classification: 1 = low, 2 = moderate, 3 = high
            health_mask = np.zeros_like(veg_index)
            health_mask[veg_index <= 0.3] = 1
            health_mask[(veg_index > 0.3) & (veg_index <= 0.6)] = 2
            health_mask[veg_index > 0.6] = 3
            levels_im = ax_levels.imshow(
                np.ma.masked_where(outside, health_mask),
                cmap='viridis', vmin=1, vmax=3,
            )
            cbar = fig.colorbar(levels_im, ax=ax_levels, ticks=[1, 2, 3])
            cbar.set_label('Vegetation Levels')
            cbar.set_ticklabels(['Low', 'Moderate', 'High'])
            ax_levels.set_title('Vegetation Levels')
            ax_levels.axis('off')

            fig.tight_layout()
            return fig
        except Exception as e:
            print(f"Error creating visualization: {e}")
            raise

    def format_insurance_analysis(self, health_analysis):
        """Format the result of ``analyze_crop_health`` as display text.

        Handles both the normal result (recommendations dict) and the
        no-vegetation result (plain message string).

        Returns:
            The formatted text, or a generic error message if
            ``health_analysis`` is missing or malformed.
        """
        try:
            farm_size = health_analysis['farm_size']
            risk_level = health_analysis['insurance_risk']
            recommendations = health_analysis['insurance_recommendations']
            uniformity = health_analysis.get('vegetation_uniformity', 0.0)

            lines = [
                "",
                "🏗️ Farm Analysis:",
                f"• Approximate Size: {farm_size:.1f} hectares",
                f"• Vegetation Uniformity: {uniformity:.2f}",
                "",
                f"🎯 Insurance Risk Level: {risk_level}",
                "",
            ]

            if isinstance(recommendations, dict):
                lines += [
                    "💡 Recommended Insurance Strategy:",
                    f"• Policy Type: {recommendations['policy_type']}",
                    f"• Premium Level: {recommendations['premium_level']}",
                    "",
                    "📋 Recommended Coverage Options:",
                    *recommendations['coverage_options'],
                    "",
                    "📝 Additional Notes:",
                    f"{recommendations['additional_notes']}",
                    "",
                ]
            else:
                # No-vegetation case: only a message is available.
                lines += ["📝 Additional Notes:", str(recommendations), ""]

            return "\n".join(lines)
        except Exception as e:
            print(f"Error formatting insurance analysis: {e}")
            return "Error generating insurance recommendations"