feat(owlry-core): implement IPC server over Unix socket
Adds Server struct that listens on a Unix domain socket, accepts client connections (thread-per-client), reads newline-delimited JSON requests, dispatches to ProviderManager/FrecencyStore/Config, and sends JSON responses back. Includes stale socket cleanup and Drop impl for socket removal.
This commit is contained in:
@@ -6,3 +6,4 @@ pub mod notify;
|
||||
pub mod paths;
|
||||
pub mod plugins;
|
||||
pub mod providers;
|
||||
pub mod server;
|
||||
|
||||
237
crates/owlry-core/src/server.rs
Normal file
237
crates/owlry-core/src/server.rs
Normal file
@@ -0,0 +1,237 @@
|
||||
use std::io::{self, BufRead, BufReader, Write};
|
||||
use std::os::unix::net::{UnixListener, UnixStream};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
|
||||
use log::{error, info, warn};
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::data::FrecencyStore;
|
||||
use crate::filter::ProviderFilter;
|
||||
use crate::ipc::{ProviderDesc, Request, Response, ResultItem};
|
||||
use crate::providers::{LaunchItem, ProviderManager};
|
||||
|
||||
/// IPC server that listens on a Unix domain socket and dispatches
|
||||
/// requests to the provider system.
|
||||
pub struct Server {
|
||||
listener: UnixListener,
|
||||
socket_path: PathBuf,
|
||||
provider_manager: Arc<Mutex<ProviderManager>>,
|
||||
frecency: Arc<Mutex<FrecencyStore>>,
|
||||
config: Arc<Config>,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
/// Bind to the given socket path, loading config and creating a ProviderManager.
|
||||
///
|
||||
/// Removes a stale socket file if one already exists at the path.
|
||||
pub fn bind(socket_path: &Path) -> io::Result<Self> {
|
||||
// Remove stale socket if present
|
||||
if socket_path.exists() {
|
||||
info!("Removing stale socket at {:?}", socket_path);
|
||||
std::fs::remove_file(socket_path)?;
|
||||
}
|
||||
|
||||
let listener = UnixListener::bind(socket_path)?;
|
||||
info!("IPC server listening on {:?}", socket_path);
|
||||
|
||||
let config = Config::load_or_default();
|
||||
let provider_manager = ProviderManager::new_with_config(&config);
|
||||
let frecency = FrecencyStore::new();
|
||||
|
||||
Ok(Self {
|
||||
listener,
|
||||
socket_path: socket_path.to_path_buf(),
|
||||
provider_manager: Arc::new(Mutex::new(provider_manager)),
|
||||
frecency: Arc::new(Mutex::new(frecency)),
|
||||
config: Arc::new(config),
|
||||
})
|
||||
}
|
||||
|
||||
/// Accept connections in a loop, spawning a thread per client.
|
||||
pub fn run(&self) -> io::Result<()> {
|
||||
info!("Server entering accept loop");
|
||||
for stream in self.listener.incoming() {
|
||||
match stream {
|
||||
Ok(stream) => {
|
||||
let pm = Arc::clone(&self.provider_manager);
|
||||
let frecency = Arc::clone(&self.frecency);
|
||||
let config = Arc::clone(&self.config);
|
||||
thread::spawn(move || {
|
||||
if let Err(e) = Self::handle_client(stream, pm, frecency, config) {
|
||||
warn!("Client handler error: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to accept connection: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Accept one connection and handle all its requests until EOF.
|
||||
///
|
||||
/// Intended for integration tests where spawning a full accept loop
|
||||
/// is unnecessary.
|
||||
pub fn handle_one_for_testing(&self) -> io::Result<()> {
|
||||
let (stream, _addr) = self.listener.accept()?;
|
||||
Self::handle_client(
|
||||
stream,
|
||||
Arc::clone(&self.provider_manager),
|
||||
Arc::clone(&self.frecency),
|
||||
Arc::clone(&self.config),
|
||||
)
|
||||
}
|
||||
|
||||
/// Read newline-delimited JSON requests from a single client stream,
|
||||
/// dispatch each, and write the JSON response back.
|
||||
fn handle_client(
|
||||
stream: UnixStream,
|
||||
pm: Arc<Mutex<ProviderManager>>,
|
||||
frecency: Arc<Mutex<FrecencyStore>>,
|
||||
config: Arc<Config>,
|
||||
) -> io::Result<()> {
|
||||
let reader = BufReader::new(stream.try_clone()?);
|
||||
let mut writer = stream;
|
||||
|
||||
for line in reader.lines() {
|
||||
let line = line?;
|
||||
let trimmed = line.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let request: Request = match serde_json::from_str(trimmed) {
|
||||
Ok(req) => req,
|
||||
Err(e) => {
|
||||
let resp = Response::Error {
|
||||
message: format!("invalid request JSON: {}", e),
|
||||
};
|
||||
write_response(&mut writer, &resp)?;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let response = Self::handle_request(&request, &pm, &frecency, &config);
|
||||
write_response(&mut writer, &response)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Dispatch a single request to the appropriate subsystem and return
|
||||
/// the response.
|
||||
fn handle_request(
|
||||
request: &Request,
|
||||
pm: &Arc<Mutex<ProviderManager>>,
|
||||
frecency: &Arc<Mutex<FrecencyStore>>,
|
||||
config: &Arc<Config>,
|
||||
) -> Response {
|
||||
match request {
|
||||
Request::Query { text, modes } => {
|
||||
let filter = match modes {
|
||||
Some(m) => ProviderFilter::from_mode_strings(m),
|
||||
None => ProviderFilter::all(),
|
||||
};
|
||||
let max = config.general.max_results;
|
||||
let weight = config.providers.frecency_weight;
|
||||
|
||||
let pm_guard = pm.lock().unwrap();
|
||||
let frecency_guard = frecency.lock().unwrap();
|
||||
let results =
|
||||
pm_guard.search_with_frecency(text, max, &filter, &frecency_guard, weight, None);
|
||||
|
||||
Response::Results {
|
||||
items: results
|
||||
.into_iter()
|
||||
.map(|(item, score)| launch_item_to_result(item, score))
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
|
||||
Request::Launch { item_id, provider: _ } => {
|
||||
let mut frecency_guard = frecency.lock().unwrap();
|
||||
frecency_guard.record_launch(item_id);
|
||||
Response::Ack
|
||||
}
|
||||
|
||||
Request::Providers => {
|
||||
let pm_guard = pm.lock().unwrap();
|
||||
let descs = pm_guard.available_providers();
|
||||
Response::Providers {
|
||||
list: descs.into_iter().map(descriptor_to_desc).collect(),
|
||||
}
|
||||
}
|
||||
|
||||
Request::Refresh { provider } => {
|
||||
let mut pm_guard = pm.lock().unwrap();
|
||||
pm_guard.refresh_provider(provider);
|
||||
Response::Ack
|
||||
}
|
||||
|
||||
Request::Toggle => {
|
||||
// Toggle visibility is a client-side concern; the daemon just acks.
|
||||
Response::Ack
|
||||
}
|
||||
|
||||
Request::Submenu { plugin_id, data } => {
|
||||
let pm_guard = pm.lock().unwrap();
|
||||
match pm_guard.query_submenu_actions(plugin_id, data, plugin_id) {
|
||||
Some((_name, actions)) => Response::SubmenuItems {
|
||||
items: actions
|
||||
.into_iter()
|
||||
.map(|item| launch_item_to_result(item, 0))
|
||||
.collect(),
|
||||
},
|
||||
None => Response::Error {
|
||||
message: format!("no submenu actions for plugin '{}'", plugin_id),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Server {
|
||||
fn drop(&mut self) {
|
||||
// Best-effort cleanup of the socket file
|
||||
if self.socket_path.exists() {
|
||||
let _ = std::fs::remove_file(&self.socket_path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Serialize a response as a single JSON line terminated by newline.
|
||||
fn write_response(writer: &mut UnixStream, response: &Response) -> io::Result<()> {
|
||||
let mut json = serde_json::to_string(response)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
|
||||
json.push('\n');
|
||||
writer.write_all(json.as_bytes())?;
|
||||
writer.flush()
|
||||
}
|
||||
|
||||
fn launch_item_to_result(item: LaunchItem, score: i64) -> ResultItem {
|
||||
ResultItem {
|
||||
id: item.id,
|
||||
title: item.name,
|
||||
description: item.description.unwrap_or_default(),
|
||||
icon: item.icon.unwrap_or_default(),
|
||||
provider: format!("{}", item.provider),
|
||||
score,
|
||||
command: Some(item.command),
|
||||
tags: item.tags,
|
||||
}
|
||||
}
|
||||
|
||||
fn descriptor_to_desc(desc: crate::providers::ProviderDescriptor) -> ProviderDesc {
|
||||
ProviderDesc {
|
||||
id: desc.id,
|
||||
name: desc.name,
|
||||
prefix: desc.prefix,
|
||||
icon: desc.icon,
|
||||
position: desc.position,
|
||||
}
|
||||
}
|
||||
229
crates/owlry-core/tests/server_test.rs
Normal file
229
crates/owlry-core/tests/server_test.rs
Normal file
@@ -0,0 +1,229 @@
|
||||
use std::io::{BufRead, BufReader, Write};
|
||||
use std::os::unix::net::UnixStream;
|
||||
use std::thread;
|
||||
|
||||
use owlry_core::ipc::{Request, Response};
|
||||
use owlry_core::server::Server;
|
||||
|
||||
/// Helper: send a JSON request line and read the JSON response line.
|
||||
fn roundtrip(stream: &mut UnixStream, request: &Request) -> Response {
|
||||
let mut line = serde_json::to_string(request).unwrap();
|
||||
line.push('\n');
|
||||
stream.write_all(line.as_bytes()).unwrap();
|
||||
stream.flush().unwrap();
|
||||
|
||||
let mut reader = BufReader::new(stream.try_clone().unwrap());
|
||||
let mut buf = String::new();
|
||||
reader.read_line(&mut buf).unwrap();
|
||||
serde_json::from_str(buf.trim()).unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_server_responds_to_providers_request() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let sock = dir.path().join("owlry-test.sock");
|
||||
|
||||
let server = Server::bind(&sock).unwrap();
|
||||
|
||||
// Spawn the server to handle exactly one connection
|
||||
let handle = thread::spawn(move || {
|
||||
server.handle_one_for_testing().unwrap();
|
||||
});
|
||||
|
||||
// Connect as a client
|
||||
let mut stream = UnixStream::connect(&sock).unwrap();
|
||||
let resp = roundtrip(&mut stream, &Request::Providers);
|
||||
|
||||
match resp {
|
||||
Response::Providers { list } => {
|
||||
// The default ProviderManager always has at least Application and Command
|
||||
assert!(list.len() >= 2, "expected at least 2 providers, got {}", list.len());
|
||||
let ids: Vec<&str> = list.iter().map(|p| p.id.as_str()).collect();
|
||||
assert!(ids.contains(&"app"), "missing 'app' provider");
|
||||
assert!(ids.contains(&"cmd"), "missing 'cmd' provider");
|
||||
}
|
||||
other => panic!("expected Providers response, got: {:?}", other),
|
||||
}
|
||||
|
||||
drop(stream);
|
||||
handle.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_server_handles_launch_request() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let sock = dir.path().join("owlry-test.sock");
|
||||
|
||||
let server = Server::bind(&sock).unwrap();
|
||||
|
||||
let handle = thread::spawn(move || {
|
||||
server.handle_one_for_testing().unwrap();
|
||||
});
|
||||
|
||||
let mut stream = UnixStream::connect(&sock).unwrap();
|
||||
let req = Request::Launch {
|
||||
item_id: "firefox.desktop".into(),
|
||||
provider: "app".into(),
|
||||
};
|
||||
let resp = roundtrip(&mut stream, &req);
|
||||
|
||||
assert_eq!(resp, Response::Ack);
|
||||
|
||||
drop(stream);
|
||||
handle.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_server_handles_query_request() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let sock = dir.path().join("owlry-test.sock");
|
||||
|
||||
let server = Server::bind(&sock).unwrap();
|
||||
|
||||
let handle = thread::spawn(move || {
|
||||
server.handle_one_for_testing().unwrap();
|
||||
});
|
||||
|
||||
let mut stream = UnixStream::connect(&sock).unwrap();
|
||||
let req = Request::Query {
|
||||
text: "nonexistent_query_xyz".into(),
|
||||
modes: None,
|
||||
};
|
||||
let resp = roundtrip(&mut stream, &req);
|
||||
|
||||
match resp {
|
||||
Response::Results { items } => {
|
||||
// A nonsense query should return empty or very few results
|
||||
// (no items will fuzzy-match "nonexistent_query_xyz")
|
||||
assert!(items.len() <= 5, "expected few/no results for gibberish query");
|
||||
}
|
||||
other => panic!("expected Results response, got: {:?}", other),
|
||||
}
|
||||
|
||||
drop(stream);
|
||||
handle.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_server_handles_toggle_request() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let sock = dir.path().join("owlry-test.sock");
|
||||
|
||||
let server = Server::bind(&sock).unwrap();
|
||||
|
||||
let handle = thread::spawn(move || {
|
||||
server.handle_one_for_testing().unwrap();
|
||||
});
|
||||
|
||||
let mut stream = UnixStream::connect(&sock).unwrap();
|
||||
let resp = roundtrip(&mut stream, &Request::Toggle);
|
||||
|
||||
assert_eq!(resp, Response::Ack);
|
||||
|
||||
drop(stream);
|
||||
handle.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_server_handles_refresh_request() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let sock = dir.path().join("owlry-test.sock");
|
||||
|
||||
let server = Server::bind(&sock).unwrap();
|
||||
|
||||
let handle = thread::spawn(move || {
|
||||
server.handle_one_for_testing().unwrap();
|
||||
});
|
||||
|
||||
let mut stream = UnixStream::connect(&sock).unwrap();
|
||||
let req = Request::Refresh {
|
||||
provider: "app".into(),
|
||||
};
|
||||
let resp = roundtrip(&mut stream, &req);
|
||||
|
||||
assert_eq!(resp, Response::Ack);
|
||||
|
||||
drop(stream);
|
||||
handle.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_server_handles_submenu_for_unknown_plugin() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let sock = dir.path().join("owlry-test.sock");
|
||||
|
||||
let server = Server::bind(&sock).unwrap();
|
||||
|
||||
let handle = thread::spawn(move || {
|
||||
server.handle_one_for_testing().unwrap();
|
||||
});
|
||||
|
||||
let mut stream = UnixStream::connect(&sock).unwrap();
|
||||
let req = Request::Submenu {
|
||||
plugin_id: "nonexistent_plugin".into(),
|
||||
data: "some_data".into(),
|
||||
};
|
||||
let resp = roundtrip(&mut stream, &req);
|
||||
|
||||
match resp {
|
||||
Response::Error { message } => {
|
||||
assert!(
|
||||
message.contains("nonexistent_plugin"),
|
||||
"error should mention the plugin id"
|
||||
);
|
||||
}
|
||||
other => panic!("expected Error response for unknown plugin, got: {:?}", other),
|
||||
}
|
||||
|
||||
drop(stream);
|
||||
handle.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_server_handles_multiple_requests_per_connection() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let sock = dir.path().join("owlry-test.sock");
|
||||
|
||||
let server = Server::bind(&sock).unwrap();
|
||||
|
||||
let handle = thread::spawn(move || {
|
||||
server.handle_one_for_testing().unwrap();
|
||||
});
|
||||
|
||||
let mut stream = UnixStream::connect(&sock).unwrap();
|
||||
|
||||
// Send Providers request
|
||||
let resp1 = roundtrip(&mut stream, &Request::Providers);
|
||||
assert!(matches!(resp1, Response::Providers { .. }));
|
||||
|
||||
// Send Toggle request on same connection
|
||||
let resp2 = roundtrip(&mut stream, &Request::Toggle);
|
||||
assert_eq!(resp2, Response::Ack);
|
||||
|
||||
drop(stream);
|
||||
handle.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_server_cleans_up_stale_socket() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let sock = dir.path().join("owlry-test.sock");
|
||||
|
||||
// Create a stale socket file
|
||||
std::os::unix::net::UnixListener::bind(&sock).unwrap();
|
||||
assert!(sock.exists());
|
||||
|
||||
// Server::bind should succeed by removing the stale socket
|
||||
let server = Server::bind(&sock).unwrap();
|
||||
|
||||
let handle = thread::spawn(move || {
|
||||
server.handle_one_for_testing().unwrap();
|
||||
});
|
||||
|
||||
let mut stream = UnixStream::connect(&sock).unwrap();
|
||||
let resp = roundtrip(&mut stream, &Request::Toggle);
|
||||
assert_eq!(resp, Response::Ack);
|
||||
|
||||
drop(stream);
|
||||
handle.join().unwrap();
|
||||
}
|
||||
Reference in New Issue
Block a user