
Integrations: Building RAG with Supacrawler, LangChain, and Supabase pgvector for Enterprise

LangChain has become the go-to framework for building sophisticated AI applications, providing powerful abstractions for document processing, embeddings, and retrieval systems. This comprehensive guide shows you how to combine LangChain's robust ecosystem with Supacrawler's intelligent web crawling and Supabase's vector storage to build a production-ready RAG system.

By leveraging LangChain's document loaders, text splitters, and retrieval chains, you'll create a RAG system that's both powerful and maintainable, perfect for complex document processing and advanced retrieval scenarios.

If you'd like to try it yourself, you can check out the LangChain Vectors notebook.


LangChain RAG Architecture

Our RAG system leverages LangChain's modular architecture for maximum flexibility and maintainability:

| Component | Technology | Purpose | LangChain Module |
| --- | --- | --- | --- |
| Web Crawling | Supacrawler | Extract clean content from websites | Custom Document Loader |
| Document Processing | LangChain | Parse and structure crawled content | Document Loaders & Transformers |
| Text Chunking | LangChain | Split documents into searchable segments | Text Splitters |
| Embeddings | OpenAI | Convert text to high-dimensional vectors | OpenAI Embeddings |
| Vector Storage | Supabase pgvector | Store and search vectors efficiently | PGVector |
| Retrieval | LangChain | Find relevant documents for queries | Retrievers |
| Generation | OpenAI | Generate responses with retrieved context | Chat Models |

Key Advantages of LangChain Integration

  • Modular Design: Easy to swap components and experiment with different approaches
  • Rich Ecosystem: Access to 100+ document loaders, text splitters, and integrations
  • Chain Abstractions: Build complex workflows with simple, reusable components (see the short sketch after this list)
  • Memory Management: Handle conversation context and multi-turn interactions
  • Evaluation Tools: Built-in metrics and evaluation frameworks
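
To make the chain-abstraction point concrete, here is a minimal sketch of LangChain's LCEL composition (the pipe operator). The prompt text and model name are illustrative only, not part of this guide's pipeline:

# Minimal LCEL sketch: compose a prompt, a chat model, and an output parser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template("Summarize this in one sentence: {text}")
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
chain = prompt | llm | StrOutputParser()  # reusable, swappable components

# print(chain.invoke({"text": "LangChain provides composable building blocks for LLM apps."}))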

Setting Up the Development Environment

First, install LangChain and all required dependencies:

# Core LangChain packages
pip install langchain langchain-community langchain-openai
# Vector storage and database
pip install langchain-postgres psycopg2-binary sqlalchemy
# Text processing and utilities
pip install langchain-text-splitters beautifulsoup4 markdownify
# Supacrawler and environment management
pip install supacrawler python-dotenv
# Optional: Advanced features
pip install langchain-experimental # For experimental features

Create your environment configuration:

# .env
SUPACRAWLER_API_KEY=your_supacrawler_api_key
OPENAI_API_KEY=your_openai_api_key
SUPABASE_URL=your_supabase_project_url
SUPABASE_KEY=your_supabase_anon_key
DATABASE_URL=postgresql://postgres:[password]@db.[project].supabase.co:5432/postgres
# LangChain settings
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=your_langchain_api_key # Optional: for LangSmith tracing

Supabase pgvector Configuration

Enable pgvector in your Supabase project:

import os
from sqlalchemy import create_engine, text
from dotenv import load_dotenv

load_dotenv()

def setup_supabase_pgvector():
    """
    Set up Supabase with the pgvector extension.
    """
    engine = create_engine(os.getenv('DATABASE_URL'))
    with engine.connect() as connection:
        # Enable pgvector extension
        connection.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))
        connection.commit()
    print("✅ pgvector extension enabled in Supabase")
    return engine

# Setup database
engine = setup_supabase_pgvector()
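
Once the vector store has created its tables (the PGVector setup later in this guide), you will likely also want an approximate-nearest-neighbor index on the embedding column for larger collections. The snippet below is a sketch only: it assumes the default langchain_pg_embedding table and embedding column that langchain-postgres creates, and an HNSW index with cosine distance; verify the names in your database before running it.

# Optional: ANN index for faster similarity search on large collections.
# Assumes the default table/column created by langchain-postgres
# ("langchain_pg_embedding"."embedding") - verify before running.
with engine.connect() as connection:
    connection.execute(text("""
        CREATE INDEX IF NOT EXISTS langchain_embedding_hnsw_idx
        ON langchain_pg_embedding
        USING hnsw (embedding vector_cosine_ops);
    """))
    connection.commit()
print("✅ HNSW index created (or already present)")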

Intelligent Web Crawling with Supacrawler

Create a custom LangChain document loader that uses Supacrawler for intelligent web crawling:

import os
from typing import List, Dict, Any, Optional
from langchain_core.documents import Document
from langchain_core.document_loaders import BaseLoader
from supacrawler import SupacrawlerClient
from dotenv import load_dotenv

load_dotenv()

class SupacrawlerDocumentLoader(BaseLoader):
    """
    LangChain document loader that uses Supacrawler for web crawling.
    """
    def __init__(
        self,
        url: str,
        api_key: Optional[str] = None,
        include_patterns: Optional[List[str]] = None,
        exclude_patterns: Optional[List[str]] = None,
        depth: int = 3,
        link_limit: int = 200,
        **crawl_kwargs
    ):
        """
        Initialize the Supacrawler document loader.

        Args:
            url: Starting URL to crawl
            api_key: Supacrawler API key (defaults to environment variable)
            include_patterns: URL patterns to include (e.g., ['/docs/*'])
            exclude_patterns: URL patterns to exclude (e.g., ['/blog/*'])
            depth: Maximum crawl depth
            link_limit: Maximum number of pages to crawl
            **crawl_kwargs: Additional crawl parameters
        """
        self.url = url
        self.client = SupacrawlerClient(
            api_key=api_key or os.getenv('SUPACRAWLER_API_KEY')
        )
        # Default crawl configuration optimized for documentation
        self.crawl_config = {
            'url': url,
            'format': 'markdown',
            'depth': depth,
            'link_limit': link_limit,
            'render_js': True,
            'include_patterns': include_patterns or ['/docs/*', '/api/*', '/guides/*'],
            'exclude_patterns': exclude_patterns or ['/blog/*', '/changelog/*', '/privacy/*'],
            'timeout': 30000,
            'wait_for': '.main-content, .content, main, article',
            'block_ads': True,
            'block_cookies': True,
            **crawl_kwargs
        }

    def load(self) -> List[Document]:
        """
        Crawl the website and return LangChain Documents.
        """
        print(f"🚀 Starting crawl of {self.url}")
        # Create and execute crawl job
        job = self.client.create_crawl_job(**self.crawl_config)
        result = self.client.wait_for_crawl(job.job_id)
        if result.status != 'completed':
            raise Exception(f"Crawl failed with status: {result.status}")
        crawl_data = result.data.get('crawl_data', {})
        print(f"✅ Crawl completed! Found {len(crawl_data)} pages")
        # Convert to LangChain Documents
        documents = []
        for url, page_data in crawl_data.items():
            content = page_data.get('markdown', '')
            metadata = page_data.get('metadata', {})
            if not content or len(content.strip()) < 100:
                continue
            # Create LangChain Document with rich metadata
            doc = Document(
                page_content=content,
                metadata={
                    'url': url,
                    'title': metadata.get('title', ''),
                    'description': metadata.get('description', ''),
                    'keywords': metadata.get('keywords', ''),
                    'author': metadata.get('author', ''),
                    'source': 'supacrawler',
                    'crawl_timestamp': result.data.get('timestamp'),
                    'content_length': len(content),
                    'word_count': len(content.split())
                }
            )
            documents.append(doc)
        print(f"📄 Created {len(documents)} LangChain documents")
        return documents

    def lazy_load(self) -> List[Document]:
        """
        Lazy loading implementation (same as load for web crawling).
        """
        return self.load()

# Example usage
loader = SupacrawlerDocumentLoader(
    url="https://python.langchain.com/docs",
    include_patterns=['/docs/*'],
    exclude_patterns=['/docs/changelog/*'],
    depth=3,
    link_limit=150
)

# Load documents
documents = loader.load()
print(f"\n📊 Loaded {len(documents)} documents")
print(f"📈 Total content: {sum(len(doc.page_content) for doc in documents):,} characters")

# Display sample document
if documents:
    sample_doc = documents[0]
    print(f"\n📋 Sample Document:")
    print(f"Title: {sample_doc.metadata['title']}")
    print(f"URL: {sample_doc.metadata['url']}")
    print(f"Content preview: {sample_doc.page_content[:300]}...")

LangChain Document Processing

Process the crawled documents using LangChain's powerful document transformers:

from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter, MarkdownHeaderTextSplitter
from typing import List
import re

class AdvancedDocumentProcessor:
    """
    Advanced document processing using LangChain's text splitters.
    """
    def __init__(self):
        # Initialize different text splitters for different content types
        self.markdown_splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=[
                ("#", "Header 1"),
                ("##", "Header 2"),
                ("###", "Header 3"),
                ("####", "Header 4"),
            ]
        )
        self.recursive_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )

    def clean_document_content(self, doc: Document) -> Document:
        """
        Clean and normalize document content.
        """
        content = doc.page_content
        # Remove excessive whitespace
        content = re.sub(r'\n\s*\n\s*\n', '\n\n', content)
        content = re.sub(r'[ \t]+', ' ', content)
        # Clean markdown artifacts
        content = re.sub(r'!\[([^\]]*)\]\([^)]+\)', r'[Image: \1]', content)  # Images
        content = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', content)  # Links to text
        # Remove code blocks for general content (keep for API docs)
        if 'api' not in doc.metadata.get('url', '').lower():
            content = re.sub(r'```[\s\S]*?```', '[Code Block]', content)
            content = re.sub(r'`([^`]+)`', r'\1', content)
        # Create cleaned document
        cleaned_doc = Document(
            page_content=content.strip(),
            metadata={**doc.metadata, 'processed': True}
        )
        return cleaned_doc

    def split_documents_intelligently(self, documents: List[Document]) -> List[Document]:
        """
        Split documents using intelligent strategies based on content type.
        """
        all_chunks = []
        for doc in documents:
            # Clean document first
            cleaned_doc = self.clean_document_content(doc)
            # Choose splitting strategy based on content
            content = cleaned_doc.page_content
            if self._is_structured_markdown(content):
                # Use markdown-aware splitting for structured content
                chunks = self._split_markdown_document(cleaned_doc)
            else:
                # Use recursive splitting for general content
                chunks = self.recursive_splitter.split_documents([cleaned_doc])
            # Add chunk metadata
            for i, chunk in enumerate(chunks):
                chunk.metadata.update({
                    'chunk_index': i,
                    'total_chunks': len(chunks),
                    'chunk_id': f"{doc.metadata.get('url', 'unknown')}#{i}",
                    'parent_document_id': doc.metadata.get('url', 'unknown')
                })
                all_chunks.append(chunk)
        print(f"📊 Split {len(documents)} documents into {len(all_chunks)} chunks")
        return all_chunks

    def _is_structured_markdown(self, content: str) -> bool:
        """
        Detect whether content has a clear markdown header structure.
        """
        header_count = len(re.findall(r'^#{1,6}\s+', content, re.MULTILINE))
        lines = content.count('\n')
        # If more than 10% of lines are headers, consider it structured
        return lines > 0 and (header_count / lines) > 0.1

    def _split_markdown_document(self, doc: Document) -> List[Document]:
        """
        Split a markdown document while preserving header hierarchy.
        """
        # First split by headers
        header_chunks = self.markdown_splitter.split_text(doc.page_content)
        # Then further split large chunks
        final_chunks = []
        for chunk in header_chunks:
            if len(chunk.page_content) > 1200:
                # Further split large sections
                sub_chunks = self.recursive_splitter.split_documents([chunk])
                final_chunks.extend(sub_chunks)
            else:
                final_chunks.append(chunk)
        # Preserve original metadata
        for chunk in final_chunks:
            chunk.metadata.update(doc.metadata)
        return final_chunks

# Process documents
processor = AdvancedDocumentProcessor()
document_chunks = processor.split_documents_intelligently(documents)

print(f"\n📈 Processing Statistics:")
print(f"Original documents: {len(documents)}")
print(f"Generated chunks: {len(document_chunks)}")
print(f"Average chunk size: {sum(len(chunk.page_content) for chunk in document_chunks) // len(document_chunks)} characters")

# Show sample chunk
if document_chunks:
    sample_chunk = document_chunks[0]
    print(f"\n📋 Sample Chunk:")
    print(f"Chunk ID: {sample_chunk.metadata['chunk_id']}")
    print(f"Title: {sample_chunk.metadata['title']}")
    print(f"Content: {sample_chunk.page_content[:200]}...")

Advanced Text Chunking Strategies

LangChain provides sophisticated text splitting options for different use cases:

import re
from typing import List

from langchain_core.documents import Document
from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
    TokenTextSplitter,
    MarkdownHeaderTextSplitter,
    HTMLHeaderTextSplitter
)
from langchain_community.document_transformers import Html2TextTransformer

class AdaptiveChunkingStrategy:
    """
    Adaptive chunking that selects the best strategy based on content type.
    """
    def __init__(self):
        self.strategies = {
            'recursive': RecursiveCharacterTextSplitter(
                chunk_size=1000,
                chunk_overlap=200,
                separators=["\n\n", "\n", ". ", " ", ""]
            ),
            'token_based': TokenTextSplitter(
                chunk_size=800,
                chunk_overlap=100
            ),
            'markdown': MarkdownHeaderTextSplitter(
                headers_to_split_on=[
                    ("#", "Header 1"),
                    ("##", "Header 2"),
                    ("###", "Header 3"),
                ]
            ),
            'semantic': RecursiveCharacterTextSplitter(
                chunk_size=1500,
                chunk_overlap=300,
                separators=["\n\n", "\n", ". ", " "]
            )
        }

    def choose_chunking_strategy(self, doc: Document) -> str:
        """
        Choose the optimal chunking strategy based on content analysis.
        """
        content = doc.page_content
        metadata = doc.metadata
        # Analyze content characteristics
        has_headers = bool(re.search(r'^#{1,6}\s+', content, re.MULTILINE))
        has_code = bool(re.search(r'```|`[^`]+`', content))
        is_api_doc = 'api' in metadata.get('url', '').lower()
        is_long_form = len(content) > 2000
        # Decision logic
        if has_headers and is_long_form:
            return 'markdown'
        elif is_api_doc or has_code:
            return 'semantic'  # Preserve more context for technical content
        elif is_long_form:
            return 'token_based'  # More precise for long content
        else:
            return 'recursive'  # Default for general content

    def chunk_document(self, doc: Document) -> List[Document]:
        """
        Chunk a document using the adaptive strategy.
        """
        strategy_name = self.choose_chunking_strategy(doc)
        strategy = self.strategies[strategy_name]
        print(f"📄 Using '{strategy_name}' strategy for: {doc.metadata.get('title', 'Unknown')}")
        if strategy_name == 'markdown':
            # MarkdownHeaderTextSplitter works on raw text and returns Documents
            chunks = strategy.split_text(doc.page_content)
            # Convert back to Documents with the original metadata attached
            result_chunks = []
            for chunk in chunks:
                new_doc = Document(
                    page_content=chunk.page_content,
                    metadata={**doc.metadata, **chunk.metadata, 'chunking_strategy': strategy_name}
                )
                result_chunks.append(new_doc)
            return result_chunks
        else:
            chunks = strategy.split_documents([doc])
            for chunk in chunks:
                chunk.metadata['chunking_strategy'] = strategy_name
            return chunks

# Apply adaptive chunking
adaptive_chunker = AdaptiveChunkingStrategy()

# Process all documents with adaptive chunking
adaptive_chunks = []
for doc in documents:
    doc_chunks = adaptive_chunker.chunk_document(doc)
    adaptive_chunks.extend(doc_chunks)

print(f"\n🔄 Adaptive Chunking Results:")
print(f"Total chunks: {len(adaptive_chunks)}")

# Analyze strategy distribution
strategy_counts = {}
for chunk in adaptive_chunks:
    strategy = chunk.metadata.get('chunking_strategy', 'unknown')
    strategy_counts[strategy] = strategy_counts.get(strategy, 0) + 1

print(f"📊 Strategy distribution:")
for strategy, count in strategy_counts.items():
    print(f"  {strategy}: {count} chunks")

OpenAI Embeddings Integration

Integrate OpenAI embeddings with LangChain's embedding abstractions:

from langchain_openai import OpenAIEmbeddings
from langchain_core.documents import Document
from typing import List
import os

class OptimizedOpenAIEmbeddings:
    """
    Optimized OpenAI embeddings with batching and error handling.
    """
    def __init__(self, api_key: str = None, model: str = "text-embedding-3-small"):
        self.embeddings = OpenAIEmbeddings(
            openai_api_key=api_key or os.getenv('OPENAI_API_KEY'),
            model=model,
            show_progress_bar=True
        )
        self.model = model
        self.dimension = 1536 if model == "text-embedding-3-small" else 3072

    def embed_documents_with_metadata(self, documents: List[Document]) -> List[Document]:
        """
        Embed documents and add the embedding vectors to their metadata.
        """
        print(f"🧠 Generating embeddings for {len(documents)} documents...")
        # Extract texts for embedding
        texts = [doc.page_content for doc in documents]
        try:
            # Generate embeddings in batch
            embeddings = self.embeddings.embed_documents(texts)
            # Add embeddings to documents
            embedded_docs = []
            for doc, embedding in zip(documents, embeddings):
                # Create a new document with the embedding in its metadata
                embedded_doc = Document(
                    page_content=doc.page_content,
                    metadata={
                        **doc.metadata,
                        'embedding': embedding,
                        'embedding_model': self.model,
                        'embedding_dimension': len(embedding)
                    }
                )
                embedded_docs.append(embedded_doc)
            print(f"✅ Successfully embedded {len(embedded_docs)} documents")
            return embedded_docs
        except Exception as e:
            print(f"❌ Error generating embeddings: {e}")
            return []

    def embed_query(self, query: str) -> List[float]:
        """
        Embed a query string.
        """
        return self.embeddings.embed_query(query)

# Generate embeddings for our chunks
embedder = OptimizedOpenAIEmbeddings()
embedded_chunks = embedder.embed_documents_with_metadata(document_chunks)

print(f"\n📊 Embedding Statistics:")
print(f"Embedded chunks: {len(embedded_chunks)}")
if embedded_chunks:
    print(f"Embedding dimension: {embedded_chunks[0].metadata['embedding_dimension']}")
    print(f"Model used: {embedded_chunks[0].metadata['embedding_model']}")

LangChain PGVector Storage

Use LangChain's PGVector integration for seamless vector storage:

import os
from typing import List

from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_postgres import PGVector
from sqlalchemy import create_engine

class LangChainVectorStore:
    """
    LangChain-integrated vector store using Supabase pgvector.
    """
    def __init__(self, connection_string: str, collection_name: str = "langchain_documents"):
        self.connection_string = connection_string
        self.collection_name = collection_name
        self.engine = create_engine(connection_string)
        # Initialize embeddings
        self.embeddings = OpenAIEmbeddings(
            model="text-embedding-3-small",
            openai_api_key=os.getenv('OPENAI_API_KEY')
        )
        # Initialize vector store
        self.vector_store = PGVector(
            connection=self.engine,
            collection_name=collection_name,
            embeddings=self.embeddings,
            use_jsonb=True  # Use JSONB for metadata storage
        )
        print(f"📦 Initialized LangChain PGVector store: {collection_name}")

    def add_documents(self, documents: List[Document]) -> List[str]:
        """
        Add documents to the vector store.
        """
        print(f"💾 Adding {len(documents)} documents to vector store...")
        try:
            # Add documents (LangChain handles embedding automatically)
            doc_ids = self.vector_store.add_documents(documents)
            print(f"✅ Successfully added {len(doc_ids)} documents")
            return doc_ids
        except Exception as e:
            print(f"❌ Error adding documents: {e}")
            return []

    def create_retriever(self, search_type: str = "similarity", search_kwargs: dict = None):
        """
        Create a LangChain retriever.
        """
        search_kwargs = search_kwargs or {"k": 5}
        retriever = self.vector_store.as_retriever(
            search_type=search_type,
            search_kwargs=search_kwargs
        )
        print(f"🔍 Created retriever with search_type='{search_type}', k={search_kwargs.get('k', 5)}")
        return retriever

    def similarity_search_with_score(self, query: str, k: int = 5):
        """
        Search with similarity scores.
        """
        return self.vector_store.similarity_search_with_score(query, k=k)

# Initialize vector store
vector_store = LangChainVectorStore(
    connection_string=os.getenv('DATABASE_URL'),
    collection_name="supacrawler_langchain_rag"
)

# Add our embedded documents
document_ids = vector_store.add_documents(document_chunks)
print(f"\n🎯 Vector store ready with {len(document_ids)} documents!")

Building Retrieval Chains

Create sophisticated retrieval chains using LangChain's composable components:

import os
from typing import List

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain

class AdvancedRAGChain:
    """
    Advanced RAG chain with multiple retrieval strategies.
    """
    def __init__(self, vector_store: LangChainVectorStore):
        self.vector_store = vector_store
        self.llm = ChatOpenAI(
            model="gpt-3.5-turbo",
            temperature=0.1,
            openai_api_key=os.getenv('OPENAI_API_KEY')
        )
        # Create different retrievers
        self.retrievers = {
            'similarity': vector_store.create_retriever(
                search_type="similarity",
                search_kwargs={"k": 5}
            ),
            'mmr': vector_store.create_retriever(
                search_type="mmr",
                search_kwargs={"k": 5, "fetch_k": 20}
            ),
            'similarity_score': vector_store.create_retriever(
                search_type="similarity_score_threshold",
                search_kwargs={"score_threshold": 0.7, "k": 5}
            )
        }
        self.chains = {}
        self._build_chains()

    def _build_chains(self):
        """
        Build the different retrieval chains.
        """
        # Standard RAG prompt
        rag_prompt = ChatPromptTemplate.from_template("""
You are a helpful assistant that answers questions based on the provided context.
Use only the information from the context to answer the question.
If the context doesn't contain enough information, say so clearly.
Context: {context}
Question: {input}
Answer:""")
        # Advanced RAG prompt with source citation
        citation_prompt = ChatPromptTemplate.from_template("""
You are a helpful assistant that answers questions based on the provided context.
Use only the information from the context to answer the question.
Always cite your sources by mentioning the relevant document titles or URLs.
If the context doesn't contain enough information, say so clearly.
Context: {context}
Question: {input}
Answer (with citations):""")
        # Build chains for each retriever
        for name, retriever in self.retrievers.items():
            # Standard chain
            question_answer_chain = create_stuff_documents_chain(self.llm, rag_prompt)
            self.chains[f"{name}_basic"] = create_retrieval_chain(retriever, question_answer_chain)
            # Citation chain
            citation_chain = create_stuff_documents_chain(self.llm, citation_prompt)
            self.chains[f"{name}_citation"] = create_retrieval_chain(retriever, citation_chain)

    def ask(self, question: str, chain_type: str = "similarity_citation") -> dict:
        """
        Ask a question using the specified chain.
        """
        if chain_type not in self.chains:
            available_chains = list(self.chains.keys())
            raise ValueError(f"Chain type '{chain_type}' not available. Choose from: {available_chains}")
        chain = self.chains[chain_type]
        print(f"🤔 Processing question with '{chain_type}' chain...")
        result = chain.invoke({"input": question})
        # Enhanced result with metadata
        enhanced_result = {
            'question': question,
            'answer': result['answer'],
            'chain_type': chain_type,
            'source_documents': result.get('context', []),
            'num_sources': len(result.get('context', [])),
            'sources': self._extract_sources(result.get('context', []))
        }
        return enhanced_result

    def _extract_sources(self, documents) -> List[dict]:
        """
        Extract source information from the retrieved documents.
        """
        sources = []
        seen_urls = set()
        for doc in documents:
            url = doc.metadata.get('url', 'Unknown')
            if url not in seen_urls:
                sources.append({
                    'url': url,
                    'title': doc.metadata.get('title', 'Untitled'),
                    'description': doc.metadata.get('description', ''),
                    'relevance_score': doc.metadata.get('score', 'N/A')
                })
                seen_urls.add(url)
        return sources

    def compare_retrieval_strategies(self, question: str):
        """
        Compare different retrieval strategies for a question.
        """
        print(f"\n🔍 Comparing retrieval strategies for: '{question}'")
        print("=" * 60)
        results = {}
        for chain_name in ['similarity_basic', 'mmr_basic', 'similarity_score_basic']:
            try:
                result = self.ask(question, chain_name)
                results[chain_name] = result
                print(f"\n{chain_name.upper()}:")
                print(f"Answer: {result['answer'][:200]}...")
                print(f"Sources: {result['num_sources']} documents")
            except Exception as e:
                print(f"❌ Error with {chain_name}: {e}")
        return results

# Create advanced RAG chain
rag_chain = AdvancedRAGChain(vector_store)

# Test the system
test_questions = [
    "How do I install LangChain?",
    "What are the different types of text splitters in LangChain?",
    "How do I use OpenAI embeddings with LangChain?",
    "What is the difference between similarity search and MMR?"
]

print("\n🧪 Testing Advanced RAG System:")
print("=" * 50)
for question in test_questions:
    result = rag_chain.ask(question)
    print(f"\n❓ {result['question']}")
    print(f"💡 {result['answer']}")
    print(f"📚 Sources ({result['num_sources']}):")
    for source in result['sources'][:3]:  # Show top 3 sources
        print(f"  • {source['title']} - {source['url']}")
    print("-" * 50)

Advanced Query Processing

Implement sophisticated query processing with conversation memory and query enhancement:

from langchain.memory import ConversationBufferWindowMemory
from langchain.chains import ConversationalRetrievalChain
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI

class ConversationalRAG:
    """
    Conversational RAG system with memory and context awareness.
    """
    def __init__(self, vector_store: LangChainVectorStore):
        self.vector_store = vector_store
        self.llm = ChatOpenAI(
            model="gpt-3.5-turbo",
            temperature=0.1
        )
        # Initialize memory
        self.memory = ConversationBufferWindowMemory(
            memory_key="chat_history",
            output_key="answer",
            return_messages=True,
            k=5  # Remember last 5 exchanges
        )
        # Create retriever
        self.retriever = vector_store.create_retriever(
            search_type="mmr",
            search_kwargs={"k": 6, "fetch_k": 20}
        )
        # Custom prompt for conversational RAG
        self.qa_prompt = PromptTemplate(
            template="""
You are a helpful AI assistant with expertise in the provided documentation.
Use the following context and conversation history to answer the question.
Previous conversation:
{chat_history}
Context from documentation:
{context}
Current question: {question}
Instructions:
1. Use only information from the provided context
2. Consider the conversation history for context
3. If you cannot answer based on the context, say so clearly
4. Provide specific examples when available
5. If referring to previous questions, be explicit about the connection
Answer:""",
            input_variables=["context", "question", "chat_history"]
        )
        # Create conversational chain
        self.qa_chain = ConversationalRetrievalChain.from_llm(
            llm=self.llm,
            retriever=self.retriever,
            memory=self.memory,
            combine_docs_chain_kwargs={"prompt": self.qa_prompt},
            return_source_documents=True,
            verbose=True
        )

    def chat(self, question: str) -> dict:
        """
        Have a conversation with the RAG system.
        """
        print(f"💬 User: {question}")
        try:
            result = self.qa_chain.invoke({"question": question})
            answer = result["answer"]
            source_docs = result.get("source_documents", [])
            print(f"🤖 Assistant: {answer}")
            return {
                "question": question,
                "answer": answer,
                "sources": [doc.metadata.get('url', 'Unknown') for doc in source_docs],
                "source_documents": source_docs
            }
        except Exception as e:
            error_msg = f"Sorry, I encountered an error: {e}"
            print(f"❌ {error_msg}")
            return {
                "question": question,
                "answer": error_msg,
                "sources": [],
                "source_documents": []
            }

    def get_conversation_history(self):
        """
        Get the current conversation history.
        """
        return self.memory.chat_memory.messages

    def clear_memory(self):
        """
        Clear the conversation memory.
        """
        self.memory.clear()
        print("🧹 Conversation memory cleared")

# Create conversational RAG
conversational_rag = ConversationalRAG(vector_store)

# Example conversation
print("\n💬 Starting Conversational RAG Demo:")
print("=" * 50)

conversation_flow = [
    "What is LangChain?",
    "How do I install it?",
    "What are the main components I should know about?",
    "Can you explain more about text splitters?",
    "Which text splitter should I use for long documents?"
]

for question in conversation_flow:
    result = conversational_rag.chat(question)
    print(f"📚 Sources: {len(result['sources'])} documents")
    print("-" * 30)

Production Optimization

Optimize the system for production deployment:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List

from langchain.callbacks import get_openai_callback
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

class ProductionRAGSystem:
    """
    Production-optimized RAG system with performance monitoring.
    """
    def __init__(self, vector_store: LangChainVectorStore):
        self.vector_store = vector_store
        self.llm = ChatOpenAI(
            model="gpt-3.5-turbo",
            temperature=0.1,
            max_retries=3,
            request_timeout=30
        )
        # Performance metrics
        self.metrics = {
            'queries_processed': 0,
            'total_tokens_used': 0,
            'total_cost': 0.0,
            'avg_response_time': 0.0,
            'error_count': 0
        }
        # Create optimized retriever
        self.retriever = vector_store.create_retriever(
            search_type="mmr",
            search_kwargs={"k": 4, "fetch_k": 12}  # Reduced for speed
        )
        # Build production chain
        self._build_production_chain()

    def _build_production_chain(self):
        """
        Build the optimized production chain.
        """
        from langchain.chains import create_retrieval_chain
        from langchain.chains.combine_documents import create_stuff_documents_chain
        # Optimized prompt
        prompt = ChatPromptTemplate.from_template("""
Based on the provided context, answer the question concisely and accurately.
Context: {context}
Question: {input}
Answer:""")
        question_answer_chain = create_stuff_documents_chain(self.llm, prompt)
        self.chain = create_retrieval_chain(self.retriever, question_answer_chain)

    def query_with_monitoring(self, question: str) -> dict:
        """
        Process a query with performance monitoring.
        """
        start_time = time.time()
        try:
            with get_openai_callback() as cb:
                result = self.chain.invoke({"input": question})
                # Update metrics
                response_time = time.time() - start_time
                self.metrics['queries_processed'] += 1
                self.metrics['total_tokens_used'] += cb.total_tokens
                self.metrics['total_cost'] += cb.total_cost
                # Update average response time
                current_avg = self.metrics['avg_response_time']
                query_count = self.metrics['queries_processed']
                self.metrics['avg_response_time'] = (
                    (current_avg * (query_count - 1) + response_time) / query_count
                )
                return {
                    'answer': result['answer'],
                    'sources': [doc.metadata.get('url') for doc in result.get('context', [])],
                    'response_time': response_time,
                    'tokens_used': cb.total_tokens,
                    'cost': cb.total_cost,
                    'success': True
                }
        except Exception as e:
            self.metrics['error_count'] += 1
            return {
                'error': str(e),
                'response_time': time.time() - start_time,
                'success': False
            }

    def batch_process_queries(self, questions: List[str], max_workers: int = 3) -> List[dict]:
        """
        Process multiple queries in parallel.
        """
        print(f"⚡ Processing {len(questions)} queries with {max_workers} workers...")
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(executor.map(self.query_with_monitoring, questions))
        print(f"✅ Batch processing complete!")
        return results

    def get_performance_report(self) -> dict:
        """
        Get a comprehensive performance report.
        """
        return {
            'queries_processed': self.metrics['queries_processed'],
            'total_tokens_used': self.metrics['total_tokens_used'],
            'total_cost': round(self.metrics['total_cost'], 4),
            'avg_response_time': round(self.metrics['avg_response_time'], 2),
            'error_count': self.metrics['error_count'],
            'error_rate': round(self.metrics['error_count'] / max(1, self.metrics['queries_processed']) * 100, 2),
            'cost_per_query': round(self.metrics['total_cost'] / max(1, self.metrics['queries_processed']), 4)
        }

# Create production system
production_rag = ProductionRAGSystem(vector_store)

# Performance testing
test_queries = [
    "What is LangChain?",
    "How do I use OpenAI embeddings?",
    "What are the different text splitters?",
    "How do I build a retrieval chain?",
    "What is the difference between similarity and MMR search?"
]

print("\n⚡ Production Performance Test:")
results = production_rag.batch_process_queries(test_queries)

# Display results
for i, (query, result) in enumerate(zip(test_queries, results)):
    if result['success']:
        print(f"\n{i+1}. {query}")
        print(f"   Answer: {result['answer'][:100]}...")
        print(f"   Time: {result['response_time']:.2f}s, Tokens: {result['tokens_used']}, Cost: ${result['cost']:.4f}")
    else:
        print(f"\n{i+1}. {query} - ERROR: {result['error']}")

# Performance report
print(f"\n📊 Performance Report:")
report = production_rag.get_performance_report()
for key, value in report.items():
    print(f"  {key.replace('_', ' ').title()}: {value}")

Scale Beyond Local Development with Supacrawler

While this tutorial demonstrates building with LangChain locally, production RAG systems require sophisticated data ingestion, content management, and performance optimization:

  • Large-Scale Knowledge Bases: Processing thousands of documents with consistent quality
  • Content Freshness: Keeping embeddings current with website changes
  • Advanced Processing: Handling complex document structures, multimedia content, and dynamic pages
  • Performance Optimization: Balancing retrieval quality with response time

Supacrawler's Crawl API integrates seamlessly with LangChain for production-scale RAG systems:

from typing import List

from langchain_core.documents import Document
from supacrawler import SupacrawlerClient

class ProductionSupacrawlerLoader:
    """Production-grade Supacrawler integration with LangChain."""
    def __init__(self, api_key: str):
        self.client = SupacrawlerClient(api_key=api_key)

    def load_knowledge_base(self, urls: List[str]) -> List[Document]:
        """Load multiple websites into a comprehensive knowledge base."""
        all_documents = []
        for url in urls:
            job = self.client.create_crawl_job(
                url=url,
                format='markdown',
                depth=4,  # Deep crawling for comprehensive coverage
                link_limit=5000,  # Large-scale processing
                render_js=True,
                # Production optimizations
                include_patterns=['/docs/*', '/api/*', '/guides/*', '/tutorials/*'],
                exclude_patterns=['/blog/*', '/news/*', '/privacy/*'],
                remove_selectors=['.sidebar', '.nav', '.footer', '.ads'],
                wait_for='.main-content, .content, main',
                block_ads=True,
                block_cookies=True,
                # Quality controls
                timeout=45000,
                concurrent_limit=8,
                respect_robots_txt=True
            )
            result = self.client.wait_for_crawl(job.job_id)
            if result.status == 'completed':
                crawl_data = result.data.get('crawl_data', {})
                for page_url, page_data in crawl_data.items():
                    content = page_data.get('markdown', '')
                    if len(content.strip()) > 200:  # Quality filter
                        doc = Document(
                            page_content=content,
                            metadata={
                                **page_data.get('metadata', {}),
                                'source_domain': page_url.split('/')[2],
                                'crawl_timestamp': result.data.get('timestamp'),
                                'content_quality_score': len(content) / 1000  # Simple quality metric
                            }
                        )
                        all_documents.append(doc)
        return all_documents

Production Integration Benefits:

  • Scalable Data Ingestion: Process 10,000+ pages without infrastructure management
  • LangChain Compatibility: Direct integration with Document loaders and processors
  • Content Quality: Clean, structured content optimized for embeddings
  • Automated Updates: Easy re-crawling for fresh knowledge bases (see the sketch after this list)
  • Error Resilience: Built-in retry logic and failure handling
  • Performance Optimization: Concurrent processing and intelligent rate limiting
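
As a sketch of the automated-updates point above, the function below reuses the SupacrawlerDocumentLoader, AdvancedDocumentProcessor, and LangChainVectorStore classes defined earlier in this guide to re-crawl a site and store fresh chunks. It is an illustration under those assumptions only; a real deployment would also de-duplicate or remove stale entries, which is not shown here.

# Sketch: periodic knowledge-base refresh using the classes defined above.
# De-duplication of previously stored chunks is intentionally omitted.
def refresh_knowledge_base(url: str, store: LangChainVectorStore) -> int:
    loader = SupacrawlerDocumentLoader(url=url, depth=3, link_limit=150)
    fresh_docs = loader.load()  # re-crawl the site
    processor = AdvancedDocumentProcessor()
    fresh_chunks = processor.split_documents_intelligently(fresh_docs)
    ids = store.add_documents(fresh_chunks)  # embed and store the new chunks
    return len(ids)

# Example: run on a schedule (cron, Celery beat, etc.)
# refreshed = refresh_knowledge_base("https://python.langchain.com/docs", vector_store)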


Conclusion

This comprehensive guide demonstrated how to build a sophisticated RAG system using LangChain's powerful abstractions with Supacrawler's intelligent web crawling and Supabase's vector storage. The combination provides:

  • Modular Architecture: Easy to customize and extend with LangChain's ecosystem
  • Advanced Processing: Sophisticated document chunking and retrieval strategies
  • Production Ready: Performance monitoring, error handling, and scalability features
  • Conversation Support: Memory and context-aware interactions

The LangChain integration offers unmatched flexibility for complex RAG scenarios, making it ideal for applications requiring advanced document processing, custom retrieval logic, or integration with existing LangChain workflows.

Whether building customer support systems, documentation search, or intelligent assistants, this LangChain-based RAG architecture provides the foundation for sophisticated AI applications that can scale from prototype to production.

By Supacrawler Team
Published on September 9, 2025