|
import streamlit as st
|
|
import pandas as pd
|
|
import sqlite3
|
|
import io
|
|
import base64
|
|
import os
|
|
from datetime import datetime
|
|
|
|
|
|
# Page-level settings: browser-tab title and icon, wide layout, and a sidebar
# that starts expanded.
# NOTE(review): the icon string looks like an emoji that was decoded with the
# wrong codec somewhere upstream — confirm the intended glyph.
_PAGE_CONFIG = {
    "page_title": "Database Operations Tool",
    "page_icon": "ποΈ",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_CONFIG)
|
|
|
|
|
|
# Custom CSS injected into the page: padding, full-width green buttons with a
# darker hover colour, a light background and select-box spacing.
# NOTE(review): `.reportview-container` and `.css-1d391kg` look like
# version-specific Streamlit class names — confirm they still match anything.
_CUSTOM_CSS = """
<style>
.main {
    padding: 1rem;
}
.stButton>button {
    width: 100%;
    background-color: #4CAF50;
    color: white;
    padding: 0.5rem;
    border-radius: 5px;
    border: none;
    margin: 0.5rem 0;
}
.stButton>button:hover {
    background-color: #45a049;
}
.reportview-container {
    background: #fafafa;
}
.css-1d391kg {
    padding: 1rem;
}
.stSelectbox {
    margin: 1rem 0;
}
</style>
"""
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
|
|
|
|
def init_db():
    """Open the registry database, creating it on first use.

    Ensures the ``databases`` directory exists, connects to
    ``databases/main.db`` and makes sure the ``database_list`` table
    (``name`` as primary key, plus ``created_date``) is present.

    Returns:
        sqlite3.Connection: an open connection. The caller must close it.

    Raises:
        sqlite3.Error: if the schema statement fails. The connection is
            closed before the error propagates.
    """
    # exist_ok avoids the check-then-create race when two callers reach this
    # point at the same time.
    os.makedirs('databases', exist_ok=True)
    conn = sqlite3.connect('databases/main.db')
    try:
        conn.execute(
            '''CREATE TABLE IF NOT EXISTS database_list
               (name TEXT PRIMARY KEY, created_date TEXT)'''
        )
        conn.commit()
    except Exception:
        # Do not hand back (or leak) a half-initialised connection.
        conn.close()
        raise
    return conn
|
|
|
|
def get_database_names():
    """Return the database names recorded in the registry.

    Opens the registry through ``init_db()`` and reads the ``name`` column of
    ``database_list``.

    Returns:
        list: the registered names, in the order SQLite returns them.
    """
    conn = init_db()
    try:
        rows = conn.execute("SELECT name FROM database_list").fetchall()
    finally:
        # Close even when the query raises, so the connection never leaks.
        conn.close()
    return [row[0] for row in rows]
|
|
|
|
def create_table(db_name, columns, column_types):
    """Create the ``data`` table in ``databases/<db_name>.db`` if it is missing.

    Args:
        db_name: database name; the file used is ``databases/<db_name>.db``.
        columns: column names, one per column.
        column_types: SQL type text for each column, in the same order as
            ``columns``. The type text is inserted into the statement as given.

    Raises:
        ValueError: if ``columns`` is empty or the two sequences differ in
            length.
        sqlite3.Error: if SQLite rejects the statement.
    """
    columns = list(columns)
    column_types = list(column_types)
    if not columns:
        raise ValueError("at least one column is required")
    if len(columns) != len(column_types):
        # zip() would silently drop the surplus entries otherwise.
        raise ValueError("columns and column_types must have the same length")

    definitions = []
    for col, type_ in zip(columns, column_types):
        # Quote the identifier (doubling embedded quotes) so names containing
        # spaces, keywords or punctuation are taken literally instead of being
        # parsed as SQL.
        quoted = '"' + str(col).replace('"', '""') + '"'
        definitions.append(f"{quoted} {type_}")
    query = f"CREATE TABLE IF NOT EXISTS data ({', '.join(definitions)})"

    conn = sqlite3.connect(f'databases/{db_name}.db')
    try:
        conn.execute(query)
        conn.commit()
    finally:
        conn.close()
|
|
|
|
def get_table_data(db_name):
    """Load every row of the ``data`` table into a DataFrame.

    Args:
        db_name: database name; the file read is ``databases/<db_name>.db``.

    Returns:
        pandas.DataFrame: the result of ``SELECT * FROM data``.
    """
    conn = sqlite3.connect(f'databases/{db_name}.db')
    try:
        return pd.read_sql_query("SELECT * FROM data", conn)
    finally:
        # The original never closed this connection; release it on success
        # and on failure alike.
        conn.close()
|
|
|
|
def get_columns(db_name):
    """Return the columns of the ``data`` table as ``(name, type)`` pairs.

    Args:
        db_name: database name; the file read is ``databases/<db_name>.db``.

    Returns:
        list[tuple]: one ``(name, declared_type)`` pair per row reported by
        ``PRAGMA table_info(data)``.
    """
    conn = sqlite3.connect(f'databases/{db_name}.db')
    try:
        rows = conn.execute("PRAGMA table_info(data)").fetchall()
    finally:
        # Close even if the PRAGMA raises.
        conn.close()
    # In each table_info row, index 1 is the column name and index 2 its type.
    return [(row[1], row[2]) for row in rows]
|
|
|
|
def delete_rows(db_name, condition_col, condition_val):
    """Delete the rows of ``data`` whose ``condition_col`` equals a value.

    Args:
        db_name: database name; the file used is ``databases/<db_name>.db``.
        condition_col: name of the column to compare (a rowid alias is also
            accepted).
        condition_val: value to match; passed as a bound parameter.

    Returns:
        int: number of rows deleted.

    Raises:
        sqlite3.OperationalError: if ``condition_col`` is not a column of
            ``data``.
    """
    conn = sqlite3.connect(f'databases/{db_name}.db')
    try:
        # Resolve the requested name against the real schema first. An
        # identifier cannot be bound as a parameter, and SQLite treats an
        # unknown double-quoted name as a string literal, so quoting without
        # this check could turn a typo into an always-true condition.
        known = {row[1].lower(): row[1]
                 for row in conn.execute("PRAGMA table_info(data)")}
        for alias in ("rowid", "oid", "_rowid_"):
            known.setdefault(alias, alias)
        actual = known.get(str(condition_col).lower())
        if actual is None:
            raise sqlite3.OperationalError(f"no such column: {condition_col}")
        quoted = '"' + actual.replace('"', '""') + '"'

        cursor = conn.execute(
            f"DELETE FROM data WHERE {quoted} = ?", (condition_val,)
        )
        deleted_count = cursor.rowcount
        conn.commit()
    finally:
        conn.close()
    return deleted_count
|
|
|
|
def update_row(db_name, condition_col, condition_val, update_col, update_val):
    """Set ``update_col`` to a value on every row matching a condition.

    Args:
        db_name: database name; the file used is ``databases/<db_name>.db``.
        condition_col: name of the column compared against ``condition_val``.
        condition_val: value to match; passed as a bound parameter.
        update_col: name of the column to overwrite.
        update_val: new value; passed as a bound parameter.

    Returns:
        int: number of rows updated.

    Raises:
        sqlite3.OperationalError: if either column name is not a column of
            ``data``.
    """
    conn = sqlite3.connect(f'databases/{db_name}.db')
    try:
        # Identifiers cannot be bound as parameters, so resolve both names
        # against the real schema and quote the schema's own spelling. SQLite
        # would treat an unknown double-quoted name as a string literal, hence
        # the explicit existence check.
        known = {row[1].lower(): row[1]
                 for row in conn.execute("PRAGMA table_info(data)")}
        for alias in ("rowid", "oid", "_rowid_"):
            known.setdefault(alias, alias)

        quoted = []
        for requested in (update_col, condition_col):
            actual = known.get(str(requested).lower())
            if actual is None:
                raise sqlite3.OperationalError(f"no such column: {requested}")
            quoted.append('"' + actual.replace('"', '""') + '"')
        target, where = quoted

        cursor = conn.execute(
            f"UPDATE data SET {target} = ? WHERE {where} = ?",
            (update_val, condition_val),
        )
        updated_count = cursor.rowcount
        conn.commit()
    finally:
        conn.close()
    return updated_count
|
|
|
|
def bulk_import_data(db_name, df):
    """Append all rows of a DataFrame to the ``data`` table.

    Args:
        db_name: database name; the file used is ``databases/<db_name>.db``.
        df: pandas DataFrame to append via ``DataFrame.to_sql`` (index not
            written).
    """
    conn = sqlite3.connect(f'databases/{db_name}.db')
    try:
        df.to_sql('data', conn, if_exists='append', index=False)
    finally:
        # to_sql raises when the frame does not fit the table; make sure the
        # connection is released in that case too.
        conn.close()
|
|
|
|
# NOTE(review): several labels below carry emoji that were decoded with the
# wrong codec ("π", "β", ...). They are kept as found. The success messages
# were additionally split in the middle of the string literal (a syntax
# error); those are restored with "✅" as the most likely original — confirm.


def _render_create_database_form():
    """Render the "Create New Database" form and handle its submit button."""
    new_db_name = st.text_input("Database Name")
    columns_input = st.text_input("Column Names (comma-separated)")
    column_types = st.multiselect(
        "Column Types",
        ["TEXT", "INTEGER", "REAL", "BLOB", "DATE"],
        ["TEXT"],
    )

    if not st.button("Create Database"):
        return
    if not (new_db_name and columns_input and column_types):
        st.warning("β οΈ Please fill all fields")
        return
    # The name becomes part of a file path, so keep it inside databases/.
    if "/" in new_db_name or "\\" in new_db_name:
        st.error("Database name must not contain path separators")
        return

    columns = [col.strip() for col in columns_input.split(",")]
    if not all(columns):
        st.error("Column names must not be empty")
        return
    # The multiselect cannot list the same type twice, so a single selected
    # type is applied to every column.
    if len(column_types) == 1:
        column_types = column_types * len(columns)
    if len(columns) != len(column_types):
        st.error("β Number of columns and types must match!")
        return

    conn = init_db()
    try:
        # The registry row is committed only after the table exists, so a
        # failing create_table() cannot leave an entry that points at a
        # database without a data table.
        conn.execute(
            "INSERT INTO database_list VALUES (?, ?)",
            (new_db_name, datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
        )
        create_table(new_db_name, columns, column_types)
        conn.commit()
    except sqlite3.IntegrityError:
        st.error("β Database name already exists!")
    except (sqlite3.Error, ValueError) as exc:
        st.error(f"Could not create database: {exc}")
    else:
        st.success("✅ Database created successfully!")
    finally:
        # Closing without commit discards the pending registry insert.
        conn.close()


def _render_manage_tab(selected_db, df, columns):
    """Show the table contents and a form that appends one row."""
    st.subheader("Data Preview")
    st.dataframe(df, use_container_width=True)

    st.subheader("Add New Row")
    new_row_data = {}
    for col, type_ in columns:
        if type_ == 'DATE':
            new_row_data[col] = st.date_input(f"Enter {col}")
        elif type_ == 'INTEGER':
            new_row_data[col] = st.number_input(f"Enter {col}", step=1)
        elif type_ == 'REAL':
            new_row_data[col] = st.number_input(f"Enter {col}", step=0.1)
        else:
            new_row_data[col] = st.text_input(f"Enter {col}")

    if not st.button("Add Row"):
        return
    if not all(str(val) != "" for val in new_row_data.values()):
        st.warning("β οΈ Please fill all fields")
        return

    # Store dates as ISO text explicitly instead of relying on sqlite3's
    # implicit date adapter.
    values = [
        val.isoformat() if hasattr(val, "isoformat") else val
        for val in new_row_data.values()
    ]
    placeholders = ','.join('?' for _ in columns)
    conn = sqlite3.connect(f'databases/{selected_db}.db')
    try:
        conn.execute(f"INSERT INTO data VALUES ({placeholders})", values)
        conn.commit()
    finally:
        conn.close()
    # NOTE(review): st.rerun() follows immediately, so this message may only
    # flash briefly — confirm whether it should persist across the rerun.
    st.success("✅ Row added!")
    st.rerun()


def _render_update_tab(selected_db, columns):
    """Render the form that updates one column on all matching rows."""
    st.subheader("Update Records")
    column_names = [col for col, _ in columns]
    col1, col2 = st.columns(2)

    with col1:
        condition_col = st.selectbox("Select Column for Condition", column_names)
        condition_val = st.text_input("Enter Value to Match")

    with col2:
        update_col = st.selectbox("Select Column to Update", column_names)
        update_val = st.text_input("Enter New Value")

    if not st.button("Update Records"):
        return
    if not (condition_val and update_val):
        st.warning("β οΈ Please fill all fields")
        return
    updated = update_row(selected_db, condition_col,
                         condition_val, update_col, update_val)
    st.success(f"✅ Updated {updated} records!")
    st.rerun()


def _render_delete_tab(selected_db, columns):
    """Render the form that deletes all rows matching a column value."""
    st.subheader("Delete Records")
    del_col = st.selectbox("Select Column for Deletion Condition",
                           [col for col, _ in columns])
    del_val = st.text_input("Enter Value to Delete")

    if not st.button("Delete Records"):
        return
    if not del_val:
        st.warning("β οΈ Please fill all fields")
        return
    deleted = delete_rows(selected_db, del_col, del_val)
    st.success(f"✅ Deleted {deleted} records!")
    st.rerun()


def _render_import_export_tab(selected_db, df):
    """Render CSV import and the CSV / Excel / JSON download buttons."""
    st.subheader("Import Data")
    uploaded_file = st.file_uploader("Choose a CSV file", type="csv")
    if uploaded_file is not None and st.button("Import Data"):
        try:
            # Rewind in case the upload buffer was already consumed.
            uploaded_file.seek(0)
            import_df = pd.read_csv(uploaded_file)
            bulk_import_data(selected_db, import_df)
        except (ValueError, OSError, sqlite3.Error) as exc:
            # Covers unparsable CSV input and frames that do not fit the table.
            st.error(f"Import failed: {exc}")
        else:
            st.success("✅ Data imported successfully!")
            st.rerun()

    st.subheader("Export Data")
    export_format = st.selectbox("Select Format", ["CSV", "Excel", "JSON"])

    if export_format == "CSV":
        st.download_button(
            label="π₯ Download CSV",
            data=df.to_csv(index=False),
            file_name=f"{selected_db}.csv",
            mime="text/csv",
        )
    elif export_format == "Excel":
        buffer = io.BytesIO()
        df.to_excel(buffer, index=False)
        st.download_button(
            label="π₯ Download Excel",
            data=buffer.getvalue(),
            file_name=f"{selected_db}.xlsx",
            mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        )
    else:
        st.download_button(
            label="π₯ Download JSON",
            data=df.to_json(orient='records'),
            file_name=f"{selected_db}.json",
            mime="application/json",
        )


def main():
    """Render the app: sidebar creation form, database picker and data tabs.

    With no registered databases only a welcome hint is shown. Otherwise the
    selected database's ``data`` table is loaded and four tabs are rendered:
    manage (preview and add row), update, delete, and import/export.
    """
    st.title("ποΈ Database Operations Tool")

    with st.sidebar:
        st.header("Database Operations")
        with st.expander("Create New Database", expanded=True):
            # Early exits inside the helper only leave the form; the rest of
            # the page is still rendered.
            _render_create_database_form()

    databases = get_database_names()
    if not databases:
        st.info("π Welcome! Start by creating a new database using the sidebar.")
        return

    selected_db = st.selectbox("π Select Database", databases)
    if not selected_db:
        return

    df = get_table_data(selected_db)
    columns = get_columns(selected_db)

    tabs = st.tabs(["π Manage Data", "π Update Data", "ποΈ Delete Data", "π₯ Import/Export"])
    with tabs[0]:
        _render_manage_tab(selected_db, df, columns)
    with tabs[1]:
        _render_update_tab(selected_db, columns)
    with tabs[2]:
        _render_delete_tab(selected_db, columns)
    with tabs[3]:
        _render_import_export_tab(selected_db, df)
|
|
|
|
# Start the app when this file is run as a script rather than imported.
if __name__ == "__main__":
    main()