MAGInet_demo / app.py
OnsAouedi's picture
Update app.py
3d84896 verified
import torch
import torch.nn as nn
import gradio as gr
import pandas as pd
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error
import os
import logging
import joblib
from tqdm import tqdm
import tempfile
import json
from math import radians, cos, sin, asin, sqrt, atan2, degrees
import time
import functools
# ============================
# Configure Logging
# ============================
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def add_time_decimal_feature(df):
"""
Add 'time_decimal' feature by combining 'hour' and 'minutes'.
:param df: DataFrame with 'hour' and 'minutes' columns.
:return: DataFrame with 'time_decimal' and without 'hour' and 'minutes'.
"""
if 'time_decimal' in df.columns:
logging.info("'time_decimal' feature already exists. Skipping creation.")
return df
elif 'hour' in df.columns and 'minutes' in df.columns:
logging.info("Adding 'time_decimal' feature...")
df['time_decimal'] = df['hour'] + df['minutes'] / 60.0
df = df.drop(columns=['hour', 'minutes']) # Drop 'hour' and 'minutes' after creation
logging.info("'time_decimal' feature added.")
else:
logging.warning("Neither 'time_decimal' nor 'hour' and 'minutes' columns found. Cannot create 'time_decimal' feature.")
raise ValueError("Input data must contain 'time_decimal' or both 'hour' and 'minutes' columns.")
return df
def haversine(lon1, lat1, lon2, lat2):
"""
Calculate the great-circle distance between two points on the Earth.
:param lon1: Longitude of point 1 (in decimal degrees)
:param lat1: Latitude of point 1 (in decimal degrees)
:param lon2: Longitude of point 2 (in decimal degrees)
:param lat2: Latitude of point 2 (in decimal degrees)
:return: Distance in kilometers
"""
# Convert decimal degrees to radians
lon1_rad, lat1_rad, lon2_rad, lat2_rad = map(np.radians, [lon1, lat1, lon2, lat2])
# Haversine formula
dlon = lon2_rad - lon1_rad
dlat = lat2_rad - lat1_rad
a = np.sin(dlat/2)**2 + np.cos(lat1_rad) * np.cos(lat2_rad) * np.sin(dlon/2)**2
c = 2 * np.arcsin(np.sqrt(a))
r = 6371 # Radius of Earth in kilometers
return c * r
def calculate_bearing(lon1, lat1, lon2, lat2):
"""
Calculate the bearing between two points.
:param lon1: Longitude of point 1 (in decimal degrees)
:param lat1: Latitude of point 1 (in decimal degrees)
:param lon2: Longitude of point 2 (in decimal degrees)
:param lat2: Latitude of point 2 (in decimal degrees)
:return: Bearing in degrees
"""
# Convert decimal degrees to radians
lon1_rad, lat1_rad, lon2_rad, lat2_rad = map(radians, [lon1, lat1, lon2, lat2])
dlon = lon2_rad - lon1_rad
x = sin(dlon) * cos(lat2_rad)
y = cos(lat1_rad) * sin(lat2_rad) - (sin(lat1_rad) * cos(lat2_rad) * cos(dlon))
initial_bearing = atan2(x, y)
# Convert from radians to degrees and normalize
initial_bearing = degrees(initial_bearing)
compass_bearing = (initial_bearing + 360) % 360
return compass_bearing
def angular_divergence(bearing1, bearing2):
"""
Calculate the smallest angle difference between two bearings.
:param bearing1: First bearing in degrees
:param bearing2: Second bearing in degrees
:return: Angular divergence in degrees
"""
diff = abs(bearing1 - bearing2) % 360
return min(diff, 360 - diff)
def denormalize(scaled_lat, scaled_lon, scaler, lat_idx, lon_idx):
"""
Denormalize latitude and longitude using the scaler's parameters.
:param scaled_lat: Scaled latitude values (numpy array).
:param scaled_lon: Scaled longitude values (numpy array).
:param scaler: The scaler object used for normalization.
:param lat_idx: Index of 'latitude_degrees' in the scaler's feature list.
:param lon_idx: Index of 'longitude_degrees' in the scaler's feature list.
:return: Tuple of (denormalized_lat, denormalized_lon).
"""
lat_min = scaler.data_min_[lat_idx]
lat_max = scaler.data_max_[lat_idx]
lon_min = scaler.data_min_[lon_idx]
lon_max = scaler.data_max_[lon_idx]
denorm_lat = scaled_lat * (lat_max - lat_min) + lat_min
denorm_lon = scaled_lon * (lon_max - lon_min) + lon_min
return denorm_lat, denorm_lon
def create_dataset_grouped_by_mmsi(df_scaled, seq_len, forecast_horizon, features_to_scale, future_features):
"""
Create input and output sequences grouped by original MMSI.
Returns scaled last known positions.
"""
Xs, ys, mmsis = [], [], []
last_known_positions_scaled = []
grouped = df_scaled.groupby('original_mmsi')
for mmsi, group in tqdm(grouped, desc="Creating sequences"):
if len(group) >= seq_len + forecast_horizon:
for i in range(len(group) - seq_len - forecast_horizon + 1):
# Select scaled features for the sequence
sequence = group.iloc[i:(i + seq_len)][features_to_scale].to_numpy()
# Future positions to predict (scaled)
future_positions = group[['latitude_degrees', 'longitude_degrees']].iloc[i + seq_len:i + seq_len + forecast_horizon].to_numpy()
# Future features
future_feature_values = group[future_features].iloc[i + seq_len].values
future_feature_array = np.tile(future_feature_values, (seq_len, 1))
# Combine sequence with future features
sequence_with_future_features = np.hstack((sequence, future_feature_array))
Xs.append(sequence_with_future_features)
ys.append(future_positions)
mmsis.append(mmsi)
# Store last known positions (scaled)
last_lat_scaled = group['latitude_degrees'].iloc[i + seq_len - 1]
last_lon_scaled = group['longitude_degrees'].iloc[i + seq_len - 1]
last_known_positions_scaled.append((last_lat_scaled, last_lon_scaled))
return np.array(Xs, dtype=np.float32), np.array(ys, dtype=np.float32), np.array(mmsis), last_known_positions_scaled
# ============================
# Model Definitions
# ============================
class LSTMModelTeacher(nn.Module):
def __init__(self, in_dim, hidden_dim, forecast_horizon, n_layers=7, dropout=0.2):
"""
Teacher LSTM Model.
:param in_dim: Number of input features.
:param hidden_dim: Number of hidden units.
:param forecast_horizon: Number of future steps to predict.
:param n_layers: Number of LSTM layers.
:param dropout: Dropout rate.
"""
super(LSTMModelTeacher, self).__init__()
self.forecast_horizon = forecast_horizon # Store as an instance attribute
self.embedding = nn.Linear(in_dim, hidden_dim)
self.lstm = nn.LSTM(hidden_dim, hidden_dim, num_layers=n_layers, dropout=dropout, batch_first=True)
self.fc = nn.Linear(hidden_dim, forecast_horizon * 2)
def forward(self, x):
x = self.embedding(x)
x, _ = self.lstm(x)
x = self.fc(x[:, -1, :]) # Use the last timestep for prediction
x = x.view(-1, self.forecast_horizon, 2) # Shape: (batch_size, forecast_horizon, 2)
return x
class LSTMModelStudent(nn.Module):
def __init__(self, in_dim, hidden_dim, forecast_horizon, n_layers=3, dropout=0.2):
"""
Student LSTM Model.
:param in_dim: Number of input features.
:param hidden_dim: Number of hidden units.
:param forecast_horizon: Number of future steps to predict.
:param n_layers: Number of LSTM layers.
:param dropout: Dropout rate.
"""
super(LSTMModelStudent, self).__init__()
self.forecast_horizon = forecast_horizon # Store as an instance attribute
self.embedding = nn.Linear(in_dim, hidden_dim)
self.lstm = nn.LSTM(hidden_dim, hidden_dim, num_layers=n_layers, dropout=dropout, batch_first=True)
self.fc = nn.Linear(hidden_dim, forecast_horizon * 2)
def forward(self, x):
x = self.embedding(x)
x, _ = self.lstm(x)
x = self.fc(x[:, -1, :]) # Use the last timestep for prediction
x = x.view(-1, self.forecast_horizon, 2) # Shape: (batch_size, forecast_horizon, 2)
return x
# ============================
# Model Loading Functions
# ============================
def load_models(model_paths):
"""
Load teacher, student, and cargo vessel models, including submodels for North, Mid, and South areas.
:param model_paths: Dictionary containing paths to the models.
:return: Dictionary of loaded models.
"""
models = {}
logging.info("Loading Teacher model...")
# Teacher model input dimension
teacher_in_dim = 15 # Features including 'future_hour_feature' (time_decimal)
# Load Teacher Model (Global)
teacher = LSTMModelTeacher(in_dim=teacher_in_dim, hidden_dim=200, forecast_horizon=1, n_layers=7, dropout=0.2)
teacher.load_state_dict(torch.load(model_paths['teacher'], map_location=torch.device('cpu')))
teacher.eval()
models['Teacher'] = teacher
logging.info("Teacher model loaded successfully.")
logging.info("Loading Student North model...")
# Student North model input dimension is the same as teacher
student_north = LSTMModelStudent(in_dim=teacher_in_dim, hidden_dim=200, forecast_horizon=1, n_layers=3, dropout=0.2)
student_north.load_state_dict(torch.load(model_paths['student_north'], map_location=torch.device('cpu')))
student_north.eval()
models['Student_North'] = student_north
logging.info("Student North model loaded successfully.")
logging.info("Loading Student Mid model...")
student_mid = LSTMModelStudent(in_dim=teacher_in_dim, hidden_dim=200, forecast_horizon=1, n_layers=3, dropout=0.2)
student_mid.load_state_dict(torch.load(model_paths['student_mid'], map_location=torch.device('cpu')))
student_mid.eval()
models['Student_Mid'] = student_mid
logging.info("Student Mid model loaded successfully.")
logging.info("Loading Student South model...")
student_south = LSTMModelStudent(in_dim=teacher_in_dim, hidden_dim=200, forecast_horizon=1, n_layers=3, dropout=0.2)
student_south.load_state_dict(torch.load(model_paths['student_south'], map_location=torch.device('cpu')))
student_south.eval()
models['Student_South'] = student_south
logging.info("Student South model loaded successfully.")
# Load Cargo Vessel model
logging.info("Loading Cargo Vessel model...")
# Cargo Vessel model input dimension
cargo_in_dim = 6 + 3 # + 3 future features ('day', 'month', 'time_decimal')
cargo_model = LSTMModelTeacher(in_dim=cargo_in_dim, hidden_dim=200, forecast_horizon=1, n_layers=10, dropout=0.2)
cargo_model.load_state_dict(torch.load(model_paths['cargo_vessel'], map_location=torch.device('cpu')))
cargo_model.eval()
models['Cargo_Vessel'] = cargo_model
logging.info("Cargo Vessel model loaded successfully.")
return models
def load_scalers(scaler_paths):
"""
Load scalers for each model.
:param scaler_paths: Dictionary containing paths to the scaler files.
:return: Dictionary of loaded scalers.
"""
loaded_scalers = {}
for model_name, scaler_path in scaler_paths.items():
if os.path.exists(scaler_path):
loaded_scalers[model_name] = joblib.load(scaler_path)
logging.info(f"Loaded scaler for {model_name} from '{scaler_path}'.")
else:
logging.error(f"Scaler file for {model_name} not found at '{scaler_path}'.")
raise FileNotFoundError(f"Scaler file for {model_name} not found at '{scaler_path}'. Please provide the correct path.")
return loaded_scalers
def determine_subarea(df):
"""
Determine the sub-area (North, Mid, South) based on latitude and longitude ranges.
:param df: DataFrame containing 'latitude_degrees' and 'longitude_degrees'.
:return: String indicating the sub-area.
"""
# Define sub-area boundaries
subareas = {
'North': {'lat_min': 30, 'lat_max': 60, 'lon_min': -80, 'lon_max': -10},
'Mid': {'lat_min': 0, 'lat_max': 30, 'lon_min': -80, 'lon_max': 10},
'South': {'lat_min': -80, 'lat_max': 0, 'lon_min': -60, 'lon_max': 20}
}
# Count the number of data points in each sub-area
counts = {}
for area, bounds in subareas.items():
count = df[
(df['latitude_degrees'] >= bounds['lat_min']) & (df['latitude_degrees'] <= bounds['lat_max']) &
(df['longitude_degrees'] >= bounds['lon_min']) & (df['longitude_degrees'] <= bounds['lon_max'])
].shape[0]
counts[area] = count
logging.info(f"Sub-area '{area}': {count} records.")
# Determine the sub-area with the maximum count
predominant_subarea = max(counts, key=counts.get)
logging.info(f"Predominant sub-area determined: {predominant_subarea}")
# If no data points fall into any sub-area, default to Teacher
if counts[predominant_subarea] == 0:
logging.warning("No data points found in any sub-area. Defaulting to Teacher model.")
return 'Teacher'
return predominant_subarea
def select_model(models, subarea, model_choice):
"""
Select the appropriate model based on the sub-area and model choice.
:param models: Dictionary of loaded models.
:param subarea: String indicating the sub-area.
:param model_choice: String indicating the selected model.
:return: Tuple of (selected_model, selected_model_name).
"""
if model_choice == "Auto-Select":
if subarea in ['North', 'Mid', 'South']:
selected_model = models.get(f'Student_{subarea}')
selected_model_name = f'Student_{subarea}'
else:
selected_model = models.get('Teacher')
selected_model_name = 'Teacher'
else:
selected_model = models.get(model_choice)
selected_model_name = model_choice
logging.info(f"Selected model: {selected_model_name}")
return selected_model, selected_model_name
# ============================
# Evaluation Metrics Calculation
# ============================
def calculate_classic_metrics(y_true, y_pred):
"""
Calculate MAE, MSE, and RMSE directly on latitude/longitude pairs.
:param y_true: Ground truth positions (numpy array of shape (num_samples, 2)).
:param y_pred: Predicted positions (numpy array of shape (num_samples, 2)).
:return: Dictionary containing the classic metrics.
"""
# Calculate MAE
mae = mean_absolute_error(y_true, y_pred)
# Calculate MSE
mse = mean_squared_error(y_true, y_pred)
# Calculate RMSE
rmse = np.sqrt(mse)
classic_metrics = {
'MAE (degrees)': mae,
'MSE (degrees^2)': mse,
'RMSE (degrees)': rmse
}
logging.info(f"Calculated classic metrics: {classic_metrics}")
return classic_metrics
def calculate_distance_metrics(y_true, y_pred):
"""
Calculate metrics based on distance (in kilometers).
:param y_true: Ground truth positions (numpy array of shape (num_samples, 2)).
:param y_pred: Predicted positions (numpy array of shape (num_samples, 2)).
:return: Dictionary containing the distance-based metrics.
"""
# Calculate haversine distance between predicted and true positions
distances = np.array([
haversine(y_true[i, 1], y_true[i, 0], y_pred[i, 1], y_pred[i, 0])
for i in range(len(y_true))
]) # Assuming columns are [latitude, longitude]
# Calculate MAE
mae = np.mean(np.abs(distances))
# Calculate MSE
mse = np.mean(np.square(distances))
# Calculate RMSE
rmse = np.sqrt(mse)
# Calculate RSE (Relative Squared Error)
variance = np.var(distances)
rse = mse / variance if variance != 0 else float('inf')
metrics = {
'MAE (km)': mae,
'MSE (km^2)': mse,
'RMSE (km)': rmse,
'RSE': rse
}
logging.info(f"Calculated distance metrics: {metrics}")
return metrics
# ============================
# Classical Metrics Prediction
# ============================
def classical_prediction(file_path, model_choice, min_mmsi, max_mmsi, models, loaded_scalers):
"""
Preprocess the input CSV and make predictions using the selected model.
Calculate classical evaluation metrics and include inference time.
"""
try:
logging.info("Starting classical prediction...")
# Load the uploaded CSV file and filter based on MMSI
logging.info("Loading uploaded CSV file...")
df = pd.read_csv(file_path, delimiter=',')
logging.info(f"Uploaded CSV file loaded with {df.shape[0]} records.")
df = df[(df['mmsi'] >= min_mmsi) & (df['mmsi'] <= max_mmsi)]
if df.empty:
error_message = "No data available after applying MMSI filters."
logging.error(error_message)
return {"error": error_message}, None, None, None
# Select the appropriate model and scaler
if model_choice == "Auto-Select":
temp_df = df.copy()
subarea = determine_subarea(temp_df)
selected_model, selected_model_name = select_model(models, subarea, model_choice)
scaler = loaded_scalers[selected_model_name]
else:
if model_choice in models:
selected_model = models[model_choice]
selected_model_name = model_choice
scaler = loaded_scalers[selected_model_name]
else:
error_message = f"Selected model '{model_choice}' is not available."
logging.error(error_message)
return {"error": error_message}, None, None, None
logging.info(f"Using scaler for model: {selected_model_name}")
# Adjust features_to_scale based on the selected model
if selected_model_name == 'Cargo_Vessel':
features_to_scale = [
"mmsi", "latitude_degrees", "longitude_degrees",
"day", "month", "time_decimal" # Removed 'ship_type'
]
future_features = ['day', 'month', 'time_decimal']
else:
features_to_scale = [
"mmsi", "sog_kt", "latitude_degrees", "longitude_degrees", "cog_degrees",
"dimension_a_m", "dimension_b_m", "dimension_c_m", "dimension_d_m",
"ship_type", "day", "month", "year", "time_decimal"
]
future_features = ['time_decimal']
# Check if the necessary columns exist
expected_columns = features_to_scale
if not all(col in df.columns for col in expected_columns):
error_message = (
f"Input data does not have the correct columns.\n"
f"Expected columns for {selected_model_name}: {expected_columns}\n"
f"Got columns: {list(df.columns)}"
)
logging.error(error_message)
return {"error": error_message}, None, None, None
logging.info("Input CSV has the correct columns.")
if selected_model_name != 'Cargo_Vessel':
df = add_time_decimal_feature(df)
else:
if 'time_decimal' not in df.columns:
error_message = "Cargo model requires 'time_decimal' column."
logging.error(error_message)
return {"error": error_message}, None, None, None
# Normalize the data
logging.info("Normalizing the data...")
X_new = df[features_to_scale]
X_scaled = scaler.transform(X_new)
df_scaled = pd.DataFrame(X_scaled, columns=features_to_scale, index=df.index)
df_scaled['original_mmsi'] = df['mmsi']
# Create sequences and get last known positions (scaled)
seq_len = 24
forecast_horizon = 1
X, y, mmsi_seq, last_known_positions_scaled = create_dataset_grouped_by_mmsi(
df_scaled, seq_len, forecast_horizon, features_to_scale, future_features
)
if X.size == 0:
error_message = "Not enough data to create sequences."
logging.error(error_message)
return {"error": error_message}, None, None, None
logging.info(f"Created {X.shape[0]} sequences.")
# Inference
logging.info("Starting model inference...")
test_dataset = torch.utils.data.TensorDataset(torch.tensor(X, dtype=torch.float32), torch.tensor(y, dtype=torch.float32))
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False)
all_predictions = []
all_y_true = []
start_time = time.time() # Start inference time tracking
with torch.no_grad():
for batch in test_loader:
X_batch, y_batch = batch
predictions = selected_model(X_batch).cpu().numpy()
all_predictions.append(predictions)
all_y_true.append(y_batch.numpy())
inference_time = time.time() - start_time # End inference time
all_predictions = np.concatenate(all_predictions, axis=0)
y_true = np.concatenate(all_y_true, axis=0)
y_pred = all_predictions
logging.info(f"Inference completed in {inference_time:.2f} seconds.")
# Denormalize predictions and real values
lat_idx = features_to_scale.index("latitude_degrees")
lon_idx = features_to_scale.index("longitude_degrees")
pred_lat, pred_lon = denormalize(y_pred[:, :, 0], y_pred[:, :, 1], scaler, lat_idx, lon_idx)
true_lat, true_lon = denormalize(y_true[:, :, 0], y_true[:, :, 1], scaler, lat_idx, lon_idx)
# Denormalize last known positions
last_lat_scaled = np.array([pos[0] for pos in last_known_positions_scaled])
last_lon_scaled = np.array([pos[1] for pos in last_known_positions_scaled])
last_lat_denorm, last_lon_denorm = denormalize(
last_lat_scaled, last_lon_scaled, scaler, lat_idx, lon_idx
)
# Squeeze arrays to ensure they are 1-dimensional
pred_lat = pred_lat.squeeze()
pred_lon = pred_lon.squeeze()
true_lat = true_lat.squeeze()
true_lon = true_lon.squeeze()
last_lat_denorm = last_lat_denorm.squeeze()
last_lon_denorm = last_lon_denorm.squeeze()
mmsi_seq = mmsi_seq.squeeze()
# Calculate the classic evaluation metrics
y_true_pairs = np.column_stack((true_lat, true_lon))
y_pred_pairs = np.column_stack((pred_lat, pred_lon))
classic_metrics = calculate_classic_metrics(y_true=y_true_pairs, y_pred=y_pred_pairs)
classic_metrics['Inference Time (seconds)'] = inference_time # Include inference time
# Calculate error in Km for each prediction
logging.info("Calculating error in kilometers for each prediction...")
error_km = np.array([
haversine(pred_lon[i], pred_lat[i], true_lon[i], true_lat[i])
for i in range(len(pred_lat))
])
# Prepare metrics and output CSV
metrics_df = pd.DataFrame([classic_metrics])
metrics_json = metrics_df.to_json(orient="records")
metrics_json = json.loads(metrics_json)[0]
# Prepare predicted and real positions DataFrame, including error in Km
predicted_df = pd.DataFrame({
'MMSI': mmsi_seq[:len(y_pred)],
'Last Known Latitude': last_lat_denorm,
'Last Known Longitude': last_lon_denorm,
'Predicted Latitude': pred_lat,
'Predicted Longitude': pred_lon,
'Real Latitude': true_lat,
'Real Longitude': true_lon,
'Error (Km)': error_km
})
# Save predictions as CSV
with tempfile.NamedTemporaryFile(delete=False, suffix='.csv', mode='w', newline='') as tmp_positions_file:
predicted_df.to_csv(tmp_positions_file, index=False)
positions_csv_path = tmp_positions_file.name
logging.info("Classical prediction completed.")
return metrics_json, positions_csv_path, inference_time, None
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
return None, None, None, str(e)
# ============================
# Abnormal Behavior Detection
# ============================
def abnormal_behavior_detection(prediction_file_path, alpha=0.5, threshold=10.0):
"""
Detect abnormal behavior based on angular divergence and distance difference.
Accepts a CSV file containing real and predicted positions.
"""
try:
logging.info("Starting abnormal behavior detection...")
# Load the CSV file containing real and predicted positions
logging.info("Loading prediction CSV file...")
df = pd.read_csv(prediction_file_path)
logging.info(f"Prediction CSV file loaded with {df.shape[0]} records.")
# Check if necessary columns exist
expected_columns = [
'MMSI',
'Last Known Latitude',
'Last Known Longitude',
'Predicted Latitude',
'Predicted Longitude',
'Real Latitude',
'Real Longitude'
]
if not all(col in df.columns for col in expected_columns):
error_message = (
f"Input data does not have the correct columns.\n"
f"Expected columns: {expected_columns}\n"
f"Got columns: {list(df.columns)}"
)
logging.error(error_message)
return None, error_message
# Extract necessary data
mmsi_seq = df['MMSI'].values
last_lat_flat = df['Last Known Latitude'].values
last_lon_flat = df['Last Known Longitude'].values
pred_lat_flat = df['Predicted Latitude'].values
pred_lon_flat = df['Predicted Longitude'].values
true_lat_flat = df['Real Latitude'].values
true_lon_flat = df['Real Longitude'].values
# Calculate bearings
logging.info("Calculating bearings for predictions and real values...")
bearings_pred = [
calculate_bearing(last_lon_flat[i], last_lat_flat[i], pred_lon_flat[i], pred_lat_flat[i])
for i in range(len(pred_lat_flat))
]
bearings_true = [
calculate_bearing(last_lon_flat[i], last_lat_flat[i], true_lon_flat[i], true_lat_flat[i])
for i in range(len(true_lat_flat))
]
# Calculate angular divergence Δθ
logging.info("Calculating angular divergence (Δθ)...")
delta_theta = [
angular_divergence(bearings_pred[i], bearings_true[i])
for i in range(len(bearings_pred))
]
# Calculate distance difference Δd
logging.info("Calculating distance difference (Δd)...")
delta_d = [
haversine(last_lon_flat[i], last_lat_flat[i], pred_lon_flat[i], pred_lat_flat[i]) -
haversine(last_lon_flat[i], last_lat_flat[i], true_lon_flat[i], true_lat_flat[i])
for i in range(len(pred_lat_flat))
]
# Compute the score
logging.info("Computing the abnormal behavior score...")
score = [alpha * abs(dd) + (1 - alpha) * dt for dd, dt in zip(delta_d, delta_theta)]
# Determine abnormal behavior
logging.info("Determining abnormal behavior based on the score...")
abnormal_behavior = [1 if s >= threshold else 0 for s in score] # 1: Abnormal, 0: Normal
# Create DataFrame for saving
abnormal_behavior_df = pd.DataFrame({
'MMSI': mmsi_seq,
'Last Known Latitude': last_lat_flat,
'Last Known Longitude': last_lon_flat,
'Predicted Latitude': pred_lat_flat,
'Predicted Longitude': pred_lon_flat,
'Real Latitude': true_lat_flat,
'Real Longitude': true_lon_flat,
'Distance Difference (Δd) [km]': delta_d,
'Angular Divergence (Δθ) [degrees]': delta_theta,
'Score (αΔd + (1-α)Δθ)': score,
'Abnormal Behavior (1=Abnormal, 0=Normal)': abnormal_behavior
})
# Save abnormal behavior dataset as CSV
with tempfile.NamedTemporaryFile(delete=False, suffix='.csv', mode='w', newline='') as tmp_abnormal_file:
abnormal_behavior_df.to_csv(tmp_abnormal_file, index=False)
abnormal_csv_path = tmp_abnormal_file.name
logging.info("Abnormal behavior detection completed.")
return abnormal_csv_path, None
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
return None, str(e)
# ============================
# Define Gradio Interface
# ============================
def main():
model_paths = {
'teacher': 'LSTM_whole_atlantic_horizon1_with_time_decimal_input_batch256/horizon_data_LSTM_whole_atlantic_horizon1_with_time_decimal_input_batch256_seq_24/run_1/best_model.pth',
'student_north': 'LSTM_whole_atlantic_horizon1_with_time_decimal_input_batch256_KD_North/horizon1_data_LSTM_whole_atlantic_horizon1_with_time_decimal_input_batch256_KD_North_seq_24/run_1/best_model.pth',
'student_mid': 'LSTM_whole_atlantic_horizon1_with_time_decimal_input_batch256_KD_Mid/horizon1_data_LSTM_whole_atlantic_horizon1_with_time_decimal_input_batch256_KD_Mid_seq_24/run_1/best_model.pth',
'student_south': 'LSTM_whole_atlantic_horizon1_with_time_decimal_input_batch256_KD_South/horizon1_data_LSTM_whole_atlantic_horizon1_with_time_decimal_input_batch256_KD_South_seq_24/run_1/best_model.pth',
'cargo_vessel': 'LSTMModel_cargo_horizon1_with_month_day_time_input_batch256_cleaned/horizon_data_LSTMModel_cargo_horizon1_with_month_day_time_input_batch256_cleaned_seq_24/run_1/best_model.pth'
}
scaler_paths = {
'Teacher': 'scaler_train_wholedata_up.joblib',
'Student_North': 'scaler_train_North_up.joblib',
'Student_Mid': 'scaler_train_Mid_up.joblib',
'Student_South': 'scaler_train_South_up.joblib',
'Cargo_Vessel': 'scaler_features_cargo_cleaned.joblib'
}
logging.info("Loading models and scalers...")
models = load_models(model_paths)
loaded_scalers = load_scalers(scaler_paths)
logging.info("All models and scalers loaded successfully.")
# Define the Gradio components for classical prediction tab
classical_tab = gr.Interface(
fn=functools.partial(classical_prediction, models=models, loaded_scalers=loaded_scalers),
inputs=[
gr.File(label="Upload CSV File", type='filepath'),
gr.Dropdown(
choices=["Auto-Select", "Teacher", "Student_North", "Student_Mid", "Student_South", "Cargo_Vessel"],
value="Auto-Select",
label="Choose Model"
),
gr.Number(label="Min MMSI", value=0),
gr.Number(label="Max MMSI", value=999999999)
],
outputs=[
gr.JSON(label="Classical Metrics (Degrees)"),
gr.File(label="Download Predicted & Real Positions CSV"),
gr.Number(label="Inference Time (seconds)"),
gr.Textbox(label="Error Message", lines=2, visible=False)
],
title="Classical Prediction & Metrics",
description=(
"Upload a CSV file and select a model to get classical evaluation metrics such as MAE, MSE, RMSE. "
"The inference time is also provided."
)
)
# Define the Gradio components for abnormal behavior detection tab
abnormal_tab = gr.Interface(
fn=functools.partial(abnormal_behavior_detection),
inputs=[
gr.File(label="Upload Predicted Positions CSV", type='filepath'),
gr.Slider(minimum=0, maximum=1, step=0.1, value=0.5, label="Alpha (α)"),
gr.Number(label="Threshold", value=10.0)
],
outputs=[
gr.File(label="Download Abnormal Behavior CSV"),
gr.Textbox(label="Error Message", lines=2, visible=False)
],
title="Abnormal Behavior Detection",
description=(
"Upload the CSV file containing real and predicted positions from the Classical Prediction tab. "
"Adjust the Alpha and Threshold parameters to compute abnormal behavior."
)
)
# Combine the two tabs using Gradio Tabs component
with gr.Blocks() as demo:
gr.Markdown("# Vessel Trajectory Prediction and Abnormal Behavior Detection")
with gr.Tabs():
with gr.TabItem("Classical Prediction"):
classical_tab.render()
with gr.TabItem("Abnormal Behavior Detection"):
abnormal_tab.render()
# Launch the Gradio interface
logging.info("Launching Gradio interface...")
demo.launch()
logging.info("Gradio interface launched successfully.")
# Run the app
if __name__ == "__main__":
main()