diff options
| author | Max Nanis | 2026-02-21 02:15:52 -0500 |
|---|---|---|
| committer | Max Nanis | 2026-02-21 02:15:52 -0500 |
| commit | 67ab724561e4ceb8fe8fb4031de277168f7d9724 (patch) | |
| tree | 4d85619973491e7239f0e83dc5cdd85618f0f248 /tests/http/test_notifications.py | |
| parent | af8057d58ff152f511f5161a7626b0fffa9d661a (diff) | |
| download | amt-jb-67ab724561e4ceb8fe8fb4031de277168f7d9724.tar.gz amt-jb-67ab724561e4ceb8fe8fb4031de277168f7d9724.zip | |
More pytest conf, some views, and defining more attrs on the settings config
Diffstat (limited to 'tests/http/test_notifications.py')
| -rw-r--r-- | tests/http/test_notifications.py | 71 |
1 files changed, 71 insertions, 0 deletions
diff --git a/tests/http/test_notifications.py b/tests/http/test_notifications.py new file mode 100644 index 0000000..70458b8 --- /dev/null +++ b/tests/http/test_notifications.py @@ -0,0 +1,71 @@ +import json + +import pytest +from httpx import AsyncClient +import secrets + +from jb.config import JB_EVENTS_STREAM, settings +from jb.decorators import REDIS +from jb.models.event import MTurkEvent +from tests import generate_amt_id + + +def generate_hex_id(length: int = 40) -> str: + # length is number of hex chars, so we need length//2 bytes + return secrets.token_hex(length // 2) + + +@pytest.fixture +def example_mturk_event_body(amt_hit_id, amt_hit_type_id, amt_assignment_id): + return { + "Type": "Notification", + "Message": json.dumps( + { + "Events": [ + { + "EventType": "AssignmentSubmitted", + "EventTimestamp": "2025-10-16T18:45:51.000000Z", + "HITId": amt_hit_id, + "AssignmentId": amt_assignment_id, + "HITTypeId": amt_hit_type_id, + } + ], + "EventDocId": generate_hex_id(), + "SourceAccount": settings.aws_owner_id, + "CustomerId": generate_amt_id(length=14), + "EventDocVersion": "2006-05-05", + } + ), + } + + +@pytest.fixture() +def clean_mturk_events_redis_stream(): + REDIS.xtrim(JB_EVENTS_STREAM, maxlen=0) + assert REDIS.xlen(JB_EVENTS_STREAM) == 0 + yield + REDIS.xtrim(JB_EVENTS_STREAM, maxlen=0) + assert REDIS.xlen(JB_EVENTS_STREAM) == 0 + + +@pytest.mark.anyio +async def test_mturk_notifications( + httpxclient: AsyncClient, + no_limit, + example_mturk_event_body, + amt_assignment_id, + clean_mturk_events_redis_stream, +): + client = httpxclient + + res = await client.post(url=f"/{settings.sns_path}/", json=example_mturk_event_body) + res.raise_for_status() + + msg_res = REDIS.xread(streams={JB_EVENTS_STREAM: 0}, count=1, block=100) + msg_res = msg_res[0][1][0] + msg_id, msg = msg_res + REDIS.xdel(JB_EVENTS_STREAM, msg_id) + + msg_json = msg["data"] + event = MTurkEvent.model_validate_json(msg_json) + assert event.amt_assignment_id == amt_assignment_id |
