aboutsummaryrefslogtreecommitdiff
path: root/jb/flow/monitoring.py
diff options
context:
space:
mode:
authorMax Nanis2026-02-24 17:26:15 -0500
committerMax Nanis2026-02-24 17:26:15 -0500
commit8c1940445503fd6678d0961600f2be81622793a2 (patch)
treeb9173562b8824b5eaa805e446d9d780e1f23fb2a /jb/flow/monitoring.py
parent25d8c3c214baf10f6520cc1351f78473150e5d7a (diff)
downloadamt-jb-8c1940445503fd6678d0961600f2be81622793a2.tar.gz
amt-jb-8c1940445503fd6678d0961600f2be81622793a2.zip
Extensive use of type checking. Movement of pytest conf towards handling managers (for db agnostic unittest). Starting to organize pytests.
Diffstat (limited to 'jb/flow/monitoring.py')
-rw-r--r--jb/flow/monitoring.py53
1 files changed, 40 insertions, 13 deletions
diff --git a/jb/flow/monitoring.py b/jb/flow/monitoring.py
index c8432bb..28f7271 100644
--- a/jb/flow/monitoring.py
+++ b/jb/flow/monitoring.py
@@ -5,11 +5,12 @@ from mypy_boto3_mturk.literals import EventTypeType
from jb.config import settings
from jb.decorators import influx_client
-from jb.models.currency import USDCent
+from generalresearchutils.currency import USDCent
from jb.models.definitions import HitStatus, AssignmentStatus
-def write_hit_gauge(status: HitStatus, amt_hit_type_id: str, cnt: int):
+def write_hit_gauge(status: HitStatus, amt_hit_type_id: str, cnt: int) -> None:
+
tags = {
"host": socket.gethostname(), # could be "amt-jb-0"
"service": "amt-jb",
@@ -23,10 +24,14 @@ def write_hit_gauge(status: HitStatus, amt_hit_type_id: str, cnt: int):
"fields": {"count": cnt},
}
if influx_client:
- influx_client.write_points([point])
+ influx_client.write_points(points=[point])
+
+ return None
-def write_assignment_gauge(status: AssignmentStatus, amt_hit_type_id: str, cnt: int):
+def write_assignment_gauge(
+ status: AssignmentStatus, amt_hit_type_id: str, cnt: int
+) -> None:
tags = {
"host": socket.gethostname(),
"service": "amt-jb",
@@ -40,15 +45,17 @@ def write_assignment_gauge(status: AssignmentStatus, amt_hit_type_id: str, cnt:
"fields": {"count": cnt},
}
if influx_client:
- influx_client.write_points([point])
+ influx_client.write_points(points=[point])
+
+ return None
def emit_hit_event(
status: HitStatus, amt_hit_type_id: str, reason: Optional[str] = None
-):
+) -> None:
"""
- e.g. a HIT was created, Reviewable, etc. We don't have a "created" HitStatus,
- so it would just be when status=='Assignable'
+ e.g. a HIT was created, Reviewable, etc. We don't have a "created"
+ HitStatus, so it would just be when status=='Assignable'
"""
tags = {
"host": socket.gethostname(),
@@ -57,8 +64,10 @@ def emit_hit_event(
"amt_hit_type_id": amt_hit_type_id,
"debug": settings.debug,
}
+
if reason:
tags["reason"] = reason
+
point = {
"measurement": "amt_jb.hit_events",
"tags": tags,
@@ -68,10 +77,12 @@ def emit_hit_event(
if influx_client:
influx_client.write_points([point])
+ return None
+
def emit_assignment_event(
status: AssignmentStatus, amt_hit_type_id: str, reason: Optional[str] = None
-):
+) -> None:
"""
e.g. an Assignment was accepted/approved/reject
"""
@@ -82,8 +93,10 @@ def emit_assignment_event(
"amt_hit_type_id": amt_hit_type_id,
"debug": settings.debug,
}
+
if reason:
tags["reason"] = reason
+
point = {
"measurement": "amt_jb.assignment_events",
"tags": tags,
@@ -93,10 +106,15 @@ def emit_assignment_event(
if influx_client:
influx_client.write_points([point])
+ return None
+
-def emit_mturk_notification_event(event_type: EventTypeType, amt_hit_type_id: str):
+def emit_mturk_notification_event(
+ event_type: EventTypeType, amt_hit_type_id: str
+) -> None:
"""
- e.g. a Mturk notification was received. We just put it in redis, we haven't processed it yet.
+ e.g. a Mturk notification was received. We just put it in redis, we
+ haven't processed it yet.
"""
tags = {
"host": socket.gethostname(),
@@ -105,6 +123,7 @@ def emit_mturk_notification_event(event_type: EventTypeType, amt_hit_type_id: st
"amt_hit_type_id": amt_hit_type_id,
"debug": settings.debug,
}
+
point = {
"measurement": "amt_jb.mturk_notification_events",
"tags": tags,
@@ -114,8 +133,10 @@ def emit_mturk_notification_event(event_type: EventTypeType, amt_hit_type_id: st
if influx_client:
influx_client.write_points([point])
+ return None
+
-def emit_error_event(event_type: str, amt_hit_type_id: str):
+def emit_error_event(event_type: str, amt_hit_type_id: str) -> None:
"""
e.g. todo: structure the error_types
"""
@@ -126,6 +147,7 @@ def emit_error_event(event_type: str, amt_hit_type_id: str):
"amt_hit_type_id": amt_hit_type_id,
"debug": settings.debug,
}
+
point = {
"measurement": "amt_jb.error_events",
"tags": tags,
@@ -135,8 +157,10 @@ def emit_error_event(event_type: str, amt_hit_type_id: str):
if influx_client:
influx_client.write_points([point])
+ return None
+
-def emit_bonus_event(amount: USDCent, amt_hit_type_id: str):
+def emit_bonus_event(amount: USDCent, amt_hit_type_id: str) -> None:
"""
An AMT bonus was awarded
"""
@@ -146,6 +170,7 @@ def emit_bonus_event(amount: USDCent, amt_hit_type_id: str):
"amt_hit_type_id": amt_hit_type_id,
"debug": settings.debug,
}
+
point = {
"measurement": "amt_jb.bonus_events",
"tags": tags,
@@ -154,3 +179,5 @@ def emit_bonus_event(amount: USDCent, amt_hit_type_id: str):
if influx_client:
influx_client.write_points([point])
+
+ return None