Compare commits

..

2 commits

Author SHA1 Message Date
1065838ef7
Hide paid ttvdrops images and update lint checks
Some checks failed
Test and build Docker image / docker (push) Failing after 3s
2026-05-31 00:24:56 +02:00
23a619c9ea
Bump ruff pre-commit to v0.15.15 2026-05-31 00:23:51 +02:00
12 changed files with 152 additions and 39 deletions

View file

@ -38,7 +38,7 @@ repos:
# An extremely fast Python linter and formatter.
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.15.10
rev: v0.15.15
hooks:
- id: ruff-format
types_or: [ python, pyi, jupyter, pyproject ]

View file

@ -1,5 +1,7 @@
{
"cSpell.words": [
"appauthor",
"appname",
"argnames",
"argvalues",
"autoexport",
@ -20,6 +22,7 @@
"overwritable",
"pipx",
"pyproject",
"retcode",
"Skulbladi",
"thead",
"thelovinator",

View file

@ -133,7 +133,7 @@ def extract_domain(url: str) -> str: # noqa: PLR0911
if not url:
return "Other"
try:
try: # noqa: PLW0717
# Special handling for YouTube feeds
if "youtube.com/feeds/videos.xml" in url:
return "YouTube"
@ -1219,13 +1219,13 @@ def _capture_full_page_screenshot_sync(
Returns:
bytes | None: PNG bytes on success, otherwise None.
"""
try:
try: # noqa: PLW0717
with sync_playwright() as playwright:
browser: Browser = playwright.chromium.launch(
headless=True,
args=["--disable-dev-shm-usage", "--no-sandbox"],
)
try:
try: # noqa: PLW0717
if screenshot_layout == "mobile":
page = browser.new_page(
viewport={"width": 390, "height": 844},
@ -1432,7 +1432,7 @@ def get_ttvdrops_reward_description(drop: JsonObject, reward: JsonObject) -> str
return reward_name
def extract_ttvdrops_media_gallery_items(value: JsonValue) -> list[JsonObject]: # noqa: C901
def extract_ttvdrops_media_gallery_items(value: JsonValue, *, hide_paid: bool = False) -> list[JsonObject]: # noqa: C901
"""Extract benefit/reward media gallery items from a ttvdrops API response.
Returns:
@ -1441,6 +1441,9 @@ def extract_ttvdrops_media_gallery_items(value: JsonValue) -> list[JsonObject]:
media_items: list[JsonObject] = []
def add_reward_image(drop: JsonObject, reward: JsonObject) -> None:
if hide_paid and json_value_to_int(drop.get("required_minutes_watched")) <= 0:
return
image_url = reward.get("image_url")
if isinstance(image_url, str):
add_unique_media_gallery_item(
@ -1491,7 +1494,8 @@ def fetch_ttvdrops_campaign_media_items(entry: Entry) -> list[JsonObject]:
logger.exception("Failed to fetch ttvdrops campaign data from %s", api_url)
return []
return extract_ttvdrops_media_gallery_items(response_json)
hide_paid: bool = "1" in parse_qs(urlparse(entry.feed.url).query).get("hide_paid", [])
return extract_ttvdrops_media_gallery_items(response_json, hide_paid=hide_paid)
def get_entry_media_gallery_items(

View file

@ -91,7 +91,7 @@ def setup_backup_repo(backup_path: Path) -> bool:
"""Ensure the backup directory exists and contains a git repository.
If the directory does not yet contain a ``.git`` folder a new repository is
initialised. A basic git identity is configured locally so that commits
initialized. A basic git identity is configured locally so that commits
succeed even in environments where a global ``~/.gitconfig`` is absent.
Args:
@ -100,12 +100,12 @@ def setup_backup_repo(backup_path: Path) -> bool:
Returns:
``True`` if the repository is ready, ``False`` on any error.
"""
try:
try: # noqa: PLW0717
backup_path.mkdir(parents=True, exist_ok=True)
git_dir: Path = backup_path / ".git"
if not git_dir.exists():
subprocess.run([GIT_EXECUTABLE, "init", str(backup_path)], check=True, capture_output=True) # noqa: S603
logger.info("Initialised git backup repository at %s", backup_path)
logger.info("Initialized git backup repository at %s", backup_path)
# Ensure a local identity exists so that `git commit` always works.
for key, value in (("user.email", "discord-rss-bot@localhost"), ("user.name", "discord-rss-bot")):
@ -155,7 +155,7 @@ def setup_backup_repo(backup_path: Path) -> bool:
def export_state(reader: Reader, backup_path: Path) -> None:
"""Serialise the current bot state to ``state.json`` inside *backup_path*.
"""Serialize the current bot state to ``state.json`` inside *backup_path*.
Args:
reader: The :class:`reader.Reader` instance to read state from.
@ -190,7 +190,7 @@ def export_state(reader: Reader, backup_path: Path) -> None:
if clean_layout in {"desktop", "mobile"}:
global_screenshot_layout = clean_layout
state: JsonObject = {"feeds": feeds_state, "webhooks": webhooks}
state: JsonObject = {"feeds": feeds_state, "webhooks": webhooks} # pyright: ignore[reportAssignmentType]
if global_update_interval is not None:
state["global_update_interval"] = global_update_interval
if global_screenshot_layout is not None:
@ -217,7 +217,7 @@ def commit_state_change(reader: Reader, message: str) -> None:
if not setup_backup_repo(backup_path):
return
try:
try: # noqa: PLW0717
export_state(reader, backup_path)
subprocess.run([GIT_EXECUTABLE, "-C", str(backup_path), "add", "-A"], check=True, capture_output=True) # noqa: S603

View file

@ -76,7 +76,7 @@ def fetch_hoyolab_post(post_id: str) -> JsonObject | None:
return None
http_ok = 200
try:
try: # noqa: PLW0717
url: str = f"https://bbs-api-os.hoyolab.com/community/post/wapi/getPostFull?post_id={post_id}"
response: requests.Response = requests.get(url, timeout=10)
@ -143,8 +143,8 @@ def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: JsonObject
)
if image_list:
image_url: str = str(image_list[0].get("url", ""))
image_height: int = int(image_list[0].get("height", 1080))
image_width: int = int(image_list[0].get("width", 1920))
image_height: int = int(image_list[0].get("height", "1080")) # pyright: ignore[reportArgumentType]
image_width: int = int(image_list[0].get("width", "1920")) # pyright: ignore[reportArgumentType]
logger.debug("Image URL: %s, Height: %s, Width: %s", image_url, image_height, image_width)
discord_embed.set_image(url=image_url, height=image_height, width=image_width)
@ -185,7 +185,7 @@ def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: JsonObject
# Only show Youtube URL if available
structured_content: str = str(post.get("structured_content", ""))
if structured_content: # noqa: PLR1702
try:
try: # noqa: PLW0717
loaded_structured_content = cast("JsonValue", json.loads(structured_content))
structured_content_data: list[JsonObject] = (
[cast("JsonObject", item) for item in loaded_structured_content if isinstance(item, dict)]

View file

@ -239,9 +239,12 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
)
scheduler.start()
logger.info("Scheduler started.")
yield
reader.close()
scheduler.shutdown(wait=True)
try:
yield
finally:
reader.close()
scheduler.shutdown(wait=True)
app: FastAPI = FastAPI(lifespan=lifespan)

View file

@ -59,7 +59,7 @@ def pytest_sessionstart(session: pytest.Session) -> None:
# the worker-specific location.
settings_module: ModuleType | None = sys.modules.get("discord_rss_bot.settings")
if settings_module is not None:
settings_module.data_dir = str(worker_data_dir)
settings_module.data_dir = str(worker_data_dir) # pyright: ignore[reportAttributeAccessIssue]
get_reader_attr = getattr(settings_module, "get_reader", None)
if get_reader_attr is not None and hasattr(get_reader_attr, "cache_clear"):
get_reader = cast("CachedReaderFactory", get_reader_attr)

View file

@ -102,6 +102,6 @@ def test_pytest_collection_modifyitems_noops_when_real_git_backup_tests_enabled(
config.getoption.return_value = True
items: list[MagicMock] = [MagicMock()]
hooks.pytest_collection_modifyitems(config=config, items=items)
hooks.pytest_collection_modifyitems(config=config, items=items) # pyright: ignore[reportArgumentType]
config.getoption.assert_called_once_with("--run-real-git-backup-tests")

View file

@ -781,8 +781,24 @@ def test_get_ttvdrops_campaign_api_url_from_campaign_page() -> None:
assert api_url == "https://ttvdrops.lovinator.space/twitch/api/v1/campaigns/93ba35ae-5bfc-43fe-88ac-49a0aabb2fe2/"
@pytest.mark.parametrize(
("feed_url", "include_paid_reward"),
[
("https://ttvdrops.lovinator.space/twitch/feed.xml", True),
("https://ttvdrops.lovinator.space/twitch/feed.xml?hide_paid=0", True),
("https://ttvdrops.lovinator.space/twitch/feed.xml?hide_paid=true", True),
("https://ttvdrops.lovinator.space/twitch/feed.xml?hide_paid=1", False),
("https://ttvdrops.lovinator.space/twitch/feed.xml?lang=en&hide_paid=1", False),
("https://ttvdrops.lovinator.space/twitch/feed.xml?hide_paid=0&hide_paid=1", False),
],
)
@patch("discord_rss_bot.feeds.httpx.get")
def test_fetch_ttvdrops_campaign_media_items_extracts_reward_alt_text(mock_get: MagicMock) -> None:
def test_fetch_ttvdrops_campaign_media_items_extracts_reward_alt_text(
mock_get: MagicMock,
feed_url: str,
*,
include_paid_reward: bool,
) -> None:
response = MagicMock()
response.status_code = 200
response.json.return_value = {
@ -796,22 +812,38 @@ def test_fetch_ttvdrops_campaign_media_items_extracts_reward_alt_text(mock_get:
{"image_url": "javascript:alert(1)"},
],
},
{
"name": "Paid drop",
"required_minutes_watched": 0,
"required_subs": 2,
"benefits": [
{"name": "Pay2win reward", "image_url": "/media/benefits/images/paid-reward.png"},
],
},
],
}
mock_get.return_value = response
entry = MagicMock()
entry.link = "https://ttvdrops.lovinator.space/twitch/campaigns/93ba35ae-5bfc-43fe-88ac-49a0aabb2fe2/"
entry.id = "entry-4"
entry.feed.url = "https://example.com/feed.xml"
entry.feed.url = feed_url
media_items = feeds.fetch_ttvdrops_campaign_media_items(entry)
assert media_items == [
expected_media_items: list[JsonObject] = [
{
"url": "https://ttvdrops.lovinator.space/media/benefits/images/reward.png",
"description": "120 minutes watched: Skulbladi",
},
]
if include_paid_reward:
expected_media_items.append(
{
"url": "https://ttvdrops.lovinator.space/media/benefits/images/paid-reward.png",
"description": "2 subscriptions: Pay2win reward",
},
)
assert media_items == expected_media_items
mock_get.assert_called_once_with(
"https://ttvdrops.lovinator.space/twitch/api/v1/campaigns/93ba35ae-5bfc-43fe-88ac-49a0aabb2fe2/",
follow_redirects=True,
@ -819,6 +851,78 @@ def test_fetch_ttvdrops_campaign_media_items_extracts_reward_alt_text(mock_get:
)
def test_extract_ttvdrops_media_gallery_items_includes_paid_rewards_by_default() -> None:
media_items = feeds.extract_ttvdrops_media_gallery_items(
{
"drops": [
{
"required_subs": 1,
"benefits": [{"name": "Paid reward", "image_url": "/media/paid.png"}],
},
],
},
)
assert media_items == [
{
"url": "https://ttvdrops.lovinator.space/media/paid.png",
"description": "1 subscriptions: Paid reward",
},
]
def test_extract_ttvdrops_media_gallery_items_hide_paid_omits_non_watch_rewards() -> None:
media_items = feeds.extract_ttvdrops_media_gallery_items(
{
"drops": [
{
"required_minutes_watched": 30,
"benefits": [{"name": "Watch reward", "image_url": "/media/watch.png"}],
},
{
"required_minutes_watched": 0,
"required_subs": 1,
"benefits": [{"name": "Paid reward", "image_url": "/media/paid.png"}],
},
{
"benefits": [{"name": "Unknown reward", "image_url": "/media/unknown.png"}],
},
],
},
hide_paid=True,
)
assert media_items == [
{
"url": "https://ttvdrops.lovinator.space/media/watch.png",
"description": "30 minutes watched: Watch reward",
},
]
def test_extract_ttvdrops_media_gallery_items_extracts_nested_watch_rewards() -> None:
media_items = feeds.extract_ttvdrops_media_gallery_items(
{
"campaign": {
"drops": [
{
"required_minutes_watched": 45,
"rewards": [{"name": "Nested reward", "image_url": "/media/nested.png"}],
},
],
},
},
hide_paid=True,
)
assert media_items == [
{
"url": "https://ttvdrops.lovinator.space/media/nested.png",
"description": "45 minutes watched: Nested reward",
},
]
def test_capture_full_page_screenshot_uses_thread_when_loop_running() -> None:
"""Capture should offload sync Playwright work when called from an active event loop."""
with patch("discord_rss_bot.feeds._capture_full_page_screenshot_sync", return_value=b"png") as mock_capture_sync:

View file

@ -4,7 +4,6 @@ import contextlib
import json
import shutil
import subprocess # noqa: S404
from pathlib import Path
from typing import TYPE_CHECKING
from typing import cast
from unittest.mock import MagicMock
@ -196,8 +195,8 @@ def test_export_state_creates_state_json(tmp_path: Path) -> None:
data = cast("JsonObject", json.loads(state_file.read_text(encoding="utf-8")))
assert "feeds" in data
assert "webhooks" in data
assert data["feeds"][0]["url"] == "https://example.com/feed.rss"
assert data["feeds"][0]["webhook"] == "https://discord.com/api/webhooks/123/abc"
assert data["feeds"][0]["url"] == "https://example.com/feed.rss" # type: ignore
assert data["feeds"][0]["webhook"] == "https://discord.com/api/webhooks/123/abc" # type: ignore
def test_export_state_omits_empty_tags(tmp_path: Path) -> None:
@ -227,7 +226,7 @@ def test_export_state_omits_empty_tags(tmp_path: Path) -> None:
data = cast("JsonObject", json.loads((backup_path / "state.json").read_text()))
# Only "url" key should be present (no empty-value tags)
assert list(data["feeds"][0].keys()) == ["url"]
assert list(data["feeds"][0].keys()) == ["url"] # type: ignore
def test_commit_state_change_noop_when_not_configured(monkeypatch: pytest.MonkeyPatch) -> None:
@ -575,7 +574,7 @@ def test_embed_backup_end_to_end(monkeypatch: pytest.MonkeyPatch, tmp_path: Path
state_data = cast("JsonObject", json.loads(state_file.read_text(encoding="utf-8")))
# Find our test feed in the state
test_feed_data = next((feed for feed in state_data["feeds"] if feed["url"] == test_feed_url), None)
test_feed_data = next((feed for feed in state_data["feeds"] if feed["url"] == test_feed_url), None) # type: ignore
assert test_feed_data is not None, f"Test feed not found in state.json: {state_data}"
# The embed settings are stored as a nested dict under custom_embed tag

View file

@ -174,7 +174,7 @@ class TestCreateHoyolabWebhook:
entry = make_entry(link=None)
entry = typing.cast("Entry", entry)
webhook = create_hoyolab_webhook("https://discord.test/webhook", entry, post_data)
webhook = create_hoyolab_webhook("https://discord.test/webhook", entry, post_data) # type: ignore
assert webhook is webhook_instance
mock_webhook_cls.assert_called_once_with(url="https://discord.test/webhook", rate_limit_retry=True)
@ -222,7 +222,7 @@ class TestCreateHoyolabWebhook:
entry = make_entry()
entry = typing.cast("Entry", entry)
webhook = create_hoyolab_webhook("https://discord.test/webhook", entry, post_data)
webhook = create_hoyolab_webhook("https://discord.test/webhook", entry, post_data) # type: ignore
assert webhook is webhook_instance
webhook_instance.remove_embeds.assert_not_called()

View file

@ -438,7 +438,7 @@ def test_blacklist_preview_shows_labeled_field_values_for_substring_match() -> N
stub_reader = StubReader()
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
try:
try: # noqa: PLW0717
with patch("discord_rss_bot.main.create_html_for_feed", return_value="<div>Rendered</div>"):
response: Response = client.get(
url="/blacklist_preview",
@ -656,7 +656,7 @@ def test_sent_webhooks_view_shows_saved_records() -> None:
app.dependency_overrides[get_reader_dependency] = StubReader
try:
try: # noqa: PLW0717
response: Response = client.get(url="/sent_webhooks")
assert response.status_code == 200, f"/sent_webhooks failed: {response.text}"
@ -1224,7 +1224,7 @@ def test_post_entry_uses_feed_url_to_disambiguate_duplicate_ids() -> None:
app.dependency_overrides[get_reader_dependency] = StubReader
no_redirect_client = TestClient(app, follow_redirects=False)
try:
try: # noqa: PLW0717
with patch("discord_rss_bot.main.send_entry_to_discord", side_effect=fake_send_entry_to_discord):
response: Response = no_redirect_client.get(
url="/post_entry",
@ -1960,7 +1960,7 @@ def test_webhook_entries_mass_update_preview_shows_old_and_new_urls() -> None:
return []
app.dependency_overrides[get_reader_dependency] = StubReader
try:
try: # noqa: PLW0717
with (
patch(
"discord_rss_bot.main.get_data_from_hook_url",
@ -2040,7 +2040,7 @@ def test_bulk_change_feed_urls_updates_matching_feeds() -> None:
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
no_redirect_client = TestClient(app, follow_redirects=False)
try:
try: # noqa: PLW0717
with patch(
"discord_rss_bot.main.resolve_final_feed_url",
side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None),
@ -2095,7 +2095,7 @@ def test_webhook_entries_mass_update_preview_fragment_endpoint() -> None:
return self._feeds
app.dependency_overrides[get_reader_dependency] = StubReader
try:
try: # noqa: PLW0717
with patch(
"discord_rss_bot.main.resolve_final_feed_url",
side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None),
@ -2168,7 +2168,7 @@ def test_bulk_change_feed_urls_force_update_overwrites_conflict() -> None: # no
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
no_redirect_client = TestClient(app, follow_redirects=False)
try:
try: # noqa: PLW0717
with patch(
"discord_rss_bot.main.resolve_final_feed_url",
side_effect=lambda url: (url.replace("old.example.com", "new.example.com"), None),
@ -2242,7 +2242,7 @@ def test_bulk_change_feed_urls_force_update_ignores_resolution_error() -> None:
app.dependency_overrides[get_reader_dependency] = lambda: stub_reader
no_redirect_client = TestClient(app, follow_redirects=False)
try:
try: # noqa: PLW0717
with patch(
"discord_rss_bot.main.resolve_final_feed_url",
return_value=("https://new.example.com/rss/a.xml", "HTTP 404"),