diff options
Diffstat (limited to 'jb/flow')
| -rw-r--r-- | jb/flow/assignment_tasks.py | 90 | ||||
| -rw-r--r-- | jb/flow/events.py | 4 |
2 files changed, 64 insertions, 30 deletions
diff --git a/jb/flow/assignment_tasks.py b/jb/flow/assignment_tasks.py index bb0877d..345d629 100644 --- a/jb/flow/assignment_tasks.py +++ b/jb/flow/assignment_tasks.py @@ -7,7 +7,6 @@ from generalresearchutils.models.thl.definitions import PayoutStatus, StatusCode from generalresearchutils.models.thl.wallet.cashout_method import CashoutRequestInfo from generalresearchutils.currency import USDCent -from jb.decorators import AM, HM, BM from jb.flow.monitoring import emit_error_event, emit_assignment_event, emit_bonus_event from jb.managers.amt import ( AMTManager, @@ -29,10 +28,15 @@ from jb.managers.thl import ( from jb.models.assignment import Assignment from jb.models.definitions import AssignmentStatus from jb.models.event import MTurkEvent +from jb.managers.assignment import AssignmentManager +from jb.managers.hit import HitManager +from jb.managers.bonus import BonusManager from jb.config import settings -def process_assignment_submitted(event: MTurkEvent) -> None: +def process_assignment_submitted( + am: AssignmentManager, hm: HitManager, bm: BonusManager, event: MTurkEvent +) -> None: """ Called either directly or from the SNS Notification that a HIT was submitted @@ -64,7 +68,7 @@ def process_assignment_submitted(event: MTurkEvent) -> None: return None # Even if the assignment doesn't exist, the hit must ... - hit = HM.get_from_amt_id(amt_hit_id=assignment.amt_hit_id) + hit = hm.get_from_amt_id(amt_hit_id=assignment.amt_hit_id) # # Step 2: Attempt to get the Assignment out of the DB @@ -72,7 +76,7 @@ def process_assignment_submitted(event: MTurkEvent) -> None: # Now, we need to confirm it is something that we have in the db. If not, # that means either something broke, or some funny business is happening # (maybe a baddie is submitting an assignment without doing any work). - stub = AM.get_stub_if_exists(amt_assignment_id=assignment.amt_assignment_id) + stub = am.get_stub_if_exists(amt_assignment_id=assignment.amt_assignment_id) if stub is None: # When they visited the "work" page, it should have created an # AssignmentStub in the db. If that doesn't exist, something bad @@ -83,11 +87,13 @@ def process_assignment_submitted(event: MTurkEvent) -> None: amt_hit_type_id=event.amt_hit_type_id, ) reject_assignment( + am=am, + hm=hm, amt_assignment_id=assignment.amt_assignment_id, msg=REJECT_MESSAGE_UNKNOWN_ASSIGNMENT, amt_hit_type_id=hit.amt_hit_type_id, ) - review_hit(assignment) + review_hit(hm=hm, assignment=assignment) return None assert assignment.amt_assignment_id == event.amt_assignment_id @@ -99,7 +105,7 @@ def process_assignment_submitted(event: MTurkEvent) -> None: # We don't have a TSID associated with the assignment until we the # assignment is submitted. - AM.update_answer(assignment=assignment) + am.update_answer(assignment=assignment) # check if the user is blocked by thl if get_user_blocked_or_not_exists(amt_worker_id=amt_worker_id): @@ -111,33 +117,35 @@ def process_assignment_submitted(event: MTurkEvent) -> None: amt_hit_type_id=event.amt_hit_type_id, ) reject_assignment( + am=am, + hm=hm, amt_assignment_id=amt_assignment_id, msg=REJECT_MESSAGE_BADDIE, amt_hit_type_id=hit.amt_hit_type_id, ) - review_hit(assignment) + review_hit(hm=hm, assignment=assignment) return None if assignment.tsid is None: - assignment = handle_assignment_w_no_work(assignment) + assignment = handle_assignment_w_no_work(am=am, hm=hm, assignment=assignment) else: # We need to validate the work exists on thl, and if so, approve - assignment = handle_assignment_w_work(assignment) + assignment = handle_assignment_w_work(am=am, hm=hm, assignment=assignment) # # Step 4: Tell Amazon we've reviewed the HIT, and update the DB # - review_hit(assignment) + review_hit(hm=hm, assignment=assignment) if ( assignment.tsid and assignment.status == AssignmentStatus.Approved and assignment.requester_feedback != NO_WORK_APPROVAL_MESSAGE ): - return issue_worker_payment(assignment) + return issue_worker_payment(hm=hm, bm=bm, assignment=assignment) -def review_hit(assignment: Assignment) -> None: +def review_hit(hm: HitManager, assignment: Assignment) -> None: # Reviewable to Reviewing AMTManager.update_hit_review_status(amt_hit_id=assignment.amt_hit_id, revert=False) hit, _ = AMTManager.get_hit_if_exists(amt_hit_id=assignment.amt_hit_id) @@ -149,12 +157,14 @@ def review_hit(assignment: Assignment) -> None: return None # Update the db - HM.update_hit(hit) + hm.update_hit(hit) return None -def handle_assignment_w_no_work(assignment: Assignment) -> Assignment: +def handle_assignment_w_no_work( + am: AssignmentManager, hm: HitManager, assignment: Assignment +) -> Assignment: """ Called when an assignment is submitted without a wall event. Not entirely clear why this happens. I think they accept a HIT, get no work @@ -167,18 +177,20 @@ def handle_assignment_w_no_work(assignment: Assignment) -> Assignment: ) amt_worker_id = assignment.amt_worker_id amt_assignment_id = assignment.amt_assignment_id - hit = HM.get_from_amt_id(amt_hit_id=assignment.amt_hit_id) + hit = hm.get_from_amt_id(amt_hit_id=assignment.amt_hit_id) emit_error_event( event_type="assignment_submitted_no_work", amt_hit_type_id=hit.amt_hit_type_id, ) # They get 3 chances per week. If exceeded: Reject! - if (AM.missing_tsid_count(amt_worker_id=amt_worker_id) >= 3) or ( - AM.rejected_count(amt_worker_id=amt_worker_id) >= 3 + if (am.missing_tsid_count(amt_worker_id=amt_worker_id) >= 3) or ( + am.rejected_count(amt_worker_id=amt_worker_id) >= 3 or get_user_blocked(amt_worker_id=amt_worker_id) ): assignment = reject_assignment( + am=am, + hm=hm, amt_assignment_id=amt_assignment_id, msg=REJECT_MESSAGE_NO_WORK, amt_hit_type_id=hit.amt_hit_type_id, @@ -189,6 +201,8 @@ def handle_assignment_w_no_work(assignment: Assignment) -> Assignment: # Approve with a message explaining they shouldn't do it. assignment = approve_assignment( + am=am, + hm=hm, amt_assignment_id=amt_assignment_id, msg=NO_WORK_APPROVAL_MESSAGE, amt_hit_type_id=hit.amt_hit_type_id, @@ -198,7 +212,11 @@ def handle_assignment_w_no_work(assignment: Assignment) -> Assignment: def reject_assignment( - amt_assignment_id: str, msg: str, amt_hit_type_id: str + am: AssignmentManager, + hm: HitManager, + amt_assignment_id: str, + msg: str, + amt_hit_type_id: str, ) -> Assignment: # Reject in AMT, update db @@ -220,16 +238,16 @@ def reject_assignment( # And update the db. The assignment may not actually exist in the db (if # a baddie intercepted it and is trying to game us). So, we might # need to create as assignment first ... - stub = AM.get_stub_if_exists(amt_assignment_id=assignment.amt_assignment_id) + stub = am.get_stub_if_exists(amt_assignment_id=assignment.amt_assignment_id) if stub is None: logging.warning( f"Rejected assignment doesn't exist in DB. Creating ... : {amt_assignment_id}" ) # Even if the assignment doesn't exist, the hit must ... - hit = HM.get_from_amt_id(amt_hit_id=assignment.amt_hit_id) + hit = hm.get_from_amt_id(amt_hit_id=assignment.amt_hit_id) assignment.hit_id = hit.id - AM.create(assignment=assignment) - AM.reject(assignment=assignment) + am.create(assignment=assignment) + am.reject(assignment=assignment) emit_assignment_event( status=AssignmentStatus.Rejected, amt_hit_type_id=amt_hit_type_id, reason=msg ) @@ -238,7 +256,11 @@ def reject_assignment( def approve_assignment( - amt_assignment_id: str, msg: str, amt_hit_type_id: str + am: AssignmentManager, + hm: HitManager, + amt_assignment_id: str, + msg: str, + amt_hit_type_id: str, ) -> Assignment: # Approve in AMT, update db @@ -257,7 +279,7 @@ def approve_assignment( assignment = AMTManager.get_assignment(amt_assignment_id=amt_assignment_id) assert assignment.status == AssignmentStatus.Approved # And update the db - AM.approve(assignment=assignment) + am.approve(assignment=assignment) emit_assignment_event( status=AssignmentStatus.Approved, amt_hit_type_id=amt_hit_type_id, reason=msg ) @@ -265,7 +287,9 @@ def approve_assignment( return assignment -def handle_assignment_w_work(assignment: Assignment) -> Assignment: +def handle_assignment_w_work( + am: AssignmentManager, hm: HitManager, assignment: Assignment +) -> Assignment: """ Called when an assignment is submitted with a tsid. - Check the tsid (thl status endpoint). Make sure it is finished, and @@ -280,7 +304,7 @@ def handle_assignment_w_work(assignment: Assignment) -> Assignment: tsid is not None ), "Assignment must have a tsid to be handled in handle_assignment_w_work" - hit = HM.get_from_amt_id(amt_hit_id=assignment.amt_hit_id) + hit = hm.get_from_amt_id(amt_hit_id=assignment.amt_hit_id) tsr = get_task_status(tsid=tsid) if ( @@ -308,6 +332,8 @@ def handle_assignment_w_work(assignment: Assignment) -> Assignment: amt_hit_type_id=hit.amt_hit_type_id, ) assignment = reject_assignment( + am=am, + hm=hm, amt_assignment_id=amt_assignment_id, msg=REJECT_MESSAGE_BADDIE, amt_hit_type_id=hit.amt_hit_type_id, @@ -332,6 +358,8 @@ def handle_assignment_w_work(assignment: Assignment) -> Assignment: amt_hit_type_id=hit.amt_hit_type_id, ) assignment = reject_assignment( + am=am, + hm=hm, amt_assignment_id=amt_assignment_id, msg=REJECT_MESSAGE_BADDIE, amt_hit_type_id=hit.amt_hit_type_id, @@ -342,6 +370,8 @@ def handle_assignment_w_work(assignment: Assignment) -> Assignment: # We've approved the HIT payment, now update the db to reflect this, and approve the assignment assignment = approve_assignment( + am=am, + hm=hm, amt_assignment_id=amt_assignment_id, msg=APPROVAL_MESSAGE, amt_hit_type_id=hit.amt_hit_type_id, @@ -398,7 +428,9 @@ def submit_and_approve_amt_bonus_request( return req -def issue_worker_payment(assignment: Assignment) -> None: +def issue_worker_payment( + hm: HitManager, bm: BonusManager, assignment: Assignment +) -> None: # For now, since we have no "I want my bonus" request/button. A user's # balance will be sent out anytime they get an approved assignment. We # don't need the task status, the tsid, nor the amount / user_payout @@ -406,7 +438,7 @@ def issue_worker_payment(assignment: Assignment) -> None: # We just get the wallet balance and submit a cashout request if >0 # then approve it, send the amt bonus, then complete it amt_assignment_id = assignment.amt_assignment_id - hit = HM.get_from_amt_id(amt_hit_id=assignment.amt_hit_id) + hit = hm.get_from_amt_id(amt_hit_id=assignment.amt_hit_id) wallet_balance = get_wallet_balance(amt_worker_id=assignment.amt_worker_id) amount = round_payment(amount=wallet_balance) if not amount: @@ -450,7 +482,7 @@ def issue_worker_payment(assignment: Assignment) -> None: return None # Create in DB - BM.create(bonus=bonus) + bm.create(bonus=bonus) emit_bonus_event(amount=amount, amt_hit_type_id=hit.amt_hit_type_id) # Complete cashout diff --git a/jb/flow/events.py b/jb/flow/events.py index 7b7bd32..5252fd0 100644 --- a/jb/flow/events.py +++ b/jb/flow/events.py @@ -96,7 +96,9 @@ def process_mturk_events_chunk(executor: Executor) -> Optional[int]: def process_assignment_submitted_event(event: MTurkEvent, msg_id: str): - process_assignment_submitted(event) + from jb.decorators import AM, HM, BM + + process_assignment_submitted(am=AM, hm=HM, bm=BM, event=event) REDIS.xackdel(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id) |
