|
import json |
|
import os |
|
import shutil |
|
import textwrap |
|
from pathlib import Path |
|
|
|
|
|
from smolagents.utils import AgentError |
|
|
|
|
|
def serialize_agent_error(obj):
    """Turn a value into a plain representation, special-casing agent errors.

    Args:
        obj: The value to convert.

    Returns:
        A dict with ``"error_type"`` (the class name) and ``"message"`` keys
        when ``obj`` is an ``AgentError``; otherwise ``str(obj)``.
    """
    # Anything that is not an agent error is simply stringified.
    if not isinstance(obj, AgentError):
        return str(obj)

    error_type = obj.__class__.__name__
    return {"error_type": error_type, "message": obj.message}
|
|
|
|
|
def get_image_description(file_name: str, question: str, visual_inspection_tool) -> str:
    """Request a question-aware caption for an image from the inspection tool.

    Args:
        file_name: Path of the image; forwarded to the tool as ``image_path``.
        question: Question the caption should help with; embedded in the prompt.
        visual_inspection_tool: Callable accepting ``image_path`` and
            ``question`` keyword arguments.

    Returns:
        The value returned by ``visual_inspection_tool`` for the captioning
        prompt.
    """
    # The prompt is three lines: instructions, the question, and a guard
    # against invented details.
    caption_request = (
        "Write a caption of 5 sentences for this image. "
        "Pay special attention to any details that might be useful for someone "
        "answering the following question:\n"
        f"{question}. But do not try to answer the question directly!\n"
        "Do not add any information that is not present in the image."
    )
    return visual_inspection_tool(image_path=file_name, question=caption_request)
|
|
|
|
|
def get_document_description(file_path: str, question: str, document_inspection_tool) -> str:
    """Request a question-aware caption for a document from the inspection tool.

    Args:
        file_path: Path of the document; forwarded to the tool as ``file_path``.
        question: Question the caption should help with; embedded in the prompt.
        document_inspection_tool: Object exposing
            ``forward_initial_exam_mode(file_path=..., question=...)``.

    Returns:
        The value returned by ``forward_initial_exam_mode`` for the captioning
        prompt.
    """
    # The prompt is three lines: instructions, the question, and a guard
    # against invented details.
    caption_request = (
        "Write a caption of 5 sentences for this document. "
        "Pay special attention to any details that might be useful for someone "
        "answering the following question:\n"
        f"{question}. But do not try to answer the question directly!\n"
        "Do not add any information that is not present in the document."
    )
    inspect = document_inspection_tool.forward_initial_exam_mode
    return inspect(file_path=file_path, question=caption_request)
|
|
|
|
|
def get_single_file_description(file_path: str, question: str, visual_inspection_tool, document_inspection_tool):
    """Build the prompt snippet describing one attached file.

    The file is classified by its extension (compared case-insensitively):
    images and documents get a generated description appended, while audio and
    any other file type are only listed.

    Args:
        file_path: Path of the attached file.
        question: Question the description should help answer.
        visual_inspection_tool: Tool passed through to ``get_image_description``.
        document_inspection_tool: Tool passed through to
            ``get_document_description``.

    Returns:
        A string starting with `` - Attached <kind>: <file_path>``; for images
        and documents a second line with the description follows.
    """
    # os.path.splitext only looks at the final path component, so dots in
    # directory names ("./dir/x.pdf", "v1.2/x.pdf") cannot corrupt the stem and
    # a dot-less file that happens to be called "png" is not taken for an image.
    stem, dotted_extension = os.path.splitext(file_path)
    file_extension = dotted_extension[1:].lower()

    if file_extension in ("png", "jpg", "jpeg"):
        file_description = f" - Attached image: {file_path}"
        file_description += (
            f"\n -> Image description: {get_image_description(file_path, question, visual_inspection_tool)}"
        )
        return file_description

    if file_extension in ("pdf", "xls", "xlsx", "docx", "doc", "xml"):
        file_description = f" - Attached document: {file_path}"
        # When a same-named .png sits next to the document, caption that image
        # instead of inspecting the document itself.
        image_path = stem + ".png"
        if os.path.exists(image_path):
            description = get_image_description(image_path, question, visual_inspection_tool)
        else:
            description = get_document_description(file_path, question, document_inspection_tool)
        file_description += f"\n -> File description: {description}"
        return file_description

    if file_extension in ("mp3", "m4a", "wav"):
        return f" - Attached audio: {file_path}"

    return f" - Attached file: {file_path}"
|
|
|
|
|
def get_zip_description(file_path: str, question: str, visual_inspection_tool, document_inspection_tool):
    """Unpack an archive and describe every file found inside it.

    The archive is extracted into a folder named like the archive without its
    extension, and each extracted file is described with
    ``get_single_file_description``.

    Args:
        file_path: Path of the archive to unpack.
        question: Question the descriptions should help answer.
        visual_inspection_tool: Tool passed through for image files.
        document_inspection_tool: Tool passed through for document files.

    Returns:
        The concatenated, indented descriptions, each preceded by a newline;
        an empty string when the archive contains no files.
    """
    # Drop only the trailing extension. str.replace(".zip", "") would also
    # rewrite ".zip" occurring earlier in the path and extract elsewhere.
    folder_path = os.path.splitext(file_path)[0]
    os.makedirs(folder_path, exist_ok=True)
    shutil.unpack_archive(file_path, folder_path)

    entries = []
    for root, dirs, files in os.walk(folder_path):
        # os.walk yields entries in arbitrary order; sorting (dirs in place so
        # the traversal itself is ordered) keeps the prompt reproducible.
        dirs.sort()
        for name in sorted(files):
            member_path = os.path.join(root, name)
            member_description = get_single_file_description(
                member_path, question, visual_inspection_tool, document_inspection_tool
            )
            entries.append("\n" + textwrap.indent(member_description, prefix=" "))
    return "".join(entries)
|
|
|
|
|
def get_tasks_to_run(data, total: int, base_filename: Path, tasks_ids: list[int]):
    """Select the tasks that still need to be run.

    A task is skipped when its id already appears in the answers file
    ``<stem>_answers.jsonl`` located next to ``base_filename``, or when
    ``tasks_ids`` is given and does not contain its id.

    Args:
        data: Indexable collection of task records, each with a ``"task_id"``
            entry convertible to ``int``.
        total: Number of leading records of ``data`` to consider.
        base_filename: Path used to derive the answers file location.
        tasks_ids: Ids to restrict the run to, or ``None`` for no restriction.

    Returns:
        The pending task records, in their original order.
    """
    answers_path = base_filename.parent / f"{base_filename.stem}_answers.jsonl"
    done = set()
    if answers_path.exists():
        with open(answers_path, encoding="utf-8") as fh:
            # Cast to int so ids stored as strings still match the int ids
            # computed from the dataset below; otherwise finished tasks would
            # be scheduled again.
            done = {int(json.loads(line)["task_id"]) for line in fh if line.strip()}

    # Build the filter set once instead of scanning the list for every task.
    selected = None if tasks_ids is None else set(tasks_ids)

    tasks = []
    for i in range(total):
        record = data[i]
        task_id = int(record["task_id"])
        if task_id in done:
            continue
        if selected is None or task_id in selected:
            tasks.append(record)
    return tasks
|
|