aboutsummaryrefslogtreecommitdiff
path: root/tests/http/test_notifications.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/http/test_notifications.py')
-rw-r--r--tests/http/test_notifications.py71
1 files changed, 71 insertions, 0 deletions
diff --git a/tests/http/test_notifications.py b/tests/http/test_notifications.py
new file mode 100644
index 0000000..70458b8
--- /dev/null
+++ b/tests/http/test_notifications.py
@@ -0,0 +1,71 @@
+import json
+
+import pytest
+from httpx import AsyncClient
+import secrets
+
+from jb.config import JB_EVENTS_STREAM, settings
+from jb.decorators import REDIS
+from jb.models.event import MTurkEvent
+from tests import generate_amt_id
+
+
+def generate_hex_id(length: int = 40) -> str:
+ # length is number of hex chars, so we need length//2 bytes
+ return secrets.token_hex(length // 2)
+
+
+@pytest.fixture
+def example_mturk_event_body(amt_hit_id, amt_hit_type_id, amt_assignment_id):
+ return {
+ "Type": "Notification",
+ "Message": json.dumps(
+ {
+ "Events": [
+ {
+ "EventType": "AssignmentSubmitted",
+ "EventTimestamp": "2025-10-16T18:45:51.000000Z",
+ "HITId": amt_hit_id,
+ "AssignmentId": amt_assignment_id,
+ "HITTypeId": amt_hit_type_id,
+ }
+ ],
+ "EventDocId": generate_hex_id(),
+ "SourceAccount": settings.aws_owner_id,
+ "CustomerId": generate_amt_id(length=14),
+ "EventDocVersion": "2006-05-05",
+ }
+ ),
+ }
+
+
+@pytest.fixture()
+def clean_mturk_events_redis_stream():
+ REDIS.xtrim(JB_EVENTS_STREAM, maxlen=0)
+ assert REDIS.xlen(JB_EVENTS_STREAM) == 0
+ yield
+ REDIS.xtrim(JB_EVENTS_STREAM, maxlen=0)
+ assert REDIS.xlen(JB_EVENTS_STREAM) == 0
+
+
+@pytest.mark.anyio
+async def test_mturk_notifications(
+ httpxclient: AsyncClient,
+ no_limit,
+ example_mturk_event_body,
+ amt_assignment_id,
+ clean_mturk_events_redis_stream,
+):
+ client = httpxclient
+
+ res = await client.post(url=f"/{settings.sns_path}/", json=example_mturk_event_body)
+ res.raise_for_status()
+
+ msg_res = REDIS.xread(streams={JB_EVENTS_STREAM: 0}, count=1, block=100)
+ msg_res = msg_res[0][1][0]
+ msg_id, msg = msg_res
+ REDIS.xdel(JB_EVENTS_STREAM, msg_id)
+
+ msg_json = msg["data"]
+ event = MTurkEvent.model_validate_json(msg_json)
+ assert event.amt_assignment_id == amt_assignment_id