aboutsummaryrefslogtreecommitdiff
path: root/jb/flow/monitoring.py
diff options
context:
space:
mode:
Diffstat (limited to 'jb/flow/monitoring.py')
-rw-r--r--jb/flow/monitoring.py156
1 files changed, 156 insertions, 0 deletions
diff --git a/jb/flow/monitoring.py b/jb/flow/monitoring.py
new file mode 100644
index 0000000..c8432bb
--- /dev/null
+++ b/jb/flow/monitoring.py
@@ -0,0 +1,156 @@
+import socket
+from typing import Optional
+
+from mypy_boto3_mturk.literals import EventTypeType
+
+from jb.config import settings
+from jb.decorators import influx_client
+from jb.models.currency import USDCent
+from jb.models.definitions import HitStatus, AssignmentStatus
+
+
+def write_hit_gauge(status: HitStatus, amt_hit_type_id: str, cnt: int):
+ tags = {
+ "host": socket.gethostname(), # could be "amt-jb-0"
+ "service": "amt-jb",
+ "status": status.value,
+ "amt_hit_type_id": amt_hit_type_id,
+ "debug": settings.debug,
+ }
+ point = {
+ "measurement": "amt_jb.hits",
+ "tags": tags,
+ "fields": {"count": cnt},
+ }
+ if influx_client:
+ influx_client.write_points([point])
+
+
+def write_assignment_gauge(status: AssignmentStatus, amt_hit_type_id: str, cnt: int):
+ tags = {
+ "host": socket.gethostname(),
+ "service": "amt-jb",
+ "status": status.value,
+ "amt_hit_type_id": amt_hit_type_id,
+ "debug": settings.debug,
+ }
+ point = {
+ "measurement": "amt_jb.assignments",
+ "tags": tags,
+ "fields": {"count": cnt},
+ }
+ if influx_client:
+ influx_client.write_points([point])
+
+
+def emit_hit_event(
+ status: HitStatus, amt_hit_type_id: str, reason: Optional[str] = None
+):
+ """
+ e.g. a HIT was created, Reviewable, etc. We don't have a "created" HitStatus,
+ so it would just be when status=='Assignable'
+ """
+ tags = {
+ "host": socket.gethostname(),
+ "service": "amt-jb",
+ "status": status.value,
+ "amt_hit_type_id": amt_hit_type_id,
+ "debug": settings.debug,
+ }
+ if reason:
+ tags["reason"] = reason
+ point = {
+ "measurement": "amt_jb.hit_events",
+ "tags": tags,
+ "fields": {"value": 1},
+ }
+
+ if influx_client:
+ influx_client.write_points([point])
+
+
+def emit_assignment_event(
+ status: AssignmentStatus, amt_hit_type_id: str, reason: Optional[str] = None
+):
+ """
+ e.g. an Assignment was accepted/approved/reject
+ """
+ tags = {
+ "host": socket.gethostname(),
+ "service": "amt-jb",
+ "status": status.value,
+ "amt_hit_type_id": amt_hit_type_id,
+ "debug": settings.debug,
+ }
+ if reason:
+ tags["reason"] = reason
+ point = {
+ "measurement": "amt_jb.assignment_events",
+ "tags": tags,
+ "fields": {"value": 1},
+ }
+
+ if influx_client:
+ influx_client.write_points([point])
+
+
+def emit_mturk_notification_event(event_type: EventTypeType, amt_hit_type_id: str):
+ """
+ e.g. a Mturk notification was received. We just put it in redis, we haven't processed it yet.
+ """
+ tags = {
+ "host": socket.gethostname(),
+ "service": "amt-jb",
+ "event_type": event_type,
+ "amt_hit_type_id": amt_hit_type_id,
+ "debug": settings.debug,
+ }
+ point = {
+ "measurement": "amt_jb.mturk_notification_events",
+ "tags": tags,
+ "fields": {"value": 1},
+ }
+
+ if influx_client:
+ influx_client.write_points([point])
+
+
+def emit_error_event(event_type: str, amt_hit_type_id: str):
+ """
+ e.g. todo: structure the error_types
+ """
+ tags = {
+ "host": socket.gethostname(),
+ "service": "amt-jb",
+ "event_type": event_type,
+ "amt_hit_type_id": amt_hit_type_id,
+ "debug": settings.debug,
+ }
+ point = {
+ "measurement": "amt_jb.error_events",
+ "tags": tags,
+ "fields": {"value": 1},
+ }
+
+ if influx_client:
+ influx_client.write_points([point])
+
+
+def emit_bonus_event(amount: USDCent, amt_hit_type_id: str):
+ """
+ An AMT bonus was awarded
+ """
+ tags = {
+ "host": socket.gethostname(),
+ "service": "amt-jb",
+ "amt_hit_type_id": amt_hit_type_id,
+ "debug": settings.debug,
+ }
+ point = {
+ "measurement": "amt_jb.bonus_events",
+ "tags": tags,
+ "fields": {"value": 1, "amount": int(amount)},
+ }
+
+ if influx_client:
+ influx_client.write_points([point])