241221 RAG + LangChain 완전 가이드 2025
21 Dec 2025
RAG + LangChain 완전 가이드 2025
RAG(Retrieval-Augmented Generation)와 LangChain의 결합은 2024년을 거쳐 2025년으로 이어지는 AI 개발의 핵심 패러다임이 되었습니다. 본 가이드는 실무에서 바로 적용 가능한 RAG 시스템 구축 방법을 LangChain을 중심으로 상세히 다룹니다.
RAG 시스템 아키텍처 개요
기본 RAG 아키텍처
┌─────────────── RAG 시스템 전체 구조 ───────────────┐
│ │
│ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ Query │───►│ Retrieval System │ │
│ │ Processing │ │ │ │
│ └─────────────┘ │ ┌─────────────────────┐ │ │
│ │ │ Vector Database │ │ │
│ ┌───────────┤ │ │ │ │
│ │ │ │ • Embeddings │ │ │
│ │ │ │ • Similarity Search │ │ │
│ │ │ │ • Metadata Filter │ │ │
│ │ │ └─────────────────────┘ │ │
│ │ └─────────────────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Context │───────────│ Retrieved │ │
│ │ Aggregation │ │ Documents │ │
│ └─────────────┘ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Generation System │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────────────┐ │ │
│ │ │ Prompt │ │ LLM │ │ │
│ │ │ Engineering │─►│ (GPT-4, Claude, │ │ │
│ │ │ │ │ Local Models) │ │ │
│ │ └─────────────┘ └─────────────────────┘ │ │
│ └─────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Generated Answer │ │
│ │ with Citations │ │
│ └─────────────────────┘ │
└──────────────────────────────────────────────────┘
LangChain 기반 구현
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.llms import OpenAI
from langchain.chains import RetrievalQA
from langchain.document_loaders import TextLoader, PyPDFLoader
from langchain.prompts import PromptTemplate
class RAGSystem:
def __init__(self, openai_api_key: str, persist_directory: str = "./chroma_db"):
"""
RAG 시스템 초기화
Args:
openai_api_key: OpenAI API 키
persist_directory: 벡터 데이터베이스 저장 경로
"""
self.embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)
self.llm = OpenAI(openai_api_key=openai_api_key, temperature=0.7)
self.persist_directory = persist_directory
self.vectorstore = None
self.retrieval_qa = None
def load_documents(self, file_paths: list, chunk_size: int = 1000, chunk_overlap: int = 200):
"""
문서들을 로드하고 청크로 분할
Args:
file_paths: 로드할 파일 경로들
chunk_size: 청크 크기
chunk_overlap: 청크 간 겹침
"""
documents = []
for file_path in file_paths:
if file_path.endswith('.pdf'):
loader = PyPDFLoader(file_path)
else:
loader = TextLoader(file_path)
documents.extend(loader.load())
# 텍스트 분할
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
)
chunks = text_splitter.split_documents(documents)
print(f"총 {len(chunks)}개의 청크로 분할됨")
return chunks
def create_vectorstore(self, documents):
"""
벡터 데이터베이스 생성 및 문서 임베딩
Args:
documents: 분할된 문서 청크들
"""
self.vectorstore = Chroma.from_documents(
documents=documents,
embedding=self.embeddings,
persist_directory=self.persist_directory
)
# 데이터베이스 저장
self.vectorstore.persist()
print(f"벡터 데이터베이스가 {self.persist_directory}에 저장됨")
def load_vectorstore(self):
"""
기존 벡터 데이터베이스 로드
"""
self.vectorstore = Chroma(
persist_directory=self.persist_directory,
embedding_function=self.embeddings
)
print("기존 벡터 데이터베이스 로드됨")
def setup_retrieval_qa(self, k: int = 4, search_type: str = "similarity"):
"""
검색-생성 체인 설정
Args:
k: 검색할 문서 개수
search_type: 검색 방식 ('similarity', 'mmr' 등)
"""
if self.vectorstore is None:
raise ValueError("벡터스토어가 초기화되지 않았습니다.")
# 커스텀 프롬프트 템플릿
prompt_template = """
다음 맥락을 바탕으로 질문에 답변해주세요.
답변을 할 수 없다면 "주어진 정보로는 답변할 수 없습니다"라고 말해주세요.
맥락:
{context}
질문: {question}
답변:"""
PROMPT = PromptTemplate(
template=prompt_template,
input_variables=["context", "question"]
)
# 검색기 설정
retriever = self.vectorstore.as_retriever(
search_type=search_type,
search_kwargs={"k": k}
)
# RetrievalQA 체인 생성
self.retrieval_qa = RetrievalQA.from_chain_type(
llm=self.llm,
chain_type="stuff",
retriever=retriever,
chain_type_kwargs={"prompt": PROMPT},
return_source_documents=True
)
def query(self, question: str):
"""
질문에 대한 답변 생성
Args:
question: 사용자 질문
Returns:
dict: 답변과 소스 문서들
"""
if self.retrieval_qa is None:
raise ValueError("RetrievalQA가 설정되지 않았습니다.")
result = self.retrieval_qa({"query": question})
return {
"answer": result["result"],
"source_documents": result["source_documents"]
}
def add_documents(self, new_documents):
"""
기존 벡터스토어에 새 문서 추가
Args:
new_documents: 새로 추가할 문서들
"""
if self.vectorstore is None:
raise ValueError("벡터스토어가 초기화되지 않았습니다.")
self.vectorstore.add_documents(new_documents)
self.vectorstore.persist()
print(f"{len(new_documents)}개의 문서가 추가됨")
# 사용 예시
if __name__ == "__main__":
# RAG 시스템 초기화
rag = RAGSystem(openai_api_key="your-openai-api-key")
# 문서 로드 및 벡터화
file_paths = ["document1.pdf", "document2.txt", "document3.md"]
documents = rag.load_documents(file_paths)
rag.create_vectorstore(documents)
# 검색-생성 체인 설정
rag.setup_retrieval_qa(k=3)
# 질문 답변
result = rag.query("딥러닝의 주요 특징은 무엇인가요?")
print("답변:", result["answer"])
print("참조 문서 수:", len(result["source_documents"]))
고급 RAG 기법 구현
1. Hierarchical RAG (계층적 검색)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.schema import Document
class HierarchicalRAG:
def __init__(self, embeddings, llm):
self.embeddings = embeddings
self.llm = llm
self.summary_vectorstore = None
self.detail_vectorstore = None
def create_hierarchical_index(self, documents):
"""
계층적 인덱스 생성: 요약 레벨과 상세 레벨
"""
# 1단계: 문서를 큰 청크로 분할 (요약용)
summary_splitter = RecursiveCharacterTextSplitter(
chunk_size=4000,
chunk_overlap=400
)
summary_chunks = summary_splitter.split_documents(documents)
# 2단계: 문서를 작은 청크로 분할 (상세 검색용)
detail_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
detail_chunks = detail_splitter.split_documents(documents)
# 3단계: 각 큰 청크의 요약 생성
summary_documents = []
for i, chunk in enumerate(summary_chunks):
summary_prompt = f"""
다음 텍스트를 2-3문장으로 요약해주세요:
{chunk.page_content}
요약:"""
summary = self.llm(summary_prompt)
summary_doc = Document(
page_content=summary,
metadata={
**chunk.metadata,
"chunk_id": i,
"type": "summary"
}
)
summary_documents.append(summary_doc)
# 4단계: 상세 청크에 부모 청크 ID 추가
for detail_chunk in detail_chunks:
# 어느 요약 청크에 속하는지 찾기
for i, summary_chunk in enumerate(summary_chunks):
if self._chunks_overlap(detail_chunk, summary_chunk):
detail_chunk.metadata["parent_chunk_id"] = i
break
# 5단계: 벡터스토어 생성
self.summary_vectorstore = FAISS.from_documents(
summary_documents, self.embeddings
)
self.detail_vectorstore = FAISS.from_documents(
detail_chunks, self.embeddings
)
def _chunks_overlap(self, detail_chunk, summary_chunk):
"""
두 청크가 겹치는지 확인 (간단한 휴리스틱)
"""
detail_words = set(detail_chunk.page_content.split()[:10])
summary_words = set(summary_chunk.page_content.split())
overlap = len(detail_words & summary_words)
return overlap > 3
def hierarchical_search(self, query, top_k_summary=2, top_k_detail=5):
"""
계층적 검색: 먼저 요약에서 검색 후 관련 상세 청크 검색
"""
# 1단계: 요약 레벨에서 검색
summary_results = self.summary_vectorstore.similarity_search(
query, k=top_k_summary
)
relevant_chunk_ids = [
doc.metadata["chunk_id"] for doc in summary_results
]
# 2단계: 관련 상세 청크들 검색
detail_results = []
for chunk_id in relevant_chunk_ids:
# 해당 부모 청크에 속하는 상세 청크들 검색
detail_docs = self.detail_vectorstore.similarity_search(
query,
k=top_k_detail,
filter={"parent_chunk_id": chunk_id}
)
detail_results.extend(detail_docs)
# 3단계: 상세 결과에서 가장 관련성 높은 것들 선별
if len(detail_results) > top_k_detail:
detail_scores = []
for doc in detail_results:
# 재검색하여 스코어 계산
score = self._calculate_similarity_score(query, doc.page_content)
detail_scores.append((score, doc))
detail_scores.sort(reverse=True)
detail_results = [doc for _, doc in detail_scores[:top_k_detail]]
return {
"summary_results": summary_results,
"detail_results": detail_results
}
def _calculate_similarity_score(self, query, text):
"""
간단한 유사도 스코어 계산
"""
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
vectorizer = TfidfVectorizer()
tfidf_matrix = vectorizer.fit_transform([query, text])
similarity = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])
return similarity[0][0]
2. Multi-Modal RAG (멀티모달 검색)
from langchain.schema import Document
import base64
from PIL import Image
import io
class MultiModalRAG:
def __init__(self, text_embeddings, image_embeddings, llm):
self.text_embeddings = text_embeddings
self.image_embeddings = image_embeddings
self.llm = llm
self.text_vectorstore = None
self.image_vectorstore = None
def process_mixed_documents(self, documents):
"""
텍스트와 이미지가 혼재된 문서 처리
"""
text_documents = []
image_documents = []
for doc in documents:
if doc.metadata.get("type") == "image":
# 이미지 문서 처리
image_description = self._describe_image(doc.page_content)
image_doc = Document(
page_content=image_description,
metadata={
**doc.metadata,
"image_path": doc.page_content,
"description": image_description
}
)
image_documents.append(image_doc)
else:
# 텍스트 문서 처리
text_documents.append(doc)
# 벡터스토어 생성
if text_documents:
self.text_vectorstore = FAISS.from_documents(
text_documents, self.text_embeddings
)
if image_documents:
self.image_vectorstore = FAISS.from_documents(
image_documents, self.text_embeddings # 이미지 설명을 텍스트로 임베딩
)
def _describe_image(self, image_path):
"""
이미지 설명 생성 (실제로는 CLIP 등의 모델 사용)
"""
# 여기서는 간단한 예시
# 실제로는 CLIP, BLIP 등의 이미지-텍스트 모델 사용
return f"이미지 파일: {image_path}"
def multimodal_search(self, query, search_text=True, search_images=True, k=5):
"""
멀티모달 검색
"""
results = []
if search_text and self.text_vectorstore:
text_results = self.text_vectorstore.similarity_search(query, k=k)
for doc in text_results:
doc.metadata["modality"] = "text"
results.extend(text_results)
if search_images and self.image_vectorstore:
image_results = self.image_vectorstore.similarity_search(query, k=k)
for doc in image_results:
doc.metadata["modality"] = "image"
results.extend(image_results)
# 관련성 기준으로 재정렬
return self._rerank_multimodal_results(query, results, k)
def _rerank_multimodal_results(self, query, results, top_k):
"""
멀티모달 결과 재순위화
"""
# 간단한 구현 - 실제로는 더 정교한 재순위화 필요
scored_results = []
for doc in results:
score = self._calculate_multimodal_score(query, doc)
scored_results.append((score, doc))
scored_results.sort(reverse=True)
return [doc for _, doc in scored_results[:top_k]]
def _calculate_multimodal_score(self, query, doc):
"""
멀티모달 스코어 계산
"""
# 텍스트와 이미지에 대한 가중치 적용
base_score = self._calculate_similarity_score(query, doc.page_content)
if doc.metadata.get("modality") == "image":
# 이미지의 경우 약간의 보정 적용
base_score *= 0.9
return base_score
3. Self-Querying RAG
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import AttributeInfo
class SelfQueryingRAG:
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.llm = llm
self.setup_metadata_field_info()
def setup_metadata_field_info(self):
"""
메타데이터 필드 정보 설정
"""
self.metadata_field_info = [
AttributeInfo(
name="source",
description="문서의 출처",
type="string",
),
AttributeInfo(
name="page",
description="문서의 페이지 번호",
type="integer",
),
AttributeInfo(
name="author",
description="문서의 작성자",
type="string",
),
AttributeInfo(
name="publication_date",
description="문서의 발행일",
type="string",
),
AttributeInfo(
name="category",
description="문서의 카테고리",
type="string",
),
]
self.document_content_description = "다양한 주제에 관한 문서들"
def create_self_query_retriever(self):
"""
Self-Query 검색기 생성
"""
self.retriever = SelfQueryRetriever.from_llm(
self.llm,
self.vectorstore,
self.document_content_description,
self.metadata_field_info,
verbose=True
)
def query_with_filters(self, query_text):
"""
자연어 쿼리에서 필터 자동 추출하여 검색
"""
# Self-Query 검색기가 자동으로 필터 조건 추출
results = self.retriever.get_relevant_documents(query_text)
return results
def query_with_explicit_filters(self, query_text, filters):
"""
명시적 필터와 함께 검색
"""
# 메타데이터 필터 적용
filtered_results = self.vectorstore.similarity_search(
query_text,
filter=filters,
k=5
)
return filtered_results
# 사용 예시
self_query_rag = SelfQueryingRAG(vectorstore, llm)
self_query_rag.create_self_query_retriever()
# 자연어로 필터 조건 포함한 검색
results = self_query_rag.query_with_filters(
"2023년에 발행된 머신러닝 관련 논문을 찾아주세요"
)
# 명시적 필터 검색
filtered_results = self_query_rag.query_with_explicit_filters(
"딥러닝 알고리즘",
{"category": "AI", "publication_date": "2023"}
)
성능 최적화 전략
1. 임베딩 최적화
import numpy as np
from typing import List, Dict, Any
import faiss
class OptimizedEmbeddingStore:
def __init__(self, dimension: int = 1536):
self.dimension = dimension
self.index = None
self.documents = []
self.metadata = []
def build_hnsw_index(self, embeddings: np.ndarray, M: int = 16, ef_construction: int = 200):
"""
HNSW 인덱스 구성으로 빠른 근사 검색
"""
self.index = faiss.IndexHNSWFlat(self.dimension, M)
self.index.hnsw.efConstruction = ef_construction
# 임베딩 정규화
faiss.normalize_L2(embeddings)
# 인덱스에 벡터 추가
self.index.add(embeddings.astype('float32'))
def build_ivf_index(self, embeddings: np.ndarray, nlist: int = 100):
"""
IVF 인덱스 구성으로 메모리 효율적 검색
"""
quantizer = faiss.IndexFlatL2(self.dimension)
self.index = faiss.IndexIVFFlat(quantizer, self.dimension, nlist)
# 훈련 데이터로 클러스터링
self.index.train(embeddings.astype('float32'))
self.index.add(embeddings.astype('float32'))
# 검색 매개변수 설정
self.index.nprobe = 10 # 검색할 클러스터 수
def search(self, query_embedding: np.ndarray, k: int = 5) -> List[Dict[str, Any]]:
"""
최적화된 검색
"""
query_embedding = query_embedding.reshape(1, -1).astype('float32')
faiss.normalize_L2(query_embedding)
scores, indices = self.index.search(query_embedding, k)
results = []
for i, (score, idx) in enumerate(zip(scores[0], indices[0])):
if idx != -1: # 유효한 인덱스인 경우
results.append({
"document": self.documents[idx],
"metadata": self.metadata[idx],
"score": float(score),
"rank": i + 1
})
return results
def add_documents(self, documents: List[str], embeddings: np.ndarray, metadata: List[Dict]):
"""
문서와 임베딩 추가
"""
self.documents.extend(documents)
self.metadata.extend(metadata)
if self.index is None:
self.build_hnsw_index(embeddings)
else:
# 기존 인덱스에 추가
faiss.normalize_L2(embeddings)
self.index.add(embeddings.astype('float32'))
class EmbeddingOptimizer:
def __init__(self):
pass
def batch_embed_documents(self, texts: List[str], embeddings_model, batch_size: int = 32):
"""
배치 처리로 임베딩 생성 최적화
"""
embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
batch_embeddings = embeddings_model.embed_documents(batch)
embeddings.extend(batch_embeddings)
return np.array(embeddings)
def reduce_embedding_dimension(self, embeddings: np.ndarray, target_dim: int = 768):
"""
PCA를 사용한 임베딩 차원 축소
"""
from sklearn.decomposition import PCA
pca = PCA(n_components=target_dim)
reduced_embeddings = pca.fit_transform(embeddings)
return reduced_embeddings, pca
def quantize_embeddings(self, embeddings: np.ndarray, bits: int = 8):
"""
임베딩 양자화로 메모리 사용량 감소
"""
# 8비트 양자화
min_val = np.min(embeddings)
max_val = np.max(embeddings)
scale = (max_val - min_val) / (2**bits - 1)
quantized = np.round((embeddings - min_val) / scale).astype(np.uint8)
return quantized, min_val, scale
def dequantize_embeddings(self, quantized: np.ndarray, min_val: float, scale: float):
"""
양자화된 임베딩 복원
"""
return quantized.astype(np.float32) * scale + min_val
2. 검색 결과 재순위화
from sentence_transformers import CrossEncoder
import torch
class AdvancedReranker:
def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
self.cross_encoder = CrossEncoder(model_name)
def rerank_with_cross_encoder(self, query: str, documents: List[str], top_k: int = 5):
"""
Cross-Encoder를 사용한 재순위화
"""
# 쿼리-문서 쌍 생성
query_doc_pairs = [(query, doc) for doc in documents]
# Cross-Encoder로 스코어 계산
scores = self.cross_encoder.predict(query_doc_pairs)
# 스코어 기준으로 정렬
doc_score_pairs = list(zip(documents, scores))
doc_score_pairs.sort(key=lambda x: x[1], reverse=True)
return doc_score_pairs[:top_k]
def diversity_rerank(self, query: str, documents: List[str], alpha: float = 0.7):
"""
다양성을 고려한 재순위화 (MMR - Maximal Marginal Relevance)
"""
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
# TF-IDF 벡터화
vectorizer = TfidfVectorizer()
all_texts = [query] + documents
tfidf_matrix = vectorizer.fit_transform(all_texts)
query_vector = tfidf_matrix[0]
doc_vectors = tfidf_matrix[1:]
# 관련성 스코어 계산
relevance_scores = cosine_similarity(query_vector, doc_vectors).flatten()
# MMR 알고리즘
selected_indices = []
remaining_indices = list(range(len(documents)))
while remaining_indices and len(selected_indices) < len(documents):
mmr_scores = []
for idx in remaining_indices:
relevance = relevance_scores[idx]
if not selected_indices:
diversity = 0
else:
# 이미 선택된 문서들과의 최대 유사도
selected_vectors = doc_vectors[selected_indices]
similarities = cosine_similarity(
doc_vectors[idx:idx+1], selected_vectors
).flatten()
diversity = np.max(similarities)
mmr_score = alpha * relevance - (1 - alpha) * diversity
mmr_scores.append((mmr_score, idx))
# 가장 높은 MMR 스코어 선택
best_score, best_idx = max(mmr_scores)
selected_indices.append(best_idx)
remaining_indices.remove(best_idx)
return [(documents[idx], relevance_scores[idx]) for idx in selected_indices]
class HybridRetriever:
def __init__(self, dense_retriever, sparse_retriever, reranker):
self.dense_retriever = dense_retriever
self.sparse_retriever = sparse_retriever
self.reranker = reranker
def hybrid_search(self, query: str, dense_weight: float = 0.7, top_k: int = 5):
"""
Dense + Sparse 하이브리드 검색
"""
# Dense 검색 (벡터 유사도)
dense_results = self.dense_retriever.similarity_search(query, k=top_k*2)
# Sparse 검색 (BM25)
sparse_results = self.sparse_retriever.search(query, k=top_k*2)
# 결과 병합 및 가중치 적용
combined_scores = {}
for i, doc in enumerate(dense_results):
doc_id = id(doc)
dense_score = 1.0 / (i + 1) # 순위 기반 스코어
combined_scores[doc_id] = {
'document': doc,
'dense_score': dense_score,
'sparse_score': 0.0
}
for i, doc in enumerate(sparse_results):
doc_id = id(doc)
sparse_score = 1.0 / (i + 1)
if doc_id in combined_scores:
combined_scores[doc_id]['sparse_score'] = sparse_score
else:
combined_scores[doc_id] = {
'document': doc,
'dense_score': 0.0,
'sparse_score': sparse_score
}
# 하이브리드 스코어 계산
hybrid_results = []
for doc_info in combined_scores.values():
hybrid_score = (
dense_weight * doc_info['dense_score'] +
(1 - dense_weight) * doc_info['sparse_score']
)
hybrid_results.append((doc_info['document'], hybrid_score))
# 스코어 기준 정렬
hybrid_results.sort(key=lambda x: x[1], reverse=True)
# 최종 재순위화
top_docs = [doc for doc, _ in hybrid_results[:top_k*2]]
doc_texts = [doc.page_content for doc in top_docs]
reranked = self.reranker.rerank_with_cross_encoder(query, doc_texts, top_k)
return reranked
실시간 업데이트 시스템
1. 증분 인덱싱
import threading
import time
from queue import Queue
from typing import List, Dict, Any
class IncrementalIndexer:
def __init__(self, vectorstore, embeddings_model, batch_size: int = 10):
self.vectorstore = vectorstore
self.embeddings_model = embeddings_model
self.batch_size = batch_size
self.update_queue = Queue()
self.is_running = False
self.worker_thread = None
def start_worker(self):
"""
백그라운드 인덱싱 워커 시작
"""
self.is_running = True
self.worker_thread = threading.Thread(target=self._process_updates)
self.worker_thread.daemon = True
self.worker_thread.start()
def stop_worker(self):
"""
백그라운드 워커 중지
"""
self.is_running = False
if self.worker_thread:
self.worker_thread.join()
def add_document(self, document: str, metadata: Dict[str, Any]):
"""
문서 추가 요청을 큐에 넣기
"""
self.update_queue.put({
'action': 'add',
'document': document,
'metadata': metadata
})
def update_document(self, doc_id: str, document: str, metadata: Dict[str, Any]):
"""
문서 업데이트 요청
"""
self.update_queue.put({
'action': 'update',
'doc_id': doc_id,
'document': document,
'metadata': metadata
})
def delete_document(self, doc_id: str):
"""
문서 삭제 요청
"""
self.update_queue.put({
'action': 'delete',
'doc_id': doc_id
})
def _process_updates(self):
"""
배치 단위로 업데이트 처리
"""
batch_operations = []
while self.is_running:
try:
# 큐에서 작업 가져오기 (타임아웃 설정)
operation = self.update_queue.get(timeout=1.0)
batch_operations.append(operation)
# 배치 크기에 도달하면 처리
if len(batch_operations) >= self.batch_size:
self._execute_batch(batch_operations)
batch_operations = []
except:
# 타임아웃 발생 시 현재 배치 처리
if batch_operations:
self._execute_batch(batch_operations)
batch_operations = []
time.sleep(0.1) # CPU 과부하 방지
# 남은 작업 처리
if batch_operations:
self._execute_batch(batch_operations)
def _execute_batch(self, operations: List[Dict[str, Any]]):
"""
배치 작업 실행
"""
add_docs = []
add_metadatas = []
for op in operations:
if op['action'] == 'add':
add_docs.append(op['document'])
add_metadatas.append(op['metadata'])
elif op['action'] == 'update':
# 기존 문서 삭제 후 새로 추가
self._delete_by_id(op['doc_id'])
add_docs.append(op['document'])
add_metadatas.append({**op['metadata'], 'doc_id': op['doc_id']})
elif op['action'] == 'delete':
self._delete_by_id(op['doc_id'])
# 새 문서들 일괄 추가
if add_docs:
embeddings = self.embeddings_model.embed_documents(add_docs)
self.vectorstore.add_texts(add_docs, add_metadatas, embeddings)
print(f"배치 처리 완료: {len(operations)}개 작업")
def _delete_by_id(self, doc_id: str):
"""
문서 ID로 삭제
"""
try:
# 벡터스토어에 따라 구현 방식이 다름
if hasattr(self.vectorstore, 'delete'):
self.vectorstore.delete([doc_id])
except Exception as e:
print(f"문서 삭제 실패 {doc_id}: {e}")
class RealTimeRAG:
def __init__(self, vectorstore, embeddings_model, llm):
self.vectorstore = vectorstore
self.embeddings_model = embeddings_model
self.llm = llm
self.indexer = IncrementalIndexer(vectorstore, embeddings_model)
# 캐시 시스템
self.query_cache = {}
self.cache_ttl = 300 # 5분
def start_real_time_updates(self):
"""
실시간 업데이트 시작
"""
self.indexer.start_worker()
def stop_real_time_updates(self):
"""
실시간 업데이트 중지
"""
self.indexer.stop_worker()
def add_document_async(self, document: str, metadata: Dict[str, Any]):
"""
비동기 문서 추가
"""
# 캐시 무효화
self._invalidate_cache()
# 인덱싱 큐에 추가
self.indexer.add_document(document, metadata)
def query_with_cache(self, query: str, force_refresh: bool = False):
"""
캐시를 활용한 쿼리
"""
cache_key = hash(query)
current_time = time.time()
# 캐시 확인
if not force_refresh and cache_key in self.query_cache:
cached_result, timestamp = self.query_cache[cache_key]
if current_time - timestamp < self.cache_ttl:
return cached_result
# 새로운 검색 수행
results = self.vectorstore.similarity_search(query, k=5)
# 캐시 저장
self.query_cache[cache_key] = (results, current_time)
return results
def _invalidate_cache(self):
"""
캐시 무효화
"""
self.query_cache.clear()
# 사용 예시
realtime_rag = RealTimeRAG(vectorstore, embeddings_model, llm)
realtime_rag.start_real_time_updates()
# 실시간으로 문서 추가
realtime_rag.add_document_async(
"새로운 AI 연구 결과가 발표되었습니다.",
{"source": "news", "timestamp": time.time()}
)
# 캐시된 검색
results = realtime_rag.query_with_cache("AI 연구")
모니터링 및 평가 시스템
1. RAG 성능 메트릭
import json
import time
from typing import List, Dict, Tuple
from dataclasses import dataclass
@dataclass
class RAGMetrics:
query: str
retrieved_docs: List[str]
generated_answer: str
ground_truth: str
retrieval_time: float
generation_time: float
relevance_scores: List[float]
context_precision: float
context_recall: float
answer_correctness: float
answer_similarity: float
class RAGEvaluator:
def __init__(self, evaluation_llm):
self.evaluation_llm = evaluation_llm
def evaluate_retrieval_relevance(self, query: str, documents: List[str]) -> List[float]:
"""
검색된 문서의 관련성 평가
"""
relevance_scores = []
for doc in documents:
prompt = f"""
질문과 문서의 관련성을 0-1 사이의 점수로 평가해주세요.
0: 전혀 관련 없음, 1: 매우 관련 있음
질문: {query}
문서: {doc[:500]}...
관련성 점수 (0-1):"""
try:
response = self.evaluation_llm(prompt)
score = float(response.strip())
relevance_scores.append(min(max(score, 0), 1))
except:
relevance_scores.append(0.0)
return relevance_scores
def calculate_context_precision(self, relevance_scores: List[float], k: int = None) -> float:
"""
Context Precision 계산: 상위 k개 중 관련 있는 문서의 비율
"""
if k is None:
k = len(relevance_scores)
top_k_scores = relevance_scores[:k]
relevant_count = sum(1 for score in top_k_scores if score > 0.5)
return relevant_count / k if k > 0 else 0
def calculate_context_recall(self, retrieved_docs: List[str], ground_truth_docs: List[str]) -> float:
"""
Context Recall 계산: 관련 문서 중 검색된 문서의 비율
"""
if not ground_truth_docs:
return 1.0
retrieved_set = set(retrieved_docs)
ground_truth_set = set(ground_truth_docs)
overlap = len(retrieved_set.intersection(ground_truth_set))
return overlap / len(ground_truth_set)
def evaluate_answer_correctness(self, generated_answer: str, ground_truth: str) -> float:
"""
답변 정확성 평가
"""
prompt = f"""
다음 두 답변의 사실적 정확성을 비교하여 0-1 사이의 점수로 평가해주세요.
0: 완전히 틀림, 1: 완전히 정확함
정답: {ground_truth}
생성된 답변: {generated_answer}
정확성 점수 (0-1):"""
try:
response = self.evaluation_llm(prompt)
score = float(response.strip())
return min(max(score, 0), 1)
except:
return 0.0
def evaluate_answer_similarity(self, generated_answer: str, ground_truth: str) -> float:
"""
답변 유사성 평가 (의미적 유사성)
"""
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
model = SentenceTransformer('all-MiniLM-L6-v2')
embeddings = model.encode([generated_answer, ground_truth])
similarity = cosine_similarity([embeddings[0]], [embeddings[1]])[0][0]
return float(similarity)
def comprehensive_evaluation(self,
query: str,
retrieved_docs: List[str],
generated_answer: str,
ground_truth: str,
ground_truth_docs: List[str] = None,
retrieval_time: float = 0,
generation_time: float = 0) -> RAGMetrics:
"""
종합적인 RAG 평가
"""
# 검색 관련성 평가
relevance_scores = self.evaluate_retrieval_relevance(query, retrieved_docs)
# Context Precision & Recall
context_precision = self.calculate_context_precision(relevance_scores)
context_recall = self.calculate_context_recall(retrieved_docs, ground_truth_docs or [])
# 답변 평가
answer_correctness = self.evaluate_answer_correctness(generated_answer, ground_truth)
answer_similarity = self.evaluate_answer_similarity(generated_answer, ground_truth)
return RAGMetrics(
query=query,
retrieved_docs=retrieved_docs,
generated_answer=generated_answer,
ground_truth=ground_truth,
retrieval_time=retrieval_time,
generation_time=generation_time,
relevance_scores=relevance_scores,
context_precision=context_precision,
context_recall=context_recall,
answer_correctness=answer_correctness,
answer_similarity=answer_similarity
)
class RAGMonitor:
def __init__(self, evaluator: RAGEvaluator):
self.evaluator = evaluator
self.metrics_history = []
def log_query(self, metrics: RAGMetrics):
"""
쿼리 메트릭 로깅
"""
self.metrics_history.append(metrics)
# 실시간 성능 모니터링
if len(self.metrics_history) % 10 == 0:
self._print_performance_summary()
def _print_performance_summary(self):
"""
성능 요약 출력
"""
recent_metrics = self.metrics_history[-10:]
avg_retrieval_time = sum(m.retrieval_time for m in recent_metrics) / len(recent_metrics)
avg_generation_time = sum(m.generation_time for m in recent_metrics) / len(recent_metrics)
avg_precision = sum(m.context_precision for m in recent_metrics) / len(recent_metrics)
avg_recall = sum(m.context_recall for m in recent_metrics) / len(recent_metrics)
avg_correctness = sum(m.answer_correctness for m in recent_metrics) / len(recent_metrics)
print(f"""
=== RAG 성능 모니터링 (최근 10개 쿼리) ===
평균 검색 시간: {avg_retrieval_time:.3f}초
평균 생성 시간: {avg_generation_time:.3f}초
평균 Context Precision: {avg_precision:.3f}
평균 Context Recall: {avg_recall:.3f}
평균 답변 정확성: {avg_correctness:.3f}
=======================================
""")
def export_metrics(self, filename: str):
"""
메트릭을 JSON 파일로 출력
"""
metrics_data = []
for m in self.metrics_history:
metrics_data.append({
"query": m.query,
"retrieved_docs_count": len(m.retrieved_docs),
"retrieval_time": m.retrieval_time,
"generation_time": m.generation_time,
"context_precision": m.context_precision,
"context_recall": m.context_recall,
"answer_correctness": m.answer_correctness,
"answer_similarity": m.answer_similarity
})
with open(filename, 'w', encoding='utf-8') as f:
json.dump(metrics_data, f, ensure_ascii=False, indent=2)
print(f"메트릭이 {filename}에 저장되었습니다.")
# 사용 예시
evaluator = RAGEvaluator(evaluation_llm)
monitor = RAGMonitor(evaluator)
# RAG 쿼리 실행 및 평가
def monitored_rag_query(rag_system, query, ground_truth):
start_time = time.time()
# 검색
retrieval_start = time.time()
retrieved_docs = rag_system.vectorstore.similarity_search(query, k=5)
retrieval_time = time.time() - retrieval_start
# 생성
generation_start = time.time()
result = rag_system.query(query)
generation_time = time.time() - generation_start
# 평가
metrics = evaluator.comprehensive_evaluation(
query=query,
retrieved_docs=[doc.page_content for doc in retrieved_docs],
generated_answer=result["answer"],
ground_truth=ground_truth,
retrieval_time=retrieval_time,
generation_time=generation_time
)
# 모니터링
monitor.log_query(metrics)
return result
결론
RAG + LangChain 시스템의 성공적인 구축을 위해서는 다음 핵심 요소들이 중요합니다:
1. 아키텍처 설계
- 모듈화된 구조로 각 컴포넌트를 독립적으로 최적화
- 확장성을 고려한 벡터 데이터베이스 선택
- 실시간 업데이트 지원 시스템
2. 성능 최적화
- 하이브리드 검색 (Dense + Sparse)
- 효율적인 임베딩 및 인덱싱
- 계층적 검색과 재순위화
3. 품질 관리
- 종합적인 평가 메트릭
- 실시간 모니터링 시스템
- A/B 테스팅 기반 지속적 개선
4. 사용자 경험
- 빠른 응답 시간
- 관련성 높은 결과
- 신뢰할 수 있는 출처 정보
2025년에는 RAG 기술이 더욱 정교해지고, 멀티모달 지원과 실시간 학습 기능이 강화될 것으로 예상됩니다. 이러한 트렌드에 맞춰 지속적으로 시스템을 개선하고 최신 기술을 적용하는 것이 중요합니다.