import gradio as gr
import os
import json
import pandas as pd
import numpy as np
import datetime
import plotly.express as px
import plotly.graph_objects as go
import msal
import requests
import tqdm
import tempfile
import time
from typing import List, Dict, Any, Tuple, Optional

# Configuration
MS_CLIENT_ID = os.getenv("MS_CLIENT_ID", "ff0d5b77-56a9-4fa0-bd59-5c7b4889186e")
MS_TENANT_ID = os.getenv("MS_TENANT_ID", "677c00b7-cf19-4fef-9962-132a076ae325")
MS_AUTHORITY = f"https://login.microsoftonline.com/{MS_TENANT_ID}"
MS_REDIRECT_URI = os.getenv("MS_REDIRECT_URI", "https://huggingface.co/spaces/YOUR-USERNAME/email-thread-analyzer/")

# Microsoft Graph API scopes
SCOPES = [
    "User.Read",
    "Mail.Read",
    "Mail.ReadBasic",
]

# Global variables
auth_app = None
current_user = None
user_token = None
emails = []
email_threads = {}
search_results = []
qa_data = {}


# Initialize MSAL app
def init_auth_app():
    global auth_app
    auth_app = msal.PublicClientApplication(
        client_id=MS_CLIENT_ID,
        authority=MS_AUTHORITY
    )


# Get authorization URL
def get_auth_url():
    auth_url = auth_app.get_authorization_request_url(
        scopes=SCOPES,
        redirect_uri=MS_REDIRECT_URI,
        state="state"
    )
    return auth_url


# Process auth code
def process_auth_code(auth_code):
    global current_user, user_token
    try:
        # Acquire token
        token_response = auth_app.acquire_token_by_authorization_code(
            code=auth_code,
            scopes=SCOPES,
            redirect_uri=MS_REDIRECT_URI
        )

        if "error" in token_response:
            return f"Error: {token_response['error_description']}"

        # Store token
        user_token = token_response

        # Get user info
        user_response = requests.get(
            "https://graph.microsoft.com/v1.0/me",
            headers={"Authorization": f"Bearer {user_token['access_token']}"}
        )

        if user_response.status_code == 200:
            current_user = user_response.json()
            return f"Successfully authenticated as {current_user['displayName']}"
        else:
            return f"Error getting user info: {user_response.text}"
    except Exception as e:
        return f"Error during authentication: {str(e)}"


# Get mail folders
def get_mail_folders():
    if not user_token:
        return [], "Not authenticated"

    try:
        response = requests.get(
            "https://graph.microsoft.com/v1.0/me/mailFolders",
            headers={"Authorization": f"Bearer {user_token['access_token']}"}
        )

        if response.status_code == 200:
            folders = response.json()["value"]
            return [(folder["displayName"], folder["id"]) for folder in folders], None
        else:
            return [], f"Error: {response.text}"
    except Exception as e:
        return [], f"Error: {str(e)}"


# Extract emails from folder
def extract_emails(folder_id, max_emails=100, batch_size=25, start_date=None, end_date=None):
    global emails, email_threads

    if not user_token:
        return "Not authenticated"

    try:
        # Reset data
        emails = []
        email_threads = {}

        # Prepare filter
        filter_query = ""
        if start_date and end_date:
            start_date_iso = datetime.datetime.strptime(start_date, "%Y-%m-%d").isoformat() + "Z"
            end_date_iso = datetime.datetime.strptime(end_date, "%Y-%m-%d").isoformat() + "Z"
            filter_query = f"receivedDateTime ge {start_date_iso} and receivedDateTime le {end_date_iso}"

        # Extract emails in batches
        for i in range(0, max_emails, batch_size):
            # Prepare request
            url = f"https://graph.microsoft.com/v1.0/me/mailFolders/{folder_id}/messages"
            headers = {"Authorization": f"Bearer {user_token['access_token']}"}
            params = {
                "$select": "id,subject,sender,from,toRecipients,ccRecipients,receivedDateTime,conversationId,bodyPreview,uniqueBody",
                "$top": batch_size,
                "$skip": i
            }
            if filter_query:
                params["$filter"] = filter_query

            # Make request
            response = requests.get(url, headers=headers, params=params)
            if response.status_code != 200:
                return f"Error: {response.text}"

            batch_emails = response.json()["value"]

            if not batch_emails:
                break

            emails.extend(batch_emails)

            if len(emails) >= max_emails:
                emails = emails[:max_emails]
                break

        # Organize emails into threads
        organize_email_threads()

        return f"Successfully extracted {len(emails)} emails organized into {len(email_threads)} threads"
    except Exception as e:
        return f"Error: {str(e)}"


# Organize emails into threads
def organize_email_threads():
    global email_threads

    threads = {}
    for email in emails:
        conversation_id = email["conversationId"]
        if conversation_id not in threads:
            threads[conversation_id] = []
        threads[conversation_id].append(email)

    # Sort emails within each thread by date
    for thread_id, thread_emails in threads.items():
        thread_emails.sort(key=lambda x: x["receivedDateTime"])

        # Extract thread metadata
        threads[thread_id] = {
            "emails": thread_emails,
            "subject": thread_emails[0]["subject"],
            "start_date": thread_emails[0]["receivedDateTime"],
            "end_date": thread_emails[-1]["receivedDateTime"],
            "message_count": len(thread_emails),
            "participants": get_unique_participants(thread_emails)
        }

    email_threads = threads


# Get unique participants
def get_unique_participants(thread_emails):
    participants = set()

    for email in thread_emails:
        # Add sender
        if "sender" in email and "emailAddress" in email["sender"]:
            participants.add(email["sender"]["emailAddress"]["address"])

        # Add recipients
        if "toRecipients" in email:
            for recipient in email["toRecipients"]:
                participants.add(recipient["emailAddress"]["address"])

        # Add CC recipients
        if "ccRecipients" in email:
            for recipient in email["ccRecipients"]:
                participants.add(recipient["emailAddress"]["address"])

    return list(participants)


# Search threads using simple keyword matching
def search_threads(query):
    global search_results

    if not query or not email_threads:
        search_results = []
        return "Please enter a search query and ensure emails have been extracted"

    try:
        # Search terms
        search_terms = query.lower().split()

        # Calculate relevance scores
        results = []
        for thread_id, thread in email_threads.items():
            # Prepare text content from thread
            content = f"{thread['subject'].lower()} "
            for email in thread["emails"]:
                content += f"{email['bodyPreview'].lower()} "

            # Calculate score based on term frequency
            score = 0
            for term in search_terms:
                score += content.count(term)

            if score > 0:
                results.append((thread, score))

        # Sort by score
        results.sort(key=lambda x: x[1], reverse=True)
        search_results = [thread for thread, _ in results]

        if not search_results:
            return "No relevant threads found"

        return f"Found {len(search_results)} relevant threads"
    except Exception as e:
        search_results = []
        return f"Error: {str(e)}"


# Generate Q&A for thread
def generate_qa(thread_id):
    if thread_id not in email_threads:
        return "Thread not found"

    try:
        thread = email_threads[thread_id]

        # Create thread context
        context = f"Thread subject: {thread['subject']}\n\n"
        for email in thread["emails"]:
            sender = email["sender"]["emailAddress"]["address"]
            context += f"From: {sender}\n"
            context += f"Date: {email['receivedDateTime']}\n"
            context += f"Content: {email['bodyPreview']}\n\n"

        # Generate sample questions
        questions = [
            f"What is the main topic of this email thread about '{thread['subject']}'?",
            "Who are the key participants in this conversation?",
            "What was the timeline of this discussion?",
            "What were the main points discussed in this thread?"
        ]
        # Generate simple answers (simulating AI responses)
        answers = [
            f"The main topic appears to be '{thread['subject']}', which discusses project-related matters.",
            f"The key participants include {', '.join(thread['participants'][:3])}"
            + (f" and {len(thread['participants']) - 3} others" if len(thread['participants']) > 3 else ""),
            f"The conversation started on {thread['start_date'].split('T')[0]} and the last message was on {thread['end_date'].split('T')[0]}.",
            "The main points include updates on project status, discussion of requirements, and next steps."
        ]

        # Create summary
        summary = f"This is an email thread with {thread['message_count']} messages about '{thread['subject']}'. "
        summary += f"The conversation started on {thread['start_date'].split('T')[0]} and ended on {thread['end_date'].split('T')[0]}. "
        summary += f"There are {len(thread['participants'])} participants in this thread."

        # Store Q&A data
        qa_data[thread_id] = {
            "questions": questions,
            "answers": answers,
            "summary": summary
        }

        return f"Generated {len(questions)} Q&A pairs for thread"
    except Exception as e:
        return f"Error generating Q&A: {str(e)}"


# Get thread size distribution
def get_thread_size_distribution():
    if not email_threads:
        return None

    # Count threads by size
    sizes = {}
    for thread in email_threads.values():
        size = thread["message_count"]
        if size in sizes:
            sizes[size] += 1
        else:
            sizes[size] = 1

    # Convert to dataframe
    df = pd.DataFrame([
        {"Size": size, "Count": count} for size, count in sizes.items()
    ])

    # Sort by size
    df = df.sort_values("Size")

    # Create chart
    fig = px.bar(df, x="Size", y="Count", title="Thread Size Distribution")
    return fig


# Get activity over time
def get_activity_over_time():
    if not emails:
        return None

    # Count emails by date
    dates = {}
    for email in emails:
        date = email["receivedDateTime"].split("T")[0]
        if date in dates:
            dates[date] += 1
        else:
            dates[date] = 1

    # Convert to dataframe
    df = pd.DataFrame([
        {"Date": date, "Count": count} for date, count in dates.items()
    ])

    # Sort by date
    df = df.sort_values("Date")

    # Create chart
    fig = px.line(df, x="Date", y="Count", title="Activity Over Time")
    return fig


# Get participant activity
def get_participant_activity():
    if not emails:
        return None

    # Count emails by sender
    senders = {}
    for email in emails:
        if "sender" in email and "emailAddress" in email["sender"]:
            sender = email["sender"]["emailAddress"]["address"]
            if sender in senders:
                senders[sender] += 1
            else:
                senders[sender] = 1

    # Convert to dataframe
    df = pd.DataFrame([
        {"Participant": sender, "Count": count} for sender, count in senders.items()
    ])

    # Sort by count and keep the top 10
    df = df.sort_values("Count", ascending=False).head(10)

    # Create chart
    fig = px.bar(df, x="Count", y="Participant", title="Top 10 Participants", orientation='h')
    return fig


# Export thread data with Q&A
def export_thread_data(thread_id):
    if thread_id not in email_threads:
        return None

    thread = email_threads[thread_id]
    qa = qa_data.get(thread_id, {"questions": [], "answers": [], "summary": ""})

    export_data = {
        "subject": thread["subject"],
        "start_date": thread["start_date"],
        "end_date": thread["end_date"],
        "message_count": thread["message_count"],
        "participants": thread["participants"],
        "emails": [
            {
                "sender": email["sender"]["emailAddress"]["address"],
                "received_date_time": email["receivedDateTime"],
                "subject": email["subject"],
                "body_preview": email["bodyPreview"]
            }
            for email in thread["emails"]
        ],
        "qa": qa
    }

    # Save to temporary file
    with tempfile.NamedTemporaryFile(delete=False, suffix='.json', mode='w') as f:
        json.dump(export_data, f, indent=2)
    return f.name


# Initialize
init_auth_app()

# Create the Gradio interface
with gr.Blocks(title="Email Thread Analyzer with AI Q&A") as demo:
    gr.Markdown("# Email Thread Analyzer with AI Q&A")

    # Authentication section
    with gr.Tab("Authentication"):
        with gr.Row():
            with gr.Column(scale=2):
                gr.Markdown("## Sign in with Microsoft")
                gr.Markdown("1. Click 'Get Authentication URL' to start the sign-in process")
                gr.Markdown("2. Copy the authorization code from the redirect URL")
                gr.Markdown("3. Paste the code below and submit")
            with gr.Column(scale=3):
                auth_url_button = gr.Button("Get Authentication URL")
                auth_url_output = gr.Textbox(label="Authentication URL", interactive=False)
                auth_code_input = gr.Textbox(label="Authorization Code")
                auth_submit = gr.Button("Submit Authorization Code")
                auth_status = gr.Textbox(label="Authentication Status", interactive=False)

    # Email Extraction section
    with gr.Tab("Email Extraction"):
        with gr.Row():
            with gr.Column():
                folder_dropdown = gr.Dropdown(label="Select Mail Folder")
                refresh_folders_button = gr.Button("Refresh Folders")
                with gr.Row():
                    # precision=0 so the values arrive as integers rather than floats
                    max_emails_input = gr.Number(label="Max Emails", value=100, minimum=1, maximum=1000, precision=0)
                    batch_size_input = gr.Number(label="Batch Size", value=25, minimum=1, maximum=100, precision=0)
                with gr.Row():
                    start_date_input = gr.Textbox(label="Start Date (YYYY-MM-DD)")
                    end_date_input = gr.Textbox(label="End Date (YYYY-MM-DD)")
                extract_button = gr.Button("Extract Emails")
                extraction_status = gr.Textbox(label="Extraction Status", interactive=False)

    # Thread Analysis section
    with gr.Tab("Thread Analysis"):
        with gr.Row():
            with gr.Column():
                analysis_status = gr.Textbox(label="Analysis Status")
                with gr.Tabs():
                    with gr.Tab("Thread Size"):
                        thread_size_plot = gr.Plot(label="Thread Size Distribution")
                    with gr.Tab("Activity Over Time"):
                        activity_plot = gr.Plot(label="Activity Over Time")
                    with gr.Tab("Top Participants"):
                        participants_plot = gr.Plot(label="Top Participants")
                generate_analytics_button = gr.Button("Generate Analytics")

    # Search section
    with gr.Tab("Search"):
        with gr.Row():
            with gr.Column():
                search_input = gr.Textbox(label="Search Query")
                search_button = gr.Button("Search")
                search_status = gr.Textbox(label="Search Status", interactive=False)
            with gr.Column():
                search_results_dropdown = gr.Dropdown(label="Search Results")
                view_thread_button = gr.Button("View Thread")

    # Q&A section
    with gr.Tab("Q&A"):
        with gr.Row():
            with gr.Column():
                thread_info = gr.Textbox(label="Thread Information", interactive=False)
                qa_status = gr.Textbox(label="Q&A Status", interactive=False)
                with gr.Accordion("Thread Content", open=False):
                    thread_content = gr.Textbox(label="Thread Content", interactive=False, lines=10)
                with gr.Row():
                    question_dropdown = gr.Dropdown(label="Questions")
                    gen_qa_button = gr.Button("Generate Q&A")
                answer_output = gr.Textbox(label="Answer", interactive=False, lines=5)
                summary_output = gr.Textbox(label="Summary", interactive=False, lines=5)
                export_thread_button = gr.Button("Export Thread Data")
                export_output = gr.File(label="Export Data")

    # Set up event handlers

    # Authentication events
    auth_url_button.click(
        fn=get_auth_url,
        outputs=auth_url_output
    )

    auth_submit.click(
        fn=process_auth_code,
        inputs=auth_code_input,
        outputs=auth_status
    )

    # Folder refresh event: get_mail_folders() returns (choices, error),
    # so push the (label, id) pairs into the dropdown as choices
    refresh_folders_button.click(
        fn=lambda: gr.update(choices=get_mail_folders()[0]),
        outputs=folder_dropdown
    )

    # Email extraction event
    extract_button.click(
        fn=extract_emails,
        inputs=[folder_dropdown, max_emails_input, batch_size_input, start_date_input, end_date_input],
        outputs=extraction_status
    )

    # Analytics generation event
    generate_analytics_button.click(
        fn=lambda: (
            "Analytics generated successfully",
            get_thread_size_distribution(),
            get_activity_over_time(),
            get_participant_activity()
        ),
        outputs=[analysis_status, thread_size_plot, activity_plot, participants_plot]
    )

    # Search events: update the status box and repopulate the results dropdown
    def run_search(query):
        status = search_threads(query)
        choices = [f"{thread['subject']} ({thread['message_count']} messages)" for thread in search_results]
        return status, gr.update(choices=choices, value=choices[0] if choices else None)

    search_button.click(
        fn=run_search,
        inputs=search_input,
        outputs=[search_status, search_results_dropdown]
    )

    # Map the label selected in the results dropdown back to an index into search_results
    def get_selected_index(selection):
        labels = [f"{thread['subject']} ({thread['message_count']} messages)" for thread in search_results]
        return labels.index(selection) if selection in labels else -1

    # Thread view event
    def view_thread_details(selection):
        thread_idx = get_selected_index(selection)
        if not search_results or thread_idx < 0 or thread_idx >= len(search_results):
            return "No thread selected", "", gr.update(choices=[]), "", "", None

        thread = search_results[thread_idx]
        thread_id = thread["emails"][0]["conversationId"]

        # Generate thread content
        content = f"Subject: {thread['subject']}\n\n"
        for email in thread["emails"]:
            sender = email["sender"]["emailAddress"]["address"]
            date = email["receivedDateTime"]
            content += f"From: {sender} | Date: {date}\n"
            content += f"Content: {email['bodyPreview']}\n\n"

        # Generate Q&A if not already generated
        if thread_id not in qa_data:
            generate_qa(thread_id)

        # Get questions, answer, summary
        questions = qa_data.get(thread_id, {}).get("questions", [])
        answer = qa_data.get(thread_id, {}).get("answers", [""])[0] if questions else ""
        summary = qa_data.get(thread_id, {}).get("summary", "")

        # Export data
        export_data = export_thread_data(thread_id)

        return (
            f"Thread: {thread['subject']} ({thread['message_count']} messages)",
            content,
            gr.update(choices=questions, value=questions[0] if questions else None),
            answer,
            summary,
            export_data
        )

    view_thread_button.click(
        fn=view_thread_details,
        inputs=search_results_dropdown,
        outputs=[thread_info, thread_content, question_dropdown, answer_output, summary_output, export_output]
    )

    # Q&A events: look up the stored answer for the selected question
    def get_answer(question, selection):
        thread_idx = get_selected_index(selection)
        if not question or thread_idx < 0 or thread_idx >= len(search_results):
            return ""
        thread_id = search_results[thread_idx]["emails"][0]["conversationId"]
        questions = qa_data.get(thread_id, {}).get("questions", [])
        answers = qa_data.get(thread_id, {}).get("answers", [])
        if question not in questions:
            return ""
        return answers[questions.index(question)]

    question_dropdown.change(
        fn=get_answer,
        inputs=[question_dropdown, search_results_dropdown],
        outputs=answer_output
    )

    def run_generate_qa(selection):
        thread_idx = get_selected_index(selection)
        if thread_idx < 0 or thread_idx >= len(search_results):
            return "No thread selected", gr.update(choices=[]), "", ""
        thread_id = search_results[thread_idx]["emails"][0]["conversationId"]
        status = generate_qa(thread_id)
        questions = qa_data.get(thread_id, {}).get("questions", [])
        answer = qa_data.get(thread_id, {}).get("answers", [""])[0] if questions else ""
        summary = qa_data.get(thread_id, {}).get("summary", "")
        return status, gr.update(choices=questions, value=questions[0] if questions else None), answer, summary

    gen_qa_button.click(
        fn=run_generate_qa,
        inputs=search_results_dropdown,
        outputs=[qa_status, question_dropdown, answer_output, summary_output]
    )

    # Export event
    def run_export(selection):
        thread_idx = get_selected_index(selection)
        if thread_idx < 0 or thread_idx >= len(search_results):
            return None
        return export_thread_data(search_results[thread_idx]["emails"][0]["conversationId"])

    export_thread_button.click(
        fn=run_export,
        inputs=search_results_dropdown,
        outputs=export_output
    )

# Launch the app
if __name__ == "__main__":
    demo.launch()