Group feeds by domain
This commit is contained in:
@ -3,7 +3,9 @@ from __future__ import annotations
|
||||
import datetime
|
||||
import logging
|
||||
import pprint
|
||||
import re
|
||||
from typing import TYPE_CHECKING
|
||||
from urllib.parse import ParseResult, urlparse
|
||||
|
||||
from discord_webhook import DiscordEmbed, DiscordWebhook
|
||||
from fastapi import HTTPException
|
||||
@ -29,6 +31,57 @@ if TYPE_CHECKING:
|
||||
logger: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def extract_domain(url: str) -> str: # noqa: PLR0911
|
||||
"""Extract the domain name from a URL.
|
||||
|
||||
Args:
|
||||
url: The URL to extract the domain from.
|
||||
|
||||
Returns:
|
||||
str: The domain name, formatted for display.
|
||||
"""
|
||||
# Check for empty URL first
|
||||
if not url:
|
||||
return "Other"
|
||||
|
||||
try:
|
||||
# Special handling for YouTube feeds
|
||||
if "youtube.com/feeds/videos.xml" in url:
|
||||
return "YouTube"
|
||||
|
||||
# Special handling for Reddit feeds
|
||||
if "reddit.com" in url or (".rss" in url and "r/" in url):
|
||||
return "Reddit"
|
||||
|
||||
# Parse the URL and extract the domain
|
||||
parsed_url: ParseResult = urlparse(url)
|
||||
domain: str = parsed_url.netloc
|
||||
|
||||
# If we couldn't extract a domain, return "Other"
|
||||
if not domain:
|
||||
return "Other"
|
||||
|
||||
# Remove www. prefix if present
|
||||
domain = re.sub(r"^www\.", "", domain)
|
||||
|
||||
# Special handling for common domains
|
||||
domain_mapping: dict[str, str] = {"github.com": "GitHub"}
|
||||
|
||||
if domain in domain_mapping:
|
||||
return domain_mapping[domain]
|
||||
|
||||
# For other domains, capitalize the first part before the TLD
|
||||
parts: list[str] = domain.split(".")
|
||||
min_domain_parts = 2
|
||||
if len(parts) >= min_domain_parts:
|
||||
return parts[0].capitalize()
|
||||
|
||||
return domain.capitalize()
|
||||
except (ValueError, AttributeError, TypeError) as e:
|
||||
logger.warning("Error extracting domain from %s: %s", url, e)
|
||||
return "Other"
|
||||
|
||||
|
||||
def send_entry_to_discord(entry: Entry, custom_reader: Reader | None = None) -> str | None:
|
||||
"""Send a single entry to Discord.
|
||||
|
||||
|
@ -37,7 +37,7 @@ from discord_rss_bot.custom_message import (
|
||||
replace_tags_in_text_message,
|
||||
save_embed,
|
||||
)
|
||||
from discord_rss_bot.feeds import create_feed, send_entry_to_discord, send_to_discord
|
||||
from discord_rss_bot.feeds import create_feed, extract_domain, send_entry_to_discord, send_to_discord
|
||||
from discord_rss_bot.missing_tags import add_missing_tags
|
||||
from discord_rss_bot.search import create_html_for_search_results
|
||||
from discord_rss_bot.settings import get_reader
|
||||
@ -875,11 +875,12 @@ def make_context_index(request: Request):
|
||||
broken_feeds = []
|
||||
feeds_without_attached_webhook = []
|
||||
|
||||
# Get all feeds and organize them
|
||||
feeds: Iterable[Feed] = reader.get_feeds()
|
||||
for feed in feeds:
|
||||
try:
|
||||
webhook = reader.get_tag(feed.url, "webhook")
|
||||
feed_list.append({"feed": feed, "webhook": webhook})
|
||||
feed_list.append({"feed": feed, "webhook": webhook, "domain": extract_domain(feed.url)})
|
||||
except TagNotFoundError:
|
||||
broken_feeds.append(feed)
|
||||
continue
|
||||
|
@ -28,45 +28,66 @@
|
||||
{{ entry_count.averages[2]|round(1) }})
|
||||
</abbr>
|
||||
</p>
|
||||
<!-- Loop through the webhooks and add the feeds connected to them. -->
|
||||
|
||||
<!-- Loop through the webhooks and add the feeds grouped by domain -->
|
||||
{% for hook_from_context in webhooks %}
|
||||
<div class="p-2 mb-2 border border-dark">
|
||||
<h2 class="h5">
|
||||
<div class="p-2 mb-3 border border-dark">
|
||||
<h2 class="h5 mb-3">
|
||||
<a class="text-muted" href="/webhooks">{{ hook_from_context.name }}</a>
|
||||
</h2>
|
||||
<ul class="list-group">
|
||||
{% for feed_webhook in feeds %}
|
||||
{% set feed = feed_webhook["feed"] %}
|
||||
{% set hook_from_feed = feed_webhook["webhook"] %}
|
||||
{% if hook_from_context.url == hook_from_feed %}
|
||||
<div>
|
||||
<a class="text-muted" href="/feed?feed_url={{ feed.url|encode_url }}">
|
||||
{# Display username@youtube for YouTube feeds #}
|
||||
{% if "youtube.com/feeds/videos.xml" in feed.url %}
|
||||
{% if "user=" in feed.url %}
|
||||
{{ feed.url.split("user=")[1] }}@youtube
|
||||
{% elif "channel_id=" in feed.url %}
|
||||
{{ feed.title if feed.title else feed.url.split("channel_id=")[1] }}@youtube
|
||||
{% else %}
|
||||
{{ feed.url }}
|
||||
{% endif %}
|
||||
{% else %}
|
||||
{{ feed.url }}
|
||||
{% endif %}
|
||||
</a>
|
||||
{% if not feed.updates_enabled %}<span class="text-warning">Disabled</span>{% endif %}
|
||||
{% if feed.last_exception %}<span
|
||||
class="text-danger">({{ feed.last_exception.value_str }})</span>{% endif %}
|
||||
|
||||
<!-- Group feeds by domain within each webhook -->
|
||||
{% set feeds_for_hook = [] %}
|
||||
{% for feed_webhook in feeds %}
|
||||
{% if hook_from_context.url == feed_webhook.webhook %}
|
||||
{% set _ = feeds_for_hook.append(feed_webhook) %}
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
|
||||
{% if feeds_for_hook %}
|
||||
<!-- Create a dictionary to hold feeds grouped by domain -->
|
||||
{% set domains = {} %}
|
||||
{% for feed_item in feeds_for_hook %}
|
||||
{% set feed = feed_item.feed %}
|
||||
{% set domain = feed_item.domain %}
|
||||
{% if domain not in domains %}
|
||||
{% set _ = domains.update({domain: []}) %}
|
||||
{% endif %}
|
||||
{% set _ = domains[domain].append(feed) %}
|
||||
{% endfor %}
|
||||
|
||||
<!-- Display domains and their feeds -->
|
||||
{% for domain, domain_feeds in domains.items() %}
|
||||
<div class="card bg-dark border border-dark mb-2">
|
||||
<div class="card-header">
|
||||
<h3 class="h6 mb-0 text-white-50">{{ domain }} ({{ domain_feeds|length }})</h3>
|
||||
</div>
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
</ul>
|
||||
<div class="card-body p-2">
|
||||
<ul class="list-group list-unstyled mb-0">
|
||||
{% for feed in domain_feeds %}
|
||||
<li>
|
||||
<a class="text-muted" href="/feed?feed_url={{ feed.url|encode_url }}">
|
||||
{% if feed.title %}{{ feed.title }}{% else %}{{ feed.url }}{% endif %}
|
||||
</a>
|
||||
{% if not feed.updates_enabled %}<span class="text-warning">Disabled</span>{% endif %}
|
||||
{% if feed.last_exception %}<span
|
||||
class="text-danger">({{ feed.last_exception.value_str }})</span>{% endif %}
|
||||
</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
</div>
|
||||
</div>
|
||||
{% endfor %}
|
||||
{% else %}
|
||||
<p class="text-muted">No feeds associated with this webhook.</p>
|
||||
{% endif %}
|
||||
</div>
|
||||
{% endfor %}
|
||||
{% else %}
|
||||
<p>
|
||||
Hello there!
|
||||
<br>
|
||||
<br>
|
||||
You need to add a webhook <a class="text-muted" href="/add_webhook">here</a> to get started. After that, you can
|
||||
add feeds <a class="text-muted" href="/add">here</a>. You can find both of these links in the navigation bar
|
||||
above.
|
||||
@ -79,6 +100,7 @@
|
||||
Thanks!
|
||||
</p>
|
||||
{% endif %}
|
||||
|
||||
<!-- Show feeds without webhooks -->
|
||||
{% if broken_feeds %}
|
||||
<div class="p-2 mb-2 border border-dark">
|
||||
@ -103,6 +125,7 @@
|
||||
</ul>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
<!-- Show feeds that has no attached webhook -->
|
||||
{% if feeds_without_attached_webhook %}
|
||||
<div class="p-2 mb-2 border border-dark">
|
||||
|
@ -10,6 +10,7 @@ import pytest
|
||||
from reader import Feed, Reader, make_reader
|
||||
|
||||
from discord_rss_bot.feeds import (
|
||||
extract_domain,
|
||||
is_youtube_feed,
|
||||
send_entry_to_discord,
|
||||
send_to_discord,
|
||||
@ -202,3 +203,57 @@ def test_send_entry_to_discord_youtube_feed(
|
||||
|
||||
# Verify execute_webhook was called
|
||||
mock_execute_webhook.assert_called_once_with(mock_webhook, mock_entry)
|
||||
|
||||
|
||||
def test_extract_domain_youtube_feed() -> None:
|
||||
"""Test extract_domain for YouTube feeds."""
|
||||
url: str = "https://www.youtube.com/feeds/videos.xml?channel_id=123456"
|
||||
assert extract_domain(url) == "YouTube", "YouTube feeds should return 'YouTube' as the domain."
|
||||
|
||||
|
||||
def test_extract_domain_reddit_feed() -> None:
|
||||
"""Test extract_domain for Reddit feeds."""
|
||||
url: str = "https://www.reddit.com/r/Python/.rss"
|
||||
assert extract_domain(url) == "Reddit", "Reddit feeds should return 'Reddit' as the domain."
|
||||
|
||||
|
||||
def test_extract_domain_github_feed() -> None:
|
||||
"""Test extract_domain for GitHub feeds."""
|
||||
url: str = "https://www.github.com/user/repo"
|
||||
assert extract_domain(url) == "GitHub", "GitHub feeds should return 'GitHub' as the domain."
|
||||
|
||||
|
||||
def test_extract_domain_custom_domain() -> None:
|
||||
"""Test extract_domain for custom domains."""
|
||||
url: str = "https://www.example.com/feed"
|
||||
assert extract_domain(url) == "Example", "Custom domains should return the capitalized first part of the domain."
|
||||
|
||||
|
||||
def test_extract_domain_no_www_prefix() -> None:
|
||||
"""Test extract_domain removes 'www.' prefix."""
|
||||
url: str = "https://www.example.com/feed"
|
||||
assert extract_domain(url) == "Example", "The 'www.' prefix should be removed from the domain."
|
||||
|
||||
|
||||
def test_extract_domain_no_tld() -> None:
|
||||
"""Test extract_domain for domains without a TLD."""
|
||||
url: str = "https://localhost/feed"
|
||||
assert extract_domain(url) == "Localhost", "Domains without a TLD should return the capitalized domain."
|
||||
|
||||
|
||||
def test_extract_domain_invalid_url() -> None:
|
||||
"""Test extract_domain for invalid URLs."""
|
||||
url: str = "not-a-valid-url"
|
||||
assert extract_domain(url) == "Other", "Invalid URLs should return 'Other' as the domain."
|
||||
|
||||
|
||||
def test_extract_domain_empty_url() -> None:
|
||||
"""Test extract_domain for empty URLs."""
|
||||
url: str = ""
|
||||
assert extract_domain(url) == "Other", "Empty URLs should return 'Other' as the domain."
|
||||
|
||||
|
||||
def test_extract_domain_special_characters() -> None:
|
||||
"""Test extract_domain for URLs with special characters."""
|
||||
url: str = "https://www.ex-ample.com/feed"
|
||||
assert extract_domain(url) == "Ex-ample", "Domains with special characters should return the capitalized domain."
|
||||
|
@ -45,7 +45,7 @@ def test_search() -> None:
|
||||
# Check that the feed was added.
|
||||
response = client.get(url="/")
|
||||
assert response.status_code == 200, f"Failed to get /: {response.text}"
|
||||
assert feed_url in response.text, f"Feed not found in /: {response.text}"
|
||||
assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
|
||||
|
||||
# Search for an entry.
|
||||
response: Response = client.get(url="/search/?query=a")
|
||||
@ -85,7 +85,7 @@ def test_create_feed() -> None:
|
||||
# Check that the feed was added.
|
||||
response = client.get(url="/")
|
||||
assert response.status_code == 200, f"Failed to get /: {response.text}"
|
||||
assert feed_url in response.text, f"Feed not found in /: {response.text}"
|
||||
assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
|
||||
|
||||
|
||||
def test_get() -> None:
|
||||
@ -103,7 +103,7 @@ def test_get() -> None:
|
||||
# Check that the feed was added.
|
||||
response = client.get("/")
|
||||
assert response.status_code == 200, f"Failed to get /: {response.text}"
|
||||
assert feed_url in response.text, f"Feed not found in /: {response.text}"
|
||||
assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
|
||||
|
||||
response: Response = client.get(url="/add")
|
||||
assert response.status_code == 200, f"/add failed: {response.text}"
|
||||
@ -157,7 +157,7 @@ def test_pause_feed() -> None:
|
||||
# Check that the feed was paused.
|
||||
response = client.get(url="/")
|
||||
assert response.status_code == 200, f"Failed to get /: {response.text}"
|
||||
assert feed_url in response.text, f"Feed not found in /: {response.text}"
|
||||
assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
|
||||
|
||||
|
||||
def test_unpause_feed() -> None:
|
||||
@ -184,7 +184,7 @@ def test_unpause_feed() -> None:
|
||||
# Check that the feed was unpaused.
|
||||
response = client.get(url="/")
|
||||
assert response.status_code == 200, f"Failed to get /: {response.text}"
|
||||
assert feed_url in response.text, f"Feed not found in /: {response.text}"
|
||||
assert encoded_feed_url(feed_url) in response.text, f"Feed not found in /: {response.text}"
|
||||
|
||||
|
||||
def test_remove_feed() -> None:
|
||||
|
Reference in New Issue
Block a user