diff options
Diffstat (limited to 'jb/managers/amt.py')
| -rw-r--r-- | jb/managers/amt.py | 216 |
1 files changed, 216 insertions, 0 deletions
diff --git a/jb/managers/amt.py b/jb/managers/amt.py new file mode 100644 index 0000000..79661c7 --- /dev/null +++ b/jb/managers/amt.py @@ -0,0 +1,216 @@ +import logging +from datetime import timezone, datetime +from typing import Tuple, Optional, List + +import botocore.exceptions +from mypy_boto3_mturk.type_defs import AssignmentTypeDef, BonusPaymentTypeDef + +from jb.config import TOPIC_ARN +from jb.decorators import AMT_CLIENT +from jb.models import AMTAccount +from jb.models.assignment import Assignment +from jb.models.bonus import Bonus +from jb.models.currency import USDCent +from jb.models.definitions import HitStatus +from jb.models.hit import HitType, HitQuestion, Hit + +REJECT_MESSAGE_UNKNOWN_ASSIGNMENT = "Unknown assignment" +REJECT_MESSAGE_NO_WORK = "Assignment was submitted with no attempted work." +REJECT_MESSAGE_BADDIE = "Quality has dropped below an acceptable level" +APPROVAL_MESSAGE = "Thank you!" +NO_WORK_APPROVAL_MESSAGE = ( + REJECT_MESSAGE_NO_WORK + " In the future, if you are not sent into a task, " + "please return the assignment, otherwise it will be rejected!" +) +BONUS_MESSAGE = "Great job! Bonus for a survey complete" + + +class AMTManager: + + @staticmethod + def fetch_account() -> AMTAccount: + res = AMT_CLIENT.get_account_balance() + return AMTAccount.model_validate( + { + "available_balance": res["AvailableBalance"], + "onhold_balance": res.get("OnHoldBalance", 0), + } + ) + + @staticmethod + def get_hit_if_exists(amt_hit_id: str) -> Tuple[Optional[Hit], Optional[str]]: + try: + res = AMT_CLIENT.get_hit(HITId=amt_hit_id) + except AMT_CLIENT.exceptions.RequestError as e: + msg = e.response.get("Error", {}).get("Message", "") + return None, msg + hit = Hit.from_amt_get_hit(res["HIT"]) + return hit, None + + @classmethod + def get_hit_status(cls, amt_hit_id: str): + res, msg = cls.get_hit_if_exists(amt_hit_id=amt_hit_id) + if res is None: + if " does not exist. (" in msg: + return HitStatus.Disposed + else: + logging.warning(msg) + return HitStatus.Unassignable + return res.status + + @staticmethod + def create_hit_type(hit_type: HitType): + res = AMT_CLIENT.create_hit_type(**hit_type.to_api_request_body()) + hit_type.amt_hit_type_id = res["HITTypeId"] + AMT_CLIENT.update_notification_settings( + HITTypeId=hit_type.amt_hit_type_id, + Notification={ + "Destination": TOPIC_ARN, + "Transport": "SNS", + "Version": "2006-05-05", + # you can add more events, see mypy_boto3_mturk.literals.EventTypeType + "EventTypes": ["AssignmentSubmitted"], + }, + Active=True, + ) + + return hit_type + + @staticmethod + def create_hit_with_hit_type(hit_type: HitType, question: HitQuestion) -> Hit: + """ + HITTypeId: str + LifetimeInSeconds: int + MaxAssignments: NotRequired[int] + Question: NotRequired[str] + UniqueRequestToken: NotRequired[str] + """ + assert hit_type.id is not None + assert hit_type.amt_hit_type_id is not None + + data = hit_type.generate_hit_amt_request(question=question) + res = AMT_CLIENT.create_hit_with_hit_type(**data) + return Hit.from_amt_create_hit(res["HIT"], hit_type=hit_type, question=question) + + @staticmethod + def get_assignment(amt_assignment_id: str) -> Assignment: + # note, you CANNOT get an assignment if it has been only ACCEPTED (by the worker) + # the api is stupid. it will only show up once it is submitted + res = AMT_CLIENT.get_assignment(AssignmentId=amt_assignment_id) + ass_res: AssignmentTypeDef = res["Assignment"] + assignment = Assignment.from_amt_get_assignment(ass_res) + # to be clear, this has not checked whether it exists in our db + assert assignment.id is None + return assignment + + @classmethod + def get_assignment_if_exists(cls, amt_assignment_id: str) -> Optional[Assignment]: + expected_err_msg = f"Assignment {amt_assignment_id} does not exist" + try: + return cls.get_assignment(amt_assignment_id=amt_assignment_id) + except botocore.exceptions.ClientError as e: + logging.warning(e) + error_code = e.response["Error"]["Code"] + error_msg = e.response["Error"]["Message"] + if error_code == "RequestError" and expected_err_msg in error_msg: + return None + raise e + + @staticmethod + def reject_assignment_if_possible( + amt_assignment_id: str, msg: str = REJECT_MESSAGE_UNKNOWN_ASSIGNMENT + ): + # Unclear to me when this would fail + try: + return AMT_CLIENT.reject_assignment( + AssignmentId=amt_assignment_id, RequesterFeedback=msg + ) + except botocore.exceptions.ClientError as e: + logging.warning(e) + return None + + @staticmethod + def approve_assignment_if_possible( + amt_assignment_id: str, + msg: str = APPROVAL_MESSAGE, + override_rejection: bool = False, + ): + # Unclear to me when this would fail + try: + return AMT_CLIENT.approve_assignment( + AssignmentId=amt_assignment_id, + RequesterFeedback=msg, + OverrideRejection=override_rejection, + ) + except botocore.exceptions.ClientError as e: + logging.warning(e) + return None + + @staticmethod + def update_hit_review_status(amt_hit_id: str, revert: bool = False) -> None: + try: + # Reviewable to Reviewing + AMT_CLIENT.update_hit_review_status(HITId=amt_hit_id, Revert=revert) + except botocore.exceptions.ClientError as e: + logging.warning(f"{amt_hit_id=}, {e}") + error_msg = e.response["Error"]["Message"] + if "does not exist" in error_msg: + raise ValueError(error_msg) + # elif "This HIT is currently in the state 'Reviewing'" in error_msg: + # logging.warning(error_msg) + return None + + @staticmethod + def send_bonus( + amt_worker_id: str, + amount: USDCent, + amt_assignment_id: str, + reason: str, + unique_request_token: str, + ): + try: + return AMT_CLIENT.send_bonus( + WorkerId=amt_worker_id, + BonusAmount=str(amount.to_usd()), + AssignmentId=amt_assignment_id, + Reason=reason, + UniqueRequestToken=unique_request_token, + ) + except botocore.exceptions.ClientError as e: + logging.warning(f"{amt_worker_id=} {amt_assignment_id=}, {e}") + return None + + @staticmethod + def get_bonus(amt_assignment_id: str, payout_event_id: str) -> Optional[Bonus]: + res: List[BonusPaymentTypeDef] = AMT_CLIENT.list_bonus_payments( + AssignmentId=amt_assignment_id + )["BonusPayments"] + assert ( + len(res) <= 1 + ), f"{amt_assignment_id=} Expected 1 or 0 bonuses, got {len(res)}" + d = res[0] if res else None + if d: + return Bonus.model_validate( + { + "amt_worker_id": d["WorkerId"], + "amount": USDCent(round(float(d["BonusAmount"]) * 100)), + "amt_assignment_id": d["AssignmentId"], + "reason": d["Reason"], + "grant_time": d["GrantTime"].astimezone(tz=timezone.utc), + "payout_event_id": payout_event_id, + } + ) + return None + + @staticmethod + def expire_all_hits(): + # used in testing only (or in an emergency I guess) + now = datetime.now(tz=timezone.utc) + paginator = AMT_CLIENT.get_paginator("list_hits") + + for page in paginator.paginate(): + for hit in page["HITs"]: + if hit["HITStatus"] in ("Assignable", "Reviewable", "Reviewing"): + AMT_CLIENT.update_expiration_for_hit( + HITId=hit["HITId"], ExpireAt=now + ) |
