import pandas as pd import json from typing import Dict, Any, Tuple import os from constants import ( MODEL_NAME_MAP, DIMENSION_NAME_MAP, KEYWORD_NAME_MAP, MODEL_URLS, BASE_MODEL_GROUPS ) class MEGABenchEvalDataLoader: def __init__(self, base_path): self.base_path = base_path # Load both model and summary data at once self.KEYWORD_DATA, self.SUMMARY_DATA = self._load_data() self.SUPER_GROUPS = self._initialize_super_groups() self.MODEL_GROUPS = self._initialize_model_groups() def _get_base_path(self) -> str: raise NotImplementedError("Subclasses must implement _get_base_path") def _load_data(self) -> Tuple[Dict[str, Any], Dict[str, Any]]: summary_data = {} keyword_data = {} model_folders = [f for f in os.listdir(self.base_path) if os.path.isdir(os.path.join(self.base_path, f))] for model_name in model_folders: model_path = f"{self.base_path}/{model_name}/summary_and_keyword_stats.json" with open(model_path, "r") as f: data = json.load(f) if "keyword_stats" in data: keyword_data[model_name] = data["keyword_stats"] if "model_summary" in data: summary_data[model_name] = data["model_summary"] return keyword_data, summary_data def _initialize_super_groups(self): # Get a sample model to access the structure sample_model = next(iter(self.KEYWORD_DATA)) # Create groups with task counts groups = {} self.keyword_display_map = {} # Add this map to store display-to-original mapping for dim in self.KEYWORD_DATA[sample_model]: dim_name = DIMENSION_NAME_MAP[dim] # Create a list of tuples (display_name, count, keyword) for sorting keyword_info = [] for keyword in self.KEYWORD_DATA[sample_model][dim]: # Get the task count for this keyword task_count = self.KEYWORD_DATA[sample_model][dim][keyword]["count"] original_name = KEYWORD_NAME_MAP.get(keyword, keyword) display_name = f"{original_name}({task_count})" keyword_info.append((display_name, task_count, keyword)) # Sort by count (descending) and then by display name (for ties) keyword_info.sort(key=lambda x: (-x[1], x[0])) # Store sorted display names and update mapping groups[dim_name] = [info[0] for info in keyword_info] for display_name, _, keyword in keyword_info: self.keyword_display_map[display_name] = keyword # Sort based on predefined order order = ["Application", "Skills", "Output Format", "Input Format", "Visual Input Number"] return {k: groups[k] for k in order if k in groups} def _initialize_model_groups(self) -> Dict[str, list]: available_models = set(self.KEYWORD_DATA.keys()) filtered_groups = {} for group_name, models in BASE_MODEL_GROUPS.items(): if group_name == "All": filtered_groups[group_name] = sorted(list(available_models)) else: filtered_models = [model for model in models if model in available_models] if filtered_models: filtered_groups[group_name] = filtered_models return filtered_groups def get_df(self, selected_super_group: str, selected_model_group: str) -> pd.DataFrame: original_dimension = get_original_dimension(selected_super_group) data = [] for model in self.MODEL_GROUPS[selected_model_group]: if model not in self.KEYWORD_DATA or model not in self.SUMMARY_DATA: continue model_data = self.KEYWORD_DATA[model] summary = self.SUMMARY_DATA[model] # Basic model information row = { "Models": get_display_model_name(model, as_link=True), "Overall": round(summary["overall_score"] * 100, 2), "Core": round(summary["core"]["macro_mean_score"] * 100, 2), "Open-ended": round(summary["open"]["macro_mean_score"] * 100, 2) } # Add dimension-specific scores if original_dimension in model_data: for display_name in self.SUPER_GROUPS[selected_super_group]: original_keyword = self.keyword_display_map[display_name] if original_keyword in model_data[original_dimension]: row[display_name] = round(model_data[original_dimension][original_keyword]["average_score"] * 100, 2) else: row[display_name] = None else: for display_name in self.SUPER_GROUPS[selected_super_group]: row[display_name] = None data.append(row) df = pd.DataFrame(data) df = df.sort_values(by="Overall", ascending=False) return df def get_leaderboard_data(self, selected_super_group: str, selected_model_group: str) -> Tuple[list, list]: df = self.get_df(selected_super_group, selected_model_group) # Get total task counts from the first model's data sample_model = "GPT_4o" total_core_tasks = self.SUMMARY_DATA[sample_model]["core"]["num_eval_tasks"] total_open_tasks = self.SUMMARY_DATA[sample_model]["open"]["num_eval_tasks"] total_tasks = total_core_tasks + total_open_tasks # Define headers with task counts on new line using Unicode line break column_headers = { "Rank": "Rank", "Models": "Models", "Overall": f"Overall\n({total_tasks})", "Core": f"Core\n({total_core_tasks})", "Open-ended": f"Open-ended\n({total_open_tasks})" } # Add rank column to DataFrame df = df.reset_index(drop=True) df.insert(0, 'Rank', range(1, len(df) + 1)) # Rename the columns in DataFrame to match headers df = df.rename(columns=column_headers) # For dimension columns, add task counts on new line dimension_headers = [] for display_name in self.SUPER_GROUPS[selected_super_group]: task_count = display_name.split('(')[1].rstrip(')') base_name = display_name.split('(')[0] dimension_headers.append(f"{base_name}\n({task_count})") headers = [ column_headers["Rank"], column_headers["Models"], column_headers["Overall"], column_headers["Core"], column_headers["Open-ended"] ] + dimension_headers data = df[[ column_headers["Rank"], column_headers["Models"], column_headers["Overall"], column_headers["Core"], column_headers["Open-ended"] ] + self.SUPER_GROUPS[selected_super_group]].values.tolist() return headers, data # Keep your helper functions def get_original_dimension(mapped_dimension): return next(k for k, v in DIMENSION_NAME_MAP.items() if v == mapped_dimension) def get_original_keyword(mapped_keyword): return next((k for k, v in KEYWORD_NAME_MAP.items() if v == mapped_keyword), mapped_keyword) def get_display_model_name(model_name: str, as_link: bool = True) -> str: display_name = MODEL_NAME_MAP.get(model_name, model_name) if as_link and model_name in MODEL_URLS: return f'{display_name}' return display_name