import torch
import torch.nn as nn
import gradio as gr
import pandas as pd
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error
import os
import logging
import joblib
from tqdm import tqdm
import tempfile
import json
from math import radians, cos, sin, asin, sqrt, atan2, degrees
import time
import functools

# ============================
# Configure Logging
# ============================
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def add_time_decimal_feature(df):
    """
    Add 'time_decimal' feature by combining 'hour' and 'minutes'.

    The input frame is not modified; a new frame is returned when the column
    has to be created.

    :param df: DataFrame with 'time_decimal', or with 'hour' and 'minutes' columns.
    :return: DataFrame with 'time_decimal' (and without 'hour' and 'minutes'
             when they were used to build it).
    :raises ValueError: if neither 'time_decimal' nor both 'hour' and 'minutes' exist.
    """
    if 'time_decimal' in df.columns:
        logging.info("'time_decimal' feature already exists. Skipping creation.")
        return df

    if 'hour' in df.columns and 'minutes' in df.columns:
        logging.info("Adding 'time_decimal' feature...")
        # assign() builds a new frame, so a filtered view of the caller's data
        # is never written to in place.
        df = df.assign(time_decimal=df['hour'] + df['minutes'] / 60.0)
        df = df.drop(columns=['hour', 'minutes'])
        logging.info("'time_decimal' feature added.")
        return df

    logging.warning("Neither 'time_decimal' nor 'hour' and 'minutes' columns found. Cannot create 'time_decimal' feature.")
    raise ValueError("Input data must contain 'time_decimal' or both 'hour' and 'minutes' columns.")


def haversine(lon1, lat1, lon2, lat2):
    """
    Calculate the great-circle distance between two points on the Earth.

    Works element-wise on numpy arrays as well as on scalars.

    :param lon1: Longitude of point 1 (in decimal degrees)
    :param lat1: Latitude of point 1 (in decimal degrees)
    :param lon2: Longitude of point 2 (in decimal degrees)
    :param lat2: Latitude of point 2 (in decimal degrees)
    :return: Distance in kilometers
    """
    # Convert decimal degrees to radians
    lon1_rad, lat1_rad, lon2_rad, lat2_rad = map(np.radians, [lon1, lat1, lon2, lat2])

    # Haversine formula
    dlon = lon2_rad - lon1_rad
    dlat = lat2_rad - lat1_rad
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1_rad) * np.cos(lat2_rad) * np.sin(dlon / 2) ** 2
    # Floating-point rounding can push `a` marginally outside [0, 1]
    # (e.g. near-antipodal points), which would make arcsin return NaN.
    a = np.clip(a, 0.0, 1.0)
    c = 2 * np.arcsin(np.sqrt(a))
    r = 6371  # Radius of Earth in kilometers
    return c * r


def calculate_bearing(lon1, lat1, lon2, lat2):
    """
    Calculate the bearing between two points.

    Uses the scalar `math` functions, so the arguments must be single numbers.

    :param lon1: Longitude of point 1 (in decimal degrees)
    :param lat1: Latitude of point 1 (in decimal degrees)
    :param lon2: Longitude of point 2 (in decimal degrees)
    :param lat2: Latitude of point 2 (in decimal degrees)
    :return: Bearing in degrees, normalized to [0, 360)
    """
    # Convert decimal degrees to radians
    lon1_rad, lat1_rad, lon2_rad, lat2_rad = map(radians, [lon1, lat1, lon2, lat2])

    dlon = lon2_rad - lon1_rad
    x = sin(dlon) * cos(lat2_rad)
    y = cos(lat1_rad) * sin(lat2_rad) - (sin(lat1_rad) * cos(lat2_rad) * cos(dlon))

    # Convert from radians to degrees and normalize
    initial_bearing = degrees(atan2(x, y))
    return (initial_bearing + 360) % 360


def angular_divergence(bearing1, bearing2):
    """
    Calculate the smallest angle difference between two bearings.

    :param bearing1: First bearing in degrees
    :param bearing2: Second bearing in degrees
    :return: Angular divergence in degrees (between 0 and 180)
    """
    diff = abs(bearing1 - bearing2) % 360
    return min(diff, 360 - diff)


def denormalize(scaled_lat, scaled_lon, scaler, lat_idx, lon_idx):
    """
    Denormalize latitude and longitude using the scaler's parameters.

    The inverse is computed from `data_min_` / `data_max_` only.
    NOTE(review): this assumes the scaler maps to the [0, 1] range
    (e.g. a default MinMaxScaler) -- confirm against the training code.

    :param scaled_lat: Scaled latitude values (numpy array).
    :param scaled_lon: Scaled longitude values (numpy array).
    :param scaler: The scaler object used for normalization.
    :param lat_idx: Index of 'latitude_degrees' in the scaler's feature list.
    :param lon_idx: Index of 'longitude_degrees' in the scaler's feature list.
    :return: Tuple of (denormalized_lat, denormalized_lon).
    """
    lat_min = scaler.data_min_[lat_idx]
    lat_max = scaler.data_max_[lat_idx]
    lon_min = scaler.data_min_[lon_idx]
    lon_max = scaler.data_max_[lon_idx]

    denorm_lat = scaled_lat * (lat_max - lat_min) + lat_min
    denorm_lon = scaled_lon * (lon_max - lon_min) + lon_min
    return denorm_lat, denorm_lon


def create_dataset_grouped_by_mmsi(df_scaled, seq_len, forecast_horizon, features_to_scale, future_features):
    """
    Create input and output sequences grouped by original MMSI.

    For every vessel a sliding window of `seq_len` rows is taken as input; the
    `future_features` of the first row after the window are repeated on every
    timestep and appended to the window's features.

    :param df_scaled: Scaled DataFrame with an 'original_mmsi' column.
    :param seq_len: Number of timesteps per input sequence.
    :param forecast_horizon: Number of future positions used as target.
    :param features_to_scale: Columns that make up each input timestep.
    :param future_features: Columns taken from the first future row.
    :return: Tuple (X, y, mmsis, last_known_positions_scaled) where X and y are
             float32 arrays and the last element is a list of
             (scaled_lat, scaled_lon) tuples, one per sequence.
    """
    Xs, ys, mmsis = [], [], []
    last_known_positions_scaled = []

    grouped = df_scaled.groupby('original_mmsi')
    for mmsi, group in tqdm(grouped, desc="Creating sequences"):
        n_windows = len(group) - seq_len - forecast_horizon + 1
        if n_windows <= 0:
            # Not enough rows for even one (input, target) pair.
            continue

        # Convert once per vessel instead of slicing the DataFrame per window.
        feature_values = group[features_to_scale].to_numpy()
        position_values = group[['latitude_degrees', 'longitude_degrees']].to_numpy()
        future_values = group[future_features].to_numpy()

        for start in range(n_windows):
            end = start + seq_len

            # Future features of the first predicted row, repeated per timestep
            future_feature_array = np.tile(future_values[end], (seq_len, 1))

            # Combine sequence with future features
            Xs.append(np.hstack((feature_values[start:end], future_feature_array)))
            # Future positions to predict (scaled)
            ys.append(position_values[end:end + forecast_horizon])
            mmsis.append(mmsi)

            # Store last known position (scaled): final row of the input window
            last_lat_scaled, last_lon_scaled = position_values[end - 1]
            last_known_positions_scaled.append((last_lat_scaled, last_lon_scaled))

    return (
        np.array(Xs, dtype=np.float32),
        np.array(ys, dtype=np.float32),
        np.array(mmsis),
        last_known_positions_scaled,
    )


# ============================
# Model Definitions
# ============================

class _LSTMForecaster(nn.Module):
    """
    Shared architecture of the teacher and student models:
    linear embedding -> stacked LSTM -> linear head on the last timestep.

    Attribute names (`embedding`, `lstm`, `fc`) define the state-dict keys and
    must not be renamed, otherwise saved checkpoints stop loading.
    """

    def __init__(self, in_dim, hidden_dim, forecast_horizon, n_layers, dropout):
        super().__init__()
        self.forecast_horizon = forecast_horizon  # Store as an instance attribute
        self.embedding = nn.Linear(in_dim, hidden_dim)
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, num_layers=n_layers, dropout=dropout, batch_first=True)
        self.fc = nn.Linear(hidden_dim, forecast_horizon * 2)

    def forward(self, x):
        x = self.embedding(x)
        x, _ = self.lstm(x)
        x = self.fc(x[:, -1, :])  # Use the last timestep for prediction
        return x.view(-1, self.forecast_horizon, 2)  # Shape: (batch_size, forecast_horizon, 2)


class LSTMModelTeacher(_LSTMForecaster):
    def __init__(self, in_dim, hidden_dim, forecast_horizon, n_layers=7, dropout=0.2):
        """
        Teacher LSTM Model.

        :param in_dim: Number of input features.
        :param hidden_dim: Number of hidden units.
        :param forecast_horizon: Number of future steps to predict.
        :param n_layers: Number of LSTM layers.
        :param dropout: Dropout rate.
        """
        super().__init__(in_dim, hidden_dim, forecast_horizon, n_layers, dropout)


class LSTMModelStudent(_LSTMForecaster):
    def __init__(self, in_dim, hidden_dim, forecast_horizon, n_layers=3, dropout=0.2):
        """
        Student LSTM Model.

        :param in_dim: Number of input features.
        :param hidden_dim: Number of hidden units.
        :param forecast_horizon: Number of future steps to predict.
        :param n_layers: Number of LSTM layers.
        :param dropout: Dropout rate.
        """
        super().__init__(in_dim, hidden_dim, forecast_horizon, n_layers, dropout)


# ============================
# Model Loading Functions
# ============================

def _load_lstm(model_cls, weights_path, in_dim, n_layers):
    """Build one model, load its weights onto the CPU and switch it to eval mode."""
    model = model_cls(in_dim=in_dim, hidden_dim=200, forecast_horizon=1, n_layers=n_layers, dropout=0.2)
    # NOTE(review): torch.load unpickles the checkpoint -- only load trusted files.
    model.load_state_dict(torch.load(weights_path, map_location=torch.device('cpu')))
    model.eval()
    return model


def load_models(model_paths):
    """
    Load teacher, student, and cargo vessel models, including submodels for North, Mid, and South areas.

    :param model_paths: Dictionary containing paths to the models, keyed by
                        'teacher', 'student_north', 'student_mid',
                        'student_south' and 'cargo_vessel'.
    :return: Dictionary of loaded models keyed by 'Teacher', 'Student_North',
             'Student_Mid', 'Student_South' and 'Cargo_Vessel'.
    """
    # 14 scaled features + 'future_hour_feature' (time_decimal)
    teacher_in_dim = 15
    # 6 scaled features + 3 future features ('day', 'month', 'time_decimal')
    cargo_in_dim = 6 + 3

    # (model name, key in model_paths, class, input dimension, LSTM layers)
    specs = [
        ('Teacher', 'teacher', LSTMModelTeacher, teacher_in_dim, 7),
        ('Student_North', 'student_north', LSTMModelStudent, teacher_in_dim, 3),
        ('Student_Mid', 'student_mid', LSTMModelStudent, teacher_in_dim, 3),
        ('Student_South', 'student_south', LSTMModelStudent, teacher_in_dim, 3),
        ('Cargo_Vessel', 'cargo_vessel', LSTMModelTeacher, cargo_in_dim, 10),
    ]

    models = {}
    for model_name, path_key, model_cls, in_dim, n_layers in specs:
        label = model_name.replace('_', ' ')
        logging.info(f"Loading {label} model...")
        models[model_name] = _load_lstm(model_cls, model_paths[path_key], in_dim, n_layers)
        logging.info(f"{label} model loaded successfully.")
    return models


def load_scalers(scaler_paths):
    """
    Load scalers for each model.

    :param scaler_paths: Dictionary containing paths to the scaler files.
    :return: Dictionary of loaded scalers.
    :raises FileNotFoundError: if any scaler file does not exist.
    """
    loaded_scalers = {}
    for model_name, scaler_path in scaler_paths.items():
        if not os.path.exists(scaler_path):
            logging.error(f"Scaler file for {model_name} not found at '{scaler_path}'.")
            raise FileNotFoundError(f"Scaler file for {model_name} not found at '{scaler_path}'. Please provide the correct path.")
        # NOTE(review): joblib.load unpickles the file -- only load trusted scalers.
        loaded_scalers[model_name] = joblib.load(scaler_path)
        logging.info(f"Loaded scaler for {model_name} from '{scaler_path}'.")
    return loaded_scalers


def determine_subarea(df):
    """
    Determine the sub-area (North, Mid, South) based on latitude and longitude ranges.

    The sub-area containing the most rows wins; ties go to the first area in
    the order North, Mid, South.

    :param df: DataFrame containing 'latitude_degrees' and 'longitude_degrees'.
    :return: String indicating the sub-area, or 'Teacher' when no row falls
             inside any sub-area.
    """
    # Define sub-area boundaries (inclusive on both ends)
    subareas = {
        'North': {'lat_min': 30, 'lat_max': 60, 'lon_min': -80, 'lon_max': -10},
        'Mid': {'lat_min': 0, 'lat_max': 30, 'lon_min': -80, 'lon_max': 10},
        'South': {'lat_min': -80, 'lat_max': 0, 'lon_min': -60, 'lon_max': 20}
    }

    # Count the number of data points in each sub-area
    counts = {}
    for area, bounds in subareas.items():
        inside = (
            df['latitude_degrees'].between(bounds['lat_min'], bounds['lat_max'])
            & df['longitude_degrees'].between(bounds['lon_min'], bounds['lon_max'])
        )
        count = int(inside.sum())
        counts[area] = count
        logging.info(f"Sub-area '{area}': {count} records.")

    # Determine the sub-area with the maximum count
    predominant_subarea = max(counts, key=counts.get)

    # If no data points fall into any sub-area, default to Teacher
    if counts[predominant_subarea] == 0:
        logging.warning("No data points found in any sub-area. Defaulting to Teacher model.")
        return 'Teacher'

    logging.info(f"Predominant sub-area determined: {predominant_subarea}")
    return predominant_subarea


def select_model(models, subarea, model_choice):
    """
    Select the appropriate model based on the sub-area and model choice.

    :param models: Dictionary of loaded models.
    :param subarea: String indicating the sub-area.
    :param model_choice: String indicating the selected model.
    :return: Tuple of (selected_model, selected_model_name); the model is None
             when the name is not present in `models`.
    """
    if model_choice != "Auto-Select":
        selected_model_name = model_choice
    elif subarea in ['North', 'Mid', 'South']:
        selected_model_name = f'Student_{subarea}'
    else:
        selected_model_name = 'Teacher'

    selected_model = models.get(selected_model_name)
    logging.info(f"Selected model: {selected_model_name}")
    return selected_model, selected_model_name


# ============================
# Evaluation Metrics Calculation
# ============================

def calculate_classic_metrics(y_true, y_pred):
    """
    Calculate MAE, MSE, and RMSE directly on latitude/longitude pairs.

    :param y_true: Ground truth positions (numpy array of shape (num_samples, 2)).
    :param y_pred: Predicted positions (numpy array of shape (num_samples, 2)).
    :return: Dictionary containing the classic metrics.
    """
    mae = mean_absolute_error(y_true, y_pred)
    mse = mean_squared_error(y_true, y_pred)
    rmse = np.sqrt(mse)

    classic_metrics = {
        'MAE (degrees)': mae,
        'MSE (degrees^2)': mse,
        'RMSE (degrees)': rmse
    }
    logging.info(f"Calculated classic metrics: {classic_metrics}")
    return classic_metrics


def calculate_distance_metrics(y_true, y_pred):
    """
    Calculate metrics based on distance (in kilometers).

    :param y_true: Ground truth positions (numpy array of shape (num_samples, 2)),
                   columns ordered [latitude, longitude].
    :param y_pred: Predicted positions (numpy array of shape (num_samples, 2)),
                   columns ordered [latitude, longitude].
    :return: Dictionary containing the distance-based metrics.
    """
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)

    # Haversine distance between predicted and true positions (vectorized)
    distances = haversine(y_true[:, 1], y_true[:, 0], y_pred[:, 1], y_pred[:, 0])

    mae = np.mean(np.abs(distances))
    mse = np.mean(np.square(distances))
    rmse = np.sqrt(mse)

    # RSE (Relative Squared Error), here relative to the variance of the distances
    variance = np.var(distances)
    rse = mse / variance if variance != 0 else float('inf')

    metrics = {
        'MAE (km)': mae,
        'MSE (km^2)': mse,
        'RMSE (km)': rmse,
        'RSE': rse
    }
    logging.info(f"Calculated distance metrics: {metrics}")
    return metrics


def _write_temp_csv(frame):
    """Write `frame` to a persistent temporary CSV file and return its path."""
    # UTF-8 is forced because some column headers contain non-ASCII symbols,
    # which fail to encode under locale encodings such as cp1252.
    with tempfile.NamedTemporaryFile(delete=False, suffix='.csv', mode='w',
                                     newline='', encoding='utf-8') as tmp_file:
        frame.to_csv(tmp_file, index=False)
        return tmp_file.name


def _prediction_error(error_message):
    """Log `error_message` and build the 4-tuple `classical_prediction` returns on failure."""
    logging.error(error_message)
    return {"error": error_message}, None, None, error_message


# ============================
# Classical Metrics Prediction
# ============================

def classical_prediction(file_path, model_choice, min_mmsi, max_mmsi, models, loaded_scalers):
    """
    Preprocess the input CSV and make predictions using the selected model.
    Calculate classical evaluation metrics and include inference time.

    :param file_path: Path of the uploaded CSV file.
    :param model_choice: "Auto-Select" or a key of `models`.
    :param min_mmsi: Lowest MMSI to keep (inclusive).
    :param max_mmsi: Highest MMSI to keep (inclusive).
    :param models: Dictionary of loaded models.
    :param loaded_scalers: Dictionary of scalers keyed like `models`.
    :return: Tuple (metrics, positions_csv_path, inference_time, error).
             On success `error` is None. On failure the tuple is
             ({"error": message}, None, None, message).
    """
    try:
        logging.info("Starting classical prediction...")

        # Load the uploaded CSV file and filter based on MMSI
        logging.info("Loading uploaded CSV file...")
        df = pd.read_csv(file_path, delimiter=',')
        logging.info(f"Uploaded CSV file loaded with {df.shape[0]} records.")
        df = df[(df['mmsi'] >= min_mmsi) & (df['mmsi'] <= max_mmsi)].copy()

        if df.empty:
            return _prediction_error("No data available after applying MMSI filters.")

        # Select the appropriate model and scaler
        if model_choice == "Auto-Select":
            subarea = determine_subarea(df)
            selected_model, selected_model_name = select_model(models, subarea, model_choice)
        elif model_choice in models:
            selected_model, selected_model_name = models[model_choice], model_choice
        else:
            return _prediction_error(f"Selected model '{model_choice}' is not available.")
        scaler = loaded_scalers[selected_model_name]

        logging.info(f"Using scaler for model: {selected_model_name}")

        # Adjust features_to_scale based on the selected model
        if selected_model_name == 'Cargo_Vessel':
            features_to_scale = [
                "mmsi", "latitude_degrees", "longitude_degrees",
                "day", "month", "time_decimal"  # Removed 'ship_type'
            ]
            future_features = ['day', 'month', 'time_decimal']
        else:
            features_to_scale = [
                "mmsi", "sog_kt", "latitude_degrees", "longitude_degrees", "cog_degrees",
                "dimension_a_m", "dimension_b_m", "dimension_c_m", "dimension_d_m",
                "ship_type", "day", "month", "year", "time_decimal"
            ]
            future_features = ['time_decimal']

            # Derive 'time_decimal' BEFORE validating the columns; otherwise a
            # CSV that only carries 'hour'/'minutes' is rejected even though the
            # feature can be built from them.
            can_derive = 'time_decimal' in df.columns or {'hour', 'minutes'}.issubset(df.columns)
            if can_derive:
                df = add_time_decimal_feature(df)

        # Check if the necessary columns exist
        expected_columns = features_to_scale
        if not all(col in df.columns for col in expected_columns):
            return _prediction_error(
                f"Input data does not have the correct columns.\n"
                f"Expected columns for {selected_model_name}: {expected_columns}\n"
                f"Got columns: {list(df.columns)}"
            )

        logging.info("Input CSV has the correct columns.")

        # Normalize the data
        logging.info("Normalizing the data...")
        X_scaled = scaler.transform(df[features_to_scale])
        df_scaled = pd.DataFrame(X_scaled, columns=features_to_scale, index=df.index)
        df_scaled['original_mmsi'] = df['mmsi']

        # Create sequences and get last known positions (scaled)
        seq_len = 24
        forecast_horizon = 1
        X, y, mmsi_seq, last_known_positions_scaled = create_dataset_grouped_by_mmsi(
            df_scaled, seq_len, forecast_horizon, features_to_scale, future_features
        )

        if X.size == 0:
            return _prediction_error("Not enough data to create sequences.")

        logging.info(f"Created {X.shape[0]} sequences.")

        # Inference, in batches of 32 sequences
        logging.info("Starting model inference...")
        inputs = torch.from_numpy(X)
        start_time = time.time()  # Start inference time tracking
        with torch.no_grad():
            batch_predictions = [
                selected_model(batch).cpu().numpy() for batch in inputs.split(32)
            ]
        inference_time = time.time() - start_time  # End inference time

        y_pred = np.concatenate(batch_predictions, axis=0)
        y_true = y

        logging.info(f"Inference completed in {inference_time:.2f} seconds.")

        # Denormalize predictions and real values
        lat_idx = features_to_scale.index("latitude_degrees")
        lon_idx = features_to_scale.index("longitude_degrees")

        pred_lat, pred_lon = denormalize(y_pred[:, :, 0], y_pred[:, :, 1], scaler, lat_idx, lon_idx)
        true_lat, true_lon = denormalize(y_true[:, :, 0], y_true[:, :, 1], scaler, lat_idx, lon_idx)

        # Denormalize last known positions
        last_lat_scaled = np.array([pos[0] for pos in last_known_positions_scaled])
        last_lon_scaled = np.array([pos[1] for pos in last_known_positions_scaled])
        last_lat_denorm, last_lon_denorm = denormalize(
            last_lat_scaled, last_lon_scaled, scaler, lat_idx, lon_idx
        )

        # Flatten to 1-D. reshape(-1) is used instead of squeeze(): with a
        # single sequence squeeze() yields 0-d arrays, which have no len().
        pred_lat = pred_lat.reshape(-1)
        pred_lon = pred_lon.reshape(-1)
        true_lat = true_lat.reshape(-1)
        true_lon = true_lon.reshape(-1)
        last_lat_denorm = last_lat_denorm.reshape(-1)
        last_lon_denorm = last_lon_denorm.reshape(-1)
        mmsi_seq = mmsi_seq.reshape(-1)

        # Calculate the classic evaluation metrics
        y_true_pairs = np.column_stack((true_lat, true_lon))
        y_pred_pairs = np.column_stack((pred_lat, pred_lon))
        classic_metrics = calculate_classic_metrics(y_true=y_true_pairs, y_pred=y_pred_pairs)
        classic_metrics['Inference Time (seconds)'] = inference_time  # Include inference time

        # Calculate error in Km for each prediction (haversine is vectorized)
        logging.info("Calculating error in kilometers for each prediction...")
        error_km = haversine(pred_lon, pred_lat, true_lon, true_lat)

        # Round-trip through pandas JSON so the metrics are plain JSON numbers
        metrics_df = pd.DataFrame([classic_metrics])
        metrics_json = json.loads(metrics_df.to_json(orient="records"))[0]

        # Prepare predicted and real positions DataFrame, including error in Km
        predicted_df = pd.DataFrame({
            'MMSI': mmsi_seq[:len(y_pred)],
            'Last Known Latitude': last_lat_denorm,
            'Last Known Longitude': last_lon_denorm,
            'Predicted Latitude': pred_lat,
            'Predicted Longitude': pred_lon,
            'Real Latitude': true_lat,
            'Real Longitude': true_lon,
            'Error (Km)': error_km
        })

        # Save predictions as CSV
        positions_csv_path = _write_temp_csv(predicted_df)

        logging.info("Classical prediction completed.")
        return metrics_json, positions_csv_path, inference_time, None

    except Exception as e:
        # Top-level UI boundary: report the failure instead of crashing the app.
        logging.exception("Unexpected failure during classical prediction")
        return _prediction_error(f"An error occurred: {str(e)}")


# ============================
# Abnormal Behavior Detection
# ============================

def abnormal_behavior_detection(prediction_file_path, alpha=0.5, threshold=10.0):
    """
    Detect abnormal behavior based on angular divergence and distance difference.
    Accepts a CSV file containing real and predicted positions.

    For every row the score is `alpha * |Δd| + (1 - alpha) * Δθ`, where Δd is
    the difference (km) between the predicted and the real travelled distance
    from the last known position, and Δθ the angle (degrees) between the
    predicted and the real bearing. Rows with score >= threshold are flagged.

    :param prediction_file_path: Path of the CSV produced by the prediction tab.
    :param alpha: Weight of the distance term.
    :param threshold: Score at or above which a row is flagged as abnormal.
    :return: Tuple (abnormal_csv_path, error); exactly one of them is None.
    """
    try:
        logging.info("Starting abnormal behavior detection...")

        # Load the CSV file containing real and predicted positions
        logging.info("Loading prediction CSV file...")
        df = pd.read_csv(prediction_file_path)
        logging.info(f"Prediction CSV file loaded with {df.shape[0]} records.")

        # Check if necessary columns exist
        expected_columns = [
            'MMSI',
            'Last Known Latitude', 'Last Known Longitude',
            'Predicted Latitude', 'Predicted Longitude',
            'Real Latitude', 'Real Longitude'
        ]
        if not all(col in df.columns for col in expected_columns):
            error_message = (
                f"Input data does not have the correct columns.\n"
                f"Expected columns: {expected_columns}\n"
                f"Got columns: {list(df.columns)}"
            )
            logging.error(error_message)
            return None, error_message

        # Float views used for the maths; the original columns are written
        # back unchanged to the output file.
        last_lat = df['Last Known Latitude'].to_numpy(dtype=float)
        last_lon = df['Last Known Longitude'].to_numpy(dtype=float)
        pred_lat = df['Predicted Latitude'].to_numpy(dtype=float)
        pred_lon = df['Predicted Longitude'].to_numpy(dtype=float)
        true_lat = df['Real Latitude'].to_numpy(dtype=float)
        true_lon = df['Real Longitude'].to_numpy(dtype=float)

        # Calculate bearings
        logging.info("Calculating bearings for predictions and real values...")
        bearings_pred = [
            calculate_bearing(lon0, lat0, lon1, lat1)
            for lon0, lat0, lon1, lat1 in zip(last_lon, last_lat, pred_lon, pred_lat)
        ]
        bearings_true = [
            calculate_bearing(lon0, lat0, lon1, lat1)
            for lon0, lat0, lon1, lat1 in zip(last_lon, last_lat, true_lon, true_lat)
        ]

        # Calculate angular divergence Δθ
        logging.info("Calculating angular divergence (Δθ)...")
        delta_theta = np.asarray(
            [angular_divergence(bp, bt) for bp, bt in zip(bearings_pred, bearings_true)],
            dtype=float,
        )

        # Calculate distance difference Δd (predicted minus real travelled distance)
        logging.info("Calculating distance difference (Δd)...")
        delta_d = (
            haversine(last_lon, last_lat, pred_lon, pred_lat)
            - haversine(last_lon, last_lat, true_lon, true_lat)
        )

        # Compute the score
        logging.info("Computing the abnormal behavior score...")
        score = alpha * np.abs(delta_d) + (1 - alpha) * delta_theta

        # Determine abnormal behavior
        logging.info("Determining abnormal behavior based on the score...")
        abnormal_behavior = (score >= threshold).astype(int)  # 1: Abnormal, 0: Normal

        # Create DataFrame for saving
        abnormal_behavior_df = pd.DataFrame({
            'MMSI': df['MMSI'].values,
            'Last Known Latitude': df['Last Known Latitude'].values,
            'Last Known Longitude': df['Last Known Longitude'].values,
            'Predicted Latitude': df['Predicted Latitude'].values,
            'Predicted Longitude': df['Predicted Longitude'].values,
            'Real Latitude': df['Real Latitude'].values,
            'Real Longitude': df['Real Longitude'].values,
            'Distance Difference (Δd) [km]': delta_d,
            'Angular Divergence (Δθ) [degrees]': delta_theta,
            'Score (αΔd + (1-α)Δθ)': score,
            'Abnormal Behavior (1=Abnormal, 0=Normal)': abnormal_behavior
        })

        # Save abnormal behavior dataset as CSV
        abnormal_csv_path = _write_temp_csv(abnormal_behavior_df)

        logging.info("Abnormal behavior detection completed.")
        return abnormal_csv_path, None

    except Exception as e:
        # Top-level UI boundary: report the failure instead of crashing the app.
        logging.exception("Unexpected failure during abnormal behavior detection")
        return None, str(e)


# ============================
# Define Gradio Interface
# ============================

def main():
    """Load all models and scalers, build the two-tab Gradio app and launch it."""
    model_paths = {
        'teacher': 'LSTM_whole_atlantic_horizon1_with_time_decimal_input_batch256/horizon_data_LSTM_whole_atlantic_horizon1_with_time_decimal_input_batch256_seq_24/run_1/best_model.pth',
        'student_north': 'LSTM_whole_atlantic_horizon1_with_time_decimal_input_batch256_KD_North/horizon1_data_LSTM_whole_atlantic_horizon1_with_time_decimal_input_batch256_KD_North_seq_24/run_1/best_model.pth',
        'student_mid': 'LSTM_whole_atlantic_horizon1_with_time_decimal_input_batch256_KD_Mid/horizon1_data_LSTM_whole_atlantic_horizon1_with_time_decimal_input_batch256_KD_Mid_seq_24/run_1/best_model.pth',
        'student_south': 'LSTM_whole_atlantic_horizon1_with_time_decimal_input_batch256_KD_South/horizon1_data_LSTM_whole_atlantic_horizon1_with_time_decimal_input_batch256_KD_South_seq_24/run_1/best_model.pth',
        'cargo_vessel': 'LSTMModel_cargo_horizon1_with_month_day_time_input_batch256_cleaned/horizon_data_LSTMModel_cargo_horizon1_with_month_day_time_input_batch256_cleaned_seq_24/run_1/best_model.pth'
    }

    scaler_paths = {
        'Teacher': 'scaler_train_wholedata_up.joblib',
        'Student_North': 'scaler_train_North_up.joblib',
        'Student_Mid': 'scaler_train_Mid_up.joblib',
        'Student_South': 'scaler_train_South_up.joblib',
        'Cargo_Vessel': 'scaler_features_cargo_cleaned.joblib'
    }

    logging.info("Loading models and scalers...")
    models = load_models(model_paths)
    loaded_scalers = load_scalers(scaler_paths)
    logging.info("All models and scalers loaded successfully.")

    # Define the Gradio components for classical prediction tab
    classical_tab = gr.Interface(
        fn=functools.partial(classical_prediction, models=models, loaded_scalers=loaded_scalers),
        inputs=[
            gr.File(label="Upload CSV File", type='filepath'),
            gr.Dropdown(
                choices=["Auto-Select", "Teacher", "Student_North", "Student_Mid", "Student_South", "Cargo_Vessel"],
                value="Auto-Select",
                label="Choose Model"
            ),
            gr.Number(label="Min MMSI", value=0),
            gr.Number(label="Max MMSI", value=999999999)
        ],
        outputs=[
            gr.JSON(label="Classical Metrics (Degrees)"),
            gr.File(label="Download Predicted & Real Positions CSV"),
            gr.Number(label="Inference Time (seconds)"),
            # Kept visible: this box is where failures are reported to the user.
            gr.Textbox(label="Error Message", lines=2)
        ],
        title="Classical Prediction & Metrics",
        description=(
            "Upload a CSV file and select a model to get classical evaluation metrics such as MAE, MSE, RMSE. "
            "The inference time is also provided."
        )
    )

    # Define the Gradio components for abnormal behavior detection tab
    abnormal_tab = gr.Interface(
        fn=abnormal_behavior_detection,
        inputs=[
            gr.File(label="Upload Predicted Positions CSV", type='filepath'),
            gr.Slider(minimum=0, maximum=1, step=0.1, value=0.5, label="Alpha (α)"),
            gr.Number(label="Threshold", value=10.0)
        ],
        outputs=[
            gr.File(label="Download Abnormal Behavior CSV"),
            # Kept visible: it is the only channel through which this tab
            # reports an error.
            gr.Textbox(label="Error Message", lines=2)
        ],
        title="Abnormal Behavior Detection",
        description=(
            "Upload the CSV file containing real and predicted positions from the Classical Prediction tab. "
            "Adjust the Alpha and Threshold parameters to compute abnormal behavior."
        )
    )

    # Combine the two tabs using Gradio Tabs component
    with gr.Blocks() as demo:
        gr.Markdown("# Vessel Trajectory Prediction and Abnormal Behavior Detection")
        with gr.Tabs():
            with gr.TabItem("Classical Prediction"):
                classical_tab.render()
            with gr.TabItem("Abnormal Behavior Detection"):
                abnormal_tab.render()

    # Launch the Gradio interface
    logging.info("Launching Gradio interface...")
    demo.launch()
    logging.info("Gradio interface launched successfully.")


# Run the app
if __name__ == "__main__":
    main()