382 lines
14 KiB
Python
382 lines
14 KiB
Python
import asyncio
|
||
import json
|
||
import re
|
||
import sqlite3
|
||
from datetime import datetime, timezone
|
||
from typing import Optional, cast, Dict
|
||
|
||
import feedparser
|
||
import httpx
|
||
from bs4 import BeautifulSoup
|
||
|
||
from backend.app.config import ARTICLE_FETCH_TIMEOUT, MAX_ARTICLE_LENGTH, logger, LLM_MODEL, OLLAMA_HOST, \
|
||
LLM_TIMEOUT_SECONDS
|
||
from backend.app.database import db_manager
|
||
from backend.app.models import ArticleSummary
|
||
|
||
|
||
class NewsFetcher:
|
||
"""
|
||
Handles fetching and summarizing news articles from RSS feeds.
|
||
Uses Ollama/qwen to generate summaries of articles.
|
||
"""
|
||
|
||
@staticmethod
|
||
async def fetch_article_content(client: httpx.AsyncClient, url: str) -> str:
|
||
"""
|
||
Fetch and extract the main content from an article URL.
|
||
|
||
Args:
|
||
client: An active httpx AsyncClient for making requests
|
||
url: URL of the article to fetch
|
||
|
||
Returns:
|
||
Extracted text content from the article, or empty string if failed
|
||
"""
|
||
try:
|
||
headers = {
|
||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
|
||
'AppleWebKit/537.36 (KHTML, like Gecko) '
|
||
'Chrome/91.0.4472.124 Safari/537.36'
|
||
}
|
||
|
||
response = await client.get(
|
||
url,
|
||
headers=headers,
|
||
timeout=ARTICLE_FETCH_TIMEOUT,
|
||
follow_redirects=True
|
||
)
|
||
|
||
response.raise_for_status()
|
||
|
||
soup = BeautifulSoup(response.text, 'html.parser')
|
||
|
||
for element in soup(['script', 'style', 'nav', 'header', 'footer', 'aside', 'form', 'button']):
|
||
element.decompose()
|
||
|
||
content_selectors = [
|
||
'article',
|
||
'[role="main"]',
|
||
'.content',
|
||
'.article-content',
|
||
'.post-content',
|
||
'.entry-content',
|
||
'.main-content',
|
||
'main',
|
||
'.story-body',
|
||
'.article-body'
|
||
]
|
||
|
||
article_text = ""
|
||
|
||
for selector in content_selectors:
|
||
elements = soup.select(selector)
|
||
if elements:
|
||
for element in elements:
|
||
text = element.get_text(separator=' ', strip=True)
|
||
if len(text) > len(article_text):
|
||
article_text = text
|
||
break
|
||
|
||
# Fallback: get text from body if no specific content area found
|
||
if not article_text:
|
||
body = soup.find('body')
|
||
if body:
|
||
article_text = body.get_text(separator=' ', strip=True)
|
||
|
||
article_text = re.sub(r'\s+', ' ', article_text) # Normalize whitespace
|
||
article_text = article_text.strip()
|
||
|
||
# Limit length to avoid overwhelming the LLM
|
||
if len(article_text) > MAX_ARTICLE_LENGTH:
|
||
article_text = article_text[:MAX_ARTICLE_LENGTH] + "..."
|
||
|
||
return article_text
|
||
|
||
except httpx.TimeoutException:
|
||
logger.warning(f"⏰ Timeout fetching article content from: {url}")
|
||
return ""
|
||
except httpx.HTTPError as e:
|
||
logger.warning(f"🌐 HTTP error fetching article content from {url}: {e}")
|
||
return ""
|
||
except Exception as e:
|
||
logger.warning(f"❌ Error fetching article content from {url}: {type(e).__name__}: {e}")
|
||
return ""
|
||
|
||
@staticmethod
|
||
def build_prompt(url: str, title: str = "", description: str = "", content: str = "") -> str:
|
||
"""
|
||
Generate a prompt for the LLM to summarize an article.
|
||
|
||
Args:
|
||
url: Public URL of the article to summarize
|
||
title: Article title from RSS feed (optional)
|
||
description: Article description from RSS feed (optional)
|
||
content: Extracted article content (optional)
|
||
|
||
Returns:
|
||
A formatted prompt string that instructs the LLM to generate
|
||
a JSON response with title and summaries in German and English
|
||
"""
|
||
context_info = []
|
||
if title:
|
||
context_info.append(f"RSS-Titel: {title}")
|
||
if description:
|
||
context_info.append(f"RSS-Beschreibung: {description}")
|
||
if content:
|
||
content_preview = content[:500] + "..." if len(content) > 500 else content
|
||
context_info.append(f"Artikel-Inhalt: {content_preview}")
|
||
|
||
context = "\n".join(context_info) if context_info else "Keine zusätzlichen Informationen verfügbar."
|
||
|
||
return (
|
||
"### Aufgabe\n"
|
||
f"Du sollst eine Nachricht basierend auf der URL und den verfügbaren Informationen zusammenfassen.\n"
|
||
f"URL: {url}\n"
|
||
f"Verfügbare Informationen:\n{context}\n\n"
|
||
"### Regeln\n"
|
||
"1. Nutze VORRANGIG den Artikel-Inhalt falls verfügbar, ergänze mit RSS-Informationen\n"
|
||
"2. Falls kein Artikel-Inhalt verfügbar ist, nutze RSS-Titel und -Beschreibung\n"
|
||
"3. Falls keine ausreichenden Informationen vorliegen, erstelle eine plausible Zusammenfassung basierend auf der URL\n"
|
||
"4. Gib ausschließlich **gültiges minifiziertes JSON** zurück – kein Markdown, keine Kommentare\n"
|
||
"5. Struktur: {\"title\":\"…\",\"description\":\"…\"}\n"
|
||
"6. title: Aussagekräftiger deutscher Titel (max 100 Zeichen)\n"
|
||
"7. description: Deutsche Zusammenfassung (zwischen 100 und 160 Wörter)\n"
|
||
"8. Kein Text vor oder nach dem JSON\n\n"
|
||
"### Ausgabe\n"
|
||
"Jetzt antworte mit dem JSON:"
|
||
)
|
||
|
||
@staticmethod
|
||
async def summarize_article(
|
||
client: httpx.AsyncClient,
|
||
url: str,
|
||
title: str = "",
|
||
description: str = ""
|
||
) -> Optional[ArticleSummary]:
|
||
"""
|
||
Generate a summary of an article using the LLM.
|
||
Now fetches the actual article content for more accurate summaries.
|
||
|
||
Args:
|
||
client: An active httpx AsyncClient for making requests
|
||
url: URL of the article to summarize
|
||
title: Article title from RSS feed
|
||
description: Article description from RSS feed
|
||
|
||
Returns:
|
||
A dictionary containing the article title and summaries in German and English,
|
||
or None if summarization failed
|
||
"""
|
||
article_content = await NewsFetcher.fetch_article_content(client, url)
|
||
|
||
if not article_content:
|
||
logger.warning(f"⚠️ Could not fetch article content, using RSS data only")
|
||
|
||
prompt = NewsFetcher.build_prompt(url, title, description, article_content)
|
||
payload = {
|
||
"model": LLM_MODEL,
|
||
"prompt": prompt,
|
||
"stream": False,
|
||
"temperature": 0.1,
|
||
"format": "json"
|
||
}
|
||
|
||
try:
|
||
response = await client.post(
|
||
f"{OLLAMA_HOST}/api/generate",
|
||
json=payload,
|
||
timeout=LLM_TIMEOUT_SECONDS
|
||
)
|
||
|
||
response.raise_for_status()
|
||
result = response.json()
|
||
llm_response = result["response"]
|
||
|
||
if isinstance(llm_response, str):
|
||
summary_data = json.loads(llm_response)
|
||
else:
|
||
summary_data = llm_response
|
||
|
||
# Validate required fields
|
||
required_fields = ["title", "description"]
|
||
missing_fields = [field for field in required_fields if field not in summary_data]
|
||
|
||
if missing_fields:
|
||
logger.warning(
|
||
f"⚠️ Missing required fields in summary: {missing_fields}"
|
||
)
|
||
return None
|
||
|
||
# Check summary quality metrics
|
||
description = len(summary_data.get("description", "").split())
|
||
|
||
if description > 160 or description < 100:
|
||
logger.warning(
|
||
f"⚠️ Summary exceeds word limit - "
|
||
f"Description: {description}/160"
|
||
)
|
||
|
||
return cast(ArticleSummary, summary_data)
|
||
|
||
except json.JSONDecodeError as e:
|
||
logger.error(f"❌ JSON parsing error for {url}: {e}")
|
||
logger.error(
|
||
f"🔍 Raw response that failed to parse: {llm_response[:500]}..."
|
||
)
|
||
return None
|
||
except httpx.HTTPError as e:
|
||
logger.error(f"❌ HTTP error for {url}: {e}")
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"❌ Unexpected error summarizing {url}: {type(e).__name__}: {e}")
|
||
return None
|
||
|
||
@staticmethod
|
||
async def harvest_feeds() -> None:
|
||
"""
|
||
Fetch articles from all feeds and store summaries in the database.
|
||
This is the main function that runs periodically to update the news database.
|
||
"""
|
||
|
||
total_articles = 0
|
||
successful_articles = 0
|
||
failed_articles = 0
|
||
|
||
try:
|
||
with db_manager.get_cursor() as cursor:
|
||
cursor.execute("SELECT country, url FROM feeds")
|
||
feeds = cursor.fetchall()
|
||
|
||
async with httpx.AsyncClient() as client:
|
||
for i, feed_row in enumerate(feeds, 1):
|
||
feed_stats = await NewsFetcher._process_feed(client, feed_row)
|
||
|
||
total_articles += feed_stats['total']
|
||
successful_articles += feed_stats['successful']
|
||
failed_articles += feed_stats['failed']
|
||
|
||
current_time = int(datetime.now(timezone.utc).timestamp())
|
||
with db_manager.get_cursor() as cursor:
|
||
cursor.execute(
|
||
"UPDATE meta SET val=? WHERE key='last_sync'",
|
||
(str(current_time),)
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Critical error during harvest: {type(e).__name__}: {e}")
|
||
raise
|
||
|
||
@staticmethod
|
||
async def _process_feed(
|
||
client: httpx.AsyncClient,
|
||
feed_row: sqlite3.Row
|
||
) -> Dict[str, int]:
|
||
"""
|
||
Process a single feed, fetching and summarizing all articles.
|
||
Now saves summaries immediately to the database with better concurrency.
|
||
|
||
Args:
|
||
client: An active httpx AsyncClient for making requests
|
||
feed_row: A database row containing feed information
|
||
|
||
Returns:
|
||
Dictionary with processing statistics
|
||
"""
|
||
stats = {'total': 0, 'successful': 0, 'failed': 0, 'skipped': 0}
|
||
|
||
try:
|
||
feed_data = feedparser.parse(feed_row["url"])
|
||
|
||
if hasattr(feed_data, 'bozo') and feed_data.bozo:
|
||
logger.warning(f"⚠️ Feed has parsing issues: {feed_row['url']}")
|
||
if hasattr(feed_data, 'bozo_exception'):
|
||
logger.warning(f"⚠️ Feed exception: {feed_data.bozo_exception}")
|
||
|
||
total_entries = len(feed_data.entries)
|
||
|
||
if total_entries == 0:
|
||
logger.warning(f"⚠️ No entries found in feed: {feed_row['url']}")
|
||
return stats
|
||
|
||
for i, entry in enumerate(feed_data.entries, 1):
|
||
stats['total'] += 1
|
||
|
||
if not hasattr(entry, "link"):
|
||
stats['skipped'] += 1
|
||
continue
|
||
|
||
if not hasattr(entry, "published_parsed"):
|
||
stats['skipped'] += 1
|
||
continue
|
||
|
||
article_url = entry.link
|
||
|
||
try:
|
||
published = datetime(
|
||
*entry.published_parsed[:6],
|
||
tzinfo=timezone.utc
|
||
)
|
||
except (TypeError, ValueError):
|
||
stats['skipped'] += 1
|
||
continue
|
||
|
||
# Check if article already exists - use readonly connection for better concurrency
|
||
try:
|
||
with db_manager.get_cursor_with_retry(readonly=True) as cursor:
|
||
cursor.execute("SELECT id FROM news WHERE url = ?", (article_url,))
|
||
if cursor.fetchone():
|
||
stats['skipped'] += 1
|
||
continue
|
||
except Exception as db_error:
|
||
logger.warning(f"⚠️ Database check failed for article {i}, continuing: {db_error}")
|
||
|
||
rss_title = getattr(entry, 'title', '')
|
||
rss_description = getattr(entry, 'description', '') or getattr(entry, 'summary', '')
|
||
|
||
summary = await NewsFetcher.summarize_article(
|
||
client,
|
||
article_url,
|
||
title=rss_title,
|
||
description=rss_description
|
||
)
|
||
|
||
if not summary:
|
||
logger.warning(f"❌ Failed to get summary for article {i}: {article_url}")
|
||
stats['failed'] += 1
|
||
continue
|
||
|
||
published_timestamp = int(published.timestamp())
|
||
|
||
try:
|
||
with db_manager.get_cursor_with_retry(readonly=False) as cursor:
|
||
cursor.execute(
|
||
"""
|
||
INSERT
|
||
OR IGNORE INTO news
|
||
(title, description, url, published, country)
|
||
VALUES (?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
summary["title"],
|
||
summary["description"],
|
||
article_url,
|
||
published_timestamp,
|
||
feed_row["country"],
|
||
)
|
||
)
|
||
|
||
stats['successful'] += 1
|
||
|
||
except Exception as db_error:
|
||
logger.error(f"❌ Database error for article {i}: {db_error}")
|
||
stats['failed'] += 1
|
||
continue
|
||
|
||
await asyncio.sleep(0.01) # 10ms delay to yield control
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Error processing feed {feed_row['url']}: {type(e).__name__}: {e}")
|
||
|
||
return stats
|