Hide paid ttvdrops images and update lint checks
Some checks failed
Test and build Docker image / docker (push) Failing after 3s

This commit is contained in:
Joakim Hellsén 2026-05-31 00:24:56 +02:00
commit 1065838ef7
Signed by: Joakim Hellsén
SSH key fingerprint: SHA256:/9h/CsExpFp+PRhsfA0xznFx2CGfTT5R/kpuFfUgEQk
11 changed files with 151 additions and 38 deletions

View file

@ -133,7 +133,7 @@ def extract_domain(url: str) -> str: # noqa: PLR0911
if not url:
return "Other"
try:
try: # noqa: PLW0717
# Special handling for YouTube feeds
if "youtube.com/feeds/videos.xml" in url:
return "YouTube"
@ -1219,13 +1219,13 @@ def _capture_full_page_screenshot_sync(
Returns:
bytes | None: PNG bytes on success, otherwise None.
"""
try:
try: # noqa: PLW0717
with sync_playwright() as playwright:
browser: Browser = playwright.chromium.launch(
headless=True,
args=["--disable-dev-shm-usage", "--no-sandbox"],
)
try:
try: # noqa: PLW0717
if screenshot_layout == "mobile":
page = browser.new_page(
viewport={"width": 390, "height": 844},
@ -1432,7 +1432,7 @@ def get_ttvdrops_reward_description(drop: JsonObject, reward: JsonObject) -> str
return reward_name
def extract_ttvdrops_media_gallery_items(value: JsonValue) -> list[JsonObject]: # noqa: C901
def extract_ttvdrops_media_gallery_items(value: JsonValue, *, hide_paid: bool = False) -> list[JsonObject]: # noqa: C901
"""Extract benefit/reward media gallery items from a ttvdrops API response.
Returns:
@ -1441,6 +1441,9 @@ def extract_ttvdrops_media_gallery_items(value: JsonValue) -> list[JsonObject]:
media_items: list[JsonObject] = []
def add_reward_image(drop: JsonObject, reward: JsonObject) -> None:
if hide_paid and json_value_to_int(drop.get("required_minutes_watched")) <= 0:
return
image_url = reward.get("image_url")
if isinstance(image_url, str):
add_unique_media_gallery_item(
@ -1491,7 +1494,8 @@ def fetch_ttvdrops_campaign_media_items(entry: Entry) -> list[JsonObject]:
logger.exception("Failed to fetch ttvdrops campaign data from %s", api_url)
return []
return extract_ttvdrops_media_gallery_items(response_json)
hide_paid: bool = "1" in parse_qs(urlparse(entry.feed.url).query).get("hide_paid", [])
return extract_ttvdrops_media_gallery_items(response_json, hide_paid=hide_paid)
def get_entry_media_gallery_items(

View file

@ -91,7 +91,7 @@ def setup_backup_repo(backup_path: Path) -> bool:
"""Ensure the backup directory exists and contains a git repository.
If the directory does not yet contain a ``.git`` folder a new repository is
initialised. A basic git identity is configured locally so that commits
initialized. A basic git identity is configured locally so that commits
succeed even in environments where a global ``~/.gitconfig`` is absent.
Args:
@ -100,12 +100,12 @@ def setup_backup_repo(backup_path: Path) -> bool:
Returns:
``True`` if the repository is ready, ``False`` on any error.
"""
try:
try: # noqa: PLW0717
backup_path.mkdir(parents=True, exist_ok=True)
git_dir: Path = backup_path / ".git"
if not git_dir.exists():
subprocess.run([GIT_EXECUTABLE, "init", str(backup_path)], check=True, capture_output=True) # noqa: S603
logger.info("Initialised git backup repository at %s", backup_path)
logger.info("Initialized git backup repository at %s", backup_path)
# Ensure a local identity exists so that `git commit` always works.
for key, value in (("user.email", "discord-rss-bot@localhost"), ("user.name", "discord-rss-bot")):
@ -155,7 +155,7 @@ def setup_backup_repo(backup_path: Path) -> bool:
def export_state(reader: Reader, backup_path: Path) -> None:
"""Serialise the current bot state to ``state.json`` inside *backup_path*.
"""Serialize the current bot state to ``state.json`` inside *backup_path*.
Args:
reader: The :class:`reader.Reader` instance to read state from.
@ -190,7 +190,7 @@ def export_state(reader: Reader, backup_path: Path) -> None:
if clean_layout in {"desktop", "mobile"}:
global_screenshot_layout = clean_layout
state: JsonObject = {"feeds": feeds_state, "webhooks": webhooks}
state: JsonObject = {"feeds": feeds_state, "webhooks": webhooks} # pyright: ignore[reportAssignmentType]
if global_update_interval is not None:
state["global_update_interval"] = global_update_interval
if global_screenshot_layout is not None:
@ -217,7 +217,7 @@ def commit_state_change(reader: Reader, message: str) -> None:
if not setup_backup_repo(backup_path):
return
try:
try: # noqa: PLW0717
export_state(reader, backup_path)
subprocess.run([GIT_EXECUTABLE, "-C", str(backup_path), "add", "-A"], check=True, capture_output=True) # noqa: S603

View file

@ -76,7 +76,7 @@ def fetch_hoyolab_post(post_id: str) -> JsonObject | None:
return None
http_ok = 200
try:
try: # noqa: PLW0717
url: str = f"https://bbs-api-os.hoyolab.com/community/post/wapi/getPostFull?post_id={post_id}"
response: requests.Response = requests.get(url, timeout=10)
@ -143,8 +143,8 @@ def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: JsonObject
)
if image_list:
image_url: str = str(image_list[0].get("url", ""))
image_height: int = int(image_list[0].get("height", 1080))
image_width: int = int(image_list[0].get("width", 1920))
image_height: int = int(image_list[0].get("height", "1080")) # pyright: ignore[reportArgumentType]
image_width: int = int(image_list[0].get("width", "1920")) # pyright: ignore[reportArgumentType]
logger.debug("Image URL: %s, Height: %s, Width: %s", image_url, image_height, image_width)
discord_embed.set_image(url=image_url, height=image_height, width=image_width)
@ -185,7 +185,7 @@ def create_hoyolab_webhook(webhook_url: str, entry: Entry, post_data: JsonObject
# Only show Youtube URL if available
structured_content: str = str(post.get("structured_content", ""))
if structured_content: # noqa: PLR1702
try:
try: # noqa: PLW0717
loaded_structured_content = cast("JsonValue", json.loads(structured_content))
structured_content_data: list[JsonObject] = (
[cast("JsonObject", item) for item in loaded_structured_content if isinstance(item, dict)]

View file

@ -239,9 +239,12 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
)
scheduler.start()
logger.info("Scheduler started.")
yield
reader.close()
scheduler.shutdown(wait=True)
try:
yield
finally:
reader.close()
scheduler.shutdown(wait=True)
app: FastAPI = FastAPI(lifespan=lifespan)