1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
import logging
import time
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor, Executor
from typing import Optional, cast, TypedDict
import redis
from jb.config import (
JB_EVENTS_STREAM,
CONSUMER_GROUP,
CONSUMER_NAME,
JB_EVENTS_FAILED_STREAM,
)
from jb.decorators import REDIS
from jb.flow.assignment_tasks import process_assignment_submitted
from jb.models.event import MTurkEvent
# Shape of REDIS.xreadgroup's reply: [(stream_name, [(entry_id, {field: value}), ...]), ...]
# NOTE(review): stream_name is typed str but entry data as bytes — mixed; depends on
# whether the client was built with decode_responses. Verify against REDIS's config.
StreamMessages = list[tuple[str, list[tuple[bytes, dict[bytes, bytes]]]]]
class PendingEntry(TypedDict):
    """One element of the list returned by redis-py's ``xpending_range``."""

    message_id: bytes  # stream entry ID, e.g. b"1690000000000-0"
    consumer: bytes  # consumer the message was last delivered to
    time_since_delivered: int  # idle time since last delivery, in milliseconds
    times_delivered: int  # delivery count (claims/re-reads increment this)
def process_mturk_events_task():
    """Worker entry point: poll the MTurk event stream forever, once per second.

    Makes sure the consumer group exists, then loops indefinitely.  A failure
    in any single drain pass is logged and the loop carries on.
    """
    pool = ThreadPoolExecutor(max_workers=5)
    create_consumer_group()
    while True:
        try:
            process_mturk_events(executor=pool)
        except Exception as err:  # keep the worker alive; just record the failure
            logging.exception(err)
        finally:
            time.sleep(1)
def handle_pending_msgs_task():
    """Worker entry point: once a minute, retry messages read but never ACKed.

    Errors from a sweep are logged; the loop itself never exits.
    """
    while True:
        try:
            handle_pending_msgs()
        except Exception as err:  # one bad sweep must not kill the loop
            logging.exception(err)
        finally:
            time.sleep(60)
def process_mturk_events(executor: Executor):
    """Drain the event stream in chunks until a short (or empty) read.

    Each chunk asks for up to 10 messages; receiving fewer than 10 (or None)
    means the stream is currently exhausted, so we stop rather than busy-poll.
    """
    while True:
        handled = process_mturk_events_chunk(executor=executor)
        exhausted = handled is None or handled < 10
        if exhausted:
            return
def create_consumer_group():
    """Idempotently create the events stream's consumer group (and the stream).

    Redis answers BUSYGROUP when the group already exists; that is the normal
    steady-state outcome and is silently ignored.  Any other error propagates.
    """
    try:
        REDIS.xgroup_create(JB_EVENTS_STREAM, CONSUMER_GROUP, id="0", mkstream=True)
    except redis.exceptions.ResponseError as err:
        if "BUSYGROUP Consumer Group name already exists" not in str(err):
            raise
def process_mturk_events_chunk(executor: Executor) -> Optional[int]:
    """Read up to 10 new messages from the events stream and dispatch them.

    AssignmentSubmitted events are submitted to *executor* (which ACKs and
    deletes them on success); every other event type is acknowledged and
    deleted immediately.

    Returns:
        The number of messages read, or None when the stream had nothing new.
    """
    msgs_raw = REDIS.xreadgroup(
        groupname=CONSUMER_GROUP,
        consumername=CONSUMER_NAME,
        streams={JB_EVENTS_STREAM: ">"},
        count=10,
    )
    if not msgs_raw:
        return None
    msgs = cast(StreamMessages, msgs_raw)[0][1]  # The queue, we only have 1
    fs = []
    for msg_id, data in msgs:
        # redis-py returns bytes IDs/keys unless decode_responses=True;
        # normalize both so the ID ACKed later actually matches the stream
        # entry (str(b"1-0") would produce the bogus "b'1-0'").
        msg_id_str = msg_id.decode() if isinstance(msg_id, bytes) else msg_id
        raw = data.get(b"data", data.get("data"))
        msg_json = raw.decode() if isinstance(raw, bytes) else raw
        event = MTurkEvent.model_validate_json(json_data=msg_json)
        if event.event_type == "AssignmentSubmitted":
            fs.append(
                executor.submit(process_assignment_submitted_event, event, msg_id_str)
            )
        else:
            logging.info(f"Discarding {event}")
            # ACK as well as delete, so the entry does not linger in the
            # group's pending-entries list and get re-claimed forever by
            # handle_pending_msgs().
            REDIS.xackdel(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)
    futures.wait(fs, timeout=60)
    return len(msgs)
def process_assignment_submitted_event(event: MTurkEvent, msg_id: str):
    """Run the submitted-assignment workflow for one event, then ACK+delete it."""
    # Local import, mirroring the module's deferred-import pattern
    # (likely to dodge an import cycle — confirm before hoisting).
    from jb.decorators import AMTM, AM, HM, BM

    process_assignment_submitted(amtm=AMTM, am=AM, hm=HM, bm=BM, event=event)
    REDIS.xackdel(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)
def handle_pending_msgs():
    """Re-process stream messages that were delivered but never acknowledged.

    Scans the consumer group's pending-entries list (PEL); any message idle
    for more than 10 seconds is claimed by this consumer and retried.  A retry
    that fails is copied to the failed stream (for later inspection/replay)
    and then acknowledged, so it stops being re-claimed forever.  Non-
    AssignmentSubmitted messages are acknowledged and deleted.
    """
    pending = cast(
        list[PendingEntry],
        REDIS.xpending_range(
            JB_EVENTS_STREAM, CONSUMER_GROUP, min="-", max="+", count=10
        ),
    )
    for entry in pending:
        msg_id = entry["message_id"]
        # redis-py reports idle time under "time_since_delivered" (in ms) —
        # there is no "idle" key in xpending_range output.
        if entry["time_since_delivered"] <= 10_000:
            continue
        claimed = REDIS.xclaim(
            JB_EVENTS_STREAM,
            CONSUMER_GROUP,
            CONSUMER_NAME,
            min_idle_time=10_000,
            message_ids=[msg_id],
        )
        for cid, data in claimed:
            # Normalize bytes IDs/keys (client may or may not decode responses).
            cid_str = cid.decode() if isinstance(cid, bytes) else cid
            raw = data.get(b"data", data.get("data"))
            event = MTurkEvent.model_validate_json(raw)
            if event.event_type == "AssignmentSubmitted":
                try:
                    # On success the helper ACKs and deletes the message
                    # itself, so no extra xack is needed here.
                    process_assignment_submitted_event(event, cid_str)
                except Exception as err:
                    logging.exception(err)
                    # Park the payload on the failed stream so it can be fixed
                    # and replayed, then ACK so we stop re-claiming it.
                    REDIS.xadd(JB_EVENTS_FAILED_STREAM, data)
                    REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid)
            else:
                logging.info(f"Discarding {event}")
                # ACK + delete so the entry leaves both the stream and the PEL.
                REDIS.xackdel(JB_EVENTS_STREAM, CONSUMER_GROUP, cid)
|