
Integrations: Build a RAG System with Supacrawler, LlamaIndex, and Supabase pgvector

LlamaIndex offers sophisticated indexing strategies, advanced query engines, and enterprise-grade features that go well beyond basic vector search. This guide demonstrates how to build production-ready RAG systems by combining LlamaIndex's abstractions with Supacrawler's intelligent crawling and Supabase's scalable pgvector storage.

By combining LlamaIndex's advanced features, such as knowledge graphs, sub-question decomposition, and intelligent routing, with Supacrawler's robust data extraction, you'll create RAG systems capable of handling complex enterprise scenarios.

If you'd like to try it yourself, check out the LlamaIndex Vectors notebook.

Key Enterprise Advantages

  • Intelligent Query Routing: Automatically route queries to optimal retrieval strategies
  • Sub-Question Decomposition: Break complex questions into manageable components
  • Multi-Index Querying: Query across multiple knowledge bases simultaneously
  • Knowledge Graph Integration: Structured relationship understanding
  • Advanced Synthesis: Sophisticated response generation with citations
  • Evaluation Framework: Built-in metrics and quality assessment
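
If you want a feel for how these pieces fit together before the full build-out, here is a minimal sketch of the routing pattern developed in detail later in this guide. It assumes two hypothetical VectorStoreIndex objects, docs_index and api_index, that you have already built.

# Minimal preview of LLM-based query routing (full implementation later in this post).
# docs_index and api_index are assumed to be existing VectorStoreIndex objects.
from llama_index.core.query_engine import RouterQueryEngine
from llama_index.core.selectors import LLMSingleSelector
from llama_index.core.tools import QueryEngineTool, ToolMetadata

tools = [
    QueryEngineTool(
        query_engine=docs_index.as_query_engine(),
        metadata=ToolMetadata(name="docs", description="General documentation questions"),
    ),
    QueryEngineTool(
        query_engine=api_index.as_query_engine(),
        metadata=ToolMetadata(name="api", description="API reference and endpoint details"),
    ),
]

# The LLM selector reads each tool description and routes the query to the best engine
router = RouterQueryEngine(
    selector=LLMSingleSelector.from_defaults(),
    query_engine_tools=tools,
)
print(router.query("How do I authenticate against the API?"))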

Environment Setup and Dependencies

Install LlamaIndex with all enterprise features:

# Core LlamaIndex
pip install llama-index llama-index-core
# Vector stores and databases
pip install llama-index-vector-stores-postgres
pip install llama-index-embeddings-openai
pip install llama-index-llms-openai
# Advanced features
pip install llama-index-indices-managed-llama-cloud
pip install llama-index-postprocessor-flag-embedding-reranker
pip install llama-index-graph-stores-neo4j # Optional: for knowledge graphs
# Supporting libraries
pip install supacrawler python-dotenv sqlalchemy
pip install nest-asyncio beautifulsoup4 markdownify
# Optional: Performance and monitoring
pip install llama-index-callbacks-langfuse # For advanced monitoring

Configure your environment with all necessary credentials:

# .env
SUPACRAWLER_API_KEY=your_supacrawler_api_key
OPENAI_API_KEY=your_openai_api_key
SUPABASE_URL=your_supabase_project_url
SUPABASE_KEY=your_supabase_anon_key
DATABASE_URL=postgresql://postgres:[password]@db.[project].supabase.co:5432/postgres
# LlamaIndex configuration
LLAMAINDEX_DEBUG=true
LLAMAINDEX_CACHE_DIR=./cache
# Optional: Advanced monitoring
LANGFUSE_SECRET_KEY=your_langfuse_secret
LANGFUSE_PUBLIC_KEY=your_langfuse_public

Supabase Vector Store Configuration

Set up Supabase with optimized pgvector configuration for LlamaIndex:

import os
from sqlalchemy import create_engine, text
from llama_index.vector_stores.postgres import PGVectorStore
from llama_index.core import Settings
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from dotenv import load_dotenv
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
load_dotenv()
class SupabaseLlamaIndexSetup:
    """
    Enterprise-grade Supabase setup for LlamaIndex
    """
    def __init__(self):
        self.database_url = os.getenv('DATABASE_URL')
        self.engine = create_engine(self.database_url)

        # Configure LlamaIndex global settings
        Settings.embed_model = OpenAIEmbedding(
            model="text-embedding-3-small",
            api_key=os.getenv('OPENAI_API_KEY')
        )
        Settings.llm = OpenAI(
            model="gpt-3.5-turbo",
            api_key=os.getenv('OPENAI_API_KEY'),
            temperature=0.1
        )
        logger.info("✅ LlamaIndex global settings configured")

    def setup_pgvector_optimized(self):
        """
        Setup pgvector with enterprise optimizations
        """
        with self.engine.connect() as connection:
            # Enable extensions
            connection.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))
            connection.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm;"))   # For text search
            connection.execute(text("CREATE EXTENSION IF NOT EXISTS btree_gin;")) # For GIN indexes

            # Configure pgvector for optimal performance
            connection.execute(text("SET maintenance_work_mem = '1GB';"))
            connection.execute(text("SET max_parallel_maintenance_workers = 4;"))
            connection.commit()
        logger.info("✅ Supabase pgvector optimized for enterprise use")

    def create_vector_store(self,
                            table_name: str = "llamaindex_enterprise",
                            embed_dim: int = 1536) -> PGVectorStore:
        """
        Create optimized PGVectorStore for LlamaIndex
        """
        vector_store = PGVectorStore.from_params(
            database_url=self.database_url,
            table_name=table_name,
            embed_dim=embed_dim,
            # Enterprise optimizations
            hnsw_kwargs={
                "hnsw_m": 16,                # Higher M for better recall
                "hnsw_ef_construction": 200, # Higher EF for better index quality
                "hnsw_ef_search": 40         # Balanced search performance
            }
        )
        logger.info(f"✅ PGVectorStore created: {table_name}")
        return vector_store
# Initialize setup
supabase_setup = SupabaseLlamaIndexSetup()
supabase_setup.setup_pgvector_optimized()
# Create vector store
vector_store = supabase_setup.create_vector_store("supacrawler_llamaindex_rag")
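
Before moving on, it can help to confirm the vector table actually exists in Supabase. The snippet below is an optional sanity check using the SQLAlchemy engine created above; note that PGVectorStore typically prefixes the table name you configure (for example, data_supacrawler_llamaindex_rag), so the match is deliberately loose.

# Optional sanity check: confirm a pgvector-backed table was created in Supabase.
# PGVectorStore usually prefixes the configured table name, so match loosely.
from sqlalchemy import text

with supabase_setup.engine.connect() as connection:
    rows = connection.execute(text(
        "SELECT table_name FROM information_schema.tables "
        "WHERE table_schema = 'public' AND table_name ILIKE '%supacrawler_llamaindex_rag%';"
    ))
    for row in rows:
        print(f"Found table: {row.table_name}")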

Advanced Web Crawling with Supacrawler

Create an enterprise-grade document loader that integrates Supacrawler with LlamaIndex:

from typing import List, Dict, Any, Optional
from llama_index.core import Document
from llama_index.core.readers.base import BaseReader
from supacrawler import SupacrawlerClient
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
class EnterpriseSupacrawlerReader(BaseReader):
    """
    Enterprise-grade Supacrawler integration for LlamaIndex
    """
    def __init__(self,
                 api_key: Optional[str] = None,
                 max_workers: int = 3,
                 quality_threshold: int = 200):
        """
        Initialize enterprise crawler

        Args:
            api_key: Supacrawler API key
            max_workers: Maximum parallel crawl jobs
            quality_threshold: Minimum content length for inclusion
        """
        self.client = SupacrawlerClient(
            api_key=api_key or os.getenv('SUPACRAWLER_API_KEY')
        )
        self.max_workers = max_workers
        self.quality_threshold = quality_threshold
        logger.info("🚀 Enterprise Supacrawler Reader initialized")

    def load_data(self,
                  urls: List[str],
                  crawl_config: Optional[Dict] = None) -> List[Document]:
        """
        Load data from multiple URLs with enterprise features
        """
        default_config = {
            'format': 'markdown',
            'depth': 3,
            'link_limit': 500,
            'render_js': True,
            'include_patterns': ['/docs/*', '/api/*', '/guides/*', '/tutorials/*'],
            'exclude_patterns': ['/blog/*', '/news/*', '/privacy/*', '/terms/*'],
            'timeout': 45000,
            'concurrent_limit': 8,
            'block_ads': True,
            'block_cookies': True,
            'remove_selectors': ['.sidebar', '.nav', '.footer', '.ads', '.banner'],
            'wait_for': '.main-content, .content, main, article',
            'respect_robots_txt': True
        }
        # Merge with user config
        if crawl_config:
            default_config.update(crawl_config)

        # Parallel crawling for multiple URLs
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            crawl_jobs = []
            for url in urls:
                config = {**default_config, 'url': url}
                job = executor.submit(self._crawl_single_url, config)
                crawl_jobs.append((url, job))

            # Collect results
            all_documents = []
            for url, job in crawl_jobs:
                try:
                    documents = job.result(timeout=300)  # 5 minute timeout per URL
                    all_documents.extend(documents)
                    logger.info(f"✅ Crawled {url}: {len(documents)} documents")
                except Exception as e:
                    logger.error(f"❌ Failed to crawl {url}: {e}")

        # Quality filtering and enhancement
        enhanced_documents = self._enhance_documents(all_documents)
        logger.info(f"📊 Total documents loaded: {len(enhanced_documents)}")
        return enhanced_documents

    def _crawl_single_url(self, config: Dict) -> List[Document]:
        """
        Crawl a single URL and return LlamaIndex Documents
        """
        try:
            # Create and execute crawl job
            job = self.client.create_crawl_job(**config)
            result = self.client.wait_for_crawl(job.job_id)
            if result.status != 'completed':
                logger.warning(f"Crawl incomplete: {result.status}")
                return []

            crawl_data = result.data.get('crawl_data', {})
            documents = []
            for url, page_data in crawl_data.items():
                content = page_data.get('markdown', '')
                metadata = page_data.get('metadata', {})

                # Quality filtering
                if len(content.strip()) < self.quality_threshold:
                    continue

                # Create LlamaIndex Document with rich metadata
                doc = Document(
                    text=content,
                    metadata={
                        'url': url,
                        'title': metadata.get('title', ''),
                        'description': metadata.get('description', ''),
                        'keywords': metadata.get('keywords', ''),
                        'author': metadata.get('author', ''),
                        'language': metadata.get('language', 'en'),
                        'source': 'supacrawler',
                        'crawl_timestamp': result.data.get('timestamp'),
                        'content_length': len(content),
                        'word_count': len(content.split()),
                        'domain': url.split('/')[2] if '/' in url else 'unknown',
                        # Quality metrics
                        'content_quality_score': self._calculate_quality_score(content, metadata),
                        'content_type': self._classify_content_type(content, url),
                        'technical_depth': self._assess_technical_depth(content)
                    }
                )
                documents.append(doc)
            return documents
        except Exception as e:
            logger.error(f"Error in crawl job: {e}")
            return []

    def _enhance_documents(self, documents: List[Document]) -> List[Document]:
        """
        Enhance documents with additional processing
        """
        enhanced = []
        for doc in documents:
            # Content enhancement
            enhanced_text = self._clean_and_enhance_content(doc.text)

            # Metadata enhancement
            enhanced_metadata = {
                **doc.metadata,
                'enhanced': True,
                'processing_timestamp': time.time(),
                'content_hash': hash(enhanced_text),
                # Add searchable keywords
                'searchable_content': self._extract_searchable_keywords(enhanced_text)
            }
            enhanced_doc = Document(
                text=enhanced_text,
                metadata=enhanced_metadata
            )
            enhanced.append(enhanced_doc)
        return enhanced

    def _calculate_quality_score(self, content: str, metadata: Dict) -> float:
        """
        Calculate content quality score (0-1)
        """
        # Length score (normalize to 1000 chars)
        length_score = min(len(content) / 1000, 1.0) * 0.3

        # Structure score (headers, lists, etc.)
        structure_indicators = ['##', '###', '-', '*', '1.', '2.']
        structure_count = sum(content.count(indicator) for indicator in structure_indicators)
        structure_score = min(structure_count / 10, 1.0) * 0.3

        # Metadata completeness
        metadata_score = 0.0
        if metadata.get('title'): metadata_score += 0.1
        if metadata.get('description'): metadata_score += 0.1
        if metadata.get('keywords'): metadata_score += 0.1
        if metadata.get('author'): metadata_score += 0.1

        # Technical content indicators
        technical_indicators = ['API', 'function', 'class', 'method', 'parameter', 'example']
        technical_count = sum(content.lower().count(indicator.lower()) for indicator in technical_indicators)
        technical_score = min(technical_count / 20, 1.0) * 0.1

        return length_score + structure_score + metadata_score + technical_score

    def _classify_content_type(self, content: str, url: str) -> str:
        """
        Classify content type for optimized processing
        """
        content_lower = content.lower()
        url_lower = url.lower()
        if '/api/' in url_lower or 'endpoint' in content_lower:
            return 'api_documentation'
        elif 'tutorial' in url_lower or 'how to' in content_lower:
            return 'tutorial'
        elif 'guide' in url_lower or 'getting started' in content_lower:
            return 'guide'
        elif 'reference' in url_lower or 'documentation' in content_lower:
            return 'reference'
        elif 'example' in content_lower or 'sample' in content_lower:
            return 'example'
        else:
            return 'general'

    def _assess_technical_depth(self, content: str) -> str:
        """
        Assess technical depth for appropriate processing
        """
        technical_terms = ['function', 'class', 'method', 'parameter', 'variable',
                           'implementation', 'algorithm', 'architecture', 'pattern']
        technical_count = sum(content.lower().count(term) for term in technical_terms)
        if technical_count > 20:
            return 'advanced'
        elif technical_count > 10:
            return 'intermediate'
        elif technical_count > 3:
            return 'basic'
        else:
            return 'conceptual'

    def _clean_and_enhance_content(self, content: str) -> str:
        """
        Clean and enhance content for better indexing
        """
        import re
        # Clean excessive whitespace
        content = re.sub(r'\n\s*\n\s*\n', '\n\n', content)
        content = re.sub(r'[ \t]+', ' ', content)

        # Enhance structure markers
        content = re.sub(r'^(#{1,6})\s*(.+)$', r'\1 \2', content, flags=re.MULTILINE)

        # Clean but preserve code blocks
        code_blocks = re.findall(r'```[\s\S]*?```', content)
        for i, block in enumerate(code_blocks):
            content = content.replace(block, f'__CODE_BLOCK_{i}__')

        # Clean inline artifacts
        content = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', content)  # Links to text

        # Restore code blocks
        for i, block in enumerate(code_blocks):
            content = content.replace(f'__CODE_BLOCK_{i}__', block)
        return content.strip()

    def _extract_searchable_keywords(self, content: str) -> str:
        """
        Extract searchable keywords for metadata
        """
        import re
        # Extract important terms
        keywords = []

        # Headers
        headers = re.findall(r'^#{1,6}\s*(.+)$', content, re.MULTILINE)
        keywords.extend([h.strip() for h in headers])

        # Code function names
        functions = re.findall(r'def\s+(\w+)|function\s+(\w+)|class\s+(\w+)', content)
        keywords.extend([f for group in functions for f in group if f])

        # API endpoints
        endpoints = re.findall(r'/[\w/]+', content)
        keywords.extend(endpoints[:10])  # Limit to avoid noise

        return ' '.join(keywords[:50])  # Limit total keywords


# Example usage
enterprise_reader = EnterpriseSupacrawlerReader(max_workers=2)

# Load enterprise knowledge base
knowledge_urls = [
    "https://docs.llamaindex.ai",
    "https://python.langchain.com/docs",
]

# Load documents with enterprise features
documents = enterprise_reader.load_data(
    urls=knowledge_urls,
    crawl_config={
        'depth': 4,
        'link_limit': 1000,
        'include_patterns': ['/docs/*', '/api/*', '/guides/*', '/examples/*']
    }
)

print(f"\n📊 Enterprise Knowledge Base:")
print(f"Total documents: {len(documents)}")
print(f"Quality distribution:")
quality_distribution = {}
for doc in documents:
    quality = doc.metadata.get('content_quality_score', 0)
    if quality >= 0.8:
        quality_distribution['high'] = quality_distribution.get('high', 0) + 1
    elif quality >= 0.6:
        quality_distribution['medium'] = quality_distribution.get('medium', 0) + 1
    else:
        quality_distribution['low'] = quality_distribution.get('low', 0) + 1
for quality, count in quality_distribution.items():
    print(f" {quality.capitalize()}: {count} documents")

LlamaIndex Document Processing

Leverage LlamaIndex's advanced document processing capabilities:

from llama_index.core.node_parser import SentenceSplitter, SemanticSplitterNodeParser
from llama_index.core.extractors import TitleExtractor, QuestionsAnsweredExtractor, SummaryExtractor
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.schema import MetadataMode
from llama_index.core import Settings
class EnterpriseDocumentProcessor:
    """
    Advanced document processing with LlamaIndex
    """
    def __init__(self):
        self.pipelines = {}
        self._setup_processing_pipelines()

    def _setup_processing_pipelines(self):
        """
        Setup specialized processing pipelines for different content types
        """
        # General documentation pipeline
        self.pipelines['general'] = IngestionPipeline(
            transformations=[
                SentenceSplitter(chunk_size=1024, chunk_overlap=200),
                TitleExtractor(nodes=5),
                QuestionsAnsweredExtractor(questions=3),
                SummaryExtractor(summaries=["prev", "self"])
            ]
        )

        # API documentation pipeline (preserve technical structure)
        self.pipelines['api_documentation'] = IngestionPipeline(
            transformations=[
                SentenceSplitter(
                    chunk_size=1536,   # Larger chunks for technical content
                    chunk_overlap=300,
                    separator=" "
                ),
                TitleExtractor(nodes=3),
                QuestionsAnsweredExtractor(questions=5)  # More questions for API docs
            ]
        )

        # Tutorial pipeline (semantic splitting)
        self.pipelines['tutorial'] = IngestionPipeline(
            transformations=[
                SemanticSplitterNodeParser(
                    buffer_size=1,
                    breakpoint_percentile_threshold=95
                ),
                TitleExtractor(nodes=7),
                QuestionsAnsweredExtractor(questions=4),
                SummaryExtractor(summaries=["prev", "self", "next"])
            ]
        )

        # Reference pipeline (structured splitting)
        self.pipelines['reference'] = IngestionPipeline(
            transformations=[
                SentenceSplitter(
                    chunk_size=800,
                    chunk_overlap=100,
                    separator="\n\n"
                ),
                TitleExtractor(nodes=3),
                SummaryExtractor(summaries=["self"])
            ]
        )

    def process_documents(self, documents: List[Document]) -> List[Document]:
        """
        Process documents using appropriate pipelines based on content type
        """
        processed_nodes = []

        # Group documents by content type
        doc_groups = self._group_by_content_type(documents)
        for content_type, docs in doc_groups.items():
            pipeline = self.pipelines.get(content_type, self.pipelines['general'])
            print(f"🔄 Processing {len(docs)} {content_type} documents...")
            try:
                # Process documents through pipeline
                nodes = pipeline.run(documents=docs, show_progress=True)
                processed_nodes.extend(nodes)
                print(f"✅ Processed {len(docs)} docs → {len(nodes)} nodes")
            except Exception as e:
                print(f"❌ Error processing {content_type}: {e}")
                # Fallback to general pipeline
                nodes = self.pipelines['general'].run(documents=docs)
                processed_nodes.extend(nodes)

        print(f"\n📊 Processing Complete:")
        print(f"Input documents: {len(documents)}")
        print(f"Output nodes: {len(processed_nodes)}")
        return processed_nodes

    def _group_by_content_type(self, documents: List[Document]) -> Dict[str, List[Document]]:
        """
        Group documents by content type for specialized processing
        """
        groups = {}
        for doc in documents:
            content_type = doc.metadata.get('content_type', 'general')
            if content_type not in groups:
                groups[content_type] = []
            groups[content_type].append(doc)
        return groups

    def analyze_processing_results(self, nodes: List[Document]) -> Dict:
        """
        Analyze processing results for optimization
        """
        analysis = {
            'total_nodes': len(nodes),
            'avg_node_length': 0,
            'metadata_completeness': {},
            'content_type_distribution': {},
            'quality_metrics': {}
        }
        if not nodes:
            return analysis

        # Calculate averages
        total_length = sum(len(node.text) for node in nodes)
        analysis['avg_node_length'] = total_length // len(nodes)

        # Analyze metadata completeness
        metadata_fields = ['title', 'questions_this_excerpt_can_answer', 'section_summary']
        for field in metadata_fields:
            complete_count = sum(1 for node in nodes if node.metadata.get(field))
            analysis['metadata_completeness'][field] = (complete_count / len(nodes)) * 100

        # Content type distribution
        for node in nodes:
            content_type = node.metadata.get('content_type', 'unknown')
            analysis['content_type_distribution'][content_type] = (
                analysis['content_type_distribution'].get(content_type, 0) + 1
            )

        # Quality metrics
        quality_scores = [
            node.metadata.get('content_quality_score', 0) for node in nodes
        ]
        if quality_scores:
            analysis['quality_metrics'] = {
                'avg_quality': sum(quality_scores) / len(quality_scores),
                'min_quality': min(quality_scores),
                'max_quality': max(quality_scores)
            }
        return analysis


# Process documents with enterprise pipeline
processor = EnterpriseDocumentProcessor()
processed_nodes = processor.process_documents(documents)

# Analyze results
analysis = processor.analyze_processing_results(processed_nodes)
print(f"\n📈 Processing Analysis:")
for key, value in analysis.items():
    if isinstance(value, dict):
        print(f"{key.replace('_', ' ').title()}:")
        for subkey, subvalue in value.items():
            print(f" {subkey.replace('_', ' ').title()}: {subvalue}")
    else:
        print(f"{key.replace('_', ' ').title()}: {value}")

Intelligent Indexing Strategies

Implement multiple indexing strategies for different query patterns:

from llama_index.core import VectorStoreIndex, TreeIndex, KnowledgeGraphIndex
from llama_index.core import StorageContext
from llama_index.core.indices.composability import ComposableGraph
from llama_index.core.indices.query.query_transform import HyDEQueryTransform
from llama_index.core.query_engine import TransformQueryEngine
class EnterpriseIndexManager:
    """
    Manage multiple indexing strategies for enterprise RAG
    """
    def __init__(self, vector_store):
        self.vector_store = vector_store
        self.storage_context = StorageContext.from_defaults(vector_store=vector_store)
        self.indices = {}
        self.composable_graph = None
        print("🏗️ Enterprise Index Manager initialized")

    def build_multi_strategy_indices(self, nodes: List[Document]) -> Dict:
        """
        Build multiple indices with different strategies
        """
        indexing_strategies = {
            'vector_similarity': self._build_vector_index,
            'hierarchical_tree': self._build_tree_index,
            'knowledge_graph': self._build_knowledge_graph,
            'hybrid_composite': self._build_hybrid_index
        }
        results = {}
        for strategy_name, build_func in indexing_strategies.items():
            try:
                print(f"🔨 Building {strategy_name} index...")
                start_time = time.time()
                index = build_func(nodes)
                build_time = time.time() - start_time
                self.indices[strategy_name] = index
                results[strategy_name] = {
                    'success': True,
                    'build_time': build_time,
                    'node_count': len(nodes),
                    'index_type': type(index).__name__
                }
                print(f"✅ {strategy_name} index built in {build_time:.2f}s")
            except Exception as e:
                print(f"❌ Failed to build {strategy_name}: {e}")
                results[strategy_name] = {
                    'success': False,
                    'error': str(e)
                }

        # Build composable graph if we have multiple indices
        if len([r for r in results.values() if r.get('success')]) > 1:
            self._build_composable_graph()
        return results

    def _build_vector_index(self, nodes: List[Document]) -> VectorStoreIndex:
        """
        Build high-performance vector similarity index
        """
        return VectorStoreIndex(
            nodes,
            storage_context=self.storage_context,
            show_progress=True
        )

    def _build_tree_index(self, nodes: List[Document]) -> TreeIndex:
        """
        Build hierarchical tree index for structured queries
        """
        return TreeIndex(
            nodes,
            show_progress=True,
            num_children=10,  # Balanced tree structure
            build_tree=True
        )

    def _build_knowledge_graph(self, nodes: List[Document]) -> KnowledgeGraphIndex:
        """
        Build knowledge graph for relationship queries
        """
        return KnowledgeGraphIndex(
            nodes,
            show_progress=True,
            max_triplets_per_chunk=10,
            include_embeddings=True
        )

    def _build_hybrid_index(self, nodes: List[Document]) -> VectorStoreIndex:
        """
        Build hybrid index with advanced features
        """
        # Group nodes by content type for specialized indexing
        grouped_nodes = {}
        for node in nodes:
            content_type = node.metadata.get('content_type', 'general')
            if content_type not in grouped_nodes:
                grouped_nodes[content_type] = []
            grouped_nodes[content_type].append(node)

        # Build specialized sub-indices
        sub_indices = {}
        for content_type, type_nodes in grouped_nodes.items():
            if len(type_nodes) >= 5:  # Only create sub-index if enough nodes
                sub_indices[content_type] = VectorStoreIndex(
                    type_nodes,
                    storage_context=self.storage_context
                )

        # Create main index with all nodes
        main_index = VectorStoreIndex(
            nodes,
            storage_context=self.storage_context
        )
        return main_index

    def _build_composable_graph(self):
        """
        Build composable graph for intelligent query routing
        """
        try:
            # Define index summaries for routing
            index_summaries = {
                'vector_similarity': "Best for semantic similarity search and general questions",
                'hierarchical_tree': "Best for structured queries and hierarchical information",
                'knowledge_graph': "Best for relationship queries and entity connections",
                'hybrid_composite': "Best for complex multi-faceted queries"
            }

            # Create composable graph
            graph_indices = []
            for name, index in self.indices.items():
                if index is not None:
                    # Create query engine for each index
                    query_engine = index.as_query_engine()
                    graph_indices.append((query_engine, index_summaries.get(name, "")))

            if len(graph_indices) > 1:
                self.composable_graph = ComposableGraph.from_indices(
                    [idx[0] for idx in graph_indices],
                    index_summaries=[idx[1] for idx in graph_indices]
                )
                print("✅ Composable graph created for intelligent routing")
        except Exception as e:
            print(f"⚠️ Could not create composable graph: {e}")

    def get_optimal_query_engine(self,
                                 query_type: str = "auto",
                                 **kwargs):
        """
        Get optimal query engine based on query characteristics
        """
        query_engines = {
            'similarity': self._create_similarity_engine,
            'hierarchical': self._create_tree_engine,
            'relationship': self._create_graph_engine,
            'hybrid': self._create_hybrid_engine,
            'auto': self._create_auto_engine
        }
        if query_type not in query_engines:
            query_type = 'auto'
        return query_engines[query_type](**kwargs)

    def _create_similarity_engine(self, **kwargs):
        """Create optimized similarity search engine"""
        if 'vector_similarity' in self.indices:
            base_engine = self.indices['vector_similarity'].as_query_engine(
                similarity_top_k=kwargs.get('top_k', 5),
                response_mode=kwargs.get('response_mode', 'compact')
            )
            # Add HyDE transformation for better semantic matching
            hyde_transform = HyDEQueryTransform(include_original=True)
            return TransformQueryEngine(base_engine, hyde_transform)
        return None

    def _create_tree_engine(self, **kwargs):
        """Create hierarchical tree query engine"""
        if 'hierarchical_tree' in self.indices:
            return self.indices['hierarchical_tree'].as_query_engine(
                child_branch_factor=kwargs.get('branch_factor', 2),
                response_mode=kwargs.get('response_mode', 'tree_summarize')
            )
        return None

    def _create_graph_engine(self, **kwargs):
        """Create knowledge graph query engine"""
        if 'knowledge_graph' in self.indices:
            return self.indices['knowledge_graph'].as_query_engine(
                include_text=kwargs.get('include_text', True),
                response_mode=kwargs.get('response_mode', 'compact'),
                embedding_mode=kwargs.get('embedding_mode', 'hybrid')
            )
        return None

    def _create_hybrid_engine(self, **kwargs):
        """Create hybrid query engine"""
        if 'hybrid_composite' in self.indices:
            return self.indices['hybrid_composite'].as_query_engine(
                similarity_top_k=kwargs.get('top_k', 7),
                response_mode=kwargs.get('response_mode', 'compact')
            )
        return None

    def _create_auto_engine(self, **kwargs):
        """Create auto-routing engine using composable graph"""
        if self.composable_graph:
            return self.composable_graph.as_query_engine()
        elif 'vector_similarity' in self.indices:
            return self._create_similarity_engine(**kwargs)
        else:
            # Fallback to any available index
            for index in self.indices.values():
                if index is not None:
                    return index.as_query_engine()
        return None


# Build enterprise indices
index_manager = EnterpriseIndexManager(vector_store)
indexing_results = index_manager.build_multi_strategy_indices(processed_nodes)

print(f"\n🏗️ Indexing Results:")
for strategy, result in indexing_results.items():
    if result.get('success'):
        print(f"✅ {strategy}: {result['build_time']:.2f}s ({result['node_count']} nodes)")
    else:
        print(f"❌ {strategy}: {result.get('error', 'Unknown error')}")

Advanced Query Engines

Create sophisticated query engines with multiple strategies:

from llama_index.core.query_engine import SubQuestionQueryEngine
from llama_index.core.tools import QueryEngineTool, ToolMetadata
from llama_index.core.query_engine import RouterQueryEngine
from llama_index.core.selectors import LLMSingleSelector
from llama_index.core.response_synthesizers import ResponseMode
class EnterpriseQueryEngine:
    """
    Advanced query engine with multiple strategies and intelligent routing
    """
    def __init__(self, index_manager: EnterpriseIndexManager):
        self.index_manager = index_manager
        self.query_engines = {}
        self.router_engine = None
        self.sub_question_engine = None
        self._build_specialized_engines()
        self._build_router_engine()
        self._build_sub_question_engine()

    def _build_specialized_engines(self):
        """
        Build specialized query engines for different use cases
        """
        engine_configs = {
            'quick_facts': {
                'type': 'similarity',
                'params': {
                    'top_k': 3,
                    'response_mode': 'compact'
                },
                'description': "Quick factual answers and definitions"
            },
            'detailed_analysis': {
                'type': 'similarity',
                'params': {
                    'top_k': 8,
                    'response_mode': 'tree_summarize'
                },
                'description': "Comprehensive analysis and detailed explanations"
            },
            'step_by_step': {
                'type': 'hierarchical',
                'params': {
                    'response_mode': 'tree_summarize',
                    'branch_factor': 3
                },
                'description': "Step-by-step instructions and tutorials"
            },
            'relationship_analysis': {
                'type': 'relationship',
                'params': {
                    'include_text': True,
                    'embedding_mode': 'hybrid'
                },
                'description': "Relationship analysis and entity connections"
            },
            'code_examples': {
                'type': 'hybrid',
                'params': {
                    'top_k': 5,
                    'response_mode': 'compact'
                },
                'description': "Code examples and implementation details"
            }
        }
        for name, config in engine_configs.items():
            engine = self.index_manager.get_optimal_query_engine(
                query_type=config['type'],
                **config['params']
            )
            if engine:
                self.query_engines[name] = {
                    'engine': engine,
                    'description': config['description']
                }
                print(f"✅ Built {name} query engine")

    def _build_router_engine(self):
        """
        Build intelligent router that selects optimal engine based on query
        """
        if not self.query_engines:
            print("⚠️ No query engines available for router")
            return

        # Create query engine tools
        tools = []
        for name, config in self.query_engines.items():
            tool = QueryEngineTool(
                query_engine=config['engine'],
                metadata=ToolMetadata(
                    name=name,
                    description=config['description']
                )
            )
            tools.append(tool)

        # Create router with LLM-based selection
        self.router_engine = RouterQueryEngine(
            selector=LLMSingleSelector.from_defaults(),
            query_engine_tools=tools,
            verbose=True
        )
        print("✅ Intelligent router engine created")

    def _build_sub_question_engine(self):
        """
        Build sub-question engine for complex multi-part queries
        """
        if not self.query_engines:
            print("⚠️ No query engines available for sub-question engine")
            return

        # Create tools for sub-question decomposition
        tools = []
        for name, config in self.query_engines.items():
            tool = QueryEngineTool(
                query_engine=config['engine'],
                metadata=ToolMetadata(
                    name=name,
                    description=config['description']
                )
            )
            tools.append(tool)

        self.sub_question_engine = SubQuestionQueryEngine.from_defaults(
            query_engine_tools=tools,
            verbose=True
        )
        print("✅ Sub-question decomposition engine created")

    def query(self,
              question: str,
              engine_type: str = "router",
              **kwargs) -> Dict:
        """
        Query the knowledge base using specified engine
        """
        engines = {
            'router': self.router_engine,
            'sub_question': self.sub_question_engine,
            **{name: config['engine'] for name, config in self.query_engines.items()}
        }
        if engine_type not in engines or engines[engine_type] is None:
            raise ValueError(f"Engine '{engine_type}' not available. Choose from: {list(engines.keys())}")

        engine = engines[engine_type]
        print(f"🤔 Querying with {engine_type} engine: '{question}'")
        start_time = time.time()
        try:
            response = engine.query(question)
            query_time = time.time() - start_time
            result = {
                'question': question,
                'answer': str(response),
                'engine_used': engine_type,
                'query_time': query_time,
                'source_nodes': getattr(response, 'source_nodes', []),
                'metadata': getattr(response, 'metadata', {}),
                'success': True
            }

            # Extract source information
            if hasattr(response, 'source_nodes') and response.source_nodes:
                result['sources'] = [
                    {
                        'url': node.metadata.get('url', 'Unknown'),
                        'title': node.metadata.get('title', 'Untitled'),
                        'score': getattr(node, 'score', None),
                        'content_type': node.metadata.get('content_type', 'unknown')
                    }
                    for node in response.source_nodes[:5]  # Top 5 sources
                ]
            else:
                result['sources'] = []

            print(f"✅ Query completed in {query_time:.2f}s")
            return result
        except Exception as e:
            error_result = {
                'question': question,
                'error': str(e),
                'engine_used': engine_type,
                'query_time': time.time() - start_time,
                'success': False
            }
            print(f"❌ Query failed: {e}")
            return error_result

    def compare_engines(self, question: str, engines: List[str] = None) -> Dict:
        """
        Compare different engines on the same question
        """
        if engines is None:
            engines = ['quick_facts', 'detailed_analysis', 'router']

        available_engines = list(self.query_engines.keys()) + ['router', 'sub_question']
        engines = [e for e in engines if e in available_engines]
        if not engines:
            print("⚠️ No valid engines provided for comparison")
            return {}

        print(f"🔍 Comparing engines for: '{question}'")
        results = {}
        for engine in engines:
            print(f"\n--- Testing {engine} ---")
            result = self.query(question, engine_type=engine)
            results[engine] = result
        return results

    def analyze_query_patterns(self, queries: List[str]) -> Dict:
        """
        Analyze query patterns and recommend optimal engines
        """
        patterns = {
            'quick_facts': [],
            'detailed_analysis': [],
            'step_by_step': [],
            'relationship_analysis': [],
            'code_examples': []
        }
        pattern_indicators = {
            'quick_facts': ['what is', 'define', 'meaning', 'definition'],
            'detailed_analysis': ['explain', 'analyze', 'comprehensive', 'detailed'],
            'step_by_step': ['how to', 'tutorial', 'steps', 'guide', 'process'],
            'relationship_analysis': ['relationship', 'connection', 'related', 'compare'],
            'code_examples': ['example', 'code', 'implementation', 'sample']
        }
        for query in queries:
            query_lower = query.lower()
            for pattern, indicators in pattern_indicators.items():
                if any(indicator in query_lower for indicator in indicators):
                    patterns[pattern].append(query)
                    break
            else:
                patterns['quick_facts'].append(query)  # Default
        return patterns


# Create enterprise query engine
enterprise_query = EnterpriseQueryEngine(index_manager)

# Test different query types
test_queries = [
    "What is LlamaIndex?",
    "How do I build a RAG system with LlamaIndex step by step?",
    "Explain the relationship between embeddings and vector stores",
    "Show me code examples for creating a vector index",
    "What are the differences between vector stores and knowledge graphs in LlamaIndex?"
]

print("\n🧪 Testing Enterprise Query Engine:")
print("=" * 60)
for query in test_queries:
    # Use router engine (intelligent selection)
    result = enterprise_query.query(query, engine_type="router")
    if result['success']:
        print(f"\n❓ {result['question']}")
        print(f"🎯 Engine: {result['engine_used']}")
        print(f"💡 {result['answer'][:300]}...")
        print(f"⏱️ Time: {result['query_time']:.2f}s")
        print(f"📚 Sources: {len(result['sources'])} documents")
    else:
        print(f"\n❓ {result['question']}")
        print(f"❌ Error: {result['error']}")
    print("-" * 60)

Scale Beyond Local Development with Supacrawler

While this tutorial demonstrates LlamaIndex's enterprise features locally, production RAG systems require sophisticated data ingestion, knowledge management, and performance optimization at scale:

  • Enterprise Knowledge Bases: Managing thousands of documents across multiple domains
  • Real-Time Updates: Keeping knowledge current with automated re-crawling
  • Advanced Processing: Handling complex document structures, multimedia content, and technical documentation
  • Performance at Scale: Sub-second response times across millions of vectors

Supacrawler's enterprise crawling integrates seamlessly with LlamaIndex for production-scale systems:

import { SupacrawlerClient } from '@supacrawler/js'
const client = new SupacrawlerClient({ apiKey: process.env.SUPACRAWLER_API_KEY })
// Enterprise-scale knowledge base construction
async function buildEnterpriseKnowledgeBase() {
  const knowledgeDomains = [
    'https://docs.company.com',
    'https://api.company.com/docs',
    'https://support.company.com',
    'https://engineering.company.com'
  ]

  const crawlJobs = await Promise.all(
    knowledgeDomains.map(url => client.createCrawlJob({
      url,
      format: 'markdown',
      depth: 6,          // Deep enterprise crawling
      link_limit: 20000, // Large-scale processing
      render_js: true,
      // Enterprise quality controls
      include_patterns: ['/docs/*', '/api/*', '/guides/*', '/tutorials/*', '/reference/*'],
      exclude_patterns: ['/blog/*', '/news/*', '/privacy/*', '/legal/*'],
      remove_selectors: ['.sidebar', '.nav', '.footer', '.ads', '.cookie-banner'],
      wait_for: '.main-content, .content, main, article',
      // Performance optimizations
      concurrent_limit: 12,
      timeout: 60000,
      block_ads: true,
      block_cookies: true,
      // Content quality
      min_content_length: 500,
      respect_robots_txt: true
    }))
  )

  // Process results with quality assessment
  const knowledgeBase = []
  for (const job of crawlJobs) {
    const result = await client.waitForCrawl(job.job_id)
    if (result.status === 'completed') {
      const crawlData = result.data.crawl_data
      for (const [url, pageData] of Object.entries(crawlData)) {
        const content = pageData.markdown
        const metadata = pageData.metadata

        // Enterprise content quality scoring
        const qualityScore = assessContentQuality(content, metadata, url)
        if (qualityScore >= 0.7) { // High-quality threshold
          knowledgeBase.push({
            content,
            metadata: {
              ...metadata,
              url,
              domain: new URL(url).hostname,
              crawl_timestamp: Date.now(),
              quality_score: qualityScore,
              content_classification: classifyContent(content, url),
              technical_depth: assessTechnicalDepth(content)
            }
          })
        }
      }
    }
  }
  return knowledgeBase
}

function assessContentQuality(content: string, metadata: any, url: string): number {
  let score = 0.0

  // Content length (normalized to 2000 chars)
  score += Math.min(content.length / 2000, 1.0) * 0.3

  // Structural quality (headers, lists, code blocks)
  const structureScore = (
    (content.match(/^#{1,6}\s/gm) || []).length * 0.1 +
    (content.match(/^[-*+]\s/gm) || []).length * 0.05 +
    (content.match(/```[\s\S]*?```/g) || []).length * 0.1
  ) / content.length * 1000
  score += Math.min(structureScore, 1.0) * 0.3

  // Metadata completeness
  let metadataScore = 0
  if (metadata.title) metadataScore += 0.1
  if (metadata.description) metadataScore += 0.1
  if (metadata.keywords) metadataScore += 0.05
  score += metadataScore

  // Technical content indicators
  const technicalTerms = ['API', 'function', 'class', 'method', 'endpoint', 'parameter']
  const technicalCount = technicalTerms.reduce(
    (count, term) => count + (content.toLowerCase().match(new RegExp(term.toLowerCase(), 'g')) || []).length,
    0
  )
  score += Math.min(technicalCount / 50, 1.0) * 0.25

  return Math.min(score, 1.0)
}

Enterprise Production Benefits:

  • Massive Scale Processing: Handle 100,000+ pages across multiple domains
  • LlamaIndex Integration: Direct compatibility with enterprise indexing strategies
  • Content Quality Assessment: AI-powered content scoring and filtering
  • Automated Knowledge Management: Scheduled re-crawling for knowledge freshness (see the sketch after this list)
  • Enterprise Security: Respect for robots.txt, rate limiting, and access controls
  • Performance Optimization: Concurrent processing and intelligent caching
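
As a rough illustration of the automated knowledge management point above, the sketch below re-crawls a small set of URLs on a fixed interval and rebuilds the indices with the EnterpriseSupacrawlerReader, EnterpriseDocumentProcessor, and EnterpriseIndexManager instances from earlier sections. The interval, URL list, and full-rebuild strategy are placeholder assumptions; a production deployment would typically use a scheduler and incremental updates instead.

import time

# Hypothetical refresh loop: re-crawl on a fixed schedule and rebuild all indices.
# Interval and URLs are placeholders; swap the loop for cron/Celery beat in production.
REFRESH_INTERVAL_SECONDS = 24 * 60 * 60
refresh_urls = ["https://docs.llamaindex.ai"]

def refresh_knowledge_base():
    # Reuses the objects built earlier in this post
    documents = enterprise_reader.load_data(urls=refresh_urls)
    nodes = processor.process_documents(documents)
    return index_manager.build_multi_strategy_indices(nodes)

while True:
    results = refresh_knowledge_base()
    print(f"Refreshed indices: {list(results.keys())}")
    time.sleep(REFRESH_INTERVAL_SECONDS)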

Conclusion

This comprehensive guide demonstrated how to build enterprise-grade RAG systems using LlamaIndex's advanced features with Supacrawler's intelligent crawling and Supabase's vector storage. The combination delivers:

  • Enterprise Architecture: Multi-index strategies, intelligent routing, and advanced query processing
  • Production Performance: Sub-second queries across large knowledge bases with sophisticated caching
  • Advanced Features: Knowledge graphs, sub-question decomposition, and intelligent query routing
  • Quality Assurance: Content scoring, metadata enhancement, and comprehensive evaluation

LlamaIndex's enterprise features make it ideal for complex RAG scenarios requiring advanced document understanding, sophisticated query processing, and integration with existing enterprise systems.

Whether you're building internal knowledge management, customer support automation, or research assistance platforms, this LlamaIndex-based architecture provides the sophistication and scalability that demanding enterprise AI applications require.

By Supacrawler Team
Published on September 8, 2025