diff options
| author | Max Nanis | 2026-02-24 17:26:15 -0500 |
|---|---|---|
| committer | Max Nanis | 2026-02-24 17:26:15 -0500 |
| commit | 8c1940445503fd6678d0961600f2be81622793a2 (patch) | |
| tree | b9173562b8824b5eaa805e446d9d780e1f23fb2a /tests/http/test_notifications.py | |
| parent | 25d8c3c214baf10f6520cc1351f78473150e5d7a (diff) | |
| download | amt-jb-8c1940445503fd6678d0961600f2be81622793a2.tar.gz amt-jb-8c1940445503fd6678d0961600f2be81622793a2.zip | |
Extensive use of type checking. Movement of pytest conf towards handling managers (for db agnostic unittest). Starting to organize pytests.
Diffstat (limited to 'tests/http/test_notifications.py')
| -rw-r--r-- | tests/http/test_notifications.py | 16 |
1 files changed, 8 insertions, 8 deletions
diff --git a/tests/http/test_notifications.py b/tests/http/test_notifications.py index 70458b8..6770044 100644 --- a/tests/http/test_notifications.py +++ b/tests/http/test_notifications.py @@ -5,7 +5,6 @@ from httpx import AsyncClient import secrets from jb.config import JB_EVENTS_STREAM, settings -from jb.decorators import REDIS from jb.models.event import MTurkEvent from tests import generate_amt_id @@ -40,16 +39,17 @@ def example_mturk_event_body(amt_hit_id, amt_hit_type_id, amt_assignment_id): @pytest.fixture() -def clean_mturk_events_redis_stream(): - REDIS.xtrim(JB_EVENTS_STREAM, maxlen=0) - assert REDIS.xlen(JB_EVENTS_STREAM) == 0 +def clean_mturk_events_redis_stream(redis): + redis.xtrim(JB_EVENTS_STREAM, maxlen=0) + assert redis.xlen(JB_EVENTS_STREAM) == 0 yield - REDIS.xtrim(JB_EVENTS_STREAM, maxlen=0) - assert REDIS.xlen(JB_EVENTS_STREAM) == 0 + redis.xtrim(JB_EVENTS_STREAM, maxlen=0) + assert redis.xlen(JB_EVENTS_STREAM) == 0 @pytest.mark.anyio async def test_mturk_notifications( + redis, httpxclient: AsyncClient, no_limit, example_mturk_event_body, @@ -61,10 +61,10 @@ async def test_mturk_notifications( res = await client.post(url=f"/{settings.sns_path}/", json=example_mturk_event_body) res.raise_for_status() - msg_res = REDIS.xread(streams={JB_EVENTS_STREAM: 0}, count=1, block=100) + msg_res = redis.xread(streams={JB_EVENTS_STREAM: 0}, count=1, block=100) msg_res = msg_res[0][1][0] msg_id, msg = msg_res - REDIS.xdel(JB_EVENTS_STREAM, msg_id) + redis.xdel(JB_EVENTS_STREAM, msg_id) msg_json = msg["data"] event = MTurkEvent.model_validate_json(msg_json) |
