aboutsummaryrefslogtreecommitdiff
path: root/tests/http/test_notifications.py
blob: 70458b80609acaca08e35ad2a09cc9acd24e0a90 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import json

import pytest
from httpx import AsyncClient
import secrets

from jb.config import JB_EVENTS_STREAM, settings
from jb.decorators import REDIS
from jb.models.event import MTurkEvent
from tests import generate_amt_id


def generate_hex_id(length: int = 40) -> str:
    """Return a random lowercase hexadecimal string of exactly ``length`` characters.

    Args:
        length: Number of hex characters wanted (not bytes). Defaults to 40.

    Returns:
        A string of ``length`` characters drawn from ``0-9a-f``.

    Raises:
        ValueError: If ``length`` is negative.
    """
    if length < 0:
        raise ValueError("length must be non-negative")
    # One random byte yields two hex characters, so round the byte count up
    # to cover odd lengths and slice off the surplus character.
    return secrets.token_hex((length + 1) // 2)[:length]


@pytest.fixture
def example_mturk_event_body(amt_hit_id, amt_hit_type_id, amt_assignment_id):
    """Build a notification body whose ``Message`` is a JSON-encoded event document.

    The document carries a single ``AssignmentSubmitted`` event populated with
    the HIT, assignment and HIT-type identifiers supplied by the other
    fixtures, plus freshly generated ``EventDocId`` and ``CustomerId`` values.
    """
    submitted_event = {
        "EventType": "AssignmentSubmitted",
        "EventTimestamp": "2025-10-16T18:45:51.000000Z",
        "HITId": amt_hit_id,
        "AssignmentId": amt_assignment_id,
        "HITTypeId": amt_hit_type_id,
    }
    # Key order is kept stable because it determines the serialized string.
    event_document = {
        "Events": [submitted_event],
        "EventDocId": generate_hex_id(),
        "SourceAccount": settings.aws_owner_id,
        "CustomerId": generate_amt_id(length=14),
        "EventDocVersion": "2006-05-05",
    }
    return {"Type": "Notification", "Message": json.dumps(event_document)}


@pytest.fixture()
def clean_mturk_events_redis_stream():
    """Empty ``JB_EVENTS_STREAM`` before the test runs and again afterwards.

    Both passes assert that the stream length is zero, so the test starts
    from, and leaves behind, an empty stream.
    """

    def _purge_stream() -> None:
        # NOTE(review): assumes REDIS is a redis-py client. Its xtrim()
        # defaults to approximate ("~") trimming, which the server is allowed
        # to stop early; exact trimming is requested because the assertion
        # below needs the stream to be completely empty.
        REDIS.xtrim(JB_EVENTS_STREAM, maxlen=0, approximate=False)
        assert REDIS.xlen(JB_EVENTS_STREAM) == 0

    _purge_stream()
    yield
    _purge_stream()


@pytest.mark.anyio
async def test_mturk_notifications(
    httpxclient: AsyncClient,
    no_limit,
    example_mturk_event_body,
    amt_assignment_id,
    clean_mturk_events_redis_stream,
):
    """POST a notification and check the resulting event on the Redis stream.

    The entry read back from ``JB_EVENTS_STREAM`` must parse as an
    ``MTurkEvent`` whose assignment id matches the one that was posted.
    """
    endpoint = f"/{settings.sns_path}/"
    response = await httpxclient.post(url=endpoint, json=example_mturk_event_body)
    response.raise_for_status()

    # Ask for a single entry, reading from id 0 of the events stream.
    stream_replies = REDIS.xread(streams={JB_EVENTS_STREAM: 0}, count=1, block=100)
    # First stream in the reply -> its entry list -> first (id, fields) pair.
    entry_id, fields = stream_replies[0][1][0]
    REDIS.xdel(JB_EVENTS_STREAM, entry_id)

    event = MTurkEvent.model_validate_json(fields["data"])
    assert event.amt_assignment_id == amt_assignment_id