diff --git a/discord_embed/generate_html.py b/discord_embed/generate_html.py new file mode 100644 index 0000000..aa9901e --- /dev/null +++ b/discord_embed/generate_html.py @@ -0,0 +1,61 @@ +"""Generate the HTML that makes this program useful. + +This is what we will send to other people on Discord. +You can remove the .html with your web server so the link will look normal. +For example, with nginx, you can do this(note the $uri.html): +location / { + try_files $uri $uri/ $uri.html; +} +""" +import os +from datetime import datetime +from urllib.parse import urljoin + +from discord_embed import settings + + +def generate_html_for_videos( + url: str, + width: int, + height: int, + screenshot: str, + filename: str, +) -> str: + """Generate HTML for video files. + + Args: + url: URL for the video. This is accessible from the browser. + width: This is the width of the video. + height: This is the height of the video. + screenshot: URL for screenshot. + filename: Original video filename. + + Returns: + Returns HTML for video. + """ + video_html = f""" + <!DOCTYPE html> + <html> + <!-- Generated at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} --> + <head> + <meta property="og:type" content="video.other"> + <meta property="twitter:player" content="{url}"> + <meta property="og:video:type" content="text/html"> + <meta property="og:video:width" content="{width}"> + <meta property="og:video:height" content="{height}"> + <meta name="twitter:image" content="{screenshot}"> + <meta http-equiv="refresh" content="0;url={url}"> + </head> + </html> + """ + domain = settings.serve_domain + html_url: str = urljoin(domain, filename) + + # Take the filename and append .html to it. + filename += ".html" + + file_path = os.path.join(settings.upload_folder, filename) + with open(file_path, "w", encoding="utf-8") as f: + f.write(video_html) + + return html_url diff --git a/discord_embed/main.py b/discord_embed/main.py index 4564bad..3464176 100644 --- a/discord_embed/main.py +++ b/discord_embed/main.py @@ -1,28 +1,21 @@ -import datetime -import os -from dataclasses import dataclass -from pathlib import Path +"""Our site has one POST endpoint for uploading videos and one GET +endpoint for getting the HTML. Images are served from a web server.""" +from typing import Dict from urllib.parse import urljoin -import ffmpeg -import requests -from discord_webhook import DiscordWebhook from fastapi import FastAPI, File, Request, UploadFile from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from discord_embed import settings +from discord_embed.video_file_upload import do_things +from discord_embed.webhook import send_webhook -DESCRIPTION = ( - "Discord will only create embeds for videos and images if they are" - " smaller than 8 mb. We can 'abuse' this by creating a .html that" - " contains the 'twitter:player' HTML meta tag linking to the video." -) app = FastAPI( title="discord-nice-embed", - description=DESCRIPTION, - version="1.0.0", + description=settings.DESCRIPTION, + version="0.0.1", contact={ "name": "Joakim Hellsén", "url": "https://github.com/TheLovinator1", @@ -38,164 +31,47 @@ app.mount("/static", StaticFiles(directory="static"), name="static") templates = Jinja2Templates(directory="templates") -@dataclass -class FileModel: - filename: str - file_location: str - - -@dataclass -class Resolution: - height: int - width: int - - -def remove_illegal_characters(filename: str) -> str: - filename = filename.replace(" ", ".") - illegal_characters = [ - "*", - '"', - "<", - ">", - "△", - "「", - "」", - "{", - "}", - "|", - "^", - ";", - "/", - "?", - ":", - "@", - "&", - "=", - "+", - "$", - ",", - ] - for character in illegal_characters: - filename = filename.replace(character, "") - - return filename - - -def generate_html_for_videos(url: str, width: int, height: int, screenshot: str, filename: str) -> str: - video_html = f""" - <!DOCTYPE html> - <html> - <!-- Generated at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} --> - <head> - <meta property="og:type" content="video.other"> - <meta property="twitter:player" content="{url}"> - <meta property="og:video:type" content="text/html"> - <meta property="og:video:width" content="{width}"> - <meta property="og:video:height" content="{height}"> - <meta name="twitter:image" content="{screenshot}"> - <meta http-equiv="refresh" content="0;url={url}"> - </head> - </html> - """ - - file_path = os.path.join(settings.upload_folder, filename + ".html") - with open(file_path, "w", encoding="utf-8") as f: - f.write(video_html) - - return urljoin(settings.serve_domain, filename) - - -def send_webhook(message: str) -> None: - webhook = DiscordWebhook( - url=settings.webhook_url, - content=message, - rate_limit_retry=True, - ) - response: requests.Response = webhook.execute() - if not response.ok: - error_msg = f"Error: {response.text!r} ({response.status_code!r})\nMessage: {message!r}" - print(error_msg) - send_webhook(error_msg) - - -def video_resolution(path_to_video: str) -> Resolution | None: - probe = ffmpeg.probe(path_to_video) - video_stream = next((stream for stream in probe["streams"] if stream["codec_type"] == "video"), None) - - if video_stream is None: - return None - - return Resolution( - height=int(video_stream["height"]), - width=int(video_stream["width"]), - ) - - -def make_thumbnail(path_video: str, file_filename: str) -> str | None: - thumbnail = os.path.join(settings.upload_folder, file_filename + ".jpg") - ffmpeg.input(path_video, ss="1").output(thumbnail, vframes=1).overwrite_output().run() - - if not os.path.isfile(thumbnail): - return None - - return urljoin(settings.serve_domain, file_filename + ".jpg") - - -def save_to_disk(file: UploadFile) -> FileModel: - folder_video = os.path.join(settings.upload_folder, "video") - Path(folder_video).mkdir(parents=True, exist_ok=True) - - filename = remove_illegal_characters(file.filename) - file_location = os.path.join(folder_video, filename) - with open(file_location, "wb+") as f: - f.write(file.file.read()) - - return FileModel(filename, file_location) - - -async def if_video_file(file: UploadFile) -> HTMLResponse: - video_file: FileModel = save_to_disk(file) - - resolution = video_resolution(video_file.file_location) - if resolution is None: - send_webhook(f"ERROR: Failed to find resolution for {video_file.file_location!r}") - return HTMLResponse(status_code=400, content="Failed to find resolution") - - thumbnail_url = make_thumbnail(video_file.file_location, video_file.filename) - if thumbnail_url is None: - send_webhook(f"ERROR: Failed to make thumbnail for {video_file.file_location!r}") - return HTMLResponse(status_code=400, content="Failed to make thumbnail") - - file_url = os.path.join(settings.serve_domain, "video", video_file.filename) - html_url = generate_html_for_videos( - url=file_url, - width=resolution.width, - height=resolution.height, - screenshot=thumbnail_url, - filename=video_file.filename, - ) - - send_webhook(f"{html_url!r} was uploaded.") - - return {"html_url": html_url} - - @app.post("/uploadfiles/") -async def upload_file(file: UploadFile = File(...)) -> HTMLResponse: - if file.content_type.startswith("video/"): - return await if_video_file(file) +async def upload_file(file: UploadFile = File(...)) -> Dict[str, str]: + """Page for uploading files. - filename = remove_illegal_characters(file.filename) - file_location = os.path.join(settings.upload_folder, filename) - with open(file_location, "wb+") as f: + If it is a video, we need to make a HTML file, and a thumbnail + otherwise we can just save the file and return the URL for it. + If something goes wrong, we will send a message to Discord. + + Args: + file: Our uploaded file. + + Returns: + Returns a dict with the filename or a link to the .html if it was a video. + """ + domain_url = "" + if file.content_type.startswith("video/"): + return await do_things(file) + + # Replace spaces with dots in filename. + filename = file.filename.replace(" ", ".") + + # Remove ? from filename. + # TODO: Make a list of every illegal character and remove them. + filename = filename.replace("?", "") + + with open(f"{settings.upload_folder}/{filename}", "wb+") as f: f.write(file.file.read()) - html_url = urljoin(settings.serve_domain, filename) - send_webhook(f"{html_url!r} was uploaded.") - - return {"html_url": html_url} + domain_url = urljoin(settings.serve_domain, filename) + send_webhook(f"{domain_url} was uploaded.") + return {"html_url": domain_url} @app.get("/", response_class=HTMLResponse) async def main(request: Request): + """Our index view. + + You can upload files here. + + Returns: + HTMLResponse: Returns HTML for site. + """ + return templates.TemplateResponse("index.html", {"request": request}) diff --git a/discord_embed/settings.py b/discord_embed/settings.py index 0ab5838..e8f2611 100644 --- a/discord_embed/settings.py +++ b/discord_embed/settings.py @@ -5,23 +5,38 @@ import sys from dotenv import load_dotenv +DESCRIPTION = ( + "Discord will only create embeds for videos and images if they are " + "smaller than 8 mb. We can 'abuse' this by creating a .html that " + "contains the 'twitter:player' HTML meta tag linking to the video." +) +# Load environment variables load_dotenv() +# Check if user has added a domain to the environment. try: serve_domain = os.environ["SERVE_DOMAIN"] except KeyError: sys.exit("discord-embed: Environment variable 'SERVE_DOMAIN' is missing!") + +# Remove trailing slash from domain if serve_domain.endswith("/"): serve_domain = serve_domain[:-1] +# Check if we have a folder for uploads. try: upload_folder = os.environ["UPLOAD_FOLDER"] except KeyError: sys.exit("Environment variable 'UPLOAD_FOLDER' is missing!") + +# Create upload_folder if it doesn't exist. pathlib.Path(upload_folder).mkdir(parents=True, exist_ok=True) + +# Remove trailing slash from upload_folder if upload_folder.endswith("/"): upload_folder = upload_folder[:-1] +# Discord webhook URL try: webhook_url = os.environ["WEBHOOK_URL"] except KeyError: diff --git a/discord_embed/video.py b/discord_embed/video.py new file mode 100644 index 0000000..41c0c31 --- /dev/null +++ b/discord_embed/video.py @@ -0,0 +1,47 @@ +"""Stuff that has to do with videos.""" +import sys + +import ffmpeg + +from discord_embed import settings + + +def video_resolution(path_to_video: str) -> tuple[int, int]: + """Find video resolution. + + Args: + path_to_video: Path to video file. + + Returns: + Returns height and width. + """ + probe = ffmpeg.probe(path_to_video) + video_stream = next((stream for stream in probe["streams"] if stream["codec_type"] == "video"), None) + if video_stream is None: + print("No video stream found", file=sys.stderr) + sys.exit(1) + + width = int(video_stream["width"]) + height = int(video_stream["height"]) + + return height, width + + +def make_thumbnail(path_video: str, file_filename: str) -> str: + """Make thumbnail for Discord. This is a screenshot of the video. + + Args: + path_video: Path where video file is stored. + file_filename: File name for URL. + + Returns: + Returns thumbnail filename. + """ + ( + ffmpeg.input(path_video, ss="1") + .output(f"{settings.upload_folder}/{file_filename}.jpg", vframes=1) + .overwrite_output() + .run() + ) + # Return URL for thumbnail. + return f"{settings.serve_domain}/{file_filename}.jpg" diff --git a/discord_embed/video_file_upload.py b/discord_embed/video_file_upload.py new file mode 100644 index 0000000..fa89c27 --- /dev/null +++ b/discord_embed/video_file_upload.py @@ -0,0 +1,62 @@ +"""Things that has to do with video file uploading.""" +import os +from pathlib import Path +from typing import Dict + +from fastapi import UploadFile + +from discord_embed import settings +from discord_embed.generate_html import generate_html_for_videos +from discord_embed.video import make_thumbnail, video_resolution +from discord_embed.webhook import send_webhook + + +def save_to_disk(file: UploadFile) -> tuple[str, str]: + """Save file to disk. + + If spaces in filename, replace with dots. + + Args: + file: Our uploaded file. + + Returns: + Returns filename and file location. + """ + # Create folder if it doesn't exist. + folder_video = os.path.join(settings.upload_folder, "video") + Path(folder_video).mkdir(parents=True, exist_ok=True) + + # Replace spaces with dots in filename. + filename = file.filename.replace(" ", ".") + + # Save file to disk. + file_location = os.path.join(folder_video, filename) + with open(file_location, "wb+") as f: + f.write(file.file.read()) + + return filename, file_location + + +async def do_things(file: UploadFile) -> Dict[str, str]: + """Save video to disk, generate HTML, thumbnail, and return a .html URL. + + Args: + file: Our uploaded file. + + Returns: + Returns URL for video. + """ + filename, file_location = save_to_disk(file) + + file_url = f"{settings.serve_domain}/video/{filename}" + height, width = video_resolution(file_location) + screenshot_url = make_thumbnail(file_location, filename) + html_url = generate_html_for_videos( + url=file_url, + width=width, + height=height, + screenshot=screenshot_url, + filename=filename, + ) + send_webhook(f"{settings.serve_domain}/{filename} was uploaded.") + return {"html_url": f"{html_url}"} diff --git a/discord_embed/webhook.py b/discord_embed/webhook.py new file mode 100644 index 0000000..4bc4930 --- /dev/null +++ b/discord_embed/webhook.py @@ -0,0 +1,18 @@ +"""Send webhook to Discord.""" +from discord_webhook import DiscordWebhook + +from discord_embed import settings + + +def send_webhook(message: str) -> None: + """Send webhook to Discord. + + Args: + message: The message to send. + """ + webhook = DiscordWebhook( + url=settings.webhook_url, + content=message, + rate_limit_retry=True, + ) + webhook.execute() diff --git a/tests/test_discord_embed.py b/tests/test_discord_embed.py index 5274f28..b71bcfc 100644 --- a/tests/test_discord_embed.py +++ b/tests/test_discord_embed.py @@ -1,26 +1,36 @@ import imghdr import os -from discord_embed import settings -from discord_embed.main import ( - app, - generate_html_for_videos, - make_thumbnail, - send_webhook, - video_resolution, -) +from discord_embed import __version__, settings +from discord_embed.generate_html import generate_html_for_videos +from discord_embed.main import app +from discord_embed.video import make_thumbnail, video_resolution +from discord_embed.webhook import send_webhook from fastapi.testclient import TestClient client = TestClient(app) TEST_FILE = "tests/test.mp4" +def test_version(): + """Test that version is correct.""" + assert __version__ == "1.0.0" + + def test_domain_ends_with_slash(): + """Test that domain ends with slash.""" assert not settings.serve_domain.endswith("/") def test_generate_html_for_videos(): + """Test that generate_html_for_videos() works.""" + # TODO: We should probably import this from settings.py instead of + # hardcoding it here. If we change it in settings.py, it won't be + # changed here. + domain = os.environ["SERVE_DOMAIN"] + + # Remove trailing slash from domain if domain.endswith("/"): domain = domain[:-1] @@ -35,26 +45,47 @@ def test_generate_html_for_videos(): def test_video_resolution(): - resolution = video_resolution(TEST_FILE) - assert resolution.width == 422 - assert resolution.height == 422 + """Test that video_resolution() works.""" + assert video_resolution(TEST_FILE) == (422, 422) def test_make_thumbnail(): + """Test that make_thumbnail() works.""" + # TODO: We should probably import this from settings.py instead of + # hardcoding it here. If we change it in settings.py, it won't be + # changed here. + domain = os.environ["SERVE_DOMAIN"] + + # Remove trailing slash from domain if domain.endswith("/"): domain = domain[:-1] thumbnail = make_thumbnail(TEST_FILE, "test.mp4") + # Check that thumbnail is a jpeg. assert imghdr.what(f"{settings.upload_folder}/test.mp4.jpg") == "jpeg" + + # Check that the it returns the correct URL. assert thumbnail == f"{domain}/test.mp4.jpg" +def test_save_to_disk(): + """Test that save_to_disk() works.""" + # TODO: Implement this test. I need to mock the UploadFile object. + + +def test_do_things(): + """Test that do_things() works.""" + # TODO: Implement this test. I need to mock the UploadFile object. + + def test_send_webhook(): + """Test that send_webhook() works.""" send_webhook("Running Pytest") def test_main(): + """Test that main() works.""" data_without_trailing_nl = "" response = client.get("/") @@ -71,7 +102,14 @@ def test_main(): def test_upload_file(): + """Test if we can upload files.""" + # TODO: We should probably import this from settings.py instead of + # hardcoding it here. If we change it in settings.py, it won't be + # changed here. + domain = os.environ["SERVE_DOMAIN"] + + # Remove trailing slash from domain if domain.endswith("/"): domain = domain[:-1]