"""Gradio app: OCR the JPEG images inside an uploaded ZIP and translate the text.

Each image is sent to a Google Cloud Document AI processor for text
extraction, the extracted text is translated with the Cloud Translation (v2)
API, and the results are rendered as an HTML table.
"""

import functools
import io
import json
import logging
import os
import tempfile
import zipfile

import gradio as gr
import pandas as pd
from google.api_core.client_options import ClientOptions
from google.cloud import documentai_v1 as documentai
from google.cloud import translate_v2 as translate
from google.cloud.documentai_v1.types import RawDocument
from google.oauth2 import service_account
from gradio import Interface

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Load credentials from environment variable (expected to hold the service
# account key as a JSON string).
credentials_raw = os.environ.get("google_authentication")
if not credentials_raw:
    raise EnvironmentError("Google Cloud credentials not found in environment.")
credentials_json = json.loads(credentials_raw)
credentials = service_account.Credentials.from_service_account_info(credentials_json)
logging.info("Loaded Google Cloud credentials successfully.")

# Column layout shared by the global results table and every per-request table.
RESULT_COLUMNS = ["Filename", "Extracted Text", "Translated Text"]

# File extensions (lower-case) treated as JPEG images inside the archive.
JPEG_EXTENSIONS = ('.jpg', '.jpeg')

# Global DataFrame declaration; holds the results of the most recent run.
results_df = pd.DataFrame(columns=RESULT_COLUMNS)

# Google Cloud Document AI processor details
project_id = "herbaria-ai"
location = "us"
processor_id = "4307b078717a399a"


@functools.lru_cache(maxsize=None)
def _get_translate_client():
    """Return the shared Translation client, creating it on first use."""
    return translate.Client(credentials=credentials)


@functools.lru_cache(maxsize=None)
def _get_documentai_client():
    """Return the shared Document AI client, creating it on first use.

    The endpoint is derived from the module-level ``location`` at the time of
    the first call; later changes to ``location`` do not affect the cached
    client.
    """
    # ClientOptions carries endpoint configuration only; it has no
    # ``credentials`` parameter, so credentials go to the client constructor.
    opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")
    return documentai.DocumentProcessorServiceClient(
        credentials=credentials, client_options=opts
    )


def translate_text(text, target_language="en"):
    """Translate ``text`` into ``target_language`` with Cloud Translation v2.

    Args:
        text: Plain text to translate.
        target_language: Language code of the desired output language.

    Returns:
        The translated text, or an empty string when ``text`` is empty or
        whitespace-only (no API call is made in that case).
    """
    if not text or not text.strip():
        return ""
    # OCR output is plain text. Without format_="text" the API treats the
    # input as HTML and returns entity-escaped output (e.g. "&#39;").
    result = _get_translate_client().translate(
        text, target_language=target_language, format_="text"
    )
    return result["translatedText"]


def batch_process_documents(file_path: str, file_mime_type: str) -> tuple:
    """Extract the text of one document with Document AI and translate it.

    Args:
        file_path: Path of the document on disk.
        file_mime_type: MIME type of the document, e.g. ``"image/jpeg"``.

    Returns:
        A ``(extracted_text, translated_text)`` tuple.
    """
    logging.info(f"Processing document {file_path}.")
    client = _get_documentai_client()

    with open(file_path, "rb") as file_stream:
        raw_document = RawDocument(content=file_stream.read(), mime_type=file_mime_type)

    name = client.processor_path(project_id, location, processor_id)
    request = documentai.ProcessRequest(name=name, raw_document=raw_document)
    result = client.process_document(request=request)

    extracted_text = result.document.text
    translated_text = translate_text(extracted_text)
    logging.info(f"Document processed and translated for {file_path}.")
    return extracted_text, translated_text


def unzip_and_find_jpgs(file_path, extract_path=None):
    """Extract a ZIP archive and return the paths of the JPEG files inside it.

    Args:
        file_path: Path of the ZIP archive.
        extract_path: Directory to extract into (created if missing). When
            omitted, a fresh temporary directory is created so files from an
            earlier archive are never picked up again; the caller is then
            responsible for removing it.

    Returns:
        A list of paths to files ending in ``.jpg``/``.jpeg``
        (case-insensitive), skipping anything under a ``__MACOSX`` directory.

    Raises:
        zipfile.BadZipFile: If ``file_path`` is not a valid ZIP archive.
    """
    logging.info(f"Unzipping file {file_path}.")
    if extract_path is None:
        extract_path = tempfile.mkdtemp(prefix="extracted_files_")
    else:
        os.makedirs(extract_path, exist_ok=True)

    with zipfile.ZipFile(file_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)

    jpg_files = []
    for root, dirs, files in os.walk(extract_path):
        # Prune macOS resource-fork folders by directory name so the check
        # cannot be confused by the extraction directory's own path; sorting
        # keeps the traversal order deterministic.
        dirs[:] = sorted(d for d in dirs if d != '__MACOSX')
        for file in sorted(files):
            if file.lower().endswith(JPEG_EXTENSIONS):
                jpg_files.append(os.path.join(root, file))

    logging.info(f"Found {len(jpg_files)} JPG files in {file_path}.")
    return jpg_files


def process_images(uploaded_file):
    """Gradio handler: OCR and translate every JPEG in the uploaded ZIP.

    Args:
        uploaded_file: The upload handed over by Gradio. Either a path string
            or an object exposing the path as ``.name`` is accepted; ``None``
            (nothing uploaded) yields a message instead of an error.

    Returns:
        An HTML table with one row per image (filename, extracted text,
        translated text), or a plain message string when there is nothing to
        process or an error occurred.
    """
    logging.info("Started processing the uploaded file.")  # Check if the function is triggered
    global results_df
    results_df = results_df.iloc[0:0]  # Clear the DataFrame

    if uploaded_file is None:
        logging.warning("No file was uploaded.")
        return "No file was uploaded."

    if isinstance(uploaded_file, (str, os.PathLike)):
        zip_path = uploaded_file
    else:
        zip_path = uploaded_file.name  # Gradio provides the file path
    logging.info(f"Received file {zip_path} for processing.")

    rows = []
    try:
        # A per-request extraction directory keeps uploads isolated from one
        # another and is removed as soon as processing finishes.
        with tempfile.TemporaryDirectory(prefix="extracted_files_") as extract_dir:
            image_files = unzip_and_find_jpgs(zip_path, extract_dir)

            if not image_files:
                logging.warning("No JPG files found in the zip.")
                return "No JPG files found in the zip."

            for image_path in image_files:
                logging.info(f"Processing image file {image_path}.")
                extracted_text, translated_text = batch_process_documents(image_path, "image/jpeg")
                rows.append({
                    "Filename": os.path.basename(image_path),
                    "Extracted Text": extracted_text,
                    "Translated Text": translated_text,
                })
                logging.info(f"Data added for file {image_path}.")
    except Exception as e:
        # Top-level boundary for the UI: log the traceback and report the
        # failure to the user instead of crashing the request.
        logging.exception(f"An error occurred: {str(e)}")
        return f"An error occurred: {str(e)}"

    # Build the table once rather than concatenating a frame per image.
    results_df = pd.DataFrame(rows, columns=RESULT_COLUMNS)
    logging.info("Processing complete. Generating HTML output.")
    return results_df.to_html()


# Set up the interface
interface = Interface(
    fn=process_images,
    inputs=gr.components.File(label="Upload ZIP File", type="file"),
    outputs="html",
    title="Document AI Translation",
    description="Upload a ZIP file containing JPEG/JPG images, and the system will extract and translate text from each image.",
    debug=True
)

if __name__ == "__main__":
    interface.launch(debug=True)