diff options
| author | Max Nanis | 2026-02-19 20:11:41 -0500 |
|---|---|---|
| committer | Max Nanis | 2026-02-19 20:11:41 -0500 |
| commit | 8b31678c6e44400967d4934cd9f3c6c6ac0da721 (patch) | |
| tree | 41c5f4479c353a16da1a8b6fa9088abd084ea388 /jb/flow/monitoring.py | |
| parent | f0f96f83c2630e890a2cbcab53f77fd4c37e1684 (diff) | |
| download | amt-jb-8b31678c6e44400967d4934cd9f3c6c6ac0da721.tar.gz amt-jb-8b31678c6e44400967d4934cd9f3c6c6ac0da721.zip | |
Carer dir into project, some initial pytest, part of the flow tasks. License and Readme update
Diffstat (limited to 'jb/flow/monitoring.py')
| -rw-r--r-- | jb/flow/monitoring.py | 156 |
1 files changed, 156 insertions, 0 deletions
diff --git a/jb/flow/monitoring.py b/jb/flow/monitoring.py new file mode 100644 index 0000000..c8432bb --- /dev/null +++ b/jb/flow/monitoring.py @@ -0,0 +1,156 @@ +import socket +from typing import Optional + +from mypy_boto3_mturk.literals import EventTypeType + +from jb.config import settings +from jb.decorators import influx_client +from jb.models.currency import USDCent +from jb.models.definitions import HitStatus, AssignmentStatus + + +def write_hit_gauge(status: HitStatus, amt_hit_type_id: str, cnt: int): + tags = { + "host": socket.gethostname(), # could be "amt-jb-0" + "service": "amt-jb", + "status": status.value, + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + point = { + "measurement": "amt_jb.hits", + "tags": tags, + "fields": {"count": cnt}, + } + if influx_client: + influx_client.write_points([point]) + + +def write_assignment_gauge(status: AssignmentStatus, amt_hit_type_id: str, cnt: int): + tags = { + "host": socket.gethostname(), + "service": "amt-jb", + "status": status.value, + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + point = { + "measurement": "amt_jb.assignments", + "tags": tags, + "fields": {"count": cnt}, + } + if influx_client: + influx_client.write_points([point]) + + +def emit_hit_event( + status: HitStatus, amt_hit_type_id: str, reason: Optional[str] = None +): + """ + e.g. a HIT was created, Reviewable, etc. We don't have a "created" HitStatus, + so it would just be when status=='Assignable' + """ + tags = { + "host": socket.gethostname(), + "service": "amt-jb", + "status": status.value, + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + if reason: + tags["reason"] = reason + point = { + "measurement": "amt_jb.hit_events", + "tags": tags, + "fields": {"value": 1}, + } + + if influx_client: + influx_client.write_points([point]) + + +def emit_assignment_event( + status: AssignmentStatus, amt_hit_type_id: str, reason: Optional[str] = None +): + """ + e.g. an Assignment was accepted/approved/reject + """ + tags = { + "host": socket.gethostname(), + "service": "amt-jb", + "status": status.value, + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + if reason: + tags["reason"] = reason + point = { + "measurement": "amt_jb.assignment_events", + "tags": tags, + "fields": {"value": 1}, + } + + if influx_client: + influx_client.write_points([point]) + + +def emit_mturk_notification_event(event_type: EventTypeType, amt_hit_type_id: str): + """ + e.g. a Mturk notification was received. We just put it in redis, we haven't processed it yet. + """ + tags = { + "host": socket.gethostname(), + "service": "amt-jb", + "event_type": event_type, + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + point = { + "measurement": "amt_jb.mturk_notification_events", + "tags": tags, + "fields": {"value": 1}, + } + + if influx_client: + influx_client.write_points([point]) + + +def emit_error_event(event_type: str, amt_hit_type_id: str): + """ + e.g. todo: structure the error_types + """ + tags = { + "host": socket.gethostname(), + "service": "amt-jb", + "event_type": event_type, + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + point = { + "measurement": "amt_jb.error_events", + "tags": tags, + "fields": {"value": 1}, + } + + if influx_client: + influx_client.write_points([point]) + + +def emit_bonus_event(amount: USDCent, amt_hit_type_id: str): + """ + An AMT bonus was awarded + """ + tags = { + "host": socket.gethostname(), + "service": "amt-jb", + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + point = { + "measurement": "amt_jb.bonus_events", + "tags": tags, + "fields": {"value": 1, "amount": int(amount)}, + } + + if influx_client: + influx_client.write_points([point]) |
