diff options
| -rw-r--r-- | jb/flow/assignment_tasks.py | 444 | ||||
| -rw-r--r-- | jb/managers/thl.py | 87 | ||||
| -rw-r--r-- | jb/settings.py | 3 |
3 files changed, 534 insertions, 0 deletions
diff --git a/jb/flow/assignment_tasks.py b/jb/flow/assignment_tasks.py new file mode 100644 index 0000000..4022716 --- /dev/null +++ b/jb/flow/assignment_tasks.py @@ -0,0 +1,444 @@ +import logging +import math +from datetime import timedelta +from typing import Optional + +from generalresearchutils.models.thl.definitions import PayoutStatus, StatusCode1 +from generalresearchutils.models.thl.wallet.cashout_method import CashoutRequestInfo + +from jb.decorators import AM, HM, BM +from jb.flow.monitoring import emit_error_event, emit_assignment_event, emit_bonus_event +from jb.managers.amt import ( + AMTManager, + REJECT_MESSAGE_UNKNOWN_ASSIGNMENT, + REJECT_MESSAGE_NO_WORK, + NO_WORK_APPROVAL_MESSAGE, + REJECT_MESSAGE_BADDIE, + APPROVAL_MESSAGE, + BONUS_MESSAGE, +) +from jb.managers.thl import ( + get_user_blocked, + get_task_status, + user_cashout_request, + AMT_ASSIGNMENT_CASHOUT_METHOD, + manage_pending_cashout, + get_user_blocked_or_not_exists, + get_wallet_balance, + AMT_BONUS_CASHOUT_METHOD, +) +from jb.models.assignment import Assignment +from jb.models.currency import USDCent +from jb.models.definitions import AssignmentStatus +from jb.models.event import MTurkEvent + + +def process_assignment_submitted(event: MTurkEvent) -> None: + """ + Called either directly or from the SNS Notification that a HIT was submitted + + :return: None + """ + # + # Step 1: Attempt to get the Assignment out of the API + # + logging.info(f"{event=}") + + # This is the assignment model from AMT. In the DB, we should only have + # the AssignmentStub + # This call is hitting AMT, not our db. + # The API won't even return is unless it has been submitted. So if this + # fails, we don't need to do anything (i.e. reject it). + assignment = AMTManager.get_assignment_if_exists( + amt_assignment_id=event.amt_assignment_id + ) + if assignment is None: + # It is not found in amt, either it is invalid, not yet submitted, or + # already been approved/rejected, so we just do nothing ... + # todo: maybe we confirm its state matches what we have in the db + logging.warning(f"No assignment found on AMT: {event.amt_assignment_id}") + emit_error_event( + event_type="assignment_not_found_in_amt", + amt_hit_type_id=event.amt_hit_type_id, + ) + return None + + # Even if the assignment doesn't exist, the hit must ... + hit = HM.get_from_amt_id(amt_hit_id=assignment.amt_hit_id) + + # + # Step 2: Attempt to get the Assignment out of the DB + # + # Now, we need to confirm it is something that we have in the db. If not, + # that means either something broke, or some funny business is happening + # (maybe a baddie is submitting an assignment without doing any work). + stub = AM.get_stub_if_exists(amt_assignment_id=assignment.amt_assignment_id) + if stub is None: + # When they visited the "work" page, it should have created an + # AssignmentStub in the db. If that doesn't exist, something bad + # happened. + logging.warning(f"No assignment found in DB: {event.amt_assignment_id}") + emit_error_event( + event_type="assignment_stub_not_found_in_db", + amt_hit_type_id=event.amt_hit_type_id, + ) + reject_assignment( + amt_assignment_id=assignment.amt_assignment_id, + msg=REJECT_MESSAGE_UNKNOWN_ASSIGNMENT, + amt_hit_type_id=hit.amt_hit_type_id, + ) + review_hit(assignment) + return None + + assert assignment.amt_assignment_id == event.amt_assignment_id + assert assignment.amt_hit_id == event.amt_hit_id + assert assignment.amt_hit_id == stub.amt_hit_id + assert assignment.amt_worker_id == stub.amt_worker_id + amt_assignment_id = assignment.amt_assignment_id + amt_worker_id = assignment.amt_worker_id + + # We don't have a TSID associated with the assignment until we the + # assignment is submitted. + AM.update_answer(assignment=assignment) + + # check if the user is blocked by thl + if get_user_blocked_or_not_exists(amt_worker_id=amt_worker_id): + logging.warning( + f"User {amt_worker_id} blocked or not exists. Rejecting: {amt_assignment_id}" + ) + emit_error_event( + event_type="assignment_submitted_user_blocked_or_not_exists", + amt_hit_type_id=event.amt_hit_type_id, + ) + reject_assignment( + amt_assignment_id=amt_assignment_id, + msg=REJECT_MESSAGE_BADDIE, + amt_hit_type_id=hit.amt_hit_type_id, + ) + review_hit(assignment) + return None + + if assignment.tsid is None: + assignment = handle_assignment_w_no_work(assignment) + else: + # We need to validate the work exists on thl, and if so, approve + assignment = handle_assignment_w_work(assignment) + + # + # Step 4: Tell Amazon we've reviewed the HIT, and update the DB + # + review_hit(assignment) + + if ( + assignment.tsid + and assignment.status == AssignmentStatus.Approved + and assignment.requester_feedback != NO_WORK_APPROVAL_MESSAGE + ): + return issue_worker_payment(assignment) + + +def review_hit(assignment): + # Reviewable to Reviewing + AMTManager.update_hit_review_status(amt_hit_id=assignment.amt_hit_id, revert=False) + hit, _ = AMTManager.get_hit_if_exists(amt_hit_id=assignment.amt_hit_id) + # Update the db + HM.update_hit(hit) + + +def handle_assignment_w_no_work(assignment: Assignment) -> Assignment: + """ + Called when an assignment is submitted without a wall event. + Not entirely clear why this happens. I think they accept a HIT, get no work + available for whatever reason, then report, and submit it. + + :return: The Assignment + """ + logging.warning( + f"Assignment submitted with no tsid: {assignment.amt_assignment_id}" + ) + amt_worker_id = assignment.amt_worker_id + amt_assignment_id = assignment.amt_assignment_id + hit = HM.get_from_amt_id(amt_hit_id=assignment.amt_hit_id) + emit_error_event( + event_type="assignment_submitted_no_work", + amt_hit_type_id=hit.amt_hit_type_id, + ) + + # They get 3 chances per week. If exceeded: Reject! + if (AM.missing_tsid_count(amt_worker_id=amt_worker_id) >= 3) or ( + AM.rejected_count(amt_worker_id=amt_worker_id) >= 3 + or get_user_blocked(amt_worker_id=amt_worker_id) + ): + assignment = reject_assignment( + amt_assignment_id=amt_assignment_id, + msg=REJECT_MESSAGE_NO_WORK, + amt_hit_type_id=hit.amt_hit_type_id, + ) + # todo: we don't have a way to "block" a user (i.e. tattle to thl) + # make_block_worker_decision(user) + return assignment + + # Approve with a message explaining they shouldn't do it. + assignment = approve_assignment( + amt_assignment_id=amt_assignment_id, + msg=NO_WORK_APPROVAL_MESSAGE, + amt_hit_type_id=hit.amt_hit_type_id, + ) + + return assignment + + +def reject_assignment( + amt_assignment_id: str, msg: str, amt_hit_type_id: str +) -> Assignment: + # Reject in AMT, update db + + res = AMTManager.reject_assignment_if_possible( + amt_assignment_id=amt_assignment_id, msg=msg + ) + if res is None: + # We failed to reject this assignment. Will this happen? + emit_error_event( + event_type="failed_to_reject_assignment", + amt_hit_type_id=amt_hit_type_id, + ) + raise Exception(f"Failed to reject assignment: {amt_assignment_id}") + + # We just rejected this assignment, get it from amazon again + assignment = AMTManager.get_assignment(amt_assignment_id=amt_assignment_id) + assert assignment.status == AssignmentStatus.Rejected + + # And update the db. The assignment may not actually exist in the db (if + # a baddie intercepted it and is trying to game us). So, we might + # need to create as assignment first ... + stub = AM.get_stub_if_exists(amt_assignment_id=assignment.amt_assignment_id) + if stub is None: + logging.warning( + f"Rejected assignment doesn't exist in DB. Creating ... : {amt_assignment_id}" + ) + # Even if the assignment doesn't exist, the hit must ... + hit = HM.get_from_amt_id(amt_hit_id=assignment.amt_hit_id) + assignment.hit_id = hit.id + AM.create(assignment=assignment) + AM.reject(assignment=assignment) + emit_assignment_event( + status=AssignmentStatus.Rejected, amt_hit_type_id=amt_hit_type_id, reason=msg + ) + logging.warning(f"Rejected assignment: {amt_assignment_id}") + return assignment + + +def approve_assignment( + amt_assignment_id: str, msg: str, amt_hit_type_id: str +) -> Assignment: + # Approve in AMT, update db + + res = AMTManager.approve_assignment_if_possible( + amt_assignment_id=amt_assignment_id, msg=msg + ) + if res is None: + # We failed to approve this assignment. Will this happen? + emit_error_event( + event_type="failed_to_approve_assignment", + amt_hit_type_id=amt_hit_type_id, + ) + raise Exception(f"Failed to approve assignment: {amt_assignment_id}") + + # We just approved this assignment, get it from amazon again + assignment = AMTManager.get_assignment(amt_assignment_id=amt_assignment_id) + assert assignment.status == AssignmentStatus.Approved + # And update the db + AM.approve(assignment=assignment) + emit_assignment_event( + status=AssignmentStatus.Approved, amt_hit_type_id=amt_hit_type_id, reason=msg + ) + logging.warning(f"Approved assignment: {amt_assignment_id}") + return assignment + + +def handle_assignment_w_work(assignment: Assignment) -> Assignment: + """ + Called when an assignment is submitted with a tsid. + - Check the tsid (thl status endpoint). Make sure it is finished, and + stuff matches (doesn't matter if not a complete) + - Try to submit a cashout request for the HIT payout (e.g. 5c) + """ + + amt_worker_id = assignment.amt_worker_id + amt_assignment_id = assignment.amt_assignment_id + tsid = assignment.tsid + hit = HM.get_from_amt_id(amt_hit_id=assignment.amt_hit_id) + + tsr = get_task_status(tsid=tsid) + if ( + tsr is None + or tsr.status is None + or tsr.status_code_1 + in { + StatusCode1.SESSION_START_FAIL, + StatusCode1.SESSION_START_QUALITY_FAIL, + StatusCode1.SESSION_CONTINUE_QUALITY_FAIL, + } + ): + # TSID doesn't exist or work is not finished: + # Reject the assignment instead + if tsr is not None and tsr.status_code_1 in { + StatusCode1.SESSION_START_QUALITY_FAIL, + StatusCode1.SESSION_CONTINUE_QUALITY_FAIL, + }: + event_type = "assignment_submitted_quality_fail" + else: + event_type = "assignment_submitted_work_not_complete" + emit_error_event( + event_type=event_type, + amt_hit_type_id=hit.amt_hit_type_id, + ) + assignment = reject_assignment( + amt_assignment_id=amt_assignment_id, + msg=REJECT_MESSAGE_BADDIE, + amt_hit_type_id=hit.amt_hit_type_id, + ) + return assignment + + assert tsr.product_user_id == amt_worker_id + assert tsr.finished is not None + assert (tsr.finished - assignment.created_at) <= timedelta(minutes=90) + + # Request an AMT_ASSIGNMENT cashout for 1c + req = submit_and_approve_amt_assignment_request( + amt_worker_id=amt_worker_id, amount=hit.reward + ) + if req is None: + # Reject the assignment instead + logging.warning( + f"submit_and_approve_amt_assignment_request failed: {amt_assignment_id}" + ) + emit_error_event( + event_type="assignment_cashout_request_failed", + amt_hit_type_id=hit.amt_hit_type_id, + ) + assignment = reject_assignment( + amt_assignment_id=amt_assignment_id, + msg=REJECT_MESSAGE_BADDIE, + amt_hit_type_id=hit.amt_hit_type_id, + ) + return assignment + + # We've approved the HIT payment, now update the db to reflect this, and approve the assignment + assignment = approve_assignment( + amt_assignment_id=amt_assignment_id, + msg=APPROVAL_MESSAGE, + amt_hit_type_id=hit.amt_hit_type_id, + ) + # We complete after the assignment is approved + complete_res = manage_pending_cashout(req.id, PayoutStatus.COMPLETE) + if complete_res.status != PayoutStatus.COMPLETE: + # unclear wny this would happen + raise ValueError(f"Failed to complete cashout: {req.id}") + return assignment + + +def submit_and_approve_amt_assignment_request( + amt_worker_id: str, amount: USDCent +) -> Optional[CashoutRequestInfo]: + # If successful, returns the cashout id, otherwise, returns None + req = user_cashout_request( + amt_worker_id=amt_worker_id, + amount=amount, + cashout_method_id=AMT_ASSIGNMENT_CASHOUT_METHOD, + ) + + if req.status != PayoutStatus.PENDING: + return None + + approve_res = manage_pending_cashout(req.id, PayoutStatus.APPROVED) + if approve_res.status != PayoutStatus.APPROVED: + return None + + return req + + +def submit_and_approve_amt_bonus_request( + amt_worker_id: str, amount: USDCent +) -> Optional[CashoutRequestInfo]: + # If successful, returns the cashout id, otherwise, returns None + req = user_cashout_request( + amt_worker_id=amt_worker_id, + amount=amount, + cashout_method_id=AMT_BONUS_CASHOUT_METHOD, + ) + + if req.status != PayoutStatus.PENDING: + return None + + approve_res = manage_pending_cashout(req.id, PayoutStatus.APPROVED) + if approve_res.status != PayoutStatus.APPROVED: + return None + + return req + + +def issue_worker_payment(assignment: Assignment) -> None: + # For now, since we have no "I want my bonus" request/button. A user's + # balance will be sent out anytime they get an approved assignment. We + # don't need the task status, the tsid, nor the amount / user_payout + # that was paid, or anything. + # We just get the wallet balance and submit a cashout request if >0 + # then approve it, send the amt bonus, then complete it + amt_assignment_id = assignment.amt_assignment_id + hit = HM.get_from_amt_id(amt_hit_id=assignment.amt_hit_id) + wallet_balance = get_wallet_balance(amt_worker_id=assignment.amt_worker_id) + amount = round_payment(amount=wallet_balance) + if not amount: + return None + + pe = submit_and_approve_amt_bonus_request( + amt_worker_id=assignment.amt_worker_id, amount=amount + ) + if pe is None: + logging.warning( + f"submit_and_approve_amt_bonus_request failed: {amt_assignment_id}" + ) + emit_error_event( + event_type="bonus_cashout_request_failed", + amt_hit_type_id=hit.amt_hit_type_id, + ) + return None + + AMTManager.send_bonus( + amt_worker_id=assignment.amt_worker_id, + amt_assignment_id=assignment.amt_assignment_id, + amount=amount, + reason=BONUS_MESSAGE, + unique_request_token=pe.id, + ) + # Confirm it was sent through amt + bonus = AMTManager.get_bonus( + amt_assignment_id=assignment.amt_assignment_id, payout_event_id=pe.id + ) + # Create in DB + BM.create(bonus) + emit_bonus_event(amount=amount, amt_hit_type_id=hit.amt_hit_type_id) + # Complete cashout + res = manage_pending_cashout(pe.id, PayoutStatus.COMPLETE) + if res.status != PayoutStatus.COMPLETE: + raise ValueError( + f"{assignment.amt_assignment_id} {pe.id=} manage_pending_cashout COMPLETE failed: {res=}" + ) + + +def round_payment(amount: USDCent) -> USDCent: + """ + Don't pay bonuses less than 7 cents, just add it to their wallet. + Round down bonuses (>=7 cents) to the nearest multiple of 5 + starting at 2 + """ + if amount < 7: + return USDCent(0) + + amt = (5 * math.floor((int(amount) - 2) / 5)) + 2 + + payout = USDCent(amt) + assert 0 <= payout <= 40_00, "Payout must be between $0.00 and $40.00" + + return payout diff --git a/jb/managers/thl.py b/jb/managers/thl.py new file mode 100644 index 0000000..b1dcbde --- /dev/null +++ b/jb/managers/thl.py @@ -0,0 +1,87 @@ +from decimal import Decimal +from typing import Dict, Optional + +import requests +from generalresearchutils.models.thl.payout import UserPayoutEvent +from generalresearchutils.models.thl.task_status import TaskStatusResponse +from generalresearchutils.models.thl.wallet.cashout_method import ( + CashoutRequestResponse, + CashoutRequestInfo, +) + +from jb.config import settings +from jb.models.currency import USDCent +from jb.models.definitions import PayoutStatus + +# TODO: Organize this more with other endpoints (offerwall, cashout requests/approvals, etc). + + +def get_user_profile(amt_worker_id: str) -> Dict: + url = f"{settings.fsb_host}{settings.product_id}/user/{amt_worker_id}/profile/" + res = requests.get(url).json() + if res.get("detail") == "user not found": + raise ValueError("user not found") + return res["user_profile"] + + +def get_user_blocked(amt_worker_id: str) -> bool: + res = get_user_profile(amt_worker_id=amt_worker_id) + return res["user"]["blocked"] + + +def get_user_blocked_or_not_exists(amt_worker_id: str) -> bool: + try: + res = get_user_profile(amt_worker_id=amt_worker_id) + return res["user"]["blocked"] + except ValueError as e: + if e.args[0] == "user not found": + return True + + +def get_task_status(tsid: str) -> Optional[TaskStatusResponse]: + url = f"{settings.fsb_host}{settings.product_id}/status/{tsid}/" + d = requests.get(url).json() + if d.get("msg") == "invalid tsid": + return None + return TaskStatusResponse.model_validate(d) + + +def user_cashout_request( + amt_worker_id: str, amount: USDCent, cashout_method_id +) -> CashoutRequestInfo: + assert cashout_method_id in { + settings.amt_assignment_cashout_method, + settings.amt_bonus_cashout_method, + } + assert isinstance(amount, USDCent) + assert USDCent(0) < amount < USDCent(10_00) + url = f"{settings.fsb_host}{settings.product_id}/cashout/" + body = { + "bpuid": amt_worker_id, + "amount": int(amount), + "cashout_method_id": cashout_method_id, + } + res = requests.post(url, json=body) + d = res.json() + + return CashoutRequestResponse.model_validate(d).cashout + + +def manage_pending_cashout( + cashout_id: str, payout_status: PayoutStatus +) -> UserPayoutEvent: + url = f"{settings.fsb_host}{settings.fsb_host_private_route}/thl/manage_cashout/" + body = { + "payout_id": cashout_id, + "new_status": payout_status.value, + } + res = requests.post(url, json=body) + d = res.json() + + return UserPayoutEvent.model_validate(d) + + +def get_wallet_balance(amt_worker_id: str): + url = f"{settings.fsb_host}{settings.product_id}/wallet/" + params = {"bpuid": amt_worker_id} + return USDCent(requests.get(url, params=params).json()["wallet"]["amount"]) diff --git a/jb/settings.py b/jb/settings.py index 28402f3..538b89f 100644 --- a/jb/settings.py +++ b/jb/settings.py @@ -30,6 +30,9 @@ class AmtJbBaseSettings(BaseSettings): aws_owner_id: str = Field() aws_subscription_arn: str = Field() + amt_bonus_cashout_method: str = Field() + amt_assignment_cashout_method: str = Field() + class Settings(AmtJbBaseSettings): model_config = SettingsConfigDict( |
