diff options
| author | Max Nanis | 2026-02-26 15:51:49 -0500 |
|---|---|---|
| committer | Max Nanis | 2026-02-26 15:51:49 -0500 |
| commit | 0bf32fadd85d5938ae29d489efdd82e2cd137300 (patch) | |
| tree | 814e8128947fb604dc7cc3509e72260d95757590 /tests/fixtures/http.py | |
| parent | 04aee0dc7e908ce020d2d2c3f8ffb4a96424b883 (diff) | |
| download | amt-jb-0bf32fadd85d5938ae29d489efdd82e2cd137300.tar.gz amt-jb-0bf32fadd85d5938ae29d489efdd82e2cd137300.zip | |
Passing Managers into flow tasks for better pytest usage. Conftests broken out into seperate fixture files. Extensive type hinting.
Diffstat (limited to 'tests/fixtures/http.py')
| -rw-r--r-- | tests/fixtures/http.py | 99 |
1 files changed, 99 insertions, 0 deletions
diff --git a/tests/fixtures/http.py b/tests/fixtures/http.py new file mode 100644 index 0000000..4f11fde --- /dev/null +++ b/tests/fixtures/http.py @@ -0,0 +1,99 @@ +import httpx +import redis +import pytest +import requests_mock +from asgi_lifespan import LifespanManager +from httpx import AsyncClient, ASGITransport +from typing import Dict, Any + +from jb.main import app +import json + +from httpx import AsyncClient +import secrets + +from jb.config import JB_EVENTS_STREAM, settings +from tests import generate_amt_id + + +@pytest.fixture(scope="session") +def anyio_backend(): + return "asyncio" + + +@pytest.fixture(scope="session") +async def httpxclient(): + # limiter.enabled = True + # limiter.reset() + app.testing = True + + async with LifespanManager(app): + # await FastAPICache.clear() + transport = ASGITransport(app=app) + async with AsyncClient( + transport=transport, base_url="http://127.0.0.1:8001/" + ) as client: + yield client + await client.aclose() + + +@pytest.fixture() +def no_limit(): + """Fixture to execute asserts before and after a test is run""" + # limiter.enabled = False + yield # this is where the testing happens + # limiter.enabled = True + + +@pytest.fixture() +def httpxclient_ip(httpxclient): + """Fixture to execute asserts before and after a test is run""" + httpxclient._transport = httpx.ASGITransport(app=app, client=("1.2.3.4", 8001)) + yield httpxclient # this is where the testing happens + httpxclient._transport = httpx.ASGITransport(app=app) + + +@pytest.fixture +def mock_requests(): + with requests_mock.Mocker() as m: + yield m + + +def generate_hex_id(length: int = 40) -> str: + # length is number of hex chars, so we need length//2 bytes + return secrets.token_hex(length // 2) + + +@pytest.fixture +def mturk_event_body_record( + hit_record: Hit, assignment_stub_record: AssignmentStub +) -> Dict[str, Any]: + return { + "Type": "Notification", + "Message": json.dumps( + { + "Events": [ + { + "EventType": "AssignmentSubmitted", + "EventTimestamp": "2025-10-16T18:45:51.000000Z", + "HITId": hit_record.amt_hit_id, + "AssignmentId": assignment_stub_record.amt_assignment_id, + "HITTypeId": hit_record.amt_hit_type_id, + } + ], + "EventDocId": generate_hex_id(), + "SourceAccount": settings.aws_owner_id, + "CustomerId": generate_amt_id(length=14), + "EventDocVersion": "2006-05-05", + } + ), + } + + +@pytest.fixture() +def clean_mturk_events_redis_stream(redis: redis.Redis): + redis.xtrim(JB_EVENTS_STREAM, maxlen=0) + assert redis.xlen(JB_EVENTS_STREAM) == 0 + yield + redis.xtrim(JB_EVENTS_STREAM, maxlen=0) + assert redis.xlen(JB_EVENTS_STREAM) == 0 |
