61
discord_embed/generate_html.py
Normal file
61
discord_embed/generate_html.py
Normal file
@ -0,0 +1,61 @@
|
||||
"""Generate the HTML that makes this program useful.
|
||||
|
||||
This is what we will send to other people on Discord.
|
||||
You can remove the .html with your web server so the link will look normal.
|
||||
For example, with nginx, you can do this(note the $uri.html):
|
||||
location / {
|
||||
try_files $uri $uri/ $uri.html;
|
||||
}
|
||||
"""
|
||||
import os
|
||||
from datetime import datetime
|
||||
from urllib.parse import urljoin
|
||||
|
||||
from discord_embed import settings
|
||||
|
||||
|
||||
def generate_html_for_videos(
|
||||
url: str,
|
||||
width: int,
|
||||
height: int,
|
||||
screenshot: str,
|
||||
filename: str,
|
||||
) -> str:
|
||||
"""Generate HTML for video files.
|
||||
|
||||
Args:
|
||||
url: URL for the video. This is accessible from the browser.
|
||||
width: This is the width of the video.
|
||||
height: This is the height of the video.
|
||||
screenshot: URL for screenshot.
|
||||
filename: Original video filename.
|
||||
|
||||
Returns:
|
||||
Returns HTML for video.
|
||||
"""
|
||||
video_html = f"""
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<!-- Generated at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} -->
|
||||
<head>
|
||||
<meta property="og:type" content="video.other">
|
||||
<meta property="twitter:player" content="{url}">
|
||||
<meta property="og:video:type" content="text/html">
|
||||
<meta property="og:video:width" content="{width}">
|
||||
<meta property="og:video:height" content="{height}">
|
||||
<meta name="twitter:image" content="{screenshot}">
|
||||
<meta http-equiv="refresh" content="0;url={url}">
|
||||
</head>
|
||||
</html>
|
||||
"""
|
||||
domain = settings.serve_domain
|
||||
html_url: str = urljoin(domain, filename)
|
||||
|
||||
# Take the filename and append .html to it.
|
||||
filename += ".html"
|
||||
|
||||
file_path = os.path.join(settings.upload_folder, filename)
|
||||
with open(file_path, "w", encoding="utf-8") as f:
|
||||
f.write(video_html)
|
||||
|
||||
return html_url
|
@ -1,28 +1,21 @@
|
||||
import datetime
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
"""Our site has one POST endpoint for uploading videos and one GET
|
||||
endpoint for getting the HTML. Images are served from a web server."""
|
||||
from typing import Dict
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import ffmpeg
|
||||
import requests
|
||||
from discord_webhook import DiscordWebhook
|
||||
from fastapi import FastAPI, File, Request, UploadFile
|
||||
from fastapi.responses import HTMLResponse
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from fastapi.templating import Jinja2Templates
|
||||
|
||||
from discord_embed import settings
|
||||
from discord_embed.video_file_upload import do_things
|
||||
from discord_embed.webhook import send_webhook
|
||||
|
||||
DESCRIPTION = (
|
||||
"Discord will only create embeds for videos and images if they are"
|
||||
" smaller than 8 mb. We can 'abuse' this by creating a .html that"
|
||||
" contains the 'twitter:player' HTML meta tag linking to the video."
|
||||
)
|
||||
app = FastAPI(
|
||||
title="discord-nice-embed",
|
||||
description=DESCRIPTION,
|
||||
version="1.0.0",
|
||||
description=settings.DESCRIPTION,
|
||||
version="0.0.1",
|
||||
contact={
|
||||
"name": "Joakim Hellsén",
|
||||
"url": "https://github.com/TheLovinator1",
|
||||
@ -38,164 +31,47 @@ app.mount("/static", StaticFiles(directory="static"), name="static")
|
||||
templates = Jinja2Templates(directory="templates")
|
||||
|
||||
|
||||
@dataclass
|
||||
class FileModel:
|
||||
filename: str
|
||||
file_location: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class Resolution:
|
||||
height: int
|
||||
width: int
|
||||
|
||||
|
||||
def remove_illegal_characters(filename: str) -> str:
|
||||
filename = filename.replace(" ", ".")
|
||||
illegal_characters = [
|
||||
"*",
|
||||
'"',
|
||||
"<",
|
||||
">",
|
||||
"△",
|
||||
"「",
|
||||
"」",
|
||||
"{",
|
||||
"}",
|
||||
"|",
|
||||
"^",
|
||||
";",
|
||||
"/",
|
||||
"?",
|
||||
":",
|
||||
"@",
|
||||
"&",
|
||||
"=",
|
||||
"+",
|
||||
"$",
|
||||
",",
|
||||
]
|
||||
for character in illegal_characters:
|
||||
filename = filename.replace(character, "")
|
||||
|
||||
return filename
|
||||
|
||||
|
||||
def generate_html_for_videos(url: str, width: int, height: int, screenshot: str, filename: str) -> str:
|
||||
video_html = f"""
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<!-- Generated at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} -->
|
||||
<head>
|
||||
<meta property="og:type" content="video.other">
|
||||
<meta property="twitter:player" content="{url}">
|
||||
<meta property="og:video:type" content="text/html">
|
||||
<meta property="og:video:width" content="{width}">
|
||||
<meta property="og:video:height" content="{height}">
|
||||
<meta name="twitter:image" content="{screenshot}">
|
||||
<meta http-equiv="refresh" content="0;url={url}">
|
||||
</head>
|
||||
</html>
|
||||
"""
|
||||
|
||||
file_path = os.path.join(settings.upload_folder, filename + ".html")
|
||||
with open(file_path, "w", encoding="utf-8") as f:
|
||||
f.write(video_html)
|
||||
|
||||
return urljoin(settings.serve_domain, filename)
|
||||
|
||||
|
||||
def send_webhook(message: str) -> None:
|
||||
webhook = DiscordWebhook(
|
||||
url=settings.webhook_url,
|
||||
content=message,
|
||||
rate_limit_retry=True,
|
||||
)
|
||||
response: requests.Response = webhook.execute()
|
||||
if not response.ok:
|
||||
error_msg = f"Error: {response.text!r} ({response.status_code!r})\nMessage: {message!r}"
|
||||
print(error_msg)
|
||||
send_webhook(error_msg)
|
||||
|
||||
|
||||
def video_resolution(path_to_video: str) -> Resolution | None:
|
||||
probe = ffmpeg.probe(path_to_video)
|
||||
video_stream = next((stream for stream in probe["streams"] if stream["codec_type"] == "video"), None)
|
||||
|
||||
if video_stream is None:
|
||||
return None
|
||||
|
||||
return Resolution(
|
||||
height=int(video_stream["height"]),
|
||||
width=int(video_stream["width"]),
|
||||
)
|
||||
|
||||
|
||||
def make_thumbnail(path_video: str, file_filename: str) -> str | None:
|
||||
thumbnail = os.path.join(settings.upload_folder, file_filename + ".jpg")
|
||||
ffmpeg.input(path_video, ss="1").output(thumbnail, vframes=1).overwrite_output().run()
|
||||
|
||||
if not os.path.isfile(thumbnail):
|
||||
return None
|
||||
|
||||
return urljoin(settings.serve_domain, file_filename + ".jpg")
|
||||
|
||||
|
||||
def save_to_disk(file: UploadFile) -> FileModel:
|
||||
folder_video = os.path.join(settings.upload_folder, "video")
|
||||
Path(folder_video).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
filename = remove_illegal_characters(file.filename)
|
||||
file_location = os.path.join(folder_video, filename)
|
||||
with open(file_location, "wb+") as f:
|
||||
f.write(file.file.read())
|
||||
|
||||
return FileModel(filename, file_location)
|
||||
|
||||
|
||||
async def if_video_file(file: UploadFile) -> HTMLResponse:
|
||||
video_file: FileModel = save_to_disk(file)
|
||||
|
||||
resolution = video_resolution(video_file.file_location)
|
||||
if resolution is None:
|
||||
send_webhook(f"ERROR: Failed to find resolution for {video_file.file_location!r}")
|
||||
return HTMLResponse(status_code=400, content="Failed to find resolution")
|
||||
|
||||
thumbnail_url = make_thumbnail(video_file.file_location, video_file.filename)
|
||||
if thumbnail_url is None:
|
||||
send_webhook(f"ERROR: Failed to make thumbnail for {video_file.file_location!r}")
|
||||
return HTMLResponse(status_code=400, content="Failed to make thumbnail")
|
||||
|
||||
file_url = os.path.join(settings.serve_domain, "video", video_file.filename)
|
||||
html_url = generate_html_for_videos(
|
||||
url=file_url,
|
||||
width=resolution.width,
|
||||
height=resolution.height,
|
||||
screenshot=thumbnail_url,
|
||||
filename=video_file.filename,
|
||||
)
|
||||
|
||||
send_webhook(f"{html_url!r} was uploaded.")
|
||||
|
||||
return {"html_url": html_url}
|
||||
|
||||
|
||||
@app.post("/uploadfiles/")
|
||||
async def upload_file(file: UploadFile = File(...)) -> HTMLResponse:
|
||||
if file.content_type.startswith("video/"):
|
||||
return await if_video_file(file)
|
||||
async def upload_file(file: UploadFile = File(...)) -> Dict[str, str]:
|
||||
"""Page for uploading files.
|
||||
|
||||
filename = remove_illegal_characters(file.filename)
|
||||
file_location = os.path.join(settings.upload_folder, filename)
|
||||
with open(file_location, "wb+") as f:
|
||||
If it is a video, we need to make a HTML file, and a thumbnail
|
||||
otherwise we can just save the file and return the URL for it.
|
||||
If something goes wrong, we will send a message to Discord.
|
||||
|
||||
Args:
|
||||
file: Our uploaded file.
|
||||
|
||||
Returns:
|
||||
Returns a dict with the filename or a link to the .html if it was a video.
|
||||
"""
|
||||
domain_url = ""
|
||||
if file.content_type.startswith("video/"):
|
||||
return await do_things(file)
|
||||
|
||||
# Replace spaces with dots in filename.
|
||||
filename = file.filename.replace(" ", ".")
|
||||
|
||||
# Remove ? from filename.
|
||||
# TODO: Make a list of every illegal character and remove them.
|
||||
filename = filename.replace("?", "")
|
||||
|
||||
with open(f"{settings.upload_folder}/{filename}", "wb+") as f:
|
||||
f.write(file.file.read())
|
||||
|
||||
html_url = urljoin(settings.serve_domain, filename)
|
||||
send_webhook(f"{html_url!r} was uploaded.")
|
||||
|
||||
return {"html_url": html_url}
|
||||
domain_url = urljoin(settings.serve_domain, filename)
|
||||
send_webhook(f"{domain_url} was uploaded.")
|
||||
return {"html_url": domain_url}
|
||||
|
||||
|
||||
@app.get("/", response_class=HTMLResponse)
|
||||
async def main(request: Request):
|
||||
"""Our index view.
|
||||
|
||||
You can upload files here.
|
||||
|
||||
Returns:
|
||||
HTMLResponse: Returns HTML for site.
|
||||
"""
|
||||
|
||||
return templates.TemplateResponse("index.html", {"request": request})
|
||||
|
@ -5,23 +5,38 @@ import sys
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
DESCRIPTION = (
|
||||
"Discord will only create embeds for videos and images if they are "
|
||||
"smaller than 8 mb. We can 'abuse' this by creating a .html that "
|
||||
"contains the 'twitter:player' HTML meta tag linking to the video."
|
||||
)
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
|
||||
# Check if user has added a domain to the environment.
|
||||
try:
|
||||
serve_domain = os.environ["SERVE_DOMAIN"]
|
||||
except KeyError:
|
||||
sys.exit("discord-embed: Environment variable 'SERVE_DOMAIN' is missing!")
|
||||
|
||||
# Remove trailing slash from domain
|
||||
if serve_domain.endswith("/"):
|
||||
serve_domain = serve_domain[:-1]
|
||||
|
||||
# Check if we have a folder for uploads.
|
||||
try:
|
||||
upload_folder = os.environ["UPLOAD_FOLDER"]
|
||||
except KeyError:
|
||||
sys.exit("Environment variable 'UPLOAD_FOLDER' is missing!")
|
||||
|
||||
# Create upload_folder if it doesn't exist.
|
||||
pathlib.Path(upload_folder).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Remove trailing slash from upload_folder
|
||||
if upload_folder.endswith("/"):
|
||||
upload_folder = upload_folder[:-1]
|
||||
|
||||
# Discord webhook URL
|
||||
try:
|
||||
webhook_url = os.environ["WEBHOOK_URL"]
|
||||
except KeyError:
|
||||
|
47
discord_embed/video.py
Normal file
47
discord_embed/video.py
Normal file
@ -0,0 +1,47 @@
|
||||
"""Stuff that has to do with videos."""
|
||||
import sys
|
||||
|
||||
import ffmpeg
|
||||
|
||||
from discord_embed import settings
|
||||
|
||||
|
||||
def video_resolution(path_to_video: str) -> tuple[int, int]:
|
||||
"""Find video resolution.
|
||||
|
||||
Args:
|
||||
path_to_video: Path to video file.
|
||||
|
||||
Returns:
|
||||
Returns height and width.
|
||||
"""
|
||||
probe = ffmpeg.probe(path_to_video)
|
||||
video_stream = next((stream for stream in probe["streams"] if stream["codec_type"] == "video"), None)
|
||||
if video_stream is None:
|
||||
print("No video stream found", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
width = int(video_stream["width"])
|
||||
height = int(video_stream["height"])
|
||||
|
||||
return height, width
|
||||
|
||||
|
||||
def make_thumbnail(path_video: str, file_filename: str) -> str:
|
||||
"""Make thumbnail for Discord. This is a screenshot of the video.
|
||||
|
||||
Args:
|
||||
path_video: Path where video file is stored.
|
||||
file_filename: File name for URL.
|
||||
|
||||
Returns:
|
||||
Returns thumbnail filename.
|
||||
"""
|
||||
(
|
||||
ffmpeg.input(path_video, ss="1")
|
||||
.output(f"{settings.upload_folder}/{file_filename}.jpg", vframes=1)
|
||||
.overwrite_output()
|
||||
.run()
|
||||
)
|
||||
# Return URL for thumbnail.
|
||||
return f"{settings.serve_domain}/{file_filename}.jpg"
|
62
discord_embed/video_file_upload.py
Normal file
62
discord_embed/video_file_upload.py
Normal file
@ -0,0 +1,62 @@
|
||||
"""Things that has to do with video file uploading."""
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Dict
|
||||
|
||||
from fastapi import UploadFile
|
||||
|
||||
from discord_embed import settings
|
||||
from discord_embed.generate_html import generate_html_for_videos
|
||||
from discord_embed.video import make_thumbnail, video_resolution
|
||||
from discord_embed.webhook import send_webhook
|
||||
|
||||
|
||||
def save_to_disk(file: UploadFile) -> tuple[str, str]:
|
||||
"""Save file to disk.
|
||||
|
||||
If spaces in filename, replace with dots.
|
||||
|
||||
Args:
|
||||
file: Our uploaded file.
|
||||
|
||||
Returns:
|
||||
Returns filename and file location.
|
||||
"""
|
||||
# Create folder if it doesn't exist.
|
||||
folder_video = os.path.join(settings.upload_folder, "video")
|
||||
Path(folder_video).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Replace spaces with dots in filename.
|
||||
filename = file.filename.replace(" ", ".")
|
||||
|
||||
# Save file to disk.
|
||||
file_location = os.path.join(folder_video, filename)
|
||||
with open(file_location, "wb+") as f:
|
||||
f.write(file.file.read())
|
||||
|
||||
return filename, file_location
|
||||
|
||||
|
||||
async def do_things(file: UploadFile) -> Dict[str, str]:
|
||||
"""Save video to disk, generate HTML, thumbnail, and return a .html URL.
|
||||
|
||||
Args:
|
||||
file: Our uploaded file.
|
||||
|
||||
Returns:
|
||||
Returns URL for video.
|
||||
"""
|
||||
filename, file_location = save_to_disk(file)
|
||||
|
||||
file_url = f"{settings.serve_domain}/video/{filename}"
|
||||
height, width = video_resolution(file_location)
|
||||
screenshot_url = make_thumbnail(file_location, filename)
|
||||
html_url = generate_html_for_videos(
|
||||
url=file_url,
|
||||
width=width,
|
||||
height=height,
|
||||
screenshot=screenshot_url,
|
||||
filename=filename,
|
||||
)
|
||||
send_webhook(f"{settings.serve_domain}/{filename} was uploaded.")
|
||||
return {"html_url": f"{html_url}"}
|
18
discord_embed/webhook.py
Normal file
18
discord_embed/webhook.py
Normal file
@ -0,0 +1,18 @@
|
||||
"""Send webhook to Discord."""
|
||||
from discord_webhook import DiscordWebhook
|
||||
|
||||
from discord_embed import settings
|
||||
|
||||
|
||||
def send_webhook(message: str) -> None:
|
||||
"""Send webhook to Discord.
|
||||
|
||||
Args:
|
||||
message: The message to send.
|
||||
"""
|
||||
webhook = DiscordWebhook(
|
||||
url=settings.webhook_url,
|
||||
content=message,
|
||||
rate_limit_retry=True,
|
||||
)
|
||||
webhook.execute()
|
@ -1,26 +1,36 @@
|
||||
import imghdr
|
||||
import os
|
||||
|
||||
from discord_embed import settings
|
||||
from discord_embed.main import (
|
||||
app,
|
||||
generate_html_for_videos,
|
||||
make_thumbnail,
|
||||
send_webhook,
|
||||
video_resolution,
|
||||
)
|
||||
from discord_embed import __version__, settings
|
||||
from discord_embed.generate_html import generate_html_for_videos
|
||||
from discord_embed.main import app
|
||||
from discord_embed.video import make_thumbnail, video_resolution
|
||||
from discord_embed.webhook import send_webhook
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
client = TestClient(app)
|
||||
TEST_FILE = "tests/test.mp4"
|
||||
|
||||
|
||||
def test_version():
|
||||
"""Test that version is correct."""
|
||||
assert __version__ == "1.0.0"
|
||||
|
||||
|
||||
def test_domain_ends_with_slash():
|
||||
"""Test that domain ends with slash."""
|
||||
assert not settings.serve_domain.endswith("/")
|
||||
|
||||
|
||||
def test_generate_html_for_videos():
|
||||
"""Test that generate_html_for_videos() works."""
|
||||
# TODO: We should probably import this from settings.py instead of
|
||||
# hardcoding it here. If we change it in settings.py, it won't be
|
||||
# changed here.
|
||||
|
||||
domain = os.environ["SERVE_DOMAIN"]
|
||||
|
||||
# Remove trailing slash from domain
|
||||
if domain.endswith("/"):
|
||||
domain = domain[:-1]
|
||||
|
||||
@ -35,26 +45,47 @@ def test_generate_html_for_videos():
|
||||
|
||||
|
||||
def test_video_resolution():
|
||||
resolution = video_resolution(TEST_FILE)
|
||||
assert resolution.width == 422
|
||||
assert resolution.height == 422
|
||||
"""Test that video_resolution() works."""
|
||||
assert video_resolution(TEST_FILE) == (422, 422)
|
||||
|
||||
|
||||
def test_make_thumbnail():
|
||||
"""Test that make_thumbnail() works."""
|
||||
# TODO: We should probably import this from settings.py instead of
|
||||
# hardcoding it here. If we change it in settings.py, it won't be
|
||||
# changed here.
|
||||
|
||||
domain = os.environ["SERVE_DOMAIN"]
|
||||
|
||||
# Remove trailing slash from domain
|
||||
if domain.endswith("/"):
|
||||
domain = domain[:-1]
|
||||
|
||||
thumbnail = make_thumbnail(TEST_FILE, "test.mp4")
|
||||
# Check that thumbnail is a jpeg.
|
||||
assert imghdr.what(f"{settings.upload_folder}/test.mp4.jpg") == "jpeg"
|
||||
|
||||
# Check that the it returns the correct URL.
|
||||
assert thumbnail == f"{domain}/test.mp4.jpg"
|
||||
|
||||
|
||||
def test_save_to_disk():
|
||||
"""Test that save_to_disk() works."""
|
||||
# TODO: Implement this test. I need to mock the UploadFile object.
|
||||
|
||||
|
||||
def test_do_things():
|
||||
"""Test that do_things() works."""
|
||||
# TODO: Implement this test. I need to mock the UploadFile object.
|
||||
|
||||
|
||||
def test_send_webhook():
|
||||
"""Test that send_webhook() works."""
|
||||
send_webhook("Running Pytest")
|
||||
|
||||
|
||||
def test_main():
|
||||
"""Test that main() works."""
|
||||
data_without_trailing_nl = ""
|
||||
response = client.get("/")
|
||||
|
||||
@ -71,7 +102,14 @@ def test_main():
|
||||
|
||||
|
||||
def test_upload_file():
|
||||
"""Test if we can upload files."""
|
||||
# TODO: We should probably import this from settings.py instead of
|
||||
# hardcoding it here. If we change it in settings.py, it won't be
|
||||
# changed here.
|
||||
|
||||
domain = os.environ["SERVE_DOMAIN"]
|
||||
|
||||
# Remove trailing slash from domain
|
||||
if domain.endswith("/"):
|
||||
domain = domain[:-1]
|
||||
|
||||
|
Reference in New Issue
Block a user