import json
import secrets

import pytest
from httpx import AsyncClient

from jb.config import JB_EVENTS_STREAM, settings
from jb.models.event import MTurkEvent
from tests import generate_amt_id


def generate_hex_id(length: int = 40) -> str:
    """Return a random hexadecimal string of exactly ``length`` characters.

    Args:
        length: Number of hex characters wanted (not bytes).

    Returns:
        A lowercase hex string whose length equals ``length``.

    Raises:
        ValueError: If ``length`` is negative.
    """
    if length < 0:
        raise ValueError("length must be non-negative")
    # One random byte yields two hex characters. Round the byte count up so
    # an odd ``length`` is not silently shortened by one, then trim the
    # surplus character.
    return secrets.token_hex((length + 1) // 2)[:length]


@pytest.fixture
def example_mturk_event_body(amt_hit_id, amt_hit_type_id, amt_assignment_id):
    """Build a notification request body carrying one AssignmentSubmitted event.

    The outer dict has ``Type`` set to ``"Notification"`` and a ``Message``
    field holding a JSON-encoded *string* (not a nested dict). The HIT,
    HIT type and assignment ids inside the event come from the fixtures of
    the same name.
    """
    event = {
        "EventType": "AssignmentSubmitted",
        "EventTimestamp": "2025-10-16T18:45:51.000000Z",
        "HITId": amt_hit_id,
        "AssignmentId": amt_assignment_id,
        "HITTypeId": amt_hit_type_id,
    }
    message = {
        "Events": [event],
        "EventDocId": generate_hex_id(),
        "SourceAccount": settings.aws_owner_id,
        "CustomerId": generate_amt_id(length=14),
        "EventDocVersion": "2006-05-05",
    }
    return {
        "Type": "Notification",
        "Message": json.dumps(message),
    }


@pytest.fixture
def clean_mturk_events_redis_stream(redis):
    """Empty ``JB_EVENTS_STREAM`` before the test runs and again afterwards.

    Both the setup and the teardown trim the stream to zero entries and
    assert that it is actually empty.
    """
    redis.xtrim(JB_EVENTS_STREAM, maxlen=0)
    assert redis.xlen(JB_EVENTS_STREAM) == 0
    yield
    redis.xtrim(JB_EVENTS_STREAM, maxlen=0)
    assert redis.xlen(JB_EVENTS_STREAM) == 0


@pytest.mark.anyio
async def test_mturk_notifications(
    redis,
    httpxclient: AsyncClient,
    no_limit,
    example_mturk_event_body,
    amt_assignment_id,
    clean_mturk_events_redis_stream,
):
    """POST a notification and check the event lands on the Redis stream.

    The example body is posted to ``/{settings.sns_path}/``; the test then
    reads one entry from ``JB_EVENTS_STREAM``, parses its ``data`` field as
    an ``MTurkEvent`` and checks the assignment id matches the one sent.
    """
    res = await httpxclient.post(
        url=f"/{settings.sns_path}/", json=example_mturk_event_body
    )
    res.raise_for_status()

    # Read from the start of the stream (id 0), waiting up to 100 ms.
    stream_results = redis.xread(streams={JB_EVENTS_STREAM: 0}, count=1, block=100)
    # Fail with a clear message instead of an opaque IndexError when the
    # endpoint did not publish anything before the read timed out.
    assert stream_results, f"no entry was published to {JB_EVENTS_STREAM}"

    # Indexing follows the original: first stream -> its entries -> first entry.
    msg_id, msg = stream_results[0][1][0]
    redis.xdel(JB_EVENTS_STREAM, msg_id)

    event = MTurkEvent.model_validate_json(msg["data"])
    assert event.amt_assignment_id == amt_assignment_id