"""Helpers that wrap the ``AMT_CLIENT`` calls used to manage HITs, assignments and bonuses."""

import logging
from datetime import timezone, datetime
from typing import Tuple, Optional, List

import botocore.exceptions
from mypy_boto3_mturk.type_defs import AssignmentTypeDef, BonusPaymentTypeDef

from jb.config import TOPIC_ARN
from jb.decorators import AMT_CLIENT
from jb.models import AMTAccount
from jb.models.assignment import Assignment
from jb.models.bonus import Bonus
from jb.models.currency import USDCent
from jb.models.definitions import HitStatus
from jb.models.hit import HitType, HitQuestion, Hit

# Feedback strings sent to workers when rejecting / approving assignments
# or granting bonuses.
REJECT_MESSAGE_UNKNOWN_ASSIGNMENT = "Unknown assignment"
REJECT_MESSAGE_NO_WORK = "Assignment was submitted with no attempted work."
REJECT_MESSAGE_BADDIE = "Quality has dropped below an acceptable level"
APPROVAL_MESSAGE = "Thank you!"
NO_WORK_APPROVAL_MESSAGE = (
    REJECT_MESSAGE_NO_WORK + " In the future, if you are not sent into a task, "
    "please return the assignment, otherwise it will be rejected!"
)
BONUS_MESSAGE = "Great job! Bonus for a survey complete"


class AMTManager:
    """Namespace of static/class methods that call ``AMT_CLIENT``.

    The class holds no state; every method is a ``@staticmethod`` or
    ``@classmethod`` and talks to the module-level ``AMT_CLIENT``.
    """

    @staticmethod
    def fetch_account() -> AMTAccount:
        """Fetch the account balance and return it as an ``AMTAccount``.

        ``OnHoldBalance`` is optional in the response and defaults to 0.
        """
        res = AMT_CLIENT.get_account_balance()
        return AMTAccount.model_validate(
            {
                "available_balance": res["AvailableBalance"],
                "onhold_balance": res.get("OnHoldBalance", 0),
            }
        )

    @staticmethod
    def get_hit_if_exists(amt_hit_id: str) -> Tuple[Optional[Hit], Optional[str]]:
        """Look up a HIT without raising on a ``RequestError``.

        Returns:
            ``(hit, None)`` when the lookup succeeds, otherwise
            ``(None, message)`` where ``message`` is the error message taken
            from the ``RequestError`` response (empty string if absent).
        """
        try:
            res = AMT_CLIENT.get_hit(HITId=amt_hit_id)
        except AMT_CLIENT.exceptions.RequestError as e:
            msg = e.response.get("Error", {}).get("Message", "")
            return None, msg
        hit = Hit.from_amt_get_hit(res["HIT"])
        return hit, None

    @classmethod
    def get_hit_status(cls, amt_hit_id: str):
        """Return the status of a HIT, mapping lookup failures to a ``HitStatus``.

        When the HIT cannot be fetched:
          * an error message containing " does not exist. (" maps to
            ``HitStatus.Disposed``;
          * any other message is logged and mapped to
            ``HitStatus.Unassignable``.
        Otherwise the fetched HIT's ``status`` attribute is returned.
        """
        res, msg = cls.get_hit_if_exists(amt_hit_id=amt_hit_id)
        if res is None:
            if " does not exist. (" in msg:
                return HitStatus.Disposed
            else:
                logging.warning(msg)
                return HitStatus.Unassignable
        return res.status

    @staticmethod
    def create_hit_type(hit_type: HitType) -> HitType:
        """Create the HIT type and subscribe it to SNS notifications.

        The ``hit_type`` argument is mutated: its ``amt_hit_type_id`` is set
        from the response. The same object is returned for convenience.
        """
        res = AMT_CLIENT.create_hit_type(**hit_type.to_api_request_body())
        hit_type.amt_hit_type_id = res["HITTypeId"]
        AMT_CLIENT.update_notification_settings(
            HITTypeId=hit_type.amt_hit_type_id,
            Notification={
                "Destination": TOPIC_ARN,
                "Transport": "SNS",
                "Version": "2006-05-05",
                # you can add more events, see mypy_boto3_mturk.literals.EventTypeType
                "EventTypes": ["AssignmentSubmitted"],
            },
            Active=True,
        )
        return hit_type

    @staticmethod
    def create_hit_with_hit_type(hit_type: HitType, question: HitQuestion) -> Hit:
        """Create a HIT for an already-created HIT type and return it as a ``Hit``.

        ``hit_type`` must already have both ``id`` and ``amt_hit_type_id``
        set (asserted below). The request body comes from
        ``hit_type.generate_hit_amt_request``; the request fields noted by the
        original author are:

            HITTypeId: str
            LifetimeInSeconds: int
            MaxAssignments: NotRequired[int]
            Question: NotRequired[str]
            UniqueRequestToken: NotRequired[str]
        """
        assert hit_type.id is not None
        assert hit_type.amt_hit_type_id is not None

        data = hit_type.generate_hit_amt_request(question=question)
        res = AMT_CLIENT.create_hit_with_hit_type(**data)
        return Hit.from_amt_create_hit(res["HIT"], hit_type=hit_type, question=question)

    @staticmethod
    def get_assignment(amt_assignment_id: str) -> Assignment:
        """Fetch an assignment and convert it to an ``Assignment``.

        Note: an assignment that has only been ACCEPTED by the worker cannot
        be fetched; it only shows up once it has been submitted.

        The returned object has not been checked against our db, which is
        why its ``id`` is asserted to be ``None``.
        """
        res = AMT_CLIENT.get_assignment(AssignmentId=amt_assignment_id)
        ass_res: AssignmentTypeDef = res["Assignment"]
        assignment = Assignment.from_amt_get_assignment(ass_res)
        # to be clear, this has not checked whether it exists in our db
        assert assignment.id is None
        return assignment

    @classmethod
    def get_assignment_if_exists(cls, amt_assignment_id: str) -> Optional[Assignment]:
        """Fetch an assignment, returning ``None`` if it does not exist.

        Every ``ClientError`` is logged. Only a ``RequestError`` whose message
        contains "Assignment <id> does not exist" is converted to ``None``;
        any other ``ClientError`` is re-raised unchanged.
        """
        expected_err_msg = f"Assignment {amt_assignment_id} does not exist"
        try:
            return cls.get_assignment(amt_assignment_id=amt_assignment_id)
        except botocore.exceptions.ClientError as e:
            logging.warning(e)
            error_code = e.response["Error"]["Code"]
            error_msg = e.response["Error"]["Message"]
            if error_code == "RequestError" and expected_err_msg in error_msg:
                return None
            # Bare raise re-raises the active exception as-is.
            raise

    @staticmethod
    def reject_assignment_if_possible(
        amt_assignment_id: str, msg: str = REJECT_MESSAGE_UNKNOWN_ASSIGNMENT
    ):
        """Reject an assignment on a best-effort basis.

        Returns the client's response, or ``None`` if a ``ClientError`` was
        raised (the error is logged, not propagated).
        """
        # Unclear to me when this would fail
        try:
            return AMT_CLIENT.reject_assignment(
                AssignmentId=amt_assignment_id, RequesterFeedback=msg
            )
        except botocore.exceptions.ClientError as e:
            logging.warning(e)
            return None

    @staticmethod
    def approve_assignment_if_possible(
        amt_assignment_id: str,
        msg: str = APPROVAL_MESSAGE,
        override_rejection: bool = False,
    ):
        """Approve an assignment on a best-effort basis.

        ``override_rejection`` is forwarded as ``OverrideRejection``.

        Returns the client's response, or ``None`` if a ``ClientError`` was
        raised (the error is logged, not propagated).
        """
        # Unclear to me when this would fail
        try:
            return AMT_CLIENT.approve_assignment(
                AssignmentId=amt_assignment_id,
                RequesterFeedback=msg,
                OverrideRejection=override_rejection,
            )
        except botocore.exceptions.ClientError as e:
            logging.warning(e)
            return None

    @staticmethod
    def update_hit_review_status(amt_hit_id: str, revert: bool = False) -> None:
        """Update the review status of a HIT (Reviewable to Reviewing).

        ``revert`` is forwarded as ``Revert``.

        Raises:
            ValueError: if the client error message contains
                "does not exist". The original ``ClientError`` is attached
                as the cause.

        Any other ``ClientError`` is logged and swallowed.
        """
        try:
            # Reviewable to Reviewing
            AMT_CLIENT.update_hit_review_status(HITId=amt_hit_id, Revert=revert)
        except botocore.exceptions.ClientError as e:
            logging.warning(f"{amt_hit_id=}, {e}")
            error_msg = e.response["Error"]["Message"]
            if "does not exist" in error_msg:
                raise ValueError(error_msg) from e
            # Other errors (e.g. a message like "This HIT is currently in the
            # state 'Reviewing'") are deliberately ignored beyond the warning
            # logged above.
        return None

    @staticmethod
    def send_bonus(
        amt_worker_id: str,
        amount: USDCent,
        amt_assignment_id: str,
        reason: str,
        unique_request_token: str,
    ):
        """Send a bonus to a worker on a best-effort basis.

        ``amount`` is converted with ``amount.to_usd()`` and sent as a string.

        Returns the client's response, or ``None`` if a ``ClientError`` was
        raised (the error is logged, not propagated).
        """
        try:
            return AMT_CLIENT.send_bonus(
                WorkerId=amt_worker_id,
                BonusAmount=str(amount.to_usd()),
                AssignmentId=amt_assignment_id,
                Reason=reason,
                UniqueRequestToken=unique_request_token,
            )
        except botocore.exceptions.ClientError as e:
            logging.warning(f"{amt_worker_id=} {amt_assignment_id=}, {e}")
            return None

    @staticmethod
    def get_bonus(amt_assignment_id: str, payout_event_id: str) -> Optional[Bonus]:
        """Return the bonus paid on an assignment, or ``None`` if there is none.

        At most one bonus per assignment is expected; more than one trips the
        assertion below. ``payout_event_id`` is not sent to the client, it is
        only copied onto the returned ``Bonus``.
        """
        res: List[BonusPaymentTypeDef] = AMT_CLIENT.list_bonus_payments(
            AssignmentId=amt_assignment_id
        )["BonusPayments"]
        assert (
            len(res) <= 1
        ), f"{amt_assignment_id=} Expected 1 or 0 bonuses, got {len(res)}"
        d = res[0] if res else None
        if not d:
            return None
        return Bonus.model_validate(
            {
                "amt_worker_id": d["WorkerId"],
                # BonusAmount is converted to integer cents; round() guards
                # against float representation error.
                "amount": USDCent(round(float(d["BonusAmount"]) * 100)),
                "amt_assignment_id": d["AssignmentId"],
                "reason": d["Reason"],
                "grant_time": d["GrantTime"].astimezone(tz=timezone.utc),
                "payout_event_id": payout_event_id,
            }
        )

    @staticmethod
    def expire_all_hits() -> None:
        """Set the expiration of every Assignable/Reviewable/Reviewing HIT to now.

        Used in testing only (or in an emergency).
        """
        # A single timestamp is taken up front and reused for every HIT.
        now = datetime.now(tz=timezone.utc)
        paginator = AMT_CLIENT.get_paginator("list_hits")

        for page in paginator.paginate():
            for hit in page["HITs"]:
                if hit["HITStatus"] in ("Assignable", "Reviewable", "Reviewing"):
                    AMT_CLIENT.update_expiration_for_hit(
                        HITId=hit["HITId"], ExpireAt=now
                    )