aboutsummaryrefslogtreecommitdiff
path: root/tests/http/test_notifications.py
diff options
context:
space:
mode:
authorMax Nanis2026-02-24 17:26:15 -0500
committerMax Nanis2026-02-24 17:26:15 -0500
commit8c1940445503fd6678d0961600f2be81622793a2 (patch)
treeb9173562b8824b5eaa805e446d9d780e1f23fb2a /tests/http/test_notifications.py
parent25d8c3c214baf10f6520cc1351f78473150e5d7a (diff)
downloadamt-jb-8c1940445503fd6678d0961600f2be81622793a2.tar.gz
amt-jb-8c1940445503fd6678d0961600f2be81622793a2.zip
Extensive use of type checking. Movement of pytest conf towards handling managers (for db agnostic unittest). Starting to organize pytests.
Diffstat (limited to 'tests/http/test_notifications.py')
-rw-r--r--tests/http/test_notifications.py16
1 files changed, 8 insertions, 8 deletions
diff --git a/tests/http/test_notifications.py b/tests/http/test_notifications.py
index 70458b8..6770044 100644
--- a/tests/http/test_notifications.py
+++ b/tests/http/test_notifications.py
@@ -5,7 +5,6 @@ from httpx import AsyncClient
import secrets
from jb.config import JB_EVENTS_STREAM, settings
-from jb.decorators import REDIS
from jb.models.event import MTurkEvent
from tests import generate_amt_id
@@ -40,16 +39,17 @@ def example_mturk_event_body(amt_hit_id, amt_hit_type_id, amt_assignment_id):
@pytest.fixture()
-def clean_mturk_events_redis_stream():
- REDIS.xtrim(JB_EVENTS_STREAM, maxlen=0)
- assert REDIS.xlen(JB_EVENTS_STREAM) == 0
+def clean_mturk_events_redis_stream(redis):
+ redis.xtrim(JB_EVENTS_STREAM, maxlen=0)
+ assert redis.xlen(JB_EVENTS_STREAM) == 0
yield
- REDIS.xtrim(JB_EVENTS_STREAM, maxlen=0)
- assert REDIS.xlen(JB_EVENTS_STREAM) == 0
+ redis.xtrim(JB_EVENTS_STREAM, maxlen=0)
+ assert redis.xlen(JB_EVENTS_STREAM) == 0
@pytest.mark.anyio
async def test_mturk_notifications(
+ redis,
httpxclient: AsyncClient,
no_limit,
example_mturk_event_body,
@@ -61,10 +61,10 @@ async def test_mturk_notifications(
res = await client.post(url=f"/{settings.sns_path}/", json=example_mturk_event_body)
res.raise_for_status()
- msg_res = REDIS.xread(streams={JB_EVENTS_STREAM: 0}, count=1, block=100)
+ msg_res = redis.xread(streams={JB_EVENTS_STREAM: 0}, count=1, block=100)
msg_res = msg_res[0][1][0]
msg_id, msg = msg_res
- REDIS.xdel(JB_EVENTS_STREAM, msg_id)
+ redis.xdel(JB_EVENTS_STREAM, msg_id)
msg_json = msg["data"]
event = MTurkEvent.model_validate_json(msg_json)