"""Worker loops that consume MTurk events from a Redis stream.

Events arrive on ``JB_EVENTS_STREAM`` and are read through a consumer group.
``AssignmentSubmitted`` events are processed on a small thread pool; all other
event types are discarded.  A second loop periodically re-claims messages that
were read but never acknowledged (i.e. whose processing likely failed) and
retries them, diverting repeat failures to ``JB_EVENTS_FAILED_STREAM``.
"""

import logging
import time
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor, Executor, as_completed
from typing import Optional

import redis

from jb.config import (
    JB_EVENTS_STREAM,
    CONSUMER_GROUP,
    CONSUMER_NAME,
    JB_EVENTS_FAILED_STREAM,
)
from jb.decorators import REDIS
from jb.flow.assignment_tasks import process_assignment_submitted
from jb.models.event import MTurkEvent


def process_mturk_events_task() -> None:
    """Run forever: drain the event stream once per second.

    Top-level worker entry point.  Exceptions are logged and the loop
    continues, so one bad batch never kills the worker.
    """
    executor = ThreadPoolExecutor(max_workers=5)
    create_consumer_group()
    while True:
        try:
            process_mturk_events(executor=executor)
        except Exception as e:
            # Boundary of the worker: log with traceback and keep running.
            logging.exception(e)
        finally:
            time.sleep(1)


def handle_pending_msgs_task() -> None:
    """Run forever: retry stuck (read-but-unacknowledged) messages每 minute."""
    while True:
        try:
            handle_pending_msgs()
        except Exception as e:
            # Boundary of the worker: log with traceback and keep running.
            logging.exception(e)
        finally:
            time.sleep(60)


def process_mturk_events(executor: Executor) -> None:
    """Read and process chunks until the stream has no new messages.

    A full chunk (10 messages) suggests more may be waiting, so keep going;
    a short or empty chunk means we caught up.
    """
    while True:
        n = process_mturk_events_chunk(executor=executor)
        if n is None or n < 10:
            break


def create_consumer_group() -> None:
    """Create the consumer group (and stream) if they do not already exist."""
    try:
        REDIS.xgroup_create(JB_EVENTS_STREAM, CONSUMER_GROUP, id="0", mkstream=True)
    except redis.exceptions.ResponseError as e:
        if "BUSYGROUP Consumer Group name already exists" in str(e):
            pass  # group already exists
        else:
            raise


def process_mturk_events_chunk(executor: Executor) -> Optional[int]:
    """Read up to 10 new messages and dispatch them to the executor.

    Returns the number of messages read, or None when the stream was empty.
    ``AssignmentSubmitted`` events are processed asynchronously (and ACKed by
    the worker on success); everything else is ACKed and deleted immediately.
    """
    msgs = REDIS.xreadgroup(
        groupname=CONSUMER_GROUP,
        consumername=CONSUMER_NAME,
        streams={JB_EVENTS_STREAM: ">"},
        count=10,
    )
    if not msgs:
        return None
    msgs = msgs[0][1]  # the queue, we only have 1
    fs = []
    for msg in msgs:
        msg_id, data = msg
        msg_json = data["data"]
        event = MTurkEvent.model_validate_json(msg_json)
        if event.event_type == "AssignmentSubmitted":
            fs.append(
                executor.submit(process_assignment_submitted_event, event, msg_id)
            )
        else:
            logging.info(f"Discarding {event}")
            # BUG FIX: XDEL alone does not remove the entry from the consumer
            # group's pending list (PEL), so handle_pending_msgs would keep
            # seeing a phantom pending ID forever.  ACK first, then delete.
            REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)
            REDIS.xdel(JB_EVENTS_STREAM, msg_id)
    done, _ = futures.wait(fs, timeout=60)
    # BUG FIX: previously worker exceptions were silently dropped; surface
    # them here.  Failed messages stay un-ACKed and are retried by
    # handle_pending_msgs.
    for f in done:
        exc = f.exception()
        if exc is not None:
            logging.error("processing of a submitted assignment failed", exc_info=exc)
    return len(msgs)


def process_assignment_submitted_event(event: MTurkEvent, msg_id: str) -> None:
    """Process one AssignmentSubmitted event, then ACK and delete the message.

    Runs on the thread pool.  If process_assignment_submitted raises, the
    message is left pending so handle_pending_msgs can retry it.
    """
    process_assignment_submitted(event)
    # NOTE(review): the original used REDIS.xackdel, which needs Redis >= 8.2
    # and a very recent redis-py; ACK followed by DEL is the portable
    # equivalent (non-atomic, but a dangling already-ACKed entry is harmless).
    REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)
    REDIS.xdel(JB_EVENTS_STREAM, msg_id)


def handle_pending_msgs() -> None:
    """Retry messages that were read by a consumer but never ACKed.

    Such messages most likely failed mid-processing.  Any message idle for
    more than 10 seconds is claimed by this consumer and retried once; if it
    fails again it is copied to JB_EVENTS_FAILED_STREAM (for later inspection
    or replay) and ACKed so it stops being retried.
    """
    pending = REDIS.xpending_range(
        JB_EVENTS_STREAM, CONSUMER_GROUP, min="-", max="+", count=10
    )
    for entry in pending:
        msg_id = entry["message_id"]
        # Claim message if idle > 10 sec
        if entry["idle"] > 10_000:  # milliseconds
            claimed = REDIS.xclaim(
                JB_EVENTS_STREAM,
                CONSUMER_GROUP,
                CONSUMER_NAME,
                min_idle_time=10_000,
                message_ids=[msg_id],
            )
            for cid, data in claimed:
                msg_json = data["data"]
                event = MTurkEvent.model_validate_json(msg_json)
                if event.event_type == "AssignmentSubmitted":
                    try:
                        # process_assignment_submitted_event ACKs and deletes
                        # on success, so no extra ACK is needed here.
                        process_assignment_submitted_event(event, cid)
                    except Exception as e:
                        logging.exception(e)
                        # Second failure: park it on the failed stream and ACK
                        # so it is no longer retried.
                        REDIS.xadd(JB_EVENTS_FAILED_STREAM, data)
                        REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid)
                else:
                    logging.info(f"Discarding {event}")
                    # BUG FIX: ACK before XDEL so the entry also leaves the
                    # pending list (XDEL alone leaves it in the PEL).
                    REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid)
                    REDIS.xdel(JB_EVENTS_STREAM, msg_id)