황현동 블로그 개발, 인생, 유우머

241221 RAG + LangChain 완전 가이드 2025

Tags:

RAG + LangChain 완전 가이드 2025

RAG(Retrieval-Augmented Generation)와 LangChain의 결합은 2024년을 거쳐 2025년으로 이어지는 AI 개발의 핵심 패러다임이 되었습니다. 본 가이드는 실무에서 바로 적용 가능한 RAG 시스템 구축 방법을 LangChain을 중심으로 상세히 다룹니다.

RAG 시스템 아키텍처 개요

기본 RAG 아키텍처

┌─────────────── RAG 시스템 전체 구조 ───────────────┐
│                                                  │
│  ┌─────────────┐    ┌─────────────────────────┐  │
│  │   Query     │───►│   Retrieval System      │  │
│  │ Processing  │    │                         │  │
│  └─────────────┘    │ ┌─────────────────────┐ │  │
│                     │ │  Vector Database    │ │  │
│         ┌───────────┤ │                     │ │  │
│         │           │ │ • Embeddings        │ │  │
│         │           │ │ • Similarity Search │ │  │
│         │           │ │ • Metadata Filter   │ │  │
│         │           │ └─────────────────────┘ │  │
│         │           └─────────────────────────┘  │
│         │                         │              │
│         ▼                         ▼              │
│  ┌─────────────┐           ┌─────────────┐       │
│  │   Context   │───────────│  Retrieved  │       │
│  │ Aggregation │           │  Documents  │       │
│  └─────────────┘           └─────────────┘       │
│         │                                        │
│         ▼                                        │
│  ┌─────────────────────────────────────────────┐ │
│  │           Generation System                  │ │
│  │                                             │ │
│  │ ┌─────────────┐  ┌─────────────────────┐   │ │
│  │ │   Prompt    │  │      LLM            │   │ │
│  │ │ Engineering │─►│   (GPT-4, Claude,   │   │ │
│  │ │             │  │    Local Models)    │   │ │
│  │ └─────────────┘  └─────────────────────┘   │ │
│  └─────────────────────────────────────────────┘ │
│                         │                        │
│                         ▼                        │
│              ┌─────────────────────┐              │
│              │  Generated Answer   │              │
│              │   with Citations    │              │
│              └─────────────────────┘              │
└──────────────────────────────────────────────────┘

LangChain 기반 구현

from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.llms import OpenAI
from langchain.chains import RetrievalQA
from langchain.document_loaders import TextLoader, PyPDFLoader
from langchain.prompts import PromptTemplate

class RAGSystem:
    def __init__(self, openai_api_key: str, persist_directory: str = "./chroma_db"):
        """
        RAG 시스템 초기화
        
        Args:
            openai_api_key: OpenAI API 키
            persist_directory: 벡터 데이터베이스 저장 경로
        """
        self.embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)
        self.llm = OpenAI(openai_api_key=openai_api_key, temperature=0.7)
        self.persist_directory = persist_directory
        self.vectorstore = None
        self.retrieval_qa = None
        
    def load_documents(self, file_paths: list, chunk_size: int = 1000, chunk_overlap: int = 200):
        """
        문서들을 로드하고 청크로 분할
        
        Args:
            file_paths: 로드할 파일 경로들
            chunk_size: 청크 크기
            chunk_overlap: 청크 간 겹침
        """
        documents = []
        
        for file_path in file_paths:
            if file_path.endswith('.pdf'):
                loader = PyPDFLoader(file_path)
            else:
                loader = TextLoader(file_path)
            
            documents.extend(loader.load())
        
        # 텍스트 분할
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
        )
        
        chunks = text_splitter.split_documents(documents)
        print(f"총 {len(chunks)}개의 청크로 분할됨")
        
        return chunks
    
    def create_vectorstore(self, documents):
        """
        벡터 데이터베이스 생성 및 문서 임베딩
        
        Args:
            documents: 분할된 문서 청크들
        """
        self.vectorstore = Chroma.from_documents(
            documents=documents,
            embedding=self.embeddings,
            persist_directory=self.persist_directory
        )
        
        # 데이터베이스 저장
        self.vectorstore.persist()
        print(f"벡터 데이터베이스가 {self.persist_directory}에 저장됨")
    
    def load_vectorstore(self):
        """
        기존 벡터 데이터베이스 로드
        """
        self.vectorstore = Chroma(
            persist_directory=self.persist_directory,
            embedding_function=self.embeddings
        )
        print("기존 벡터 데이터베이스 로드됨")
    
    def setup_retrieval_qa(self, k: int = 4, search_type: str = "similarity"):
        """
        검색-생성 체인 설정
        
        Args:
            k: 검색할 문서 개수
            search_type: 검색 방식 ('similarity', 'mmr' 등)
        """
        if self.vectorstore is None:
            raise ValueError("벡터스토어가 초기화되지 않았습니다.")
        
        # 커스텀 프롬프트 템플릿
        prompt_template = """
다음 맥락을 바탕으로 질문에 답변해주세요. 
답변을 할 수 없다면 "주어진 정보로는 답변할 수 없습니다"라고 말해주세요.

맥락:
{context}

질문: {question}

답변:"""
        
        PROMPT = PromptTemplate(
            template=prompt_template,
            input_variables=["context", "question"]
        )
        
        # 검색기 설정
        retriever = self.vectorstore.as_retriever(
            search_type=search_type,
            search_kwargs={"k": k}
        )
        
        # RetrievalQA 체인 생성
        self.retrieval_qa = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=retriever,
            chain_type_kwargs={"prompt": PROMPT},
            return_source_documents=True
        )
    
    def query(self, question: str):
        """
        질문에 대한 답변 생성
        
        Args:
            question: 사용자 질문
        
        Returns:
            dict: 답변과 소스 문서들
        """
        if self.retrieval_qa is None:
            raise ValueError("RetrievalQA가 설정되지 않았습니다.")
        
        result = self.retrieval_qa({"query": question})
        
        return {
            "answer": result["result"],
            "source_documents": result["source_documents"]
        }
    
    def add_documents(self, new_documents):
        """
        기존 벡터스토어에 새 문서 추가
        
        Args:
            new_documents: 새로 추가할 문서들
        """
        if self.vectorstore is None:
            raise ValueError("벡터스토어가 초기화되지 않았습니다.")
        
        self.vectorstore.add_documents(new_documents)
        self.vectorstore.persist()
        print(f"{len(new_documents)}개의 문서가 추가됨")

# 사용 예시
if __name__ == "__main__":
    # RAG 시스템 초기화
    rag = RAGSystem(openai_api_key="your-openai-api-key")
    
    # 문서 로드 및 벡터화
    file_paths = ["document1.pdf", "document2.txt", "document3.md"]
    documents = rag.load_documents(file_paths)
    rag.create_vectorstore(documents)
    
    # 검색-생성 체인 설정
    rag.setup_retrieval_qa(k=3)
    
    # 질문 답변
    result = rag.query("딥러닝의 주요 특징은 무엇인가요?")
    print("답변:", result["answer"])
    print("참조 문서 수:", len(result["source_documents"]))

고급 RAG 기법 구현

1. Hierarchical RAG (계층적 검색)

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.schema import Document

class HierarchicalRAG:
    def __init__(self, embeddings, llm):
        self.embeddings = embeddings
        self.llm = llm
        self.summary_vectorstore = None
        self.detail_vectorstore = None
        
    def create_hierarchical_index(self, documents):
        """
        계층적 인덱스 생성: 요약 레벨과 상세 레벨
        """
        # 1단계: 문서를 큰 청크로 분할 (요약용)
        summary_splitter = RecursiveCharacterTextSplitter(
            chunk_size=4000,
            chunk_overlap=400
        )
        summary_chunks = summary_splitter.split_documents(documents)
        
        # 2단계: 문서를 작은 청크로 분할 (상세 검색용)
        detail_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        detail_chunks = detail_splitter.split_documents(documents)
        
        # 3단계: 각 큰 청크의 요약 생성
        summary_documents = []
        for i, chunk in enumerate(summary_chunks):
            summary_prompt = f"""
다음 텍스트를 2-3문장으로 요약해주세요:

{chunk.page_content}

요약:"""
            
            summary = self.llm(summary_prompt)
            summary_doc = Document(
                page_content=summary,
                metadata={
                    **chunk.metadata,
                    "chunk_id": i,
                    "type": "summary"
                }
            )
            summary_documents.append(summary_doc)
        
        # 4단계: 상세 청크에 부모 청크 ID 추가
        for detail_chunk in detail_chunks:
            # 어느 요약 청크에 속하는지 찾기
            for i, summary_chunk in enumerate(summary_chunks):
                if self._chunks_overlap(detail_chunk, summary_chunk):
                    detail_chunk.metadata["parent_chunk_id"] = i
                    break
        
        # 5단계: 벡터스토어 생성
        self.summary_vectorstore = FAISS.from_documents(
            summary_documents, self.embeddings
        )
        self.detail_vectorstore = FAISS.from_documents(
            detail_chunks, self.embeddings
        )
    
    def _chunks_overlap(self, detail_chunk, summary_chunk):
        """
        두 청크가 겹치는지 확인 (간단한 휴리스틱)
        """
        detail_words = set(detail_chunk.page_content.split()[:10])
        summary_words = set(summary_chunk.page_content.split())
        overlap = len(detail_words & summary_words)
        return overlap > 3
    
    def hierarchical_search(self, query, top_k_summary=2, top_k_detail=5):
        """
        계층적 검색: 먼저 요약에서 검색 후 관련 상세 청크 검색
        """
        # 1단계: 요약 레벨에서 검색
        summary_results = self.summary_vectorstore.similarity_search(
            query, k=top_k_summary
        )
        
        relevant_chunk_ids = [
            doc.metadata["chunk_id"] for doc in summary_results
        ]
        
        # 2단계: 관련 상세 청크들 검색
        detail_results = []
        for chunk_id in relevant_chunk_ids:
            # 해당 부모 청크에 속하는 상세 청크들 검색
            detail_docs = self.detail_vectorstore.similarity_search(
                query,
                k=top_k_detail,
                filter={"parent_chunk_id": chunk_id}
            )
            detail_results.extend(detail_docs)
        
        # 3단계: 상세 결과에서 가장 관련성 높은 것들 선별
        if len(detail_results) > top_k_detail:
            detail_scores = []
            for doc in detail_results:
                # 재검색하여 스코어 계산
                score = self._calculate_similarity_score(query, doc.page_content)
                detail_scores.append((score, doc))
            
            detail_scores.sort(reverse=True)
            detail_results = [doc for _, doc in detail_scores[:top_k_detail]]
        
        return {
            "summary_results": summary_results,
            "detail_results": detail_results
        }
    
    def _calculate_similarity_score(self, query, text):
        """
        간단한 유사도 스코어 계산
        """
        from sklearn.feature_extraction.text import TfidfVectorizer
        from sklearn.metrics.pairwise import cosine_similarity
        
        vectorizer = TfidfVectorizer()
        tfidf_matrix = vectorizer.fit_transform([query, text])
        similarity = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])
        return similarity[0][0]

2. Multi-Modal RAG (멀티모달 검색)

from langchain.schema import Document
import base64
from PIL import Image
import io

class MultiModalRAG:
    def __init__(self, text_embeddings, image_embeddings, llm):
        self.text_embeddings = text_embeddings
        self.image_embeddings = image_embeddings
        self.llm = llm
        self.text_vectorstore = None
        self.image_vectorstore = None
        
    def process_mixed_documents(self, documents):
        """
        텍스트와 이미지가 혼재된 문서 처리
        """
        text_documents = []
        image_documents = []
        
        for doc in documents:
            if doc.metadata.get("type") == "image":
                # 이미지 문서 처리
                image_description = self._describe_image(doc.page_content)
                
                image_doc = Document(
                    page_content=image_description,
                    metadata={
                        **doc.metadata,
                        "image_path": doc.page_content,
                        "description": image_description
                    }
                )
                image_documents.append(image_doc)
                
            else:
                # 텍스트 문서 처리
                text_documents.append(doc)
        
        # 벡터스토어 생성
        if text_documents:
            self.text_vectorstore = FAISS.from_documents(
                text_documents, self.text_embeddings
            )
        
        if image_documents:
            self.image_vectorstore = FAISS.from_documents(
                image_documents, self.text_embeddings  # 이미지 설명을 텍스트로 임베딩
            )
    
    def _describe_image(self, image_path):
        """
        이미지 설명 생성 (실제로는 CLIP 등의 모델 사용)
        """
        # 여기서는 간단한 예시
        # 실제로는 CLIP, BLIP 등의 이미지-텍스트 모델 사용
        return f"이미지 파일: {image_path}"
    
    def multimodal_search(self, query, search_text=True, search_images=True, k=5):
        """
        멀티모달 검색
        """
        results = []
        
        if search_text and self.text_vectorstore:
            text_results = self.text_vectorstore.similarity_search(query, k=k)
            for doc in text_results:
                doc.metadata["modality"] = "text"
            results.extend(text_results)
        
        if search_images and self.image_vectorstore:
            image_results = self.image_vectorstore.similarity_search(query, k=k)
            for doc in image_results:
                doc.metadata["modality"] = "image"
            results.extend(image_results)
        
        # 관련성 기준으로 재정렬
        return self._rerank_multimodal_results(query, results, k)
    
    def _rerank_multimodal_results(self, query, results, top_k):
        """
        멀티모달 결과 재순위화
        """
        # 간단한 구현 - 실제로는 더 정교한 재순위화 필요
        scored_results = []
        
        for doc in results:
            score = self._calculate_multimodal_score(query, doc)
            scored_results.append((score, doc))
        
        scored_results.sort(reverse=True)
        return [doc for _, doc in scored_results[:top_k]]
    
    def _calculate_multimodal_score(self, query, doc):
        """
        멀티모달 스코어 계산
        """
        # 텍스트와 이미지에 대한 가중치 적용
        base_score = self._calculate_similarity_score(query, doc.page_content)
        
        if doc.metadata.get("modality") == "image":
            # 이미지의 경우 약간의 보정 적용
            base_score *= 0.9
        
        return base_score

3. Self-Querying RAG

from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import AttributeInfo

class SelfQueryingRAG:
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.llm = llm
        self.setup_metadata_field_info()
        
    def setup_metadata_field_info(self):
        """
        메타데이터 필드 정보 설정
        """
        self.metadata_field_info = [
            AttributeInfo(
                name="source",
                description="문서의 출처",
                type="string",
            ),
            AttributeInfo(
                name="page",
                description="문서의 페이지 번호",
                type="integer",
            ),
            AttributeInfo(
                name="author",
                description="문서의 작성자",
                type="string",
            ),
            AttributeInfo(
                name="publication_date",
                description="문서의 발행일",
                type="string",
            ),
            AttributeInfo(
                name="category",
                description="문서의 카테고리",
                type="string",
            ),
        ]
        
        self.document_content_description = "다양한 주제에 관한 문서들"
    
    def create_self_query_retriever(self):
        """
        Self-Query 검색기 생성
        """
        self.retriever = SelfQueryRetriever.from_llm(
            self.llm,
            self.vectorstore,
            self.document_content_description,
            self.metadata_field_info,
            verbose=True
        )
    
    def query_with_filters(self, query_text):
        """
        자연어 쿼리에서 필터 자동 추출하여 검색
        """
        # Self-Query 검색기가 자동으로 필터 조건 추출
        results = self.retriever.get_relevant_documents(query_text)
        
        return results
    
    def query_with_explicit_filters(self, query_text, filters):
        """
        명시적 필터와 함께 검색
        """
        # 메타데이터 필터 적용
        filtered_results = self.vectorstore.similarity_search(
            query_text,
            filter=filters,
            k=5
        )
        
        return filtered_results

# 사용 예시
self_query_rag = SelfQueryingRAG(vectorstore, llm)
self_query_rag.create_self_query_retriever()

# 자연어로 필터 조건 포함한 검색
results = self_query_rag.query_with_filters(
    "2023년에 발행된 머신러닝 관련 논문을 찾아주세요"
)

# 명시적 필터 검색
filtered_results = self_query_rag.query_with_explicit_filters(
    "딥러닝 알고리즘",
    {"category": "AI", "publication_date": "2023"}
)

성능 최적화 전략

1. 임베딩 최적화

import numpy as np
from typing import List, Dict, Any
import faiss

class OptimizedEmbeddingStore:
    def __init__(self, dimension: int = 1536):
        self.dimension = dimension
        self.index = None
        self.documents = []
        self.metadata = []
        
    def build_hnsw_index(self, embeddings: np.ndarray, M: int = 16, ef_construction: int = 200):
        """
        HNSW 인덱스 구성으로 빠른 근사 검색
        """
        self.index = faiss.IndexHNSWFlat(self.dimension, M)
        self.index.hnsw.efConstruction = ef_construction
        
        # 임베딩 정규화
        faiss.normalize_L2(embeddings)
        
        # 인덱스에 벡터 추가
        self.index.add(embeddings.astype('float32'))
        
    def build_ivf_index(self, embeddings: np.ndarray, nlist: int = 100):
        """
        IVF 인덱스 구성으로 메모리 효율적 검색
        """
        quantizer = faiss.IndexFlatL2(self.dimension)
        self.index = faiss.IndexIVFFlat(quantizer, self.dimension, nlist)
        
        # 훈련 데이터로 클러스터링
        self.index.train(embeddings.astype('float32'))
        self.index.add(embeddings.astype('float32'))
        
        # 검색 매개변수 설정
        self.index.nprobe = 10  # 검색할 클러스터 수
    
    def search(self, query_embedding: np.ndarray, k: int = 5) -> List[Dict[str, Any]]:
        """
        최적화된 검색
        """
        query_embedding = query_embedding.reshape(1, -1).astype('float32')
        faiss.normalize_L2(query_embedding)
        
        scores, indices = self.index.search(query_embedding, k)
        
        results = []
        for i, (score, idx) in enumerate(zip(scores[0], indices[0])):
            if idx != -1:  # 유효한 인덱스인 경우
                results.append({
                    "document": self.documents[idx],
                    "metadata": self.metadata[idx],
                    "score": float(score),
                    "rank": i + 1
                })
        
        return results
    
    def add_documents(self, documents: List[str], embeddings: np.ndarray, metadata: List[Dict]):
        """
        문서와 임베딩 추가
        """
        self.documents.extend(documents)
        self.metadata.extend(metadata)
        
        if self.index is None:
            self.build_hnsw_index(embeddings)
        else:
            # 기존 인덱스에 추가
            faiss.normalize_L2(embeddings)
            self.index.add(embeddings.astype('float32'))

class EmbeddingOptimizer:
    def __init__(self):
        pass
    
    def batch_embed_documents(self, texts: List[str], embeddings_model, batch_size: int = 32):
        """
        배치 처리로 임베딩 생성 최적화
        """
        embeddings = []
        
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            batch_embeddings = embeddings_model.embed_documents(batch)
            embeddings.extend(batch_embeddings)
        
        return np.array(embeddings)
    
    def reduce_embedding_dimension(self, embeddings: np.ndarray, target_dim: int = 768):
        """
        PCA를 사용한 임베딩 차원 축소
        """
        from sklearn.decomposition import PCA
        
        pca = PCA(n_components=target_dim)
        reduced_embeddings = pca.fit_transform(embeddings)
        
        return reduced_embeddings, pca
    
    def quantize_embeddings(self, embeddings: np.ndarray, bits: int = 8):
        """
        임베딩 양자화로 메모리 사용량 감소
        """
        # 8비트 양자화
        min_val = np.min(embeddings)
        max_val = np.max(embeddings)
        
        scale = (max_val - min_val) / (2**bits - 1)
        quantized = np.round((embeddings - min_val) / scale).astype(np.uint8)
        
        return quantized, min_val, scale
    
    def dequantize_embeddings(self, quantized: np.ndarray, min_val: float, scale: float):
        """
        양자화된 임베딩 복원
        """
        return quantized.astype(np.float32) * scale + min_val

2. 검색 결과 재순위화

from sentence_transformers import CrossEncoder
import torch

class AdvancedReranker:
    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        self.cross_encoder = CrossEncoder(model_name)
        
    def rerank_with_cross_encoder(self, query: str, documents: List[str], top_k: int = 5):
        """
        Cross-Encoder를 사용한 재순위화
        """
        # 쿼리-문서 쌍 생성
        query_doc_pairs = [(query, doc) for doc in documents]
        
        # Cross-Encoder로 스코어 계산
        scores = self.cross_encoder.predict(query_doc_pairs)
        
        # 스코어 기준으로 정렬
        doc_score_pairs = list(zip(documents, scores))
        doc_score_pairs.sort(key=lambda x: x[1], reverse=True)
        
        return doc_score_pairs[:top_k]
    
    def diversity_rerank(self, query: str, documents: List[str], alpha: float = 0.7):
        """
        다양성을 고려한 재순위화 (MMR - Maximal Marginal Relevance)
        """
        from sklearn.feature_extraction.text import TfidfVectorizer
        from sklearn.metrics.pairwise import cosine_similarity
        
        # TF-IDF 벡터화
        vectorizer = TfidfVectorizer()
        all_texts = [query] + documents
        tfidf_matrix = vectorizer.fit_transform(all_texts)
        
        query_vector = tfidf_matrix[0]
        doc_vectors = tfidf_matrix[1:]
        
        # 관련성 스코어 계산
        relevance_scores = cosine_similarity(query_vector, doc_vectors).flatten()
        
        # MMR 알고리즘
        selected_indices = []
        remaining_indices = list(range(len(documents)))
        
        while remaining_indices and len(selected_indices) < len(documents):
            mmr_scores = []
            
            for idx in remaining_indices:
                relevance = relevance_scores[idx]
                
                if not selected_indices:
                    diversity = 0
                else:
                    # 이미 선택된 문서들과의 최대 유사도
                    selected_vectors = doc_vectors[selected_indices]
                    similarities = cosine_similarity(
                        doc_vectors[idx:idx+1], selected_vectors
                    ).flatten()
                    diversity = np.max(similarities)
                
                mmr_score = alpha * relevance - (1 - alpha) * diversity
                mmr_scores.append((mmr_score, idx))
            
            # 가장 높은 MMR 스코어 선택
            best_score, best_idx = max(mmr_scores)
            selected_indices.append(best_idx)
            remaining_indices.remove(best_idx)
        
        return [(documents[idx], relevance_scores[idx]) for idx in selected_indices]

class HybridRetriever:
    def __init__(self, dense_retriever, sparse_retriever, reranker):
        self.dense_retriever = dense_retriever
        self.sparse_retriever = sparse_retriever
        self.reranker = reranker
    
    def hybrid_search(self, query: str, dense_weight: float = 0.7, top_k: int = 5):
        """
        Dense + Sparse 하이브리드 검색
        """
        # Dense 검색 (벡터 유사도)
        dense_results = self.dense_retriever.similarity_search(query, k=top_k*2)
        
        # Sparse 검색 (BM25)
        sparse_results = self.sparse_retriever.search(query, k=top_k*2)
        
        # 결과 병합 및 가중치 적용
        combined_scores = {}
        
        for i, doc in enumerate(dense_results):
            doc_id = id(doc)
            dense_score = 1.0 / (i + 1)  # 순위 기반 스코어
            combined_scores[doc_id] = {
                'document': doc,
                'dense_score': dense_score,
                'sparse_score': 0.0
            }
        
        for i, doc in enumerate(sparse_results):
            doc_id = id(doc)
            sparse_score = 1.0 / (i + 1)
            
            if doc_id in combined_scores:
                combined_scores[doc_id]['sparse_score'] = sparse_score
            else:
                combined_scores[doc_id] = {
                    'document': doc,
                    'dense_score': 0.0,
                    'sparse_score': sparse_score
                }
        
        # 하이브리드 스코어 계산
        hybrid_results = []
        for doc_info in combined_scores.values():
            hybrid_score = (
                dense_weight * doc_info['dense_score'] +
                (1 - dense_weight) * doc_info['sparse_score']
            )
            hybrid_results.append((doc_info['document'], hybrid_score))
        
        # 스코어 기준 정렬
        hybrid_results.sort(key=lambda x: x[1], reverse=True)
        
        # 최종 재순위화
        top_docs = [doc for doc, _ in hybrid_results[:top_k*2]]
        doc_texts = [doc.page_content for doc in top_docs]
        
        reranked = self.reranker.rerank_with_cross_encoder(query, doc_texts, top_k)
        
        return reranked

실시간 업데이트 시스템

1. 증분 인덱싱

import threading
import time
from queue import Queue
from typing import List, Dict, Any

class IncrementalIndexer:
    def __init__(self, vectorstore, embeddings_model, batch_size: int = 10):
        self.vectorstore = vectorstore
        self.embeddings_model = embeddings_model
        self.batch_size = batch_size
        self.update_queue = Queue()
        self.is_running = False
        self.worker_thread = None
        
    def start_worker(self):
        """
        백그라운드 인덱싱 워커 시작
        """
        self.is_running = True
        self.worker_thread = threading.Thread(target=self._process_updates)
        self.worker_thread.daemon = True
        self.worker_thread.start()
        
    def stop_worker(self):
        """
        백그라운드 워커 중지
        """
        self.is_running = False
        if self.worker_thread:
            self.worker_thread.join()
    
    def add_document(self, document: str, metadata: Dict[str, Any]):
        """
        문서 추가 요청을 큐에 넣기
        """
        self.update_queue.put({
            'action': 'add',
            'document': document,
            'metadata': metadata
        })
    
    def update_document(self, doc_id: str, document: str, metadata: Dict[str, Any]):
        """
        문서 업데이트 요청
        """
        self.update_queue.put({
            'action': 'update',
            'doc_id': doc_id,
            'document': document,
            'metadata': metadata
        })
    
    def delete_document(self, doc_id: str):
        """
        문서 삭제 요청
        """
        self.update_queue.put({
            'action': 'delete',
            'doc_id': doc_id
        })
    
    def _process_updates(self):
        """
        배치 단위로 업데이트 처리
        """
        batch_operations = []
        
        while self.is_running:
            try:
                # 큐에서 작업 가져오기 (타임아웃 설정)
                operation = self.update_queue.get(timeout=1.0)
                batch_operations.append(operation)
                
                # 배치 크기에 도달하면 처리
                if len(batch_operations) >= self.batch_size:
                    self._execute_batch(batch_operations)
                    batch_operations = []
                    
            except:
                # 타임아웃 발생 시 현재 배치 처리
                if batch_operations:
                    self._execute_batch(batch_operations)
                    batch_operations = []
                
                time.sleep(0.1)  # CPU 과부하 방지
        
        # 남은 작업 처리
        if batch_operations:
            self._execute_batch(batch_operations)
    
    def _execute_batch(self, operations: List[Dict[str, Any]]):
        """
        배치 작업 실행
        """
        add_docs = []
        add_metadatas = []
        
        for op in operations:
            if op['action'] == 'add':
                add_docs.append(op['document'])
                add_metadatas.append(op['metadata'])
                
            elif op['action'] == 'update':
                # 기존 문서 삭제 후 새로 추가
                self._delete_by_id(op['doc_id'])
                add_docs.append(op['document'])
                add_metadatas.append({**op['metadata'], 'doc_id': op['doc_id']})
                
            elif op['action'] == 'delete':
                self._delete_by_id(op['doc_id'])
        
        # 새 문서들 일괄 추가
        if add_docs:
            embeddings = self.embeddings_model.embed_documents(add_docs)
            self.vectorstore.add_texts(add_docs, add_metadatas, embeddings)
            
        print(f"배치 처리 완료: {len(operations)}개 작업")
    
    def _delete_by_id(self, doc_id: str):
        """
        문서 ID로 삭제
        """
        try:
            # 벡터스토어에 따라 구현 방식이 다름
            if hasattr(self.vectorstore, 'delete'):
                self.vectorstore.delete([doc_id])
        except Exception as e:
            print(f"문서 삭제 실패 {doc_id}: {e}")

class RealTimeRAG:
    def __init__(self, vectorstore, embeddings_model, llm):
        self.vectorstore = vectorstore
        self.embeddings_model = embeddings_model
        self.llm = llm
        self.indexer = IncrementalIndexer(vectorstore, embeddings_model)
        
        # 캐시 시스템
        self.query_cache = {}
        self.cache_ttl = 300  # 5분
        
    def start_real_time_updates(self):
        """
        실시간 업데이트 시작
        """
        self.indexer.start_worker()
        
    def stop_real_time_updates(self):
        """
        실시간 업데이트 중지
        """
        self.indexer.stop_worker()
    
    def add_document_async(self, document: str, metadata: Dict[str, Any]):
        """
        비동기 문서 추가
        """
        # 캐시 무효화
        self._invalidate_cache()
        
        # 인덱싱 큐에 추가
        self.indexer.add_document(document, metadata)
    
    def query_with_cache(self, query: str, force_refresh: bool = False):
        """
        캐시를 활용한 쿼리
        """
        cache_key = hash(query)
        current_time = time.time()
        
        # 캐시 확인
        if not force_refresh and cache_key in self.query_cache:
            cached_result, timestamp = self.query_cache[cache_key]
            if current_time - timestamp < self.cache_ttl:
                return cached_result
        
        # 새로운 검색 수행
        results = self.vectorstore.similarity_search(query, k=5)
        
        # 캐시 저장
        self.query_cache[cache_key] = (results, current_time)
        
        return results
    
    def _invalidate_cache(self):
        """
        캐시 무효화
        """
        self.query_cache.clear()

# 사용 예시
realtime_rag = RealTimeRAG(vectorstore, embeddings_model, llm)
realtime_rag.start_real_time_updates()

# 실시간으로 문서 추가
realtime_rag.add_document_async(
    "새로운 AI 연구 결과가 발표되었습니다.",
    {"source": "news", "timestamp": time.time()}
)

# 캐시된 검색
results = realtime_rag.query_with_cache("AI 연구")

모니터링 및 평가 시스템

1. RAG 성능 메트릭

import json
import time
from typing import List, Dict, Tuple
from dataclasses import dataclass

@dataclass
class RAGMetrics:
    query: str
    retrieved_docs: List[str]
    generated_answer: str
    ground_truth: str
    retrieval_time: float
    generation_time: float
    relevance_scores: List[float]
    context_precision: float
    context_recall: float
    answer_correctness: float
    answer_similarity: float

class RAGEvaluator:
    def __init__(self, evaluation_llm):
        self.evaluation_llm = evaluation_llm
        
    def evaluate_retrieval_relevance(self, query: str, documents: List[str]) -> List[float]:
        """
        검색된 문서의 관련성 평가
        """
        relevance_scores = []
        
        for doc in documents:
            prompt = f"""
질문과 문서의 관련성을 0-1 사이의 점수로 평가해주세요.
0: 전혀 관련 없음, 1: 매우 관련 있음

질문: {query}
문서: {doc[:500]}...

관련성 점수 (0-1):"""
            
            try:
                response = self.evaluation_llm(prompt)
                score = float(response.strip())
                relevance_scores.append(min(max(score, 0), 1))
            except:
                relevance_scores.append(0.0)
        
        return relevance_scores
    
    def calculate_context_precision(self, relevance_scores: List[float], k: int = None) -> float:
        """
        Context Precision 계산: 상위 k개 중 관련 있는 문서의 비율
        """
        if k is None:
            k = len(relevance_scores)
        
        top_k_scores = relevance_scores[:k]
        relevant_count = sum(1 for score in top_k_scores if score > 0.5)
        
        return relevant_count / k if k > 0 else 0
    
    def calculate_context_recall(self, retrieved_docs: List[str], ground_truth_docs: List[str]) -> float:
        """
        Context Recall 계산: 관련 문서 중 검색된 문서의 비율
        """
        if not ground_truth_docs:
            return 1.0
        
        retrieved_set = set(retrieved_docs)
        ground_truth_set = set(ground_truth_docs)
        
        overlap = len(retrieved_set.intersection(ground_truth_set))
        return overlap / len(ground_truth_set)
    
    def evaluate_answer_correctness(self, generated_answer: str, ground_truth: str) -> float:
        """
        답변 정확성 평가
        """
        prompt = f"""
다음 두 답변의 사실적 정확성을 비교하여 0-1 사이의 점수로 평가해주세요.
0: 완전히 틀림, 1: 완전히 정확함

정답: {ground_truth}
생성된 답변: {generated_answer}

정확성 점수 (0-1):"""
        
        try:
            response = self.evaluation_llm(prompt)
            score = float(response.strip())
            return min(max(score, 0), 1)
        except:
            return 0.0
    
    def evaluate_answer_similarity(self, generated_answer: str, ground_truth: str) -> float:
        """
        답변 유사성 평가 (의미적 유사성)
        """
        from sentence_transformers import SentenceTransformer
        from sklearn.metrics.pairwise import cosine_similarity
        
        model = SentenceTransformer('all-MiniLM-L6-v2')
        
        embeddings = model.encode([generated_answer, ground_truth])
        similarity = cosine_similarity([embeddings[0]], [embeddings[1]])[0][0]
        
        return float(similarity)
    
    def comprehensive_evaluation(self, 
                                query: str, 
                                retrieved_docs: List[str],
                                generated_answer: str,
                                ground_truth: str,
                                ground_truth_docs: List[str] = None,
                                retrieval_time: float = 0,
                                generation_time: float = 0) -> RAGMetrics:
        """
        종합적인 RAG 평가
        """
        # 검색 관련성 평가
        relevance_scores = self.evaluate_retrieval_relevance(query, retrieved_docs)
        
        # Context Precision & Recall
        context_precision = self.calculate_context_precision(relevance_scores)
        context_recall = self.calculate_context_recall(retrieved_docs, ground_truth_docs or [])
        
        # 답변 평가
        answer_correctness = self.evaluate_answer_correctness(generated_answer, ground_truth)
        answer_similarity = self.evaluate_answer_similarity(generated_answer, ground_truth)
        
        return RAGMetrics(
            query=query,
            retrieved_docs=retrieved_docs,
            generated_answer=generated_answer,
            ground_truth=ground_truth,
            retrieval_time=retrieval_time,
            generation_time=generation_time,
            relevance_scores=relevance_scores,
            context_precision=context_precision,
            context_recall=context_recall,
            answer_correctness=answer_correctness,
            answer_similarity=answer_similarity
        )

class RAGMonitor:
    def __init__(self, evaluator: RAGEvaluator):
        self.evaluator = evaluator
        self.metrics_history = []
        
    def log_query(self, metrics: RAGMetrics):
        """
        쿼리 메트릭 로깅
        """
        self.metrics_history.append(metrics)
        
        # 실시간 성능 모니터링
        if len(self.metrics_history) % 10 == 0:
            self._print_performance_summary()
    
    def _print_performance_summary(self):
        """
        성능 요약 출력
        """
        recent_metrics = self.metrics_history[-10:]
        
        avg_retrieval_time = sum(m.retrieval_time for m in recent_metrics) / len(recent_metrics)
        avg_generation_time = sum(m.generation_time for m in recent_metrics) / len(recent_metrics)
        avg_precision = sum(m.context_precision for m in recent_metrics) / len(recent_metrics)
        avg_recall = sum(m.context_recall for m in recent_metrics) / len(recent_metrics)
        avg_correctness = sum(m.answer_correctness for m in recent_metrics) / len(recent_metrics)
        
        print(f"""
=== RAG 성능 모니터링 (최근 10개 쿼리) ===
평균 검색 시간: {avg_retrieval_time:.3f}초
평균 생성 시간: {avg_generation_time:.3f}초
평균 Context Precision: {avg_precision:.3f}
평균 Context Recall: {avg_recall:.3f}
평균 답변 정확성: {avg_correctness:.3f}
=======================================
        """)
    
    def export_metrics(self, filename: str):
        """
        메트릭을 JSON 파일로 출력
        """
        metrics_data = []
        for m in self.metrics_history:
            metrics_data.append({
                "query": m.query,
                "retrieved_docs_count": len(m.retrieved_docs),
                "retrieval_time": m.retrieval_time,
                "generation_time": m.generation_time,
                "context_precision": m.context_precision,
                "context_recall": m.context_recall,
                "answer_correctness": m.answer_correctness,
                "answer_similarity": m.answer_similarity
            })
        
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(metrics_data, f, ensure_ascii=False, indent=2)
        
        print(f"메트릭이 {filename}에 저장되었습니다.")

# 사용 예시
evaluator = RAGEvaluator(evaluation_llm)
monitor = RAGMonitor(evaluator)

# RAG 쿼리 실행 및 평가
def monitored_rag_query(rag_system, query, ground_truth):
    start_time = time.time()
    
    # 검색
    retrieval_start = time.time()
    retrieved_docs = rag_system.vectorstore.similarity_search(query, k=5)
    retrieval_time = time.time() - retrieval_start
    
    # 생성
    generation_start = time.time()
    result = rag_system.query(query)
    generation_time = time.time() - generation_start
    
    # 평가
    metrics = evaluator.comprehensive_evaluation(
        query=query,
        retrieved_docs=[doc.page_content for doc in retrieved_docs],
        generated_answer=result["answer"],
        ground_truth=ground_truth,
        retrieval_time=retrieval_time,
        generation_time=generation_time
    )
    
    # 모니터링
    monitor.log_query(metrics)
    
    return result

결론

RAG + LangChain 시스템의 성공적인 구축을 위해서는 다음 핵심 요소들이 중요합니다:

1. 아키텍처 설계

  • 모듈화된 구조로 각 컴포넌트를 독립적으로 최적화
  • 확장성을 고려한 벡터 데이터베이스 선택
  • 실시간 업데이트 지원 시스템

2. 성능 최적화

  • 하이브리드 검색 (Dense + Sparse)
  • 효율적인 임베딩 및 인덱싱
  • 계층적 검색과 재순위화

3. 품질 관리

  • 종합적인 평가 메트릭
  • 실시간 모니터링 시스템
  • A/B 테스팅 기반 지속적 개선

4. 사용자 경험

  • 빠른 응답 시간
  • 관련성 높은 결과
  • 신뢰할 수 있는 출처 정보

2025년에는 RAG 기술이 더욱 정교해지고, 멀티모달 지원과 실시간 학습 기능이 강화될 것으로 예상됩니다. 이러한 트렌드에 맞춰 지속적으로 시스템을 개선하고 최신 기술을 적용하는 것이 중요합니다.