use crate::types::{Conversation, Message}; use crate::Result; use serde_json::{Number, Value}; use std::collections::{HashMap, VecDeque}; use std::time::{Duration, Instant}; use uuid::Uuid; const STREAMING_FLAG: &str = "streaming"; const LAST_CHUNK_TS: &str = "last_chunk_ts"; const PLACEHOLDER_FLAG: &str = "placeholder"; /// Manage active and historical conversations, including streaming updates. pub struct ConversationManager { active: Conversation, history: VecDeque, message_index: HashMap, streaming: HashMap, max_history: usize, } #[derive(Debug, Clone)] pub struct StreamingMetadata { started: Instant, last_update: Instant, } impl ConversationManager { /// Create a new conversation manager with a default model pub fn new(model: impl Into) -> Self { Self::with_history_capacity(model, 32) } /// Create with explicit history capacity pub fn with_history_capacity(model: impl Into, max_history: usize) -> Self { let conversation = Conversation::new(model.into()); Self { active: conversation, history: VecDeque::new(), message_index: HashMap::new(), streaming: HashMap::new(), max_history: max_history.max(1), } } /// Access the active conversation pub fn active(&self) -> &Conversation { &self.active } /// Mutable access to the active conversation (auto refreshing indexes afterwards) fn active_mut(&mut self) -> &mut Conversation { &mut self.active } /// Replace the active conversation with a provided one, archiving the existing conversation if it contains data pub fn load(&mut self, conversation: Conversation) { if !self.active.messages.is_empty() { self.archive_active(); } self.message_index.clear(); for (idx, message) in conversation.messages.iter().enumerate() { self.message_index.insert(message.id, idx); } self.stream_reset(); self.active = conversation; } /// Start a brand new conversation, archiving the previous one pub fn start_new(&mut self, model: Option, name: Option) { self.archive_active(); let model = model.unwrap_or_else(|| self.active.model.clone()); self.active = Conversation::new(model); self.active.name = name; self.message_index.clear(); self.stream_reset(); } /// Archive the active conversation into history pub fn archive_active(&mut self) { if self.active.messages.is_empty() { return; } let mut archived = self.active.clone(); archived.updated_at = std::time::SystemTime::now(); self.history.push_front(archived); while self.history.len() > self.max_history { self.history.pop_back(); } } /// Get immutable history pub fn history(&self) -> impl Iterator { self.history.iter() } /// Add a user message and return its identifier pub fn push_user_message(&mut self, content: impl Into) -> Uuid { let message = Message::user(content.into()); self.register_message(message) } /// Add a system message and return its identifier pub fn push_system_message(&mut self, content: impl Into) -> Uuid { let message = Message::system(content.into()); self.register_message(message) } /// Add an assistant message (non-streaming) and return its identifier pub fn push_assistant_message(&mut self, content: impl Into) -> Uuid { let message = Message::assistant(content.into()); self.register_message(message) } /// Push an arbitrary message into the active conversation pub fn push_message(&mut self, message: Message) -> Uuid { self.register_message(message) } /// Start tracking a streaming assistant response, returning the message id to update pub fn start_streaming_response(&mut self) -> Uuid { let mut message = Message::assistant(String::new()); message .metadata .insert(STREAMING_FLAG.to_string(), Value::Bool(true)); let id = message.id; self.register_message(message); self.streaming.insert( id, StreamingMetadata { started: Instant::now(), last_update: Instant::now(), }, ); id } /// Append streaming content to an assistant message pub fn append_stream_chunk( &mut self, message_id: Uuid, chunk: &str, is_final: bool, ) -> Result<()> { let index = self .message_index .get(&message_id) .copied() .ok_or_else(|| crate::Error::Unknown(format!("Unknown message id: {message_id}")))?; let conversation = self.active_mut(); if let Some(message) = conversation.messages.get_mut(index) { let was_placeholder = message .metadata .remove(PLACEHOLDER_FLAG) .and_then(|v| v.as_bool()) .unwrap_or(false); if was_placeholder { message.content.clear(); } if !chunk.is_empty() { message.content.push_str(chunk); } message.timestamp = std::time::SystemTime::now(); let millis = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64; message.metadata.insert( LAST_CHUNK_TS.to_string(), Value::Number(Number::from(millis)), ); if is_final { message .metadata .insert(STREAMING_FLAG.to_string(), Value::Bool(false)); self.streaming.remove(&message_id); } else if let Some(info) = self.streaming.get_mut(&message_id) { info.last_update = Instant::now(); } } Ok(()) } /// Set placeholder text for a streaming message pub fn set_stream_placeholder( &mut self, message_id: Uuid, text: impl Into, ) -> Result<()> { let index = self .message_index .get(&message_id) .copied() .ok_or_else(|| crate::Error::Unknown(format!("Unknown message id: {message_id}")))?; if let Some(message) = self.active_mut().messages.get_mut(index) { message.content = text.into(); message.timestamp = std::time::SystemTime::now(); message .metadata .insert(PLACEHOLDER_FLAG.to_string(), Value::Bool(true)); } Ok(()) } /// Update the active model (used when user changes model mid session) pub fn set_model(&mut self, model: impl Into) { self.active.model = model.into(); self.active.updated_at = std::time::SystemTime::now(); } /// Provide read access to the cached streaming metadata pub fn streaming_metadata(&self, message_id: &Uuid) -> Option { self.streaming.get(message_id).cloned() } /// Remove inactive streaming messages that have stalled beyond the provided timeout pub fn expire_stalled_streams(&mut self, idle_timeout: Duration) -> Vec { let cutoff = Instant::now() - idle_timeout; let mut expired = Vec::new(); self.streaming.retain(|id, meta| { if meta.last_update < cutoff { expired.push(*id); false } else { true } }); expired } /// Clear all state pub fn clear(&mut self) { self.active.clear(); self.history.clear(); self.message_index.clear(); self.streaming.clear(); } fn register_message(&mut self, message: Message) -> Uuid { let id = message.id; let idx; { let conversation = self.active_mut(); idx = conversation.messages.len(); conversation.messages.push(message); conversation.updated_at = std::time::SystemTime::now(); } self.message_index.insert(id, idx); id } fn stream_reset(&mut self) { self.streaming.clear(); } } impl StreamingMetadata { /// Duration since the stream started pub fn elapsed(&self) -> Duration { self.started.elapsed() } /// Duration since the last chunk was received pub fn idle_duration(&self) -> Duration { self.last_update.elapsed() } /// Timestamp when streaming started pub fn started_at(&self) -> Instant { self.started } /// Timestamp of most recent update pub fn last_update_at(&self) -> Instant { self.last_update } }