"""InfluxDB metric helpers for the amt-jb service.

Each public function builds a single point and writes it through the shared
``influx_client``. When ``influx_client`` is falsy, nothing is written.
"""

import socket
from typing import Any, Dict, Optional

from mypy_boto3_mturk.literals import EventTypeType

from jb.config import settings
from jb.decorators import influx_client
from generalresearchutils.currency import USDCent
from jb.models.definitions import HitStatus, AssignmentStatus

# Value of the "service" tag attached to every point written by this module.
_SERVICE_NAME = "amt-jb"


def _common_tags(amt_hit_type_id: str, **extra: Any) -> Dict[str, Any]:
    """Build the tag set shared by every point written by this module.

    :param amt_hit_type_id: value stored under the ``amt_hit_type_id`` tag.
    :param extra: measurement-specific tags (e.g. ``status``, ``event_type``);
        they are inserted right after ``host``/``service`` so the resulting
        key order matches the hand-written dicts this helper replaces.
    :return: a new, mutable dict of tags.
    """
    tags: Dict[str, Any] = {
        "host": socket.gethostname(),  # could be "amt-jb-0"
        "service": _SERVICE_NAME,
    }
    tags.update(extra)
    tags["amt_hit_type_id"] = amt_hit_type_id
    tags["debug"] = settings.debug
    return tags


def _write_point(
    measurement: str, tags: Dict[str, Any], fields: Dict[str, Any]
) -> None:
    """Write one point to Influx, or do nothing if no client is configured.

    :param measurement: measurement name, e.g. ``"amt_jb.hits"``.
    :param tags: tag dict for the point.
    :param fields: field dict for the point.
    """
    # Truthiness check (not ``is None``) is kept on purpose: it mirrors the
    # guard the individual functions used before this helper existed.
    if not influx_client:
        return
    point = {
        "measurement": measurement,
        "tags": tags,
        "fields": fields,
    }
    influx_client.write_points(points=[point])


def write_hit_gauge(status: HitStatus, amt_hit_type_id: str, cnt: int) -> None:
    """Record a HIT count in the ``amt_jb.hits`` measurement.

    :param status: HIT status; its ``.value`` is stored as the ``status`` tag.
    :param amt_hit_type_id: stored as the ``amt_hit_type_id`` tag.
    :param cnt: stored as the ``count`` field.
    """
    tags = _common_tags(amt_hit_type_id, status=status.value)
    _write_point("amt_jb.hits", tags, {"count": cnt})


def write_assignment_gauge(
    status: AssignmentStatus, amt_hit_type_id: str, cnt: int
) -> None:
    """Record an assignment count in the ``amt_jb.assignments`` measurement.

    :param status: assignment status; its ``.value`` is stored as the
        ``status`` tag.
    :param amt_hit_type_id: stored as the ``amt_hit_type_id`` tag.
    :param cnt: stored as the ``count`` field.
    """
    tags = _common_tags(amt_hit_type_id, status=status.value)
    _write_point("amt_jb.assignments", tags, {"count": cnt})


def emit_hit_event(
    status: HitStatus, amt_hit_type_id: str, reason: Optional[str] = None
) -> None:
    """Emit a HIT event, e.g. a HIT was created, became Reviewable, etc.

    We don't have a "created" HitStatus, so creation would just be
    reported as status == 'Assignable'.

    :param status: HIT status; its ``.value`` is stored as the ``status`` tag.
    :param amt_hit_type_id: stored as the ``amt_hit_type_id`` tag.
    :param reason: optional ``reason`` tag; omitted when ``None`` or empty.
    """
    tags = _common_tags(amt_hit_type_id, status=status.value)
    if reason:
        tags["reason"] = reason
    _write_point("amt_jb.hit_events", tags, {"value": 1})


def emit_assignment_event(
    status: AssignmentStatus, amt_hit_type_id: str, reason: Optional[str] = None
) -> None:
    """Emit an assignment event, e.g. an assignment was accepted, approved
    or rejected.

    :param status: assignment status; its ``.value`` is stored as the
        ``status`` tag.
    :param amt_hit_type_id: stored as the ``amt_hit_type_id`` tag.
    :param reason: optional ``reason`` tag; omitted when ``None`` or empty.
    """
    tags = _common_tags(amt_hit_type_id, status=status.value)
    if reason:
        tags["reason"] = reason
    _write_point("amt_jb.assignment_events", tags, {"value": 1})


def emit_mturk_notification_event(
    event_type: EventTypeType, amt_hit_type_id: str
) -> None:
    """Emit an event for a received MTurk notification.

    At this point the notification has only been put in redis; it has not
    been processed yet.

    :param event_type: stored as the ``event_type`` tag.
    :param amt_hit_type_id: stored as the ``amt_hit_type_id`` tag.
    """
    tags = _common_tags(amt_hit_type_id, event_type=event_type)
    _write_point("amt_jb.mturk_notification_events", tags, {"value": 1})


def emit_error_event(event_type: str, amt_hit_type_id: str) -> None:
    """Emit an error event into the ``amt_jb.error_events`` measurement.

    TODO: structure the error types (``event_type`` is a free-form string
    for now).

    :param event_type: stored as the ``event_type`` tag.
    :param amt_hit_type_id: stored as the ``amt_hit_type_id`` tag.
    """
    tags = _common_tags(amt_hit_type_id, event_type=event_type)
    _write_point("amt_jb.error_events", tags, {"value": 1})


def emit_bonus_event(amount: USDCent, amt_hit_type_id: str) -> None:
    """Emit an event recording that an AMT bonus was awarded.

    :param amount: bonus amount, stored as ``int(amount)`` in the ``amount``
        field (presumably US cents, going by the ``USDCent`` type -- confirm).
    :param amt_hit_type_id: stored as the ``amt_hit_type_id`` tag.
    """
    tags = _common_tags(amt_hit_type_id)
    _write_point("amt_jb.bonus_events", tags, {"value": 1, "amount": int(amount)})