|
import gradio as gr
|
|
import logging
|
|
import sqlite3
|
|
from typing import List, Dict
|
|
import os
|
|
import zipfile
|
|
import tempfile
|
|
import shutil
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
db = None
|
|
|
|
|
|
class DatabaseError(Exception):
|
|
pass
|
|
|
|
|
|
|
|
def fetch_items_by_keyword(search_query: str) -> List[Dict]:
|
|
try:
|
|
with db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT m.id, m.title, m.url
|
|
FROM Media m
|
|
JOIN MediaKeywords mk ON m.id = mk.media_id
|
|
JOIN Keywords k ON mk.keyword_id = k.id
|
|
WHERE k.keyword LIKE ?
|
|
""", (f'%{search_query}%',))
|
|
results = cursor.fetchall()
|
|
return [{"id": r[0], "title": r[1], "url": r[2]} for r in results]
|
|
except sqlite3.Error as e:
|
|
logger.error(f"Error fetching items by keyword: {e}")
|
|
raise DatabaseError(f"Error fetching items by keyword: {e}")
|
|
|
|
|
|
def fetch_item_details(media_id: int) -> tuple:
|
|
try:
|
|
with db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT prompt, summary
|
|
FROM MediaModifications
|
|
WHERE media_id = ?
|
|
ORDER BY modification_date DESC
|
|
LIMIT 1
|
|
""", (media_id,))
|
|
prompt_summary_result = cursor.fetchone()
|
|
cursor.execute("SELECT content FROM Media WHERE id = ?", (media_id,))
|
|
content_result = cursor.fetchone()
|
|
|
|
prompt = prompt_summary_result[0] if prompt_summary_result else ""
|
|
summary = prompt_summary_result[1] if prompt_summary_result else ""
|
|
content = content_result[0] if content_result else ""
|
|
|
|
return content, prompt, summary
|
|
except sqlite3.Error as e:
|
|
logger.error(f"Error fetching item details: {e}")
|
|
return "", "", ""
|
|
|
|
|
|
def browse_items(search_query: str, search_type: str) -> List[Dict]:
|
|
try:
|
|
with db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
if search_type == 'Title':
|
|
cursor.execute("SELECT id, title, url FROM Media WHERE title LIKE ?", (f'%{search_query}%',))
|
|
elif search_type == 'URL':
|
|
cursor.execute("SELECT id, title, url FROM Media WHERE url LIKE ?", (f'%{search_query}%',))
|
|
elif search_type == 'Keyword':
|
|
return fetch_items_by_keyword(search_query)
|
|
elif search_type == 'Content':
|
|
cursor.execute("SELECT id, title, url FROM Media WHERE content LIKE ?", (f'%{search_query}%',))
|
|
else:
|
|
raise ValueError(f"Invalid search type: {search_type}")
|
|
|
|
results = cursor.fetchall()
|
|
return [{"id": r[0], "title": r[1], "url": r[2]} for r in results]
|
|
except sqlite3.Error as e:
|
|
logger.error(f"Error fetching items by {search_type}: {e}")
|
|
raise DatabaseError(f"Error fetching items by {search_type}: {e}")
|
|
|
|
|
|
|
|
def export_item_as_markdown(media_id: int) -> str:
|
|
try:
|
|
content, prompt, summary = fetch_item_details(media_id)
|
|
title = f"Item {media_id}"
|
|
markdown_content = f"# {title}\n\n## Prompt\n{prompt}\n\n## Summary\n{summary}\n\n## Content\n{content}"
|
|
|
|
filename = f"export_item_{media_id}.md"
|
|
with open(filename, "w", encoding='utf-8') as f:
|
|
f.write(markdown_content)
|
|
|
|
logger.info(f"Successfully exported item {media_id} to {filename}")
|
|
return filename
|
|
except Exception as e:
|
|
logger.error(f"Error exporting item {media_id}: {str(e)}")
|
|
return None
|
|
|
|
|
|
def export_items_by_keyword(keyword: str) -> str:
|
|
try:
|
|
items = fetch_items_by_keyword(keyword)
|
|
if not items:
|
|
logger.warning(f"No items found for keyword: {keyword}")
|
|
return None
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as temp_dir:
|
|
folder_name = f"export_keyword_{keyword}"
|
|
export_folder = os.path.join(temp_dir, folder_name)
|
|
os.makedirs(export_folder)
|
|
|
|
for item in items:
|
|
content, prompt, summary = fetch_item_details(item['id'])
|
|
markdown_content = f"# {item['title']}\n\n## Prompt\n{prompt}\n\n## Summary\n{summary}\n\n## Content\n{content}"
|
|
|
|
|
|
file_name = f"{item['id']}_{item['title'][:50]}.md"
|
|
file_path = os.path.join(export_folder, file_name)
|
|
with open(file_path, "w", encoding='utf-8') as f:
|
|
f.write(markdown_content)
|
|
|
|
|
|
zip_filename = f"{folder_name}.zip"
|
|
shutil.make_archive(os.path.join(temp_dir, folder_name), 'zip', export_folder)
|
|
|
|
|
|
final_zip_path = os.path.join(os.getcwd(), zip_filename)
|
|
shutil.move(os.path.join(temp_dir, zip_filename), final_zip_path)
|
|
|
|
logger.info(f"Successfully exported {len(items)} items for keyword '{keyword}' to {zip_filename}")
|
|
return final_zip_path
|
|
except Exception as e:
|
|
logger.error(f"Error exporting items for keyword '{keyword}': {str(e)}")
|
|
return None
|
|
|
|
|
|
def export_selected_items(selected_items: List[Dict]) -> str:
|
|
try:
|
|
if not selected_items:
|
|
logger.warning("No items selected for export")
|
|
return None
|
|
|
|
markdown_content = "# Selected Items\n\n"
|
|
for item in selected_items:
|
|
content, prompt, summary = fetch_item_details(item['id'])
|
|
markdown_content += f"## {item['title']}\n\n### Prompt\n{prompt}\n\n### Summary\n{summary}\n\n### Content\n{content}\n\n---\n\n"
|
|
|
|
filename = "export_selected_items.md"
|
|
with open(filename, "w", encoding='utf-8') as f:
|
|
f.write(markdown_content)
|
|
|
|
logger.info(f"Successfully exported {len(selected_items)} selected items to {filename}")
|
|
return filename
|
|
except Exception as e:
|
|
logger.error(f"Error exporting selected items: {str(e)}")
|
|
return None
|
|
|
|
|
|
|
|
def display_search_results(search_query: str, search_type: str) -> List[Dict]:
|
|
try:
|
|
results = browse_items(search_query, search_type)
|
|
return [{"name": f"{item['title']} ({item['url']})", "value": item} for item in results]
|
|
except DatabaseError as e:
|
|
logger.error(f"Error in display_search_results: {str(e)}")
|
|
return []
|
|
|
|
|
|
|
|
with gr.Blocks() as demo:
|
|
gr.Markdown("# Content Export Interface")
|
|
|
|
with gr.Tab("Search and Export"):
|
|
search_query = gr.Textbox(label="Search Query")
|
|
search_type = gr.Radio(["Title", "URL", "Keyword", "Content"], label="Search By")
|
|
search_button = gr.Button("Search")
|
|
|
|
search_results = gr.CheckboxGroup(label="Search Results")
|
|
export_selected_button = gr.Button("Export Selected Items")
|
|
|
|
keyword_input = gr.Textbox(label="Enter keyword for export")
|
|
export_by_keyword_button = gr.Button("Export items by keyword")
|
|
|
|
export_output = gr.File(label="Download Exported File")
|
|
|
|
error_output = gr.Textbox(label="Status/Error Messages", interactive=False)
|
|
|
|
search_button.click(
|
|
fn=display_search_results,
|
|
inputs=[search_query, search_type],
|
|
outputs=[search_results, error_output]
|
|
)
|
|
|
|
export_selected_button.click(
|
|
fn=lambda selected: (export_selected_items(selected), "Exported selected items") if selected else (
|
|
None, "No items selected"),
|
|
inputs=[search_results],
|
|
outputs=[export_output, error_output]
|
|
)
|
|
|
|
export_by_keyword_button.click(
|
|
fn=lambda keyword: (
|
|
export_items_by_keyword(keyword), f"Exported items for keyword: {keyword}") if keyword else (
|
|
None, "No keyword provided"),
|
|
inputs=[keyword_input],
|
|
outputs=[export_output, error_output]
|
|
)
|
|
|
|
|
|
search_results.select(
|
|
fn=lambda item: (export_item_as_markdown(item['id']), f"Exported item: {item['title']}") if item else (
|
|
None, "No item selected"),
|
|
inputs=[gr.State(lambda: search_results.value)],
|
|
outputs=[export_output, error_output]
|
|
)
|
|
|
|
demo.launch()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|