aboutsummaryrefslogtreecommitdiff
path: root/jb/managers/amt.py
diff options
context:
space:
mode:
authorMax Nanis2026-02-19 02:43:23 -0500
committerMax Nanis2026-02-19 02:43:23 -0500
commitf0f96f83c2630e890a2cbcab53f77fd4c37e1684 (patch)
treec6d2cb092e76bf5d499e0ea9949508d6b22164fd /jb/managers/amt.py
parent3eaa56f0306ead818f64c3d99fc6d230d9b970a4 (diff)
downloadamt-jb-f0f96f83c2630e890a2cbcab53f77fd4c37e1684.tar.gz
amt-jb-f0f96f83c2630e890a2cbcab53f77fd4c37e1684.zip
Models, Project files, some pytests, requirements.. etcHEADmaster
Diffstat (limited to 'jb/managers/amt.py')
-rw-r--r--jb/managers/amt.py216
1 files changed, 216 insertions, 0 deletions
diff --git a/jb/managers/amt.py b/jb/managers/amt.py
new file mode 100644
index 0000000..79661c7
--- /dev/null
+++ b/jb/managers/amt.py
@@ -0,0 +1,216 @@
+import logging
+from datetime import timezone, datetime
+from typing import Tuple, Optional, List
+
+import botocore.exceptions
+from mypy_boto3_mturk.type_defs import AssignmentTypeDef, BonusPaymentTypeDef
+
+from jb.config import TOPIC_ARN
+from jb.decorators import AMT_CLIENT
+from jb.models import AMTAccount
+from jb.models.assignment import Assignment
+from jb.models.bonus import Bonus
+from jb.models.currency import USDCent
+from jb.models.definitions import HitStatus
+from jb.models.hit import HitType, HitQuestion, Hit
+
+REJECT_MESSAGE_UNKNOWN_ASSIGNMENT = "Unknown assignment"
+REJECT_MESSAGE_NO_WORK = "Assignment was submitted with no attempted work."
+REJECT_MESSAGE_BADDIE = "Quality has dropped below an acceptable level"
+APPROVAL_MESSAGE = "Thank you!"
+NO_WORK_APPROVAL_MESSAGE = (
+ REJECT_MESSAGE_NO_WORK + " In the future, if you are not sent into a task, "
+ "please return the assignment, otherwise it will be rejected!"
+)
+BONUS_MESSAGE = "Great job! Bonus for a survey complete"
+
+
+class AMTManager:
+
+ @staticmethod
+ def fetch_account() -> AMTAccount:
+ res = AMT_CLIENT.get_account_balance()
+ return AMTAccount.model_validate(
+ {
+ "available_balance": res["AvailableBalance"],
+ "onhold_balance": res.get("OnHoldBalance", 0),
+ }
+ )
+
+ @staticmethod
+ def get_hit_if_exists(amt_hit_id: str) -> Tuple[Optional[Hit], Optional[str]]:
+ try:
+ res = AMT_CLIENT.get_hit(HITId=amt_hit_id)
+ except AMT_CLIENT.exceptions.RequestError as e:
+ msg = e.response.get("Error", {}).get("Message", "")
+ return None, msg
+ hit = Hit.from_amt_get_hit(res["HIT"])
+ return hit, None
+
+ @classmethod
+ def get_hit_status(cls, amt_hit_id: str):
+ res, msg = cls.get_hit_if_exists(amt_hit_id=amt_hit_id)
+ if res is None:
+ if " does not exist. (" in msg:
+ return HitStatus.Disposed
+ else:
+ logging.warning(msg)
+ return HitStatus.Unassignable
+ return res.status
+
+ @staticmethod
+ def create_hit_type(hit_type: HitType):
+ res = AMT_CLIENT.create_hit_type(**hit_type.to_api_request_body())
+ hit_type.amt_hit_type_id = res["HITTypeId"]
+ AMT_CLIENT.update_notification_settings(
+ HITTypeId=hit_type.amt_hit_type_id,
+ Notification={
+ "Destination": TOPIC_ARN,
+ "Transport": "SNS",
+ "Version": "2006-05-05",
+ # you can add more events, see mypy_boto3_mturk.literals.EventTypeType
+ "EventTypes": ["AssignmentSubmitted"],
+ },
+ Active=True,
+ )
+
+ return hit_type
+
+ @staticmethod
+ def create_hit_with_hit_type(hit_type: HitType, question: HitQuestion) -> Hit:
+ """
+ HITTypeId: str
+ LifetimeInSeconds: int
+ MaxAssignments: NotRequired[int]
+ Question: NotRequired[str]
+ UniqueRequestToken: NotRequired[str]
+ """
+ assert hit_type.id is not None
+ assert hit_type.amt_hit_type_id is not None
+
+ data = hit_type.generate_hit_amt_request(question=question)
+ res = AMT_CLIENT.create_hit_with_hit_type(**data)
+ return Hit.from_amt_create_hit(res["HIT"], hit_type=hit_type, question=question)
+
+ @staticmethod
+ def get_assignment(amt_assignment_id: str) -> Assignment:
+ # note, you CANNOT get an assignment if it has been only ACCEPTED (by the worker)
+ # the api is stupid. it will only show up once it is submitted
+ res = AMT_CLIENT.get_assignment(AssignmentId=amt_assignment_id)
+ ass_res: AssignmentTypeDef = res["Assignment"]
+ assignment = Assignment.from_amt_get_assignment(ass_res)
+ # to be clear, this has not checked whether it exists in our db
+ assert assignment.id is None
+ return assignment
+
+ @classmethod
+ def get_assignment_if_exists(cls, amt_assignment_id: str) -> Optional[Assignment]:
+ expected_err_msg = f"Assignment {amt_assignment_id} does not exist"
+ try:
+ return cls.get_assignment(amt_assignment_id=amt_assignment_id)
+ except botocore.exceptions.ClientError as e:
+ logging.warning(e)
+ error_code = e.response["Error"]["Code"]
+ error_msg = e.response["Error"]["Message"]
+ if error_code == "RequestError" and expected_err_msg in error_msg:
+ return None
+ raise e
+
+ @staticmethod
+ def reject_assignment_if_possible(
+ amt_assignment_id: str, msg: str = REJECT_MESSAGE_UNKNOWN_ASSIGNMENT
+ ):
+ # Unclear to me when this would fail
+ try:
+ return AMT_CLIENT.reject_assignment(
+ AssignmentId=amt_assignment_id, RequesterFeedback=msg
+ )
+ except botocore.exceptions.ClientError as e:
+ logging.warning(e)
+ return None
+
+ @staticmethod
+ def approve_assignment_if_possible(
+ amt_assignment_id: str,
+ msg: str = APPROVAL_MESSAGE,
+ override_rejection: bool = False,
+ ):
+ # Unclear to me when this would fail
+ try:
+ return AMT_CLIENT.approve_assignment(
+ AssignmentId=amt_assignment_id,
+ RequesterFeedback=msg,
+ OverrideRejection=override_rejection,
+ )
+ except botocore.exceptions.ClientError as e:
+ logging.warning(e)
+ return None
+
+ @staticmethod
+ def update_hit_review_status(amt_hit_id: str, revert: bool = False) -> None:
+ try:
+ # Reviewable to Reviewing
+ AMT_CLIENT.update_hit_review_status(HITId=amt_hit_id, Revert=revert)
+ except botocore.exceptions.ClientError as e:
+ logging.warning(f"{amt_hit_id=}, {e}")
+ error_msg = e.response["Error"]["Message"]
+ if "does not exist" in error_msg:
+ raise ValueError(error_msg)
+ # elif "This HIT is currently in the state 'Reviewing'" in error_msg:
+ # logging.warning(error_msg)
+ return None
+
+ @staticmethod
+ def send_bonus(
+ amt_worker_id: str,
+ amount: USDCent,
+ amt_assignment_id: str,
+ reason: str,
+ unique_request_token: str,
+ ):
+ try:
+ return AMT_CLIENT.send_bonus(
+ WorkerId=amt_worker_id,
+ BonusAmount=str(amount.to_usd()),
+ AssignmentId=amt_assignment_id,
+ Reason=reason,
+ UniqueRequestToken=unique_request_token,
+ )
+ except botocore.exceptions.ClientError as e:
+ logging.warning(f"{amt_worker_id=} {amt_assignment_id=}, {e}")
+ return None
+
+ @staticmethod
+ def get_bonus(amt_assignment_id: str, payout_event_id: str) -> Optional[Bonus]:
+ res: List[BonusPaymentTypeDef] = AMT_CLIENT.list_bonus_payments(
+ AssignmentId=amt_assignment_id
+ )["BonusPayments"]
+ assert (
+ len(res) <= 1
+ ), f"{amt_assignment_id=} Expected 1 or 0 bonuses, got {len(res)}"
+ d = res[0] if res else None
+ if d:
+ return Bonus.model_validate(
+ {
+ "amt_worker_id": d["WorkerId"],
+ "amount": USDCent(round(float(d["BonusAmount"]) * 100)),
+ "amt_assignment_id": d["AssignmentId"],
+ "reason": d["Reason"],
+ "grant_time": d["GrantTime"].astimezone(tz=timezone.utc),
+ "payout_event_id": payout_event_id,
+ }
+ )
+ return None
+
+ @staticmethod
+ def expire_all_hits():
+ # used in testing only (or in an emergency I guess)
+ now = datetime.now(tz=timezone.utc)
+ paginator = AMT_CLIENT.get_paginator("list_hits")
+
+ for page in paginator.paginate():
+ for hit in page["HITs"]:
+ if hit["HITStatus"] in ("Assignable", "Reviewable", "Reviewing"):
+ AMT_CLIENT.update_expiration_for_hit(
+ HITId=hit["HITId"], ExpireAt=now
+ )