Merge app and project, use SQLite instead and

This commit is contained in:
Joakim Hellsén 2024-03-15 13:35:00 +01:00
commit 4c16d14e61
No known key found for this signature in database
GPG key ID: D196AE66FEBE1DC9
29 changed files with 221 additions and 454 deletions

View file

View file

@ -1,312 +0,0 @@
from __future__ import annotations
import datetime
import logging
from time import mktime, struct_time
from typing import TYPE_CHECKING
from urllib.parse import ParseResult, urlparse
import feedparser
from django.utils import timezone
from feedparser import FeedParserDict
from feeds.models import Author, Domain, Entry, Feed, Generator, Publisher
if TYPE_CHECKING:
from django.contrib.auth.models import AbstractBaseUser, AnonymousUser
logger: logging.Logger = logging.getLogger(__name__)
def get_domain(url: str | None) -> None | str:
"""Get the domain of a URL."""
if not url:
return None
# Parse the URL.
parsed_url: ParseResult = urlparse(url)
if not parsed_url:
logger.error("Error parsing URL: %s", url)
return None
# Get the domain.
return str(parsed_url.netloc)
def get_author(parsed_feed: dict) -> Author:
"""Get the author of a feed.
Args:
parsed_feed: The parsed feed.
Returns:
The author of the feed. If the author doesn't exist, it will be created.
"""
# A dictionary with details about the author of this entry.
author_detail: dict = parsed_feed.get("author_detail", {})
author = Author(
name=author_detail.get("name", ""),
href=author_detail.get("href", ""),
email=author_detail.get("email", ""),
)
# Create the author if it doesn't exist.
try:
author: Author = Author.objects.get(name=author.name, email=author.email, href=author.href)
except Author.DoesNotExist:
author.save()
logger.info("Created author: %s", author)
return author
def def_generator(parsed_feed: dict) -> Generator:
"""Get the generator of a feed.
Args:
parsed_feed: The parsed feed.
Returns:
The generator of the feed. If the generator doesn't exist, it will be created.
"""
generator_detail: dict = parsed_feed.get("generator_detail", {})
generator = Generator(
name=generator_detail.get("name", ""),
href=generator_detail.get("href", ""),
version=generator_detail.get("version", ""),
)
# Create the generator if it doesn't exist.
try:
generator: Generator = Generator.objects.get(
name=generator.name,
href=generator.href,
version=generator.version,
)
except Generator.DoesNotExist:
generator.save()
logger.info("Created generator: %s", generator)
return generator
def get_publisher(parsed_feed: dict) -> Publisher:
"""Get the publisher of a feed.
Args:
parsed_feed: The parsed feed.
Returns:
The publisher of the feed. If the publisher doesn't exist, it will be created.
"""
publisher_detail: dict = parsed_feed.get("publisher_detail", {})
publisher = Publisher(
name=publisher_detail.get("name", ""),
href=publisher_detail.get("href", ""),
email=publisher_detail.get("email", ""),
)
# Create the publisher if it doesn't exist.
try:
publisher: Publisher = Publisher.objects.get(
name=publisher.name,
href=publisher.href,
email=publisher.email,
)
except Publisher.DoesNotExist:
publisher.save()
logger.info("Created publisher: %s", publisher)
return publisher
def parse_feed(url: str | None) -> dict | None:
"""Parse a feed.
Args:
url: The URL of the feed.
Returns:
The parsed feed.
"""
# TODO(TheLovinator): Backup the feed URL to a cloudflare worker. # noqa: TD003
if not url:
return None
# Parse the feed.
parsed_feed: dict = feedparser.parse(url)
if not parsed_feed:
return None
return parsed_feed
def struct_time_to_datetime(struct_time: struct_time | None) -> datetime.datetime | None:
"""Convert a struct_time to a datetime."""
if not struct_time:
return None
if struct_time == "Mon, 01 Jan 0001 00:00:00 +0000":
return None
dt: datetime.datetime = datetime.datetime.fromtimestamp(mktime(struct_time), tz=datetime.timezone.utc)
if not dt:
logger.error("Error converting struct_time to datetime: %s", struct_time)
return None
return dt
def add_entry(feed: Feed, entry: FeedParserDict) -> Entry | None:
"""Add an entry to the database.
Args:
entry: The entry to add.
feed: The feed the entry belongs to.
"""
author: Author = get_author(parsed_feed=entry)
publisher: Publisher = get_publisher(parsed_feed=entry)
updated_parsed: datetime | None = struct_time_to_datetime(struct_time=entry.get("updated_parsed")) # type: ignore # noqa: PGH003
published_parsed: datetime | None = struct_time_to_datetime(struct_time=entry.get("published_parsed")) # type: ignore # noqa: PGH003
expired_parsed: datetime | None = struct_time_to_datetime(struct_time=entry.get("expired_parsed")) # type: ignore # noqa: PGH003
created_parsed: datetime | None = struct_time_to_datetime(struct_time=entry.get("created_parsed")) # type: ignore # noqa: PGH003
_entry = Entry(
feed=feed,
author=entry.get("author", ""),
author_detail=author,
comments=entry.get("comments", ""),
content=entry.get("content", {}),
contributors=entry.get("contributors", {}),
created=entry.get("created", ""),
created_parsed=created_parsed,
enclosures=entry.get("enclosures", []),
expired=entry.get("expired", ""),
expired_parsed=expired_parsed,
_id=entry.get("id", ""),
license=entry.get("license", ""),
link=entry.get("link", ""),
links=entry.get("links", []),
published=entry.get("published", ""),
published_parsed=published_parsed,
publisher=entry.get("publisher", ""),
publisher_detail=publisher,
source=entry.get("source", {}),
summary=entry.get("summary", ""),
summary_detail=entry.get("summary_detail", {}),
tags=entry.get("tags", []),
title=entry.get("title", ""),
title_detail=entry.get("title_detail", {}),
updated=entry.get("updated", ""),
updated_parsed=updated_parsed,
)
# Save the entry.
try:
_entry.save()
except Exception:
logger.exception("Error saving entry for feed: %s", feed)
return None
logger.info("Created entry: %s", _entry)
return _entry
def add_feed(url: str | None, user: AbstractBaseUser | AnonymousUser) -> Feed | None:
"""Add a feed to the database.
Args:
url: The URL of the feed.
user: The user adding the feed.
Returns:
The feed that was added.
"""
# Parse the feed.
parsed_feed: dict | None = parse_feed(url=url)
if not parsed_feed:
return None
domain_url: None | str = get_domain(url=url)
if not domain_url:
return None
# Create the domain if it doesn't exist.
domain: Domain
domain, created = Domain.objects.get_or_create(url=domain_url)
if created:
logger.info("Created domain: %s", domain.url)
domain.save()
author: Author = get_author(parsed_feed=parsed_feed)
generator: Generator = def_generator(parsed_feed=parsed_feed)
publisher: Publisher = get_publisher(parsed_feed=parsed_feed)
published_parsed: datetime | None = struct_time_to_datetime(struct_time=parsed_feed.get("published_parsed")) # type: ignore # noqa: PGH003
updated_parsed: datetime | None = struct_time_to_datetime(struct_time=parsed_feed.get("updated_parsed")) # type: ignore # noqa: PGH003
# Create the feed
feed = Feed(
feed_url=url,
user=user,
domain=domain,
last_checked=timezone.now(),
bozo=parsed_feed.get("bozo", 0),
bozo_exception=parsed_feed.get("bozo_exception", ""),
encoding=parsed_feed.get("encoding", ""),
etag=parsed_feed.get("etag", ""),
headers=parsed_feed.get("headers", {}),
href=parsed_feed.get("href", ""),
modified=parsed_feed.get("modified"),
namespaces=parsed_feed.get("namespaces", {}),
status=parsed_feed.get("status", 0),
version=parsed_feed.get("version", ""),
author=parsed_feed.get("author", ""),
author_detail=author,
cloud=parsed_feed.get("cloud", {}),
contributors=parsed_feed.get("contributors", {}),
docs=parsed_feed.get("docs", ""),
errorreportsto=parsed_feed.get("errorreportsto", ""),
generator=parsed_feed.get("generator", ""),
generator_detail=generator,
icon=parsed_feed.get("icon", ""),
_id=parsed_feed.get("id", ""),
image=parsed_feed.get("image", {}),
info=parsed_feed.get("info", ""),
language=parsed_feed.get("language", ""),
license=parsed_feed.get("license", ""),
link=parsed_feed.get("link", ""),
links=parsed_feed.get("links", []),
logo=parsed_feed.get("logo", ""),
published=parsed_feed.get("published", ""),
published_parsed=published_parsed,
publisher=parsed_feed.get("publisher", ""),
publisher_detail=publisher,
rights=parsed_feed.get("rights", ""),
rights_detail=parsed_feed.get("rights_detail", {}),
subtitle=parsed_feed.get("subtitle", ""),
subtitle_detail=parsed_feed.get("subtitle_detail", {}),
tags=parsed_feed.get("tags", []),
textinput=parsed_feed.get("textinput", {}),
title=parsed_feed.get("title", ""),
title_detail=parsed_feed.get("title_detail", {}),
ttl=parsed_feed.get("ttl", ""),
updated=parsed_feed.get("updated", ""),
updated_parsed=updated_parsed,
)
# Save the feed.
try:
feed.save()
except Exception:
logger.exception("Error saving feed: %s", feed)
return None
entries = parsed_feed.get("entries", [])
for entry in entries:
added_entry: Entry | None = add_entry(feed=feed, entry=entry)
if not added_entry:
continue
logger.info("Created feed: %s", feed)
return feed

View file

@ -1,8 +0,0 @@
from django.apps import AppConfig
class FeedsConfig(AppConfig):
"""This Django app is responsible for managing the feeds."""
default_auto_field = "django.db.models.BigAutoField"
name = "feeds"

View file

@ -1,27 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from django.http import HttpRequest
def add_global_context(request: HttpRequest) -> dict[str, str | int]: # noqa: ARG001
"""Add global context to all templates.
Args:
request: The request object.
Returns:
A dictionary with the global context.
"""
from feeds.stats import get_db_size # noqa: PLC0415
from .models import Feed # noqa: PLC0415
db_size: str = get_db_size()
amount_of_feeds: int = Feed.objects.count()
return {
"db_size": db_size,
"amount_of_feeds": amount_of_feeds,
}

View file

@ -1,167 +0,0 @@
# Generated by Django 5.0.2 on 2024-02-19 02:47
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='Author',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True)),
('modified_at', models.DateTimeField(auto_now=True)),
('name', models.TextField(blank=True)),
('href', models.TextField(blank=True)),
('email', models.TextField(blank=True)),
],
),
migrations.CreateModel(
name='Domain',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('url', models.URLField(unique=True)),
('name', models.CharField(max_length=255)),
('categories', models.JSONField(blank=True, null=True)),
('created_at', models.DateTimeField(auto_now_add=True)),
('modified_at', models.DateTimeField(auto_now=True)),
('hidden', models.BooleanField(default=False)),
('hidden_at', models.DateTimeField(blank=True, null=True)),
('hidden_reason', models.TextField(blank=True)),
],
),
migrations.CreateModel(
name='Links',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True)),
('modified_at', models.DateTimeField(auto_now=True)),
('rel', models.TextField(blank=True)),
('type', models.TextField(blank=True)),
('href', models.TextField(blank=True)),
('title', models.TextField(blank=True)),
],
),
migrations.CreateModel(
name='Publisher',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True)),
('modified_at', models.DateTimeField(auto_now=True)),
('name', models.TextField(blank=True)),
('href', models.TextField(blank=True)),
('email', models.TextField(blank=True)),
],
),
migrations.CreateModel(
name='Generator',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True)),
('modified_at', models.DateTimeField(auto_now=True)),
('name', models.TextField(blank=True)),
('href', models.TextField(blank=True)),
('version', models.TextField(blank=True)),
],
options={
'unique_together': {('name', 'version', 'href')},
},
),
migrations.CreateModel(
name='Feed',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('feed_url', models.URLField(unique=True)),
('created_at', models.DateTimeField(auto_now_add=True)),
('modified_at', models.DateTimeField(auto_now=True)),
('last_checked', models.DateTimeField(blank=True, null=True)),
('active', models.BooleanField(default=True)),
('bozo', models.BooleanField()),
('bozo_exception', models.TextField(blank=True)),
('encoding', models.TextField(blank=True)),
('etag', models.TextField(blank=True)),
('headers', models.JSONField(blank=True, null=True)),
('href', models.TextField(blank=True)),
('modified', models.DateTimeField(blank=True, null=True)),
('namespaces', models.JSONField(blank=True, null=True)),
('status', models.IntegerField()),
('version', models.CharField(blank=True, max_length=255)),
('author', models.TextField(blank=True)),
('cloud', models.JSONField(blank=True, null=True)),
('contributors', models.JSONField(blank=True, null=True)),
('docs', models.TextField(blank=True)),
('errorreportsto', models.TextField(blank=True)),
('generator', models.TextField(blank=True)),
('icon', models.TextField(blank=True)),
('_id', models.TextField(blank=True)),
('image', models.JSONField(blank=True, null=True)),
('info', models.TextField(blank=True)),
('info_detail', models.JSONField(blank=True, null=True)),
('language', models.TextField(blank=True)),
('license', models.TextField(blank=True)),
('link', models.TextField(blank=True)),
('links', models.JSONField(blank=True, null=True)),
('logo', models.TextField(blank=True)),
('published', models.TextField(blank=True)),
('published_parsed', models.DateTimeField(blank=True, null=True)),
('publisher', models.TextField(blank=True)),
('rights', models.TextField(blank=True)),
('rights_detail', models.JSONField(blank=True, null=True)),
('subtitle', models.TextField(blank=True)),
('subtitle_detail', models.JSONField(blank=True, null=True)),
('tags', models.JSONField(blank=True, null=True)),
('textinput', models.JSONField(blank=True, null=True)),
('title', models.TextField(blank=True)),
('title_detail', models.JSONField(blank=True, null=True)),
('ttl', models.TextField(blank=True)),
('updated', models.TextField(blank=True)),
('updated_parsed', models.DateTimeField(blank=True, null=True)),
('author_detail', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='feeds', to='feeds.author')),
('domain', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='feeds.domain')),
('generator_detail', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='feeds', to='feeds.generator')),
('publisher_detail', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='feeds', to='feeds.publisher')),
],
),
migrations.CreateModel(
name='Entry',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True)),
('modified_at', models.DateTimeField(auto_now=True)),
('author', models.TextField(blank=True)),
('comments', models.TextField(blank=True)),
('content', models.JSONField(blank=True, null=True)),
('contributors', models.JSONField(blank=True, null=True)),
('created', models.TextField(blank=True)),
('created_parsed', models.DateTimeField(blank=True, null=True)),
('enclosures', models.JSONField(blank=True, null=True)),
('expired', models.TextField(blank=True)),
('expired_parsed', models.DateTimeField(blank=True, null=True)),
('_id', models.TextField(blank=True)),
('license', models.TextField(blank=True)),
('link', models.TextField(blank=True)),
('links', models.JSONField(blank=True, null=True)),
('published', models.TextField(blank=True)),
('published_parsed', models.DateTimeField(blank=True, null=True)),
('publisher', models.TextField(blank=True)),
('source', models.JSONField(blank=True, null=True)),
('summary', models.TextField(blank=True)),
('summary_detail', models.JSONField(blank=True, null=True)),
('tags', models.JSONField(blank=True, null=True)),
('title', models.TextField(blank=True)),
('title_detail', models.JSONField(blank=True, null=True)),
('updated', models.TextField(blank=True)),
('updated_parsed', models.DateTimeField(blank=True, null=True)),
('author_detail', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='entries', to='feeds.author')),
('feed', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='feeds.feed')),
('publisher_detail', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='entries', to='feeds.publisher')),
],
),
]

View file

@ -1,53 +0,0 @@
# Generated by Django 5.0.2 on 2024-02-23 05:27
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('feeds', '0001_initial'),
]
operations = [
migrations.AlterModelOptions(
name='author',
options={'ordering': ['name'], 'verbose_name': 'Author', 'verbose_name_plural': 'Authors'},
),
migrations.AlterModelOptions(
name='domain',
options={'ordering': ['name'], 'verbose_name': 'Domain', 'verbose_name_plural': 'Domains'},
),
migrations.AlterModelOptions(
name='entry',
options={'ordering': ['-created_parsed'], 'verbose_name': 'Entry', 'verbose_name_plural': 'Entries'},
),
migrations.AlterModelOptions(
name='feed',
options={'ordering': ['-created_at'], 'verbose_name': 'Feed', 'verbose_name_plural': 'Feeds'},
),
migrations.AlterModelOptions(
name='generator',
options={'ordering': ['name'], 'verbose_name': 'Feed generator', 'verbose_name_plural': 'Feed generators'},
),
migrations.AlterModelOptions(
name='links',
options={'ordering': ['href'], 'verbose_name': 'Link', 'verbose_name_plural': 'Links'},
),
migrations.AlterModelOptions(
name='publisher',
options={'ordering': ['name'], 'verbose_name': 'Publisher', 'verbose_name_plural': 'Publishers'},
),
migrations.AlterUniqueTogether(
name='author',
unique_together={('name', 'email', 'href')},
),
migrations.AlterUniqueTogether(
name='links',
unique_together={('href', 'rel')},
),
migrations.AlterUniqueTogether(
name='publisher',
unique_together={('name', 'email', 'href')},
),
]

View file

@ -1,21 +0,0 @@
# Generated by Django 5.0.2 on 2024-02-23 05:38
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('feeds', '0002_alter_author_options_alter_domain_options_and_more'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.AddField(
model_name='feed',
name='user',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to=settings.AUTH_USER_MODEL),
),
]

View file

@ -1,284 +0,0 @@
from __future__ import annotations
import logging
import typing
from typing import Literal
from django.db import models
from django.db.models import JSONField
logger: logging.Logger = logging.getLogger(__name__)
class Domain(models.Model):
"""A domain that has one or more feeds."""
url = models.URLField(unique=True)
name = models.CharField(max_length=255)
categories = models.JSONField(null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
modified_at = models.DateTimeField(auto_now=True)
hidden = models.BooleanField(default=False)
hidden_at = models.DateTimeField(null=True, blank=True)
hidden_reason = models.TextField(blank=True)
class Meta:
"""Meta information for the domain model."""
ordering: typing.ClassVar[list[str]] = ["name"]
verbose_name: str = "Domain"
verbose_name_plural: str = "Domains"
def __str__(self) -> str:
"""Return string representation of the domain."""
if_hidden: Literal[" (hidden)", ""] = " (hidden)" if self.hidden else ""
return self.name + if_hidden
def get_absolute_url(self) -> str:
"""Return the absolute URL of the domain."""
return f"/domain/{self.pk}/"
class Author(models.Model):
"""An author of an entry."""
created_at = models.DateTimeField(auto_now_add=True)
modified_at = models.DateTimeField(auto_now=True)
name = models.TextField(blank=True)
href = models.TextField(blank=True)
email = models.TextField(blank=True)
class Meta:
"""Meta information for the author model."""
unique_together: typing.ClassVar[list[str]] = ["name", "email", "href"]
ordering: typing.ClassVar[list[str]] = ["name"]
verbose_name: str = "Author"
verbose_name_plural: str = "Authors"
def __str__(self) -> str:
"""Return string representation of the author."""
return f"{self.name} - {self.email} - {self.href}"
class Generator(models.Model):
"""What program or service generated the feed."""
created_at = models.DateTimeField(auto_now_add=True)
modified_at = models.DateTimeField(auto_now=True)
name = models.TextField(blank=True)
href = models.TextField(blank=True)
version = models.TextField(blank=True)
class Meta:
"""Meta information for the generator model."""
unique_together: typing.ClassVar[list[str]] = ["name", "version", "href"]
ordering: typing.ClassVar[list[str]] = ["name"]
verbose_name: str = "Feed generator"
verbose_name_plural: str = "Feed generators"
def __str__(self) -> str:
"""Return string representation of the generator."""
return self.name
class Links(models.Model):
"""A link to a feed or entry."""
created_at = models.DateTimeField(auto_now_add=True)
modified_at = models.DateTimeField(auto_now=True)
rel = models.TextField(blank=True)
type = models.TextField(blank=True)
href = models.TextField(blank=True)
title = models.TextField(blank=True)
class Meta:
"""Meta information for the links model."""
unique_together: typing.ClassVar[list[str]] = ["href", "rel"]
ordering: typing.ClassVar[list[str]] = ["href"]
verbose_name: str = "Link"
verbose_name_plural: str = "Links"
def __str__(self) -> str:
"""Return string representation of the links."""
return self.href
class Publisher(models.Model):
"""The publisher of a feed or entry."""
created_at = models.DateTimeField(auto_now_add=True)
modified_at = models.DateTimeField(auto_now=True)
name = models.TextField(blank=True)
href = models.TextField(blank=True)
email = models.TextField(blank=True)
class Meta:
"""Meta information for the publisher model."""
unique_together: typing.ClassVar[list[str]] = ["name", "email", "href"]
ordering: typing.ClassVar[list[str]] = ["name"]
verbose_name: str = "Publisher"
verbose_name_plural: str = "Publishers"
def __str__(self) -> str:
"""Return string representation of the publisher."""
return self.name
class Feed(models.Model):
"""A RSS/Atom/JSON feed."""
feed_url = models.URLField(unique=True)
# The user that added the feed
user = models.ForeignKey("auth.User", on_delete=models.SET_NULL, null=True, blank=True)
domain = models.ForeignKey(Domain, on_delete=models.CASCADE)
created_at = models.DateTimeField(auto_now_add=True)
modified_at = models.DateTimeField(auto_now=True)
last_checked = models.DateTimeField(null=True, blank=True)
active = models.BooleanField(default=True)
# General data
bozo = models.BooleanField()
bozo_exception = models.TextField(blank=True)
encoding = models.TextField(blank=True)
etag = models.TextField(blank=True)
headers = JSONField(null=True, blank=True)
href = models.TextField(blank=True)
modified = models.DateTimeField(null=True, blank=True)
namespaces = JSONField(null=True, blank=True)
status = models.IntegerField()
version = models.CharField(max_length=255, blank=True)
# Feed data
author = models.TextField(blank=True)
author_detail = models.ForeignKey(
Author,
on_delete=models.PROTECT,
null=True,
blank=True,
related_name="feeds",
)
cloud = JSONField(null=True, blank=True)
contributors = JSONField(null=True, blank=True)
docs = models.TextField(blank=True)
errorreportsto = models.TextField(blank=True)
generator = models.TextField(blank=True)
generator_detail = models.ForeignKey(
Generator,
on_delete=models.PROTECT,
null=True,
blank=True,
related_name="feeds",
)
icon = models.TextField(blank=True)
_id = models.TextField(blank=True)
image = JSONField(null=True, blank=True)
info = models.TextField(blank=True)
info_detail = JSONField(null=True, blank=True)
language = models.TextField(blank=True)
license = models.TextField(blank=True)
link = models.TextField(blank=True)
links = JSONField(null=True, blank=True)
logo = models.TextField(blank=True)
published = models.TextField(blank=True)
published_parsed = models.DateTimeField(null=True, blank=True)
publisher = models.TextField(blank=True)
publisher_detail = models.ForeignKey(
Publisher,
on_delete=models.PROTECT,
null=True,
blank=True,
related_name="feeds",
)
rights = models.TextField(blank=True)
rights_detail = JSONField(null=True, blank=True)
subtitle = models.TextField(blank=True)
subtitle_detail = JSONField(null=True, blank=True)
tags = JSONField(null=True, blank=True)
textinput = JSONField(null=True, blank=True)
title = models.TextField(blank=True)
title_detail = JSONField(null=True, blank=True)
ttl = models.TextField(blank=True)
updated = models.TextField(blank=True)
updated_parsed = models.DateTimeField(null=True, blank=True)
class Meta:
"""Meta information for the feed model."""
ordering: typing.ClassVar[list[str]] = ["-created_at"]
verbose_name: str = "Feed"
verbose_name_plural: str = "Feeds"
def __str__(self) -> str:
"""Return string representation of the feed."""
return f"{self.domain} - {self.title}"
def get_absolute_url(self) -> str:
"""Return the absolute URL of the feed."""
return f"/feed/{self.pk}/"
class Entry(models.Model):
"""Each feed has multiple entries."""
feed = models.ForeignKey(Feed, on_delete=models.CASCADE)
created_at = models.DateTimeField(auto_now_add=True)
modified_at = models.DateTimeField(auto_now=True)
# Entry data
author = models.TextField(blank=True)
author_detail = models.ForeignKey(
Author,
on_delete=models.PROTECT,
null=True,
blank=True,
related_name="entries",
)
comments = models.TextField(blank=True)
content = JSONField(null=True, blank=True)
contributors = JSONField(null=True, blank=True)
created = models.TextField(blank=True)
created_parsed = models.DateTimeField(null=True, blank=True)
enclosures = JSONField(null=True, blank=True)
expired = models.TextField(blank=True)
expired_parsed = models.DateTimeField(null=True, blank=True)
_id = models.TextField(blank=True)
license = models.TextField(blank=True)
link = models.TextField(blank=True)
links = JSONField(null=True, blank=True)
published = models.TextField(blank=True)
published_parsed = models.DateTimeField(null=True, blank=True)
publisher = models.TextField(blank=True)
publisher_detail = models.ForeignKey(
Publisher,
on_delete=models.PROTECT,
null=True,
blank=True,
related_name="entries",
)
source = JSONField(null=True, blank=True)
summary = models.TextField(blank=True)
summary_detail = JSONField(null=True, blank=True)
tags = JSONField(null=True, blank=True)
title = models.TextField(blank=True)
title_detail = JSONField(null=True, blank=True)
updated = models.TextField(blank=True)
updated_parsed = models.DateTimeField(null=True, blank=True)
class Meta:
"""Meta information for the entry model."""
ordering: typing.ClassVar[list[str]] = ["-created_parsed"]
verbose_name: str = "Entry"
verbose_name_plural: str = "Entries"
def __str__(self) -> str:
"""Return string representation of the entry."""
return f"{self.feed} - {self.title}"

View file

@ -1,19 +0,0 @@
from __future__ import annotations
from django.contrib.sitemaps import Sitemap
from django.urls import reverse
class StaticViewSitemap(Sitemap):
"""Sitemap for static views."""
changefreq: str = "daily"
priority: float = 0.5
def items(self: StaticViewSitemap) -> list[str]:
"""Return all the items in the sitemap."""
return ["feeds:index", "feeds:feeds", "feeds:domains"]
def location(self, item: str) -> str:
"""Return the location of the item."""
return reverse(item)

View file

@ -1,33 +0,0 @@
from __future__ import annotations
import logging
from django.core.cache import cache
from django.db import connection
logger: logging.Logger = logging.getLogger(__name__)
def get_db_size() -> str:
"""Get the size of the database.
Returns:
str: The size of the database.
"""
# Try to get value from cache
db_size = cache.get("db_size")
if db_size is not None:
logger.debug("Got db_size from cache")
return db_size
with connection.cursor() as cursor:
cursor.execute("SELECT pg_size_pretty(pg_database_size(current_database()))")
row = cursor.fetchone()
db_size = "0 MB" if row is None else str(row[0])
# Store value in cache for 15 minutes
cache.set("db_size", db_size, 60 * 15)
return db_size

View file

@ -1 +0,0 @@
# Create your tests here.

View file

@ -1,58 +0,0 @@
from __future__ import annotations
from django.contrib.sitemaps import GenericSitemap
from django.contrib.sitemaps.views import sitemap
from django.urls import URLPattern, path
from django.views.decorators.cache import cache_page
from feeds import views
from feeds.models import Domain, Feed
from feeds.sitemaps import StaticViewSitemap
from .views import APIView, CustomLoginView, CustomLogoutView, ProfileView, RegisterView
app_name: str = "feeds"
sitemaps = {
"static": StaticViewSitemap,
"feeds": GenericSitemap({"queryset": Feed.objects.all(), "date_field": "created_at"}),
"domains": GenericSitemap({"queryset": Domain.objects.all(), "date_field": "created_at"}),
}
# Normal pages
urlpatterns: list[URLPattern] = [
path(route="", view=views.IndexView.as_view(), name="index"),
path(route="feed/<int:feed_id>/", view=views.FeedView.as_view(), name="feed"),
path(route="feeds/", view=views.FeedsView.as_view(), name="feeds"),
path(route="add", view=views.AddView.as_view(), name="add"),
path(route="upload", view=views.UploadView.as_view(), name="upload"),
path(route="robots.txt", view=cache_page(timeout=60 * 60 * 365)(views.RobotsView.as_view()), name="robots"),
path(
"sitemap.xml",
sitemap,
{"sitemaps": sitemaps},
name="django.contrib.sitemaps.views.sitemap",
),
path(route="domains/", view=views.DomainsView.as_view(), name="domains"),
path(route="domain/<int:domain_id>/", view=views.DomainView.as_view(), name="domain"),
]
# API urls
urlpatterns += [
path(route="api/", view=APIView.as_view(), name="api"),
path(route="api/feeds/", view=views.APIFeedsView.as_view(), name="api_feeds"),
path(route="api/feeds/<int:feed_id>/", view=views.APIFeedView.as_view(), name="api_feeds_id"),
path(route="api/feeds/<int:feed_id>/entries/", view=views.APIFeedEntriesView.as_view(), name="api_feed_entries"),
path(route="api/entries/", view=views.APIEntriesView.as_view(), name="api_entries"),
path(route="api/entries/<int:entry_id>/", view=views.APIEntryView.as_view(), name="api_entries_id"),
]
# Account urls
urlpatterns += [
path(route="accounts/login/", view=CustomLoginView.as_view(), name="login"),
path(route="accounts/register/", view=RegisterView.as_view(), name="register"),
path(route="accounts/logout/", view=CustomLogoutView.as_view(), name="logout"),
# path(route="accounts/change-password/", view=CustomPasswordChangeView.as_view(), name="change_password"),
path(route="accounts/profile/", view=ProfileView.as_view(), name="profile"),
]

View file

@ -1,487 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from django.contrib import messages
from django.contrib.auth import login
from django.contrib.auth.forms import AuthenticationForm, UserCreationForm
from django.contrib.auth.views import LoginView, LogoutView, PasswordChangeView
from django.contrib.messages.views import SuccessMessageMixin
from django.core.paginator import EmptyPage, Page, PageNotAnInteger, Paginator
from django.forms.models import model_to_dict
from django.http import HttpRequest, HttpResponse, JsonResponse
from django.shortcuts import get_object_or_404, render
from django.template import loader
from django.urls import reverse_lazy
from django.views import View
from django.views.generic.edit import CreateView
from django.views.generic.list import ListView
from feeds.add_feeds import add_feed
from feeds.models import Domain, Entry, Feed
if TYPE_CHECKING:
from django.contrib.auth.models import User
class IndexView(View):
"""Index path."""
def get(self, request: HttpRequest) -> HttpResponse:
"""Load the index page."""
template = loader.get_template(template_name="index.html")
context = {
"description": "FeedVault allows users to archive and search their favorite web feeds.",
"keywords": "feed, rss, atom, archive, rss list",
"author": "TheLovinator",
"canonical": "https://feedvault.se/",
"title": "FeedVault",
}
return HttpResponse(content=template.render(context=context, request=request))
class FeedView(View):
"""A single feed."""
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: # noqa: ANN002, ANN003, ARG002
"""Load the feed page."""
feed_id = kwargs.get("feed_id", None)
if not feed_id:
return HttpResponse(content="No id", status=400)
feed = get_object_or_404(Feed, id=feed_id)
entries = Entry.objects.filter(feed=feed).order_by("-created_parsed")[:100]
context = {
"feed": feed,
"entries": entries,
"description": f"Archive of {feed.href}",
"keywords": "feed, rss, atom, archive, rss list",
"author": f"{feed.author_detail.name if feed.author_detail else "FeedVault"}",
"canonical": f"https://feedvault.se/feed/{feed_id}/",
"title": f"{feed.title} - FeedVault",
}
return render(request, "feed.html", context)
class FeedsView(ListView):
"""All feeds."""
model = Feed
paginate_by = 100
template_name = "feeds.html"
context_object_name = "feeds"
def get_context_data(self, **kwargs) -> dict: # noqa: ANN003
"""Get the context data."""
context = super().get_context_data(**kwargs)
feed_amount: int = Feed.objects.count() or 0
context["description"] = f"Archiving {feed_amount} feeds"
context["keywords"] = "feed, rss, atom, archive, rss list"
context["author"] = "TheLovinator"
context["canonical"] = "https://feedvault.se/feeds/"
context["title"] = "Feeds"
return context
class AddView(View):
"""Add a feed."""
def get(self, request: HttpRequest) -> HttpResponse:
"""Load the index page."""
template = loader.get_template(template_name="index.html")
context = {
"description": "FeedVault allows users to archive and search their favorite web feeds.",
"keywords": "feed, rss, atom, archive, rss list",
"author": "TheLovinator",
"canonical": "https://feedvault.se/",
}
return HttpResponse(content=template.render(context=context, request=request))
def post(self, request: HttpRequest) -> HttpResponse:
"""Add a feed."""
if not request.user.is_authenticated:
return HttpResponse(content="Not logged in", status=401)
if not request.user.is_active:
return HttpResponse(content="User is not active", status=403)
urls: str | None = request.POST.get("urls", None)
if not urls:
return HttpResponse(content="No urls", status=400)
# Split the urls by newline.
for url in urls.split("\n"):
feed: None | Feed = add_feed(url, request.user)
if not feed:
messages.error(request, f"{url} - Failed to add")
continue
# Check if bozo is true.
if feed.bozo:
messages.warning(request, f"{feed.feed_url} - Bozo: {feed.bozo_exception}")
messages.success(request, f"{feed.feed_url} added")
# Render the index page.
template = loader.get_template(template_name="index.html")
return HttpResponse(content=template.render(context={}, request=request))
class UploadView(View):
"""Upload a file."""
def get(self, request: HttpRequest) -> HttpResponse:
"""Load the index page."""
template = loader.get_template(template_name="index.html")
context = {
"description": "FeedVault allows users to archive and search their favorite web feeds.",
"keywords": "feed, rss, atom, archive, rss list",
"author": "TheLovinator",
"canonical": "https://feedvault.se/",
}
return HttpResponse(content=template.render(context=context, request=request))
def post(self, request: HttpRequest) -> HttpResponse:
"""Upload a file."""
if not request.user.is_authenticated:
return HttpResponse(content="Not logged in", status=401)
if not request.user.is_active:
return HttpResponse(content="User is not active", status=403)
file = request.FILES.get("file", None)
if not file:
return HttpResponse(content="No file", status=400)
# Split the urls by newline.
for url in file.read().decode("utf-8").split("\n"):
feed: None | Feed = add_feed(url, request.user)
if not feed:
messages.error(request, f"{url} - Failed to add")
continue
# Check if bozo is true.
if feed.bozo:
messages.warning(request, f"{feed.feed_url} - Bozo: {feed.bozo_exception}")
messages.success(request, f"{feed.feed_url} added")
# Render the index page.
template = loader.get_template(template_name="index.html")
return HttpResponse(content=template.render(context={}, request=request))
class CustomLoginView(LoginView):
"""Custom login view."""
template_name = "accounts/login.html"
def form_valid(self, form: AuthenticationForm) -> HttpResponse:
"""Check if the form is valid."""
user: User = form.get_user()
login(self.request, user)
return super().form_valid(form)
class RegisterView(CreateView):
"""Register view."""
template_name = "accounts/register.html"
form_class = UserCreationForm
success_url = reverse_lazy("feeds:login")
# Add context data to the view
def get_context_data(self, **kwargs) -> dict: # noqa: ANN003
"""Get the context data."""
context = super().get_context_data(**kwargs)
context["description"] = "Register a new account"
context["keywords"] = "register, account, feed, rss, atom, archive, rss list"
context["author"] = "TheLovinator"
context["canonical"] = "https://feedvault.se/accounts/register/"
context["title"] = "Register"
return context
class CustomLogoutView(LogoutView):
"""Logout view."""
next_page = "feeds:index" # Redirect to index after logout
class CustomPasswordChangeView(SuccessMessageMixin, PasswordChangeView):
"""Custom password change view."""
template_name = "accounts/change_password.html"
success_url = reverse_lazy("feeds:index")
success_message = "Your password was successfully updated!"
# Add context data to the view
def get_context_data(self, **kwargs) -> dict: # noqa: ANN003
"""Get the context data."""
context = super().get_context_data(**kwargs)
context["description"] = "Change your password"
context["keywords"] = "change, password, account, feed, rss, atom, archive, rss list"
context["author"] = "TheLovinator"
context["canonical"] = "https://feedvault.se/accounts/change-password/"
context["title"] = "Change password"
return context
class ProfileView(View):
"""Profile page."""
def get(self, request: HttpRequest) -> HttpResponse:
"""Load the profile page."""
template = loader.get_template(template_name="accounts/profile.html")
user_feeds = Feed.objects.filter(user=request.user).order_by("-created_at")[:100]
context: dict[str, str | Any] = {
"description": f"Profile page for {request.user.get_username()}",
"keywords": f"profile, account, {request.user.get_username()}",
"author": f"{request.user.get_username()}",
"canonical": "https://feedvault.se/accounts/profile/",
"title": f"{request.user.get_username()}",
"user_feeds": user_feeds,
}
return HttpResponse(content=template.render(context=context, request=request))
class APIView(View):
"""API documentation page."""
def get(self, request: HttpRequest) -> HttpResponse:
"""Load the API page."""
template = loader.get_template(template_name="api.html")
context = {
"description": "FeedVault allows users to archive and search their favorite web feeds.",
"keywords": "feed, rss, atom, archive, rss list",
"author": "TheLovinator",
"canonical": "https://feedvault.se/api/",
"title": "API Documentation",
}
return HttpResponse(content=template.render(context=context, request=request))
class RobotsView(View):
"""Robots.txt view."""
def get(self, request: HttpRequest) -> HttpResponse: # noqa: ARG002
"""Load the robots.txt file."""
return HttpResponse(
content="""User-agent: *\nDisallow: /add\nDisallow: /upload\nDisallow: /accounts/""",
content_type="text/plain",
)
class APIFeedsView(View):
"""API Feeds view."""
def get(self, request: HttpRequest) -> HttpResponse:
"""Get all feeds with pagination."""
# Retrieve all feeds
feeds_list = Feed.objects.all()
# Pagination settings
page: int = int(request.GET.get("page", 1)) # Get the page number from the query parameters, default to 1
per_page: int = int(request.GET.get("per_page", 1000)) # Number of feeds per page, default to 1000 (max 1000)
# Add a ceiling to the per_page value
max_per_page = 1000
if per_page > max_per_page:
per_page = max_per_page
# Create Paginator instance
paginator = Paginator(feeds_list, per_page)
try:
feeds: Page = paginator.page(page)
except PageNotAnInteger:
# If page is not an integer, deliver first page.
feeds = paginator.page(1)
except EmptyPage:
# If page is out of range (e.g., 9999), deliver last page of results.
feeds = paginator.page(paginator.num_pages)
# Convert feeds to dictionary
feeds_dict = [model_to_dict(feed) for feed in feeds]
# Return the paginated entries as JsonResponse
response = JsonResponse(feeds_dict, safe=False)
# Add pagination headers
response["X-Page"] = feeds.number
response["X-Page-Count"] = paginator.num_pages
response["X-Per-Page"] = per_page
response["X-Total-Count"] = paginator.count
response["X-First-Page"] = 1
response["X-Last-Page"] = paginator.num_pages
# Next and previous page links
if feeds.has_next():
response["X-Next-Page"] = feeds.next_page_number()
if feeds.has_previous():
response["X-Prev-Page"] = feeds.previous_page_number()
return response
class APIFeedView(View):
"""API Feed view."""
def get(self, request: HttpRequest, feed_id: int) -> HttpResponse: # noqa: ARG002
"""Get a single feed."""
feed = get_object_or_404(Feed, id=feed_id)
return JsonResponse(model_to_dict(feed), safe=False)
class APIEntriesView(View):
"""API Entries view."""
def get(self, request: HttpRequest) -> HttpResponse:
"""Get all entries with pagination."""
# Retrieve all entries
entries_list = Entry.objects.all()
# Pagination settings
page: int = int(request.GET.get("page", 1)) # Get the page number from the query parameters, default to 1
per_page: int = int(request.GET.get("per_page", 1000))
# Add a ceiling to the per_page value
max_per_page = 1000
if per_page > max_per_page:
per_page = max_per_page
# Create Paginator instance
paginator = Paginator(entries_list, per_page)
try:
entries: Page = paginator.page(page)
except PageNotAnInteger:
# If page is not an integer, deliver first page.
entries = paginator.page(1)
except EmptyPage:
# If page is out of range (e.g. 9999), deliver last page of results.
entries = paginator.page(paginator.num_pages)
# Convert entries to dictionary
entries_dict = [model_to_dict(entry) for entry in entries]
# Return the paginated entries as JsonResponse
response = JsonResponse(entries_dict, safe=False)
# Add pagination headers
response["X-Page"] = entries.number
response["X-Page-Count"] = paginator.num_pages
response["X-Per-Page"] = per_page
response["X-Total-Count"] = paginator.count
response["X-First-Page"] = 1
response["X-Last-Page"] = paginator.num_pages
# Next and previous page links
if entries.has_next():
response["X-Next-Page"] = entries.next_page_number()
if entries.has_previous():
response["X-Prev-Page"] = entries.previous_page_number()
return response
class APIEntryView(View):
"""API Entry view."""
def get(self, request: HttpRequest, entry_id: int) -> HttpResponse: # noqa: ARG002
"""Get a single entry."""
entry = get_object_or_404(Entry, id=entry_id)
return JsonResponse(model_to_dict(entry), safe=False)
class APIFeedEntriesView(View):
"""API Feed Entries view."""
def get(self, request: HttpRequest, feed_id: int) -> HttpResponse:
"""Get all entries for a single feed with pagination."""
# Retrieve all entries for a single feed
entries_list = Entry.objects.filter(feed_id=feed_id)
# Pagination settings
page: int = int(request.GET.get("page", 1)) # Get the page number from the query parameters, default to 1
per_page: int = int(request.GET.get("per_page", 1000))
# Add a ceiling to the per_page value
max_per_page = 1000
if per_page > max_per_page:
per_page = max_per_page
# Create Paginator instance
paginator = Paginator(entries_list, per_page)
try:
entries: Page = paginator.page(page)
except PageNotAnInteger:
# If page is not an integer, deliver first page.
entries = paginator.page(1)
except EmptyPage:
# If page is out of range (e.g. 9999), deliver last page of results.
entries = paginator.page(paginator.num_pages)
# Convert entries to dictionary
entries_dict = [model_to_dict(entry) for entry in entries]
# Return the paginated entries as JsonResponse
response = JsonResponse(entries_dict, safe=False)
# Add pagination headers
response["X-Page"] = entries.number
response["X-Page-Count"] = paginator.num_pages
response["X-Per-Page"] = per_page
response["X-Total-Count"] = paginator.count
response["X-First-Page"] = 1
response["X-Last-Page"] = paginator.num_pages
# Next and previous page links
if entries.has_next():
response["X-Next-Page"] = entries.next_page_number()
if entries.has_previous():
response["X-Prev-Page"] = entries.previous_page_number()
return response
class DomainsView(View):
"""All domains."""
def get(self, request: HttpRequest) -> HttpResponse:
"""Load the domains page."""
domains = Domain.objects.all()
template = loader.get_template(template_name="domains.html")
context = {
"domains": domains,
"description": "Domains",
"keywords": "feed, rss, atom, archive, rss list",
"author": "TheLovinator",
"canonical": "https://feedvault.se/domains/",
"title": "Domains",
}
return HttpResponse(content=template.render(context=context, request=request))
class DomainView(View):
"""A single domain."""
def get(self, request: HttpRequest, domain_id: int) -> HttpResponse:
"""Load the domain page."""
domain = get_object_or_404(Domain, id=domain_id)
feeds = Feed.objects.filter(domain=domain).order_by("-created_at")[:100]
context = {
"domain": domain,
"feeds": feeds,
"description": f"Archive of {domain.name}",
"keywords": "feed, rss, atom, archive, rss list",
"author": "TheLovinator",
"canonical": f"https://feedvault.se/domain/{domain_id}/",
"title": f"{domain.name} - FeedVault",
}
return render(request, "domain.html", context)