import os

import gradio as gr
from sqlalchemy import text
from smolagents import tool, CodeAgent, HfApiModel
# NOTE(review): `spaces` is not referenced anywhere in this file; it is kept
# because the import itself may be required by the hosting runtime -- confirm.
import spaces  # noqa: F401
import pandas as pd

from database import (
    engine,
    create_dynamic_table,
    clear_database,
    insert_rows_into_table,
    get_table_schema,
)

# Statement prefixes that query_sql() accepts from the agent before executing.
_ALLOWED_QUERY_PREFIXES = ("select", "show", "pragma")


def _format_schema(schema, empty_message):
    """Render *schema* as a fenced Markdown block, or *empty_message* if falsy."""
    if schema:
        return f"### Current Schema:\n```\n{schema}\n```"
    return empty_message


def get_data_table():
    """
    Fetch all rows of the first user table and return them as a DataFrame.

    Returns:
        pd.DataFrame: the table contents; an empty DataFrame when no table
        exists; or a single-column ``Error`` DataFrame holding the exception
        text when anything fails.
    """
    try:
        # One connection is enough for both the catalog lookup and the read.
        with engine.connect() as con:
            tables = con.execute(text(
                "SELECT name FROM sqlite_master "
                "WHERE type='table' AND name NOT LIKE 'sqlite_%'"
            )).fetchall()
            if not tables:
                return pd.DataFrame()

            # Use the first table found. Quote it as an SQLite identifier so
            # names that are reserved words or contain spaces still work.
            table_name = tables[0][0]
            quoted_name = '"' + str(table_name).replace('"', '""') + '"'
            result = con.execute(text(f"SELECT * FROM {quoted_name}"))
            columns = list(result.keys())
            rows = [tuple(row) for row in result.fetchall()]

        # Passing the columns even when there are no rows keeps the headers
        # visible for an empty table.
        return pd.DataFrame(rows, columns=columns)
    except Exception as e:
        return pd.DataFrame({"Error": [str(e)]})


def process_sql_file(file_path):
    """
    Execute the statements of an SQL file against a freshly cleared database.

    Args:
        file_path: path of the ``.sql`` file to run.

    Returns:
        tuple[bool, str]: success flag and a user-facing status message.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as file:
            sql_content = file.read()

        # Replace AUTO_INCREMENT with AUTOINCREMENT for SQLite compatibility.
        sql_content = sql_content.replace("AUTO_INCREMENT", "AUTOINCREMENT")

        # Naive split on ';' -- the upload help text requires statements to be
        # separated by semicolons.
        statements = [stmt.strip() for stmt in sql_content.split(";") if stmt.strip()]
        if not statements:
            # Refuse before clearing, so an empty upload cannot wipe the data
            # and then report success.
            return False, "Error processing SQL file: file contains no SQL statements"

        clear_database()

        # engine.begin() runs all statements in a single transaction.
        with engine.begin() as conn:
            for statement in statements:
                conn.execute(text(statement))

        return True, "SQL file successfully executed! Click 'Continue' to proceed to query interface..."
    except Exception as e:
        return False, f"Error processing SQL file: {str(e)}"


def process_csv_file(file_path):
    """
    Load a CSV file into a newly created table, replacing existing data.

    Args:
        file_path: path of the ``.csv`` file to load.

    Returns:
        tuple[bool, str]: success flag and a user-facing status message.
    """
    try:
        df = pd.read_csv(file_path)

        if len(df.columns) == 0:
            return False, "Error: File contains no columns"

        # Clear existing database and create the new table from the frame.
        clear_database()
        table = create_dynamic_table(df)

        # Insert the rows as a list of per-row dictionaries.
        records = df.to_dict("records")
        insert_rows_into_table(records, table)

        return True, "CSV file successfully loaded! Click 'Continue' to proceed to query interface..."
    except Exception as e:
        return False, f"Error processing CSV file: {str(e)}"


def process_uploaded_file(file):
    """
    Dispatch an uploaded file to the SQL or CSV loader by its extension.

    Args:
        file: path of the uploaded file, or ``None`` when nothing was uploaded.

    Returns:
        tuple[bool, str]: success flag and a user-facing status message.
    """
    try:
        if file is None:
            return False, "Please upload a file."

        file_ext = os.path.splitext(file)[1].lower()

        if file_ext == ".sql":
            return process_sql_file(file)
        if file_ext == ".csv":
            return process_csv_file(file)
        return False, "Error: Unsupported file type. Please upload either a .sql or .csv file."
    except Exception as e:
        return False, f"Error processing file: {str(e)}"


@tool
def sql_engine(query: str) -> str:
    """
    Executes an SQL query and returns formatted results.

    Args:
        query: The SQL query string to execute on the database. Must be a valid SELECT query.

    Returns:
        str: The formatted query results as a string.
    """
    try:
        with engine.connect() as con:
            rows = con.execute(text(query)).fetchall()

        if not rows:
            return "No results found."

        # A single scalar is returned bare rather than as a one-cell row.
        if len(rows) == 1 and len(rows[0]) == 1:
            return str(rows[0][0])

        return "\n".join([", ".join(map(str, row)) for row in rows])
    except Exception as e:
        return f"Error: {str(e)}"


agent = CodeAgent(
    tools=[sql_engine],
    model=HfApiModel(model_id="Qwen/Qwen2.5-Coder-32B-Instruct"),
)


def query_sql(user_query: str) -> str:
    """
    Answer a natural-language question by having the agent generate SQL.

    The agent's output is executed through ``sql_engine`` only when it starts
    with one of the allowed read-style prefixes. A result that parses as a
    number is formatted with two decimals.

    Args:
        user_query: the question typed by the user.

    Returns:
        str: the query result, or an error / prompt message.
    """
    # Do not spend an agent run on blank input.
    if not user_query or not user_query.strip():
        return "Please enter a question."

    schema = get_table_schema()
    if not schema:
        return "Error: No data table exists. Please upload a file first."

    schema_info = (
        f"The database has the following schema:\n"
        f"{schema}\n"
        "Generate a valid SQL SELECT query using ONLY these column names.\n"
        "DO NOT explain your reasoning, and DO NOT return anything other than the SQL query itself."
    )

    generated_sql = agent.run(f"{schema_info} Convert this request into SQL: {user_query}")

    # The agent may return a non-string value; show it as-is instead of
    # trying to execute it.
    if not isinstance(generated_sql, str):
        return f"{generated_sql}"

    if not generated_sql.strip().lower().startswith(_ALLOWED_QUERY_PREFIXES):
        return "Error: Only SELECT queries are allowed."

    result = sql_engine(generated_sql)

    try:
        return f"{float(result):.2f}"
    except ValueError:
        return result


def update_schema():
    """
    Build the Markdown shown in the schema panel.

    Returns:
        str: the current schema in a fenced block, or ``"No data loaded"``.
    """
    return _format_schema(get_table_schema(), "No data loaded")


# Create the Gradio interface
with gr.Blocks() as demo:
    # Upload Interface Components
    with gr.Group() as upload_group:
        gr.Markdown("""
        # Data Query Interface

        Upload your data file to begin.

        ### Supported File Types:
        - SQL (.sql): SQL file containing CREATE TABLE and INSERT statements
        - CSV (.csv): CSV file with headers that will be automatically converted to a table

        ### CSV Requirements:
        - Must include headers
        - First column will be used as the primary key
        - Column types will be automatically detected

        ### SQL Requirements:
        - Must contain valid SQL statements
        - Statements must be separated by semicolons
        - Should include CREATE TABLE and data insertion statements
        """)

        file_input = gr.File(
            label="Upload Data File",
            file_types=[".csv", ".sql"],
            type="filepath",
        )
        upload_status = gr.Textbox(label="Status", interactive=False)
        continue_btn = gr.Button("Continue", visible=False)

    # Query Interface Components
    with gr.Group(visible=False) as query_group:
        gr.Markdown("## Data Query Interface")

        # Data Display Section
        gr.Markdown("### Current Data")
        data_table = gr.Dataframe(
            value=get_data_table(),
            label="Data Table",
            interactive=False,
        )

        schema_display = gr.Markdown(value="Loading schema...")

        # Query Section
        with gr.Row():
            with gr.Column():
                user_input = gr.Textbox(
                    label="Ask a question about the data",
                    placeholder="Enter your question here...",
                )
                query_output = gr.Textbox(
                    label="Result",
                    interactive=False,
                )

        with gr.Row():
            refresh_table_btn = gr.Button("Refresh Table")
            refresh_schema_btn = gr.Button("Refresh Schema")
            back_btn = gr.Button("Upload New File")

    def handle_upload(file):
        """Load the uploaded file and refresh the status, table and schema."""
        success, message = process_uploaded_file(file)
        if not success:
            # Only the status and the Continue button change on failure.
            return {
                upload_status: message,
                continue_btn: gr.update(visible=False),
            }
        return {
            upload_status: message,
            continue_btn: gr.update(visible=True),
            data_table: get_data_table(),
            schema_display: _format_schema(get_table_schema(), "No schema available"),
        }

    def switch_to_query():
        """Hide the upload view and show the query view with fresh data."""
        return {
            upload_group: gr.update(visible=False),
            query_group: gr.update(visible=True),
            data_table: get_data_table(),
            schema_display: _format_schema(get_table_schema(), "No schema available"),
        }

    def switch_to_upload():
        """Return to the upload view and reset its status widgets."""
        return {
            upload_group: gr.update(visible=True),
            query_group: gr.update(visible=False),
            continue_btn: gr.update(visible=False),
            upload_status: gr.update(value=""),
        }

    # Event handlers
    file_input.upload(
        fn=handle_upload,
        outputs=[
            upload_status,
            continue_btn,
            data_table,
            schema_display,
        ],
    )

    continue_btn.click(
        fn=switch_to_query,
        outputs=[
            upload_group,
            query_group,
            data_table,
            schema_display,
        ],
    )

    back_btn.click(
        fn=switch_to_upload,
        outputs=[
            upload_group,
            query_group,
            continue_btn,
            upload_status,
        ],
    )

    # Run the agent when the user presses Enter. Binding to `change` would
    # start a full LLM agent run on every keystroke.
    user_input.submit(
        fn=query_sql,
        inputs=[user_input],
        outputs=[query_output],
    )

    refresh_table_btn.click(
        fn=get_data_table,
        outputs=[data_table],
    )

    refresh_schema_btn.click(
        fn=update_schema,
        outputs=[schema_display],
    )

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)