Use Ruff and fix all its warnings
This commit is contained in:
@ -1 +0,0 @@
|
||||
__version__ = "1.0.0"
|
||||
|
@ -1,11 +1,19 @@
|
||||
import os
|
||||
from datetime import datetime
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
from pathlib import Path
|
||||
from urllib.parse import urljoin
|
||||
|
||||
from discord_embed import settings
|
||||
|
||||
|
||||
def generate_html_for_videos(url: str, width: int, height: int, screenshot: str, filename: str) -> str:
|
||||
def generate_html_for_videos(
|
||||
url: str,
|
||||
width: int,
|
||||
height: int,
|
||||
screenshot: str,
|
||||
filename: str,
|
||||
) -> str:
|
||||
"""Generate HTML for video files.
|
||||
|
||||
Args:
|
||||
@ -18,10 +26,13 @@ def generate_html_for_videos(url: str, width: int, height: int, screenshot: str,
|
||||
Returns:
|
||||
Returns HTML for video.
|
||||
"""
|
||||
time_now: datetime.datetime = datetime.datetime.now(tz=datetime.UTC)
|
||||
time_now_str: str = time_now.strftime("%Y-%m-%d %H:%M:%S %Z")
|
||||
|
||||
video_html: str = f"""
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<!-- Generated at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} -->
|
||||
<!-- Generated at {time_now_str} -->
|
||||
<head>
|
||||
<meta property="og:type" content="video.other">
|
||||
<meta property="twitter:player" content="{url}">
|
||||
@ -39,8 +50,8 @@ def generate_html_for_videos(url: str, width: int, height: int, screenshot: str,
|
||||
# Take the filename and append .html to it.
|
||||
filename += ".html"
|
||||
|
||||
file_path: str = os.path.join(settings.upload_folder, filename)
|
||||
with open(file_path, "w", encoding="utf-8") as f:
|
||||
file_path = Path(settings.upload_folder, filename)
|
||||
with Path.open(file_path, "w", encoding="utf-8") as f:
|
||||
f.write(video_html)
|
||||
|
||||
return html_url
|
||||
|
@ -1,8 +1,10 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from urllib.parse import urljoin
|
||||
|
||||
from fastapi import FastAPI, File, Request, UploadFile
|
||||
from fastapi.responses import HTMLResponse, JSONResponse
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from fastapi.templating import Jinja2Templates
|
||||
|
||||
from discord_embed import settings
|
||||
@ -17,12 +19,11 @@ app: FastAPI = FastAPI(
|
||||
},
|
||||
)
|
||||
|
||||
app.mount("/static", StaticFiles(directory="static"), name="static")
|
||||
templates: Jinja2Templates = Jinja2Templates(directory="templates")
|
||||
|
||||
|
||||
@app.post("/uploadfiles/", description="Where to send a POST request to upload files.")
|
||||
async def upload_file(file: UploadFile = File()):
|
||||
async def upload_file(file: UploadFile = File()): # noqa: B008
|
||||
"""Page for uploading files.
|
||||
|
||||
If it is a video, we need to make an HTML file, and a thumbnail
|
||||
@ -35,15 +36,22 @@ async def upload_file(file: UploadFile = File()):
|
||||
Returns:
|
||||
Returns a dict with the filename, or a link to the .html if it was a video.
|
||||
"""
|
||||
if file.filename is None:
|
||||
send_webhook("Filename is None")
|
||||
return JSONResponse(content={"error": "Filename is None"}, status_code=500)
|
||||
if file.content_type is None:
|
||||
send_webhook("Content type is None")
|
||||
return JSONResponse(content={"error": "Content type is None"}, status_code=500)
|
||||
|
||||
if file.content_type.startswith("video/"):
|
||||
html_url: str = await do_things(file)
|
||||
else:
|
||||
filename: str = await remove_illegal_chars(file.filename)
|
||||
|
||||
with open(f"{settings.upload_folder}/{filename}", "wb+") as f:
|
||||
with Path.open(Path(settings.upload_folder, filename), "wb+") as f:
|
||||
f.write(file.file.read())
|
||||
|
||||
html_url: str = urljoin(settings.serve_domain, filename) # type: ignore
|
||||
html_url: str = urljoin(settings.serve_domain, filename)
|
||||
|
||||
send_webhook(f"{html_url} was uploaded.")
|
||||
return JSONResponse(content={"html_url": html_url})
|
||||
@ -58,8 +66,7 @@ async def remove_illegal_chars(file_name: str) -> str:
|
||||
Returns:
|
||||
Returns a string with the filename without illegal characters.
|
||||
"""
|
||||
|
||||
filename: str = file_name.replace(" ", ".") # type: ignore
|
||||
filename: str = file_name.replace(" ", ".")
|
||||
illegal_characters: list[str] = [
|
||||
"*",
|
||||
'"',
|
||||
@ -84,13 +91,31 @@ async def remove_illegal_chars(file_name: str) -> str:
|
||||
",",
|
||||
]
|
||||
for character in illegal_characters:
|
||||
filename: str = filename.replace(character, "") # type: ignore
|
||||
filename: str = filename.replace(character, "")
|
||||
|
||||
return filename
|
||||
|
||||
|
||||
index_html: str = """
|
||||
<html lang="en">
|
||||
|
||||
<body>
|
||||
<h1>discord-nice-embed</h1>
|
||||
<a href="/docs">Swagger UI - API documentation</a>
|
||||
<br />
|
||||
<a href="/redoc">ReDoc - Alternative API documentation</a>
|
||||
<form action="/uploadfiles/" enctype="multipart/form-data" method="post">
|
||||
<input name="file" type="file" />
|
||||
<input type="submit" value="Upload file" />
|
||||
</form>
|
||||
</body>
|
||||
|
||||
</html>
|
||||
"""
|
||||
|
||||
|
||||
@app.get("/", response_class=HTMLResponse, include_in_schema=False)
|
||||
async def main(request: Request):
|
||||
async def main(request: Request): # noqa: ARG001
|
||||
"""Our index view.
|
||||
|
||||
You can upload files here.
|
||||
@ -99,7 +124,6 @@ async def main(request: Request):
|
||||
request: Our request.
|
||||
|
||||
Returns:
|
||||
TemplateResponse: Returns HTML for site.
|
||||
HTMLResponse: Our index.html page.
|
||||
"""
|
||||
|
||||
return templates.TemplateResponse("index.html", {"request": request})
|
||||
return index_html
|
||||
|
@ -1,37 +1,14 @@
|
||||
import os
|
||||
import pathlib
|
||||
import sys
|
||||
from __future__ import annotations
|
||||
|
||||
from dotenv import load_dotenv
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from dotenv import find_dotenv, load_dotenv
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
load_dotenv(find_dotenv(), verbose=True)
|
||||
|
||||
# Check if user has added a domain to the environment.
|
||||
try:
|
||||
serve_domain: str = os.environ["SERVE_DOMAIN"]
|
||||
except KeyError:
|
||||
sys.exit("discord-embed: Environment variable 'SERVE_DOMAIN' is missing!")
|
||||
|
||||
# Remove trailing slash from domain
|
||||
if serve_domain.endswith("/"):
|
||||
serve_domain = serve_domain[:-1]
|
||||
|
||||
# Check if we have a folder for uploads.
|
||||
try:
|
||||
upload_folder: str = os.environ["UPLOAD_FOLDER"]
|
||||
except KeyError:
|
||||
sys.exit("Environment variable 'UPLOAD_FOLDER' is missing!")
|
||||
|
||||
# Create upload_folder if it doesn't exist.
|
||||
pathlib.Path(upload_folder).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Remove trailing slash from upload_folder
|
||||
if upload_folder.endswith("/"):
|
||||
upload_folder = upload_folder[:-1]
|
||||
|
||||
# Discord webhook URL
|
||||
try:
|
||||
webhook_url: str = os.environ["WEBHOOK_URL"]
|
||||
except KeyError:
|
||||
sys.exit("Environment variable 'WEBHOOK_URL' is missing!")
|
||||
webhook_url: str = os.environ["WEBHOOK_URL"]
|
||||
serve_domain: str = os.environ["SERVE_DOMAIN"].removesuffix("/")
|
||||
upload_folder: str = os.environ["UPLOAD_FOLDER"].removesuffix("/")
|
||||
Path(upload_folder).mkdir(parents=True, exist_ok=True)
|
||||
|
@ -1,3 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
|
||||
@ -5,6 +8,8 @@ import ffmpeg
|
||||
|
||||
from discord_embed import settings
|
||||
|
||||
logger: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Resolution:
|
||||
@ -28,9 +33,12 @@ def video_resolution(path_to_video: str) -> Resolution:
|
||||
Returns height and width.
|
||||
"""
|
||||
probe = ffmpeg.probe(path_to_video)
|
||||
video_stream = next((stream for stream in probe["streams"] if stream["codec_type"] == "video"), None)
|
||||
video_stream = next(
|
||||
(stream for stream in probe["streams"] if stream["codec_type"] == "video"),
|
||||
None,
|
||||
)
|
||||
if video_stream is None:
|
||||
print("No video stream found", file=sys.stderr)
|
||||
logger.critical("No video stream found")
|
||||
sys.exit(1)
|
||||
|
||||
width: int = int(video_stream["width"])
|
||||
|
@ -1,13 +1,16 @@
|
||||
import os
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi import UploadFile
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from discord_embed import settings
|
||||
from discord_embed.generate_html import generate_html_for_videos
|
||||
from discord_embed.video import Resolution, make_thumbnail, video_resolution
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fastapi import UploadFile
|
||||
|
||||
|
||||
@dataclass
|
||||
class VideoFile:
|
||||
@ -32,19 +35,23 @@ def save_to_disk(file: UploadFile) -> VideoFile:
|
||||
Returns:
|
||||
VideoFile object with the filename and location.
|
||||
"""
|
||||
if file.filename is None:
|
||||
msg = "Filename is None"
|
||||
raise ValueError(msg)
|
||||
|
||||
# Create the folder where we should save the files
|
||||
folder_video: str = os.path.join(settings.upload_folder, "video")
|
||||
Path(folder_video).mkdir(parents=True, exist_ok=True)
|
||||
save_folder_video = Path(settings.upload_folder, "video")
|
||||
Path(save_folder_video).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Replace spaces with dots in the filename.
|
||||
filename: str = file.filename.replace(" ", ".")
|
||||
|
||||
# Save the uploaded file to disk.
|
||||
file_location: str = os.path.join(folder_video, filename)
|
||||
with open(file_location, "wb+") as f:
|
||||
file_location = Path(save_folder_video, filename)
|
||||
with Path.open(file_location, "wb+") as f:
|
||||
f.write(file.file.read())
|
||||
|
||||
return VideoFile(filename, file_location)
|
||||
return VideoFile(filename, str(file_location))
|
||||
|
||||
|
||||
async def do_things(file: UploadFile) -> str:
|
||||
@ -56,7 +63,6 @@ async def do_things(file: UploadFile) -> str:
|
||||
Returns:
|
||||
Returns URL for video.
|
||||
"""
|
||||
|
||||
video_file: VideoFile = save_to_disk(file)
|
||||
|
||||
file_url: str = f"{settings.serve_domain}/video/{video_file.filename}"
|
||||
|
@ -1,7 +1,17 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from discord_webhook import DiscordWebhook
|
||||
|
||||
from discord_embed import settings
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from requests import Response
|
||||
|
||||
logger: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def send_webhook(message: str) -> None:
|
||||
"""Send a webhook to Discord.
|
||||
@ -11,7 +21,9 @@ def send_webhook(message: str) -> None:
|
||||
"""
|
||||
webhook: DiscordWebhook = DiscordWebhook(
|
||||
url=settings.webhook_url,
|
||||
content=message,
|
||||
content=message or "discord-nice-embed: No message was provided.",
|
||||
rate_limit_retry=True,
|
||||
)
|
||||
webhook.execute()
|
||||
response: Response = webhook.execute()
|
||||
if not response.ok:
|
||||
logger.critical("Webhook failed to send\n %s\n %s", response, message)
|
||||
|
Reference in New Issue
Block a user