1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
import httpx
import redis
import pytest
import requests_mock
from asgi_lifespan import LifespanManager
from httpx import AsyncClient, ASGITransport
from typing import Dict, Any, AsyncGenerator
from jb.main import app
import json
from httpx import AsyncClient
import secrets
from jb.models.hit import Hit
from jb.models.assignment import AssignmentStub
from jb.config import JB_EVENTS_STREAM, settings
from tests import generate_amt_id
@pytest.fixture(scope="session")
def anyio_backend():
return "asyncio"
@pytest.fixture(scope="session")
async def httpxclient() -> AsyncGenerator[AsyncClient, None]:
# limiter.enabled = True
# limiter.reset()
app.testing = True
async with LifespanManager(app):
# await FastAPICache.clear()
transport = ASGITransport(app=app)
async with AsyncClient(
transport=transport, base_url="http://127.0.0.1:8001/"
) as client:
yield client
await client.aclose()
@pytest.fixture()
def no_limit():
"""Fixture to execute asserts before and after a test is run"""
# limiter.enabled = False
yield # this is where the testing happens
# limiter.enabled = True
@pytest.fixture()
def httpxclient_ip(httpxclient: AsyncClient):
"""Fixture to execute asserts before and after a test is run"""
httpxclient._transport = httpx.ASGITransport(app=app, client=("1.2.3.4", 8001)) # type: ignore[attr-defined]
yield httpxclient # this is where the testing happens
httpxclient._transport = httpx.ASGITransport(app=app) # type: ignore[attr-defined]
@pytest.fixture
def mock_requests():
with requests_mock.Mocker() as m:
yield m
def generate_hex_id(length: int = 40) -> str:
# length is number of hex chars, so we need length//2 bytes
return secrets.token_hex(length // 2)
@pytest.fixture
def mturk_event_body_record(
hit_record: Hit, assignment_stub_record: AssignmentStub
) -> Dict[str, Any]:
return {
"Type": "Notification",
"Message": json.dumps(
{
"Events": [
{
"EventType": "AssignmentSubmitted",
"EventTimestamp": "2025-10-16T18:45:51.000000Z",
"HITId": hit_record.amt_hit_id,
"AssignmentId": assignment_stub_record.amt_assignment_id,
"HITTypeId": hit_record.amt_hit_type_id,
}
],
"EventDocId": generate_hex_id(),
"SourceAccount": settings.aws_owner_id,
"CustomerId": generate_amt_id(length=14),
"EventDocVersion": "2006-05-05",
}
),
}
@pytest.fixture()
def clean_mturk_events_redis_stream(redis: redis.Redis):
redis.xtrim(JB_EVENTS_STREAM, maxlen=0)
assert redis.xlen(JB_EVENTS_STREAM) == 0
yield
redis.xtrim(JB_EVENTS_STREAM, maxlen=0)
assert redis.xlen(JB_EVENTS_STREAM) == 0
|