# owly-news/backend/app/services.py

import asyncio
import json
import re
import sqlite3
from datetime import datetime, timezone
from typing import Dict, Optional, cast
import feedparser
import httpx
from bs4 import BeautifulSoup
from backend.app.config import (
ARTICLE_FETCH_TIMEOUT,
LLM_MODEL,
LLM_TIMEOUT_SECONDS,
MAX_ARTICLE_LENGTH,
OLLAMA_HOST,
logger,
)
from backend.app.database import db_manager
from backend.app.models import ArticleSummary
class NewsFetcher:
"""
Handles fetching and summarizing news articles from RSS feeds.
Uses Ollama/qwen to generate summaries of articles.
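
    Typical usage (sketch; ``article_url`` stands for any article link):
        async with httpx.AsyncClient() as client:
            result = await NewsFetcher.summarize_article(client, article_url)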
"""
@staticmethod
async def fetch_article_content(
client: httpx.AsyncClient,
url: str) -> str:
"""
Fetch and extract the main content from an article URL.
Args:
client: An active httpx AsyncClient for making requests
url: URL of the article to fetch
Returns:
Extracted text content from the article, or empty string if failed
"""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/91.0.4472.124 Safari/537.36'
}
response = await client.get(
url,
headers=headers,
timeout=ARTICLE_FETCH_TIMEOUT,
follow_redirects=True
)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
for element in soup(['script',
'style',
'nav',
'header',
'footer',
'aside',
'form',
'button']):
element.decompose()
content_selectors = [
'article',
'[role="main"]',
'.content',
'.article-content',
'.post-content',
'.entry-content',
'.main-content',
'main',
'.story-body',
'.article-body'
]
article_text = ""
for selector in content_selectors:
elements = soup.select(selector)
if elements:
for element in elements:
text = element.get_text(separator=' ', strip=True)
if len(text) > len(article_text):
article_text = text
break
# Fallback: get text from body if no specific content area found
if not article_text:
body = soup.find('body')
if body:
article_text = body.get_text(separator=' ', strip=True)
article_text = re.sub(
r'\s+', ' ', article_text) # Normalize whitespace
article_text = article_text.strip()
# Limit length to avoid overwhelming the LLM
if len(article_text) > MAX_ARTICLE_LENGTH:
article_text = article_text[:MAX_ARTICLE_LENGTH] + "..."
return article_text
except httpx.TimeoutException:
logger.warning(f"⏰ Timeout fetching article content from: {url}")
return ""
except httpx.HTTPError as e:
logger.warning(
f"🌐 HTTP error fetching article content from {url}: {e}")
return ""
except Exception as e:
logger.warning(
f"❌ Error fetching article content from {url}: {
type(e).__name__}: {e}")
return ""
@staticmethod
def build_prompt(
title: str = "",
summary: str = "",
content: str = "") -> str:
"""
Generate a prompt for the LLM to summarize an article.
Args:
title: Article title from RSS feed (optional)
summary: Article summary from RSS feed (optional)
content: Extracted article content (optional)
Returns:
A formatted prompt string that instructs the LLM to generate
a JSON response with title, summary and tags in German
"""
context_info = []
if title:
context_info.append(f"RSS-Titel: {title}")
if summary:
context_info.append(f"RSS-Beschreibung: {summary}")
if content:
            content_preview = (
                content[:500] + "..." if len(content) > 500 else content)
context_info.append(f"Artikel-Inhalt: {content_preview}")
context = "\n".join(
context_info) if context_info else "Keine zusätzlichen Informationen verfügbar."
return (
"### Vorliegende Informationen\n"
f"{context}\n\n"
"### Längenbegrenzungen\n"
"title: Format \"ORT: Titel\", max 100 Zeichen\n"
"location: nur der ORT-Teil, max 40 Zeichen\n"
"summary: 100160 Wörter\n"
"tags: bis zu 6 Schlüsselwörter, durch Komma getrennt, alles Kleinbuchstaben.\n\n"
"### Regeln\n"
"1. Nutze ausschließlich Informationen, die im bereitgestellten Material eindeutig vorkommen. Externes Wissen ist untersagt.\n"
"2. Liegt sowohl Artikel-Text als auch RSS-Metadaten vor, hat der Artikel-Text Vorrang; verwende RSS nur ergänzend.\n"
"3. Liegt nur RSS-Titel und/oder -Beschreibung vor, stütze dich ausschließlich darauf.\n"
"4. Sind die Informationen unzureichend, gib exakt {\"location\":\"\",\"title\":\"\",\"summary\":\"\",\"tags\":\"\"} zurück.\n"
"5. Gib nur gültiges, minifiziertes JSON zurück keine Zeilenumbrüche, kein Markdown, keine Kommentare.\n"
"6. Verwende keine hypothetischen Formulierungen (\"könnte\", \"möglicherweise\" etc.).\n"
"7. Wörtliche Zitate dürfen höchstens 15 % des Summary-Texts ausmachen.\n"
"8. Kein Text vor oder nach dem JSON.\n\n"
"### Ausgabe\n"
"Antworte jetzt ausschließlich mit dem JSON:\n"
)
@staticmethod
def build_system_prompt():
return (
"Du bist ein hochpräziser JSON-Summarizer und Experte für die Zusammenfassung von Artikeln.\n\n"
"### Vorgehen\n"
"Schritt 1: Identifiziere Hauptthema und Zweck.\n"
"Schritt 2: Extrahiere die wichtigsten Fakten und Ergebnisse.\n"
"Schritt 3: Erkenne die zentralen Argumente und Standpunkte.\n"
"Schritt 4: Ordne die Informationen nach Wichtigkeit.\n"
"Schritt 5: Erstelle eine prägnante, klare und sachliche Zusammenfassung.\n\n"
)
@staticmethod
async def summarize_article(
client: httpx.AsyncClient,
url: str,
title: str = "",
summary: str = ""
) -> Optional[ArticleSummary]:
"""
Generate a summary of an article using the LLM.
Now fetches the actual article content for more accurate summaries.
Args:
client: An active httpx AsyncClient for making requests
url: URL of the article to summarize
title: Article title from RSS feed
summary: Article summary from RSS feed
Returns:
            A dictionary with the article title, summary and tags in German,
            or None if summarization failed
"""
logger.info("[AI] Fetching article content from: " + url)
article_content = await NewsFetcher.fetch_article_content(client, url)
if not article_content:
logger.warning(
f"⚠️ Could not fetch article content, using RSS data only")
prompt = NewsFetcher.build_prompt(title, summary, article_content)
system_prompt = NewsFetcher.build_system_prompt()
payload = {
"model": LLM_MODEL,
"prompt": prompt,
"system": system_prompt,
"stream": False,
"temperature": 0.1,
"format": {
"type": "object",
"properties": {
"title": {
"type": "string"
},
"summary": {
"type": "string"
},
"tags": {
"type": "array",
"items": {
"type": "string"
}
},
"location": {
"type": "string"
}
},
"required": [
"title",
"summary",
"tags"
]
},
"options": {
"num_gpu": 1, # Force GPU usage
"num_ctx": 8192, # Context size
}
}
logger.info("[AI] Running summary generation...")
try:
response = await client.post(
f"{OLLAMA_HOST}/api/generate",
json=payload,
timeout=LLM_TIMEOUT_SECONDS
)
response.raise_for_status()
result = response.json()
llm_response = result["response"]
logger.info("[AI] " + llm_response)
if isinstance(llm_response, str):
summary_data = json.loads(llm_response)
else:
summary_data = llm_response
# Validate required fields
required_fields = ["title", "summary"]
missing_fields = [
field for field in required_fields if field not in summary_data]
if missing_fields:
logger.warning(
f"⚠️ Missing required fields in summary: {missing_fields}"
)
return None
# Check summary quality metrics
summary_length = len(summary_data.get("summary", "").split())
if summary_length > 160:
logger.warning(
f"⚠️ Summary exceeds word limit - "
f"Summary: {summary_length}/160"
)
return cast(ArticleSummary, summary_data)
except json.JSONDecodeError as e:
logger.error(f"❌ JSON parsing error for {url}: {e}")
logger.error(
f"🔍 Raw response that failed to parse: {llm_response[:500]}..."
)
return None
except httpx.HTTPError as e:
logger.error(f"❌ HTTP error for {url}: {e}")
return None
except Exception as e:
logger.error(
f"❌ Unexpected error summarizing {url}: {
type(e).__name__}: {e}")
return None
@staticmethod
async def harvest_feeds() -> None:
"""
Fetch articles from all feeds and store summaries in the database.
This is the main function that runs periodically to update the news database.
"""
total_articles = 0
successful_articles = 0
failed_articles = 0
try:
with db_manager.get_cursor() as cursor:
cursor.execute("SELECT country, url FROM feeds")
feeds = cursor.fetchall()
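            # Feeds are processed one after another over a single shared HTTP client.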
async with httpx.AsyncClient() as client:
for i, feed_row in enumerate(feeds, 1):
feed_stats = await NewsFetcher._process_feed(client, feed_row)
total_articles += feed_stats['total']
successful_articles += feed_stats['successful']
failed_articles += feed_stats['failed']
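            # Record the completion time (unix epoch, UTC) under the 'last_sync' meta key.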
current_time = int(datetime.now(timezone.utc).timestamp())
with db_manager.get_cursor() as cursor:
cursor.execute(
"UPDATE meta SET val=? WHERE key='last_sync'",
(str(current_time),)
)
except Exception as e:
logger.error(
f"❌ Critical error during harvest: {
type(e).__name__}: {e}")
raise
@staticmethod
async def _process_feed(
client: httpx.AsyncClient,
feed_row: sqlite3.Row
) -> Dict[str, int]:
"""
Process a single feed, fetching and summarizing all articles.
Now saves summaries immediately to the database with better concurrency.
Args:
client: An active httpx AsyncClient for making requests
feed_row: A database row containing feed information
Returns:
Dictionary with processing statistics
"""
stats = {'total': 0, 'successful': 0, 'failed': 0, 'skipped': 0}
try:
feed_data = feedparser.parse(feed_row["url"])
if hasattr(feed_data, 'bozo') and feed_data.bozo:
logger.warning(
f"⚠️ Feed has parsing issues: {
feed_row['url']}")
if hasattr(feed_data, 'bozo_exception'):
logger.warning(
f"⚠️ Feed exception: {
feed_data.bozo_exception}")
total_entries = len(feed_data.entries)
if total_entries == 0:
logger.warning(
f"⚠️ No entries found in feed: {
feed_row['url']}")
return stats
for i, entry in enumerate(feed_data.entries, 1):
stats['total'] += 1
if not hasattr(entry, "link"):
stats['skipped'] += 1
continue
if not hasattr(entry, "published_parsed"):
stats['skipped'] += 1
continue
article_url = entry.link
try:
published = datetime(
*entry.published_parsed[:6],
tzinfo=timezone.utc
)
except (TypeError, ValueError):
stats['skipped'] += 1
continue
# Check if article already exists - use readonly connection for
# better concurrency
try:
with db_manager.get_cursor_with_retry(readonly=True) as cursor:
cursor.execute(
"SELECT id FROM news WHERE url = ?", (article_url,))
if cursor.fetchone():
stats['skipped'] += 1
continue
except Exception as db_error:
logger.warning(
f"⚠️ Database check failed for article {i}, continuing: {db_error}")
rss_title = getattr(entry, 'title', '')
rss_summary = getattr(
entry, 'description', '') or getattr(
entry, 'summary', '')
summary = await NewsFetcher.summarize_article(
client,
article_url,
title=rss_title,
summary=rss_summary
)
                logger.debug(f"[AI] Summary for article {i}: {summary}")
if not summary:
logger.warning(
f"❌ Failed to get summary for article {i}: {article_url}")
stats['failed'] += 1
continue
published_timestamp = int(published.timestamp())
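                # Write the summary immediately instead of batching inserts.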
try:
with db_manager.get_cursor_with_retry(readonly=False) as cursor:
cursor.execute(
"""
INSERT
OR IGNORE
INTO news
(title, summary, url, published, country)
VALUES (?, ?, ?, ?, ?)
""",
(
summary["title"],
summary["summary"],
article_url,
published_timestamp,
feed_row["country"],
)
)
stats['successful'] += 1
except Exception as db_error:
logger.error(
f"❌ Database error for article {i}: {db_error}")
stats['failed'] += 1
continue
await asyncio.sleep(0.01) # 10ms delay to yield control
except Exception as e:
logger.error(
f"❌ Error processing feed {
feed_row['url']}: {
type(e).__name__}: {e}")
return stats