feat(compression): adaptive auto transcript compactor
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
use crate::config::{
|
||||
Config, LEGACY_OLLAMA_CLOUD_API_KEY_ENV, LEGACY_OWLEN_OLLAMA_CLOUD_API_KEY_ENV,
|
||||
McpResourceConfig, McpServerConfig, OLLAMA_API_KEY_ENV, OLLAMA_CLOUD_BASE_URL,
|
||||
ChatSettings, CompressionStrategy, Config, LEGACY_OLLAMA_CLOUD_API_KEY_ENV,
|
||||
LEGACY_OWLEN_OLLAMA_CLOUD_API_KEY_ENV, McpResourceConfig, McpServerConfig, OLLAMA_API_KEY_ENV,
|
||||
OLLAMA_CLOUD_BASE_URL,
|
||||
};
|
||||
use crate::consent::{ConsentManager, ConsentScope};
|
||||
use crate::conversation::ConversationManager;
|
||||
@@ -21,7 +22,7 @@ use crate::providers::OllamaProvider;
|
||||
use crate::storage::{SessionMeta, StorageManager};
|
||||
use crate::tools::{WEB_SEARCH_TOOL_NAME, canonical_tool_name, tool_name_matches};
|
||||
use crate::types::{
|
||||
ChatParameters, ChatRequest, ChatResponse, Conversation, Message, ModelInfo, ToolCall,
|
||||
ChatParameters, ChatRequest, ChatResponse, Conversation, Message, ModelInfo, Role, ToolCall,
|
||||
};
|
||||
use crate::ui::{RoleLabelDisplay, UiController};
|
||||
use crate::usage::{UsageLedger, UsageQuota, UsageSnapshot};
|
||||
@@ -32,10 +33,11 @@ use crate::{
|
||||
ToolRegistry, WebScrapeTool, WebSearchSettings, WebSearchTool,
|
||||
};
|
||||
use crate::{Error, Result};
|
||||
use chrono::Utc;
|
||||
use log::warn;
|
||||
use chrono::{DateTime, Utc};
|
||||
use log::{info, warn};
|
||||
use reqwest::Url;
|
||||
use serde_json::{Value, json};
|
||||
use std::cmp::{max, min};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::env;
|
||||
use std::path::PathBuf;
|
||||
@@ -53,6 +55,107 @@ fn env_var_non_empty(name: &str) -> Option<String> {
|
||||
.filter(|value| !value.is_empty())
|
||||
}
|
||||
|
||||
fn estimate_tokens(messages: &[Message]) -> u32 {
|
||||
messages
|
||||
.iter()
|
||||
.map(estimate_message_tokens)
|
||||
.fold(0u32, |acc, value| acc.saturating_add(value))
|
||||
}
|
||||
|
||||
fn estimate_message_tokens(message: &Message) -> u32 {
|
||||
let content = message.content.trim();
|
||||
if content.is_empty() {
|
||||
return 4;
|
||||
}
|
||||
let approx = max(4, content.chars().count() / 4 + 1);
|
||||
(approx + 4) as u32
|
||||
}
|
||||
|
||||
fn build_transcript(messages: &[Message]) -> String {
|
||||
let mut transcript = String::new();
|
||||
let take = min(messages.len(), MAX_TRANSCRIPT_MESSAGES);
|
||||
for message in messages.iter().take(take) {
|
||||
let role = match message.role {
|
||||
Role::User => "User",
|
||||
Role::Assistant => "Assistant",
|
||||
Role::System => "System",
|
||||
Role::Tool => "Tool",
|
||||
};
|
||||
let snippet = sanitize_snippet(&message.content);
|
||||
if snippet.is_empty() {
|
||||
continue;
|
||||
}
|
||||
transcript.push_str(&format!("{role}: {snippet}\n\n"));
|
||||
}
|
||||
if messages.len() > take {
|
||||
transcript.push_str(&format!(
|
||||
"... ({} additional messages omitted for brevity)\n",
|
||||
messages.len() - take
|
||||
));
|
||||
}
|
||||
transcript
|
||||
}
|
||||
|
||||
fn local_summary(messages: &[Message]) -> String {
|
||||
if messages.is_empty() {
|
||||
return "(no content to summarize)".to_string();
|
||||
}
|
||||
let total = messages.len();
|
||||
let mut summary = String::from("Summary (local heuristic)\n\n");
|
||||
summary.push_str(&format!("- Compressed {total} prior messages.\n"));
|
||||
|
||||
let recent_users = collect_recent_by_role(messages, Role::User, 3);
|
||||
if !recent_users.is_empty() {
|
||||
summary.push_str("- Recent user intents:\n");
|
||||
for intent in recent_users {
|
||||
summary.push_str(&format!(" - {intent}\n"));
|
||||
}
|
||||
}
|
||||
|
||||
let recent_assistant = collect_recent_by_role(messages, Role::Assistant, 3);
|
||||
if !recent_assistant.is_empty() {
|
||||
summary.push_str("- Recent assistant responses:\n");
|
||||
for reply in recent_assistant {
|
||||
summary.push_str(&format!(" - {reply}\n"));
|
||||
}
|
||||
}
|
||||
|
||||
summary.trim_end().to_string()
|
||||
}
|
||||
|
||||
fn collect_recent_by_role(messages: &[Message], role: Role, limit: usize) -> Vec<String> {
|
||||
if limit == 0 {
|
||||
return Vec::new();
|
||||
}
|
||||
let mut results = Vec::new();
|
||||
for message in messages.iter().rev() {
|
||||
if message.role == role {
|
||||
let snippet = sanitize_snippet(&message.content);
|
||||
if !snippet.is_empty() {
|
||||
results.push(snippet);
|
||||
if results.len() == limit {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
results.reverse();
|
||||
results
|
||||
}
|
||||
|
||||
fn sanitize_snippet(content: &str) -> String {
|
||||
let trimmed = content.trim();
|
||||
if trimmed.is_empty() {
|
||||
return String::new();
|
||||
}
|
||||
let mut snippet = trimmed.replace('\r', "");
|
||||
if snippet.len() > MAX_TRANSCRIPT_MESSAGE_CHARS {
|
||||
snippet.truncate(MAX_TRANSCRIPT_MESSAGE_CHARS);
|
||||
snippet.push_str("...");
|
||||
}
|
||||
snippet
|
||||
}
|
||||
|
||||
fn compute_web_search_settings(
|
||||
config: &Config,
|
||||
provider_id: &str,
|
||||
@@ -195,6 +298,9 @@ pub enum ControllerEvent {
|
||||
endpoints: Vec<String>,
|
||||
tool_calls: Vec<ToolCall>,
|
||||
},
|
||||
CompressionCompleted {
|
||||
report: CompressionReport,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -206,6 +312,53 @@ struct PendingToolRequest {
|
||||
tool_calls: Vec<ToolCall>,
|
||||
}
|
||||
|
||||
/// Outcome of a single transcript-compression pass.
#[derive(Debug, Clone)]
pub struct CompressionReport {
    // Id of the synthetic system message that now holds the summary.
    pub summary_message_id: Uuid,
    // Number of prior messages folded into the summary.
    pub compressed_messages: usize,
    // Heuristic token estimate for the transcript before the pass.
    pub estimated_tokens_before: u32,
    // Heuristic estimate afterwards (summary + retained tail).
    pub estimated_tokens_after: u32,
    // Strategy that produced the summary (provider model or local heuristic).
    pub strategy: CompressionStrategy,
    // Model name selected for summarization (override or active model).
    pub model_used: String,
    // Count of most-recent messages kept verbatim.
    pub retained_recent: usize,
    // True when triggered by auto-compression rather than a manual command.
    pub automated: bool,
    // When the pass completed.
    pub timestamp: DateTime<Utc>,
}
|
||||
|
||||
/// Normalized compression settings derived from `ChatSettings`
/// (see `CompressionOptions::from_settings` for the applied floors).
#[derive(Debug, Clone)]
struct CompressionOptions {
    // Estimated-token threshold above which compression runs.
    trigger_tokens: u32,
    // Number of trailing messages always kept verbatim.
    retain_recent: usize,
    // How summaries are produced (provider call or local heuristic).
    strategy: CompressionStrategy,
    // Optional model name overriding the conversation's active model.
    model_override: Option<String>,
}
|
||||
|
||||
impl CompressionOptions {
|
||||
fn from_settings(settings: &ChatSettings) -> Self {
|
||||
Self {
|
||||
trigger_tokens: settings.trigger_tokens.max(64),
|
||||
retain_recent: settings.retain_recent_messages.max(2),
|
||||
strategy: settings.strategy,
|
||||
model_override: settings.model_override.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
fn min_chunk_messages(&self) -> usize {
|
||||
self.retain_recent.saturating_add(2).max(4)
|
||||
}
|
||||
|
||||
fn resolve_model<'a>(&'a self, active_model: &'a str) -> String {
|
||||
self.model_override
|
||||
.clone()
|
||||
.filter(|model| !model.trim().is_empty())
|
||||
.unwrap_or_else(|| active_model.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
// Cap on a single message's snippet in transcripts/summaries.
// NOTE(review): despite the CHARS name, sanitize_snippet compares this
// against `String::len()` (bytes) — confirm intended unit.
const MAX_TRANSCRIPT_MESSAGE_CHARS: usize = 1024;
// Most messages included verbatim in a summarization transcript.
const MAX_TRANSCRIPT_MESSAGES: usize = 32;
// Message-metadata key marking content produced by compression.
const COMPRESSION_METADATA_KEY: &str = "compression";
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct StreamingMessageState {
|
||||
full_text: String,
|
||||
@@ -381,6 +534,7 @@ pub struct SessionController {
|
||||
pending_tool_requests: HashMap<Uuid, PendingToolRequest>,
|
||||
stream_states: HashMap<Uuid, StreamingMessageState>,
|
||||
usage_ledger: Arc<TokioMutex<UsageLedger>>,
|
||||
last_compression: Option<CompressionReport>,
|
||||
}
|
||||
|
||||
async fn build_tools(
|
||||
@@ -723,6 +877,7 @@ impl SessionController {
|
||||
pending_tool_requests: HashMap::new(),
|
||||
stream_states: HashMap::new(),
|
||||
usage_ledger,
|
||||
last_compression: None,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -734,6 +889,10 @@ impl SessionController {
|
||||
&mut self.conversation
|
||||
}
|
||||
|
||||
/// Report from the most recent compression pass, if any has run.
pub fn last_compression(&self) -> Option<CompressionReport> {
    self.last_compression.as_ref().cloned()
}
|
||||
|
||||
pub fn input_buffer(&self) -> &InputBuffer {
|
||||
&self.input_buffer
|
||||
}
|
||||
@@ -956,6 +1115,210 @@ impl SessionController {
|
||||
self.config.clone()
|
||||
}
|
||||
|
||||
pub async fn compress_now(&mut self) -> Result<Option<CompressionReport>> {
|
||||
let settings = {
|
||||
let guard = self.config.lock().await;
|
||||
guard.chat.clone()
|
||||
};
|
||||
let options = CompressionOptions::from_settings(&settings);
|
||||
self.perform_compression(options, false).await
|
||||
}
|
||||
|
||||
pub async fn maybe_auto_compress(&mut self) -> Result<Option<CompressionReport>> {
|
||||
let settings = {
|
||||
let guard = self.config.lock().await;
|
||||
if !guard.chat.auto_compress {
|
||||
return Ok(None);
|
||||
}
|
||||
guard.chat.clone()
|
||||
};
|
||||
let options = CompressionOptions::from_settings(&settings);
|
||||
self.perform_compression(options, true).await
|
||||
}
|
||||
|
||||
/// Core compression loop shared by manual and automatic triggers.
///
/// Repeatedly (up to 4 passes) summarizes the messages older than the
/// retained tail into a single system message and replaces the active
/// conversation's transcript, stopping once the estimated token count
/// is at or below `options.trigger_tokens` or no further progress is
/// possible. Returns the report of the last pass performed, if any.
async fn perform_compression(
    &mut self,
    options: CompressionOptions,
    automated: bool,
) -> Result<Option<CompressionReport>> {
    let mut final_report = None;
    let mut iterations = 0usize;

    loop {
        // Hard cap on passes so a summary that fails to shrink the
        // transcript cannot loop forever.
        iterations += 1;
        if iterations > 4 {
            break;
        }

        let snapshot = self.conversation.active().clone();
        let total_tokens = estimate_tokens(&snapshot.messages);
        // Nothing to do while under the configured threshold.
        if total_tokens <= options.trigger_tokens {
            break;
        }

        // Too few messages to both retain a tail and summarize a head.
        if snapshot.messages.len() <= options.retain_recent + 1 {
            break;
        }

        let split_index = snapshot
            .messages
            .len()
            .saturating_sub(options.retain_recent);
        if split_index == 0 {
            break;
        }

        // Everything before `split_index` is eligible for summarization.
        let older_messages = &snapshot.messages[..split_index];
        if older_messages.len() < options.min_chunk_messages() {
            break;
        }

        // If every older message already carries compression metadata,
        // the head is already summarized content — stop rather than
        // re-summarizing summaries.
        if older_messages
            .iter()
            .all(|msg| msg.metadata.contains_key(COMPRESSION_METADATA_KEY))
        {
            break;
        }

        let model_used = options.resolve_model(&snapshot.model);
        let summary = self
            .generate_summary(older_messages, &options, &model_used)
            .await;

        // Assemble the replacement system message: header, summary body
        // (if any), compressed-count line, and an automated/manual footer.
        let summary_body = summary.trim();
        let intro = "### Conversation summary";
        let footer = if automated {
            "_This summary was generated automatically to preserve context._"
        } else {
            "_Manual compression complete._"
        };
        let content = if summary_body.is_empty() {
            format!(
                "{intro}\n\n_Compressed {} prior messages._\n\n{footer}",
                older_messages.len()
            )
        } else {
            format!(
                "{intro}\n\n{summary_body}\n\n_Compressed {} prior messages._\n\n{footer}",
                older_messages.len()
            )
        };

        let mut summary_message = Message::system(content);
        let compressed_ids: Vec<String> = older_messages
            .iter()
            .map(|msg| msg.id.to_string())
            .collect();
        let summary_tokens = estimate_message_tokens(&summary_message);
        let retained_tokens = estimate_tokens(&snapshot.messages[split_index..]);
        let updated_tokens = summary_tokens.saturating_add(retained_tokens);
        let timestamp = Utc::now();
        // Audit metadata attached to the summary message; also what the
        // already-compressed check above looks for on later passes.
        let metadata = json!({
            "strategy": match options.strategy {
                CompressionStrategy::Provider => "provider",
                CompressionStrategy::Local => "local",
            },
            "automated": automated,
            "compressed_message_ids": compressed_ids,
            "compressed_count": older_messages.len(),
            "retain_recent": options.retain_recent,
            "trigger_tokens": options.trigger_tokens,
            "estimated_tokens_before": total_tokens,
            "model": model_used,
            "estimated_tokens_after": updated_tokens,
            "timestamp": timestamp.to_rfc3339(),
        });
        summary_message
            .metadata
            .insert(COMPRESSION_METADATA_KEY.to_string(), metadata);

        // New transcript = summary message followed by the retained tail.
        let mut new_messages =
            Vec::with_capacity(snapshot.messages.len() - older_messages.len() + 1);
        new_messages.push(summary_message.clone());
        new_messages.extend_from_slice(&snapshot.messages[split_index..]);
        self.conversation.replace_active_messages(new_messages);
        let report = CompressionReport {
            summary_message_id: summary_message.id,
            compressed_messages: older_messages.len(),
            estimated_tokens_before: total_tokens,
            estimated_tokens_after: updated_tokens,
            strategy: options.strategy,
            model_used: model_used.clone(),
            retained_recent: options.retain_recent,
            automated,
            timestamp,
        };

        self.last_compression = Some(report.clone());
        if automated {
            info!(
                "auto compression reduced transcript from {} to {} tokens (compressed {} messages)",
                total_tokens, updated_tokens, report.compressed_messages
            );
        }
        self.emit_compression_event(report.clone());
        final_report = Some(report.clone());

        // Stop if the pass made no progress, or if we are now under the
        // trigger threshold.
        if updated_tokens >= total_tokens {
            break;
        }
        if updated_tokens <= options.trigger_tokens {
            break;
        }

        // Continue loop to attempt further reduction if needed.
    }

    Ok(final_report)
}
|
||||
|
||||
async fn generate_summary(
|
||||
&self,
|
||||
slice: &[Message],
|
||||
options: &CompressionOptions,
|
||||
model: &str,
|
||||
) -> String {
|
||||
match options.strategy {
|
||||
CompressionStrategy::Provider => {
|
||||
match self.generate_provider_summary(slice, model).await {
|
||||
Ok(content) if !content.trim().is_empty() => content,
|
||||
Ok(_) => local_summary(slice),
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"Falling back to local compression: provider summary failed ({})",
|
||||
err
|
||||
);
|
||||
local_summary(slice)
|
||||
}
|
||||
}
|
||||
}
|
||||
CompressionStrategy::Local => local_summary(slice),
|
||||
}
|
||||
}
|
||||
|
||||
async fn generate_provider_summary(&self, slice: &[Message], model: &str) -> Result<String> {
|
||||
let mut prompt_messages = Vec::new();
|
||||
prompt_messages.push(Message::system("You are Owlen's transcript compactor. Summarize the provided conversation excerpt into concise markdown with sections for context, decisions, outstanding tasks, and facts that must be preserved. Avoid referring to removed content explicitly.".to_string()));
|
||||
let transcript = build_transcript(slice);
|
||||
prompt_messages.push(Message::user(transcript));
|
||||
|
||||
let request = ChatRequest {
|
||||
model: model.to_string(),
|
||||
messages: prompt_messages,
|
||||
parameters: ChatParameters::default(),
|
||||
tools: None,
|
||||
};
|
||||
|
||||
let response = self.provider.send_prompt(request).await?;
|
||||
Ok(response.message.content)
|
||||
}
|
||||
|
||||
fn emit_compression_event(&self, report: CompressionReport) {
|
||||
if let Some(tx) = &self.event_tx {
|
||||
let _ = tx.send(ControllerEvent::CompressionCompleted { report });
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn reload_mcp_clients(&mut self) -> Result<()> {
|
||||
let (primary, named, missing) = Self::create_mcp_clients(
|
||||
self.config.clone(),
|
||||
@@ -1518,6 +1881,7 @@ impl SessionController {
|
||||
let streaming = { self.config.lock().await.general.enable_streaming || parameters.stream };
|
||||
parameters.stream = streaming;
|
||||
self.conversation.push_user_message(content);
|
||||
let _ = self.maybe_auto_compress().await?;
|
||||
self.send_request_with_current_conversation(parameters)
|
||||
.await
|
||||
}
|
||||
@@ -1588,6 +1952,7 @@ impl SessionController {
|
||||
let _ = self.record_usage_sample(usage).await;
|
||||
}
|
||||
self.conversation.push_message(response.message.clone());
|
||||
let _ = self.maybe_auto_compress().await?;
|
||||
return Ok(SessionOutcome::Complete(response));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user