import asyncio
import json
import re
import sqlite3
from datetime import datetime, timezone
from typing import Dict, Optional, cast

import feedparser
import httpx
from bs4 import BeautifulSoup

from backend.app.config import (
    ARTICLE_FETCH_TIMEOUT,
    LLM_MODEL,
    LLM_TIMEOUT_SECONDS,
    MAX_ARTICLE_LENGTH,
    OLLAMA_HOST,
    logger,
)
from backend.app.database import db_manager
from backend.app.models import ArticleSummary


class NewsFetcher:
    """
    Handles fetching and summarizing news articles from RSS feeds.

    Uses Ollama/qwen to generate summaries of articles.
    """

    @staticmethod
    async def fetch_article_content(
            client: httpx.AsyncClient, url: str) -> str:
        """
        Fetch and extract the main content from an article URL.

        Args:
            client: An active httpx AsyncClient for making requests
            url: URL of the article to fetch

        Returns:
            Extracted text content from the article, or an empty string on failure
        """
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                              'AppleWebKit/537.36 (KHTML, like Gecko) '
                              'Chrome/91.0.4472.124 Safari/537.36'
            }

            response = await client.get(
                url,
                headers=headers,
                timeout=ARTICLE_FETCH_TIMEOUT,
                follow_redirects=True,
            )
            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')

            # Strip elements that never contain article text
            for element in soup(['script', 'style', 'nav', 'header', 'footer',
                                 'aside', 'form', 'button']):
                element.decompose()

            # Common selectors for the main content area, in order of preference
            content_selectors = [
                'article', '[role="main"]', '.content', '.article-content',
                '.post-content', '.entry-content', '.main-content', 'main',
                '.story-body', '.article-body',
            ]

            article_text = ""
            for selector in content_selectors:
                elements = soup.select(selector)
                if elements:
                    # Keep the longest text found under the first matching selector
                    for element in elements:
                        text = element.get_text(separator=' ', strip=True)
                        if len(text) > len(article_text):
                            article_text = text
                    break

            # Fallback: get text from body if no specific content area found
            if not article_text:
                body = soup.find('body')
                if body:
                    article_text = body.get_text(separator=' ', strip=True)

            # Normalize whitespace
            article_text = re.sub(r'\s+', ' ', article_text).strip()

            # Limit length to avoid overwhelming the LLM
            if len(article_text) > MAX_ARTICLE_LENGTH:
                article_text = article_text[:MAX_ARTICLE_LENGTH] + "..."

            return article_text

        except httpx.TimeoutException:
            logger.warning(f"⏰ Timeout fetching article content from: {url}")
            return ""
        except httpx.HTTPError as e:
            logger.warning(
                f"🌐 HTTP error fetching article content from {url}: {e}")
            return ""
        except Exception as e:
            logger.warning(
                f"❌ Error fetching article content from {url}: "
                f"{type(e).__name__}: {e}")
            return ""

    @staticmethod
    def build_prompt(
            url: str, title: str = "", summary: str = "", content: str = "") -> str:
        """
        Generate a prompt for the LLM to summarize an article.

        Args:
            url: Public URL of the article to summarize
            title: Article title from the RSS feed (optional)
            summary: Article summary from the RSS feed (optional)
            content: Extracted article content (optional)

        Returns:
            A formatted prompt string that instructs the LLM to return a JSON
            object with a German title and summary
        """
        context_info = []
        if title:
            context_info.append(f"RSS-Titel: {title}")
        if summary:
            context_info.append(f"RSS-Beschreibung: {summary}")
        if content:
            content_preview = (
                content[:500] + "..." if len(content) > 500 else content)
            context_info.append(f"Artikel-Inhalt: {content_preview}")

        context = (
            "\n".join(context_info) if context_info
            else "Keine zusätzlichen Informationen verfügbar.")
        return (
            "### Aufgabe\n"
            "Du sollst eine Nachricht basierend auf der URL und den verfügbaren Informationen zusammenfassen.\n"
            f"URL: {url}\n"
            f"Verfügbare Informationen:\n{context}\n\n"
            "### Regeln\n"
            "1. Nutze VORRANGIG den Artikel-Inhalt falls verfügbar, ergänze mit RSS-Informationen\n"
            "2. Falls kein Artikel-Inhalt verfügbar ist, nutze RSS-Titel und -Beschreibung\n"
            "3. Falls keine ausreichenden Informationen vorliegen, erstelle eine plausible Zusammenfassung basierend auf der URL\n"
            "4. Gib ausschließlich **gültiges minifiziertes JSON** zurück – kein Markdown, keine Kommentare\n"
            "5. Struktur: {\"title\":\"…\",\"summary\":\"…\"}\n"
            "6. title: Aussagekräftiger deutscher Titel (max 100 Zeichen)\n"
            "7. summary: Deutsche Zusammenfassung (zwischen 100 und 160 Wörtern)\n"
            "8. Kein Text vor oder nach dem JSON\n\n"
            "### Ausgabe\n"
            "Jetzt antworte mit dem JSON:"
        )

    @staticmethod
    async def summarize_article(
        client: httpx.AsyncClient,
        url: str,
        title: str = "",
        summary: str = ""
    ) -> Optional[ArticleSummary]:
        """
        Generate a summary of an article using the LLM.

        Fetches the actual article content for more accurate summaries.

        Args:
            client: An active httpx AsyncClient for making requests
            url: URL of the article to summarize
            title: Article title from the RSS feed
            summary: Article summary from the RSS feed

        Returns:
            An ArticleSummary with the German title and summary,
            or None if summarization failed
        """
        article_content = await NewsFetcher.fetch_article_content(client, url)
        if not article_content:
            logger.warning(
                f"⚠️ Could not fetch article content for {url}, "
                f"using RSS data only")

        prompt = NewsFetcher.build_prompt(url, title, summary, article_content)

        payload = {
            "model": LLM_MODEL,
            "prompt": prompt,
            "stream": False,
            "format": "json",
            "options": {
                "temperature": 0.1,  # Ollama reads temperature from "options", not the top level
                "num_gpu": 1,        # Force GPU usage
                "num_ctx": 128_000,  # Context size
            }
        }

        llm_response = ""  # Keep defined for the error handlers below
        try:
            response = await client.post(
                f"{OLLAMA_HOST}/api/generate",
                json=payload,
                timeout=LLM_TIMEOUT_SECONDS,
            )
            response.raise_for_status()

            result = response.json()
            llm_response = result["response"]

            if isinstance(llm_response, str):
                summary_data = json.loads(llm_response)
            else:
                summary_data = llm_response

            # Validate required fields
            required_fields = ["title", "summary"]
            missing_fields = [
                field for field in required_fields
                if field not in summary_data]
            if missing_fields:
                logger.warning(
                    f"⚠️ Missing required fields in summary: {missing_fields}")
                return None

            # Check summary quality metrics
            summary_length = len(summary_data.get("summary", "").split())
            if summary_length > 160:
                logger.warning(
                    f"⚠️ Summary exceeds word limit - "
                    f"Summary: {summary_length}/160")

            return cast(ArticleSummary, summary_data)

        except json.JSONDecodeError as e:
            logger.error(f"❌ JSON parsing error for {url}: {e}")
            logger.error(
                f"🔍 Raw response that failed to parse: {llm_response[:500]}...")
            return None
        except httpx.HTTPError as e:
            logger.error(f"❌ HTTP error for {url}: {e}")
            return None
        except Exception as e:
            logger.error(
                f"❌ Unexpected error summarizing {url}: "
                f"{type(e).__name__}: {e}")
            return None

    @staticmethod
    async def harvest_feeds() -> None:
        """
        Fetch articles from all feeds and store summaries in the database.

        This is the main function that runs periodically to update the news
        database.
""" total_articles = 0 successful_articles = 0 failed_articles = 0 try: with db_manager.get_cursor() as cursor: cursor.execute("SELECT country, url FROM feeds") feeds = cursor.fetchall() async with httpx.AsyncClient() as client: for i, feed_row in enumerate(feeds, 1): feed_stats = await NewsFetcher._process_feed(client, feed_row) total_articles += feed_stats['total'] successful_articles += feed_stats['successful'] failed_articles += feed_stats['failed'] current_time = int(datetime.now(timezone.utc).timestamp()) with db_manager.get_cursor() as cursor: cursor.execute( "UPDATE meta SET val=? WHERE key='last_sync'", (str(current_time),) ) except Exception as e: logger.error( f"❌ Critical error during harvest: { type(e).__name__}: {e}") raise @staticmethod async def _process_feed( client: httpx.AsyncClient, feed_row: sqlite3.Row ) -> Dict[str, int]: """ Process a single feed, fetching and summarizing all articles. Now saves summaries immediately to the database with better concurrency. Args: client: An active httpx AsyncClient for making requests feed_row: A database row containing feed information Returns: Dictionary with processing statistics """ stats = {'total': 0, 'successful': 0, 'failed': 0, 'skipped': 0} try: feed_data = feedparser.parse(feed_row["url"]) if hasattr(feed_data, 'bozo') and feed_data.bozo: logger.warning( f"⚠️ Feed has parsing issues: { feed_row['url']}") if hasattr(feed_data, 'bozo_exception'): logger.warning( f"⚠️ Feed exception: { feed_data.bozo_exception}") total_entries = len(feed_data.entries) if total_entries == 0: logger.warning( f"⚠️ No entries found in feed: { feed_row['url']}") return stats for i, entry in enumerate(feed_data.entries, 1): stats['total'] += 1 if not hasattr(entry, "link"): stats['skipped'] += 1 continue if not hasattr(entry, "published_parsed"): stats['skipped'] += 1 continue article_url = entry.link try: published = datetime( *entry.published_parsed[:6], tzinfo=timezone.utc ) except (TypeError, ValueError): stats['skipped'] += 1 continue # Check if article already exists - use readonly connection for # better concurrency try: with db_manager.get_cursor_with_retry(readonly=True) as cursor: cursor.execute( "SELECT id FROM news WHERE url = ?", (article_url,)) if cursor.fetchone(): stats['skipped'] += 1 continue except Exception as db_error: logger.warning( f"⚠️ Database check failed for article {i}, continuing: {db_error}") rss_title = getattr(entry, 'title', '') rss_summary = getattr( entry, 'description', '') or getattr( entry, 'summary', '') summary = await NewsFetcher.summarize_article( client, article_url, title=rss_title, summary=rss_summary ) logger.info(summary) if not summary: logger.warning( f"❌ Failed to get summary for article {i}: {article_url}") stats['failed'] += 1 continue published_timestamp = int(published.timestamp()) try: with db_manager.get_cursor_with_retry(readonly=False) as cursor: cursor.execute( """ INSERT OR IGNORE INTO news (title, summary, url, published, country) VALUES (?, ?, ?, ?, ?) """, ( summary["title"], summary["summary"], article_url, published_timestamp, feed_row["country"], ) ) stats['successful'] += 1 except Exception as db_error: logger.error( f"❌ Database error for article {i}: {db_error}") stats['failed'] += 1 continue await asyncio.sleep(0.01) # 10ms delay to yield control except Exception as e: logger.error( f"❌ Error processing feed { feed_row['url']}: { type(e).__name__}: {e}") return stats