Adds consent management for tool execution, input validation, sandboxed process execution, and MCP server integration. Updates session management to support tool use, conversation persistence, and streaming responses. Major additions: - Database migrations for conversations and secure storage - Encryption and credential management infrastructure - Extensible tool system with code execution and web search - Consent management and validation systems - Sandboxed process execution - MCP server integration Infrastructure changes: - Module registration and workspace dependencies - ToolCall type and tool-related Message methods - Privacy, security, and tool configuration structures - Database-backed conversation persistence - Tool call tracking in conversations Provider and UI updates: - Ollama provider updates for tool support and new Role types - TUI chat and code app updates for async initialization - CLI updates for new SessionController API - Configuration documentation updates - CHANGELOG updates 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
347 lines
11 KiB
Rust
347 lines
11 KiB
Rust
use crate::storage::StorageManager;
|
|
use crate::types::{Conversation, Message};
|
|
use crate::Result;
|
|
use serde_json::{Number, Value};
|
|
use std::collections::{HashMap, VecDeque};
|
|
use std::time::{Duration, Instant};
|
|
use uuid::Uuid;
|
|
|
|
/// Metadata key holding a boolean that is `true` while a message is still being streamed.
const STREAMING_FLAG: &str = "streaming";

/// Metadata key holding the Unix time, in milliseconds, at which the latest chunk was appended.
const LAST_CHUNK_TS: &str = "last_chunk_ts";

/// Metadata key marking a message whose content is temporary placeholder text.
const PLACEHOLDER_FLAG: &str = "placeholder";
|
|
|
|
/// Manage active and historical conversations, including streaming updates.
|
|
pub struct ConversationManager {
|
|
active: Conversation,
|
|
history: VecDeque<Conversation>,
|
|
message_index: HashMap<Uuid, usize>,
|
|
streaming: HashMap<Uuid, StreamingMetadata>,
|
|
max_history: usize,
|
|
}
|
|
|
|
/// Timing information tracked for a single in-flight streaming response.
#[derive(Debug, Clone)]
pub struct StreamingMetadata {
    /// Moment the streaming response was registered.
    started: Instant,
    /// Moment the most recent non-final chunk was appended.
    last_update: Instant,
}
|
|
|
|
impl ConversationManager {
|
|
/// Create a new conversation manager with a default model
|
|
pub fn new(model: impl Into<String>) -> Self {
|
|
Self::with_history_capacity(model, 32)
|
|
}
|
|
|
|
/// Create with explicit history capacity
|
|
pub fn with_history_capacity(model: impl Into<String>, max_history: usize) -> Self {
|
|
let conversation = Conversation::new(model.into());
|
|
Self {
|
|
active: conversation,
|
|
history: VecDeque::new(),
|
|
message_index: HashMap::new(),
|
|
streaming: HashMap::new(),
|
|
max_history: max_history.max(1),
|
|
}
|
|
}
|
|
|
|
/// Access the active conversation
|
|
pub fn active(&self) -> &Conversation {
|
|
&self.active
|
|
}
|
|
|
|
/// Public mutable access to the active conversation
|
|
pub fn active_mut(&mut self) -> &mut Conversation {
|
|
&mut self.active
|
|
}
|
|
|
|
/// Replace the active conversation with a provided one, archiving the existing conversation if it contains data
|
|
pub fn load(&mut self, conversation: Conversation) {
|
|
if !self.active.messages.is_empty() {
|
|
self.archive_active();
|
|
}
|
|
|
|
self.message_index.clear();
|
|
for (idx, message) in conversation.messages.iter().enumerate() {
|
|
self.message_index.insert(message.id, idx);
|
|
}
|
|
|
|
self.stream_reset();
|
|
self.active = conversation;
|
|
}
|
|
|
|
/// Start a brand new conversation, archiving the previous one
|
|
pub fn start_new(&mut self, model: Option<String>, name: Option<String>) {
|
|
self.archive_active();
|
|
let model = model.unwrap_or_else(|| self.active.model.clone());
|
|
self.active = Conversation::new(model);
|
|
self.active.name = name;
|
|
self.message_index.clear();
|
|
self.stream_reset();
|
|
}
|
|
|
|
/// Archive the active conversation into history
|
|
pub fn archive_active(&mut self) {
|
|
if self.active.messages.is_empty() {
|
|
return;
|
|
}
|
|
|
|
let mut archived = self.active.clone();
|
|
archived.updated_at = std::time::SystemTime::now();
|
|
self.history.push_front(archived);
|
|
|
|
while self.history.len() > self.max_history {
|
|
self.history.pop_back();
|
|
}
|
|
}
|
|
|
|
/// Get immutable history
|
|
pub fn history(&self) -> impl Iterator<Item = &Conversation> {
|
|
self.history.iter()
|
|
}
|
|
|
|
/// Add a user message and return its identifier
|
|
pub fn push_user_message(&mut self, content: impl Into<String>) -> Uuid {
|
|
let message = Message::user(content.into());
|
|
self.register_message(message)
|
|
}
|
|
|
|
/// Add a system message and return its identifier
|
|
pub fn push_system_message(&mut self, content: impl Into<String>) -> Uuid {
|
|
let message = Message::system(content.into());
|
|
self.register_message(message)
|
|
}
|
|
|
|
/// Add an assistant message (non-streaming) and return its identifier
|
|
pub fn push_assistant_message(&mut self, content: impl Into<String>) -> Uuid {
|
|
let message = Message::assistant(content.into());
|
|
self.register_message(message)
|
|
}
|
|
|
|
/// Push an arbitrary message into the active conversation
|
|
pub fn push_message(&mut self, message: Message) -> Uuid {
|
|
self.register_message(message)
|
|
}
|
|
|
|
/// Start tracking a streaming assistant response, returning the message id to update
|
|
pub fn start_streaming_response(&mut self) -> Uuid {
|
|
let mut message = Message::assistant(String::new());
|
|
message
|
|
.metadata
|
|
.insert(STREAMING_FLAG.to_string(), Value::Bool(true));
|
|
let id = message.id;
|
|
self.register_message(message);
|
|
self.streaming.insert(
|
|
id,
|
|
StreamingMetadata {
|
|
started: Instant::now(),
|
|
last_update: Instant::now(),
|
|
},
|
|
);
|
|
id
|
|
}
|
|
|
|
/// Append streaming content to an assistant message
|
|
pub fn append_stream_chunk(
|
|
&mut self,
|
|
message_id: Uuid,
|
|
chunk: &str,
|
|
is_final: bool,
|
|
) -> Result<()> {
|
|
let index = self
|
|
.message_index
|
|
.get(&message_id)
|
|
.copied()
|
|
.ok_or_else(|| crate::Error::Unknown(format!("Unknown message id: {message_id}")))?;
|
|
|
|
let conversation = self.active_mut();
|
|
if let Some(message) = conversation.messages.get_mut(index) {
|
|
let was_placeholder = message
|
|
.metadata
|
|
.remove(PLACEHOLDER_FLAG)
|
|
.and_then(|v| v.as_bool())
|
|
.unwrap_or(false);
|
|
|
|
if was_placeholder {
|
|
message.content.clear();
|
|
}
|
|
|
|
if !chunk.is_empty() {
|
|
message.content.push_str(chunk);
|
|
}
|
|
message.timestamp = std::time::SystemTime::now();
|
|
let millis = std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.unwrap_or_default()
|
|
.as_millis() as u64;
|
|
message.metadata.insert(
|
|
LAST_CHUNK_TS.to_string(),
|
|
Value::Number(Number::from(millis)),
|
|
);
|
|
|
|
if is_final {
|
|
message
|
|
.metadata
|
|
.insert(STREAMING_FLAG.to_string(), Value::Bool(false));
|
|
self.streaming.remove(&message_id);
|
|
} else if let Some(info) = self.streaming.get_mut(&message_id) {
|
|
info.last_update = Instant::now();
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Set placeholder text for a streaming message
|
|
pub fn set_stream_placeholder(
|
|
&mut self,
|
|
message_id: Uuid,
|
|
text: impl Into<String>,
|
|
) -> Result<()> {
|
|
let index = self
|
|
.message_index
|
|
.get(&message_id)
|
|
.copied()
|
|
.ok_or_else(|| crate::Error::Unknown(format!("Unknown message id: {message_id}")))?;
|
|
|
|
if let Some(message) = self.active_mut().messages.get_mut(index) {
|
|
message.content = text.into();
|
|
message.timestamp = std::time::SystemTime::now();
|
|
message
|
|
.metadata
|
|
.insert(PLACEHOLDER_FLAG.to_string(), Value::Bool(true));
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Set tool calls on a streaming message
|
|
pub fn set_tool_calls_on_message(
|
|
&mut self,
|
|
message_id: Uuid,
|
|
tool_calls: Vec<crate::types::ToolCall>,
|
|
) -> Result<()> {
|
|
let index = self
|
|
.message_index
|
|
.get(&message_id)
|
|
.copied()
|
|
.ok_or_else(|| crate::Error::Unknown(format!("Unknown message id: {message_id}")))?;
|
|
|
|
if let Some(message) = self.active_mut().messages.get_mut(index) {
|
|
message.tool_calls = Some(tool_calls);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Update the active model (used when user changes model mid session)
|
|
pub fn set_model(&mut self, model: impl Into<String>) {
|
|
self.active.model = model.into();
|
|
self.active.updated_at = std::time::SystemTime::now();
|
|
}
|
|
|
|
/// Provide read access to the cached streaming metadata
|
|
pub fn streaming_metadata(&self, message_id: &Uuid) -> Option<StreamingMetadata> {
|
|
self.streaming.get(message_id).cloned()
|
|
}
|
|
|
|
/// Remove inactive streaming messages that have stalled beyond the provided timeout
|
|
pub fn expire_stalled_streams(&mut self, idle_timeout: Duration) -> Vec<Uuid> {
|
|
let cutoff = Instant::now() - idle_timeout;
|
|
let mut expired = Vec::new();
|
|
|
|
self.streaming.retain(|id, meta| {
|
|
if meta.last_update < cutoff {
|
|
expired.push(*id);
|
|
false
|
|
} else {
|
|
true
|
|
}
|
|
});
|
|
|
|
expired
|
|
}
|
|
|
|
/// Clear all state
|
|
pub fn clear(&mut self) {
|
|
self.active.clear();
|
|
self.history.clear();
|
|
self.message_index.clear();
|
|
self.streaming.clear();
|
|
}
|
|
|
|
fn register_message(&mut self, message: Message) -> Uuid {
|
|
let id = message.id;
|
|
let idx;
|
|
{
|
|
let conversation = self.active_mut();
|
|
idx = conversation.messages.len();
|
|
conversation.messages.push(message);
|
|
conversation.updated_at = std::time::SystemTime::now();
|
|
}
|
|
self.message_index.insert(id, idx);
|
|
id
|
|
}
|
|
|
|
fn stream_reset(&mut self) {
|
|
self.streaming.clear();
|
|
}
|
|
|
|
/// Save the active conversation to disk
|
|
pub async fn save_active(
|
|
&self,
|
|
storage: &StorageManager,
|
|
name: Option<String>,
|
|
) -> Result<Uuid> {
|
|
storage.save_conversation(&self.active, name).await?;
|
|
Ok(self.active.id)
|
|
}
|
|
|
|
/// Save the active conversation to disk with a description
|
|
pub async fn save_active_with_description(
|
|
&self,
|
|
storage: &StorageManager,
|
|
name: Option<String>,
|
|
description: Option<String>,
|
|
) -> Result<Uuid> {
|
|
storage
|
|
.save_conversation_with_description(&self.active, name, description)
|
|
.await?;
|
|
Ok(self.active.id)
|
|
}
|
|
|
|
/// Load a conversation from storage and make it active
|
|
pub async fn load_saved(&mut self, storage: &StorageManager, id: Uuid) -> Result<()> {
|
|
let conversation = storage.load_conversation(id).await?;
|
|
self.load(conversation);
|
|
Ok(())
|
|
}
|
|
|
|
/// List all saved sessions
|
|
pub async fn list_saved_sessions(
|
|
storage: &StorageManager,
|
|
) -> Result<Vec<crate::storage::SessionMeta>> {
|
|
storage.list_sessions().await
|
|
}
|
|
}
|
|
|
|
impl StreamingMetadata {
|
|
/// Duration since the stream started
|
|
pub fn elapsed(&self) -> Duration {
|
|
self.started.elapsed()
|
|
}
|
|
|
|
/// Duration since the last chunk was received
|
|
pub fn idle_duration(&self) -> Duration {
|
|
self.last_update.elapsed()
|
|
}
|
|
|
|
/// Timestamp when streaming started
|
|
pub fn started_at(&self) -> Instant {
|
|
self.started
|
|
}
|
|
|
|
/// Timestamp of most recent update
|
|
pub fn last_update_at(&self) -> Instant {
|
|
self.last_update
|
|
}
|
|
}
|