aboutsummaryrefslogtreecommitdiff
path: root/jb/flow/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'jb/flow/events.py')
-rw-r--r--jb/flow/events.py35
1 files changed, 25 insertions, 10 deletions
diff --git a/jb/flow/events.py b/jb/flow/events.py
index 3961a64..7b7bd32 100644
--- a/jb/flow/events.py
+++ b/jb/flow/events.py
@@ -1,8 +1,8 @@
import logging
import time
from concurrent import futures
-from concurrent.futures import ThreadPoolExecutor, Executor, as_completed
-from typing import Optional
+from concurrent.futures import ThreadPoolExecutor, Executor
+from typing import Optional, cast, TypedDict
import redis
@@ -16,6 +16,15 @@ from jb.decorators import REDIS
from jb.flow.assignment_tasks import process_assignment_submitted
from jb.models.event import MTurkEvent
+StreamMessages = list[tuple[str, list[tuple[bytes, dict[bytes, bytes]]]]]
+
+
+class PendingEntry(TypedDict):
+ message_id: bytes
+ consumer: bytes
+ time_since_delivered: int
+ times_delivered: int
+
def process_mturk_events_task():
executor = ThreadPoolExecutor(max_workers=5)
@@ -57,24 +66,26 @@ def create_consumer_group():
def process_mturk_events_chunk(executor: Executor) -> Optional[int]:
- msgs = REDIS.xreadgroup(
+ msgs_raw = REDIS.xreadgroup(
groupname=CONSUMER_GROUP,
consumername=CONSUMER_NAME,
streams={JB_EVENTS_STREAM: ">"},
count=10,
)
- if not msgs:
+ if not msgs_raw:
return None
- msgs = msgs[0][1] # the queue, we only have 1
+
+ msgs = cast(StreamMessages, msgs_raw)[0][1] # The queue, we only have 1
fs = []
for msg in msgs:
msg_id, data = msg
- msg_json = data["data"]
- event = MTurkEvent.model_validate_json(msg_json)
+ msg_json: str = data["data"]
+
+ event = MTurkEvent.model_validate_json(json_data=msg_json)
if event.event_type == "AssignmentSubmitted":
fs.append(
- executor.submit(process_assignment_submitted_event, event, msg_id)
+ executor.submit(process_assignment_submitted_event, event, str(msg_id))
)
else:
logging.info(f"Discarding {event}")
@@ -93,9 +104,13 @@ def handle_pending_msgs():
# Looks in the redis queue for msgs that
# are pending (read by a consumer but not ACK). These prob failed.
# Below is from chatgpt, idk if it works
- pending = REDIS.xpending_range(
- JB_EVENTS_STREAM, CONSUMER_GROUP, min="-", max="+", count=10
+ pending = cast(
+ list[PendingEntry],
+ REDIS.xpending_range(
+ JB_EVENTS_STREAM, CONSUMER_GROUP, min="-", max="+", count=10
+ ),
)
+
for entry in pending:
msg_id = entry["message_id"]
# Claim message if idle > 10 sec