import logging
import re
from typing import List, Optional, Tuple, Dict, Any

from openai import OpenAI
from tqdm import tqdm
from transformers import GPT2Tokenizer
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from App_Function_Libraries.Tokenization_Methods_Lib import openai_tokenize
from App_Function_Libraries.Utils import load_comprehensive_config


def nltk_prep():
    """Download the NLTK 'punkt' sentence tokenizer data used by the chunkers below."""
    nltk.download('punkt')


# Load the GPT-2 tokenizer once at module level so every call can reuse it
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

# Load configuration and pull the OpenAI API key used by the summarization helpers
config = load_comprehensive_config()
openai_api_key = config.get('API', 'openai_api_key', fallback=None)


def load_document(file_path):
    """Read a text file and collapse all whitespace runs into single spaces."""
    with open(file_path, 'r') as file:
        text = file.read()
    return re.sub(r'\s+', ' ', text).strip()


def improved_chunking_process(text: str, chunk_options: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Chunk text with the configured method and attach positional metadata to each chunk."""
    chunk_method = chunk_options.get('method', 'words')
    max_chunk_size = chunk_options.get('max_size', 300)
    overlap = chunk_options.get('overlap', 0)
    language = chunk_options.get('language', 'english')
    adaptive = chunk_options.get('adaptive', False)
    multi_level = chunk_options.get('multi_level', False)

    if adaptive:
        max_chunk_size = adaptive_chunk_size(text, max_chunk_size)

    if multi_level:
        chunks = multi_level_chunking(text, chunk_method, max_chunk_size, overlap, language)
    else:
        if chunk_method == 'words':
            chunks = chunk_text_by_words(text, max_chunk_size, overlap)
        elif chunk_method == 'sentences':
            chunks = chunk_text_by_sentences(text, max_chunk_size, overlap, language)
        elif chunk_method == 'paragraphs':
            chunks = chunk_text_by_paragraphs(text, max_chunk_size, overlap)
        elif chunk_method == 'tokens':
            chunks = chunk_text_by_tokens(text, max_chunk_size, overlap)
        else:
            # Unknown method: fall back to returning the whole text as one chunk
            chunks = [text]

    return [{'text': chunk, 'metadata': get_chunk_metadata(chunk, text)} for chunk in chunks]


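# Example usage (a sketch; `document_text` stands in for any loaded string):
#   results = improved_chunking_process(document_text,
#                                       {'method': 'words', 'max_size': 100, 'overlap': 20})
#   results[0]['text']      # the first chunk of up to 100 words
#   results[0]['metadata']  # its start/end indices plus word and character counts

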
def adaptive_chunk_size(text: str, base_size: int) -> int:
    """Shrink the chunk size for texts with longer average words (denser prose)."""
    words = text.split()
    if not words:
        return base_size
    avg_word_length = sum(len(word) for word in words) / len(words)
    if avg_word_length > 6:
        # Longer words usually mean more tokens per word, so use smaller chunks
        return int(base_size * 0.8)
    return base_size


def multi_level_chunking(text: str, method: str, max_size: int, overlap: int, language: str) -> List[str]:
    """First split into large paragraph-level chunks, then re-chunk each with the chosen method."""
    paragraphs = chunk_text_by_paragraphs(text, max_size * 2, overlap)

    chunks = []
    for para in paragraphs:
        if method == 'words':
            chunks.extend(chunk_text_by_words(para, max_size, overlap))
        elif method == 'sentences':
            chunks.extend(chunk_text_by_sentences(para, max_size, overlap, language))
        else:
            chunks.append(para)

    return chunks


def chunk_text_by_words(text: str, max_words: int = 300, overlap: int = 0) -> List[str]:
    words = text.split()
    # Guard against a non-positive range step when overlap >= max_words
    step = max(1, max_words - overlap)
    chunks = []
    for i in range(0, len(words), step):
        chunks.append(' '.join(words[i:i + max_words]))
    return post_process_chunks(chunks)


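# Example (a sketch): with max_words=5 and overlap=2, consecutive chunks share
# their boundary words, so ideas split across a boundary appear in both chunks:
#   chunk_text_by_words("a b c d e f g h", max_words=5, overlap=2)
#   -> ['a b c d e', 'd e f g h', 'g h']

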
def chunk_text_by_sentences(text: str, max_sentences: int = 10, overlap: int = 0,
                            language: str = 'english') -> List[str]:
    nltk.download('punkt', quiet=True)
    sentences = nltk.sent_tokenize(text, language=language)
    step = max(1, max_sentences - overlap)
    chunks = []
    for i in range(0, len(sentences), step):
        chunks.append(' '.join(sentences[i:i + max_sentences]))
    return post_process_chunks(chunks)


def chunk_text_by_paragraphs(text: str, max_paragraphs: int = 5, overlap: int = 0) -> List[str]:
    # Paragraphs are separated by one or more blank lines
    paragraphs = re.split(r'\n\s*\n', text)
    step = max(1, max_paragraphs - overlap)
    chunks = []
    for i in range(0, len(paragraphs), step):
        chunks.append('\n\n'.join(paragraphs[i:i + max_paragraphs]))
    return post_process_chunks(chunks)


def chunk_text_by_tokens(text: str, max_tokens: int = 1000, overlap: int = 0) -> List[str]:
    """Chunk by an approximate token count, using a cheap chars//4 estimate per word."""
    words = text.split()
    chunks = []
    current_chunk = []
    current_token_count = 0

    for word in words:
        # Rough heuristic: ~4 characters per token, plus one token for the word itself
        word_token_count = len(word) // 4 + 1
        if current_token_count + word_token_count > max_tokens and current_chunk:
            chunks.append(' '.join(current_chunk))
            # Carry the last `overlap` words into the next chunk for context
            current_chunk = current_chunk[-overlap:] if overlap > 0 else []
            current_token_count = sum(len(w) // 4 + 1 for w in current_chunk)

        current_chunk.append(word)
        current_token_count += word_token_count

    if current_chunk:
        chunks.append(' '.join(current_chunk))

    return post_process_chunks(chunks)


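# Example (a sketch; `document_text` is a placeholder). The chars//4 heuristic is
# fast but approximate; for exact counts against GPT-2, len(tokenizer.encode(...))
# on the module-level tokenizer above could be used instead:
#   approx_chunks = chunk_text_by_tokens(document_text, max_tokens=512, overlap=32)

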
def post_process_chunks(chunks: List[str]) -> List[str]:
    """Strip surrounding whitespace and drop any empty chunks."""
    return [chunk.strip() for chunk in chunks if chunk.strip()]


def get_chunk_metadata(chunk: str, full_text: str) -> Dict[str, Any]:
    # find() returns -1 instead of raising when a chunk was rejoined with
    # different whitespace and no longer appears verbatim in the source text
    start_index = full_text.find(chunk)
    return {
        'start_index': start_index,
        'end_index': start_index + len(chunk) if start_index != -1 else -1,
        'word_count': len(chunk.split()),
        'char_count': len(chunk)
    }


def chunk_text_hybrid(text, max_tokens=1000):
    """Pack whole sentences into chunks whose GPT-2 token count stays under max_tokens."""
    sentences = nltk.tokenize.sent_tokenize(text)
    chunks = []
    current_chunk = []
    current_length = 0

    for sentence in sentences:
        tokens = tokenizer.encode(sentence)
        if current_length + len(tokens) <= max_tokens:
            current_chunk.append(sentence)
            current_length += len(tokens)
        else:
            # Flush the current chunk (if any) and start a new one with this sentence
            if current_chunk:
                chunks.append(' '.join(current_chunk))
            current_chunk = [sentence]
            current_length = len(tokens)

    if current_chunk:
        chunks.append(' '.join(current_chunk))

    return chunks


def chunk_on_delimiter(input_string: str,
                       max_tokens: int,
                       delimiter: str) -> List[str]:
    """Split on a delimiter, then recombine the pieces into chunks of at most max_tokens."""
    chunks = input_string.split(delimiter)
    combined_chunks, _, dropped_chunk_count = combine_chunks_with_no_minimum(
        chunks, max_tokens, chunk_delimiter=delimiter, add_ellipsis_for_overflow=True)
    if dropped_chunk_count > 0:
        logging.warning(f"{dropped_chunk_count} chunks were dropped due to exceeding the token limit.")
    # Re-attach the delimiter that split() removed from each combined chunk
    combined_chunks = [f"{chunk}{delimiter}" for chunk in combined_chunks]
    return combined_chunks


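# Example (a sketch; `document_text` is a placeholder): split on sentence-ending
# periods and pack the pieces into chunks of at most 256 tokens each:
#   pieces = chunk_on_delimiter(document_text, max_tokens=256, delimiter=".")

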
def recursive_summarize_chunks(chunks, summarize_func, custom_prompt):
    """Summarize chunks sequentially, feeding each new chunk together with the running summary."""
    summarized_chunks = []
    current_summary = ""

    for i, chunk in enumerate(chunks):
        if i == 0:
            current_summary = summarize_func(chunk, custom_prompt)
        else:
            # Combine the running summary with the next chunk so context carries forward
            combined_text = current_summary + "\n\n" + chunk
            current_summary = summarize_func(combined_text, custom_prompt)

        summarized_chunks.append(current_summary)

    return summarized_chunks


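# Example (a sketch; `fake_summarize` is a hypothetical stand-in for any summarizer
# with the expected (text, custom_prompt) signature):
#   def fake_summarize(text, custom_prompt):
#       return text[:100]  # hypothetical: truncation in place of a real model call
#   summaries = recursive_summarize_chunks(['chunk one.', 'chunk two.'], fake_summarize, None)

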
sample_text = """
|
|
Natural language processing (NLP) is a subfield of linguistics, computer science, and artificial intelligence
|
|
concerned with the interactions between computers and human language, in particular how to program computers
|
|
to process and analyze large amounts of natural language data. The result is a computer capable of "understanding"
|
|
the contents of documents, including the contextual nuances of the language within them. The technology can then
|
|
accurately extract information and insights contained in the documents as well as categorize and organize the documents themselves.
|
|
|
|
Challenges in natural language processing frequently involve speech recognition, natural language understanding,
|
|
and natural language generation.
|
|
|
|
Natural language processing has its roots in the 1950s. Already in 1950, Alan Turing published an article titled
|
|
"Computing Machinery and Intelligence" which proposed what is now called the Turing test as a criterion of intelligence.
|
|
"""
def count_units(text, unit='tokens'):
    """Count 'words' (whitespace-split), 'tokens' (NLTK word tokens), or 'characters'."""
    if unit == 'words':
        return len(text.split())
    elif unit == 'tokens':
        return len(word_tokenize(text))
    elif unit == 'characters':
        return len(text)
    else:
        raise ValueError("Invalid unit. Choose 'words', 'tokens', or 'characters'.")


def semantic_chunking(text, max_chunk_size=2000, unit='words'):
    """Chunk text at points of low TF-IDF cosine similarity between adjacent sentences."""
    nltk.download('punkt', quiet=True)
    sentences = sent_tokenize(text)
    if not sentences:
        return []
    vectorizer = TfidfVectorizer()
    sentence_vectors = vectorizer.fit_transform(sentences)

    chunks = []
    current_chunk = []
    current_size = 0

    for i, sentence in enumerate(sentences):
        sentence_size = count_units(sentence, unit)
        if current_size + sentence_size > max_chunk_size and current_chunk:
            chunks.append(' '.join(current_chunk))
            # Keep the last three sentences as overlap for the next chunk
            overlap_size = count_units(' '.join(current_chunk[-3:]), unit)
            current_chunk = current_chunk[-3:]
            current_size = overlap_size

        current_chunk.append(sentence)
        current_size += sentence_size

        # Also break early at a semantic boundary once the chunk is at least half full
        if i + 1 < len(sentences):
            current_vector = sentence_vectors[i]
            next_vector = sentence_vectors[i + 1]
            similarity = cosine_similarity(current_vector, next_vector)[0][0]
            if similarity < 0.5 and current_size >= max_chunk_size // 2:
                chunks.append(' '.join(current_chunk))
                overlap_size = count_units(' '.join(current_chunk[-3:]), unit)
                current_chunk = current_chunk[-3:]
                current_size = overlap_size

    if current_chunk:
        chunks.append(' '.join(current_chunk))

    return chunks


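# Example (a sketch): a small size budget makes the similarity check dominate, so
# chunks tend to end where adjacent sentences diverge in vocabulary (similarity < 0.5):
#   topical_chunks = semantic_chunking(sample_text, max_chunk_size=60, unit='words')

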
def semantic_chunk_long_file(file_path, max_chunk_size=1000, overlap=100):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()

        # semantic_chunking manages its own three-sentence overlap, so `overlap`
        # is currently unused here rather than being passed as the `unit` argument
        chunks = semantic_chunking(content, max_chunk_size)
        return chunks
    except Exception as e:
        logging.error(f"Error chunking text file: {str(e)}")
        return None


# OpenAI client used by the summarization helpers below; it is instantiated at
# module import, so a key must be available (here or via OPENAI_API_KEY) at load time
client = OpenAI(api_key=openai_api_key)


def get_chat_completion(messages, model='gpt-4-turbo'):
    """Send a chat completion request and return the assistant's reply text."""
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=0,
    )
    return response.choices[0].message.content


def combine_chunks_with_no_minimum(
        chunks: List[str],
        max_tokens: int,
        chunk_delimiter="\n\n",
        header: Optional[str] = None,
        add_ellipsis_for_overflow=False,
) -> Tuple[List[str], List[List[int]], int]:
    """Greedily pack chunks into combined chunks of at most max_tokens each.

    Returns the combined chunks, the source indices that went into each one,
    and the number of chunks dropped for exceeding the token limit on their own.
    """
    dropped_chunk_count = 0
    output = []
    output_indices = []
    candidate = [] if header is None else [header]
    candidate_indices = []
    for chunk_i, chunk in enumerate(chunks):
        chunk_with_header = [chunk] if header is None else [header, chunk]

        # A single chunk that exceeds the budget on its own is dropped entirely
        if len(openai_tokenize(chunk_delimiter.join(chunk_with_header))) > max_tokens:
            logging.warning("chunk overflow")
            if (
                add_ellipsis_for_overflow
                and len(openai_tokenize(chunk_delimiter.join(candidate + ["..."]))) <= max_tokens
            ):
                # Mark the omission in the output when there is room for an ellipsis
                candidate.append("...")
            dropped_chunk_count += 1
            continue

        extended_candidate_token_count = len(openai_tokenize(chunk_delimiter.join(candidate + [chunk])))

        if extended_candidate_token_count > max_tokens:
            # Adding this chunk would overflow: flush the candidate and start fresh
            output.append(chunk_delimiter.join(candidate))
            output_indices.append(candidate_indices)
            candidate = chunk_with_header
            candidate_indices = [chunk_i]
        else:
            candidate.append(chunk)
            candidate_indices.append(chunk_i)

    # Flush any remaining candidate (ignoring one that holds only the header)
    if (header is not None and len(candidate) > 1) or (header is None and len(candidate) > 0):
        output.append(chunk_delimiter.join(candidate))
        output_indices.append(candidate_indices)
    return output, output_indices, dropped_chunk_count


def rolling_summarize(text: str,
                      detail: float = 0,
                      model: str = 'gpt-4-turbo',
                      additional_instructions: Optional[str] = None,
                      minimum_chunk_size: Optional[int] = 500,
                      chunk_delimiter: str = ".",
                      summarize_recursively=False,
                      verbose=False):
    """
    Summarizes a given text by splitting it into chunks, each of which is summarized individually.
    The level of detail in the summary can be adjusted, and the process can optionally be made recursive.

    Parameters:
    - text (str): The text to be summarized.
    - detail (float, optional): A value between 0 and 1 indicating the desired level of detail in the
      summary. 0 leads to a higher-level summary, and 1 results in a more detailed summary. Defaults to 0.
    - model (str, optional): The chat model used to generate the summaries. Defaults to 'gpt-4-turbo'.
    - additional_instructions (Optional[str], optional): Additional instructions to provide to the
      model for customizing summaries.
    - minimum_chunk_size (Optional[int], optional): The minimum size for text chunks. Defaults to 500.
    - chunk_delimiter (str, optional): The delimiter used to split the text into chunks. Defaults to ".".
    - summarize_recursively (bool, optional): If True, summaries are generated recursively, using
      previous summaries for context.
    - verbose (bool, optional): If True, prints detailed information about the chunking process.

    Returns:
    - str: The final compiled summary of the text.

    The function first determines the number of chunks by interpolating between a minimum and a maximum
    chunk count based on the `detail` parameter. It then splits the text into chunks and summarizes each
    chunk. If `summarize_recursively` is True, each summary is based on the previous summaries, adding
    more context to the summarization process. The function returns a compiled summary of all chunks.
    """
    assert 0 <= detail <= 1

    # Interpolate the number of chunks between 1 and the maximum the text allows
    max_chunks = len(chunk_on_delimiter(text, minimum_chunk_size, chunk_delimiter))
    min_chunks = 1
    num_chunks = int(min_chunks + detail * (max_chunks - min_chunks))

    # Derive the chunk size from the document length, then split the text
    document_length = len(openai_tokenize(text))
    chunk_size = max(minimum_chunk_size, document_length // num_chunks)
    text_chunks = chunk_on_delimiter(text, chunk_size, chunk_delimiter)
    if verbose:
        print(f"Splitting the text into {len(text_chunks)} chunks to be summarized.")
        print(f"Chunk lengths are {[len(openai_tokenize(x)) for x in text_chunks]}")

    system_message_content = "Rewrite this text in summarized form."
    if additional_instructions is not None:
        system_message_content += f"\n\n{additional_instructions}"

    accumulated_summaries = []
    for chunk in tqdm(text_chunks):
        if summarize_recursively and accumulated_summaries:
            # Feed the previous summary alongside the new chunk for added context
            combined_text = accumulated_summaries[-1] + "\n\n" + chunk
            user_message_content = f"Previous summary and new content to summarize:\n\n{combined_text}"
        else:
            user_message_content = chunk

        messages = [
            {"role": "system", "content": system_message_content},
            {"role": "user", "content": user_message_content}
        ]

        response = get_chat_completion(messages, model=model)
        accumulated_summaries.append(response)

    final_summary = '\n\n'.join(accumulated_summaries)
    return final_summary
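

# A minimal smoke test of the offline chunkers (a sketch; assumes NLTK data can be
# downloaded on first run, and that the module-level OpenAI client above was able
# to initialize). rolling_summarize() is omitted because it calls the API.
if __name__ == '__main__':
    for result in improved_chunking_process(sample_text,
                                            {'method': 'sentences', 'max_size': 3, 'overlap': 1}):
        print(result['metadata'], result['text'][:60].replace('\n', ' '))

    for chunk in semantic_chunking(sample_text, max_chunk_size=60, unit='words'):
        print(len(chunk.split()), chunk[:60].replace('\n', ' '))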