家庭内クラスタで動かしている自作の分散AI基盤「Coordinator」の話、第27回です。前回(第26回)で「会話メモリ」の consumer を作り、過去の会話から抽出した記憶を回答に注入できるようにしました。これで機能としては一通り揃った——と、そのときは思っていました。
ところが今回、「あと設定を1行変えるだけ」のはずの仕上げ作業で、記憶の通り道が途中で静かに途切れていたことに気づきます。動いているように見えて、実は何も起きていない。この基盤で何度も踏んできたタイプの落とし穴でした。
前提:global な記憶と thread な記憶
会話メモリには2つのスコープがあります。
- global… どの会話からでも引ける恒久的な事実。「PostgreSQL に接続するときは
-h localhostを付ける」のような、横断して効いてほしい知識。 - thread… その会話スレッド内だけで引ける文脈。長いやり取りの途中で「さっき決めた方針」を後のターンで思い出すための継続性。
consumer(読み出し側)の検索フィルタは「global 全体 または 今いる thread」を引くように作ってありました。pipe(OpenWebUI 側)も、チャットの chat_id を thread_id として Coordinator に渡す配線が済んでいます。つまり読み出し側は thread を扱えていた。
残った仕事は producer(書き込み側)。記憶を抽出して保存するとき、global 固定だったのを「thread_id があれば thread、なければ global」に切り替えるだけ——設定値(MEMORY_AUTO_SCOPE)を thread_if_available にすればいい。コードの分岐はすでに書いてある。env を1行変えて再起動すれば終わり、のはずでした。
通り道を辿ったら、途中で切れていた
切り替える前に一応、書き込み側に本当に thread_id が流れてくるのかを確認しました。これが正解でした。
記憶の給餌(書き込み候補をキューに積む処理)の唯一の経路は、夜間バッチ feed_memory.py --from-pg です。完了済みタスクを走査してキューに積むのですが、そのコードを見ると——
job = {
"user": prompt,
"assistant": response or "",
"thread_id": "", # consumer 経路を作るまで global 運用
"source_task_id": task_id,
}
thread_id が空文字でハードコードされていました。さらに、そもそも完了タスクを保存している tasks テーブルには thread_id の列がなく、ルーター(/route)も保存していない。
結論。auto で覚えた記憶はすべて global に落ちていた。consumer の「thread 分岐」は、そこにマッチするデータが一件も書かれていない死んだ経路だったわけです。
ここで env を thread_if_available に変えても、thread_id が空のままなので分岐は常に global に降格する。何も変わらないのに、エラーも警告も出ない。この基盤で過去に何度も刺されてきた、pg_hba.conf の /32 行漏れや systemd drop-in の [Service] ヘッダ漏れと同じ「動いて見えて実は無音」の一種です。出荷(=機能クローズ宣言)の前に気づけたのは運が良かった。
二つの道:スキーマを足すか、ライブで流すか
thread_id を書き込み側まで届けるには、大きく2案ありました。
案A:夜間バッチの経路を拡張する
tasks テーブルに thread_id 列を足し、/route の INSERT を直して、夜間バッチで読み出して伝播させる。しかしこれは真実の保管庫であるテーブルのスキーマ変更と、全リクエストが通るコアルーターの INSERT 改修を伴います。過去に /route へは OpenWebUI の内部テンプレートが紛れ込む事故もあり、ここは慎重に触りたい場所。
それ以上に、概念的に弱い。夜間バッチが thread 記憶を書いても、「その会話に後日また戻ってきたとき」しか効きません。夜間バッチの本来の価値は、横断して効く global な恒久事実を拾うことのはずです。
案B:会話が終わった瞬間に、ライブで流す(採用)
発想を変えます。「会話の完了を見ていて、かつ原文・回答・chat_id を全部握っている主体」は誰か——それは pipe です。pipe は回答が返ってくるのをポーリングで待っているので、done になった瞬間に「原文の質問・回答・chat_id」を一度に手にしています。
そこで、Coordinator に薄い受け口 POST /ingest_memory を新設し、pipe が done を受けた直後にそこへ投げることにしました。これなら——
- 記憶が生まれたまさにその会話の
thread_id(ライブの正しい値)を運べる - スキーマ変更ゼロ・
/routeの INSERT は不変・Worker は無改修 - 夜間バッチは global のまま温存できる → global(恒久・横断)と thread(会話内継続)が経路レベルで綺麗に分かれる
実装:薄く・汚さず・遅らせず
新エンドポイントは徹底して薄く作りました。やることは「キューに積むだけ」。実際に覚えるかどうかの判断は、これまで通り抽出モデル(moon 上の qwen2.5:7b)に任せます。
@app.post("/ingest_memory")
def ingest_memory(req: IngestMemoryRequest):
prompt = (req.prompt or "").strip()
if not prompt:
return {"queued": False, "reason": "empty prompt"}
# Web検索・RAG の結果は会話メモリの対象外(夜間バッチの除外と揃える)
if req.model and req.model in MEMORY_INGEST_EXCLUDE_MODELS:
return {"queued": False, "reason": f"excluded model {req.model}"}
job = {
"user": prompt,
"assistant": req.response or "",
"thread_id": req.thread_id or "",
"source_task_id": req.source_task_id or "",
}
r.lpush(QUEUE_MEMORY_INGEST, json.dumps(job, ensure_ascii=False))
return {"queued": True}
ここで地味に大事なのが、pipe が送る prompt はユーザーが実際に打った原文だということです。前回の consumer 設計で「記憶を注入した版」と「原文」を分離し、保存系には常に原文を流すことで記憶が自分の注入文を読んで“また記憶”するループを断ち切りました(前回記事参照)。今回のライブ給餌も原文を流すので、その分離は保たれます。
pipe 側は、給餌を回答を画面に出し終えた後に行います。ユーザーの体感速度はゼロ秒で変わらない。さらに失敗しても会話には一切影響しないよう、例外は全部握りつぶす best-effort です。/ingest_memory はキューに積むだけで即座に返り、重い抽出は裏の Worker が非同期にやります。
yield response_text # ← ユーザーへの応答はここで表示済み
# 応答表示後に best-effort で給餌(失敗しても表に出さない)
if self.valves.INGEST_MEMORY and model not in ("web_search", "rag"):
try:
async with aiohttp.ClientSession() as session:
await session.post(f"{base_url}/ingest_memory", json={
"prompt": user_message, # 原文(注入版ではない)
"response": response_text,
"thread_id": chat_id or "",
"source_task_id": task_id,
"model": model,
}, timeout=aiohttp.ClientTimeout(total=self.valves.INGEST_TIMEOUT))
except Exception:
pass
会話のたびに抽出モデルを動かすことになるので、負荷が気になる人向けに INGEST_MEMORY という ON/OFF スイッチ(Valve)も付けました。
最後に Worker 側の env を MEMORY_AUTO_SCOPE=thread_if_available に切り替え。これでライブ給餌(thread_id あり)は thread、夜間バッチ(thread_id 空)は global に自動で振り分けられます。設定1行の意味が、ようやく噛み合いました。
一気通貫を実機で確かめる
OpenWebUI で普通に1往復しただけで、経路全体が動きました。Coordinator のログ:
[ingest_memory] queued src=9b612715-… thread=16fce75f-afbc-… ← pipe(実機)から実 chat_id 到達
そして rag-tools 上の抽出 Worker のログ:
[remember] scope=thread thread=16fce75f-afbc-… kind=decision ← 実 chat_id で thread スコープ保存
pipe → Coordinator → Worker → Qdrant が、実 chat_id を thread として運び切りました。続けて同じ会話で関連質問をすると、consumer 側のログに [memory] injected (thread=… top=0.556)。書いた thread 記憶を、同じ会話の次のターンで読み戻せています。書き込みと読み出しが、やっと両輪で回りました。
余談:引き継ぎ資料が、自分について嘘をついていた
この作業中にもう一つ気づいたことがあります。私はセッションごとに、全ソースを埋め込んだ「引き継ぎ資料」を更新して継続性を保っているのですが、その資料に埋め込まれていた pipe のコードが、実機より古い版(6-15 の chat_id 配線を欠いた状態)でした。実機は正しく新しかった。普段は「資料が最新」を前提にしているので、これは逆向きのズレです。危うく、古いコードをベースに編集して新しい配線を巻き戻すところでした。資料を信じる前に実物と突き合わせる——という当たり前を、改めて記録に残しました。
残った宿題:恒久事実が、一つの会話に閉じてしまう
今回の方式には正直に書いておくべき割り切りがあります。ライブ給餌はその会話のやり取りを全部 thread スコープにするため、本当は横断して効いてほしい恒久事実——たとえば会話の中で決めた「-h localhost を付ける方針」——も、その1スレッドに閉じ込められてしまいます。
理想は、抽出モデル自身に「これは global / これは thread」を判断させる方式(extractor モード)です。ただ以前の計測で、抽出モデルのスコープ判断は不安定だと分かっていて、いったん避けた経緯がある。なので次は、抽出モデルのスコープ判定がどれだけ信用できるかを計測してから、ここを詰めます。「計測してから信じる」は、この基盤を通してずっと守っている方針です。
まとめ
今回の学びを3つに絞ると——
- 「設定を変えるだけ」を疑う。 無音の no-op は、エラーより質が悪い。通り道は端から端まで辿る。
- 最小改変の経路を選ぶ。 スキーマやコアルーターを触らず、すでに必要な情報を握っている主体(pipe)に仕事をさせると、影響範囲が驚くほど小さくなる。
- 役割は経路で分ける。 global と thread を「夜間バッチ」と「ライブ給餌」という別経路に割り当てたことで、設定が自然に意味を持った。
これで会話メモリは、覚える(producer)・思い出す(consumer)・スレッドを区別する、までが揃いました。次は注入しきい値の再計測と、上に書いた「スコープ判定」の宿題に取り組む予定です。
← 前回:第26回 AIに会話の記憶を思い出させたら、その思い出を“また記憶”するループ寸前だった
→ 次回:(未公開)