Use FastAPI instead of Flask

This commit is contained in:
2021-10-28 15:31:30 +02:00
parent f3fb957717
commit c5fdfa3215
2 changed files with 93 additions and 77 deletions

149
main.py
View File

@ -4,37 +4,73 @@ import shlex
import subprocess
import sys
from datetime import datetime
from typing import List
from flask import Flask, flash, redirect, render_template, request, url_for
from werkzeug.utils import secure_filename
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import HTMLResponse
app = FastAPI()
app = Flask(__name__)
app.config["UPLOAD_FOLDER"] = "Uploads"
try:
domain = os.environ["DOMAIN"] # https://killyoy.lovinator.space/
domain = os.environ["DOMAIN"]
except KeyError:
sys.exit("Environment variable 'domain' is missing!")
print(f"{domain=}")
sys.exit("Environment variable 'DOMAIN' is missing!")
def find_video_resolution(path_to_video):
cmd = "ffprobe -v quiet -print_format json -show_streams "
args = shlex.split(cmd)
args.append(path_to_video)
# run the ffprobe process, decode stdout into utf-8 & convert to JSON
ffprobe_output = subprocess.check_output(args).decode("utf-8")
ffprobe_output = json.loads(ffprobe_output)
@app.post("/uploadfiles/")
async def upload_file(file: UploadFile = File(...)):
try:
if upload_file.content_type == "video/mp4":
os.mkdir("videos")
elif upload_file.content_type == "":
os.mkdir("videos")
elif upload_file.content_type == "":
os.mkdir("files")
except Exception as e:
print(e) # TODO: Send to Discord
# find height and width
height = ffprobe_output["streams"][0]["height"]
width = ffprobe_output["streams"][0]["width"]
print(file.filename)
file_location = f"Uploads/v/{file.filename}"
return height, width
with open(file_location, "wb+") as file_object:
file_object.write(file.file.read())
height, width = find_video_resolution(file_location)
screenshot_url = get_first_frame(file_location, file.filename)
video_url = f"{domain}v/{file.filename}"
html_url = generate_html(
video_url,
width,
height,
screenshot_url,
file.filename,
)
return {
"html_url": f"{html_url}",
"video_url": f"{video_url}",
"width": f"{width}",
"height": f"{height}",
"screenshot_url": f"{screenshot_url}",
"filename": f"{file.filename}",
}
def generate_html(
video_url, video_width, video_height, video_screenshot, video_filename
):
@app.get("/")
async def main():
content = """
<body>
<form action="/uploadfiles/" enctype="multipart/form-data" method="post">
<input name="file" type="file">
<input type="submit">
</form>
</body>
"""
return HTMLResponse(content=content)
def generate_html(video_url, video_width, video_height, video_screenshot, video_filename):
video_html = f"""
<!DOCTYPE html>
<html>
@ -59,6 +95,21 @@ def generate_html(
return html_url
def find_video_resolution(path_to_video):
cmd = "ffprobe -v quiet -print_format json -show_streams "
args = shlex.split(cmd)
args.append(path_to_video)
# run the ffprobe process, decode stdout into utf-8 & convert to JSON
ffprobe_output = subprocess.check_output(args).decode("utf-8")
ffprobe_output = json.loads(ffprobe_output)
# find height and width
height = ffprobe_output["streams"][0]["height"]
width = ffprobe_output["streams"][0]["width"]
return height, width
def get_first_frame(path_video, file_filename):
cmd = f"ffmpeg -y -i {path_video} -vframes 1 Uploads/{file_filename}.jpg"
args = shlex.split(cmd)
@ -66,59 +117,3 @@ def get_first_frame(path_video, file_filename):
subprocess.check_output(args).decode("utf-8")
return f"{domain}{file_filename}.jpg"
@app.route("/")
def index():
return render_template("index.html")
@app.route("/", methods=["GET", "POST"])
def upload_file():
if request.method == "POST":
# check if the post request has the file part
if "file" not in request.files:
flash("No file part", "error")
return redirect(request.url)
file = request.files["file"]
# if user does not select file, browser also
# submit an empty part without filename
if file.filename == "":
flash("No selected file", "error")
return redirect(request.url)
if file:
filename = secure_filename(file.filename)
print(f"{filename=}")
filepath = f"Uploads/v/{filename}"
print(f"{filepath=}")
file.save(filepath)
height, width = find_video_resolution(filepath)
print(f"{height=}")
print(f"{width=}")
screenshot_url = get_first_frame(filepath, filename)
print(f"{screenshot_url=}")
video_url = f"{domain}v/{filename}"
print(f"{video_url=}")
html_url = generate_html(
video_url,
width,
height,
screenshot_url,
filename,
)
print(f"{html_url=}")
return {"html_url": f"{html_url}"}
# If GET
return redirect(url_for("index"))
if __name__ == "__main__":
app.run(debug=True)