250 lines
7.4 KiB
Python
250 lines
7.4 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import time
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
|
|
from fastapi import APIRouter, File, Form, Request, UploadFile
|
|
from fastapi.responses import FileResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from reader import FeedExistsError, InvalidFeedURLError
|
|
|
|
from app.dependencies import CommonReader, CommonStats # noqa: TCH001
|
|
from app.settings import MEDIA_ROOT
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import Iterable
|
|
|
|
from fastapi.datastructures import Address
|
|
from reader import Feed
|
|
|
|
|
|
# Module-level logger, named after this module.
logger: logging.Logger = logging.getLogger(__name__)

# Router that the HTML-facing routes in this module register on; tagged "HTML".
static_router = APIRouter(tags=["HTML"])

# Template renderer; templates are looked up in the "templates" directory.
templates = Jinja2Templates(directory="templates")
|
|
|
|
|
|
@static_router.get("/favicon.ico", summary="Favicon.", tags=["HTML"])
async def favicon(request: Request):
    """Serve the site icon.

    Args:
        request: The incoming request. The handler does not read it.

    Returns:
        A ``FileResponse`` for ``static/favicon.ico``.
    """
    icon_location = "static/favicon.ico"
    return FileResponse(icon_location)
|
|
|
|
|
|
@static_router.get(path="/", summary="Index page.", tags=["HTML"])
async def index(request: Request, reader: CommonReader, stats: CommonStats):
    """Render ``index.html``.

    Args:
        request: The incoming request, forwarded to the template response.
        reader: Reader dependency used to fetch the feeds.
        stats: Stats dependency, passed through to the template.

    Returns:
        The rendered template, given at most 15 feeds and the stats.
    """
    # The result of get_feeds() is handed to the template as-is (not turned into a list).
    page_context = {"feeds": reader.get_feeds(limit=15), "stats": stats}
    return templates.TemplateResponse(
        request=request,
        name="index.html",
        context=page_context,
    )
|
|
|
|
|
|
@static_router.get(path="/feeds", summary="Feeds page.", tags=["HTML"])
async def feeds(
    request: Request,
    reader: CommonReader,
    stats: CommonStats,
    next_url: str | None = None,
    prev_url: str | None = None,
):
    """Render one page (up to 15 feeds) of ``feeds.html``.

    Args:
        request: The incoming request, forwarded to the template response.
        reader: Reader dependency used to fetch the feeds.
        stats: Stats dependency, passed through to the template.
        next_url: Optional cursor; when non-empty it is used as ``starting_after``.
        prev_url: Optional cursor; used as ``starting_after`` only when
            ``next_url`` is empty.

    Returns:
        The rendered template. Its context carries the page of feeds, the
        stats, and fresh cursors: ``next_url`` is the URL of the last feed on
        the page and ``prev_url`` the URL of the first (both ``None`` when the
        page is empty).
    """
    # NOTE(review): prev_url is passed as ``starting_after`` exactly like
    # next_url, so it does not page backwards - confirm this is intended.
    cursor = next_url or prev_url
    if cursor:
        page = list(reader.get_feeds(starting_after=cursor, limit=15))
    else:
        page = list(reader.get_feeds(limit=15))

    if page:
        first_on_page, last_on_page = page[0].url, page[-1].url
    else:
        first_on_page = last_on_page = None

    page_context = {
        "feeds": page,
        "stats": stats,
        "next_url": last_on_page,
        "prev_url": first_on_page,
    }
    return templates.TemplateResponse(
        request=request,
        name="feeds.html",
        context=page_context,
    )
|
|
|
|
|
|
@static_router.get(path="/feed/{feed_url:path}", summary="Feed page.", tags=["HTML"])
async def feed(
    request: Request,
    feed_url: str,
    reader: CommonReader,
    stats: CommonStats,
):
    """Render ``feed.html`` for a single feed and its entries.

    Args:
        request: The incoming request, forwarded to the template response.
        feed_url: URL of the feed, taken from the rest of the request path.
        reader: Reader dependency used to look up the feed and its entries.
        stats: Stats dependency, passed through to the template.

    Returns:
        The rendered template with the feed, a list of its entries and the stats.

    Raises:
        HTTPException: With status 404 when the reader has no feed for ``feed_url``.
    """
    # Imported here so this handler is self-contained and does not rely on
    # changes to the module's import block.
    from fastapi import HTTPException
    from reader import FeedNotFoundError

    try:
        feed: Feed = reader.get_feed(feed_url)
    except FeedNotFoundError as e:
        # An unknown feed is a client error, not an internal server error.
        raise HTTPException(status_code=404, detail=f"Feed not found: {feed_url}") from e

    entries = list(reader.get_entries(feed=feed.url))
    return templates.TemplateResponse(
        request=request,
        name="feed.html",
        context={"feed": feed, "entries": entries, "stats": stats},
    )
|
|
|
|
|
|
@static_router.get(path="/search", summary="Search page.", tags=["HTML"])
async def search(  # noqa: PLR0913, PLR0917
    request: Request,
    q: str,
    reader: CommonReader,
    stats: CommonStats,
    next_feed: str | None = None,
    next_entry: str | None = None,
    prev_feed: str | None = None,
    prev_entry: str | None = None,
):
    """Render one page (up to 15 results) of ``search.html``.

    Args:
        request: The incoming request, forwarded to the template response.
        q: The search query handed to ``reader.search_entries``.
        reader: Reader dependency used to update and query the search index.
        stats: Stats dependency, passed through to the template.
        next_feed: Feed part of the "next" cursor; used only together with ``next_entry``.
        next_entry: Entry part of the "next" cursor.
        prev_feed: Feed part of the "prev" cursor; used only together with
            ``prev_entry`` and only when no complete "next" cursor is given.
        prev_entry: Entry part of the "prev" cursor.

    Returns:
        The rendered template. Its context carries the query, the results, the
        stats, and cursors derived from the last ("next") and first ("prev")
        result on the page; all four cursor values are ``None`` for an empty page.
    """
    # Refresh the index before querying so the results reflect it; refreshing
    # afterwards means every search runs against the previous request's index.
    reader.update_search()

    # NOTE(review): the "prev" cursor is passed as ``starting_after`` just like
    # the "next" one, so it does not page backwards - confirm this is intended.
    if next_feed and next_entry:
        cursor = (next_feed, next_entry)
    elif prev_feed and prev_entry:
        cursor = (prev_feed, prev_entry)
    else:
        cursor = None

    if cursor:
        entries = list(reader.search_entries(q, starting_after=cursor, limit=15))
    else:
        entries = list(reader.search_entries(q, limit=15))

    # TODO(TheLovinator): We need to show the entries in the search results.  # noqa: TD003

    # Derive the cursors from the page itself (as the feeds page does) instead
    # of echoing the request parameters back, which never advances pagination.
    if entries:
        first_hit, last_hit = entries[0], entries[-1]
        next_feed, next_entry = last_hit.feed_url, last_hit.id
        prev_feed, prev_entry = first_hit.feed_url, first_hit.id
    else:
        next_feed = next_entry = prev_feed = prev_entry = None

    return templates.TemplateResponse(
        request=request,
        name="search.html",
        context={
            "query": q,
            "entries": entries,
            "stats": stats,
            "next_feed": next_feed,
            "next_entry": next_entry,
            "prev_feed": prev_feed,
            "prev_entry": prev_entry,
        },
    )
|
|
|
|
|
|
def _safe_upload_name(filename: str) -> str | None:
    """Reduce a client-supplied file name to a bare name that is safe to join onto a directory.

    Args:
        filename: The file name exactly as the client sent it.

    Returns:
        The last path component of ``filename``, or ``None`` when nothing
        usable is left (empty, ``.`` or ``..``).
    """
    # Treat backslashes as separators too, so Windows-style paths are cut down the same way.
    candidate = Path(filename.replace("\\", "/")).name
    if candidate in {"", ".", ".."}:
        return None
    return candidate


@static_router.post(path="/upload", summary="Upload files.", tags=["HTML"])
async def upload_files(request: Request, files: list[UploadFile] = File(...)):
    """Store uploaded files plus a JSON metadata file under the media root.

    The media root is the ``MEDIA_ROOT`` environment variable, falling back to
    the ``MEDIA_ROOT`` setting. Files are written to ``<media root>/<upload
    time>/<name>`` and the metadata to ``<media root>/<upload time>.json``.

    Args:
        request: The incoming request; the client host and the ``user-agent``
            and ``description`` headers are recorded in the metadata.
        files: The uploaded files.

    Returns:
        ``{"files_uploaded": [{"filename": <stored name>}, ...]}`` with one
        item per file that was written.
    """
    # Imported here so this handler is self-contained and does not rely on
    # changes to the module's import block.
    import json

    media_root: str = os.getenv(key="MEDIA_ROOT", default=MEDIA_ROOT.as_posix())
    upload_time = int(time.time())
    upload_dir: Path = Path(media_root) / f"{upload_time}"

    # Decide once which uploads are accepted and under what name, so the
    # metadata and the files on disk agree.
    accepted: list[tuple[str, UploadFile]] = []
    for file in files:
        if not file:
            logger.error("No file uploaded.")
            continue

        if not file.filename:
            logger.error("No file name.")
            continue

        # The file name is untrusted input: without this, "../x" or "/abs/x"
        # would escape the upload directory.
        safe_name = _safe_upload_name(file.filename)
        if safe_name is None:
            logger.error("Rejected unsafe file name: %r", file.filename)
            continue

        accepted.append((safe_name, file))

    # Save metadata
    request_client: Address | None = request.client
    host: str = (request_client.host if request_client else None) or "unknown"

    metadata = {
        "upload_time": upload_time,
        "files": [name for name, _ in accepted],
        "ip": host,
        "user_agent": request.headers.get("user-agent") or "unknown",
        "description": request.headers.get("description") or "No description.",
    }
    metadata_path: Path = Path(media_root) / f"{upload_time}.json"
    metadata_path.parent.mkdir(parents=True, exist_ok=True)
    # Real JSON, not the Python repr of the dict, since the file is named *.json.
    metadata_path.write_text(json.dumps(metadata), encoding="utf-8")

    # Save uploaded files
    file_infos: list[dict[str, str]] = []
    if accepted:
        upload_dir.mkdir(parents=True, exist_ok=True)

    for safe_name, file in accepted:
        # Stream to disk chunk by chunk instead of building the whole file in memory.
        with (upload_dir / safe_name).open("wb") as destination:
            while chunk := await file.read(64 * 1024):
                destination.write(chunk)

        file_infos.append({"filename": safe_name})

    return {"files_uploaded": file_infos}
|
|
|
|
|
|
@static_router.get(path="/upload", summary="Upload page.", tags=["HTML"])
async def upload_page(request: Request, stats: CommonStats):
    """Render ``upload.html``.

    Args:
        request: The incoming request, forwarded to the template response.
        stats: Stats dependency, the only value placed in the template context.

    Returns:
        The rendered template.
    """
    page_context = {"stats": stats}
    return templates.TemplateResponse(request=request, name="upload.html", context=page_context)
|
|
|
|
|
|
@static_router.get(path="/contact", summary="Contact page.", tags=["HTML"])
async def contact(request: Request, stats: CommonStats):
    """Render ``contact.html``.

    Args:
        request: The incoming request, forwarded to the template response.
        stats: Stats dependency, the only value placed in the template context.

    Returns:
        The rendered template.
    """
    page_context = {"stats": stats}
    return templates.TemplateResponse(request=request, name="contact.html", context=page_context)
|
|
|
|
|
|
@static_router.post(path="/contact", summary="Contact page.", tags=["HTML"])
async def contact_form(request: Request, stats: CommonStats, message: str = Form(...)):
    """Handle a submitted contact form.

    Args:
        request: The incoming request. The handler does not read it.
        stats: Stats dependency, returned in the response.
        message: The required ``message`` form field.

    Returns:
        A dict with the submitted ``message`` and the ``stats``.
    """
    # TODO(TheLovinator): Send the message to the admin.  # noqa: TD003
    payload = {"message": message, "stats": stats}
    return payload
|
|
|
|
|
|
@static_router.get(path="/add", summary="Add feeds page.", tags=["HTML"])
async def add_page(request: Request, stats: CommonStats):
    """Render ``add.html``.

    Args:
        request: The incoming request, forwarded to the template response.
        stats: Stats dependency, the only value placed in the template context.

    Returns:
        The rendered template.
    """
    page_context = {"stats": stats}
    return templates.TemplateResponse(request=request, name="add.html", context=page_context)
|
|
|
|
|
|
@static_router.post(path="/add", summary="Add feeds page.", tags=["HTML"])
async def add_feed(
    reader: CommonReader,
    stats: CommonStats,
    feed_urls: str = Form(...),
):
    """Add every feed URL listed in the submitted form, one URL per line.

    Args:
        reader: Reader dependency the feeds are added to.
        stats: Stats dependency, returned in the response.
        feed_urls: The required ``feed_urls`` form field; each non-blank line
            is treated as one feed URL.

    Returns:
        A dict with the raw ``feed_urls`` text, the ``stats`` and ``feed_info``:
        one ``{"url", "status"}`` item per non-blank line, where the status is
        ``"Added"`` or the message of the ``FeedExistsError`` /
        ``InvalidFeedURLError`` raised for that URL.
    """
    feed_info: list[dict[str, str]] = []

    # Each line is a feed URL.
    for line in feed_urls.splitlines():
        url = line.strip()
        if not url:
            # Blank lines (e.g. a trailing newline) are not feeds; do not hand
            # an empty string to the reader or report it back as a failure.
            continue

        try:
            reader.add_feed(url)
        except (FeedExistsError, InvalidFeedURLError) as e:
            status = str(e)
        else:
            status = "Added"

        feed_info.append({"url": url, "status": status})

    return {
        "feed_urls": feed_urls,
        "stats": stats,
        "feed_info": feed_info,
    }
|