import os
import asyncio
import logging

import streamlit as st
from markdown_pdf import MarkdownPdf, Section

from utils.file_utils import load_prompt, save_intermediate_output
from utils.llm_utils import (
    get_generation_model,
    async_generate_text,
    upload_to_gemini,
    wait_for_files_active,
)

logger = logging.getLogger(__name__)

# Get the model configuration (using the "thinking" variant).
default_model_name, default_generation_config = get_generation_model("thinking")

def create_comparative_table_prompt(structured_outputs, table_base_prompt):
    """
    Create the prompt used to generate the comparative table.

    This internal prompt is never shown to the user.
    """
    prompt = "Structured Outputs from the PDF Papers:\n"
    for paper_id, content in structured_outputs.items():
        prompt += f"\nPaper: {paper_id}\n-------\n{content}\n"
    prompt += "\n" + table_base_prompt + "\n"
    return prompt
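
# For reference, the assembled prompt has roughly this shape (illustrative only;
# the actual base prompt is loaded from prompts/papers_table.prompt):
#
#   Structured Outputs from the PDF Papers:
#
#   Paper: paper_a.pdf
#   -------
#   <structured output for paper A>
#
#   Paper: paper_b.pdf
#   -------
#   <structured output for paper B>
#
#   <contents of papers_table.prompt>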

async def generate_comparative_table(structured_outputs):
    """
    Generate the comparative table (hidden from the user).
    """
    papers_table_prompt_path = os.path.join("prompts", "papers_table.prompt")
    table_base_prompt = load_prompt(papers_table_prompt_path)
    table_prompt = create_comparative_table_prompt(structured_outputs, table_base_prompt)
    logger.info("Generating dynamic comparative table...")
    table_output = await async_generate_text(
        table_prompt,
        model_name=default_model_name,
        generation_config=default_generation_config,
    )
    return table_output

async def process_single_pdf(file_obj, elements_prompt):
    """
    Process a single PDF: save it locally, upload it to Gemini, wait until the
    file is active, and extract its structured output.
    """
    pdf_basename = file_obj.name
    temp_dir = "promp_tmp"
    os.makedirs(temp_dir, exist_ok=True)  # make sure the temp directory exists
    temp_pdf_path = os.path.join(temp_dir, pdf_basename)
    with open(temp_pdf_path, "wb") as f:
        f.write(file_obj.getbuffer())

    st.toast(f"Uploading and processing {pdf_basename}...")
    logger.info(f"Processing {pdf_basename}...")

    # Upload the file and wait until it's active.
    uploaded_file = upload_to_gemini(temp_pdf_path, mime_type="application/pdf")
    wait_for_files_active([uploaded_file])

    st.toast(f"Extracting content from {pdf_basename}...")
    result = await async_generate_text(
        elements_prompt,
        pdf_file=uploaded_file,  # NOTE: 'pdf_file' matches the expected parameter name
        model_name=default_model_name,
        generation_config=default_generation_config,
    )
    logger.info(f"Completed extraction for {pdf_basename}")
    return pdf_basename, result
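
# Illustrative standalone call (assumes a Streamlit UploadedFile-like object
# exposing `.name` and `.getbuffer()`); in practice this coroutine is scheduled
# by process_multiple_pdfs below:
#
#   name, extracted = asyncio.run(process_single_pdf(file_obj, elements_prompt))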

async def process_multiple_pdfs(uploaded_files):
    """
    Process multiple PDFs concurrently and return a dictionary mapping each
    filename to its structured output.

    Raises ValueError if fewer than two files are provided.
    """
    if len(uploaded_files) < 2:
        raise ValueError("Please provide at least two PDF files for review.")

    elements_prompt_path = os.path.join("prompts", "elements_review.prompt")
    elements_prompt = load_prompt(elements_prompt_path)

    tasks = []
    for file_obj in uploaded_files:
        tasks.append(asyncio.create_task(process_single_pdf(file_obj, elements_prompt)))
        await asyncio.sleep(1)  # stagger scheduling slightly between uploads
    intermediate_results = await asyncio.gather(*tasks)

    structured_outputs = {fname: output for fname, output in intermediate_results}
    return structured_outputs
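
# Illustrative shape of the returned mapping (keys are the uploaded filenames):
#
#   {
#       "paper_a.pdf": "<structured output for paper A>",
#       "paper_b.pdf": "<structured output for paper B>",
#   }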

async def generate_final_review_pdf(structured_outputs):
    """
    Generate the final literature review by performing the following steps:

    0. (Hidden) Generate a comparative table.
    1. Draft the outline.
    2. Generate the final synthesis (incorporating the comparative table,
       outline, and structured outputs).
    3. Check the final writeup for hallucinations and inaccuracies.
    4. Clean up the final text and convert it to PDF.

    All prompts except the check prompt are loaded from files; the check
    prompt remains hardcoded.
    """
    progress_bar = st.progress(0)
    st.toast("Starting review generation...")
    await asyncio.sleep(0.5)

    # Step 0: Hidden comparative table generation.
    table_analysis = await generate_comparative_table(structured_outputs)
    progress_bar.progress(10)
    await asyncio.sleep(0.5)

    # Step 1: Draft the outline.
    with st.spinner("Drafting outline..."):
        outline_prompt_path = os.path.join("prompts", "papers_outline.prompt")
        outline_prompt = load_prompt(outline_prompt_path)
        for fname, output in structured_outputs.items():
            outline_prompt += f"\nPaper: {fname}\n-------\n{output}\n\n"
        outline = await async_generate_text(
            outline_prompt,
            model_name=default_model_name,
            generation_config=default_generation_config,
        )
        st.success("Outline drafted!")
    progress_bar.progress(30)
    await asyncio.sleep(0.5)

    # Step 2: Draft the final review.
    with st.spinner("Drafting final review..."):
        synthesis_prompt_path = os.path.join("prompts", "papers_synthesis.prompt")
        loaded_final_prompt = load_prompt(synthesis_prompt_path)
        final_prompt = "\nComparative Table:\n" + table_analysis + "\n\n"
        final_prompt += "Comparative Outline:\n" + outline + "\n\n"
        final_prompt += "Papers for Analysis:\n"
        for fname, output in structured_outputs.items():
            final_prompt += f"\nPaper: {fname}\n-------\n{output}\n\n"
        final_prompt += "\n" + loaded_final_prompt
        final_writeup = await async_generate_text(
            final_prompt,
            model_name=default_model_name,
            generation_config=default_generation_config,
        )
        st.success("Final review drafted!")
    progress_bar.progress(60)
    await asyncio.sleep(0.5)

    # Step 3: Check the final writeup (using the hardcoded check prompt).
    with st.spinner("Checking final review..."):
        check_prompt = (
            "Review the following final literature review writeup along with the structured outputs "
            "from the source papers. Your task is to ensure that there are no hallucinations or "
            "inaccuracies in the final writeup. If any issues are detected, make the most minimal "
            "edits necessary to correct them; otherwise, do not change anything in the text, "
            "including its style or format. Output only the final text (do not include any "
            "explanations or extra instructions).\n\n"
            "Final Writeup:\n"
            "----------------\n"
            f"{final_writeup}\n\n"
            "Structured Outputs:\n"
        )
        for fname, output in structured_outputs.items():
            check_prompt += f"\nPaper: {fname}\n-------\n{output}\n\n"
        final_checked_writeup = await async_generate_text(
            check_prompt,
            model_name=default_model_name,
            generation_config=default_generation_config,
        )
        st.success("Review check complete!")
    progress_bar.progress(80)
    await asyncio.sleep(0.5)

    # Step 4: Generate the PDF output.
    with st.spinner("Generating PDF output..."):
        from utils.markdown_utils import robust_clean_markdown, normalize_heading_levels

        final_checked_review = final_checked_writeup.strip()
        final_checked_review = robust_clean_markdown(final_checked_review)
        final_checked_review = normalize_heading_levels(final_checked_review)

        pdf_doc = MarkdownPdf(toc_level=2)
        pdf_doc.add_section(Section(final_checked_review, toc=True))
        output_pdf_path = "final_literature_review.pdf"
        try:
            pdf_doc.save(output_pdf_path)
            st.success("PDF successfully created!")
            logger.info(f"PDF successfully created: {output_pdf_path}")
        except Exception as e:
            st.toast("Error generating PDF output: " + str(e))
            logger.error(f"Error generating PDF: {e}")

    progress_bar.progress(100)
    return final_checked_review
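
if __name__ == "__main__":
    # Hypothetical end-to-end driver for local testing; run this file with
    # `streamlit run` so the st.* calls render. The real app is assumed to
    # invoke these coroutines from its own Streamlit page.
    files = st.file_uploader(
        "Upload at least two papers", type="pdf", accept_multiple_files=True
    )
    if files and len(files) >= 2:
        outputs = asyncio.run(process_multiple_pdfs(files))
        asyncio.run(generate_final_review_pdf(outputs))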