Phase G: Kei API 통신 정상화 — streaming 전환 + Sonnet fallback 제거

G-1: httpx non-streaming → streaming 전환 (3개 파일)
  - client.post() → client.stream("POST") + response.aiter_lines()
  - SSE 토큰을 실시간 수신 (30분+ 무응답 해소)

G-2: Sonnet fallback 완전 제거
  - kei_client.py: classify_content()에서 _call_anthropic_direct() 호출 제거
  - content_editor.py: fill_content()에서 Sonnet fallback 분기 제거
  - Kei API만 사용. 실패 시 manual_classify() 또는 _apply_defaults() 안전망

G-3: _parse_json() 마크다운 제거 3파일 동기화
  - content_editor.py, design_director.py에 kei_client.py와 동일한 전처리 추가

G-4: FAISS를 CPU로 전환 (GPU 메모리 경쟁 해소)
  - block_search.py + build_block_index.py: device="cpu"

G-5: streaming 파서에 event:error 처리
  - persona_agent 에러 시 무한 대기 방지. 즉시 중단.

G-6: content_editor.py None 가드
  - Kei API 실패 시 _parse_json(None) TypeError 방지

G-7: "mode" → "mode_hint" 필드명 수정 (3개 파일)
  - persona_agent의 실제 필드명에 맞춤

persona_agent 수정: 0건

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-26 01:26:03 +09:00
parent 7038418c8b
commit a01f7a7f8a
8 changed files with 519 additions and 152 deletions

View File

@@ -66,8 +66,8 @@ def _ensure_loaded() -> bool:
with open(META_PATH, encoding="utf-8") as f:
_metadata = json.load(f)
logger.info(f"임베딩 모델 로딩: {EMBEDDING_MODEL}")
_model = SentenceTransformer(EMBEDDING_MODEL)
logger.info(f"임베딩 모델 로딩: {EMBEDDING_MODEL} (CPU)")
_model = SentenceTransformer(EMBEDDING_MODEL, device="cpu")
logger.info(
f"블록 검색 준비 완료: {_index.ntotal}개 벡터, "

View File

@@ -111,20 +111,14 @@ async def fill_content(
)
try:
# 1차: Kei API (도메인 전문가 + RAG)
# Kei API만 사용. Sonnet fallback 없음.
result_text = await _call_kei_editor(user_prompt)
# fallback: Anthropic 직접
# G-6: Kei API 실패 시 None 가드
if result_text is None:
logger.warning("Kei API 편집 실패. Anthropic 직접 호출로 fallback.")
client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
response = await client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=4096,
system=EDITOR_PROMPT,
messages=[{"role": "user", "content": user_prompt}],
)
result_text = response.content[0].text
logger.warning("Kei API 편집 실패. 기본값 적용.")
_apply_defaults(blocks)
continue
filled = _parse_json(result_text)
@@ -165,7 +159,7 @@ async def fill_content(
async def _call_kei_editor(prompt: str) -> str | None:
"""Kei API를 통해 텍스트 편집을 요청한다.
"""Kei API를 통해 텍스트 편집을 요청한다. SSE 스트리밍으로 실시간 수신.
Kei persona의 도메인 지식 + RAG를 활용하여
건설/DX 분야 전문 용어를 정확하게 유지하면서 편집.
@@ -176,22 +170,22 @@ async def _call_kei_editor(prompt: str) -> str | None:
try:
async with httpx.AsyncClient(timeout=None) as client:
response = await client.post(
async with client.stream(
"POST",
f"{kei_url}/api/message",
json={
"message": full_prompt,
"session_id": "design-agent-editor",
"mode": "chat",
"mode_hint": "chat",
},
timeout=None,
)
) as response:
if response.status_code != 200:
logger.warning(f"Kei API (editor) HTTP {response.status_code}")
return None
if response.status_code != 200:
logger.warning(f"Kei API (editor) HTTP {response.status_code}")
return None
full_text = await _stream_sse_tokens(response)
# SSE 응답에서 텍스트 수집
full_text = _extract_sse_text(response.text)
if full_text:
return full_text
@@ -203,6 +197,36 @@ async def _call_kei_editor(prompt: str) -> str | None:
return None
async def _stream_sse_tokens(response: httpx.Response) -> str:
"""SSE 스트리밍 응답에서 토큰을 실시간 수집한다."""
tokens: list[str] = []
event_type = ""
async for line in response.aiter_lines():
line = line.strip()
if not line:
event_type = ""
continue
if line.startswith("event:"):
event_type = line[6:].strip()
elif line.startswith("data:"):
data = line[5:].strip()
if event_type == "token" and data:
try:
token = json.loads(data)
if isinstance(token, str):
tokens.append(token)
except json.JSONDecodeError:
tokens.append(data)
elif event_type == "done":
break
elif event_type == "error":
logger.warning(f"Kei API SSE 에러: {data}")
break
return "".join(tokens)
def _extract_sse_text(raw: str) -> str:
"""SSE 응답에서 토큰 텍스트를 수집한다."""
import re as _re
@@ -287,17 +311,35 @@ def _apply_defaults(blocks: list[dict[str, Any]]) -> None:
def _parse_json(text: str) -> dict[str, Any] | None:
"""텍스트에서 JSON을 추출한다."""
patterns = [
r"```json\s*(.*?)```",
r"```\s*(.*?)```",
r"(\{.*\})",
]
for pattern in patterns:
match = re.search(pattern, text, re.DOTALL)
if match:
try:
return json.loads(match.group(1).strip())
except json.JSONDecodeError:
continue
"""텍스트에서 JSON을 추출한다.
Kei API가 마크다운 리스트 접두사(- )를 붙여 응답하는 경우에도 처리.
"""
# 전처리: 각 줄 앞의 마크다운 리스트 접두사(- ) 제거
lines = text.split("\n")
cleaned_lines = []
for line in lines:
stripped = line.lstrip()
if stripped.startswith("- "):
cleaned_lines.append(stripped[2:])
elif stripped.startswith("* "):
cleaned_lines.append(stripped[2:])
else:
cleaned_lines.append(stripped)
cleaned = "\n".join(cleaned_lines)
# 원본 먼저 시도 → 클린 버전 시도
for target in [text, cleaned]:
patterns = [
r"```json\s*(.*?)```",
r"```\s*(.*?)```",
r"(\{.*\})",
]
for pattern in patterns:
match = re.search(pattern, target, re.DOTALL)
if match:
try:
return json.loads(match.group(1).strip())
except json.JSONDecodeError:
continue
return None

View File

@@ -12,6 +12,7 @@ from pathlib import Path
from typing import Any
import anthropic
import httpx
import yaml
from src.config import settings
@@ -174,6 +175,22 @@ def select_preset(analysis: dict[str, Any]) -> str:
# ──────────────────────────────────────
# Step B: 프리셋 내 블록 매핑 (Sonnet)
# ──────────────────────────────────────
def _get_registered_block_ids() -> set[str]:
"""catalog.yaml에 등록된 블록 ID 집합을 반환한다."""
catalog_path = Path(__file__).parent.parent / "templates" / "catalog.yaml"
if not catalog_path.exists():
return set(BLOCK_SLOTS.keys())
try:
with open(catalog_path, encoding="utf-8") as f:
data = yaml.safe_load(f)
return {
b["id"] for b in data.get("blocks", [])
if b.get("id") and not b.get("id", "").replace("-", "").isdigit()
}
except Exception:
return set(BLOCK_SLOTS.keys())
def _load_catalog() -> str:
"""catalog.yaml 로드."""
catalog_path = Path(__file__).parent.parent / "templates" / "catalog.yaml"
@@ -182,14 +199,13 @@ def _load_catalog() -> str:
return """사용 가능한 블록:
- quote-question: 질문형 강조. 문제 제기, 전환점.
- compare-box: 2개 키워드 시각 대비.
- comparison-table: 항목 비교 테이블.
- card-image: 이미지+텍스트 카드.
- compare-pill-pair: 2개 키워드 시각 대비.
- comparison-2col: 2항목 비교.
- card-icon-desc: 아이콘+설명 카드.
- card-dark-overlay: 다크 배경 키워드 카드.
- relationship: 벤 다이어그램. 포함/상위-하위 관계.
- process: 단계 흐름. 절차.
- topic-header: 꼭지 제목+설명.
- conclusion-bar: 하단 결론 바.
- venn-diagram: 벤 다이어그램. 포함/상위-하위 관계.
- process-horizontal: 단계 흐름. 절차.
- topic-left-right: 꼭지 제목+설명.
- banner-gradient: 섹션 강조 배너."""
@@ -222,8 +238,7 @@ header/footer는 고정이므로 건드리지 않는다.
### 2단계: 꼭지 → zone 배정
- flow 꼭지 → body / left / hero zone
- reference 꼭지 → sidebar zone
- detail_target 꼭지 → details-block으로 배치 (해당 zone에 접기/펼치기)
- conclusion 꼭지 → footer zone
- conclusion 꼭지 → footer zone (banner-gradient 권장)
### 3단계: zone별 블록 선택 + 높이 예산 계산
각 zone에 대해:
@@ -236,15 +251,18 @@ header/footer는 고정이므로 건드리지 않는다.
### 4단계: 최종 검증
모든 zone의 블록 총 높이가 예산 이내인지 재확인한 후 출력한다.
## 블록 선택 규칙
## 블록 선택 규칙 (절대 규칙)
- **아래 허용 목록에 있는 블록만 선택하라. 목록에 없는 블록은 절대 사용 금지.**
- **텍스트 블록 우선** — 텍스트로 충분히 전달 가능하면 시각화(SVG) 블록 쓰지 마라
- **시각화 블록(relationship, process 등)은 높이 비용이 매우 크다** — 한 zone에 시각화 블록은 최대 1개, 다른 블록과 함께 쌓지 마라
- **시각화 블록은 높이 비용이 크다** — 한 zone에 시각화 블록은 최대 1개
- 너비 35% 이하 zone(sidebar)에는 카드 1열, 시각화 블록 금지
- catalog의 when/not_for와 height_cost를 반드시 읽고 선택
- 같은 블록 타입 반복 금지 — 다양한 블록 활용
- 같은 내용이 두 블록에 중복되면 안 된다
## 사용 가능한 블록 (catalog)
## 허용된 블록 id 목록 (이 목록에 없는 블록은 절대 선택하지 마라)
{allowed_ids}
## 블록 상세 설명 (위 목록의 when/not_for 참고)
{catalog}
## 출력 형식 (반드시 JSON만. 설명 없이.)
@@ -303,8 +321,7 @@ async def _opus_block_recommendation(
f"## Zone 배정 규칙 (절대 규칙)\n"
f"- flow 꼭지 → body / left / hero zone\n"
f"- reference 꼭지 → sidebar zone\n"
f"- conclusion 꼭지 → **반드시 footer zone** + block_type은 **conclusion-accent-bar**\n"
f"- detail_target 꼭지 → details-block\n"
f"- conclusion 꼭지 → **반드시 footer zone** (banner-gradient 권장)\n"
f"- sidebar(35%)에는 시각화 블록 금지\n\n"
f"## 꼭지 목록\n{topics_text}\n\n"
f"## 블록 후보 (FAISS 검색 결과)\n{block_candidates}\n\n"
@@ -321,47 +338,22 @@ async def _opus_block_recommendation(
try:
async with httpx.AsyncClient(timeout=None) as client:
response = await client.post(
async with client.stream(
"POST",
f"{kei_url}/api/message",
json={
"message": prompt,
"session_id": "design-agent-opus",
"mode": "chat",
"mode_hint": "chat",
},
timeout=None,
)
) as response:
if response.status_code != 200:
logger.warning(f"[Step A-2] Kei API HTTP {response.status_code}")
return None
if response.status_code != 200:
logger.warning(f"[Step A-2] Kei API HTTP {response.status_code}")
return None
full_text = await _stream_sse_tokens(response)
# SSE 응답 파싱 (kei_client.py와 동일 패턴)
import re
tokens = []
events = re.split(r'\r?\n\r?\n', response.text)
for event in events:
if not event.strip():
continue
event_type = ""
event_data = ""
for line in event.split('\n'):
line = line.strip('\r')
if line.startswith('event:'):
event_type = line[6:].strip()
elif line.startswith('data:'):
event_data = line[5:].strip()
if event_type == 'token' and event_data:
try:
import json as _json
token = _json.loads(event_data)
if isinstance(token, str):
tokens.append(token)
except Exception:
tokens.append(event_data)
elif event_type == 'done':
break
full_text = "".join(tokens)
if not full_text:
logger.warning("[Step A-2] Kei API 응답 텍스트 없음")
return None
@@ -382,6 +374,36 @@ async def _opus_block_recommendation(
return None
async def _stream_sse_tokens(response: httpx.Response) -> str:
"""SSE 스트리밍 응답에서 토큰을 실시간 수집한다."""
tokens: list[str] = []
event_type = ""
async for line in response.aiter_lines():
line = line.strip()
if not line:
event_type = ""
continue
if line.startswith("event:"):
event_type = line[6:].strip()
elif line.startswith("data:"):
data = line[5:].strip()
if event_type == "token" and data:
try:
token = json.loads(data)
if isinstance(token, str):
tokens.append(token)
except json.JSONDecodeError:
tokens.append(data)
elif event_type == "done":
break
elif event_type == "error":
logger.warning(f"Kei API SSE 에러: {data}")
break
return "".join(tokens)
async def create_layout_concept(
content: str,
analysis: dict[str, Any],
@@ -443,9 +465,13 @@ async def create_layout_concept(
f"강조:{t.get('emphasis', False)}]"
)
if t.get("detail_target"):
line += " → ★detail_target (details-block으로 배치: 요약+상세 접기/펼치기)"
line += " → ★detail_target (callout-solution으로 요약 배치 권장)"
topics_summary.append(line)
# 허용 블록 ID 목록 생성 (catalog.yaml에 등록된 블록만)
allowed_ids_list = _get_registered_block_ids()
allowed_ids_str = ", ".join(sorted(allowed_ids_list))
system = STEP_B_PROMPT.format(
preset_name=preset_name,
preset_description=preset["description"],
@@ -453,6 +479,7 @@ async def create_layout_concept(
grid_columns=preset["grid_columns"],
grid_rows=preset["grid_rows"],
zone_descriptions=zone_desc,
allowed_ids=allowed_ids_str,
catalog=catalog_text,
)
@@ -520,6 +547,17 @@ async def create_layout_concept(
blocks = all_blocks
if blocks is not None:
# 블록 ID 검증: catalog에 없는 블록은 거부하고 안전한 대체 블록 사용
registered_ids = _get_registered_block_ids()
for block in blocks:
block_type = block.get("type", "")
if block_type and block_type not in registered_ids:
logger.warning(
f"[Step B 검증] 미등록 블록 '{block_type}' 거부 → "
f"'callout-solution'으로 교체"
)
block["type"] = "callout-solution"
# area명 검증: 프리셋 zone에 없으면 기본 zone으로 매핑
valid_zones = {z for z in preset["zones"] if z != "header"}
default_zone = "body" if "body" in valid_zones else next(iter(valid_zones))
@@ -530,7 +568,7 @@ async def create_layout_concept(
)
block["area"] = default_zone
# 6번: conclusion 꼭지 → footer zone + conclusion-accent-bar 강제
# 6번: conclusion 꼭지 → footer zone 강제
for block in blocks:
topic = next(
(t for t in analysis.get("topics", [])
@@ -543,11 +581,6 @@ async def create_layout_concept(
f"conclusion 꼭지 {block.get('topic_id')} → footer 강제 이동"
)
block["area"] = "footer"
if block.get("type") != "conclusion-accent-bar":
logger.warning(
f"conclusion 블록 {block.get('type')} → conclusion-accent-bar 강제"
)
block["type"] = "conclusion-accent-bar"
# 5번: zone별 height_cost 합산 검증 — 초과 시 큰 블록 교체
_validate_height_budget(blocks, preset)
@@ -585,22 +618,6 @@ def _fallback_layout(
for topic in analysis.get("topics", []):
role = topic.get("role", "flow")
if topic.get("detail_target"):
# detail_target → details-block으로 배치
if role == "reference" and preset_name == "sidebar-right":
area = "sidebar"
else:
area = "body" if preset_name != "two-column" else "left"
blocks.append({
"area": area,
"type": "details-block",
"topic_id": topic.get("id", len(blocks) + 1),
"reason": f"detail_target: {topic.get('title', '')}",
"size": "medium",
"char_guide": {"summary_text": 60, "detail_content": 300},
})
continue
if role == "reference" and preset_name == "sidebar-right":
area = "sidebar"
elif topic.get("layer") == "conclusion":
@@ -608,9 +625,12 @@ def _fallback_layout(
else:
area = "body" if preset_name != "two-column" else "left"
# conclusion → banner-gradient, 그 외 → topic-left-right
block_type = "banner-gradient" if topic.get("layer") == "conclusion" else "topic-left-right"
blocks.append({
"area": area,
"type": "topic-header",
"type": block_type,
"topic_id": topic.get("id", 0),
"reason": topic.get("title", ""),
"size": "medium",
@@ -637,13 +657,12 @@ HEIGHT_COST_PX = {
# xlarge/large → medium/compact 교체 후보
DOWNGRADE_MAP = {
"venn-diagram": "card-text-grid",
"pyramid-hierarchy": "card-numbered",
"venn-diagram": "card-icon-desc",
"card-step-vertical": "card-numbered",
"image-grid-2x2": "image-row-2col",
"compare-3col-badge": "comparison-2col",
"card-image-3col": "card-text-grid",
"card-tag-image": "card-text-grid",
"card-image-3col": "card-icon-desc",
"card-tag-image": "card-icon-desc",
"card-compare-3col": "comparison-2col",
"card-image-round": "card-icon-desc",
}
@@ -726,17 +745,35 @@ def _validate_height_budget(blocks: list[dict], preset: dict) -> None:
def _parse_json(text: str) -> dict[str, Any] | None:
"""텍스트에서 JSON을 추출한다."""
patterns = [
r"```json\s*(.*?)```",
r"```\s*(.*?)```",
r"(\{.*\})",
]
for pattern in patterns:
match = re.search(pattern, text, re.DOTALL)
if match:
try:
return json.loads(match.group(1).strip())
except json.JSONDecodeError:
continue
"""텍스트에서 JSON을 추출한다.
Kei API가 마크다운 리스트 접두사(- )를 붙여 응답하는 경우에도 처리.
"""
# 전처리: 각 줄 앞의 마크다운 리스트 접두사(- ) 제거
lines = text.split("\n")
cleaned_lines = []
for line in lines:
stripped = line.lstrip()
if stripped.startswith("- "):
cleaned_lines.append(stripped[2:])
elif stripped.startswith("* "):
cleaned_lines.append(stripped[2:])
else:
cleaned_lines.append(stripped)
cleaned = "\n".join(cleaned_lines)
# 원본 먼저 시도 → 클린 버전 시도
for target in [text, cleaned]:
patterns = [
r"```json\s*(.*?)```",
r"```\s*(.*?)```",
r"(\{.*\})",
]
for pattern in patterns:
match = re.search(pattern, target, re.DOTALL)
if match:
try:
return json.loads(match.group(1).strip())
except json.JSONDecodeError:
continue
return None

View File

@@ -63,10 +63,9 @@ KEI_PROMPT = (
async def classify_content(content: str) -> dict[str, Any] | None:
"""1단계: Kei API를 통해 꼭지를 추출하고 분석한다.
1차: Kei API (persona + RAG + 사고)
fallback: Anthropic API 직접 호출
Kei API만 사용. Sonnet fallback 없음.
Kei API 실패 시 None 반환 → pipeline.py에서 manual_classify() 안전망.
"""
# 1차: Kei API
result = await _call_kei_api(content)
if result:
logger.info(
@@ -75,47 +74,36 @@ async def classify_content(content: str) -> dict[str, Any] | None:
)
return result
# fallback: Anthropic 직접
logger.warning("Kei API 실패. Anthropic 직접 호출로 fallback.")
result = await _call_anthropic_direct(content)
if result:
logger.info(
f"[Anthropic] 꼭지 추출 완료: {result.get('title', '')}, "
f"{len(result.get('topics', []))}개 꼭지"
)
return result
logger.warning("[Kei API] 꼭지 추출 실패. manual_classify로 안전망 적용.")
return None
async def _call_kei_api(content: str) -> dict[str, Any] | None:
"""Kei API를 통해 꼭지 추출. SSE 스트리밍 응답을 파싱."""
"""Kei API를 통해 꼭지 추출. SSE 스트리밍으로 실시간 수신."""
kei_url = getattr(settings, "kei_api_url", "http://localhost:8000")
try:
async with httpx.AsyncClient(timeout=None) as client:
response = await client.post(
async with client.stream(
"POST",
f"{kei_url}/api/message",
json={
"message": KEI_PROMPT + content,
"session_id": "design-agent",
"mode": "chat",
"mode_hint": "chat",
},
timeout=None,
)
) as response:
if response.status_code != 200:
logger.warning(f"Kei API HTTP {response.status_code}")
return None
if response.status_code != 200:
logger.warning(f"Kei API HTTP {response.status_code}")
return None
# SSE 응답에서 토큰 수집
full_text = _extract_sse_text(response.text)
full_text = await _stream_sse_tokens(response)
if not full_text:
logger.warning("Kei API 응답에서 텍스트 추출 실패")
return None
# JSON 추출
result = _parse_json(full_text)
if result and "topics" in result:
return result
@@ -128,6 +116,43 @@ async def _call_kei_api(content: str) -> dict[str, Any] | None:
return None
async def _stream_sse_tokens(response: httpx.Response) -> str:
"""SSE 스트리밍 응답에서 토큰을 실시간 수집한다.
persona_agent의 SSE 이벤트:
- token: 텍스트 토큰 수집
- done: 완료, 중단
- error: 에러, 즉시 중단
- planning/planning_done/research_progress/warning: 스킵
"""
tokens: list[str] = []
event_type = ""
async for line in response.aiter_lines():
line = line.strip()
if not line:
event_type = ""
continue
if line.startswith("event:"):
event_type = line[6:].strip()
elif line.startswith("data:"):
data = line[5:].strip()
if event_type == "token" and data:
try:
token = json.loads(data)
if isinstance(token, str):
tokens.append(token)
except json.JSONDecodeError:
tokens.append(data)
elif event_type == "done":
break
elif event_type == "error":
logger.warning(f"Kei API SSE 에러: {data}")
break
return "".join(tokens)
def _extract_sse_text(raw: str) -> str:
"""SSE 응답에서 토큰 텍스트를 수집한다. CRLF/LF 모두 처리."""
tokens = []

View File

@@ -75,7 +75,7 @@ def _resolve_template_path(env: Environment, block_type: str) -> str | None:
검색 순서:
0. catalog.yaml 매핑 (id → template 경로, 최우선)
1. 정확한 경로 (blocks/cards/card-text-grid.html 등 — 팀장이 카테고리 포함 지정)
1. 정확한 경로 (blocks/cards/card-icon-desc.html 등 — 팀장이 카테고리 포함 지정)
2. 카테고리 폴더 검색 (blocks/{category}/{block_type}.html)
3. _legacy fallback (blocks/_legacy/{block_type}.html)
4. 루트 fallback (blocks/{block_type}.html)
@@ -91,7 +91,7 @@ def _resolve_template_path(env: Environment, block_type: str) -> str | None:
if not catalog_path.endswith(".html"):
candidates.append(f"{catalog_path}.html")
# 1. 이미 카테고리 경로가 포함된 경우 (예: "cards/card-text-grid")
# 1. 이미 카테고리 경로가 포함된 경우 (예: "cards/card-icon-desc")
if "/" in block_type:
candidates.append(f"blocks/{block_type}.html")
candidates.append(f"blocks/{block_type}") # .html 이미 포함된 경우
@@ -116,6 +116,7 @@ def _resolve_template_path(env: Environment, block_type: str) -> str | None:
return None
def _preprocess_svg_data(block_type: str, block_data: dict[str, Any]) -> dict[str, Any]:
"""P2-B: SVG 시각화 블록의 좌표를 사전 계산한다.