aboutsummaryrefslogtreecommitdiff
path: root/jb/flow/events.py
blob: 3961a647cb95c1e0b330d536937ba346502acb64 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import logging
import time
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor, Executor, as_completed
from typing import Optional

import redis

from jb.config import (
    JB_EVENTS_STREAM,
    CONSUMER_GROUP,
    CONSUMER_NAME,
    JB_EVENTS_FAILED_STREAM,
)
from jb.decorators import REDIS
from jb.flow.assignment_tasks import process_assignment_submitted
from jb.models.event import MTurkEvent


def process_mturk_events_task():
    """Worker entry point: consume MTurk events from the Redis stream forever.

    Ensures the consumer group exists, then loops: drain the stream,
    log (but never propagate) any failure, and pause one second between
    passes so the loop can't spin hot.
    """
    pool = ThreadPoolExecutor(max_workers=5)
    create_consumer_group()
    while True:
        try:
            process_mturk_events(executor=pool)
        except Exception as exc:
            logging.exception(exc)
        finally:
            time.sleep(1)


def handle_pending_msgs_task():
    """Worker entry point: retry stuck (pending, un-acked) stream messages.

    Runs forever, invoking the recovery pass once a minute. Failures are
    logged and suppressed so the loop survives transient Redis errors.
    """
    while True:
        try:
            handle_pending_msgs()
        except Exception as exc:
            logging.exception(exc)
        finally:
            time.sleep(60)


def process_mturk_events(executor: Executor):
    """Drain the events stream in chunks of 10.

    Keeps reading until a pass returns fewer than 10 messages (or none at
    all), which means the stream is (momentarily) exhausted.
    """
    while True:
        processed = process_mturk_events_chunk(executor=executor)
        if processed is None:
            break
        if processed < 10:
            break


def create_consumer_group():
    """Idempotently create the consumer group on the events stream.

    ``mkstream=True`` also creates the stream itself if it doesn't exist.
    Redis answers BUSYGROUP when the group is already there; that is the
    expected steady-state and is silently ignored.
    """
    try:
        REDIS.xgroup_create(JB_EVENTS_STREAM, CONSUMER_GROUP, id="0", mkstream=True)
    except redis.exceptions.ResponseError as err:
        # Anything other than "group already exists" is a real error.
        if "BUSYGROUP Consumer Group name already exists" not in str(err):
            raise


def process_mturk_events_chunk(executor: Executor) -> Optional[int]:
    """Read up to 10 new messages from the events stream and dispatch them.

    AssignmentSubmitted events are handed to the executor (which acks on
    success); everything else is logged and deleted outright. Futures are
    awaited for up to 60s but their results are not inspected — a task that
    raised leaves its message un-acked, to be retried by
    ``handle_pending_msgs``.

    Returns the number of messages read, or ``None`` if the stream was empty.
    """
    response = REDIS.xreadgroup(
        groupname=CONSUMER_GROUP,
        consumername=CONSUMER_NAME,
        streams={JB_EVENTS_STREAM: ">"},
        count=10,
    )
    if not response:
        return None
    # Only one stream was requested, so take its entry list directly.
    _stream_name, entries = response[0]

    submitted = []
    for entry_id, payload in entries:
        event = MTurkEvent.model_validate_json(payload["data"])
        if event.event_type != "AssignmentSubmitted":
            logging.info(f"Discarding {event}")
            REDIS.xdel(JB_EVENTS_STREAM, entry_id)
            continue
        submitted.append(
            executor.submit(process_assignment_submitted_event, event, entry_id)
        )

    futures.wait(submitted, timeout=60)
    return len(entries)


def process_assignment_submitted_event(event: MTurkEvent, msg_id: str):
    """Handle one AssignmentSubmitted event, then ack AND delete its message.

    If ``process_assignment_submitted`` raises, the message is neither acked
    nor deleted, so it stays pending and is retried by
    ``handle_pending_msgs``.
    """
    process_assignment_submitted(event)
    REDIS.xackdel(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)


def handle_pending_msgs():
    """Recover stream messages that were read but never acknowledged.

    A message lands in the consumer group's pending-entries list (PEL) when
    some consumer read it via XREADGROUP but died or raised before acking.
    This pass inspects up to 10 pending entries, claims any that have been
    idle for more than 10 seconds, and retries them once. If the retry also
    fails, the raw payload is copied to the failed stream for later triage
    and the original is acked so it stops reappearing here.

    NOTE(review): the ``entry["message_id"]`` / ``entry["idle"]`` key names
    follow redis-py's ``xpending_range`` response shape — confirm they match
    the installed client version.
    """
    pending = REDIS.xpending_range(
        JB_EVENTS_STREAM, CONSUMER_GROUP, min="-", max="+", count=10
    )
    for entry in pending:
        msg_id = entry["message_id"]
        if entry["idle"] <= 10_000:  # idle time is reported in milliseconds
            continue
        # Take ownership of the stuck message so this consumer may retry it.
        claimed = REDIS.xclaim(
            JB_EVENTS_STREAM,
            CONSUMER_GROUP,
            CONSUMER_NAME,
            min_idle_time=10_000,
            message_ids=[msg_id],
        )
        for cid, data in claimed:
            event = MTurkEvent.model_validate_json(data["data"])
            if event.event_type != "AssignmentSubmitted":
                logging.info(f"Discarding {event}")
                # Fix: delete by the claimed id (cid), not the outer msg_id.
                REDIS.xdel(JB_EVENTS_STREAM, cid)
                continue
            try:
                # On success this helper acks+deletes via XACKDEL, so no
                # extra XACK is needed afterwards (the previous version
                # redundantly double-acked here).
                process_assignment_submitted_event(event, cid)
            except Exception as exc:
                logging.exception(exc)
                # Park the payload on the failed stream, then ack the
                # original so it no longer shows up as pending.
                REDIS.xadd(JB_EVENTS_FAILED_STREAM, data)
                REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid)