import logging
import uuid
from typing import Any

from pydantic import BaseModel
from qdrant_client import AsyncQdrantClient, models

from mcp_server_qdrant.embeddings.base import EmbeddingProvider
from mcp_server_qdrant.settings import METADATA_PATH

logger = logging.getLogger(__name__)

Metadata = dict[str, Any]
ArbitraryFilter = dict[str, Any]


class Entry(BaseModel):
    """
    A single entry in the Qdrant collection.
    """

    # The text that is embedded and stored under the "document" payload key.
    content: str
    # Optional free-form metadata stored under the METADATA_PATH payload key.
    metadata: Metadata | None = None


# Name of the sparse vector that is written and queried when hybrid search is on.
SPARSE_VECTOR_NAME = "bm25"


class QdrantConnector:
    """
    Encapsulates the connection to a Qdrant server and all the methods to interact with it.

    :param qdrant_url: The URL of the Qdrant server.
    :param qdrant_api_key: The API key to use for the Qdrant server.
    :param collection_name: The name of the default collection to use. If not provided, each tool
                            will require the collection name to be provided.
    :param embedding_provider: The embedding provider to use.
    :param qdrant_local_path: The path to the storage directory for the Qdrant client, if local
                              mode is used.
    :param field_indexes: Optional mapping of payload field name to schema type; a payload index
                          is created for each entry when a collection is created.
    :param hybrid_search: Whether to enable hybrid search (dense + BM25 sparse vectors with RRF).
                          Only takes effect if the embedding provider reports sparse support.
    """

    def __init__(
        self,
        qdrant_url: str | None,
        qdrant_api_key: str | None,
        collection_name: str | None,
        embedding_provider: EmbeddingProvider,
        qdrant_local_path: str | None = None,
        field_indexes: dict[str, models.PayloadSchemaType] | None = None,
        hybrid_search: bool = False,
    ):
        self._qdrant_url = qdrant_url.rstrip("/") if qdrant_url else None
        self._qdrant_api_key = qdrant_api_key
        self._default_collection_name = collection_name
        self._embedding_provider = embedding_provider
        # Hybrid search is silently downgraded to dense-only when the provider
        # cannot produce sparse embeddings.
        self._hybrid_search = hybrid_search and embedding_provider.supports_sparse()
        # Hand the client the normalized URL (trailing slashes removed) rather
        # than the raw argument, so the normalization above is actually used.
        self._client = AsyncQdrantClient(
            location=self._qdrant_url,
            api_key=qdrant_api_key,
            path=qdrant_local_path,
        )
        self._field_indexes = field_indexes

        if self._hybrid_search:
            logger.info("Hybrid search enabled (dense + BM25 sparse vectors with RRF)")

    async def get_collection_names(self) -> list[str]:
        """
        Get the names of all collections in the Qdrant server.

        :return: A list of collection names.
        """
        response = await self._client.get_collections()
        return [collection.name for collection in response.collections]

    async def store(self, entry: Entry, *, collection_name: str | None = None) -> None:
        """
        Store some information in the Qdrant collection, along with the specified metadata.

        The collection is created first if it does not exist yet.

        :param entry: The entry to store in the Qdrant collection.
        :param collection_name: The name of the collection to store the information in, optional.
                                If not provided, the default collection is used.
        :raises AssertionError: If neither ``collection_name`` nor a default collection is set.
        """
        collection_name = collection_name or self._default_collection_name
        if collection_name is None:
            # Explicit raise instead of `assert` so the check survives `python -O`;
            # the exception type is kept so existing callers see no difference.
            raise AssertionError(
                "collection_name must be provided when no default collection is configured"
            )
        await self._ensure_collection_exists(collection_name)

        # Embed the document
        embeddings = await self._embedding_provider.embed_documents([entry.content])

        # Named-vector mapping; always holds the dense vector
        vector_name = self._embedding_provider.get_vector_name()
        vector_data: dict[str, Any] = {vector_name: embeddings[0]}

        # Add sparse vector if hybrid search is enabled
        if self._hybrid_search:
            sparse_embeddings = await self._embedding_provider.embed_documents_sparse(
                [entry.content]
            )
            sparse = sparse_embeddings[0]
            vector_data[SPARSE_VECTOR_NAME] = models.SparseVector(
                indices=sparse.indices, values=sparse.values
            )

        # Add to Qdrant
        payload = {"document": entry.content, METADATA_PATH: entry.metadata}
        await self._client.upsert(
            collection_name=collection_name,
            points=[
                models.PointStruct(
                    id=uuid.uuid4().hex,
                    vector=vector_data,
                    payload=payload,
                )
            ],
        )

    async def search(
        self,
        query: str,
        *,
        collection_name: str | None = None,
        limit: int = 10,
        query_filter: models.Filter | None = None,
    ) -> list[Entry]:
        """
        Find points in the Qdrant collection. If there are no entries found, an empty list is
        returned.

        An empty list is also returned when no collection name can be resolved or when the
        collection does not exist.

        :param query: The query to use for the search.
        :param collection_name: The name of the collection to search in, optional. If not
                                provided, the default collection is used.
        :param limit: The maximum number of entries to return.
        :param query_filter: The filter to apply to the query, if any.

        :return: A list of entries found.
        """
        collection_name = collection_name or self._default_collection_name
        if collection_name is None:
            # Nothing to search; do not send a request for a collection named None.
            return []

        if not await self._client.collection_exists(collection_name):
            return []

        query_vector = await self._embedding_provider.embed_query(query)
        vector_name = self._embedding_provider.get_vector_name()

        if self._hybrid_search:
            # Hybrid search: prefetch dense + sparse, fuse with RRF.
            # The filter is applied inside each prefetch branch.
            sparse_vector = await self._embedding_provider.embed_query_sparse(query)
            search_results = await self._client.query_points(
                collection_name=collection_name,
                prefetch=[
                    models.Prefetch(
                        query=query_vector,
                        using=vector_name,
                        limit=limit,
                        filter=query_filter,
                    ),
                    models.Prefetch(
                        query=models.SparseVector(
                            indices=sparse_vector.indices,
                            values=sparse_vector.values,
                        ),
                        using=SPARSE_VECTOR_NAME,
                        limit=limit,
                        filter=query_filter,
                    ),
                ],
                query=models.FusionQuery(fusion=models.Fusion.RRF),
                limit=limit,
            )
        else:
            # Dense-only search (original behavior)
            search_results = await self._client.query_points(
                collection_name=collection_name,
                query=query_vector,
                using=vector_name,
                limit=limit,
                query_filter=query_filter,
            )

        return [
            Entry(
                content=result.payload["document"],
                # Read metadata from the same payload key that store() writes to.
                metadata=result.payload.get(METADATA_PATH),
            )
            for result in search_results.points
        ]

    async def _ensure_collection_exists(self, collection_name: str) -> None:
        """
        Ensure that the collection exists, creating it if necessary.

        A newly created collection gets a payload index on the ``project`` field under
        METADATA_PATH, plus one index per configured ``field_indexes`` entry. An existing
        collection is left untouched.

        :param collection_name: The name of the collection to ensure exists.
        """
        if await self._client.collection_exists(collection_name):
            return

        # Create the collection with the appropriate vector size
        vector_size = self._embedding_provider.get_vector_size()

        # Use the vector name as defined in the embedding provider
        vector_name = self._embedding_provider.get_vector_name()

        # Sparse vectors config for hybrid search (BM25)
        sparse_config = None
        if self._hybrid_search:
            sparse_config = {
                SPARSE_VECTOR_NAME: models.SparseVectorParams(
                    modifier=models.Modifier.IDF,
                )
            }

        await self._client.create_collection(
            collection_name=collection_name,
            vectors_config={
                vector_name: models.VectorParams(
                    size=vector_size,
                    distance=models.Distance.COSINE,
                )
            },
            sparse_vectors_config=sparse_config,
        )

        # Always index the project field for efficient filtering; the path is
        # derived from METADATA_PATH so it matches the key store() writes under.
        await self._client.create_payload_index(
            collection_name=collection_name,
            field_name=f"{METADATA_PATH}.project",
            field_schema=models.PayloadSchemaType.KEYWORD,
        )

        # Create payload indexes if configured
        if self._field_indexes:
            for field_name, field_type in self._field_indexes.items():
                await self._client.create_payload_index(
                    collection_name=collection_name,
                    field_name=field_name,
                    field_schema=field_type,
                )