Building a Local RAG System with Ollama and Gemma: A Complete Guide - Part 2
Building a Local RAG System with Chat Memory Using Redis
This is a continuation of our comprehensive guide on building a local RAG system with Ollama and Gemma. If you haven’t read Part 1, we recommend starting there to understand the foundational concepts and basic implementation.
In our previous article, we successfully built a functional RAG (Retrieval-Augmented Generation) system that could process documents and answer questions based on their content. However, our system had one significant limitation: it couldn’t remember previous conversations or maintain context across multiple interactions.
Today, we’ll enhance our RAG system by adding chat memory functionality using Redis, enabling it to maintain conversation history and provide more contextual responses. This upgrade transforms our stateless question-answering system into an intelligent conversational AI that can reference previous exchanges.
Why Add Chat Memory to Your RAG System?
Before diving into the implementation, let’s understand the benefits of adding memory:
Enhanced User Experience: Users can ask follow-up questions without repeating context
Contextual Understanding: The system can reference previous parts of the conversation
Natural Conversations: Creates a more human-like interaction pattern
Improved Accuracy: Better responses by considering conversation history
User Session Management: Support multiple users with isolated conversation histories
Two Approaches to Implementing Chat Memory
When building a RAG system with memory, you have two primary implementation strategies:
Option 1: Manual History Management
Manually append chat history to your prompt template. This approach gives you complete control but requires more implementation work.
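As a rough illustration, the manual approach might look like the sketch below; the history list and build_prompt helper are placeholder names for this example, not part of our final implementation.
# Illustrative only: you keep the history and inject it into the prompt yourself.
history = []  # list of (role, message) tuples maintained by your own code

def build_prompt(context: str, question: str) -> str:
    history_text = "\n".join(f"{role}: {msg}" for role, msg in history)
    return (
        f"Context:\n{context}\n\n"
        f"Chat History:\n{history_text}\n\n"
        f"Question: {question}\nAnswer:"
    )

# After each turn, you append to the history yourself:
# history.append(("human", question))
# history.append(("ai", answer))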
Option 2: LangChain’s Memory Components
Leverage LangChain’s built-in memory management with ConversationBufferMemory and RedisChatMessageHistory. This is the approach we’ll use, as it provides robust, production-ready functionality.
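As a quick preview of how these two components fit together (the full, chain-integrated version appears later in this article), here is a minimal sketch; the session id and Redis URL are assumptions for a local setup.
from langchain.memory import ConversationBufferMemory
from langchain.memory.chat_message_histories import RedisChatMessageHistory

# Buffer memory backed by Redis, keyed by a session id
memory = ConversationBufferMemory(
    chat_memory=RedisChatMessageHistory(session_id="demo_user", url="redis://localhost:6379"),
    memory_key="history",
    return_messages=True,
)
memory.save_context({"input": "Hi, what do you offer?"}, {"output": "Consulting and training."})
print(memory.load_memory_variables({}))  # {'history': [HumanMessage(...), AIMessage(...)]}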
User Isolation Strategies
For multi-user systems, you need to isolate chat histories:
- Session IDs or User IDs: Assign unique identifiers to manage memory per user
- Dynamic Memory Management: Use LangChain’s RedisChatMessageHistory with dynamic session handling
We’ll implement the second method, using RedisChatMessageHistory with per-session handling, for optimal performance and scalability.
Choosing Your Storage Backend
Different storage options serve different use cases: an in-memory buffer is simple but lost on restart, while a file or database backend persists history across sessions at the cost of extra overhead.
For our implementation, we’ll use Redis for its excellent performance in handling the frequent read/write operations typical of chat applications.
Setting Up Redis
Installing Redis
For Windows: Download and install the Unofficial Native Redis for Windows
For Linux/Mac:
# Ubuntu/Debian
sudo apt-get install redis-server
# macOS with Homebrew
brew install redis
Installing Python Redis Client
pip install redis
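Once both are installed, a quick connectivity check from Python can save debugging time later. This assumes Redis is listening on the default localhost:6379.
import redis

client = redis.Redis(host="localhost", port=6379)
print(client.ping())  # prints True if the server is reachable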
Enhanced Document Processing
This section introduces improved document preprocessing to remove unwanted headers, footers, and formatting artifacts before chunking. We’ve also optimized chunk size and overlap parameters for better retrieval performance.
Let’s start by improving our document processing pipeline with better text cleaning:
from langchain_text_splitters import RecursiveCharacterTextSplitter

def clean_document_text(text: str) -> str:
    """Clean and preprocess document text."""
    lines = text.split('\n')
    cleaned_lines = []
    for line in lines:
        line = line.strip()
        # Skip empty lines and unwanted headers/footers
        if not line or line.startswith("Page |") or line.lower() in ["home", "service", "about me", "contact"]:
            continue
        # Skip all-caps headers with 4 words or less
        if line.isupper() and len(line.split()) <= 4:
            continue
        cleaned_lines.append(line)
    return "\n".join(cleaned_lines)

def preprocess_documents(documents):
    """Preprocess documents before chunking."""
    for doc in documents:
        doc.page_content = clean_document_text(doc.page_content)
    return documents

def split_documents(documents):
    """Split documents into optimized chunks."""
    cleaned_docs = preprocess_documents(documents)
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=800,      # Reduced for better precision
        chunk_overlap=300,   # Increased overlap for better context
        length_function=len,
        is_separator_regex=False,
    )
    all_splits = text_splitter.split_documents(cleaned_docs)
    print(f"Split into {len(all_splits)} chunks")
    return all_splits
Note: The clean_document_text function is document-specific. Modify the cleaning logic based on your document format and content structure.
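As a quick sanity check of the pipeline, you can run it on a hypothetical in-memory document; the sample text below is made up purely for illustration.
from langchain_core.documents import Document

page = "Page | 1\nHOME\nOur company provides cloud consulting and on-site training services.\n"
sample = Document(page_content=page * 40)  # enough text to produce several chunks
chunks = split_documents([sample])
print(chunks[0].page_content[:200])        # header/footer lines removed, text chunked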
Implementing Redis Memory Management
These functions establish the core Redis integration, providing session-based memory storage and retrieval capabilities. Each user gets isolated chat history managed by unique session IDs.
Now let’s implement the Redis-based memory system:
from langchain.memory.chat_message_histories import RedisChatMessageHistory
import json

def get_memory(session_id: str):
    """Initialize Redis chat memory for a session."""
    return RedisChatMessageHistory(
        session_id=session_id,
        url="redis://localhost:6379"
    )

def get_chat_history_as_json(session_id: str, redis_url="redis://localhost:6379"):
    """Retrieve chat history and convert to JSON format."""
    history = RedisChatMessageHistory(session_id=session_id, url=redis_url)
    messages = history.messages
    # Convert to list of dictionaries
    chat_log = [
        {"role": msg.type, "message": msg.content}
        for msg in messages
    ]
    return json.dumps(chat_log, indent=2)

def clear_user_chat(session_id: str, redis_url="redis://localhost:6379"):
    """Delete the chat history for a specific user/session."""
    history = RedisChatMessageHistory(session_id=session_id, url=redis_url)
    history.clear()
    print(f"Chat history for session '{session_id}' cleared.")
Creating the Memory-Enhanced RAG Chain
The key modification here is updating the prompt template to include chat history and restructuring the chain to handle three inputs: context, question, and conversation history.
Let’s update our RAG chain to incorporate chat memory:
from operator import itemgetter
from langchain.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_core.output_parsers import StrOutputParser
from langchain_community.chat_models import ChatOllama

def create_rag_chain(vector_store, llm_model_name="gemma:2b", context_window=8192):
    """Create the base RAG chain with memory support."""
    llm = ChatOllama(
        model=llm_model_name,
        temperature=0.8,  # Slightly higher for more conversational responses
        num_ctx=context_window
    )
    retriever = vector_store.as_retriever(
        search_type="similarity",
        search_kwargs={"k": 3}
    )
    # Enhanced prompt template with memory support
    prompt = ChatPromptTemplate.from_template("""
You are a helpful and informative bot that answers questions using text from the reference Context included below.
Be sure to respond in a complete sentence, being comprehensive, including all relevant background information.
However, you are talking to a non-technical audience, so be sure to break down complicated concepts and strike a friendly and conversational tone.
If the Context is irrelevant to the answer, tell them to contact the company to know more.
Also use the Chat History section to connect your answer with the user's previous conversation.

Context:
{context}

Chat History:
{history}

Question: {question}

Answer:
""")
    rag_chain = (
        {
            # Retrieve documents relevant to the incoming question
            "context": RunnableLambda(lambda x: retriever.invoke(x["question"])),
            # Pull the question and the injected chat history out of the input dict
            "question": itemgetter("question"),
            "history": itemgetter("history"),
        }
        | prompt
        | llm
        | StrOutputParser()
    )
    return rag_chain
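If you want to exercise the base chain on its own before wiring up memory, you can pass the history explicitly; the empty list below stands in for what RunnableWithMessageHistory will inject automatically in the next step (this assumes the vector_store from Part 1 is already loaded).
chain = create_rag_chain(vector_store)
answer = chain.invoke({
    "question": "What services does the company provide?",
    "history": []  # normally filled in by the memory wrapper
})
print(answer)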
Integrating Memory with the RAG Chain
This wrapper uses LangChain’s RunnableWithMessageHistory to automatically manage message storage and retrieval. The rag_call function handles the conversation flow and returns the complete chat history.
Now let’s create a wrapper that integrates memory management:
from langchain_core.runnables.history import RunnableWithMessageHistory

def create_rag_chain_with_memory(
    vector_store,
    get_memory,
    llm_model_name="gemma:2b",
    context_window=8192
):
    """Create a RAG chain with Redis memory integration."""
    chain_with_memory = RunnableWithMessageHistory(
        create_rag_chain(vector_store, llm_model_name, context_window),
        get_memory,
        input_messages_key="question",
        history_messages_key="history"
    )
    return chain_with_memory

def rag_call(chain, question, user_id="user_1"):
    """Execute RAG query with memory and return conversation history."""
    print("\nQuerying RAG chain...")
    print(f"Question: {question}")
    response = chain.invoke(
        {"question": question},
        config={"configurable": {"session_id": user_id}}
    )
    print("\nResponse generated successfully")
    # Return the complete conversation history in JSON format
    json_history = get_chat_history_as_json(user_id)
    return json_history
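Before moving on to the API, the pieces can be smoke-tested in a standalone script. This sketch assumes the Part 1 helpers get_embedding_function and get_vector_store are importable, and that Ollama and Redis are running locally.
if __name__ == "__main__":
    vector_store = get_vector_store(get_embedding_function())
    chain = create_rag_chain_with_memory(vector_store, get_memory, llm_model_name="gemma:2b")

    # Two turns against the same session id: the second question relies on memory
    print(rag_call(chain, "What services does the company provide?", user_id="user_123"))
    print(rag_call(chain, "Tell me more about the first one.", user_id="user_123"))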
Building the Enhanced FastAPI Application
The API now includes three main endpoints: /rag_chat for conversational queries, /index_db for re-indexing the document database, and /clear_chat for resetting a user’s conversation, with error handling throughout. The chat endpoints accept a user_id so each user’s history is kept separate.
Let’s create a comprehensive API with memory management endpoints:
from fastapi import FastAPI, Request
from langchain_community.chat_models import ChatOllama
from langchain.vectorstores import Chroma
from langchain.embeddings.ollama import OllamaEmbeddings

app = FastAPI(title="RAG System with Memory", version="2.0")

# Initialize components
print("Loading documents...")
docs = load_documents()

print("Splitting documents...")
chunks = split_documents(docs)

print("Initializing embedding function...")
embedding_function = get_embedding_function()

print("Loading vector store...")
vector_store = get_vector_store(embedding_function)

print("Creating RAG chain with memory...")
rag_chain = create_rag_chain_with_memory(
    vector_store,
    get_memory,
    llm_model_name="gemma:2b"
)
print("RAG system with memory initialized successfully!")

@app.post("/rag_chat")
async def rag_chat_endpoint(request: Request):
    """Main chat endpoint with memory support."""
    try:
        data = await request.json()
        user_id = data.get("user_id", "default_user")
        question = data["question"]
        response = rag_call(rag_chain, question, user_id)
        return {
            "status": "success",
            "conversation_history": response
        }
    except Exception as e:
        return {
            "status": "error",
            "message": str(e)
        }

@app.post("/index_db")
async def index_database_endpoint(request: Request):
    """Re-index the document database."""
    try:
        vector_store = index_documents(chunks, embedding_function)
        return {
            "status": "success",
            "message": "Database indexing completed successfully"
        }
    except Exception as e:
        return {
            "status": "error",
            "message": str(e)
        }

@app.post("/clear_chat")
async def clear_chat_endpoint(request: Request):
    """Clear chat history for a specific user."""
    try:
        data = await request.json()
        user_id = data["user_id"]
        clear_user_chat(user_id)
        return {
            "status": "success",
            "message": f"Chat history cleared for user: {user_id}"
        }
    except Exception as e:
        return {
            "status": "error",
            "message": str(e)
        }
Running Your Enhanced RAG System
Start Redis Server
# On most systems
redis-server
# Or specify configuration
redis-server /path/to/redis.conf
Start the FastAPI Application
uvicorn your_api_file:app --host 0.0.0.0 --port 8080 --reload
Testing the Memory-Enhanced System
1. Chat with Memory
curl -X POST "http://localhost:8080/rag_chat" \
-H "Content-Type: application/json" \
-d '{
"user_id": "user_123",
"question": "What services does the company provide?"
}'
2. Follow-up Question
curl -X POST "http://localhost:8080/rag_chat" \
-H "Content-Type: application/json" \
-d '{
"user_id": "user_123",
"question": "Can you tell me more about the first service you mentioned?"
}'
3. Clear Chat History
curl -X POST "http://localhost:8080/clear_chat" \
-H "Content-Type: application/json" \
-d '{"user_id": "user_123"}'
Conclusion
We’ve successfully enhanced our local RAG system with sophisticated memory management capabilities. The integration of Redis provides fast, reliable conversation history while maintaining the privacy and control benefits of local deployment.
Our system now offers:
- Contextual conversations that remember previous interactions
- Multi-user support with isolated chat histories
- Production-ready API endpoints with proper error handling
- Scalable Redis-based memory management
The combination of local LLM processing with Redis memory creates a powerful, privacy-focused conversational AI system that can compete with cloud-based solutions while keeping your data completely under your control.
Ready to implement these enhancements? Start with the code above and customize it for your specific use case. Remember to monitor system performance and adjust parameters based on your hardware capabilities and user requirements.