diff options
Diffstat (limited to 'jb/flow/events.py')
| -rw-r--r-- | jb/flow/events.py | 35 |
1 files changed, 25 insertions, 10 deletions
diff --git a/jb/flow/events.py b/jb/flow/events.py index 3961a64..7b7bd32 100644 --- a/jb/flow/events.py +++ b/jb/flow/events.py @@ -1,8 +1,8 @@ import logging import time from concurrent import futures -from concurrent.futures import ThreadPoolExecutor, Executor, as_completed -from typing import Optional +from concurrent.futures import ThreadPoolExecutor, Executor +from typing import Optional, cast, TypedDict import redis @@ -16,6 +16,15 @@ from jb.decorators import REDIS from jb.flow.assignment_tasks import process_assignment_submitted from jb.models.event import MTurkEvent +StreamMessages = list[tuple[str, list[tuple[bytes, dict[bytes, bytes]]]]] + + +class PendingEntry(TypedDict): + message_id: bytes + consumer: bytes + time_since_delivered: int + times_delivered: int + def process_mturk_events_task(): executor = ThreadPoolExecutor(max_workers=5) @@ -57,24 +66,26 @@ def create_consumer_group(): def process_mturk_events_chunk(executor: Executor) -> Optional[int]: - msgs = REDIS.xreadgroup( + msgs_raw = REDIS.xreadgroup( groupname=CONSUMER_GROUP, consumername=CONSUMER_NAME, streams={JB_EVENTS_STREAM: ">"}, count=10, ) - if not msgs: + if not msgs_raw: return None - msgs = msgs[0][1] # the queue, we only have 1 + + msgs = cast(StreamMessages, msgs_raw)[0][1] # The queue, we only have 1 fs = [] for msg in msgs: msg_id, data = msg - msg_json = data["data"] - event = MTurkEvent.model_validate_json(msg_json) + msg_json: str = data["data"] + + event = MTurkEvent.model_validate_json(json_data=msg_json) if event.event_type == "AssignmentSubmitted": fs.append( - executor.submit(process_assignment_submitted_event, event, msg_id) + executor.submit(process_assignment_submitted_event, event, str(msg_id)) ) else: logging.info(f"Discarding {event}") @@ -93,9 +104,13 @@ def handle_pending_msgs(): # Looks in the redis queue for msgs that # are pending (read by a consumer but not ACK). These prob failed. # Below is from chatgpt, idk if it works - pending = REDIS.xpending_range( - JB_EVENTS_STREAM, CONSUMER_GROUP, min="-", max="+", count=10 + pending = cast( + list[PendingEntry], + REDIS.xpending_range( + JB_EVENTS_STREAM, CONSUMER_GROUP, min="-", max="+", count=10 + ), ) + for entry in pending: msg_id = entry["message_id"] # Claim message if idle > 10 sec |
