"""Shared pytest fixtures: random AMT ids, settings, DB managers, and canned
AMT API responses used across the test suite."""

import copy
from datetime import datetime, timezone, timedelta
import os
from typing import TYPE_CHECKING, Callable, Optional
from uuid import uuid4

from dotenv import load_dotenv
import pytest
from dateutil.tz import tzlocal
from mypy_boto3_mturk.type_defs import (
    GetHITResponseTypeDef,
    CreateHITTypeResponseTypeDef,
    CreateHITWithHITTypeResponseTypeDef,
    GetAssignmentResponseTypeDef,
)

from jb.managers import Permission
from generalresearchutils.pg_helper import PostgresConfig
from jb.managers.amt import AMTManager, APPROVAL_MESSAGE, NO_WORK_APPROVAL_MESSAGE
from jb.models.assignment import AssignmentStub, Assignment
from generalresearchutils.currency import USDCent
from jb.models.definitions import HitStatus, HitReviewStatus, AssignmentStatus
from jb.models.hit import HitType, HitQuestion, Hit
from tests import generate_amt_id

if TYPE_CHECKING:
    # Only needed to resolve the string annotations below; the fixtures keep
    # importing these lazily at call time, so nothing changes at runtime.
    from jb.managers.assignment import AssignmentManager
    from jb.managers.bonus import BonusManager
    from jb.managers.hit import HitManager, HitQuestionManager, HitTypeManager
    from jb.settings import Settings


def _assert_unittest_db(pg_config: PostgresConfig) -> None:
    """Fail fast unless the configured DSN path contains ``/unittest-``."""
    path = pg_config.dsn.path
    assert "/unittest-" in path, f"refusing to use non-unittest database: {path}"


# --- Random identifiers ---


@pytest.fixture
def amt_hit_type_id() -> str:
    """A freshly generated AMT-style id to use as a HIT type id."""
    return generate_amt_id()


@pytest.fixture
def amt_assignment_id() -> str:
    """A freshly generated AMT-style id to use as an assignment id."""
    return generate_amt_id()


@pytest.fixture
def amt_worker_id() -> str:
    """A freshly generated AMT-style id (length 21) to use as a worker id."""
    return generate_amt_id(length=21)


@pytest.fixture
def amt_group_id() -> str:
    """A freshly generated AMT-style id to use as a HIT group id."""
    return generate_amt_id()


@pytest.fixture
def tsid() -> str:
    """A random 32-character hex string (uuid4)."""
    return uuid4().hex


@pytest.fixture
def tsid1() -> str:
    """A random 32-character hex string (uuid4), independent of ``tsid``."""
    return uuid4().hex


@pytest.fixture
def tsid2() -> str:
    """A random 32-character hex string (uuid4), independent of ``tsid``."""
    return uuid4().hex


@pytest.fixture
def pe_id() -> str:
    """A random hex UUID standing in for a payout event / cashout request."""
    return uuid4().hex


# --- Settings ---


@pytest.fixture(scope="session")
def env_file_path(pytestconfig: "pytest.Config") -> str:
    """Return the path of ``.env.test`` under the pytest root directory.

    If the file exists it is loaded into the process environment, overriding
    variables that are already set. The path is returned either way.
    """
    env_path = os.path.join(pytestconfig.rootpath, ".env.test")

    if os.path.exists(env_path):
        load_dotenv(dotenv_path=env_path, override=True)

    return env_path


@pytest.fixture(scope="session")
def settings(env_file_path: str) -> "Settings":
    """Build the project ``Settings`` object from the test env file."""
    # Imported lazily so the settings module is only loaded once a test asks
    # for this fixture.
    from jb.settings import Settings as JBSettings

    return JBSettings(_env_file=env_file_path)


# --- Database Connectors ---


@pytest.fixture(scope="session")
def redis(settings: "Settings"):
    """Create a redis client from ``settings.redis`` / ``settings.redis_timeout``."""
    from generalresearchutils.redis_helper import RedisConfig

    redis_config = RedisConfig(
        dsn=settings.redis,
        decode_responses=True,
        socket_timeout=settings.redis_timeout,
        socket_connect_timeout=settings.redis_timeout,
    )
    return redis_config.create_redis_client()


@pytest.fixture(scope="session")
def pg_config(settings: "Settings") -> PostgresConfig:
    """Postgres configuration pointing at ``settings.amt_jb_db``."""
    return PostgresConfig(
        dsn=settings.amt_jb_db,
        connect_timeout=1,
        statement_timeout=1,
    )


# --- Managers ---


@pytest.fixture(scope="session")
def hqm(pg_config: PostgresConfig) -> "HitQuestionManager":
    """HitQuestionManager with READ and CREATE permissions."""
    _assert_unittest_db(pg_config)

    from jb.managers.hit import HitQuestionManager

    return HitQuestionManager(
        pg_config=pg_config, permissions=[Permission.READ, Permission.CREATE]
    )


@pytest.fixture(scope="session")
def htm(pg_config: PostgresConfig) -> "HitTypeManager":
    """HitTypeManager with READ and CREATE permissions."""
    _assert_unittest_db(pg_config)

    from jb.managers.hit import HitTypeManager

    return HitTypeManager(
        pg_config=pg_config, permissions=[Permission.READ, Permission.CREATE]
    )


@pytest.fixture(scope="session")
def hm(pg_config: PostgresConfig) -> "HitManager":
    """HitManager with READ and CREATE permissions."""
    _assert_unittest_db(pg_config)

    from jb.managers.hit import HitManager

    return HitManager(
        pg_config=pg_config, permissions=[Permission.READ, Permission.CREATE]
    )


@pytest.fixture(scope="session")
def am(pg_config: PostgresConfig) -> "AssignmentManager":
    """AssignmentManager with READ and CREATE permissions."""
    _assert_unittest_db(pg_config)

    from jb.managers.assignment import AssignmentManager

    return AssignmentManager(
        pg_config=pg_config, permissions=[Permission.READ, Permission.CREATE]
    )


@pytest.fixture(scope="session")
def bm(pg_config: PostgresConfig) -> "BonusManager":
    """BonusManager with READ and CREATE permissions."""
    _assert_unittest_db(pg_config)

    from jb.managers.bonus import BonusManager

    return BonusManager(
        pg_config=pg_config, permissions=[Permission.READ, Permission.CREATE]
    )


# --- Question ---


@pytest.fixture
def question() -> HitQuestion:
    """An in-memory HitQuestion (not persisted)."""
    return HitQuestion(url="https://jamesbillings67.com/work/", height=1200)


@pytest.fixture
def question_record(hqm: "HitQuestionManager", question: HitQuestion) -> HitQuestion:
    """The ``question`` fixture after ``hqm.get_or_create``."""
    return hqm.get_or_create(question)


# --- HITType ---


@pytest.fixture
def hit_type() -> HitType:
    """An in-memory HitType (not persisted, no AMT id assigned)."""
    return HitType(
        title="Awesome Surveys!",
        description="Give us your opinion",
        reward=USDCent(5),
        keywords="market,research,amazing",
        min_active=10,
    )


@pytest.fixture
def hit_type_record(htm: "HitTypeManager", hit_type: HitType) -> HitType:
    """The ``hit_type`` fixture with a random AMT id, after ``htm.get_or_create``."""
    hit_type.amt_hit_type_id = generate_amt_id()
    return htm.get_or_create(hit_type)


@pytest.fixture
def hit_type_with_amt_id(htm: "HitTypeManager", hit_type: HitType) -> HitType:
    """The ``hit_type`` fixture bound to a fixed, pre-registered AMT id."""
    # This is a real hit type previously registered with amt (sandbox).
    # It will always exist.
    hit_type.amt_hit_type_id = "3217B3DC4P5YW9DRV9R3X8O56V041J"

    # Get or create it in our db. The return value is not used: per the
    # original author's note, this call adds the pk int id to `hit_type`.
    htm.get_or_create(hit_type)

    return hit_type


# --- HIT ---


@pytest.fixture
def amt_hit_id() -> str:
    """A freshly generated AMT-style id to use as a HIT id."""
    return generate_amt_id()


@pytest.fixture
def hit(
    amt_hit_id: str, amt_hit_type_id: str, amt_group_id: str, question: HitQuestion
) -> Hit:
    """An in-memory Hit (not persisted) that expires three days from now."""
    now = datetime.now(tz=timezone.utc)

    return Hit.model_validate(
        dict(
            amt_hit_id=amt_hit_id,
            amt_hit_type_id=amt_hit_type_id,
            amt_group_id=amt_group_id,
            status=HitStatus.Assignable,
            review_status=HitReviewStatus.NotReviewed,
            creation_time=now,
            expiration=now + timedelta(days=3),
            hit_question_xml=question.xml,
            qualification_requirements=[],
            max_assignments=1,
            assignment_pending_count=0,
            assignment_available_count=1,
            assignment_completed_count=0,
            description="Description",
            keywords="Keywords",
            reward=USDCent(5),
            title="Title",
            question_id=question.id,
            hit_type_id=None,
        )
    )


@pytest.fixture
def hit_record(
    hm: "HitManager",
    question_record: HitQuestion,
    hit_type_record: HitType,
    hit: Hit,
    amt_hit_id: str,
) -> Hit:
    """
    Returns a hit that exists in our db, but not in amazon (the amt ids are
    random). The mtwerk_hittype and mtwerk_question records will also exist
    (in the db).
    """
    hit.hit_type_id = hit_type_record.id
    hit.amt_hit_id = amt_hit_id
    hit.question_id = question_record.id

    hm.create(hit)
    return hit


@pytest.fixture
def hit_in_amt(
    hm: "HitManager", question_record: HitQuestion, hit_type_with_amt_id: HitType
) -> Hit:
    """A HIT created through AMTManager (sandbox) and then stored in our db."""
    # Actually create a new HIT in amt (sandbox)
    hit = AMTManager.create_hit_with_hit_type(
        hit_type=hit_type_with_amt_id, question=question_record
    )
    # Create it in the DB
    hm.create(hit)
    return hit


# --- Assignment ---


@pytest.fixture
def assignment_stub(
    hit: Hit, amt_assignment_id: str, amt_worker_id: str
) -> AssignmentStub:
    """An in-memory AssignmentStub in ``Submitted`` status for the ``hit`` fixture."""
    now = datetime.now(tz=timezone.utc)
    return AssignmentStub(
        amt_assignment_id=amt_assignment_id,
        amt_hit_id=hit.amt_hit_id,
        amt_worker_id=amt_worker_id,
        status=AssignmentStatus.Submitted,
        modified_at=now,
        created_at=now,
    )


@pytest.fixture
def assignment_factory(hit: Hit) -> Callable[..., Assignment]:
    """Return a callable that builds in-memory Assignments for the ``hit`` fixture.

    Each call generates a new assignment id; the worker id is generated too
    unless one is passed in.
    """

    def inner(amt_worker_id: Optional[str] = None) -> Assignment:
        now = datetime.now(tz=timezone.utc)
        amt_assignment_id = generate_amt_id()
        amt_worker_id = amt_worker_id or generate_amt_id()

        return Assignment(
            amt_assignment_id=amt_assignment_id,
            amt_hit_id=hit.amt_hit_id,
            amt_worker_id=amt_worker_id,
            status=AssignmentStatus.Submitted,
            modified_at=now,
            created_at=now,
            accept_time=now,
            auto_approval_time=now,
            submit_time=now,
        )

    return inner


@pytest.fixture
def assignment_in_db_factory(
    am: "AssignmentManager", assignment_factory: Callable[..., Assignment]
) -> Callable[..., Assignment]:
    """Return a callable that builds an Assignment and writes it through ``am``.

    The built assignment gets ``hit_id`` set, then is passed to
    ``am.create_stub`` followed by ``am.update_answer``.
    """

    def inner(hit_id: int, amt_worker_id: Optional[str] = None) -> Assignment:
        a = assignment_factory(amt_worker_id=amt_worker_id)
        a.hit_id = hit_id
        am.create_stub(a)
        am.update_answer(a)
        return a

    return inner


@pytest.fixture
def assignment_stub_in_db(
    am: "AssignmentManager", hit_record: Hit, assignment_stub: AssignmentStub
) -> AssignmentStub:
    """
    Returns an AssignmentStub that exists in our db, but not in amazon (the
    amt ids are random). The mtwerk_hit, mtwerk_hittype, and mtwerk_question
    records will also exist (in the db).
    """
    assignment_stub.hit_id = hit_record.id
    am.create_stub(assignment_stub)
    return assignment_stub


# --- AMT API responses ---


@pytest.fixture
def amt_response_metadata():
    """A ``ResponseMetadata`` dict with a random request id and HTTP 200."""
    req_id = str(uuid4())
    return {
        "RequestId": req_id,
        "HTTPStatusCode": 200,
        "HTTPHeaders": {
            "x-amzn-requestid": req_id,
            "content-type": "application/x-amz-json-1.1",
            "content-length": "46",
            "date": "Wed, 15 Oct 2025 02:16:16 GMT",
        },
        "RetryAttempts": 0,
    }


@pytest.fixture
def create_hit_type_response(
    amt_hit_type_id: str, amt_response_metadata
) -> CreateHITTypeResponseTypeDef:
    """A canned CreateHITType response carrying ``amt_hit_type_id``."""
    return {
        "HITTypeId": amt_hit_type_id,
        "ResponseMetadata": amt_response_metadata,
    }


@pytest.fixture
def create_hit_with_hit_type_response(
    amt_hit_type_id: str, amt_hit_id: str, amt_response_metadata
) -> CreateHITWithHITTypeResponseTypeDef:
    """A canned CreateHITWithHITType response for an ``Assignable`` HIT."""
    # Generated locally rather than taken from the `amt_group_id` fixture.
    amt_group_id = generate_amt_id(length=30)
    return {
        "HIT": {
            "HITId": amt_hit_id,
            "HITTypeId": amt_hit_type_id,
            "HITGroupId": amt_group_id,
            "CreationTime": datetime(2025, 10, 14, 20, 22, tzinfo=tzlocal()),
            "Title": "Test",
            "Description": "test",
            # NOTE(review): this literal holds only whitespace and text, no XML
            # markup - confirm it matches what the code under test expects.
            "Question": '\n\n https://jamesbillings67.com/work/\n 1200\n ',
            "HITStatus": "Assignable",
            "MaxAssignments": 1,
            "Reward": "0.05",
            "AutoApprovalDelayInSeconds": 2_592_000,
            "Expiration": datetime(2025, 10, 14, 20, 24, 3, tzinfo=tzlocal()),
            "AssignmentDurationInSeconds": 123,
            "QualificationRequirements": [],
            "HITReviewStatus": "NotReviewed",
            "NumberOfAssignmentsPending": 0,
            "NumberOfAssignmentsAvailable": 1,
            "NumberOfAssignmentsCompleted": 0,
        },
        "ResponseMetadata": amt_response_metadata,
    }


@pytest.fixture
def get_hit_response(
    amt_hit_type_id: str, amt_hit_id: str, amt_response_metadata
) -> GetHITResponseTypeDef:
    """A canned GetHIT response for an ``Assignable`` HIT with one open slot."""
    # Generated locally rather than taken from the `amt_group_id` fixture.
    amt_group_id = generate_amt_id(length=30)
    return {
        "HIT": {
            "HITId": amt_hit_id,
            "HITTypeId": amt_hit_type_id,
            "HITGroupId": amt_group_id,
            "CreationTime": datetime(2025, 10, 13, 23, 0, 3, tzinfo=tzlocal()),
            "Title": "Awesome Surveys!",
            "Description": "Give us your opinion",
            # NOTE(review): this literal holds only whitespace and text, no XML
            # markup - confirm it matches what the code under test expects.
            "Question": '\n\n https://jamesbillings67.com/work/\n 1200\n ',
            "Keywords": "market,research,amazing",
            "HITStatus": "Assignable",
            "MaxAssignments": 1,
            "Reward": "0.05",
            "AutoApprovalDelayInSeconds": 604_800,
            "Expiration": datetime(2025, 10, 27, 23, 0, 3, tzinfo=tzlocal()),
            "AssignmentDurationInSeconds": 5_400,
            "QualificationRequirements": [],
            "HITReviewStatus": "NotReviewed",
            "NumberOfAssignmentsPending": 0,
            "NumberOfAssignmentsAvailable": 1,
            "NumberOfAssignmentsCompleted": 0,
        },
        "ResponseMetadata": amt_response_metadata,
    }


@pytest.fixture
def get_hit_response_reviewing(
    get_hit_response: GetHITResponseTypeDef,
) -> GetHITResponseTypeDef:
    """A copy of ``get_hit_response`` in ``Reviewing`` status (0 available, 1 completed)."""
    res = copy.deepcopy(get_hit_response)
    res["HIT"]["NumberOfAssignmentsAvailable"] = 0
    res["HIT"]["NumberOfAssignmentsCompleted"] = 1
    res["HIT"]["HITStatus"] = "Reviewing"
    return res


@pytest.fixture
def get_assignment_response(
    amt_hit_type_id: str,
    amt_hit_id: str,
    amt_assignment_id: str,
    amt_worker_id: str,
    get_hit_response: GetHITResponseTypeDef,
    amt_response_metadata,
    tsid: str,
) -> GetAssignmentResponseTypeDef:
    """A canned GetAssignment response for a just-submitted assignment.

    The answer text embeds the worker id, the assignment id and the tsid. All
    timestamps are relative to the local "now" at fixture creation.
    """
    hit_response = get_hit_response["HIT"]
    local_now = datetime.now(tz=tzlocal())
    return {
        "Assignment": {
            "AssignmentId": amt_assignment_id,
            "WorkerId": amt_worker_id,
            "HITId": amt_hit_id,
            "AssignmentStatus": "Submitted",
            "AutoApprovalTime": local_now + timedelta(days=7),
            "AcceptTime": local_now - timedelta(minutes=10),
            "SubmitTime": local_now,
            "Deadline": local_now + timedelta(minutes=90),
            # NOTE(review): these literals hold only whitespace and text, no
            # XML markup - confirm they match what the code under test parses.
            "Answer": '\n'
            '\n '
            "\n amt_worker_id\n "
            f" {amt_worker_id}\n \n \n "
            " amt_assignment_id\n "
            f" {amt_assignment_id}\n \n \n "
            f" tsid\n {tsid}\n "
            " \n",
            "RequesterFeedback": "Good work",
        },
        "HIT": hit_response,
        "ResponseMetadata": amt_response_metadata,
    }


@pytest.fixture
def get_assignment_response_no_tsid(
    get_assignment_response: GetAssignmentResponseTypeDef,
    amt_worker_id: str,
    amt_assignment_id: str,
) -> GetAssignmentResponseTypeDef:
    """A copy of ``get_assignment_response`` whose answer has no tsid entry."""
    res = copy.deepcopy(get_assignment_response)
    res["Assignment"]["Answer"] = (
        '\n'
        '\n '
        "\n amt_worker_id\n "
        f" {amt_worker_id}\n \n \n "
        " amt_assignment_id\n "
        f" {amt_assignment_id}\n \n "
        # The tsid entry is deliberately left out here; that omission is the
        # whole point of this fixture.
        f""
    )
    return res


@pytest.fixture
def get_assignment_response_approved(
    get_assignment_response: GetAssignmentResponseTypeDef,
) -> Callable[..., GetAssignmentResponseTypeDef]:
    """Return a factory producing an ``Approved`` copy of the assignment response.

    The factory takes the requester feedback (default ``APPROVAL_MESSAGE``)
    and sets the approval time to the submit time.
    """

    def inner(feedback: str = APPROVAL_MESSAGE) -> GetAssignmentResponseTypeDef:
        res = copy.deepcopy(get_assignment_response)
        res["Assignment"]["AssignmentStatus"] = "Approved"
        res["Assignment"]["RequesterFeedback"] = feedback
        res["Assignment"]["ApprovalTime"] = res["Assignment"]["SubmitTime"]
        return res

    return inner


@pytest.fixture
def get_assignment_response_rejected(
    get_assignment_response: GetAssignmentResponseTypeDef,
) -> Callable[..., GetAssignmentResponseTypeDef]:
    """Return a factory producing a ``Rejected`` copy of the assignment response.

    The factory takes the reject reason (used as requester feedback) and sets
    the rejection time to the submit time.
    """

    def inner(reject_reason: str = "reject reason") -> GetAssignmentResponseTypeDef:
        res = copy.deepcopy(get_assignment_response)
        res["Assignment"]["AssignmentStatus"] = "Rejected"
        res["Assignment"]["RequesterFeedback"] = reject_reason
        res["Assignment"]["RejectionTime"] = res["Assignment"]["SubmitTime"]
        return res

    return inner


@pytest.fixture
def get_assignment_response_rejected_no_tsid(
    get_assignment_response_no_tsid: GetAssignmentResponseTypeDef,
) -> Callable[..., GetAssignmentResponseTypeDef]:
    """Return a factory producing a ``Rejected`` copy of the no-tsid response.

    The factory takes the reject reason (used as requester feedback) and sets
    the rejection time to the submit time.
    """

    def inner(reject_reason: str = "reject reason") -> GetAssignmentResponseTypeDef:
        res = copy.deepcopy(get_assignment_response_no_tsid)
        res["Assignment"]["AssignmentStatus"] = "Rejected"
        res["Assignment"]["RequesterFeedback"] = reject_reason
        res["Assignment"]["RejectionTime"] = res["Assignment"]["SubmitTime"]
        return res

    return inner


@pytest.fixture
def get_assignment_response_approved_no_tsid(
    get_assignment_response_no_tsid: GetAssignmentResponseTypeDef,
) -> GetAssignmentResponseTypeDef:
    """An ``Approved`` copy of the no-tsid response with ``NO_WORK_APPROVAL_MESSAGE``.

    Unlike the other approved/rejected fixtures, this one returns the response
    itself rather than a factory.
    """
    res = copy.deepcopy(get_assignment_response_no_tsid)
    res["Assignment"]["AssignmentStatus"] = "Approved"
    res["Assignment"]["RequesterFeedback"] = NO_WORK_APPROVAL_MESSAGE
    res["Assignment"]["ApprovalTime"] = res["Assignment"]["SubmitTime"]
    return res