1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
import logging
import time
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor, Executor, as_completed
from typing import Optional
import redis
from jb.config import (
JB_EVENTS_STREAM,
CONSUMER_GROUP,
CONSUMER_NAME,
JB_EVENTS_FAILED_STREAM,
)
from jb.decorators import REDIS
from jb.flow.assignment_tasks import process_assignment_submitted
from jb.models.event import MTurkEvent
def process_mturk_events_task():
    """Long-running worker entry point: drain MTurk events from the stream.

    Ensures the consumer group exists, then polls forever, logging any
    error and sleeping one second between iterations.
    """
    pool = ThreadPoolExecutor(max_workers=5)
    create_consumer_group()
    while True:
        try:
            process_mturk_events(executor=pool)
        except Exception as err:
            logging.exception(err)
        finally:
            time.sleep(1)
def handle_pending_msgs_task():
    """Background loop: once a minute, retry messages stuck unacknowledged."""
    while True:
        try:
            handle_pending_msgs()
        except Exception as err:
            logging.exception(err)
        finally:
            time.sleep(60)
def process_mturk_events(executor: Executor):
    """Keep reading chunks off the stream until a read comes back short.

    A full chunk (10 messages) suggests more may be waiting, so loop;
    an empty or partial chunk means the stream is drained for now.
    """
    while True:
        count = process_mturk_events_chunk(executor=executor)
        if count is None or count < 10:
            return
def create_consumer_group():
    """Ensure the consumer group exists on the events stream (idempotent)."""
    try:
        REDIS.xgroup_create(JB_EVENTS_STREAM, CONSUMER_GROUP, id="0", mkstream=True)
    except redis.exceptions.ResponseError as err:
        # BUSYGROUP just means the group was created on a previous run.
        if "BUSYGROUP Consumer Group name already exists" not in str(err):
            raise
def process_mturk_events_chunk(executor: Executor) -> Optional[int]:
    """Read up to 10 new events from the stream and dispatch them.

    AssignmentSubmitted events are handed to *executor* for processing;
    any other event type is acknowledged and deleted.

    Returns the number of messages read, or None when the stream had
    nothing new (callers use a short read as the stop signal).
    """
    msgs = REDIS.xreadgroup(
        groupname=CONSUMER_GROUP,
        consumername=CONSUMER_NAME,
        streams={JB_EVENTS_STREAM: ">"},
        count=10,
    )
    if not msgs:
        return None
    msgs = msgs[0][1]  # only one stream requested, so take its message list
    fs = []
    for msg_id, data in msgs:
        event = MTurkEvent.model_validate_json(data["data"])
        if event.event_type == "AssignmentSubmitted":
            fs.append(
                executor.submit(process_assignment_submitted_event, event, msg_id)
            )
        else:
            logging.info(f"Discarding {event}")
            # ACK before deleting: XDEL alone leaves the entry dangling in
            # the consumer group's pending list (PEL), so the pending-message
            # sweeper would keep trying to claim an already-deleted message.
            REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)
            REDIS.xdel(JB_EVENTS_STREAM, msg_id)
    # Bound how long one chunk can block; unfinished futures keep running
    # and their messages simply stay pending until the worker ACKs them.
    futures.wait(fs, timeout=60)
    return len(msgs)
def process_assignment_submitted_event(event: MTurkEvent, msg_id: str):
    """Process one AssignmentSubmitted event, then remove it from the stream."""
    process_assignment_submitted(event)
    # XACKDEL acknowledges and deletes in a single call, clearing both the
    # stream entry and the consumer group's pending-list entry.
    # NOTE(review): XACKDEL is a recent Redis command -- confirm the deployed
    # Redis server and redis-py client versions both support it.
    REDIS.xackdel(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)
def handle_pending_msgs():
    """Reclaim and retry messages that were read but never acknowledged.

    Entries in the consumer group's pending list (PEL) were delivered to a
    consumer that presumably died before ACKing. Any entry idle for more
    than 10 seconds is claimed by this consumer and retried; if processing
    fails again, the payload is copied to the failed stream and the message
    is ACKed so it stops being redelivered.
    """
    pending = REDIS.xpending_range(
        JB_EVENTS_STREAM, CONSUMER_GROUP, min="-", max="+", count=10
    )
    for entry in pending:
        msg_id = entry["message_id"]
        # redis-py exposes the idle time (ms) as "time_since_delivered",
        # not "idle" -- the original key raised KeyError.
        if entry["time_since_delivered"] > 10_000:  # milliseconds
            claimed = REDIS.xclaim(
                JB_EVENTS_STREAM,
                CONSUMER_GROUP,
                CONSUMER_NAME,
                min_idle_time=10_000,
                message_ids=[msg_id],
            )
            for cid, data in claimed:
                event = MTurkEvent.model_validate_json(data["data"])
                if event.event_type == "AssignmentSubmitted":
                    try:
                        # On success this helper ACKs and deletes the message
                        # itself (XACKDEL), so no extra XACK is needed here.
                        process_assignment_submitted_event(event, cid)
                    except Exception as e:
                        logging.exception(e)
                        # Park the payload on the failed stream for manual
                        # inspection/replay, then ACK so it is not reclaimed.
                        REDIS.xadd(JB_EVENTS_FAILED_STREAM, data)
                        REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid)
                else:
                    logging.info(f"Discarding {event}")
                    # ACK before deleting so the PEL entry is cleared too.
                    REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid)
                    REDIS.xdel(JB_EVENTS_STREAM, cid)
|