Add metadata support and integration tests for QdrantConnector (#25)

This commit is contained in:
Kacper Łukawski
2025-03-10 17:06:26 +01:00
committed by GitHub
parent b9f773e99c
commit bd155b13d0
7 changed files with 223 additions and 27 deletions

View File

@@ -1,9 +1,24 @@
import logging
import uuid
from typing import Optional
from typing import Any, Dict, Optional
from pydantic import BaseModel
from qdrant_client import AsyncQdrantClient, models
from .embeddings.base import EmbeddingProvider
from mcp_server_qdrant.embeddings.base import EmbeddingProvider
logger = logging.getLogger(__name__)
Metadata = Dict[str, Any]
class Entry(BaseModel):
"""
A single entry in the Qdrant collection.
"""
content: str
metadata: Optional[Metadata] = None
class QdrantConnector:
@@ -53,30 +68,31 @@ class QdrantConnector:
},
)
async def store(self, information: str):
async def store(self, entry: Entry):
"""
Store some information in the Qdrant collection.
:param information: The information to store.
Store some information in the Qdrant collection, along with the specified metadata.
:param entry: The entry to store in the Qdrant collection.
"""
await self._ensure_collection_exists()
# Embed the document
embeddings = await self._embedding_provider.embed_documents([information])
embeddings = await self._embedding_provider.embed_documents([entry.content])
# Add to Qdrant
vector_name = self._embedding_provider.get_vector_name()
payload = {"document": entry.content, "metadata": entry.metadata}
await self._client.upsert(
collection_name=self._collection_name,
points=[
models.PointStruct(
id=uuid.uuid4().hex,
vector={vector_name: embeddings[0]},
payload={"document": information},
payload=payload,
)
],
)
async def search(self, query: str) -> list[str]:
async def search(self, query: str) -> list[Entry]:
"""
Find points in the Qdrant collection. If there are no entries found, an empty list is returned.
:param query: The query to use for the search.
@@ -97,4 +113,10 @@ class QdrantConnector:
limit=10,
)
return [result.payload["document"] for result in search_results]
return [
Entry(
content=result.payload["document"],
metadata=result.payload.get("metadata"),
)
for result in search_results
]