# Spaces status: Running
# app.py | |
import gradio as gr | |
from transformers import pipeline | |
import imaplib # Using built-in imaplib instead of imaplib-ng | |
import email | |
from email.header import decode_header | |
import traceback | |
from email_validator import validate_email, EmailNotValidError | |
def validate_email_address(email_address):
    """Report whether ``email_address`` is accepted by ``validate_email``.

    Returns:
        ``True`` when ``validate_email`` completes without raising
        ``EmailNotValidError``; ``False`` when it raises that error. Any
        other exception from the validator propagates to the caller.
    """
    try:
        validate_email(email_address)
    except EmailNotValidError:
        return False
    else:
        return True
# Process-wide slot for the summarization pipeline so it is built at most once.
_SUMMARIZER = None


def create_summarizer():
    """Return the shared summarization pipeline, building it on first use.

    The pipeline is constructed with the ``facebook/bart-large-cnn`` model the
    first time this function is called and the same object is returned on
    every later call, so repeated requests do not pay the model-loading cost
    again.

    Returns:
        The ``transformers`` summarization pipeline object.
    """
    global _SUMMARIZER
    if _SUMMARIZER is None:
        _SUMMARIZER = pipeline("summarization", model="facebook/bart-large-cnn")
    return _SUMMARIZER
def _close_quietly(mail):
    """Best-effort logout of a connection that will not be handed to the caller."""
    if mail is None:
        return
    try:
        mail.logout()
    except (imaplib.IMAP4.error, OSError):
        # The connection is already unusable; there is nothing left to release.
        pass


def connect_to_email(email_address, app_password, imap_server="imap.gmail.com"):
    """Open an authenticated IMAP-over-SSL connection.

    Args:
        email_address: Login name; checked with ``validate_email_address``
            before any network activity.
        app_password: Password passed to ``IMAP4_SSL.login``.
        imap_server: Host name of the IMAP server.

    Returns:
        ``(mail, None)`` with a logged-in ``imaplib.IMAP4_SSL`` object on
        success, or ``(None, message)`` with a human-readable error message on
        failure. Exceptions derived from ``Exception`` are converted to the
        error form rather than raised.
    """
    mail = None
    try:
        if not validate_email_address(email_address):
            return None, "Invalid email address format"

        print(f"Attempting to connect to {imap_server}...")
        mail = imaplib.IMAP4_SSL(imap_server)
        print("Connection established, attempting login...")

        try:
            mail.login(email_address, app_password)
        except imaplib.IMAP4.error as e:
            # The socket is open but unauthenticated; close it instead of
            # leaking it, since the caller only receives ``None``.
            _close_quietly(mail)
            mail = None
            reason = str(e)
            if "Invalid credentials" in reason:
                return None, "Invalid email or password. For Gmail, make sure you're using an App Password and have 2-Step Verification enabled."
            if "IMAP not enabled" in reason:
                return None, "IMAP is not enabled for this account. Please enable IMAP in your email settings."
            return None, f"Login error: {reason}"

        print("Login successful!")
        return mail, None
    except imaplib.IMAP4.error as e:
        _close_quietly(mail)
        return None, f"IMAP error: {str(e)}"
    except Exception as e:
        _close_quietly(mail)
        return None, f"Error connecting to email: {str(e)}"
def _decode_bytes(raw, charset=None):
    """Decode ``raw`` bytes to text without raising on bad bytes or charsets."""
    try:
        return raw.decode(charset or "utf-8", errors="replace")
    except LookupError:
        # Unknown charset label in the message: fall back to UTF-8.
        return raw.decode("utf-8", errors="replace")


def _decode_subject(raw_subject):
    """Join every fragment of a (possibly RFC 2047 encoded) Subject header."""
    if raw_subject is None:
        return "(no subject)"
    fragments = []
    for value, charset in decode_header(raw_subject):
        if isinstance(value, bytes):
            value = _decode_bytes(value, charset)
        fragments.append(value)
    return "".join(fragments)


def _extract_plain_text(message):
    """Return the first ``text/plain`` payload of ``message`` as text ("" if none)."""
    if message.is_multipart():
        for part in message.walk():
            if part.get_content_type() != "text/plain":
                continue
            payload = part.get_payload(decode=True)
            if payload is not None:
                return _decode_bytes(payload, part.get_content_charset())
        return ""
    payload = message.get_payload(decode=True)
    if payload is None:
        return ""
    return _decode_bytes(payload, message.get_content_charset())


def get_emails(mail, num_emails=5):
    """Fetch the most recent messages from INBOX.

    Args:
        mail: A logged-in IMAP connection providing ``select``, ``search`` and
            ``fetch`` (as ``imaplib.IMAP4`` does).
        num_emails: Maximum number of messages to return; coerced with
            ``int()``. Values of zero or less yield an empty list.

    Returns:
        ``(emails, None)`` where ``emails`` is a list of
        ``{'subject': str, 'body': str}`` dicts in mailbox order, or
        ``(None, message)`` if anything fails.
    """
    try:
        limit = int(num_emails)
        if limit <= 0:
            # A slice of ``[-0:]`` would return the whole mailbox.
            return [], None

        # Open read-only so that fetching does not flag messages as read.
        mail.select("INBOX", readonly=True)
        _, messages = mail.search(None, "ALL")
        email_ids = messages[0].split() if messages and messages[0] else []

        emails = []
        for email_id in email_ids[-limit:]:
            _, msg = mail.fetch(email_id, "(RFC822)")
            # Skip responses that carry no message literal.
            if not msg or not isinstance(msg[0], tuple):
                continue
            email_message = email.message_from_bytes(msg[0][1])
            emails.append({
                'subject': _decode_subject(email_message["Subject"]),
                'body': _extract_plain_text(email_message),
            })
        return emails, None
    except Exception as e:
        return None, f"Error fetching emails: {str(e)}"
def summarize_text(summarizer, text, max_length=130, min_length=30):
    """Summarize ``text`` with the given summarization callable.

    Args:
        summarizer: Callable invoked as ``summarizer(text, **options)`` and
            returning a sequence whose first item has a ``'summary_text'`` key.
        text: Text to summarize; ``None`` is treated as an empty string.
        max_length: Forwarded to ``summarizer`` as ``max_length``.
        min_length: Forwarded to ``summarizer`` as ``min_length``; also used
            as a character-count threshold below which the cleaned text is
            returned unchanged.

    Returns:
        The summary string, the cleaned input when it is shorter than
        ``min_length`` characters, or ``"Error summarizing text: ..."`` if the
        summarizer raises.
    """
    try:
        # Collapse every whitespace run (including "\r\n") into one space.
        text = " ".join((text or "").split())
        if len(text) < min_length:  # Too short to be worth summarizing.
            return text
        result = summarizer(
            text,
            max_length=max_length,
            min_length=min_length,
            do_sample=False,
            # Ask the pipeline to cut inputs that exceed the model's maximum
            # input size instead of failing on long emails.
            truncation=True,
        )
        return result[0]['summary_text']
    except Exception as e:
        return f"Error summarizing text: {str(e)}"
def process_emails(email_address, app_password, num_emails=5, imap_server="imap.gmail.com"):
    """Connect to a mailbox, fetch recent emails and return their summaries.

    Args:
        email_address: Account login passed to ``connect_to_email``.
        app_password: Account password passed to ``connect_to_email``.
        num_emails: Number of most recent emails to summarize.
        imap_server: IMAP host to connect to; defaults to Gmail's server.

    Returns:
        A formatted multi-line string with one section per email, or an error
        message string if validation, connection, fetching or summarizing
        fails. This function does not raise for ``Exception`` subclasses.
    """
    # Basic validation
    if not email_address or not app_password:
        return "Please provide both email address and password."

    mail = None
    try:
        # Connect and fetch first so bad credentials fail fast, before the
        # summarization model is requested.
        mail, error = connect_to_email(email_address, app_password, imap_server)
        if error:
            return error

        emails, error = get_emails(mail, num_emails)
        if error:
            return error

        sections = ["📧 Email Summaries:\n" + "=" * 50 + "\n"]
        if not emails:
            sections.append("\nNo emails found in INBOX.\n")
            return "".join(sections)

        summarizer = create_summarizer()
        for i, email_data in enumerate(emails, 1):
            summary = summarize_text(summarizer, email_data['body'])
            sections.append(
                f"\n📝 Email {i}:\n"
                f"Subject: {email_data['subject']}\n"
                f"Summary: {summary}\n"
                + "-" * 50 + "\n"
            )
        return "".join(sections)
    except Exception as e:
        return f"Error: {str(e)}\n{traceback.format_exc()}"
    finally:
        # Always release the IMAP session; logout failures are not actionable.
        if mail is not None:
            try:
                mail.logout()
            except Exception:
                pass
# Create Gradio interface | |
def create_interface():
    """Build the Gradio Blocks UI for the email summarizer.

    The layout is a two-column row: credentials, an email-count slider and the
    submit button on the left, and the summaries textbox on the right. The
    button's click event calls ``process_emails`` with the email, password and
    slider values and writes the result to the output textbox.

    Returns:
        The ``gr.Blocks`` instance. It is not launched here.
    """
    intro_markdown = "\n".join([
        "# 📧 Email Summarizer AI",
        "This app connects to your email account and provides AI-powered summaries of your recent emails.",
        "### Setup Instructions:",
        "1. For Gmail users:",
        "   - Enable 2-Step Verification in your Google Account",
        "   - Generate an App Password: Google Account → Security → 2-Step Verification → App Passwords",
        "   - Use the generated App Password (not your regular password)",
        "2. For other email providers:",
        "   - Make sure IMAP is enabled",
        "   - Use your regular email and password",
    ])
    privacy_markdown = "\n".join([
        "### 🔒 Privacy Notice",
        "- Your credentials are only used for the current session",
        "- No data is stored permanently",
        "- All processing happens in real-time",
    ])

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown(intro_markdown)
        with gr.Row():
            # Left column: inputs and the action button.
            with gr.Column():
                email_input = gr.Textbox(
                    label="Email Address",
                    placeholder="your.email@gmail.com",
                )
                password_input = gr.Textbox(
                    label="App Password",
                    type="password",
                    placeholder="Enter your app password",
                )
                num_emails = gr.Slider(
                    label="Number of Emails to Summarize",
                    minimum=1,
                    maximum=20,
                    value=5,
                    step=1,
                )
                summarize_btn = gr.Button("📝 Summarize Emails", variant="primary")
            # Right column: results.
            with gr.Column():
                output = gr.Textbox(
                    label="Email Summaries",
                    lines=20,
                    placeholder="Summaries will appear here...",
                )
        summarize_btn.click(
            fn=process_emails,
            inputs=[email_input, password_input, num_emails],
            outputs=output,
        )
        gr.Markdown(privacy_markdown)
    return demo
# Create and launch the interface | |
if __name__ == "__main__":
    # Script entry point: build the UI, keep it bound to the module-level
    # name ``demo``, then start serving it.
    demo = create_interface()
    demo.launch()