Move CLI to own package

This commit is contained in:
Joakim Hellsén 2024-06-27 11:53:39 +02:00
commit 7a1226b232
No known key found for this signature in database
GPG key ID: D196AE66FEBE1DC9
12 changed files with 273 additions and 179 deletions

View file

@ -22,6 +22,14 @@ _Note: Some features are currently in development._
- Add your favorite feeds to start archiving content.
- Explore, manage, and enjoy your centralized feed archive.
### CLI
There is a CLI available for FeedVault. You can run the CLI with the following command:
```bash
python -m cli
```
## Contributing
Feel free to contribute to the project. If you have any questions, please open an issue.

View file

@ -1,167 +0,0 @@
from __future__ import annotations
import sys
import traceback
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import TYPE_CHECKING
import requests
import typer
from reader import Feed, ParseError, Reader, StorageError, UpdatedFeed, UpdateError, UpdateResult
from rich import print
from app.dependencies import get_reader
from app.scrapers.rss_link_database import scrape
from app.settings import DATA_DIR
if TYPE_CHECKING:
from collections.abc import Iterable
app = typer.Typer(
name="FeedVault CLI",
no_args_is_help=True,
)
def _add_broken_feed_to_csv(feed: Feed | UpdateResult | None) -> None:
"""Add a broken feed to a CSV file."""
if feed is None:
print("Feed is None.")
return
with Path("broken_feeds.csv").open("a", encoding="utf-8") as f:
f.write(f"{feed.url}\n")
@app.command(
name="update_feeds",
help="Update all the feeds.",
)
def update_feeds() -> None:
"""Update all the feeds."""
reader: Reader = get_reader()
print("Updating feeds...")
feeds: Iterable[Feed] = reader.get_feeds(broken=False, updates_enabled=True, new=True)
total_feeds: int | None = reader.get_feed_counts(broken=False, updates_enabled=True).total
if not total_feeds:
print("[bold red]No feeds to update[/bold red]")
return
print(f"Feeds to update: {total_feeds}")
def update_feed(feed: Feed) -> None:
try:
updated_feed: UpdatedFeed | None = reader.update_feed(feed)
if updated_feed is not None:
print(
f"New: [green]{updated_feed.new}[/green], modified: [yellow]{updated_feed.modified}[/yellow], unmodified: {updated_feed.unmodified} - {feed.url}", # noqa: E501
)
except ParseError as e:
print(f"[bold red]Error parsing feed[/bold red]: {feed.url} ({e})")
except UpdateError as e:
print(f"[bold red]Error updating feed[/bold red]: {feed.url} ({e})")
except StorageError as e:
print(f"[bold red]Error updating feed[/bold red]: {feed.url}")
print(f"[bold red]Storage error[/bold red]: {e}")
except AssertionError:
print(f"[bold red]Assertion error[/bold red]: {feed.url}")
traceback.print_exc(file=sys.stderr)
reader.disable_feed_updates(feed)
_add_broken_feed_to_csv(feed)
except KeyboardInterrupt:
print("[bold red]Keyboard interrupt[/bold red]")
reader.close()
sys.exit(1)
with ThreadPoolExecutor(max_workers=50) as executor:
executor.map(update_feed, feeds)
print(f"Updated {total_feeds} feeds.")
@app.command(
name="download_steam_ids",
help="Download Steam IDs from the Steam API.",
)
def download_steam_ids() -> None:
"""Download Steam IDs from "https://api.steampowered.com/ISteamApps/GetAppList/v2/"."""
print("Downloading Steam IDs...")
r: requests.Response = requests.get("https://api.steampowered.com/ISteamApps/GetAppList/v2/", timeout=10)
r.raise_for_status()
data: dict[str, dict[str, list[dict[str, str]]]] = r.json()
app_ids: list[dict[str, str]] = data["applist"]["apps"]
file_path: Path = Path(DATA_DIR) / "steam_ids.txt"
with file_path.open("w", encoding="utf-8") as f:
for app_id in app_ids:
f.write(f"{app_id["appid"]}\n")
print(f"Steam IDs downloaded. {len(app_ids)} IDs saved to {file_path}.")
@app.command(
name="add_steam_feeds",
help="Add Steam feeds to the reader. Needs 'download_steam_ids' to be run first.",
)
def add_steam_feeds() -> None:
"""Add the ids from "steam_ids.txt" to the reader."""
reader: Reader = get_reader()
print("Adding Steam feeds...")
file_path: Path = Path(DATA_DIR) / "steam_ids.txt"
if not file_path.exists():
print("File not found.")
return
with file_path.open("r", encoding="utf-8") as f:
steam_ids: list[str] = f.read().splitlines()
for count, steam_id in enumerate(steam_ids):
try:
reader.add_feed(f"https://store.steampowered.com/feeds/news/app/{steam_id}")
print(f"[{count}/{len(steam_ids)}] Added feed: {steam_id}")
except ParseError as e:
print(f"[bold red]Error parsing feed[/bold red] ({e})")
except UpdateError as e:
print(f"[bold red]Error updating feed[/bold red] ({e})")
except StorageError as e:
print(f"[bold red]Error updating feed[/bold red] ({e})")
except AssertionError as e:
print(f"[bold red]Assertion error[/bold red] ({e})")
traceback.print_exc(file=sys.stderr)
except KeyboardInterrupt:
print("[bold red]Keyboard interrupt[/bold red]")
reader.close()
sys.exit(1)
print(f"Added {len(steam_ids)} Steam feeds.")
@app.command(
name="grab_links",
help="Grab RSS feeds from different sources.",
)
def grab_links() -> None:
"""Grab RSS feeds from different sources."""
print("Grabbing links...")
rss_links: str = scrape()
print(rss_links)
if __name__ == "__main__":
app()

View file

@ -37,7 +37,11 @@ async def favicon(request: Request):
async def index(request: Request, reader: CommonReader, stats: CommonStats):
"""Index page."""
feeds: Iterable[Feed] = reader.get_feeds(limit=15)
return templates.TemplateResponse(request=request, name="index.html", context={"feeds": feeds, "stats": stats})
return templates.TemplateResponse(
request=request,
name="index.html",
context={"feeds": feeds, "stats": stats},
)
@static_router.get(path="/feeds", summary="Feeds page.", tags=["HTML"])
@ -65,12 +69,22 @@ async def feeds(
return templates.TemplateResponse(
request=request,
name="feeds.html",
context={"feeds": feeds, "stats": stats, "next_url": next_url, "prev_url": prev_url},
context={
"feeds": feeds,
"stats": stats,
"next_url": next_url,
"prev_url": prev_url,
},
)
@static_router.get(path="/feed/{feed_url:path}", summary="Feed page.", tags=["HTML"])
async def feed(request: Request, feed_url: str, reader: CommonReader, stats: CommonStats):
async def feed(
request: Request,
feed_url: str,
reader: CommonReader,
stats: CommonStats,
):
"""Feed page."""
feed: Feed = reader.get_feed(feed_url)
entries = list(reader.get_entries(feed=feed.url))
@ -94,9 +108,13 @@ async def search( # noqa: PLR0913, PLR0917
):
"""Search page."""
if next_feed and next_entry:
entries = list(reader.search_entries(q, starting_after=(next_feed, next_entry), limit=15))
entries = list(
reader.search_entries(q, starting_after=(next_feed, next_entry), limit=15),
)
elif prev_feed and prev_entry:
entries = list(reader.search_entries(q, starting_after=(prev_feed, prev_entry), limit=15))
entries = list(
reader.search_entries(q, starting_after=(prev_feed, prev_entry), limit=15),
)
else:
entries = list(reader.search_entries(q, limit=15))
@ -170,13 +188,21 @@ async def upload_files(request: Request, files: list[UploadFile] = File(...)):
@static_router.get(path="/upload", summary="Upload page.", tags=["HTML"])
async def upload_page(request: Request, stats: CommonStats):
"""Upload page."""
return templates.TemplateResponse(request=request, name="upload.html", context={"stats": stats})
return templates.TemplateResponse(
request=request,
name="upload.html",
context={"stats": stats},
)
@static_router.get(path="/contact", summary="Contact page.", tags=["HTML"])
async def contact(request: Request, stats: CommonStats):
"""Contact page."""
return templates.TemplateResponse(request=request, name="contact.html", context={"stats": stats})
return templates.TemplateResponse(
request=request,
name="contact.html",
context={"stats": stats},
)
@static_router.post(path="/contact", summary="Contact page.", tags=["HTML"])
@ -192,11 +218,19 @@ async def contact_form(request: Request, stats: CommonStats, message: str = Form
@static_router.get(path="/add", summary="Add feeds page.", tags=["HTML"])
async def add_page(request: Request, stats: CommonStats):
"""Add feeds page."""
return templates.TemplateResponse(request=request, name="add.html", context={"stats": stats})
return templates.TemplateResponse(
request=request,
name="add.html",
context={"stats": stats},
)
@static_router.post(path="/add", summary="Add feeds page.", tags=["HTML"])
async def add_feed(reader: CommonReader, stats: CommonStats, feed_urls: str = Form(...)):
async def add_feed(
reader: CommonReader,
stats: CommonStats,
feed_urls: str = Form(...),
):
"""Add feeds page."""
feed_info: list[dict[str, str]] = []
# Each line is a feed URL.

View file

@ -6,7 +6,7 @@ import orjson
from click import echo
def scrape():
def scrape() -> str:
"""Scrape.
Raises:
@ -17,7 +17,7 @@ def scrape():
msg = "RSS-Link-Database repository not found."
raise FileNotFoundError(msg)
rss_links = []
rss_links: list[str] = []
for file in repository_path.glob("*.json"):
echo(f"Scraping {file.name}...")

View file

@ -4,6 +4,10 @@ from pathlib import Path
from platformdirs import user_data_dir
DATA_DIR: str = user_data_dir(appname="FeedVault", appauthor="TheLovinator", roaming=True)
DATA_DIR: str = user_data_dir(
appname="FeedVault",
appauthor="TheLovinator",
roaming=True,
)
DB_PATH: Path = Path(DATA_DIR) / "reader.sqlite"
MEDIA_ROOT: Path = Path(DATA_DIR) / "uploads"

13
cli/__init__.py Normal file
View file

@ -0,0 +1,13 @@
from __future__ import annotations
from .add_steam_feeds import add_steam_feeds
from .download_steam_ids import download_steam_ids
from .grab_links import grab_links
from .update_feeds import update_feeds
__all__: list[str] = [
"add_steam_feeds",
"download_steam_ids",
"grab_links",
"update_feeds",
]

4
cli/__main__.py Normal file
View file

@ -0,0 +1,4 @@
from cli.cli import app
if __name__ == "__main__":
app()

53
cli/add_steam_feeds.py Normal file
View file

@ -0,0 +1,53 @@
import sys
import traceback
from pathlib import Path
from reader import ParseError, Reader, StorageError, UpdateError
from rich import print
from app.dependencies import get_reader
from app.settings import DATA_DIR
from cli.cli import app
@app.command(
name="add_steam_feeds",
help="Add Steam feeds to the reader. Needs 'download_steam_ids' to be run first.",
)
def add_steam_feeds() -> None:
"""Add the ids from "steam_ids.txt" to the reader."""
reader: Reader = get_reader()
print("Adding Steam feeds...")
file_path: Path = Path(DATA_DIR) / "steam_ids.txt"
if not file_path.exists():
print("File not found.")
return
with file_path.open("r", encoding="utf-8") as f:
steam_ids: list[str] = f.read().splitlines()
for count, steam_id in enumerate(steam_ids):
try:
reader.add_feed(f"https://store.steampowered.com/feeds/news/app/{steam_id}")
print(f"[{count}/{len(steam_ids)}] Added feed: {steam_id}")
except ParseError as e:
print(f"[bold red]Error parsing feed[/bold red] ({e})")
except UpdateError as e:
print(f"[bold red]Error updating feed[/bold red] ({e})")
except StorageError as e:
print(f"[bold red]Error updating feed[/bold red] ({e})")
except AssertionError as e:
print(f"[bold red]Assertion error[/bold red] ({e})")
traceback.print_exc(file=sys.stderr)
except KeyboardInterrupt:
print("[bold red]Keyboard interrupt[/bold red]")
reader.close()
sys.exit(1)
print(f"Added {len(steam_ids)} Steam feeds.")

6
cli/cli.py Normal file
View file

@ -0,0 +1,6 @@
import typer
app = typer.Typer(
name="FeedVault CLI",
no_args_is_help=True,
)

32
cli/download_steam_ids.py Normal file
View file

@ -0,0 +1,32 @@
from pathlib import Path
import requests
from rich import print
from app.settings import DATA_DIR
from cli.cli import app
@app.command(
name="download_steam_ids",
help="Download Steam IDs from the Steam API.",
)
def download_steam_ids() -> None:
"""Download Steam IDs from "https://api.steampowered.com/ISteamApps/GetAppList/v2/"."""
print("Downloading Steam IDs...")
r: requests.Response = requests.get(
"https://api.steampowered.com/ISteamApps/GetAppList/v2/",
timeout=10,
)
r.raise_for_status()
data: dict[str, dict[str, list[dict[str, str]]]] = r.json()
app_ids: list[dict[str, str]] = data["applist"]["apps"]
file_path: Path = Path(DATA_DIR) / "steam_ids.txt"
with file_path.open("w", encoding="utf-8") as f:
for app_id in app_ids:
f.write(f"{app_id["appid"]}\n")
print(f"Steam IDs downloaded. {len(app_ids)} IDs saved to {file_path}.")

15
cli/grab_links.py Normal file
View file

@ -0,0 +1,15 @@
from rich import print
from app.scrapers.rss_link_database import scrape
from cli.cli import app
@app.command(
name="grab_links",
help="Grab RSS feeds from different sources.",
)
def grab_links() -> None:
"""Grab RSS feeds from different sources."""
print("Grabbing links...")
rss_links: str = scrape()
print(rss_links)

92
cli/update_feeds.py Normal file
View file

@ -0,0 +1,92 @@
import sys
import traceback
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import TYPE_CHECKING
from reader import (
Feed,
ParseError,
Reader,
StorageError,
UpdatedFeed,
UpdateError,
UpdateResult,
)
from rich import print
from app.dependencies import get_reader
from cli.cli import app
if TYPE_CHECKING:
from collections.abc import Iterable
def _add_broken_feed_to_csv(feed: Feed | UpdateResult | None) -> None:
"""Add a broken feed to a CSV file."""
if feed is None:
print("Feed is None.")
return
with Path("broken_feeds.csv").open("a", encoding="utf-8") as f:
f.write(f"{feed.url}\n")
@app.command(
name="update_feeds",
help="Update all the feeds.",
)
def update_feeds() -> None:
"""Update all the feeds."""
reader: Reader = get_reader()
print("Updating feeds...")
feeds: Iterable[Feed] = reader.get_feeds(
broken=False,
updates_enabled=True,
new=True,
)
total_feeds: int | None = reader.get_feed_counts(
broken=False,
updates_enabled=True,
).total
if not total_feeds:
print("[bold red]No feeds to update[/bold red]")
return
print(f"Feeds to update: {total_feeds}")
def update_feed(feed: Feed) -> None:
try:
updated_feed: UpdatedFeed | None = reader.update_feed(feed)
if updated_feed is not None:
print(
f"New: [green]{updated_feed.new}[/green], modified: [yellow]{updated_feed.modified}[/yellow], unmodified: {updated_feed.unmodified} - {feed.url}", # noqa: E501
)
except ParseError as e:
print(f"[bold red]Error parsing feed[/bold red]: {feed.url} ({e})")
except UpdateError as e:
print(f"[bold red]Error updating feed[/bold red]: {feed.url} ({e})")
except StorageError as e:
print(f"[bold red]Error updating feed[/bold red]: {feed.url}")
print(f"[bold red]Storage error[/bold red]: {e}")
except AssertionError:
print(f"[bold red]Assertion error[/bold red]: {feed.url}")
traceback.print_exc(file=sys.stderr)
reader.disable_feed_updates(feed)
_add_broken_feed_to_csv(feed)
except KeyboardInterrupt:
print("[bold red]Keyboard interrupt[/bold red]")
reader.close()
sys.exit(1)
with ThreadPoolExecutor(max_workers=50) as executor:
executor.map(update_feed, feeds)
print(f"Updated {total_feeds} feeds.")