前回までで、ドメイン分類・スコアリングによる自動ルーティングが動くようになった。
今回はその続きで、2つのことをやった。
- OpenWebUIのチャット画面からAIエージェントに処理を投げられるようにした
- WorkerのCPU/GPU負荷をHeartbeatで収集して、ルーティングに実負荷を反映させた
OpenWebUI連携:Pipe Functionを使う
OpenWebUIには「Pipe Function」という仕組みがある。
Pythonのクラスを書いてOpenWebUIに登録すると、それがチャット画面のモデル選択に現れる。ユーザーがそのモデルを選んでメッセージを送ると、pipe() メソッドが呼ばれる。
これを使えば、チャット入力をそのままCoordinator APIに投げることができる。
処理の流れ
ユーザーがチャットにメッセージを送信
↓
coordinator_pipe.py の pipe() が呼ばれる
↓
POST /route にプロンプトを投げる
↓
Coordinatorがドメイン分類・スコアリング・Worker選択
↓
Redisキューにタスクを積む
↓
「📡 ルーティング情報」を先にチャットに返す
↓
2秒おきに GET /task/{task_id} をポーリング
↓
Workerが推論完了 → PostgreSQLに結果を保存
↓
ポーリングが "done" を検知
↓
レスポンスをチャットに返す
ポイントは OpenWebUIとWorkerが直接通信していないことだ。
Coordinatorが中継しているので、Workerが何台あっても・どのモデルを使っていても、
OpenWebUI側からは 🧭 Coordinator (Auto Route) という1つのモデルに見える。
Pipe Functionの実装
OpenWebUI 0.9.x は全面async対応なので、pipe() は async def で書く。
class Pipe:
class Valves(BaseModel):
COORDINATOR_URL: str = Field(default="http://192.168.0.40:8000")
POLL_INTERVAL: float = Field(default=2.0)
POLL_MAX_RETRIES: int = Field(default=150)
SHOW_METADATA: bool = Field(default=True)
def pipes(self) -> list[dict]:
return [{"id": "coordinator-route", "name": "🧭 Coordinator (Auto Route)"}]
async def pipe(self, body: dict, __user__: dict | None = None) -> AsyncGenerator[str, None]:
# 1. /route にタスク投入
# 2. ポーリングで完了を待つ
# 3. レスポンスを返す
Valves クラスに書いたフィールドは管理画面のGUIから設定できる。
COORDINATOR_URL だけ環境に合わせて変えれば動く。
動いたときの画面
📡 ルーティング情報
- ドメイン: `general`
- モデル: `qwen2.5:7b`
- Worker: `rtx3070ti`
- スコア: `0.6749`
⏳ 処理中...
(回答が表示される)
---
domain: general / model: qwen2.5:7b / worker: rtx3070ti
ドメイン・モデル・Workerが透けて見えるのが便利だった。
どのモデルが選ばれているかを意識しながら使える。
Worker実負荷ルーティング
前回、スコアリングの式はこうなっていた。
score = 0.4 × speed_score
+ 0.4 × domain_score
+ 0.2 × queue_score ← ここが固定 1.0 だった
queue_score が固定1.0のままだと、同じモデルを持つWorkerが複数台いるとき、
どちらに振り分けるかが運任せになってしまう。
取得する負荷情報
Heartbeatのペイロードに以下を追加した。
{
"worker_id": "rtx3070ti",
"models": ["gemma3:12b", "qwen2.5:7b", ...],
"load_info": {
"cpu_util": 0.0,
"gpu_util": 3,
"vram_used_mb": 6747,
"vram_total_mb": 8192,
"running_tasks": 0
}
}
- CPU使用率は
/proc/statから直接読む(psutil不要) - GPU情報は
nvidia-smiで取得 running_tasksはRedisのカウンタで管理(タスク受取時に+1、完了時に-1)
WSL2環境では nvidia-smi がシステムPATHに入っておらず、
/usr/lib/wsl/lib/nvidia-smi というパスを明示的に指定する必要があった。
systemdサービスのPATHは最小限なので、フルパスが必要という落とし穴がある。
queue_scoreの計算
def get_worker_load_score(worker_data: dict) -> float:
scores = []
# タスク数: 0件=1.0、増えるほど低下
if running_tasks >= 0:
scores.append(1.0 / (1.0 + running_tasks))
# CPU使用率: 低いほど高スコア
if cpu_util >= 0:
scores.append(max(0.0, 1.0 - cpu_util / 100.0))
# GPU使用率(GPU搭載Workerのみ)
if gpu_util >= 0:
scores.append(max(0.0, 1.0 - gpu_util / 100.0))
# VRAM空き容量: 空きが多いほど高スコア
if vram_used_mb >= 0 and vram_total_mb > 0:
scores.append(max(0.0, 1.0 - vram_used_mb / vram_total_mb))
return sum(scores) / len(scores)
動かしてみると、こういう結果になった。
{
"worker_id": "moon",
"cpu_util": 0.0,
"load_score": 1.0
},
{
"worker_id": "rtx3070ti",
"cpu_util": 0.0,
"gpu_util": 3.0,
"vram_used_mb": 6747.0,
"vram_total_mb": 8192.0,
"load_score": 0.7866
}
rtx3070ti のスコアが 0.7866 にとどまっているのは、
VRAMを既に6.7GB(82%)使っているからだ。
モデルが常駐している分が正直に反映されている。
実績が育つと選ばれるモデルが変わる
最初は全モデルのスコアが 0.3 で横並びだった。
/compare で実績を貯めて stats_updater.py を実行すると、こうなった。
=== domain: general ===
model avg_ms
llama3.2:3b 5,239ms → score: 0.6749
qwen2.5:7b 8,381ms → score: 0.6749
gemma3:1b 46,626ms → score: 0.3974
LFM2.5-1.2B-JP 79,984ms → score: 0.3616
gemma3:12b 167,886ms → score: 0.2987
general ドメインでLFM2.5が79秒かかっていたことが判明した。
軽量モデルは短いコード生成は速いが、長文の自由回答は苦手だった。
スコアリングがこれをちゃんとペナルティとして扱っている。
OpenWebUIで「面白いトリビアを教えて」と送ると、
以前は LFM2.5-1.2B-JP(score: 0.3、回答がおかしい)が選ばれていたのが、
実績が積まれてからは qwen2.5:7b(score: 0.6749)が選ばれるようになった。
まとめ
今回実装した2つは、どちらも「インフラとして育てる」方向の仕組みだった。
- OpenWebUI連携:チャット画面が分散AIの司令塔への入口になった
- 実負荷ルーティング:使えば使うほど賢くなる仕組みが動き始めた
現時点のスコアリング式をまとめると:
score = 0.4 × speed_score (実績avg_msから計算)
+ 0.4 × domain_score (sample_countから計算)
+ 0.2 × queue_score (CPU/GPU/VRAM/タスク数から計算)
次は、タスクの種類に応じてキューを分割(tasks:gpu / tasks:cpu)することと、
チャット履歴をCoordinatorに渡して文脈を保持する仕組みを作っていきたい。
※ 記事内のIPアドレスは実際の環境とは異なる値を使用しています。