Source code for inatcog.api

"""Module to access iNaturalist API."""
from json import JSONDecodeError
import logging
from time import time
from types import SimpleNamespace
from typing import List, Optional, Union

from aiohttp import (
    ClientConnectorError,
    ClientSession,
    ContentTypeError,
    ServerDisconnectedError,
    TraceConfig,
    TraceRequestStartParams,
)
from aiohttp_retry import RetryClient, ExponentialRetry
from aiolimiter import AsyncLimiter
from bs4 import BeautifulSoup
import html2markdown

logger = logging.getLogger("red.dronefly." + __name__)

API_BASE_URL = "https://api.inaturalist.org"
RETRY_EXCEPTIONS = [
    ServerDisconnectedError,
    ConnectionResetError,
    ClientConnectorError,
    JSONDecodeError,
    TimeoutError,
]


class INatAPI:
    """Access the iNat API and assets via (api|static).inaturalist.org."""

    def __init__(self):
        # pylint: disable=unused-argument
        async def on_request_start(
            session: ClientSession,
            trace_config_ctx: SimpleNamespace,
            params: TraceRequestStartParams,
        ) -> None:
            current_attempt = trace_config_ctx.trace_request_ctx["current_attempt"]
            if current_attempt > 1:
                logger.info(
                    "iNat request attempt #%d: %s", current_attempt, repr(params)
                )

        trace_config = TraceConfig()
        trace_config.on_request_start.append(on_request_start)
        self.session = RetryClient(
            raise_for_status=False,
            trace_configs=[trace_config],
        )
        self.request_time = time()
        self.places_cache = {}
        self.projects_cache = {}
        self.users_cache = {}
        self.users_login_cache = {}
        self.taxa_cache = {}
        # api_v1_limiter:
        # ---------------
        # - Allow up to 50 requests over a 60 second time period (i.e.
        #   a burst of up to 50 within the period, after which requests
        #   are throttled until more capacity is available)
        # - This honours "try to keep it to 60 requests per minute or lower"
        # - https://api.inaturalist.org/v1/docs/
        self.api_v1_limiter = AsyncLimiter(50, 60)

    async def _get_rate_limited(self, full_url, **kwargs):
        """Query API, respecting 60 requests per minute rate limit."""
        logger.debug('_get_rate_limited("%s", %s)', full_url, repr(kwargs))
        async with self.api_v1_limiter:
            # i.e. wait 0.1s, 0.2s, 0.4s, 0.8s, 1.6s, 3.2s, and finally give up
            retry_options = ExponentialRetry(
                attempts=6,
                exceptions=RETRY_EXCEPTIONS,
            )
            try:
                async with self.session.get(
                    full_url, params=kwargs, retry_options=retry_options
                ) as response:
                    if response.status == 200:
                        return await response.json()
                    else:
                        try:
                            json = await response.json()
                            msg = f"{json.get('error')} ({json.get('status')})"
                        except ContentTypeError:
                            data = await response.text()
                            document = BeautifulSoup(data, "html.parser")
                            # Only use the body, if present
                            if document.body:
                                text = document.body.find().text
                            else:
                                text = document
                            # Treat as much as we can as markdown
                            markdown = html2markdown.convert(text)
                            # Punt the rest back to bs4 to drop unhandled tags
                            msg = BeautifulSoup(markdown, "html.parser").text
                        lookup_failed_msg = f"Lookup failed: {msg}"
                        logger.error(lookup_failed_msg)
                        raise LookupError(lookup_failed_msg)
            except Exception as e:  # pylint: disable=broad-except,invalid-name
                if any(isinstance(e, exc) for exc in retry_options.exceptions):
                    attempts = retry_options.attempts
                    msg = f"iNat not responding after {attempts} attempts. Please try again later."
                    logger.error(msg)
                    raise LookupError(msg) from e
                raise e

        return None
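
    # Illustrative behaviour of the two layers above (a sketch with
    # hypothetical timings, assuming aiohttp_retry's default 0.1s start
    # timeout):
    # - AsyncLimiter(50, 60) admits a burst of up to 50 requests, then makes
    #   further callers wait until earlier requests age out of the 60s window.
    # - Within one admitted request, ExponentialRetry(attempts=6) retries each
    #   exception in RETRY_EXCEPTIONS with doubling backoff; only after all
    #   attempts fail is the error surfaced as a LookupError above.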

    async def get_controlled_terms(self, *args, **kwargs):
        """Query API for controlled terms."""
        endpoint = "/".join(("/v1/controlled_terms", *args))
        full_url = f"{API_BASE_URL}{endpoint}"
        return await self._get_rate_limited(full_url, **kwargs)

    async def get_observations(self, *args, **kwargs):
        """Query API for observations.

        Parameters
        ----------
        *args
            - If the first positional argument is given, it is passed through
              as-is, appended to the /v1/observations endpoint.
        **kwargs
            - All kwargs are passed as params on the API call.
        """
        endpoint = "/v1/observations"
        id_arg = f"/{args[0]}" if args else ""
        full_url = f"{API_BASE_URL}{endpoint}{id_arg}"
        return await self._get_rate_limited(full_url, **kwargs)
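
    # For example (hypothetical arguments): get_observations(12345) fetches a
    # single observation by id, get_observations(taxon_id=1, per_page=0)
    # passes the kwargs through as query params, and
    # get_observations("observers", **params) appends the positional arg to
    # the endpoint, as in get_observers_from_projects below.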

    async def get_observation_bounds(self, taxon_ids):
        """Get the bounds for the specified observations."""
        kwargs = {
            "return_bounds": "true",
            "verifiable": "true",
            "taxon_id": ",".join(map(str, taxon_ids)),
            "per_page": 0,
        }
        result = await self.get_observations(**kwargs)
        if result and "total_bounds" in result:
            return result["total_bounds"]
        return None

    async def get_places(
        self, query: Union[int, str, list], refresh_cache=False, **kwargs
    ):
        """Get places for the specified ids or text query."""
        first_place_id = None
        if isinstance(query, list):
            cached = set(query).issubset(set(self.places_cache))
            request = f"/v1/places/{','.join(map(str, query))}"
        elif isinstance(query, int):
            cached = query in self.places_cache
            if cached:
                first_place_id = query
            request = f"/v1/places/{query}"
        else:
            cached = False
            request = f"/v1/places/{query}"
        full_url = f"{API_BASE_URL}{request}"

        if refresh_cache or not cached:
            results = await self._get_rate_limited(full_url, **kwargs)
            if results:
                places = results.get("results") or []
                for place in places:
                    key = place.get("id")
                    if key:
                        if not first_place_id:
                            first_place_id = key
                        record = {
                            "total_results": 1,
                            "page": 1,
                            "per_page": 1,
                            "results": [place],
                        }
                        self.places_cache[key] = record

        if isinstance(query, list):
            return {
                place_id: self.places_cache[int(place_id)]
                for place_id in query
                if int(place_id) in self.places_cache
            }
        if first_place_id in self.places_cache:
            return self.places_cache[first_place_id]
        return None

    async def get_projects(
        self, query: Union[str, int, list], refresh_cache=False, **kwargs
    ):
        """Get projects for the specified ids or text query."""
        first_project_id = None
        if isinstance(query, list):
            cached = set(query).issubset(set(self.projects_cache))
            request = f"/v1/projects/{','.join(map(str, query))}"
        elif isinstance(query, int):
            cached = query in self.projects_cache
            if cached:
                first_project_id = query
            request = f"/v1/projects/{query}"
        else:
            cached = False
            request = f"/v1/projects/{query}"
        full_url = f"{API_BASE_URL}{request}"

        if refresh_cache or not cached:
            results = await self._get_rate_limited(full_url, **kwargs)
            if results:
                projects = results.get("results") or []
                for project in projects:
                    key = project.get("id")
                    if key:
                        if not first_project_id:
                            first_project_id = key
                        record = {
                            "total_results": 1,
                            "page": 1,
                            "per_page": 1,
                            "results": [project],
                        }
                        self.projects_cache[key] = record

        if isinstance(query, list):
            return {
                project_id: self.projects_cache[int(project_id)]
                for project_id in query
                if self.projects_cache.get(int(project_id))
            }
        if first_project_id in self.projects_cache:
            return self.projects_cache[first_project_id]
        return None
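
    # Illustrative return shapes (hypothetical ids): get_projects(1234)
    # returns a single synthesized one-result page for that project, whereas
    # get_projects([1234, 5678]) returns a dict keyed by project id, silently
    # omitting any id the API did not return. get_places behaves the same way.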

    async def get_observers_stats(self, **kwargs):
        """Query API for user counts & rankings."""
        request = "/v1/observations/observers"
        # TODO: validate kwargs includes project_id
        # TODO: support queries with > 500 observers (one page, default)
        full_url = f"{API_BASE_URL}{request}"
        return await self._get_rate_limited(full_url, **kwargs)

    async def get_search_results(self, **kwargs):
        """Get site search results."""
        if "is_active" in kwargs and kwargs["is_active"] == "any":
            full_url = f"{API_BASE_URL}/v1/taxa"
        else:
            full_url = f"{API_BASE_URL}/v1/search"
        return await self._get_rate_limited(full_url, **kwargs)
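
    # For example (hypothetical query): get_search_results(q="bees") queries
    # /v1/search across all record types, whereas also passing is_active="any"
    # reroutes the call to /v1/taxa so that inactive taxa can be matched too.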

    async def get_users(
        self, query: Union[int, str], refresh_cache=False, by_login_id=False, **kwargs
    ):
        """Get the users for the specified login, user_id, or query."""
        request = f"/v1/users/{query}"
        if isinstance(query, int) or query.isnumeric():
            user_id = int(query)
            key = user_id
        elif by_login_id:
            user_id = None
            key = query
        else:
            user_id = None
            request = f"/v1/users/autocomplete?q={query}"
            key = query
        full_url = f"{API_BASE_URL}{request}"

        if refresh_cache or (
            key not in self.users_cache and key not in self.users_login_cache
        ):
            # TODO: provide means to expire the cache (other than reloading the cog).
            json_data = await self._get_rate_limited(full_url, **kwargs)

            if json_data:
                results = json_data.get("results")
                if not results:
                    return None
                if user_id is None:
                    if len(results) == 1:
                        # String query matched exactly one result; cache it:
                        user = results[0]
                        # The entry itself is put in the main cache, indexed
                        # by user_id.
                        self.users_cache[user["id"]] = json_data
                        # Lookaside by login stores only linkage to the
                        # entry just stored in the main cache.
                        self.users_login_cache[user["login"]] = user["id"]
                        # Additionally add an entry to the main cache for
                        # the query string, but only for other than an
                        # exact login id match as that would serve no
                        # purpose. This is slightly wasteful, but makes for
                        # simpler code.
                        if user["login"] != key:
                            self.users_cache[key] = json_data
                    else:
                        # Cache multiple results matched by string.
                        self.users_cache[key] = json_data
                        # Additional synthesized cache results per matched
                        # user, as if they were queried individually.
                        for user in results:
                            user_json = {}
                            user_json["results"] = [user]
                            self.users_cache[user["id"]] = user_json
                            # Only index the login in the lookaside cache if it
                            # isn't the query string itself, already indexed
                            # above in the main cache.
                            # - i.e. it's possible a search for a login matches
                            #   more than one entry (e.g. david, david99, etc.)
                            #   so retrieving it from cache must always return
                            #   all matching results, not just one for the
                            #   login itself
                            if user["login"] != key:
                                self.users_login_cache[user["login"]] = user["id"]
                else:
                    # i.e. lookup by user_id only returns one match
                    user = results[0]
                    if user:
                        self.users_cache[key] = json_data
                        self.users_login_cache[user["login"]] = key

            self.request_time = time()

        if key in self.users_cache:
            return self.users_cache[key]
        # - Lookaside for login is only consulted if not found in the main
        #   users_cache.
        # - This is important, since a lookup by user_id could prime the
        #   lookaside cache with the single login entry, and then a subsequent
        #   search by login could return multiple results into the main cache.
        #   From then on, searching for the login should return the cached
        #   multiple results from the main cache, not the single result that
        #   the lookaside users_login_cache supports.
        # - This shortcut seems like it would return incomplete results
        #   depending on the order in which lookups are performed. However,
        #   since the login lookaside is primarily in support of iNat login
        #   lookups from already cached project members, this is OK. The load
        #   of the whole project membership at once
        #   (get_observers_from_projects) for that use case ensures all
        #   relevant matches are already individually cached.
        if key in self.users_login_cache:
            user_id = self.users_login_cache[key]
            return self.users_cache[user_id]
        return None
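
    # Illustrative cache interplay (a sketch; the ids and logins below are
    # hypothetical):
    # 1. get_users(545640) caches the result under key 545640 and links
    #    users_login_cache["benarm"] -> 545640.
    # 2. get_users("ben") hits /v1/users/autocomplete?q=ben and caches all
    #    matches under key "ben", plus one synthesized entry per matched user.
    # 3. get_users("benarm", by_login_id=True) is then served from the main
    #    cache if "benarm" was cached there, falling back to the login
    #    lookaside (and thence the single 545640 entry) otherwise.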

    async def get_observers_from_projects(
        self, project_ids: Optional[List] = None, user_ids: Optional[List] = None
    ):
        """Get observers for a list of project ids.

        Since the cache is filled as a side effect, this method can be used
        to prime the cache prior to fetching multiple users at once by id.

        Users may also be specified, and in that case, project ids may be
        omitted. The cache will then be primed from a list of user ids.
        """
        if not (project_ids or user_ids):
            return

        page = 1
        more = True
        users = []
        # Note: This will only handle up to 10,000 users. Anything more
        # needs to set id_above and id_below. With luck, we won't ever
        # need to deal with projects this big!
        while more:
            params = {"page": page}
            if project_ids:
                params["project_id"] = ",".join(map(str, project_ids))
            if user_ids:
                params["user_id"] = ",".join(map(str, user_ids))
            response = await self.get_observations("observers", **params)
            results = response.get("results") or []
            for observer in results:
                user = observer.get("user")
                if user:
                    user_id = user.get("id")
                    if user_id:
                        # Synthesize a single result as if returned by a
                        # get_users lookup of a single user_id, and cache it:
                        user_json = {}
                        user_json["results"] = [user]
                        users.append(user)
                        self.users_cache[user_id] = user_json
                        self.users_login_cache[user["login"]] = user_id
            # default values provided defensively to exit loop if missing
            per_page = response.get("per_page") or len(results)
            total_results = response.get("total_results") or len(results)
            if results and (page * per_page < total_results):
                page += 1
            else:
                more = False

        # return all user results as a single page
        return {
            "total_results": len(users),
            "pages": 1,
            "per_page": len(users),
            "results": users,
        }
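

if __name__ == "__main__":
    # A minimal usage sketch, not part of the cog itself: the project id
    # below is hypothetical, and a real caller would keep the INatAPI
    # instance alive for the lifetime of the bot rather than creating one
    # per run. It shows the cache-priming pattern described in the
    # get_observers_from_projects docstring above.
    import asyncio

    async def demo():
        api = INatAPI()
        try:
            # Prime the user caches from a whole project's membership ...
            observers = await api.get_observers_from_projects(
                project_ids=[12345]  # hypothetical project id
            )
            # ... so the per-id lookups below are all served from the cache:
            for user in observers["results"][:5]:
                cached = await api.get_users(user["id"])
                print(cached["results"][0]["login"])
        finally:
            await api.session.close()

    asyncio.run(demo())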