A Python library for creating and using multi-vector tables in DataStax Astra DB, supporting client-side and server-side embedding generation with both synchronous and asynchronous operations.
- Overview
- Installation
- Quick Start
- Async Usage
- Multiple Vector Columns
- Schema Design
- Gutenberg Example
- Late Interaction
- API Reference
- Contributing
- License
AstraMultiVector provides classes to:
- Create database tables with multiple vector columns
- Associate each vector column with either:
  - Client-side embeddings using sentence-transformers
  - Server-side embeddings using Astra's Vectorize feature
- Search across any vector column using similarity search
- Support both synchronous and asynchronous operations
- Implement token-level late interaction models for advanced retrieval
This allows for storing and retrieving text data with multiple embedding representations, which is useful for:
- Multilingual document search
- Comparing different embedding models
- Specialized embeddings for different query types
- Token-level late interaction for higher precision retrieval
- Multimodal search with text and images
# Install from PyPI
pip install astra-multivector
# Or install from source
git clone https://github.com/datastax/astra-multivector.git
cd astra-multivector
pip install -e .
- Python 3.8 or higher
- Dependencies:
  - astrapy>=2.0.0
  - pydantic>=2.10.6
  - python-dotenv>=1.0.1
  - sentence-transformers>=3.4.1
  - rerankers[api,transformers]>=0.8.0
  - tqdm>=4.67.1
Optional dependencies for late interaction models:
- colbert-ai>=0.2.0
- colpali-engine>=0.3.1,<0.4.0
- torch>=2.0.0
- transformers>=4.38.2
- scikit-learn>=1.3.0
- numpy>=1.24.0
from astrapy import DataAPIClient
from astra_multivector import AstraMultiVectorTable, VectorColumnOptions
from sentence_transformers import SentenceTransformer
# Create database connection
db = DataAPIClient(token="your-token").get_database(api_endpoint="your-api-endpoint")
# Create embedding models and vector options
english_model = SentenceTransformer("BAAI/bge-small-en-v1.5")
english_options = VectorColumnOptions.from_sentence_transformer(english_model)
# Create the table
table = AstraMultiVectorTable(
    db=db,
    table_name="my_vectors",
    vector_column_options=[english_options]
)
# Insert data
table.insert_chunk("This is a sample text to embed and store.")
# Search
results = table.multi_vector_similarity_search(
    query_text="sample text",
    candidates_per_column=5
)

for result in results:
    print(result["content"])
import asyncio

from astrapy import DataAPIClient
from astra_multivector import AsyncAstraMultiVectorTable, VectorColumnOptions
async def main():
    # Create async database connection
    async_db = DataAPIClient(
        token="your-token",
    ).get_async_database(
        api_endpoint="your-api-endpoint",
    )

    # Create the table with the same vector options as the Quick Start
    async_table = AsyncAstraMultiVectorTable(
        db=async_db,
        table_name="my_vectors",
        vector_column_options=[english_options],  # defined in the Quick Start above
        default_concurrency_limit=10
    )

    # Batch insert with concurrency control
    await async_table.bulk_insert_chunks(
        text_chunks=["Text 1", "Text 2", "Text 3"],
        max_concurrency=5
    )

    # Batch search
    queries = ["first query", "second query", "third query"]
    all_results = await async_table.batch_search_by_text(queries)

# Run the async code
asyncio.run(main())
You can create tables with multiple vector columns, each using a different model or vectorization approach:
from astrapy.constants import VectorMetric
from astrapy.info import TableVectorIndexOptions, VectorServiceOptions
# Client-side embedding with a Spanish model
spanish_model = SentenceTransformer("jinaai/jina-embeddings-v2-base-es")
spanish_options = VectorColumnOptions.from_sentence_transformer(
    model=spanish_model,
    table_vector_index_options=TableVectorIndexOptions(
        metric=VectorMetric.COSINE,
    )
)
# Server-side embedding with OpenAI
openai_options = VectorColumnOptions.from_vectorize(
    column_name="openai_embeddings",
    dimension=1536,
    vector_service_options=VectorServiceOptions(
        provider="openai",
        model_name="text-embedding-3-small",
        authentication={
            "providerKey": "OPENAI_API_KEY_ASTRA_KMS_NAME",
        },
    ),
    table_vector_index_options=TableVectorIndexOptions(
        metric=VectorMetric.COSINE,
    )
)
# Create multi-vector table
table = AstraMultiVectorTable(
    db=db,
    table_name="multilingual_vectors",
    vector_column_options=[spanish_options, openai_options]
)
The multi-vector architecture stores multiple vector representations of the same content in separate columns of a single table:
┌─────────────────────────────────────────────────────────────────────────────────┐
│                                Table: my_vectors                                │
├────────────┬─────────────────────┬────────────────────┬─────────────────────────┤
│ chunk_id   │ content             │ english_embeddings │ multilingual_embeddings │
├────────────┼─────────────────────┼────────────────────┼─────────────────────────┤
│ UUID-1     │ "Hello world"       │ [0.1, 0.2, ...]    │ [0.3, 0.4, ...]         │
│ UUID-2     │ "Vector search"     │ [0.5, 0.6, ...]    │ [0.7, 0.8, ...]         │
└────────────┴─────────────────────┴────────────────────┴─────────────────────────┘
      │               │                      │                       │
      │               │                      ▼                       ▼
      │               │              ┌──────────────┐       ┌─────────────────┐
      │               │              │ Vector Index │       │  Vector Index   │
      │               │              │  (english)   │       │ (multilingual)  │
      │               │              └──────────────┘       └─────────────────┘
      │               │
      │               ▼
      │        Used directly for
      │        Vectorize columns
      │
      ▼
 Partition Key
This design allows for:
- Multiple embedding representations of the same content
- Choice of embedding model at query time (see the sketch below)
- Combination of results from different embeddings
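For example, with the multilingual table above, the embedding model can be chosen per query by naming its column (see `multi_vector_similarity_search` in the API Reference below); a minimal sketch:

# Search only the server-side OpenAI column created above;
# omit column_name to search every vector column and merge candidates.
results = table.multi_vector_similarity_search(
    query_text="búsqueda de vectores",
    column_name="openai_embeddings",
    candidates_per_column=10,
    k=5,
)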
The late interaction architecture splits documents into token-level embeddings across multiple tables:
┌───────────────────────────────────────┐   ┌──────────────────────────────────────────┐
│        Table: my_colpali_docs         │   │         Table: my_colpali_tokens         │
├──────────┬────────────────┬───────────┤   ├──────────┬──────────┬────────────────────┤
│ doc_id   │ content        │ pg no.    │   │ doc_id   │ token_id │ token_embedding    │
├──────────┼────────────────┼───────────┤   ├──────────┼──────────┼────────────────────┤
│ UUID-1   │ <<reference>>  │ 1         │   │ UUID-1   │ UUID-1.1 │ [0.1, 0.2, ...]    │
│ UUID-2   │ <<reference>>  │ 2         │   │ UUID-1   │ UUID-1.2 │ [0.3, 0.4, ...]    │
└──────────┴────────────────┴───────────┘   │ UUID-1   │ UUID-1.3 │ [0.5, 0.6, ...]    │
     │                                      │ UUID-2   │ UUID-2.1 │ [0.7, 0.8, ...]    │
     │                                      │ UUID-2   │ UUID-2.2 │ [0.9, 1.0, ...]    │
     │                                      └──────────┴──────────┴────────────────────┘
     │                                                          │
     │                                                          ▼
     │                                                  ┌──────────────┐
     │                                                  │ Vector Index │
     │                                                  └──────────────┘
     ▼
 Partition Key
This architecture allows for:
- Token-level similarity matching between queries and documents
- Higher precision retrieval with late interaction models like ColBERT (MaxSim scoring, sketched below)
- Multimodal matching between text and images with models like ColPali
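To make the scoring concrete, here is a minimal NumPy sketch of the MaxSim operation used by ColBERT-style late interaction (illustrative, not the library's internal code): each query token keeps only its best-matching document token, and those per-token maxima are summed.

import numpy as np

def maxsim_score(query_tokens: np.ndarray, doc_tokens: np.ndarray) -> float:
    """Late interaction score: for each query token, take the maximum
    similarity against any document token, then sum over query tokens.
    Embeddings are assumed L2-normalized, so dot product = cosine."""
    sims = query_tokens @ doc_tokens.T       # (n_query, n_doc) similarity matrix
    return float(sims.max(axis=1).sum())     # best document token per query token

# Toy example: 2 query tokens vs. 3 document tokens, 4-dim embeddings
rng = np.random.default_rng(0)
q = rng.normal(size=(2, 4)); q /= np.linalg.norm(q, axis=1, keepdims=True)
d = rng.normal(size=(3, 4)); d /= np.linalg.norm(d, axis=1, keepdims=True)
print(maxsim_score(q, d))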
Key implementation details:
- Both `doc_id` and `token_id` are UUID types for maximum compatibility
- `doc_id` is used as the partition key to efficiently retrieve all tokens for a document
- The token table uses a vector index on `token_embedding` for similarity search (schema sketched below)
- Documents are stored with their original content or a reference for retrieval and verification
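As an illustration of how such a token table could be declared with astrapy's table API (a sketch of the schema described above, not the pipeline's actual internal code; table/index names and the 128-dim embedding size are assumptions):

from astrapy.constants import SortMode, VectorMetric
from astrapy.info import ColumnType, CreateTableDefinition, TableVectorIndexOptions

# Illustrative only: the pipeline creates its own tables in initialize()
token_table_definition = (
    CreateTableDefinition.builder()
    .add_column("doc_id", ColumnType.UUID)
    .add_column("token_id", ColumnType.UUID)
    .add_vector_column("token_embedding", dimension=128)
    .add_partition_by(["doc_id"])                        # all tokens of a doc in one partition
    .add_partition_sort({"token_id": SortMode.ASCENDING})
    .build()
)

token_table = db.create_table("my_colbert_index_tokens", definition=token_table_definition)
token_table.create_vector_index(
    "token_embedding_idx",
    column="token_embedding",
    options=TableVectorIndexOptions(metric=VectorMetric.COSINE),
)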
The repository includes a complete example for ingesting and searching books from Project Gutenberg using multiple vector models. This example demonstrates:
- Setting up multiple embedding models:
  - Language-specific models (English, Spanish)
  - OpenAI embeddings via Vectorize
- Processing books in parallel with async operations:
  - Concurrent book downloads
  - Batch processing with configurable concurrency
- Performing searches across different vector columns:
  - Language-specific searches
  - Parallel batch searching
To run the example:
# See examples/gutenberg_example.py
import asyncio
import os
from dotenv import load_dotenv
from astra_multivector import VectorColumnOptions, AsyncAstraMultiVectorTable
from astra_multivector.ingest import download_and_ingest_multiple_books
# Load environment variables
load_dotenv()
# Run the example (main() is defined in examples/gutenberg_example.py)
asyncio.run(main())
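What `main()` does is defined in the example script itself; a stripped-down sketch using only the APIs shown earlier in this README (the table name and environment-variable names are assumptions; the real script also wires up `download_and_ingest_multiple_books`):

from astrapy import DataAPIClient
from sentence_transformers import SentenceTransformer

async def main():
    # Credentials come from the environment variables loaded above
    # (names assumed; match whatever your .env defines)
    db = DataAPIClient(
        token=os.environ["ASTRA_DB_APPLICATION_TOKEN"],
    ).get_async_database(
        api_endpoint=os.environ["ASTRA_DB_API_ENDPOINT"],
    )

    english_model = SentenceTransformer("BAAI/bge-small-en-v1.5")
    english_options = VectorColumnOptions.from_sentence_transformer(english_model)

    table = AsyncAstraMultiVectorTable(
        db=db,
        table_name="gutenberg_books",  # hypothetical table name
        vector_column_options=[english_options],
    )

    # The real script feeds chunks from downloaded Gutenberg books here
    await table.bulk_insert_chunks(
        text_chunks=["Call me Ishmael...", "It was the best of times..."],
        max_concurrency=5,
    )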
The library includes an optional sub-module for late interaction retrieval, which defers matching between query and document tokens until retrieval time, providing higher accuracy than traditional dense retrieval methods.
import uuid

from astrapy import DataAPIClient
from astra_multivector.late_interaction import LateInteractionPipeline, ColBERTModel
# Initialize database and model
db = DataAPIClient(token="your-token").get_async_database(api_endpoint="your-api-endpoint")
model = ColBERTModel(model_name="answerdotai/answerai-colbert-small-v1")
# Create pipeline with optimization options
pipeline = LateInteractionPipeline(
    db=db,
    model=model,
    base_table_name="my_colbert_index",
    doc_pool_factor=2,               # Compress document tokens by this factor
    query_pool_distance=0.03,        # Pool similar query tokens
    default_concurrency_limit=10,    # Control parallel operations
)
# Initialize tables
await pipeline.initialize()
# Index documents with dictionary format
doc_row = {
    "content": "This is a sample document for testing late interaction retrieval.",
    "doc_id": uuid.uuid4()  # Optional: auto-generated if not provided
}
doc_id = await pipeline.index_document(doc_row)
# Batch indexing with concurrency control
docs = [
    {"content": "Document one for batch indexing"},
    {"content": "Document two for batch indexing"},
    {"content": "Document three for batch indexing"}
]
doc_ids = await pipeline.bulk_index_documents(
    document_rows=docs,
    concurrency=5,
    batch_size=2
)
# Search with auto-scaled parameters
results = await pipeline.search(
    query="sample retrieval",
    k=5,                     # Number of results to return
    # Optional parameters, auto-calculated if not provided:
    n_ann_tokens=200,        # Tokens to retrieve per query token
    n_maxsim_candidates=20   # Document candidates for scoring
)
# Process search results
for doc_id, score, content in results:
    print(f"Document: {doc_id}, Score: {score:.4f}")
    print(f"Content: {content}")
ColBERT is a text-to-text late interaction model that provides high-precision search:
from astra_multivector.late_interaction import LateInteractionPipeline, ColBERTModel
# Initialize model with specific checkpoint
model = ColBERTModel(
    model_name="answerdotai/answerai-colbert-small-v1",
    device="cuda"  # or "cpu" for machines without GPUs
)
# Create pipeline
pipeline = LateInteractionPipeline(
    db=db,
    model=model,
    base_table_name="my_colbert_index"
)
# Search
results = await pipeline.search(
    query="detailed search query",
    k=10
)
For multimodal search supporting images and text:
import uuid

from PIL import Image
from astra_multivector.late_interaction import LateInteractionPipeline, ColPaliModel
# Initialize model
model = ColPaliModel(model_name="vidore/colpali-v0.1")
# Create pipeline
pipeline = LateInteractionPipeline(
    db=db,
    model=model,
    base_table_name="my_colpali_index"
)
# Index an image
image = Image.open("example.jpg")
doc_id = await pipeline.index_document({
    "content": image,  # Directly pass PIL Image
    "doc_id": uuid.uuid4()
})
# Search for images using text query
results = await pipeline.search(
    query="a cat sitting on a chair",
    k=5
)
# Search with image query requires preprocessing the image first
query_image = Image.open("query.jpg")
query_embeddings = await model.encode_query(query_image)
results = await pipeline.search_with_embeddings(
    query_embeddings,
    k=5
)
ColPali now supports direct image indexing, allowing you to pass PIL Image objects as document content. The pipeline automatically handles:
- Image preprocessing and tokenization
- Token-level embedding generation
- Proper storage with content type identification
- Retrieval with either text or image queries
The late interaction pipeline includes several optimizations to balance retrieval quality with computational efficiency:
- Token Pooling:
  - Query Pooling: Reduces query token count by merging similar tokens (controlled by `query_pool_distance`; sketched below)
  - Document Pooling: Hierarchically pools document tokens to reduce index size (controlled by `doc_pool_factor`)
- Adaptive Parameter Scaling:
  - Automatically scales search parameters based on result count
  - Default values adapt to different `k` values without manual tuning
- Concurrency Controls:
  - Document-level parallelism for batch operations
  - Token-level parallelism for efficient indexing
  - Semaphore controls to prevent resource exhaustion
- Caching:
  - LRU cache for frequently accessed document embeddings
  - Configurable cache size to balance memory usage and performance
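As a rough picture of the query-pooling step referenced above, think of agglomerative clustering over the query's token embeddings: tokens within `query_pool_distance` in cosine distance collapse into their mean. A scikit-learn sketch of the idea, not the pipeline's actual implementation:

import numpy as np
from sklearn.cluster import AgglomerativeClustering

def pool_query_tokens(tokens: np.ndarray, max_distance: float = 0.03) -> np.ndarray:
    """Merge query tokens whose cosine distance is below max_distance,
    replacing each cluster with its mean embedding."""
    if len(tokens) < 2:
        return tokens
    clustering = AgglomerativeClustering(
        n_clusters=None,                  # cluster count driven by the threshold
        distance_threshold=max_distance,  # mirrors query_pool_distance
        metric="cosine",
        linkage="average",
    ).fit(tokens)
    pooled = [
        tokens[clustering.labels_ == label].mean(axis=0)
        for label in np.unique(clustering.labels_)
    ]
    return np.vstack(pooled)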
Configures vector columns with embedding options:
- `from_sentence_transformer()`: For client-side embeddings with sentence-transformers
- `from_vectorize()`: For server-side embeddings with Astra's Vectorize
# Configuration options
VectorColumnOptions.from_sentence_transformer(
    model,                        # SentenceTransformer model instance
    column_name=None,             # Optional custom column name
    table_vector_index_options    # Vector index configuration
)

VectorColumnOptions.from_vectorize(
    column_name,                  # Name for the vector column
    dimension,                    # Vector dimension
    vector_service_options,       # Service provider configuration
    table_vector_index_options    # Vector index configuration
)
Synchronous table operations:
- `insert_chunk()`: Insert a single text chunk with embeddings
- `bulk_insert_chunks()`: Insert multiple chunks in batches
- `multi_vector_similarity_search()`: Search for similar text in one or more vector columns
- `batch_search_by_text()`: Perform multiple searches in parallel
- `search_and_rerank()`: Search and rerank results with a reranker model (example below)
# Core operations
table.insert_chunk(
    text,                        # Text to embed and store
    chunk_id=None,               # Optional UUID (auto-generated if None)
    metadata=None                # Optional metadata dictionary
)

table.multi_vector_similarity_search(
    query_text,                  # Query string to search for
    column_name=None,            # Optional specific column to search (None = search all)
    candidates_per_column=20,    # Number of candidates per vector column
    k=10,                        # Number of final results to return
    include_similarity=True     # Whether to include similarity scores
)
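The reranking helper builds on the `rerankers` dependency listed above. A hedged sketch of using `search_and_rerank()`; the keyword names here are assumptions rather than the confirmed API, so check the method's docstring for the exact signature:

from rerankers import Reranker

# Any cross-encoder supported by the rerankers package
reranker = Reranker("mixedbread-ai/mxbai-rerank-base-v1", model_type="cross-encoder")

# Assumed parameter names, for illustration only
reranked = table.search_and_rerank(
    query_text="sample text",
    reranker=reranker,
    k=5,
)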
Asynchronous table operations:
- `insert_chunk()`: Insert a single text chunk asynchronously
- `bulk_insert_chunks()`: Insert multiple chunks with concurrency control
- `multi_vector_similarity_search()`: Perform async search across one or more vector columns
- `batch_search_by_text()`: Execute multiple searches in parallel
- `search_and_rerank()`: Search and rerank results asynchronously with a reranker model
- `parallel_process_chunks()`: Process items in parallel with a custom function
# Async operations
await async_table.bulk_insert_chunks(
    text_chunks,             # List of text chunks to insert
    max_concurrency=10,      # Maximum number of concurrent operations
    batch_size=20,           # Number of chunks per batch
    chunk_ids=None,          # Optional list of UUIDs (auto-generated if None)
    metadata=None            # Optional list of metadata dictionaries
)

await async_table.batch_search_by_text(
    queries,                 # List of query strings
    max_concurrency=10,      # Maximum number of concurrent searches
    column_name=None,        # Optional specific column to search
    k=10                     # Number of results per query
)
Manages token-level late interaction models:
- `initialize()`: Create and configure document and token tables
- `index_document()`: Index a single document with token-level embeddings
- `bulk_index_documents()`: Batch index multiple documents with concurrency control
- `search()`: Perform two-stage retrieval with auto-scaled parameters
- `delete_document()`: Remove a document and its tokens from the database
# Pipeline configuration
pipeline = LateInteractionPipeline(
    db,                              # AsyncDatabase instance
    model,                           # LateInteractionModel instance (ColBERT, ColPali)
    base_table_name,                 # Base name for document and token tables
    doc_pool_factor=2,               # Factor by which to pool document embeddings
    query_pool_distance=0.03,        # Maximum distance for pooling query tokens
    sim_metric="cosine",             # Similarity metric (cosine or dot_product)
    default_concurrency_limit=10,    # Default concurrency for async operations
    embedding_cache_size=1000        # Size of LRU cache for document embeddings
)
# Advanced search options
results = await pipeline.search(
    query,                       # Query string or image
    k=10,                        # Number of results to return
    n_ann_tokens=None,           # Tokens to retrieve per query token (auto-calculated if None)
    n_maxsim_candidates=None     # Document candidates for scoring (auto-calculated if None)
)
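Cleanup mirrors indexing. Assuming `delete_document()` takes the document UUID returned by `index_document()`:

# Remove the document row and all of its token rows
await pipeline.delete_document(doc_id)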
Contributions to AstraMultiVector are welcome! Here's how you can contribute:
- Fork the repository and clone your fork
- Install development dependencies:
pip install -e ".[dev]"
- Install pre-commit hooks:
pre-commit install
All contributions should include tests:
# Run all tests
python tests/run_tests.py
# Check test coverage
python -m coverage run --source=astra_multivector tests/run_tests.py
python -m coverage report -m
Aim for at least 90% test coverage for new code.
- Create a new branch for your feature
- Make your changes with clear commit messages
- Add tests for new functionality
- Run the test suite to ensure everything passes
- Submit a pull request with a clear description of the changes
This project follows:
- PEP 8 for code style
- Google style docstrings
- Type annotations for all functions
Apache License 2.0