From f0f96f83c2630e890a2cbcab53f77fd4c37e1684 Mon Sep 17 00:00:00 2001 From: Max Nanis Date: Thu, 19 Feb 2026 02:43:23 -0500 Subject: Models, Project files, some pytests, requirements.. etc --- tests/flow/__init__.py | 0 tests/flow/test_tasks.py | 534 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 534 insertions(+) create mode 100644 tests/flow/__init__.py create mode 100644 tests/flow/test_tasks.py (limited to 'tests/flow') diff --git a/tests/flow/__init__.py b/tests/flow/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/flow/test_tasks.py b/tests/flow/test_tasks.py new file mode 100644 index 0000000..37391d1 --- /dev/null +++ b/tests/flow/test_tasks.py @@ -0,0 +1,534 @@ +import logging +from contextlib import contextmanager +from datetime import timezone, datetime +from typing import Dict +from uuid import uuid4 + +import pytest +import requests +from botocore.stub import Stubber +from generalresearchutils.models.thl.payout import UserPayoutEvent +from generalresearchutils.models.thl.wallet import PayoutType +from generalresearchutils.models.thl.wallet.cashout_method import ( + CashoutRequestResponse, + CashoutRequestInfo, +) + +from jb.config import settings +from jb.decorators import AMT_CLIENT, AM, BM +from jb.flow.assignment_tasks import process_assignment_submitted +from jb.managers.amt import ( + AMTManager, + APPROVAL_MESSAGE, + REJECT_MESSAGE_BADDIE, + REJECT_MESSAGE_UNKNOWN_ASSIGNMENT, + REJECT_MESSAGE_NO_WORK, + BONUS_MESSAGE, + NO_WORK_APPROVAL_MESSAGE, +) +from jb.models.currency import USDCent +from jb.models.definitions import AssignmentStatus, PayoutStatus +from jb.models.event import MTurkEvent + + +@contextmanager +def amt_stub_context(responses): + # ty chatgpt for this + with Stubber(AMT_CLIENT) as stub: + for r in responses: + stub.add_response( + r["operation"], + r.get("response", {}), + r.get("expected_params", {}), + ) + yield stub + + +@pytest.fixture +def approved_assignment_stubs( + get_assignment_response, + get_assignment_response_approved, + amt_assignment_id, + amt_hit_id, + get_hit_response_reviewing, +): + # These are the AMT_CLIENT stubs/mocks that need to be set when running + # process_assignment_submitted() which will result in an approved + # assignment and sent bonus + def _approved_assignment_stubs( + feedback: str = APPROVAL_MESSAGE, + override_response=None, + override_approve_response=None, + ): + response = override_response or get_assignment_response + approve_response = ( + override_approve_response or get_assignment_response_approved(feedback) + ) + return [ + { + "operation": "get_assignment", + "response": response, + "expected_params": {"AssignmentId": amt_assignment_id}, + }, + { + "operation": "approve_assignment", + "response": {}, + "expected_params": { + "AssignmentId": amt_assignment_id, + "RequesterFeedback": feedback, + "OverrideRejection": False, + }, + }, + { + "operation": "get_assignment", + "response": approve_response, + "expected_params": {"AssignmentId": amt_assignment_id}, + }, + { + "operation": "update_hit_review_status", + "response": {}, + "expected_params": {"HITId": amt_hit_id, "Revert": False}, + }, + { + "operation": "get_hit", + "response": get_hit_response_reviewing, + "expected_params": {"HITId": amt_hit_id}, + }, + ] + + return _approved_assignment_stubs + + +@pytest.fixture +def approved_assignment_stubs_w_bonus( + approved_assignment_stubs, amt_worker_id, amt_assignment_id, pe_id +): + now = datetime.now(tz=timezone.utc) + stubs = approved_assignment_stubs().copy() + stubs.append( + { + "operation": "send_bonus", + "response": {}, + "expected_params": { + "WorkerId": amt_worker_id, + "BonusAmount": "0.07", + "AssignmentId": amt_assignment_id, + "Reason": BONUS_MESSAGE, + "UniqueRequestToken": pe_id, + }, + } + ) + stubs.append( + { + "operation": "list_bonus_payments", + "response": { + "BonusPayments": [ + { + "WorkerId": amt_worker_id, + "BonusAmount": "0.07", + "AssignmentId": amt_assignment_id, + "Reason": BONUS_MESSAGE, + "GrantTime": now, + } + ] + }, + "expected_params": {"AssignmentId": amt_assignment_id}, + } + ) + return stubs + + +@pytest.fixture +def rejected_assignment_stubs( + get_assignment_response, + get_assignment_response_rejected, + amt_assignment_id, + amt_hit_id, + get_hit_response_reviewing, +): + # These are the AMT_CLIENT stubs/mocks that need to be set when running + # process_assignment_submitted() which will result in a rejected + # assignment + def _rejected_assignment_stubs( + reject_reason: str, override_response=None, override_reject_response=None + ): + response = override_response or get_assignment_response + reject_response = override_reject_response or get_assignment_response_rejected( + reject_reason + ) + return [ + { + "operation": "get_assignment", + "response": response, + "expected_params": {"AssignmentId": amt_assignment_id}, + }, + { + "operation": "reject_assignment", + "response": {}, + "expected_params": { + "AssignmentId": amt_assignment_id, + "RequesterFeedback": reject_reason, + }, + }, + { + "operation": "get_assignment", + "response": reject_response, + "expected_params": {"AssignmentId": amt_assignment_id}, + }, + { + "operation": "update_hit_review_status", + "response": {}, + "expected_params": {"HITId": amt_hit_id, "Revert": False}, + }, + { + "operation": "get_hit", + "response": get_hit_response_reviewing, + "expected_params": {"HITId": amt_hit_id}, + }, + ] + + return _rejected_assignment_stubs + + +@pytest.fixture +def mock_thl_responses(monkeypatch, amt_worker_id, tsid, pe_id): + original_get = requests.get + original_post = requests.post + + class MockThlCashoutRequestResponse: + def json(self): + return CashoutRequestResponse( + status="success", + cashout=CashoutRequestInfo( + id=pe_id, + description="amt something", + status=PayoutStatus.PENDING, + ), + ).model_dump(mode="json") + + def _apply_mock( + user_blocked: bool = False, + status_finished: bool = True, + status_complete: bool = False, + wallet_redeemable_amount: int = 10, + ): + + def mock_get(url, *args, **kwargs): + profile_url = f"{settings.fsb_host}{settings.product_id}/user/{amt_worker_id}/profile/" + status_url = f"{settings.fsb_host}{settings.product_id}/status/{tsid}/" + cashout_request_url = f"{settings.fsb_host}{settings.product_id}/cashout/" + wallet_url = f"{settings.fsb_host}{settings.product_id}/wallet/" + if url == profile_url: + + class MockThlProfileResponse: + def json(self): + return {"user-profile": {"user": {"blocked": user_blocked}}} + + return MockThlProfileResponse() + + elif url == wallet_url: + + class MockThlWalletResponse: + def json(self): + return { + "wallet": {"redeemable_amount": wallet_redeemable_amount} + } + + return MockThlWalletResponse() + + elif url == status_url: + + class MockThlStatusResponse: + def json(self): + return { + "tsid": tsid, + "product_id": str(settings.product_id), + "product_user_id": amt_worker_id, + "started": "2020-06-02T00:30:35.036398Z", + "finished": ( + "2020-06-02T00:31:35.036398Z" + if status_finished + else None + ), + "status": 3 if status_complete else 2, + "payout": 10 if status_complete else 0, + "user_payout": 10 if status_complete else 0, + "status_code_1": "BUYER_FAIL", + "status_code_2": None, + } + + return MockThlStatusResponse() + elif url == cashout_request_url: + return MockThlCashoutRequestResponse() + else: + raise ValueError(f"unhandled call: {url=} {args=} {kwargs=}") + return original_get(url, *args, **kwargs) + + def mock_post(url, *args, **kwargs): + cashout_request_url = f"{settings.fsb_host}{settings.product_id}/cashout/" + manage_cashout_request_url = f"{settings.fsb_host}{settings.fsb_host_private_route}/thl/manage_cashout/" + + if url == cashout_request_url: + return MockThlCashoutRequestResponse() + + elif url == manage_cashout_request_url: + json = kwargs["json"] + print(json) + payout_id = json["payout_id"] + new_status = json["new_status"] + + class MockThlManageCashoutResponse: + def json(self): + return UserPayoutEvent( + uuid=payout_id, + status=new_status, + amount=USDCent(5), + debit_account_uuid=uuid4().hex, + cashout_method_uuid=uuid4().hex, + payout_type=PayoutType.AMT, + ).model_dump(mode="json") + + return MockThlManageCashoutResponse() + return original_post(url, *args, **kwargs) + + monkeypatch.setattr(requests, "get", mock_get) + monkeypatch.setattr(requests, "post", mock_post) + + return _apply_mock + + +@pytest.fixture +def mturk_event(amt_assignment_id, amt_hit_id, amt_hit_type_id): + now = datetime.now(tz=timezone.utc) + return MTurkEvent( + event_type="AssignmentSubmitted", + event_timestamp=now, + amt_assignment_id=amt_assignment_id, + amt_hit_type_id=amt_hit_type_id, + amt_hit_id=amt_hit_id, + ) + + +def test_fake_get_assignment( + amt_assignment_id, amt_worker_id, get_assignment_response: Dict +): + # Testing just that this boto stubber works (we fake a response using the real boto client) + fake_response = get_assignment_response.copy() + with Stubber(AMT_CLIENT) as stub: + expected_params = {"AssignmentId": amt_assignment_id} + stub.add_response("get_assignment", fake_response, expected_params) + + assignment = AMTManager.get_assignment_if_exists( + amt_assignment_id=amt_assignment_id + ) + assert assignment.amt_assignment_id == amt_assignment_id + assert assignment.amt_worker_id == amt_worker_id + + # Optionally, ensure all queued responses were used: + stub.assert_no_pending_responses() + + +class TestProcessAssignmentSubmitted: + + def test_no_assignment_in_db( + self, + mturk_event, + amt_assignment_id, + get_assignment_response: Dict, + caplog, + hit_in_db, + rejected_assignment_stubs, + ): + # An assignment is submitted. The hit exists in the DB. The amt assignment id is valid, + # but the assignment stub is not in our db. Reject it and write the assignment to the db. + + amt_stubs = rejected_assignment_stubs( + reject_reason=REJECT_MESSAGE_UNKNOWN_ASSIGNMENT + ) + + with amt_stub_context(amt_stubs) as stub, caplog.at_level(logging.WARNING): + process_assignment_submitted(mturk_event) + stub.assert_no_pending_responses() + + assert f"No assignment found in DB: {amt_assignment_id}" in caplog.text + assert f"Rejected assignment doesn't exist in DB. Creating ... " in caplog.text + assert f"Rejected assignment: " in caplog.text + stub.assert_no_pending_responses() + + ass = AM.get(amt_assignment_id=amt_assignment_id) + assert ass.status == AssignmentStatus.Rejected + assert ass.requester_feedback == REJECT_MESSAGE_UNKNOWN_ASSIGNMENT + + def test_assignment_in_db_user_doesnt_exist( + self, + mturk_event, + amt_assignment_id, + assignment_stub_in_db, + caplog, + mock_thl_responses, + rejected_assignment_stubs, + ): + # An assignment is submitted. The hit and assignment stub exist in the DB. We think we're going to + # approve the assignment, but the user-profile / check blocked call on THL shows the user doesn't + # exist (same thing would happen if the user does exist and is blocked). So we reject. + + _ = assignment_stub_in_db # we need this to make the assignment stub in the db + + amt_stubs = rejected_assignment_stubs(reject_reason=REJECT_MESSAGE_BADDIE) + + mock_thl_responses(user_blocked=True) + + with amt_stub_context(amt_stubs) as stub, caplog.at_level(logging.WARNING): + process_assignment_submitted(mturk_event) + stub.assert_no_pending_responses() + + assert f"No assignment found in DB: {amt_assignment_id}" not in caplog.text + assert f"blocked or not exists" in caplog.text + assert f"Rejected assignment: " in caplog.text + + ass = AM.get(amt_assignment_id=amt_assignment_id) + assert ass.status == AssignmentStatus.Rejected + assert ass.requester_feedback == REJECT_MESSAGE_BADDIE + + def test_no_work_w_warning( + self, + mturk_event, + amt_assignment_id, + assignment_stub_in_db, + caplog, + mock_thl_responses, + approved_assignment_stubs, + get_assignment_response_approved_no_tsid, + get_assignment_response_no_tsid, + ): + # An assignment is submitted. The hit and assignment stub exist in the DB. + # The assignment has no tsid. + # We APPROVE this assignment b/c we are very nice and give users a couple + # chances, with an explanation, before rejecting. + + _ = assignment_stub_in_db # we need this to make the assignment stub in the db + + # Simulate that the AMT.get_assignment call returns the assignment, but the answers xml + # has no tsid. + amt_stubs = approved_assignment_stubs( + feedback=NO_WORK_APPROVAL_MESSAGE, + override_response=get_assignment_response_no_tsid, + override_approve_response=get_assignment_response_approved_no_tsid, + ) + + mock_thl_responses(user_blocked=False) + + with amt_stub_context(amt_stubs) as stub, caplog.at_level(logging.WARNING): + process_assignment_submitted(mturk_event) + stub.assert_no_pending_responses() + + assert f"No assignment found in DB: {amt_assignment_id}" not in caplog.text + assert f"Assignment submitted with no tsid" in caplog.text + assert f"Approved assignment: " in caplog.text + + ass = AM.get(amt_assignment_id=amt_assignment_id) + assert ass.status == AssignmentStatus.Approved + assert ass.requester_feedback == NO_WORK_APPROVAL_MESSAGE + assert AM.missing_tsid_count(amt_worker_id=ass.amt_worker_id) == 1 + + def test_no_work_no_warning( + self, + mturk_event, + amt_assignment_id, + assignment_stub_in_db, + caplog, + mock_thl_responses, + rejected_assignment_stubs, + get_assignment_response_rejected_no_tsid, + get_assignment_response_no_tsid, + assignment_in_db_factory, + hit_in_db, + amt_worker_id, + ): + # An assignment is submitted. The hit and assignment stub exist in the DB. + # The assignment has no tsid. + + # Going to create and submit 3 assignments w no work + # (all on the same hit, which we don't do in JB for real, + # but doesn't matter here) + a1 = assignment_in_db_factory(hit_id=hit_in_db.id, amt_worker_id=amt_worker_id) + a2 = assignment_in_db_factory(hit_id=hit_in_db.id, amt_worker_id=amt_worker_id) + a3 = assignment_in_db_factory(hit_id=hit_in_db.id, amt_worker_id=amt_worker_id) + assert AM.missing_tsid_count(amt_worker_id=amt_worker_id) == 3 + # So now, we'll reject, b/c they've already gotten 3 warnings + + _ = assignment_stub_in_db # we need this to make the assignment stub in the db + + # Simulate that the AMT.get_assignment call returns the assignment, but the answers xml + # has no tsid. + amt_stubs = rejected_assignment_stubs( + reject_reason=REJECT_MESSAGE_NO_WORK, + override_response=get_assignment_response_no_tsid, + override_reject_response=get_assignment_response_rejected_no_tsid( + REJECT_MESSAGE_NO_WORK + ), + ) + + mock_thl_responses(user_blocked=False) + + with amt_stub_context(amt_stubs) as stub, caplog.at_level(logging.WARNING): + process_assignment_submitted(mturk_event) + stub.assert_no_pending_responses() + + assert f"No assignment found in DB: {amt_assignment_id}" not in caplog.text + assert f"Assignment submitted with no tsid" in caplog.text + assert f"Rejected assignment: " in caplog.text + + # It will exist in the db since we can validate the model. + ass = AM.get(amt_assignment_id=amt_assignment_id) + assert ass.status == AssignmentStatus.Rejected + assert ass.requester_feedback == REJECT_MESSAGE_NO_WORK + + def test_assignment_submitted_no_bonus( + self, + mturk_event, + amt_assignment_id, + assignment_stub_in_db, + caplog, + mock_thl_responses, + approved_assignment_stubs, + ): + _ = assignment_stub_in_db # we need this to make the assignment stub in the db + # The "send bonus" stuff will still run, even if the user didn't get a complete, + # because all we do is check the user's wallet balance (if an assignment is approved) + # and they may have money in their wallet from a prev event or bribe + # So mock the wallet balance as 1cent, so no bonus will be triggered + mock_thl_responses(status_complete=False, wallet_redeemable_amount=1) + with amt_stub_context(approved_assignment_stubs()) as stub, caplog.at_level( + logging.WARNING + ): + process_assignment_submitted(mturk_event) + stub.assert_no_pending_responses() + + ass = AM.get(amt_assignment_id=amt_assignment_id) + assert ass.status == AssignmentStatus.Approved + assert ass.requester_feedback == APPROVAL_MESSAGE + + def test_assignment_submitted_w_bonus( + self, + mturk_event, + amt_assignment_id, + assignment_stub_in_db, + caplog, + mock_thl_responses, + approved_assignment_stubs_w_bonus, + ): + _ = assignment_stub_in_db # we need this to make the assignment stub in the db + mock_thl_responses(status_complete=True, wallet_redeemable_amount=10) + with amt_stub_context( + approved_assignment_stubs_w_bonus + ) as stub, caplog.at_level(logging.WARNING): + process_assignment_submitted(mturk_event) + stub.assert_no_pending_responses() + + ass = AM.get(amt_assignment_id=amt_assignment_id) + assert ass.status == AssignmentStatus.Approved + assert ass.requester_feedback == APPROVAL_MESSAGE + + bonus = BM.filter(amt_assignment_id=amt_assignment_id)[0] + assert bonus.amount == USDCent(7) -- cgit v1.2.3