Move everything to own file
This commit is contained in:
59
discord_embed/generate_html.py
Normal file
59
discord_embed/generate_html.py
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
"""Generate the HTML that makes this program useful.
|
||||||
|
|
||||||
|
This is what we will send to other people on Discord.
|
||||||
|
You can remove the .html with your web server so the link will look normal.
|
||||||
|
For example, with nginx, you can do this(note the $uri.html):
|
||||||
|
location / {
|
||||||
|
try_files $uri $uri/ $uri.html;
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
import os
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from discord_embed import settings
|
||||||
|
|
||||||
|
|
||||||
|
def generate_html_for_videos(
|
||||||
|
url: str,
|
||||||
|
width: int,
|
||||||
|
height: int,
|
||||||
|
screenshot: str,
|
||||||
|
filename: str,
|
||||||
|
) -> str:
|
||||||
|
"""Generate HTML for video files.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
url (str): URL for the video. This is accessible from the browser.
|
||||||
|
width (int): This is the width of the video.
|
||||||
|
height (int): This is the height of the video.
|
||||||
|
screenshot (str): URL for screenshot.
|
||||||
|
filename (str): Original video filename.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: Returns HTML for video.
|
||||||
|
"""
|
||||||
|
video_html = f"""
|
||||||
|
<!DOCTYPE html>
|
||||||
|
<html>
|
||||||
|
<!-- Generated at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} -->
|
||||||
|
<head>
|
||||||
|
<meta property="og:type" content="video.other">
|
||||||
|
<meta property="twitter:player" content="{url}">
|
||||||
|
<meta property="og:video:type" content="text/html">
|
||||||
|
<meta property="og:video:width" content="{width}">
|
||||||
|
<meta property="og:video:height" content="{height}">
|
||||||
|
<meta name="twitter:image" content="{screenshot}">
|
||||||
|
<meta http-equiv="refresh" content="0;url={url}">
|
||||||
|
</head>
|
||||||
|
</html>
|
||||||
|
"""
|
||||||
|
html_url = os.path.join(settings.domain, filename)
|
||||||
|
|
||||||
|
# Take the filename and append .html to it.
|
||||||
|
filename += ".html"
|
||||||
|
|
||||||
|
file_path = os.path.join(settings.upload_folder, filename)
|
||||||
|
with open(file_path, "w", encoding="utf-8") as f:
|
||||||
|
f.write(video_html)
|
||||||
|
|
||||||
|
return html_url
|
@ -1,19 +1,17 @@
|
|||||||
import os
|
"""Our site has one POST endpoint for uploading videos and one GET
|
||||||
import sys
|
endpoint for getting the HTML. Images are served from a webserver."""
|
||||||
from datetime import datetime
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import Dict
|
from typing import Dict
|
||||||
|
|
||||||
import ffmpeg
|
|
||||||
from discord_webhook import DiscordWebhook
|
|
||||||
from fastapi import FastAPI, File, UploadFile
|
from fastapi import FastAPI, File, UploadFile
|
||||||
from fastapi.responses import HTMLResponse
|
from fastapi.responses import HTMLResponse
|
||||||
|
|
||||||
from discord_embed.settings import Settings
|
from discord_embed import settings
|
||||||
|
from discord_embed.video_file_upload import do_things
|
||||||
|
from discord_embed.webhook import send_webhook
|
||||||
|
|
||||||
app = FastAPI(
|
app = FastAPI(
|
||||||
title="discord-nice-embed",
|
title="discord-nice-embed",
|
||||||
description=Settings.description,
|
description=settings.DESCRIPTION,
|
||||||
version="0.0.1",
|
version="0.0.1",
|
||||||
contact={
|
contact={
|
||||||
"name": "Joakim Hellsén",
|
"name": "Joakim Hellsén",
|
||||||
@ -27,55 +25,6 @@ app = FastAPI(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def send_webhook(message: str) -> None:
|
|
||||||
"""Send webhook to Discord.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
message (str): The message to send.
|
|
||||||
"""
|
|
||||||
webhook = DiscordWebhook(
|
|
||||||
url=Settings.webhook_url,
|
|
||||||
content=message,
|
|
||||||
rate_limit_retry=True,
|
|
||||||
)
|
|
||||||
await webhook.execute()
|
|
||||||
|
|
||||||
|
|
||||||
async def video_file_uploaded(file: UploadFile) -> Dict[str, str]:
|
|
||||||
"""Save video to disk, generate HTML, thumbnail, and return a .html URL.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
file (UploadFile): Our file object.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Dict[str, str]: Returns URL for video.
|
|
||||||
"""
|
|
||||||
# Create folder if it doesn't exist.
|
|
||||||
folder_video = os.path.join(Settings.upload_folder, "video")
|
|
||||||
Path(folder_video).mkdir(parents=True, exist_ok=True)
|
|
||||||
|
|
||||||
# Replace spaces with dots in filename.
|
|
||||||
filename = file.filename.replace(" ", ".")
|
|
||||||
|
|
||||||
# Save file to disk.
|
|
||||||
file_location = os.path.join(folder_video, filename)
|
|
||||||
with open(file_location, "wb+") as file:
|
|
||||||
await file.write(file.file.read())
|
|
||||||
|
|
||||||
file_url = f"{Settings.domain}/video/{filename}"
|
|
||||||
height, width = find_video_resolution(file_location)
|
|
||||||
screenshot_url = make_thumbnail_from_video(file_location, filename)
|
|
||||||
html_url = generate_html_for_videos(
|
|
||||||
url=file_url,
|
|
||||||
width=width,
|
|
||||||
height=height,
|
|
||||||
screenshot=screenshot_url,
|
|
||||||
filename=filename,
|
|
||||||
)
|
|
||||||
await send_webhook(f"{Settings.domain}/{filename} was uploaded.")
|
|
||||||
return {"html_url": f"{html_url}"}
|
|
||||||
|
|
||||||
|
|
||||||
@app.post("/uploadfiles/")
|
@app.post("/uploadfiles/")
|
||||||
async def upload_file(file: UploadFile = File(...)) -> Dict[str, str]:
|
async def upload_file(file: UploadFile = File(...)) -> Dict[str, str]:
|
||||||
"""Page for uploading files.
|
"""Page for uploading files.
|
||||||
@ -91,19 +40,24 @@ async def upload_file(file: UploadFile = File(...)) -> Dict[str, str]:
|
|||||||
Dict[str, str]: Returns a dict with the filename or a link to
|
Dict[str, str]: Returns a dict with the filename or a link to
|
||||||
the .html if it was a video.
|
the .html if it was a video.
|
||||||
"""
|
"""
|
||||||
|
domain_url = ""
|
||||||
try:
|
try:
|
||||||
if file.content_type.startswith("video/"):
|
if file.content_type.startswith("video/"):
|
||||||
return video_file_uploaded(file)
|
return await do_things(file)
|
||||||
|
|
||||||
with open(f"{Settings.upload_folder}/{file.filename}", "wb+") as file:
|
# Replace spaces with dots in filename.
|
||||||
await file.write(file.file.read())
|
filename = file.filename.replace(" ", ".")
|
||||||
domain_url = f"{Settings.domain}/{file.filename}"
|
|
||||||
await send_webhook(f"{domain_url} was uploaded.")
|
with open(f"{settings.upload_folder}/{filename}", "wb+") as f:
|
||||||
|
f.write(file.file.read())
|
||||||
|
|
||||||
|
domain_url = f"{settings.domain}/{filename}"
|
||||||
|
send_webhook(f"{domain_url} was uploaded.")
|
||||||
return {"html_url": domain_url}
|
return {"html_url": domain_url}
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as exception:
|
||||||
await send_webhook(f"Something went wrong for {domain_url}:\n{e}")
|
send_webhook(f"{domain_url}:\n{exception}")
|
||||||
return {"error": f"Something went wrong: {e}"}
|
return {"error": f"Something went wrong: {exception}"}
|
||||||
|
|
||||||
|
|
||||||
@app.get("/", response_class=HTMLResponse)
|
@app.get("/", response_class=HTMLResponse)
|
||||||
@ -132,98 +86,3 @@ async def main():
|
|||||||
</body>
|
</body>
|
||||||
</html>
|
</html>
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
def generate_html_for_videos(
|
|
||||||
url: str,
|
|
||||||
width: int,
|
|
||||||
height: int,
|
|
||||||
screenshot: str,
|
|
||||||
filename: str,
|
|
||||||
) -> str:
|
|
||||||
"""Generate HTML for video files.
|
|
||||||
|
|
||||||
This is what we will send to other people on Discord.
|
|
||||||
You can remove the .html with your web server so the link will look normal.
|
|
||||||
For example, with nginx, you can do this(note the $uri.html):
|
|
||||||
location / {
|
|
||||||
try_files $uri $uri/ $uri.html;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
Args:
|
|
||||||
url (str): URL for the video. This is accessible from the browser.
|
|
||||||
width (int): This is the width of the video.
|
|
||||||
height (int): This is the height of the video.
|
|
||||||
screenshot (str): URL for screenshot.
|
|
||||||
filename (str): Original video filename.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: [description]
|
|
||||||
"""
|
|
||||||
video_html = f"""
|
|
||||||
<!DOCTYPE html>
|
|
||||||
<html>
|
|
||||||
<!-- Generated at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} -->
|
|
||||||
<head>
|
|
||||||
<meta property="og:type" content="video.other">
|
|
||||||
<meta property="twitter:player" content="{url}">
|
|
||||||
<meta property="og:video:type" content="text/html">
|
|
||||||
<meta property="og:video:width" content="{width}">
|
|
||||||
<meta property="og:video:height" content="{height}">
|
|
||||||
<meta name="twitter:image" content="{screenshot}">
|
|
||||||
<meta http-equiv="refresh" content="0;url={url}">
|
|
||||||
</head>
|
|
||||||
</html>
|
|
||||||
"""
|
|
||||||
html_url = os.path.join(Settings.domain, filename)
|
|
||||||
|
|
||||||
# Take the filename and append .html to it.
|
|
||||||
filename += ".html"
|
|
||||||
|
|
||||||
file_path = os.path.join(Settings.upload_folder, filename)
|
|
||||||
with open(file_path, "w", encoding="utf-8") as file:
|
|
||||||
file.write(video_html)
|
|
||||||
|
|
||||||
return html_url
|
|
||||||
|
|
||||||
|
|
||||||
def find_video_resolution(path_to_video: str) -> tuple[int, int]:
|
|
||||||
"""Find video resolution.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
path_to_video (str): Path to video file.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
tuple[int, int]: Returns height and width.
|
|
||||||
"""
|
|
||||||
probe = ffmpeg.probe(path_to_video)
|
|
||||||
video_stream = next((stream for stream in probe["streams"] if stream["codec_type"] == "video"), None)
|
|
||||||
if video_stream is None:
|
|
||||||
print("No video stream found", file=sys.stderr)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
width = int(video_stream["width"])
|
|
||||||
height = int(video_stream["height"])
|
|
||||||
|
|
||||||
return height, width
|
|
||||||
|
|
||||||
|
|
||||||
def make_thumbnail_from_video(path_video: str, file_filename: str) -> str:
|
|
||||||
"""Make thumbnail for Discord. This is a screenshot of the video.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
path_video (str): Path where video file is stored.
|
|
||||||
file_filename (str): File name for URL.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: Returns thumbnail filename.
|
|
||||||
"""
|
|
||||||
(
|
|
||||||
ffmpeg.input(path_video, ss="1")
|
|
||||||
.output(f"{Settings.upload_folder}/{file_filename}.jpg", vframes=1)
|
|
||||||
.overwrite_output()
|
|
||||||
.run()
|
|
||||||
)
|
|
||||||
# Return URL for thumbnail.
|
|
||||||
return f"{Settings.domain}/{file_filename}.jpg"
|
|
||||||
|
@ -1,44 +1,43 @@
|
|||||||
|
"""Read settings from environment variables."""
|
||||||
import os
|
import os
|
||||||
import pathlib
|
import pathlib
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
DESCRIPTION = (
|
||||||
class Settings:
|
|
||||||
description = (
|
|
||||||
"Discord will only create embeds for videos and images if they are "
|
"Discord will only create embeds for videos and images if they are "
|
||||||
"smaller than 8 mb. We can 'abuse' this by creating a .html that "
|
"smaller than 8 mb. We can 'abuse' this by creating a .html that "
|
||||||
"contains the 'twitter:player' HTML meta tag linking to the video."
|
"contains the 'twitter:player' HTML meta tag linking to the video."
|
||||||
)
|
)
|
||||||
# Load environment variables
|
# Load environment variables
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
|
||||||
# Check if user has added a domain to the environment.
|
# Check if user has added a domain to the environment.
|
||||||
try:
|
try:
|
||||||
domain = os.environ["DOMAIN"]
|
domain = os.environ["DOMAIN"]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
sys.exit("discord-embed: Environment variable 'DOMAIN' is missing!")
|
sys.exit("discord-embed: Environment variable 'DOMAIN' is missing!")
|
||||||
|
|
||||||
# Remove trailing slash from domain
|
# Remove trailing slash from domain
|
||||||
if domain.endswith("/"):
|
if domain.endswith("/"):
|
||||||
domain = domain[:-1]
|
domain = domain[:-1]
|
||||||
|
|
||||||
# Check if we have a folder for uploads.
|
# Check if we have a folder for uploads.
|
||||||
try:
|
try:
|
||||||
upload_folder = os.environ["UPLOAD_FOLDER"]
|
upload_folder = os.environ["UPLOAD_FOLDER"]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
sys.exit("Environment variable 'UPLOAD_FOLDER' is missing!")
|
sys.exit("Environment variable 'UPLOAD_FOLDER' is missing!")
|
||||||
|
|
||||||
# Create upload_folder if it doesn't exist.
|
# Create upload_folder if it doesn't exist.
|
||||||
pathlib.Path(upload_folder).mkdir(parents=True, exist_ok=True)
|
pathlib.Path(upload_folder).mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
# Remove trailing slash from upload_folder
|
# Remove trailing slash from upload_folder
|
||||||
if upload_folder.endswith("/"):
|
if upload_folder.endswith("/"):
|
||||||
upload_folder = upload_folder[:-1]
|
upload_folder = upload_folder[:-1]
|
||||||
|
|
||||||
# Discord webhook URL
|
# Discord webhook URL
|
||||||
try:
|
try:
|
||||||
webhook_url = os.environ["WEBHOOK_URL"]
|
webhook_url = os.environ["WEBHOOK_URL"]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
sys.exit("Environment variable 'WEBHOOK_URL' is missing!")
|
sys.exit("Environment variable 'WEBHOOK_URL' is missing!")
|
||||||
|
47
discord_embed/video.py
Normal file
47
discord_embed/video.py
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
"""Stuff that has to do with videos."""
|
||||||
|
import sys
|
||||||
|
|
||||||
|
import ffmpeg
|
||||||
|
|
||||||
|
from discord_embed import settings
|
||||||
|
|
||||||
|
|
||||||
|
def video_resolution(path_to_video: str) -> tuple[int, int]:
|
||||||
|
"""Find video resolution.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
path_to_video (str): Path to video file.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
tuple[int, int]: Returns height and width.
|
||||||
|
"""
|
||||||
|
probe = ffmpeg.probe(path_to_video)
|
||||||
|
video_stream = next((stream for stream in probe["streams"] if stream["codec_type"] == "video"), None)
|
||||||
|
if video_stream is None:
|
||||||
|
print("No video stream found", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
width = int(video_stream["width"])
|
||||||
|
height = int(video_stream["height"])
|
||||||
|
|
||||||
|
return height, width
|
||||||
|
|
||||||
|
|
||||||
|
def make_thumbnail(path_video: str, file_filename: str) -> str:
|
||||||
|
"""Make thumbnail for Discord. This is a screenshot of the video.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
path_video (str): Path where video file is stored.
|
||||||
|
file_filename (str): File name for URL.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: Returns thumbnail filename.
|
||||||
|
"""
|
||||||
|
(
|
||||||
|
ffmpeg.input(path_video, ss="1")
|
||||||
|
.output(f"{settings.upload_folder}/{file_filename}.jpg", vframes=1)
|
||||||
|
.overwrite_output()
|
||||||
|
.run()
|
||||||
|
)
|
||||||
|
# Return URL for thumbnail.
|
||||||
|
return f"{settings.domain}/{file_filename}.jpg"
|
62
discord_embed/video_file_upload.py
Normal file
62
discord_embed/video_file_upload.py
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
"""Things that has to do with video file uploading."""
|
||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
|
from fastapi import UploadFile
|
||||||
|
|
||||||
|
from discord_embed import settings
|
||||||
|
from discord_embed.generate_html import generate_html_for_videos
|
||||||
|
from discord_embed.video import make_thumbnail, video_resolution
|
||||||
|
from discord_embed.webhook import send_webhook
|
||||||
|
|
||||||
|
|
||||||
|
def save_to_disk(file: UploadFile) -> tuple[str, str]:
|
||||||
|
"""Save file to disk.
|
||||||
|
|
||||||
|
If spaces in filename, replace with dots.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file (UploadFile): Our file object.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
tuple[str, str]: Returns filename and file location.
|
||||||
|
"""
|
||||||
|
# Create folder if it doesn't exist.
|
||||||
|
folder_video = os.path.join(settings.upload_folder, "video")
|
||||||
|
Path(folder_video).mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
# Replace spaces with dots in filename.
|
||||||
|
filename = file.filename.replace(" ", ".")
|
||||||
|
|
||||||
|
# Save file to disk.
|
||||||
|
file_location = os.path.join(folder_video, filename)
|
||||||
|
with open(file_location, "wb+") as f:
|
||||||
|
f.write(file.file.read())
|
||||||
|
|
||||||
|
return filename, file_location
|
||||||
|
|
||||||
|
|
||||||
|
async def do_things(file: UploadFile) -> Dict[str, str]:
|
||||||
|
"""Save video to disk, generate HTML, thumbnail, and return a .html URL.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file (UploadFile): Our file object.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict[str, str]: Returns URL for video.
|
||||||
|
"""
|
||||||
|
filename, file_location = save_to_disk(file)
|
||||||
|
|
||||||
|
file_url = f"{settings.domain}/video/{filename}"
|
||||||
|
height, width = video_resolution(file_location)
|
||||||
|
screenshot_url = make_thumbnail(file_location, filename)
|
||||||
|
html_url = generate_html_for_videos(
|
||||||
|
url=file_url,
|
||||||
|
width=width,
|
||||||
|
height=height,
|
||||||
|
screenshot=screenshot_url,
|
||||||
|
filename=filename,
|
||||||
|
)
|
||||||
|
send_webhook(f"{settings.domain}/{filename} was uploaded.")
|
||||||
|
return {"html_url": f"{html_url}"}
|
18
discord_embed/webhook.py
Normal file
18
discord_embed/webhook.py
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
"""Send webhook to Discord."""
|
||||||
|
from discord_webhook import DiscordWebhook
|
||||||
|
|
||||||
|
from discord_embed import settings
|
||||||
|
|
||||||
|
|
||||||
|
def send_webhook(message: str) -> None:
|
||||||
|
"""Send webhook to Discord.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
message (str): The message to send.
|
||||||
|
"""
|
||||||
|
webhook = DiscordWebhook(
|
||||||
|
url=settings.webhook_url,
|
||||||
|
content=message,
|
||||||
|
rate_limit_retry=True,
|
||||||
|
)
|
||||||
|
webhook.execute()
|
Reference in New Issue
Block a user