Source code for inatcog.obs_query

"""Module to query iNat observations."""
from dronefly.core.query.query import Query
from pyinaturalist.models import Observation, Observations

from .utils import get_home


[docs]class INatObsQuery: """Query iNat for one or more observation.""" def __init__(self, cog): self.cog = cog
[docs] async def query_single_obs(self, ctx, query: Query): """Query observations and return first if found.""" query_response = await self.cog.query.get(ctx, query) kwargs = query_response.obs_args() kwargs["per_page"] = 1 home = await get_home(ctx) kwargs["preferred_place_id"] = home response = await self.cog.api.get_observations(**kwargs) if not response["results"]: raise LookupError( f"No observation found {query_response.obs_query_description()}" ) return Observation.from_json(response["results"][0])
[docs] async def query_observations(self, ctx, query: Query, page=1): """Query observations and return iterator for any found.""" query_response = await self.cog.query.get(ctx, query) kwargs = query_response.obs_args() kwargs["per_page"] = 200 kwargs["page"] = page home = await get_home(ctx) kwargs["preferred_place_id"] = home response = await self.cog.api.get_observations(**kwargs) if not response["results"]: raise LookupError( f"No observations found {query_response.obs_query_description()}" ) return ( Observations.from_json(response), response["total_results"], response["per_page"], )