aboutsummaryrefslogtreecommitdiff
path: root/jb/flow
diff options
context:
space:
mode:
Diffstat (limited to 'jb/flow')
-rw-r--r--jb/flow/assignment_tasks.py90
-rw-r--r--jb/flow/events.py4
2 files changed, 64 insertions, 30 deletions
diff --git a/jb/flow/assignment_tasks.py b/jb/flow/assignment_tasks.py
index bb0877d..345d629 100644
--- a/jb/flow/assignment_tasks.py
+++ b/jb/flow/assignment_tasks.py
@@ -7,7 +7,6 @@ from generalresearchutils.models.thl.definitions import PayoutStatus, StatusCode
from generalresearchutils.models.thl.wallet.cashout_method import CashoutRequestInfo
from generalresearchutils.currency import USDCent
-from jb.decorators import AM, HM, BM
from jb.flow.monitoring import emit_error_event, emit_assignment_event, emit_bonus_event
from jb.managers.amt import (
AMTManager,
@@ -29,10 +28,15 @@ from jb.managers.thl import (
from jb.models.assignment import Assignment
from jb.models.definitions import AssignmentStatus
from jb.models.event import MTurkEvent
+from jb.managers.assignment import AssignmentManager
+from jb.managers.hit import HitManager
+from jb.managers.bonus import BonusManager
from jb.config import settings
-def process_assignment_submitted(event: MTurkEvent) -> None:
+def process_assignment_submitted(
+ am: AssignmentManager, hm: HitManager, bm: BonusManager, event: MTurkEvent
+) -> None:
"""
Called either directly or from the SNS Notification that a
HIT was submitted
@@ -64,7 +68,7 @@ def process_assignment_submitted(event: MTurkEvent) -> None:
return None
# Even if the assignment doesn't exist, the hit must ...
- hit = HM.get_from_amt_id(amt_hit_id=assignment.amt_hit_id)
+ hit = hm.get_from_amt_id(amt_hit_id=assignment.amt_hit_id)
#
# Step 2: Attempt to get the Assignment out of the DB
@@ -72,7 +76,7 @@ def process_assignment_submitted(event: MTurkEvent) -> None:
# Now, we need to confirm it is something that we have in the db. If not,
# that means either something broke, or some funny business is happening
# (maybe a baddie is submitting an assignment without doing any work).
- stub = AM.get_stub_if_exists(amt_assignment_id=assignment.amt_assignment_id)
+ stub = am.get_stub_if_exists(amt_assignment_id=assignment.amt_assignment_id)
if stub is None:
# When they visited the "work" page, it should have created an
# AssignmentStub in the db. If that doesn't exist, something bad
@@ -83,11 +87,13 @@ def process_assignment_submitted(event: MTurkEvent) -> None:
amt_hit_type_id=event.amt_hit_type_id,
)
reject_assignment(
+ am=am,
+ hm=hm,
amt_assignment_id=assignment.amt_assignment_id,
msg=REJECT_MESSAGE_UNKNOWN_ASSIGNMENT,
amt_hit_type_id=hit.amt_hit_type_id,
)
- review_hit(assignment)
+ review_hit(hm=hm, assignment=assignment)
return None
assert assignment.amt_assignment_id == event.amt_assignment_id
@@ -99,7 +105,7 @@ def process_assignment_submitted(event: MTurkEvent) -> None:
# We don't have a TSID associated with the assignment until we the
# assignment is submitted.
- AM.update_answer(assignment=assignment)
+ am.update_answer(assignment=assignment)
# check if the user is blocked by thl
if get_user_blocked_or_not_exists(amt_worker_id=amt_worker_id):
@@ -111,33 +117,35 @@ def process_assignment_submitted(event: MTurkEvent) -> None:
amt_hit_type_id=event.amt_hit_type_id,
)
reject_assignment(
+ am=am,
+ hm=hm,
amt_assignment_id=amt_assignment_id,
msg=REJECT_MESSAGE_BADDIE,
amt_hit_type_id=hit.amt_hit_type_id,
)
- review_hit(assignment)
+ review_hit(hm=hm, assignment=assignment)
return None
if assignment.tsid is None:
- assignment = handle_assignment_w_no_work(assignment)
+ assignment = handle_assignment_w_no_work(am=am, hm=hm, assignment=assignment)
else:
# We need to validate the work exists on thl, and if so, approve
- assignment = handle_assignment_w_work(assignment)
+ assignment = handle_assignment_w_work(am=am, hm=hm, assignment=assignment)
#
# Step 4: Tell Amazon we've reviewed the HIT, and update the DB
#
- review_hit(assignment)
+ review_hit(hm=hm, assignment=assignment)
if (
assignment.tsid
and assignment.status == AssignmentStatus.Approved
and assignment.requester_feedback != NO_WORK_APPROVAL_MESSAGE
):
- return issue_worker_payment(assignment)
+ return issue_worker_payment(hm=hm, bm=bm, assignment=assignment)
-def review_hit(assignment: Assignment) -> None:
+def review_hit(hm: HitManager, assignment: Assignment) -> None:
# Reviewable to Reviewing
AMTManager.update_hit_review_status(amt_hit_id=assignment.amt_hit_id, revert=False)
hit, _ = AMTManager.get_hit_if_exists(amt_hit_id=assignment.amt_hit_id)
@@ -149,12 +157,14 @@ def review_hit(assignment: Assignment) -> None:
return None
# Update the db
- HM.update_hit(hit)
+ hm.update_hit(hit)
return None
-def handle_assignment_w_no_work(assignment: Assignment) -> Assignment:
+def handle_assignment_w_no_work(
+ am: AssignmentManager, hm: HitManager, assignment: Assignment
+) -> Assignment:
"""
Called when an assignment is submitted without a wall event.
Not entirely clear why this happens. I think they accept a HIT, get no work
@@ -167,18 +177,20 @@ def handle_assignment_w_no_work(assignment: Assignment) -> Assignment:
)
amt_worker_id = assignment.amt_worker_id
amt_assignment_id = assignment.amt_assignment_id
- hit = HM.get_from_amt_id(amt_hit_id=assignment.amt_hit_id)
+ hit = hm.get_from_amt_id(amt_hit_id=assignment.amt_hit_id)
emit_error_event(
event_type="assignment_submitted_no_work",
amt_hit_type_id=hit.amt_hit_type_id,
)
# They get 3 chances per week. If exceeded: Reject!
- if (AM.missing_tsid_count(amt_worker_id=amt_worker_id) >= 3) or (
- AM.rejected_count(amt_worker_id=amt_worker_id) >= 3
+ if (am.missing_tsid_count(amt_worker_id=amt_worker_id) >= 3) or (
+ am.rejected_count(amt_worker_id=amt_worker_id) >= 3
or get_user_blocked(amt_worker_id=amt_worker_id)
):
assignment = reject_assignment(
+ am=am,
+ hm=hm,
amt_assignment_id=amt_assignment_id,
msg=REJECT_MESSAGE_NO_WORK,
amt_hit_type_id=hit.amt_hit_type_id,
@@ -189,6 +201,8 @@ def handle_assignment_w_no_work(assignment: Assignment) -> Assignment:
# Approve with a message explaining they shouldn't do it.
assignment = approve_assignment(
+ am=am,
+ hm=hm,
amt_assignment_id=amt_assignment_id,
msg=NO_WORK_APPROVAL_MESSAGE,
amt_hit_type_id=hit.amt_hit_type_id,
@@ -198,7 +212,11 @@ def handle_assignment_w_no_work(assignment: Assignment) -> Assignment:
def reject_assignment(
- amt_assignment_id: str, msg: str, amt_hit_type_id: str
+ am: AssignmentManager,
+ hm: HitManager,
+ amt_assignment_id: str,
+ msg: str,
+ amt_hit_type_id: str,
) -> Assignment:
# Reject in AMT, update db
@@ -220,16 +238,16 @@ def reject_assignment(
# And update the db. The assignment may not actually exist in the db (if
# a baddie intercepted it and is trying to game us). So, we might
# need to create as assignment first ...
- stub = AM.get_stub_if_exists(amt_assignment_id=assignment.amt_assignment_id)
+ stub = am.get_stub_if_exists(amt_assignment_id=assignment.amt_assignment_id)
if stub is None:
logging.warning(
f"Rejected assignment doesn't exist in DB. Creating ... : {amt_assignment_id}"
)
# Even if the assignment doesn't exist, the hit must ...
- hit = HM.get_from_amt_id(amt_hit_id=assignment.amt_hit_id)
+ hit = hm.get_from_amt_id(amt_hit_id=assignment.amt_hit_id)
assignment.hit_id = hit.id
- AM.create(assignment=assignment)
- AM.reject(assignment=assignment)
+ am.create(assignment=assignment)
+ am.reject(assignment=assignment)
emit_assignment_event(
status=AssignmentStatus.Rejected, amt_hit_type_id=amt_hit_type_id, reason=msg
)
@@ -238,7 +256,11 @@ def reject_assignment(
def approve_assignment(
- amt_assignment_id: str, msg: str, amt_hit_type_id: str
+ am: AssignmentManager,
+ hm: HitManager,
+ amt_assignment_id: str,
+ msg: str,
+ amt_hit_type_id: str,
) -> Assignment:
# Approve in AMT, update db
@@ -257,7 +279,7 @@ def approve_assignment(
assignment = AMTManager.get_assignment(amt_assignment_id=amt_assignment_id)
assert assignment.status == AssignmentStatus.Approved
# And update the db
- AM.approve(assignment=assignment)
+ am.approve(assignment=assignment)
emit_assignment_event(
status=AssignmentStatus.Approved, amt_hit_type_id=amt_hit_type_id, reason=msg
)
@@ -265,7 +287,9 @@ def approve_assignment(
return assignment
-def handle_assignment_w_work(assignment: Assignment) -> Assignment:
+def handle_assignment_w_work(
+ am: AssignmentManager, hm: HitManager, assignment: Assignment
+) -> Assignment:
"""
Called when an assignment is submitted with a tsid.
- Check the tsid (thl status endpoint). Make sure it is finished, and
@@ -280,7 +304,7 @@ def handle_assignment_w_work(assignment: Assignment) -> Assignment:
tsid is not None
), "Assignment must have a tsid to be handled in handle_assignment_w_work"
- hit = HM.get_from_amt_id(amt_hit_id=assignment.amt_hit_id)
+ hit = hm.get_from_amt_id(amt_hit_id=assignment.amt_hit_id)
tsr = get_task_status(tsid=tsid)
if (
@@ -308,6 +332,8 @@ def handle_assignment_w_work(assignment: Assignment) -> Assignment:
amt_hit_type_id=hit.amt_hit_type_id,
)
assignment = reject_assignment(
+ am=am,
+ hm=hm,
amt_assignment_id=amt_assignment_id,
msg=REJECT_MESSAGE_BADDIE,
amt_hit_type_id=hit.amt_hit_type_id,
@@ -332,6 +358,8 @@ def handle_assignment_w_work(assignment: Assignment) -> Assignment:
amt_hit_type_id=hit.amt_hit_type_id,
)
assignment = reject_assignment(
+ am=am,
+ hm=hm,
amt_assignment_id=amt_assignment_id,
msg=REJECT_MESSAGE_BADDIE,
amt_hit_type_id=hit.amt_hit_type_id,
@@ -342,6 +370,8 @@ def handle_assignment_w_work(assignment: Assignment) -> Assignment:
# We've approved the HIT payment, now update the db to reflect this, and approve the assignment
assignment = approve_assignment(
+ am=am,
+ hm=hm,
amt_assignment_id=amt_assignment_id,
msg=APPROVAL_MESSAGE,
amt_hit_type_id=hit.amt_hit_type_id,
@@ -398,7 +428,9 @@ def submit_and_approve_amt_bonus_request(
return req
-def issue_worker_payment(assignment: Assignment) -> None:
+def issue_worker_payment(
+ hm: HitManager, bm: BonusManager, assignment: Assignment
+) -> None:
# For now, since we have no "I want my bonus" request/button. A user's
# balance will be sent out anytime they get an approved assignment. We
# don't need the task status, the tsid, nor the amount / user_payout
@@ -406,7 +438,7 @@ def issue_worker_payment(assignment: Assignment) -> None:
# We just get the wallet balance and submit a cashout request if >0
# then approve it, send the amt bonus, then complete it
amt_assignment_id = assignment.amt_assignment_id
- hit = HM.get_from_amt_id(amt_hit_id=assignment.amt_hit_id)
+ hit = hm.get_from_amt_id(amt_hit_id=assignment.amt_hit_id)
wallet_balance = get_wallet_balance(amt_worker_id=assignment.amt_worker_id)
amount = round_payment(amount=wallet_balance)
if not amount:
@@ -450,7 +482,7 @@ def issue_worker_payment(assignment: Assignment) -> None:
return None
# Create in DB
- BM.create(bonus=bonus)
+ bm.create(bonus=bonus)
emit_bonus_event(amount=amount, amt_hit_type_id=hit.amt_hit_type_id)
# Complete cashout
diff --git a/jb/flow/events.py b/jb/flow/events.py
index 7b7bd32..5252fd0 100644
--- a/jb/flow/events.py
+++ b/jb/flow/events.py
@@ -96,7 +96,9 @@ def process_mturk_events_chunk(executor: Executor) -> Optional[int]:
def process_assignment_submitted_event(event: MTurkEvent, msg_id: str):
- process_assignment_submitted(event)
+ from jb.decorators import AM, HM, BM
+
+ process_assignment_submitted(am=AM, hm=HM, bm=BM, event=event)
REDIS.xackdel(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)