aboutsummaryrefslogtreecommitdiff
path: root/jb/flow/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'jb/flow/events.py')
-rw-r--r--jb/flow/events.py126
1 files changed, 126 insertions, 0 deletions
diff --git a/jb/flow/events.py b/jb/flow/events.py
new file mode 100644
index 0000000..3961a64
--- /dev/null
+++ b/jb/flow/events.py
@@ -0,0 +1,126 @@
+import logging
+import time
+from concurrent import futures
+from concurrent.futures import ThreadPoolExecutor, Executor, as_completed
+from typing import Optional
+
+import redis
+
+from jb.config import (
+ JB_EVENTS_STREAM,
+ CONSUMER_GROUP,
+ CONSUMER_NAME,
+ JB_EVENTS_FAILED_STREAM,
+)
+from jb.decorators import REDIS
+from jb.flow.assignment_tasks import process_assignment_submitted
+from jb.models.event import MTurkEvent
+
+
def process_mturk_events_task():
    """Daemon entry point: drain MTurk events from the Redis stream forever.

    Creates the consumer group once, then polls roughly once per second,
    surviving (and logging) any per-iteration failure.
    """
    pool = ThreadPoolExecutor(max_workers=5)
    create_consumer_group()
    while True:
        try:
            process_mturk_events(executor=pool)
        except Exception as err:
            # Top-level boundary: log and keep the daemon alive.
            logging.exception(err)
        finally:
            time.sleep(1)
+
+
def handle_pending_msgs_task():
    """Daemon entry point: reclaim stuck (read-but-unACKed) messages.

    Runs handle_pending_msgs() about once a minute, logging and surviving
    any per-iteration failure.
    """
    while True:
        try:
            handle_pending_msgs()
        except Exception as err:
            # Top-level boundary: log and keep the daemon alive.
            logging.exception(err)
        finally:
            time.sleep(60)
+
+
def process_mturk_events(executor: Executor):
    """Keep consuming chunks until the stream runs dry.

    A chunk smaller than 10 messages (or no messages at all) means the
    backlog is exhausted, so we stop and let the caller sleep.
    """
    while True:
        handled = process_mturk_events_chunk(executor=executor)
        got_full_chunk = handled is not None and handled >= 10
        if not got_full_chunk:
            return
+
+
def create_consumer_group():
    """Ensure the consumer group exists on the events stream (idempotent)."""
    try:
        REDIS.xgroup_create(JB_EVENTS_STREAM, CONSUMER_GROUP, id="0", mkstream=True)
    except redis.exceptions.ResponseError as err:
        # Redis answers BUSYGROUP when the group was created earlier;
        # that is the expected steady-state, so only re-raise other errors.
        if "BUSYGROUP Consumer Group name already exists" not in str(err):
            raise
+
+
def process_mturk_events_chunk(executor: Executor, count: int = 10) -> Optional[int]:
    """Read up to *count* new messages from the events stream and dispatch them.

    AssignmentSubmitted events are handed to *executor*; anything else is
    ACKed and deleted immediately. Returns the number of messages read, or
    None when the stream had nothing new (callers use < count / None to
    detect an exhausted backlog).
    """
    msgs = REDIS.xreadgroup(
        groupname=CONSUMER_GROUP,
        consumername=CONSUMER_NAME,
        streams={JB_EVENTS_STREAM: ">"},
        count=count,
    )
    if not msgs:
        return None
    msgs = msgs[0][1]  # we only read one stream; take its message list

    fs = []
    for msg_id, data in msgs:
        event = MTurkEvent.model_validate_json(data["data"])
        if event.event_type == "AssignmentSubmitted":
            fs.append(
                executor.submit(process_assignment_submitted_event, event, msg_id)
            )
        else:
            logging.info(f"Discarding {event}")
            # ACK before deleting, otherwise the entry lingers in the
            # pending-entry list even though the message itself is gone.
            REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)
            REDIS.xdel(JB_EVENTS_STREAM, msg_id)

    # futures.wait() would silently drop worker exceptions; iterate the
    # completed futures so failures are at least logged. A failed message
    # stays un-ACKed and is retried later by handle_pending_msgs().
    try:
        for fut in as_completed(fs, timeout=60):
            exc = fut.exception()
            if exc is not None:
                logging.exception(exc)
    except futures.TimeoutError:
        logging.warning("Timed out waiting for event workers; messages stay pending")
    return len(msgs)
+
+
def process_assignment_submitted_event(event: MTurkEvent, msg_id: str) -> None:
    # Worker body: run the assignment pipeline, then ACK and delete the
    # message from the stream in a single atomic round trip. If
    # process_assignment_submitted raises, the message is left un-ACKed
    # in the pending list and is retried by handle_pending_msgs().
    # NOTE(review): XACKDEL requires Redis 8.2+ and a redis-py version
    # that exposes it — confirm the deployed versions support it.
    process_assignment_submitted(event)
    REDIS.xackdel(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)
+
+
def handle_pending_msgs():
    """Recover messages that were read but never ACKed.

    A pending (read-but-unACKed) message usually means its worker crashed.
    Claim messages idle for more than 10 seconds, retry them once, and
    divert persistent failures to the failed stream so they can be
    inspected and replayed later.
    """
    pending = REDIS.xpending_range(
        JB_EVENTS_STREAM, CONSUMER_GROUP, min="-", max="+", count=10
    )
    for entry in pending:
        msg_id = entry["message_id"]
        # Anything idle <= 10s may still be in flight on another worker.
        if entry["idle"] <= 10_000:  # milliseconds
            continue
        claimed = REDIS.xclaim(
            JB_EVENTS_STREAM,
            CONSUMER_GROUP,
            CONSUMER_NAME,
            min_idle_time=10_000,
            message_ids=[msg_id],
        )
        for cid, data in claimed:
            event = MTurkEvent.model_validate_json(data["data"])
            if event.event_type == "AssignmentSubmitted":
                try:
                    # Retry: on success this ACKs and deletes the message
                    # itself (xackdel), so no extra ACK is needed here.
                    process_assignment_submitted_event(event, cid)
                except Exception as e:
                    logging.exception(e)
                    # Park a copy for inspection, then ACK and delete the
                    # original so it stops cycling through the PEL.
                    REDIS.xadd(JB_EVENTS_FAILED_STREAM, data)
                    REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid)
                    REDIS.xdel(JB_EVENTS_STREAM, cid)
            else:
                logging.info(f"Discarding {event}")
                # BUG FIX: ACK before deleting. Deleting without ACKing
                # leaves the pending-list entry behind forever, so it was
                # re-claimed (pointlessly) on every pass of this loop.
                REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid)
                REDIS.xdel(JB_EVENTS_STREAM, cid)