1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
import json
import pytest
from httpx import AsyncClient
import secrets
from jb.config import JB_EVENTS_STREAM, settings
from jb.decorators import REDIS
from jb.models.event import MTurkEvent
from tests import generate_amt_id
def generate_hex_id(length: int = 40) -> str:
    """Return a random hexadecimal string of exactly ``length`` characters.

    Args:
        length: Number of hex characters wanted. Odd values are supported.

    Returns:
        A lowercase hex string whose length equals ``length``.

    Raises:
        ValueError: If ``length`` is negative.
    """
    if length < 0:
        raise ValueError("length must be non-negative")
    # One random byte renders as two hex characters, so round the byte count
    # up and trim the surplus character when an odd length is requested.
    n_bytes = (length + 1) // 2
    return secrets.token_hex(n_bytes)[:length]
@pytest.fixture
def example_mturk_event_body(amt_hit_id, amt_hit_type_id, amt_assignment_id):
    """Build a notification body wrapping one ``AssignmentSubmitted`` event.

    The returned dict has a ``"Type"`` of ``"Notification"`` and a
    ``"Message"`` holding the JSON-encoded event document, whose single event
    references the HIT, HIT type and assignment ids supplied by the fixtures.
    """
    submitted_event = {
        "EventType": "AssignmentSubmitted",
        "EventTimestamp": "2025-10-16T18:45:51.000000Z",
        "HITId": amt_hit_id,
        "AssignmentId": amt_assignment_id,
        "HITTypeId": amt_hit_type_id,
    }
    event_document = {
        "Events": [submitted_event],
        "EventDocId": generate_hex_id(),
        "SourceAccount": settings.aws_owner_id,
        "CustomerId": generate_amt_id(length=14),
        "EventDocVersion": "2006-05-05",
    }
    # The event document travels as a JSON string inside the outer body.
    return {"Type": "Notification", "Message": json.dumps(event_document)}
@pytest.fixture()
def clean_mturk_events_redis_stream():
    """Empty the events stream before and after the test that uses it."""

    def _empty_stream():
        # Trim to zero entries, then confirm the stream really is empty.
        REDIS.xtrim(JB_EVENTS_STREAM, maxlen=0)
        assert REDIS.xlen(JB_EVENTS_STREAM) == 0

    _empty_stream()
    yield
    _empty_stream()
@pytest.mark.anyio
async def test_mturk_notifications(
    httpxclient: AsyncClient,
    no_limit,
    example_mturk_event_body,
    amt_assignment_id,
    clean_mturk_events_redis_stream,
):
    """POST a notification and check the event lands on the Redis stream.

    The example body is posted to the ``settings.sns_path`` endpoint. The
    first entry is then read from ``JB_EVENTS_STREAM``, parsed as an
    ``MTurkEvent``, and its assignment id is compared with the one sent.

    ``no_limit`` and ``clean_mturk_events_redis_stream`` are requested only
    for their setup/teardown effects (``no_limit`` presumably disables rate
    limiting -- confirm against its definition).
    """
    response = await httpxclient.post(
        url=f"/{settings.sns_path}/", json=example_mturk_event_body
    )
    response.raise_for_status()

    # Read from the start of the stream, waiting up to 100 ms for an entry.
    streams = REDIS.xread(streams={JB_EVENTS_STREAM: 0}, count=1, block=100)
    # Fail with a readable message instead of an opaque indexing error when
    # the endpoint did not publish anything within the block window.
    assert streams, f"no entry appeared on stream {JB_EVENTS_STREAM!r}"

    entries = streams[0][1]
    msg_id, fields = entries[0]
    # Remove the consumed entry so the stream is left empty.
    REDIS.xdel(JB_EVENTS_STREAM, msg_id)

    event = MTurkEvent.model_validate_json(fields["data"])
    assert event.amt_assignment_id == amt_assignment_id
|