Source code for inatcog.listeners

"""Listeners module for inatcog."""
from attrs import define
from typing import Optional, Tuple, Union
import asyncio
import contextlib
from copy import copy
import logging
import re

import discord
from redbot.core import commands
from redbot.core.bot import Red
from redbot.core.commands import BadArgument
from .client import iNatClient
from .converters.base import NaturalQueryConverter
from .embeds.common import NoRoomInDisplay
from .embeds.inat import INatEmbed, INatEmbeds, REACTION_EMOJI
from .interfaces import MixinMeta
from .obs import maybe_match_obs

logger = logging.getLogger(f"red.dronefly.{__name__}")

# A "dot taxon" query is text wrapped in a pair of dots and set off from the
# rest of the message by whitespace (or the start/end of the text). The text
# between the dots is at least 4 characters long and neither starts nor ends
# with whitespace or a dot.
DOT_TAXON_PAT = re.compile(r"(^|\s)\.(?P<query>[^\s\.].{2,}?[^\s\.])\.(\s|$)")
# Emoji strings the reaction listeners respond to; anything else is ignored.
KNOWN_REACTION_EMOJIS = REACTION_EMOJI.values()
UNKNOWN_REACTION_MSG = "Not a known reaction."

# pylint: disable=no-member, assigning-non-slot
# - See https://github.com/PyCQA/pylint/issues/981


@define
class PartialMessage:
    """Partial Message to satisfy bot & guild checks.

    A minimal stand-in for a message, carrying only an author and a guild.
    The reaction listener builds one for the reacting member, since a
    reaction event has no command message of its own.
    """

    # The user (or guild member) treated as the author of the fake message.
    author: Union[discord.User, discord.Member]
    # Guild the reaction happened in; None when it happened in a DM.
    guild: Optional[discord.Guild]
@define
class PartialContext:
    """Partial Context synthesized from objects passed into listeners.

    Listeners are not invoked through a command, so no real command context
    exists. This class bundles the objects a listener does have so they can
    be passed to code that expects a context. Fields are positional, so
    their order is part of the interface.
    """

    bot: Red
    # None when the event happened in a DM.
    guild: Optional[discord.Guild]
    # The channel the message or reaction was seen in; replies are sent here.
    channel: discord.abc.Messageable
    author: Union[discord.User, discord.Member]
    # The triggering message, a PartialMessage stand-in, or None.
    message: Optional[Union[discord.Message, PartialMessage]]
    # Name reported with the "commandstats_action" event, e.g. "msg autoobs".
    command: Optional[str] = ""
    assume_yes: bool = True
    interaction: Optional[discord.Interaction] = None
    # Filled in by the listener once the iNat client context is entered.
    inat_client: Optional[iNatClient] = None
class Listeners(INatEmbeds, MixinMeta):
    """Listeners mixin for inatcog.

    Handles two kinds of Discord events:

    - ordinary messages (not bot commands), which may trigger the "autoobs"
      observation display or a "dot taxon" lookup;
    - raw reaction add/remove events on the bot's own embeds, which are
      routed to the matching ``maybe_update_*`` handler.
    """

    @commands.Cog.listener()
    async def on_message_without_command(self, message: discord.Message) -> None:
        """Handle links to iNat and dot-taxon queries in ordinary messages.

        Messages from bots are ignored. In a guild the message is also
        ignored when the bot lacks ``embed_links`` in the channel, when the
        server ``listen`` setting excludes the channel, or when the message
        starts with one of the configured ``bot_prefixes``.

        Args:
            message: The message that was posted.
        """
        await self._ready_event.wait()
        if message.author.bot:
            return

        guild = message.guild
        channel = message.channel
        # Stays None in DMs. There the per-channel values are never None, so
        # the guild-level fallbacks in the helpers are not consulted.
        guild_config = None

        if guild:
            # Autoobs and dot_taxon features both need embed_links:
            if not channel.permissions_for(guild.me).embed_links:
                return
            guild_config = self.config.guild(guild)
            # False: never listen. None: listen in threads only.
            server_listen_scope = await guild_config.listen()
            if server_listen_scope is False:
                return
            if server_listen_scope is None and not isinstance(
                channel, discord.Thread
            ):
                return

            # - on_message_without_command only ignores bot prefixes for this
            #   instance, so prefixes of other bots are configured separately
            # - approach as suggested by Trusty:
            #   - https://cogboard.red/t/approved-dronefly/541/5?u=syntheticbee
            bot_prefixes = await guild_config.bot_prefixes()
            if bot_prefixes and message.content.startswith(tuple(bot_prefixes)):
                return

        await self._autoobs_from_message(message, guild_config)
        await self._dot_taxon_from_message(message, guild_config)

    async def _autoobs_from_message(self, message: discord.Message, guild_config):
        """Display the observation matched in the message, if autoobs is on."""
        guild = message.guild
        channel = message.channel

        if guild:
            channel_config = self.config.channel(channel)
            channel_autoobs = await channel_config.autoobs()
            channel_autoobs_preview = await channel_config.autoobs_preview()
        else:
            # i.e. channel is a DM: always enable the feature and assume a
            # preview image will be attached by Discord, so don't include a
            # preview image in the observation auto-display
            channel_autoobs = True
            channel_autoobs_preview = False

        # A channel setting of None means "inherit the guild setting". The two
        # settings are resolved independently of each other.
        if channel_autoobs is None:
            autoobs = await guild_config.autoobs()
        else:
            autoobs = channel_autoobs
        if channel_autoobs_preview is None:
            autoobs_preview = await guild_config.autoobs_preview()
        else:
            autoobs_preview = channel_autoobs_preview

        if not autoobs:
            return

        # NOTE(review): the 7th positional argument lands on `assume_yes`, so
        # it is None here rather than the default True -- confirm intent.
        ctx = PartialContext(
            self.bot, guild, channel, message.author, message, "msg autoobs", None
        )
        obs, url = await maybe_match_obs(self, ctx, message.content)
        # Only output if an observation is found
        if not obs:
            return
        async with self.inat_client.set_ctx_from_user(ctx) as inat_client:
            ctx.inat_client = inat_client
            embed = await self.make_obs_embed(ctx, obs, url, preview=autoobs_preview)
            await self.send_obs_embed(ctx, embed, obs)
            self.bot.dispatch("commandstats_action", ctx)

    async def _dot_taxon_from_message(self, message: discord.Message, guild_config):
        """Answer a ``.query.`` lookup in the message, if dot_taxon is on."""
        guild = message.guild
        channel = message.channel

        # Always enabled in DMs; in a guild the channel value may be None,
        # meaning "inherit the guild setting".
        channel_dot_taxon = not guild or await self.config.channel(channel).dot_taxon()
        if channel_dot_taxon is None:
            dot_taxon = await guild_config.dot_taxon()
        else:
            dot_taxon = channel_dot_taxon
        if not dot_taxon:
            return

        mat = DOT_TAXON_PAT.search(message.content)
        if not mat:
            return

        # NOTE(review): as with autoobs, the trailing None sets `assume_yes`.
        ctx = PartialContext(
            self.bot,
            guild,
            channel,
            message.author,
            message,
            "msg dot_taxon",
            None,
        )
        async with self.inat_client.set_ctx_from_user(ctx) as inat_client:
            ctx.inat_client = inat_client
            try:
                query = await NaturalQueryConverter.convert(ctx, mat["query"])
                if query.controlled_term:
                    return
                query_response = await self.query.get(ctx, query)
            except (BadArgument, LookupError):
                # Text between dots that isn't a resolvable query is ignored.
                return
            if query.user or query.place or query.project:
                msg = await channel.send(
                    embed=await self.make_obs_counts_embed(query_response)
                )
                await self.add_obs_reaction_emojis(ctx, msg, query_response)
            else:
                msg = await channel.send(
                    embed=await self.get_taxa_embed(ctx, query_response)
                )
                await self.add_taxon_reaction_emojis(ctx, msg, query_response)
            self.bot.dispatch("commandstats_action", ctx)

    async def handle_member_reaction(
        self,
        emoji: discord.PartialEmoji,
        member: discord.Member,
        message: discord.Message,
        action: str,
    ) -> None:
        """Central handler for member reactions.

        Does nothing unless the message has an embed and the bot itself has
        also reacted with the same emoji. Otherwise the reaction is routed to
        the matching ``maybe_update_*`` handler.

        Args:
            emoji: The emoji that was added or removed.
            member: The user who reacted.
            message: The bot message that was reacted to.
            action: ``"add"`` or ``"remove"``.

        Raises:
            Exception: Anything other than ``NoRoomInDisplay`` raised by a
                handler is logged and re-raised.
        """

        def fake_command_context(command):
            # Reactions have no command message; synthesize one authored by
            # the reacting member.
            fake_command_message = PartialMessage(member, message.guild)
            return PartialContext(
                self.bot,
                message.guild,
                message.channel,
                member,
                fake_command_message,
                command,
                None,
            )

        async def run_handler(ctx, handler, *args, **kwargs):
            # Run one handler with the iNat client set up for the reacting
            # user, then report the action for command stats.
            async with self.inat_client.set_ctx_from_user(ctx) as inat_client:
                ctx.inat_client = inat_client
                await handler(ctx, *args, **kwargs)
                self.bot.dispatch("commandstats_action", ctx)

        if not message.embeds or not message.reactions:
            return
        emoji_str = str(emoji)
        reaction = next(
            (
                reaction
                for reaction in message.reactions
                if reaction.emoji == emoji_str
            ),
            None,
        )
        # Only respond to emojis the bot itself put on the message.
        if not reaction or not reaction.me:
            return

        # TODO: class for interactions? currently just a dict keyed by full
        # message id and content is the parsed inat_embed
        # - this needs two corresponding pieces of code to make it work across
        #   cog reloads:
        #   - save all of those interactions in Config when cog is unloaded
        #   - load them from Config when cog is loaded
        if message.guild:
            full_message_id = f"{message.guild.id}-{message.channel.id}-{message.id}"
        else:
            full_message_id = f"{message.channel.id}-{message.id}"
        inat_embed = self.interactions.get(full_message_id)
        if not inat_embed:
            inat_embed = INatEmbed.from_discord_embed(message.embeds[0])
            # Maintain a shadow copy of the INatEmbed which is an augmented
            # discord.Embed that knows all of the iNat-specific parts. We never
            # go out to the Discord network from here on until the interaction
            # ends, updating and writing out this copy of the embed.
            self.interactions[full_message_id] = inat_embed
        msg = copy(message)
        msg.embeds[0] = inat_embed

        try:
            if emoji_str == REACTION_EMOJI["taxonomy"]:
                ctx = fake_command_context("react taxonomy")
                await run_handler(ctx, self.maybe_update_taxonomy, msg)
            elif not inat_embed.has_places():
                if emoji_str == REACTION_EMOJI["self"]:
                    ctx = fake_command_context("react self")
                    await run_handler(
                        ctx, self.maybe_update_user, msg, member=member, action=action
                    )
                elif emoji_str == REACTION_EMOJI["user"]:
                    # NOTE(review): unlike the other reactions, this context
                    # has no message and no command name -- confirm intent.
                    ctx = PartialContext(
                        self.bot, message.guild, message.channel, member, None
                    )
                    await run_handler(
                        ctx, self.maybe_update_user_by_name, msg=msg, member=member
                    )
            if not (inat_embed.has_users() or inat_embed.has_not_by_users()):
                if emoji_str == REACTION_EMOJI["home"]:
                    ctx = fake_command_context("react home")
                    await run_handler(
                        ctx, self.maybe_update_place, msg, member, action
                    )
                elif emoji_str == REACTION_EMOJI["place"]:
                    ctx = fake_command_context("react place")
                    await run_handler(
                        ctx, self.maybe_update_place_by_name, msg, member
                    )
        except NoRoomInDisplay as err:
            # Show the error for 15 seconds, then remove it. The per-message
            # lock is held for that whole time.
            if message.id not in self.predicate_locks:
                self.predicate_locks[message.id] = asyncio.Lock()
            async with self.predicate_locks[message.id]:
                error_message = await message.channel.send(err.args[0])
                await asyncio.sleep(15)
                with contextlib.suppress(discord.HTTPException):
                    await error_message.delete()
        except Exception:
            logger.error(
                "Exception handling %s %s reaction by %s on %s",
                action,
                str(emoji),
                repr(member),
                repr(message),
            )
            raise

    def maybe_get_reaction(
        self, payload: discord.raw_models.RawReactionActionEvent
    ) -> Tuple[Union[discord.User, discord.Member], Optional[discord.Message]]:
        """Return reaction member & message if valid.

        Args:
            payload: The raw reaction event.

        Returns:
            A ``(member, message)`` tuple. ``message`` is ``None`` when the
            reacted-to message is not in the bot's message cache.

        Raises:
            ValueError: If the emoji is not a known reaction, the guild or
                user cannot be resolved, the user is a bot or is being
                spammy, or the message was not sent by this bot.
        """
        if str(payload.emoji) not in KNOWN_REACTION_EMOJIS:
            raise ValueError(UNKNOWN_REACTION_MSG)

        # Guild id 0 stands for "in a DM" in the member_as key.
        guild_id = payload.guild_id or 0
        if guild_id:
            guild = self.bot.get_guild(guild_id)
            if guild is None:
                raise ValueError("Guild is not available.")
            member = guild.get_member(payload.user_id)
            # defensive: not possible?
            if member is None:
                raise ValueError("User is not a guild member.")
        else:
            # in DM
            member = self.bot.get_user(payload.user_id)
            if member is None:
                raise ValueError("User is not available.")

        if member.bot:
            raise ValueError("User is a bot.")
        if self.member_as[(guild_id, member.id)].spammy:
            logger.info(
                "Spammy: %d-%d-%d; ignored reaction: %s",
                guild_id,
                payload.channel_id,
                member.id,
                payload.emoji,
            )
            raise ValueError("Member is being spammy")

        message = next(
            (msg for msg in self.bot.cached_messages if msg.id == payload.message_id),
            None,
        )
        if message:
            if message.author != self.bot.user:
                raise ValueError("Reaction is not to our own message.")
            self.member_as[(guild_id, member.id)].stamp()
        return (member, message)

    async def _on_raw_reaction(
        self, payload: discord.raw_models.RawReactionActionEvent, action: str
    ) -> None:
        """Validate a raw reaction event and pass it to the central handler."""
        await self._ready_event.wait()
        try:
            member, message = self.maybe_get_reaction(payload)
        except ValueError as err:
            # Unknown emojis are by far the most common case; don't log them.
            if self._log_ignored_reactions and str(err) != UNKNOWN_REACTION_MSG:
                logger.debug("%s\n%r", err, payload)
            return
        if message:
            await self.handle_member_reaction(payload.emoji, member, message, action)

    @commands.Cog.listener()
    async def on_raw_reaction_add(
        self, payload: discord.raw_models.RawReactionActionEvent
    ) -> None:
        """Central handler for reactions added to bot messages."""
        await self._on_raw_reaction(payload, "add")

    @commands.Cog.listener()
    async def on_raw_reaction_remove(
        self, payload: discord.raw_models.RawReactionActionEvent
    ) -> None:
        """Central handler for reactions removed from bot messages."""
        await self._on_raw_reaction(payload, "remove")