|
import os |
|
import gradio as gr |
|
from sqlalchemy import text |
|
from smolagents import tool, CodeAgent, HfApiModel |
|
import pandas as pd |
|
from io import StringIO |
|
from database import ( |
|
engine, |
|
create_dynamic_table, |
|
clear_database, |
|
insert_rows_into_table, |
|
get_table_schema |
|
) |
|
|
|
|
|
# Module-wide agent that every prompt in this file is sent to. It is given
# no extra tools and is backed by the hosted Qwen2.5-Coder model.
_coder_model = HfApiModel(model_id="Qwen/Qwen2.5-Coder-32B-Instruct")
agent = CodeAgent(tools=[], model=_coder_model)
|
|
|
def get_data_table():
    """Fetch the contents of the first user table as a DataFrame.

    The table list is read from SQLite's ``sqlite_master`` catalogue,
    ignoring names that match ``sqlite_%``, and every row of the first
    table returned is loaded.

    Returns:
        pandas.DataFrame: The rows of that table. When the database holds
        no user table the frame is completely empty; when the table exists
        but has no rows the frame is empty yet keeps the column headers.
        If anything fails, a one-row frame with a single ``Error`` column
        containing the exception message is returned instead of raising.
    """
    try:
        # One connection serves both the catalogue lookup and the data read.
        with engine.connect() as con:
            tables = con.execute(text(
                "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
            )).fetchall()

            if not tables:
                return pd.DataFrame()

            # A table name cannot be passed as a bound parameter, so it is
            # quoted as an identifier (embedded double quotes doubled). This
            # keeps names containing spaces or SQL keywords from breaking,
            # or altering, the statement.
            table_name = str(tables[0][0])
            quoted_name = '"' + table_name.replace('"', '""') + '"'

            result = con.execute(text(f"SELECT * FROM {quoted_name}"))
            columns = list(result.keys())
            rows = result.fetchall()

        # Always pass the columns so an empty table still shows its headers.
        return pd.DataFrame(rows, columns=columns)

    except Exception as e:
        # UI boundary: surface the failure as data rather than raising.
        return pd.DataFrame({"Error": [str(e)]})
|
|
|
def _as_csv_text(agent_output):
    """Return *agent_output* as bare CSV text, without a Markdown code fence."""
    csv_text = str(agent_output).strip()
    if csv_text.startswith("```"):
        # Drop the opening fence line (it may carry a tag such as ```csv)
        # and, if present, the closing fence line.
        body_lines = csv_text.splitlines()[1:]
        if body_lines and body_lines[-1].strip().startswith("```"):
            body_lines = body_lines[:-1]
        csv_text = "\n".join(body_lines)
    return csv_text


def process_txt_file(file_path):
    """Convert a free-form text file into a table and store it in the database.

    The file's text is sent to the agent with a request for CSV. The parsed
    table is then sent back to the agent for a validation pass, and the
    corrected table replaces whatever the database held before.

    Args:
        file_path: Path of the uploaded text file.

    Returns:
        tuple: ``(success, message, dataframe)``. On success ``dataframe``
        holds the stored rows. On any failure ``success`` is ``False``,
        ``message`` starts with ``"Error: "`` and ``dataframe`` is empty.
    """
    try:
        # Explicit encoding so the result does not depend on the platform
        # default; undecodable bytes are replaced instead of aborting.
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            content = f.read()

        # Nothing to analyse: fail before spending a model call on it.
        if not content.strip():
            raise ValueError("the uploaded file is empty")

        structure_prompt = f"""
        Analyze this text and convert it into a structured table format:
        {content}

        Return ONLY valid CSV format with appropriate headers.
        Maintain original data types and relationships.
        """
        csv_output = agent.run(structure_prompt)
        df = pd.read_csv(StringIO(_as_csv_text(csv_output)))

        # Validate the complete table, not a preview: the corrected CSV
        # replaces df, so sending only head() would drop every row after
        # the fifth. index=False keeps the pandas row index out of the data.
        validation_prompt = f"""
        Validate this structured data:
        {df.to_csv(index=False)}

        Fix any formatting issues and return corrected CSV.
        """
        corrected_csv = agent.run(validation_prompt)
        df = pd.read_csv(StringIO(_as_csv_text(corrected_csv)))

        # Replace the stored data only after both passes parsed cleanly.
        clear_database()
        table = create_dynamic_table(df)
        insert_rows_into_table(df.to_dict('records'), table)

        return True, "Text analyzed successfully!", df

    except Exception as e:
        # UI boundary: report the failure to the caller instead of raising.
        return False, f"Error: {str(e)}", pd.DataFrame()
|
|
|
def handle_upload(file_obj):
    """Process an uploaded file and build the values for the UI outputs.

    Returns a 5-tuple: status text, the DataFrame to show (or ``None``),
    the schema Markdown, and two ``gr.update`` objects that set the
    visibility of the upload group and the query group respectively.
    """

    def _stay_on_upload(status_text):
        # Keep the upload group visible and the query group hidden.
        return (
            status_text,
            None,
            "No schema",
            gr.update(visible=True),
            gr.update(visible=False),
        )

    if file_obj is None:
        return _stay_on_upload("Please upload a text file.")

    success, message, df = process_txt_file(file_obj)
    if not success:
        return _stay_on_upload(message)

    # One "- name (dtype)" line per column of the extracted table.
    schema_lines = []
    for col in df.columns:
        dtype_name = str(df[col].dtype)
        schema_lines.append(f"- {col} ({dtype_name})")
    schema = "\n".join(schema_lines)

    schema_markdown = f"### Detected Schema:\n```\n{schema}\n```"
    hide_upload = gr.update(visible=False)
    show_query = gr.update(visible=True)
    return message, df, schema_markdown, hide_upload, show_query
|
|
|
def query_analysis(user_query: str) -> str:
    """Answer a natural-language question about the stored data.

    A CSV preview of the stored table (``DataFrame.head()``, i.e. the first
    five rows) is embedded in a prompt together with the question and sent
    to the agent.

    Args:
        user_query: The question typed by the user.

    Returns:
        str: The agent's answer, a hint when the question is blank or no
        data has been loaded, or ``"Analysis error: ..."`` if anything fails.
    """
    # A blank question gives the agent nothing to answer; skip the
    # database read and the model call entirely.
    if not user_query or not user_query.strip():
        return "Please enter a question about the data."

    try:
        df = get_data_table()
        if df.empty:
            return "No data available. Upload a text file first."

        # index=False keeps the pandas row index from showing up in the
        # prompt as an extra, meaningless column.
        analysis_prompt = f"""
        Analyze this dataset:
        {df.head().to_csv(index=False)}

        Question: {user_query}

        Provide a detailed answer considering:
        - Data patterns and relationships
        - Statistical measures where applicable
        - Clear numerical formatting
        - Natural language explanations

        Structure your response with:
        1. Direct answer first
        2. Supporting analysis
        3. Data references
        """

        # The agent's final answer is not guaranteed to be a string;
        # coerce it so the declared return type holds.
        return str(agent.run(analysis_prompt))

    except Exception as e:
        # UI boundary: show the failure as text instead of raising.
        return f"Analysis error: {str(e)}"
|
|
|
|
|
def _refresh_view():
    """Reload the stored table and rebuild the schema summary for it.

    Returns the DataFrame for the preview grid and the Markdown for the
    schema panel, so a refresh updates the schema instead of replacing it
    with a status message.
    """
    df = get_data_table()
    if len(df.columns) == 0:
        return df, "No schema"
    schema = "\n".join(f"- {col} ({str(dtype)})" for col, dtype in df.dtypes.items())
    return df, f"### Detected Schema:\n```\n{schema}\n```"


with gr.Blocks() as demo:
    # Upload screen: shown first, hidden once a file has been processed.
    with gr.Group() as upload_group:
        gr.Markdown("""
        # Text Data Analyzer
        Upload any text document containing structured information:
        - Reports
        - Log files
        - Research data
        - Meeting notes with tabular content
        """)
        file_input = gr.File(label="Upload Text File", file_types=[".txt"], type="filepath")
        status = gr.Textbox(label="Processing Status", interactive=False)

    # Query screen: hidden until handle_upload reports success.
    with gr.Group(visible=False) as query_group:
        with gr.Row():
            with gr.Column(scale=1):
                user_input = gr.Textbox(label="Ask about the data")
                query_output = gr.Markdown(label="Analysis Results")
            with gr.Column(scale=2):
                gr.Markdown("### Extracted Data Preview")
                data_table = gr.Dataframe(interactive=False)
                schema_display = gr.Markdown()
                refresh_btn = gr.Button("Refresh View")

    # handle_upload returns five values, one per component listed here.
    file_input.upload(
        fn=handle_upload,
        inputs=file_input,
        outputs=[status, data_table, schema_display, upload_group, query_group]
    )

    user_input.submit(
        fn=query_analysis,
        inputs=user_input,
        outputs=query_output
    )

    # Refresh both the preview grid and the schema panel from the database.
    refresh_btn.click(
        fn=_refresh_view,
        outputs=[data_table, schema_display]
    )
|
|
|
if __name__ == "__main__":
    # Start the app only when run as a script: listen on all network
    # interfaces ("0.0.0.0") on port 7860.
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860}
    demo.launch(**launch_options)