aboutsummaryrefslogtreecommitdiff
path: root/jb
diff options
context:
space:
mode:
Diffstat (limited to 'jb')
-rw-r--r--jb/flow/events.py11
1 files changed, 10 insertions, 1 deletions
diff --git a/jb/flow/events.py b/jb/flow/events.py
index c759377..2825cb1 100644
--- a/jb/flow/events.py
+++ b/jb/flow/events.py
@@ -14,6 +14,7 @@ from jb.config import (
)
from jb.decorators import REDIS
from jb.flow.assignment_tasks import process_assignment_submitted
+from jb.flow.monitoring import emit_error_event
from jb.models.event import MTurkEvent
StreamMessages = list[tuple[str, list[tuple[bytes, dict[bytes, bytes]]]]]
@@ -98,7 +99,15 @@ def process_mturk_events_chunk(executor: Executor) -> Optional[int]:
def process_assignment_submitted_event(event: MTurkEvent, msg_id: str):
from jb.decorators import AMTM, AM, HM, BM
- process_assignment_submitted(amtm=AMTM, am=AM, hm=HM, bm=BM, event=event)
+ try:
+ process_assignment_submitted(amtm=AMTM, am=AM, hm=HM, bm=BM, event=event)
+ except Exception as e:
+ logging.exception(f"{event.amt_assignment_id=}, {e=}")
+ emit_error_event(
+ event_type="failed_process_assignment_submitted",
+ amt_hit_type_id=event.amt_hit_type_id,
+ )
+
REDIS.xackdel(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)