Make things async

This commit is contained in:
2022-04-13 01:27:24 +02:00
parent 71a57da59b
commit 31cb5dbf81

View File

@ -49,7 +49,7 @@ def normal_file_uploaded(file: UploadFile) -> Dict[str, str]:
return {"html_url": f"{Settings.domain}/{file.filename}"} return {"html_url": f"{Settings.domain}/{file.filename}"}
def video_file_uploaded(file: UploadFile) -> Dict[str, str]: async def video_file_uploaded(file: UploadFile) -> Dict[str, str]:
"""Save video to disk, generate HTML, thumbnail, and return a .html URL. """Save video to disk, generate HTML, thumbnail, and return a .html URL.
Args: Args:
@ -65,7 +65,7 @@ def video_file_uploaded(file: UploadFile) -> Dict[str, str]:
# Save file to disk. # Save file to disk.
file_location = os.path.join(folder_video, file.filename) file_location = os.path.join(folder_video, file.filename)
with open(file_location, "wb+") as file: with open(file_location, "wb+") as file:
file.write(file.file.read()) await file.write(file.file.read())
file_url = f"{Settings.domain}/video/{file.filename}" file_url = f"{Settings.domain}/video/{file.filename}"
height, width = find_video_resolution(file_location) height, width = find_video_resolution(file_location)
@ -81,7 +81,7 @@ def video_file_uploaded(file: UploadFile) -> Dict[str, str]:
return {"html_url": f"{html_url}"} return {"html_url": f"{html_url}"}
def text_file_uploaded(file: UploadFile) -> Dict[str, str]: async def text_file_uploaded(file: UploadFile) -> Dict[str, str]:
"""Save file to disk and return URL. """Save file to disk and return URL.
Args: Args:
@ -97,7 +97,7 @@ def text_file_uploaded(file: UploadFile) -> Dict[str, str]:
save_location = os.path.join(text_location, file.filename) save_location = os.path.join(text_location, file.filename)
# Save file to disk. # Save file to disk.
with open(save_location, "wb+", encoding="utf-8") as file: with open(save_location, "wb+", encoding="utf-8") as file:
file.write(file.file.read()) await file.write(file.file.read())
with open(save_location, encoding="utf-8") as file: with open(save_location, encoding="utf-8") as file:
lines = file.read() lines = file.read()
@ -113,7 +113,7 @@ def text_file_uploaded(file: UploadFile) -> Dict[str, str]:
) )
html_color = os.path.join(Settings.upload_folder, f"{file.filename}.html") html_color = os.path.join(Settings.upload_folder, f"{file.filename}.html")
with open(html_color, "w", encoding="utf-8") as file: with open(html_color, "w", encoding="utf-8") as file:
file.write(colored_text) await file.write(colored_text)
html_url = f"{Settings.domain}/{file.filename}.html" html_url = f"{Settings.domain}/{file.filename}.html"
@ -149,7 +149,7 @@ async def upload_file(file: UploadFile = File(...)) -> Dict[str, str]:
return text_file_uploaded(file) return text_file_uploaded(file)
with open(f"{Settings.upload_folder}/{file.filename}", "wb+") as file: with open(f"{Settings.upload_folder}/{file.filename}", "wb+") as file:
file.write(file.file.read()) await file.write(file.file.read())
domain_url = f"{Settings.domain}/{file.filename}" domain_url = f"{Settings.domain}/{file.filename}"
hook.send(f"{domain_url} was uploaded.") hook.send(f"{domain_url} was uploaded.")
return {"html_url": domain_url} return {"html_url": domain_url}
@ -187,7 +187,7 @@ async def main():
""" """
def generate_html_for_videos(url: str, width: int, height: int, screenshot: str, filename: str) -> str: # noqa: E501 def generate_html_for_videos(url: str, width: int, height: int, screenshot: str, filename: str) -> str:
"""Generate HTML for video files. """Generate HTML for video files.
This is what we will send to other people on Discord. This is what we will send to other people on Discord.
@ -245,7 +245,7 @@ def find_video_resolution(path_to_video: str) -> tuple[int, int]:
tuple[int, int]: Returns height and width. tuple[int, int]: Returns height and width.
""" """
probe = ffmpeg.probe(path_to_video) probe = ffmpeg.probe(path_to_video)
video_stream = next((stream for stream in probe["streams"] if stream["codec_type"] == "video"), None) # noqa: E501 video_stream = next((stream for stream in probe["streams"] if stream["codec_type"] == "video"), None)
if video_stream is None: if video_stream is None:
print("No video stream found", file=sys.stderr) print("No video stream found", file=sys.stderr)
sys.exit(1) sys.exit(1)