Use discord.py instead of Hikari so we can use user commands
This commit is contained in:
205
main.py
205
main.py
@ -1,119 +1,164 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
import hikari
|
||||
import lightbulb
|
||||
import discord
|
||||
import openai
|
||||
from dotenv import load_dotenv
|
||||
from discord import app_commands
|
||||
from openai import OpenAI
|
||||
|
||||
from misc import chat, get_allowed_users, get_trigger_keywords
|
||||
from misc import chat, get_allowed_users
|
||||
from settings import Settings
|
||||
|
||||
logger: logging.Logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
load_dotenv(verbose=True)
|
||||
settings: Settings = Settings.from_env()
|
||||
discord_token: str = settings.discord_token
|
||||
openai_api_key: str = settings.openai_api_key
|
||||
|
||||
discord_token: str | None = os.getenv("DISCORD_TOKEN")
|
||||
openai_api_key: str | None = os.getenv("OPENAI_TOKEN")
|
||||
if not discord_token or not openai_api_key:
|
||||
logger.error("You haven't configured the bot correctly. Please set the environment variables.")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
bot = hikari.GatewayBot(
|
||||
token=discord_token,
|
||||
intents=hikari.Intents.GUILD_MESSAGES | hikari.Intents.GUILD_MESSAGE_TYPING,
|
||||
logs="INFO",
|
||||
)
|
||||
bot_client: lightbulb.GatewayEnabledClient = lightbulb.client_from_app(bot)
|
||||
bot.subscribe(hikari.StartingEvent, bot_client.start)
|
||||
|
||||
openai_client = OpenAI(api_key=openai_api_key)
|
||||
|
||||
|
||||
@bot_client.register()
|
||||
class Ask(
|
||||
lightbulb.SlashCommand,
|
||||
name="ask",
|
||||
description="Ask the AI a question.",
|
||||
):
|
||||
"""A command to ask the AI a question."""
|
||||
class LoviBotClient(discord.Client):
|
||||
"""The main bot client."""
|
||||
|
||||
text: str = lightbulb.string("text", "The question or message to ask the AI.")
|
||||
def __init__(self, *, intents: discord.Intents) -> None:
|
||||
"""Initialize the bot client."""
|
||||
super().__init__(intents=intents)
|
||||
|
||||
# The tree stores all the commands and subcommands
|
||||
self.tree = app_commands.CommandTree(self)
|
||||
|
||||
async def setup_hook(self) -> None:
|
||||
"""Setup the bot client."""
|
||||
# Copy the global commands to all the guilds so we don't have to wait 1 hour for the commands to be available
|
||||
self.tree.copy_global_to(guild=discord.Object(id=98905546077241344)) # KillYoy's server
|
||||
self.tree.copy_global_to(guild=discord.Object(id=341001473661992962)) # TheLovinator's server
|
||||
|
||||
# Sync commands globally
|
||||
await self.tree.sync()
|
||||
|
||||
async def on_ready(self) -> None:
|
||||
"""Event to handle when the bot is ready."""
|
||||
logger.info("Logged in as %s", self.user)
|
||||
logger.info("Current latency: %s", self.latency)
|
||||
logger.info("Bot is ready and in the following guilds:")
|
||||
for guild in self.guilds:
|
||||
logger.info(" - %s", guild.name)
|
||||
|
||||
async def on_message(self, message: discord.Message) -> None:
|
||||
"""Event to handle when a message is received."""
|
||||
# Ignore messages from the bot itself
|
||||
if message.author == self.user:
|
||||
return
|
||||
|
||||
@lightbulb.invoke
|
||||
async def invoke(self, ctx: lightbulb.Context) -> None:
|
||||
"""Handle the /ask command."""
|
||||
# Only allow certain users to interact with the bot
|
||||
allowed_users: list[str] = get_allowed_users()
|
||||
if ctx.user.username not in allowed_users:
|
||||
logger.info("Ignoring message from: %s", ctx.user.username)
|
||||
await ctx.respond("You are not allowed to use this command.", ephemeral=True)
|
||||
if message.author.name not in allowed_users:
|
||||
logger.info("Ignoring message from: %s", message.author.name)
|
||||
return
|
||||
|
||||
if not self.text:
|
||||
logger.error("No question or message provided.")
|
||||
await ctx.respond("You need to provide a question or message.")
|
||||
incoming_message: str | None = message.content
|
||||
if not incoming_message:
|
||||
logger.error("No message content found in the event: %s", message)
|
||||
return
|
||||
|
||||
try:
|
||||
response: str | None = chat(self.text, openai_client)
|
||||
except openai.OpenAIError as e:
|
||||
logger.exception("An error occurred while chatting with the AI model.")
|
||||
await ctx.respond(f"An error occurred: {e}")
|
||||
return
|
||||
lowercase_message: str = incoming_message.lower() if incoming_message else ""
|
||||
trigger_keywords: list[str] = ["lovibot", "<@345000831499894795>"]
|
||||
if any(trigger in lowercase_message for trigger in trigger_keywords):
|
||||
logger.info("Received message: %s from: %s", incoming_message, message.author.name)
|
||||
|
||||
if response:
|
||||
await ctx.respond(response)
|
||||
else:
|
||||
await ctx.respond("I forgor how to think 💀")
|
||||
async with message.channel.typing():
|
||||
try:
|
||||
response: str | None = chat(incoming_message, openai_client)
|
||||
except openai.OpenAIError as e:
|
||||
logger.exception("An error occurred while chatting with the AI model.")
|
||||
e.add_note(f"Message: {incoming_message}\nEvent: {message}\nWho: {message.author.name}")
|
||||
await message.channel.send(f"An error occurred while chatting with the AI model. {e}")
|
||||
return
|
||||
|
||||
if response:
|
||||
logger.info("Responding to message: %s with: %s", incoming_message, response)
|
||||
await message.channel.send(response)
|
||||
else:
|
||||
logger.warning("No response from the AI model. Message: %s", incoming_message)
|
||||
await message.channel.send("I forgor how to think 💀")
|
||||
|
||||
|
||||
@bot.listen(hikari.MessageCreateEvent)
|
||||
async def on_message(event: hikari.MessageCreateEvent) -> None:
|
||||
"""Respond to a message."""
|
||||
if not event.is_human:
|
||||
# Everything enabled except `presences`, `members`, and `message_content`.
|
||||
intents: discord.Intents = discord.Intents.default()
|
||||
intents.message_content = True
|
||||
client = LoviBotClient(intents=intents)
|
||||
|
||||
|
||||
@client.tree.context_menu(name="Ask LoviBot")
|
||||
@app_commands.allowed_installs(guilds=True, users=True)
|
||||
@app_commands.allowed_contexts(guilds=True, dms=True, private_channels=True)
|
||||
async def handle_ai_query(interaction: discord.Interaction, message: discord.Message) -> None:
|
||||
"""A context menu command to ask the AI a question."""
|
||||
await interaction.response.defer()
|
||||
|
||||
user_name_lowercase: str = interaction.user.name.lower()
|
||||
logger.info("Received command from: %s", user_name_lowercase)
|
||||
|
||||
allowed_users: list[str] = get_allowed_users()
|
||||
if user_name_lowercase not in allowed_users:
|
||||
logger.info("Ignoring message from: %s", user_name_lowercase)
|
||||
await interaction.followup.send("You are not allowed to use this command.", ephemeral=True)
|
||||
return
|
||||
|
||||
try:
|
||||
response: str | None = chat(message.content, openai_client)
|
||||
except openai.OpenAIError as e:
|
||||
logger.exception("An error occurred while chatting with the AI model.")
|
||||
await interaction.followup.send(f"An error occurred: {e}")
|
||||
return
|
||||
|
||||
if response:
|
||||
await interaction.followup.send(response)
|
||||
else:
|
||||
await interaction.followup.send("I forgor how to think 💀")
|
||||
|
||||
|
||||
@client.tree.command(name="ask", description="Ask LoviBot a question.")
|
||||
@app_commands.allowed_installs(guilds=True, users=True)
|
||||
@app_commands.allowed_contexts(guilds=True, dms=True, private_channels=True)
|
||||
@app_commands.describe(text="Ask LoviBot a question.")
|
||||
async def ask(interaction: discord.Interaction, text: str) -> None:
|
||||
"""A command to ask the AI a question."""
|
||||
await interaction.response.defer()
|
||||
|
||||
if not text:
|
||||
logger.error("No question or message provided.")
|
||||
await interaction.followup.send("You need to provide a question or message.", ephemeral=True)
|
||||
return
|
||||
|
||||
# Only allow certain users to interact with the bot
|
||||
allowed_users: list[str] = get_allowed_users()
|
||||
if event.author.username not in allowed_users:
|
||||
logger.info("Ignoring message from: %s", event.author.username)
|
||||
|
||||
user_name_lowercase: str = interaction.user.name.lower()
|
||||
logger.info("Received command from: %s", user_name_lowercase)
|
||||
|
||||
if user_name_lowercase not in allowed_users:
|
||||
logger.info("Ignoring message from: %s", user_name_lowercase)
|
||||
await interaction.followup.send("You are not allowed to use this command.", ephemeral=True)
|
||||
return
|
||||
|
||||
incoming_message: str | None = event.message.content
|
||||
if not incoming_message:
|
||||
logger.error("No message content found in the event: %s", event)
|
||||
try:
|
||||
response: str | None = chat(text, openai_client)
|
||||
except openai.OpenAIError as e:
|
||||
logger.exception("An error occurred while chatting with the AI model.")
|
||||
await interaction.followup.send(f"An error occurred: {e}")
|
||||
return
|
||||
|
||||
lowercase_message: str = incoming_message.lower() if incoming_message else ""
|
||||
trigger_keywords: list[str] = get_trigger_keywords(bot)
|
||||
if any(trigger in lowercase_message for trigger in trigger_keywords):
|
||||
logger.info("Received message: %s from: %s", incoming_message, event.author.username)
|
||||
|
||||
async with bot.rest.trigger_typing(event.channel_id):
|
||||
try:
|
||||
response: str | None = chat(incoming_message, openai_client)
|
||||
except openai.OpenAIError as e:
|
||||
logger.exception("An error occurred while chatting with the AI model.")
|
||||
e.add_note(f"Message: {incoming_message}\nEvent: {event}\nWho: {event.author.username}")
|
||||
await bot.rest.create_message(
|
||||
event.channel_id, f"An error occurred while chatting with the AI model. {e}"
|
||||
)
|
||||
return
|
||||
|
||||
if response:
|
||||
logger.info("Responding to message: %s with: %s", incoming_message, response)
|
||||
await bot.rest.create_message(event.channel_id, response)
|
||||
else:
|
||||
logger.warning("No response from the AI model. Message: %s", incoming_message)
|
||||
await bot.rest.create_message(event.channel_id, "I forgor how to think 💀")
|
||||
if response:
|
||||
await interaction.followup.send(response)
|
||||
else:
|
||||
await interaction.followup.send(f"I forgor how to think 💀\nText: {text}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logger.info("Starting the bot.")
|
||||
bot.run()
|
||||
client.run(discord_token, root_logger=True)
|
||||
|
13
misc.py
13
misc.py
@ -4,7 +4,6 @@ import logging
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import hikari
|
||||
from openai import OpenAI
|
||||
from openai.types.chat.chat_completion import ChatCompletion
|
||||
|
||||
@ -28,18 +27,6 @@ def get_allowed_users() -> list[str]:
|
||||
]
|
||||
|
||||
|
||||
def get_trigger_keywords(bot: hikari.GatewayBotAware) -> list[str]:
|
||||
"""Get the list of trigger keywords to respond to.
|
||||
|
||||
Returns:
|
||||
The list of trigger keywords.
|
||||
"""
|
||||
bot_user: hikari.OwnUser | None = bot.get_me()
|
||||
bot_mention_string: str = f"<@{bot_user.id}>" if bot_user else ""
|
||||
notification_keywords: list[str] = ["lovibot", bot_mention_string]
|
||||
return notification_keywords
|
||||
|
||||
|
||||
def chat(user_message: str, openai_client: OpenAI) -> str | None:
|
||||
"""Chat with the bot using the OpenAI API.
|
||||
|
||||
|
37
misc_test.py
37
misc_test.py
@ -1,37 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
|
||||
from misc import get_trigger_keywords
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_bot() -> Mock:
|
||||
"""Create a mock bot instance.
|
||||
|
||||
Returns:
|
||||
A mock bot instance.
|
||||
"""
|
||||
return Mock()
|
||||
|
||||
|
||||
def test_get_trigger_keywords_with_bot_user(mock_bot: Mock) -> None:
|
||||
"""Test getting trigger keywords with a bot user."""
|
||||
mock_bot.get_me.return_value.id = 123456789
|
||||
expected_keywords: list[str] = ["lovibot", "<@123456789>"]
|
||||
|
||||
result: list[str] = get_trigger_keywords(mock_bot)
|
||||
|
||||
assert result == expected_keywords
|
||||
|
||||
|
||||
def test_get_trigger_keywords_without_bot_user(mock_bot: Mock) -> None:
|
||||
"""Test getting trigger keywords without a bot user."""
|
||||
mock_bot.get_me.return_value = None
|
||||
expected_keywords: list[str] = ["lovibot", ""]
|
||||
|
||||
result: list[str] = get_trigger_keywords(mock_bot)
|
||||
|
||||
assert result == expected_keywords
|
@ -5,18 +5,14 @@ description = "My shit bot"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.13"
|
||||
dependencies = [
|
||||
"hikari-lightbulb>=3.0.0a15",
|
||||
"hikari",
|
||||
"openai",
|
||||
"python-dotenv",
|
||||
"discord-py",
|
||||
"audioop-lts>=0.2.1",
|
||||
]
|
||||
|
||||
[dependency-groups]
|
||||
dev = [
|
||||
"pytest-asyncio",
|
||||
"pytest",
|
||||
"ruff",
|
||||
]
|
||||
dev = ["pytest", "ruff"]
|
||||
|
||||
[tool.ruff]
|
||||
# https://docs.astral.sh/ruff/linter/
|
||||
@ -87,3 +83,6 @@ log_cli_date_format = "%Y-%m-%d %H:%M:%S"
|
||||
|
||||
# Only test files with the following suffixes.
|
||||
python_files = "test_*.py *_test.py *_tests.py"
|
||||
|
||||
[tool.uv.sources]
|
||||
discord-py = { git = "https://github.com/Rapptz/discord.py", rev = "master" }
|
||||
|
29
settings.py
Normal file
29
settings.py
Normal file
@ -0,0 +1,29 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from functools import lru_cache
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(verbose=True)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Settings:
|
||||
"""Class to hold settings for the bot."""
|
||||
|
||||
discord_token: str
|
||||
openai_api_key: str
|
||||
|
||||
@classmethod
|
||||
@lru_cache(maxsize=1)
|
||||
def from_env(cls) -> Settings:
|
||||
"""Create a new instance of the class from environment variables.
|
||||
|
||||
Returns:
|
||||
A new instance of the class with the settings.
|
||||
"""
|
||||
discord_token: str = os.getenv("DISCORD_TOKEN", "")
|
||||
openai_api_key: str = os.getenv("OPENAI_TOKEN", "")
|
||||
return cls(discord_token, openai_api_key)
|
Reference in New Issue
Block a user