import logging
import math
from datetime import timedelta
from typing import Optional

from generalresearchutils.models.thl.definitions import PayoutStatus, StatusCode1
from generalresearchutils.models.thl.wallet.cashout_method import CashoutRequestInfo
from generalresearchutils.currency import USDCent

from jb.flow.monitoring import emit_error_event, emit_assignment_event, emit_bonus_event
from jb.managers.amt import (
    AMTManager,
    REJECT_MESSAGE_UNKNOWN_ASSIGNMENT,
    REJECT_MESSAGE_NO_WORK,
    NO_WORK_APPROVAL_MESSAGE,
    REJECT_MESSAGE_BADDIE,
    APPROVAL_MESSAGE,
    BONUS_MESSAGE,
)
from jb.managers.thl import (
    get_user_blocked,
    get_task_status,
    user_cashout_request,
    manage_pending_cashout,
    get_user_blocked_or_not_exists,
    get_wallet_balance,
)
from jb.models.assignment import Assignment
from jb.models.definitions import AssignmentStatus
from jb.models.event import MTurkEvent
from jb.managers.assignment import AssignmentManager
from jb.managers.hit import HitManager
from jb.managers.bonus import BonusManager
from jb.config import settings


def process_assignment_submitted(
    am: AssignmentManager, hm: HitManager, bm: BonusManager, event: MTurkEvent
) -> None:
    """
    Called either directly or from the SNS Notification that a HIT was
    submitted.

    Flow, as implemented below:
      1. Fetch the assignment from AMT; if AMT does not return it, emit an
         error event and do nothing else.
      2. Look up the assignment stub in the DB; if missing, reject the
         assignment and mark the HIT as reviewed.
      3. Reject if the worker is blocked / unknown to thl, otherwise
         approve or reject depending on whether work (a tsid) is attached.
      4. Mark the HIT as reviewed and, for an approved assignment with real
         work, pay out the worker's wallet balance as a bonus.

    :param am: manager used to read/update assignments in the DB
    :param hm: manager used to read/update HITs in the DB
    :param bm: manager used to store bonuses in the DB
    :param event: the MTurk event describing the submitted assignment
    :return: None
    """
    #
    # Step 1: Attempt to get the Assignment out of the API
    #
    logging.info(f"{event=}")

    # This is the assignment model from AMT. In the DB, we should only have
    # the AssignmentStub
    # This call is hitting AMT, not our db.
    # The API won't even return it unless it has been submitted. So if this
    # fails, we don't need to do anything (i.e. reject it).
    assignment = AMTManager.get_assignment_if_exists(
        amt_assignment_id=event.amt_assignment_id
    )
    if assignment is None:
        # It is not found in amt, either it is invalid, not yet submitted, or
        # already been approved/rejected, so we just do nothing ...
        # todo: maybe we confirm its state matches what we have in the db
        logging.warning(f"No assignment found on AMT: {event.amt_assignment_id}")
        emit_error_event(
            event_type="assignment_not_found_in_amt",
            amt_hit_type_id=event.amt_hit_type_id,
        )
        return None

    # Even if the assignment doesn't exist, the hit must ...
    hit = hm.get_from_amt_id(amt_hit_id=assignment.amt_hit_id)

    #
    # Step 2: Attempt to get the Assignment out of the DB
    #
    # Now, we need to confirm it is something that we have in the db. If not,
    # that means either something broke, or some funny business is happening
    # (maybe a baddie is submitting an assignment without doing any work).
    stub = am.get_stub_if_exists(amt_assignment_id=assignment.amt_assignment_id)
    if stub is None:
        # When they visited the "work" page, it should have created an
        # AssignmentStub in the db. If that doesn't exist, something bad
        # happened.
        logging.warning(f"No assignment found in DB: {event.amt_assignment_id}")
        emit_error_event(
            event_type="assignment_stub_not_found_in_db",
            amt_hit_type_id=event.amt_hit_type_id,
        )
        reject_assignment(
            am=am,
            hm=hm,
            amt_assignment_id=assignment.amt_assignment_id,
            msg=REJECT_MESSAGE_UNKNOWN_ASSIGNMENT,
            amt_hit_type_id=hit.amt_hit_type_id,
        )
        review_hit(hm=hm, assignment=assignment)
        return None

    # Internal consistency checks between the event, AMT and our DB stub.
    assert assignment.amt_assignment_id == event.amt_assignment_id
    assert assignment.amt_hit_id == event.amt_hit_id
    assert assignment.amt_hit_id == stub.amt_hit_id
    assert assignment.amt_worker_id == stub.amt_worker_id

    amt_assignment_id = assignment.amt_assignment_id
    amt_worker_id = assignment.amt_worker_id

    # We don't have a TSID associated with the assignment until the
    # assignment is submitted.
    am.update_answer(assignment=assignment)

    #
    # Step 3: Validate the worker and the submitted work
    #
    # check if the user is blocked by thl
    if get_user_blocked_or_not_exists(amt_worker_id=amt_worker_id):
        logging.warning(
            f"User {amt_worker_id} blocked or not exists. Rejecting: {amt_assignment_id}"
        )
        emit_error_event(
            event_type="assignment_submitted_user_blocked_or_not_exists",
            amt_hit_type_id=event.amt_hit_type_id,
        )
        reject_assignment(
            am=am,
            hm=hm,
            amt_assignment_id=amt_assignment_id,
            msg=REJECT_MESSAGE_BADDIE,
            amt_hit_type_id=hit.amt_hit_type_id,
        )
        review_hit(hm=hm, assignment=assignment)
        return None

    if assignment.tsid is None:
        assignment = handle_assignment_w_no_work(am=am, hm=hm, assignment=assignment)
    else:
        # We need to validate the work exists on thl, and if so, approve
        assignment = handle_assignment_w_work(am=am, hm=hm, assignment=assignment)

    #
    # Step 4: Tell Amazon we've reviewed the HIT, and update the DB
    #
    review_hit(hm=hm, assignment=assignment)

    # Only pay out the wallet when real work was approved (not a "no work"
    # courtesy approval).
    if (
        assignment.tsid
        and assignment.status == AssignmentStatus.Approved
        and assignment.requester_feedback != NO_WORK_APPROVAL_MESSAGE
    ):
        return issue_worker_payment(hm=hm, bm=bm, assignment=assignment)

    return None


def review_hit(hm: HitManager, assignment: Assignment) -> None:
    """
    Update the review status of the assignment's HIT on AMT, then sync the
    HIT as returned by AMT into the DB.

    :param hm: manager used to update the HIT in the DB
    :param assignment: the assignment whose HIT is being reviewed
    :return: None
    """
    # Reviewable to Reviewing
    AMTManager.update_hit_review_status(amt_hit_id=assignment.amt_hit_id, revert=False)

    hit, _ = AMTManager.get_hit_if_exists(amt_hit_id=assignment.amt_hit_id)
    if hit is None:
        logging.warning(
            f"Hit not found when trying to review hit: {assignment.amt_hit_id}"
        )
        return None

    # Update the db
    hm.update_hit(hit)
    return None


def handle_assignment_w_no_work(
    am: AssignmentManager, hm: HitManager, assignment: Assignment
) -> Assignment:
    """
    Called when an assignment is submitted without a wall event (no tsid).

    Not entirely clear why this happens. I think they accept a HIT, get no
    work available for whatever reason, then report, and submit it.

    The assignment is rejected when the worker has 3 or more missing-tsid
    submissions, 3 or more rejections, or is blocked; otherwise it is
    approved with a message explaining they shouldn't do it.

    :param am: manager used to read counters and update the assignment
    :param hm: manager used to look up the HIT in the DB
    :param assignment: the submitted assignment (as returned by AMT)
    :return: The Assignment, re-fetched from AMT after approve/reject
    """
    logging.warning(
        f"Assignment submitted with no tsid: {assignment.amt_assignment_id}"
    )
    amt_worker_id = assignment.amt_worker_id
    amt_assignment_id = assignment.amt_assignment_id
    hit = hm.get_from_amt_id(amt_hit_id=assignment.amt_hit_id)

    emit_error_event(
        event_type="assignment_submitted_no_work",
        amt_hit_type_id=hit.amt_hit_type_id,
    )

    # They get 3 chances per week. If exceeded: Reject!
    # NOTE(review): the "per week" window is presumably applied inside the
    # AssignmentManager counters; it is not visible here.
    if (am.missing_tsid_count(amt_worker_id=amt_worker_id) >= 3) or (
        am.rejected_count(amt_worker_id=amt_worker_id) >= 3
        or get_user_blocked(amt_worker_id=amt_worker_id)
    ):
        assignment = reject_assignment(
            am=am,
            hm=hm,
            amt_assignment_id=amt_assignment_id,
            msg=REJECT_MESSAGE_NO_WORK,
            amt_hit_type_id=hit.amt_hit_type_id,
        )
        # todo: we don't have a way to "block" a user (i.e. tattle to thl)
        # make_block_worker_decision(user)
        return assignment

    # Approve with a message explaining they shouldn't do it.
    assignment = approve_assignment(
        am=am,
        hm=hm,
        amt_assignment_id=amt_assignment_id,
        msg=NO_WORK_APPROVAL_MESSAGE,
        amt_hit_type_id=hit.amt_hit_type_id,
    )
    return assignment


def reject_assignment(
    am: AssignmentManager,
    hm: HitManager,
    amt_assignment_id: str,
    msg: str,
    amt_hit_type_id: str,
) -> Assignment:
    """
    Reject the assignment in AMT, then record the rejection in the DB.

    :param am: manager used to create/reject the assignment in the DB
    :param hm: manager used to look up the HIT when the assignment has to
        be created in the DB first
    :param amt_assignment_id: the AMT assignment to reject
    :param msg: the message passed to AMT with the rejection
    :param amt_hit_type_id: used only to tag the emitted monitoring events
    :return: The Assignment, re-fetched from AMT after the rejection
    :raises Exception: if AMT did not reject the assignment
    """
    # Reject in AMT, update db
    res = AMTManager.reject_assignment_if_possible(
        amt_assignment_id=amt_assignment_id, msg=msg
    )
    if res is None:
        # We failed to reject this assignment. Will this happen?
        emit_error_event(
            event_type="failed_to_reject_assignment",
            amt_hit_type_id=amt_hit_type_id,
        )
        raise Exception(f"Failed to reject assignment: {amt_assignment_id}")

    # We just rejected this assignment, get it from amazon again
    assignment = AMTManager.get_assignment(amt_assignment_id=amt_assignment_id)
    assert assignment.status == AssignmentStatus.Rejected

    # And update the db. The assignment may not actually exist in the db (if
    # a baddie intercepted it and is trying to game us). So, we might
    # need to create an assignment first ...
    stub = am.get_stub_if_exists(amt_assignment_id=assignment.amt_assignment_id)
    if stub is None:
        logging.warning(
            f"Rejected assignment doesn't exist in DB. Creating ... : {amt_assignment_id}"
        )
        # Even if the assignment doesn't exist, the hit must ...
        hit = hm.get_from_amt_id(amt_hit_id=assignment.amt_hit_id)
        assignment.hit_id = hit.id
        am.create(assignment=assignment)

    am.reject(assignment=assignment)
    emit_assignment_event(
        status=AssignmentStatus.Rejected, amt_hit_type_id=amt_hit_type_id, reason=msg
    )
    logging.warning(f"Rejected assignment: {amt_assignment_id}")
    return assignment


def approve_assignment(
    am: AssignmentManager,
    hm: HitManager,
    amt_assignment_id: str,
    msg: str,
    amt_hit_type_id: str,
) -> Assignment:
    """
    Approve the assignment in AMT, then record the approval in the DB.

    :param am: manager used to approve the assignment in the DB
    :param hm: unused here; kept so the signature mirrors reject_assignment
    :param amt_assignment_id: the AMT assignment to approve
    :param msg: the message passed to AMT with the approval
    :param amt_hit_type_id: used only to tag the emitted monitoring events
    :return: The Assignment, re-fetched from AMT after the approval
    :raises Exception: if AMT did not approve the assignment
    """
    # Approve in AMT, update db
    res = AMTManager.approve_assignment_if_possible(
        amt_assignment_id=amt_assignment_id, msg=msg
    )
    if res is None:
        # We failed to approve this assignment. Will this happen?
        emit_error_event(
            event_type="failed_to_approve_assignment",
            amt_hit_type_id=amt_hit_type_id,
        )
        raise Exception(f"Failed to approve assignment: {amt_assignment_id}")

    # We just approved this assignment, get it from amazon again
    assignment = AMTManager.get_assignment(amt_assignment_id=amt_assignment_id)
    assert assignment.status == AssignmentStatus.Approved

    # And update the db
    am.approve(assignment=assignment)
    emit_assignment_event(
        status=AssignmentStatus.Approved, amt_hit_type_id=amt_hit_type_id, reason=msg
    )
    logging.warning(f"Approved assignment: {amt_assignment_id}")
    return assignment


def handle_assignment_w_work(
    am: AssignmentManager, hm: HitManager, assignment: Assignment
) -> Assignment:
    """
    Called when an assignment is submitted with a tsid.

    - Check the tsid (thl status endpoint). Make sure it is finished, and
      stuff matches (doesn't matter if not a complete)
    - Try to submit a cashout request for the HIT payout (the HIT's reward)
    - Approve the assignment, then mark the cashout as complete

    :param am: manager used to update the assignment in the DB
    :param hm: manager used to look up the HIT in the DB
    :param assignment: the submitted assignment; must have a tsid
    :return: The Assignment, re-fetched from AMT after approve/reject
    :raises AssertionError: if the task status does not belong to this
        worker, has no finished time, or finished more than 90 minutes
        after the assignment was created
    :raises ValueError: if the cashout could not be marked as complete
    """
    amt_worker_id = assignment.amt_worker_id
    amt_assignment_id = assignment.amt_assignment_id
    tsid = assignment.tsid
    assert (
        tsid is not None
    ), "Assignment must have a tsid to be handled in handle_assignment_w_work"
    hit = hm.get_from_amt_id(amt_hit_id=assignment.amt_hit_id)

    tsr = get_task_status(tsid=tsid)
    if (
        tsr is None
        or tsr.status is None
        or tsr.status_code_1
        in {
            StatusCode1.SESSION_START_FAIL,
            StatusCode1.SESSION_START_QUALITY_FAIL,
            StatusCode1.SESSION_CONTINUE_QUALITY_FAIL,
        }
    ):
        # TSID doesn't exist or work is not finished:
        # Reject the assignment instead
        if tsr is not None and tsr.status_code_1 in {
            StatusCode1.SESSION_START_QUALITY_FAIL,
            StatusCode1.SESSION_CONTINUE_QUALITY_FAIL,
        }:
            event_type = "assignment_submitted_quality_fail"
        else:
            event_type = "assignment_submitted_work_not_complete"
        emit_error_event(
            event_type=event_type,
            amt_hit_type_id=hit.amt_hit_type_id,
        )
        assignment = reject_assignment(
            am=am,
            hm=hm,
            amt_assignment_id=amt_assignment_id,
            msg=REJECT_MESSAGE_BADDIE,
            amt_hit_type_id=hit.amt_hit_type_id,
        )
        return assignment

    # These checks gate a payment, so they are raised explicitly rather than
    # written as `assert` statements (which are stripped when Python runs
    # with -O). AssertionError is kept so existing callers see the same type.
    if tsr.product_user_id != amt_worker_id:
        raise AssertionError(
            f"Task status user does not match worker: {amt_assignment_id}"
        )
    if tsr.finished is None:
        raise AssertionError(f"Task status has no finished time: {amt_assignment_id}")
    if (tsr.finished - assignment.created_at) > timedelta(minutes=90):
        raise AssertionError(
            f"Task finished more than 90 minutes after assignment was created: "
            f"{amt_assignment_id}"
        )

    # Request an AMT_ASSIGNMENT cashout for the HIT's reward
    req = submit_and_approve_amt_assignment_request(
        amt_worker_id=amt_worker_id, amount=hit.reward
    )
    if req is None:
        # Reject the assignment instead
        logging.warning(
            f"submit_and_approve_amt_assignment_request failed: {amt_assignment_id}"
        )
        emit_error_event(
            event_type="assignment_cashout_request_failed",
            amt_hit_type_id=hit.amt_hit_type_id,
        )
        assignment = reject_assignment(
            am=am,
            hm=hm,
            amt_assignment_id=amt_assignment_id,
            msg=REJECT_MESSAGE_BADDIE,
            amt_hit_type_id=hit.amt_hit_type_id,
        )
        return assignment
    assert req.id

    # We've approved the HIT payment, now update the db to reflect this, and
    # approve the assignment
    assignment = approve_assignment(
        am=am,
        hm=hm,
        amt_assignment_id=amt_assignment_id,
        msg=APPROVAL_MESSAGE,
        amt_hit_type_id=hit.amt_hit_type_id,
    )

    # We complete after the assignment is approved
    complete_res = manage_pending_cashout(
        cashout_id=req.id, payout_status=PayoutStatus.COMPLETE
    )
    if complete_res.status != PayoutStatus.COMPLETE:
        # unclear why this would happen
        raise ValueError(f"Failed to complete cashout: {req.id}")
    return assignment


def _submit_and_approve_cashout(
    amt_worker_id: str, amount: USDCent, cashout_method_id
) -> Optional[CashoutRequestInfo]:
    """
    Submit a cashout request with the given cashout method and approve it.

    Shared implementation behind the assignment and bonus request helpers.

    :param amt_worker_id: the worker requesting the cashout
    :param amount: the amount to cash out
    :param cashout_method_id: the cashout method to use (read from settings
        by the callers)
    :return: the cashout request if it was created as PENDING and then
        moved to APPROVED, otherwise None
    """
    req = user_cashout_request(
        amt_worker_id=amt_worker_id,
        amount=amount,
        cashout_method_id=cashout_method_id,
    )
    assert req.id

    if req.status != PayoutStatus.PENDING:
        return None

    approve_res = manage_pending_cashout(req.id, PayoutStatus.APPROVED)
    if approve_res.status != PayoutStatus.APPROVED:
        return None

    return req


def submit_and_approve_amt_assignment_request(
    amt_worker_id: str, amount: USDCent
) -> Optional[CashoutRequestInfo]:
    """
    Submit and approve a cashout using the AMT assignment cashout method.

    :param amt_worker_id: the worker requesting the cashout
    :param amount: the amount to cash out
    :return: the approved cashout request if successful, otherwise None
    """
    return _submit_and_approve_cashout(
        amt_worker_id=amt_worker_id,
        amount=amount,
        cashout_method_id=settings.amt_assignment_cashout_method,
    )


def submit_and_approve_amt_bonus_request(
    amt_worker_id: str, amount: USDCent
) -> Optional[CashoutRequestInfo]:
    """
    Submit and approve a cashout using the AMT bonus cashout method.

    :param amt_worker_id: the worker requesting the cashout
    :param amount: the amount to cash out
    :return: the approved cashout request if successful, otherwise None
    """
    return _submit_and_approve_cashout(
        amt_worker_id=amt_worker_id,
        amount=amount,
        cashout_method_id=settings.amt_bonus_cashout_method,
    )


def issue_worker_payment(
    hm: HitManager, bm: BonusManager, assignment: Assignment
) -> None:
    """
    Pay out the worker's wallet balance as an AMT bonus on this assignment.

    For now, since we have no "I want my bonus" request/button, a user's
    balance will be sent out anytime they get an approved assignment. We
    don't need the task status, the tsid, nor the amount / user_payout
    that was paid, or anything.

    We just get the wallet balance and submit a cashout request if >0
    (after rounding), then approve it, send the amt bonus, then complete it.

    :param hm: manager used to look up the HIT in the DB
    :param bm: manager used to store the bonus in the DB
    :param assignment: the approved assignment the bonus is attached to
    :return: None
    :raises ValueError: if the cashout could not be marked as complete
    """
    amt_assignment_id = assignment.amt_assignment_id
    hit = hm.get_from_amt_id(amt_hit_id=assignment.amt_hit_id)

    wallet_balance = get_wallet_balance(amt_worker_id=assignment.amt_worker_id)
    amount = round_payment(amount=wallet_balance)
    if not amount:
        # Nothing (or too little) to pay out; it stays in their wallet.
        return None

    pe = submit_and_approve_amt_bonus_request(
        amt_worker_id=assignment.amt_worker_id, amount=amount
    )
    if pe is None:
        logging.warning(
            f"submit_and_approve_amt_bonus_request failed: {amt_assignment_id}"
        )
        emit_error_event(
            event_type="bonus_cashout_request_failed",
            amt_hit_type_id=hit.amt_hit_type_id,
        )
        return None
    assert pe.id

    # The cashout id doubles as the unique request token for the bonus.
    AMTManager.send_bonus(
        amt_worker_id=assignment.amt_worker_id,
        amt_assignment_id=assignment.amt_assignment_id,
        amount=amount,
        reason=BONUS_MESSAGE,
        unique_request_token=pe.id,
    )

    # Confirm it was sent through amt
    bonus = AMTManager.get_bonus(
        amt_assignment_id=assignment.amt_assignment_id, payout_event_id=pe.id
    )
    if bonus is None:
        logging.warning(
            f"Failed to find bonus after sending it: {amt_assignment_id} {pe.id}"
        )
        emit_error_event(
            event_type="bonus_not_found_after_sending",
            amt_hit_type_id=hit.amt_hit_type_id,
        )
        return None

    # Create in DB
    bm.create(bonus=bonus)
    emit_bonus_event(amount=amount, amt_hit_type_id=hit.amt_hit_type_id)

    # Complete cashout
    res = manage_pending_cashout(pe.id, PayoutStatus.COMPLETE)
    if res.status != PayoutStatus.COMPLETE:
        raise ValueError(
            f"{assignment.amt_assignment_id} {pe.id=} manage_pending_cashout COMPLETE failed: {res=}"
        )
    return None


def round_payment(amount: USDCent) -> USDCent:
    """
    Don't pay bonuses less than 7 cents, just add it to their wallet.

    Round down bonuses (>=7 cents) to the nearest multiple of 5 starting
    at 2, i.e. to one of 7, 12, 17, 22, ... cents.

    :param amount: the wallet balance, in cents
    :return: the amount to pay out, in cents (0 if below 7 cents)
    :raises AssertionError: if the rounded payout is outside $0.00-$40.00
    """
    if amount < 7:
        return USDCent(0)

    amt = (5 * math.floor((int(amount) - 2) / 5)) + 2
    payout = USDCent(amt)

    # This is the cap on a single payout, so it is raised explicitly rather
    # than written as `assert` (which is stripped when Python runs with -O).
    if not (0 <= payout <= 40_00):
        raise AssertionError("Payout must be between $0.00 and $40.00")
    return payout