File size: 7,319 Bytes
305b47e
 
 
3f02f09
305b47e
 
 
3f02f09
 
 
 
 
 
 
 
 
 
305b47e
 
 
 
 
 
 
 
 
 
3f02f09
 
 
 
305b47e
3f02f09
 
 
 
 
 
 
 
 
 
 
 
 
 
 
305b47e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3f02f09
 
 
 
 
305b47e
3f02f09
 
 
 
305b47e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3f02f09
 
 
 
305b47e
 
 
 
 
 
 
 
 
 
 
 
 
 
3f02f09
305b47e
 
3f02f09
305b47e
 
 
 
 
 
 
 
 
 
3f02f09
 
 
 
305b47e
 
 
 
3f02f09
305b47e
3f02f09
305b47e
3f02f09
305b47e
3f02f09
 
 
 
 
 
 
 
 
305b47e
 
 
3f02f09
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
305b47e
 
 
 
 
 
 
3f02f09
 
 
 
 
 
 
305b47e
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# app.py
import gradio as gr
from transformers import pipeline
import imaplib  # Using built-in imaplib instead of imaplib-ng
import email
from email.header import decode_header
import traceback
from email_validator import validate_email, EmailNotValidError


def validate_email_address(email_address):
    """Validate email address format"""
    try:
        validate_email(email_address)
        return True
    except EmailNotValidError:
        return False


def create_summarizer():
    """Initialize the summarization pipeline"""
    return pipeline("summarization", model="facebook/bart-large-cnn")


def connect_to_email(email_address, app_password, imap_server="imap.gmail.com"):
    """Connect to email server using IMAP"""
    try:
        if not validate_email_address(email_address):
            return None, "Invalid email address format"

        print(f"Attempting to connect to {imap_server}...")
        mail = imaplib.IMAP4_SSL(imap_server)
        print("Connection established, attempting login...")

        try:
            mail.login(email_address, app_password)
            print("Login successful!")
            return mail, None
        except imaplib.IMAP4.error as e:
            if "Invalid credentials" in str(e):
                return None, "Invalid email or password. For Gmail, make sure you're using an App Password and have 2-Step Verification enabled."
            elif "IMAP not enabled" in str(e):
                return None, "IMAP is not enabled for this account. Please enable IMAP in your email settings."
            else:
                return None, f"Login error: {str(e)}"
    except imaplib.IMAP4.error as e:
        return None, f"IMAP error: {str(e)}"
    except Exception as e:
        return None, f"Error connecting to email: {str(e)}"


def get_emails(mail, num_emails=5):
    """Fetch recent emails from INBOX"""
    try:
        mail.select("INBOX")
        _, messages = mail.search(None, "ALL")
        email_ids = messages[0].split()

        # Get the last n email IDs
        recent_emails = email_ids[-num_emails:]

        emails = []
        for email_id in recent_emails:
            _, msg = mail.fetch(email_id, "(RFC822)")
            email_body = msg[0][1]
            email_message = email.message_from_bytes(email_body)

            subject = decode_header(email_message["Subject"])[0][0]
            if isinstance(subject, bytes):
                subject = subject.decode()

            # Get email body
            body = ""
            if email_message.is_multipart():
                for part in email_message.walk():
                    if part.get_content_type() == "text/plain":
                        try:
                            body = part.get_payload(decode=True).decode()
                            break
                        except:
                            continue
            else:
                try:
                    body = email_message.get_payload(decode=True).decode()
                except:
                    body = email_message.get_payload()

            emails.append({
                'subject': subject,
                'body': body
            })

        return emails, None
    except Exception as e:
        return None, f"Error fetching emails: {str(e)}"


def summarize_text(summarizer, text, max_length=130, min_length=30):
    """Summarize text using the BART model"""
    try:
        # Clean and prepare text
        text = text.replace('\n', ' ').strip()
        if len(text) < min_length:  # If text is too short, return as is
            return text

        summary = summarizer(text,
                             max_length=max_length,
                             min_length=min_length,
                             do_sample=False)[0]['summary_text']
        return summary
    except Exception as e:
        return f"Error summarizing text: {str(e)}"


def process_emails(email_address, app_password, num_emails=5):
    """Main function to process and summarize emails"""
    try:
        # Basic validation
        if not email_address or not app_password:
            return "Please provide both email address and password."

        # Initialize summarizer
        summarizer = create_summarizer()

        # Connect to email
        mail, error = connect_to_email(email_address, app_password)
        if error:
            return error

        # Get emails
        emails, error = get_emails(mail, num_emails)
        if error:
            return error

        # Process summaries
        output = "πŸ“§ Email Summaries:\n" + "=" * 50 + "\n"
        for i, email_data in enumerate(emails, 1):
            summary = summarize_text(summarizer, email_data['body'])
            output += f"\nπŸ“Œ Email {i}:\n"
            output += f"Subject: {email_data['subject']}\n"
            output += f"Summary: {summary}\n"
            output += "-" * 50 + "\n"

        return output

    except Exception as e:
        return f"Error: {str(e)}\n{traceback.format_exc()}"
    finally:
        if 'mail' in locals():
            try:
                mail.logout()
            except:
                pass


# Create Gradio interface
def create_interface():
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
        # πŸ“§ Email Summarizer AI

        This app connects to your email account and provides AI-powered summaries of your recent emails.

        ### Setup Instructions:
        1. For Gmail users:
           - Enable 2-Step Verification in your Google Account
           - Generate an App Password: Google Account β†’ Security β†’ 2-Step Verification β†’ App Passwords
           - Use the generated App Password (not your regular password)

        2. For other email providers:
           - Make sure IMAP is enabled
           - Use your regular email and password
        """)

        with gr.Row():
            with gr.Column():
                email_input = gr.Textbox(
                    label="Email Address",
                    placeholder="[email protected]"
                )
                password_input = gr.Textbox(
                    label="App Password",
                    type="password",
                    placeholder="Enter your app password"
                )
                num_emails = gr.Slider(
                    label="Number of Emails to Summarize",
                    minimum=1,
                    maximum=20,
                    value=5,
                    step=1
                )
                summarize_btn = gr.Button("πŸš€ Summarize Emails", variant="primary")

            with gr.Column():
                output = gr.Textbox(
                    label="Email Summaries",
                    lines=20,
                    placeholder="Summaries will appear here..."
                )

        summarize_btn.click(
            fn=process_emails,
            inputs=[email_input, password_input, num_emails],
            outputs=output
        )

        gr.Markdown("""
        ### πŸ”’ Privacy Notice
        - Your credentials are only used for the current session
        - No data is stored permanently
        - All processing happens in real-time
        """)

    return demo


# Create and launch the interface
if __name__ == "__main__":
    demo = create_interface()
    demo.launch()