feat(usage): track cloud quotas and expose :limits
Acceptance Criteria:\n- header shows hourly/weekly usage with colored thresholds\n- :limits command prints persisted usage data and quotas\n- token usage survives restarts and emits 80%/95% toasts Test Notes:\n- cargo test -p owlen-core usage
This commit is contained in:
@@ -19,6 +19,7 @@ use crate::types::{
|
||||
ChatParameters, ChatRequest, ChatResponse, Conversation, Message, ModelInfo, ToolCall,
|
||||
};
|
||||
use crate::ui::{RoleLabelDisplay, UiController};
|
||||
use crate::usage::{UsageLedger, UsageQuota, UsageSnapshot};
|
||||
use crate::validation::{SchemaValidator, get_builtin_schemas};
|
||||
use crate::{ChatStream, Provider};
|
||||
use crate::{
|
||||
@@ -29,10 +30,12 @@ use crate::{Error, Result};
|
||||
use chrono::Utc;
|
||||
use log::warn;
|
||||
use serde_json::{Value, json};
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::env;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::SystemTime;
|
||||
use tokio::fs;
|
||||
use tokio::sync::Mutex as TokioMutex;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use uuid::Uuid;
|
||||
@@ -240,6 +243,7 @@ pub struct SessionController {
|
||||
event_tx: Option<UnboundedSender<ControllerEvent>>,
|
||||
pending_tool_requests: HashMap<Uuid, PendingToolRequest>,
|
||||
stream_states: HashMap<Uuid, StreamingMessageState>,
|
||||
usage_ledger: Arc<TokioMutex<UsageLedger>>,
|
||||
}
|
||||
|
||||
async fn build_tools(
|
||||
@@ -545,6 +549,25 @@ impl SessionController {
|
||||
)
|
||||
.await?;
|
||||
|
||||
let usage_ledger_path = storage
|
||||
.database_path()
|
||||
.parent()
|
||||
.map(|dir| dir.join("usage-ledger.json"))
|
||||
.unwrap_or_else(|| PathBuf::from("usage-ledger.json"));
|
||||
|
||||
let usage_ledger_instance =
|
||||
match UsageLedger::load_or_default(usage_ledger_path.clone()).await {
|
||||
Ok(ledger) => ledger,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"Failed to load usage ledger at {}: {err}. Starting with an empty ledger.",
|
||||
usage_ledger_path.display()
|
||||
);
|
||||
UsageLedger::empty(usage_ledger_path)
|
||||
}
|
||||
};
|
||||
let usage_ledger = Arc::new(TokioMutex::new(usage_ledger_instance));
|
||||
|
||||
Ok(Self {
|
||||
provider,
|
||||
conversation,
|
||||
@@ -568,6 +591,7 @@ impl SessionController {
|
||||
event_tx,
|
||||
pending_tool_requests: HashMap::new(),
|
||||
stream_states: HashMap::new(),
|
||||
usage_ledger,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -656,6 +680,134 @@ impl SessionController {
|
||||
Some((server.to_string(), uri.to_string()))
|
||||
}
|
||||
|
||||
async fn persist_usage_serialized(path: PathBuf, serialized: String) {
|
||||
if let Some(parent) = path.parent() {
|
||||
if let Err(err) = fs::create_dir_all(parent).await {
|
||||
warn!(
|
||||
"Failed to create usage ledger directory {}: {}",
|
||||
parent.display(),
|
||||
err
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(err) = fs::write(&path, serialized).await {
|
||||
warn!("Failed to write usage ledger {}: {}", path.display(), err);
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_quota_value(value: &Value) -> Option<u64> {
|
||||
match value {
|
||||
Value::Number(num) => num.as_u64(),
|
||||
Value::String(text) => text.trim().parse::<u64>().ok(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn quota_from_config(config: &Config, provider: &str) -> UsageQuota {
|
||||
let mut quota = UsageQuota::default();
|
||||
|
||||
if let Some(entry) = config.providers.get(provider) {
|
||||
if let Some(value) = entry.extra.get("hourly_quota_tokens") {
|
||||
quota.hourly_quota_tokens = Self::parse_quota_value(value);
|
||||
}
|
||||
if let Some(value) = entry.extra.get("weekly_quota_tokens") {
|
||||
quota.weekly_quota_tokens = Self::parse_quota_value(value);
|
||||
}
|
||||
}
|
||||
|
||||
quota
|
||||
}
|
||||
|
||||
pub async fn record_usage_sample(
|
||||
&self,
|
||||
usage: &crate::types::TokenUsage,
|
||||
) -> Option<UsageSnapshot> {
|
||||
if usage.total_tokens == 0 {
|
||||
return None;
|
||||
}
|
||||
|
||||
let provider_name = self.provider.name().to_string();
|
||||
if provider_name.trim().is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let quotas = {
|
||||
let guard = self.config.lock().await;
|
||||
Self::quota_from_config(&guard, &provider_name)
|
||||
};
|
||||
|
||||
let timestamp = SystemTime::now();
|
||||
let mut serialized_payload: Option<(PathBuf, String)> = None;
|
||||
|
||||
let snapshot = {
|
||||
let mut ledger = self.usage_ledger.lock().await;
|
||||
ledger.record(&provider_name, usage, timestamp);
|
||||
let snapshot = ledger.snapshot(&provider_name, quotas, timestamp);
|
||||
match ledger.serialize() {
|
||||
Ok(payload) => {
|
||||
serialized_payload = Some((ledger.path().to_path_buf(), payload));
|
||||
}
|
||||
Err(err) => warn!("Failed to serialize usage ledger: {}", err),
|
||||
}
|
||||
snapshot
|
||||
};
|
||||
|
||||
if let Some((path, payload)) = serialized_payload {
|
||||
Self::persist_usage_serialized(path, payload).await;
|
||||
}
|
||||
|
||||
Some(snapshot)
|
||||
}
|
||||
|
||||
pub async fn current_usage_snapshot(&self) -> Option<UsageSnapshot> {
|
||||
let provider_name = self.provider.name().to_string();
|
||||
if provider_name.trim().is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let quotas = {
|
||||
let guard = self.config.lock().await;
|
||||
Self::quota_from_config(&guard, &provider_name)
|
||||
};
|
||||
|
||||
let now = SystemTime::now();
|
||||
let ledger = self.usage_ledger.lock().await;
|
||||
Some(ledger.snapshot(&provider_name, quotas, now))
|
||||
}
|
||||
|
||||
pub async fn usage_overview(&self) -> Vec<UsageSnapshot> {
|
||||
let quota_map = {
|
||||
let guard = self.config.lock().await;
|
||||
guard
|
||||
.providers
|
||||
.iter()
|
||||
.map(|(name, _)| (name.clone(), Self::quota_from_config(&guard, name)))
|
||||
.collect::<HashMap<_, _>>()
|
||||
};
|
||||
|
||||
let now = SystemTime::now();
|
||||
let mut provider_names: HashSet<String> = quota_map.keys().cloned().collect();
|
||||
|
||||
let snapshots = {
|
||||
let ledger = self.usage_ledger.lock().await;
|
||||
for key in ledger.provider_keys() {
|
||||
provider_names.insert(key.clone());
|
||||
}
|
||||
|
||||
provider_names
|
||||
.into_iter()
|
||||
.map(|provider| {
|
||||
let quota = quota_map.get(&provider).cloned().unwrap_or_default();
|
||||
ledger.snapshot(&provider, quota, now)
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
snapshots
|
||||
}
|
||||
|
||||
// Asynchronous access to the configuration (used internally).
|
||||
pub async fn config_async(&self) -> tokio::sync::MutexGuard<'_, Config> {
|
||||
self.config.lock().await
|
||||
@@ -1305,6 +1457,9 @@ impl SessionController {
|
||||
request.messages = self.conversation.active().messages.clone();
|
||||
continue;
|
||||
} else {
|
||||
if let Some(usage) = response.usage.as_ref() {
|
||||
let _ = self.record_usage_sample(usage).await;
|
||||
}
|
||||
self.conversation.push_message(response.message.clone());
|
||||
return Ok(SessionOutcome::Complete(response));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user