"""
|
||
Owly News Summariser Backend
|
||
|
||
This module provides a FastAPI application that serves as the backend for the Owly News Summariser.
|
||
It handles fetching news from RSS feeds, summarizing articles using Ollama/qwen, and providing
|
||
an API for the frontend to access the summarized news.
|
||
|
||
The application uses SQLite for data storage and APScheduler for scheduling periodic news harvesting.
|
||
"""
|
||
|
||
import asyncio
import json
import os
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any, Union, Iterator, TypedDict, cast

import feedparser
import httpx
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI, Response, status, Depends
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

# Constants
DB_PATH = Path("owlynews.sqlite")
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
MIN_CRON_HOURS = 0.5
DEFAULT_CRON_HOURS = float(os.getenv("CRON_HOURS", MIN_CRON_HOURS))
CRON_HOURS = max(MIN_CRON_HOURS, DEFAULT_CRON_HOURS)
SYNC_COOLDOWN_MINUTES = 30
LLM_MODEL = "qwen2:7b-instruct-q4_K_M"
LLM_TIMEOUT_SECONDS = 180
OLLAMA_API_TIMEOUT_SECONDS = 10

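# Example (assumed) deployment overrides for the settings above; the module
# path "main:app" is hypothetical and depends on your project layout:
#   OLLAMA_HOST=http://gpu-box:11434 CRON_HOURS=2 uvicorn main:app
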
# FastAPI app initialization
app = FastAPI(
    title="Owly News Summariser",
    description="API for the Owly News Summariser application",
    version="1.0.0"
)

# Database schema definitions
SCHEMA_SQL = [
    """
    CREATE TABLE IF NOT EXISTS news (
        id TEXT PRIMARY KEY,        -- e.g. URL as unique identifier
        title TEXT NOT NULL,
        summary_de TEXT,
        summary_en TEXT,
        published TEXT,             -- ISO-8601 timestamp (UTC), as written by the harvester
        source TEXT,
        country TEXT,
        source_feed TEXT
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_news_published ON news(published)",
    """
    CREATE TABLE IF NOT EXISTS feeds (
        id INTEGER PRIMARY KEY,     -- auto-increment via rowid
        country TEXT,
        url TEXT UNIQUE NOT NULL
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS settings (
        key TEXT PRIMARY KEY,
        val TEXT NOT NULL
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS meta (
        key TEXT PRIMARY KEY,
        val TEXT NOT NULL
    )
    """
]


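# Example row in `news` (illustrative values only):
#   ("https://example.com/articles/1", "Example title", "Deutsche Zusammenfassung",
#    "English summary", "2025-07-15T08:30:00+00:00", "Example Source", "DE",
#    "https://example.com/rss.xml")

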
class DatabaseManager:
    """
    Manages database connections and operations for the application.
    Provides methods for initializing the database, executing queries,
    and managing transactions.
    """

    def __init__(self, db_path: Path):
        """
        Initialize the database manager with the given database path.

        Args:
            db_path: Path to the SQLite database file
        """
        self.db_path = db_path
        self._connection: Optional[sqlite3.Connection] = None
        self._initialize_db()

    def _get_connection(self) -> sqlite3.Connection:
        """
        Get or create a database connection.

        Returns:
            An active SQLite connection
        """
        if self._connection is None:
            # check_same_thread=False allows the single shared connection to be
            # used from FastAPI worker threads.
            self._connection = sqlite3.connect(
                self.db_path,
                check_same_thread=False
            )
            self._connection.row_factory = sqlite3.Row
        return self._connection

    @contextmanager
    def get_cursor(self) -> Iterator[sqlite3.Cursor]:
        """
        Context manager that provides a database cursor and handles commits and rollbacks.

        Yields:
            A database cursor for executing SQL statements

        Example:
            ```python
            with db_manager.get_cursor() as cursor:
                cursor.execute("SELECT * FROM table")
                results = cursor.fetchall()
            ```
        """
        conn = self._get_connection()
        cursor = conn.cursor()
        try:
            yield cursor
            conn.commit()
        except Exception:
            conn.rollback()
            raise

    def _initialize_db(self) -> None:
        """
        Initialize the database schema and default settings.
        Creates tables if they don't exist and inserts default values.
        """
        # Create schema
        with self.get_cursor() as cursor:
            for stmt in SCHEMA_SQL:
                cursor.execute(stmt)

            # Insert initial settings
            cursor.execute(
                "INSERT INTO settings VALUES (?, ?) ON CONFLICT (key) DO NOTHING",
                ("cron_hours", str(CRON_HOURS))
            )

            # Insert initial metadata
            cursor.execute(
                "INSERT INTO meta VALUES (?, ?) ON CONFLICT (key) DO NOTHING",
                ("last_sync", "0")
            )

            # Seed feeds if none exist
            cursor.execute("SELECT COUNT(*) as count FROM feeds")
            if cursor.fetchone()["count"] == 0:
                self._seed_feeds()

    def _seed_feeds(self) -> None:
        """
        Seed the database with initial feeds from the seed_feeds.json file.
        Only runs if the feeds table is empty.
        """
        try:
            seed_path = Path(__file__).with_name("seed_feeds.json")
            with open(seed_path, "r") as f:
                seed_data = json.load(f)

            with self.get_cursor() as cursor:
                for country, urls in seed_data.items():
                    for url in urls:
                        cursor.execute(
                            "INSERT INTO feeds (country, url) VALUES (?, ?) "
                            "ON CONFLICT (url) DO NOTHING",
                            (country, url)
                        )
        except (FileNotFoundError, json.JSONDecodeError) as e:
            print(f"Error seeding feeds: {e}")


# Initialize database manager
db_manager = DatabaseManager(DB_PATH)


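# seed_feeds.json is expected to map country codes to lists of feed URLs,
# e.g. (illustrative shape, not the shipped file):
# {
#     "DE": ["https://www.example.de/rss.xml"],
#     "US": ["https://www.example.com/rss.xml"]
# }

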
class ArticleSummary(TypedDict):
    """Type definition for article summary data returned from the LLM."""
    title: str
    summary_de: str
    summary_en: str


class NewsFetcher:
    """
    Handles fetching and summarizing news articles from RSS feeds.
    Uses Ollama/qwen to generate summaries of articles.
    """

    @staticmethod
    def build_prompt(url: str) -> str:
        """
        Generate a prompt for the LLM to summarize an article.

        Args:
            url: Public URL of the article to summarize

        Returns:
            A formatted prompt string that instructs the LLM to generate
            a JSON response with title and summaries in German and English

        Note:
            LLMs like qwen2 don't have native web access; the model will
            generate summaries based on its training data and the URL.
        """
        # The prompt is intentionally German. In short it says: do not fetch the
        # content yourself; if the text is unavailable, answer with empty
        # strings; return only valid minified JSON in the given structure; each
        # summary at most 160 words; no text before or after the JSON.
        return (
            "### Aufgabe\n"
            f"Du bekommst eine öffentliche URL: {url}\n"
            "### Regeln\n"
            "1. **Entnimm den Inhalt nicht automatisch.** "
            "Falls dir der Text nicht vorliegt, antworte mit leeren Strings.\n"
            "2. Gib ausschließlich **gültiges minifiziertes JSON** zurück – "
            "kein Markdown, keine Kommentare.\n"
            "3. Struktur:\n"
            "{\"title\":\"…\",\"summary_de\":\"…\",\"summary_en\":\"…\"}\n"
            "4. summary_de ≤ 160 Wörter, summary_en ≤ 160 Wörter. Zähle selbst.\n"
            "5. Kein Text vor oder nach dem JSON.\n"
            "### Ausgabe\n"
            "Jetzt antworte."
        )

    @staticmethod
    async def summarize_article(
        client: httpx.AsyncClient,
        url: str
    ) -> Optional[ArticleSummary]:
        """
        Generate a summary of an article using the LLM.

        Args:
            client: An active httpx AsyncClient for making requests
            url: URL of the article to summarize

        Returns:
            A dictionary containing the article title and summaries in German and English,
            or None if summarization failed
        """
        prompt = NewsFetcher.build_prompt(url)
        payload = {
            "model": LLM_MODEL,
            "prompt": prompt,
            "stream": False,
            # Sampling parameters go under "options" in Ollama's generate API;
            # a top-level "temperature" key would be silently ignored.
            "options": {"temperature": 0.2},
            "format": "json"
        }

        try:
            response = await client.post(
                f"{OLLAMA_HOST}/api/generate",
                json=payload,
                timeout=LLM_TIMEOUT_SECONDS
            )
            response.raise_for_status()
            result = response.json()
            # Ollama returns the model output as a string in "response";
            # with format="json" it should parse into the expected mapping.
            return cast(ArticleSummary, json.loads(result["response"]))
        except (KeyError, ValueError, httpx.HTTPError, json.JSONDecodeError) as e:
            print(f"Error summarizing article {url}: {e}")
            return None

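    # Illustrative ad-hoc usage (hypothetical; not part of the app flow):
    #
    #     async def demo() -> None:
    #         async with httpx.AsyncClient() as client:
    #             print(await NewsFetcher.summarize_article(
    #                 client, "https://example.com/some-article"))
    #
    #     asyncio.run(demo())
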
    @staticmethod
    async def harvest_feeds() -> None:
        """
        Fetch articles from all feeds and store summaries in the database.
        This is the main function that runs periodically to update the news database.
        """
        try:
            # Get all feeds from the database
            with db_manager.get_cursor() as cursor:
                cursor.execute("SELECT country, url FROM feeds")
                feeds = cursor.fetchall()

            # Process each feed
            async with httpx.AsyncClient() as client:
                for feed_row in feeds:
                    await NewsFetcher._process_feed(client, feed_row)

            # Update last sync timestamp
            current_time = int(datetime.now(timezone.utc).timestamp())
            with db_manager.get_cursor() as cursor:
                cursor.execute(
                    "UPDATE meta SET val=? WHERE key='last_sync'",
                    (str(current_time),)
                )
        except Exception as e:
            print(f"Error harvesting feeds: {e}")

    @staticmethod
    async def _process_feed(
        client: httpx.AsyncClient,
        feed_row: sqlite3.Row
    ) -> None:
        """
        Process a single feed, fetching and summarizing all articles.

        Args:
            client: An active httpx AsyncClient for making requests
            feed_row: A database row containing feed information
        """
        try:
            feed_data = feedparser.parse(feed_row["url"])

            for entry in feed_data.entries:
                # Skip entries without links or published dates
                if not hasattr(entry, "link") or not hasattr(entry, "published_parsed"):
                    continue

                article_id = entry.link

                # Parse the published date
                try:
                    published = datetime(
                        *entry.published_parsed[:6],
                        tzinfo=timezone.utc
                    )
                except (TypeError, ValueError):
                    # Skip entries with invalid dates
                    continue

                # Get article summary
                summary = await NewsFetcher.summarize_article(client, entry.link)
                if not summary:
                    continue

                # Store in database
                with db_manager.get_cursor() as cursor:
                    cursor.execute(
                        """
                        INSERT INTO news (
                            id, title, summary_de, summary_en, published,
                            source, country, source_feed
                        )
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                        ON CONFLICT (id) DO NOTHING
                        """,
                        (
                            article_id,
                            summary["title"],
                            summary["summary_de"],
                            summary["summary_en"],
                            published.isoformat(),
                            entry.get("source", {}).get("title", feed_row["url"]),
                            feed_row["country"],
                            feed_row["url"],
                        )
                    )
        except Exception as e:
            print(f"Error processing feed {feed_row['url']}: {e}")


# Initialize scheduler
scheduler = AsyncIOScheduler(timezone="UTC")
scheduler.add_job(
    NewsFetcher.harvest_feeds,
    "interval",
    hours=CRON_HOURS,
    id="harvest"
)


@app.on_event("startup")
async def start_scheduler() -> None:
    """Start the scheduler once the application's event loop is running."""
    # AsyncIOScheduler needs a running asyncio event loop, so it is started in
    # FastAPI's startup hook rather than at import time.
    scheduler.start()


# Pydantic models for API requests and responses
class CronSettings(BaseModel):
    """Settings for the cron job that harvests news."""
    hours: float


class FeedData(BaseModel):
    """Data for a news feed."""
    country: str
    url: str


class ModelStatus(BaseModel):
    """Status of the LLM model."""
    name: str
    status: str
    available_models: List[str]


class ErrorResponse(BaseModel):
    """Standard error response."""
    status: str
    message: str


class SuccessResponse(BaseModel):
    """Standard success response."""
    status: str


class TimestampResponse(BaseModel):
    """Response containing a timestamp."""
    ts: int


class HoursResponse(BaseModel):
    """Response containing hours setting."""
    hours: float


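# Illustrative request bodies for the endpoints below (example values):
#   PUT  /settings/cron  {"hours": 1.5}
#   POST /feeds          {"country": "DE", "url": "https://example.com/rss.xml"}

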
# Dependency for getting a database cursor
def get_db() -> Iterator[sqlite3.Cursor]:
    """
    Dependency that provides a database cursor.

    Yields:
        A database cursor for executing SQL statements
    """
    with db_manager.get_cursor() as cursor:
        yield cursor


# API endpoints
@app.get("/news", response_model=List[Dict[str, Any]])
async def get_news(
    country: str = "DE",
    from_: str = "2025-07-01",
    to: Optional[str] = None,
    db: sqlite3.Cursor = Depends(get_db)
):
    """
    Get news articles filtered by country and date range.

    Args:
        country: Country code to filter by (default: "DE")
        from_: Start date in ISO format (default: "2025-07-01")
        to: End date in ISO format (default: current date)
        db: Database cursor dependency

    Returns:
        List of news articles matching the criteria
    """
    # Resolve the default here: a default in the signature would be evaluated
    # only once, at import time, and go stale.
    if to is None:
        to = datetime.now(timezone.utc).strftime("%Y-%m-%d")

    # `published` holds ISO-8601 strings, so string comparison orders
    # chronologically; extend the upper bound to the end of the day so the
    # `to` date is inclusive.
    db.execute(
        """
        SELECT * FROM news
        WHERE country=? AND published BETWEEN ? AND ?
        ORDER BY published DESC
        """,
        (country, from_, f"{to}T23:59:59+00:00")
    )
    return [dict(row) for row in db.fetchall()]


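# e.g. (illustrative): GET /news?country=DE&from_=2025-07-01&to=2025-07-31

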
@app.get("/meta/last-sync", response_model=TimestampResponse)
|
||
async def get_last_sync(db: sqlite3.Cursor = Depends(get_db)):
|
||
"""
|
||
Get the timestamp of the last successful feed synchronization.
|
||
|
||
Args:
|
||
db: Database cursor dependency
|
||
|
||
Returns:
|
||
Object containing the timestamp as a Unix epoch
|
||
"""
|
||
db.execute("SELECT val FROM meta WHERE key='last_sync'")
|
||
row = db.fetchone()
|
||
return {"ts": int(row["val"])}
|
||
|
||
|
||
@app.put("/settings/cron", response_model=HoursResponse)
async def set_cron_schedule(
    data: CronSettings,
    db: sqlite3.Cursor = Depends(get_db)
):
    """
    Update the cron schedule for harvesting news.

    Args:
        data: New cron settings with hours interval
        db: Database cursor dependency

    Returns:
        Object containing the updated hours setting
    """
    # Ensure minimum interval
    hours = max(MIN_CRON_HOURS, data.hours)

    # Update scheduler; reschedule_job replaces the job's trigger, whereas
    # Job.modify() does not accept trigger arguments such as hours.
    scheduler.reschedule_job("harvest", trigger="interval", hours=hours)

    # Update database
    db.execute(
        "UPDATE settings SET val=? WHERE key='cron_hours'",
        (str(hours),)
    )

    return {"hours": hours}


@app.get("/feeds", response_model=List[Dict[str, Any]])
|
||
async def list_feeds(db: sqlite3.Cursor = Depends(get_db)):
|
||
"""
|
||
List all registered news feeds.
|
||
|
||
Args:
|
||
db: Database cursor dependency
|
||
|
||
Returns:
|
||
List of feed objects with id, country, and url
|
||
"""
|
||
db.execute("SELECT * FROM feeds ORDER BY country")
|
||
return [dict(row) for row in db.fetchall()]
|
||
|
||
|
||
@app.post("/feeds", response_model=SuccessResponse)
|
||
async def add_feed(
|
||
feed: FeedData,
|
||
db: sqlite3.Cursor = Depends(get_db)
|
||
):
|
||
"""
|
||
Add a new news feed.
|
||
|
||
Args:
|
||
feed: Feed data with country and URL
|
||
db: Database cursor dependency
|
||
|
||
Returns:
|
||
Success status
|
||
"""
|
||
db.execute(
|
||
"INSERT INTO feeds (country, url) VALUES (?, ?) "
|
||
"ON CONFLICT (url) DO NOTHING",
|
||
(feed.country, feed.url)
|
||
)
|
||
|
||
return {"status": "added"}
|
||
|
||
|
||
@app.delete("/feeds", response_model=SuccessResponse)
async def delete_feed(
    url: str,
    db: sqlite3.Cursor = Depends(get_db)
):
    """
    Delete a news feed by URL.

    Args:
        url: URL of the feed to delete
        db: Database cursor dependency

    Returns:
        Success status
    """
    db.execute("DELETE FROM feeds WHERE url=?", (url,))
    return {"status": "deleted"}


@app.get("/model/status", response_model=Union[ModelStatus, ErrorResponse])
|
||
async def get_model_status():
|
||
"""
|
||
Check the status of the LLM model.
|
||
|
||
Returns:
|
||
Object containing model name, status, and available models,
|
||
or an error response if the model service is unavailable
|
||
"""
|
||
try:
|
||
async with httpx.AsyncClient() as client:
|
||
# Get model information from Ollama
|
||
response = await client.get(
|
||
f"{OLLAMA_HOST}/api/tags",
|
||
timeout=OLLAMA_API_TIMEOUT_SECONDS
|
||
)
|
||
response.raise_for_status()
|
||
|
||
models_data = response.json()
|
||
models = models_data.get("models", [])
|
||
|
||
# Check if the current model is available
|
||
model_available = any(
|
||
model.get("name") == LLM_MODEL for model in models
|
||
)
|
||
|
||
return {
|
||
"name": LLM_MODEL,
|
||
"status": "ready" if model_available else "not available",
|
||
"available_models": [model.get("name") for model in models]
|
||
}
|
||
except Exception as e:
|
||
return {"status": "error", "message": str(e)}
|
||
|
||
|
||
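# Illustrative /model/status response when the model is installed:
#   {"name": "qwen2:7b-instruct-q4_K_M", "status": "ready",
#    "available_models": ["qwen2:7b-instruct-q4_K_M"]}

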
@app.post("/sync", response_model=None)
|
||
async def manual_sync(db: sqlite3.Cursor = Depends(get_db)):
|
||
"""
|
||
Manually trigger a feed synchronization.
|
||
|
||
Args:
|
||
db: Database cursor dependency
|
||
|
||
Returns:
|
||
Success status or error response if sync was triggered too recently
|
||
"""
|
||
# Check when the last sync was performed
|
||
db.execute("SELECT val FROM meta WHERE key='last_sync'")
|
||
row = db.fetchone()
|
||
last_sync_ts = int(row["val"])
|
||
|
||
# Enforce cooldown period
|
||
now = datetime.now(timezone.utc)
|
||
last_sync_time = datetime.fromtimestamp(last_sync_ts, timezone.utc)
|
||
|
||
if now - last_sync_time < timedelta(minutes=SYNC_COOLDOWN_MINUTES):
|
||
return Response(
|
||
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
|
||
content=f"Sync too soon – wait {SYNC_COOLDOWN_MINUTES} min."
|
||
)
|
||
|
||
# Trigger sync in background
|
||
asyncio.create_task(NewsFetcher.harvest_feeds())
|
||
return {"status": "triggered"}
|
||
|
||
|
||
# Mount static frontend
frontend_path = os.path.join(
    os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
    "frontend",
    "dist"
)
app.mount("/", StaticFiles(directory=frontend_path, html=True), name="static")
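
# To run locally (assumed entry point; adjust the module path to your layout,
# and build the frontend first so frontend/dist exists):
#   uvicorn main:app --reload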