diff --git a/resources/functions/openwebui_monitor.py b/resources/functions/openwebui_monitor.py index 158f403..d8d6484 100644 --- a/resources/functions/openwebui_monitor.py +++ b/resources/functions/openwebui_monitor.py @@ -8,8 +8,8 @@ """ import logging +import time from typing import Dict, Optional - from httpx import AsyncClient from pydantic import BaseModel, Field import json @@ -18,6 +18,27 @@ logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) +TRANSLATIONS = { + "en": { + "request_failed": "Request failed: {error_msg}", + "insufficient_balance": "Insufficient balance: Current balance `{balance:.4f}`", + "cost": "Cost: ${cost:.4f}", + "balance": "Balance: ${balance:.4f}", + "tokens": "Tokens: {input}+{output}", + "time_spent": "Time: {time:.2f}s", + "tokens_per_sec": "{tokens_per_sec:.2f} T/s", + }, + "zh": { + "request_failed": "请求失败: {error_msg}", + "insufficient_balance": "余额不足: 当前余额 `{balance:.4f}`", + "cost": "费用: ¥{cost:.4f}", + "balance": "余额: ¥{balance:.4f}", + "tokens": "Token: {input}+{output}", + "time_spent": "耗时: {time:.2f}s", + "tokens_per_sec": "{tokens_per_sec:.2f} T/s", + }, +} + class CustomException(Exception): pass @@ -28,12 +49,25 @@ class Valves(BaseModel): api_endpoint: str = Field(default="", description="openwebui-monitor's base url") api_key: str = Field(default="", description="openwebui-monitor's api key") priority: int = Field(default=5, description="filter priority") + language: str = Field(default="zh", description="language (en/zh)") + show_time_spent: bool = Field(default=True, description="show time spent") + show_tokens_per_sec: bool = Field(default=True, description="show tokens per second") + show_cost: bool = Field(default=True, description="show cost") + show_balance: bool = Field(default=True, description="show balance") + show_tokens: bool = Field(default=True, description="show tokens") def __init__(self): self.type = "filter" + self.name = "OpenWebUI Monitor" self.valves = self.Valves() self.outage_map: Dict[str, bool] = {} - + self.start_time: Optional[float] = None + + def get_text(self, key: str, **kwargs) -> str: + lang = self.valves.language if self.valves.language in TRANSLATIONS else "en" + text = TRANSLATIONS[lang].get(key, TRANSLATIONS["en"][key]) + return text.format(**kwargs) if kwargs else text + async def request(self, client: AsyncClient, url: str, headers: dict, json_data: dict): json_data = json.loads(json.dumps(json_data, default=lambda o: o.dict() if hasattr(o, "dict") else str(o))) @@ -41,14 +75,15 @@ async def request(self, client: AsyncClient, url: str, headers: dict, json_data: response.raise_for_status() response_data = response.json() if not response_data.get("success"): - logger.error("[usage_monitor] req monitor failed: %s", response_data) - raise CustomException("calculate usage failed, please contact administrator") + logger.error(self.get_text("request_failed", error_msg=response_data)) + raise CustomException(self.get_text("request_failed", error_msg=response_data)) return response_data async def inlet(self, body: dict, __metadata__: Optional[dict] = None, __user__: Optional[dict] = None) -> dict: __user__ = __user__ or {} __metadata__ = __metadata__ or {} - user_id = __user__["id"] + self.start_time = time.time() + user_id = __user__.get("id", "default") client = AsyncClient() @@ -61,13 +96,12 @@ async def inlet(self, body: dict, __metadata__: Optional[dict] = None, __user__: ) self.outage_map[user_id] = response_data.get("balance", 0) <= 0 if self.outage_map[user_id]: - logger.info("[usage_monitor] no balance: %s", user_id) - raise CustomException("no balance, please contact administrator") - + logger.info(self.get_text("insufficient_balance", balance=response_data.get("balance", 0))) + raise CustomException(self.get_text("insufficient_balance", balance=response_data.get("balance", 0))) return body except Exception as err: - logger.exception("[usage_monitor] error calculating usage: %s", err) + logger.exception(self.get_text("request_failed", error_msg=err)) if isinstance(err, CustomException): raise err raise Exception(f"error calculating usage, {err}") from err @@ -80,13 +114,13 @@ async def outlet( body: dict, __metadata__: Optional[dict] = None, __user__: Optional[dict] = None, - __event_emitter__: callable = None, + __event_emitter__: Optional[callable] = None, ) -> dict: __user__ = __user__ or {} __metadata__ = __metadata__ or {} - user_id = __user__["id"] + user_id = __user__.get("id", "default") - if self.outage_map[user_id]: + if self.outage_map.get(user_id, False): return body client = AsyncClient() @@ -99,23 +133,29 @@ async def outlet( json_data={"user": __user__, "body": body}, ) - # pylint: disable=C0209 - stats = " | ".join( - [ - f"Tokens: {response_data['inputTokens']} + {response_data['outputTokens']}", - "Cost: %.4f" % response_data["totalCost"], - "Balance: %.4f" % response_data["newBalance"], - ] - ) - - await __event_emitter__({"type": "status", "data": {"description": stats, "done": True}}) + stats_list = [] + if self.valves.show_tokens: + stats_list.append(self.get_text("tokens", input=response_data["inputTokens"], output=response_data["outputTokens"])) + if self.valves.show_cost: + stats_list.append(self.get_text("cost", cost=response_data["totalCost"])) + if self.valves.show_balance: + stats_list.append(self.get_text("balance", balance=response_data["newBalance"])) + if self.start_time and self.valves.show_time_spent: + elapsed = time.time() - self.start_time + stats_list.append(self.get_text("time_spent", time=elapsed)) + if self.valves.show_tokens_per_sec: + tokens_per_sec = (response_data["outputTokens"] / elapsed if elapsed > 0 else 0) + stats_list.append(self.get_text("tokens_per_sec", tokens_per_sec=tokens_per_sec)) + + stats = " | ".join(stats_list) + if __event_emitter__: + await __event_emitter__({"type": "status", "data": {"description": stats, "done": True}}) logger.info("usage_monitor: %s %s", user_id, stats) return body except Exception as err: - logger.exception("[usage_monitor] error calculating usage: %s", err) - raise Exception(f"error calculating usage, {err}") from err - + logger.exception(self.get_text("request_failed", error_msg=err)) + raise Exception(self.get_text("request_failed", error_msg=err)) finally: await client.aclose()