Spaces:
Running
Running
import os | |
from io import BytesIO | |
import pandas as pd | |
from fastapi import APIRouter, UploadFile, File, HTTPException | |
from fastapi.responses import StreamingResponse, JSONResponse | |
from azure.core.credentials import AzureKeyCredential | |
from azure.ai.formrecognizer import DocumentAnalysisClient | |
from dotenv import load_dotenv | |
from docx import Document | |
import re | |
# Load environment variables (python-dotenv reads a local .env file, if present)
# so that the os.getenv() lookup in analyze_document can find its key.
load_dotenv()
# Router object for this module.
# NOTE(review): no route decorators are visible on the handlers in this view —
# confirm where the endpoints are registered on this router.
router = APIRouter()
async def convert_to_markdown(file: UploadFile = File(...)):
    """
    Convert an uploaded PDF file to markdown format.

    The upload is written into a private temporary directory, analyzed with
    ``analyze_document`` and rendered with ``create_markdown_file``; the
    resulting markdown bytes are returned as a download.

    Args:
        file: The PDF file to convert
    Returns:
        StreamingResponse: Markdown file
    Raises:
        HTTPException: Status 500 carrying the error text if any step fails.
    """
    import tempfile  # local import: keeps this block independent of the file-level imports

    try:
        # Read the uploaded file content
        content = await file.read()

        # Work inside a per-request directory. Fixed names such as "temp.pdf"
        # in the working directory would let concurrent requests overwrite
        # each other's files, and would be left behind whenever a step raised.
        with tempfile.TemporaryDirectory() as work_dir:
            pdf_path = os.path.join(work_dir, "input.pdf")
            md_path = os.path.join(work_dir, "output.md")

            with open(pdf_path, "wb") as f:
                f.write(content)

            # Analyze the document and render it as markdown
            result = analyze_document(pdf_path)
            create_markdown_file(result, md_path)

            with open(md_path, "rb") as f:
                markdown_content = f.read()

        # The client may omit the filename; fall back to a generic base name.
        base_name = (file.filename or "document").rsplit('.', 1)[0]

        # Return the markdown file as a download
        return StreamingResponse(
            BytesIO(markdown_content),
            media_type="text/markdown",
            headers={
                "Content-Disposition": f"attachment; filename={base_name}.md"
            }
        )
    except HTTPException:
        # Keep an already-formed HTTP error (status and detail) intact.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def convert_to_excel(file: UploadFile = File(...)):
    """
    Convert tables from markdown to Excel format.

    Args:
        file: The markdown file to convert
    Returns:
        StreamingResponse: Excel file containing all tables
    Raises:
        HTTPException: Status 400 when the markdown contains no tables;
            status 500 carrying the error text for any other failure.
    """
    try:
        # Read the markdown content
        content = await file.read()
        markdown_text = content.decode('utf-8')

        # Extract tables from markdown
        tables = extract_tables_from_markdown(markdown_text)
        if not tables:
            raise HTTPException(status_code=400, detail="No tables found in the markdown content")

        # Create Excel file
        excel_buffer = create_excel_from_markdown_tables(tables)

        # The client may omit the filename; fall back to a generic base name.
        base_name = (file.filename or "document").rsplit('.', 1)[0]

        # Return the Excel file as a download
        return StreamingResponse(
            excel_buffer,
            media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            headers={
                "Content-Disposition": f"attachment; filename={base_name}_tables.xlsx"
            }
        )
    except HTTPException:
        # Without this clause the deliberate 400 above would be caught by the
        # generic handler below and reported to the client as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def convert_to_word(file: UploadFile = File(...)):
    """
    Convert markdown to Word document format.

    Args:
        file: The markdown file to convert
    Returns:
        StreamingResponse: Word document file
    Raises:
        HTTPException: Status 500 carrying the error text if any step fails.
    """
    import tempfile  # local import: keeps this block independent of the file-level imports

    try:
        # Read the markdown content
        content = await file.read()
        markdown_text = content.decode('utf-8')

        # Write the document into a per-request directory. A fixed
        # "temp.docx" in the working directory would be shared by concurrent
        # requests and left behind whenever the conversion raised.
        with tempfile.TemporaryDirectory() as work_dir:
            docx_path = os.path.join(work_dir, "output.docx")
            create_word_from_markdown(markdown_text, docx_path)
            with open(docx_path, "rb") as f:
                word_content = f.read()

        # The client may omit the filename; fall back to a generic base name.
        base_name = (file.filename or "document").rsplit('.', 1)[0]

        # Return the Word file as a download
        return StreamingResponse(
            BytesIO(word_content),
            media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            headers={
                "Content-Disposition": f"attachment; filename={base_name}.docx"
            }
        )
    except HTTPException:
        # Keep an already-formed HTTP error (status and detail) intact.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
def analyze_document(file_path):
    """Analyze document using Azure Form Recognizer.

    Sends the file at ``file_path`` to the "prebuilt-layout" model and waits
    for the analysis to finish.

    Args:
        file_path: Path of the document to analyze.
    Returns:
        The value returned by the analysis poller's ``result()``.
    Raises:
        RuntimeError: If AZURE_FORM_RECOGNIZER_KEY is not set or is empty.
    """
    endpoint = "https://aal-ocr-ai-azureapi.cognitiveservices.azure.com/"
    key = os.getenv("AZURE_FORM_RECOGNIZER_KEY")
    if not key:
        # Fail with an actionable message instead of handing None (or an
        # empty string) to the credential object.
        raise RuntimeError(
            "AZURE_FORM_RECOGNIZER_KEY is not set; cannot call Azure Form Recognizer"
        )

    document_analysis_client = DocumentAnalysisClient(
        endpoint=endpoint, credential=AzureKeyCredential(key)
    )
    with open(file_path, "rb") as f:
        poller = document_analysis_client.begin_analyze_document(
            "prebuilt-layout", document=f
        )
        # Block until the service has finished, while the file is still open.
        result = poller.result()
    return result
def extract_tables_from_markdown(markdown_text):
    """Extract tables from markdown text.

    Consecutive lines containing ``|`` form one table; any line without a
    pipe ends the current table. Separator lines (e.g. ``|---|---|``) are
    skipped. The outer pipes of a row are optional, so ``a | b`` and
    ``| a | b |`` yield the same cells.

    Args:
        markdown_text: The markdown source.
    Returns:
        A list of tables; each table is a list of rows and each row a list
        of stripped cell strings.
    """
    tables = []
    current_table = []

    for line in markdown_text.split('\n'):
        if '|' not in line:
            # A non-table line closes whatever table was being collected.
            if current_table:
                tables.append(current_table)
                current_table = []
            continue

        # Skip separator lines (e.g., |---|---|)
        if re.match(r'^[\s|:-]+$', line):
            continue

        # Remove only the outer pipes that are actually present. Slicing the
        # split result with [1:-1] unconditionally would discard real cells
        # on rows written without a leading or trailing pipe.
        row = line.strip()
        if row.startswith('|'):
            row = row[1:]
        if row.endswith('|'):
            row = row[:-1]
        current_table.append([cell.strip() for cell in row.split('|')])

    # Add the last table if exists
    if current_table:
        tables.append(current_table)
    return tables
def create_excel_from_markdown_tables(tables):
    """Create Excel file from markdown tables.

    Each non-empty table becomes one sheet named ``Table_<n>`` (n is the
    1-based position in ``tables``); the first row is used as the header.

    Args:
        tables: List of tables, each a list of rows of cell strings.
    Returns:
        BytesIO: In-memory workbook, positioned at the start.
    """
    excel_buffer = BytesIO()
    with pd.ExcelWriter(excel_buffer, engine='openpyxl') as writer:
        for i, table in enumerate(tables):
            if not table:
                continue

            # Rows may be ragged. A data row wider than the header would make
            # the DataFrame constructor raise, so pad the header with
            # generated names and pad short rows with empty cells.
            width = max(len(row) for row in table)
            header = list(table[0])
            header += [f"Column_{k + 1}" for k in range(len(header), width)]
            rows = [list(row) + [None] * (width - len(row)) for row in table[1:]]

            # Convert table to DataFrame and save it to its own sheet
            df = pd.DataFrame(rows, columns=header)
            df.to_excel(writer, sheet_name=f"Table_{i+1}", index=False)

    excel_buffer.seek(0)
    return excel_buffer
def create_word_from_markdown(markdown_text, output_file):
    """Create Word document from markdown text.

    Handles headings (leading ``#``), pipe tables, checkbox list items
    (``- [ ]`` / ``- [x]``) and plain paragraphs; blank lines are skipped.

    Args:
        markdown_text: The markdown source.
        output_file: Destination passed to ``Document.save``.
    """
    doc = Document()
    pending_rows = []  # rows of the table currently being collected
    checkbox_pattern = re.compile(r'^- \[([ xX])\]\s*(.*)$')

    def flush_table():
        """Write the collected rows as one Word table and reset the buffer."""
        # Size the table for the widest row so ragged rows cannot index past
        # the last column.
        column_count = max(len(row) for row in pending_rows)
        table = doc.add_table(rows=len(pending_rows), cols=column_count)
        table.style = 'Table Grid'
        for i, row in enumerate(pending_rows):
            for j, cell in enumerate(row):
                table.cell(i, j).text = cell
        pending_rows.clear()

    for line in markdown_text.split('\n'):
        stripped = line.strip()
        is_heading = line.startswith('#')

        # Handle tables (a heading that happens to contain '|' stays a heading)
        if '|' in line and not is_heading:
            # Skip separator lines
            if re.match(r'^[\s|:-]+$', line):
                continue
            # Remove only the outer pipes that are present, so rows written
            # without them keep their first and last cells.
            row = stripped
            if row.startswith('|'):
                row = row[1:]
            if row.endswith('|'):
                row = row[:-1]
            pending_rows.append([cell.strip() for cell in row.split('|')])
            continue

        # Any non-table line ends the current table. Flush first and then
        # still process the line itself, so the table stays ahead of the
        # content that follows it and that content is not lost.
        if pending_rows:
            flush_table()
            doc.add_paragraph()  # Add space after table

        if is_heading:
            # Count the leading '#' characters; this also gives the right
            # level when no space follows them (e.g. "#Title").
            level = len(line) - len(line.lstrip('#'))
            doc.add_heading(line.lstrip('#').strip(), level=min(level, 9))
            continue

        # Handle checkbox lists: the state comes from the character inside
        # the brackets, not from any 'x' elsewhere in the line.
        checkbox = checkbox_pattern.match(stripped)
        if checkbox:
            box = "☐ " if checkbox.group(1) == ' ' else "☑ "
            run = doc.add_paragraph().add_run()
            run.add_text(box + checkbox.group(2).strip())
        # Handle regular paragraphs
        elif stripped:
            doc.add_paragraph(stripped)

    # Handle the last table if exists
    if pending_rows:
        flush_table()

    doc.save(output_file)
def create_markdown_file(result, output_file):
    """Create markdown file from analysis result.

    For every page, the paragraph (among the first five in reading order)
    closest to the top-center of the page is written as a ``#`` title. The
    remaining paragraphs, tables and selection marks follow in reading
    order. Paragraphs whose first polygon point lies inside a table on the
    same page are skipped because the table already contains their text.

    Args:
        result: Analysis result exposing ``pages``, ``paragraphs`` and
            ``tables``.
        output_file: Path of the markdown file to write (UTF-8).
    """
    all_paragraphs = result.paragraphs or []
    all_tables = result.tables or []

    def reading_position(polygon):
        # Mainly top-to-bottom; the small x term orders elements that start
        # at (nearly) the same height from left to right.
        return polygon[0].y + polygon[0].x * 0.05

    with open(output_file, 'w', encoding='utf-8') as md_file:
        for page in result.pages:
            elements = [
                (reading_position(paragraph.bounding_regions[0].polygon), 'paragraph', paragraph)
                for paragraph in all_paragraphs
                if paragraph.bounding_regions[0].page_number == page.page_number
            ]
            elements.sort(key=lambda x: x[0])

            # Title heuristic: of the first five paragraphs, take the one
            # whose top edge midpoint is nearest to the top-center of the page.
            page_center_x = page.width / 2
            min_distance = float('inf')
            title_paragraph = None
            for _, _, paragraph in elements[:5]:
                polygon = paragraph.bounding_regions[0].polygon
                midpoint_x = (polygon[0].x + polygon[1].x) / 2
                midpoint_y = polygon[0].y
                distance = ((midpoint_x - page_center_x) ** 2 + midpoint_y ** 2) ** 0.5
                if distance < min_distance:
                    min_distance = distance
                    title_paragraph = paragraph

            if title_paragraph is not None:
                elements = [element for element in elements if element[2] != title_paragraph]
                md_file.write(f"# {title_paragraph.content}\n\n")

            # Only tables on this page are relevant. Coordinates are relative
            # to the page, so checking against tables from other pages would
            # wrongly drop paragraphs that merely share a position with them.
            # NOTE(review): a table is assigned to the page of its first
            # bounding region only — confirm handling of multi-page tables.
            page_tables = [
                table for table in all_tables
                if table.bounding_regions[0].page_number == page.page_number
            ]
            # Computed once per page instead of once per paragraph.
            table_boxes = [get_table_max_polygon(table) for table in page_tables]

            elements.extend(
                (reading_position(table.bounding_regions[0].polygon), 'table', table)
                for table in page_tables
            )
            elements.extend(
                (reading_position(mark.polygon), 'selection_mark', mark)
                for mark in (page.selection_marks or [])
            )
            elements.sort(key=lambda x: x[0])

            for _, element_type, element in elements:
                if element_type == 'paragraph':
                    # Skip text that is already part of a table
                    if any(is_element_inside_table(element, box) for box in table_boxes):
                        continue
                    md_file.write(f"{element.content}\n\n")
                elif element_type == 'table':
                    # Index the cells once; setdefault keeps the first cell
                    # seen for a position.
                    cell_text = {}
                    for cell in element.cells:
                        cell_text.setdefault((cell.row_index, cell.column_index), cell.content)
                    for row_idx in range(element.row_count):
                        row_content = "| "
                        for col_idx in range(element.column_count):
                            cell_content = cell_text.get((row_idx, col_idx)) or ""
                            # A line break inside a cell would split the
                            # markdown row across lines and break the table.
                            cell_content = cell_content.replace("\n", " ")
                            row_content += f"{cell_content} | "
                        md_file.write(row_content + "\n")
                    md_file.write("\n")
                elif element_type == 'selection_mark':
                    if element.state == "selected":
                        md_file.write("- [x] \n\n")
                    else:
                        md_file.write("- [ ] \n\n")
def create_word_file(result, output_file):
    """Create Word document from analysis result.

    For every page, the paragraph (among the first five in reading order)
    closest to the top-center of the page becomes a level-1 heading. The
    remaining paragraphs, tables and selection marks follow in reading
    order. Paragraphs whose first polygon point lies inside a table on the
    same page are skipped because the table already contains their text.

    Args:
        result: Analysis result exposing ``pages``, ``paragraphs`` and
            ``tables``.
        output_file: Destination passed to ``Document.save``.
    """
    # Create a new Word document
    doc = Document()
    all_paragraphs = result.paragraphs or []
    all_tables = result.tables or []

    def reading_position(polygon):
        # Mainly top-to-bottom; the small x term orders elements that start
        # at (nearly) the same height from left to right.
        return polygon[0].y + polygon[0].x * 0.01

    # Analyze pages
    for page in result.pages:
        elements = [
            (reading_position(paragraph.bounding_regions[0].polygon), 'paragraph', paragraph)
            for paragraph in all_paragraphs
            if paragraph.bounding_regions[0].page_number == page.page_number
        ]
        elements.sort(key=lambda x: x[0])

        # Find the paragraph most likely to be the document title: of the
        # first five, the one whose top edge midpoint is nearest to the
        # top-center of the page.
        page_center_x = page.width / 2
        min_distance = float('inf')
        title_paragraph = None
        for _, _, paragraph in elements[:5]:
            polygon = paragraph.bounding_regions[0].polygon
            midpoint_x = (polygon[0].x + polygon[1].x) / 2
            midpoint_y = polygon[0].y
            distance = ((midpoint_x - page_center_x) ** 2 + midpoint_y ** 2) ** 0.5
            if distance < min_distance:
                min_distance = distance
                title_paragraph = paragraph

        if title_paragraph is not None:
            elements = [element for element in elements if element[2] != title_paragraph]
            doc.add_heading(title_paragraph.content, level=1)

        # Only tables on this page are relevant. Coordinates are relative to
        # the page, so checking against tables from other pages would wrongly
        # drop paragraphs that merely share a position with them.
        # NOTE(review): a table is assigned to the page of its first bounding
        # region only — confirm handling of multi-page tables.
        page_tables = [
            table for table in all_tables
            if table.bounding_regions[0].page_number == page.page_number
        ]
        # Computed once per page instead of once per paragraph.
        table_boxes = [get_table_max_polygon(table) for table in page_tables]

        # Add tables and selection marks, then restore reading order
        elements.extend(
            (reading_position(table.bounding_regions[0].polygon), 'table', table)
            for table in page_tables
        )
        elements.extend(
            (reading_position(mark.polygon), 'selection_mark', mark)
            for mark in (page.selection_marks or [])
        )
        elements.sort(key=lambda x: x[0])

        for _, element_type, element in elements:
            if element_type == 'paragraph':
                # Skip text that is already part of a table
                if any(is_element_inside_table(element, box) for box in table_boxes):
                    continue
                doc.add_paragraph(element.content)
            elif element_type == 'table':
                table = doc.add_table(rows=element.row_count, cols=element.column_count)
                table.style = 'Table Grid'
                # Index the cells once; setdefault keeps the first cell seen
                # for a position.
                cell_text = {}
                for cell in element.cells:
                    cell_text.setdefault((cell.row_index, cell.column_index), cell.content)
                for row_idx in range(element.row_count):
                    row_cells = table.rows[row_idx].cells
                    for col_idx in range(element.column_count):
                        row_cells[col_idx].text = cell_text.get((row_idx, col_idx)) or ""
            elif element_type == 'selection_mark':
                run = doc.add_paragraph().add_run()
                if element.state == "selected":
                    run.add_text("☑ ")
                else:
                    run.add_text("☐ ")

    # Save Word document
    doc.save(output_file)
def format_polygon(polygon):
    """Render a polygon as ``"[x, y], [x, y], ..."``.

    Returns ``"N/A"`` when ``polygon`` is empty or None.
    """
    if polygon:
        rendered_points = (f"[{point.x}, {point.y}]" for point in polygon)
        return ", ".join(rendered_points)
    return "N/A"
class _BoxCorner:
    """Minimal x/y point used for the corners of a computed bounding box."""

    __slots__ = ("x", "y")

    def __init__(self, x, y):
        self.x = x
        self.y = y


def get_table_max_polygon(table):
    """Get the bounding box of a table from its cells.

    The box spans the smallest and largest x/y over every polygon point of
    every cell (first bounding region of each cell). Taking only the first
    point of the first cell and the last point of the last cell depends on
    cell order and on which corner comes last in the polygon; with corners
    listed clockwise from the top-left, the last point is the bottom-left
    corner, which leaves the last column outside the box.

    Args:
        table: Object exposing ``cells``, each with ``bounding_regions``.
    Returns:
        A two-item list ``[top_left, bottom_right]`` of objects with ``x``
        and ``y`` attributes. For a table without cell geometry the box is
        empty (it contains no point).
    """
    points = []
    for cell in table.cells:
        regions = cell.bounding_regions or []
        if regions:
            points.extend(regions[0].polygon or [])

    if not points:
        # An inverted box: no coordinate can satisfy min <= value <= max.
        infinity = float('inf')
        return [_BoxCorner(infinity, infinity), _BoxCorner(-infinity, -infinity)]

    xs = [point.x for point in points]
    ys = [point.y for point in points]
    return [_BoxCorner(min(xs), min(ys)), _BoxCorner(max(xs), max(ys))]
def is_element_inside_table(element, table_max_polygon):
    """Tell whether an element starts inside a table's bounding box.

    Only the first polygon point of the element's first bounding region is
    tested, against the box given as ``[first_corner, last_corner]``
    (bounds inclusive).
    """
    anchor = element.bounding_regions[0].polygon[0]
    lower_bound = table_max_polygon[0]
    upper_bound = table_max_polygon[1]

    # Horizontal range first; only then the vertical range.
    if not (lower_bound.x <= anchor.x <= upper_bound.x):
        return False
    return lower_bound.y <= anchor.y <= upper_bound.y