Lemon sadness

This commit is contained in:
Joakim Hellsén 2024-05-20 04:34:51 +02:00
commit bfe90aa69d
No known key found for this signature in database
GPG key ID: D196AE66FEBE1DC9
52 changed files with 1564 additions and 2492 deletions

View file

@ -1,78 +0,0 @@
from __future__ import annotations
from django.http import HttpRequest # noqa: TCH002
from ninja import ModelSchema, NinjaAPI
from ninja.pagination import paginate
from feedvault.models import Domain, Entry, Feed
api_v1 = NinjaAPI(
title="FeedVault API",
version="0.1.0",
description="FeedVault API",
urls_namespace="api_v1",
)
class FeedOut(ModelSchema):
class Meta:
model = Feed
fields: str = "__all__"
class EntriesOut(ModelSchema):
class Meta:
model = Entry
fields: str = "__all__"
class DomainsOut(ModelSchema):
class Meta:
model = Domain
fields: str = "__all__"
@api_v1.get("/feeds/", response=list[FeedOut])
@paginate
def list_feeds(request: HttpRequest) -> None:
"""Get a list of feeds."""
return Feed.objects.all() # type: ignore # noqa: PGH003
@api_v1.get("/feeds/{feed_id}/", response=FeedOut)
def get_feed(request: HttpRequest, feed_id: int) -> Feed:
"""Get a feed by ID."""
return Feed.objects.get(id=feed_id)
@api_v1.get("/feeds/{feed_id}/entries/", response=list[EntriesOut])
@paginate
def list_entries(request: HttpRequest, feed_id: int) -> list[Entry]:
"""Get a list of entries for a feed."""
return Entry.objects.filter(feed_id=feed_id) # type: ignore # noqa: PGH003
@api_v1.get("/entries/", response=list[EntriesOut])
@paginate
def list_all_entries(request: HttpRequest) -> list[Entry]:
"""Get a list of entries."""
return Entry.objects.all() # type: ignore # noqa: PGH003
@api_v1.get("/entries/{entry_id}/", response=EntriesOut)
def get_entry(request: HttpRequest, entry_id: int) -> Entry:
"""Get an entry by ID."""
return Entry.objects.get(id=entry_id)
@api_v1.get("/domains/", response=list[DomainsOut])
@paginate
def list_domains(request: HttpRequest) -> list[Domain]:
"""Get a list of domains."""
return Domain.objects.all() # type: ignore # noqa: PGH003
@api_v1.get("/domains/{domain_id}/", response=DomainsOut)
def get_domain(request: HttpRequest, domain_id: int) -> Domain:
"""Get a domain by ID."""
return Domain.objects.get(id=domain_id)

View file

@ -2,7 +2,5 @@ from django.apps import AppConfig
class FeedVaultConfig(AppConfig):
"""FeedVault app configuration."""
default_auto_field: str = "django.db.models.BigAutoField"
name: str = "feedvault"

View file

@ -15,7 +15,7 @@ def add_global_context(request: HttpRequest) -> dict[str, str | int]:
Returns:
A dictionary with the global context.
"""
from .models import Feed # noqa: PLC0415
from feeds.models import Feed # noqa: PLC0415
amount_of_feeds: int = Feed.objects.count()
return {"amount_of_feeds": amount_of_feeds}

View file

@ -1,376 +0,0 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from urllib.parse import ParseResult, urlparse
import dateparser
import feedparser
from django.utils import timezone
from feedparser import FeedParserDict
from feedvault.models import Author, Domain, Entry, Feed, FeedAddResult, Generator, Publisher
if TYPE_CHECKING:
import datetime
from django.contrib.auth.models import AbstractBaseUser, AnonymousUser
logger: logging.Logger = logging.getLogger(__name__)
def get_domain(url: str | None) -> None | str:
"""Get the domain of a URL."""
if not url:
return None
# Parse the URL.
parsed_url: ParseResult = urlparse(url)
if not parsed_url:
logger.error("Error parsing URL: %s", url)
return None
# Get the domain.
return str(parsed_url.netloc)
def get_author(parsed_feed: dict) -> Author:
"""Get the author of a feed.
Args:
parsed_feed: The parsed feed.
Returns:
The author of the feed. If the author doesn't exist, it will be created.
"""
# A dictionary with details about the author of this entry.
author_detail: dict = parsed_feed.get("author_detail", {})
author = Author(
name=author_detail.get("name", ""),
href=author_detail.get("href", ""),
email=author_detail.get("email", ""),
)
# Create the author if it doesn't exist.
try:
author: Author = Author.objects.get(name=author.name, email=author.email, href=author.href)
except Author.DoesNotExist:
author.save()
logger.info("Created author: %s", author)
return author
def def_generator(parsed_feed: dict) -> Generator:
"""Get the generator of a feed.
Args:
parsed_feed: The parsed feed.
Returns:
The generator of the feed. If the generator doesn't exist, it will be created.
"""
generator_detail: dict = parsed_feed.get("generator_detail", {})
generator = Generator(
name=generator_detail.get("name", ""),
href=generator_detail.get("href", ""),
version=generator_detail.get("version", ""),
)
# Create the generator if it doesn't exist.
try:
generator: Generator = Generator.objects.get(
name=generator.name,
href=generator.href,
version=generator.version,
)
except Generator.DoesNotExist:
generator.save()
logger.info("Created generator: %s", generator)
return generator
def get_publisher(parsed_feed: dict) -> Publisher:
"""Get the publisher of a feed.
Args:
parsed_feed: The parsed feed.
Returns:
The publisher of the feed. If the publisher doesn't exist, it will be created.
"""
publisher_detail: dict = parsed_feed.get("publisher_detail", {})
publisher = Publisher(
name=publisher_detail.get("name", ""),
href=publisher_detail.get("href", ""),
email=publisher_detail.get("email", ""),
)
# Create the publisher if it doesn't exist.
try:
publisher: Publisher = Publisher.objects.get(
name=publisher.name,
href=publisher.href,
email=publisher.email,
)
except Publisher.DoesNotExist:
publisher.save()
logger.info("Created publisher: %s", publisher)
return publisher
def parse_feed(url: str | None) -> dict | None:
"""Parse a feed.
Args:
url: The URL of the feed.
Returns:
The parsed feed.
"""
# TODO(TheLovinator): Backup the feed URL. # noqa: TD003
if not url:
return None
# Parse the feed.
parsed_feed: dict = feedparser.parse(url)
if not parsed_feed:
return None
return parsed_feed
def add_entry(feed: Feed, entry: FeedParserDict) -> Entry | None:
"""Add an entry to the database.
Args:
entry: The entry to add.
feed: The feed the entry belongs to.
"""
author: Author = get_author(parsed_feed=entry)
publisher: Publisher = get_publisher(parsed_feed=entry)
pre_updated_parsed: str = str(entry.get("updated_parsed", ""))
updated_parsed: datetime.datetime | None = (
dateparser.parse(date_string=str(pre_updated_parsed)) if pre_updated_parsed else None
)
pre_published_parsed: str = str(entry.get("published_parsed", ""))
published_parsed: datetime.datetime | None = (
dateparser.parse(date_string=str(pre_published_parsed)) if pre_published_parsed else None
)
pre_expired_parsed: str = str(entry.get("expired_parsed", ""))
expired_parsed: datetime.datetime | None = (
dateparser.parse(date_string=str(pre_expired_parsed)) if pre_expired_parsed else None
)
pre_created_parsed = str(entry.get("created_parsed", ""))
created_parsed: datetime.datetime | None = (
dateparser.parse(date_string=str(pre_created_parsed)) if pre_created_parsed else None
)
entry_id = entry.get("id", "")
if not entry_id:
logger.error("Entry ID not found: %s", entry)
added_entry, created = Entry.objects.update_or_create(
feed=feed,
entry_id=entry_id,
defaults={
"author": entry.get("author", ""),
"author_detail": author,
"comments": entry.get("comments", ""),
"content": entry.get("content", {}),
"contributors": entry.get("contributors", {}),
"created": entry.get("created", ""),
"created_parsed": created_parsed,
"enclosures": entry.get("enclosures", []),
"expired": entry.get("expired", ""),
"expired_parsed": expired_parsed,
"license": entry.get("license", ""),
"link": entry.get("link", ""),
"links": entry.get("links", []),
"published": entry.get("published", ""),
"published_parsed": published_parsed,
"publisher": entry.get("publisher", ""),
"publisher_detail": publisher,
"source": entry.get("source", {}),
"summary": entry.get("summary", ""),
"summary_detail": entry.get("summary_detail", {}),
"tags": entry.get("tags", []),
"title": entry.get("title", ""),
"title_detail": entry.get("title_detail", {}),
"updated": entry.get("updated", ""),
"updated_parsed": updated_parsed,
},
)
if created:
logger.info("Created entry: %s", added_entry)
return added_entry
logger.info("Updated entry: %s", added_entry)
return added_entry
def add_domain_to_db(url: str | None) -> Domain | None:
"""Add a domain to the database.
Args:
url: The URL of the domain.
Returns:
The domain that was added.
"""
domain_url: None | str = get_domain(url=url)
if not domain_url:
return None
# Create the domain if it doesn't exist.
domain: Domain
domain, created = Domain.objects.get_or_create(url=domain_url)
if created:
logger.info("Created domain: %s", domain.url)
domain.save()
return domain
def populate_feed(url: str | None, user: AbstractBaseUser | AnonymousUser) -> Feed | None:
"""Populate the feed with entries.
Args:
url: The URL of the feed.
user: The user adding the feed.
Returns:
The feed that was added.
"""
domain: Domain | None = add_domain_to_db(url=url)
if not domain:
return None
# Parse the feed.
parsed_feed: dict | None = parse_feed(url=url)
if not parsed_feed:
return None
author: Author = get_author(parsed_feed=parsed_feed)
generator: Generator = def_generator(parsed_feed=parsed_feed)
publisher: Publisher = get_publisher(parsed_feed=parsed_feed)
pre_published_parsed: str = str(parsed_feed.get("published_parsed", ""))
published_parsed: datetime.datetime | None = (
dateparser.parse(date_string=str(pre_published_parsed)) if pre_published_parsed else None
)
pre_updated_parsed: str = str(parsed_feed.get("updated_parsed", ""))
updated_parsed: datetime.datetime | None = (
dateparser.parse(date_string=str(pre_updated_parsed)) if pre_updated_parsed else None
)
pre_modified: str = str(parsed_feed.get("modified", ""))
modified: timezone.datetime | None = dateparser.parse(date_string=pre_modified) if pre_modified else None
# Create or update the feed
feed, created = Feed.objects.update_or_create(
feed_url=url,
domain=domain,
defaults={
"user": user,
"last_checked": timezone.now(),
"bozo": parsed_feed.get("bozo", 0),
"bozo_exception": parsed_feed.get("bozo_exception", ""),
"encoding": parsed_feed.get("encoding", ""),
"etag": parsed_feed.get("etag", ""),
"headers": parsed_feed.get("headers", {}),
"href": parsed_feed.get("href", ""),
"modified": modified,
"namespaces": parsed_feed.get("namespaces", {}),
"status": parsed_feed.get("status", 0),
"version": parsed_feed.get("version", ""),
"author": parsed_feed.get("author", ""),
"author_detail": author,
"cloud": parsed_feed.get("cloud", {}),
"contributors": parsed_feed.get("contributors", {}),
"docs": parsed_feed.get("docs", ""),
"errorreportsto": parsed_feed.get("errorreportsto", ""),
"generator": parsed_feed.get("generator", ""),
"generator_detail": generator,
"icon": parsed_feed.get("icon", ""),
"feed_id": parsed_feed.get("id", ""),
"image": parsed_feed.get("image", {}),
"info": parsed_feed.get("info", ""),
"language": parsed_feed.get("language", ""),
"license": parsed_feed.get("license", ""),
"link": parsed_feed.get("link", ""),
"links": parsed_feed.get("links", []),
"logo": parsed_feed.get("logo", ""),
"published": parsed_feed.get("published", ""),
"published_parsed": published_parsed,
"publisher": parsed_feed.get("publisher", ""),
"publisher_detail": publisher,
"rights": parsed_feed.get("rights", ""),
"rights_detail": parsed_feed.get("rights_detail", {}),
"subtitle": parsed_feed.get("subtitle", ""),
"subtitle_detail": parsed_feed.get("subtitle_detail", {}),
"tags": parsed_feed.get("tags", []),
"textinput": parsed_feed.get("textinput", {}),
"title": parsed_feed.get("title", ""),
"title_detail": parsed_feed.get("title_detail", {}),
"ttl": parsed_feed.get("ttl", ""),
"updated": parsed_feed.get("updated", ""),
"updated_parsed": updated_parsed,
},
)
grab_entries(feed=feed)
if created:
logger.info("Created feed: %s", feed)
return feed
logger.info("Updated feed: %s", feed)
return feed
def grab_entries(feed: Feed) -> None | list[Entry]:
"""Grab the entries from a feed.
Args:
feed: The feed to grab the entries from.
Returns:
The entries that were added. If no entries were added, None is returned.
"""
# Set the last checked time to now.
feed.last_checked = timezone.now()
feed.save()
entries_added: list[Entry] = []
# Parse the feed.
parsed_feed: dict | None = parse_feed(url=feed.feed_url)
if not parsed_feed:
return None
entries = parsed_feed.get("entries", [])
for entry in entries:
added_entry: Entry | None = add_entry(feed=feed, entry=entry)
if not added_entry:
continue
entries_added.append(added_entry)
logger.info("Added entries: %s", entries_added)
return entries_added
def add_url(url: str, user: AbstractBaseUser | AnonymousUser) -> FeedAddResult:
"""Add a feed to the database so we can grab entries from it later."""
domain: Domain | None = add_domain_to_db(url=url)
if not domain:
return FeedAddResult(feed=None, created=False, error="Domain not found")
# Add the URL to the database.
_feed, _created = Feed.objects.get_or_create(feed_url=url, user=user, domain=domain)
return FeedAddResult(feed=_feed, created=_created, error=None)

View file

@ -1,66 +0,0 @@
from __future__ import annotations
from collections import defaultdict
from datetime import timedelta
from threading import Thread
from django.core.management.base import BaseCommand, no_translations
from django.db.models import Q
from django.utils import timezone
from rich.console import Console
from rich.progress import Progress
from feedvault.feeds import grab_entries
from feedvault.models import Feed
console = Console()
class DomainUpdater(Thread):
def __init__(self, feeds: list[Feed], progress: Progress, *args, **kwargs) -> None: # noqa: ANN002, ANN003
"""Update feeds in a separate thread.
Args:
feeds: The feeds to update.
progress: The Rich progress bar.
*args: Arbitrary positional arguments.
**kwargs: Arbitrary keyword arguments.
"""
super().__init__(*args, **kwargs)
self.feeds: list[Feed] = feeds
self.progress: Progress = progress
def run(self) -> None:
with self.progress as progress:
task = progress.add_task("[cyan]Updating feeds...", total=len(self.feeds))
for feed in self.feeds:
grab_entries(feed)
progress.update(task, advance=1, description=f"[green]Updated {feed.feed_url}")
class Command(BaseCommand):
help = "Check for new entries in feeds"
requires_migrations_checks = True
@no_translations
def handle(self, *args, **options) -> None: # noqa: ANN002, ANN003, ARG002
feeds = Feed.objects.filter(
Q(last_checked__lte=timezone.now() - timedelta(minutes=15)) | Q(last_checked__isnull=True),
)
domain_feeds = defaultdict(list)
for feed in feeds:
domain_feeds[feed.domain.pk].append(feed)
threads = []
progress = Progress()
for feeds in domain_feeds.values():
thread = DomainUpdater(feeds, progress)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
console.log("[bold green]Successfully updated feeds")

View file

@ -1,206 +0,0 @@
# Generated by Django 5.0.3 on 2024-03-15 01:27
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name='Domain',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('url', models.URLField(unique=True)),
('name', models.CharField(max_length=255)),
('categories', models.JSONField(blank=True, null=True)),
('created_at', models.DateTimeField(auto_now_add=True)),
('modified_at', models.DateTimeField(auto_now=True)),
('hidden', models.BooleanField(default=False)),
('hidden_at', models.DateTimeField(blank=True, null=True)),
('hidden_reason', models.TextField(blank=True)),
],
options={
'verbose_name': 'Domain',
'verbose_name_plural': 'Domains',
'ordering': ['name'],
},
),
migrations.CreateModel(
name='Author',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True)),
('modified_at', models.DateTimeField(auto_now=True)),
('name', models.TextField(blank=True)),
('href', models.TextField(blank=True)),
('email', models.TextField(blank=True)),
],
options={
'verbose_name': 'Author',
'verbose_name_plural': 'Authors',
'ordering': ['name'],
'unique_together': {('name', 'email', 'href')},
},
),
migrations.CreateModel(
name='Generator',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True)),
('modified_at', models.DateTimeField(auto_now=True)),
('name', models.TextField(blank=True)),
('href', models.TextField(blank=True)),
('version', models.TextField(blank=True)),
],
options={
'verbose_name': 'Feed generator',
'verbose_name_plural': 'Feed generators',
'ordering': ['name'],
'unique_together': {('name', 'version', 'href')},
},
),
migrations.CreateModel(
name='Links',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True)),
('modified_at', models.DateTimeField(auto_now=True)),
('rel', models.TextField(blank=True)),
('type', models.TextField(blank=True)),
('href', models.TextField(blank=True)),
('title', models.TextField(blank=True)),
],
options={
'verbose_name': 'Link',
'verbose_name_plural': 'Links',
'ordering': ['href'],
'unique_together': {('href', 'rel')},
},
),
migrations.CreateModel(
name='Publisher',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True)),
('modified_at', models.DateTimeField(auto_now=True)),
('name', models.TextField(blank=True)),
('href', models.TextField(blank=True)),
('email', models.TextField(blank=True)),
],
options={
'verbose_name': 'Publisher',
'verbose_name_plural': 'Publishers',
'ordering': ['name'],
'unique_together': {('name', 'email', 'href')},
},
),
migrations.CreateModel(
name='Feed',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('feed_url', models.URLField(unique=True)),
('created_at', models.DateTimeField(auto_now_add=True)),
('modified_at', models.DateTimeField(auto_now=True)),
('last_checked', models.DateTimeField(blank=True, null=True)),
('active', models.BooleanField(default=True)),
('bozo', models.BooleanField()),
('bozo_exception', models.TextField(blank=True)),
('encoding', models.TextField(blank=True)),
('etag', models.TextField(blank=True)),
('headers', models.JSONField(blank=True, null=True)),
('href', models.TextField(blank=True)),
('modified', models.DateTimeField(blank=True, null=True)),
('namespaces', models.JSONField(blank=True, null=True)),
('status', models.IntegerField()),
('version', models.CharField(blank=True, max_length=255)),
('author', models.TextField(blank=True)),
('cloud', models.JSONField(blank=True, null=True)),
('contributors', models.JSONField(blank=True, null=True)),
('docs', models.TextField(blank=True)),
('errorreportsto', models.TextField(blank=True)),
('generator', models.TextField(blank=True)),
('icon', models.TextField(blank=True)),
('_id', models.TextField(blank=True)),
('image', models.JSONField(blank=True, null=True)),
('info', models.TextField(blank=True)),
('info_detail', models.JSONField(blank=True, null=True)),
('language', models.TextField(blank=True)),
('license', models.TextField(blank=True)),
('link', models.TextField(blank=True)),
('links', models.JSONField(blank=True, null=True)),
('logo', models.TextField(blank=True)),
('published', models.TextField(blank=True)),
('published_parsed', models.DateTimeField(blank=True, null=True)),
('publisher', models.TextField(blank=True)),
('rights', models.TextField(blank=True)),
('rights_detail', models.JSONField(blank=True, null=True)),
('subtitle', models.TextField(blank=True)),
('subtitle_detail', models.JSONField(blank=True, null=True)),
('tags', models.JSONField(blank=True, null=True)),
('textinput', models.JSONField(blank=True, null=True)),
('title', models.TextField(blank=True)),
('title_detail', models.JSONField(blank=True, null=True)),
('ttl', models.TextField(blank=True)),
('updated', models.TextField(blank=True)),
('updated_parsed', models.DateTimeField(blank=True, null=True)),
('author_detail', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='feeds', to='feedvault.author')),
('domain', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='feedvault.domain')),
('user', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to=settings.AUTH_USER_MODEL)),
('generator_detail', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='feeds', to='feedvault.generator')),
('publisher_detail', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='feeds', to='feedvault.publisher')),
],
options={
'verbose_name': 'Feed',
'verbose_name_plural': 'Feeds',
'ordering': ['-created_at'],
},
),
migrations.CreateModel(
name='Entry',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True)),
('modified_at', models.DateTimeField(auto_now=True)),
('author', models.TextField(blank=True)),
('comments', models.TextField(blank=True)),
('content', models.JSONField(blank=True, null=True)),
('contributors', models.JSONField(blank=True, null=True)),
('created', models.TextField(blank=True)),
('created_parsed', models.DateTimeField(blank=True, null=True)),
('enclosures', models.JSONField(blank=True, null=True)),
('expired', models.TextField(blank=True)),
('expired_parsed', models.DateTimeField(blank=True, null=True)),
('_id', models.TextField(blank=True)),
('license', models.TextField(blank=True)),
('link', models.TextField(blank=True)),
('links', models.JSONField(blank=True, null=True)),
('published', models.TextField(blank=True)),
('published_parsed', models.DateTimeField(blank=True, null=True)),
('publisher', models.TextField(blank=True)),
('source', models.JSONField(blank=True, null=True)),
('summary', models.TextField(blank=True)),
('summary_detail', models.JSONField(blank=True, null=True)),
('tags', models.JSONField(blank=True, null=True)),
('title', models.TextField(blank=True)),
('title_detail', models.JSONField(blank=True, null=True)),
('updated', models.TextField(blank=True)),
('updated_parsed', models.DateTimeField(blank=True, null=True)),
('author_detail', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='entries', to='feedvault.author')),
('feed', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='feedvault.feed')),
('publisher_detail', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='entries', to='feedvault.publisher')),
],
options={
'verbose_name': 'Entry',
'verbose_name_plural': 'Entries',
'ordering': ['-created_parsed'],
},
),
]

View file

@ -1,18 +0,0 @@
# Generated by Django 5.0.3 on 2024-03-15 13:21
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('feedvault', '0001_initial'),
]
operations = [
migrations.AlterField(
model_name='feed',
name='status',
field=models.IntegerField(null=True),
),
]

View file

@ -1,23 +0,0 @@
# Generated by Django 5.0.3 on 2024-03-15 16:42
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('feedvault', '0002_alter_feed_status'),
]
operations = [
migrations.RenameField(
model_name='entry',
old_name='_id',
new_name='entry_id',
),
migrations.RenameField(
model_name='feed',
old_name='_id',
new_name='feed_id',
),
]

View file

@ -1,18 +0,0 @@
# Generated by Django 5.0.3 on 2024-03-17 02:49
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('feedvault', '0003_rename__id_entry_entry_id_rename__id_feed_feed_id'),
]
operations = [
migrations.AlterField(
model_name='feed',
name='bozo',
field=models.BooleanField(default=False),
),
]

View file

@ -1,32 +0,0 @@
# Generated by Django 5.0.3 on 2024-03-17 03:19
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('feedvault', '0004_alter_feed_bozo'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name='UserUploadedFile',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('file', models.FileField(upload_to='uploads/')),
('created_at', models.DateTimeField(auto_now_add=True)),
('modified_at', models.DateTimeField(auto_now=True)),
('has_been_processed', models.BooleanField(default=False)),
('user', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to=settings.AUTH_USER_MODEL)),
],
options={
'verbose_name': 'Uploaded file',
'verbose_name_plural': 'Uploaded files',
'ordering': ['-created_at'],
},
),
]

View file

@ -1,25 +0,0 @@
# Generated by Django 5.0.3 on 2024-03-17 03:29
import feedvault.models
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('feedvault', '0005_useruploadedfile'),
]
operations = [
migrations.AddField(
model_name='useruploadedfile',
name='original_filename',
field=models.TextField(default='a', help_text='The original filename of the file.'),
preserve_default=False,
),
migrations.AlterField(
model_name='useruploadedfile',
name='file',
field=models.FileField(upload_to=feedvault.models.get_upload_path),
),
]

View file

@ -1,52 +0,0 @@
# Generated by Django 5.0.3 on 2024-03-17 16:00
import django.db.models.deletion
import feedvault.models
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('feedvault', '0006_useruploadedfile_original_filename_and_more'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.AddField(
model_name='useruploadedfile',
name='description',
field=models.TextField(blank=True, help_text='Description added by user.'),
),
migrations.AddField(
model_name='useruploadedfile',
name='notes',
field=models.TextField(blank=True, help_text='Notes from admin.'),
),
migrations.AlterField(
model_name='useruploadedfile',
name='created_at',
field=models.DateTimeField(auto_now_add=True, help_text='The time the file was uploaded.'),
),
migrations.AlterField(
model_name='useruploadedfile',
name='file',
field=models.FileField(help_text='The file that was uploaded.', upload_to=feedvault.models.get_upload_path),
),
migrations.AlterField(
model_name='useruploadedfile',
name='has_been_processed',
field=models.BooleanField(default=False, help_text='Has the file content been added to the archive?'),
),
migrations.AlterField(
model_name='useruploadedfile',
name='modified_at',
field=models.DateTimeField(auto_now=True, help_text='The last time the file was modified.'),
),
migrations.AlterField(
model_name='useruploadedfile',
name='user',
field=models.ForeignKey(blank=True, help_text='The user that uploaded the file.', null=True, on_delete=django.db.models.deletion.SET_NULL, to=settings.AUTH_USER_MODEL),
),
]

View file

@ -1,339 +0,0 @@
from __future__ import annotations
import logging
import typing
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Literal
from django.db import models
from django.db.models import JSONField
logger: logging.Logger = logging.getLogger(__name__)
@dataclass
class FeedAddResult:
"""The result of adding a feed to the database."""
feed: Feed | None
created: bool
error: str | None
class Domain(models.Model):
"""A domain that has one or more feeds."""
url = models.URLField(unique=True)
name = models.CharField(max_length=255)
categories = models.JSONField(null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
modified_at = models.DateTimeField(auto_now=True)
hidden = models.BooleanField(default=False)
hidden_at = models.DateTimeField(null=True, blank=True)
hidden_reason = models.TextField(blank=True)
class Meta:
"""Meta information for the domain model."""
ordering: typing.ClassVar[list[str]] = ["name"]
verbose_name: str = "Domain"
verbose_name_plural: str = "Domains"
def __str__(self) -> str:
"""Return string representation of the domain."""
if_hidden: Literal[" (hidden)", ""] = " (hidden)" if self.hidden else ""
return self.name + if_hidden
def get_absolute_url(self) -> str:
"""Return the absolute URL of the domain."""
return f"/domain/{self.pk}/"
class Author(models.Model):
"""An author of an entry."""
created_at = models.DateTimeField(auto_now_add=True)
modified_at = models.DateTimeField(auto_now=True)
name = models.TextField(blank=True)
href = models.TextField(blank=True)
email = models.TextField(blank=True)
class Meta:
"""Meta information for the author model."""
unique_together: typing.ClassVar[list[str]] = ["name", "email", "href"]
ordering: typing.ClassVar[list[str]] = ["name"]
verbose_name: str = "Author"
verbose_name_plural: str = "Authors"
def __str__(self) -> str:
"""Return string representation of the author."""
return f"{self.name} - {self.email} - {self.href}"
class Generator(models.Model):
"""What program or service generated the feed."""
created_at = models.DateTimeField(auto_now_add=True)
modified_at = models.DateTimeField(auto_now=True)
name = models.TextField(blank=True)
href = models.TextField(blank=True)
version = models.TextField(blank=True)
class Meta:
"""Meta information for the generator model."""
unique_together: typing.ClassVar[list[str]] = ["name", "version", "href"]
ordering: typing.ClassVar[list[str]] = ["name"]
verbose_name: str = "Feed generator"
verbose_name_plural: str = "Feed generators"
def __str__(self) -> str:
"""Return string representation of the generator."""
return self.name
class Links(models.Model):
"""A link to a feed or entry."""
created_at = models.DateTimeField(auto_now_add=True)
modified_at = models.DateTimeField(auto_now=True)
rel = models.TextField(blank=True)
type = models.TextField(blank=True)
href = models.TextField(blank=True)
title = models.TextField(blank=True)
class Meta:
"""Meta information for the links model."""
unique_together: typing.ClassVar[list[str]] = ["href", "rel"]
ordering: typing.ClassVar[list[str]] = ["href"]
verbose_name: str = "Link"
verbose_name_plural: str = "Links"
def __str__(self) -> str:
"""Return string representation of the links."""
return self.href
class Publisher(models.Model):
"""The publisher of a feed or entry."""
created_at = models.DateTimeField(auto_now_add=True)
modified_at = models.DateTimeField(auto_now=True)
name = models.TextField(blank=True)
href = models.TextField(blank=True)
email = models.TextField(blank=True)
class Meta:
"""Meta information for the publisher model."""
unique_together: typing.ClassVar[list[str]] = ["name", "email", "href"]
ordering: typing.ClassVar[list[str]] = ["name"]
verbose_name: str = "Publisher"
verbose_name_plural: str = "Publishers"
def __str__(self) -> str:
"""Return string representation of the publisher."""
return self.name
class Feed(models.Model):
"""A RSS/Atom/JSON feed."""
feed_url = models.URLField(unique=True)
# The user that added the feed
user = models.ForeignKey("auth.User", on_delete=models.SET_NULL, null=True, blank=True)
domain = models.ForeignKey(Domain, on_delete=models.CASCADE)
created_at = models.DateTimeField(auto_now_add=True)
modified_at = models.DateTimeField(auto_now=True)
last_checked = models.DateTimeField(null=True, blank=True)
active = models.BooleanField(default=True)
# General data
bozo = models.BooleanField(default=False)
bozo_exception = models.TextField(blank=True)
encoding = models.TextField(blank=True)
etag = models.TextField(blank=True)
headers = JSONField(null=True, blank=True)
href = models.TextField(blank=True)
modified = models.DateTimeField(null=True, blank=True)
namespaces = JSONField(null=True, blank=True)
status = models.IntegerField(null=True)
version = models.CharField(max_length=255, blank=True)
# Feed data
author = models.TextField(blank=True)
author_detail = models.ForeignKey(
Author,
on_delete=models.PROTECT,
null=True,
blank=True,
related_name="feeds",
)
cloud = JSONField(null=True, blank=True)
contributors = JSONField(null=True, blank=True)
docs = models.TextField(blank=True)
errorreportsto = models.TextField(blank=True)
generator = models.TextField(blank=True)
generator_detail = models.ForeignKey(
Generator,
on_delete=models.PROTECT,
null=True,
blank=True,
related_name="feeds",
)
icon = models.TextField(blank=True)
feed_id = models.TextField(blank=True)
image = JSONField(null=True, blank=True)
info = models.TextField(blank=True)
info_detail = JSONField(null=True, blank=True)
language = models.TextField(blank=True)
license = models.TextField(blank=True)
link = models.TextField(blank=True)
links = JSONField(null=True, blank=True)
logo = models.TextField(blank=True)
published = models.TextField(blank=True)
published_parsed = models.DateTimeField(null=True, blank=True)
publisher = models.TextField(blank=True)
publisher_detail = models.ForeignKey(
Publisher,
on_delete=models.PROTECT,
null=True,
blank=True,
related_name="feeds",
)
rights = models.TextField(blank=True)
rights_detail = JSONField(null=True, blank=True)
subtitle = models.TextField(blank=True)
subtitle_detail = JSONField(null=True, blank=True)
tags = JSONField(null=True, blank=True)
textinput = JSONField(null=True, blank=True)
title = models.TextField(blank=True)
title_detail = JSONField(null=True, blank=True)
ttl = models.TextField(blank=True)
updated = models.TextField(blank=True)
updated_parsed = models.DateTimeField(null=True, blank=True)
class Meta:
"""Meta information for the feed model."""
ordering: typing.ClassVar[list[str]] = ["-created_at"]
verbose_name: str = "Feed"
verbose_name_plural: str = "Feeds"
def __str__(self) -> str:
"""Return string representation of the feed."""
return f"{self.domain} - {self.title}"
def get_absolute_url(self) -> str:
"""Return the absolute URL of the feed."""
return f"/feed/{self.pk}/"
class Entry(models.Model):
"""Each feed has multiple entries."""
feed = models.ForeignKey(Feed, on_delete=models.CASCADE)
created_at = models.DateTimeField(auto_now_add=True)
modified_at = models.DateTimeField(auto_now=True)
# Entry data
author = models.TextField(blank=True)
author_detail = models.ForeignKey(
Author,
on_delete=models.PROTECT,
null=True,
blank=True,
related_name="entries",
)
comments = models.TextField(blank=True)
content = JSONField(null=True, blank=True)
contributors = JSONField(null=True, blank=True)
created = models.TextField(blank=True)
created_parsed = models.DateTimeField(null=True, blank=True)
enclosures = JSONField(null=True, blank=True)
expired = models.TextField(blank=True)
expired_parsed = models.DateTimeField(null=True, blank=True)
entry_id = models.TextField(blank=True)
license = models.TextField(blank=True)
link = models.TextField(blank=True)
links = JSONField(null=True, blank=True)
published = models.TextField(blank=True)
published_parsed = models.DateTimeField(null=True, blank=True)
publisher = models.TextField(blank=True)
publisher_detail = models.ForeignKey(
Publisher,
on_delete=models.PROTECT,
null=True,
blank=True,
related_name="entries",
)
source = JSONField(null=True, blank=True)
summary = models.TextField(blank=True)
summary_detail = JSONField(null=True, blank=True)
tags = JSONField(null=True, blank=True)
title = models.TextField(blank=True)
title_detail = JSONField(null=True, blank=True)
updated = models.TextField(blank=True)
updated_parsed = models.DateTimeField(null=True, blank=True)
class Meta:
"""Meta information for the entry model."""
ordering: typing.ClassVar[list[str]] = ["-created_parsed"]
verbose_name: str = "Entry"
verbose_name_plural: str = "Entries"
def __str__(self) -> str:
"""Return string representation of the entry."""
return f"{self.feed.feed_url} - {self.title}"
def get_upload_path(instance: UserUploadedFile, filename: str) -> str:
"""Don't save the file with the original filename."""
ext: str = Path(filename).suffix
filename = f"{uuid.uuid4()}{ext}" # For example: 51dc07a7-a299-473c-a737-1ef16bc71609.opml
return f"uploads/{instance.user.id}/{filename}" # type: ignore # noqa: PGH003
class UserUploadedFile(models.Model):
"""A file uploaded to the server by a user."""
file = models.FileField(upload_to=get_upload_path, help_text="The file that was uploaded.")
original_filename = models.TextField(help_text="The original filename of the file.")
created_at = models.DateTimeField(auto_now_add=True, help_text="The time the file was uploaded.")
modified_at = models.DateTimeField(auto_now=True, help_text="The last time the file was modified.")
user = models.ForeignKey(
"auth.User",
on_delete=models.SET_NULL,
null=True,
blank=True,
help_text="The user that uploaded the file.",
)
has_been_processed = models.BooleanField(default=False, help_text="Has the file content been added to the archive?")
description = models.TextField(blank=True, help_text="Description added by user.")
notes = models.TextField(blank=True, help_text="Notes from admin.")
class Meta:
"""Meta information for the uploaded file model."""
ordering: typing.ClassVar[list[str]] = ["-created_at"]
verbose_name: str = "Uploaded file"
verbose_name_plural: str = "Uploaded files"
def __str__(self) -> str:
return f"{self.original_filename} - {self.created_at}"
def get_absolute_url(self) -> str:
"""Return the absolute URL of the uploaded file.
Note that you will need to be logged in to access the file.
"""
return f"/download/{self.pk}"

View file

@ -4,22 +4,29 @@ import os
import sys
from pathlib import Path
from django.utils import timezone
from dotenv import find_dotenv, load_dotenv
load_dotenv(dotenv_path=find_dotenv(), verbose=True)
# Is True when running tests, used for not spamming Discord when new users are created
TESTING: bool = len(sys.argv) > 1 and sys.argv[1] == "test"
DEBUG: bool = os.getenv(key="DEBUG", default="True").lower() == "true"
BASE_DIR: Path = Path(__file__).resolve().parent.parent
DEBUG: bool = os.getenv(key="DEBUG", default="True").lower() == "true"
SECRET_KEY: str = os.getenv("SECRET_KEY", default="")
ROOT_URLCONF = "feedvault.urls"
TIME_ZONE = "Europe/Stockholm"
LANGUAGE_CODE = "en-us"
USE_I18N = True
USE_TZ = True
ADMINS: list[tuple[str, str]] = [("Joakim Hellsén", "django@feedvault.se")]
ADMINS: list[tuple[str, str]] = [("Joakim Hellsén", "tlovinator@gmail.com")]
ALLOWED_HOSTS: list[str] = [".feedvault.se", ".localhost", "127.0.0.1"]
USE_X_FORWARDED_HOST = True
INTERNAL_IPS: list[str] = ["127.0.0.1", "localhost", "192.168.1.143"]
DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
WSGI_APPLICATION = "feedvault.wsgi.application"
ROOT_URLCONF = "feedvault.urls"
SITE_ID = 1
if not DEBUG:
CSRF_COOKIE_DOMAIN = ".feedvault.se"
@ -38,16 +45,14 @@ EMAIL_TIMEOUT = 10
DEFAULT_FROM_EMAIL: str = os.getenv(key="EMAIL_HOST_USER", default="webmaster@localhost")
SERVER_EMAIL: str = os.getenv(key="EMAIL_HOST_USER", default="webmaster@localhost")
USE_X_FORWARDED_HOST = True
INTERNAL_IPS: list[str] = ["127.0.0.1", "localhost", "192.168.1.143"]
DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
SITE_ID = 1
STATIC_URL = "static/"
STATIC_ROOT: Path = BASE_DIR / "staticfiles"
STATIC_ROOT.mkdir(parents=True, exist_ok=True)
STATICFILES_DIRS: list[Path] = [BASE_DIR / "static"]
for static_dir in STATICFILES_DIRS:
static_dir.mkdir(parents=True, exist_ok=True)
MEDIA_URL = "media/"
MEDIA_ROOT: Path = BASE_DIR / "media"
@ -55,7 +60,7 @@ MEDIA_ROOT.mkdir(parents=True, exist_ok=True)
INSTALLED_APPS: list[str] = [
"feedvault.apps.FeedVaultConfig",
"feeds.apps.FeedsConfig",
"debug_toolbar",
"django.contrib.auth",
"whitenoise.runserver_nostatic",
@ -80,38 +85,38 @@ MIDDLEWARE: list[str] = [
"django_htmx.middleware.HtmxMiddleware",
]
DATABASE_PATH: str = os.getenv("DATABASE_PATH", "/data")
DATABASES = {
# TODO(TheLovinator): #1 Use unix socket for postgres in production
# https://github.com/TheLovinator1/feedvault.se/issues/1
DATABASES: dict[str, dict[str, str]] = {
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": Path(DATABASE_PATH) / "feedvault.sqlite3",
"ATOMIC_REQUESTS": True,
"OPTIONS": {
"timeout": 30,
"transaction_mode": "IMMEDIATE",
"init_command": "PRAGMA journal_mode=WAL;",
},
"ENGINE": "django.db.backends.postgresql",
"NAME": os.getenv("DB_NAME", ""),
"USER": os.getenv("DB_USER", ""),
"PASSWORD": os.getenv("DB_PASSWORD", ""),
"HOST": os.getenv("DB_HOST", ""),
"PORT": os.getenv("DB_PORT", ""),
},
}
if not DEBUG:
SESSION_ENGINE = "django.contrib.sessions.backends.cache"
SESSION_CACHE_ALIAS: str = "default"
# Password validation
# https://docs.djangoproject.com/en/5.0/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS: list[dict[str, str]] = [
{
"NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator",
# TODO(TheLovinator): #2 Use unix socket for redis in production
# https://github.com/TheLovinator1/feedvault.se/issues/2
REDIS_LOCATION: str = f"redis://{os.getenv('REDIS_HOST', "")}:{os.getenv('REDIS_PORT', "")}/1"
CACHES: dict[str, dict[str, str | dict[str, str]]] = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": REDIS_LOCATION,
"KEY_PREFIX": "feedvault-dev" if DEBUG else "feedvault",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
"PARSER_CLASS": "redis.connection._HiredisParser",
"PASSWORD": os.getenv("REDIS_PASSWORD", ""),
},
},
{
"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator",
},
{
"NAME": "django.contrib.auth.password_validation.CommonPasswordValidator",
},
{
"NAME": "django.contrib.auth.password_validation.NumericPasswordValidator",
},
]
}
# A list containing the settings for all template engines to be used with Django.
TEMPLATES = [
@ -139,11 +144,6 @@ TEMPLATES = [
},
]
# Create data/logs folder if it doesn't exist
log_folder: Path = BASE_DIR / "data" / "logs"
log_folder.mkdir(parents=True, exist_ok=True)
LOGGING = {
"version": 1,
"disable_existing_loggers": False,
@ -152,15 +152,10 @@ LOGGING = {
"level": "DEBUG",
"class": "logging.StreamHandler",
},
"file": {
"level": "DEBUG",
"class": "logging.FileHandler",
"filename": BASE_DIR / "data" / "logs" / f"{timezone.now().strftime('%Y%m%d')}.log",
},
},
"loggers": {
"django": {
"handlers": ["console", "file"],
"handlers": ["console"],
"level": "INFO",
"propagate": True,
},
@ -170,7 +165,7 @@ LOGGING = {
"propagate": True,
},
"": {
"handlers": ["console", "file"],
"handlers": ["console"],
"level": "DEBUG",
"propagate": True,
},
@ -187,3 +182,18 @@ STORAGES: dict[str, dict[str, str]] = {
else "whitenoise.storage.CompressedManifestStaticFilesStorage",
},
}
AUTH_PASSWORD_VALIDATORS: list[dict[str, str]] = [
{
"NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator",
},
{
"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator",
},
{
"NAME": "django.contrib.auth.password_validation.CommonPasswordValidator",
},
{
"NAME": "django.contrib.auth.password_validation.NumericPasswordValidator",
},
]

View file

@ -1,39 +0,0 @@
from __future__ import annotations
import logging
import os
from typing import TYPE_CHECKING
from discord_webhook import DiscordWebhook
from django.conf import settings
from django.contrib.auth.models import User
from django.db.models.signals import post_save
from django.dispatch import receiver
if TYPE_CHECKING:
from requests import Response
logger: logging.Logger = logging.getLogger(__name__)
@receiver(post_save, sender=User)
def notify_when_new_user(sender: User, instance: User, *, created: bool, **kwargs) -> None: # noqa: ANN003
"""Send a Discord notification when a new user is created.
Args:
sender: The User model.
instance: The instance of the sender.
created: A boolean indicating if the instance was created.
**kwargs: Arbitrary keyword arguments.
"""
if created:
webhook_url: str | None = os.getenv("DISCORD_WEBHOOK_URL")
if not webhook_url:
logger.error("Discord webhook URL not found.")
return
msg: str = f"New user registered on FeedVault 👀: {instance.username}"
webhook = DiscordWebhook(url=webhook_url, content=msg)
if not settings.TESTING:
response: Response = webhook.execute()
logger.info("Discord notification sent: (%s) %s", response.status_code, response.text)

View file

@ -1,284 +0,0 @@
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
from django.contrib.auth.models import User
from django.http.response import HttpResponse
from django.test import Client, TestCase
from django.urls import reverse
from feedvault.models import Domain, Entry, Feed, UserUploadedFile
if TYPE_CHECKING:
from django.http import HttpResponse
class TestIndexPage(TestCase):
def test_index_page(self) -> None:
"""Test if the index page is accessible."""
response: HttpResponse = self.client.get(reverse("index"))
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
response: HttpResponse = self.client.get("/")
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
class TestFeedPage(TestCase):
def setUp(self) -> None:
"""Create a test feed."""
self.domain: Domain = Domain.objects.create(
name="feedvault",
url="feedvault.se",
)
self.user: User = User.objects.create_user(
username="testuser",
email="hello@feedvault.se",
password="testpassword", # noqa: S106
)
self.feed: Feed = Feed.objects.create(
user=self.user,
bozo=False,
feed_url="https://feedvault.se/feed.xml",
domain=self.domain,
)
def test_feed_page(self) -> None:
"""Test if the feed page is accessible."""
feed_id = self.feed.pk
response: HttpResponse = self.client.get(reverse("feed", kwargs={"feed_id": feed_id}))
assert response.status_code == 200, f"Expected 200, got {response.status_code}. {response.content}"
def test_feed_page_not_found(self) -> None:
"""Test if the feed page is accessible."""
feed_id = self.feed.pk + 1
response: HttpResponse = self.client.get(reverse("feed", kwargs={"feed_id": feed_id}))
assert response.status_code == 404, f"Expected 404, got {response.status_code}. {response.content}"
class TestFeedsPage(TestCase):
def test_feeds_page(self) -> None:
"""Test if the feeds page is accessible."""
response: HttpResponse = self.client.get(reverse("feeds"))
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
class TestAddPage(TestCase):
def setUp(self) -> None:
"""Create a test user."""
self.user: User = User.objects.create_user(
username="testuser",
email="hello@feedvault.se",
password="testpassword", # noqa: S106
)
self.client.force_login(user=self.user)
def test_add_page(self) -> None:
"""Test if the add page is accessible."""
response: HttpResponse = self.client.post(reverse("add"), {"urls": "https://feedvault.se/feed.xml"})
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
class TestUploadPage(TestCase):
def setUp(self) -> None:
"""Create a test user."""
self.user: User = User.objects.create_user(
username="testuser",
email="hello@feedvault.se",
password="testpassword", # noqa: S106
)
self.client.force_login(user=self.user)
def test_upload_page(self) -> None:
"""Test if the upload page is accessible."""
# Check the amounts of files in the database
assert UserUploadedFile.objects.count() == 0, f"Expected 0, got {UserUploadedFile.objects.count()}"
# Open this file and upload it
current_file = __file__
with Path(current_file).open("rb") as file:
response: HttpResponse = self.client.post(reverse("upload"), {"file": file})
assert response.status_code == 200, f"Expected 200, got {response.status_code}: {response.content}"
# Check if the file is in the database
assert UserUploadedFile.objects.count() == 1, f"Expected 1, got {UserUploadedFile.objects.count()}"
class TestRobotsPage(TestCase):
def test_robots_page(self) -> None:
"""Test if the robots page is accessible."""
response: HttpResponse = self.client.get(reverse("robots"))
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
def test_robots_page_content(self) -> None:
"""Test if the robots page contains the expected content."""
response: HttpResponse = self.client.get(reverse("robots"))
assert (
response.content
== b"User-agent: *\nDisallow: /add\nDisallow: /upload\nDisallow: /accounts/\n\nSitemap: https://feedvault.se/sitemap.xml"
), f"Expected b'User-agent: *\nDisallow: /add\nDisallow: /upload\nDisallow: /accounts/\n\nSitemap: https://feedvault.se/sitemap.xml', got {response.content}" # noqa: E501
class TestDomains(TestCase):
def test_domains_page(self) -> None:
"""Test if the domains page is accessible."""
response: HttpResponse = self.client.get(reverse("domains"))
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
class TestAPI(TestCase):
def test_api_page(self) -> None:
"""Test if the API page is accessible."""
response: HttpResponse = self.client.get(reverse("api_v1:openapi-view"))
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
class TestAPIFeeds(TestCase):
def test_api_feeds_page(self) -> None:
"""Test if the API feeds page is accessible."""
response: HttpResponse = self.client.get(reverse("api_v1:list_feeds"))
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
class FeedVaultAPITests(TestCase):
def setUp(self) -> None:
# Set up data for the whole TestCase
self.client = Client()
# Creating a domain instance
self.domain: Domain = Domain.objects.create(name="Example Domain")
# Creating a feed instance
self.feed: Feed = Feed.objects.create(title="Example Feed", domain=self.domain, bozo=False)
# Creating entry instances
self.entry1: Entry = Entry.objects.create(title="Example Entry 1", feed=self.feed)
self.entry2: Entry = Entry.objects.create(title="Example Entry 2", feed=self.feed)
def test_list_feeds(self) -> None:
response: HttpResponse = self.client.get("/api/v1/feeds/")
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
assert "Example Feed" in response.content.decode()
def test_get_feed(self) -> None:
response: HttpResponse = self.client.get(f"/api/v1/feeds/{self.feed.pk}/")
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
assert "Example Feed" in response.content.decode()
def test_list_entries(self) -> None:
response: HttpResponse = self.client.get(f"/api/v1/feeds/{self.feed.pk}/entries/")
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
assert "Example Entry 1" in response.content.decode()
assert "Example Entry 2" in response.content.decode()
def test_get_entry(self) -> None:
response: HttpResponse = self.client.get(f"/api/v1/entries/{self.entry1.pk}/")
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
assert "Example Entry 1" in response.content.decode()
def test_list_domains(self) -> None:
response: HttpResponse = self.client.get("/api/v1/domains/")
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
assert "Example Domain" in response.content.decode()
def test_get_domain(self) -> None:
response: HttpResponse = self.client.get(f"/api/v1/domains/{self.domain.pk}/")
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
assert "Example Domain" in response.content.decode()
class TestAccount(TestCase):
def test_login_page(self) -> None:
"""Test if the login page is accessible."""
response: HttpResponse = self.client.get(reverse("login"))
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
def test_register_page(self) -> None:
"""Test if the register page is accessible."""
response: HttpResponse = self.client.get(reverse("register"))
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
class TestLogoutPage(TestCase):
def setUp(self) -> None:
"""Create a test user."""
self.user: User = User.objects.create_user(
username="testuser",
email="hello@feedvault.se",
password="testpassword", # noqa: S106
)
self.client.force_login(user=self.user)
def test_logout_page(self) -> None:
"""Test if the logout page is accessible."""
response: HttpResponse = self.client.post(reverse("logout"))
assert response.status_code == 302, f"Expected 300, got {response.status_code}"
# Check if the user is logged out
response: HttpResponse = self.client.get(reverse("index"))
assert response.status_code == 200
assert "testuser" not in response.content.decode(
"utf-8",
), f"Expected 'testuser' not in response, got {response.content}"
class TestSitemap(TestCase):
def test_sitemap(self) -> None:
"""Test if the sitemap is accessible."""
response: HttpResponse = self.client.get(reverse("django.contrib.sitemaps.views.sitemap"))
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
assert "urlset" in response.content.decode(), f"Expected 'urlset' in response, got {response.content}"
response2 = self.client.get("/sitemap.xml")
assert response2.status_code == 200, f"Expected 200, got {response2.status_code}"
assert "urlset" in response2.content.decode(), f"Expected 'urlset' in response, got {response2.content}"
class TestSearch(TestCase):
def setUp(self) -> None:
"""Create a test feed."""
self.domain: Domain = Domain.objects.create(
name="feedvault",
url="feedvault.se",
)
self.user: User = User.objects.create_user(
username="testuser",
email="hello@feedvault.se",
password="testpassword", # noqa: S106
)
self.feed: Feed = Feed.objects.create(
user=self.user,
bozo=False,
feed_url="https://feedvault.se/feed.xml",
domain=self.domain,
)
def test_search_page(self) -> None:
"""Test if the search page is accessible."""
response: HttpResponse = self.client.get(reverse("search"))
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
def test_search_page_search(self) -> None:
"""Search for a term that doesn't exist."""
response: HttpResponse = self.client.get(reverse("search"), {"q": "test"})
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
assert (
"No results found" in response.content.decode()
), f"Expected 'No results found' in response, got {response.content}"
def test_search_page_search_found(self) -> None:
"""Search for a term that exists."""
response: HttpResponse = self.client.get(reverse("search"), {"q": "feedvault"})
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
assert "feedvault" in response.content.decode(), f"Expected 'feedvault' in response, got {response.content}"
def test_search_page_search_empty(self) -> None:
"""Search for an empty term. This should redirect to the feeds page."""
response: HttpResponse = self.client.get(reverse("search"), {"q": ""})
assert response.status_code == 200, f"Expected 302, got {response.status_code}"
assert (
"Latest Feeds" in response.content.decode()
), f"Expected 'Latest Feeds' in response, got {response.content}"

View file

@ -1,34 +1,24 @@
from __future__ import annotations
from django.contrib.sitemaps import GenericSitemap
from django.contrib.sitemaps import GenericSitemap, Sitemap
from django.contrib.sitemaps.views import sitemap
from django.urls import include, path
from django.views.decorators.cache import cache_page
from feeds.models import Feed
from feedvault import views
from feedvault.api import api_v1
from feedvault.models import Domain, Feed
from feedvault.sitemaps import StaticViewSitemap
from feedvault.views import CustomLoginView, CustomLogoutView, ProfileView, RegisterView
app_name: str = "feedvault"
sitemaps = {
sitemaps: dict[str, type[Sitemap] | Sitemap] = {
"static": StaticViewSitemap,
"feeds": GenericSitemap({"queryset": Feed.objects.all(), "date_field": "created_at"}),
"domains": GenericSitemap({"queryset": Domain.objects.all(), "date_field": "created_at"}),
}
urlpatterns: list = [
path(route="", view=views.IndexView.as_view(), name="index"),
path(route="", view=include("feeds.urls")),
path("__debug__/", include("debug_toolbar.urls")),
path(route="feed/<int:feed_id>/", view=views.FeedView.as_view(), name="feed"),
path(route="feeds/", view=views.FeedsView.as_view(), name="feeds"),
path(route="add/", view=views.AddView.as_view(), name="add"),
path(route="upload/", view=views.UploadView.as_view(), name="upload"),
path(route="download/", view=views.DownloadView.as_view(), name="download"),
path(route="delete_upload/", view=views.DeleteUploadView.as_view(), name="delete_upload"),
path(route="edit_description/", view=views.EditDescriptionView.as_view(), name="edit_description"),
path(route="robots.txt", view=cache_page(timeout=60 * 60 * 365)(views.RobotsView.as_view()), name="robots"),
path(
"sitemap.xml",
@ -36,13 +26,4 @@ urlpatterns: list = [
{"sitemaps": sitemaps},
name="django.contrib.sitemaps.views.sitemap",
),
path(route="search/", view=views.SearchView.as_view(), name="search"),
path(route="domains/", view=views.DomainsView.as_view(), name="domains"),
path(route="domain/<int:domain_id>/", view=views.DomainView.as_view(), name="domain"),
path("api/v1/", api_v1.urls), # type: ignore # noqa: PGH003
path(route="accounts/login/", view=CustomLoginView.as_view(), name="login"),
path(route="accounts/register/", view=RegisterView.as_view(), name="register"),
path(route="accounts/logout/", view=CustomLogoutView.as_view(), name="logout"),
# path(route="accounts/change-password/", view=CustomPasswordChangeView.as_view(), name="change_password"),
path(route="accounts/profile/", view=ProfileView.as_view(), name="profile"),
]

View file

@ -1,478 +1,13 @@
from __future__ import annotations
import logging
from mimetypes import guess_type
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar
from django.conf import settings
from django.contrib import messages
from django.contrib.auth import login
from django.contrib.auth.forms import AuthenticationForm, UserCreationForm
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib.auth.views import LoginView, LogoutView, PasswordChangeView
from django.contrib.messages.views import SuccessMessageMixin
from django.core.exceptions import SuspiciousOperation
from django.core.paginator import EmptyPage, Page, Paginator
from django.db.models.manager import BaseManager
from django.http import FileResponse, Http404, HttpRequest, HttpResponse
from django.shortcuts import get_object_or_404, render
from django.template import loader
from django.urls import reverse_lazy
from django.http import HttpRequest, HttpResponse
from django.views import View
from django.views.generic.edit import CreateView
from feedvault.feeds import add_url
from feedvault.models import Domain, Entry, Feed, FeedAddResult, UserUploadedFile
if TYPE_CHECKING:
from django.contrib.auth.models import User
from django.core.files.uploadedfile import UploadedFile
from django.db.models.manager import BaseManager
logger: logging.Logger = logging.getLogger(__name__)
class HtmxHttpRequest(HttpRequest):
htmx: Any
class IndexView(View):
"""Index path."""
def get(self, request: HttpRequest) -> HttpResponse:
"""Load the index page."""
template = loader.get_template(template_name="index.html")
context: dict[str, str] = {
"description": "FeedVault allows users to archive and search their favorite web feeds.",
"keywords": "feed, rss, atom, archive, rss list",
"author": "TheLovinator",
"canonical": "https://feedvault.se/",
"title": "FeedVault",
}
return HttpResponse(content=template.render(context=context, request=request))
class FeedView(View):
"""A single feed."""
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: # noqa: ANN002, ANN003, ARG002
"""Load the feed page."""
feed_id = kwargs.get("feed_id", None)
if not feed_id:
return HttpResponse(content="No id", status=400)
feed: Feed = get_object_or_404(Feed, id=feed_id)
entries: BaseManager[Entry] = Entry.objects.filter(feed=feed).order_by("-created_parsed")[:100]
context = {
"feed": feed,
"entries": entries,
"description": f"Archive of {feed.href}",
"keywords": "feed, rss, atom, archive, rss list",
"author": f"{feed.author_detail.name if feed.author_detail else "FeedVault"}",
"canonical": f"https://feedvault.se/feed/{feed_id}/",
"title": f"{feed.title} - FeedVault",
}
return render(request, "feed.html", context)
class FeedsView(View):
"""All feeds."""
def get(self, request: HtmxHttpRequest) -> HttpResponse:
"""All feeds."""
feeds: BaseManager[Feed] = Feed.objects.only("id", "feed_url")
paginator = Paginator(object_list=feeds, per_page=100)
page_number = int(request.GET.get("page", default=1))
try:
pages: Page = paginator.get_page(page_number)
except EmptyPage:
return HttpResponse("")
context: dict[str, str | Page | int] = {
"feeds": pages,
"description": "An archive of web feeds",
"keywords": "feed, rss, atom, archive, rss list",
"author": "TheLovinator",
"canonical": "https://feedvault.se/feeds/",
"title": "Feeds",
"page": page_number,
}
template_name = "partials/feeds.html" if request.htmx else "feeds.html"
return render(request, template_name, context)
class AddView(LoginRequiredMixin, View):
"""Add a feed."""
def get(self, request: HttpRequest) -> HttpResponse:
"""Load the index page."""
template = loader.get_template(template_name="index.html")
context: dict[str, str] = {
"description": "FeedVault allows users to archive and search their favorite web feeds.",
"keywords": "feed, rss, atom, archive, rss list",
"author": "TheLovinator",
"canonical": "https://feedvault.se/",
}
return HttpResponse(content=template.render(context=context, request=request))
def post(self, request: HttpRequest) -> HttpResponse:
"""Add a feed."""
if not request.user.is_authenticated:
return HttpResponse(content="Not logged in", status=401)
if not request.user.is_active:
return HttpResponse(content="User is not active", status=403)
urls: str | None = request.POST.get("urls", None)
if not urls:
return HttpResponse(content="No urls", status=400)
# Split the urls by newline.
for url in urls.split("\n"):
feed_result: FeedAddResult = add_url(url, request.user)
feed: Feed | None = feed_result.feed
if not feed_result or not feed:
messages.error(request, f"{url} - Failed to add, {feed_result.error}")
continue
if feed_result.created:
messages.success(request, f"{feed.feed_url} added to queue")
else:
messages.warning(request, f"{feed.feed_url} already exists")
# Render the index page.
template = loader.get_template(template_name="index.html")
return HttpResponse(content=template.render(context={}, request=request))
class UploadView(LoginRequiredMixin, View):
"""Upload a file."""
def get(self, request: HttpRequest) -> HttpResponse:
"""Load the index page."""
template = loader.get_template(template_name="index.html")
context: dict[str, str] = {
"description": "FeedVault allows users to archive and search their favorite web feeds.",
"keywords": "feed, rss, atom, archive, rss list",
"author": "TheLovinator",
"canonical": "https://feedvault.se/",
}
return HttpResponse(content=template.render(context=context, request=request))
def post(self, request: HttpRequest) -> HttpResponse:
"""Upload a file."""
if not request.user.is_authenticated:
return HttpResponse(content="Not logged in", status=401)
if not request.user.is_active:
return HttpResponse(content="User is not active", status=403)
file: UploadedFile | None = request.FILES.get("file", None)
if not file:
return HttpResponse(content="No file", status=400)
# Save file to media folder
UserUploadedFile.objects.create(user=request.user, file=file, original_filename=file.name)
# Render the index page.
template = loader.get_template(template_name="index.html")
messages.success(request, f"{file.name} uploaded")
messages.info(
request,
"You can find your uploads on your profile page. Files will be parsed and added to the archive when possible. Thanks.", # noqa: E501
)
return HttpResponse(content=template.render(context={}, request=request))
class DeleteUploadView(LoginRequiredMixin, View):
"""Delete an uploaded file."""
def post(self, request: HttpRequest) -> HttpResponse:
"""Delete an uploaded file."""
file_id: str | None = request.POST.get("file_id", None)
if not file_id:
return HttpResponse("No file_id provided", status=400)
user_file: UserUploadedFile | None = UserUploadedFile.objects.filter(user=request.user, id=file_id).first()
if not user_file:
msg = "File not found"
raise Http404(msg)
user_upload_dir: Path = Path(settings.MEDIA_ROOT) / "uploads" / f"{request.user.id}" # type: ignore # noqa: PGH003
file_path: Path = user_upload_dir / Path(user_file.file.name).name
logger.debug("file_path: %s", file_path)
if not file_path.exists() or not file_path.is_file():
logger.error("User '%s' attempted to delete a file that does not exist: %s", request.user, file_path)
msg = "File not found"
raise Http404(msg)
if user_upload_dir not in file_path.parents:
logger.error(
"User '%s' attempted to delete a file that is not in their upload directory: %s",
request.user,
file_path,
)
msg = "Attempted unauthorized file access"
raise SuspiciousOperation(msg)
user_file.delete()
# Go back to the profile page
messages.success(request, f"{file_path.name} deleted")
return HttpResponse(status=204)
class EditDescriptionView(LoginRequiredMixin, View):
"""Edit the description of an uploaded file."""
def post(self, request: HttpRequest) -> HttpResponse:
"""Edit the description of an uploaded file."""
new_description: str | None = request.POST.get("description", None)
file_id: str | None = request.POST.get("file_id", None)
if not new_description:
return HttpResponse("No description provided", status=400)
if not file_id:
return HttpResponse("No file_id provided", status=400)
user_file: UserUploadedFile | None = UserUploadedFile.objects.filter(user=request.user, id=file_id).first()
if not user_file:
msg = "File not found"
raise Http404(msg)
user_upload_dir: Path = Path(settings.MEDIA_ROOT) / "uploads" / f"{request.user.id}" # type: ignore # noqa: PGH003
file_path: Path = user_upload_dir / Path(user_file.file.name).name
logger.debug("file_path: %s", file_path)
if not file_path.exists() or not file_path.is_file():
logger.error("User '%s' attempted to delete a file that does not exist: %s", request.user, file_path)
msg = "File not found"
raise Http404(msg)
if user_upload_dir not in file_path.parents:
logger.error(
"User '%s' attempted to delete a file that is not in their upload directory: %s",
request.user,
file_path,
)
msg = "Attempted unauthorized file access"
raise SuspiciousOperation(msg)
old_description: str = user_file.description
user_file.description = new_description
user_file.save()
logger.info(
"User '%s' updated the description of file '%s' from '%s' to '%s'",
request.user,
file_path,
old_description,
new_description,
)
return HttpResponse(content=new_description, status=200)
class DownloadView(LoginRequiredMixin, View):
"""Download a file."""
def get(self, request: HttpRequest) -> HttpResponse | FileResponse:
"""/download/?file_id=1."""
file_id: str | None = request.GET.get("file_id", None)
if not file_id:
return HttpResponse("No file_id provided", status=400)
user_file: UserUploadedFile | None = UserUploadedFile.objects.filter(user=request.user, id=file_id).first()
if not user_file:
msg = "File not found"
raise Http404(msg)
user_upload_dir: Path = Path(settings.MEDIA_ROOT) / "uploads" / f"{request.user.id}" # type: ignore # noqa: PGH003
file_path: Path = user_upload_dir / Path(user_file.file.name).name
if not file_path.exists() or not file_path.is_file():
msg = "File not found"
raise Http404(msg)
if user_upload_dir not in file_path.parents:
msg = "Attempted unauthorized file access"
raise SuspiciousOperation(msg)
content_type, _ = guess_type(file_path)
response = FileResponse(file_path.open("rb"), content_type=content_type or "application/octet-stream")
response["Content-Disposition"] = f'attachment; filename="{user_file.original_filename or file_path.name}"'
return response
class CustomLoginView(LoginView):
"""Custom login view."""
template_name = "accounts/login.html"
next_page = reverse_lazy("index")
def form_valid(self, form: AuthenticationForm) -> HttpResponse:
"""Check if the form is valid."""
user: User = form.get_user()
login(self.request, user)
return super().form_valid(form)
class RegisterView(CreateView):
"""Register view."""
template_name = "accounts/register.html"
form_class = UserCreationForm
success_url: str = reverse_lazy("login")
extra_context: ClassVar[dict[str, str]] = {
"title": "Register",
"description": "Register a new account",
"keywords": "register, account",
"author": "TheLovinator",
"canonical": "https://feedvault.se/accounts/register/",
}
class CustomLogoutView(LogoutView):
"""Logout view."""
next_page = reverse_lazy("login")
extra_context: ClassVar[dict[str, str]] = {
"title": "Logout",
"description": "Logout of your account",
"keywords": "logout, account",
"author": "TheLovinator",
"canonical": "https://feedvault.se/accounts/logout/",
}
class CustomPasswordChangeView(SuccessMessageMixin, PasswordChangeView):
"""Custom password change view."""
template_name = "accounts/change_password.html"
success_url = reverse_lazy("index")
success_message = "Your password was successfully updated!"
extra_context: ClassVar[dict[str, str]] = {
"title": "Change password",
"description": "Change your password",
"keywords": "change, password, account",
"author": "TheLovinator",
"canonical": "https://feedvault.se/accounts/change-password/",
}
class ProfileView(LoginRequiredMixin, View):
"""Profile page."""
def get(self, request: HttpRequest) -> HttpResponse:
"""Load the profile page."""
template = loader.get_template(template_name="accounts/profile.html")
# TODO(TheLovinator): Use htmx to load the feeds and uploads # noqa: TD003
user_feeds: BaseManager[Feed] = Feed.objects.filter(user=request.user).order_by("-created_at")[:100]
user_uploads: BaseManager[UserUploadedFile] = UserUploadedFile.objects.filter(user=request.user).order_by(
"-created_at",
)[:100]
context: dict[str, str | Any] = {
"description": f"Profile page for {request.user.get_username()}",
"keywords": f"profile, account, {request.user.get_username()}",
"author": f"{request.user.get_username()}",
"canonical": "https://feedvault.se/accounts/profile/",
"title": f"{request.user.get_username()}",
"user_feeds": user_feeds,
"user_uploads": user_uploads,
}
return HttpResponse(content=template.render(context=context, request=request))
class RobotsView(View):
"""Robots.txt view."""
def get(self, request: HttpRequest) -> HttpResponse: # noqa: ARG002
def get(self, request: HttpRequest) -> HttpResponse:
"""Load the robots.txt file."""
return HttpResponse(
content="User-agent: *\nDisallow: /add\nDisallow: /upload\nDisallow: /accounts/\n\nSitemap: https://feedvault.se/sitemap.xml",
content_type="text/plain",
)
class DomainsView(View):
"""All domains."""
def get(self: DomainsView, request: HtmxHttpRequest) -> HttpResponse:
"""Load the domains page."""
domains: BaseManager[Domain] = Domain.objects.only("id", "url", "created_at")
paginator = Paginator(object_list=domains, per_page=100)
page_number = int(request.GET.get("page", default=1))
try:
pages: Page = paginator.get_page(page_number)
except EmptyPage:
return HttpResponse("")
context: dict[str, str | Page | int] = {
"domains": pages,
"description": "Domains",
"keywords": "feed, rss, atom, archive, rss list",
"author": "TheLovinator",
"canonical": "https://feedvault.se/domains/",
"title": "Domains",
"page": page_number,
}
template_name = "partials/domains.html" if request.htmx else "domains.html"
return render(request, template_name, context)
class DomainView(View):
"""A single domain."""
def get(self: DomainView, request: HttpRequest, domain_id: int) -> HttpResponse:
"""Load the domain page."""
domain: Domain = get_object_or_404(Domain, id=domain_id)
feeds: BaseManager[Feed] = Feed.objects.filter(domain=domain).order_by("-created_at")[:100]
context = {
"domain": domain,
"feeds": feeds,
"description": f"Archive of {domain.name}",
"keywords": "feed, rss, atom, archive, rss list",
"author": "TheLovinator",
"canonical": f"https://feedvault.se/domain/{domain_id}/",
"title": f"{domain.name} - FeedVault",
}
return render(request, "domain.html", context)
class SearchView(View):
"""Search view."""
def get(self, request: HtmxHttpRequest) -> HttpResponse:
"""Load the search page."""
query: str | None = request.GET.get("q", None)
if not query:
return FeedsView().get(request)
# TODO(TheLovinator): #20 Search more fields
# https://github.com/TheLovinator1/FeedVault/issues/20
feeds: BaseManager[Feed] = Feed.objects.filter(feed_url__icontains=query).order_by("-created_at")[:100]
context = {
"feeds": feeds,
"description": f"Search results for {query}",
"keywords": f"feed, rss, atom, archive, rss list, {query}",
"author": "TheLovinator",
"canonical": f"https://feedvault.se/search/?q={query}",
"title": f"Search results for {query}",
"query": query,
}
return render(request, "search.html", context)