aboutsummaryrefslogtreecommitdiff
path: root/tests/fixtures/http.py
blob: 4f11fde18f35e19821da87a6e53a6271e3222564 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import httpx
import redis
import pytest
import requests_mock
from asgi_lifespan import LifespanManager
from httpx import AsyncClient, ASGITransport
from typing import Dict, Any

from jb.main import app
import json

from httpx import AsyncClient
import secrets

from jb.config import JB_EVENTS_STREAM, settings
from tests import generate_amt_id


@pytest.fixture(scope="session")
def anyio_backend():
    return "asyncio"


@pytest.fixture(scope="session")
async def httpxclient():
    # limiter.enabled = True
    # limiter.reset()
    app.testing = True

    async with LifespanManager(app):
        # await FastAPICache.clear()
        transport = ASGITransport(app=app)
        async with AsyncClient(
            transport=transport, base_url="http://127.0.0.1:8001/"
        ) as client:
            yield client
        await client.aclose()


@pytest.fixture()
def no_limit():
    """Fixture to execute asserts before and after a test is run"""
    # limiter.enabled = False
    yield  # this is where the testing happens
    # limiter.enabled = True


@pytest.fixture()
def httpxclient_ip(httpxclient):
    """Fixture to execute asserts before and after a test is run"""
    httpxclient._transport = httpx.ASGITransport(app=app, client=("1.2.3.4", 8001))
    yield httpxclient  # this is where the testing happens
    httpxclient._transport = httpx.ASGITransport(app=app)


@pytest.fixture
def mock_requests():
    with requests_mock.Mocker() as m:
        yield m


def generate_hex_id(length: int = 40) -> str:
    # length is number of hex chars, so we need length//2 bytes
    return secrets.token_hex(length // 2)


@pytest.fixture
def mturk_event_body_record(
    hit_record: Hit, assignment_stub_record: AssignmentStub
) -> Dict[str, Any]:
    return {
        "Type": "Notification",
        "Message": json.dumps(
            {
                "Events": [
                    {
                        "EventType": "AssignmentSubmitted",
                        "EventTimestamp": "2025-10-16T18:45:51.000000Z",
                        "HITId": hit_record.amt_hit_id,
                        "AssignmentId": assignment_stub_record.amt_assignment_id,
                        "HITTypeId": hit_record.amt_hit_type_id,
                    }
                ],
                "EventDocId": generate_hex_id(),
                "SourceAccount": settings.aws_owner_id,
                "CustomerId": generate_amt_id(length=14),
                "EventDocVersion": "2006-05-05",
            }
        ),
    }


@pytest.fixture()
def clean_mturk_events_redis_stream(redis: redis.Redis):
    redis.xtrim(JB_EVENTS_STREAM, maxlen=0)
    assert redis.xlen(JB_EVENTS_STREAM) == 0
    yield
    redis.xtrim(JB_EVENTS_STREAM, maxlen=0)
    assert redis.xlen(JB_EVENTS_STREAM) == 0