OpenWebUI連携とWorker実負荷ルーティングを実装した話

前回までで、ドメイン分類・スコアリングによる自動ルーティングが動くようになった。

今回はその続きで、2つのことをやった。

  • OpenWebUIのチャット画面からAIエージェントに処理を投げられるようにした
  • WorkerのCPU/GPU負荷をHeartbeatで収集して、ルーティングに実負荷を反映させた

OpenWebUI連携:Pipe Functionを使う

OpenWebUIには「Pipe Function」という仕組みがある。

Pythonのクラスを書いてOpenWebUIに登録すると、それがチャット画面のモデル選択に現れる。ユーザーがそのモデルを選んでメッセージを送ると、pipe() メソッドが呼ばれる。

これを使えば、チャット入力をそのままCoordinator APIに投げることができる。

処理の流れ

ユーザーがチャットにメッセージを送信
  ↓
coordinator_pipe.py の pipe() が呼ばれる
  ↓
POST /route にプロンプトを投げる
  ↓
Coordinatorがドメイン分類・スコアリング・Worker選択
  ↓
Redisキューにタスクを積む
  ↓
「📡 ルーティング情報」を先にチャットに返す
  ↓
2秒おきに GET /task/{task_id} をポーリング
  ↓
Workerが推論完了 → PostgreSQLに結果を保存
  ↓
ポーリングが "done" を検知
  ↓
レスポンスをチャットに返す

ポイントは OpenWebUIとWorkerが直接通信していないことだ。
Coordinatorが中継しているので、Workerが何台あっても・どのモデルを使っていても、
OpenWebUI側からは 🧭 Coordinator (Auto Route) という1つのモデルに見える。

Pipe Functionの実装

OpenWebUI 0.9.x は全面async対応なので、pipe()async def で書く。

class Pipe:
    class Valves(BaseModel):
        COORDINATOR_URL: str = Field(default="http://192.168.0.40:8000")
        POLL_INTERVAL: float = Field(default=2.0)
        POLL_MAX_RETRIES: int = Field(default=150)
        SHOW_METADATA: bool = Field(default=True)

    def pipes(self) -> list[dict]:
        return [{"id": "coordinator-route", "name": "🧭 Coordinator (Auto Route)"}]

    async def pipe(self, body: dict, __user__: dict | None = None) -> AsyncGenerator[str, None]:
        # 1. /route にタスク投入
        # 2. ポーリングで完了を待つ
        # 3. レスポンスを返す

Valves クラスに書いたフィールドは管理画面のGUIから設定できる。
COORDINATOR_URL だけ環境に合わせて変えれば動く。

動いたときの画面

📡 ルーティング情報
- ドメイン: `general`
- モデル: `qwen2.5:7b`
- Worker: `rtx3070ti`
- スコア: `0.6749`

⏳ 処理中...

(回答が表示される)

---
domain: general / model: qwen2.5:7b / worker: rtx3070ti

ドメイン・モデル・Workerが透けて見えるのが便利だった。
どのモデルが選ばれているかを意識しながら使える。


Worker実負荷ルーティング

前回、スコアリングの式はこうなっていた。

score = 0.4 × speed_score
      + 0.4 × domain_score
      + 0.2 × queue_score   ← ここが固定 1.0 だった

queue_score が固定1.0のままだと、同じモデルを持つWorkerが複数台いるとき、
どちらに振り分けるかが運任せになってしまう。

取得する負荷情報

Heartbeatのペイロードに以下を追加した。

{
  "worker_id": "rtx3070ti",
  "models": ["gemma3:12b", "qwen2.5:7b", ...],
  "load_info": {
    "cpu_util": 0.0,
    "gpu_util": 3,
    "vram_used_mb": 6747,
    "vram_total_mb": 8192,
    "running_tasks": 0
  }
}
  • CPU使用率は /proc/stat から直接読む(psutil不要)
  • GPU情報は nvidia-smi で取得
  • running_tasks はRedisのカウンタで管理(タスク受取時に+1、完了時に-1)

WSL2環境では nvidia-smi がシステムPATHに入っておらず、
/usr/lib/wsl/lib/nvidia-smi というパスを明示的に指定する必要があった。
systemdサービスのPATHは最小限なので、フルパスが必要という落とし穴がある。

queue_scoreの計算

def get_worker_load_score(worker_data: dict) -> float:
    scores = []

    # タスク数: 0件=1.0、増えるほど低下
    if running_tasks >= 0:
        scores.append(1.0 / (1.0 + running_tasks))

    # CPU使用率: 低いほど高スコア
    if cpu_util >= 0:
        scores.append(max(0.0, 1.0 - cpu_util / 100.0))

    # GPU使用率(GPU搭載Workerのみ)
    if gpu_util >= 0:
        scores.append(max(0.0, 1.0 - gpu_util / 100.0))

    # VRAM空き容量: 空きが多いほど高スコア
    if vram_used_mb >= 0 and vram_total_mb > 0:
        scores.append(max(0.0, 1.0 - vram_used_mb / vram_total_mb))

    return sum(scores) / len(scores)

動かしてみると、こういう結果になった。

{
  "worker_id": "moon",
  "cpu_util": 0.0,
  "load_score": 1.0
},
{
  "worker_id": "rtx3070ti",
  "cpu_util": 0.0,
  "gpu_util": 3.0,
  "vram_used_mb": 6747.0,
  "vram_total_mb": 8192.0,
  "load_score": 0.7866
}

rtx3070ti のスコアが 0.7866 にとどまっているのは、
VRAMを既に6.7GB(82%)使っているからだ。
モデルが常駐している分が正直に反映されている。


実績が育つと選ばれるモデルが変わる

最初は全モデルのスコアが 0.3 で横並びだった。
/compare で実績を貯めて stats_updater.py を実行すると、こうなった。

=== domain: general ===
model                  avg_ms
llama3.2:3b             5,239ms  → score: 0.6749
qwen2.5:7b              8,381ms  → score: 0.6749
gemma3:1b              46,626ms  → score: 0.3974
LFM2.5-1.2B-JP         79,984ms  → score: 0.3616
gemma3:12b            167,886ms  → score: 0.2987

general ドメインでLFM2.5が79秒かかっていたことが判明した。
軽量モデルは短いコード生成は速いが、長文の自由回答は苦手だった。
スコアリングがこれをちゃんとペナルティとして扱っている。

OpenWebUIで「面白いトリビアを教えて」と送ると、
以前は LFM2.5-1.2B-JP(score: 0.3、回答がおかしい)が選ばれていたのが、
実績が積まれてからは qwen2.5:7b(score: 0.6749)が選ばれるようになった。


まとめ

今回実装した2つは、どちらも「インフラとして育てる」方向の仕組みだった。

  • OpenWebUI連携:チャット画面が分散AIの司令塔への入口になった
  • 実負荷ルーティング:使えば使うほど賢くなる仕組みが動き始めた

現時点のスコアリング式をまとめると:

score = 0.4 × speed_score   (実績avg_msから計算)
      + 0.4 × domain_score  (sample_countから計算)
      + 0.2 × queue_score   (CPU/GPU/VRAM/タスク数から計算)

次は、タスクの種類に応じてキューを分割(tasks:gpu / tasks:cpu)することと、
チャット履歴をCoordinatorに渡して文脈を保持する仕組みを作っていきたい。


※ 記事内のIPアドレスは実際の環境とは異なる値を使用しています。