import os
import uuid
from pathlib import Path

from pinecone.grpc import PineconeGRPC as Pinecone
from pinecone import ServerlessSpec
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

current_dir = Path(__file__).resolve().parent

# Placeholders: provide real keys via environment variables / Space secrets
# rather than hard-coding them in the source.
os.environ['PINECONE_API_KEY'] = "<your-pinecone-api-key>"
os.environ['OPENAI_API_KEY'] = "<your-openai-api-key>"
class DataIndexer:

    source_file = os.path.join(current_dir, 'sources.txt')

    def __init__(self, index_name='langchain-repo') -> None:
        # self.embedding_client = InferenceClient(
        #     "dunzhang/stella_en_1.5B_v5",
        # )
        self.embedding_client = OpenAIEmbeddings()
        self.index_name = index_name
        self.pinecone_client = Pinecone(api_key=os.environ.get('PINECONE_API_KEY'))

        # Create the serverless index on first use. The dimension (1536)
        # matches OpenAI's default text-embedding-ada-002 embeddings.
        if index_name not in self.pinecone_client.list_indexes().names():
            self.pinecone_client.create_index(
                name=index_name,
                dimension=1536,
                metric='cosine',
                spec=ServerlessSpec(
                    cloud='aws',
                    region='us-east-1'
                )
            )

        self.index = self.pinecone_client.Index(self.index_name)
        self.source_index = self.get_source_index()
        # self.source_index = None
    def get_source_index(self):
        # Rebuild an in-memory Chroma index over the already-indexed source
        # paths; it is used to pre-filter results during hybrid search.
        if not os.path.isfile(self.source_file):
            print('No source file')
            return None

        print('create source index')

        with open(self.source_file, 'r') as file:
            sources = file.readlines()

        sources = [s.rstrip('\n') for s in sources]
        vectorstore = Chroma.from_texts(
            sources, embedding=self.embedding_client
        )
        return vectorstore
    def index_data(self, docs, batch_size=32):
        # Record every indexed source path so get_source_index() can rebuild
        # the local Chroma index on the next start-up.
        with open(self.source_file, 'a') as file:
            for doc in docs:
                file.write(doc.metadata['source'] + '\n')

        # Embed and upsert the documents in batches.
        for i in range(0, len(docs), batch_size):
            batch = docs[i: i + batch_size]

            values = self.embedding_client.embed_documents([
                doc.page_content for doc in batch
            ])
            # values = self.embedding_client.feature_extraction([
            #     doc.page_content for doc in batch
            # ])

            vector_ids = [str(uuid.uuid4()) for _ in batch]

            # Store the chunk text alongside the document metadata so search()
            # can return it directly from the Pinecone match.
            metadatas = [{
                'text': doc.page_content,
                **doc.metadata
            } for doc in batch]

            vectors = [{
                'id': vector_id,
                'values': value,
                'metadata': metadata
            } for vector_id, value, metadata in zip(vector_ids, values, metadatas)]

            try:
                upsert_response = self.index.upsert(vectors=vectors)
                print(upsert_response)
            except Exception as e:
                print(e)
    def search(self, text_query, top_k=5, hybrid_search=False):
        print('text query:', text_query)

        filter = None
        if hybrid_search and self.source_index:
            # First retrieve the most relevant source file paths from the
            # local Chroma index, then restrict the Pinecone query to them.
            source_docs = self.source_index.similarity_search(text_query, 50)
            print("source_docs", source_docs)
            filter = {"source": {"$in": [doc.page_content for doc in source_docs]}}

        # vector = self.embedding_client.feature_extraction(text_query)
        vector = self.embedding_client.embed_query(text_query)

        result = self.index.query(
            vector=vector,
            top_k=top_k,
            include_metadata=True,
            filter=filter
        )

        # Return only the chunk text stored in each match's metadata.
        docs = []
        for res in result["matches"]:
            metadata = res["metadata"]
            if 'text' in metadata:
                text = metadata.pop('text')
                docs.append(text)
        return docs
if __name__ == '__main__':
    from langchain_community.document_loaders import GitLoader
    from langchain_text_splitters import (
        Language,
        RecursiveCharacterTextSplitter,
    )

    # Clone the LangChain repository and load its files as documents.
    loader = GitLoader(
        clone_url="https://github.com/langchain-ai/langchain",
        repo_path="./code_data/langchain_repo/",
        branch="master",
    )

    python_splitter = RecursiveCharacterTextSplitter.from_language(
        language=Language.PYTHON, chunk_size=10000, chunk_overlap=100
    )

    # Keep only Python and Markdown files of reasonable size, then split them
    # into chunks.
    docs = loader.load()
    docs = [doc for doc in docs if doc.metadata['file_type'] in ['.py', '.md']]
    docs = [doc for doc in docs if len(doc.page_content) < 50000]
    docs = python_splitter.split_documents(docs)

    # Prefix each chunk with its source path so the file name is part of the
    # embedded text.
    for doc in docs:
        doc.page_content = '# {}\n\n'.format(doc.metadata['source']) + doc.page_content

    indexer = DataIndexer()

    with open('/app/sources.txt', 'a') as file:
        for doc in docs:
            file.write(doc.metadata['source'] + '\n')
        print('DONE')

    indexer.index_data(docs)
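
    # Illustrative usage sketch: once the repository has been indexed, the
    # same class can answer queries (the question string below is only a
    # placeholder, not part of the indexing run above).
    #
    #     indexer = DataIndexer()
    #     results = indexer.search(
    #         "How do I create a custom retriever?",
    #         top_k=5,
    #         hybrid_search=True,
    #     )
    #     for text in results:
    #         print(text)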