Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Format code with black, yapf, autopep8 and isort #395

Merged
merged 1 commit into from
Dec 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 40 additions & 17 deletions minato_namikaze/bot_files/cogs/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@
import discord
from discord.ext import commands

from ..lib import SimplePages, TagsDatabase, has_guild_permissions, UniqueViolationError, TagsDoesNotExistsError
from ..lib import (
SimplePages,
TagsDatabase,
TagsDoesNotExistsError,
UniqueViolationError,
has_guild_permissions,
)


class Arguments(argparse.ArgumentParser):
Expand Down Expand Up @@ -142,7 +148,7 @@ class Tags(commands.Cog):
def __init__(self, bot):
self.bot = bot
self._reserved_tags_being_made = {}

@property
def display_emoji(self) -> discord.PartialEmoji:
return discord.PartialEmoji(name="\N{LABEL}\ufe0f")
Expand Down Expand Up @@ -173,22 +179,35 @@ async def get_random_tag(self, guild, *args):
return await TagsDatabase(ctx=ctx).give_random_tag()
return await TagsDatabase(ctx=ctx).give_random_tag(guild.id)

async def get_tag(self, guild_id, *,name):
async def get_tag(self, guild_id, *, name):
def disambiguate(rows, query):
if rows is None or len(rows) == 0:
raise RuntimeError("Tag not found.")

names = "\n".join(r["name"] for r in rows)
raise RuntimeError(f"Tag not found. Did you mean...\n{names}")

row = await TagsDatabase(ctx=ctx).search(server_id=guild_id,tag_name=name, exact=True)

row = await TagsDatabase(ctx=ctx).search(server_id=guild_id,
tag_name=name,
exact=True)
if row is None:
return disambiguate(TagsDatabase(ctx=ctx).search(server_id=guild_id,tag_name=name, get_only_name=True), name)
return disambiguate(
TagsDatabase(ctx=ctx).search(server_id=guild_id,
tag_name=name,
get_only_name=True),
name,
)
return row

async def create_tag(self, ctx, name, content):
try:
await TagsDatabase(name=name,content=content, owner_id=ctx.author.id, server_id=ctx.guild.id,ctx=ctx).save()
await TagsDatabase(
name=name,
content=content,
owner_id=ctx.author.id,
server_id=ctx.guild.id,
ctx=ctx,
).save()
except UniqueViolationError:
await ctx.send("This tag already exists.")
# except Exception as e:
Expand Down Expand Up @@ -244,7 +263,8 @@ async def create(self, ctx, name: TagName, *,
"""

if self.is_tag_being_made(ctx.guild.id, name):
return await ctx.send("This tag is currently being made by someone.")
return await ctx.send(
"This tag is currently being made by someone.")

if len(content) > 2000:
return await ctx.send(
Expand All @@ -261,16 +281,16 @@ async def alias(self, ctx, new_name: TagName, *, old_name: TagName):
tag is deleted the alias is deleted as well.
"""
try:
await TagsDatabase(ctx=ctx).add_aliases(tag_name=old_name, server_id=ctx.guild.id)
await TagsDatabase(ctx=ctx).add_aliases(tag_name=old_name,
server_id=ctx.guild.id)
except UniqueViolationError:
await ctx.send('That aliases already exists')
await ctx.send("That aliases already exists")
except TagsDoesNotExistsError as e:
await ctx.send(e)
except:
await ctx.send('Unable to add the aliases')
await ctx.send("Unable to add the aliases")
else:
await ctx.send(f'{new_name} alias added to the {old_name}')

await ctx.send(f"{new_name} alias added to the {old_name}")

@tag.command(ignore_extra=False)
@suggest_box()
Expand Down Expand Up @@ -311,7 +331,8 @@ def check(msg):
"Sorry. This tag is currently being made by someone. "
f'Redo the command "{ctx.prefix}tag make" to retry.')

row = await TagsDatabase(ctx=ctx).do_exactsearch_search(server_id=ctx.guild.id,tag_name=name.lower(), exact=True)
row = await TagsDatabase(ctx=ctx).do_exactsearch_search(
server_id=ctx.guild.id, tag_name=name.lower(), exact=True)
if row is not None:
return await ctx.send(
"Sorry. A tag with that name already exists. "
Expand All @@ -335,7 +356,8 @@ def check(msg):
self.remove_in_progress_tag(ctx.guild.id, name)
return await ctx.send("Aborting.")
if msg.content:
clean_content = await commands.clean_content().convert(tx, msg.content)
clean_content = await commands.clean_content().convert(
tx, msg.content)
else:
# fast path I guess?
clean_content = msg.content
Expand Down Expand Up @@ -790,7 +812,8 @@ async def box(self, ctx):
await ctx.send_help("tag box")

@box.command(name="put")
async def box_put(self, ctx, name: TagName, *, content: commands.clean_content):
async def box_put(self, ctx, name: TagName, *,
content: commands.clean_content):
"""Puts a tag in the tag box.
These are global tags that anyone can opt-in to receiving
via the "tag box take" subcommand.
Expand Down Expand Up @@ -1033,4 +1056,4 @@ async def config(self, ctx):


def setup(bot):
bot.add_cog(Tags(bot))
bot.add_cog(Tags(bot))
146 changes: 92 additions & 54 deletions minato_namikaze/bot_files/lib/database/discord_database/tags.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,66 @@
import io
import json
import random
from datetime import datetime
from orjson import dumps
import json
import io
from functools import cache, cached_property, lru_cache, wraps
from typing import Optional, Union, Literal
from ...util.vars import ChannelAndMessageId
from typing import Literal, Optional, Union

import discord
from discord.ext.commands import Context
from orjson import dumps

from ...util.vars import ChannelAndMessageId


def check_for_ctx(function):
@wraps(function)
def wrap(model, *args, **kwargs):
if not model.ctx:
raise RuntimeError("context was not provided")

return wrap


class UniqueViolationError(ValueError):
pass


class TagsDoesNotExistsError(Exception):
def __str__(self):
return (
"The tag with that name does not exists!"
)
def __str__(self):
return "The tag with that name does not exists!"


class TagsDatabase:
def __init__(self, ctx: Optional[Context] = None, name: Optional[str] = None, content: Optional[str]= None, owner_id: Optional[int]= None, server_id: Optional[int]= None, created_at: Optional[datetime] = None, aliases: Optional[Union[list]] = []):
self.name=name
self.content=content
self.owner_id=owner_id
self.server_id=server_id
self.created_at=created_at
def __init__(
self,
ctx: Optional[Context] = None,
name: Optional[str] = None,
content: Optional[str] = None,
owner_id: Optional[int] = None,
server_id: Optional[int] = None,
created_at: Optional[datetime] = None,
aliases: Optional[Union[list]] = [],
):
self.name = name
self.content = content
self.owner_id = owner_id
self.server_id = server_id
self.created_at = created_at
self.aliases = aliases
self.ctx: Context = ctx
if ctx is not None:
self.tags_channel = ctx.get_config_channel_by_name_or_id(ChannelAndMessageId.tags.value)
self.tags_aliases_channel = ctx.get_config_channel_by_name_or_id(ChannelAndMessageId.tags_aliases.value)

self.tags_channel = ctx.get_config_channel_by_name_or_id(
ChannelAndMessageId.tags.value)
self.tags_aliases_channel = ctx.get_config_channel_by_name_or_id(
ChannelAndMessageId.tags_aliases.value)

@check_for_ctx
def channel(self):
return self.ctx.get_config_channel_by_name_or_id(ChannelAndMessageId.tags.value)

async def add_aliases(self, name:str, server_id: Optional[int]):
#raise uniqueviolationerror
return self.ctx.get_config_channel_by_name_or_id(
ChannelAndMessageId.tags.value)

async def add_aliases(self, name: str, server_id: Optional[int]):
# raise uniqueviolationerror
pass

async def edit(self, tag_content: str):
Expand All @@ -54,9 +69,10 @@ async def edit(self, tag_content: str):
await msg.edit(
suppress=True,
content="\n".join(message_cleanlist),
allowed_mentions=discord.AllowedMentions(
everyone=False, users=False, roles=False, replied_user=False
),
allowed_mentions=discord.AllowedMentions(everyone=False,
users=False,
roles=False,
replied_user=False),
)

@cache
Expand All @@ -71,73 +87,95 @@ async def search(
get_only_name: Optional[Union[bool, Literal[False]]],
search_all: Optional[Union[bool, Literal[False]]],
oldest_first: Optional[Union[bool, Literal[False]]],
exact: bool = False
exact: bool = False,
):
tags_found = []
if search_all:
return await self.channel.history(
limit=None, oldest_first=oldest_first
).flatten()

#Name
limit=None, oldest_first=oldest_first).flatten()

# Name
if name or self.name:

async def predicate(i):
data = json.load(await i.attachments[0].read())
return data['name'].lower() in (name.lower(), self.name.lower()) if not exact else data['name'].lower() == name or data['name'].lower() == self.name.lower()
tag_found = await self.channel.history(limit=None).find(await predicate)
return (data["name"].lower() in (name.lower(),
self.name.lower())
if not exact else data["name"].lower() == name
or data["name"].lower() == self.name.lower())

tag_found = await self.channel.history(limit=None).find(await
predicate)
if tag_found:
tags_found.append(tag_found)
#Owner ID

# Owner ID
if owner_id or self.owner_id:

async def predicate(i):
data = json.load(await i.attachments[0].read())
return data['owner_id'].lower() in (owner_id.lower(), self.owner_id.lower()) if not exact else data['owner_id'] == owner_id or data['owner_id'] == self.owner_id
tag_found = await self.channel.history(limit=None).find(await predicate)
return (data["owner_id"].lower() in (owner_id.lower(),
self.owner_id.lower())
if not exact else data["owner_id"] == owner_id
or data["owner_id"] == self.owner_id)

tag_found = await self.channel.history(limit=None).find(await
predicate)
if tag_found:
tags_found.append(tag_found)
#Guild ID

# Guild ID
if server_id or self.server_id:

async def predicate(i):
data = json.load(await i.attachments[0].read())
return data['server_id'] in (server_id, self.server_id) if not exact else data['server_id'] == server_id or data['server_id'] == self.server_id
return (data["server_id"] in (server_id, self.server_id)
if not exact else data["server_id"] == server_id
or data["server_id"] == self.server_id)

tag_found = await self.channel.history(limit=None).find(predicate)
if tag_found:
tags_found.append(tag_found)
#remove duplicates

# remove duplicates
if tags_found and get_only_name:
tags_found = map(lambda a: a.get('name'),tags_found)
tags_found = map(lambda a: a.get("name"), tags_found)

return list(set(tags_found)) if tags_found else None

def get_object_in_dict(self, include_ctx: Optional[Union[bool, Literal[False]]] = False):

def get_object_in_dict(self,
include_ctx: Optional[Union[
bool, Literal[False]]] = False):
data = dict(
name=self.name,
content=self.content,
owner_id=self.owner_id,
server_id=self.server_id,
created_at=self.created_at,
aliases=self.aliases
aliases=self.aliases,
)
if include_ctx:
data.update({'ctx': self.ctx})
data.update({"ctx": self.ctx})
return data

@check_for_ctx
async def save(self):
data = self.get_object_in_dict()
if self.search(exact=True, name=data['name']) is not None:
raise UniqueViolationError('The tag already exists')
if self.search(exact=True, name=data["name"]) is not None:
raise UniqueViolationError("The tag already exists")
json_bytes = dumps(data)
await self.tags_channel.send(
content=data['name'],
file=discord.File(io.BytesIO(json_bytes), filename=f"{data['name']}.json"),
content=data["name"],
file=discord.File(io.BytesIO(json_bytes),
filename=f"{data['name']}.json"),
)

@cache
@check_for_ctx
async def give_random_tag(self, guild: Optional[int]):
search = await self.search(oldest_first=random.choice([True, False], limit=random.randint(0, 500)), server_id=guild)
return random.choice(search)
search = await self.search(
oldest_first=random.choice([True, False],
limit=random.randint(0, 500)),
server_id=guild,
)
return random.choice(search)