diff --git a/README.md b/README.md index c74054a..9594164 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,14 @@ _Note: Some features are currently in development._ - Add your favorite feeds to start archiving content. - Explore, manage, and enjoy your centralized feed archive. +### CLI + +There is a CLI available for FeedVault. You can run the CLI with the following command: + +```bash +python -m cli +``` + ## Contributing Feel free to contribute to the project. If you have any questions, please open an issue. diff --git a/app/cli.py b/app/cli.py deleted file mode 100644 index c0ca47d..0000000 --- a/app/cli.py +++ /dev/null @@ -1,167 +0,0 @@ -from __future__ import annotations - -import sys -import traceback -from concurrent.futures import ThreadPoolExecutor -from pathlib import Path -from typing import TYPE_CHECKING - -import requests -import typer -from reader import Feed, ParseError, Reader, StorageError, UpdatedFeed, UpdateError, UpdateResult -from rich import print - -from app.dependencies import get_reader -from app.scrapers.rss_link_database import scrape -from app.settings import DATA_DIR - -if TYPE_CHECKING: - from collections.abc import Iterable - -app = typer.Typer( - name="FeedVault CLI", - no_args_is_help=True, -) - - -def _add_broken_feed_to_csv(feed: Feed | UpdateResult | None) -> None: - """Add a broken feed to a CSV file.""" - if feed is None: - print("Feed is None.") - return - - with Path("broken_feeds.csv").open("a", encoding="utf-8") as f: - f.write(f"{feed.url}\n") - - -@app.command( - name="update_feeds", - help="Update all the feeds.", -) -def update_feeds() -> None: - """Update all the feeds.""" - reader: Reader = get_reader() - print("Updating feeds...") - - feeds: Iterable[Feed] = reader.get_feeds(broken=False, updates_enabled=True, new=True) - - total_feeds: int | None = reader.get_feed_counts(broken=False, updates_enabled=True).total - if not total_feeds: - print("[bold red]No feeds to update[/bold red]") - return - - print(f"Feeds to update: {total_feeds}") - - def update_feed(feed: Feed) -> None: - try: - updated_feed: UpdatedFeed | None = reader.update_feed(feed) - if updated_feed is not None: - print( - f"New: [green]{updated_feed.new}[/green], modified: [yellow]{updated_feed.modified}[/yellow], unmodified: {updated_feed.unmodified} - {feed.url}", # noqa: E501 - ) - - except ParseError as e: - print(f"[bold red]Error parsing feed[/bold red]: {feed.url} ({e})") - - except UpdateError as e: - print(f"[bold red]Error updating feed[/bold red]: {feed.url} ({e})") - - except StorageError as e: - print(f"[bold red]Error updating feed[/bold red]: {feed.url}") - print(f"[bold red]Storage error[/bold red]: {e}") - - except AssertionError: - print(f"[bold red]Assertion error[/bold red]: {feed.url}") - traceback.print_exc(file=sys.stderr) - reader.disable_feed_updates(feed) - _add_broken_feed_to_csv(feed) - - except KeyboardInterrupt: - print("[bold red]Keyboard interrupt[/bold red]") - reader.close() - sys.exit(1) - - with ThreadPoolExecutor(max_workers=50) as executor: - executor.map(update_feed, feeds) - - print(f"Updated {total_feeds} feeds.") - - -@app.command( - name="download_steam_ids", - help="Download Steam IDs from the Steam API.", -) -def download_steam_ids() -> None: - """Download Steam IDs from "https://api.steampowered.com/ISteamApps/GetAppList/v2/".""" - print("Downloading Steam IDs...") - - r: requests.Response = requests.get("https://api.steampowered.com/ISteamApps/GetAppList/v2/", timeout=10) - r.raise_for_status() - - data: dict[str, dict[str, list[dict[str, str]]]] = r.json() - app_ids: list[dict[str, str]] = data["applist"]["apps"] - - file_path: Path = Path(DATA_DIR) / "steam_ids.txt" - with file_path.open("w", encoding="utf-8") as f: - for app_id in app_ids: - f.write(f"{app_id["appid"]}\n") - - print(f"Steam IDs downloaded. {len(app_ids)} IDs saved to {file_path}.") - - -@app.command( - name="add_steam_feeds", - help="Add Steam feeds to the reader. Needs 'download_steam_ids' to be run first.", -) -def add_steam_feeds() -> None: - """Add the ids from "steam_ids.txt" to the reader.""" - reader: Reader = get_reader() - print("Adding Steam feeds...") - - file_path: Path = Path(DATA_DIR) / "steam_ids.txt" - if not file_path.exists(): - print("File not found.") - return - - with file_path.open("r", encoding="utf-8") as f: - steam_ids: list[str] = f.read().splitlines() - - for count, steam_id in enumerate(steam_ids): - try: - reader.add_feed(f"https://store.steampowered.com/feeds/news/app/{steam_id}") - print(f"[{count}/{len(steam_ids)}] Added feed: {steam_id}") - - except ParseError as e: - print(f"[bold red]Error parsing feed[/bold red] ({e})") - - except UpdateError as e: - print(f"[bold red]Error updating feed[/bold red] ({e})") - - except StorageError as e: - print(f"[bold red]Error updating feed[/bold red] ({e})") - - except AssertionError as e: - print(f"[bold red]Assertion error[/bold red] ({e})") - traceback.print_exc(file=sys.stderr) - - except KeyboardInterrupt: - print("[bold red]Keyboard interrupt[/bold red]") - reader.close() - sys.exit(1) - - print(f"Added {len(steam_ids)} Steam feeds.") - - -@app.command( - name="grab_links", - help="Grab RSS feeds from different sources.", -) -def grab_links() -> None: - """Grab RSS feeds from different sources.""" - print("Grabbing links...") - rss_links: str = scrape() - print(rss_links) - - -if __name__ == "__main__": - app() diff --git a/app/routers/static.py b/app/routers/static.py index 8cec484..0ef8b23 100644 --- a/app/routers/static.py +++ b/app/routers/static.py @@ -37,7 +37,11 @@ async def favicon(request: Request): async def index(request: Request, reader: CommonReader, stats: CommonStats): """Index page.""" feeds: Iterable[Feed] = reader.get_feeds(limit=15) - return templates.TemplateResponse(request=request, name="index.html", context={"feeds": feeds, "stats": stats}) + return templates.TemplateResponse( + request=request, + name="index.html", + context={"feeds": feeds, "stats": stats}, + ) @static_router.get(path="/feeds", summary="Feeds page.", tags=["HTML"]) @@ -65,12 +69,22 @@ async def feeds( return templates.TemplateResponse( request=request, name="feeds.html", - context={"feeds": feeds, "stats": stats, "next_url": next_url, "prev_url": prev_url}, + context={ + "feeds": feeds, + "stats": stats, + "next_url": next_url, + "prev_url": prev_url, + }, ) @static_router.get(path="/feed/{feed_url:path}", summary="Feed page.", tags=["HTML"]) -async def feed(request: Request, feed_url: str, reader: CommonReader, stats: CommonStats): +async def feed( + request: Request, + feed_url: str, + reader: CommonReader, + stats: CommonStats, +): """Feed page.""" feed: Feed = reader.get_feed(feed_url) entries = list(reader.get_entries(feed=feed.url)) @@ -94,9 +108,13 @@ async def search( # noqa: PLR0913, PLR0917 ): """Search page.""" if next_feed and next_entry: - entries = list(reader.search_entries(q, starting_after=(next_feed, next_entry), limit=15)) + entries = list( + reader.search_entries(q, starting_after=(next_feed, next_entry), limit=15), + ) elif prev_feed and prev_entry: - entries = list(reader.search_entries(q, starting_after=(prev_feed, prev_entry), limit=15)) + entries = list( + reader.search_entries(q, starting_after=(prev_feed, prev_entry), limit=15), + ) else: entries = list(reader.search_entries(q, limit=15)) @@ -170,13 +188,21 @@ async def upload_files(request: Request, files: list[UploadFile] = File(...)): @static_router.get(path="/upload", summary="Upload page.", tags=["HTML"]) async def upload_page(request: Request, stats: CommonStats): """Upload page.""" - return templates.TemplateResponse(request=request, name="upload.html", context={"stats": stats}) + return templates.TemplateResponse( + request=request, + name="upload.html", + context={"stats": stats}, + ) @static_router.get(path="/contact", summary="Contact page.", tags=["HTML"]) async def contact(request: Request, stats: CommonStats): """Contact page.""" - return templates.TemplateResponse(request=request, name="contact.html", context={"stats": stats}) + return templates.TemplateResponse( + request=request, + name="contact.html", + context={"stats": stats}, + ) @static_router.post(path="/contact", summary="Contact page.", tags=["HTML"]) @@ -192,11 +218,19 @@ async def contact_form(request: Request, stats: CommonStats, message: str = Form @static_router.get(path="/add", summary="Add feeds page.", tags=["HTML"]) async def add_page(request: Request, stats: CommonStats): """Add feeds page.""" - return templates.TemplateResponse(request=request, name="add.html", context={"stats": stats}) + return templates.TemplateResponse( + request=request, + name="add.html", + context={"stats": stats}, + ) @static_router.post(path="/add", summary="Add feeds page.", tags=["HTML"]) -async def add_feed(reader: CommonReader, stats: CommonStats, feed_urls: str = Form(...)): +async def add_feed( + reader: CommonReader, + stats: CommonStats, + feed_urls: str = Form(...), +): """Add feeds page.""" feed_info: list[dict[str, str]] = [] # Each line is a feed URL. diff --git a/app/scrapers/rss_link_database.py b/app/scrapers/rss_link_database.py index 5c485c3..9cc93f7 100644 --- a/app/scrapers/rss_link_database.py +++ b/app/scrapers/rss_link_database.py @@ -6,7 +6,7 @@ import orjson from click import echo -def scrape(): +def scrape() -> str: """Scrape. Raises: @@ -17,7 +17,7 @@ def scrape(): msg = "RSS-Link-Database repository not found." raise FileNotFoundError(msg) - rss_links = [] + rss_links: list[str] = [] for file in repository_path.glob("*.json"): echo(f"Scraping {file.name}...") diff --git a/app/settings.py b/app/settings.py index b387bf8..92ae638 100644 --- a/app/settings.py +++ b/app/settings.py @@ -4,6 +4,10 @@ from pathlib import Path from platformdirs import user_data_dir -DATA_DIR: str = user_data_dir(appname="FeedVault", appauthor="TheLovinator", roaming=True) +DATA_DIR: str = user_data_dir( + appname="FeedVault", + appauthor="TheLovinator", + roaming=True, +) DB_PATH: Path = Path(DATA_DIR) / "reader.sqlite" MEDIA_ROOT: Path = Path(DATA_DIR) / "uploads" diff --git a/cli/__init__.py b/cli/__init__.py new file mode 100644 index 0000000..1e6f6aa --- /dev/null +++ b/cli/__init__.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +from .add_steam_feeds import add_steam_feeds +from .download_steam_ids import download_steam_ids +from .grab_links import grab_links +from .update_feeds import update_feeds + +__all__: list[str] = [ + "add_steam_feeds", + "download_steam_ids", + "grab_links", + "update_feeds", +] diff --git a/cli/__main__.py b/cli/__main__.py new file mode 100644 index 0000000..5925c24 --- /dev/null +++ b/cli/__main__.py @@ -0,0 +1,4 @@ +from cli.cli import app + +if __name__ == "__main__": + app() diff --git a/cli/add_steam_feeds.py b/cli/add_steam_feeds.py new file mode 100644 index 0000000..8b07a44 --- /dev/null +++ b/cli/add_steam_feeds.py @@ -0,0 +1,53 @@ +import sys +import traceback +from pathlib import Path + +from reader import ParseError, Reader, StorageError, UpdateError +from rich import print + +from app.dependencies import get_reader +from app.settings import DATA_DIR +from cli.cli import app + + +@app.command( + name="add_steam_feeds", + help="Add Steam feeds to the reader. Needs 'download_steam_ids' to be run first.", +) +def add_steam_feeds() -> None: + """Add the ids from "steam_ids.txt" to the reader.""" + reader: Reader = get_reader() + print("Adding Steam feeds...") + + file_path: Path = Path(DATA_DIR) / "steam_ids.txt" + if not file_path.exists(): + print("File not found.") + return + + with file_path.open("r", encoding="utf-8") as f: + steam_ids: list[str] = f.read().splitlines() + + for count, steam_id in enumerate(steam_ids): + try: + reader.add_feed(f"https://store.steampowered.com/feeds/news/app/{steam_id}") + print(f"[{count}/{len(steam_ids)}] Added feed: {steam_id}") + + except ParseError as e: + print(f"[bold red]Error parsing feed[/bold red] ({e})") + + except UpdateError as e: + print(f"[bold red]Error updating feed[/bold red] ({e})") + + except StorageError as e: + print(f"[bold red]Error updating feed[/bold red] ({e})") + + except AssertionError as e: + print(f"[bold red]Assertion error[/bold red] ({e})") + traceback.print_exc(file=sys.stderr) + + except KeyboardInterrupt: + print("[bold red]Keyboard interrupt[/bold red]") + reader.close() + sys.exit(1) + + print(f"Added {len(steam_ids)} Steam feeds.") diff --git a/cli/cli.py b/cli/cli.py new file mode 100644 index 0000000..1f18bb3 --- /dev/null +++ b/cli/cli.py @@ -0,0 +1,6 @@ +import typer + +app = typer.Typer( + name="FeedVault CLI", + no_args_is_help=True, +) diff --git a/cli/download_steam_ids.py b/cli/download_steam_ids.py new file mode 100644 index 0000000..fab501f --- /dev/null +++ b/cli/download_steam_ids.py @@ -0,0 +1,32 @@ +from pathlib import Path + +import requests +from rich import print + +from app.settings import DATA_DIR +from cli.cli import app + + +@app.command( + name="download_steam_ids", + help="Download Steam IDs from the Steam API.", +) +def download_steam_ids() -> None: + """Download Steam IDs from "https://api.steampowered.com/ISteamApps/GetAppList/v2/".""" + print("Downloading Steam IDs...") + + r: requests.Response = requests.get( + "https://api.steampowered.com/ISteamApps/GetAppList/v2/", + timeout=10, + ) + r.raise_for_status() + + data: dict[str, dict[str, list[dict[str, str]]]] = r.json() + app_ids: list[dict[str, str]] = data["applist"]["apps"] + + file_path: Path = Path(DATA_DIR) / "steam_ids.txt" + with file_path.open("w", encoding="utf-8") as f: + for app_id in app_ids: + f.write(f"{app_id["appid"]}\n") + + print(f"Steam IDs downloaded. {len(app_ids)} IDs saved to {file_path}.") diff --git a/cli/grab_links.py b/cli/grab_links.py new file mode 100644 index 0000000..43a9a1b --- /dev/null +++ b/cli/grab_links.py @@ -0,0 +1,15 @@ +from rich import print + +from app.scrapers.rss_link_database import scrape +from cli.cli import app + + +@app.command( + name="grab_links", + help="Grab RSS feeds from different sources.", +) +def grab_links() -> None: + """Grab RSS feeds from different sources.""" + print("Grabbing links...") + rss_links: str = scrape() + print(rss_links) diff --git a/cli/update_feeds.py b/cli/update_feeds.py new file mode 100644 index 0000000..c80578e --- /dev/null +++ b/cli/update_feeds.py @@ -0,0 +1,92 @@ +import sys +import traceback +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import TYPE_CHECKING + +from reader import ( + Feed, + ParseError, + Reader, + StorageError, + UpdatedFeed, + UpdateError, + UpdateResult, +) +from rich import print + +from app.dependencies import get_reader +from cli.cli import app + +if TYPE_CHECKING: + from collections.abc import Iterable + + +def _add_broken_feed_to_csv(feed: Feed | UpdateResult | None) -> None: + """Add a broken feed to a CSV file.""" + if feed is None: + print("Feed is None.") + return + + with Path("broken_feeds.csv").open("a", encoding="utf-8") as f: + f.write(f"{feed.url}\n") + + +@app.command( + name="update_feeds", + help="Update all the feeds.", +) +def update_feeds() -> None: + """Update all the feeds.""" + reader: Reader = get_reader() + print("Updating feeds...") + + feeds: Iterable[Feed] = reader.get_feeds( + broken=False, + updates_enabled=True, + new=True, + ) + + total_feeds: int | None = reader.get_feed_counts( + broken=False, + updates_enabled=True, + ).total + if not total_feeds: + print("[bold red]No feeds to update[/bold red]") + return + + print(f"Feeds to update: {total_feeds}") + + def update_feed(feed: Feed) -> None: + try: + updated_feed: UpdatedFeed | None = reader.update_feed(feed) + if updated_feed is not None: + print( + f"New: [green]{updated_feed.new}[/green], modified: [yellow]{updated_feed.modified}[/yellow], unmodified: {updated_feed.unmodified} - {feed.url}", # noqa: E501 + ) + + except ParseError as e: + print(f"[bold red]Error parsing feed[/bold red]: {feed.url} ({e})") + + except UpdateError as e: + print(f"[bold red]Error updating feed[/bold red]: {feed.url} ({e})") + + except StorageError as e: + print(f"[bold red]Error updating feed[/bold red]: {feed.url}") + print(f"[bold red]Storage error[/bold red]: {e}") + + except AssertionError: + print(f"[bold red]Assertion error[/bold red]: {feed.url}") + traceback.print_exc(file=sys.stderr) + reader.disable_feed_updates(feed) + _add_broken_feed_to_csv(feed) + + except KeyboardInterrupt: + print("[bold red]Keyboard interrupt[/bold red]") + reader.close() + sys.exit(1) + + with ThreadPoolExecutor(max_workers=50) as executor: + executor.map(update_feed, feeds) + + print(f"Updated {total_feeds} feeds.")