LangGraph 완벽 가이드 - AI 에이전트와 워크플로우 구축

LangGraph란?

LangGraph는 LangChain을 기반으로 상태를 가진(stateful) 멀티 액터 애플리케이션을 그래프로 구축할 수 있는 라이브러리입니다. LangChain이 단순한 체인 구조라면, LangGraph는 복잡한 순환 구조와 조건부 분기를 지원하는 강력한 워크플로우 엔진입니다.

LangGraph의 탄생 배경

LangChain의 체인 방식은 선형적인 워크플로우에는 적합하지만, 다음과 같은 한계가 있었습니다:

  • 순환(cycle) 구조를 구현하기 어려움
  • 복잡한 조건부 분기 처리 한계
  • 상태 관리의 어려움
  • Human-in-the-loop 구현의 복잡성

LangGraph는 이러한 한계를 극복하기 위해 만들어졌습니다.

LangGraph의 핵심 개념

        ┌─────────┐
        │  START  │
        └────┬────┘
             │
        ┌────▼────┐
        │ Node A  │
        └────┬────┘
             │
     ┌───────┴───────┐
     │               │
 ┌───▼───┐       ┌───▼───┐
 │Node B │       │Node C │
 └───┬───┘       └───┬───┘
     │               │
     └───────┬───────┘
             │
        ┌────▼────┐
        │   END   │
        └─────────┘
  • Nodes (노드): 작업을 수행하는 단위 (함수)
  • Edges (엣지): 노드 간의 연결 (제어 흐름)
  • State (상태): 그래프 전체에서 공유되는 데이터
  • Conditional Edges (조건부 엣지): 상태에 따라 다음 노드를 결정

LangGraph vs LangChain

| 특징 | LangChain | LangGraph |
|---|---|---|
| 구조 | 선형 체인 | 그래프 (DAG + 순환) |
| 상태 관리 | 제한적 | 강력한 상태 관리 |
| 조건부 분기 | 기본 수준 | 고급 조건부 라우팅 |
| 순환 구조 | 지원 안 함 | 지원 |
| Human-in-the-loop | 복잡 | 내장 지원 |
| 사용 사례 | 간단한 체인 | 복잡한 에이전트/워크플로우 |

1. 설치 및 환경 설정

패키지 설치

# Install LangGraph
pip install langgraph

# LangChain (required)
pip install langchain langchain-openai

# Visualization tooling (optional)
pip install pygraphviz

# Utilities
pip install python-dotenv

환경 변수 설정

# .env
OPENAI_API_KEY=your-openai-api-key
LANGCHAIN_API_KEY=your-langsmith-api-key
LANGCHAIN_TRACING_V2=true
# config.py
import os
from dotenv import load_dotenv

# Load variables from the local .env file into the process environment.
load_dotenv()

# OpenAI key used by all LLM calls in this guide; None if not set.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

기본 설정

from langchain_openai import ChatOpenAI

# Deterministic (temperature=0) GPT-4 client shared by the examples below.
llm = ChatOpenAI(model="gpt-4", temperature=0)

2. LangGraph 기본 구조

2.1 상태(State) 정의

상태는 그래프 전체에서 공유되는 데이터 구조입니다.

from typing import TypedDict, Annotated
import operator

class State(TypedDict):
    """Shared state that flows through every node of the graph."""
    # operator.add as the reducer: each node's returned list is appended
    messages: Annotated[list, operator.add]
    count: int       # replaced wholesale on each update (default behavior)
    status: str      # replaced wholesale on each update (default behavior)

Annotated 타입 설명:

  • operator.add: 새 값을 기존 리스트에 추가
  • 기본값: 새 값으로 완전히 대체

2.2 노드(Node) 정의

노드는 상태를 입력받아 처리하고 업데이트된 상태를 반환하는 함수입니다.

def my_node(state: State) -> State:
    """Example node: bump the counter and record a trace message.

    NOTE: mutates *state* in place and returns it; LangGraph also accepts
    returning a partial update dict instead.
    """
    print(f"Processing node with count: {state['count']}")

    # Update the state
    state["count"] += 1
    state["messages"].append("Processed by my_node")

    return state

2.3 그래프 구성

from langgraph.graph import StateGraph, END

# Build the graph over the shared State schema
workflow = StateGraph(State)

# Register nodes ("another_node" is assumed to be defined like my_node)
workflow.add_node("node1", my_node)
workflow.add_node("node2", another_node)

# Wire node1 -> node2
workflow.add_edge("node1", "node2")

# Entry point
workflow.set_entry_point("node1")

# Terminate after node2
workflow.add_edge("node2", END)

# Compile into a runnable app
app = workflow.compile()

# Run with an initial state
result = app.invoke({
    "messages": [],
    "count": 0,
    "status": "starting"
})

print(result)

3. 조건부 라우팅

3.1 기본 조건부 엣지

from langgraph.graph import StateGraph, END

class RouterState(TypedDict):
    """State for the conditional-routing example."""
    value: int
    path: str    # set by check_value: "high" or "low"
    result: str  # set by the branch node that runs

def check_value(state: RouterState) -> RouterState:
    """Label the state 'high' when value exceeds 50, otherwise 'low'."""
    state["path"] = "high" if state["value"] > 50 else "low"
    return state

def high_path(state: RouterState) -> RouterState:
    """Branch taken for high values."""
    state["result"] = "Value is high!"
    return state

def low_path(state: RouterState) -> RouterState:
    """Branch taken for low values."""
    state["result"] = "Value is low!"
    return state

# Routing function: returns the label of the next branch to take
def route_decision(state: RouterState) -> str:
    if state["path"] == "high":
        return "high"
    return "low"

# Assemble the routing graph
workflow = StateGraph(RouterState)

workflow.add_node("check", check_value)
workflow.add_node("high", high_path)
workflow.add_node("low", low_path)

workflow.set_entry_point("check")

# Conditional edge: route_decision's return value picks the next node
workflow.add_conditional_edges(
    "check",
    route_decision,
    {
        "high": "high",
        "low": "low"
    }
)

workflow.add_edge("high", END)
workflow.add_edge("low", END)

app = workflow.compile()

# Smoke tests
print(app.invoke({"value": 75, "path": "", "result": ""}))
# {'value': 75, 'path': 'high', 'result': 'Value is high!'}

print(app.invoke({"value": 25, "path": "", "result": ""}))
# {'value': 25, 'path': 'low', 'result': 'Value is low!'}

3.2 복잡한 조건부 라우팅

from typing import Literal

def complex_router(state: dict) -> Literal["path_a", "path_b", "path_c", "end"]:
    """Pick the next path from several conditions, checked in priority order."""
    if state.get("error"):          # any truthy error aborts the run
        return "end"
    elif state["count"] > 10:
        return "path_a"
    elif state["status"] == "processing":
        return "path_b"
    else:
        return "path_c"

# Map each router label to a concrete node (END terminates the run)
workflow.add_conditional_edges(
    "router_node",
    complex_router,
    {
        "path_a": "node_a",
        "path_b": "node_b",
        "path_c": "node_c",
        "end": END
    }
)

4. AI 에이전트 구축

4.1 기본 ReAct 에이전트

from langchain_openai import ChatOpenAI
from langchain.tools import tool
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

# Tool definitions
@tool
def search(query: str) -> str:
    """Search for information."""
    # Placeholder: a real implementation would call a search API
    return f"Search results for: {query}"

@tool
def calculator(expression: str) -> str:
    """Calculate a mathematical expression."""
    # SECURITY: eval() executes arbitrary code — never pass it untrusted
    # input in production; use ast.literal_eval or a dedicated math parser.
    try:
        result = eval(expression)
        return f"Result: {result}"
    except Exception as e:
        return f"Error: {str(e)}"

# Agent state: accumulated message history plus a pending-action label
class AgentState(TypedDict):
    messages: Annotated[list, operator.add]  # reducer appends new messages
    next_action: str

# Initialize the LLM
llm = ChatOpenAI(model="gpt-4", temperature=0)
tools = [search, calculator]
# Expose the tool schemas to the model so it can emit tool_calls.
llm_with_tools = llm.bind_tools(tools)

# Agent node
def agent_node(state: AgentState):
    """Let the LLM decide the next action (final answer or tool calls)."""
    messages = state["messages"]
    response = llm_with_tools.invoke(messages)
    # The returned list is appended to state["messages"] by the reducer
    return {"messages": [response]}

# Tool-execution node
def tool_node(state: AgentState):
    """Execute every tool call requested by the last LLM message."""
    messages = state["messages"]
    last_message = messages[-1]

    tool_calls = last_message.tool_calls
    if not tool_calls:
        return {"messages": []}

    # Run each requested tool and collect tool-role result messages
    tool_messages = []
    for tool_call in tool_calls:
        # "matched" avoids shadowing the imported @tool decorator
        matched = next((t for t in tools if t.name == tool_call["name"]), None)
        if matched:
            result = matched.invoke(tool_call["args"])
            tool_messages.append({
                "role": "tool",
                "content": result,
                "tool_call_id": tool_call["id"]
            })

    return {"messages": tool_messages}

# Decide whether to keep looping
def should_continue(state: AgentState) -> str:
    """Route to 'tools' while the last message requests tool calls, else 'end'."""
    messages = state["messages"]
    last_message = messages[-1]

    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"
    return "end"

# Assemble the ReAct-style agent graph
workflow = StateGraph(AgentState)

workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)

workflow.set_entry_point("agent")

# agent -> tools while tool calls are pending, otherwise finish
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "tools": "tools",
        "end": END
    }
)

# After tools run, hand control back to the agent
workflow.add_edge("tools", "agent")

agent_app = workflow.compile()

# Run
result = agent_app.invoke({
    "messages": [{"role": "user", "content": "What is 25 * 4? Then search for Python programming."}]
})

for msg in result["messages"]:
    print(msg)

4.2 Self-Reflection 에이전트

에이전트가 자신의 출력을 검토하고 개선합니다.

class ReflectionState(TypedDict):
    """State for the generate -> critique -> revise loop."""
    messages: Annotated[list, operator.add]
    draft: str
    critique: str
    iteration: int       # number of drafts produced so far
    max_iterations: int  # hard cap on the revision loop

def generate_draft(state: ReflectionState):
    """Produce the first draft and advance the iteration counter."""
    messages = state["messages"]
    response = llm.invoke(messages)
    return {
        "draft": response.content,
        "iteration": state.get("iteration", 0) + 1
    }

def critique_draft(state: ReflectionState):
    """Ask the LLM to critique the current draft."""
    prompt = f"""
Review this draft and provide constructive criticism:

{state['draft']}

Is this good enough? If not, what should be improved?
"""

    response = llm.invoke(prompt)
    return {"critique": response.content}

def should_revise(state: ReflectionState) -> str:
    """Stop at the iteration cap or when the critique approves the draft."""
    if state["iteration"] >= state["max_iterations"]:
        return "end"

    # Finish when the critique contains an approval phrase
    if any(word in state["critique"].lower() for word in ["good enough", "acceptable", "approved"]):
        return "end"

    return "revise"

def revise_draft(state: ReflectionState):
    """Rewrite the draft according to the critique."""
    prompt = f"""
Original draft:
{state['draft']}

Critique:
{state['critique']}

Please revise the draft based on the critique.
"""

    response = llm.invoke(prompt)
    return {"draft": response.content}

# Assemble the reflection graph
reflection_workflow = StateGraph(ReflectionState)

reflection_workflow.add_node("generate", generate_draft)
reflection_workflow.add_node("critique", critique_draft)
reflection_workflow.add_node("revise", revise_draft)

reflection_workflow.set_entry_point("generate")
reflection_workflow.add_edge("generate", "critique")

# Keep revising until approved or the iteration cap is reached
reflection_workflow.add_conditional_edges(
    "critique",
    should_revise,
    {
        "revise": "revise",
        "end": END
    }
)

reflection_workflow.add_edge("revise", "critique")

reflection_app = reflection_workflow.compile()

# Run
result = reflection_app.invoke({
    "messages": [{"role": "user", "content": "Write a blog post about AI safety"}],
    "draft": "",
    "critique": "",
    "iteration": 0,
    "max_iterations": 3
})

print("Final Draft:", result["draft"])
print(f"Iterations: {result['iteration']}")

5. 멀티 에이전트 시스템

5.1 계층적 에이전트 시스템

class MultiAgentState(TypedDict):
    """Pipeline state shared across the research/writing/review agents."""
    task: str
    research_output: str
    writing_output: str
    review_output: str
    final_output: str
    current_agent: str  # name of the agent that last updated the state

# Research agent
def research_agent(state: MultiAgentState):
    """Gather and analyze information about the task."""
    prompt = f"""
As a research specialist, gather and analyze information about:
{state['task']}

Provide key findings and data points.
"""
    response = llm.invoke(prompt)
    return {
        "research_output": response.content,
        "current_agent": "research"
    }

# Writing agent
def writing_agent(state: MultiAgentState):
    """Draft an article from the research findings."""
    prompt = f"""
As a professional writer, create a well-structured article based on:

Research Findings:
{state['research_output']}

Original Task: {state['task']}
"""
    response = llm.invoke(prompt)
    return {
        "writing_output": response.content,
        "current_agent": "writing"
    }

# Review agent
def review_agent(state: MultiAgentState):
    """Review the drafted article and produce feedback."""
    prompt = f"""
As an editor, review this article:

{state['writing_output']}

Provide constructive feedback and suggestions.
"""
    response = llm.invoke(prompt)
    return {
        "review_output": response.content,
        "current_agent": "review"
    }

# Finalizing agent
def finalize_agent(state: MultiAgentState):
    """Produce the final version incorporating review feedback."""
    prompt = f"""
Finalize this article based on the review:

Original Article:
{state['writing_output']}

Review Feedback:
{state['review_output']}
"""
    response = llm.invoke(prompt)
    return {
        "final_output": response.content,
        "current_agent": "finalize"
    }

# Assemble the sequential pipeline
multi_agent_workflow = StateGraph(MultiAgentState)

multi_agent_workflow.add_node("research", research_agent)
multi_agent_workflow.add_node("writing", writing_agent)
multi_agent_workflow.add_node("review", review_agent)
multi_agent_workflow.add_node("finalize", finalize_agent)

multi_agent_workflow.set_entry_point("research")
multi_agent_workflow.add_edge("research", "writing")
multi_agent_workflow.add_edge("writing", "review")
multi_agent_workflow.add_edge("review", "finalize")
multi_agent_workflow.add_edge("finalize", END)

multi_agent_app = multi_agent_workflow.compile()

# Run
result = multi_agent_app.invoke({
    "task": "The impact of artificial intelligence on software development",
    "research_output": "",
    "writing_output": "",
    "review_output": "",
    "final_output": "",
    "current_agent": ""
})

print("=== Research Output ===")
print(result["research_output"])
print("\n=== Writing Output ===")
print(result["writing_output"])
print("\n=== Review Output ===")
print(result["review_output"])
print("\n=== Final Output ===")
print(result["final_output"])

5.2 협업 에이전트 시스템

여러 전문가 에이전트가 협업하여 문제를 해결합니다.

from typing import Literal

class CollaborationState(TypedDict):
    """Shared state for the round-robin expert conversation."""
    messages: Annotated[list, operator.add]
    next_speaker: str
    conversation_count: int  # turns taken so far
    max_conversations: int   # hard cap on total turns

# Expert agents
def python_expert(state: CollaborationState):
    """Python specialist's turn."""
    prompt = f"""
You are a Python expert. Based on the conversation, provide your insights.

Conversation so far:
{state['messages']}
"""
    response = llm.invoke(prompt)
    return {
        "messages": [{"role": "python_expert", "content": response.content}],
        "conversation_count": state["conversation_count"] + 1
    }

def architecture_expert(state: CollaborationState):
    """Software-architecture specialist's turn."""
    prompt = f"""
You are a software architecture expert. Provide architectural insights.

Conversation:
{state['messages']}
"""
    response = llm.invoke(prompt)
    return {
        "messages": [{"role": "architect", "content": response.content}],
        "conversation_count": state["conversation_count"] + 1
    }

def security_expert(state: CollaborationState):
    """Security specialist's turn."""
    prompt = f"""
You are a security expert. Identify security concerns.

Conversation:
{state['messages']}
"""
    response = llm.invoke(prompt)
    return {
        "messages": [{"role": "security", "content": response.content}],
        "conversation_count": state["conversation_count"] + 1
    }

# Pick the next speaker
def select_next_speaker(state: CollaborationState) -> Literal["python", "architect", "security", "end"]:
    """Rotate through the experts until the conversation cap is hit."""
    if state["conversation_count"] >= state["max_conversations"]:
        return "end"

    # Simple round-robin; could be replaced with smarter routing
    count = state["conversation_count"]
    if count % 3 == 0:
        return "python"
    elif count % 3 == 1:
        return "architect"
    else:
        return "security"

# Assemble the collaboration graph
collab_workflow = StateGraph(CollaborationState)

collab_workflow.add_node("python", python_expert)
collab_workflow.add_node("architect", architecture_expert)
collab_workflow.add_node("security", security_expert)

collab_workflow.set_entry_point("python")

# After each expert speaks, pick the next speaker (or stop)
for node in ["python", "architect", "security"]:
    collab_workflow.add_conditional_edges(
        node,
        select_next_speaker,
        {
            "python": "python",
            "architect": "architect",
            "security": "security",
            "end": END
        }
    )

collab_app = collab_workflow.compile()

# Run
result = collab_app.invoke({
    "messages": [{"role": "user", "content": "How should we design a scalable authentication system?"}],
    "next_speaker": "",
    "conversation_count": 0,
    "max_conversations": 6
})

for msg in result["messages"]:
    print(f"\n[{msg.get('role', 'unknown')}]")
    print(msg.get('content', ''))

6. Human-in-the-Loop

6.1 체크포인트를 활용한 승인 워크플로우

from langgraph.checkpoint.sqlite import SqliteSaver

class ApprovalState(TypedDict):
    """State for the plan -> approve -> execute workflow."""
    task: str
    plan: str
    approved: bool        # human sets True to run the plan
    result: str
    needs_revision: bool  # human sets True to request a rewrite

def create_plan(state: ApprovalState):
    """Draft a plan for the task."""
    prompt = f"Create a detailed plan for: {state['task']}"
    response = llm.invoke(prompt)
    return {"plan": response.content}

def wait_for_approval(state: ApprovalState):
    """Display the plan for human review (the graph halts after this node)."""
    print("\n" + "="*50)
    print("PLAN FOR APPROVAL")
    print("="*50)
    print(state["plan"])
    print("="*50)
    return state

def execute_plan(state: ApprovalState):
    """Carry out the approved plan."""
    prompt = f"Execute this plan:\n{state['plan']}"
    response = llm.invoke(prompt)
    return {"result": response.content}

def revision_needed(state: ApprovalState) -> str:
    """Route based on the human decision flags (revision wins over approval)."""
    if state.get("needs_revision", False):
        return "revise"
    elif state.get("approved", False):
        return "execute"
    else:
        return "wait"

def revise_plan(state: ApprovalState):
    """Improve the plan and clear the revision flag."""
    prompt = f"Revise this plan to make it better:\n{state['plan']}"
    response = llm.invoke(prompt)
    return {
        "plan": response.content,
        "needs_revision": False
    }

# Checkpointer backed by an in-memory SQLite database.
# NOTE(review): in recent langgraph releases SqliteSaver.from_conn_string
# is a context manager — confirm against the installed version.
memory = SqliteSaver.from_conn_string(":memory:")

workflow = StateGraph(ApprovalState)

workflow.add_node("create_plan", create_plan)
workflow.add_node("wait", wait_for_approval)
workflow.add_node("revise", revise_plan)
workflow.add_node("execute", execute_plan)

workflow.set_entry_point("create_plan")
workflow.add_edge("create_plan", "wait")

workflow.add_conditional_edges(
    "wait",
    revision_needed,
    {
        "revise": "revise",
        "execute": "execute",
        "wait": END  # stop here and wait for human approval
    }
)

workflow.add_edge("revise", "wait")
workflow.add_edge("execute", END)

# Compile with the checkpointer so runs can be resumed by thread_id
approval_app = workflow.compile(checkpointer=memory)

# Usage example
config = {"configurable": {"thread_id": "1"}}

# Step 1: create the plan
result = approval_app.invoke({
    "task": "Implement a user authentication system",
    "plan": "",
    "approved": False,
    "result": "",
    "needs_revision": False
}, config)

print("\nPlan created. Review the plan above.")

# Step 2: request a revision
result = approval_app.invoke({
    **result,
    "needs_revision": True
}, config)

print("\nRevised plan created.")

# Step 3: approve and execute
result = approval_app.invoke({
    **result,
    "approved": True
}, config)

print("\n=== Execution Result ===")
print(result["result"])

6.2 인터럽트를 활용한 Human-in-the-Loop

from langgraph.graph import StateGraph
from langgraph.checkpoint.memory import MemorySaver

def interrupt_before_execution(state):
    """Placeholder breakpoint node (execution is paused before it).

    NOTE(review): defined but never added to the graph below — kept for
    illustration only.
    """
    return state

# Build the graph
workflow = StateGraph(ApprovalState)
workflow.add_node("plan", create_plan)
workflow.add_node("execute", execute_plan)

workflow.set_entry_point("plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", END)

# Configure interrupt_before so the run pauses at a human checkpoint
app = workflow.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["execute"]  # pause before the execute node
)

config = {"configurable": {"thread_id": "2"}}

# Run - pauses before 'execute'
result = app.invoke({
    "task": "Deploy application",
    "plan": "",
    "approved": False,
    "result": ""
}, config)

print("Execution paused. Review and approve.")

# Resume after approval
result = app.invoke(None, config)  # passing None resumes from the saved state
print("Execution completed:", result)

7. 서브그래프(Subgraphs)

복잡한 워크플로우를 모듈화할 수 있습니다.

# Subgraph 1: data processing
def process_data_workflow():
    """Build and compile the cleaning/validation subgraph."""
    class ProcessState(TypedDict):
        data: str
        cleaned: str
        validated: bool

    def clean(state):
        # Normalize whitespace and case
        return {"cleaned": state["data"].strip().lower()}

    def validate(state):
        return {"validated": len(state["cleaned"]) > 0}

    workflow = StateGraph(ProcessState)
    workflow.add_node("clean", clean)
    workflow.add_node("validate", validate)
    workflow.set_entry_point("clean")
    workflow.add_edge("clean", "validate")
    workflow.add_edge("validate", END)

    return workflow.compile()

# Subgraph 2: analysis
def analysis_workflow():
    """Build and compile the summarize/sentiment subgraph."""
    class AnalysisState(TypedDict):
        text: str
        summary: str
        sentiment: str

    def summarize(state):
        prompt = f"Summarize: {state['text']}"
        response = llm.invoke(prompt)
        return {"summary": response.content}

    def analyze_sentiment(state):
        prompt = f"Analyze sentiment: {state['text']}"
        response = llm.invoke(prompt)
        return {"sentiment": response.content}

    workflow = StateGraph(AnalysisState)
    workflow.add_node("summarize", summarize)
    workflow.add_node("sentiment", analyze_sentiment)
    workflow.set_entry_point("summarize")
    workflow.add_edge("summarize", "sentiment")
    workflow.add_edge("sentiment", END)

    return workflow.compile()

# Compose the subgraphs in a parent workflow
class MainState(TypedDict):
    input: str
    processed_data: dict
    analysis: dict

def main_workflow():
    """Wire both compiled subgraphs into a two-step parent graph."""
    process_app = process_data_workflow()
    analysis_app = analysis_workflow()

    def process_step(state):
        # A compiled subgraph is invoked like any other runnable
        result = process_app.invoke({"data": state["input"]})
        return {"processed_data": result}

    def analyze_step(state):
        result = analysis_app.invoke({"text": state["processed_data"]["cleaned"]})
        return {"analysis": result}

    workflow = StateGraph(MainState)
    workflow.add_node("process", process_step)
    workflow.add_node("analyze", analyze_step)
    workflow.set_entry_point("process")
    workflow.add_edge("process", "analyze")
    workflow.add_edge("analyze", END)

    return workflow.compile()

# Run
main_app = main_workflow()
result = main_app.invoke({"input": " Hello World "})

8. 스트리밍과 비동기

8.1 스트리밍

# Event streaming (default mode)
for event in app.stream({"messages": [{"role": "user", "content": "Hello"}]}):
    print("Event:", event)

# Stream the full state after each step
for value in app.stream(input_data, stream_mode="values"):
    print("Value:", value)

# Stream only the per-node state updates
for update in app.stream(input_data, stream_mode="updates"):
    print("Update:", update)

8.2 비동기 처리

import asyncio

async def async_workflow():
    """Invoke the graph asynchronously."""
    result = await app.ainvoke({
        "messages": [{"role": "user", "content": "Hello"}]
    })
    return result

# Async streaming
async def async_stream():
    async for event in app.astream(input_data):
        print("Async event:", event)

# Run
result = asyncio.run(async_workflow())

9. 시각화

9.1 그래프 구조 시각화

# Generate a Mermaid diagram of the graph
mermaid_code = app.get_graph().draw_mermaid()
print(mermaid_code)

# Render as a PNG image (Jupyter)
from IPython.display import Image, display

display(Image(app.get_graph().draw_mermaid_png()))

# Or save to a file
with open("workflow.png", "wb") as f:
    f.write(app.get_graph().draw_mermaid_png())

9.2 실행 추적

# Trace runs with LangSmith
import os

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "langgraph-project"

# From here on, every invocation is recorded in LangSmith
result = app.invoke(input_data)

10. 고급 패턴

10.1 Plan-and-Execute

class PlanExecuteState(TypedDict):
    """State for the plan-and-execute pattern."""
    input: str
    plan: list[str]        # ordered list of step descriptions
    current_step: int      # index of the next step to run
    results: Annotated[list, operator.add]  # per-step results, appended
    final_answer: str

def planner(state: PlanExecuteState):
    """Draft the full step-by-step plan up front."""
    prompt = f"""
Create a step-by-step plan to answer: {state['input']}

Return as a numbered list.
"""
    response = llm.invoke(prompt)
    # Parse the response into a list of non-empty lines
    plan = [line.strip() for line in response.content.split('\n') if line.strip()]
    return {"plan": plan, "current_step": 0}

def executor(state: PlanExecuteState):
    """Execute the current plan step, feeding in previous results."""
    current_step = state["current_step"]
    step = state["plan"][current_step]

    prompt = f"""
Execute this step: {step}

Previous results: {state['results']}
"""
    response = llm.invoke(prompt)

    return {
        "results": [response.content],
        "current_step": current_step + 1
    }

def should_continue_execution(state: PlanExecuteState) -> str:
    """Loop the executor until every plan step has run."""
    if state["current_step"] >= len(state["plan"]):
        return "finish"
    return "execute"

def finalizer(state: PlanExecuteState):
    """Synthesize the final answer from all step results."""
    prompt = f"""
Based on these results:
{state['results']}

Provide a final answer to: {state['input']}
"""
    response = llm.invoke(prompt)
    return {"final_answer": response.content}

# Assemble the plan-and-execute graph
plan_execute_workflow = StateGraph(PlanExecuteState)

plan_execute_workflow.add_node("planner", planner)
plan_execute_workflow.add_node("executor", executor)
plan_execute_workflow.add_node("finalizer", finalizer)

plan_execute_workflow.set_entry_point("planner")
plan_execute_workflow.add_edge("planner", "executor")

# Loop the executor until the plan is exhausted
plan_execute_workflow.add_conditional_edges(
    "executor",
    should_continue_execution,
    {
        "execute": "executor",
        "finish": "finalizer"
    }
)

plan_execute_workflow.add_edge("finalizer", END)

plan_execute_app = plan_execute_workflow.compile()

# Run
result = plan_execute_app.invoke({
    "input": "What are the key differences between Python and JavaScript?",
    "plan": [],
    "current_step": 0,
    "results": [],
    "final_answer": ""
})

print("=== Plan ===")
for i, step in enumerate(result["plan"], 1):
    print(f"{i}. {step}")

print("\n=== Results ===")
for i, res in enumerate(result["results"], 1):
    print(f"Step {i}: {res}")

print("\n=== Final Answer ===")
print(result["final_answer"])

10.2 Self-RAG (Self-Reflective RAG)

from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

class SelfRAGState(TypedDict):
    """State for the self-reflective RAG retry loop."""
    question: str
    documents: list          # retrieved documents for the current attempt
    answer: str
    relevance_score: float   # graded 0-1 relevance of the documents
    iteration: int
    max_iterations: int

# Vector store (assumed to be populated elsewhere)
vectorstore = Chroma(embedding_function=OpenAIEmbeddings())

def retrieve(state: SelfRAGState):
    """Fetch documents; widen the search on every retry."""
    k = 3 + (state.get("iteration", 0) * 2)  # more documents each iteration
    docs = vectorstore.similarity_search(state["question"], k=k)
    return {"documents": docs}

def grade_documents(state: SelfRAGState):
    """Score document relevance to the question on a 0-1 scale."""
    prompt = f"""
Rate the relevance of these documents to the question on a scale of 0-1:

Question: {state['question']}

Documents: {[doc.page_content for doc in state['documents']]}

Return only a number between 0 and 1.
"""
    response = llm.invoke(prompt)
    try:
        score = float(response.content.strip())
    except ValueError:
        # LLM returned something non-numeric; fall back to a neutral score
        # (was a bare `except:`, which would also swallow KeyboardInterrupt)
        score = 0.5

    return {"relevance_score": score}

def generate_answer(state: SelfRAGState):
    """Answer the question from the retrieved context."""
    context = "\n\n".join([doc.page_content for doc in state["documents"]])
    prompt = f"""
Answer the question based on this context:

Context: {context}

Question: {state['question']}
"""
    response = llm.invoke(prompt)
    return {
        "answer": response.content,
        "iteration": state.get("iteration", 0) + 1
    }

def should_retry(state: SelfRAGState) -> str:
    """Retry retrieval while relevance is low and retries remain."""
    if state["relevance_score"] < 0.7 and state["iteration"] < state["max_iterations"]:
        return "retrieve"  # fetch a wider set of documents
    return "end"

# Assemble the self-RAG graph
self_rag_workflow = StateGraph(SelfRAGState)

self_rag_workflow.add_node("retrieve", retrieve)
self_rag_workflow.add_node("grade", grade_documents)
self_rag_workflow.add_node("generate", generate_answer)

self_rag_workflow.set_entry_point("retrieve")
self_rag_workflow.add_edge("retrieve", "grade")
self_rag_workflow.add_edge("grade", "generate")

# Retry retrieval while graded relevance stays low
self_rag_workflow.add_conditional_edges(
    "generate",
    should_retry,
    {
        "retrieve": "retrieve",
        "end": END
    }
)

self_rag_app = self_rag_workflow.compile()

11. 프로덕션 배포

11.1 FastAPI 통합

from fastapi import FastAPI
from pydantic import BaseModel

app_server = FastAPI()

class QueryRequest(BaseModel):
    """Request payload: the user question plus an optional LangGraph config."""
    question: str
    config: dict = {}

@app_server.post("/invoke")
async def invoke_workflow(request: QueryRequest):
    """Run the graph to completion and return the final state."""
    # NOTE(review): app.invoke is synchronous and blocks the event loop;
    # prefer `await app.ainvoke(...)` in production.
    result = app.invoke(
        {"messages": [{"role": "user", "content": request.question}]},
        config=request.config
    )
    return result

@app_server.post("/stream")
async def stream_workflow(request: QueryRequest):
    """Stream graph events to the client as server-sent events."""
    async def event_stream():
        async for event in app.astream(
            {"messages": [{"role": "user", "content": request.question}]},
            config=request.config
        ):
            yield f"data: {event}\n\n"

    from fastapi.responses import StreamingResponse
    return StreamingResponse(event_stream(), media_type="text/event-stream")

# Run with: uvicorn main:app_server --reload

11.2 Docker 배포

# Slim Python base image keeps the container small.
FROM python:3.11-slim

WORKDIR /app

# Install dependencies first so Docker layer caching skips reinstalls
# when only application code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "main:app_server", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'

services:
  langgraph-app:
    build: .
    ports:
      - "8000:8000"
    environment:
      # Forwarded from the host shell / .env file
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    volumes:
      - ./data:/app/data