From e0b14a42f2afd4803f9d2a3704e95c5447732674 Mon Sep 17 00:00:00 2001
From: vikingowl
Date: Thu, 23 Oct 2025 19:40:53 +0200
Subject: [PATCH] fix(provider/ollama): keep stream whitespace intact

Acceptance Criteria:
- streaming chunks retain leading whitespace and indentation
- end-of-stream metadata is still propagated
- malformed frames emit defensive logging without crashing

Test Notes:
- cargo test -p owlen-providers
---
 crates/owlen-providers/src/ollama/shared.rs | 148 +++++++++++++++++---
 samples.json                                |   1 +
 2 files changed, 130 insertions(+), 19 deletions(-)
 create mode 100644 samples.json

diff --git a/crates/owlen-providers/src/ollama/shared.rs b/crates/owlen-providers/src/ollama/shared.rs
index 9aaceec..37c4cb2 100644
--- a/crates/owlen-providers/src/ollama/shared.rs
+++ b/crates/owlen-providers/src/ollama/shared.rs
@@ -2,6 +2,7 @@ use std::collections::HashMap;
 use std::time::Duration;
 
 use futures::StreamExt;
+use log::warn;
 use owlen_core::provider::{
     GenerateChunk, GenerateRequest, GenerateStream, ModelInfo, ProviderMetadata, ProviderStatus,
 };
@@ -132,21 +133,18 @@ impl OllamaClient {
                         buffer.extend_from_slice(&bytes);
                         while let Some(pos) = buffer.iter().position(|byte| *byte == b'\n') {
                             let line_bytes: Vec<u8> = buffer.drain(..=pos).collect();
-                            let line = String::from_utf8_lossy(&line_bytes).trim().to_string();
-                            if line.is_empty() {
-                                continue;
-                            }
-
-                            match parse_stream_line(&line) {
-                                Ok(item) => {
-                                    if tx.send(Ok(item)).await.is_err() {
+                            if let Some(line) = prepare_stream_line(&line_bytes) {
+                                match parse_stream_line(&line) {
+                                    Ok(item) => {
+                                        if tx.send(Ok(item)).await.is_err() {
+                                            return;
+                                        }
+                                    }
+                                    Err(err) => {
+                                        let _ = tx.send(Err(err)).await;
                                         return;
                                     }
                                 }
-                                Err(err) => {
-                                    let _ = tx.send(Err(err)).await;
-                                    return;
-                                }
                             }
                         }
                     }
@@ -158,8 +156,8 @@ impl OllamaClient {
             }
 
             if !buffer.is_empty() {
-                let line = String::from_utf8_lossy(&buffer).trim().to_string();
-                if !line.is_empty() {
+                let line_bytes = std::mem::take(&mut buffer);
+                if let Some(line) = prepare_stream_line(&line_bytes) {
                     match parse_stream_line(&line) {
                         Ok(item) => {
                             let _ = tx.send(Ok(item)).await;
@@ -313,8 +311,51 @@ fn to_metadata_map(value: &Value) -> HashMap<String, Value> {
     metadata
 }
 
+fn prepare_stream_line(bytes: &[u8]) -> Option<String> {
+    if bytes.is_empty() {
+        return None;
+    }
+
+    let mut line = String::from_utf8_lossy(bytes).into_owned();
+
+    while line.ends_with('\n') || line.ends_with('\r') {
+        line.pop();
+    }
+
+    if line.trim().is_empty() {
+        return None;
+    }
+
+    Some(line)
+}
+
+fn log_stream_decode_error(line: &str, err: &serde_json::Error) {
+    const MAX_PREVIEW_CHARS: usize = 256;
+
+    let total_chars = line.chars().count();
+    let truncated = total_chars > MAX_PREVIEW_CHARS;
+    let mut preview: String = line.chars().take(MAX_PREVIEW_CHARS).collect();
+
+    if truncated {
+        preview.push_str("...");
+    }
+
+    let preview = preview
+        .replace('\n', "\\n")
+        .replace('\r', "\\r")
+        .replace('\t', "\\t");
+
+    warn!(
+        "Failed to parse Ollama stream chunk ({} chars): {}. Preview: \"{}\"",
+        total_chars, err, preview
+    );
+}
+
 fn parse_stream_line(line: &str) -> CoreResult<GenerateChunk> {
-    let value: Value = serde_json::from_str(line).map_err(CoreError::Serialization)?;
+    let value: Value = serde_json::from_str(line).map_err(|err| {
+        log_stream_decode_error(line, &err);
+        CoreError::Serialization(err)
+    })?;
 
     if let Some(error) = value.get("error").and_then(Value::as_str) {
         return Err(CoreError::Provider(anyhow::anyhow!(
@@ -332,10 +373,18 @@
         metadata: to_metadata_map(&value),
     };
-    if chunk.is_final && chunk.text.is_none() && chunk.metadata.is_empty() {
-        chunk
-            .metadata
-            .insert("status".into(), Value::String("done".into()));
+    if chunk.is_final {
+        if let Some(Value::Object(done_obj)) = value.get("done") {
+            for (key, item) in done_obj {
+                chunk.metadata.insert(key.clone(), item.clone());
+            }
+        }
+
+        if chunk.text.is_none() && chunk.metadata.is_empty() {
+            chunk
+                .metadata
+                .insert("status".into(), Value::String("done".into()));
+        }
     }
 
     Ok(chunk)
 }
@@ -387,3 +436,64 @@ fn map_reqwest_error(err: reqwest::Error) -> CoreError {
         CoreError::Provider(err.into())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn prepare_stream_line_preserves_leading_whitespace() {
+        let mut bytes = br#"{"response":" fn main() {}\n","done":false}"#.to_vec();
+        bytes.extend_from_slice(b"\r\n");
+
+        let line = prepare_stream_line(&bytes).expect("line should be parsed");
+        assert!(line.starts_with(r#"{"response""#));
+        assert!(line.ends_with(r#""done":false}"#));
+
+        let chunk = parse_stream_line(&line).expect("chunk should parse");
+        assert_eq!(
+            chunk.text.as_deref(),
+            Some(" fn main() {}\n"),
+            "leading indentation must be preserved"
+        );
+        assert!(!chunk.is_final);
+    }
+
+    #[test]
+    fn parse_stream_line_handles_samples_fixture() {
+        let data = include_str!("../../../../samples.json");
+        let values: Vec<Value> =
+            serde_json::from_str(data).expect("samples fixture should be valid json");
+
+        let mut chunks = Vec::new();
+        for value in values {
+            let line = serde_json::to_string(&value).expect("serialize chunk");
+            let chunk = parse_stream_line(&line).expect("parse chunk");
+            chunks.push(chunk);
+        }
+
+        assert!(
+            !chunks.is_empty(),
+            "fixture must produce at least one chunk"
+        );
+        assert_eq!(
+            chunks[0].text.as_deref(),
+            Some("first"),
+            "first chunk should match fixture payload"
+        );
+
+        let final_chunk = chunks.last().expect("final chunk must exist");
+        assert!(
+            final_chunk.is_final,
+            "last chunk should be marked final per fixture"
+        );
+        assert!(
+            final_chunk.text.as_deref().unwrap_or_default().is_empty(),
+            "final chunk should not include stray text"
+        );
+        assert!(
+            final_chunk.metadata.contains_key("final_data"),
+            "final chunk should surface metadata from fixture"
+        );
+    }
+}
diff --git a/samples.json b/samples.json
new file mode 100644
index 0000000..6cb700e
--- /dev/null
+++ b/samples.json
@@ -0,0 +1 @@
+[{"response": "first", "done": false}, {"response": "", "done": true, "final_data": {"prompt_eval_count": 2048, "eval_count": 512}}]