diff options
| author | Max Nanis | 2026-02-26 20:29:41 -0500 |
|---|---|---|
| committer | Max Nanis | 2026-02-26 20:29:41 -0500 |
| commit | af66829e26cb05f182bef36ac06d58c7baa0ec1e (patch) | |
| tree | 2848a9223e7d4d680f3e93fc8dfcc7545f716abd /jb/flow | |
| parent | 0bf32fadd85d5938ae29d489efdd82e2cd137300 (diff) | |
| download | amt-jb-af66829e26cb05f182bef36ac06d58c7baa0ec1e.tar.gz amt-jb-af66829e26cb05f182bef36ac06d58c7baa0ec1e.zip | |
AMTManager moved to fixture, and dectorator with parameters on tasks and init / non-static class appraoch. More assertion checks and typing. TestMTurkClient seperated from TestAMTManger
Diffstat (limited to 'jb/flow')
| -rw-r--r-- | jb/flow/assignment_tasks.py | 59 | ||||
| -rw-r--r-- | jb/flow/events.py | 4 |
2 files changed, 41 insertions, 22 deletions
diff --git a/jb/flow/assignment_tasks.py b/jb/flow/assignment_tasks.py index 345d629..a41d7cf 100644 --- a/jb/flow/assignment_tasks.py +++ b/jb/flow/assignment_tasks.py @@ -35,7 +35,11 @@ from jb.config import settings def process_assignment_submitted( - am: AssignmentManager, hm: HitManager, bm: BonusManager, event: MTurkEvent + amtm: AMTManager, + am: AssignmentManager, + hm: HitManager, + bm: BonusManager, + event: MTurkEvent, ) -> None: """ Called either directly or from the SNS Notification that a @@ -43,6 +47,7 @@ def process_assignment_submitted( :return: None """ + # # Step 1: Attempt to get the Assignment out of the API # @@ -53,9 +58,10 @@ def process_assignment_submitted( # This call is hitting AMT, not our db. # The API won't even return is unless it has been submitted. So if this # fails, we don't need to do anything (i.e. reject it). - assignment = AMTManager.get_assignment_if_exists( + assignment = amtm.get_assignment_if_exists( amt_assignment_id=event.amt_assignment_id ) + if assignment is None: # It is not found in amt, either it is invalid, not yet submitted, or # already been approved/rejected, so we just do nothing ... @@ -87,13 +93,14 @@ def process_assignment_submitted( amt_hit_type_id=event.amt_hit_type_id, ) reject_assignment( + amtm=amtm, am=am, hm=hm, amt_assignment_id=assignment.amt_assignment_id, msg=REJECT_MESSAGE_UNKNOWN_ASSIGNMENT, amt_hit_type_id=hit.amt_hit_type_id, ) - review_hit(hm=hm, assignment=assignment) + review_hit(amtm=amtm, hm=hm, assignment=assignment) return None assert assignment.amt_assignment_id == event.amt_assignment_id @@ -117,38 +124,43 @@ def process_assignment_submitted( amt_hit_type_id=event.amt_hit_type_id, ) reject_assignment( + amtm=amtm, am=am, hm=hm, amt_assignment_id=amt_assignment_id, msg=REJECT_MESSAGE_BADDIE, amt_hit_type_id=hit.amt_hit_type_id, ) - review_hit(hm=hm, assignment=assignment) + review_hit(amtm=amtm, hm=hm, assignment=assignment) return None if assignment.tsid is None: - assignment = handle_assignment_w_no_work(am=am, hm=hm, assignment=assignment) + assignment = handle_assignment_w_no_work( + amtm=amtm, am=am, hm=hm, assignment=assignment + ) else: # We need to validate the work exists on thl, and if so, approve - assignment = handle_assignment_w_work(am=am, hm=hm, assignment=assignment) + assignment = handle_assignment_w_work( + amtm=amtm, am=am, hm=hm, assignment=assignment + ) # # Step 4: Tell Amazon we've reviewed the HIT, and update the DB # - review_hit(hm=hm, assignment=assignment) + review_hit(amtm=amtm, hm=hm, assignment=assignment) if ( assignment.tsid and assignment.status == AssignmentStatus.Approved and assignment.requester_feedback != NO_WORK_APPROVAL_MESSAGE ): - return issue_worker_payment(hm=hm, bm=bm, assignment=assignment) + return issue_worker_payment(amtm=amtm, hm=hm, bm=bm, assignment=assignment) -def review_hit(hm: HitManager, assignment: Assignment) -> None: +def review_hit(amtm: AMTManager, hm: HitManager, assignment: Assignment) -> None: # Reviewable to Reviewing - AMTManager.update_hit_review_status(amt_hit_id=assignment.amt_hit_id, revert=False) - hit, _ = AMTManager.get_hit_if_exists(amt_hit_id=assignment.amt_hit_id) + amtm.update_hit_review_status(amt_hit_id=assignment.amt_hit_id, revert=False) + hit, _ = amtm.get_hit_if_exists(amt_hit_id=assignment.amt_hit_id) if hit is None: logging.warning( @@ -163,7 +175,7 @@ def review_hit(hm: HitManager, assignment: Assignment) -> None: def handle_assignment_w_no_work( - am: AssignmentManager, hm: HitManager, assignment: Assignment + amtm: AMTManager, am: AssignmentManager, hm: HitManager, assignment: Assignment ) -> Assignment: """ Called when an assignment is submitted without a wall event. @@ -189,6 +201,7 @@ def handle_assignment_w_no_work( or get_user_blocked(amt_worker_id=amt_worker_id) ): assignment = reject_assignment( + amtm=amtm, am=am, hm=hm, amt_assignment_id=amt_assignment_id, @@ -201,6 +214,7 @@ def handle_assignment_w_no_work( # Approve with a message explaining they shouldn't do it. assignment = approve_assignment( + amtm=amtm, am=am, hm=hm, amt_assignment_id=amt_assignment_id, @@ -212,6 +226,7 @@ def handle_assignment_w_no_work( def reject_assignment( + amtm: AMTManager, am: AssignmentManager, hm: HitManager, amt_assignment_id: str, @@ -220,7 +235,7 @@ def reject_assignment( ) -> Assignment: # Reject in AMT, update db - res = AMTManager.reject_assignment_if_possible( + res = amtm.reject_assignment_if_possible( amt_assignment_id=amt_assignment_id, msg=msg ) if res is None: @@ -232,7 +247,7 @@ def reject_assignment( raise Exception(f"Failed to reject assignment: {amt_assignment_id}") # We just rejected this assignment, get it from amazon again - assignment = AMTManager.get_assignment(amt_assignment_id=amt_assignment_id) + assignment = amtm.get_assignment(amt_assignment_id=amt_assignment_id) assert assignment.status == AssignmentStatus.Rejected # And update the db. The assignment may not actually exist in the db (if @@ -256,6 +271,7 @@ def reject_assignment( def approve_assignment( + amtm: AMTManager, am: AssignmentManager, hm: HitManager, amt_assignment_id: str, @@ -264,7 +280,7 @@ def approve_assignment( ) -> Assignment: # Approve in AMT, update db - res = AMTManager.approve_assignment_if_possible( + res = amtm.approve_assignment_if_possible( amt_assignment_id=amt_assignment_id, msg=msg ) if res is None: @@ -276,7 +292,7 @@ def approve_assignment( raise Exception(f"Failed to approve assignment: {amt_assignment_id}") # We just approved this assignment, get it from amazon again - assignment = AMTManager.get_assignment(amt_assignment_id=amt_assignment_id) + assignment = amtm.get_assignment(amt_assignment_id=amt_assignment_id) assert assignment.status == AssignmentStatus.Approved # And update the db am.approve(assignment=assignment) @@ -288,7 +304,7 @@ def approve_assignment( def handle_assignment_w_work( - am: AssignmentManager, hm: HitManager, assignment: Assignment + amtm: AMTManager, am: AssignmentManager, hm: HitManager, assignment: Assignment ) -> Assignment: """ Called when an assignment is submitted with a tsid. @@ -332,6 +348,7 @@ def handle_assignment_w_work( amt_hit_type_id=hit.amt_hit_type_id, ) assignment = reject_assignment( + amtm=amtm, am=am, hm=hm, amt_assignment_id=amt_assignment_id, @@ -358,6 +375,7 @@ def handle_assignment_w_work( amt_hit_type_id=hit.amt_hit_type_id, ) assignment = reject_assignment( + amtm=amtm, am=am, hm=hm, amt_assignment_id=amt_assignment_id, @@ -370,6 +388,7 @@ def handle_assignment_w_work( # We've approved the HIT payment, now update the db to reflect this, and approve the assignment assignment = approve_assignment( + amtm=amtm, am=am, hm=hm, amt_assignment_id=amt_assignment_id, @@ -429,7 +448,7 @@ def submit_and_approve_amt_bonus_request( def issue_worker_payment( - hm: HitManager, bm: BonusManager, assignment: Assignment + amtm: AMTManager, hm: HitManager, bm: BonusManager, assignment: Assignment ) -> None: # For now, since we have no "I want my bonus" request/button. A user's # balance will be sent out anytime they get an approved assignment. We @@ -458,7 +477,7 @@ def issue_worker_payment( return None assert pe.id - AMTManager.send_bonus( + amtm.send_bonus( amt_worker_id=assignment.amt_worker_id, amt_assignment_id=assignment.amt_assignment_id, amount=amount, @@ -467,7 +486,7 @@ def issue_worker_payment( ) # Confirm it was sent through amt - bonus = AMTManager.get_bonus( + bonus = amtm.get_bonus( amt_assignment_id=assignment.amt_assignment_id, payout_event_id=pe.id ) diff --git a/jb/flow/events.py b/jb/flow/events.py index 5252fd0..60076de 100644 --- a/jb/flow/events.py +++ b/jb/flow/events.py @@ -96,9 +96,9 @@ def process_mturk_events_chunk(executor: Executor) -> Optional[int]: def process_assignment_submitted_event(event: MTurkEvent, msg_id: str): - from jb.decorators import AM, HM, BM + from jb.decorators import AMTM, AM, HM, BM - process_assignment_submitted(am=AM, hm=HM, bm=BM, event=event) + process_assignment_submitted(amtm=AMTM, am=AM, hm=HM, bm=BM, event=event) REDIS.xackdel(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id) |
