aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jb/flow/assignment_tasks.py444
-rw-r--r--jb/managers/thl.py87
-rw-r--r--jb/settings.py3
3 files changed, 534 insertions, 0 deletions
diff --git a/jb/flow/assignment_tasks.py b/jb/flow/assignment_tasks.py
new file mode 100644
index 0000000..4022716
--- /dev/null
+++ b/jb/flow/assignment_tasks.py
@@ -0,0 +1,444 @@
+import logging
+import math
+from datetime import timedelta
+from typing import Optional
+
+from generalresearchutils.models.thl.definitions import PayoutStatus, StatusCode1
+from generalresearchutils.models.thl.wallet.cashout_method import CashoutRequestInfo
+
+from jb.decorators import AM, HM, BM
+from jb.flow.monitoring import emit_error_event, emit_assignment_event, emit_bonus_event
+from jb.managers.amt import (
+ AMTManager,
+ REJECT_MESSAGE_UNKNOWN_ASSIGNMENT,
+ REJECT_MESSAGE_NO_WORK,
+ NO_WORK_APPROVAL_MESSAGE,
+ REJECT_MESSAGE_BADDIE,
+ APPROVAL_MESSAGE,
+ BONUS_MESSAGE,
+)
+from jb.managers.thl import (
+ get_user_blocked,
+ get_task_status,
+ user_cashout_request,
+ AMT_ASSIGNMENT_CASHOUT_METHOD,
+ manage_pending_cashout,
+ get_user_blocked_or_not_exists,
+ get_wallet_balance,
+ AMT_BONUS_CASHOUT_METHOD,
+)
+from jb.models.assignment import Assignment
+from jb.models.currency import USDCent
+from jb.models.definitions import AssignmentStatus
+from jb.models.event import MTurkEvent
+
+
+def process_assignment_submitted(event: MTurkEvent) -> None:
+ """
+ Called either directly or from the SNS Notification that a HIT was submitted
+
+ :return: None
+ """
+ #
+ # Step 1: Attempt to get the Assignment out of the API
+ #
+ logging.info(f"{event=}")
+
+ # This is the assignment model from AMT. In the DB, we should only have
+ # the AssignmentStub
+ # This call is hitting AMT, not our db.
+ # The API won't even return is unless it has been submitted. So if this
+ # fails, we don't need to do anything (i.e. reject it).
+ assignment = AMTManager.get_assignment_if_exists(
+ amt_assignment_id=event.amt_assignment_id
+ )
+ if assignment is None:
+ # It is not found in amt, either it is invalid, not yet submitted, or
+ # already been approved/rejected, so we just do nothing ...
+ # todo: maybe we confirm its state matches what we have in the db
+ logging.warning(f"No assignment found on AMT: {event.amt_assignment_id}")
+ emit_error_event(
+ event_type="assignment_not_found_in_amt",
+ amt_hit_type_id=event.amt_hit_type_id,
+ )
+ return None
+
+ # Even if the assignment doesn't exist, the hit must ...
+ hit = HM.get_from_amt_id(amt_hit_id=assignment.amt_hit_id)
+
+ #
+ # Step 2: Attempt to get the Assignment out of the DB
+ #
+ # Now, we need to confirm it is something that we have in the db. If not,
+ # that means either something broke, or some funny business is happening
+ # (maybe a baddie is submitting an assignment without doing any work).
+ stub = AM.get_stub_if_exists(amt_assignment_id=assignment.amt_assignment_id)
+ if stub is None:
+ # When they visited the "work" page, it should have created an
+ # AssignmentStub in the db. If that doesn't exist, something bad
+ # happened.
+ logging.warning(f"No assignment found in DB: {event.amt_assignment_id}")
+ emit_error_event(
+ event_type="assignment_stub_not_found_in_db",
+ amt_hit_type_id=event.amt_hit_type_id,
+ )
+ reject_assignment(
+ amt_assignment_id=assignment.amt_assignment_id,
+ msg=REJECT_MESSAGE_UNKNOWN_ASSIGNMENT,
+ amt_hit_type_id=hit.amt_hit_type_id,
+ )
+ review_hit(assignment)
+ return None
+
+ assert assignment.amt_assignment_id == event.amt_assignment_id
+ assert assignment.amt_hit_id == event.amt_hit_id
+ assert assignment.amt_hit_id == stub.amt_hit_id
+ assert assignment.amt_worker_id == stub.amt_worker_id
+ amt_assignment_id = assignment.amt_assignment_id
+ amt_worker_id = assignment.amt_worker_id
+
+ # We don't have a TSID associated with the assignment until we the
+ # assignment is submitted.
+ AM.update_answer(assignment=assignment)
+
+ # check if the user is blocked by thl
+ if get_user_blocked_or_not_exists(amt_worker_id=amt_worker_id):
+ logging.warning(
+ f"User {amt_worker_id} blocked or not exists. Rejecting: {amt_assignment_id}"
+ )
+ emit_error_event(
+ event_type="assignment_submitted_user_blocked_or_not_exists",
+ amt_hit_type_id=event.amt_hit_type_id,
+ )
+ reject_assignment(
+ amt_assignment_id=amt_assignment_id,
+ msg=REJECT_MESSAGE_BADDIE,
+ amt_hit_type_id=hit.amt_hit_type_id,
+ )
+ review_hit(assignment)
+ return None
+
+ if assignment.tsid is None:
+ assignment = handle_assignment_w_no_work(assignment)
+ else:
+ # We need to validate the work exists on thl, and if so, approve
+ assignment = handle_assignment_w_work(assignment)
+
+ #
+ # Step 4: Tell Amazon we've reviewed the HIT, and update the DB
+ #
+ review_hit(assignment)
+
+ if (
+ assignment.tsid
+ and assignment.status == AssignmentStatus.Approved
+ and assignment.requester_feedback != NO_WORK_APPROVAL_MESSAGE
+ ):
+ return issue_worker_payment(assignment)
+
+
+def review_hit(assignment):
+ # Reviewable to Reviewing
+ AMTManager.update_hit_review_status(amt_hit_id=assignment.amt_hit_id, revert=False)
+ hit, _ = AMTManager.get_hit_if_exists(amt_hit_id=assignment.amt_hit_id)
+ # Update the db
+ HM.update_hit(hit)
+
+
+def handle_assignment_w_no_work(assignment: Assignment) -> Assignment:
+ """
+ Called when an assignment is submitted without a wall event.
+ Not entirely clear why this happens. I think they accept a HIT, get no work
+ available for whatever reason, then report, and submit it.
+
+ :return: The Assignment
+ """
+ logging.warning(
+ f"Assignment submitted with no tsid: {assignment.amt_assignment_id}"
+ )
+ amt_worker_id = assignment.amt_worker_id
+ amt_assignment_id = assignment.amt_assignment_id
+ hit = HM.get_from_amt_id(amt_hit_id=assignment.amt_hit_id)
+ emit_error_event(
+ event_type="assignment_submitted_no_work",
+ amt_hit_type_id=hit.amt_hit_type_id,
+ )
+
+ # They get 3 chances per week. If exceeded: Reject!
+ if (AM.missing_tsid_count(amt_worker_id=amt_worker_id) >= 3) or (
+ AM.rejected_count(amt_worker_id=amt_worker_id) >= 3
+ or get_user_blocked(amt_worker_id=amt_worker_id)
+ ):
+ assignment = reject_assignment(
+ amt_assignment_id=amt_assignment_id,
+ msg=REJECT_MESSAGE_NO_WORK,
+ amt_hit_type_id=hit.amt_hit_type_id,
+ )
+ # todo: we don't have a way to "block" a user (i.e. tattle to thl)
+ # make_block_worker_decision(user)
+ return assignment
+
+ # Approve with a message explaining they shouldn't do it.
+ assignment = approve_assignment(
+ amt_assignment_id=amt_assignment_id,
+ msg=NO_WORK_APPROVAL_MESSAGE,
+ amt_hit_type_id=hit.amt_hit_type_id,
+ )
+
+ return assignment
+
+
+def reject_assignment(
+ amt_assignment_id: str, msg: str, amt_hit_type_id: str
+) -> Assignment:
+ # Reject in AMT, update db
+
+ res = AMTManager.reject_assignment_if_possible(
+ amt_assignment_id=amt_assignment_id, msg=msg
+ )
+ if res is None:
+ # We failed to reject this assignment. Will this happen?
+ emit_error_event(
+ event_type="failed_to_reject_assignment",
+ amt_hit_type_id=amt_hit_type_id,
+ )
+ raise Exception(f"Failed to reject assignment: {amt_assignment_id}")
+
+ # We just rejected this assignment, get it from amazon again
+ assignment = AMTManager.get_assignment(amt_assignment_id=amt_assignment_id)
+ assert assignment.status == AssignmentStatus.Rejected
+
+ # And update the db. The assignment may not actually exist in the db (if
+ # a baddie intercepted it and is trying to game us). So, we might
+ # need to create as assignment first ...
+ stub = AM.get_stub_if_exists(amt_assignment_id=assignment.amt_assignment_id)
+ if stub is None:
+ logging.warning(
+ f"Rejected assignment doesn't exist in DB. Creating ... : {amt_assignment_id}"
+ )
+ # Even if the assignment doesn't exist, the hit must ...
+ hit = HM.get_from_amt_id(amt_hit_id=assignment.amt_hit_id)
+ assignment.hit_id = hit.id
+ AM.create(assignment=assignment)
+ AM.reject(assignment=assignment)
+ emit_assignment_event(
+ status=AssignmentStatus.Rejected, amt_hit_type_id=amt_hit_type_id, reason=msg
+ )
+ logging.warning(f"Rejected assignment: {amt_assignment_id}")
+ return assignment
+
+
+def approve_assignment(
+ amt_assignment_id: str, msg: str, amt_hit_type_id: str
+) -> Assignment:
+ # Approve in AMT, update db
+
+ res = AMTManager.approve_assignment_if_possible(
+ amt_assignment_id=amt_assignment_id, msg=msg
+ )
+ if res is None:
+ # We failed to approve this assignment. Will this happen?
+ emit_error_event(
+ event_type="failed_to_approve_assignment",
+ amt_hit_type_id=amt_hit_type_id,
+ )
+ raise Exception(f"Failed to approve assignment: {amt_assignment_id}")
+
+ # We just approved this assignment, get it from amazon again
+ assignment = AMTManager.get_assignment(amt_assignment_id=amt_assignment_id)
+ assert assignment.status == AssignmentStatus.Approved
+ # And update the db
+ AM.approve(assignment=assignment)
+ emit_assignment_event(
+ status=AssignmentStatus.Approved, amt_hit_type_id=amt_hit_type_id, reason=msg
+ )
+ logging.warning(f"Approved assignment: {amt_assignment_id}")
+ return assignment
+
+
+def handle_assignment_w_work(assignment: Assignment) -> Assignment:
+ """
+ Called when an assignment is submitted with a tsid.
+ - Check the tsid (thl status endpoint). Make sure it is finished, and
+ stuff matches (doesn't matter if not a complete)
+ - Try to submit a cashout request for the HIT payout (e.g. 5c)
+ """
+
+ amt_worker_id = assignment.amt_worker_id
+ amt_assignment_id = assignment.amt_assignment_id
+ tsid = assignment.tsid
+ hit = HM.get_from_amt_id(amt_hit_id=assignment.amt_hit_id)
+
+ tsr = get_task_status(tsid=tsid)
+ if (
+ tsr is None
+ or tsr.status is None
+ or tsr.status_code_1
+ in {
+ StatusCode1.SESSION_START_FAIL,
+ StatusCode1.SESSION_START_QUALITY_FAIL,
+ StatusCode1.SESSION_CONTINUE_QUALITY_FAIL,
+ }
+ ):
+ # TSID doesn't exist or work is not finished:
+ # Reject the assignment instead
+ if tsr is not None and tsr.status_code_1 in {
+ StatusCode1.SESSION_START_QUALITY_FAIL,
+ StatusCode1.SESSION_CONTINUE_QUALITY_FAIL,
+ }:
+ event_type = "assignment_submitted_quality_fail"
+ else:
+ event_type = "assignment_submitted_work_not_complete"
+ emit_error_event(
+ event_type=event_type,
+ amt_hit_type_id=hit.amt_hit_type_id,
+ )
+ assignment = reject_assignment(
+ amt_assignment_id=amt_assignment_id,
+ msg=REJECT_MESSAGE_BADDIE,
+ amt_hit_type_id=hit.amt_hit_type_id,
+ )
+ return assignment
+
+ assert tsr.product_user_id == amt_worker_id
+ assert tsr.finished is not None
+ assert (tsr.finished - assignment.created_at) <= timedelta(minutes=90)
+
+ # Request an AMT_ASSIGNMENT cashout for 1c
+ req = submit_and_approve_amt_assignment_request(
+ amt_worker_id=amt_worker_id, amount=hit.reward
+ )
+ if req is None:
+ # Reject the assignment instead
+ logging.warning(
+ f"submit_and_approve_amt_assignment_request failed: {amt_assignment_id}"
+ )
+ emit_error_event(
+ event_type="assignment_cashout_request_failed",
+ amt_hit_type_id=hit.amt_hit_type_id,
+ )
+ assignment = reject_assignment(
+ amt_assignment_id=amt_assignment_id,
+ msg=REJECT_MESSAGE_BADDIE,
+ amt_hit_type_id=hit.amt_hit_type_id,
+ )
+ return assignment
+
+ # We've approved the HIT payment, now update the db to reflect this, and approve the assignment
+ assignment = approve_assignment(
+ amt_assignment_id=amt_assignment_id,
+ msg=APPROVAL_MESSAGE,
+ amt_hit_type_id=hit.amt_hit_type_id,
+ )
+ # We complete after the assignment is approved
+ complete_res = manage_pending_cashout(req.id, PayoutStatus.COMPLETE)
+ if complete_res.status != PayoutStatus.COMPLETE:
+ # unclear wny this would happen
+ raise ValueError(f"Failed to complete cashout: {req.id}")
+ return assignment
+
+
+def submit_and_approve_amt_assignment_request(
+ amt_worker_id: str, amount: USDCent
+) -> Optional[CashoutRequestInfo]:
+ # If successful, returns the cashout id, otherwise, returns None
+ req = user_cashout_request(
+ amt_worker_id=amt_worker_id,
+ amount=amount,
+ cashout_method_id=AMT_ASSIGNMENT_CASHOUT_METHOD,
+ )
+
+ if req.status != PayoutStatus.PENDING:
+ return None
+
+ approve_res = manage_pending_cashout(req.id, PayoutStatus.APPROVED)
+ if approve_res.status != PayoutStatus.APPROVED:
+ return None
+
+ return req
+
+
+def submit_and_approve_amt_bonus_request(
+ amt_worker_id: str, amount: USDCent
+) -> Optional[CashoutRequestInfo]:
+ # If successful, returns the cashout id, otherwise, returns None
+ req = user_cashout_request(
+ amt_worker_id=amt_worker_id,
+ amount=amount,
+ cashout_method_id=AMT_BONUS_CASHOUT_METHOD,
+ )
+
+ if req.status != PayoutStatus.PENDING:
+ return None
+
+ approve_res = manage_pending_cashout(req.id, PayoutStatus.APPROVED)
+ if approve_res.status != PayoutStatus.APPROVED:
+ return None
+
+ return req
+
+
+def issue_worker_payment(assignment: Assignment) -> None:
+ # For now, since we have no "I want my bonus" request/button. A user's
+ # balance will be sent out anytime they get an approved assignment. We
+ # don't need the task status, the tsid, nor the amount / user_payout
+ # that was paid, or anything.
+ # We just get the wallet balance and submit a cashout request if >0
+ # then approve it, send the amt bonus, then complete it
+ amt_assignment_id = assignment.amt_assignment_id
+ hit = HM.get_from_amt_id(amt_hit_id=assignment.amt_hit_id)
+ wallet_balance = get_wallet_balance(amt_worker_id=assignment.amt_worker_id)
+ amount = round_payment(amount=wallet_balance)
+ if not amount:
+ return None
+
+ pe = submit_and_approve_amt_bonus_request(
+ amt_worker_id=assignment.amt_worker_id, amount=amount
+ )
+ if pe is None:
+ logging.warning(
+ f"submit_and_approve_amt_bonus_request failed: {amt_assignment_id}"
+ )
+ emit_error_event(
+ event_type="bonus_cashout_request_failed",
+ amt_hit_type_id=hit.amt_hit_type_id,
+ )
+ return None
+
+ AMTManager.send_bonus(
+ amt_worker_id=assignment.amt_worker_id,
+ amt_assignment_id=assignment.amt_assignment_id,
+ amount=amount,
+ reason=BONUS_MESSAGE,
+ unique_request_token=pe.id,
+ )
+ # Confirm it was sent through amt
+ bonus = AMTManager.get_bonus(
+ amt_assignment_id=assignment.amt_assignment_id, payout_event_id=pe.id
+ )
+ # Create in DB
+ BM.create(bonus)
+ emit_bonus_event(amount=amount, amt_hit_type_id=hit.amt_hit_type_id)
+ # Complete cashout
+ res = manage_pending_cashout(pe.id, PayoutStatus.COMPLETE)
+ if res.status != PayoutStatus.COMPLETE:
+ raise ValueError(
+ f"{assignment.amt_assignment_id} {pe.id=} manage_pending_cashout COMPLETE failed: {res=}"
+ )
+
+
+def round_payment(amount: USDCent) -> USDCent:
+ """
+ Don't pay bonuses less than 7 cents, just add it to their wallet.
+ Round down bonuses (>=7 cents) to the nearest multiple of 5
+ starting at 2
+ """
+ if amount < 7:
+ return USDCent(0)
+
+ amt = (5 * math.floor((int(amount) - 2) / 5)) + 2
+
+ payout = USDCent(amt)
+ assert 0 <= payout <= 40_00, "Payout must be between $0.00 and $40.00"
+
+ return payout
diff --git a/jb/managers/thl.py b/jb/managers/thl.py
new file mode 100644
index 0000000..b1dcbde
--- /dev/null
+++ b/jb/managers/thl.py
@@ -0,0 +1,87 @@
+from decimal import Decimal
+from typing import Dict, Optional
+
+import requests
+from generalresearchutils.models.thl.payout import UserPayoutEvent
+from generalresearchutils.models.thl.task_status import TaskStatusResponse
+from generalresearchutils.models.thl.wallet.cashout_method import (
+ CashoutRequestResponse,
+ CashoutRequestInfo,
+)
+
+from jb.config import settings
+from jb.models.currency import USDCent
+from jb.models.definitions import PayoutStatus
+
+# TODO: Organize this more with other endpoints (offerwall, cashout requests/approvals, etc).
+
+
+def get_user_profile(amt_worker_id: str) -> Dict:
+ url = f"{settings.fsb_host}{settings.product_id}/user/{amt_worker_id}/profile/"
+ res = requests.get(url).json()
+ if res.get("detail") == "user not found":
+ raise ValueError("user not found")
+ return res["user_profile"]
+
+
+def get_user_blocked(amt_worker_id: str) -> bool:
+ res = get_user_profile(amt_worker_id=amt_worker_id)
+ return res["user"]["blocked"]
+
+
+def get_user_blocked_or_not_exists(amt_worker_id: str) -> bool:
+ try:
+ res = get_user_profile(amt_worker_id=amt_worker_id)
+ return res["user"]["blocked"]
+ except ValueError as e:
+ if e.args[0] == "user not found":
+ return True
+
+
+def get_task_status(tsid: str) -> Optional[TaskStatusResponse]:
+ url = f"{settings.fsb_host}{settings.product_id}/status/{tsid}/"
+ d = requests.get(url).json()
+ if d.get("msg") == "invalid tsid":
+ return None
+ return TaskStatusResponse.model_validate(d)
+
+
+def user_cashout_request(
+ amt_worker_id: str, amount: USDCent, cashout_method_id
+) -> CashoutRequestInfo:
+ assert cashout_method_id in {
+ settings.amt_assignment_cashout_method,
+ settings.amt_bonus_cashout_method,
+ }
+ assert isinstance(amount, USDCent)
+ assert USDCent(0) < amount < USDCent(10_00)
+ url = f"{settings.fsb_host}{settings.product_id}/cashout/"
+ body = {
+ "bpuid": amt_worker_id,
+ "amount": int(amount),
+ "cashout_method_id": cashout_method_id,
+ }
+ res = requests.post(url, json=body)
+ d = res.json()
+
+ return CashoutRequestResponse.model_validate(d).cashout
+
+
+def manage_pending_cashout(
+ cashout_id: str, payout_status: PayoutStatus
+) -> UserPayoutEvent:
+ url = f"{settings.fsb_host}{settings.fsb_host_private_route}/thl/manage_cashout/"
+ body = {
+ "payout_id": cashout_id,
+ "new_status": payout_status.value,
+ }
+ res = requests.post(url, json=body)
+ d = res.json()
+
+ return UserPayoutEvent.model_validate(d)
+
+
+def get_wallet_balance(amt_worker_id: str):
+ url = f"{settings.fsb_host}{settings.product_id}/wallet/"
+ params = {"bpuid": amt_worker_id}
+ return USDCent(requests.get(url, params=params).json()["wallet"]["amount"])
diff --git a/jb/settings.py b/jb/settings.py
index 28402f3..538b89f 100644
--- a/jb/settings.py
+++ b/jb/settings.py
@@ -30,6 +30,9 @@ class AmtJbBaseSettings(BaseSettings):
aws_owner_id: str = Field()
aws_subscription_arn: str = Field()
+ amt_bonus_cashout_method: str = Field()
+ amt_assignment_cashout_method: str = Field()
+
class Settings(AmtJbBaseSettings):
model_config = SettingsConfigDict(