diff options
| author | Max Nanis | 2026-02-24 17:26:15 -0500 |
|---|---|---|
| committer | Max Nanis | 2026-02-24 17:26:15 -0500 |
| commit | 8c1940445503fd6678d0961600f2be81622793a2 (patch) | |
| tree | b9173562b8824b5eaa805e446d9d780e1f23fb2a /jb/flow/monitoring.py | |
| parent | 25d8c3c214baf10f6520cc1351f78473150e5d7a (diff) | |
| download | amt-jb-8c1940445503fd6678d0961600f2be81622793a2.tar.gz amt-jb-8c1940445503fd6678d0961600f2be81622793a2.zip | |
Extensive use of type checking. Movement of pytest conf towards handling managers (for db agnostic unittest). Starting to organize pytests.
Diffstat (limited to 'jb/flow/monitoring.py')
| -rw-r--r-- | jb/flow/monitoring.py | 53 |
1 files changed, 40 insertions, 13 deletions
diff --git a/jb/flow/monitoring.py b/jb/flow/monitoring.py index c8432bb..28f7271 100644 --- a/jb/flow/monitoring.py +++ b/jb/flow/monitoring.py @@ -5,11 +5,12 @@ from mypy_boto3_mturk.literals import EventTypeType from jb.config import settings from jb.decorators import influx_client -from jb.models.currency import USDCent +from generalresearchutils.currency import USDCent from jb.models.definitions import HitStatus, AssignmentStatus -def write_hit_gauge(status: HitStatus, amt_hit_type_id: str, cnt: int): +def write_hit_gauge(status: HitStatus, amt_hit_type_id: str, cnt: int) -> None: + tags = { "host": socket.gethostname(), # could be "amt-jb-0" "service": "amt-jb", @@ -23,10 +24,14 @@ def write_hit_gauge(status: HitStatus, amt_hit_type_id: str, cnt: int): "fields": {"count": cnt}, } if influx_client: - influx_client.write_points([point]) + influx_client.write_points(points=[point]) + + return None -def write_assignment_gauge(status: AssignmentStatus, amt_hit_type_id: str, cnt: int): +def write_assignment_gauge( + status: AssignmentStatus, amt_hit_type_id: str, cnt: int +) -> None: tags = { "host": socket.gethostname(), "service": "amt-jb", @@ -40,15 +45,17 @@ def write_assignment_gauge(status: AssignmentStatus, amt_hit_type_id: str, cnt: "fields": {"count": cnt}, } if influx_client: - influx_client.write_points([point]) + influx_client.write_points(points=[point]) + + return None def emit_hit_event( status: HitStatus, amt_hit_type_id: str, reason: Optional[str] = None -): +) -> None: """ - e.g. a HIT was created, Reviewable, etc. We don't have a "created" HitStatus, - so it would just be when status=='Assignable' + e.g. a HIT was created, Reviewable, etc. We don't have a "created" + HitStatus, so it would just be when status=='Assignable' """ tags = { "host": socket.gethostname(), @@ -57,8 +64,10 @@ def emit_hit_event( "amt_hit_type_id": amt_hit_type_id, "debug": settings.debug, } + if reason: tags["reason"] = reason + point = { "measurement": "amt_jb.hit_events", "tags": tags, @@ -68,10 +77,12 @@ def emit_hit_event( if influx_client: influx_client.write_points([point]) + return None + def emit_assignment_event( status: AssignmentStatus, amt_hit_type_id: str, reason: Optional[str] = None -): +) -> None: """ e.g. an Assignment was accepted/approved/reject """ @@ -82,8 +93,10 @@ def emit_assignment_event( "amt_hit_type_id": amt_hit_type_id, "debug": settings.debug, } + if reason: tags["reason"] = reason + point = { "measurement": "amt_jb.assignment_events", "tags": tags, @@ -93,10 +106,15 @@ def emit_assignment_event( if influx_client: influx_client.write_points([point]) + return None + -def emit_mturk_notification_event(event_type: EventTypeType, amt_hit_type_id: str): +def emit_mturk_notification_event( + event_type: EventTypeType, amt_hit_type_id: str +) -> None: """ - e.g. a Mturk notification was received. We just put it in redis, we haven't processed it yet. + e.g. a Mturk notification was received. We just put it in redis, we + haven't processed it yet. """ tags = { "host": socket.gethostname(), @@ -105,6 +123,7 @@ def emit_mturk_notification_event(event_type: EventTypeType, amt_hit_type_id: st "amt_hit_type_id": amt_hit_type_id, "debug": settings.debug, } + point = { "measurement": "amt_jb.mturk_notification_events", "tags": tags, @@ -114,8 +133,10 @@ def emit_mturk_notification_event(event_type: EventTypeType, amt_hit_type_id: st if influx_client: influx_client.write_points([point]) + return None + -def emit_error_event(event_type: str, amt_hit_type_id: str): +def emit_error_event(event_type: str, amt_hit_type_id: str) -> None: """ e.g. todo: structure the error_types """ @@ -126,6 +147,7 @@ def emit_error_event(event_type: str, amt_hit_type_id: str): "amt_hit_type_id": amt_hit_type_id, "debug": settings.debug, } + point = { "measurement": "amt_jb.error_events", "tags": tags, @@ -135,8 +157,10 @@ def emit_error_event(event_type: str, amt_hit_type_id: str): if influx_client: influx_client.write_points([point]) + return None + -def emit_bonus_event(amount: USDCent, amt_hit_type_id: str): +def emit_bonus_event(amount: USDCent, amt_hit_type_id: str) -> None: """ An AMT bonus was awarded """ @@ -146,6 +170,7 @@ def emit_bonus_event(amount: USDCent, amt_hit_type_id: str): "amt_hit_type_id": amt_hit_type_id, "debug": settings.debug, } + point = { "measurement": "amt_jb.bonus_events", "tags": tags, @@ -154,3 +179,5 @@ def emit_bonus_event(amount: USDCent, amt_hit_type_id: str): if influx_client: influx_client.write_points([point]) + + return None |
