diff options
| author | Max Nanis | 2026-02-19 20:11:41 -0500 |
|---|---|---|
| committer | Max Nanis | 2026-02-19 20:11:41 -0500 |
| commit | 8b31678c6e44400967d4934cd9f3c6c6ac0da721 (patch) | |
| tree | 41c5f4479c353a16da1a8b6fa9088abd084ea388 /jb/flow | |
| parent | f0f96f83c2630e890a2cbcab53f77fd4c37e1684 (diff) | |
| download | amt-jb-8b31678c6e44400967d4934cd9f3c6c6ac0da721.tar.gz amt-jb-8b31678c6e44400967d4934cd9f3c6c6ac0da721.zip | |
Carer dir moved into the project; some initial pytest tests; part of the flow tasks. License and Readme updated.
Diffstat (limited to 'jb/flow')
| -rw-r--r-- | jb/flow/__init__.py | 0 | ||||
| -rw-r--r-- | jb/flow/events.py | 126 | ||||
| -rw-r--r-- | jb/flow/maintenance.py | 26 | ||||
| -rw-r--r-- | jb/flow/monitoring.py | 156 |
4 files changed, 308 insertions, 0 deletions
diff --git a/jb/flow/__init__.py b/jb/flow/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/jb/flow/__init__.py diff --git a/jb/flow/events.py b/jb/flow/events.py new file mode 100644 index 0000000..3961a64 --- /dev/null +++ b/jb/flow/events.py @@ -0,0 +1,126 @@ +import logging +import time +from concurrent import futures +from concurrent.futures import ThreadPoolExecutor, Executor, as_completed +from typing import Optional + +import redis + +from jb.config import ( + JB_EVENTS_STREAM, + CONSUMER_GROUP, + CONSUMER_NAME, + JB_EVENTS_FAILED_STREAM, +) +from jb.decorators import REDIS +from jb.flow.assignment_tasks import process_assignment_submitted +from jb.models.event import MTurkEvent + + +def process_mturk_events_task(): + executor = ThreadPoolExecutor(max_workers=5) + create_consumer_group() + while True: + try: + process_mturk_events(executor=executor) + except Exception as e: + logging.exception(e) + finally: + time.sleep(1) + + +def handle_pending_msgs_task(): + while True: + try: + handle_pending_msgs() + except Exception as e: + logging.exception(e) + finally: + time.sleep(60) + + +def process_mturk_events(executor: Executor): + while True: + n = process_mturk_events_chunk(executor=executor) + if n is None or n < 10: + break + + +def create_consumer_group(): + try: + REDIS.xgroup_create(JB_EVENTS_STREAM, CONSUMER_GROUP, id="0", mkstream=True) + except redis.exceptions.ResponseError as e: + if "BUSYGROUP Consumer Group name already exists" in str(e): + pass # group already exists + else: + raise + + +def process_mturk_events_chunk(executor: Executor) -> Optional[int]: + msgs = REDIS.xreadgroup( + groupname=CONSUMER_GROUP, + consumername=CONSUMER_NAME, + streams={JB_EVENTS_STREAM: ">"}, + count=10, + ) + if not msgs: + return None + msgs = msgs[0][1] # the queue, we only have 1 + + fs = [] + for msg in msgs: + msg_id, data = msg + msg_json = data["data"] + event = MTurkEvent.model_validate_json(msg_json) + if event.event_type == 
"AssignmentSubmitted": + fs.append( + executor.submit(process_assignment_submitted_event, event, msg_id) + ) + else: + logging.info(f"Discarding {event}") + REDIS.xdel(JB_EVENTS_STREAM, msg_id) + + futures.wait(fs, timeout=60) + return len(msgs) + + +def process_assignment_submitted_event(event: MTurkEvent, msg_id: str): + process_assignment_submitted(event) + REDIS.xackdel(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id) + + +def handle_pending_msgs(): + # Looks in the redis queue for msgs that + # are pending (read by a consumer but not ACK). These prob failed. + # Below is from chatgpt, idk if it works + pending = REDIS.xpending_range( + JB_EVENTS_STREAM, CONSUMER_GROUP, min="-", max="+", count=10 + ) + for entry in pending: + msg_id = entry["message_id"] + # Claim message if idle > 10 sec + if entry["idle"] > 10_000: # milliseconds + claimed = REDIS.xclaim( + JB_EVENTS_STREAM, + CONSUMER_GROUP, + CONSUMER_NAME, + min_idle_time=10_000, + message_ids=[msg_id], + ) + for cid, data in claimed: + msg_json = data["data"] + event = MTurkEvent.model_validate_json(msg_json) + if event.event_type == "AssignmentSubmitted": + # Try to process it again. If it fails, add + # it to the failed stream, so maybe we can fix + # and try again? 
+ try: + process_assignment_submitted_event(event, cid) + REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid) + except Exception as e: + logging.exception(e) + REDIS.xadd(JB_EVENTS_FAILED_STREAM, data) + REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid) + else: + logging.info(f"Discarding {event}") + REDIS.xdel(JB_EVENTS_STREAM, msg_id) diff --git a/jb/flow/maintenance.py b/jb/flow/maintenance.py new file mode 100644 index 0000000..5dc9cea --- /dev/null +++ b/jb/flow/maintenance.py @@ -0,0 +1,26 @@ +from typing import Optional + +from jb.decorators import HM +from jb.flow.monitoring import emit_hit_event +from jb.managers.amt import AMTManager +from jb.models.definitions import HitStatus + + +def check_hit_status( + amt_hit_id: str, amt_hit_type_id: str, reason: Optional[str] = None +) -> HitStatus: + """ + (this used to be called "process_hit") + Request information from Amazon regarding the status of a HIT ID. Update the local state from + that response. + """ + hit_status = AMTManager.get_hit_status(amt_hit_id=amt_hit_id) + # We're assuming that in the db this Hit is marked as Assignable, or else we wouldn't + # have called this function. 
+ if hit_status != HitStatus.Assignable: + # todo: should update also assignment_pending_count, assignment_available_count, assignment_completed_count + HM.update_status(amt_hit_id=amt_hit_id, hit_status=hit_status) + emit_hit_event( + status=hit_status, amt_hit_type_id=amt_hit_type_id, reason=reason + ) + return hit_status diff --git a/jb/flow/monitoring.py b/jb/flow/monitoring.py new file mode 100644 index 0000000..c8432bb --- /dev/null +++ b/jb/flow/monitoring.py @@ -0,0 +1,156 @@ +import socket +from typing import Optional + +from mypy_boto3_mturk.literals import EventTypeType + +from jb.config import settings +from jb.decorators import influx_client +from jb.models.currency import USDCent +from jb.models.definitions import HitStatus, AssignmentStatus + + +def write_hit_gauge(status: HitStatus, amt_hit_type_id: str, cnt: int): + tags = { + "host": socket.gethostname(), # could be "amt-jb-0" + "service": "amt-jb", + "status": status.value, + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + point = { + "measurement": "amt_jb.hits", + "tags": tags, + "fields": {"count": cnt}, + } + if influx_client: + influx_client.write_points([point]) + + +def write_assignment_gauge(status: AssignmentStatus, amt_hit_type_id: str, cnt: int): + tags = { + "host": socket.gethostname(), + "service": "amt-jb", + "status": status.value, + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + point = { + "measurement": "amt_jb.assignments", + "tags": tags, + "fields": {"count": cnt}, + } + if influx_client: + influx_client.write_points([point]) + + +def emit_hit_event( + status: HitStatus, amt_hit_type_id: str, reason: Optional[str] = None +): + """ + e.g. a HIT was created, Reviewable, etc. 
We don't have a "created" HitStatus, + so it would just be when status=='Assignable' + """ + tags = { + "host": socket.gethostname(), + "service": "amt-jb", + "status": status.value, + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + if reason: + tags["reason"] = reason + point = { + "measurement": "amt_jb.hit_events", + "tags": tags, + "fields": {"value": 1}, + } + + if influx_client: + influx_client.write_points([point]) + + +def emit_assignment_event( + status: AssignmentStatus, amt_hit_type_id: str, reason: Optional[str] = None +): + """ + e.g. an Assignment was accepted/approved/reject + """ + tags = { + "host": socket.gethostname(), + "service": "amt-jb", + "status": status.value, + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + if reason: + tags["reason"] = reason + point = { + "measurement": "amt_jb.assignment_events", + "tags": tags, + "fields": {"value": 1}, + } + + if influx_client: + influx_client.write_points([point]) + + +def emit_mturk_notification_event(event_type: EventTypeType, amt_hit_type_id: str): + """ + e.g. a Mturk notification was received. We just put it in redis, we haven't processed it yet. + """ + tags = { + "host": socket.gethostname(), + "service": "amt-jb", + "event_type": event_type, + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + point = { + "measurement": "amt_jb.mturk_notification_events", + "tags": tags, + "fields": {"value": 1}, + } + + if influx_client: + influx_client.write_points([point]) + + +def emit_error_event(event_type: str, amt_hit_type_id: str): + """ + e.g. 
todo: structure the error_types + """ + tags = { + "host": socket.gethostname(), + "service": "amt-jb", + "event_type": event_type, + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + point = { + "measurement": "amt_jb.error_events", + "tags": tags, + "fields": {"value": 1}, + } + + if influx_client: + influx_client.write_points([point]) + + +def emit_bonus_event(amount: USDCent, amt_hit_type_id: str): + """ + An AMT bonus was awarded + """ + tags = { + "host": socket.gethostname(), + "service": "amt-jb", + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + point = { + "measurement": "amt_jb.bonus_events", + "tags": tags, + "fields": {"value": 1, "amount": int(amount)}, + } + + if influx_client: + influx_client.write_points([point]) |
