diff options
| author | Max Nanis | 2026-02-26 15:51:49 -0500 |
|---|---|---|
| committer | Max Nanis | 2026-02-26 15:51:49 -0500 |
| commit | 0bf32fadd85d5938ae29d489efdd82e2cd137300 (patch) | |
| tree | 814e8128947fb604dc7cc3509e72260d95757590 /tests/http | |
| parent | 04aee0dc7e908ce020d2d2c3f8ffb4a96424b883 (diff) | |
| download | amt-jb-0bf32fadd85d5938ae29d489efdd82e2cd137300.tar.gz amt-jb-0bf32fadd85d5938ae29d489efdd82e2cd137300.zip | |
Passing Managers into flow tasks for better pytest usage. Conftests broken out into seperate fixture files. Extensive type hinting.
Diffstat (limited to 'tests/http')
| -rw-r--r-- | tests/http/conftest.py | 99 | ||||
| -rw-r--r-- | tests/http/test_notifications.py | 11 | ||||
| -rw-r--r-- | tests/http/test_report.py | 0 |
3 files changed, 11 insertions, 99 deletions
diff --git a/tests/http/conftest.py b/tests/http/conftest.py deleted file mode 100644 index 4f11fde..0000000 --- a/tests/http/conftest.py +++ /dev/null @@ -1,99 +0,0 @@ -import httpx -import redis -import pytest -import requests_mock -from asgi_lifespan import LifespanManager -from httpx import AsyncClient, ASGITransport -from typing import Dict, Any - -from jb.main import app -import json - -from httpx import AsyncClient -import secrets - -from jb.config import JB_EVENTS_STREAM, settings -from tests import generate_amt_id - - -@pytest.fixture(scope="session") -def anyio_backend(): - return "asyncio" - - -@pytest.fixture(scope="session") -async def httpxclient(): - # limiter.enabled = True - # limiter.reset() - app.testing = True - - async with LifespanManager(app): - # await FastAPICache.clear() - transport = ASGITransport(app=app) - async with AsyncClient( - transport=transport, base_url="http://127.0.0.1:8001/" - ) as client: - yield client - await client.aclose() - - -@pytest.fixture() -def no_limit(): - """Fixture to execute asserts before and after a test is run""" - # limiter.enabled = False - yield # this is where the testing happens - # limiter.enabled = True - - -@pytest.fixture() -def httpxclient_ip(httpxclient): - """Fixture to execute asserts before and after a test is run""" - httpxclient._transport = httpx.ASGITransport(app=app, client=("1.2.3.4", 8001)) - yield httpxclient # this is where the testing happens - httpxclient._transport = httpx.ASGITransport(app=app) - - -@pytest.fixture -def mock_requests(): - with requests_mock.Mocker() as m: - yield m - - -def generate_hex_id(length: int = 40) -> str: - # length is number of hex chars, so we need length//2 bytes - return secrets.token_hex(length // 2) - - -@pytest.fixture -def mturk_event_body_record( - hit_record: Hit, assignment_stub_record: AssignmentStub -) -> Dict[str, Any]: - return { - "Type": "Notification", - "Message": json.dumps( - { - "Events": [ - { - "EventType": "AssignmentSubmitted", - "EventTimestamp": "2025-10-16T18:45:51.000000Z", - "HITId": hit_record.amt_hit_id, - "AssignmentId": assignment_stub_record.amt_assignment_id, - "HITTypeId": hit_record.amt_hit_type_id, - } - ], - "EventDocId": generate_hex_id(), - "SourceAccount": settings.aws_owner_id, - "CustomerId": generate_amt_id(length=14), - "EventDocVersion": "2006-05-05", - } - ), - } - - -@pytest.fixture() -def clean_mturk_events_redis_stream(redis: redis.Redis): - redis.xtrim(JB_EVENTS_STREAM, maxlen=0) - assert redis.xlen(JB_EVENTS_STREAM) == 0 - yield - redis.xtrim(JB_EVENTS_STREAM, maxlen=0) - assert redis.xlen(JB_EVENTS_STREAM) == 0 diff --git a/tests/http/test_notifications.py b/tests/http/test_notifications.py index 4386863..508b236 100644 --- a/tests/http/test_notifications.py +++ b/tests/http/test_notifications.py @@ -8,6 +8,7 @@ from uuid import uuid4 from jb.config import JB_EVENTS_STREAM, settings from jb.models.event import MTurkEvent from jb.models.hit import Hit +from jb.models.assignment import AssignmentStub class TestNotifications: @@ -74,11 +75,18 @@ class TestNotifications: == assignment_stub_record.amt_assignment_id ) + # Confirm the stream is empty + assert redis.xlen(JB_EVENTS_STREAM) == 0 + res = await client.post( url=f"/{settings.sns_path}/", json=mturk_event_body_record ) res.raise_for_status() + # Now that we POSTed, confirm the stream has 1 event in it + # Confirm the stream is empty + assert redis.xlen(JB_EVENTS_STREAM) == 1 + # AMT SNS needs to receive a 200 response to stop retrying the notification assert res.status_code == 200 assert res.json() == {"status": "ok"} @@ -89,6 +97,9 @@ class TestNotifications: msg_id, msg = msg_res redis.xdel(JB_EVENTS_STREAM, msg_id) + # After running xdel, we can confirm the stream is empty + assert redis.xlen(JB_EVENTS_STREAM) == 0 + msg_json = msg["data"] event = MTurkEvent.model_validate_json(msg_json) diff --git a/tests/http/test_report.py b/tests/http/test_report.py deleted file mode 100644 index e69de29..0000000 --- a/tests/http/test_report.py +++ /dev/null |
