fix(provider/ollama): keep stream whitespace intact
Acceptance Criteria:\n- streaming chunks retain leading whitespace and indentation\n- end-of-stream metadata is still propagated\n- malformed frames emit defensive logging without crashing Test Notes:\n- cargo test -p owlen-providers
This commit is contained in:
@@ -2,6 +2,7 @@ use std::collections::HashMap;
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
|
use log::warn;
|
||||||
use owlen_core::provider::{
|
use owlen_core::provider::{
|
||||||
GenerateChunk, GenerateRequest, GenerateStream, ModelInfo, ProviderMetadata, ProviderStatus,
|
GenerateChunk, GenerateRequest, GenerateStream, ModelInfo, ProviderMetadata, ProviderStatus,
|
||||||
};
|
};
|
||||||
@@ -132,21 +133,18 @@ impl OllamaClient {
|
|||||||
buffer.extend_from_slice(&bytes);
|
buffer.extend_from_slice(&bytes);
|
||||||
while let Some(pos) = buffer.iter().position(|byte| *byte == b'\n') {
|
while let Some(pos) = buffer.iter().position(|byte| *byte == b'\n') {
|
||||||
let line_bytes: Vec<u8> = buffer.drain(..=pos).collect();
|
let line_bytes: Vec<u8> = buffer.drain(..=pos).collect();
|
||||||
let line = String::from_utf8_lossy(&line_bytes).trim().to_string();
|
if let Some(line) = prepare_stream_line(&line_bytes) {
|
||||||
if line.is_empty() {
|
match parse_stream_line(&line) {
|
||||||
continue;
|
Ok(item) => {
|
||||||
}
|
if tx.send(Ok(item)).await.is_err() {
|
||||||
|
return;
|
||||||
match parse_stream_line(&line) {
|
}
|
||||||
Ok(item) => {
|
}
|
||||||
if tx.send(Ok(item)).await.is_err() {
|
Err(err) => {
|
||||||
|
let _ = tx.send(Err(err)).await;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(err) => {
|
|
||||||
let _ = tx.send(Err(err)).await;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -158,8 +156,8 @@ impl OllamaClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !buffer.is_empty() {
|
if !buffer.is_empty() {
|
||||||
let line = String::from_utf8_lossy(&buffer).trim().to_string();
|
let line_bytes = std::mem::take(&mut buffer);
|
||||||
if !line.is_empty() {
|
if let Some(line) = prepare_stream_line(&line_bytes) {
|
||||||
match parse_stream_line(&line) {
|
match parse_stream_line(&line) {
|
||||||
Ok(item) => {
|
Ok(item) => {
|
||||||
let _ = tx.send(Ok(item)).await;
|
let _ = tx.send(Ok(item)).await;
|
||||||
@@ -313,8 +311,51 @@ fn to_metadata_map(value: &Value) -> HashMap<String, Value> {
|
|||||||
metadata
|
metadata
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn prepare_stream_line(bytes: &[u8]) -> Option<String> {
|
||||||
|
if bytes.is_empty() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut line = String::from_utf8_lossy(bytes).into_owned();
|
||||||
|
|
||||||
|
while line.ends_with('\n') || line.ends_with('\r') {
|
||||||
|
line.pop();
|
||||||
|
}
|
||||||
|
|
||||||
|
if line.trim().is_empty() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
Some(line)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn log_stream_decode_error(line: &str, err: &serde_json::Error) {
|
||||||
|
const MAX_PREVIEW_CHARS: usize = 256;
|
||||||
|
|
||||||
|
let total_chars = line.chars().count();
|
||||||
|
let truncated = total_chars > MAX_PREVIEW_CHARS;
|
||||||
|
let mut preview: String = line.chars().take(MAX_PREVIEW_CHARS).collect();
|
||||||
|
|
||||||
|
if truncated {
|
||||||
|
preview.push_str("...");
|
||||||
|
}
|
||||||
|
|
||||||
|
let preview = preview
|
||||||
|
.replace('\n', "\\n")
|
||||||
|
.replace('\r', "\\r")
|
||||||
|
.replace('\t', "\\t");
|
||||||
|
|
||||||
|
warn!(
|
||||||
|
"Failed to parse Ollama stream chunk ({} chars): {}. Preview: \"{}\"",
|
||||||
|
total_chars, err, preview
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
fn parse_stream_line(line: &str) -> CoreResult<GenerateChunk> {
|
fn parse_stream_line(line: &str) -> CoreResult<GenerateChunk> {
|
||||||
let value: Value = serde_json::from_str(line).map_err(CoreError::Serialization)?;
|
let value: Value = serde_json::from_str(line).map_err(|err| {
|
||||||
|
log_stream_decode_error(line, &err);
|
||||||
|
CoreError::Serialization(err)
|
||||||
|
})?;
|
||||||
|
|
||||||
if let Some(error) = value.get("error").and_then(Value::as_str) {
|
if let Some(error) = value.get("error").and_then(Value::as_str) {
|
||||||
return Err(CoreError::Provider(anyhow::anyhow!(
|
return Err(CoreError::Provider(anyhow::anyhow!(
|
||||||
@@ -332,10 +373,18 @@ fn parse_stream_line(line: &str) -> CoreResult<GenerateChunk> {
|
|||||||
metadata: to_metadata_map(&value),
|
metadata: to_metadata_map(&value),
|
||||||
};
|
};
|
||||||
|
|
||||||
if chunk.is_final && chunk.text.is_none() && chunk.metadata.is_empty() {
|
if chunk.is_final {
|
||||||
chunk
|
if let Some(Value::Object(done_obj)) = value.get("done") {
|
||||||
.metadata
|
for (key, item) in done_obj {
|
||||||
.insert("status".into(), Value::String("done".into()));
|
chunk.metadata.insert(key.clone(), item.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if chunk.text.is_none() && chunk.metadata.is_empty() {
|
||||||
|
chunk
|
||||||
|
.metadata
|
||||||
|
.insert("status".into(), Value::String("done".into()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(chunk)
|
Ok(chunk)
|
||||||
@@ -387,3 +436,64 @@ fn map_reqwest_error(err: reqwest::Error) -> CoreError {
|
|||||||
CoreError::Provider(err.into())
|
CoreError::Provider(err.into())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn prepare_stream_line_preserves_leading_whitespace() {
|
||||||
|
let mut bytes = br#"{"response":" fn main() {}\n","done":false}"#.to_vec();
|
||||||
|
bytes.extend_from_slice(b"\r\n");
|
||||||
|
|
||||||
|
let line = prepare_stream_line(&bytes).expect("line should be parsed");
|
||||||
|
assert!(line.starts_with(r#"{"response""#));
|
||||||
|
assert!(line.ends_with(r#""done":false}"#));
|
||||||
|
|
||||||
|
let chunk = parse_stream_line(&line).expect("chunk should parse");
|
||||||
|
assert_eq!(
|
||||||
|
chunk.text.as_deref(),
|
||||||
|
Some(" fn main() {}\n"),
|
||||||
|
"leading indentation must be preserved"
|
||||||
|
);
|
||||||
|
assert!(!chunk.is_final);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_stream_line_handles_samples_fixture() {
|
||||||
|
let data = include_str!("../../../../samples.json");
|
||||||
|
let values: Vec<Value> =
|
||||||
|
serde_json::from_str(data).expect("samples fixture should be valid json");
|
||||||
|
|
||||||
|
let mut chunks = Vec::new();
|
||||||
|
for value in values {
|
||||||
|
let line = serde_json::to_string(&value).expect("serialize chunk");
|
||||||
|
let chunk = parse_stream_line(&line).expect("parse chunk");
|
||||||
|
chunks.push(chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
!chunks.is_empty(),
|
||||||
|
"fixture must produce at least one chunk"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
chunks[0].text.as_deref(),
|
||||||
|
Some("first"),
|
||||||
|
"first chunk should match fixture payload"
|
||||||
|
);
|
||||||
|
|
||||||
|
let final_chunk = chunks.last().expect("final chunk must exist");
|
||||||
|
assert!(
|
||||||
|
final_chunk.is_final,
|
||||||
|
"last chunk should be marked final per fixture"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
final_chunk.text.as_deref().unwrap_or_default().is_empty(),
|
||||||
|
"final chunk should not include stray text"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
final_chunk.metadata.contains_key("final_data"),
|
||||||
|
"final chunk should surface metadata from fixture"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
1
samples.json
Normal file
1
samples.json
Normal file
@@ -0,0 +1 @@
|
|||||||
|
[{"response": "first", "done": false}, {"response": "", "done": true, "final_data": {"prompt_eval_count": 2048, "eval_count": 512}}]
|
||||||
Reference in New Issue
Block a user