diff options
Diffstat (limited to 'jb/flow/events.py')
| -rw-r--r-- | jb/flow/events.py | 126 |
1 file changed, 126 insertions(+), 0 deletions(-)
import logging
import time
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor, Executor, as_completed
from typing import Optional

import redis

from jb.config import (
    JB_EVENTS_STREAM,
    CONSUMER_GROUP,
    CONSUMER_NAME,
    JB_EVENTS_FAILED_STREAM,
)
from jb.decorators import REDIS
from jb.flow.assignment_tasks import process_assignment_submitted
from jb.models.event import MTurkEvent


def process_mturk_events_task():
    """Long-running worker loop: consume MTurk events from the Redis stream.

    Creates the consumer group once (idempotent), then polls forever.
    Per-iteration failures are logged rather than propagated so the
    worker never dies.
    """
    executor = ThreadPoolExecutor(max_workers=5)
    create_consumer_group()
    while True:
        try:
            process_mturk_events(executor=executor)
        except Exception as e:
            logging.exception(e)
        finally:
            time.sleep(1)


def handle_pending_msgs_task():
    """Long-running janitor loop: retry messages that were read but not ACKed."""
    while True:
        try:
            handle_pending_msgs()
        except Exception as e:
            logging.exception(e)
        finally:
            time.sleep(60)


def process_mturk_events(executor: Executor):
    """Drain the stream in chunks until a short (or empty) read indicates
    we have caught up."""
    while True:
        n = process_mturk_events_chunk(executor=executor)
        if n is None or n < 10:
            break


def create_consumer_group():
    """Create the stream's consumer group, tolerating it already existing."""
    try:
        REDIS.xgroup_create(JB_EVENTS_STREAM, CONSUMER_GROUP, id="0", mkstream=True)
    except redis.exceptions.ResponseError as e:
        if "BUSYGROUP Consumer Group name already exists" in str(e):
            pass  # group already exists
        else:
            raise


def process_mturk_events_chunk(executor: Executor) -> Optional[int]:
    """Read up to 10 new events from the stream and dispatch them.

    AssignmentSubmitted events are handed to the executor; everything
    else is ACKed and deleted. Returns the number of messages read, or
    None when the stream yielded nothing.
    """
    msgs = REDIS.xreadgroup(
        groupname=CONSUMER_GROUP,
        consumername=CONSUMER_NAME,
        streams={JB_EVENTS_STREAM: ">"},
        count=10,
    )
    if not msgs:
        return None
    msgs = msgs[0][1]  # only one stream was requested; take its message list

    fs = []
    for msg_id, data in msgs:
        event = MTurkEvent.model_validate_json(data["data"])
        if event.event_type == "AssignmentSubmitted":
            fs.append(
                executor.submit(process_assignment_submitted_event, event, msg_id)
            )
        else:
            logging.info(f"Discarding {event}")
            # ACK before deleting: XDEL alone leaves the entry in the
            # consumer group's pending list (PEL), so it would be
            # re-claimed forever by handle_pending_msgs.
            REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)
            REDIS.xdel(JB_EVENTS_STREAM, msg_id)

    done, _not_done = futures.wait(fs, timeout=60)
    # futures.wait never raises worker exceptions; surface them so
    # failures are visible in the logs. The failed message stays
    # un-ACKed and is retried later by handle_pending_msgs.
    for f in done:
        exc = f.exception()
        if exc is not None:
            logging.error("event processing failed", exc_info=exc)
    return len(msgs)


def process_assignment_submitted_event(event: MTurkEvent, msg_id: str):
    """Process a single AssignmentSubmitted event, then ACK and delete it.

    Exceptions from processing propagate: the message is then left
    un-ACKed in the PEL and picked up again by handle_pending_msgs.
    """
    process_assignment_submitted(event)
    # XACKDEL exists only on Redis >= 8.2; XACK followed by XDEL is the
    # equivalent that works on every server version.
    REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)
    REDIS.xdel(JB_EVENTS_STREAM, msg_id)


def handle_pending_msgs():
    """Reclaim and retry messages that were read but never ACKed.

    A message sits in the consumer group's pending entries list (PEL)
    when a consumer read it and then failed before ACKing. Claim
    anything idle for more than 10 seconds and retry it; if the retry
    fails, park the payload on the failed stream for later inspection.
    """
    pending = REDIS.xpending_range(
        JB_EVENTS_STREAM, CONSUMER_GROUP, min="-", max="+", count=10
    )
    for entry in pending:
        msg_id = entry["message_id"]
        # redis-py exposes the idle time as "time_since_delivered" (ms),
        # not "idle" as the raw XPENDING reply names it.
        if entry["time_since_delivered"] > 10_000:  # milliseconds
            claimed = REDIS.xclaim(
                JB_EVENTS_STREAM,
                CONSUMER_GROUP,
                CONSUMER_NAME,
                min_idle_time=10_000,
                message_ids=[msg_id],
            )
            for cid, data in claimed:
                event = MTurkEvent.model_validate_json(data["data"])
                if event.event_type == "AssignmentSubmitted":
                    try:
                        # Retry; on success this ACKs and deletes the message.
                        process_assignment_submitted_event(event, cid)
                    except Exception as e:
                        logging.exception(e)
                        # Park the raw payload so it can be fixed up and
                        # replayed, then ACK so it stops being re-claimed.
                        REDIS.xadd(JB_EVENTS_FAILED_STREAM, data)
                        REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid)
                else:
                    logging.info(f"Discarding {event}")
                    # ACK as well as delete, otherwise the PEL entry
                    # lingers and is re-claimed on every sweep.
                    REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid)
                    REDIS.xdel(JB_EVENTS_STREAM, cid)
