Spaces:
Running
Running
import re | |
import os | |
import json | |
import time | |
import fitz | |
import difflib | |
import secrets | |
import openai | |
import string | |
import logging | |
import requests | |
import streamlit as st | |
from pandas import DataFrame | |
from openai import AzureOpenAI | |
from difflib import SequenceMatcher | |
from tempfile import NamedTemporaryFile | |
from typing import Tuple, Dict, List, Union | |
from diff_match_patch import diff_match_patch | |
import base64 | |
import pandas as pd | |
class FindUpdate: | |
def clean_string(text: str, stem: str = "None") -> Union[str, None]:
    """Normalise text before diffing: flatten whitespace and strip noise.

    Line breaks become spaces, runs of spaces are collapsed, non-ASCII
    characters, ASCII punctuation and digits are removed. Optionally the
    remaining words are stemmed or lemmatized.

    Args:
        text (str): The input text to be cleaned.
        stem (str, optional): "None" (default) leaves the words untouched;
            "Stem", "Lem" or "Spacy" select a stemmer/lemmatizer.
    Returns:
        Union[str, None]: The cleaned text, or None when cleaning failed
        (the error is logged).
    Raises:
        None
    """
    try:
        # Replace any characters with nothing or a space
        text = re.sub(r'\n', ' ', text)
        text = re.sub(r' +', ' ', text)
        text = re.sub(r'[^\x00-\x7f]', r'', text)
        # Remove punctuation
        text = text.translate(str.maketrans('', '', string.punctuation))
        # Remove digits. The previous per-character loop had exactly this
        # net effect, so the default-mode result is unchanged.
        digit_free = re.sub(r'[0-9]', '', text)
        if stem not in ('Stem', 'Lem', 'Spacy'):
            return digit_free
        # Stemmers and lemmatizers operate on words, not on single
        # characters, so tokenise on whitespace first.
        words = digit_free.split()
        # NOTE(review): PorterStemmer, WordNetLemmatizer and nlp are not
        # imported in the visible part of this file; unless they are
        # defined elsewhere these branches raise NameError, which is
        # logged below and reported as None.
        if stem == 'Stem':
            stemmer = PorterStemmer()
            words = [stemmer.stem(word) for word in words]
        elif stem == 'Lem':
            lem = WordNetLemmatizer()
            words = [lem.lemmatize(word) for word in words]
        else:
            words = [token.lemma_ for token in nlp(' '.join(words))]
        return ' '.join(words)
    except Exception as e:
        # Best-effort helper: log the failure and signal it with None.
        logging.error(f"An error occurred in clean_string: {e}")
        return None
def extract_text_without_header_footer(page: fitz.Page, page_height: float, header_height: float, footer_height: float) -> str:
    """
    Collect the text of every block that lies between the header and footer bands.

    A block is kept only when its bounding box starts at or below
    ``header_height`` and ends at or above ``page_height - footer_height``.

    Args:
        page (fitz.Page): The page object from the PyMuPDF library.
        page_height (float): The height of the page.
        header_height (float): The height of the header band to skip.
        footer_height (float): The height of the footer band to skip.
    Returns:
        str: The kept block texts, each followed by a newline; an empty
        string when anything goes wrong (the error is logged).
    Raises:
        None
    """
    try:
        top_limit = header_height
        bottom_limit = page_height - footer_height
        kept = []
        for block in page.get_text_blocks():
            # The first four items of a block are its bounding box.
            area = fitz.Rect(block[:4])
            if not (area.y0 >= top_limit and area.y1 <= bottom_limit):
                continue
            # Item 4 is the text content of the block.
            kept.append(block[4] + "\n")
        return "".join(kept)
    except Exception as e:
        logging.error(f"An error occurred in extract_text_without_header_footer: {e}")
        return ""
def rect_to_dict(rect: fitz.Rect) -> Dict[str, float]:
    """
    Map the first four items of a rectangle onto named coordinates.

    Args:
        rect (fitz.Rect): The Rect object from the PyMuPDF library (any
            object indexable at positions 0-3 works).
    Returns:
        Dict[str, float]: ``{"x1": rect[0], "y1": rect[1], "x2": rect[2], "y2": rect[3]}``.
    Raises:
        None
    """
    labels = ("x1", "y1", "x2", "y2")
    return {label: rect[position] for position, label in enumerate(labels)}
def extract_sections(pdf_path: str, header_height: float, footer_height: float, start_page: int = 2) -> Dict[str, Dict[str, List[str]]]:
    """
    Extract numbered sections from a PDF document, excluding the header and footer.

    A new section starts at every line that begins with a dotted number
    (for example "3." or "3.1.2") or with the agreement title. Every line
    seen while a section is open is added to that section, together with
    the rectangles ``page.search_for`` reports for the line.

    Args:
        pdf_path (str): The path to the PDF document.
        header_height (float): The height of the header to be excluded.
        footer_height (float): The height of the footer to be excluded.
        start_page (int, optional): Zero-based index of the first page to
            scan. Defaults to 2, i.e. the first two pages are skipped.
    Returns:
        Dict[str, Dict[str, List[str]]]: Maps each section heading to a dict
        with 'page_num' (1-based page of the heading, as a string),
        'coords' (list of rectangle dicts) and 'content' (the section text
        as one string with the heading prefix removed). An empty dict is
        returned when extraction fails (the error is logged).
    Raises:
        None
    """
    heading_pattern = re.compile(r'^(?:\d+\.\d*(?:\.\d+)*)|AT&T ALLIANCE PROGRAM AGREEMENT')
    try:
        sections = {}
        current_section = None
        # The context manager closes the document even when a page fails.
        with fitz.open(pdf_path) as doc:
            for page_num in range(start_page, len(doc)):
                page = doc.load_page(page_num)
                text = FindUpdate.extract_text_without_header_footer(page, page.rect.height, header_height, footer_height)
                for line in text.split('\n'):
                    match = heading_pattern.match(line.strip())
                    if match:
                        logging.debug("Section heading line: %s", line)
                        section_num = match.group()
                        if section_num != current_section:
                            current_section = section_num
                            # NOTE(review): a heading that re-appears later
                            # (e.g. a numbered list item "1.") replaces the
                            # earlier section of the same name.
                            sections[current_section] = {'page_num': str(page_num + 1), 'coords': [], 'content': []}
                    if current_section:
                        sections[current_section]['content'].append(line)
                        for hit in page.search_for(line) or []:
                            sections[current_section]['coords'].append(FindUpdate.rect_to_dict(hit))
        logging.debug("Extracted section keys: %s", list(sections))
        for key, section in sections.items():
            # The first collected line is the heading line itself; strip
            # leading whitespace before cutting the heading off so the
            # slice cannot remove the wrong characters.
            joined = ' '.join(section['content']).lstrip()
            section['content'] = joined[len(key):].lstrip()
        return sections
    except Exception as e:
        # Best-effort: callers receive an empty result instead of an error.
        logging.error(f"An error occurred in extract_sections: {e}")
        return {}
def check_identify_changes(template_text: str, contract_text: str) -> str:
    """
    Diff the template text against the contract text and summarise the edits.

    Args:
        template_text (str): The template text.
        contract_text (str): The contract text.
    Returns:
        str: 'no_change' when every diff segment is either unchanged or a
        single space; otherwise a prompt fragment listing both texts, the
        deleted segments and the added segments. An empty string is
        returned when the diff itself fails (the error is logged).
    Raises:
        None
    """
    try:
        engine = diff_match_patch()
        segments = engine.diff_main(template_text, contract_text)
        engine.diff_cleanupSemantic(segments)
        # A segment counts as a real edit only if it is an insertion or a
        # deletion of something other than one space.
        has_real_edit = any(op != 0 and chunk != ' ' for op, chunk in segments)
        if not has_real_edit:
            return 'no_change'
        deleted = [chunk for op, chunk in segments if op == -1]
        added = [chunk for op, chunk in segments if op == 1]
        return (
            "\n"
            f"template text = {template_text}\n"
            f"contract_text = {contract_text}\n"
            f"deleted from template --- {deleted}\n"
            f"added to the actual contract -- {added}\n"
        )
    except Exception as e:
        logging.error(f"An error occurred in check_identify_changes: {e}")
        return ""
def open_ai(prompt: str, max_retries: int = 5) -> str:
    """
    Send a single-turn prompt to the Azure OpenAI "GPT-4o" deployment.

    The API key and endpoint are read from the ``API`` and ``URL``
    environment variables.

    Args:
        prompt (str): The prompt for the AI model.
        max_retries (int, optional): How many failures other than rate
            limiting are tolerated before the last error is re-raised.
            Defaults to 5.
    Returns:
        str: The text of the first completion choice ("" when the service
        returned no text content).
    Raises:
        Exception: Client construction errors are raised immediately; any
            other non-rate-limit error is re-raised after ``max_retries``
            failed attempts.
    """
    # Built once, outside the retry loop: a configuration problem cannot
    # be cured by retrying.
    client = AzureOpenAI(
        api_key=os.getenv('API'),
        api_version="2024-02-01",
        azure_endpoint=os.getenv("URL"),
    )
    conversation = [{"role": "user", "content": prompt}]
    failures = 0
    while True:
        try:
            response = client.chat.completions.create(
                model="GPT-4o",
                messages=conversation,
                temperature=0,
                max_tokens=3000,
                stop=None,
            )
        except openai.RateLimitError as e:
            # Rate limiting is expected under load: wait and try again.
            logging.error(f"Rate limit error occurred: {e}")
            time.sleep(60)
        except Exception as e:
            logging.error(f"An error occurred in open_ai: {e}")
            failures += 1
            if failures >= max_retries:
                # Previously this looped forever without any delay.
                raise
            time.sleep(min(2 ** failures, 30))
        else:
            return response.choices[0].message.content or ""
def compare_strings(string1: str, string2: str) -> float:
    """
    Measure how alike two strings are using difflib's SequenceMatcher.

    Args:
        string1 (str): The first string.
        string2 (str): The second string.
    Returns:
        float: The SequenceMatcher ratio of the two strings
        (1.0 for identical strings, 0.0 when nothing matches).
    Raises:
        None
    """
    return SequenceMatcher(a=string1, b=string2).ratio()
def get_main_section(section_number: str) -> str:
    """
    Drop the last dot-separated component of a section number.

    Args:
        section_number (str): The section number, e.g. "1.2.3" or "1.".
    Returns:
        str: Everything before the last '.', or the input unchanged when
        it contains no '.' ("1.2.3" -> "1.2", "1." -> "1", "7" -> "7").
    Raises:
        None
    """
    parent, dot, _ = section_number.rpartition('.')
    if not dot:
        # No separator at all: the value is already the main section.
        return section_number
    return parent
def increment_section(section_number: str) -> str:
    """
    Add one to the last dot-separated component of a section number.

    An empty last component counts as 0, so "1." becomes "1.1".

    Args:
        section_number (str): The section number.
    Returns:
        str: The section number with its last component incremented, or
        the input unchanged when that component is not numeric.
    Raises:
        None
    """
    prefix, dot, tail = section_number.rpartition('.')
    tail = tail or '0'
    if not tail.isdigit():
        # Non-numeric tails (e.g. a title) cannot be incremented.
        return section_number
    return f"{prefix}{dot}{int(tail) + 1}"
def count_subsections(dictionary: Dict[str, Union[str, Dict]]) -> Dict[str, int]:
    """
    Tally how many keys share each main section.

    Args:
        dictionary (Dict[str, Union[str, Dict]]): Mapping whose keys are section numbers.
    Returns:
        Dict[str, int]: For every main section (as computed by
        ``FindUpdate.get_main_section``), the number of keys that belong to it.
    Raises:
        None
    """
    tally = {}
    for key in dictionary:
        parent = FindUpdate.get_main_section(key)
        tally[parent] = tally.get(parent, 0) + 1
    return tally
def split_page_section(page_no: str, sec_no: str) -> Tuple[str, Tuple[int]]:
    """
    Build a sortable key from a page number and a dotted section number.

    The section number is split on '.', and every component is turned
    into an integer (non-numeric or empty components become 0), so that
    "10.1" yields (10, 1) and sorts after "2.1".

    Args:
        page_no (str): The page number; returned unchanged.
        sec_no (str): The section number, e.g. "3.1.2".
    Returns:
        Tuple[str, Tuple[int]]: ``(page_no, components)``, or ``("", ())``
        when the section number cannot be processed (the error is logged).
    Raises:
        None
    """
    try:
        # Split into components first: iterating the string itself would
        # treat every character (including the dots) as its own number.
        section_number = tuple(
            int(part) if part.isdigit() else 0
            for part in sec_no.split('.')
        )
        return (page_no, section_number)
    except Exception as e:
        logging.error(f"An error occurred in split_page_section: {e}")
        return ("", ())
def process_comparisons(result_agreement: Dict[str, Dict], result_template: Dict[str, Dict]) -> Tuple[List[Dict], List[Dict], List[Dict]]:
    """
    Compare every template section with its counterpart in the agreement.

    Three kinds of findings are recorded, always as one entry in each of
    the three returned lists so that the lists stay index-aligned:

    * a clause that was inserted into the agreement ("~!~addition"),
    * a clause whose text differs (the LLM reply is stored as the change),
    * a template clause that is absent from the agreement ("~!~missing").

    Args:
        result_agreement (Dict[str, Dict]): Sections extracted from the
            agreement; each value has 'content', 'page_num' and 'coords'.
        result_template (Dict[str, Dict]): Sections extracted from the
            template, same shape.
    Returns:
        Tuple[List[Dict], List[Dict], List[Dict]]: A tuple containing the changes list, actual list, and changes_ui list.
    Raises:
        None
    """
    # NOTE: the literal opens with four quotes, so the prompt text itself
    # begins with a stray '"'. It is kept so the model input is unchanged.
    prompt = """"
As an attorney representing your client, your task is to compare the template text and the contract text provided and identify any changes that may impact the agreement or the involved parties. Specifically, you need to analyze the legal implications and considerations of the word 'subcontractors' when it appears in the changed text. Instead of focusing solely on the addition or removal of the letters 'or,' provide a comprehensive analysis based on the complete word 'subcontractors.' Classify the changes as either "minor_change" or "major_change."
Please provide the changed text alone as a separate paragraph under the "Changed:" subheading, and the analysis of the changes as a separate paragraph under the "Analysis:" subheading. At the end, add ~!~ and classification like ~!~minor_change or ~!~major_change.
EXAMPLES -
1.
Contract text:
Verify identification credentials including Social Security number, driver’s license, educational credentials, employment history, home address, and citizenship indicia;
In connection with providing access to a customer’s facilities or systems, comply with any additional investigation or screening requirements required by such customer as communicated in advance; test for use of illicit drugs including opiates, cocaine, cannabinoids, amphetamines, and phencyclidine.
Template text:
Verify identification credentials including Social Security number, driver’s license, educational credentials, employment history, home address, and citizenship indicia.
In connection with providing access to a customer’s facilities or systems, comply with any additional investigation or screening requirements required by such customer as communicated in advance. Test for use of illicit drugs including opiates, cocaine, cannabinoids, amphetamines, and phencyclidine.
Differences Observed -
Changed: "When (i) the customer or end user is a federal, state, or local government entity"
Analysis:
A new sentence is added: "When (i) the customer or end user is a federal, state, or local government entity" in the template text. This clause specifies a condition under which the drug testing requirement applies. In the template text, this condition is excluded, meaning that the drug testing requirement would apply regardless of whether the customer or end user is a government entity. This could potentially change the scope and applicability of the drug testing provision.~!~major_change
2.
Contract text:
Perform a criminal background check to determine, in the counties, states, and federal court districts where the candidate has lived, worked, or attended school in the previous ten years, whether the candidate has been: (1) convicted of any felony; (2) convicted of a misdemeanor involving violence, theft, computer crimes, fraud, financial crimes, drug distribution, unlawful possession or use of a dangerous weapon, or sexual misconduct; or, (3) listed on any government registry as a sex offender (together, “Conviction”); and in connection with providing access to a customer’s facilities or systems, comply with any additional investigation or screening requirements required by such customer as communicated in advance.
Template text:
Perform a criminal background check to determine, in the counties, states, and federal court districts where the candidate has lived, worked, or attended school in the previous ten years, whether the candidate has been: (1) convicted of any felony; (2) convicted of a misdemeanor involving violence, theft, computer crimes, fraud, financial crimes, drug distribution, unlawful possession or use of a dangerous weapon, or sexual misconduct; or, (3) listed on any government registry as a sex offender (together, “Conviction”). In connection with providing access to a customer’s facilities or systems, comply with any additional investigation or screening requirements required by such customer as communicated in advance.
Differences Observed -
Changed: "and"
Analysis:
The contract text includes the word "and," which isn't in the template. This could potentially change the interpretation of the agreement, as it could be read as each clause being a separate requirement, rather than a list of requirements.~!~major_change
3.
Contract text:
The service provider may not assign, delegate, or otherwise transfer its rights or obligations under this Agreement, voluntarily or involuntarily, without the prior written consent of the other party except that the service provider may delegate certain obligations hereunder to its subcontractors as contemplated by this Agreement. Any attempted assignment, delegation, or transfer not consented to in writing will be void. Notwithstanding the foregoing, with notice to the other party, either party may assign this Agreement, in whole or in part, to any affiliate, successor-in-interest or without securing the consent of the other party. Any assignment of money will be void if (i) the assignor fails to give the non-assigning party at least thirty (30) days prior written notice, or (ii) the assignment purports to impose upon the non-assigning party additional costs or obligations in addition to the payment of such money, or (iii) the assignment purports to preclude the other party from dealing solely and directly with the service provider in all matters pertaining to this Agreement. This Agreement binds and benefits both parties and their permitted successors and assigns.
Template text:
The service provider may not assign, delegate, or otherwise transfer its rights or obligations under this Agreement, voluntarily or involuntarily, without the prior written consent of the other party except that the service provider may delegate certain obligations hereunder to its subcontracts as contemplated by this Agreement. Any attempted assignment, delegation, or transfer not consented to in writing will be void. Notwithstanding the foregoing, with notice to the other party, either party may assign this Agreement, in whole or in part, to any affiliate, successor-in-interest or without securing the consent of the other party. Any assignment of money will be void if (i) the assignor fails to give the non-assigning party at least thirty (30) days prior written notice, or (ii) the assignment purports to impose upon the non-assigning party additional costs or obligations in addition to the payment of such money, or (iii) the assignment purports to preclude the other party from dealing solely and directly with the service provider in all matters pertaining to this Agreement. This Agreement binds and benefits both parties and their permitted successors and assigns.
Differences Observed -
Changed: "Subcontractors"
Analysis: The contract text includes the word "Subcontractors" instead of "Subcontracts" as in the template. This change could potentially alter the meaning of the agreement as it specifies a different entity (Subcontractors vs Subcontracts). However, given the context, it seems likely that this is a typographical error in the template text, and the intended meaning remains the same.~!~minor_change
4.
Contract text:
This AGREEMENT is entered into between [Company Name], a [State] corporation, which sometimes does business as [DBA Name], (“Company”) and [Other Party Name], a [State] LLC (“Solution Provider” or “SP”). The company and solution provider may be referred to collectively as the “Parties” or individually as a “Party.”
Template text:
ALLIANCE PROGRAM AGREEMENT This AGREEMENT is entered into between [Company Name], a [State] corporation, which sometimes does business as [DBA Name], (“Company”) and ________________________, a __________ corporation (“Solution Provider” or “SP”). The company and solution provider may be referred to collectively as the “Parties” or individually as a “Party.”
Differences Observed -
Changed:"[Other Party Name], a [State] LLC"
Analysis: The contract text includes the specific name of the corporation "[Other Party Name], a [State] LLC" instead of a placeholder as in the template. This is a minor change as it is expected that the specific name of the corporation would be filled in the actual contract.~!~minor_change
"""
    actual_list = []
    changes_list = []
    changes_ui_list = []

    def _actual(text, page_number, section_number, coords):
        # Template-side half of a finding.
        return {
            "actual": text,
            "page_number": page_number,
            "section_number": section_number,
            "actual_coords": coords,
        }

    def _change(text, clause, section_number):
        # Agreement-side half of a finding; page and coords come from `clause`.
        return {
            "changed": text,
            "changed_page_number": clause['page_num'],
            "changed_section_number": section_number,
            "changed_coords": clause['coords'],
        }

    def _record(actual_entry, change_entry):
        # Always append to all three lists together: the report builder
        # pairs them up by position.
        actual_list.append(actual_entry)
        changes_list.append(change_entry)
        changes_ui_list.append(dict(change_entry))

    # Subsection counts do not change during the loop; compute them once.
    agreement_counts = FindUpdate.count_subsections(result_agreement)
    template_counts = FindUpdate.count_subsections(result_template)

    compared_sections = set()
    sections_added = False
    prev_section = 0
    # Iterate through the template sections in extraction order
    for key in result_template:
        template_clause = result_template[key]
        current_section = FindUpdate.get_main_section(key)
        if sections_added and current_section == prev_section:
            # A clause was inserted earlier in this main section, so the
            # counterpart of this template clause sits one number further
            # on. The shift must be applied to the clause number itself,
            # not to the main section.
            # NOTE(review): only a single inserted clause per main section
            # is compensated for.
            contract_key = FindUpdate.increment_section(key)
        else:
            contract_key = key
        if prev_section != current_section:
            sections_added = False
        if contract_key in result_agreement:
            counts_differ = agreement_counts.get(current_section) != template_counts.get(current_section)
            if counts_differ and FindUpdate.compare_strings(template_clause['content'], result_agreement[contract_key]['content']) < .1:
                # Almost nothing in common: treat the agreement clause as
                # newly inserted and compare against the next one instead.
                inserted = result_agreement[contract_key]
                _record(
                    _actual("", template_clause['page_num'], key, ""),
                    _change(inserted['content'] + ' Analysis:New clause added' + '~!~addition', inserted, key),
                )
                contract_key = FindUpdate.increment_section(contract_key)
                sections_added = True
            compared_sections.add(contract_key)
        if contract_key in result_agreement:
            contract_clause = result_agreement[contract_key]
            try:
                res = FindUpdate.check_identify_changes(
                    FindUpdate.clean_string(template_clause['content']),
                    FindUpdate.clean_string(contract_clause['content']),
                )
                if not res:
                    # An empty result means the diff itself failed; asking
                    # the model without any texts would only invent a change.
                    logging.warning(f"Could not diff section {key}; skipping LLM analysis")
                elif res != 'no_change':
                    llm_res = FindUpdate.open_ai(prompt + res)
                    _record(
                        _actual(template_clause['content'], template_clause['page_num'], key, template_clause['coords']),
                        _change(llm_res, contract_clause, key),
                    )
            except Exception as e:
                logging.error(f"An error occurred in process_comparisons: {e}")
        # Template clauses without a counterpart are reported by the
        # "missing" pass below. Appending a lone `actual` entry here (as
        # the code used to) would break the positional pairing of the lists.
        prev_section = current_section
    # Template clauses that do not exist in the agreement at all
    for key in result_template:
        if key not in result_agreement:
            clause = result_template[key]
            _record(
                _actual(clause['content'], clause['page_num'], key, ''),
                _change(clause['content'] + ' Analysis:missing' + '~!~missing', clause, key),
            )
    # Agreement clauses that neither exist in the template nor were
    # matched to a shifted template clause above
    for key in result_agreement:
        if key not in result_template and key not in compared_sections:
            clause = result_agreement[key]
            _record(
                _actual("", clause['page_num'], key, ""),
                _change(clause['content'] + ' Analysis:New clause added' + '~!~addition', clause, key),
            )
    return changes_list, actual_list, changes_ui_list
def remove_analysis(text: str) -> str:
    """
    Remove the analysis portion from the given text.

    Everything from the first occurrence of "Analysis" to the end of the
    text is dropped, including any following lines.

    Args:
        text (str): The text to remove the analysis from.
    Returns:
        str: The text with the analysis portion removed.
    """
    # DOTALL lets '.' cross line breaks; without it only the rest of the
    # "Analysis" line was removed and a multi-line analysis stayed behind.
    return re.sub(r"(Analysis.*)", "", text, flags=re.DOTALL)
def json_output(actual: List[Dict[str, str]], changes_ui: List[Dict[str, str]], file: str, template_files: str, result_template: str, result_agreement: str) -> Dict[str, object]:
    """
    Generate a JSON-serialisable report from the paired comparison lists.

    ``actual`` and ``changes_ui`` are read pairwise by position. Modified
    clauses take their analysis and classification from the stored change
    text; missing and added clauses get a fresh analysis from
    ``FindUpdate.open_ai``.

    Args:
        actual (List[Dict[str, str]]): The template-side entries.
        changes_ui (List[Dict[str, str]]): The agreement-side entries.
        file (str): The file path, stored as "input_file_path".
        template_files (str): The template path (not read by this function).
        result_template (str): The template result (not read by this function).
        result_agreement (str): The agreement result (not read by this function).
    Returns:
        Dict[str, object]: ``{"input_file_path": file, "changes": [...]}``
        with the changes ordered by page number and section number.
    """
    deletion_prompt = "\n{} : This clause has been deleted from the existing contract. what is the impact? provide me short analysis in 1 single paragraph should not exceed 100 words.\n"
    addition_prompt = "\n{} : This clause has been added in the existing contract. what is the impact? provide me short analysis in 1 single paragraph should not exceed 100 words.\n"

    def _kind(change_text):
        # The classification is whatever follows the last "~!~" marker.
        return change_text.split('~!~')[-1]

    def _shared_fields(baseline, change):
        # Fields that every report entry carries, in a fixed order.
        return {
            "actual": baseline['actual'],
            "page_number": baseline['page_number'],
            "section_number": baseline['section_number'],
            "actual_coords": baseline['actual_coords'],
            "changed": FindUpdate.remove_analysis(change["changed"]),
            "changed_page_number": change["changed_page_number"],
            "changed_section_number": change["changed_section_number"],
            "changed_coords": change["changed_coords"],
        }

    def _sort_key(entry):
        page, section = FindUpdate.split_page_section(entry["page_number"], entry["section_number"])
        # Compare pages numerically; as strings "10" would sort before "3".
        page_text = str(page)
        return (int(page_text) if page_text.isdigit() else 0, section)

    pairs = list(zip(actual, changes_ui))
    comparison_list = []

    # Pass 1: clauses whose text was modified (analysed by the LLM earlier).
    for baseline, change in pairs:
        raw = change['changed']
        if _kind(raw) in ('addition', 'missing'):
            continue
        pieces = raw.split("~!~")
        analysis_parts = pieces[0].split("Analysis:")
        # The model does not always follow the requested layout; fall back
        # to neutral values instead of failing the whole report.
        analysis_final = analysis_parts[1].replace('\n', ' ') if len(analysis_parts) > 1 else ""
        change_type = pieces[1].strip() if len(pieces) > 1 else "unknown"
        entry = _shared_fields(baseline, change)
        entry["analysis"] = analysis_final
        entry["type_of_change"] = change_type
        comparison_list.append(entry)

    # Pass 2: missing and added clauses, each with a fresh LLM analysis.
    # Kept as a separate pass so these entries follow the modified ones
    # when sort keys tie.
    for baseline, change in pairs:
        kind = _kind(change['changed'])
        if kind == 'missing':
            analysis_prompt = deletion_prompt.format(baseline['actual'])
        elif kind == 'addition':
            analysis_prompt = addition_prompt.format(change["changed"])
        else:
            continue
        entry = _shared_fields(baseline, change)
        entry["analysis"] = FindUpdate.open_ai(analysis_prompt)
        entry["type_of_change"] = kind
        comparison_list.append(entry)

    # Order the entries by page number, then by section number
    return {
        "input_file_path": file,
        "changes": sorted(comparison_list, key=_sort_key),
    }
def process_files_template(file: str) -> Dict[str, Dict]:
    """
    Process the template file and extract sections.

    Args:
        file (str): The path of the template file.
    Returns:
        Dict[str, Dict]: The sections returned by
        ``FindUpdate.extract_sections`` for the template file.
    Raises:
        Exception: Any error raised during extraction is logged and re-raised.
    """
    logger = logging.getLogger(__name__)
    # Heights of the header and footer bands that are ignored on each page
    header_height = 50
    footer_height = 80
    try:
        sections = FindUpdate.extract_sections(file, header_height, footer_height)
    except Exception as e:
        logger.error(f"Error processing template file: {str(e)}")
        raise
    # Log only the section keys at debug level rather than printing the
    # full document text to stdout.
    logger.debug("Template sections: %s", list(sections))
    return sections
def process_files_original(file: str) -> Dict[str, Dict]:
    """
    Process the original file and extract sections.

    Args:
        file (str): The path of the original file.
    Returns:
        Dict[str, Dict]: The sections returned by
        ``FindUpdate.extract_sections`` for the original file.
    Raises:
        Exception: Any error raised during extraction is logged and re-raised.
    """
    logger = logging.getLogger(__name__)
    # Heights of the header and footer bands that are ignored on each page
    header_height = 50
    footer_height = 80
    try:
        sections = FindUpdate.extract_sections(file, header_height, footer_height)
    except Exception as e:
        logger.error(f"Error processing original file: {str(e)}")
        raise
    # Log only the section keys at debug level rather than printing the
    # full document text to stdout.
    logger.debug("Original sections: %s", list(sections))
    return sections
def main_processing_function(file_path: str, template_path: str):
    """
    Processes the provided PDF file and its template to identify and report changes.

    Args:
        file_path (str): Path to the PDF file to be processed.
        template_path (str): Path to the corresponding template PDF file.
    Returns:
        dict: The report produced by ``FindUpdate.json_output``.
    Raises:
        Exception: Any failure in the pipeline is logged and re-raised as a
            generic Exception that carries the original error as its cause.
    """
    logger = logging.getLogger(__name__)
    try:
        # Extract the sections of the agreement and of its template
        result_agreement = FindUpdate.process_files_original(file_path)
        result_template = FindUpdate.process_files_template(template_path)
        # Compare the sections extracted from the original and template files
        _changes, actual, changes_ui = FindUpdate.process_comparisons(result_agreement, result_template)
        # Generate a JSON output summarizing the changes
        return FindUpdate.json_output(actual, changes_ui, file_path, template_path, result_template, result_agreement)
    except Exception as e:
        logger.error(f"Error processing files: {e}")
        # Chain explicitly so the original traceback stays attached.
        raise Exception(f"Error during file processing: {str(e)}") from e