aboutsummaryrefslogtreecommitdiff
path: root/tests/http
diff options
context:
space:
mode:
authorMax Nanis2026-02-26 15:51:49 -0500
committerMax Nanis2026-02-26 15:51:49 -0500
commit0bf32fadd85d5938ae29d489efdd82e2cd137300 (patch)
tree814e8128947fb604dc7cc3509e72260d95757590 /tests/http
parent04aee0dc7e908ce020d2d2c3f8ffb4a96424b883 (diff)
downloadamt-jb-0bf32fadd85d5938ae29d489efdd82e2cd137300.tar.gz
amt-jb-0bf32fadd85d5938ae29d489efdd82e2cd137300.zip
Passing Managers into flow tasks for better pytest usage. Conftests broken out into seperate fixture files. Extensive type hinting.
Diffstat (limited to 'tests/http')
-rw-r--r--tests/http/conftest.py99
-rw-r--r--tests/http/test_notifications.py11
-rw-r--r--tests/http/test_report.py0
3 files changed, 11 insertions, 99 deletions
diff --git a/tests/http/conftest.py b/tests/http/conftest.py
deleted file mode 100644
index 4f11fde..0000000
--- a/tests/http/conftest.py
+++ /dev/null
@@ -1,99 +0,0 @@
-import httpx
-import redis
-import pytest
-import requests_mock
-from asgi_lifespan import LifespanManager
-from httpx import AsyncClient, ASGITransport
-from typing import Dict, Any
-
-from jb.main import app
-import json
-
-from httpx import AsyncClient
-import secrets
-
-from jb.config import JB_EVENTS_STREAM, settings
-from tests import generate_amt_id
-
-
-@pytest.fixture(scope="session")
-def anyio_backend():
- return "asyncio"
-
-
-@pytest.fixture(scope="session")
-async def httpxclient():
- # limiter.enabled = True
- # limiter.reset()
- app.testing = True
-
- async with LifespanManager(app):
- # await FastAPICache.clear()
- transport = ASGITransport(app=app)
- async with AsyncClient(
- transport=transport, base_url="http://127.0.0.1:8001/"
- ) as client:
- yield client
- await client.aclose()
-
-
-@pytest.fixture()
-def no_limit():
- """Fixture to execute asserts before and after a test is run"""
- # limiter.enabled = False
- yield # this is where the testing happens
- # limiter.enabled = True
-
-
-@pytest.fixture()
-def httpxclient_ip(httpxclient):
- """Fixture to execute asserts before and after a test is run"""
- httpxclient._transport = httpx.ASGITransport(app=app, client=("1.2.3.4", 8001))
- yield httpxclient # this is where the testing happens
- httpxclient._transport = httpx.ASGITransport(app=app)
-
-
-@pytest.fixture
-def mock_requests():
- with requests_mock.Mocker() as m:
- yield m
-
-
-def generate_hex_id(length: int = 40) -> str:
- # length is number of hex chars, so we need length//2 bytes
- return secrets.token_hex(length // 2)
-
-
-@pytest.fixture
-def mturk_event_body_record(
- hit_record: Hit, assignment_stub_record: AssignmentStub
-) -> Dict[str, Any]:
- return {
- "Type": "Notification",
- "Message": json.dumps(
- {
- "Events": [
- {
- "EventType": "AssignmentSubmitted",
- "EventTimestamp": "2025-10-16T18:45:51.000000Z",
- "HITId": hit_record.amt_hit_id,
- "AssignmentId": assignment_stub_record.amt_assignment_id,
- "HITTypeId": hit_record.amt_hit_type_id,
- }
- ],
- "EventDocId": generate_hex_id(),
- "SourceAccount": settings.aws_owner_id,
- "CustomerId": generate_amt_id(length=14),
- "EventDocVersion": "2006-05-05",
- }
- ),
- }
-
-
-@pytest.fixture()
-def clean_mturk_events_redis_stream(redis: redis.Redis):
- redis.xtrim(JB_EVENTS_STREAM, maxlen=0)
- assert redis.xlen(JB_EVENTS_STREAM) == 0
- yield
- redis.xtrim(JB_EVENTS_STREAM, maxlen=0)
- assert redis.xlen(JB_EVENTS_STREAM) == 0
diff --git a/tests/http/test_notifications.py b/tests/http/test_notifications.py
index 4386863..508b236 100644
--- a/tests/http/test_notifications.py
+++ b/tests/http/test_notifications.py
@@ -8,6 +8,7 @@ from uuid import uuid4
from jb.config import JB_EVENTS_STREAM, settings
from jb.models.event import MTurkEvent
from jb.models.hit import Hit
+from jb.models.assignment import AssignmentStub
class TestNotifications:
@@ -74,11 +75,18 @@ class TestNotifications:
== assignment_stub_record.amt_assignment_id
)
+ # Confirm the stream is empty
+ assert redis.xlen(JB_EVENTS_STREAM) == 0
+
res = await client.post(
url=f"/{settings.sns_path}/", json=mturk_event_body_record
)
res.raise_for_status()
+ # Now that we POSTed, confirm the stream has 1 event in it
+ # Confirm the stream is empty
+ assert redis.xlen(JB_EVENTS_STREAM) == 1
+
# AMT SNS needs to receive a 200 response to stop retrying the notification
assert res.status_code == 200
assert res.json() == {"status": "ok"}
@@ -89,6 +97,9 @@ class TestNotifications:
msg_id, msg = msg_res
redis.xdel(JB_EVENTS_STREAM, msg_id)
+ # After running xdel, we can confirm the stream is empty
+ assert redis.xlen(JB_EVENTS_STREAM) == 0
+
msg_json = msg["data"]
event = MTurkEvent.model_validate_json(msg_json)
diff --git a/tests/http/test_report.py b/tests/http/test_report.py
deleted file mode 100644
index e69de29..0000000
--- a/tests/http/test_report.py
+++ /dev/null