"""Embedding utilities: optional llama-index availability checks, embedding
model loading, and parallel document ingestion helpers."""

import importlib.util
import os

from joblib import Parallel, delayed

from openhands.core.config import LLMConfig
from openhands.core.logger import openhands_logger as logger

try:
    if importlib.util.find_spec('chromadb') is None:
        raise ImportError(
            'chromadb is not available. Please install it using poetry install --with llama-index'
        )

    if (
        importlib.util.find_spec(
            'llama_index.core.indices.vector_store.retrievers.retriever'
        )
        is None
        or importlib.util.find_spec('llama_index.core.indices.vector_store.base')
        is None
    ):
        raise ImportError(
            'llama_index is not available. Please install it using poetry install --with llama-index'
        )

    from llama_index.core import Document, VectorStoreIndex
    from llama_index.core.base.embeddings.base import BaseEmbedding
    from llama_index.core.ingestion import IngestionPipeline
    from llama_index.core.schema import TextNode

    LLAMA_INDEX_AVAILABLE = True

except ImportError:
    LLAMA_INDEX_AVAILABLE = False


SUPPORTED_OLLAMA_EMBED_MODELS = [
    'llama2',
    'mxbai-embed-large',
    'nomic-embed-text',
    'all-minilm',
    'stable-code',
    'bge-m3',
    'bge-large',
    'paraphrase-multilingual',
    'snowflake-arctic-embed',
]


def check_llama_index():
    """Utility function to check the availability of llama_index.

    Raises:
        ImportError: If llama_index is not available.
    """
    if not LLAMA_INDEX_AVAILABLE:
        raise ImportError(
            'llama_index and its dependencies are not installed. '
            'To use memory features, please run: poetry install --with llama-index.'
        )

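
# Call check_llama_index() before using any of the llama_index-backed helpers
# below, e.g.:
#
#     check_llama_index()
#     embed_model = EmbeddingsLoader.get_embedding_model('openai', llm_config)
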
class EmbeddingsLoader:
    """Loader for embedding model initialization."""

    @staticmethod
    def get_embedding_model(
        strategy: str, llm_config: LLMConfig
    ) -> 'BaseEmbedding | None':
        """Initialize and return the appropriate embedding model based on the strategy.

        Parameters:
        - strategy: The embedding strategy to use.
        - llm_config: Configuration for the LLM.

        Returns:
        - An instance of the selected embedding model, or None if the strategy is 'none'.
        """
        if strategy in SUPPORTED_OLLAMA_EMBED_MODELS:
            from llama_index.embeddings.ollama import OllamaEmbedding

            return OllamaEmbedding(
                model_name=strategy,
                base_url=llm_config.embedding_base_url,
                ollama_additional_kwargs={'mirostat': 0},
            )
        elif strategy == 'openai':
            from llama_index.embeddings.openai import OpenAIEmbedding

            return OpenAIEmbedding(
                model='text-embedding-ada-002',
                api_key=llm_config.api_key.get_secret_value()
                if llm_config.api_key
                else None,
            )
        elif strategy == 'azureopenai':
            from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding

            return AzureOpenAIEmbedding(
                model='text-embedding-ada-002',
                deployment_name=llm_config.embedding_deployment_name,
                # Unwrap the SecretStr, as in the 'openai' branch above.
                api_key=llm_config.api_key.get_secret_value()
                if llm_config.api_key
                else None,
                azure_endpoint=llm_config.base_url,
                api_version=llm_config.api_version,
            )
        elif strategy == 'voyage':
            from llama_index.embeddings.voyageai import VoyageEmbedding

            return VoyageEmbedding(
                model_name='voyage-code-3',
            )
        elif (strategy is not None) and (strategy.lower() == 'none'):
            # 'none' explicitly disables embeddings.
            return None
        else:
            # Fall back to a local HuggingFace model.
            from llama_index.embeddings.huggingface import HuggingFaceEmbedding

            import torch

            # Pick the device before instantiating the model, so the choice
            # actually affects where the model is placed.
            if torch.cuda.is_available():
                device = 'cuda'
            elif torch.backends.mps.is_available() and torch.backends.mps.is_built():
                device = 'mps'
            else:
                device = 'cpu'
                # Hide accelerators from downstream code when falling back to CPU.
                os.environ['CUDA_VISIBLE_DEVICES'] = ''
                os.environ['PYTORCH_FORCE_CPU'] = '1'
                torch.cuda.is_available = lambda: False

            if device != 'mps' and hasattr(torch.backends, 'mps'):
                # These are callables; patch them with callables so later
                # checks like torch.backends.mps.is_built() keep working.
                torch.backends.mps.is_available = lambda: False
                torch.backends.mps.is_built = lambda: False

            logger.debug(f'Using device for embeddings: {device}')

            local_embed_model = HuggingFaceEmbedding(
                model_name='BAAI/bge-small-en-v1.5', device=device
            )

            return local_embed_model

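
# A minimal usage sketch (illustrative only; the strategy string and the
# LLMConfig constructor field shown are assumptions based on how this module
# reads the config):
#
#     config = LLMConfig(embedding_base_url='http://localhost:11434')
#     embed_model = EmbeddingsLoader.get_embedding_model('nomic-embed-text', config)
#     if embed_model is not None:
#         vector = embed_model.get_text_embedding('hello world')
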
def run_pipeline(
    embed_model: 'BaseEmbedding', documents: list['Document'], num_workers: int
) -> list['TextNode']:
    """Run an ingestion pipeline that embeds the given documents."""
    # The embedding model is the only transformation in the pipeline.
    pipeline = IngestionPipeline(
        transformations=[
            embed_model,
        ],
    )

    # Run the pipeline in parallel across num_workers workers.
    nodes = pipeline.run(
        documents=documents, show_progress=True, num_workers=num_workers
    )
    return nodes

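
# A minimal usage sketch (illustrative; assumes an embedding model obtained
# from EmbeddingsLoader and a couple of in-memory Documents):
#
#     docs = [Document(text='first doc'), Document(text='second doc')]
#     nodes = run_pipeline(embed_model, docs, num_workers=2)
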
def insert_batch_docs(
    index: 'VectorStoreIndex', documents: list['Document'], num_workers: int
) -> list['TextNode']:
    """Insert the documents into the index in parallel, using a thread pool."""
    results = Parallel(n_jobs=num_workers, backend='threading')(
        delayed(index.insert)(doc) for doc in documents
    )
    return results
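
# A minimal usage sketch (illustrative; assumes an index built with the same
# embedding model and that `docs` is a list of Documents):
#
#     index = VectorStoreIndex.from_documents([], embed_model=embed_model)
#     insert_batch_docs(index, docs, num_workers=4)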