"""Listeners module for inatcog."""
from attrs import define
from typing import Optional, Tuple, Union
import asyncio
import contextlib
from copy import copy
import logging
import re
import discord
from redbot.core import commands
from redbot.core.bot import Red
from redbot.core.commands import BadArgument
from .client import iNatClient
from .converters.base import NaturalQueryConverter
from .embeds.common import NoRoomInDisplay
from .embeds.inat import INatEmbed, INatEmbeds, REACTION_EMOJI
from .interfaces import MixinMeta
from .obs import maybe_match_obs
from .utils import has_valid_user_config
logger = logging.getLogger("red.dronefly." + __name__)
# Minimum 4 characters, first dot must not be followed by a space. Last dot
# must not be preceded by a space.
DOT_TAXON_PAT = re.compile(r"(^|\s)\.(?P<query>[^\s\.].{2,}?[^\s\.])\.(\s|$)")
KNOWN_REACTION_EMOJIS = REACTION_EMOJI.values()
UNKNOWN_REACTION_MSG = "Not a known reaction."
# pylint: disable=no-member, assigning-non-slot
# - See https://github.com/PyCQA/pylint/issues/981
[docs]@define
class PartialMessage:
"""Partial Message to satisfy bot & guild checks."""
author: discord.User
guild: discord.Guild
[docs]@define
class PartialContext:
"Partial Context synthesized from objects passed into listeners."
bot: Red
guild: discord.Guild
channel: discord.ChannelType
author: discord.User
message: Optional[Union[discord.Message, PartialMessage]]
command: Optional[str] = ""
assume_yes: bool = True
interaction: Optional[discord.Interaction] = None
inat_client: iNatClient = None
[docs]class Listeners(INatEmbeds, MixinMeta):
"""Listeners mixin for inatcog."""
[docs] @commands.Cog.listener()
async def on_message_without_command(self, message: discord.Message) -> None:
"""Handle links to iNat."""
await self._ready_event.wait()
if message.author.bot:
return
guild = message.guild
channel = message.channel
# Autoobs and dot_taxon features both need embed_links:
if guild:
if not channel.permissions_for(guild.me).embed_links:
return
guild_config = self.config.guild(guild)
server_listen_scope = await guild_config.listen()
if server_listen_scope is False or (
server_listen_scope is None
and not isinstance(message.channel, discord.Thread)
):
return
# - on_message_without_command only ignores bot prefixes for this instance
# - implementation as suggested by Trusty:
# - https://cogboard.red/t/approved-dronefly/541/5?u=syntheticbee
bot_prefixes = await guild_config.bot_prefixes()
if bot_prefixes:
prefixes = r"|".join(
re.escape(bot_prefix) for bot_prefix in bot_prefixes
)
prefix_pattern = re.compile(r"^({prefixes})".format(prefixes=prefixes))
if re.match(prefix_pattern, message.content):
return
channel_autoobs = not guild or await self.config.channel(channel).autoobs()
if channel_autoobs is None:
autoobs = await guild_config.autoobs()
else:
autoobs = channel_autoobs
if autoobs:
ctx = PartialContext(
self.bot, guild, channel, message.author, message, "msg autoobs", None
)
obs, url = await maybe_match_obs(self, ctx, message.content)
if obs:
# Only output if an observation is found
async with self.inat_client.set_ctx_from_user(ctx) as inat_client:
ctx.inat_client = inat_client
embed = await self.make_obs_embed(ctx, obs, url, preview=False)
await self.send_obs_embed(ctx, embed, obs)
self.bot.dispatch("commandstats_action", ctx)
channel_dot_taxon = not guild or await self.config.channel(channel).dot_taxon()
if channel_dot_taxon is None:
dot_taxon = await guild_config.dot_taxon()
else:
dot_taxon = channel_dot_taxon
if dot_taxon:
mat = re.search(DOT_TAXON_PAT, message.content)
if mat:
msg = None
ctx = PartialContext(
self.bot,
guild,
channel,
message.author,
message,
"msg dot_taxon",
None,
)
async with self.inat_client.set_ctx_from_user(ctx) as inat_client:
ctx.inat_client = inat_client
try:
query = await NaturalQueryConverter.convert(ctx, mat["query"])
if query.controlled_term:
return
query_response = await self.query.get(ctx, query)
except (BadArgument, LookupError):
return
if query.user or query.place or query.project:
msg = await channel.send(
embed=await self.make_obs_counts_embed(query_response)
)
await self.add_obs_reaction_emojis(ctx, msg, query_response)
else:
msg = await channel.send(
embed=await self.get_taxa_embed(ctx, query_response)
)
await self.add_taxon_reaction_emojis(ctx, msg, query_response)
self.bot.dispatch("commandstats_action", ctx)
[docs] async def handle_member_reaction(
self,
emoji: discord.PartialEmoji,
member: discord.Member,
message: discord.Message,
action: str,
):
"""Central handler for member reactions."""
def fake_command_context(message, command, member):
fake_command_message = PartialMessage(member, message.guild)
ctx = PartialContext(
self.bot,
message.guild,
message.channel,
member,
fake_command_message,
command,
None,
)
return ctx
def dispatch_commandstats(ctx):
self.bot.dispatch("commandstats_action", ctx)
if not message.embeds or not message.reactions:
return
reaction = next(
(
reaction
for reaction in message.reactions
if reaction.emoji == str(emoji)
),
None,
)
if not reaction or not reaction.me:
return
inat_embed = INatEmbed.from_discord_embed(message.embeds[0])
msg = copy(message)
msg.embeds[0] = inat_embed
try:
if str(emoji) == REACTION_EMOJI["taxonomy"]:
command = "react taxonomy"
# TODO: DRY up with a context manager:
ctx = fake_command_context(message, command, member)
async with self.inat_client.set_ctx_from_user(ctx) as inat_client:
ctx.inat_client = inat_client
await self.maybe_update_taxonomy(ctx, msg)
dispatch_commandstats(ctx)
elif not inat_embed.has_places():
if str(emoji) == REACTION_EMOJI["self"]:
command = "react self"
ctx = fake_command_context(message, command, member)
async with self.inat_client.set_ctx_from_user(ctx) as inat_client:
ctx.inat_client = inat_client
await self.maybe_update_user(
ctx, msg, member=member, action=action
)
dispatch_commandstats(ctx)
elif str(emoji) == REACTION_EMOJI["user"]:
ctx = PartialContext(
self.bot, message.guild, message.channel, member, None
)
async with self.inat_client.set_ctx_from_user(ctx) as inat_client:
ctx.inat_client = inat_client
await self.maybe_update_user_by_name(
ctx, msg=msg, member=member
)
dispatch_commandstats(ctx)
if not (inat_embed.has_users() or inat_embed.has_not_by_users()):
if str(emoji) == REACTION_EMOJI["home"]:
command = "react home"
ctx = fake_command_context(message, command, member)
async with self.inat_client.set_ctx_from_user(ctx) as inat_client:
ctx.inat_client = inat_client
await self.maybe_update_place(ctx, msg, member, action)
dispatch_commandstats(ctx)
elif str(emoji) == REACTION_EMOJI["place"]:
command = "react place"
ctx = fake_command_context(message, command, member)
async with self.inat_client.set_ctx_from_user(ctx) as inat_client:
ctx.inat_client = inat_client
await self.maybe_update_place_by_name(ctx, msg, member)
dispatch_commandstats(ctx)
except NoRoomInDisplay as err:
if message.id not in self.predicate_locks:
self.predicate_locks[message.id] = asyncio.Lock()
async with self.predicate_locks[message.id]:
error_message = await message.channel.send(err.args[0])
await asyncio.sleep(15)
with contextlib.suppress(discord.HTTPException):
await error_message.delete()
except Exception:
logger.error(
"Exception handling %s %s reaction by %s on %s",
action,
str(emoji),
repr(member),
repr(message),
)
raise
[docs] async def maybe_get_reaction(
self, payload: discord.raw_models.RawReactionActionEvent
) -> Tuple[discord.Member, discord.Message]:
"""Return reaction member & message if valid."""
await self._ready_event.wait()
if str(payload.emoji) not in KNOWN_REACTION_EMOJIS:
raise ValueError(UNKNOWN_REACTION_MSG)
guild_id = payload.guild_id or 0
if not guild_id:
# in DM
member = self.bot.get_user(payload.user_id)
else:
guild = self.bot.get_guild(payload.guild_id)
member = guild.get_member(payload.user_id)
# defensive: not possible?
if member is None:
raise ValueError("User is not a guild member.")
if member.bot:
raise ValueError("User is a bot.")
if self.member_as[(guild_id, member.id)].spammy:
logger.info(
"Spammy: %d-%d-%d; ignored reaction: %s",
guild_id,
payload.channel_id,
member.id,
payload.emoji,
)
raise ValueError("Member is being spammy")
channel = self.bot.get_channel(payload.channel_id)
try:
message = next(
msg for msg in self.bot.cached_messages if msg.id == payload.message_id
)
except StopIteration as err: # too old; have to fetch it
if guild_id and not channel.permissions_for(guild.me).read_message_history:
raise ValueError(
"Message can't be read without read_message_history permission."
) from err
try:
# Reacting to old messages is an API call, so is a privilege only
# extended to users added in this server.
if await has_valid_user_config(self, member, anywhere=False):
# TODO: antispam to prevent exceeded fetch messages > 24 hrs
# rate limit. We need different buckets for different classes
# of use:
# - reasonable limit globally over 24 hrs
# - reasonable limit per member over 24 hrs
logger.debug(
"Handling reaction %s by %d to uncached message: %s-%s",
payload.channel_id,
payload.message_id,
payload.emoji,
member.id,
)
message = await channel.fetch_message(payload.message_id)
else:
raise ValueError(
"Discarded reaction to uncached message by member not known in this server."
) from err
except discord.errors.NotFound:
raise ValueError(
"Message was deleted before reaction handled."
) from err
if message.author != self.bot.user:
raise ValueError("Reaction is not to our own message.")
self.member_as[(guild_id, member.id)].stamp()
return (member, message)
[docs] @commands.Cog.listener()
async def on_raw_reaction_add(
self, payload: discord.raw_models.RawReactionActionEvent
) -> None:
"""Central handler for reactions added to bot messages."""
try:
(member, message) = await self.maybe_get_reaction(payload)
except ValueError as err:
if self._log_ignored_reactions and str(err) != UNKNOWN_REACTION_MSG:
logger.debug(str(err) + "\n" + repr(payload))
return
await self.handle_member_reaction(payload.emoji, member, message, "add")
[docs] @commands.Cog.listener()
async def on_raw_reaction_remove(
self, payload: discord.raw_models.RawReactionActionEvent
) -> None:
"""Central handler for reactions removed from bot messages."""
try:
(member, message) = await self.maybe_get_reaction(payload)
except ValueError as err:
if self._log_ignored_reactions and str(err) != UNKNOWN_REACTION_MSG:
logger.debug(str(err) + "\n" + repr(payload))
return
await self.handle_member_reaction(payload.emoji, member, message, "remove")