import copy
from datetime import datetime, timezone, timedelta
import os
from typing import Optional
from uuid import uuid4
from dotenv import load_dotenv
import pytest
from dateutil.tz import tzlocal
from mypy_boto3_mturk.type_defs import (
GetHITResponseTypeDef,
CreateHITTypeResponseTypeDef,
CreateHITWithHITTypeResponseTypeDef,
GetAssignmentResponseTypeDef,
)
from jb.managers import Permission
from generalresearchutils.pg_helper import PostgresConfig
from jb.managers.amt import AMTManager, APPROVAL_MESSAGE, NO_WORK_APPROVAL_MESSAGE
from jb.models.assignment import AssignmentStub, Assignment
from generalresearchutils.currency import USDCent
from jb.models.definitions import HitStatus, HitReviewStatus, AssignmentStatus
from jb.models.hit import HitType, HitQuestion, Hit
from tests import generate_amt_id
@pytest.fixture
def amt_hit_type_id() -> str:
    """Provide a freshly generated id to stand in for an AMT HITTypeId."""
    generated = generate_amt_id()
    return generated
@pytest.fixture
def amt_assignment_id() -> str:
    """Provide a freshly generated id to stand in for an AMT AssignmentId."""
    generated = generate_amt_id()
    return generated
@pytest.fixture
def amt_worker_id() -> str:
    """Provide a generated id (requested with ``length=21``) for an AMT worker."""
    worker_id_length = 21
    return generate_amt_id(length=worker_id_length)
@pytest.fixture
def amt_group_id() -> str:
    """Provide a freshly generated id to stand in for an AMT HITGroupId."""
    generated = generate_amt_id()
    return generated
@pytest.fixture
def tsid() -> str:
    """Provide a random uuid4 rendered as a 32-character hex string."""
    identifier = uuid4()
    return identifier.hex
@pytest.fixture
def tsid1() -> str:
    """Provide a random uuid4 hex string; independent of ``tsid``/``tsid2``."""
    identifier = uuid4()
    return identifier.hex
@pytest.fixture
def tsid2() -> str:
    """Provide a random uuid4 hex string; independent of ``tsid``/``tsid1``."""
    identifier = uuid4()
    return identifier.hex
@pytest.fixture
def pe_id() -> str:
    """Provide a random uuid4 hex string for a payout event / cashout request."""
    identifier = uuid4()
    return identifier.hex
# --- Settings ---
@pytest.fixture(scope="session")
def env_file_path(pytestconfig):
    """Locate ``.env.test`` under the pytest root dir and load it if present.

    Returns:
        The joined path to ``.env.test``.
    """
    dotenv_file = os.path.join(pytestconfig.rootpath, ".env.test")
    if os.path.exists(dotenv_file):
        # override=True: values from the file win over variables already set.
        load_dotenv(dotenv_path=dotenv_file, override=True)
    # Returned unconditionally, so callers get the path even when the file
    # is absent.
    return dotenv_file
@pytest.fixture(scope="session")
def settings(env_file_path) -> "Settings":
    """Build the session-wide jb ``Settings`` object from the test env file."""
    # Imported inside the fixture rather than at module level; presumably so
    # the settings module is only imported once the env file fixture has run
    # -- TODO confirm.
    from jb.settings import Settings as JBSettings

    return JBSettings(_env_file=env_file_path)
# --- Database Connectors ---
@pytest.fixture(scope="session")
def redis(settings):
    """Create one Redis client for the whole test session from ``settings``."""
    from generalresearchutils.redis_helper import RedisConfig

    dsn = settings.redis
    # The same timeout setting is applied to both socket reads and connects.
    timeout = settings.redis_timeout
    config = RedisConfig(
        dsn=dsn,
        decode_responses=True,
        socket_timeout=timeout,
        socket_connect_timeout=timeout,
    )
    return config.create_redis_client()
@pytest.fixture(scope="session")
def pg_config(settings) -> PostgresConfig:
    """Session-wide Postgres configuration built from ``settings.amt_jb_db``."""
    # Both timeouts are set to 1; the unit is whatever PostgresConfig defines.
    config = PostgresConfig(
        dsn=settings.amt_jb_db,
        connect_timeout=1,
        statement_timeout=1,
    )
    return config
# --- Managers ---
@pytest.fixture(scope="session")
def hqm(pg_config) -> "HitQuestionManager":
    """Session-wide ``HitQuestionManager`` with READ and CREATE permissions."""
    # Guard: refuse to continue unless the DSN path contains "/unittest-".
    assert "/unittest-" in pg_config.dsn.path
    from jb.managers.hit import HitQuestionManager

    granted = [Permission.READ, Permission.CREATE]
    return HitQuestionManager(pg_config=pg_config, permissions=granted)
@pytest.fixture(scope="session")
def htm(pg_config) -> "HitTypeManager":
    """Session-wide ``HitTypeManager`` with READ and CREATE permissions."""
    # Guard: refuse to continue unless the DSN path contains "/unittest-".
    assert "/unittest-" in pg_config.dsn.path
    from jb.managers.hit import HitTypeManager

    granted = [Permission.READ, Permission.CREATE]
    return HitTypeManager(pg_config=pg_config, permissions=granted)
@pytest.fixture(scope="session")
def hm(pg_config) -> "HitManager":
    """Session-wide ``HitManager`` with READ and CREATE permissions."""
    # Guard: refuse to continue unless the DSN path contains "/unittest-".
    assert "/unittest-" in pg_config.dsn.path
    from jb.managers.hit import HitManager

    granted = [Permission.READ, Permission.CREATE]
    return HitManager(pg_config=pg_config, permissions=granted)
@pytest.fixture(scope="session")
def am(pg_config) -> "AssignmentManager":
    """Session-wide ``AssignmentManager`` with READ and CREATE permissions."""
    # Guard: refuse to continue unless the DSN path contains "/unittest-".
    assert "/unittest-" in pg_config.dsn.path
    from jb.managers.assignment import AssignmentManager

    granted = [Permission.READ, Permission.CREATE]
    return AssignmentManager(pg_config=pg_config, permissions=granted)
@pytest.fixture(scope="session")
def bm(pg_config) -> "BonusManager":
    """Session-wide ``BonusManager`` with READ and CREATE permissions."""
    # Guard: refuse to continue unless the DSN path contains "/unittest-".
    assert "/unittest-" in pg_config.dsn.path
    from jb.managers.bonus import BonusManager

    granted = [Permission.READ, Permission.CREATE]
    return BonusManager(pg_config=pg_config, permissions=granted)
# --- Question ---
@pytest.fixture
def question() -> HitQuestion:
    """Construct a ``HitQuestion`` for the work URL with a height of 1200.

    The object is only built here; no manager is called.
    """
    work_url = "https://jamesbillings67.com/work/"
    return HitQuestion(url=work_url, height=1200)
@pytest.fixture
def question_record(hqm, question) -> HitQuestion:
    """Return the result of ``hqm.get_or_create`` for the ``question`` fixture."""
    record = hqm.get_or_create(question)
    return record
# --- HITType ---
@pytest.fixture
def hit_type() -> HitType:
    """Construct a sample ``HitType`` with a 5-cent reward.

    No ``amt_hit_type_id`` is passed here; dependent fixtures assign one.
    """
    fields = {
        "title": "Awesome Surveys!",
        "description": "Give us your opinion",
        "reward": USDCent(5),
        "keywords": "market,research,amazing",
        "min_active": 10,
    }
    return HitType(**fields)
@pytest.fixture
def hit_type_record(htm, hit_type) -> HitType:
    """Give ``hit_type`` a generated AMT id and pass it to ``htm.get_or_create``.

    Note that the ``hit_type`` fixture object itself is mutated.
    """
    generated_id = generate_amt_id()
    hit_type.amt_hit_type_id = generated_id
    return htm.get_or_create(hit_type)
@pytest.fixture
def hit_type_with_amt_id(htm, hit_type: HitType) -> HitType:
    """Return ``hit_type`` carrying a fixed, pre-registered AMT hit type id.

    Per the original author's note, the id below belongs to a real hit type
    previously registered with the AMT sandbox, which will always exist.
    """
    registered_sandbox_id = "3217B3DC4P5YW9DRV9R3X8O56V041J"
    hit_type.amt_hit_type_id = registered_sandbox_id
    # Get or create the row in our db. The return value is discarded: per the
    # original note, this call adds the integer pk id to `hit_type` in place.
    htm.get_or_create(hit_type)
    return hit_type
# --- HIT ---
@pytest.fixture
def amt_hit_id() -> str:
    """Provide a freshly generated id to stand in for an AMT HITId."""
    generated = generate_amt_id()
    return generated
@pytest.fixture
def hit(amt_hit_id, amt_hit_type_id, amt_group_id, question) -> Hit:
    """Build an in-memory ``Hit`` (Assignable, not reviewed) from fixture ids.

    The HIT is created "now" (UTC) and expires three days later. It allows a
    single assignment, which is still available. ``hit_type_id`` is left as
    ``None``.
    """
    created = datetime.now(tz=timezone.utc)
    payload = {
        "amt_hit_id": amt_hit_id,
        "amt_hit_type_id": amt_hit_type_id,
        "amt_group_id": amt_group_id,
        "status": HitStatus.Assignable,
        "review_status": HitReviewStatus.NotReviewed,
        "creation_time": created,
        "expiration": created + timedelta(days=3),
        "hit_question_xml": question.xml,
        "qualification_requirements": [],
        "max_assignments": 1,
        "assignment_pending_count": 0,
        "assignment_available_count": 1,
        "assignment_completed_count": 0,
        "description": "Description",
        "keywords": "Keywords",
        "reward": USDCent(5),
        "title": "Title",
        "question_id": question.id,
        "hit_type_id": None,
    }
    return Hit.model_validate(payload)
@pytest.fixture
def hit_record(
    hm,
    question_record,
    hit_type_record,
    hit,
    amt_hit_id,
) -> Hit:
    """
    Returns a hit that exists in our db, but does not in amazon (the amt ids
    are random). The mtwerk_hittype and mtwerk_question records will also
    exist (in the db)
    """
    # Link the in-memory hit to the hit type and question records before
    # handing it to the manager.
    hit.hit_type_id = hit_type_record.id
    hit.amt_hit_id = amt_hit_id
    hit.question_id = question_record.id

    # The return value of create() is not used; the same `hit` object that
    # was passed in is returned.
    hm.create(hit)
    return hit
@pytest.fixture
def hit_in_amt(hm, question_record, hit_type_with_amt_id: HitType) -> Hit:
    """Create a HIT through ``AMTManager`` and then store it via ``hm``.

    Per the original author's note, this actually creates a new HIT in the
    AMT sandbox.
    """
    created_hit = AMTManager.create_hit_with_hit_type(
        hit_type=hit_type_with_amt_id,
        question=question_record,
    )
    # Create it in the DB as well.
    hm.create(created_hit)
    return created_hit
# --- Assignment ---
@pytest.fixture
def assignment_stub(hit: Hit, amt_assignment_id, amt_worker_id):
    """Build an in-memory ``AssignmentStub`` in Submitted status for ``hit``.

    ``created_at`` and ``modified_at`` share the same UTC timestamp.
    """
    timestamp = datetime.now(tz=timezone.utc)
    stub = AssignmentStub(
        amt_assignment_id=amt_assignment_id,
        amt_hit_id=hit.amt_hit_id,
        amt_worker_id=amt_worker_id,
        status=AssignmentStatus.Submitted,
        modified_at=timestamp,
        created_at=timestamp,
    )
    return stub
@pytest.fixture
def assignment_factory(hit: Hit):
    """Return a factory that builds in-memory ``Assignment`` objects for ``hit``.

    Each call produces an ``Assignment`` in Submitted status with a newly
    generated ``amt_assignment_id``. All timestamp fields share one "now"
    (UTC) value. The factory itself does not call any manager.
    """

    def inner(amt_worker_id: Optional[str] = None):
        """Build one Assignment; a worker id is generated when none is given."""
        now = datetime.now(tz=timezone.utc)
        amt_assignment_id = generate_amt_id()
        # Any falsy value (None or "") falls back to a generated id.
        # NOTE(review): the `amt_worker_id` fixture requests length=21, while
        # this fallback uses generate_amt_id()'s default length -- confirm
        # whether the difference matters.
        worker_id = amt_worker_id or generate_amt_id()
        return Assignment(
            amt_assignment_id=amt_assignment_id,
            amt_hit_id=hit.amt_hit_id,
            amt_worker_id=worker_id,
            status=AssignmentStatus.Submitted,
            modified_at=now,
            created_at=now,
            accept_time=now,
            auto_approval_time=now,
            submit_time=now,
        )

    return inner
@pytest.fixture
def assignment_in_db_factory(am, assignment_factory):
    """Return a factory that builds an Assignment and stores it through ``am``.

    The produced assignment is linked to the given ``hit_id``, then passed to
    ``am.create_stub`` followed by ``am.update_answer``.
    """

    def inner(hit_id: int, amt_worker_id: Optional[str] = None):
        """Build, link and store one assignment, then return it."""
        assignment = assignment_factory(amt_worker_id=amt_worker_id)
        assignment.hit_id = hit_id
        # Order matters: the stub is created first, then the answer updated.
        am.create_stub(assignment)
        am.update_answer(assignment)
        return assignment

    return inner
@pytest.fixture
def assignment_stub_in_db(am, hit_record, assignment_stub) -> AssignmentStub:
    """
    Returns an AssignmentStub that exists in our db, but does not in amazon
    (the amt ids are random). The mtwerk_hit, mtwerk_hittype, and
    mtwerk_question records will also exist (in the db)
    """
    # Point the stub at the stored hit before creating it.
    stored_hit_id = hit_record.id
    assignment_stub.hit_id = stored_hit_id
    am.create_stub(assignment_stub)
    return assignment_stub
# --- AMT API responses ---
@pytest.fixture
def amt_response_metadata():
    """Build a canned ``ResponseMetadata`` dict with a random request id.

    The same request id appears both as ``RequestId`` and in the
    ``x-amzn-requestid`` header; the remaining values are fixed.
    """
    request_id = str(uuid4())
    http_headers = {
        "x-amzn-requestid": request_id,
        "content-type": "application/x-amz-json-1.1",
        "content-length": "46",
        "date": "Wed, 15 Oct 2025 02:16:16 GMT",
    }
    return {
        "RequestId": request_id,
        "HTTPStatusCode": 200,
        "HTTPHeaders": http_headers,
        "RetryAttempts": 0,
    }
@pytest.fixture
def create_hit_type_response(
    amt_hit_type_id, amt_response_metadata
) -> CreateHITTypeResponseTypeDef:
    """Canned CreateHITType response carrying the ``amt_hit_type_id`` fixture."""
    response = dict(
        HITTypeId=amt_hit_type_id,
        ResponseMetadata=amt_response_metadata,
    )
    return response
@pytest.fixture
def create_hit_with_hit_type_response(
    amt_hit_type_id, amt_hit_id, amt_response_metadata
) -> CreateHITWithHITTypeResponseTypeDef:
    """Canned CreateHITWithHITType response for an Assignable HIT.

    The HIT and HIT type ids come from fixtures. The group id is generated
    here (``length=30``) and is not the ``amt_group_id`` fixture. Timestamps
    are fixed dates in the local timezone.
    """
    group_id = generate_amt_id(length=30)
    hit_payload = {
        "HITId": amt_hit_id,
        "HITTypeId": amt_hit_type_id,
        "HITGroupId": group_id,
        "CreationTime": datetime(2025, 10, 14, 20, 22, tzinfo=tzlocal()),
        "Title": "Test",
        "Description": "test",
        "Question": '\n\n https://jamesbillings67.com/work/\n 1200\n ',
        "HITStatus": "Assignable",
        "MaxAssignments": 1,
        "Reward": "0.05",
        "AutoApprovalDelayInSeconds": 2_592_000,
        "Expiration": datetime(2025, 10, 14, 20, 24, 3, tzinfo=tzlocal()),
        "AssignmentDurationInSeconds": 123,
        "QualificationRequirements": [],
        "HITReviewStatus": "NotReviewed",
        "NumberOfAssignmentsPending": 0,
        "NumberOfAssignmentsAvailable": 1,
        "NumberOfAssignmentsCompleted": 0,
    }
    return {"HIT": hit_payload, "ResponseMetadata": amt_response_metadata}
@pytest.fixture
def get_hit_response(
    amt_hit_type_id, amt_hit_id, amt_response_metadata
) -> GetHITResponseTypeDef:
    """Canned GetHIT response for an Assignable, not-yet-reviewed HIT.

    The HIT and HIT type ids come from fixtures. The group id is generated
    here (``length=30``) and is not the ``amt_group_id`` fixture. Timestamps
    are fixed dates in the local timezone.
    """
    group_id = generate_amt_id(length=30)
    hit_payload = {
        "HITId": amt_hit_id,
        "HITTypeId": amt_hit_type_id,
        "HITGroupId": group_id,
        "CreationTime": datetime(2025, 10, 13, 23, 0, 3, tzinfo=tzlocal()),
        "Title": "Awesome Surveys!",
        "Description": "Give us your opinion",
        "Question": '\n\n https://jamesbillings67.com/work/\n 1200\n ',
        "Keywords": "market,research,amazing",
        "HITStatus": "Assignable",
        "MaxAssignments": 1,
        "Reward": "0.05",
        "AutoApprovalDelayInSeconds": 604_800,
        "Expiration": datetime(2025, 10, 27, 23, 0, 3, tzinfo=tzlocal()),
        "AssignmentDurationInSeconds": 5_400,
        "QualificationRequirements": [],
        "HITReviewStatus": "NotReviewed",
        "NumberOfAssignmentsPending": 0,
        "NumberOfAssignmentsAvailable": 1,
        "NumberOfAssignmentsCompleted": 0,
    }
    return {"HIT": hit_payload, "ResponseMetadata": amt_response_metadata}
@pytest.fixture
def get_hit_response_reviewing(get_hit_response):
    """Deep copy of ``get_hit_response`` with the HIT moved to "Reviewing".

    The single assignment is counted as completed rather than available.
    The ``get_hit_response`` fixture value itself is not modified.
    """
    reviewing = copy.deepcopy(get_hit_response)
    reviewing["HIT"].update(
        {
            "NumberOfAssignmentsAvailable": 0,
            "NumberOfAssignmentsCompleted": 1,
            "HITStatus": "Reviewing",
        }
    )
    return reviewing
@pytest.fixture
def get_assignment_response(
    amt_hit_type_id,
    amt_hit_id,
    amt_assignment_id,
    amt_worker_id,
    get_hit_response,
    amt_response_metadata,
    tsid,
) -> GetAssignmentResponseTypeDef:
    """Canned GetAssignment response for a just-submitted assignment.

    Times are relative to the local "now": accepted 10 minutes ago, submitted
    now, deadline in 90 minutes and auto-approval in 7 days. The Answer text
    embeds the worker id, assignment id and tsid. The ``HIT`` entry is the
    very same dict object found in ``get_hit_response`` (not a copy).
    """
    submitted_at = datetime.now(tz=tzlocal())
    answer_text = (
        '\n'
        '\n '
        "\n amt_worker_id\n "
        f" {amt_worker_id}\n \n \n "
        " amt_assignment_id\n "
        f" {amt_assignment_id}\n \n \n "
        f" tsid\n {tsid}\n "
        " \n"
    )
    assignment_payload = {
        "AssignmentId": amt_assignment_id,
        "WorkerId": amt_worker_id,
        "HITId": amt_hit_id,
        "AssignmentStatus": "Submitted",
        "AutoApprovalTime": submitted_at + timedelta(days=7),
        "AcceptTime": submitted_at - timedelta(minutes=10),
        "SubmitTime": submitted_at,
        "Deadline": submitted_at + timedelta(minutes=90),
        "Answer": answer_text,
        "RequesterFeedback": "Good work",
    }
    return {
        "Assignment": assignment_payload,
        "HIT": get_hit_response["HIT"],
        "ResponseMetadata": amt_response_metadata,
    }
@pytest.fixture
def get_assignment_response_no_tsid(
    get_assignment_response, amt_worker_id, amt_assignment_id
):
    """Deep copy of ``get_assignment_response`` whose Answer has no tsid entry.

    Only ``Assignment.Answer`` is replaced. The ``get_assignment_response``
    fixture value itself is left untouched.
    """
    res = copy.deepcopy(get_assignment_response)
    # The Answer ends after the amt_assignment_id entry: the tsid entry that
    # get_assignment_response includes is deliberately left out.
    res["Assignment"]["Answer"] = (
        '\n'
        '\n '
        "\n amt_worker_id\n "
        f" {amt_worker_id}\n \n \n "
        " amt_assignment_id\n "
        f" {amt_assignment_id}\n \n "
    )
    return res
@pytest.fixture
def get_assignment_response_approved(
    get_assignment_response: GetAssignmentResponseTypeDef,
):
    """Return a factory producing an Approved copy of ``get_assignment_response``.

    The factory takes the requester feedback (default ``APPROVAL_MESSAGE``).
    ``ApprovalTime`` is set equal to the copy's ``SubmitTime``.
    """

    def inner(feedback: str = APPROVAL_MESSAGE) -> GetAssignmentResponseTypeDef:
        """Build one approved response; the base fixture value is not mutated."""
        response = copy.deepcopy(get_assignment_response)
        assignment = response["Assignment"]
        assignment["AssignmentStatus"] = "Approved"
        assignment["RequesterFeedback"] = feedback
        assignment["ApprovalTime"] = assignment["SubmitTime"]
        return response

    return inner
@pytest.fixture
def get_assignment_response_rejected(
    get_assignment_response: GetAssignmentResponseTypeDef,
):
    """Return a factory producing a Rejected copy of ``get_assignment_response``.

    The factory takes the reject reason (default ``"reject reason"``), stored
    as ``RequesterFeedback``. ``RejectionTime`` is set equal to the copy's
    ``SubmitTime``.
    """

    def inner(reject_reason: str = "reject reason") -> GetAssignmentResponseTypeDef:
        """Build one rejected response; the base fixture value is not mutated."""
        response = copy.deepcopy(get_assignment_response)
        assignment = response["Assignment"]
        assignment["AssignmentStatus"] = "Rejected"
        assignment["RequesterFeedback"] = reject_reason
        assignment["RejectionTime"] = assignment["SubmitTime"]
        return response

    return inner
@pytest.fixture
def get_assignment_response_rejected_no_tsid(
    get_assignment_response_no_tsid: GetAssignmentResponseTypeDef,
):
    """Return a factory producing a Rejected copy of the no-tsid response.

    The factory takes the reject reason (default ``"reject reason"``), stored
    as ``RequesterFeedback``. ``RejectionTime`` is set equal to the copy's
    ``SubmitTime``.
    """

    def inner(reject_reason: str = "reject reason") -> GetAssignmentResponseTypeDef:
        """Build one rejected response; the base fixture value is not mutated."""
        response = copy.deepcopy(get_assignment_response_no_tsid)
        assignment = response["Assignment"]
        assignment["AssignmentStatus"] = "Rejected"
        assignment["RequesterFeedback"] = reject_reason
        assignment["RejectionTime"] = assignment["SubmitTime"]
        return response

    return inner
@pytest.fixture
def get_assignment_response_approved_no_tsid(
    get_assignment_response_no_tsid: GetAssignmentResponseTypeDef,
):
    """Approved copy of the no-tsid response, using ``NO_WORK_APPROVAL_MESSAGE``.

    Unlike ``get_assignment_response_approved``, this fixture yields the
    response dict itself rather than a factory. ``ApprovalTime`` is set equal
    to the copy's ``SubmitTime``.
    """
    response = copy.deepcopy(get_assignment_response_no_tsid)
    assignment = response["Assignment"]
    assignment["AssignmentStatus"] = "Approved"
    assignment["RequesterFeedback"] = NO_WORK_APPROVAL_MESSAGE
    assignment["ApprovalTime"] = assignment["SubmitTime"]
    return response