diff options
| author | Max Nanis | 2026-02-26 20:29:41 -0500 |
|---|---|---|
| committer | Max Nanis | 2026-02-26 20:29:41 -0500 |
| commit | af66829e26cb05f182bef36ac06d58c7baa0ec1e (patch) | |
| tree | 2848a9223e7d4d680f3e93fc8dfcc7545f716abd /jb | |
| parent | 0bf32fadd85d5938ae29d489efdd82e2cd137300 (diff) | |
| download | amt-jb-af66829e26cb05f182bef36ac06d58c7baa0ec1e.tar.gz amt-jb-af66829e26cb05f182bef36ac06d58c7baa0ec1e.zip | |
AMTManager moved to fixture, and dectorator with parameters on tasks and init / non-static class appraoch. More assertion checks and typing. TestMTurkClient seperated from TestAMTManger
Diffstat (limited to 'jb')
| -rw-r--r-- | jb/decorators.py | 3 | ||||
| -rw-r--r-- | jb/flow/assignment_tasks.py | 59 | ||||
| -rw-r--r-- | jb/flow/events.py | 4 | ||||
| -rw-r--r-- | jb/managers/amt.py | 131 | ||||
| -rw-r--r-- | jb/managers/hit.py | 3 |
5 files changed, 130 insertions, 70 deletions
diff --git a/jb/decorators.py b/jb/decorators.py index cbc28b5..5c1b1f5 100644 --- a/jb/decorators.py +++ b/jb/decorators.py @@ -8,6 +8,7 @@ from mypy_boto3_sns import SNSClient from jb.config import settings from jb.managers import Permission +from jb.managers.amt import AMTManager from jb.managers.assignment import AssignmentManager from jb.managers.bonus import BonusManager from jb.managers.hit import HitTypeManager, HitManager, HitQuestionManager @@ -55,6 +56,8 @@ pg_config = PostgresConfig( statement_timeout=1, ) +AMTM = AMTManager(amt_client=AMT_CLIENT) + HTM = HitTypeManager( pg_config=pg_config, permissions=[Permission.READ, Permission.CREATE] ) diff --git a/jb/flow/assignment_tasks.py b/jb/flow/assignment_tasks.py index 345d629..a41d7cf 100644 --- a/jb/flow/assignment_tasks.py +++ b/jb/flow/assignment_tasks.py @@ -35,7 +35,11 @@ from jb.config import settings def process_assignment_submitted( - am: AssignmentManager, hm: HitManager, bm: BonusManager, event: MTurkEvent + amtm: AMTManager, + am: AssignmentManager, + hm: HitManager, + bm: BonusManager, + event: MTurkEvent, ) -> None: """ Called either directly or from the SNS Notification that a @@ -43,6 +47,7 @@ def process_assignment_submitted( :return: None """ + # # Step 1: Attempt to get the Assignment out of the API # @@ -53,9 +58,10 @@ def process_assignment_submitted( # This call is hitting AMT, not our db. # The API won't even return is unless it has been submitted. So if this # fails, we don't need to do anything (i.e. reject it). - assignment = AMTManager.get_assignment_if_exists( + assignment = amtm.get_assignment_if_exists( amt_assignment_id=event.amt_assignment_id ) + if assignment is None: # It is not found in amt, either it is invalid, not yet submitted, or # already been approved/rejected, so we just do nothing ... @@ -87,13 +93,14 @@ def process_assignment_submitted( amt_hit_type_id=event.amt_hit_type_id, ) reject_assignment( + amtm=amtm, am=am, hm=hm, amt_assignment_id=assignment.amt_assignment_id, msg=REJECT_MESSAGE_UNKNOWN_ASSIGNMENT, amt_hit_type_id=hit.amt_hit_type_id, ) - review_hit(hm=hm, assignment=assignment) + review_hit(amtm=amtm, hm=hm, assignment=assignment) return None assert assignment.amt_assignment_id == event.amt_assignment_id @@ -117,38 +124,43 @@ def process_assignment_submitted( amt_hit_type_id=event.amt_hit_type_id, ) reject_assignment( + amtm=amtm, am=am, hm=hm, amt_assignment_id=amt_assignment_id, msg=REJECT_MESSAGE_BADDIE, amt_hit_type_id=hit.amt_hit_type_id, ) - review_hit(hm=hm, assignment=assignment) + review_hit(amtm=amtm, hm=hm, assignment=assignment) return None if assignment.tsid is None: - assignment = handle_assignment_w_no_work(am=am, hm=hm, assignment=assignment) + assignment = handle_assignment_w_no_work( + amtm=amtm, am=am, hm=hm, assignment=assignment + ) else: # We need to validate the work exists on thl, and if so, approve - assignment = handle_assignment_w_work(am=am, hm=hm, assignment=assignment) + assignment = handle_assignment_w_work( + amtm=amtm, am=am, hm=hm, assignment=assignment + ) # # Step 4: Tell Amazon we've reviewed the HIT, and update the DB # - review_hit(hm=hm, assignment=assignment) + review_hit(amtm=amtm, hm=hm, assignment=assignment) if ( assignment.tsid and assignment.status == AssignmentStatus.Approved and assignment.requester_feedback != NO_WORK_APPROVAL_MESSAGE ): - return issue_worker_payment(hm=hm, bm=bm, assignment=assignment) + return issue_worker_payment(amtm=amtm, hm=hm, bm=bm, assignment=assignment) -def review_hit(hm: HitManager, assignment: Assignment) -> None: +def review_hit(amtm: AMTManager, hm: HitManager, assignment: Assignment) -> None: # Reviewable to Reviewing - AMTManager.update_hit_review_status(amt_hit_id=assignment.amt_hit_id, revert=False) - hit, _ = AMTManager.get_hit_if_exists(amt_hit_id=assignment.amt_hit_id) + amtm.update_hit_review_status(amt_hit_id=assignment.amt_hit_id, revert=False) + hit, _ = amtm.get_hit_if_exists(amt_hit_id=assignment.amt_hit_id) if hit is None: logging.warning( @@ -163,7 +175,7 @@ def review_hit(hm: HitManager, assignment: Assignment) -> None: def handle_assignment_w_no_work( - am: AssignmentManager, hm: HitManager, assignment: Assignment + amtm: AMTManager, am: AssignmentManager, hm: HitManager, assignment: Assignment ) -> Assignment: """ Called when an assignment is submitted without a wall event. @@ -189,6 +201,7 @@ def handle_assignment_w_no_work( or get_user_blocked(amt_worker_id=amt_worker_id) ): assignment = reject_assignment( + amtm=amtm, am=am, hm=hm, amt_assignment_id=amt_assignment_id, @@ -201,6 +214,7 @@ def handle_assignment_w_no_work( # Approve with a message explaining they shouldn't do it. assignment = approve_assignment( + amtm=amtm, am=am, hm=hm, amt_assignment_id=amt_assignment_id, @@ -212,6 +226,7 @@ def handle_assignment_w_no_work( def reject_assignment( + amtm: AMTManager, am: AssignmentManager, hm: HitManager, amt_assignment_id: str, @@ -220,7 +235,7 @@ def reject_assignment( ) -> Assignment: # Reject in AMT, update db - res = AMTManager.reject_assignment_if_possible( + res = amtm.reject_assignment_if_possible( amt_assignment_id=amt_assignment_id, msg=msg ) if res is None: @@ -232,7 +247,7 @@ def reject_assignment( raise Exception(f"Failed to reject assignment: {amt_assignment_id}") # We just rejected this assignment, get it from amazon again - assignment = AMTManager.get_assignment(amt_assignment_id=amt_assignment_id) + assignment = amtm.get_assignment(amt_assignment_id=amt_assignment_id) assert assignment.status == AssignmentStatus.Rejected # And update the db. The assignment may not actually exist in the db (if @@ -256,6 +271,7 @@ def reject_assignment( def approve_assignment( + amtm: AMTManager, am: AssignmentManager, hm: HitManager, amt_assignment_id: str, @@ -264,7 +280,7 @@ def approve_assignment( ) -> Assignment: # Approve in AMT, update db - res = AMTManager.approve_assignment_if_possible( + res = amtm.approve_assignment_if_possible( amt_assignment_id=amt_assignment_id, msg=msg ) if res is None: @@ -276,7 +292,7 @@ def approve_assignment( raise Exception(f"Failed to approve assignment: {amt_assignment_id}") # We just approved this assignment, get it from amazon again - assignment = AMTManager.get_assignment(amt_assignment_id=amt_assignment_id) + assignment = amtm.get_assignment(amt_assignment_id=amt_assignment_id) assert assignment.status == AssignmentStatus.Approved # And update the db am.approve(assignment=assignment) @@ -288,7 +304,7 @@ def approve_assignment( def handle_assignment_w_work( - am: AssignmentManager, hm: HitManager, assignment: Assignment + amtm: AMTManager, am: AssignmentManager, hm: HitManager, assignment: Assignment ) -> Assignment: """ Called when an assignment is submitted with a tsid. @@ -332,6 +348,7 @@ def handle_assignment_w_work( amt_hit_type_id=hit.amt_hit_type_id, ) assignment = reject_assignment( + amtm=amtm, am=am, hm=hm, amt_assignment_id=amt_assignment_id, @@ -358,6 +375,7 @@ def handle_assignment_w_work( amt_hit_type_id=hit.amt_hit_type_id, ) assignment = reject_assignment( + amtm=amtm, am=am, hm=hm, amt_assignment_id=amt_assignment_id, @@ -370,6 +388,7 @@ def handle_assignment_w_work( # We've approved the HIT payment, now update the db to reflect this, and approve the assignment assignment = approve_assignment( + amtm=amtm, am=am, hm=hm, amt_assignment_id=amt_assignment_id, @@ -429,7 +448,7 @@ def submit_and_approve_amt_bonus_request( def issue_worker_payment( - hm: HitManager, bm: BonusManager, assignment: Assignment + amtm: AMTManager, hm: HitManager, bm: BonusManager, assignment: Assignment ) -> None: # For now, since we have no "I want my bonus" request/button. A user's # balance will be sent out anytime they get an approved assignment. We @@ -458,7 +477,7 @@ def issue_worker_payment( return None assert pe.id - AMTManager.send_bonus( + amtm.send_bonus( amt_worker_id=assignment.amt_worker_id, amt_assignment_id=assignment.amt_assignment_id, amount=amount, @@ -467,7 +486,7 @@ def issue_worker_payment( ) # Confirm it was sent through amt - bonus = AMTManager.get_bonus( + bonus = amtm.get_bonus( amt_assignment_id=assignment.amt_assignment_id, payout_event_id=pe.id ) diff --git a/jb/flow/events.py b/jb/flow/events.py index 5252fd0..60076de 100644 --- a/jb/flow/events.py +++ b/jb/flow/events.py @@ -96,9 +96,9 @@ def process_mturk_events_chunk(executor: Executor) -> Optional[int]: def process_assignment_submitted_event(event: MTurkEvent, msg_id: str): - from jb.decorators import AM, HM, BM + from jb.decorators import AMTM, AM, HM, BM - process_assignment_submitted(am=AM, hm=HM, bm=BM, event=event) + process_assignment_submitted(amtm=AMTM, am=AM, hm=HM, bm=BM, event=event) REDIS.xackdel(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id) diff --git a/jb/managers/amt.py b/jb/managers/amt.py index 0ec70d3..b764ffc 100644 --- a/jb/managers/amt.py +++ b/jb/managers/amt.py @@ -1,18 +1,24 @@ import logging from datetime import timezone, datetime -from typing import Tuple, Optional, List +from typing import Tuple, Optional, List, Dict, Any import botocore.exceptions -from mypy_boto3_mturk.type_defs import AssignmentTypeDef, BonusPaymentTypeDef +from mypy_boto3_mturk.type_defs import ( + AssignmentTypeDef, + BonusPaymentTypeDef, + CreateHITTypeResponseTypeDef, + GetHITResponseTypeDef, + CreateHITWithHITTypeResponseTypeDef, +) from jb.config import TOPIC_ARN -from jb.decorators import AMT_CLIENT from jb.models import AMTAccount from jb.models.assignment import Assignment from jb.models.bonus import Bonus from generalresearchutils.currency import USDCent from jb.models.definitions import HitStatus from jb.models.hit import HitType, HitQuestion, Hit +from mypy_boto3_mturk import MTurkClient REJECT_MESSAGE_UNKNOWN_ASSIGNMENT = "Unknown assignment" REJECT_MESSAGE_NO_WORK = "Assignment was submitted with no attempted work." @@ -26,10 +32,22 @@ BONUS_MESSAGE = "Great job! Bonus for a survey complete" class AMTManager: + """ + I am type annotating this more than needed (depending on the editor), + however it's only because AMT on boto3 is not intuitive... at all. + """ + + def __init__( + self, + amt_client: MTurkClient, + **kwargs, # type: ignore + ): + super().__init__(**kwargs) + self.amt_client = amt_client + + def fetch_account(self) -> AMTAccount: + res = self.amt_client.get_account_balance() - @staticmethod - def fetch_account() -> AMTAccount: - res = AMT_CLIENT.get_account_balance() return AMTAccount.model_validate( { "available_balance": res["AvailableBalance"], @@ -37,19 +55,19 @@ class AMTManager: } ) - @staticmethod - def get_hit_if_exists(amt_hit_id: str) -> Tuple[Optional[Hit], Optional[str]]: + def get_hit_if_exists(self, amt_hit_id: str) -> Tuple[Optional[Hit], Optional[str]]: try: - res = AMT_CLIENT.get_hit(HITId=amt_hit_id) - except AMT_CLIENT.exceptions.RequestError as e: + res: GetHITResponseTypeDef = self.amt_client.get_hit(HITId=amt_hit_id) + + except self.amt_client.exceptions.RequestError as e: msg = e.response.get("Error", {}).get("Message", "") return None, msg + hit = Hit.from_amt_get_hit(res["HIT"]) return hit, None - @classmethod - def get_hit_status(cls, amt_hit_id: str) -> HitStatus: - res, msg = cls.get_hit_if_exists(amt_hit_id=amt_hit_id) + def get_hit_status(self, amt_hit_id: str) -> HitStatus: + res, msg = self.get_hit_if_exists(amt_hit_id=amt_hit_id) if res is None: if msg is None: @@ -57,17 +75,23 @@ class AMTManager: if " does not exist. (" in msg: return HitStatus.Disposed + else: logging.warning(msg) return HitStatus.Unassignable return res.status - @staticmethod - def create_hit_type(hit_type: HitType): - res = AMT_CLIENT.create_hit_type(**hit_type.to_api_request_body()) # type: ignore + def create_hit_type(self, hit_type: HitType) -> HitType: + assert hit_type.amt_hit_type_id is None + + res: CreateHITTypeResponseTypeDef = self.amt_client.create_hit_type(**hit_type.to_api_request_body()) # type: ignore + hit_type.amt_hit_type_id = res["HITTypeId"] - AMT_CLIENT.update_notification_settings( + + # TODO: Assert / Check that the SNS Notification was + # successfully created. + self.amt_client.update_notification_settings( HITTypeId=hit_type.amt_hit_type_id, Notification={ "Destination": TOPIC_ARN, @@ -81,8 +105,7 @@ class AMTManager: return hit_type - @staticmethod - def create_hit_with_hit_type(hit_type: HitType, question: HitQuestion) -> Hit: + def create_hit_with_hit_type(self, hit_type: HitType, question: HitQuestion) -> Hit: """ HITTypeId: str LifetimeInSeconds: int @@ -94,27 +117,30 @@ class AMTManager: assert hit_type.amt_hit_type_id is not None data = hit_type.generate_hit_amt_request(question=question) - res = AMT_CLIENT.create_hit_with_hit_type(**data) + res: CreateHITWithHITTypeResponseTypeDef = ( + self.amt_client.create_hit_with_hit_type(**data) + ) + return Hit.from_amt_create_hit(res["HIT"], hit_type=hit_type, question=question) - @staticmethod - def get_assignment(amt_assignment_id: str) -> Assignment: + def get_assignment(self, amt_assignment_id: str) -> Assignment: """ You CANNOT get an Assignment if it has been only ACCEPTED (by the worker). The api is stupid, it will only show up once it is Submitted """ - res = AMT_CLIENT.get_assignment(AssignmentId=amt_assignment_id) + res = self.amt_client.get_assignment(AssignmentId=amt_assignment_id) ass_res: AssignmentTypeDef = res["Assignment"] assignment = Assignment.from_amt_get_assignment(ass_res) # to be clear, this has not checked whether it exists in our db assert assignment.id is None return assignment - @classmethod - def get_assignment_if_exists(cls, amt_assignment_id: str) -> Optional[Assignment]: + def get_assignment_if_exists(self, amt_assignment_id: str) -> Optional[Assignment]: expected_err_msg = f"Assignment {amt_assignment_id} does not exist" + try: - return cls.get_assignment(amt_assignment_id=amt_assignment_id) + return self.get_assignment(amt_assignment_id=amt_assignment_id) + except botocore.exceptions.ClientError as e: logging.warning(e) error_code = e.response["Error"]["Code"] @@ -123,80 +149,90 @@ class AMTManager: return None raise e - @staticmethod def reject_assignment_if_possible( - amt_assignment_id: str, msg: str = REJECT_MESSAGE_UNKNOWN_ASSIGNMENT - ): + self, amt_assignment_id: str, msg: str = REJECT_MESSAGE_UNKNOWN_ASSIGNMENT + ) -> Optional[Dict[str, Any]]: + # Unclear to me when this would fail try: - return AMT_CLIENT.reject_assignment( + return self.amt_client.reject_assignment( AssignmentId=amt_assignment_id, RequesterFeedback=msg ) + except botocore.exceptions.ClientError as e: logging.warning(e) return None - @staticmethod def approve_assignment_if_possible( + self, amt_assignment_id: str, msg: str = APPROVAL_MESSAGE, override_rejection: bool = False, - ): + ) -> Optional[Dict[str, Any]]: + # Unclear to me when this would fail try: - return AMT_CLIENT.approve_assignment( + return self.amt_client.approve_assignment( AssignmentId=amt_assignment_id, RequesterFeedback=msg, OverrideRejection=override_rejection, ) + except botocore.exceptions.ClientError as e: logging.warning(e) return None - @staticmethod - def update_hit_review_status(amt_hit_id: str, revert: bool = False) -> None: + def update_hit_review_status(self, amt_hit_id: str, revert: bool = False) -> None: try: # Reviewable to Reviewing - AMT_CLIENT.update_hit_review_status(HITId=amt_hit_id, Revert=revert) + self.amt_client.update_hit_review_status(HITId=amt_hit_id, Revert=revert) + except botocore.exceptions.ClientError as e: logging.warning(f"{amt_hit_id=}, {e}") error_msg = e.response["Error"]["Message"] + if "does not exist" in error_msg: raise ValueError(error_msg) + # elif "This HIT is currently in the state 'Reviewing'" in error_msg: # logging.warning(error_msg) return None - @staticmethod def send_bonus( + self, amt_worker_id: str, amount: USDCent, amt_assignment_id: str, reason: str, unique_request_token: str, - ): + ) -> Optional[Dict[str, Any]]: try: - return AMT_CLIENT.send_bonus( + return self.amt_client.send_bonus( WorkerId=amt_worker_id, BonusAmount=str(amount.to_usd()), AssignmentId=amt_assignment_id, Reason=reason, UniqueRequestToken=unique_request_token, ) + except botocore.exceptions.ClientError as e: logging.warning(f"{amt_worker_id=} {amt_assignment_id=}, {e}") return None - @staticmethod - def get_bonus(amt_assignment_id: str, payout_event_id: str) -> Optional[Bonus]: - res: List[BonusPaymentTypeDef] = AMT_CLIENT.list_bonus_payments( + def get_bonus( + self, amt_assignment_id: str, payout_event_id: str + ) -> Optional[Bonus]: + + res: List[BonusPaymentTypeDef] = self.amt_client.list_bonus_payments( AssignmentId=amt_assignment_id )["BonusPayments"] + assert ( len(res) <= 1 ), f"{amt_assignment_id=} Expected 1 or 0 bonuses, got {len(res)}" d = res[0] if res else None + if d: return Bonus.model_validate( { @@ -208,18 +244,19 @@ class AMTManager: "payout_event_id": payout_event_id, } ) + return None - @staticmethod - def expire_all_hits() -> None: - # used in testing only (or in an emergency I guess) + def expire_all_hits(self) -> None: + # Used in testing only (or in an emergency I guess) now = datetime.now(tz=timezone.utc) - paginator = AMT_CLIENT.get_paginator("list_hits") + paginator = self.amt_client.get_paginator("list_hits") for page in paginator.paginate(): for hit in page["HITs"]: if hit["HITStatus"] in ("Assignable", "Reviewable", "Reviewing"): - AMT_CLIENT.update_expiration_for_hit( + self.amt_client.update_expiration_for_hit( HITId=hit["HITId"], ExpireAt=now ) + return None diff --git a/jb/managers/hit.py b/jb/managers/hit.py index 7d60b63..533f45a 100644 --- a/jb/managers/hit.py +++ b/jb/managers/hit.py @@ -163,8 +163,9 @@ class HitTypeManager(PostgresManager): with conn.cursor() as c: c.execute(query, data) conn.commit() - assert c.rowcount == 1, c.rowcount + row_cnt = c.rowcount + assert row_cnt == 1, f"Expected 1 row updated, got {row_cnt}" return None |
