aboutsummaryrefslogtreecommitdiff
path: root/tests/http/conftest.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/http/conftest.py')
-rw-r--r--tests/http/conftest.py99
1 files changed, 0 insertions, 99 deletions
diff --git a/tests/http/conftest.py b/tests/http/conftest.py
deleted file mode 100644
index 4f11fde..0000000
--- a/tests/http/conftest.py
+++ /dev/null
@@ -1,99 +0,0 @@
-import httpx
-import redis
-import pytest
-import requests_mock
-from asgi_lifespan import LifespanManager
-from httpx import AsyncClient, ASGITransport
-from typing import Dict, Any
-
-from jb.main import app
-import json
-
-from httpx import AsyncClient
-import secrets
-
-from jb.config import JB_EVENTS_STREAM, settings
-from tests import generate_amt_id
-
-
-@pytest.fixture(scope="session")
-def anyio_backend():
- return "asyncio"
-
-
-@pytest.fixture(scope="session")
-async def httpxclient():
- # limiter.enabled = True
- # limiter.reset()
- app.testing = True
-
- async with LifespanManager(app):
- # await FastAPICache.clear()
- transport = ASGITransport(app=app)
- async with AsyncClient(
- transport=transport, base_url="http://127.0.0.1:8001/"
- ) as client:
- yield client
- await client.aclose()
-
-
-@pytest.fixture()
-def no_limit():
- """Fixture to execute asserts before and after a test is run"""
- # limiter.enabled = False
- yield # this is where the testing happens
- # limiter.enabled = True
-
-
-@pytest.fixture()
-def httpxclient_ip(httpxclient):
- """Fixture to execute asserts before and after a test is run"""
- httpxclient._transport = httpx.ASGITransport(app=app, client=("1.2.3.4", 8001))
- yield httpxclient # this is where the testing happens
- httpxclient._transport = httpx.ASGITransport(app=app)
-
-
-@pytest.fixture
-def mock_requests():
- with requests_mock.Mocker() as m:
- yield m
-
-
-def generate_hex_id(length: int = 40) -> str:
- # length is number of hex chars, so we need length//2 bytes
- return secrets.token_hex(length // 2)
-
-
-@pytest.fixture
-def mturk_event_body_record(
- hit_record: Hit, assignment_stub_record: AssignmentStub
-) -> Dict[str, Any]:
- return {
- "Type": "Notification",
- "Message": json.dumps(
- {
- "Events": [
- {
- "EventType": "AssignmentSubmitted",
- "EventTimestamp": "2025-10-16T18:45:51.000000Z",
- "HITId": hit_record.amt_hit_id,
- "AssignmentId": assignment_stub_record.amt_assignment_id,
- "HITTypeId": hit_record.amt_hit_type_id,
- }
- ],
- "EventDocId": generate_hex_id(),
- "SourceAccount": settings.aws_owner_id,
- "CustomerId": generate_amt_id(length=14),
- "EventDocVersion": "2006-05-05",
- }
- ),
- }
-
-
-@pytest.fixture()
-def clean_mturk_events_redis_stream(redis: redis.Redis):
- redis.xtrim(JB_EVENTS_STREAM, maxlen=0)
- assert redis.xlen(JB_EVENTS_STREAM) == 0
- yield
- redis.xtrim(JB_EVENTS_STREAM, maxlen=0)
- assert redis.xlen(JB_EVENTS_STREAM) == 0