Use FastAPI instead of Django

This commit is contained in:
Joakim Hellsén 2024-05-21 02:43:53 +02:00
commit b462be40af
No known key found for this signature in database
GPG key ID: D196AE66FEBE1DC9
43 changed files with 1105 additions and 1688 deletions

0
app/__init__.py Normal file
View file

29
app/cli.py Normal file
View file

@ -0,0 +1,29 @@
from __future__ import annotations
from typing import TYPE_CHECKING
import click
from reader import Reader, UpdateError
from app.dependencies import get_reader
if TYPE_CHECKING:
from reader import UpdatedFeed
@click.command()
def update_feeds() -> None:
"""Update all the feeds."""
click.echo("Updating feeds...")
reader: Reader = get_reader()
for feed in reader.update_feeds_iter():
value: UpdatedFeed | None | UpdateError = feed.value
if value is not None and isinstance(value, UpdateError):
click.echo(f"Error updating {feed.url}: {value}")
else:
click.echo(f"Updated {feed.url}.")
click.echo("Feeds updated.")
if __name__ == "__main__":
update_feeds()

41
app/dependencies.py Normal file
View file

@ -0,0 +1,41 @@
"""https://fastapi.tiangolo.com/tutorial/dependencies/."""
from __future__ import annotations
from functools import lru_cache
from typing import Annotated
import humanize
from fastapi import Depends
from reader import EntryCounts, FeedCounts, Reader, make_reader
from app.settings import DB_PATH
@lru_cache(maxsize=1)
def get_reader() -> Reader:
"""Return the reader."""
return make_reader(url=DB_PATH.as_posix(), search_enabled=True)
def get_stats() -> str:
"""Return the stats."""
db_size: int = DB_PATH.stat().st_size
# Get the feed counts.
feed_counts: FeedCounts = get_reader().get_feed_counts()
total_feed_counts: int | None = feed_counts.total
if total_feed_counts is None:
total_feed_counts = 0
# Get the entry counts.
entry_counts: EntryCounts = get_reader().get_entry_counts()
total_entry_counts: int | None = entry_counts.total
if total_entry_counts is None:
total_entry_counts = 0
return f"{total_feed_counts} feeds ({total_entry_counts} entries) ~{humanize.naturalsize(db_size, binary=True)}"
CommonReader = Annotated[Reader, Depends(get_reader)]
CommonStats = Annotated[str, Depends(get_stats)]

26
app/main.py Normal file
View file

@ -0,0 +1,26 @@
from __future__ import annotations
import uvicorn
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from app.routers.api import api_router
from app.routers.static import static_router
app = FastAPI(
title="FeedVault API",
description="An API for FeedVault.",
version="0.1.0",
openapi_url="/api/v1/openapi.json",
redoc_url=None,
debug=True,
)
app.mount(path="/static", app=StaticFiles(directory="static"), name="static")
app.include_router(router=api_router)
app.include_router(router=static_router)
if __name__ == "__main__":
uvicorn.run(app=app, host="0.0.0.0", port=8000) # noqa: S104

0
app/routers/__init__.py Normal file
View file

61
app/routers/api.py Normal file
View file

@ -0,0 +1,61 @@
from __future__ import annotations
import datetime # noqa: TCH003
from urllib.parse import unquote
from fastapi import APIRouter
from pydantic import BaseModel
from reader import ExceptionInfo, Feed, FeedNotFoundError
from app.dependencies import CommonReader # noqa: TCH001
from app.validators import uri_validator
api_router = APIRouter(
prefix="/api/v1",
tags=["Feeds"],
responses={404: {"description": "Not found"}},
)
class FeedOut(BaseModel):
"""The feed we return to the user."""
url: str
updated: datetime.datetime | None = None
title: str | None = None
link: str | None = None
author: str | None = None
subtitle: str | None = None
version: str | None = None
user_title: str | None = None
added: datetime.datetime | None = None
last_updated: datetime.datetime | None = None
last_exception: ExceptionInfo | None = None
updates_enabled: bool = True
@api_router.get("/feeds", summary="Get all the feeds in the reader.", tags=["Feeds"])
async def api_feeds(reader: CommonReader) -> list[Feed]:
"""Return all the feeds in the reader."""
return list(reader.get_feeds())
@api_router.get(
path="/feed/{feed_url:path}",
summary="Get a feed from the reader.",
tags=["Feeds"],
response_model=FeedOut | dict[str, str],
response_model_exclude_unset=True,
)
async def api_feed(feed_url: str, reader: CommonReader) -> Feed | dict[str, str]:
"""Return a feed from the reader."""
feed_url = unquote(feed_url)
if not uri_validator(feed_url):
return {"message": "Invalid URL."}
try:
feed: Feed = reader.get_feed(feed_url)
except FeedNotFoundError as e:
return {"message": str(e)}
return feed

131
app/routers/static.py Normal file
View file

@ -0,0 +1,131 @@
from __future__ import annotations
import logging
import os
import time
from pathlib import Path
from typing import TYPE_CHECKING, Iterable
from fastapi import APIRouter, File, Request, UploadFile
from fastapi.responses import FileResponse
from fastapi.templating import Jinja2Templates
from app.dependencies import CommonReader, CommonStats # noqa: TCH001
from app.settings import MEDIA_ROOT
if TYPE_CHECKING:
from fastapi.datastructures import Address
from reader import Feed
from reader.types import Entry, EntrySearchResult
logger: logging.Logger = logging.getLogger(__name__)
static_router = APIRouter(tags=["HTML"])
templates = Jinja2Templates(directory="templates")
@static_router.get("/favicon.ico", summary="Favicon.", tags=["HTML"])
async def favicon(request: Request):
"""Favicon."""
return FileResponse("static/favicon.ico")
@static_router.get(path="/", summary="Index page.", tags=["HTML"])
async def index(request: Request, reader: CommonReader, stats: CommonStats):
"""Index page."""
feeds: Iterable[Feed] = reader.get_feeds()
return templates.TemplateResponse(request=request, name="index.html", context={"feeds": feeds, "stats": stats})
@static_router.get(path="/feeds", summary="Feeds page.", tags=["HTML"])
async def feeds(request: Request, reader: CommonReader, stats: CommonStats):
"""Feeds page."""
feeds: Iterable[Feed] = reader.get_feeds()
return templates.TemplateResponse(request=request, name="feeds.html", context={"feeds": feeds, "stats": stats})
@static_router.get(path="/feed/{feed_url:path}", summary="Feed page.", tags=["HTML"])
async def feed(request: Request, feed_url: str, reader: CommonReader, stats: CommonStats):
"""Feed page."""
feed: Feed = reader.get_feed(feed_url)
entries: Iterable[Entry] = reader.get_entries(feed=feed.url)
return templates.TemplateResponse(
request=request,
name="feed.html",
context={"feed": feed, "entries": entries, "stats": stats},
)
@static_router.get(path="/search", summary="Search page.", tags=["HTML"])
async def search(request: Request, q: str, reader: CommonReader, stats: CommonStats):
"""Search page."""
# TODO(TheLovinator): We need to show the entries in the search results. # noqa: TD003
reader.update_search()
entries: Iterable[EntrySearchResult] = reader.search_entries(q)
return templates.TemplateResponse(
request=request,
name="search.html",
context={"query": q, "entries": entries, "stats": stats},
)
@static_router.post(path="/upload", summary="Upload files.", tags=["HTML"])
async def upload_files(request: Request, files: list[UploadFile] = File(...)):
"""Upload files."""
media_root: str = os.getenv(key="MEDIA_ROOT", default=MEDIA_ROOT.as_posix())
file_infos: list[dict[str, str]] = []
upload_time = int(time.time())
# Save metadata
request_client: Address | None = request.client
if request_client:
host: str = request_client.host or "unknown"
else:
host = "unknown"
metadata = {
"upload_time": upload_time,
"files": [file.filename for file in files if file.filename],
"ip": host,
"user_agent": request.headers.get("user-agent") or "unknown",
"description": request.headers.get("description") or "No description.",
}
metadata_path: Path = Path(media_root) / f"{upload_time}.json"
metadata_path.parent.mkdir(parents=True, exist_ok=True)
metadata_path.write_text(str(metadata))
# Save uploaded files
for file in files:
if not file:
logger.error("No file uploaded.")
continue
if not file.filename:
logger.error("No file name.")
continue
file_path: Path = Path(media_root) / f"{upload_time}" / file.filename
content: bytes = b""
while chunk := await file.read(1024): # Read in chunks of 1024 bytes
content += chunk
file_path.parent.mkdir(parents=True, exist_ok=True)
Path(file_path).write_bytes(content)
file_infos.append({"filename": file.filename})
return {"files_uploaded": file_infos}
@static_router.get(path="/upload", summary="Upload page.", tags=["HTML"])
async def upload_page(request: Request, stats: CommonStats):
"""Upload page."""
return templates.TemplateResponse(request=request, name="upload.html", context={"stats": stats})
@static_router.get(path="/contact", summary="Contact page.", tags=["HTML"])
async def contact(request: Request, stats: CommonStats):
"""Contact page."""
return templates.TemplateResponse(request=request, name="contact.html", context={"stats": stats})

9
app/settings.py Normal file
View file

@ -0,0 +1,9 @@
from __future__ import annotations
from pathlib import Path
from platformdirs import user_data_dir
DATA_DIR: str = user_data_dir(appname="FeedVault", appauthor="TheLovinator", roaming=True)
DB_PATH: Path = Path(DATA_DIR) / "reader.sqlite"
MEDIA_ROOT: Path = Path(DATA_DIR) / "uploads"

17
app/validators.py Normal file
View file

@ -0,0 +1,17 @@
from urllib.parse import ParseResult, urlparse
def uri_validator(url: str) -> bool:
"""Validate a URI.
Args:
url: The URI to validate.
Returns:
True if the URI is valid, False otherwise.
"""
try:
result: ParseResult = urlparse(url)
return all([result.scheme, result.netloc])
except AttributeError:
return False