import httpx import redis import pytest import requests_mock from asgi_lifespan import LifespanManager from httpx import AsyncClient, ASGITransport from typing import Dict, Any from jb.main import app import json from httpx import AsyncClient import secrets from jb.config import JB_EVENTS_STREAM, settings from tests import generate_amt_id @pytest.fixture(scope="session") def anyio_backend(): return "asyncio" @pytest.fixture(scope="session") async def httpxclient(): # limiter.enabled = True # limiter.reset() app.testing = True async with LifespanManager(app): # await FastAPICache.clear() transport = ASGITransport(app=app) async with AsyncClient( transport=transport, base_url="http://127.0.0.1:8001/" ) as client: yield client await client.aclose() @pytest.fixture() def no_limit(): """Fixture to execute asserts before and after a test is run""" # limiter.enabled = False yield # this is where the testing happens # limiter.enabled = True @pytest.fixture() def httpxclient_ip(httpxclient): """Fixture to execute asserts before and after a test is run""" httpxclient._transport = httpx.ASGITransport(app=app, client=("1.2.3.4", 8001)) yield httpxclient # this is where the testing happens httpxclient._transport = httpx.ASGITransport(app=app) @pytest.fixture def mock_requests(): with requests_mock.Mocker() as m: yield m def generate_hex_id(length: int = 40) -> str: # length is number of hex chars, so we need length//2 bytes return secrets.token_hex(length // 2) @pytest.fixture def mturk_event_body_record( hit_record: Hit, assignment_stub_record: AssignmentStub ) -> Dict[str, Any]: return { "Type": "Notification", "Message": json.dumps( { "Events": [ { "EventType": "AssignmentSubmitted", "EventTimestamp": "2025-10-16T18:45:51.000000Z", "HITId": hit_record.amt_hit_id, "AssignmentId": assignment_stub_record.amt_assignment_id, "HITTypeId": hit_record.amt_hit_type_id, } ], "EventDocId": generate_hex_id(), "SourceAccount": settings.aws_owner_id, "CustomerId": generate_amt_id(length=14), "EventDocVersion": "2006-05-05", } ), } @pytest.fixture() def clean_mturk_events_redis_stream(redis: redis.Redis): redis.xtrim(JB_EVENTS_STREAM, maxlen=0) assert redis.xlen(JB_EVENTS_STREAM) == 0 yield redis.xtrim(JB_EVENTS_STREAM, maxlen=0) assert redis.xlen(JB_EVENTS_STREAM) == 0