Go back to Django
This commit is contained in:
parent
d7be14f5a2
commit
7eee113cdf
22 changed files with 1481 additions and 172 deletions
296
feeds/add_feeds.py
Normal file
296
feeds/add_feeds.py
Normal file
|
|
@ -0,0 +1,296 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
from time import mktime, struct_time
|
||||
from urllib.parse import ParseResult, urlparse
|
||||
|
||||
import feedparser
|
||||
from django.utils import timezone
|
||||
from feedparser import FeedParserDict
|
||||
|
||||
from feeds.models import Author, Domain, Entry, Feed, Generator, Publisher
|
||||
|
||||
logger: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_domain(url: str | None) -> None | str:
    """Get the domain (network location) of a URL.

    Args:
        url: The URL to extract the domain from.

    Returns:
        The network location of the URL (for example ``example.com``), or
        None if the URL is empty, cannot be parsed, or has no network location.
    """
    if not url:
        return None

    # urlparse() reports malformed input (e.g. an unbalanced IPv6 bracket) by
    # raising ValueError. The ParseResult it returns is a non-empty tuple and is
    # therefore always truthy, so a truthiness check can never detect a failure.
    try:
        parsed_url: ParseResult = urlparse(url)
    except ValueError:
        logger.exception("Error parsing URL: %s", url)
        return None

    # A URL without a scheme (e.g. "example.com/feed") parses with an empty
    # netloc; report it as "no domain" instead of returning an empty string.
    if not parsed_url.netloc:
        logger.error("Error parsing URL: %s", url)
        return None

    return str(parsed_url.netloc)
|
||||
|
||||
|
||||
def get_author(parsed_feed: dict) -> Author:
    """Get the author of a feed or entry, creating it if it doesn't exist.

    Args:
        parsed_feed: The parsed feed (or entry) whose "author_detail" is read.

    Returns:
        The matching author. If the author doesn't exist, it will be created.
    """
    # A dictionary with details about the author of this feed or entry.
    author_detail: dict = parsed_feed.get("author_detail", {})
    lookup: dict[str, str] = {
        "name": author_detail.get("name", ""),
        "href": author_detail.get("href", ""),
        "email": author_detail.get("email", ""),
    }

    # filter().first() instead of get(): nothing guarantees these three fields
    # are unique together, and get() raises MultipleObjectsReturned as soon as
    # two identical rows exist.
    existing: Author | None = Author.objects.filter(**lookup).first()
    if existing is not None:
        return existing

    # Create the author if it doesn't exist.
    author: Author = Author.objects.create(**lookup)
    logger.info("Created author: %s", author)
    return author
|
||||
|
||||
|
||||
def def_generator(parsed_feed: dict) -> Generator:
    """Get the generator of a feed, creating it if it doesn't exist.

    Args:
        parsed_feed: The parsed feed whose "generator_detail" is read.

    Returns:
        The generator of the feed. If the generator doesn't exist, it will be created.
    """
    generator_detail: dict = parsed_feed.get("generator_detail", {})

    # get_or_create() performs the lookup and the insert as one operation, so a
    # concurrent insert of the same generator no longer surfaces as an
    # unhandled IntegrityError between a separate get() and save().
    generator: Generator
    generator, created = Generator.objects.get_or_create(
        name=generator_detail.get("name", ""),
        href=generator_detail.get("href", ""),
        version=generator_detail.get("version", ""),
    )
    if created:
        logger.info("Created generator: %s", generator)

    return generator
|
||||
|
||||
|
||||
def get_publisher(parsed_feed: dict) -> Publisher:
    """Get the publisher of a feed or entry, creating it if it doesn't exist.

    Args:
        parsed_feed: The parsed feed (or entry) whose "publisher_detail" is read.

    Returns:
        The matching publisher. If the publisher doesn't exist, it will be created.
    """
    publisher_detail: dict = parsed_feed.get("publisher_detail", {})
    lookup: dict[str, str] = {
        "name": publisher_detail.get("name", ""),
        "href": publisher_detail.get("href", ""),
        "email": publisher_detail.get("email", ""),
    }

    # filter().first() instead of get(): nothing guarantees these three fields
    # are unique together, and get() raises MultipleObjectsReturned as soon as
    # two identical rows exist.
    existing: Publisher | None = Publisher.objects.filter(**lookup).first()
    if existing is not None:
        return existing

    # Create the publisher if it doesn't exist.
    publisher: Publisher = Publisher.objects.create(**lookup)
    logger.info("Created publisher: %s", publisher)
    return publisher
|
||||
|
||||
|
||||
def parse_feed(url: str | None) -> dict | None:
    """Parse the feed found at a URL.

    Args:
        url: The URL of the feed.

    Returns:
        The parsed feed, or None when no URL was given or the parser
        returned an empty result.
    """
    # TODO(TheLovinator): Backup the feed URL to a cloudflare worker.  # noqa: TD003
    if url:
        # Hand the URL to feedparser and pass a non-empty result straight back.
        result: dict = feedparser.parse(url)
        if result:
            return result

    return None
|
||||
|
||||
|
||||
def struct_time_to_datetime(struct_time: struct_time | None) -> datetime.datetime | None:
    """Convert a UTC struct_time to a timezone-aware UTC datetime.

    Args:
        struct_time: The time tuple to convert, interpreted as UTC (feedparser
            documents its ``*_parsed`` dates as UTC).

    Returns:
        The equivalent datetime with ``tzinfo`` set to UTC, or None if no value
        was given or it cannot be represented as a datetime.
    """
    import calendar

    if not struct_time:
        return None

    try:
        # calendar.timegm() treats the tuple as UTC. time.mktime() would treat
        # it as *local* time and shift every date by the host's UTC offset.
        timestamp: int = calendar.timegm(struct_time)
        return datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    except (OverflowError, ValueError, OSError):
        logger.exception("Error converting struct_time to datetime: %s", struct_time)
        return None
|
||||
|
||||
|
||||
def add_entry(feed: Feed, entry: FeedParserDict) -> Entry | None:
    """Store one parsed feed entry in the database.

    Args:
        feed: The feed the entry belongs to.
        entry: The entry to add.

    Returns:
        The saved entry, or None if saving it failed.
    """
    # Entry keys grouped by the default used when the key is missing.
    text_keys = (
        "author",
        "comments",
        "created",
        "expired",
        "license",
        "link",
        "published",
        "publisher",
        "summary",
        "title",
        "updated",
    )
    list_keys = ("enclosures", "links", "tags")
    dict_keys = ("content", "contributors", "source", "summary_detail", "title_detail")
    date_keys = ("updated_parsed", "published_parsed", "expired_parsed", "created_parsed")

    # Related rows are looked up (or created) before the entry itself is built.
    author_row = get_author(parsed_feed=entry)
    publisher_row = get_publisher(parsed_feed=entry)

    values: dict = {
        "feed": feed,
        "author_detail": author_row,
        "publisher_detail": publisher_row,
        "_id": entry.get("id", ""),
    }
    values.update({key: entry.get(key, "") for key in text_keys})
    values.update({key: entry.get(key, []) for key in list_keys})
    values.update({key: entry.get(key, {}) for key in dict_keys})
    values.update({key: struct_time_to_datetime(struct_time=entry.get(key)) for key in date_keys})

    new_entry = Entry(**values)

    # Save the entry.
    try:
        new_entry.save()
    except Exception:
        logger.exception("Error saving entry for feed: %s", feed)
        return None
    else:
        logger.info("Created entry: %s", new_entry)
        return new_entry
|
||||
|
||||
|
||||
def add_feed(url: str | None) -> None | Feed:
    """Add a feed and its entries to the database.

    Args:
        url: The URL of the feed.

    Returns:
        The saved feed, or None if the feed could not be parsed, its domain
        could not be determined, or saving the feed failed.
    """
    # Parse the feed.
    parsed_feed: dict | None = parse_feed(url=url)
    if not parsed_feed:
        return None

    domain_url: None | str = get_domain(url=url)
    if not domain_url:
        return None

    # Create the domain if it doesn't exist. get_or_create() already saves a
    # newly created row, so no additional save() is needed.
    domain: Domain
    domain, created = Domain.objects.get_or_create(url=domain_url)
    if created:
        logger.info("Created domain: %s", domain.url)

    # feedparser keeps transport/parser data (bozo, etag, headers, status,
    # version, entries, ...) at the top level of its result and the feed's own
    # metadata (title, author, links, ...) under the "feed" key.
    feed_data: dict = parsed_feed.get("feed") or {}

    author: Author = get_author(parsed_feed=feed_data)
    generator: Generator = def_generator(parsed_feed=feed_data)
    publisher: Publisher = get_publisher(parsed_feed=feed_data)

    published_parsed: datetime.datetime | None = struct_time_to_datetime(
        struct_time=feed_data.get("published_parsed"),
    )
    updated_parsed: datetime.datetime | None = struct_time_to_datetime(
        struct_time=feed_data.get("updated_parsed"),
    )
    # "modified" is the raw HTTP Last-Modified string, which a DateTimeField
    # cannot store; use the parsed companion value instead.
    modified: datetime.datetime | None = struct_time_to_datetime(
        struct_time=parsed_feed.get("modified_parsed"),
    )

    # Create the feed
    feed = Feed(
        feed_url=url,
        domain=domain,
        last_checked=timezone.now(),
        # Transport / parser level data.
        bozo=parsed_feed.get("bozo", 0),
        # bozo_exception is an exception object; store its message as text.
        bozo_exception=str(parsed_feed.get("bozo_exception", "")),
        encoding=parsed_feed.get("encoding", ""),
        etag=parsed_feed.get("etag", ""),
        headers=parsed_feed.get("headers", {}),
        href=parsed_feed.get("href", ""),
        modified=modified,
        namespaces=parsed_feed.get("namespaces", {}),
        status=parsed_feed.get("status", 0),
        version=parsed_feed.get("version", ""),
        # Feed level metadata.
        author=feed_data.get("author", ""),
        author_detail=author,
        cloud=feed_data.get("cloud", {}),
        contributors=feed_data.get("contributors", {}),
        docs=feed_data.get("docs", ""),
        errorreportsto=feed_data.get("errorreportsto", ""),
        generator=feed_data.get("generator", ""),
        generator_detail=generator,
        icon=feed_data.get("icon", ""),
        _id=feed_data.get("id", ""),
        image=feed_data.get("image", {}),
        info=feed_data.get("info", ""),
        language=feed_data.get("language", ""),
        license=feed_data.get("license", ""),
        link=feed_data.get("link", ""),
        links=feed_data.get("links", []),
        logo=feed_data.get("logo", ""),
        published=feed_data.get("published", ""),
        published_parsed=published_parsed,
        publisher=feed_data.get("publisher", ""),
        publisher_detail=publisher,
        rights=feed_data.get("rights", ""),
        rights_detail=feed_data.get("rights_detail", {}),
        subtitle=feed_data.get("subtitle", ""),
        subtitle_detail=feed_data.get("subtitle_detail", {}),
        tags=feed_data.get("tags", []),
        textinput=feed_data.get("textinput", {}),
        title=feed_data.get("title", ""),
        title_detail=feed_data.get("title_detail", {}),
        ttl=feed_data.get("ttl", ""),
        updated=feed_data.get("updated", ""),
        updated_parsed=updated_parsed,
    )

    # Save the feed.
    try:
        feed.save()
    except Exception:
        logger.exception("Error saving feed: %s", feed)
        return None

    # add_entry() logs its own failures and returns None for them, so one bad
    # entry does not stop the remaining entries from being stored.
    for entry in parsed_feed.get("entries", []):
        add_entry(feed=feed, entry=entry)

    logger.info("Created feed: %s", feed)
    return feed
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
# Generated by Django 5.0.2 on 2024-02-18 20:59
|
||||
# Generated by Django 5.0.2 on 2024-02-19 02:47
|
||||
|
||||
import django.db.models.deletion
|
||||
from django.db import migrations, models
|
||||
|
|
@ -12,13 +12,24 @@ class Migration(migrations.Migration):
|
|||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='Author',
|
||||
fields=[
|
||||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('created_at', models.DateTimeField(auto_now_add=True)),
|
||||
('modified_at', models.DateTimeField(auto_now=True)),
|
||||
('name', models.TextField(blank=True)),
|
||||
('href', models.TextField(blank=True)),
|
||||
('email', models.TextField(blank=True)),
|
||||
],
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='Domain',
|
||||
fields=[
|
||||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('name', models.CharField(max_length=255, unique=True)),
|
||||
('url', models.URLField()),
|
||||
('categories', models.JSONField()),
|
||||
('url', models.URLField(unique=True)),
|
||||
('name', models.CharField(max_length=255)),
|
||||
('categories', models.JSONField(blank=True, null=True)),
|
||||
('created_at', models.DateTimeField(auto_now_add=True)),
|
||||
('modified_at', models.DateTimeField(auto_now=True)),
|
||||
('hidden', models.BooleanField(default=False)),
|
||||
|
|
@ -27,55 +38,95 @@ class Migration(migrations.Migration):
|
|||
],
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='Feed',
|
||||
name='Links',
|
||||
fields=[
|
||||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('created_at', models.DateTimeField(auto_now_add=True)),
|
||||
('modified_at', models.DateTimeField(auto_now=True)),
|
||||
('rel', models.TextField(blank=True)),
|
||||
('type', models.TextField(blank=True)),
|
||||
('href', models.TextField(blank=True)),
|
||||
('title', models.TextField(blank=True)),
|
||||
],
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='Publisher',
|
||||
fields=[
|
||||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('created_at', models.DateTimeField(auto_now_add=True)),
|
||||
('modified_at', models.DateTimeField(auto_now=True)),
|
||||
('name', models.TextField(blank=True)),
|
||||
('href', models.TextField(blank=True)),
|
||||
('email', models.TextField(blank=True)),
|
||||
],
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='Generator',
|
||||
fields=[
|
||||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('created_at', models.DateTimeField(auto_now_add=True)),
|
||||
('modified_at', models.DateTimeField(auto_now=True)),
|
||||
('name', models.TextField(blank=True)),
|
||||
('href', models.TextField(blank=True)),
|
||||
('version', models.TextField(blank=True)),
|
||||
],
|
||||
options={
|
||||
'unique_together': {('name', 'version', 'href')},
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='Feed',
|
||||
fields=[
|
||||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('feed_url', models.URLField(unique=True)),
|
||||
('created_at', models.DateTimeField(auto_now_add=True)),
|
||||
('modified_at', models.DateTimeField(auto_now=True)),
|
||||
('last_checked', models.DateTimeField(blank=True, null=True)),
|
||||
('active', models.BooleanField(default=True)),
|
||||
('bozo', models.BooleanField()),
|
||||
('bozo_exception', models.TextField()),
|
||||
('encoding', models.TextField()),
|
||||
('etag', models.TextField()),
|
||||
('headers', models.JSONField()),
|
||||
('href', models.TextField()),
|
||||
('modified', models.DateTimeField()),
|
||||
('namespaces', models.JSONField()),
|
||||
('bozo_exception', models.TextField(blank=True)),
|
||||
('encoding', models.TextField(blank=True)),
|
||||
('etag', models.TextField(blank=True)),
|
||||
('headers', models.JSONField(blank=True, null=True)),
|
||||
('href', models.TextField(blank=True)),
|
||||
('modified', models.DateTimeField(blank=True, null=True)),
|
||||
('namespaces', models.JSONField(blank=True, null=True)),
|
||||
('status', models.IntegerField()),
|
||||
('version', models.CharField(max_length=50)),
|
||||
('author', models.TextField()),
|
||||
('author_detail', models.JSONField()),
|
||||
('cloud', models.JSONField()),
|
||||
('contributors', models.JSONField()),
|
||||
('docs', models.TextField()),
|
||||
('errorreportsto', models.TextField()),
|
||||
('generator', models.TextField()),
|
||||
('generator_detail', models.TextField()),
|
||||
('icon', models.TextField()),
|
||||
('_id', models.TextField()),
|
||||
('image', models.JSONField()),
|
||||
('info', models.TextField()),
|
||||
('info_detail', models.JSONField()),
|
||||
('language', models.TextField()),
|
||||
('license', models.TextField()),
|
||||
('link', models.TextField()),
|
||||
('links', models.JSONField()),
|
||||
('logo', models.TextField()),
|
||||
('published', models.TextField()),
|
||||
('published_parsed', models.DateTimeField()),
|
||||
('publisher', models.TextField()),
|
||||
('publisher_detail', models.JSONField()),
|
||||
('rights', models.TextField()),
|
||||
('rights_detail', models.JSONField()),
|
||||
('subtitle', models.TextField()),
|
||||
('subtitle_detail', models.JSONField()),
|
||||
('tags', models.JSONField()),
|
||||
('textinput', models.JSONField()),
|
||||
('title', models.TextField()),
|
||||
('title_detail', models.JSONField()),
|
||||
('ttl', models.TextField()),
|
||||
('updated', models.TextField()),
|
||||
('updated_parsed', models.DateTimeField()),
|
||||
('version', models.CharField(blank=True, max_length=255)),
|
||||
('author', models.TextField(blank=True)),
|
||||
('cloud', models.JSONField(blank=True, null=True)),
|
||||
('contributors', models.JSONField(blank=True, null=True)),
|
||||
('docs', models.TextField(blank=True)),
|
||||
('errorreportsto', models.TextField(blank=True)),
|
||||
('generator', models.TextField(blank=True)),
|
||||
('icon', models.TextField(blank=True)),
|
||||
('_id', models.TextField(blank=True)),
|
||||
('image', models.JSONField(blank=True, null=True)),
|
||||
('info', models.TextField(blank=True)),
|
||||
('info_detail', models.JSONField(blank=True, null=True)),
|
||||
('language', models.TextField(blank=True)),
|
||||
('license', models.TextField(blank=True)),
|
||||
('link', models.TextField(blank=True)),
|
||||
('links', models.JSONField(blank=True, null=True)),
|
||||
('logo', models.TextField(blank=True)),
|
||||
('published', models.TextField(blank=True)),
|
||||
('published_parsed', models.DateTimeField(blank=True, null=True)),
|
||||
('publisher', models.TextField(blank=True)),
|
||||
('rights', models.TextField(blank=True)),
|
||||
('rights_detail', models.JSONField(blank=True, null=True)),
|
||||
('subtitle', models.TextField(blank=True)),
|
||||
('subtitle_detail', models.JSONField(blank=True, null=True)),
|
||||
('tags', models.JSONField(blank=True, null=True)),
|
||||
('textinput', models.JSONField(blank=True, null=True)),
|
||||
('title', models.TextField(blank=True)),
|
||||
('title_detail', models.JSONField(blank=True, null=True)),
|
||||
('ttl', models.TextField(blank=True)),
|
||||
('updated', models.TextField(blank=True)),
|
||||
('updated_parsed', models.DateTimeField(blank=True, null=True)),
|
||||
('author_detail', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='feeds', to='feeds.author')),
|
||||
('domain', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='feeds.domain')),
|
||||
('generator_detail', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='feeds', to='feeds.generator')),
|
||||
('publisher_detail', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='feeds', to='feeds.publisher')),
|
||||
],
|
||||
),
|
||||
migrations.CreateModel(
|
||||
|
|
@ -84,33 +135,33 @@ class Migration(migrations.Migration):
|
|||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('created_at', models.DateTimeField(auto_now_add=True)),
|
||||
('modified_at', models.DateTimeField(auto_now=True)),
|
||||
('author', models.TextField()),
|
||||
('author_detail', models.JSONField()),
|
||||
('comments', models.TextField()),
|
||||
('content', models.JSONField()),
|
||||
('contributors', models.JSONField()),
|
||||
('created', models.TextField()),
|
||||
('created_parsed', models.DateTimeField()),
|
||||
('enclosures', models.JSONField()),
|
||||
('expired', models.TextField()),
|
||||
('expired_parsed', models.DateTimeField()),
|
||||
('_id', models.TextField()),
|
||||
('license', models.TextField()),
|
||||
('link', models.TextField()),
|
||||
('links', models.JSONField()),
|
||||
('published', models.TextField()),
|
||||
('published_parsed', models.DateTimeField()),
|
||||
('publisher', models.TextField()),
|
||||
('publisher_detail', models.JSONField()),
|
||||
('source', models.JSONField()),
|
||||
('summary', models.TextField()),
|
||||
('summary_detail', models.JSONField()),
|
||||
('tags', models.JSONField()),
|
||||
('title', models.TextField()),
|
||||
('title_detail', models.JSONField()),
|
||||
('updated', models.TextField()),
|
||||
('updated_parsed', models.DateTimeField()),
|
||||
('author', models.TextField(blank=True)),
|
||||
('comments', models.TextField(blank=True)),
|
||||
('content', models.JSONField(blank=True, null=True)),
|
||||
('contributors', models.JSONField(blank=True, null=True)),
|
||||
('created', models.TextField(blank=True)),
|
||||
('created_parsed', models.DateTimeField(blank=True, null=True)),
|
||||
('enclosures', models.JSONField(blank=True, null=True)),
|
||||
('expired', models.TextField(blank=True)),
|
||||
('expired_parsed', models.DateTimeField(blank=True, null=True)),
|
||||
('_id', models.TextField(blank=True)),
|
||||
('license', models.TextField(blank=True)),
|
||||
('link', models.TextField(blank=True)),
|
||||
('links', models.JSONField(blank=True, null=True)),
|
||||
('published', models.TextField(blank=True)),
|
||||
('published_parsed', models.DateTimeField(blank=True, null=True)),
|
||||
('publisher', models.TextField(blank=True)),
|
||||
('source', models.JSONField(blank=True, null=True)),
|
||||
('summary', models.TextField(blank=True)),
|
||||
('summary_detail', models.JSONField(blank=True, null=True)),
|
||||
('tags', models.JSONField(blank=True, null=True)),
|
||||
('title', models.TextField(blank=True)),
|
||||
('title_detail', models.JSONField(blank=True, null=True)),
|
||||
('updated', models.TextField(blank=True)),
|
||||
('updated_parsed', models.DateTimeField(blank=True, null=True)),
|
||||
('author_detail', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='entries', to='feeds.author')),
|
||||
('feed', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='feeds.feed')),
|
||||
('publisher_detail', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='entries', to='feeds.publisher')),
|
||||
],
|
||||
),
|
||||
]
|
||||
|
|
|
|||
256
feeds/models.py
256
feeds/models.py
|
|
@ -1,17 +1,21 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import typing
|
||||
from typing import Literal
|
||||
|
||||
from django.db import models
|
||||
from django.db.models import JSONField
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Domain(models.Model):
|
||||
"""A domain that has one or more feeds."""
|
||||
|
||||
name = models.CharField(max_length=255, unique=True)
|
||||
url = models.URLField()
|
||||
categories = models.JSONField()
|
||||
url = models.URLField(unique=True)
|
||||
name = models.CharField(max_length=255)
|
||||
categories = models.JSONField(null=True, blank=True)
|
||||
created_at = models.DateTimeField(auto_now_add=True)
|
||||
modified_at = models.DateTimeField(auto_now=True)
|
||||
hidden = models.BooleanField(default=False)
|
||||
|
|
@ -24,62 +28,154 @@ class Domain(models.Model):
|
|||
return self.name + if_hidden
|
||||
|
||||
|
||||
class Author(models.Model):
    """An author of a feed or entry.

    The three descriptive text fields are optional (``blank=True``) and no
    uniqueness constraint is declared on this model.
    """

    # Row bookkeeping: set once on insert / refreshed on every save.
    created_at = models.DateTimeField(auto_now_add=True)
    modified_at = models.DateTimeField(auto_now=True)
    # Descriptive fields; each may be left empty.
    name = models.TextField(blank=True)
    href = models.TextField(blank=True)
    email = models.TextField(blank=True)

    def __str__(self) -> str:
        """Return the author formatted as "name - email - href"."""
        return f"{self.name} - {self.email} - {self.href}"
|
||||
|
||||
|
||||
class Generator(models.Model):
    """A generator of a feed.

    The combination of ``name``, ``version`` and ``href`` is unique.
    """

    # Row bookkeeping: set once on insert / refreshed on every save.
    created_at = models.DateTimeField(auto_now_add=True)
    modified_at = models.DateTimeField(auto_now=True)
    # Descriptive fields; each may be left empty.
    name = models.TextField(blank=True)
    href = models.TextField(blank=True)
    version = models.TextField(blank=True)

    class Meta:
        """Meta information for the generator model."""

        # One row per distinct (name, version, href) combination.
        unique_together: typing.ClassVar[list[str]] = ["name", "version", "href"]

    def __str__(self) -> str:
        """Return the name of the generator."""
        return self.name
|
||||
|
||||
|
||||
class Links(models.Model):
    """A link to a feed or entry."""

    # Row bookkeeping: set once on insert / refreshed on every save.
    created_at = models.DateTimeField(auto_now_add=True)
    modified_at = models.DateTimeField(auto_now=True)
    # Descriptive fields; each may be left empty.
    rel = models.TextField(blank=True)
    type = models.TextField(blank=True)
    href = models.TextField(blank=True)
    title = models.TextField(blank=True)

    def __str__(self) -> str:
        """Return the href of the link."""
        return self.href
|
||||
|
||||
|
||||
class Publisher(models.Model):
    """The publisher of a feed or entry.

    The three descriptive text fields are optional (``blank=True``) and no
    uniqueness constraint is declared on this model.
    """

    # Row bookkeeping: set once on insert / refreshed on every save.
    created_at = models.DateTimeField(auto_now_add=True)
    modified_at = models.DateTimeField(auto_now=True)
    # Descriptive fields; each may be left empty.
    name = models.TextField(blank=True)
    href = models.TextField(blank=True)
    email = models.TextField(blank=True)

    def __str__(self) -> str:
        """Return the name of the publisher."""
        return self.name
|
||||
|
||||
|
||||
class Feed(models.Model):
|
||||
"""A RSS/Atom/JSON feed."""
|
||||
|
||||
feed_url = models.URLField(unique=True)
|
||||
|
||||
domain = models.ForeignKey(Domain, on_delete=models.CASCADE)
|
||||
created_at = models.DateTimeField(auto_now_add=True)
|
||||
modified_at = models.DateTimeField(auto_now=True)
|
||||
last_checked = models.DateTimeField(null=True, blank=True)
|
||||
active = models.BooleanField(default=True)
|
||||
|
||||
# General data
|
||||
bozo = models.BooleanField()
|
||||
bozo_exception = models.TextField()
|
||||
encoding = models.TextField()
|
||||
etag = models.TextField()
|
||||
headers = JSONField()
|
||||
href = models.TextField()
|
||||
modified = models.DateTimeField()
|
||||
namespaces = JSONField()
|
||||
bozo_exception = models.TextField(blank=True)
|
||||
encoding = models.TextField(blank=True)
|
||||
etag = models.TextField(blank=True)
|
||||
headers = JSONField(null=True, blank=True)
|
||||
href = models.TextField(blank=True)
|
||||
modified = models.DateTimeField(null=True, blank=True)
|
||||
namespaces = JSONField(null=True, blank=True)
|
||||
status = models.IntegerField()
|
||||
version = models.CharField(max_length=50)
|
||||
version = models.CharField(max_length=255, blank=True)
|
||||
|
||||
# Feed data
|
||||
author = models.TextField()
|
||||
author_detail = JSONField()
|
||||
cloud = JSONField()
|
||||
contributors = JSONField()
|
||||
docs = models.TextField()
|
||||
errorreportsto = models.TextField()
|
||||
generator = models.TextField()
|
||||
generator_detail = models.TextField()
|
||||
icon = models.TextField()
|
||||
_id = models.TextField()
|
||||
image = JSONField()
|
||||
info = models.TextField()
|
||||
info_detail = JSONField()
|
||||
language = models.TextField()
|
||||
license = models.TextField()
|
||||
link = models.TextField()
|
||||
links = JSONField()
|
||||
logo = models.TextField()
|
||||
published = models.TextField()
|
||||
published_parsed = models.DateTimeField()
|
||||
publisher = models.TextField()
|
||||
publisher_detail = JSONField()
|
||||
rights = models.TextField()
|
||||
rights_detail = JSONField()
|
||||
subtitle = models.TextField()
|
||||
subtitle_detail = JSONField()
|
||||
tags = JSONField()
|
||||
textinput = JSONField()
|
||||
title = models.TextField()
|
||||
title_detail = JSONField()
|
||||
ttl = models.TextField()
|
||||
updated = models.TextField()
|
||||
updated_parsed = models.DateTimeField()
|
||||
author = models.TextField(blank=True)
|
||||
author_detail = models.ForeignKey(
|
||||
Author,
|
||||
on_delete=models.PROTECT,
|
||||
null=True,
|
||||
blank=True,
|
||||
related_name="feeds",
|
||||
)
|
||||
|
||||
cloud = JSONField(null=True, blank=True)
|
||||
contributors = JSONField(null=True, blank=True)
|
||||
docs = models.TextField(blank=True)
|
||||
errorreportsto = models.TextField(blank=True)
|
||||
generator = models.TextField(blank=True)
|
||||
generator_detail = models.ForeignKey(
|
||||
Generator,
|
||||
on_delete=models.PROTECT,
|
||||
null=True,
|
||||
blank=True,
|
||||
related_name="feeds",
|
||||
)
|
||||
|
||||
icon = models.TextField(blank=True)
|
||||
_id = models.TextField(blank=True)
|
||||
image = JSONField(null=True, blank=True)
|
||||
info = models.TextField(blank=True)
|
||||
info_detail = JSONField(null=True, blank=True)
|
||||
language = models.TextField(blank=True)
|
||||
license = models.TextField(blank=True)
|
||||
link = models.TextField(blank=True)
|
||||
links = JSONField(null=True, blank=True)
|
||||
logo = models.TextField(blank=True)
|
||||
published = models.TextField(blank=True)
|
||||
published_parsed = models.DateTimeField(null=True, blank=True)
|
||||
publisher = models.TextField(blank=True)
|
||||
publisher_detail = models.ForeignKey(
|
||||
Publisher,
|
||||
on_delete=models.PROTECT,
|
||||
null=True,
|
||||
blank=True,
|
||||
related_name="feeds",
|
||||
)
|
||||
|
||||
rights = models.TextField(blank=True)
|
||||
rights_detail = JSONField(null=True, blank=True)
|
||||
subtitle = models.TextField(blank=True)
|
||||
subtitle_detail = JSONField(null=True, blank=True)
|
||||
tags = JSONField(null=True, blank=True)
|
||||
textinput = JSONField(null=True, blank=True)
|
||||
title = models.TextField(blank=True)
|
||||
title_detail = JSONField(null=True, blank=True)
|
||||
ttl = models.TextField(blank=True)
|
||||
updated = models.TextField(blank=True)
|
||||
updated_parsed = models.DateTimeField(null=True, blank=True)
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""Return string representation of the feed."""
|
||||
return self.title_detail["value"] or "No title"
|
||||
return f"{self.domain} - {self.title}"
|
||||
|
||||
def get_fields(self) -> list:
|
||||
"""Return the fields of the feed."""
|
||||
return [(field.name, field.value_from_object(self)) for field in Feed._meta.fields]
|
||||
|
||||
|
||||
class Entry(models.Model):
|
||||
|
|
@ -90,33 +186,49 @@ class Entry(models.Model):
|
|||
modified_at = models.DateTimeField(auto_now=True)
|
||||
|
||||
# Entry data
|
||||
author = models.TextField()
|
||||
author_detail = JSONField()
|
||||
comments = models.TextField()
|
||||
content = JSONField()
|
||||
contributors = JSONField()
|
||||
created = models.TextField()
|
||||
created_parsed = models.DateTimeField()
|
||||
enclosures = JSONField()
|
||||
expired = models.TextField()
|
||||
expired_parsed = models.DateTimeField()
|
||||
_id = models.TextField()
|
||||
license = models.TextField()
|
||||
link = models.TextField()
|
||||
links = JSONField()
|
||||
published = models.TextField()
|
||||
published_parsed = models.DateTimeField()
|
||||
publisher = models.TextField()
|
||||
publisher_detail = JSONField()
|
||||
source = JSONField()
|
||||
summary = models.TextField()
|
||||
summary_detail = JSONField()
|
||||
tags = JSONField()
|
||||
title = models.TextField()
|
||||
title_detail = JSONField()
|
||||
updated = models.TextField()
|
||||
updated_parsed = models.DateTimeField()
|
||||
author = models.TextField(blank=True)
|
||||
author_detail = models.ForeignKey(
|
||||
Author,
|
||||
on_delete=models.PROTECT,
|
||||
null=True,
|
||||
blank=True,
|
||||
related_name="entries",
|
||||
)
|
||||
comments = models.TextField(blank=True)
|
||||
content = JSONField(null=True, blank=True)
|
||||
contributors = JSONField(null=True, blank=True)
|
||||
created = models.TextField(blank=True)
|
||||
created_parsed = models.DateTimeField(null=True, blank=True)
|
||||
enclosures = JSONField(null=True, blank=True)
|
||||
expired = models.TextField(blank=True)
|
||||
expired_parsed = models.DateTimeField(null=True, blank=True)
|
||||
_id = models.TextField(blank=True)
|
||||
license = models.TextField(blank=True)
|
||||
link = models.TextField(blank=True)
|
||||
links = JSONField(null=True, blank=True)
|
||||
published = models.TextField(blank=True)
|
||||
published_parsed = models.DateTimeField(null=True, blank=True)
|
||||
publisher = models.TextField(blank=True)
|
||||
publisher_detail = models.ForeignKey(
|
||||
Publisher,
|
||||
on_delete=models.PROTECT,
|
||||
null=True,
|
||||
blank=True,
|
||||
related_name="entries",
|
||||
)
|
||||
source = JSONField(null=True, blank=True)
|
||||
summary = models.TextField(blank=True)
|
||||
summary_detail = JSONField(null=True, blank=True)
|
||||
tags = JSONField(null=True, blank=True)
|
||||
title = models.TextField(blank=True)
|
||||
title_detail = JSONField(null=True, blank=True)
|
||||
updated = models.TextField(blank=True)
|
||||
updated_parsed = models.DateTimeField(null=True, blank=True)
|
||||
|
||||
def __str__(self) -> str:
    """Return string representation of the entry.

    Returns:
        str: The owning feed and the entry title, formatted as "<feed> - <title>".
    """
    # The previous `return self.title_detail["value"] or "No title"` made this line
    # unreachable and would raise when `title_detail` is None (the field is nullable).
    return f"{self.feed} - {self.title}"
|
||||
|
||||
def get_fields(self) -> list:
    """Return a list of ``(field name, field value)`` tuples for this entry.

    One tuple is produced for each field listed in ``Entry._meta.fields``; the value is
    read from this instance with ``value_from_object``.
    """
    model_fields = Entry._meta.fields
    return [(model_field.name, model_field.value_from_object(self)) for model_field in model_fields]
|
||||
|
|
|
|||
33
feeds/stats.py
Normal file
33
feeds/stats.py
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from django.core.cache import cache
|
||||
from django.db import connection
|
||||
|
||||
logger: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_db_size() -> str:
    """Get the size of the database.

    The value is looked up in the cache under the key ``db_size`` first. On a cache miss it
    is queried with ``pg_size_pretty(pg_database_size(current_database()))`` and then
    cached for 15 minutes.

    Returns:
        str: The size of the database, or "0 MB" when the query returns no row.
    """
    size = cache.get("db_size")

    if size is None:
        # Cache miss: ask the database for its own size.
        with connection.cursor() as cursor:
            cursor.execute("SELECT pg_size_pretty(pg_database_size(current_database()))")
            result = cursor.fetchone()

        size = str(result[0]) if result is not None else "0 MB"

        # Keep the value for 15 minutes so the query is not run on every page load.
        cache.set("db_size", size, 60 * 15)
        return size

    logger.debug("Got db_size from cache")
    return size
|
||||
|
|
@ -10,4 +10,5 @@ urlpatterns: list[URLPattern] = [
|
|||
path(route="", view=views.IndexView.as_view(), name="index"),
|
||||
path(route="feed/<int:feed_id>/", view=views.FeedView.as_view(), name="feed"),
|
||||
path(route="feeds/", view=views.FeedsView.as_view(), name="feeds"),
|
||||
path(route="add", view=views.AddView.as_view(), name="add"),
|
||||
]
|
||||
|
|
|
|||
|
|
@ -1,35 +1,92 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from django.contrib import messages
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
from django.shortcuts import get_object_or_404, render
|
||||
from django.template import loader
|
||||
from django.views import View
|
||||
from django.views.generic.list import ListView
|
||||
|
||||
from feeds.add_feeds import add_feed
|
||||
from feeds.models import Entry, Feed
|
||||
from feeds.stats import get_db_size
|
||||
|
||||
|
||||
class IndexView(View):
    """Index path."""

    def get(self, request: HttpRequest) -> HttpResponse:
        """Load the index page.

        Args:
            request: The incoming HTTP request.

        Returns:
            The rendered ``index.html`` template with the database size and the number of feeds.
        """
        template = loader.get_template(template_name="index.html")
        # Statistics shown on the page; the earlier empty `context = {}` was a dead store.
        context = {
            "db_size": get_db_size(),
            "amount_of_feeds": Feed.objects.count(),
        }
        return HttpResponse(content=template.render(context=context, request=request))
|
||||
|
||||
|
||||
class FeedView(View):
    """A single feed."""

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:  # noqa: ANN002, ANN003, ARG002
        """Load the feed page.

        Args:
            request: The incoming HTTP request.
            *args: Unused positional arguments.
            **kwargs: URL keyword arguments; ``feed_id`` is read from here.

        Returns:
            A 400 response with the body "No id" when ``feed_id`` is missing or falsy,
            otherwise the rendered ``feed.html`` template. A 404 is raised by
            ``get_object_or_404`` when no feed has the given id.
        """
        feed_id = kwargs.get("feed_id", None)
        if not feed_id:
            return HttpResponse(content="No id", status=400)

        feed = get_object_or_404(Feed, id=feed_id)
        # Only the first 100 entries of the descending `created_parsed` ordering are shown.
        entries = Entry.objects.filter(feed=feed).order_by("-created_parsed")[:100]

        context = {
            "feed": feed,
            "entries": entries,
            "db_size": get_db_size(),
            "amount_of_feeds": Feed.objects.count(),
        }
        return render(request, "feed.html", context)
|
||||
|
||||
|
||||
class FeedsView(ListView):
    """All feeds."""

    model = Feed
    paginate_by = 100
    template_name = "feeds.html"
    context_object_name = "feeds"

    def get_context_data(self, **kwargs) -> dict:  # noqa: ANN003
        """Get the context data.

        Extends the context built by the parent class with the database size and the
        number of feeds.

        Args:
            **kwargs: Keyword arguments forwarded unchanged to the parent implementation.

        Returns:
            The parent context plus the ``db_size`` and ``amount_of_feeds`` keys.
        """
        context = super().get_context_data(**kwargs)
        context["db_size"] = get_db_size()
        context["amount_of_feeds"] = Feed.objects.count()
        return context
|
||||
|
||||
|
||||
class AddView(View):
    """Add a feed."""

    def get(self, request: HttpRequest) -> HttpResponse:
        """Load the index page.

        Args:
            request: The incoming HTTP request.

        Returns:
            The rendered ``index.html`` template with the database size and the number of feeds.
        """
        template = loader.get_template(template_name="index.html")
        context = {
            "db_size": get_db_size(),
            "amount_of_feeds": Feed.objects.count(),
        }
        return HttpResponse(content=template.render(context=context, request=request))

    def post(self, request: HttpRequest) -> HttpResponse:
        """Add a feed.

        Reads the ``urls`` form field, which holds one URL per line, and calls ``add_feed``
        for each non-empty line. The outcome for every URL is reported through the
        messages framework.

        Args:
            request: The incoming HTTP request.

        Returns:
            A 400 response with the body "No urls" when the field is missing or empty,
            otherwise the rendered ``index.html`` template.
        """
        urls: str | None = request.POST.get("urls", None)
        if not urls:
            return HttpResponse(content="No urls", status=400)

        # splitlines() copes with "\r\n" line endings, which split("\n") would leave as a
        # trailing "\r" on every URL.
        for line in urls.splitlines():
            url = line.strip()
            if not url:
                # Skip blank lines instead of reporting them as failed feeds.
                continue

            feed: None | Feed = add_feed(url)
            if not feed:
                messages.error(request, f"{url} - Failed to add")
                continue

            # Check if bozo is true.
            if feed.bozo:
                messages.warning(request, f"{feed.feed_url} - Bozo: {feed.bozo_exception}")

            messages.success(request, f"{feed.feed_url} added")

        # Render the index page with the same statistics the other views pass to it.
        template = loader.get_template(template_name="index.html")
        context = {
            "db_size": get_db_size(),
            "amount_of_feeds": Feed.objects.count(),
        }
        return HttpResponse(content=template.render(context=context, request=request))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue