Dataverse: Building a Semantic Search Data Catalog with Vector DB and LLM

Transform enterprise data discovery with Dataverse - a semantic search-powered data catalog leveraging Milvus vector database, LLM embeddings, and knowledge graphs for intelligent metadata management.

GT
Gonnect Team
January 14, 202414 min readView on GitHub
PythonMilvusLLMVector SearchKnowledge GraphsSentenceTransformers

Introduction

Data discovery remains one of the most significant challenges in modern enterprises. According to industry research, 80% of data science projects take six months longer than planned, largely due to data access and data quality problems. The root cause? Fragmented data landscapes where finding the right data feels like searching for a needle in a haystack.

Dataverse addresses this challenge by building a semantic search data catalog that understands the meaning of your data, not just keywords. By combining vector databases, large language models (LLMs), and knowledge graphs, Dataverse transforms how teams discover, understand, and utilize data assets across siloed systems.

Key Insight: Traditional data catalogs rely on string matching - they find what you type, not what you mean. Semantic search finds what you're actually looking for.

The Data Discovery Problem

Knowledge Graph Architecture

Loading diagram...

Traditional Catalog Limitations

LimitationImpact
Keyword MatchingMisses semantically similar but differently named assets
Schema DependencyRequires exact field name knowledge
No ContextCannot understand relationships between datasets
Static MetadataManual tagging that becomes stale
Siloed SearchEach system has its own discovery mechanism

The Dataverse Solution

Dataverse = Vector DB + LLM + Knowledge Graphs + Data Catalog

This combination enables:

  • Semantic Understanding: Find datasets by meaning, not keywords
  • Conversational Discovery: Ask questions in natural language
  • Automatic Enrichment: AI-generated tags and descriptions
  • Relationship Mapping: Understand data lineage and connections
  • Unified Search: Single interface across all data sources

Architecture Deep Dive

Knowledge Graph Architecture

Loading diagram...

Core Components

┌─────────────────────────────────────────────────────────────────────┐
│                         DATAVERSE ARCHITECTURE                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                       │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐           │
│  │  Data Source │    │  Data Source │    │  Data Source │           │
│  │   (SQL DB)   │    │  (Data Lake) │    │    (APIs)    │           │
│  └──────┬───────┘    └──────┬───────┘    └──────┬───────┘           │
│         │                   │                   │                    │
│         └───────────────────┼───────────────────┘                    │
│                             │                                        │
│                    ┌────────▼────────┐                               │
│                    │  Metadata       │                               │
│                    │  Ingestion      │                               │
│                    │  (Schema-less)  │                               │
│                    └────────┬────────┘                               │
│                             │                                        │
│         ┌───────────────────┼───────────────────┐                    │
│         │                   │                   │                    │
│  ┌──────▼──────┐    ┌───────▼───────┐   ┌──────▼──────┐             │
│  │  Knowledge  │    │    Vector     │   │    LLM      │             │
│  │   Graph     │    │   Database    │   │   Service   │             │
│  │  (Neo4j)    │    │   (Milvus)    │   │  (ChatGPT)  │             │
│  └──────┬──────┘    └───────┬───────┘   └──────┬──────┘             │
│         │                   │                   │                    │
│         └───────────────────┼───────────────────┘                    │
│                             │                                        │
│                    ┌────────▼────────┐                               │
│                    │  Semantic       │                               │
│                    │  Search API     │                               │
│                    └────────┬────────┘                               │
│                             │                                        │
│                    ┌────────▼────────┐                               │
│                    │  Data Catalog   │                               │
│                    │  UI / ChatBot   │                               │
│                    └─────────────────┘                               │
│                                                                       │
└─────────────────────────────────────────────────────────────────────┘

Technology Stack

ComponentTechnologyPurpose
Vector DatabaseMilvusSemantic similarity search across metadata
LLMChatGPT APINatural language understanding and generation
EmbeddingsSentenceTransformersConvert metadata to vector representations
Knowledge GraphNeo4jStore relationships and ontologies
API LayerFastAPIREST endpoints for catalog operations

Understanding Vector Embeddings

Vector embeddings transform text into high-dimensional numerical representations that capture semantic meaning:

from sentence_transformers import SentenceTransformer

# Initialize the embedding model
model = SentenceTransformer('all-MiniLM-L6-v2')

# Sample metadata entries
metadata_entries = [
    "Customer transaction history with purchase details",
    "User behavior analytics for e-commerce platform",
    "Sales revenue reports by product category",
    "Client order data with shipping information"
]

# Generate embeddings
embeddings = model.encode(metadata_entries)

# Similar concepts have similar vectors
# "Customer transaction" and "Client order" will be close in vector space

Knowledge Graph Architecture

Loading diagram...

Milvus Integration

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType

# Connect to Milvus
connections.connect("default", host="localhost", port="19530")

# Define schema for metadata collection
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="dataset_name", dtype=DataType.VARCHAR, max_length=256),
    FieldSchema(name="description", dtype=DataType.VARCHAR, max_length=2048),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=384),
    FieldSchema(name="source_system", dtype=DataType.VARCHAR, max_length=128),
    FieldSchema(name="domain", dtype=DataType.VARCHAR, max_length=128),
]

schema = CollectionSchema(fields, description="Data catalog metadata")
collection = Collection("data_catalog", schema)

# Create index for fast similarity search
index_params = {
    "index_type": "IVF_FLAT",
    "metric_type": "COSINE",
    "params": {"nlist": 128}
}
collection.create_index("embedding", index_params)

Semantic Search Implementation

from sentence_transformers import SentenceTransformer
from pymilvus import Collection

class SemanticSearchEngine:
    def __init__(self):
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.collection = Collection("data_catalog")
        self.collection.load()

    def search(self, query: str, top_k: int = 10) -> list:
        """
        Perform semantic search on the data catalog
        """
        # Convert query to embedding
        query_embedding = self.model.encode([query])[0].tolist()

        # Search in Milvus
        search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}}

        results = self.collection.search(
            data=[query_embedding],
            anns_field="embedding",
            param=search_params,
            limit=top_k,
            output_fields=["dataset_name", "description", "domain"]
        )

        return [
            {
                "dataset_name": hit.entity.get("dataset_name"),
                "description": hit.entity.get("description"),
                "domain": hit.entity.get("domain"),
                "similarity_score": hit.score
            }
            for hit in results[0]
        ]

# Usage example
engine = SemanticSearchEngine()

# Natural language query - no exact keyword matching required!
results = engine.search("datasets for customer churn prediction")

# Also finds:
# - "Client retention analytics"
# - "User attrition modeling data"
# - "Customer lifetime value metrics"

LLM-Powered Conversational Discovery

ChatGPT Integration for Q&A

import openai
from typing import List, Dict

class CatalogChatBot:
    def __init__(self, search_engine: SemanticSearchEngine):
        self.search_engine = search_engine
        self.conversation_history = []

    def ask(self, question: str) -> str:
        """
        Answer questions about the data catalog using LLM
        """
        # First, find relevant datasets
        relevant_datasets = self.search_engine.search(question, top_k=5)

        # Build context from search results
        context = self._build_context(relevant_datasets)

        # Create prompt for LLM
        system_prompt = """You are a helpful data catalog assistant.
        Use the provided dataset information to answer questions about
        available data assets. Be specific about dataset names and their contents."""

        user_prompt = f"""Based on the following datasets in our catalog:

{context}

User Question: {question}

Please provide a helpful response about relevant datasets."""

        # Call ChatGPT
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            temperature=0.7
        )

        return response.choices[0].message.content

    def _build_context(self, datasets: List[Dict]) -> str:
        context_parts = []
        for i, ds in enumerate(datasets, 1):
            context_parts.append(f"""
Dataset {i}: {ds['dataset_name']}
Description: {ds['description']}
Domain: {ds['domain']}
Relevance Score: {ds['similarity_score']:.2f}
""")
        return "\n".join(context_parts)

# Usage
chatbot = CatalogChatBot(engine)

# Conversational queries
response = chatbot.ask(
    "What datasets do we have for building a product recommender system?"
)
print(response)

# Example output:
# "Based on our catalog, I found several relevant datasets:
#  1. 'Product Interaction Logs' - Contains user click and purchase behavior
#  2. 'Customer Preferences Survey' - Explicit preference data from users
#  3. 'Product Catalog with Attributes' - Product features for content-based filtering
#  These datasets together would support both collaborative and content-based
#  recommendation approaches..."

Knowledge Graph for Relationship Discovery

Building the Data Lineage Graph

from neo4j import GraphDatabase

class KnowledgeGraphManager:
    def __init__(self, uri: str, user: str, password: str):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def create_dataset_node(self, dataset: dict):
        """Create a dataset node in the knowledge graph"""
        with self.driver.session() as session:
            session.run("""
                MERGE (d:Dataset {name: $name})
                SET d.description = $description,
                    d.domain = $domain,
                    d.owner = $owner,
                    d.created_at = datetime()
            """, **dataset)

    def create_lineage_relationship(self, source: str, target: str,
                                     transformation: str):
        """Create data lineage relationship"""
        with self.driver.session() as session:
            session.run("""
                MATCH (source:Dataset {name: $source})
                MATCH (target:Dataset {name: $target})
                MERGE (source)-[r:TRANSFORMS_TO {
                    transformation: $transformation,
                    created_at: datetime()
                }]->(target)
            """, source=source, target=target, transformation=transformation)

    def find_upstream_dependencies(self, dataset_name: str) -> list:
        """Find all upstream data sources"""
        with self.driver.session() as session:
            result = session.run("""
                MATCH path = (upstream:Dataset)-[:TRANSFORMS_TO*]->(d:Dataset {name: $name})
                RETURN upstream.name as source,
                       length(path) as depth,
                       [rel in relationships(path) | rel.transformation] as transformations
                ORDER BY depth
            """, name=dataset_name)
            return [dict(record) for record in result]

# Build lineage
kg = KnowledgeGraphManager("bolt://localhost:7687", "neo4j", "password")

# Define data lineage
kg.create_lineage_relationship(
    source="raw_transactions",
    target="customer_360",
    transformation="ETL: Aggregate transactions by customer"
)

kg.create_lineage_relationship(
    source="customer_360",
    target="churn_features",
    transformation="Feature Engineering: Extract churn indicators"
)

Ontology-Based Auto-Tagging

class OntologyTagger:
    def __init__(self, kg_manager: KnowledgeGraphManager):
        self.kg = kg_manager
        self.domain_ontology = self._load_ontology()

    def auto_tag_dataset(self, dataset_name: str, description: str) -> list:
        """
        Automatically generate tags based on domain ontology
        """
        # Extract concepts from description using LLM
        concepts = self._extract_concepts(description)

        # Map to ontology terms
        tags = []
        for concept in concepts:
            ontology_matches = self._find_ontology_matches(concept)
            tags.extend(ontology_matches)

        # Store tags in knowledge graph
        self._store_tags(dataset_name, tags)

        return tags

    def _extract_concepts(self, text: str) -> list:
        """Use LLM to extract key concepts"""
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[{
                "role": "user",
                "content": f"Extract key data concepts from: {text}"
            }]
        )
        # Parse and return concepts
        return response.choices[0].message.content.split(", ")

    def _find_ontology_matches(self, concept: str) -> list:
        """Find matching terms in domain ontology"""
        # Use embedding similarity to match concepts to ontology
        matches = []
        for term in self.domain_ontology:
            if self._is_similar(concept, term):
                matches.append(term)
        return matches

Hybrid Search: Combining Vector and Keyword

class HybridSearchEngine:
    def __init__(self):
        self.vector_engine = SemanticSearchEngine()
        self.keyword_index = self._init_keyword_index()

    def search(self, query: str,
               semantic_weight: float = 0.7,
               keyword_weight: float = 0.3,
               filters: dict = None) -> list:
        """
        Combine semantic and keyword search with optional filtering
        """
        # Semantic search
        semantic_results = self.vector_engine.search(query, top_k=20)

        # Keyword search (using Elasticsearch or similar)
        keyword_results = self._keyword_search(query, top_k=20)

        # Apply filters (domain, owner, date range, etc.)
        if filters:
            semantic_results = self._apply_filters(semantic_results, filters)
            keyword_results = self._apply_filters(keyword_results, filters)

        # Combine results with weighted scoring
        combined = self._merge_results(
            semantic_results, keyword_results,
            semantic_weight, keyword_weight
        )

        return combined[:10]

    def _merge_results(self, semantic: list, keyword: list,
                       sem_weight: float, kw_weight: float) -> list:
        """Merge and re-rank results using weighted scores"""
        score_map = {}

        for item in semantic:
            name = item['dataset_name']
            score_map[name] = {
                'item': item,
                'score': item['similarity_score'] * sem_weight
            }

        for item in keyword:
            name = item['dataset_name']
            if name in score_map:
                score_map[name]['score'] += item['relevance'] * kw_weight
            else:
                score_map[name] = {
                    'item': item,
                    'score': item['relevance'] * kw_weight
                }

        # Sort by combined score
        sorted_results = sorted(
            score_map.values(),
            key=lambda x: x['score'],
            reverse=True
        )

        return [r['item'] for r in sorted_results]

# Usage with filters
hybrid_engine = HybridSearchEngine()

results = hybrid_engine.search(
    query="customer behavior analysis for marketing",
    filters={
        "domain": "marketing",
        "owner": "data-team",
        "created_after": "2024-01-01"
    }
)

Data Marketplace Features

Publishing Data Products

from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime

class DataProduct(BaseModel):
    name: str
    description: str
    domain: str
    owner: str
    sla: str
    access_instructions: str
    sample_queries: List[str]
    quality_score: float
    tags: List[str]

class DataMarketplace:
    def __init__(self, search_engine: HybridSearchEngine,
                 kg_manager: KnowledgeGraphManager):
        self.search = search_engine
        self.kg = kg_manager

    def publish_data_product(self, product: DataProduct):
        """Publish a data product to the marketplace"""
        # Generate embedding for the product
        embedding = self._generate_embedding(product)

        # Store in vector database
        self._store_in_vector_db(product, embedding)

        # Create knowledge graph nodes
        self._create_kg_nodes(product)

        # Auto-generate additional tags using LLM
        auto_tags = self._auto_generate_tags(product)
        product.tags.extend(auto_tags)

        return {"status": "published", "product_id": product.name}

    def discover_products(self, use_case: str) -> List[DataProduct]:
        """Find data products for a specific use case"""
        results = self.search.search(
            query=use_case,
            semantic_weight=0.8,
            keyword_weight=0.2
        )
        return results

Governance and Security

Access Control Integration

class GovernanceLayer:
    def __init__(self, catalog: DataMarketplace):
        self.catalog = catalog
        self.policies = {}

    def check_access(self, user: str, dataset: str, action: str) -> bool:
        """Check if user has permission for action on dataset"""
        # Get user's roles and groups
        user_roles = self._get_user_roles(user)

        # Get dataset policies
        dataset_policies = self._get_dataset_policies(dataset)

        # Evaluate access
        for policy in dataset_policies:
            if self._policy_allows(policy, user_roles, action):
                return True

        return False

    def audit_search(self, user: str, query: str, results: list):
        """Log search activity for compliance"""
        audit_entry = {
            "user": user,
            "query": query,
            "results_count": len(results),
            "timestamp": datetime.utcnow().isoformat(),
            "datasets_accessed": [r['dataset_name'] for r in results]
        }
        self._store_audit_log(audit_entry)

Business Impact

Organizations implementing Dataverse have reported significant improvements:

MetricBeforeAfterImprovement
Data Discovery Time4+ hours< 30 minutes87% reduction
Data Steward Requests50/week10/week80% reduction
Data Reuse Rate15%65%4x increase
Time to Insight2 weeks2 days85% faster

Conclusion

Dataverse represents the next evolution in data cataloging - moving from keyword-based discovery to semantic understanding. By combining:

  • Milvus for scalable vector similarity search
  • LLMs for natural language understanding and generation
  • Knowledge Graphs for relationship and lineage tracking
  • SentenceTransformers for high-quality embeddings

Organizations can transform their data catalogs from simple inventories into intelligent discovery platforms that truly understand the meaning and context of their data assets.

The key insight is that data discovery should feel like having a conversation with a knowledgeable colleague - one who understands what you need and can guide you to the right datasets, even when you do not know the exact names or schemas.


Explore the complete implementation at Dataverse-public on GitHub.