aboutsummaryrefslogtreecommitdiff
path: root/jb
diff options
context:
space:
mode:
authorstuppie2026-04-11 10:18:18 -0600
committerstuppie2026-04-11 10:18:18 -0600
commit07bde156894db70ce92fabf67938770fac8f81b9 (patch)
tree9e55343cbff9151baa2e2ed595f16f69a03a0072 /jb
parent2a8ce8027f26574764dc3f81364fd6aef5abbfd3 (diff)
downloadamt-jb-07bde156894db70ce92fabf67938770fac8f81b9.tar.gz
amt-jb-07bde156894db70ce92fabf67938770fac8f81b9.zip
emit_error_event if failed process_assignment_submitted
Diffstat (limited to 'jb')
-rw-r--r--jb/flow/events.py11
1 files changed, 10 insertions, 1 deletions
diff --git a/jb/flow/events.py b/jb/flow/events.py
index c759377..2825cb1 100644
--- a/jb/flow/events.py
+++ b/jb/flow/events.py
@@ -14,6 +14,7 @@ from jb.config import (
)
from jb.decorators import REDIS
from jb.flow.assignment_tasks import process_assignment_submitted
+from jb.flow.monitoring import emit_error_event
from jb.models.event import MTurkEvent
StreamMessages = list[tuple[str, list[tuple[bytes, dict[bytes, bytes]]]]]
@@ -98,7 +99,15 @@ def process_mturk_events_chunk(executor: Executor) -> Optional[int]:
def process_assignment_submitted_event(event: MTurkEvent, msg_id: str):
from jb.decorators import AMTM, AM, HM, BM
- process_assignment_submitted(amtm=AMTM, am=AM, hm=HM, bm=BM, event=event)
+ try:
+ process_assignment_submitted(amtm=AMTM, am=AM, hm=HM, bm=BM, event=event)
+ except Exception as e:
+ logging.exception(f"{event.amt_assignment_id=}, {e=}")
+ emit_error_event(
+ event_type="failed_process_assignment_submitted",
+ amt_hit_type_id=event.amt_hit_type_id,
+ )
+
REDIS.xackdel(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)