import os import gradio as gr from sqlalchemy import text from smolagents import CodeAgent, HfApiModel import pandas as pd from io import StringIO import tempfile from database import ( engine, create_dynamic_table, clear_database, insert_rows_into_table ) # Initialize the AI agent agent = CodeAgent( tools=[], model=HfApiModel(model_id="Qwen/Qwen2.5-Coder-32B-Instruct"), ) def get_data_table(): """Fetch and return the current table data as DataFrame""" try: with engine.connect() as con: tables = con.execute(text( "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'" )).fetchall() if not tables: return pd.DataFrame() table_name = tables[0][0] with engine.connect() as con: result = con.execute(text(f"SELECT * FROM {table_name}")) rows = result.fetchall() columns = result.keys() return pd.DataFrame(rows, columns=columns) if rows else pd.DataFrame() except Exception as e: return pd.DataFrame({"Error": [str(e)]}) def process_txt_file(file_path): """Analyze text file and convert to structured table""" try: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() structure_prompt = f""" Convert this text into valid CSV format: {content} Requirements: 1. First row must be headers 2. Consistent columns per row 3. Quote fields containing commas 4. Maintain original data relationships Return ONLY the CSV content. """ csv_output = agent.run(structure_prompt) try: df = pd.read_csv( StringIO(csv_output), on_bad_lines='warn', dtype=str, encoding_errors='ignore' ).dropna(how='all') except pd.errors.ParserError as pe: return False, f"CSV Parsing Error: {str(pe)}", pd.DataFrame() if df.empty or len(df.columns) == 0: return False, "No structured data found", pd.DataFrame() clear_database() table = create_dynamic_table(df) insert_rows_into_table(df.to_dict('records'), table) return True, "Text analyzed successfully!", df.head(10) except Exception as e: return False, f"Processing error: {str(e)}", pd.DataFrame() def handle_upload(file_obj): """Handle file upload and processing""" if file_obj is None: return [ "Please upload a text file.", None, "No schema", gr.update(visible=True), gr.update(visible=False), gr.update(visible=False) ] success, message, df = process_txt_file(file_obj) if success: schema = "\n".join([f"- {col} (text)" for col in df.columns]) return [ message, df, f"### Detected Schema:\n```\n{schema}\n```", gr.update(visible=False), gr.update(visible=True), gr.update(visible=True) ] return [ message, None, "No schema", gr.update(visible=True), gr.update(visible=False), gr.update(visible=False) ] def query_analysis(user_query: str) -> str: """Handle natural language queries about the data""" try: df = get_data_table() if df.empty: return "Please upload and process a file first." analysis_prompt = f""" Analyze this data: {df.head().to_csv()} Question: {user_query} Provide: 1. Direct answer 2. Numerical formatting 3. Data references Use Markdown formatting. """ return agent.run(analysis_prompt) except Exception as e: return f"Query error: {str(e)}" def download_csv(): """Generate CSV file for download""" df = get_data_table() if not df.empty: temp_dir = tempfile.gettempdir() file_path = os.path.join(temp_dir, "processed_data.csv") df.to_csv(file_path, index=False) return file_path return None # Gradio interface setup with gr.Blocks() as demo: with gr.Group() as upload_group: gr.Markdown(""" # Text Data Analyzer Upload unstructured text files to analyze and query their data """) file_input = gr.File( label="Upload Text File", file_types=[".txt"], type="filepath" ) status = gr.Textbox(label="Processing Status", interactive=False) with gr.Group(visible=False) as query_group: with gr.Row(): with gr.Column(scale=1): with gr.Row(): user_input = gr.Textbox(label="Ask about the data", scale=4) submit_btn = gr.Button("Submit", scale=1) query_output = gr.Markdown(label="Analysis Results") with gr.Column(scale=2): gr.Markdown("### Extracted Data Preview") data_table = gr.Dataframe( label="Structured Data", interactive=False ) download_btn = gr.DownloadButton( "Download as CSV", visible=False ) schema_display = gr.Markdown() refresh_btn = gr.Button("Refresh View") # Event handlers file_input.upload( fn=handle_upload, inputs=file_input, outputs=[status, data_table, schema_display, upload_group, query_group, download_btn] ) submit_btn.click( fn=query_analysis, inputs=user_input, outputs=query_output ) user_input.submit( fn=query_analysis, inputs=user_input, outputs=query_output ) refresh_btn.click( fn=lambda: (get_data_table().head(10), "Schema refreshed"), outputs=[data_table, schema_display] ) download_btn.click( fn=download_csv, outputs=download_btn ) if __name__ == "__main__": demo.launch( server_name="0.0.0.0", server_port=7860, show_error=True )