Compare commits

4 commits: eed5f4afbb ... 0fd2c7a8b6

Commits in this range (newest first):
- 0fd2c7a8b6
- e22f3a627a
- 3a1c817381
- 003b8da4b2
backend/.gitignore (vendored, 4 lines changed)

@@ -54,3 +54,7 @@ logs/
.vscode/
*.swp
*.swo
/owlynews.sqlite-shm
/owlynews.sqlite-wal
/owlynews.sqlite3-shm
/owlynews.sqlite3-wal
backend/app/config.py (new file, 110 lines)

@@ -0,0 +1,110 @@
from pathlib import Path
import os
import logging

DB_PATH = Path(os.getenv("DB_NAME", "owlynews.sqlite3"))
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
MIN_CRON_HOURS = float(os.getenv("MIN_CRON_HOURS", 0.5))
DEFAULT_CRON_HOURS = float(os.getenv("CRON_HOURS", MIN_CRON_HOURS))
CRON_HOURS = max(MIN_CRON_HOURS, DEFAULT_CRON_HOURS)
SYNC_COOLDOWN_MINUTES = int(os.getenv("SYNC_COOLDOWN_MINUTES", 30))
LLM_MODEL = os.getenv("LLM_MODEL", "qwen2:7b-instruct-q4_K_M")
LLM_TIMEOUT_SECONDS = int(os.getenv("LLM_TIMEOUT_SECONDS", 180))
OLLAMA_API_TIMEOUT_SECONDS = int(os.getenv("OLLAMA_API_TIMEOUT_SECONDS", 10))
ARTICLE_FETCH_TIMEOUT = int(os.getenv("ARTICLE_FETCH_TIMEOUT", 30))
MAX_ARTICLE_LENGTH = int(os.getenv("MAX_ARTICLE_LENGTH", 5000))

frontend_path = os.path.join(
    os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
    "frontend",
    "dist"
)

logging.basicConfig(
    level=logging.WARNING,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def update_constants_from_db(settings_dict):
    """
    Update global constants with values from the database settings.
    Environment variables take precedence over database settings.

    Args:
        settings_dict: Dictionary of settings from the database
    """
    global OLLAMA_HOST, MIN_CRON_HOURS, CRON_HOURS, SYNC_COOLDOWN_MINUTES
    global LLM_MODEL, LLM_TIMEOUT_SECONDS, OLLAMA_API_TIMEOUT_SECONDS
    global ARTICLE_FETCH_TIMEOUT, MAX_ARTICLE_LENGTH

    if 'ollama_host' in settings_dict and os.getenv("OLLAMA_HOST") is None:
        OLLAMA_HOST = settings_dict['ollama_host']

    if 'min_cron_hours' in settings_dict and os.getenv("MIN_CRON_HOURS") is None:
        try:
            MIN_CRON_HOURS = float(settings_dict['min_cron_hours'])
        except (ValueError, TypeError):
            logger.warning(
                f"⚠️ Invalid min_cron_hours value in DB: "
                f"{settings_dict['min_cron_hours']}"
            )

    if 'cron_hours' in settings_dict and os.getenv("CRON_HOURS") is None:
        try:
            cron_hours_value = float(settings_dict['cron_hours'])
            CRON_HOURS = max(MIN_CRON_HOURS, cron_hours_value)
        except (ValueError, TypeError):
            logger.warning(
                f"⚠️ Invalid cron_hours value in DB: "
                f"{settings_dict['cron_hours']}"
            )

    if 'sync_cooldown_minutes' in settings_dict and os.getenv("SYNC_COOLDOWN_MINUTES") is None:
        try:
            SYNC_COOLDOWN_MINUTES = int(settings_dict['sync_cooldown_minutes'])
        except (ValueError, TypeError):
            logger.warning(
                f"⚠️ Invalid sync_cooldown_minutes value in DB: "
                f"{settings_dict['sync_cooldown_minutes']}"
            )

    if 'llm_model' in settings_dict and os.getenv("LLM_MODEL") is None:
        LLM_MODEL = settings_dict['llm_model']

    if 'llm_timeout_seconds' in settings_dict and os.getenv("LLM_TIMEOUT_SECONDS") is None:
        try:
            LLM_TIMEOUT_SECONDS = int(settings_dict['llm_timeout_seconds'])
        except (ValueError, TypeError):
            logger.warning(
                f"⚠️ Invalid llm_timeout_seconds value in DB: "
                f"{settings_dict['llm_timeout_seconds']}"
            )

    if 'ollama_api_timeout_seconds' in settings_dict and os.getenv("OLLAMA_API_TIMEOUT_SECONDS") is None:
        try:
            OLLAMA_API_TIMEOUT_SECONDS = int(settings_dict['ollama_api_timeout_seconds'])
        except (ValueError, TypeError):
            logger.warning(
                f"⚠️ Invalid ollama_api_timeout_seconds value in DB: "
                f"{settings_dict['ollama_api_timeout_seconds']}"
            )

    if 'article_fetch_timeout' in settings_dict and os.getenv("ARTICLE_FETCH_TIMEOUT") is None:
        try:
            ARTICLE_FETCH_TIMEOUT = int(settings_dict['article_fetch_timeout'])
        except (ValueError, TypeError):
            logger.warning(
                f"⚠️ Invalid article_fetch_timeout value in DB: "
                f"{settings_dict['article_fetch_timeout']}"
            )

    if 'max_article_length' in settings_dict and os.getenv("MAX_ARTICLE_LENGTH") is None:
        try:
            MAX_ARTICLE_LENGTH = int(settings_dict['max_article_length'])
        except (ValueError, TypeError):
            logger.warning(
                f"⚠️ Invalid max_article_length value in DB: "
                f"{settings_dict['max_article_length']}"
            )
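A quick sketch (not part of the change set) of how the precedence rule in `update_constants_from_db` plays out; the host values below are placeholders:

```python
import os

# The defaults above are read from the environment at import time, so set or
# clear the variable before importing the module when experimenting.
os.environ.pop("OLLAMA_HOST", None)

from backend.app import config

# Env var absent: the value coming from the settings table is applied.
config.update_constants_from_db({"ollama_host": "http://ollama:11434"})
assert config.OLLAMA_HOST == "http://ollama:11434"

# Env var present: the database value is ignored and the constant is left alone.
os.environ["OLLAMA_HOST"] = "http://localhost:11434"
config.update_constants_from_db({"ollama_host": "http://example.invalid"})
assert config.OLLAMA_HOST == "http://ollama:11434"
```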
backend/app/database.py (new file, 248 lines)

@@ -0,0 +1,248 @@
from contextlib import contextmanager
from pathlib import Path
import sqlite3
from typing import Iterator

from backend.app.config import logger, DB_PATH, update_constants_from_db, OLLAMA_HOST, CRON_HOURS, MIN_CRON_HOURS, \
    SYNC_COOLDOWN_MINUTES, LLM_MODEL, LLM_TIMEOUT_SECONDS, OLLAMA_API_TIMEOUT_SECONDS, ARTICLE_FETCH_TIMEOUT, \
    MAX_ARTICLE_LENGTH


class DatabaseManager:
    """
    Manages database connections and operations for the application.
    Provides methods for initializing the database, executing queries,
    and managing transactions.
    """

    def __init__(self, db_path: Path):
        """
        Initialize the database manager with the given database path.

        Args:
            db_path: Path to the SQLite database file
        """
        self.db_path = db_path
        self._initialize_db()

    def _initialize_db(self) -> None:
        """
        Initialize the database by creating tables if they don't exist.
        Also seeds initial feeds from seed_feeds.json and settings from global constants.
        After initialization, updates global constants with values from the database.
        """
        try:
            schema_file = Path(__file__).parent / "schema.sql"
            if not schema_file.exists():
                logger.error("❌ schema.sql not found")
                raise FileNotFoundError("schema.sql not found")

            with open(schema_file, 'r', encoding='utf-8') as f:
                schema_sql = f.read()

            with self.get_cursor() as cursor:
                statements = [stmt.strip() for stmt in schema_sql.split(';') if stmt.strip()]
                for statement in statements:
                    cursor.execute(statement)

                cursor.execute("SELECT COUNT(*) FROM feeds")
                feed_count = cursor.fetchone()[0]

                if feed_count == 0:
                    self._seed_feeds(cursor)

                cursor.execute("SELECT COUNT(*) FROM settings")
                settings_count = cursor.fetchone()[0]

                if settings_count == 0:
                    self._seed_settings(cursor)

            settings = self.get_all_settings()
            update_constants_from_db(settings)
        except Exception as e:
            logger.error(f"❌ Failed to initialize database: {e}")
            raise

    def get_all_settings(self) -> dict:
        """
        Retrieve all settings from the database.

        Returns:
            Dictionary of settings with key-value pairs
        """
        settings = {}
        try:
            with self.get_cursor(readonly=True) as cursor:
                cursor.execute("SELECT key, val FROM settings")
                for row in cursor.fetchall():
                    settings[row['key']] = row['val']
            return settings
        except Exception as e:
            logger.error(f"❌ Failed to retrieve settings from database: {e}")
            return {}

    def _seed_feeds(self, cursor) -> None:
        """
        Seed initial feeds from seed_feeds.json file.
        """
        import json
        from pathlib import Path

        try:
            seed_file = Path(__file__).parent / "seed_feeds.json"

            if not seed_file.exists():
                logger.warning("⚠️ seed_feeds.json not found, skipping feed seeding")
                return

            with open(seed_file, 'r', encoding='utf-8') as f:
                feeds_data = json.load(f)

            for country, urls in feeds_data.items():
                for url in urls:
                    cursor.execute(
                        "INSERT OR IGNORE INTO feeds (country, url) VALUES (?, ?)",
                        (country, url)
                    )

        except Exception as e:
            logger.error(f"❌ Failed to seed feeds: {e}")

    def _seed_settings(self, cursor) -> None:
        """
        Seed initial settings from global constants.
        """
        try:
            settings_data = {
                'ollama_host': OLLAMA_HOST,
                'min_cron_hours': MIN_CRON_HOURS,
                'cron_hours': CRON_HOURS,
                'sync_cooldown_minutes': SYNC_COOLDOWN_MINUTES,
                'llm_model': LLM_MODEL,
                'llm_timeout_seconds': LLM_TIMEOUT_SECONDS,
                'ollama_api_timeout_seconds': OLLAMA_API_TIMEOUT_SECONDS,
                'article_fetch_timeout': ARTICLE_FETCH_TIMEOUT,
                'max_article_length': MAX_ARTICLE_LENGTH
            }

            for key, val in settings_data.items():
                cursor.execute(
                    "INSERT OR IGNORE INTO settings (key, val) VALUES (?, ?)",
                    (key, str(val))
                )

        except Exception as e:
            logger.error(f"❌ Failed to seed settings: {e}")

    def _get_connection(self) -> sqlite3.Connection:
        """
        Create a thread-safe database connection.

        Returns:
            An active SQLite connection
        """
        conn = sqlite3.connect(
            self.db_path,
            check_same_thread=False,
            timeout=30.0
        )
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA busy_timeout=30000")
        conn.execute("PRAGMA synchronous=NORMAL")
        conn.execute("PRAGMA temp_store=MEMORY")
        return conn

    @contextmanager
    def get_cursor(self, readonly: bool = False) -> Iterator[sqlite3.Cursor]:
        """
        Context manager that provides a database cursor and handles commits and rollbacks.

        Args:
            readonly: If True, opens connection in readonly mode for better concurrency

        Yields:
            A database cursor for executing SQL statements
        """
        conn = None
        try:
            conn = self._get_connection()

            if readonly:
                conn.execute("BEGIN DEFERRED")

            cursor = conn.cursor()
            yield cursor

            if not readonly:
                conn.commit()
        except sqlite3.OperationalError as e:
            if conn:
                conn.rollback()
            if "database is locked" in str(e).lower():
                logger.warning(
                    f"⚠️ Database temporarily locked, operation may need retry: {e}"
                )
            raise e
        except Exception as e:
            if conn:
                conn.rollback()
            raise e
        finally:
            if conn:
                conn.close()

    @contextmanager
    def get_cursor_with_retry(self, readonly: bool = False, max_retries: int = 3) -> Iterator[sqlite3.Cursor]:
        """
        Context manager with retry logic for database operations.

        Args:
            readonly: If True, opens connection in readonly mode
            max_retries: Maximum number of retry attempts

        Yields:
            A database cursor for executing SQL statements
        """
        for attempt in range(max_retries + 1):
            try:
                with self.get_cursor(readonly=readonly) as cursor:
                    yield cursor
                return
            except sqlite3.OperationalError as e:
                if "database is locked" in str(e).lower() and attempt < max_retries:
                    wait_time = (attempt + 1) * 0.1
                    logger.warning(
                        f"⚠️ Database locked, retrying in {wait_time}s "
                        f"(attempt {attempt + 1}/{max_retries + 1})"
                    )
                    import time
                    time.sleep(wait_time)
                    continue
                raise e


db_manager = DatabaseManager(DB_PATH)


async def get_db():
    """
    Dependency that provides a database cursor with retry logic.

    Yields:
        A database cursor for executing SQL statements
    """
    with db_manager.get_cursor_with_retry(readonly=True) as cursor:
        yield cursor


async def get_db_write():
    """
    Dependency that provides a database cursor for write operations with retry logic.

    Yields:
        A database cursor for executing SQL statements
    """
    with db_manager.get_cursor_with_retry(readonly=False) as cursor:
        yield cursor
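Outside the diff, a minimal usage sketch of the manager (assuming the package is importable as `backend.app.database`; the feed URL is a placeholder):

```python
from backend.app.database import db_manager

# Read-only work: the connection is opened with BEGIN DEFERRED and never committed.
with db_manager.get_cursor_with_retry(readonly=True) as cursor:
    cursor.execute("SELECT key, val FROM settings")
    settings = {row["key"]: row["val"] for row in cursor.fetchall()}

# Writes use readonly=False and are committed (or rolled back) when the block exits.
with db_manager.get_cursor_with_retry(readonly=False) as cursor:
    cursor.execute(
        "INSERT OR IGNORE INTO feeds (country, url) VALUES (?, ?)",
        ("DE", "https://example.org/feed.xml"),
    )
```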
@@ -8,361 +8,34 @@ an API for the frontend to access the summarized news.
The application uses SQLite for data storage and APScheduler for scheduling periodic news harvesting.
"""

# Standard library imports
import asyncio
import json
import os
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any, Union, Iterator, Tuple, TypedDict, cast
from datetime import datetime, timedelta, timezone
from http.client import HTTPException
from typing import Any, Dict, List, Union

import feedparser
# Third-party imports
import httpx
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI, Response, status, Depends
from apscheduler.triggers.interval import IntervalTrigger
from fastapi import Depends, FastAPI, Response, status
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

# Constants
DB_PATH = Path("owlynews.sqlite")
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
MIN_CRON_HOURS = 0.5
DEFAULT_CRON_HOURS = float(os.getenv("CRON_HOURS", MIN_CRON_HOURS))
CRON_HOURS = max(MIN_CRON_HOURS, DEFAULT_CRON_HOURS)
SYNC_COOLDOWN_MINUTES = 30
LLM_MODEL = "qwen2:7b-instruct-q4_K_M"
LLM_TIMEOUT_SECONDS = 180
OLLAMA_API_TIMEOUT_SECONDS = 10
from backend.app.config import logger, OLLAMA_HOST, CRON_HOURS, MIN_CRON_HOURS, \
    SYNC_COOLDOWN_MINUTES, LLM_MODEL, OLLAMA_API_TIMEOUT_SECONDS, frontend_path
from backend.app.database import get_db, get_db_write
from backend.app.models import TimestampResponse, SuccessResponse, FeedData, ModelStatus, ErrorResponse, HoursResponse, \
    CronSettings
from backend.app.services import NewsFetcher

# FastAPI app initialization
app = FastAPI(
    title="Owly News Summariser",
    description="API for the Owly News Summariser application",
    version="1.0.0"
)

# Database schema definitions
SCHEMA_SQL = [
    """
    CREATE TABLE IF NOT EXISTS news (
        id TEXT PRIMARY KEY, -- e.g. URL as unique identifier
        title TEXT NOT NULL,
        summary_de TEXT,
        summary_en TEXT,
        published INTEGER, -- Unix epoch (UTC); use TEXT ISO-8601 if you prefer
        source TEXT,
        country TEXT,
        source_feed TEXT
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_news_published ON news(published)",
    """
    CREATE TABLE IF NOT EXISTS feeds (
        id INTEGER PRIMARY KEY, -- auto-increment via rowid
        country TEXT,
        url TEXT UNIQUE NOT NULL
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS settings (
        key TEXT PRIMARY KEY,
        val TEXT NOT NULL
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS meta (
        key TEXT PRIMARY KEY,
        val TEXT NOT NULL
    )
    """
]


class DatabaseManager:
    """
    Manages database connections and operations for the application.
    Provides methods for initializing the database, executing queries,
    and managing transactions.
    """

    def __init__(self, db_path: Path):
        """
        Initialize the database manager with the given database path.

        Args:
            db_path: Path to the SQLite database file
        """
        self.db_path = db_path
        self._connection = None
        self._initialize_db()

    def _get_connection(self) -> sqlite3.Connection:
        """
        Get or create a database connection.

        Returns:
            An active SQLite connection
        """
        if self._connection is None:
            self._connection = sqlite3.connect(
                self.db_path,
                check_same_thread=False
            )
            self._connection.row_factory = sqlite3.Row
        return self._connection

    @contextmanager
    def get_cursor(self) -> Iterator[sqlite3.Cursor]:
        """
        Context manager that provides a database cursor and handles commits and rollbacks.

        Yields:
            A database cursor for executing SQL statements

        Example:
            ```python
            with db_manager.get_cursor() as cursor:
                cursor.execute("SELECT * FROM table")
                results = cursor.fetchall()
            ```
        """
        conn = self._get_connection()
        cursor = conn.cursor()
        try:
            yield cursor
            conn.commit()
        except Exception:
            conn.rollback()
            raise

    def _initialize_db(self) -> None:
        """
        Initialize the database schema and default settings.
        Creates tables if they don't exist and inserts default values.
        """
        # Create schema
        with self.get_cursor() as cursor:
            for stmt in SCHEMA_SQL:
                cursor.execute(stmt)

            # Insert initial settings
            cursor.execute(
                "INSERT INTO settings VALUES (?, ?) ON CONFLICT (key) DO NOTHING",
                ("cron_hours", str(CRON_HOURS))
            )

            # Insert initial metadata
            cursor.execute(
                "INSERT INTO meta VALUES (?, ?) ON CONFLICT (key) DO NOTHING",
                ("last_sync", "0")
            )

            # Seed feeds if none exist
            cursor.execute("SELECT COUNT(*) as count FROM feeds")
            if cursor.fetchone()["count"] == 0:
                self._seed_feeds()

    def _seed_feeds(self) -> None:
        """
        Seed the database with initial feeds from the seed_feeds.json file.
        Only runs if the feeds table is empty.
        """
        try:
            seed_path = Path(__file__).with_name("seed_feeds.json")
            with open(seed_path, "r") as f:
                seed_data = json.load(f)

            with self.get_cursor() as cursor:
                for country, urls in seed_data.items():
                    for url in urls:
                        cursor.execute(
                            "INSERT INTO feeds (country, url) VALUES (?, ?) "
                            "ON CONFLICT (url) DO NOTHING",
                            (country, url)
                        )
        except (FileNotFoundError, json.JSONDecodeError) as e:
            print(f"Error seeding feeds: {e}")


# Initialize database manager
db_manager = DatabaseManager(DB_PATH)


class ArticleSummary(TypedDict):
    """Type definition for article summary data returned from the LLM."""
    title: str
    summary_de: str
    summary_en: str


class NewsFetcher:
    """
    Handles fetching and summarizing news articles from RSS feeds.
    Uses Ollama/qwen to generate summaries of articles.
    """

    @staticmethod
    def build_prompt(url: str) -> str:
        """
        Generate a prompt for the LLM to summarize an article.

        Args:
            url: Public URL of the article to summarize

        Returns:
            A formatted prompt string that instructs the LLM to generate
            a JSON response with title and summaries in German and English

        Note:
            LLMs like qwen2 don't have native web access; the model will
            generate summaries based on its training data and the URL.
        """
        return (
            "### Aufgabe\n"
            f"Du bekommst eine öffentliche URL: {url}\n"
            "### Regeln\n"
            "1. **Entnimm den Inhalt nicht automatisch.** "
            "Falls dir der Text nicht vorliegt, antworte mit leeren Strings.\n"
            "2. Gib ausschließlich **gültiges minifiziertes JSON** zurück – "
            "kein Markdown, keine Kommentare.\n"
            "3. Struktur:\n"
            "{\"title\":\"…\",\"summary_de\":\"…\",\"summary_en\":\"…\"}\n"
            "4. summary_de ≤ 160 Wörter, summary_en ≤ 160 Wörter. Zähle selbst.\n"
            "5. Kein Text vor oder nach dem JSON.\n"
            "### Ausgabe\n"
            "Jetzt antworte."
        )

    @staticmethod
    async def summarize_article(
            client: httpx.AsyncClient,
            url: str
    ) -> Optional[ArticleSummary]:
        """
        Generate a summary of an article using the LLM.

        Args:
            client: An active httpx AsyncClient for making requests
            url: URL of the article to summarize

        Returns:
            A dictionary containing the article title and summaries in German and English,
            or None if summarization failed
        """
        prompt = NewsFetcher.build_prompt(url)
        payload = {
            "model": LLM_MODEL,
            "prompt": prompt,
            "stream": False,
            "temperature": 0.2,
            "format": "json"
        }

        try:
            response = await client.post(
                f"{OLLAMA_HOST}/api/generate",
                json=payload,
                timeout=LLM_TIMEOUT_SECONDS
            )
            response.raise_for_status()
            result = response.json()
            return cast(ArticleSummary, result["response"])
        except (KeyError, ValueError, httpx.HTTPError, json.JSONDecodeError) as e:
            print(f"Error summarizing article {url}: {e}")
            return None

    @staticmethod
    async def harvest_feeds() -> None:
        """
        Fetch articles from all feeds and store summaries in the database.
        This is the main function that runs periodically to update the news database.
        """
        try:
            # Get all feeds from the database
            with db_manager.get_cursor() as cursor:
                cursor.execute("SELECT country, url FROM feeds")
                feeds = cursor.fetchall()

            # Process each feed
            async with httpx.AsyncClient() as client:
                for feed_row in feeds:
                    await NewsFetcher._process_feed(client, feed_row)

            # Update last sync timestamp
            current_time = int(datetime.now(timezone.utc).timestamp())
            with db_manager.get_cursor() as cursor:
                cursor.execute(
                    "UPDATE meta SET val=? WHERE key='last_sync'",
                    (str(current_time),)
                )
        except Exception as e:
            print(f"Error harvesting feeds: {e}")

    @staticmethod
    async def _process_feed(
            client: httpx.AsyncClient,
            feed_row: sqlite3.Row
    ) -> None:
        """
        Process a single feed, fetching and summarizing all articles.

        Args:
            client: An active httpx AsyncClient for making requests
            feed_row: A database row containing feed information
        """
        try:
            feed_data = feedparser.parse(feed_row["url"])

            for entry in feed_data.entries:
                # Skip entries without links or published dates
                if not hasattr(entry, "link") or not hasattr(entry, "published_parsed"):
                    continue

                article_id = entry.link

                # Parse the published date
                try:
                    published = datetime(
                        *entry.published_parsed[:6],
                        tzinfo=timezone.utc
                    )
                except (TypeError, ValueError):
                    # Skip entries with invalid dates
                    continue

                # Get article summary
                summary = await NewsFetcher.summarize_article(client, entry.link)
                if not summary:
                    continue

                # Store in database
                with db_manager.get_cursor() as cursor:
                    cursor.execute(
                        """
                        INSERT INTO news (
                            id, title, summary_de, summary_en, published,
                            source, country, source_feed
                        )
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                        ON CONFLICT (id) DO NOTHING
                        """,
                        (
                            article_id,
                            summary["title"],
                            summary["summary_de"],
                            summary["summary_en"],
                            published.isoformat(),
                            entry.get("source", {}).get("title", feed_row["url"]),
                            feed_row["country"],
                            feed_row["url"],
                        )
                    )
        except Exception as e:
            print(f"Error processing feed {feed_row['url']}: {e}")


# Initialize scheduler
scheduler = AsyncIOScheduler(timezone="UTC")
scheduler.add_job(
    NewsFetcher.harvest_feeds,
@@ -373,68 +46,17 @@ scheduler.add_job(
scheduler.start()


# Pydantic models for API requests and responses
class CronSettings(BaseModel):
    """Settings for the cron job that harvests news."""
    hours: float


class FeedData(BaseModel):
    """Data for a news feed."""
    country: str
    url: str


class ModelStatus(BaseModel):
    """Status of the LLM model."""
    name: str
    status: str
    available_models: List[str]


class ErrorResponse(BaseModel):
    """Standard error response."""
    status: str
    message: str


class SuccessResponse(BaseModel):
    """Standard success response."""
    status: str


class TimestampResponse(BaseModel):
    """Response containing a timestamp."""
    ts: int


class HoursResponse(BaseModel):
    """Response containing hours setting."""
    hours: float


# Dependency for getting a database cursor
def get_db():
    """
    Dependency that provides a database cursor.

    Yields:
        A database cursor for executing SQL statements
    """
    with db_manager.get_cursor() as cursor:
        yield cursor


# API endpoints
@app.get("/news", response_model=List[Dict[str, Any]])
async def get_news(
        country: str = "DE",
        from_: str = "2025-07-01",
        to: str = datetime.now(timezone.utc).strftime("%Y-%m-%d"),
        db: sqlite3.Cursor = Depends(get_db)
        country: str = "DE",
        from_: str = "2025-07-01",
        to_: str = datetime.now(timezone.utc).strftime("%Y-%m-%d"),
        db: sqlite3.Cursor = Depends(get_db)
):
    """
    Get news articles filtered by country and date range.
    Now optimized for concurrent access while scheduler is running.

    Args:
        country: Country code to filter by (default: "DE")
@@ -445,15 +67,54 @@ async def get_news(
    Returns:
        List of news articles matching the criteria
    """
    db.execute(
        """
        SELECT * FROM news
        WHERE country=? AND published BETWEEN ? AND ?
        ORDER BY published DESC
        """,
        (country, from_, to)
    )
    return [dict(row) for row in db.fetchall()]
    try:
        datetime.fromisoformat(from_)
        datetime.fromisoformat(to_)

        from_ts = int(datetime.fromisoformat(from_).timestamp())
        to_ts = int(datetime.fromisoformat(to_).timestamp())

        db.execute(
            """
            SELECT id, title, description, url, published, country, created_at
            FROM news
            WHERE country = ?
              AND published BETWEEN ? AND ?
            ORDER BY published DESC LIMIT 1000
            """,
            (country, from_ts, to_ts)
        )

        return [dict(row) for row in db.fetchall()]

    except ValueError:
        raise HTTPException(400, "Invalid date format. Use ISO format (YYYY-MM-DD)")
    except Exception as e:
        logger.error(f"❌ Error fetching news: {e}")
        raise HTTPException(
            500, "Internal server error while fetching news"
        )


@app.get("/feeds", response_model=List[Dict[str, Any]])
async def list_feeds(db: sqlite3.Cursor = Depends(get_db)):
    """
    List all registered news feeds.

    Args:
        db: Database cursor dependency

    Returns:
        List of feed objects with id, country, and url
    """
    try:
        db.execute("SELECT * FROM feeds ORDER BY country, url")
        return [dict(row) for row in db.fetchall()]
    except Exception as e:
        logger.error(f"❌ Error fetching feeds: {e}")
        raise HTTPException(
            500, "Internal server error while fetching feeds"
        )


@app.get("/meta/last-sync", response_model=TimestampResponse)
@@ -469,58 +130,16 @@ async def get_last_sync(db: sqlite3.Cursor = Depends(get_db)):
    """
    db.execute("SELECT val FROM meta WHERE key='last_sync'")
    row = db.fetchone()
    if row is None:
        import time
        return {"ts": int(time.time())}
    return {"ts": int(row["val"])}


@app.put("/settings/cron", response_model=HoursResponse)
async def set_cron_schedule(
        data: CronSettings,
        db: sqlite3.Cursor = Depends(get_db)
):
    """
    Update the cron schedule for harvesting news.

    Args:
        data: New cron settings with hours interval
        db: Database cursor dependency

    Returns:
        Object containing the updated hours setting
    """
    # Ensure minimum interval
    hours = max(MIN_CRON_HOURS, data.hours)

    # Update scheduler
    scheduler.get_job("harvest").modify(trigger="interval", hours=hours)

    # Update database
    db.execute(
        "UPDATE settings SET val=? WHERE key='cron_hours'",
        (str(hours),)
    )

    return {"hours": hours}


@app.get("/feeds", response_model=List[Dict[str, Any]])
async def list_feeds(db: sqlite3.Cursor = Depends(get_db)):
    """
    List all registered news feeds.

    Args:
        db: Database cursor dependency

    Returns:
        List of feed objects with id, country, and url
    """
    db.execute("SELECT * FROM feeds ORDER BY country")
    return [dict(row) for row in db.fetchall()]


@app.post("/feeds", response_model=SuccessResponse)
async def add_feed(
        feed: FeedData,
        db: sqlite3.Cursor = Depends(get_db)
        feed: FeedData,
        db: sqlite3.Cursor = Depends(get_db_write)
):
    """
    Add a new news feed.
@@ -532,19 +151,24 @@ async def add_feed(
    Returns:
        Success status
    """
    db.execute(
        "INSERT INTO feeds (country, url) VALUES (?, ?) "
        "ON CONFLICT (url) DO NOTHING",
        (feed.country, feed.url)
    )

    return {"status": "added"}
    try:
        db.execute(
            "INSERT INTO feeds (country, url) VALUES (?, ?) "
            "ON CONFLICT (url) DO NOTHING",
            (feed.country, feed.url)
        )
        return {"status": "added"}
    except Exception as e:
        logger.error(f"❌ Error adding feed: {e}")
        raise HTTPException(
            500, "Internal server error while adding feed"
        )


@app.delete("/feeds", response_model=SuccessResponse)
async def delete_feed(
        url: str,
        db: sqlite3.Cursor = Depends(get_db)
        url: str,
        db: sqlite3.Cursor = Depends(get_db_write)
):
    """
    Delete a news feed by URL.
@@ -556,8 +180,14 @@ async def delete_feed(
    Returns:
        Success status
    """
    db.execute("DELETE FROM feeds WHERE url=?", (url,))
    return {"status": "deleted"}
    try:
        db.execute("DELETE FROM feeds WHERE url=?", (url,))
        return {"status": "deleted"}
    except Exception as e:
        logger.error(f"❌ Error deleting feed: {e}")
        raise HTTPException(
            500, "Internal server error while deleting feed"
        )


@app.get("/model/status", response_model=Union[ModelStatus, ErrorResponse])
@@ -571,7 +201,6 @@ async def get_model_status():
    """
    try:
        async with httpx.AsyncClient() as client:
            # Get model information from Ollama
            response = await client.get(
                f"{OLLAMA_HOST}/api/tags",
                timeout=OLLAMA_API_TIMEOUT_SECONDS
@@ -581,7 +210,6 @@ async def get_model_status():
            models_data = response.json()
            models = models_data.get("models", [])

            # Check if the current model is available
            model_available = any(
                model.get("name") == LLM_MODEL for model in models
            )
@@ -606,30 +234,79 @@ async def manual_sync(db: sqlite3.Cursor = Depends(get_db)):
    Returns:
        Success status or error response if sync was triggered too recently
    """
    # Check when the last sync was performed
    db.execute("SELECT val FROM meta WHERE key='last_sync'")
    row = db.fetchone()
    last_sync_ts = int(row["val"])

    # Enforce cooldown period
    now = datetime.now(timezone.utc)
    last_sync_time = datetime.fromtimestamp(last_sync_ts, timezone.utc)

    if now - last_sync_time < timedelta(minutes=SYNC_COOLDOWN_MINUTES):
        return Response(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            content=f"Sync too soon – wait {SYNC_COOLDOWN_MINUTES} min."
            content="Sync was triggered too recently. Please wait before triggering again."
        )

    # Trigger sync in background
    asyncio.create_task(NewsFetcher.harvest_feeds())
    return {"status": "triggered"}
    try:
        task = asyncio.create_task(NewsFetcher.harvest_feeds())
        return {"status": "triggered", "task_id": id(task)}
    except Exception as e:
        raise HTTPException(
            500, f"Failed to trigger sync: {str(e)}"
        )


@app.get("/settings/cron", response_model=HoursResponse)
async def get_cron_schedule(db: sqlite3.Cursor = Depends(get_db)):
    """
    Get the current cron schedule for harvesting news.

    Args:
        db: Database cursor dependency

    Returns:
        Object containing the current hours setting
    """
    db.execute("SELECT val FROM settings WHERE key='cron_hours'")
    row = db.fetchone()

    if row is None:
        return {"hours": CRON_HOURS}

    try:
        hours = float(row["val"])
        return {"hours": hours}
    except (ValueError, TypeError):
        return {"hours": CRON_HOURS}


@app.post("/settings/cron", response_model=HoursResponse)
async def update_cron_schedule(data: CronSettings, db: sqlite3.Cursor = Depends(get_db_write)):
    """
    Update the cron schedule for harvesting news.

    Args:
        data: New cron settings with hours interval
        db: Database cursor dependency

    Returns:
        Object containing the updated hours setting
    """
    hours = max(MIN_CRON_HOURS, data.hours)

    scheduler.get_job("harvest").modify(trigger=IntervalTrigger(hours=hours))

    if os.getenv("CRON_HOURS") is None:
        db.execute(
            "UPDATE settings SET val=? WHERE key='cron_hours'",
            (str(hours),)
        )

    global CRON_HOURS
    CRON_HOURS = hours

    return {"hours": hours}


# Mount static frontend
frontend_path = os.path.join(
    os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
    "frontend",
    "dist"
)
app.mount("/", StaticFiles(directory=frontend_path, html=True), name="static")
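A quick way to exercise the reworked endpoints from a small script (a sketch, not part of the commits; it assumes the app is served locally on port 8000 and that the query parameters keep the `from_`/`to_` names from the function signature):

```python
import asyncio

import httpx


async def main() -> None:
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        news = await client.get(
            "/news", params={"country": "DE", "from_": "2025-07-01", "to_": "2025-07-31"}
        )
        print(news.status_code, len(news.json()), "articles")

        # The cron setting is now updated via POST instead of PUT.
        updated = await client.post("/settings/cron", json={"hours": 1.0})
        print(updated.json())  # e.g. {"hours": 1.0}


asyncio.run(main())
```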
backend/app/models.py (new file, 50 lines)

@@ -0,0 +1,50 @@
from typing import TypedDict, List

from pydantic import BaseModel


class ArticleSummary(TypedDict):
    """Type definition for article summary data returned from the LLM."""
    title: str
    summary_de: str
    summary_en: str


# Pydantic models for API requests and responses
class CronSettings(BaseModel):
    """Settings for the cron job that harvests news."""
    hours: float


class FeedData(BaseModel):
    """Data for a news feed."""
    country: str
    url: str


class ModelStatus(BaseModel):
    """Status of the LLM model."""
    name: str
    status: str
    available_models: List[str]


class ErrorResponse(BaseModel):
    """Standard error response."""
    status: str
    message: str


class SuccessResponse(BaseModel):
    """Standard success response."""
    status: str


class TimestampResponse(BaseModel):
    """Response containing a timestamp."""
    ts: int


class HoursResponse(BaseModel):
    """Response containing hours setting."""
    hours: float
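A small sketch (outside the diff) of exercising these models directly, for example in a test; the values are made up:

```python
from backend.app.models import CronSettings, FeedData

# FastAPI validates request bodies against these models; the same validation
# can be triggered by instantiating them directly.
settings = CronSettings(hours=0.25)
feed = FeedData(country="DE", url="https://example.org/rss")

print(settings.hours)  # 0.25 -- the endpoint later clamps this to MIN_CRON_HOURS
print(feed.country, feed.url)
```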
backend/app/schema.sql (new file, 34 lines)

@@ -0,0 +1,34 @@
-- Database schema for Owly News Summariser

-- News table to store articles
CREATE TABLE IF NOT EXISTS news (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    title TEXT NOT NULL,
    description TEXT,
    url TEXT NOT NULL,
    published TIMESTAMP NOT NULL,
    country TEXT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Index for faster queries on published date
CREATE INDEX IF NOT EXISTS idx_news_published ON news(published);

-- Feeds table to store RSS feed sources
CREATE TABLE IF NOT EXISTS feeds (
    id INTEGER PRIMARY KEY,
    country TEXT,
    url TEXT UNIQUE NOT NULL
);

-- Settings table for application configuration
CREATE TABLE IF NOT EXISTS settings (
    key TEXT PRIMARY KEY,
    val TEXT NOT NULL
);

-- Meta table for application metadata
CREATE TABLE IF NOT EXISTS meta (
    key TEXT PRIMARY KEY,
    val TEXT NOT NULL
);
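For ad-hoc experiments outside the app, the schema can also be applied in one call (a sketch; the path and the scratch database name are placeholders):

```python
import sqlite3
from pathlib import Path

# DatabaseManager splits the file on ';' itself; executescript() gives the
# same result when poking at the schema from a script or a REPL.
schema_sql = Path("backend/app/schema.sql").read_text(encoding="utf-8")

conn = sqlite3.connect("scratch.sqlite3")
conn.executescript(schema_sql)
conn.close()
```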
@@ -1,7 +1,6 @@
{
  "DE": [
    "https://www.tagesschau.de/xml/rss2",
    "https://www.spiegel.de/schlagzeilen/tops/index.rss"
    "https://www.tagesschau.de/xml/rss2"
  ],
  "EU": [
    "https://www.euronews.com/rss?level=theme&name=news"
backend/app/services.py (new file, 381 lines)

@@ -0,0 +1,381 @@
import asyncio
import json
import re
import sqlite3
from datetime import datetime, timezone
from typing import Optional, cast, Dict

import feedparser
import httpx
from bs4 import BeautifulSoup

from backend.app.config import ARTICLE_FETCH_TIMEOUT, MAX_ARTICLE_LENGTH, logger, LLM_MODEL, OLLAMA_HOST, \
    LLM_TIMEOUT_SECONDS
from backend.app.database import db_manager
from backend.app.models import ArticleSummary


class NewsFetcher:
    """
    Handles fetching and summarizing news articles from RSS feeds.
    Uses Ollama/qwen to generate summaries of articles.
    """

    @staticmethod
    async def fetch_article_content(client: httpx.AsyncClient, url: str) -> str:
        """
        Fetch and extract the main content from an article URL.

        Args:
            client: An active httpx AsyncClient for making requests
            url: URL of the article to fetch

        Returns:
            Extracted text content from the article, or empty string if failed
        """
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                              'AppleWebKit/537.36 (KHTML, like Gecko) '
                              'Chrome/91.0.4472.124 Safari/537.36'
            }

            response = await client.get(
                url,
                headers=headers,
                timeout=ARTICLE_FETCH_TIMEOUT,
                follow_redirects=True
            )

            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')

            for element in soup(['script', 'style', 'nav', 'header', 'footer', 'aside', 'form', 'button']):
                element.decompose()

            content_selectors = [
                'article',
                '[role="main"]',
                '.content',
                '.article-content',
                '.post-content',
                '.entry-content',
                '.main-content',
                'main',
                '.story-body',
                '.article-body'
            ]

            article_text = ""

            for selector in content_selectors:
                elements = soup.select(selector)
                if elements:
                    for element in elements:
                        text = element.get_text(separator=' ', strip=True)
                        if len(text) > len(article_text):
                            article_text = text
                    break

            # Fallback: get text from body if no specific content area found
            if not article_text:
                body = soup.find('body')
                if body:
                    article_text = body.get_text(separator=' ', strip=True)

            article_text = re.sub(r'\s+', ' ', article_text)  # Normalize whitespace
            article_text = article_text.strip()

            # Limit length to avoid overwhelming the LLM
            if len(article_text) > MAX_ARTICLE_LENGTH:
                article_text = article_text[:MAX_ARTICLE_LENGTH] + "..."

            return article_text

        except httpx.TimeoutException:
            logger.warning(f"⏰ Timeout fetching article content from: {url}")
            return ""
        except httpx.HTTPError as e:
            logger.warning(f"🌐 HTTP error fetching article content from {url}: {e}")
            return ""
        except Exception as e:
            logger.warning(f"❌ Error fetching article content from {url}: {type(e).__name__}: {e}")
            return ""

    @staticmethod
    def build_prompt(url: str, title: str = "", description: str = "", content: str = "") -> str:
        """
        Generate a prompt for the LLM to summarize an article.

        Args:
            url: Public URL of the article to summarize
            title: Article title from RSS feed (optional)
            description: Article description from RSS feed (optional)
            content: Extracted article content (optional)

        Returns:
            A formatted prompt string that instructs the LLM to generate
            a JSON response with title and summaries in German and English
        """
        context_info = []
        if title:
            context_info.append(f"RSS-Titel: {title}")
        if description:
            context_info.append(f"RSS-Beschreibung: {description}")
        if content:
            content_preview = content[:500] + "..." if len(content) > 500 else content
            context_info.append(f"Artikel-Inhalt: {content_preview}")

        context = "\n".join(context_info) if context_info else "Keine zusätzlichen Informationen verfügbar."

        return (
            "### Aufgabe\n"
            f"Du sollst eine Nachricht basierend auf der URL und den verfügbaren Informationen zusammenfassen.\n"
            f"URL: {url}\n"
            f"Verfügbare Informationen:\n{context}\n\n"
            "### Regeln\n"
            "1. Nutze VORRANGIG den Artikel-Inhalt falls verfügbar, ergänze mit RSS-Informationen\n"
            "2. Falls kein Artikel-Inhalt verfügbar ist, nutze RSS-Titel und -Beschreibung\n"
            "3. Falls keine ausreichenden Informationen vorliegen, erstelle eine plausible Zusammenfassung basierend auf der URL\n"
            "4. Gib ausschließlich **gültiges minifiziertes JSON** zurück – kein Markdown, keine Kommentare\n"
            "5. Struktur: {\"title\":\"…\",\"description\":\"…\"}\n"
            "6. title: Aussagekräftiger deutscher Titel (max 100 Zeichen)\n"
            "7. description: Deutsche Zusammenfassung (zwischen 100 und 160 Wörter)\n"
            "8. Kein Text vor oder nach dem JSON\n\n"
            "### Ausgabe\n"
            "Jetzt antworte mit dem JSON:"
        )

    @staticmethod
    async def summarize_article(
            client: httpx.AsyncClient,
            url: str,
            title: str = "",
            description: str = ""
    ) -> Optional[ArticleSummary]:
        """
        Generate a summary of an article using the LLM.
        Now fetches the actual article content for more accurate summaries.

        Args:
            client: An active httpx AsyncClient for making requests
            url: URL of the article to summarize
            title: Article title from RSS feed
            description: Article description from RSS feed

        Returns:
            A dictionary containing the article title and summaries in German and English,
            or None if summarization failed
        """
        article_content = await NewsFetcher.fetch_article_content(client, url)

        if not article_content:
            logger.warning(f"⚠️ Could not fetch article content for {url}, using RSS data only")

        prompt = NewsFetcher.build_prompt(url, title, description, article_content)
        payload = {
            "model": LLM_MODEL,
            "prompt": prompt,
            "stream": False,
            "temperature": 0.1,
            "format": "json"
        }

        try:
            response = await client.post(
                f"{OLLAMA_HOST}/api/generate",
                json=payload,
                timeout=LLM_TIMEOUT_SECONDS
            )

            response.raise_for_status()
            result = response.json()
            llm_response = result["response"]

            if isinstance(llm_response, str):
                summary_data = json.loads(llm_response)
            else:
                summary_data = llm_response

            # Validate required fields
            required_fields = ["title", "description"]
            missing_fields = [field for field in required_fields if field not in summary_data]

            if missing_fields:
                logger.warning(
                    f"⚠️ Missing required fields in summary: {missing_fields}"
                )
                return None

            # Check summary quality metrics
            description_word_count = len(summary_data.get("description", "").split())

            if description_word_count > 160 or description_word_count < 100:
                logger.warning(
                    f"⚠️ Summary length outside the expected 100-160 word range - "
                    f"Description: {description_word_count} words"
                )

            return cast(ArticleSummary, summary_data)

        except json.JSONDecodeError as e:
            logger.error(f"❌ JSON parsing error for {url}: {e}")
            logger.error(
                f"🔍 Raw response that failed to parse: {llm_response[:500]}..."
            )
            return None
        except httpx.HTTPError as e:
            logger.error(f"❌ HTTP error for {url}: {e}")
            return None
        except Exception as e:
            logger.error(f"❌ Unexpected error summarizing {url}: {type(e).__name__}: {e}")
            return None

    @staticmethod
    async def harvest_feeds() -> None:
        """
        Fetch articles from all feeds and store summaries in the database.
        This is the main function that runs periodically to update the news database.
        """

        total_articles = 0
        successful_articles = 0
        failed_articles = 0

        try:
            with db_manager.get_cursor() as cursor:
                cursor.execute("SELECT country, url FROM feeds")
                feeds = cursor.fetchall()

            async with httpx.AsyncClient() as client:
                for i, feed_row in enumerate(feeds, 1):
                    feed_stats = await NewsFetcher._process_feed(client, feed_row)

                    total_articles += feed_stats['total']
                    successful_articles += feed_stats['successful']
                    failed_articles += feed_stats['failed']

            current_time = int(datetime.now(timezone.utc).timestamp())
            with db_manager.get_cursor() as cursor:
                cursor.execute(
                    "UPDATE meta SET val=? WHERE key='last_sync'",
                    (str(current_time),)
                )

        except Exception as e:
            logger.error(f"❌ Critical error during harvest: {type(e).__name__}: {e}")
            raise

    @staticmethod
    async def _process_feed(
            client: httpx.AsyncClient,
            feed_row: sqlite3.Row
    ) -> Dict[str, int]:
        """
        Process a single feed, fetching and summarizing all articles.
        Now saves summaries immediately to the database with better concurrency.

        Args:
            client: An active httpx AsyncClient for making requests
            feed_row: A database row containing feed information

        Returns:
            Dictionary with processing statistics
        """
        stats = {'total': 0, 'successful': 0, 'failed': 0, 'skipped': 0}

        try:
            feed_data = feedparser.parse(feed_row["url"])

            if hasattr(feed_data, 'bozo') and feed_data.bozo:
                logger.warning(f"⚠️ Feed has parsing issues: {feed_row['url']}")
                if hasattr(feed_data, 'bozo_exception'):
                    logger.warning(f"⚠️ Feed exception: {feed_data.bozo_exception}")

            total_entries = len(feed_data.entries)

            if total_entries == 0:
                logger.warning(f"⚠️ No entries found in feed: {feed_row['url']}")
                return stats

            for i, entry in enumerate(feed_data.entries, 1):
                stats['total'] += 1

                if not hasattr(entry, "link"):
                    stats['skipped'] += 1
                    continue

                if not hasattr(entry, "published_parsed"):
                    stats['skipped'] += 1
                    continue

                article_url = entry.link

                try:
                    published = datetime(
                        *entry.published_parsed[:6],
                        tzinfo=timezone.utc
                    )
                except (TypeError, ValueError):
                    stats['skipped'] += 1
                    continue

                # Check if article already exists - use readonly connection for better concurrency
                try:
                    with db_manager.get_cursor_with_retry(readonly=True) as cursor:
                        cursor.execute("SELECT id FROM news WHERE url = ?", (article_url,))
                        if cursor.fetchone():
                            stats['skipped'] += 1
                            continue
                except Exception as db_error:
                    logger.warning(f"⚠️ Database check failed for article {i}, continuing: {db_error}")

                rss_title = getattr(entry, 'title', '')
                rss_description = getattr(entry, 'description', '') or getattr(entry, 'summary', '')

                summary = await NewsFetcher.summarize_article(
                    client,
                    article_url,
                    title=rss_title,
                    description=rss_description
                )

                if not summary:
                    logger.warning(f"❌ Failed to get summary for article {i}: {article_url}")
                    stats['failed'] += 1
                    continue

                published_timestamp = int(published.timestamp())

                try:
                    with db_manager.get_cursor_with_retry(readonly=False) as cursor:
                        cursor.execute(
                            """
                            INSERT OR IGNORE INTO news
                                (title, description, url, published, country)
                            VALUES (?, ?, ?, ?, ?)
                            """,
                            (
                                summary["title"],
                                summary["description"],
                                article_url,
                                published_timestamp,
                                feed_row["country"],
                            )
                        )

                    stats['successful'] += 1

                except Exception as db_error:
                    logger.error(f"❌ Database error for article {i}: {db_error}")
                    stats['failed'] += 1
                    continue

                await asyncio.sleep(0.01)  # 10ms delay to yield control

        except Exception as e:
            logger.error(f"❌ Error processing feed {feed_row['url']}: {type(e).__name__}: {e}")

        return stats
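For a one-off run outside the scheduler (a sketch, assuming Ollama is reachable at OLLAMA_HOST and the feeds table has been seeded):

```python
import asyncio

from backend.app.services import NewsFetcher

# Runs a single harvest pass: fetch every feed, summarize new articles via
# Ollama, and update the last_sync timestamp.
asyncio.run(NewsFetcher.harvest_feeds())
```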
@@ -1,8 +1,29 @@
# URL for the Ollama service
OLLAMA_HOST=http://localhost:11434

# Interval for scheduled news fetching in hours (minimum: 0.5)
# Interval for scheduled news fetching in hours
CRON_HOURS=1

# Minimum interval for scheduled news fetching in hours
MIN_CRON_HOURS=0.5

# Cooldown period in minutes between manual syncs
SYNC_COOLDOWN_MINUTES=30

# LLM model to use for summarization
LLM_MODEL=qwen2:7b-instruct-q4_K_M

# Timeout in seconds for LLM requests
LLM_TIMEOUT_SECONDS=180

# Timeout in seconds for Ollama API requests
OLLAMA_API_TIMEOUT_SECONDS=10

# Timeout in seconds for article fetching
ARTICLE_FETCH_TIMEOUT=30

# Maximum length of article content to process
MAX_ARTICLE_LENGTH=5000

# SQLite database connection string
DATABASE_URL=sqlite:///./newsdb.sqlite
DB_NAME=owlynews.sqlite3
@@ -8,3 +8,4 @@ uvicorn[standard]
python-multipart
psycopg2-binary
sqlalchemy
beautifulsoup4
@@ -4,6 +4,7 @@ import {useNews} from './stores/useNews';
import FeedManager from './components/FeedManager.vue';
import CronSlider from './components/CronSlider.vue';
import SyncButton from './components/SyncButton.vue';
import NewsRefreshButton from './components/NewsRefreshButton.vue';
import ModelStatus from './components/ModelStatus.vue';

const news = useNews();
@@ -12,32 +13,29 @@ const filters = ref({country: 'DE'});
onMounted(async () => {
  await news.loadLastSync();
  await news.sync(filters.value);
  await news.getNews(filters.value);
});
</script>
<template>
  <main class="max-w-4xl mx-auto p-4 space-y-6">
    <h1 class="text-2xl font-bold">📰 Local News Summariser</h1>

    <div class="grid md:grid-cols-3 gap-4">
    <div class="grid md:grid-cols-4 gap-4">
      <CronSlider/>
      <SyncButton/>
      <NewsRefreshButton/>
      <ModelStatus/>
    </div>

    <FeedManager/>

    <section v-if="news.offline" class="p-2 bg-yellow-100 border-l-4 border-yellow-500">
      Offline – Datenstand: {{ new Date(news.lastSync).toLocaleString() }}
    </section>

    <article v-for="a in news.articles" :key="a.id" class="bg-white rounded p-4 shadow">
      <h2 class="font-semibold">{{ a.title }}</h2>
      <p class="text-sm text-gray-600">{{ new Date(a.published).toLocaleString() }} – {{
          a.source
        }}</p>
      <p class="mt-2">{{ a.summary_de }}</p>
      <p class="italic mt-2 text-sm text-gray-700">{{ a.summary_en }}</p>
      <a :href="a.id" target="_blank" class="text-blue-600 hover:underline">Original →</a>
      <p class="text-sm text-gray-600">{{ new Date(a.published).toLocaleString() }} – {{ a.country }}</p>
      <p>{{ a.published }}</p>
      <p class="mt-2">{{ a.description }}</p>
      <p class="italic mt-2 text-sm text-gray-700">Added: {{ new Date(a.created_at).toLocaleString() }}</p>
      <a :href="a.url" target="_blank" class="text-blue-600 hover:underline">Original →</a>
    </article>
  </main>
</template>
@@ -12,7 +12,7 @@ onMounted(async () => {
async function update() {
  saving.value = true;
  await fetch('/settings/cron', {
    method: 'PUT',
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({hours: hours.value})
  });
frontend/src/components/NewsRefreshButton.vue (new file, 27 lines)

@@ -0,0 +1,27 @@
<script setup lang="ts">
import { ref } from 'vue';
import { useNews } from '../stores/useNews';

const news = useNews();
const isLoading = ref(false);

async function refreshNews() {
  isLoading.value = true;
  try {
    await news.getNews({country: 'DE'});
  } finally {
    isLoading.value = false;
  }
}
</script>

<template>
  <button
    @click="refreshNews"
    class="px-4 py-2 rounded bg-green-600 hover:bg-green-700 text-white flex items-center justify-center"
    :disabled="isLoading"
  >
    <span v-if="isLoading" class="animate-spin mr-2">⟳</span>
    <span>Refresh News</span>
  </button>
</template>
@@ -1,16 +1,16 @@
import {defineStore} from 'pinia';
import {set, get} from 'idb-keyval';

export const useNews = defineStore('news', {
  state: () => ({
    articles: [] as {
      id: string,
      published: number,
      id: number,
      title: string,
      source: string,
      summary_de: string,
      summary_en: string
    }[], lastSync: 0, offline: false
      description: string,
      url: string,
      published: number,
      country: string,
      created_at: number
    }[], lastSync: 0
  }),
  actions: {
    async loadLastSync() {
@@ -21,23 +21,31 @@ export const useNews = defineStore('news', {
      return Date.now() - this.lastSync > 30 * 60 * 1000; // 30‑min guard
    },
    async sync(filters: Record<string, string>) {
      try {
        if (!this.canManualSync()) throw new Error('Too soon');
        const q = new URLSearchParams(filters).toString();
        const res = await fetch(`/news?${q}`);
        if (!res.ok) throw new Error('network');
      if (!this.canManualSync()) {
        console.log('Too soon to sync again');
        return;
      }

      const q = new URLSearchParams(filters).toString();
      const res = await fetch(`/news?${q}`);

      if (res.ok) {
        const data = await res.json();
        this.articles = data;
        this.lastSync = Date.now();
        await set(JSON.stringify(filters), data);
        this.offline = false;
      } catch (e) {
        const cached = await get(JSON.stringify(filters));
        if (cached) {
          this.articles = cached;
          this.offline = true;
        }
      }
    },
    async getNews(filters: Record<string, string> = {}) {
      const q = new URLSearchParams(filters).toString();
      const res = await fetch(`/news?${q}`);

      if (res.ok) {
        const data = await res.json();
        this.articles = data;
        return data;
      }

      return [];
    }
  }
});
@@ -5,7 +5,6 @@ import vue from '@vitejs/plugin-vue';
import vueJsx from '@vitejs/plugin-vue-jsx';
import {defineConfig} from 'vite';
import vueDevTools from 'vite-plugin-vue-devtools';
import {VitePWA} from "vite-plugin-pwa";

// https://vite.dev/config/
export default defineConfig({
@@ -13,22 +12,7 @@ export default defineConfig({
    vue(),
    vueJsx(),
    vueDevTools(),
    tailwindcss(),
    VitePWA({
      registerType: 'autoUpdate',
      workbox: {
        runtimeCaching: [
          {
            urlPattern: /\/news\?/,
            handler: 'NetworkFirst',
            options: {
              cacheName: 'news-api',
              networkTimeoutSeconds: 3
            }
          }
        ]
      }
    })
    tailwindcss()
  ],
  build: {outDir: 'dist'},
  resolve: {