Source code for inatcog.obs
"""Module to work with iNat observations."""
from operator import itemgetter
import re
from dronefly.core.formatters.constants import WWW_BASE_URL
from dronefly.core.parsers.url import PAT_OBS_LINK
from pyinaturalist.models import Observation
from .utils import get_home
[docs]def obs_count_community_id(obs):
idents_count = 0
idents_agree = 0
ident_taxon_ids = []
# TODO: when pyinat supports ident_taxon_ids, this can be removed.
for ident in obs.identifications:
if ident.taxon:
for ancestor_id in ident.taxon.ancestor_ids:
if ancestor_id not in ident_taxon_ids:
ident_taxon_ids.append(ancestor_id)
if ident.taxon.id not in ident_taxon_ids:
ident_taxon_ids.append(ident.taxon.id)
for identification in obs.identifications:
if identification.current:
user_taxon_id = identification.taxon.id
user_taxon_ids = identification.taxon.ancestor_ids
user_taxon_ids.append(user_taxon_id)
if obs.community_taxon_id in user_taxon_ids:
if user_taxon_id in ident_taxon_ids:
# Count towards total & agree:
idents_count += 1
idents_agree += 1
else:
# Neither counts for nor against
pass
else:
# Maverick counts against:
idents_count += 1
return (idents_count, idents_agree)
[docs]async def maybe_match_obs(cog, ctx, content, id_permitted=False):
"""Maybe retrieve an observation from content."""
mat = re.search(PAT_OBS_LINK, content)
obs = url = obs_id = None
if mat:
obs_id = int(mat["obs_id"])
url = mat["url"] or WWW_BASE_URL + "/observations/" + str(obs_id)
if id_permitted:
try:
obs_id = int(content)
except ValueError:
pass
if obs_id:
home = await get_home(ctx)
results = (
await cog.api.get_observations(
obs_id, include_new_projects=1, preferred_place_id=home
)
)["results"]
obs = Observation.from_json(results[0]) if results else None
if obs_id and not url:
url = WWW_BASE_URL + "/observations/" + str(obs_id)
return (obs, url)
[docs]def get_formatted_user_counts(
user_counts: dict, base_url: str, species_only: bool = False, view: str = "obs"
):
"""Format per user observation & species counts."""
def format_observer_link(observer, species_only):
user_id = observer["user_id"]
species_count = observer["species_count"]
observation_count = observer["observation_count"]
login = observer["user"]["login"]
observer_url = base_url + f"&user_id={user_id}"
if species_only:
observer_link = f"[{observation_count:,}]({observer_url}) {login}"
else:
observer_link = (
f"[{observation_count:,} ({species_count:,})]({observer_url}) {login}"
)
return observer_link
def format_identifier_link(identifier):
user_id = identifier["user"]["id"]
observation_count = identifier["count"]
login = identifier["user"]["login"]
identifier_url = base_url + f"&ident_user_id={user_id}¬_user_id={user_id}"
identifier_link = f"[{observation_count:,}]({identifier_url}) {login}"
return identifier_link
if view == "ids":
identifier_links = [
"{}) {}".format(rank + 1, format_identifier_link(ider))
for rank, ider in enumerate(user_counts["results"])
]
return identifier_links
if not species_only and view == "spp":
sorted_observers = sorted(
user_counts["results"],
key=itemgetter("species_count", "observation_count"),
reverse=True,
)
else:
sorted_observers = user_counts["results"]
observer_links = [
"{}) {}".format(rank + 1, format_observer_link(observer, species_only))
for rank, observer in enumerate(sorted_observers)
]
return observer_links