File size: 9,652 Bytes
d26280a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
"""
This module is responsible for processing the corpus and feeding it into chromaDB. It will receive a corpus of text. 
It will then split it into chunks of specified length. For each of those chunks, it will append surrounding context.
It will only include full words.
"""

import re
import bisect

import extensions.superboogav2.parameters as parameters

from .data_preprocessor import TextPreprocessorBuilder, TextSummarizer
from .chromadb import ChromaCollector

def preprocess_text_no_summary(text) -> str:
    """
    Run the preprocessing steps enabled in `parameters` over `text`, without summarizing.

    Each step is applied through a TextPreprocessorBuilder, in this fixed order:
    lower-casing, punctuation removal, part-of-speech removal, stopword removal,
    lemmatization, space merging, stripping and finally number conversion.

    Args:
    text: The text to preprocess.

    Returns:
    The result of `builder.build()` after the enabled steps were applied.
    """
    builder = TextPreprocessorBuilder(text)
    if parameters.should_to_lower():
        builder.to_lower()

    if parameters.should_remove_punctuation():
        builder.remove_punctuation()

    if parameters.should_remove_specific_pos():
        builder.remove_specific_pos()

    if parameters.should_remove_stopwords():
        # The method has to be called; a bare attribute reference is a no-op.
        builder.remove_stopwords()

    if parameters.should_lemmatize():
        builder.lemmatize()

    if parameters.should_merge_spaces():
        # Same as above: call the method instead of merely referencing it.
        builder.merge_spaces()

    if parameters.should_strip():
        builder.strip()

    # Read the strategy once; a falsy value means "no number conversion".
    num_strategy = parameters.get_num_conversion_strategy()
    if num_strategy:
        if num_strategy == parameters.NUM_TO_WORD_METHOD:
            builder.num_to_word(parameters.get_min_num_length())
        elif num_strategy == parameters.NUM_TO_CHAR_METHOD:
            builder.num_to_char(parameters.get_min_num_length())
        elif num_strategy == parameters.NUM_TO_CHAR_LONG_METHOD:
            builder.num_to_char_long(parameters.get_min_num_length())

    return builder.build()


def preprocess_text(text) -> list[str]:
    """
    Summarize `text` and preprocess every sentence the summarizer keeps.

    The sentences come from `TextSummarizer.process_long_text`, called with the
    minimum sentence count configured in `parameters`; each one is then passed
    through `preprocess_text_no_summary`.

    Returns:
    A list with one preprocessed string per kept sentence, in the summarizer's order.
    """
    min_sentences = parameters.get_min_num_sentences()
    kept_sentences = TextSummarizer.process_long_text(text, min_sentences)
    return list(map(preprocess_text_no_summary, kept_sentences))


def _create_chunks_with_context(corpus, chunk_len, context_left, context_right):
    """
    This function takes a corpus of text and splits it into chunks of a specified length, 
    then adds a specified amount of context to each chunk. The context is added by first 
    going backwards from the start of the chunk and then going forwards from the end of the 
    chunk, ensuring that the context includes only whole words and that the total context length 
    does not exceed the specified limit. This function uses binary search for efficiency.

    Returns:
    chunks (list of str): The chunks of text.
    chunks_with_context (list of str): The chunks of text with added context.
    chunk_with_context_start_indices (list of int): The starting indices of each chunk with context in the corpus.
    """
    words = re.split('(\\s+)', corpus)
    word_start_indices = [0]
    current_index = 0

    for word in words:
        current_index += len(word)
        word_start_indices.append(current_index)

    chunks, chunk_lengths, chunk_start_indices, chunk_with_context_start_indices = [], [], [], []
    current_length = 0
    current_index = 0
    chunk = []

    for word in words:
        if current_length + len(word) > chunk_len:
            chunks.append(''.join(chunk))
            chunk_lengths.append(current_length)
            chunk_start_indices.append(current_index - current_length)
            chunk = [word]
            current_length = len(word)
        else:
            chunk.append(word)
            current_length += len(word)
        current_index += len(word)

    if chunk:
        chunks.append(''.join(chunk))
        chunk_lengths.append(current_length)
        chunk_start_indices.append(current_index - current_length)

    chunks_with_context = []
    for start_index, chunk_length in zip(chunk_start_indices, chunk_lengths):
        context_start_index = bisect.bisect_right(word_start_indices, start_index - context_left)
        context_end_index = bisect.bisect_left(word_start_indices, start_index + chunk_length + context_right)

        # Combine all the words in the context range (before, chunk, and after)
        chunk_with_context = ''.join(words[context_start_index:context_end_index])
        chunks_with_context.append(chunk_with_context)
        
        # Determine the start index of the chunk with context
        chunk_with_context_start_index = word_start_indices[context_start_index]
        chunk_with_context_start_indices.append(chunk_with_context_start_index)

    return chunks, chunks_with_context, chunk_with_context_start_indices


def _clear_chunks(data_chunks, data_chunks_with_context, data_chunk_starting_indices):
    distinct_data_chunks = []
    distinct_data_chunks_with_context = []
    distinct_data_chunk_starting_indices = []

    seen_chunks = dict()

    for chunk, context, index in zip(data_chunks, data_chunks_with_context, data_chunk_starting_indices):
        # Skip the chunk if it does not contain any alphanumeric characters
        if not any(char.isalnum() for char in chunk):
            continue

        seen_chunk_start = seen_chunks.get(chunk)
        if seen_chunk_start:
            # If we've already seen this exact chunk, and the context around it it very close to the seen chunk, then skip it.
            if abs(seen_chunk_start-index) < parameters.get_delta_start():
                continue
        
        distinct_data_chunks.append(chunk)
        distinct_data_chunks_with_context.append(context)
        distinct_data_chunk_starting_indices.append(index)

        seen_chunks[chunk] = index

    return distinct_data_chunks, distinct_data_chunks_with_context, distinct_data_chunk_starting_indices


def _iter_sections(corpus, separator):
    """Yield (offset, section) pairs: the corpus split on `separator`, or the whole corpus at offset 0 when `separator` is empty."""
    if not separator:
        yield 0, corpus
        return

    offset = 0
    for section in corpus.split(separator):
        yield offset, section
        # The next section starts after this one plus the separator that followed it.
        offset += len(section) + len(separator)


def process_and_add_to_collector(corpus: str, collector: ChromaCollector, clear_collector_before_adding: bool, metadata: dict):
    """
    Chunk the corpus according to `parameters` and add the chunks to the collector.

    Chunks come from two sources, both evaluated per section when a chunk separator is
    configured and over the whole corpus otherwise:
    1. every match of the configured chunk regex (if any), with up to `context_left` /
       `context_right` raw characters of context taken from the same section;
    2. for every configured chunk length, the whole-word chunks produced by
       `_create_chunks_with_context`.
    The chunk texts are then preprocessed with `preprocess_text_no_summary`, filtered with
    `_clear_chunks`, and passed to `collector.add` together with their contexts and
    starting indices. Starting indices are offsets into the full corpus.

    Args:
    corpus: The text to process.
    collector: The collector receiving the chunks; `collector.clear()` is called first
        when `clear_collector_before_adding` is true.
    clear_collector_before_adding: Whether to clear the collector before adding.
    metadata: Metadata attached to every chunk, or None to pass no metadata.

    Raises:
    ValueError: If the configured context length has more than two comma-separated values,
        or if a chunk/context length value is not an integer.
    """
    # Defining variables
    chunk_lens = [int(value.strip()) for value in parameters.get_chunk_len().split(',')]
    context_len = [int(value.strip()) for value in parameters.get_context_len().split(',')]
    if len(context_len) >= 3:
        # Raising a plain string is itself a TypeError; raise a real exception instead.
        raise ValueError(f"Context len has too many values: {len(context_len)}")
    if len(context_len) == 2:
        context_left, context_right = context_len
    else:
        context_left = context_right = context_len[0]

    # Read once; these are assumed to stay constant for the duration of the call.
    chunk_separator = parameters.get_chunk_separator()
    chunk_regex = parameters.get_chunk_regex()

    data_chunks = []
    data_chunks_with_context = []
    data_chunk_starting_indices = []

    # Handling chunk_regex
    if chunk_regex:
        for offset, section in _iter_sections(corpus, chunk_separator):
            for match in re.finditer(chunk_regex, section):
                chunk = match.group(0)
                start_index = match.start()
                end_index = start_index + len(chunk)
                # Context is a raw character window, clamped to the section bounds.
                context_start = max(0, start_index - context_left)
                context_end = min(len(section), end_index + context_right)
                data_chunks.append(chunk)
                data_chunks_with_context.append(section[context_start:context_end])
                data_chunk_starting_indices.append(offset + context_start)

    # Breaking the data into chunks and adding those to the db
    for chunk_len in chunk_lens:
        for offset, section in _iter_sections(corpus, chunk_separator):
            chunks, chunks_with_context, context_start_indices = _create_chunks_with_context(
                section, chunk_len, context_left, context_right
            )
            data_chunks.extend(chunks)
            data_chunks_with_context.extend(chunks_with_context)
            # Shift section-relative indices so they refer to the full corpus.
            data_chunk_starting_indices.extend(offset + i for i in context_start_indices)

    data_chunks = [preprocess_text_no_summary(chunk) for chunk in data_chunks]

    data_chunks, data_chunks_with_context, data_chunk_starting_indices = _clear_chunks(
        data_chunks, data_chunks_with_context, data_chunk_starting_indices
    )

    if clear_collector_before_adding:
        collector.clear()
    collector.add(data_chunks, data_chunks_with_context, data_chunk_starting_indices, [metadata]*len(data_chunks) if metadata is not None else None)