aboutsummaryrefslogtreecommitdiff
path: root/jb
diff options
context:
space:
mode:
Diffstat (limited to 'jb')
-rw-r--r--jb/decorators.py3
-rw-r--r--jb/flow/assignment_tasks.py59
-rw-r--r--jb/flow/events.py4
-rw-r--r--jb/managers/amt.py131
-rw-r--r--jb/managers/hit.py3
5 files changed, 130 insertions, 70 deletions
diff --git a/jb/decorators.py b/jb/decorators.py
index cbc28b5..5c1b1f5 100644
--- a/jb/decorators.py
+++ b/jb/decorators.py
@@ -8,6 +8,7 @@ from mypy_boto3_sns import SNSClient
from jb.config import settings
from jb.managers import Permission
+from jb.managers.amt import AMTManager
from jb.managers.assignment import AssignmentManager
from jb.managers.bonus import BonusManager
from jb.managers.hit import HitTypeManager, HitManager, HitQuestionManager
@@ -55,6 +56,8 @@ pg_config = PostgresConfig(
statement_timeout=1,
)
+AMTM = AMTManager(amt_client=AMT_CLIENT)
+
HTM = HitTypeManager(
pg_config=pg_config, permissions=[Permission.READ, Permission.CREATE]
)
diff --git a/jb/flow/assignment_tasks.py b/jb/flow/assignment_tasks.py
index 345d629..a41d7cf 100644
--- a/jb/flow/assignment_tasks.py
+++ b/jb/flow/assignment_tasks.py
@@ -35,7 +35,11 @@ from jb.config import settings
def process_assignment_submitted(
- am: AssignmentManager, hm: HitManager, bm: BonusManager, event: MTurkEvent
+ amtm: AMTManager,
+ am: AssignmentManager,
+ hm: HitManager,
+ bm: BonusManager,
+ event: MTurkEvent,
) -> None:
"""
Called either directly or from the SNS Notification that a
@@ -43,6 +47,7 @@ def process_assignment_submitted(
:return: None
"""
+
#
# Step 1: Attempt to get the Assignment out of the API
#
@@ -53,9 +58,10 @@ def process_assignment_submitted(
# This call is hitting AMT, not our db.
# The API won't even return is unless it has been submitted. So if this
# fails, we don't need to do anything (i.e. reject it).
- assignment = AMTManager.get_assignment_if_exists(
+ assignment = amtm.get_assignment_if_exists(
amt_assignment_id=event.amt_assignment_id
)
+
if assignment is None:
# It is not found in amt, either it is invalid, not yet submitted, or
# already been approved/rejected, so we just do nothing ...
@@ -87,13 +93,14 @@ def process_assignment_submitted(
amt_hit_type_id=event.amt_hit_type_id,
)
reject_assignment(
+ amtm=amtm,
am=am,
hm=hm,
amt_assignment_id=assignment.amt_assignment_id,
msg=REJECT_MESSAGE_UNKNOWN_ASSIGNMENT,
amt_hit_type_id=hit.amt_hit_type_id,
)
- review_hit(hm=hm, assignment=assignment)
+ review_hit(amtm=amtm, hm=hm, assignment=assignment)
return None
assert assignment.amt_assignment_id == event.amt_assignment_id
@@ -117,38 +124,43 @@ def process_assignment_submitted(
amt_hit_type_id=event.amt_hit_type_id,
)
reject_assignment(
+ amtm=amtm,
am=am,
hm=hm,
amt_assignment_id=amt_assignment_id,
msg=REJECT_MESSAGE_BADDIE,
amt_hit_type_id=hit.amt_hit_type_id,
)
- review_hit(hm=hm, assignment=assignment)
+ review_hit(amtm=amtm, hm=hm, assignment=assignment)
return None
if assignment.tsid is None:
- assignment = handle_assignment_w_no_work(am=am, hm=hm, assignment=assignment)
+ assignment = handle_assignment_w_no_work(
+ amtm=amtm, am=am, hm=hm, assignment=assignment
+ )
else:
# We need to validate the work exists on thl, and if so, approve
- assignment = handle_assignment_w_work(am=am, hm=hm, assignment=assignment)
+ assignment = handle_assignment_w_work(
+ amtm=amtm, am=am, hm=hm, assignment=assignment
+ )
#
# Step 4: Tell Amazon we've reviewed the HIT, and update the DB
#
- review_hit(hm=hm, assignment=assignment)
+ review_hit(amtm=amtm, hm=hm, assignment=assignment)
if (
assignment.tsid
and assignment.status == AssignmentStatus.Approved
and assignment.requester_feedback != NO_WORK_APPROVAL_MESSAGE
):
- return issue_worker_payment(hm=hm, bm=bm, assignment=assignment)
+ return issue_worker_payment(amtm=amtm, hm=hm, bm=bm, assignment=assignment)
-def review_hit(hm: HitManager, assignment: Assignment) -> None:
+def review_hit(amtm: AMTManager, hm: HitManager, assignment: Assignment) -> None:
# Reviewable to Reviewing
- AMTManager.update_hit_review_status(amt_hit_id=assignment.amt_hit_id, revert=False)
- hit, _ = AMTManager.get_hit_if_exists(amt_hit_id=assignment.amt_hit_id)
+ amtm.update_hit_review_status(amt_hit_id=assignment.amt_hit_id, revert=False)
+ hit, _ = amtm.get_hit_if_exists(amt_hit_id=assignment.amt_hit_id)
if hit is None:
logging.warning(
@@ -163,7 +175,7 @@ def review_hit(hm: HitManager, assignment: Assignment) -> None:
def handle_assignment_w_no_work(
- am: AssignmentManager, hm: HitManager, assignment: Assignment
+ amtm: AMTManager, am: AssignmentManager, hm: HitManager, assignment: Assignment
) -> Assignment:
"""
Called when an assignment is submitted without a wall event.
@@ -189,6 +201,7 @@ def handle_assignment_w_no_work(
or get_user_blocked(amt_worker_id=amt_worker_id)
):
assignment = reject_assignment(
+ amtm=amtm,
am=am,
hm=hm,
amt_assignment_id=amt_assignment_id,
@@ -201,6 +214,7 @@ def handle_assignment_w_no_work(
# Approve with a message explaining they shouldn't do it.
assignment = approve_assignment(
+ amtm=amtm,
am=am,
hm=hm,
amt_assignment_id=amt_assignment_id,
@@ -212,6 +226,7 @@ def handle_assignment_w_no_work(
def reject_assignment(
+ amtm: AMTManager,
am: AssignmentManager,
hm: HitManager,
amt_assignment_id: str,
@@ -220,7 +235,7 @@ def reject_assignment(
) -> Assignment:
# Reject in AMT, update db
- res = AMTManager.reject_assignment_if_possible(
+ res = amtm.reject_assignment_if_possible(
amt_assignment_id=amt_assignment_id, msg=msg
)
if res is None:
@@ -232,7 +247,7 @@ def reject_assignment(
raise Exception(f"Failed to reject assignment: {amt_assignment_id}")
# We just rejected this assignment, get it from amazon again
- assignment = AMTManager.get_assignment(amt_assignment_id=amt_assignment_id)
+ assignment = amtm.get_assignment(amt_assignment_id=amt_assignment_id)
assert assignment.status == AssignmentStatus.Rejected
# And update the db. The assignment may not actually exist in the db (if
@@ -256,6 +271,7 @@ def reject_assignment(
def approve_assignment(
+ amtm: AMTManager,
am: AssignmentManager,
hm: HitManager,
amt_assignment_id: str,
@@ -264,7 +280,7 @@ def approve_assignment(
) -> Assignment:
# Approve in AMT, update db
- res = AMTManager.approve_assignment_if_possible(
+ res = amtm.approve_assignment_if_possible(
amt_assignment_id=amt_assignment_id, msg=msg
)
if res is None:
@@ -276,7 +292,7 @@ def approve_assignment(
raise Exception(f"Failed to approve assignment: {amt_assignment_id}")
# We just approved this assignment, get it from amazon again
- assignment = AMTManager.get_assignment(amt_assignment_id=amt_assignment_id)
+ assignment = amtm.get_assignment(amt_assignment_id=amt_assignment_id)
assert assignment.status == AssignmentStatus.Approved
# And update the db
am.approve(assignment=assignment)
@@ -288,7 +304,7 @@ def approve_assignment(
def handle_assignment_w_work(
- am: AssignmentManager, hm: HitManager, assignment: Assignment
+ amtm: AMTManager, am: AssignmentManager, hm: HitManager, assignment: Assignment
) -> Assignment:
"""
Called when an assignment is submitted with a tsid.
@@ -332,6 +348,7 @@ def handle_assignment_w_work(
amt_hit_type_id=hit.amt_hit_type_id,
)
assignment = reject_assignment(
+ amtm=amtm,
am=am,
hm=hm,
amt_assignment_id=amt_assignment_id,
@@ -358,6 +375,7 @@ def handle_assignment_w_work(
amt_hit_type_id=hit.amt_hit_type_id,
)
assignment = reject_assignment(
+ amtm=amtm,
am=am,
hm=hm,
amt_assignment_id=amt_assignment_id,
@@ -370,6 +388,7 @@ def handle_assignment_w_work(
# We've approved the HIT payment, now update the db to reflect this, and approve the assignment
assignment = approve_assignment(
+ amtm=amtm,
am=am,
hm=hm,
amt_assignment_id=amt_assignment_id,
@@ -429,7 +448,7 @@ def submit_and_approve_amt_bonus_request(
def issue_worker_payment(
- hm: HitManager, bm: BonusManager, assignment: Assignment
+ amtm: AMTManager, hm: HitManager, bm: BonusManager, assignment: Assignment
) -> None:
# For now, since we have no "I want my bonus" request/button. A user's
# balance will be sent out anytime they get an approved assignment. We
@@ -458,7 +477,7 @@ def issue_worker_payment(
return None
assert pe.id
- AMTManager.send_bonus(
+ amtm.send_bonus(
amt_worker_id=assignment.amt_worker_id,
amt_assignment_id=assignment.amt_assignment_id,
amount=amount,
@@ -467,7 +486,7 @@ def issue_worker_payment(
)
# Confirm it was sent through amt
- bonus = AMTManager.get_bonus(
+ bonus = amtm.get_bonus(
amt_assignment_id=assignment.amt_assignment_id, payout_event_id=pe.id
)
diff --git a/jb/flow/events.py b/jb/flow/events.py
index 5252fd0..60076de 100644
--- a/jb/flow/events.py
+++ b/jb/flow/events.py
@@ -96,9 +96,9 @@ def process_mturk_events_chunk(executor: Executor) -> Optional[int]:
def process_assignment_submitted_event(event: MTurkEvent, msg_id: str):
- from jb.decorators import AM, HM, BM
+ from jb.decorators import AMTM, AM, HM, BM
- process_assignment_submitted(am=AM, hm=HM, bm=BM, event=event)
+ process_assignment_submitted(amtm=AMTM, am=AM, hm=HM, bm=BM, event=event)
REDIS.xackdel(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)
diff --git a/jb/managers/amt.py b/jb/managers/amt.py
index 0ec70d3..b764ffc 100644
--- a/jb/managers/amt.py
+++ b/jb/managers/amt.py
@@ -1,18 +1,24 @@
import logging
from datetime import timezone, datetime
-from typing import Tuple, Optional, List
+from typing import Tuple, Optional, List, Dict, Any
import botocore.exceptions
-from mypy_boto3_mturk.type_defs import AssignmentTypeDef, BonusPaymentTypeDef
+from mypy_boto3_mturk.type_defs import (
+ AssignmentTypeDef,
+ BonusPaymentTypeDef,
+ CreateHITTypeResponseTypeDef,
+ GetHITResponseTypeDef,
+ CreateHITWithHITTypeResponseTypeDef,
+)
from jb.config import TOPIC_ARN
-from jb.decorators import AMT_CLIENT
from jb.models import AMTAccount
from jb.models.assignment import Assignment
from jb.models.bonus import Bonus
from generalresearchutils.currency import USDCent
from jb.models.definitions import HitStatus
from jb.models.hit import HitType, HitQuestion, Hit
+from mypy_boto3_mturk import MTurkClient
REJECT_MESSAGE_UNKNOWN_ASSIGNMENT = "Unknown assignment"
REJECT_MESSAGE_NO_WORK = "Assignment was submitted with no attempted work."
@@ -26,10 +32,22 @@ BONUS_MESSAGE = "Great job! Bonus for a survey complete"
class AMTManager:
+ """
+ I am type annotating this more than needed (depending on the editor),
+ however it's only because AMT on boto3 is not intuitive... at all.
+ """
+
+ def __init__(
+ self,
+ amt_client: MTurkClient,
+ **kwargs, # type: ignore
+ ):
+ super().__init__(**kwargs)
+ self.amt_client = amt_client
+
+ def fetch_account(self) -> AMTAccount:
+ res = self.amt_client.get_account_balance()
- @staticmethod
- def fetch_account() -> AMTAccount:
- res = AMT_CLIENT.get_account_balance()
return AMTAccount.model_validate(
{
"available_balance": res["AvailableBalance"],
@@ -37,19 +55,19 @@ class AMTManager:
}
)
- @staticmethod
- def get_hit_if_exists(amt_hit_id: str) -> Tuple[Optional[Hit], Optional[str]]:
+ def get_hit_if_exists(self, amt_hit_id: str) -> Tuple[Optional[Hit], Optional[str]]:
try:
- res = AMT_CLIENT.get_hit(HITId=amt_hit_id)
- except AMT_CLIENT.exceptions.RequestError as e:
+ res: GetHITResponseTypeDef = self.amt_client.get_hit(HITId=amt_hit_id)
+
+ except self.amt_client.exceptions.RequestError as e:
msg = e.response.get("Error", {}).get("Message", "")
return None, msg
+
hit = Hit.from_amt_get_hit(res["HIT"])
return hit, None
- @classmethod
- def get_hit_status(cls, amt_hit_id: str) -> HitStatus:
- res, msg = cls.get_hit_if_exists(amt_hit_id=amt_hit_id)
+ def get_hit_status(self, amt_hit_id: str) -> HitStatus:
+ res, msg = self.get_hit_if_exists(amt_hit_id=amt_hit_id)
if res is None:
if msg is None:
@@ -57,17 +75,23 @@ class AMTManager:
if " does not exist. (" in msg:
return HitStatus.Disposed
+
else:
logging.warning(msg)
return HitStatus.Unassignable
return res.status
- @staticmethod
- def create_hit_type(hit_type: HitType):
- res = AMT_CLIENT.create_hit_type(**hit_type.to_api_request_body()) # type: ignore
+ def create_hit_type(self, hit_type: HitType) -> HitType:
+ assert hit_type.amt_hit_type_id is None
+
+ res: CreateHITTypeResponseTypeDef = self.amt_client.create_hit_type(**hit_type.to_api_request_body()) # type: ignore
+
hit_type.amt_hit_type_id = res["HITTypeId"]
- AMT_CLIENT.update_notification_settings(
+
+ # TODO: Assert / Check that the SNS Notification was
+ # successfully created.
+ self.amt_client.update_notification_settings(
HITTypeId=hit_type.amt_hit_type_id,
Notification={
"Destination": TOPIC_ARN,
@@ -81,8 +105,7 @@ class AMTManager:
return hit_type
- @staticmethod
- def create_hit_with_hit_type(hit_type: HitType, question: HitQuestion) -> Hit:
+ def create_hit_with_hit_type(self, hit_type: HitType, question: HitQuestion) -> Hit:
"""
HITTypeId: str
LifetimeInSeconds: int
@@ -94,27 +117,30 @@ class AMTManager:
assert hit_type.amt_hit_type_id is not None
data = hit_type.generate_hit_amt_request(question=question)
- res = AMT_CLIENT.create_hit_with_hit_type(**data)
+ res: CreateHITWithHITTypeResponseTypeDef = (
+ self.amt_client.create_hit_with_hit_type(**data)
+ )
+
return Hit.from_amt_create_hit(res["HIT"], hit_type=hit_type, question=question)
- @staticmethod
- def get_assignment(amt_assignment_id: str) -> Assignment:
+ def get_assignment(self, amt_assignment_id: str) -> Assignment:
"""
You CANNOT get an Assignment if it has been only ACCEPTED (by the
worker). The api is stupid, it will only show up once it is Submitted
"""
- res = AMT_CLIENT.get_assignment(AssignmentId=amt_assignment_id)
+ res = self.amt_client.get_assignment(AssignmentId=amt_assignment_id)
ass_res: AssignmentTypeDef = res["Assignment"]
assignment = Assignment.from_amt_get_assignment(ass_res)
# to be clear, this has not checked whether it exists in our db
assert assignment.id is None
return assignment
- @classmethod
- def get_assignment_if_exists(cls, amt_assignment_id: str) -> Optional[Assignment]:
+ def get_assignment_if_exists(self, amt_assignment_id: str) -> Optional[Assignment]:
expected_err_msg = f"Assignment {amt_assignment_id} does not exist"
+
try:
- return cls.get_assignment(amt_assignment_id=amt_assignment_id)
+ return self.get_assignment(amt_assignment_id=amt_assignment_id)
+
except botocore.exceptions.ClientError as e:
logging.warning(e)
error_code = e.response["Error"]["Code"]
@@ -123,80 +149,90 @@ class AMTManager:
return None
raise e
- @staticmethod
def reject_assignment_if_possible(
- amt_assignment_id: str, msg: str = REJECT_MESSAGE_UNKNOWN_ASSIGNMENT
- ):
+ self, amt_assignment_id: str, msg: str = REJECT_MESSAGE_UNKNOWN_ASSIGNMENT
+ ) -> Optional[Dict[str, Any]]:
+
# Unclear to me when this would fail
try:
- return AMT_CLIENT.reject_assignment(
+ return self.amt_client.reject_assignment(
AssignmentId=amt_assignment_id, RequesterFeedback=msg
)
+
except botocore.exceptions.ClientError as e:
logging.warning(e)
return None
- @staticmethod
def approve_assignment_if_possible(
+ self,
amt_assignment_id: str,
msg: str = APPROVAL_MESSAGE,
override_rejection: bool = False,
- ):
+ ) -> Optional[Dict[str, Any]]:
+
# Unclear to me when this would fail
try:
- return AMT_CLIENT.approve_assignment(
+ return self.amt_client.approve_assignment(
AssignmentId=amt_assignment_id,
RequesterFeedback=msg,
OverrideRejection=override_rejection,
)
+
except botocore.exceptions.ClientError as e:
logging.warning(e)
return None
- @staticmethod
- def update_hit_review_status(amt_hit_id: str, revert: bool = False) -> None:
+ def update_hit_review_status(self, amt_hit_id: str, revert: bool = False) -> None:
try:
# Reviewable to Reviewing
- AMT_CLIENT.update_hit_review_status(HITId=amt_hit_id, Revert=revert)
+ self.amt_client.update_hit_review_status(HITId=amt_hit_id, Revert=revert)
+
except botocore.exceptions.ClientError as e:
logging.warning(f"{amt_hit_id=}, {e}")
error_msg = e.response["Error"]["Message"]
+
if "does not exist" in error_msg:
raise ValueError(error_msg)
+
# elif "This HIT is currently in the state 'Reviewing'" in error_msg:
# logging.warning(error_msg)
return None
- @staticmethod
def send_bonus(
+ self,
amt_worker_id: str,
amount: USDCent,
amt_assignment_id: str,
reason: str,
unique_request_token: str,
- ):
+ ) -> Optional[Dict[str, Any]]:
try:
- return AMT_CLIENT.send_bonus(
+ return self.amt_client.send_bonus(
WorkerId=amt_worker_id,
BonusAmount=str(amount.to_usd()),
AssignmentId=amt_assignment_id,
Reason=reason,
UniqueRequestToken=unique_request_token,
)
+
except botocore.exceptions.ClientError as e:
logging.warning(f"{amt_worker_id=} {amt_assignment_id=}, {e}")
return None
- @staticmethod
- def get_bonus(amt_assignment_id: str, payout_event_id: str) -> Optional[Bonus]:
- res: List[BonusPaymentTypeDef] = AMT_CLIENT.list_bonus_payments(
+ def get_bonus(
+ self, amt_assignment_id: str, payout_event_id: str
+ ) -> Optional[Bonus]:
+
+ res: List[BonusPaymentTypeDef] = self.amt_client.list_bonus_payments(
AssignmentId=amt_assignment_id
)["BonusPayments"]
+
assert (
len(res) <= 1
), f"{amt_assignment_id=} Expected 1 or 0 bonuses, got {len(res)}"
d = res[0] if res else None
+
if d:
return Bonus.model_validate(
{
@@ -208,18 +244,19 @@ class AMTManager:
"payout_event_id": payout_event_id,
}
)
+
return None
- @staticmethod
- def expire_all_hits() -> None:
- # used in testing only (or in an emergency I guess)
+ def expire_all_hits(self) -> None:
+ # Used in testing only (or in an emergency I guess)
now = datetime.now(tz=timezone.utc)
- paginator = AMT_CLIENT.get_paginator("list_hits")
+ paginator = self.amt_client.get_paginator("list_hits")
for page in paginator.paginate():
for hit in page["HITs"]:
if hit["HITStatus"] in ("Assignable", "Reviewable", "Reviewing"):
- AMT_CLIENT.update_expiration_for_hit(
+ self.amt_client.update_expiration_for_hit(
HITId=hit["HITId"], ExpireAt=now
)
+
return None
diff --git a/jb/managers/hit.py b/jb/managers/hit.py
index 7d60b63..533f45a 100644
--- a/jb/managers/hit.py
+++ b/jb/managers/hit.py
@@ -163,8 +163,9 @@ class HitTypeManager(PostgresManager):
with conn.cursor() as c:
c.execute(query, data)
conn.commit()
- assert c.rowcount == 1, c.rowcount
+ row_cnt = c.rowcount
+ assert row_cnt == 1, f"Expected 1 row updated, got {row_cnt}"
return None