Add endpoint to see all the entries for a webhook

This commit is contained in:
Joakim Hellsén 2026-03-07 19:00:11 +01:00
commit 5215b80643
Signed by: Joakim Hellsén
SSH key fingerprint: SHA256:/9h/CsExpFp+PRhsfA0xznFx2CGfTT5R/kpuFfUgEQk
5 changed files with 555 additions and 193 deletions

View file

@ -948,7 +948,7 @@ async def get_feed(feed_url: str, request: Request, starting_after: str = ""):
return templates.TemplateResponse(request=request, name="feed.html", context=context)
def create_html_for_feed(entries: Iterable[Entry], current_feed_url: str = "") -> str: # noqa: PLR0914
def create_html_for_feed(entries: Iterable[Entry], current_feed_url: str = "") -> str: # noqa: C901, PLR0914
"""Create HTML for the search results.
Args:
@ -988,6 +988,16 @@ def create_html_for_feed(entries: Iterable[Entry], current_feed_url: str = "") -
if current_feed_url and source_feed_url != current_feed_url:
from_another_feed = f"<span class='badge bg-warning text-dark'>From another feed: {source_feed_url}</span>"
# Add feed link when viewing from webhook_entries or aggregated views
feed_link: str = ""
if not current_feed_url or source_feed_url != current_feed_url:
encoded_feed_url: str = urllib.parse.quote(source_feed_url)
feed_title: str = entry.feed.title if hasattr(entry.feed, "title") and entry.feed.title else source_feed_url
feed_link = (
f"<a class='text-muted' style='font-size: 0.85em;' "
f"href='/feed?feed_url={encoded_feed_url}'>{feed_title}</a><br>"
)
entry_id: str = urllib.parse.quote(entry.id)
to_discord_html: str = f"<a class='text-muted' href='/post_entry?entry_id={entry_id}'>Send to Discord</a>"
@ -1015,7 +1025,7 @@ def create_html_for_feed(entries: Iterable[Entry], current_feed_url: str = "") -
html += f"""<div class="p-2 mb-2 border border-dark">
{blacklisted}{whitelisted}{from_another_feed}<a class="text-muted text-decoration-none" href="{entry.link}"><h2>{entry.title}</h2></a>
{f"By {entry.author} @" if entry.author else ""}{published} - {to_discord_html}
{feed_link}{f"By {entry.author} @" if entry.author else ""}{published} - {to_discord_html}
{text}
{video_embed_html}
@ -1397,6 +1407,109 @@ def extract_youtube_video_id(url: str) -> str | None:
return None
@app.get("/webhook_entries", response_class=HTMLResponse)
async def get_webhook_entries( # noqa: C901, PLR0912, PLR0914
webhook_url: str,
request: Request,
starting_after: str = "",
) -> HTMLResponse:
"""Get all latest entries from all feeds for a specific webhook.
Args:
webhook_url: The webhook URL to get entries for.
request: The request object.
starting_after: The entry to start after. Used for pagination.
Returns:
HTMLResponse: The webhook entries page.
Raises:
HTTPException: If no feeds are found for this webhook or webhook doesn't exist.
"""
entries_per_page: int = 20
clean_webhook_url: str = urllib.parse.unquote(webhook_url.strip())
# Get the webhook name from the webhooks list
webhooks: list[dict[str, str]] = cast("list[dict[str, str]]", list(reader.get_tag((), "webhooks", [])))
webhook_name: str = ""
for hook in webhooks:
if hook["url"] == clean_webhook_url:
webhook_name = hook["name"]
break
if not webhook_name:
raise HTTPException(status_code=404, detail=f"Webhook not found: {clean_webhook_url}")
# Get all feeds associated with this webhook
all_feeds: list[Feed] = list(reader.get_feeds())
webhook_feeds: list[Feed] = []
for feed in all_feeds:
try:
feed_webhook: str = str(reader.get_tag(feed.url, "webhook", ""))
if feed_webhook == clean_webhook_url:
webhook_feeds.append(feed)
except TagNotFoundError:
continue
# Get all entries from all feeds for this webhook, sorted by published date
all_entries: list[Entry] = [entry for feed in webhook_feeds for entry in reader.get_entries(feed=feed)]
# Sort entries by published date (newest first)
all_entries.sort(
key=lambda e: e.published or datetime.now(tz=UTC),
reverse=True,
)
# Handle pagination
if starting_after:
try:
start_after_entry: Entry | None = reader.get_entry((
starting_after.split("|", maxsplit=1)[0],
starting_after.split("|")[1],
))
except (FeedNotFoundError, EntryNotFoundError):
start_after_entry = None
else:
start_after_entry = None
# Find the index of the starting entry
start_index: int = 0
if start_after_entry:
for idx, entry in enumerate(all_entries):
if entry.id == start_after_entry.id and entry.feed.url == start_after_entry.feed.url:
start_index = idx + 1
break
# Get the page of entries
paginated_entries: list[Entry] = all_entries[start_index : start_index + entries_per_page]
# Get the last entry for pagination
last_entry: Entry | None = None
if paginated_entries:
last_entry = paginated_entries[-1]
# Create the html for the entries
html: str = create_html_for_feed(paginated_entries)
# Check if there are more entries available
total_entries: int = len(all_entries)
is_show_more_entries_button_visible: bool = (start_index + entries_per_page) < total_entries
context = {
"request": request,
"webhook_name": webhook_name,
"webhook_url": clean_webhook_url,
"entries": paginated_entries,
"html": html,
"last_entry": last_entry,
"is_show_more_entries_button_visible": is_show_more_entries_button_visible,
"total_entries": total_entries,
"feeds_count": len(webhook_feeds),
}
return templates.TemplateResponse(request=request, name="webhook_entries.html", context=context)
if __name__ == "__main__":
sentry_sdk.init(
dsn="https://6e77a0d7acb9c7ea22e85a375e0ff1f4@o4505228040339456.ingest.us.sentry.io/4508792887967744",