aboutsummaryrefslogtreecommitdiff
path: root/jb/flow
diff options
context:
space:
mode:
authorMax Nanis2026-02-19 20:11:41 -0500
committerMax Nanis2026-02-19 20:11:41 -0500
commit8b31678c6e44400967d4934cd9f3c6c6ac0da721 (patch)
tree41c5f4479c353a16da1a8b6fa9088abd084ea388 /jb/flow
parentf0f96f83c2630e890a2cbcab53f77fd4c37e1684 (diff)
downloadamt-jb-8b31678c6e44400967d4934cd9f3c6c6ac0da721.tar.gz
amt-jb-8b31678c6e44400967d4934cd9f3c6c6ac0da721.zip
Moved the Carer dir into the project; added some initial pytest tests and part of the flow tasks; License and Readme updates
Diffstat (limited to 'jb/flow')
-rw-r--r--jb/flow/__init__.py0
-rw-r--r--jb/flow/events.py126
-rw-r--r--jb/flow/maintenance.py26
-rw-r--r--jb/flow/monitoring.py156
4 files changed, 308 insertions, 0 deletions
diff --git a/jb/flow/__init__.py b/jb/flow/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/jb/flow/__init__.py
diff --git a/jb/flow/events.py b/jb/flow/events.py
new file mode 100644
index 0000000..3961a64
--- /dev/null
+++ b/jb/flow/events.py
@@ -0,0 +1,126 @@
+import logging
+import time
+from concurrent import futures
+from concurrent.futures import ThreadPoolExecutor, Executor, as_completed
+from typing import Optional
+
+import redis
+
+from jb.config import (
+ JB_EVENTS_STREAM,
+ CONSUMER_GROUP,
+ CONSUMER_NAME,
+ JB_EVENTS_FAILED_STREAM,
+)
+from jb.decorators import REDIS
+from jb.flow.assignment_tasks import process_assignment_submitted
+from jb.models.event import MTurkEvent
+
+
def process_mturk_events_task():
    """Long-running worker loop: drain the MTurk event stream once per second.

    Ensures the consumer group exists, then loops forever; any failure in a
    single pass is logged and swallowed so the worker keeps running.
    """
    pool = ThreadPoolExecutor(max_workers=5)
    create_consumer_group()
    while True:
        try:
            process_mturk_events(executor=pool)
        except Exception as exc:
            logging.exception(exc)
        finally:
            time.sleep(1)
+
+
def handle_pending_msgs_task():
    """Long-running worker loop: sweep the pending-entries list once a minute.

    Each sweep's errors are logged but never propagated, keeping the loop alive.
    """
    while True:
        try:
            handle_pending_msgs()
        except Exception as exc:
            logging.exception(exc)
        finally:
            time.sleep(60)
+
+
def process_mturk_events(executor: Executor):
    """Keep pulling 10-message chunks until the stream is (nearly) drained.

    A chunk smaller than 10 (or None, meaning nothing was read) signals that
    the backlog is exhausted for this pass.
    """
    while True:
        processed = process_mturk_events_chunk(executor=executor)
        if processed is None or processed < 10:
            return
+
+
def create_consumer_group():
    """Idempotently create the consumer group on the events stream.

    Redis answers BUSYGROUP when the group already exists; that case is
    benign and ignored, while any other ResponseError is re-raised.
    """
    try:
        REDIS.xgroup_create(JB_EVENTS_STREAM, CONSUMER_GROUP, id="0", mkstream=True)
    except redis.exceptions.ResponseError as err:
        if "BUSYGROUP Consumer Group name already exists" not in str(err):
            raise
+
+
def process_mturk_events_chunk(executor: Executor) -> Optional[int]:
    """Read up to 10 new messages from the events stream and dispatch them.

    AssignmentSubmitted events are handed to *executor* (the worker acks the
    message itself on success); anything else is acked and deleted.

    Returns:
        The number of messages read, or None when the stream was empty.
    """
    msgs = REDIS.xreadgroup(
        groupname=CONSUMER_GROUP,
        consumername=CONSUMER_NAME,
        streams={JB_EVENTS_STREAM: ">"},
        count=10,
    )
    if not msgs:
        return None
    msgs = msgs[0][1]  # the queue, we only have 1

    fs = []
    for msg_id, data in msgs:
        event = MTurkEvent.model_validate_json(data["data"])
        if event.event_type == "AssignmentSubmitted":
            fs.append(
                executor.submit(process_assignment_submitted_event, event, msg_id)
            )
        else:
            logging.info(f"Discarding {event}")
            # BUG FIX: the message was delivered via xreadgroup, so xdel alone
            # leaves a dangling entry in the consumer group's PEL forever and
            # handle_pending_msgs would keep re-claiming a ghost. Ack first.
            REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)
            REDIS.xdel(JB_EVENTS_STREAM, msg_id)

    futures.wait(fs, timeout=60)
    return len(msgs)
+
+
def process_assignment_submitted_event(event: MTurkEvent, msg_id: str):
    """Process one AssignmentSubmitted event, then ack+delete its stream message.

    Runs in an executor thread (submitted by process_mturk_events_chunk); the
    message is removed only after processing succeeds, so a crash leaves it
    pending for handle_pending_msgs to retry.
    """
    process_assignment_submitted(event)
    # NOTE(review): xackdel is a combined XACKDEL command that only exists on
    # newer Redis servers / redis-py releases — confirm the deployed versions
    # support it, otherwise this raises and the message stays pending.
    REDIS.xackdel(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)
+
+
def handle_pending_msgs():
    """Reclaim and retry messages that were delivered but never ACKed.

    Messages sitting in the consumer group's pending-entries list (PEL) for
    more than 10 seconds most likely belonged to a worker that crashed
    mid-flight. Claim them, retry AssignmentSubmitted events once, and move
    anything that fails again onto the failed stream for later inspection.
    """
    pending = REDIS.xpending_range(
        JB_EVENTS_STREAM, CONSUMER_GROUP, min="-", max="+", count=10
    )
    for entry in pending:
        msg_id = entry["message_id"]
        # Only touch messages idle for more than 10 sec ("idle" is in ms).
        if entry["idle"] <= 10_000:
            continue
        claimed = REDIS.xclaim(
            JB_EVENTS_STREAM,
            CONSUMER_GROUP,
            CONSUMER_NAME,
            min_idle_time=10_000,
            message_ids=[msg_id],
        )
        for cid, data in claimed:
            event = MTurkEvent.model_validate_json(data["data"])
            if event.event_type == "AssignmentSubmitted":
                try:
                    # BUG FIX: this call already acks + deletes the message
                    # itself on success, so the original's extra xack after it
                    # was a redundant double-ack and has been removed.
                    process_assignment_submitted_event(event, cid)
                except Exception as e:
                    logging.exception(e)
                    # Park the payload on the failed stream so it can be
                    # inspected/replayed, then clear it from the main stream
                    # (ack removes the PEL entry, xdel removes the body).
                    REDIS.xadd(JB_EVENTS_FAILED_STREAM, data)
                    REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid)
                    REDIS.xdel(JB_EVENTS_STREAM, cid)
            else:
                logging.info(f"Discarding {event}")
                # BUG FIX: ack before deleting — xdel alone leaves the PEL
                # entry behind, so this sweep would re-claim the same ghost
                # message every minute.
                REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid)
                REDIS.xdel(JB_EVENTS_STREAM, cid)
diff --git a/jb/flow/maintenance.py b/jb/flow/maintenance.py
new file mode 100644
index 0000000..5dc9cea
--- /dev/null
+++ b/jb/flow/maintenance.py
@@ -0,0 +1,26 @@
+from typing import Optional
+
+from jb.decorators import HM
+from jb.flow.monitoring import emit_hit_event
+from jb.managers.amt import AMTManager
+from jb.models.definitions import HitStatus
+
+
def check_hit_status(
    amt_hit_id: str, amt_hit_type_id: str, reason: Optional[str] = None
) -> HitStatus:
    """Fetch a HIT's current status from Amazon and sync local state.

    (This used to be called "process_hit".) The caller is assumed to have the
    HIT recorded as Assignable locally — that is why this check was triggered.
    Returns the status Amazon reported.
    """
    hit_status = AMTManager.get_hit_status(amt_hit_id=amt_hit_id)
    if hit_status == HitStatus.Assignable:
        # Nothing changed; local state already agrees.
        return hit_status
    # todo: should update also assignment_pending_count, assignment_available_count, assignment_completed_count
    HM.update_status(amt_hit_id=amt_hit_id, hit_status=hit_status)
    emit_hit_event(status=hit_status, amt_hit_type_id=amt_hit_type_id, reason=reason)
    return hit_status
diff --git a/jb/flow/monitoring.py b/jb/flow/monitoring.py
new file mode 100644
index 0000000..c8432bb
--- /dev/null
+++ b/jb/flow/monitoring.py
@@ -0,0 +1,156 @@
+import socket
+from typing import Optional
+
+from mypy_boto3_mturk.literals import EventTypeType
+
+from jb.config import settings
+from jb.decorators import influx_client
+from jb.models.currency import USDCent
+from jb.models.definitions import HitStatus, AssignmentStatus
+
+
def write_hit_gauge(status: HitStatus, amt_hit_type_id: str, cnt: int):
    """Write a gauge point: count of HITs in `status` for a HIT type."""
    point = {
        "measurement": "amt_jb.hits",
        "tags": {
            "host": socket.gethostname(),  # could be "amt-jb-0"
            "service": "amt-jb",
            "status": status.value,
            "amt_hit_type_id": amt_hit_type_id,
            "debug": settings.debug,
        },
        "fields": {"count": cnt},
    }
    if influx_client:
        influx_client.write_points([point])
+
+
def write_assignment_gauge(status: AssignmentStatus, amt_hit_type_id: str, cnt: int):
    """Write a gauge point: count of assignments in `status` for a HIT type."""
    point = {
        "measurement": "amt_jb.assignments",
        "tags": {
            "host": socket.gethostname(),
            "service": "amt-jb",
            "status": status.value,
            "amt_hit_type_id": amt_hit_type_id,
            "debug": settings.debug,
        },
        "fields": {"count": cnt},
    }
    if influx_client:
        influx_client.write_points([point])
+
+
def emit_hit_event(
    status: HitStatus, amt_hit_type_id: str, reason: Optional[str] = None
):
    """
    e.g. a HIT was created, Reviewable, etc. We don't have a "created" HitStatus,
    so it would just be when status=='Assignable'
    """
    tags = {
        "host": socket.gethostname(),
        "service": "amt-jb",
        "status": status.value,
        "amt_hit_type_id": amt_hit_type_id,
        "debug": settings.debug,
        # The reason tag is only attached when one was actually given.
        **({"reason": reason} if reason else {}),
    }
    if influx_client:
        influx_client.write_points(
            [
                {
                    "measurement": "amt_jb.hit_events",
                    "tags": tags,
                    "fields": {"value": 1},
                }
            ]
        )
+
+
def emit_assignment_event(
    status: AssignmentStatus, amt_hit_type_id: str, reason: Optional[str] = None
):
    """
    e.g. an Assignment was accepted/approved/reject
    """
    tags = {
        "host": socket.gethostname(),
        "service": "amt-jb",
        "status": status.value,
        "amt_hit_type_id": amt_hit_type_id,
        "debug": settings.debug,
        # Only tag a reason when the caller supplied one.
        **({"reason": reason} if reason else {}),
    }
    if influx_client:
        influx_client.write_points(
            [
                {
                    "measurement": "amt_jb.assignment_events",
                    "tags": tags,
                    "fields": {"value": 1},
                }
            ]
        )
+
+
def emit_mturk_notification_event(event_type: EventTypeType, amt_hit_type_id: str):
    """
    e.g. a Mturk notification was received. We just put it in redis, we haven't processed it yet.
    """
    point = {
        "measurement": "amt_jb.mturk_notification_events",
        "tags": {
            "host": socket.gethostname(),
            "service": "amt-jb",
            "event_type": event_type,
            "amt_hit_type_id": amt_hit_type_id,
            "debug": settings.debug,
        },
        "fields": {"value": 1},
    }
    if influx_client:
        influx_client.write_points([point])
+
+
def emit_error_event(event_type: str, amt_hit_type_id: str):
    """
    e.g. todo: structure the error_types
    """
    point = {
        "measurement": "amt_jb.error_events",
        "tags": {
            "host": socket.gethostname(),
            "service": "amt-jb",
            "event_type": event_type,
            "amt_hit_type_id": amt_hit_type_id,
            "debug": settings.debug,
        },
        "fields": {"value": 1},
    }
    if influx_client:
        influx_client.write_points([point])
+
+
def emit_bonus_event(amount: USDCent, amt_hit_type_id: str):
    """
    An AMT bonus was awarded
    """
    point = {
        "measurement": "amt_jb.bonus_events",
        "tags": {
            "host": socket.gethostname(),
            "service": "amt-jb",
            "amt_hit_type_id": amt_hit_type_id,
            "debug": settings.debug,
        },
        # value=1 counts the event; amount records the bonus size in cents.
        "fields": {"value": 1, "amount": int(amount)},
    }
    if influx_client:
        influx_client.write_points([point])