"""Consume MTurk events from a Redis stream and dispatch them to workers.

Two long-running entry points:

* ``process_mturk_events_task`` -- reads new events from the stream and
  processes AssignmentSubmitted events on a thread pool.
* ``handle_pending_msgs_task`` -- periodically reclaims messages that were
  delivered but never acknowledged (the consumer likely crashed), retries
  them, and routes repeat failures to a dead-letter stream.
"""

import logging
import time
from concurrent import futures
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Optional, TypedDict, cast

import redis

from jb.config import (
    CONSUMER_GROUP,
    CONSUMER_NAME,
    JB_EVENTS_FAILED_STREAM,
    JB_EVENTS_STREAM,
)
from jb.decorators import REDIS
from jb.flow.assignment_tasks import process_assignment_submitted
from jb.models.event import MTurkEvent

# Shape of redis-py's XREADGROUP reply: [(stream_name, [(msg_id, fields), ...])].
# NOTE(review): the bytes types here imply decode_responses=False, yet the code
# below indexes fields with str keys ("data") -- confirm how the REDIS client is
# configured and make these consistent.
StreamMessages = list[tuple[str, list[tuple[bytes, dict[bytes, bytes]]]]]


class PendingEntry(TypedDict):
    """One entry of redis-py's parsed XPENDING range reply."""

    message_id: bytes
    consumer: bytes
    time_since_delivered: int  # idle time, in milliseconds
    times_delivered: int


def process_mturk_events_task():
    """Run forever: drain the events stream, then sleep one second."""
    executor = ThreadPoolExecutor(max_workers=5)
    create_consumer_group()
    while True:
        try:
            process_mturk_events(executor=executor)
        except Exception as e:
            # Top-level worker boundary: log and keep the loop alive.
            logging.exception(e)
        finally:
            time.sleep(1)


def handle_pending_msgs_task():
    """Run forever: reclaim stuck pending messages once per minute."""
    while True:
        try:
            handle_pending_msgs()
        except Exception as e:
            # Top-level worker boundary: log and keep the loop alive.
            logging.exception(e)
        finally:
            time.sleep(60)


def process_mturk_events(executor: Executor):
    """Drain the stream in chunks until a short (non-full) chunk is seen."""
    while True:
        n = process_mturk_events_chunk(executor=executor)
        if n is None or n < 10:
            break


def create_consumer_group():
    """Create the consumer group, tolerating "already exists"."""
    try:
        REDIS.xgroup_create(JB_EVENTS_STREAM, CONSUMER_GROUP, id="0", mkstream=True)
    except redis.exceptions.ResponseError as e:
        if "BUSYGROUP Consumer Group name already exists" in str(e):
            pass  # group already exists
        else:
            raise


def process_mturk_events_chunk(executor: Executor) -> Optional[int]:
    """Read up to 10 new messages and dispatch them.

    AssignmentSubmitted events are processed on the executor; everything
    else is discarded.

    Returns:
        The number of messages read, or None when the stream was empty.
    """
    msgs_raw = REDIS.xreadgroup(
        groupname=CONSUMER_GROUP,
        consumername=CONSUMER_NAME,
        streams={JB_EVENTS_STREAM: ">"},
        count=10,
    )
    if not msgs_raw:
        return None
    msgs = cast(StreamMessages, msgs_raw)[0][1]  # The queue, we only have 1
    fs = []
    for msg_id, data in msgs:
        msg_json: str = data["data"]
        event = MTurkEvent.model_validate_json(json_data=msg_json)
        if event.event_type == "AssignmentSubmitted":
            fs.append(
                executor.submit(process_assignment_submitted_event, event, str(msg_id))
            )
        else:
            logging.info(f"Discarding {event}")
            # Ack before deleting: XDEL alone leaves a dangling entry in the
            # group's pending list, which handle_pending_msgs would then try
            # (and fail) to reprocess forever.
            REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)
            REDIS.xdel(JB_EVENTS_STREAM, msg_id)
    futures.wait(fs, timeout=60)
    return len(msgs)


def process_assignment_submitted_event(event: MTurkEvent, msg_id: str):
    """Process one AssignmentSubmitted event, then ack-and-delete it."""
    # Local import avoids a circular import at module load time.
    from jb.decorators import AM, AMTM, BM, HM

    process_assignment_submitted(amtm=AMTM, am=AM, hm=HM, bm=BM, event=event)
    # NOTE(review): XACKDEL requires Redis >= 8.2 and a recent redis-py --
    # confirm the deployed server version, or fall back to xack + xdel.
    REDIS.xackdel(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)


def handle_pending_msgs():
    """Reclaim and retry messages that were delivered but never acked.

    Messages idle for more than 10 seconds are claimed by this consumer and
    retried. If processing fails again, the raw payload is copied to the
    failed stream (so it can be inspected/replayed) and the message is acked
    so it stops blocking the group.
    """
    pending = cast(
        list[PendingEntry],
        REDIS.xpending_range(
            JB_EVENTS_STREAM, CONSUMER_GROUP, min="-", max="+", count=10
        ),
    )
    for entry in pending:
        msg_id = entry["message_id"]
        # Claim message if idle > 10 sec. Bug fix: redis-py reports idle time
        # under "time_since_delivered" (see PendingEntry above); the previous
        # code read a nonexistent "idle" key and raised KeyError.
        if entry["time_since_delivered"] > 10_000:  # milliseconds
            claimed = REDIS.xclaim(
                JB_EVENTS_STREAM,
                CONSUMER_GROUP,
                CONSUMER_NAME,
                min_idle_time=10_000,
                message_ids=[msg_id],
            )
            for cid, data in claimed:
                msg_json = data["data"]
                event = MTurkEvent.model_validate_json(msg_json)
                if event.event_type == "AssignmentSubmitted":
                    # Try to process it again. If it fails, park it on the
                    # failed stream so it can be fixed and retried later.
                    try:
                        process_assignment_submitted_event(event, cid)
                        REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid)
                    except Exception as e:
                        logging.exception(e)
                        REDIS.xadd(JB_EVENTS_FAILED_STREAM, data)
                        REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid)
                else:
                    logging.info(f"Discarding {event}")
                    # Ack before deleting so the pending entry is released.
                    REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)
                    REDIS.xdel(JB_EVENTS_STREAM, msg_id)