Add App core struct with event-handling and initialization logic for TUI.
This commit is contained in:
289
crates/owlen-core/src/conversation.rs
Normal file
289
crates/owlen-core/src/conversation.rs
Normal file
@@ -0,0 +1,289 @@
|
||||
use crate::types::{Conversation, Message};
|
||||
use crate::Result;
|
||||
use serde_json::{Number, Value};
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::time::{Duration, Instant};
|
||||
use uuid::Uuid;
|
||||
|
||||
const STREAMING_FLAG: &str = "streaming";
|
||||
const LAST_CHUNK_TS: &str = "last_chunk_ts";
|
||||
const PLACEHOLDER_FLAG: &str = "placeholder";
|
||||
|
||||
/// Manage active and historical conversations, including streaming updates.
|
||||
pub struct ConversationManager {
|
||||
active: Conversation,
|
||||
history: VecDeque<Conversation>,
|
||||
message_index: HashMap<Uuid, usize>,
|
||||
streaming: HashMap<Uuid, StreamingMetadata>,
|
||||
max_history: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct StreamingMetadata {
|
||||
started: Instant,
|
||||
last_update: Instant,
|
||||
}
|
||||
|
||||
impl ConversationManager {
|
||||
/// Create a new conversation manager with a default model
|
||||
pub fn new(model: impl Into<String>) -> Self {
|
||||
Self::with_history_capacity(model, 32)
|
||||
}
|
||||
|
||||
/// Create with explicit history capacity
|
||||
pub fn with_history_capacity(model: impl Into<String>, max_history: usize) -> Self {
|
||||
let conversation = Conversation::new(model.into());
|
||||
Self {
|
||||
active: conversation,
|
||||
history: VecDeque::new(),
|
||||
message_index: HashMap::new(),
|
||||
streaming: HashMap::new(),
|
||||
max_history: max_history.max(1),
|
||||
}
|
||||
}
|
||||
|
||||
/// Access the active conversation
|
||||
pub fn active(&self) -> &Conversation {
|
||||
&self.active
|
||||
}
|
||||
|
||||
/// Mutable access to the active conversation (auto refreshing indexes afterwards)
|
||||
fn active_mut(&mut self) -> &mut Conversation {
|
||||
&mut self.active
|
||||
}
|
||||
|
||||
/// Replace the active conversation with a provided one, archiving the existing conversation if it contains data
|
||||
pub fn load(&mut self, conversation: Conversation) {
|
||||
if !self.active.messages.is_empty() {
|
||||
self.archive_active();
|
||||
}
|
||||
|
||||
self.message_index.clear();
|
||||
for (idx, message) in conversation.messages.iter().enumerate() {
|
||||
self.message_index.insert(message.id, idx);
|
||||
}
|
||||
|
||||
self.stream_reset();
|
||||
self.active = conversation;
|
||||
}
|
||||
|
||||
/// Start a brand new conversation, archiving the previous one
|
||||
pub fn start_new(&mut self, model: Option<String>, name: Option<String>) {
|
||||
self.archive_active();
|
||||
let model = model.unwrap_or_else(|| self.active.model.clone());
|
||||
self.active = Conversation::new(model);
|
||||
self.active.name = name;
|
||||
self.message_index.clear();
|
||||
self.stream_reset();
|
||||
}
|
||||
|
||||
/// Archive the active conversation into history
|
||||
pub fn archive_active(&mut self) {
|
||||
if self.active.messages.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut archived = self.active.clone();
|
||||
archived.updated_at = std::time::SystemTime::now();
|
||||
self.history.push_front(archived);
|
||||
|
||||
while self.history.len() > self.max_history {
|
||||
self.history.pop_back();
|
||||
}
|
||||
}
|
||||
|
||||
/// Get immutable history
|
||||
pub fn history(&self) -> impl Iterator<Item = &Conversation> {
|
||||
self.history.iter()
|
||||
}
|
||||
|
||||
/// Add a user message and return its identifier
|
||||
pub fn push_user_message(&mut self, content: impl Into<String>) -> Uuid {
|
||||
let message = Message::user(content.into());
|
||||
self.register_message(message)
|
||||
}
|
||||
|
||||
/// Add a system message and return its identifier
|
||||
pub fn push_system_message(&mut self, content: impl Into<String>) -> Uuid {
|
||||
let message = Message::system(content.into());
|
||||
self.register_message(message)
|
||||
}
|
||||
|
||||
/// Add an assistant message (non-streaming) and return its identifier
|
||||
pub fn push_assistant_message(&mut self, content: impl Into<String>) -> Uuid {
|
||||
let message = Message::assistant(content.into());
|
||||
self.register_message(message)
|
||||
}
|
||||
|
||||
/// Push an arbitrary message into the active conversation
|
||||
pub fn push_message(&mut self, message: Message) -> Uuid {
|
||||
self.register_message(message)
|
||||
}
|
||||
|
||||
/// Start tracking a streaming assistant response, returning the message id to update
|
||||
pub fn start_streaming_response(&mut self) -> Uuid {
|
||||
let mut message = Message::assistant(String::new());
|
||||
message
|
||||
.metadata
|
||||
.insert(STREAMING_FLAG.to_string(), Value::Bool(true));
|
||||
let id = message.id;
|
||||
self.register_message(message);
|
||||
self.streaming.insert(
|
||||
id,
|
||||
StreamingMetadata {
|
||||
started: Instant::now(),
|
||||
last_update: Instant::now(),
|
||||
},
|
||||
);
|
||||
id
|
||||
}
|
||||
|
||||
/// Append streaming content to an assistant message
|
||||
pub fn append_stream_chunk(
|
||||
&mut self,
|
||||
message_id: Uuid,
|
||||
chunk: &str,
|
||||
is_final: bool,
|
||||
) -> Result<()> {
|
||||
let index = self
|
||||
.message_index
|
||||
.get(&message_id)
|
||||
.copied()
|
||||
.ok_or_else(|| crate::Error::Unknown(format!("Unknown message id: {message_id}")))?;
|
||||
|
||||
let conversation = self.active_mut();
|
||||
if let Some(message) = conversation.messages.get_mut(index) {
|
||||
let was_placeholder = message
|
||||
.metadata
|
||||
.remove(PLACEHOLDER_FLAG)
|
||||
.and_then(|v| v.as_bool())
|
||||
.unwrap_or(false);
|
||||
|
||||
if was_placeholder {
|
||||
message.content.clear();
|
||||
}
|
||||
|
||||
if !chunk.is_empty() {
|
||||
message.content.push_str(chunk);
|
||||
}
|
||||
message.timestamp = std::time::SystemTime::now();
|
||||
let millis = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_millis() as u64;
|
||||
message.metadata.insert(
|
||||
LAST_CHUNK_TS.to_string(),
|
||||
Value::Number(Number::from(millis)),
|
||||
);
|
||||
|
||||
if is_final {
|
||||
message
|
||||
.metadata
|
||||
.insert(STREAMING_FLAG.to_string(), Value::Bool(false));
|
||||
self.streaming.remove(&message_id);
|
||||
} else if let Some(info) = self.streaming.get_mut(&message_id) {
|
||||
info.last_update = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set placeholder text for a streaming message
|
||||
pub fn set_stream_placeholder(
|
||||
&mut self,
|
||||
message_id: Uuid,
|
||||
text: impl Into<String>,
|
||||
) -> Result<()> {
|
||||
let index = self
|
||||
.message_index
|
||||
.get(&message_id)
|
||||
.copied()
|
||||
.ok_or_else(|| crate::Error::Unknown(format!("Unknown message id: {message_id}")))?;
|
||||
|
||||
if let Some(message) = self.active_mut().messages.get_mut(index) {
|
||||
message.content = text.into();
|
||||
message.timestamp = std::time::SystemTime::now();
|
||||
message
|
||||
.metadata
|
||||
.insert(PLACEHOLDER_FLAG.to_string(), Value::Bool(true));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update the active model (used when user changes model mid session)
|
||||
pub fn set_model(&mut self, model: impl Into<String>) {
|
||||
self.active.model = model.into();
|
||||
self.active.updated_at = std::time::SystemTime::now();
|
||||
}
|
||||
|
||||
/// Provide read access to the cached streaming metadata
|
||||
pub fn streaming_metadata(&self, message_id: &Uuid) -> Option<StreamingMetadata> {
|
||||
self.streaming.get(message_id).cloned()
|
||||
}
|
||||
|
||||
/// Remove inactive streaming messages that have stalled beyond the provided timeout
|
||||
pub fn expire_stalled_streams(&mut self, idle_timeout: Duration) -> Vec<Uuid> {
|
||||
let cutoff = Instant::now() - idle_timeout;
|
||||
let mut expired = Vec::new();
|
||||
|
||||
self.streaming.retain(|id, meta| {
|
||||
if meta.last_update < cutoff {
|
||||
expired.push(*id);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
|
||||
expired
|
||||
}
|
||||
|
||||
/// Clear all state
|
||||
pub fn clear(&mut self) {
|
||||
self.active.clear();
|
||||
self.history.clear();
|
||||
self.message_index.clear();
|
||||
self.streaming.clear();
|
||||
}
|
||||
|
||||
fn register_message(&mut self, message: Message) -> Uuid {
|
||||
let id = message.id;
|
||||
let idx;
|
||||
{
|
||||
let conversation = self.active_mut();
|
||||
idx = conversation.messages.len();
|
||||
conversation.messages.push(message);
|
||||
conversation.updated_at = std::time::SystemTime::now();
|
||||
}
|
||||
self.message_index.insert(id, idx);
|
||||
id
|
||||
}
|
||||
|
||||
fn stream_reset(&mut self) {
|
||||
self.streaming.clear();
|
||||
}
|
||||
}
|
||||
|
||||
impl StreamingMetadata {
|
||||
/// Duration since the stream started
|
||||
pub fn elapsed(&self) -> Duration {
|
||||
self.started.elapsed()
|
||||
}
|
||||
|
||||
/// Duration since the last chunk was received
|
||||
pub fn idle_duration(&self) -> Duration {
|
||||
self.last_update.elapsed()
|
||||
}
|
||||
|
||||
/// Timestamp when streaming started
|
||||
pub fn started_at(&self) -> Instant {
|
||||
self.started
|
||||
}
|
||||
|
||||
/// Timestamp of most recent update
|
||||
pub fn last_update_at(&self) -> Instant {
|
||||
self.last_update
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user