AIは会話を覚えていた。でも“どの会話か”は記録していなかった ― thread_id が producer まで届かず、スレッド記憶の通り道が静かに死んでいた話と、ライブ給餌での解決

家庭内クラスタで動かしている自作の分散AI基盤「Coordinator」の話、第27回です。前回(第26回)で「会話メモリ」の consumer を作り、過去の会話から抽出した記憶を回答に注入できるようにしました。これで機能としては一通り揃った——と、そのときは思っていました。

ところが今回、「あと設定を1行変えるだけ」のはずの仕上げ作業で、記憶の通り道が途中で静かに途切れていたことに気づきます。動いているように見えて、実は何も起きていない。この基盤で何度も踏んできたタイプの落とし穴でした。

前提:global な記憶と thread な記憶

会話メモリには2つのスコープがあります。

  • global… どの会話からでも引ける恒久的な事実。「PostgreSQL に接続するときは -h localhost を付ける」のような、横断して効いてほしい知識。
  • thread… その会話スレッド内だけで引ける文脈。長いやり取りの途中で「さっき決めた方針」を後のターンで思い出すための継続性。

consumer(読み出し側)の検索フィルタは「global 全体 または 今いる thread」を引くように作ってありました。pipe(OpenWebUI 側)も、チャットの chat_idthread_id として Coordinator に渡す配線が済んでいます。つまり読み出し側は thread を扱えていた

残った仕事は producer(書き込み側)。記憶を抽出して保存するとき、global 固定だったのを「thread_id があれば thread、なければ global」に切り替えるだけ——設定値(MEMORY_AUTO_SCOPE)を thread_if_available にすればいい。コードの分岐はすでに書いてある。env を1行変えて再起動すれば終わり、のはずでした。

通り道を辿ったら、途中で切れていた

切り替える前に一応、書き込み側に本当に thread_id が流れてくるのかを確認しました。これが正解でした。

記憶の給餌(書き込み候補をキューに積む処理)の唯一の経路は、夜間バッチ feed_memory.py --from-pg です。完了済みタスクを走査してキューに積むのですが、そのコードを見ると——

job = {
    "user":           prompt,
    "assistant":      response or "",
    "thread_id":      "",            # consumer 経路を作るまで global 運用
    "source_task_id": task_id,
}

thread_id空文字でハードコードされていました。さらに、そもそも完了タスクを保存している tasks テーブルには thread_id の列がなく、ルーター(/route)も保存していない。

結論。auto で覚えた記憶はすべて global に落ちていた。consumer の「thread 分岐」は、そこにマッチするデータが一件も書かれていない死んだ経路だったわけです。

ここで env を thread_if_available に変えても、thread_id が空のままなので分岐は常に global に降格する。何も変わらないのに、エラーも警告も出ない。この基盤で過去に何度も刺されてきた、pg_hba.conf/32 行漏れや systemd drop-in の [Service] ヘッダ漏れと同じ「動いて見えて実は無音」の一種です。出荷(=機能クローズ宣言)の前に気づけたのは運が良かった。

二つの道:スキーマを足すか、ライブで流すか

thread_id を書き込み側まで届けるには、大きく2案ありました。

案A:夜間バッチの経路を拡張する

tasks テーブルに thread_id 列を足し、/route の INSERT を直して、夜間バッチで読み出して伝播させる。しかしこれは真実の保管庫であるテーブルのスキーマ変更と、全リクエストが通るコアルーターの INSERT 改修を伴います。過去に /route へは OpenWebUI の内部テンプレートが紛れ込む事故もあり、ここは慎重に触りたい場所。

それ以上に、概念的に弱い。夜間バッチが thread 記憶を書いても、「その会話に後日また戻ってきたとき」しか効きません。夜間バッチの本来の価値は、横断して効く global な恒久事実を拾うことのはずです。

案B:会話が終わった瞬間に、ライブで流す(採用)

発想を変えます。「会話の完了を見ていて、かつ原文・回答・chat_id を全部握っている主体」は誰か——それは pipe です。pipe は回答が返ってくるのをポーリングで待っているので、done になった瞬間に「原文の質問・回答・chat_id」を一度に手にしています。

そこで、Coordinator に薄い受け口 POST /ingest_memory を新設し、pipe が done を受けた直後にそこへ投げることにしました。これなら——

  • 記憶が生まれたまさにその会話の thread_id(ライブの正しい値)を運べる
  • スキーマ変更ゼロ・/route の INSERT は不変・Worker は無改修
  • 夜間バッチは global のまま温存できる → global(恒久・横断)と thread(会話内継続)が経路レベルで綺麗に分かれる

実装:薄く・汚さず・遅らせず

新エンドポイントは徹底して薄く作りました。やることは「キューに積むだけ」。実際に覚えるかどうかの判断は、これまで通り抽出モデル(moon 上の qwen2.5:7b)に任せます。

@app.post("/ingest_memory")
def ingest_memory(req: IngestMemoryRequest):
    prompt = (req.prompt or "").strip()
    if not prompt:
        return {"queued": False, "reason": "empty prompt"}
    # Web検索・RAG の結果は会話メモリの対象外(夜間バッチの除外と揃える)
    if req.model and req.model in MEMORY_INGEST_EXCLUDE_MODELS:
        return {"queued": False, "reason": f"excluded model {req.model}"}
    job = {
        "user":           prompt,
        "assistant":      req.response or "",
        "thread_id":      req.thread_id or "",
        "source_task_id": req.source_task_id or "",
    }
    r.lpush(QUEUE_MEMORY_INGEST, json.dumps(job, ensure_ascii=False))
    return {"queued": True}

ここで地味に大事なのが、pipe が送る promptユーザーが実際に打った原文だということです。前回の consumer 設計で「記憶を注入した版」と「原文」を分離し、保存系には常に原文を流すことで記憶が自分の注入文を読んで“また記憶”するループを断ち切りました(前回記事参照)。今回のライブ給餌も原文を流すので、その分離は保たれます。

pipe 側は、給餌を回答を画面に出し終えた後に行います。ユーザーの体感速度はゼロ秒で変わらない。さらに失敗しても会話には一切影響しないよう、例外は全部握りつぶす best-effort です。/ingest_memory はキューに積むだけで即座に返り、重い抽出は裏の Worker が非同期にやります。

yield response_text   # ← ユーザーへの応答はここで表示済み

# 応答表示後に best-effort で給餌(失敗しても表に出さない)
if self.valves.INGEST_MEMORY and model not in ("web_search", "rag"):
    try:
        async with aiohttp.ClientSession() as session:
            await session.post(f"{base_url}/ingest_memory", json={
                "prompt":         user_message,   # 原文(注入版ではない)
                "response":       response_text,
                "thread_id":      chat_id or "",
                "source_task_id": task_id,
                "model":          model,
            }, timeout=aiohttp.ClientTimeout(total=self.valves.INGEST_TIMEOUT))
    except Exception:
        pass

会話のたびに抽出モデルを動かすことになるので、負荷が気になる人向けに INGEST_MEMORY という ON/OFF スイッチ(Valve)も付けました。

最後に Worker 側の env を MEMORY_AUTO_SCOPE=thread_if_available に切り替え。これでライブ給餌(thread_id あり)は thread、夜間バッチ(thread_id 空)は global に自動で振り分けられます。設定1行の意味が、ようやく噛み合いました。

一気通貫を実機で確かめる

OpenWebUI で普通に1往復しただけで、経路全体が動きました。Coordinator のログ:

[ingest_memory] queued src=9b612715-… thread=16fce75f-afbc-…  ← pipe(実機)から実 chat_id 到達

そして rag-tools 上の抽出 Worker のログ:

[remember] scope=thread thread=16fce75f-afbc-… kind=decision  ← 実 chat_id で thread スコープ保存

pipe → Coordinator → Worker → Qdrant が、実 chat_id を thread として運び切りました。続けて同じ会話で関連質問をすると、consumer 側のログに [memory] injected (thread=… top=0.556)。書いた thread 記憶を、同じ会話の次のターンで読み戻せています。書き込みと読み出しが、やっと両輪で回りました。

余談:引き継ぎ資料が、自分について嘘をついていた

この作業中にもう一つ気づいたことがあります。私はセッションごとに、全ソースを埋め込んだ「引き継ぎ資料」を更新して継続性を保っているのですが、その資料に埋め込まれていた pipe のコードが、実機より古い版(6-15 の chat_id 配線を欠いた状態)でした。実機は正しく新しかった。普段は「資料が最新」を前提にしているので、これは逆向きのズレです。危うく、古いコードをベースに編集して新しい配線を巻き戻すところでした。資料を信じる前に実物と突き合わせる——という当たり前を、改めて記録に残しました。

残った宿題:恒久事実が、一つの会話に閉じてしまう

今回の方式には正直に書いておくべき割り切りがあります。ライブ給餌はその会話のやり取りを全部 thread スコープにするため、本当は横断して効いてほしい恒久事実——たとえば会話の中で決めた「-h localhost を付ける方針」——も、その1スレッドに閉じ込められてしまいます。

理想は、抽出モデル自身に「これは global / これは thread」を判断させる方式(extractor モード)です。ただ以前の計測で、抽出モデルのスコープ判断は不安定だと分かっていて、いったん避けた経緯がある。なので次は、抽出モデルのスコープ判定がどれだけ信用できるかを計測してから、ここを詰めます。「計測してから信じる」は、この基盤を通してずっと守っている方針です。

まとめ

今回の学びを3つに絞ると——

  • 「設定を変えるだけ」を疑う。 無音の no-op は、エラーより質が悪い。通り道は端から端まで辿る。
  • 最小改変の経路を選ぶ。 スキーマやコアルーターを触らず、すでに必要な情報を握っている主体(pipe)に仕事をさせると、影響範囲が驚くほど小さくなる。
  • 役割は経路で分ける。 global と thread を「夜間バッチ」と「ライブ給餌」という別経路に割り当てたことで、設定が自然に意味を持った。

これで会話メモリは、覚える(producer)・思い出す(consumer)・スレッドを区別する、までが揃いました。次は注入しきい値の再計測と、上に書いた「スコープ判定」の宿題に取り組む予定です。

← 前回:第26回 AIに会話の記憶を思い出させたら、その思い出を“また記憶”するループ寸前だった
→ 次回:(未公開)