"""Module for search command group."""
from math import ceil
import re
from typing import Optional, Union
import urllib.parse
from dronefly.core.formatters.constants import WWW_BASE_URL
from dronefly.core.formatters.generic import format_taxon_name
from dronefly.core.parsers.url import (
PAT_OBS_LINK,
PAT_PLACE_LINK,
PAT_PROJECT_LINK,
PAT_TAXON_LINK,
PAT_USER_LINK,
)
from dronefly.core.query.query import EMPTY_QUERY, Query
from dronefly.core.utils import obs_url_from_v1
from dronefly.discord.embeds import make_embed
from pyinaturalist.models import Observation
from redbot.core import checks, commands
from redbot.core.utils.menus import menu, DEFAULT_CONTROLS
from ..common import grouper
from ..converters.base import NaturalQueryConverter
from ..converters.reply import TaxonReplyConverter
from ..embeds.common import apologize
from ..embeds.inat import INatEmbeds
from ..interfaces import MixinMeta
from ..menus.inat import SearchMenuPages, SearchObsSource
from ..utils import get_home, use_client
class CommandsSearch(INatEmbeds, MixinMeta):
    """Mixin providing search command group."""

    async def _search(self, ctx, query: Union[Query, str], keyword: Optional[str]):
        """Run a search and present the results in an interactive menu.

        Args:
            ctx: Invocation context of the command.
            query: Free-text query for site searches, or a parsed query
                (possibly empty/None) for observation searches.
            keyword: Kind of search: "obs", "inactive", any other value is
                passed through as the `sources` filter, and None means an
                unfiltered site search.

        A LookupError raised while preparing the results is reported to the
        user via `apologize` instead of propagating.
        """

        async def remove_reaction_if_permitted(ctx, message, reaction):
            """Remove the invoker's reaction when the bot may manage messages."""
            if ctx.guild and ctx.channel.permissions_for(ctx.guild.me).manage_messages:
                await message.remove_reaction(reaction, ctx.author)

        def clamp_offset(page, result_index, count):
            """Map a page-relative index to an absolute one, capped at the last item."""
            return min(result_index + page * per_embed_page, count - 1)

        def get_result(page, results, result_index):
            """Return the result selected on `page`, or None if there are no results."""
            if not results:
                return None
            return results[clamp_offset(page, result_index, len(results))]

        def get_thumbnail(page, thumbnails, result_index):
            """Return the thumbnail for the selected result, or None if there are none.

            The offset is capped against `thumbnails` itself (not the results
            list) so a shorter thumbnails list cannot be indexed out of range.
            """
            if not thumbnails:
                return None
            return thumbnails[clamp_offset(page, result_index, len(thumbnails))]

        def update_selected(pages, page, result_index):
            """Redraw embed `page` with `result_index` highlighted and record it."""
            embed = pages[page]
            thumbnail = (
                get_thumbnail(page, thumbnails, result_index) if thumbnails else None
            )
            embed.set_image(url=thumbnail)
            page_start = page * per_embed_page
            page_of_results = results[page_start : page_start + per_embed_page]
            # The last page may be short; keep the selection on an existing row.
            result_index = min(result_index, len(page_of_results) - 1)
            embed.description = format_page(buttons, page_of_results, result_index)
            selected_index[0] = result_index
            return pages

        async def _display_selected(ctx, result):
            """Show details for one formatted result, based on the link it contains."""
            mat = re.search(PAT_OBS_LINK, result)
            if mat:
                home = await get_home(ctx)
                obs_results = None
                try:
                    obs_results = (
                        await self.api.get_observations(
                            mat["obs_id"],
                            include_new_projects=1,
                            preferred_place_id=home,
                        )
                    )["results"]
                except LookupError as err:
                    await apologize(ctx, str(err))
                    return
                obs = Observation.from_json(obs_results[0]) if obs_results else None
                if obs:
                    embed = await self.make_obs_embed(
                        ctx, obs, f"{WWW_BASE_URL}/observations/{obs.id}"
                    )
                    await self.send_obs_embed(
                        ctx, embed, obs, timeout=10, with_keep=True
                    )
                    return
                await apologize(ctx, "Not found")
                return

            mat = re.search(PAT_TAXON_LINK, result)
            if mat:
                query = await NaturalQueryConverter.convert(ctx, mat["taxon_id"])
                try:
                    query_response = await self.query.get(ctx, query)
                except LookupError as err:
                    await apologize(ctx, str(err))
                    return
                await self.send_embed_for_taxon(ctx, query_response, with_keep=True)
                return

            mat = re.search(PAT_USER_LINK, result)
            if mat:
                await ctx.send(
                    f"{WWW_BASE_URL}/people/{mat['user_id'] or mat['login']}"
                )
                return

            mat = re.search(PAT_PROJECT_LINK, result)
            if mat:
                await (self.bot.get_command("project")(ctx, query=mat["project_id"]))
                return

            mat = re.search(PAT_PLACE_LINK, result)
            if mat:
                await (self.bot.get_command("place")(ctx, query=mat["place_id"]))

        async def next_page_reaction(
            ctx, pages, controls, message, page, timeout, reaction
        ):  # pylint: disable=too-many-arguments
            """Reset the current page's selection to its first row, then page forward."""
            pages = update_selected(pages, page, 0)
            await DEFAULT_CONTROLS["➡️"](
                ctx, pages, controls, message, page, timeout, reaction
            )

        async def prev_page_reaction(
            ctx, pages, controls, message, page, timeout, reaction
        ):  # pylint: disable=too-many-arguments
            """Reset the current page's selection to its first row, then page back."""
            pages = update_selected(pages, page, 0)
            await DEFAULT_CONTROLS["⬅️"](
                ctx, pages, controls, message, page, timeout, reaction
            )

        async def prev_result_reaction(
            ctx, pages, controls, message, page, timeout, reaction
        ):  # pylint: disable=too-many-arguments
            """Move the selection up one row, wrapping to the previous page."""
            was_selected = selected_index[0]
            if was_selected == 0:
                # back to bottommost result on prev page:
                selected_index[0] = per_embed_page - 1
                target_page = (page - 1) % len(pages)
                pages = update_selected(pages, target_page, selected_index[0])
                await remove_reaction_if_permitted(ctx, message, reaction)
                prev_reaction = DEFAULT_CONTROLS["⬅️"]
            else:
                selected_index[0] -= 1
                prev_reaction = update_selected_reaction
            await prev_reaction(ctx, pages, controls, message, page, timeout, reaction)

        async def next_result_reaction(
            ctx, pages, controls, message, page, timeout, reaction
        ):  # pylint: disable=too-many-arguments
            """Move the selection down one row, advancing to the next page at the end."""
            was_selected = selected_index[0]
            # Every page is full except possibly the last one.
            page_len = (
                per_embed_page
                if (page + 1) < len(pages)
                else len(results) - ((len(pages) - 1) * per_embed_page)
            )
            if was_selected == page_len - 1:
                next_reaction = next_page_reaction
            else:
                selected_index[0] += 1
                pages = update_selected(pages, page, selected_index[0])
                next_reaction = update_selected_reaction
            await next_reaction(ctx, pages, controls, message, page, timeout, reaction)

        async def display_selected_reaction(
            ctx, pages, controls, message, page, timeout, reaction
        ):  # pylint: disable=too-many-arguments
            """Display the currently selected result, then resume the menu."""
            result = get_result(page, results, selected_index[0])
            if result:
                await _display_selected(ctx, result)
            await remove_reaction_if_permitted(ctx, message, reaction)
            await menu(ctx, pages, controls, message, page, timeout)

        async def update_selected_reaction(
            ctx, pages, controls, message, page, timeout, reaction
        ):  # pylint: disable=too-many-arguments
            """Redraw the page for the recorded selection, then resume the menu."""
            result_index = selected_index[0]
            result = get_result(page, results, result_index)
            if result:
                pages = update_selected(pages, page, result_index)
            await remove_reaction_if_permitted(ctx, message, reaction)
            await menu(ctx, pages, controls, message, page, timeout)

        async def update_and_display_selected_reaction(
            ctx, pages, controls, message, page, timeout, reaction
        ):  # pylint: disable=too-many-arguments
            """Select the row matching the lettered button and display it."""
            selected_index[0] = buttons.index(reaction)
            await display_selected_reaction(
                ctx, pages, controls, message, page, timeout, reaction
            )

        async def select_result_reaction(
            ctx, pages, controls, message, page, timeout, reaction
        ):  # pylint: disable=too-many-arguments
            """Highlight the row matching the lettered button without displaying it."""
            result_index = buttons.index(reaction)
            result = get_result(page, results, result_index)
            if result:
                pages = update_selected(pages, page, result_index)
            await remove_reaction_if_permitted(ctx, message, reaction)
            await menu(ctx, pages, controls, message, page, timeout)

        def make_search_embeds(
            query_title, page, thumbnails, index, per_embed_page, pages_len
        ):  # pylint: disable=too-many-arguments
            """Build the embed for one page of formatted results."""
            embed = make_embed(
                title=f"Search: {query_title} (page {index + 1} of {pages_len})",
                url=url,
                description=page,
            )
            try:
                thumbnail = thumbnails[index * per_embed_page]
            except IndexError:
                # No thumbnail for the first result on this page; leave the
                # embed without an image.
                pass
            else:
                embed.set_image(url=thumbnail)
            return embed

        def get_inactive_query_args(query):
            """Return (url, kwargs) for a taxa search that includes inactive taxa."""
            url = (
                f"{WWW_BASE_URL}/taxa/search?"
                f"q={urllib.parse.quote_plus(query)}"
                "&is_active=any&sources=inactive"
            )
            return (url, {"is_active": "any"})

        async def get_obs_query_args(query):
            """Return (query_title, url, kwargs) for an observation search."""
            query_response = await self.query.get(ctx, query)
            kwargs = query_response.obs_args()
            # TODO: determine why we don't just use QueryResponse.obs_query_description
            # and either use it directly or otherwise share code instead of duplicating
            # most of it here.
            if query_response.taxon:
                query_title = format_taxon_name(query_response.taxon, with_term=True)
            else:
                query_title = "Observations"
            if query_response.user:
                query_title += f" by {query_response.user.login}"
            if query_response.unobserved_by:
                query_title += f" unobserved by {query_response.unobserved_by.login}"
            if query_response.id_by:
                query_title += f" identified by {query_response.id_by.login}"
            if query_response.except_by:
                query_title += f" except by {query_response.except_by.login}"
            if query_response.project:
                query_title += f" in {query_response.project.title}"
            if query_response.place:
                query_title += f" from {query_response.place.display_name}"
            # The URL is built before per_page is added, so it is not part of it.
            url = obs_url_from_v1(kwargs)
            kwargs["per_page"] = 200
            return (query_title, url, kwargs)

        async def get_query_args(query, keyword):
            """Return (query_type, query_title, url, kwargs) for the search.

            `query_type` is the lower-cased keyword, or "" when there is none.
            """
            kwargs = {}
            kw_lowered = ""
            query_title = ""
            url = ""
            if isinstance(query, str):
                query_title = query
                url = f"{WWW_BASE_URL}/search?q={urllib.parse.quote_plus(query)}"
            if keyword:
                kw_lowered = keyword.lower()
                if kw_lowered == "inactive":
                    (url, kwargs) = get_inactive_query_args(query)
                elif kw_lowered == "obs":
                    (query_title, url, kwargs) = await get_obs_query_args(query)
                else:
                    # Use the same normalized value for the API filter and the
                    # website link so the two always agree.
                    kwargs["sources"] = kw_lowered
                    url += f"&sources={kw_lowered}"
            return (kw_lowered, query_title, url, kwargs)

        async def query_formatted_results(query, kwargs):
            """Run a site search.

            Returns (total_results, results, thumbnails, per_api_page,
            per_embed_page); thumbnails is always empty and per_embed_page is 10.
            """
            thumbnails = []
            (results, total_results, per_api_page) = await self.site_search.search(
                ctx, query, **kwargs
            )
            per_embed_page = 10
            return (total_results, results, thumbnails, per_api_page, per_embed_page)

        def get_button_controls(results, query_type):
            """Return (buttons, controls): lettered buttons and the reaction handlers."""
            all_buttons = [
                "\U0001F1E6",  # :regional_indicator_a:
                "\U0001F1E7",  # :regional_indicator_b:
                "\U0001F1E8",  # :regional_indicator_c:
                "\U0001F1E9",  # :regional_indicator_d:
                "\U0001F1EA",  # :regional_indicator_e:
                "\U0001F1EB",  # :regional_indicator_f:
                "\U0001F1EC",  # :regional_indicator_g:
                "\U0001F1ED",  # :regional_indicator_h:
                "\U0001F1EE",  # :regional_indicator_i:
                "\U0001F1EF",  # :regional_indicator_j:
            ][:per_embed_page]
            # Never offer more letters than there are results.
            buttons_count = min(len(results), len(all_buttons))
            buttons = all_buttons[:buttons_count]
            if query_type == "obs":
                controls = {
                    "⬆️": prev_result_reaction,
                    "⬇️": next_result_reaction,
                    "⬅️": prev_page_reaction,
                    "➡️": next_page_reaction,
                    "✅": display_selected_reaction,
                    "❌": DEFAULT_CONTROLS["❌"],
                }
                letter_button_reaction = select_result_reaction
            else:
                # Copy so the shared default controls are not mutated below.
                controls = DEFAULT_CONTROLS.copy()
                letter_button_reaction = update_and_display_selected_reaction
            for button in buttons:
                controls[button] = letter_button_reaction
            return (buttons, controls)

        def format_page(buttons, group, selected=0):
            """Format one page: a lettered line per result, bolding the selected
            row for observation searches only."""

            def text_style(i):
                if query_type != "obs":
                    return ""
                return "**" if i == selected else ""

            def format_result(result, i):
                return " ".join((buttons[i], result))

            # filter(None, ...) drops empty/None entries from the group.
            lines = [
                (text_style(i) + format_result(result, i) + text_style(i))
                for i, result in enumerate(filter(None, group), 0)
            ]
            return "\n".join(lines)

        def format_embeds(
            results, total_results, per_api_page, per_embed_page, buttons
        ):
            """Build one embed per page of results."""
            page_texts = [
                format_page(buttons, group)
                for group in grouper(results, per_embed_page)
            ]
            pages_len = len(page_texts)  # Causes enumeration (works against lazy load).
            if len(results) < total_results:
                # NOTE(review): this counts pages beyond one API page; presumably
                # len(results) == per_api_page whenever results are truncated -
                # confirm against site_search.search.
                pages_len = (
                    f"{pages_len}; "
                    f"{ceil((total_results - per_api_page)/per_embed_page)} more not shown"
                )
            return [
                make_search_embeds(
                    query_title, page, thumbnails, index, per_embed_page, pages_len
                )
                for index, page in enumerate(page_texts, start=0)
            ]

        error_msg = None
        pages = []
        embeds = []
        controls = {}
        async with ctx.typing():
            try:
                if keyword and keyword.lower() == "obs":
                    # With no explicit query, try the converter with an empty
                    # argument; fall back to an empty query if that fails.
                    try:
                        _query = query or (await TaxonReplyConverter.convert(ctx, ""))
                    except commands.BadArgument:
                        _query = EMPTY_QUERY
                else:
                    _query = query
                query_type, query_title, url, kwargs = await get_query_args(
                    _query, keyword
                )
                if query_type == "obs":
                    (
                        results,
                        total_results,
                        per_api_page,
                    ) = await self.obs_query.query_observations(ctx, _query)
                else:
                    (
                        total_results,
                        results,
                        thumbnails,
                        per_api_page,
                        per_embed_page,
                    ) = await query_formatted_results(_query, kwargs)
                if not results:
                    if isinstance(_query, str) and "in" in _query.split():
                        raise LookupError(
                            "The `in` keyword is not supported by this command.\n"
                            f"Try `{ctx.clean_prefix}taxon` instead or omit the `in` clause.\n"
                            f"Type `{ctx.clean_prefix}help search` for help.",
                        )
                    raise LookupError(
                        "Nothing matches that query. "
                        "Check for mistakes in spelling or syntax.\n"
                        f"Type `{ctx.clean_prefix}help search` for help.",
                    )
                if query_type == "obs":
                    per_page = 4
                    pages = SearchMenuPages(
                        source=SearchObsSource(
                            self,
                            ctx,
                            _query,
                            results,
                            total_results,
                            per_page,
                            per_api_page,
                            url,
                            query_title,
                        ),
                        clear_reactions_after=True,
                    )
                else:
                    (buttons, controls) = get_button_controls(results, query_type)
                    embeds = format_embeds(
                        results, total_results, per_api_page, per_embed_page, buttons
                    )
            except LookupError as err:
                error_msg = str(err)

        if error_msg:
            await apologize(ctx, error_msg)
        elif pages:
            await pages.start(ctx)
        else:
            # Track index in outer scope
            # - TODO: use a menu class (from vendored menu) and make this an attribute.
            selected_index = [0]
            await menu(ctx, embeds, controls, timeout=60)

    @commands.group(aliases=["s"], invoke_without_command=True)
    @checks.bot_has_permissions(embed_links=True, read_message_history=True)
    @use_client
    async def search(self, ctx, *, query: Optional[TaxonReplyConverter] = None):
        """Search iNat observations, taxa, places, projects.

        • Observations are searched by default.
        • Use the arrow reaction buttons to navigate through pages.
        • Press a lettered reaction button to display the result in more
        detail.
        • See subcommand help topics for more information on each kind
        of result, e.g. `[p]help search taxa` describes taxa results,
        whether from `[p]search` or `[p]search taxa`.
        """
        # Delegate to the `search obs` subcommand.
        await (self.bot.get_command("search obs")(ctx, query=query))

    @search.command(name="site")
    @checks.bot_has_permissions(embed_links=True)
    @use_client
    async def search_site(self, ctx, *, query):
        """Search iNat.

        • The results are similar to entering a query in the `Search`
        textbox on the website, matching taxa, places, projects, or users.
        • Use one of the subcommands to match one kind of result, up to 100
        results instead of 30.
        • Matching a taxon within another taxon via `in` is only supported
        in `[p]search obs` and not in `[p]search site` or other subcommands.
        Use `[p]t` with `in` to match a single taxon within another taxon
        instead.
        """
        await self._search(ctx, query, None)

    @search.command(name="places", aliases=["place"])
    @use_client
    async def search_places(self, ctx, *, query):
        """Search iNat places.

        • The results are similar to entering a query in the website's `Search`
        textbox, then clicking the `Places` tab.
        • Place matches are indicated with the :round_pushpin: emoji to
        distinguish places from other kinds of `[p]search` result.
        """
        await self._search(ctx, query, "places")

    @search.command(name="projects", aliases=["prj", "project"])
    @use_client
    async def search_projects(self, ctx, *, query):
        """Search iNat projects.

        • The results are similar to entering a query into the website's `Search`
        textbox, then clicking the `Projects` tab.
        • Project matches are indicated with the :briefcase: emoji to
        distinguish projects from other kinds of `[p]search` result.
        """
        await self._search(ctx, query, "projects")

    @search.command(name="taxa", aliases=["taxon"])
    @use_client
    async def search_taxa(self, ctx, *, query):
        """Search iNat taxa.

        • The results are similar to entering a query into the website's `Search`
        textbox, then clicking the `Taxa` tab.
        • Taxa matches are indicated with :green_circle: emoji to distinguish
        taxa from other kinds of `[p]search` result.
        • *Note: If you need `in` to find a matching taxon within another taxon,
        or want to list user/place stats with `from` or `by`, use `[p]taxon`.*
        """
        await self._search(ctx, query, "taxa")

    @search.command(name="inactive")
    @use_client
    async def search_inactive(self, ctx, *, query):
        """Search iNat taxa (includes inactive).

        • The results are similar to entering a query into
        `More > Taxa Info > Search` textbox on the website,
        then clicking `Show active and inactive taxa`.
        • This subcommand can be used instead of `[p]search taxa` if you need
        to see more pages of results (up to 500 results instead of 100).
        • *Note: just as on the website, the search engine ranks the results
        differently from `[p]search taxa`, so you may find the order in
        which they are listed differs from that command.*
        """
        await self._search(ctx, query, "inactive")

    @search.command(name="users", aliases=["user", "person", "people"])
    @use_client
    async def search_users(self, ctx, *, query):
        """Search iNat users.

        • The results are similar to typing a query into the website's `Search`
        textbox, then clicking the `Users` tab.
        • User matches are indicated with :bust_in_silhouette: emoji to
        distinguish users from other kinds of `[p]search` result.
        • *Note: only iNat login IDs and names can be searched with this command.
        To find an iNat login ID for a registered Discord user, use the
        `[p]user` command instead. See `[p]help user` for more information.*
        """
        await self._search(ctx, query, "users")

    @search.command(name="obs", aliases=["observation", "observations"])
    @use_client
    async def search_obs(self, ctx, *, query: Optional[TaxonReplyConverter] = None):
        """Search iNat observations.

        • Command operation is similar to `[p]obs`, except multiple results are
        returned; see `[p]help obs` for more details and examples.
        • The mechanic for selecting observations is slightly different from
        the main command and other subcommands:
        **1.** Use the lettered reaction buttons to select an observation.
        **2.** Use :white_check_mark: reaction on the selected observation to
        keep it or :x: reaction to dismiss it.
        **3.** Continue to select more observations if you wish. Once every
        observation is either kept or dismissed, then you can react with
        :x: on the search display to dismiss it.
        """
        await self._search(ctx, query, "obs")