|
import os |
|
import gradio as gr |
|
from sqlalchemy import text |
|
from smolagents import CodeAgent, HfApiModel |
|
import pandas as pd |
|
from io import StringIO |
|
import tempfile |
|
from database import ( |
|
engine, |
|
create_dynamic_table, |
|
clear_database, |
|
insert_rows_into_table |
|
) |
|
|
|
|
|
agent = CodeAgent( |
|
tools=[], |
|
model=HfApiModel(model_id="Qwen/Qwen2.5-Coder-32B-Instruct"), |
|
) |
|
|
|
def get_data_table(): |
|
"""Fetch and return the current table data as DataFrame""" |
|
try: |
|
with engine.connect() as con: |
|
tables = con.execute(text( |
|
"SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'" |
|
)).fetchall() |
|
|
|
if not tables: |
|
return pd.DataFrame() |
|
|
|
table_name = tables[0][0] |
|
|
|
with engine.connect() as con: |
|
result = con.execute(text(f"SELECT * FROM {table_name}")) |
|
rows = result.fetchall() |
|
columns = result.keys() |
|
|
|
return pd.DataFrame(rows, columns=columns) if rows else pd.DataFrame() |
|
|
|
except Exception as e: |
|
return pd.DataFrame({"Error": [str(e)]}) |
|
|
|
def process_txt_file(file_path): |
|
"""Analyze text file and convert to structured table""" |
|
try: |
|
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: |
|
content = f.read() |
|
|
|
structure_prompt = f""" |
|
Convert this text into valid CSV format: |
|
{content} |
|
|
|
Requirements: |
|
1. First row must be headers |
|
2. Consistent columns per row |
|
3. Quote fields containing commas |
|
4. Maintain original data relationships |
|
|
|
Return ONLY the CSV content. |
|
""" |
|
csv_output = agent.run(structure_prompt) |
|
|
|
try: |
|
df = pd.read_csv( |
|
StringIO(csv_output), |
|
on_bad_lines='warn', |
|
dtype=str, |
|
encoding_errors='ignore' |
|
).dropna(how='all') |
|
except pd.errors.ParserError as pe: |
|
return False, f"CSV Parsing Error: {str(pe)}", pd.DataFrame() |
|
|
|
if df.empty or len(df.columns) == 0: |
|
return False, "No structured data found", pd.DataFrame() |
|
|
|
clear_database() |
|
table = create_dynamic_table(df) |
|
insert_rows_into_table(df.to_dict('records'), table) |
|
|
|
return True, "Text analyzed successfully!", df.head(10) |
|
|
|
except Exception as e: |
|
return False, f"Processing error: {str(e)}", pd.DataFrame() |
|
|
|
def handle_upload(file_obj): |
|
"""Handle file upload and processing""" |
|
if file_obj is None: |
|
return [ |
|
"Please upload a text file.", |
|
None, |
|
"No schema", |
|
gr.update(visible=True), |
|
gr.update(visible=False), |
|
gr.update(visible=False) |
|
] |
|
|
|
success, message, df = process_txt_file(file_obj) |
|
if success: |
|
schema = "\n".join([f"- {col} (text)" for col in df.columns]) |
|
return [ |
|
message, |
|
df, |
|
f"### Detected Schema:\n```\n{schema}\n```", |
|
gr.update(visible=False), |
|
gr.update(visible=True), |
|
gr.update(visible=True) |
|
] |
|
return [ |
|
message, |
|
None, |
|
"No schema", |
|
gr.update(visible=True), |
|
gr.update(visible=False), |
|
gr.update(visible=False) |
|
] |
|
|
|
def query_analysis(user_query: str) -> str: |
|
"""Handle natural language queries about the data""" |
|
try: |
|
df = get_data_table() |
|
if df.empty: |
|
return "Please upload and process a file first." |
|
|
|
analysis_prompt = f""" |
|
Analyze this data: |
|
{df.head().to_csv()} |
|
|
|
Question: {user_query} |
|
|
|
Provide: |
|
1. Direct answer |
|
2. Numerical formatting |
|
3. Data references |
|
|
|
Use Markdown formatting. |
|
""" |
|
|
|
return agent.run(analysis_prompt) |
|
|
|
except Exception as e: |
|
return f"Query error: {str(e)}" |
|
|
|
def download_csv(): |
|
"""Generate CSV file for download""" |
|
df = get_data_table() |
|
if not df.empty: |
|
temp_dir = tempfile.gettempdir() |
|
file_path = os.path.join(temp_dir, "processed_data.csv") |
|
df.to_csv(file_path, index=False) |
|
return file_path |
|
return None |
|
|
|
|
|
with gr.Blocks() as demo: |
|
with gr.Group() as upload_group: |
|
gr.Markdown(""" |
|
# Text Data Analyzer |
|
Upload unstructured text files to analyze and query their data |
|
""") |
|
file_input = gr.File( |
|
label="Upload Text File", |
|
file_types=[".txt"], |
|
type="filepath" |
|
) |
|
status = gr.Textbox(label="Processing Status", interactive=False) |
|
|
|
with gr.Group(visible=False) as query_group: |
|
with gr.Row(): |
|
with gr.Column(scale=1): |
|
with gr.Row(): |
|
user_input = gr.Textbox(label="Ask about the data", scale=4) |
|
submit_btn = gr.Button("Submit", scale=1) |
|
query_output = gr.Markdown(label="Analysis Results") |
|
with gr.Column(scale=2): |
|
gr.Markdown("### Extracted Data Preview") |
|
data_table = gr.Dataframe( |
|
label="Structured Data", |
|
interactive=False |
|
) |
|
download_btn = gr.DownloadButton( |
|
"Download as CSV", |
|
visible=False |
|
) |
|
schema_display = gr.Markdown() |
|
refresh_btn = gr.Button("Refresh View") |
|
|
|
|
|
file_input.upload( |
|
fn=handle_upload, |
|
inputs=file_input, |
|
outputs=[status, data_table, schema_display, upload_group, query_group, download_btn] |
|
) |
|
|
|
submit_btn.click( |
|
fn=query_analysis, |
|
inputs=user_input, |
|
outputs=query_output |
|
) |
|
|
|
user_input.submit( |
|
fn=query_analysis, |
|
inputs=user_input, |
|
outputs=query_output |
|
) |
|
|
|
refresh_btn.click( |
|
fn=lambda: (get_data_table().head(10), "Schema refreshed"), |
|
outputs=[data_table, schema_display] |
|
) |
|
|
|
download_btn.click( |
|
fn=download_csv, |
|
outputs=download_btn |
|
) |
|
|
|
if __name__ == "__main__": |
|
demo.launch( |
|
server_name="0.0.0.0", |
|
server_port=7860, |
|
show_error=True |
|
) |