from jb.managers.amt import AMTManager from jb.models.hit import HitType, HitQuestion from jb.managers.hit import HitQuestionManager, HitTypeManager, HitManager from mypy_boto3_mturk import MTurkClient from mypy_boto3_mturk.type_defs import ( GetAssignmentResponseTypeDef, GetAccountBalanceResponseTypeDef, ListHITsResponseTypeDef, ) class TestMTurkClient: def test_sandbox(self, amt_client: MTurkClient): assert amt_client is not None assert "mturk-requester-sandbox" in amt_client.meta.endpoint_url balance: GetAccountBalanceResponseTypeDef = amt_client.get_account_balance() assert balance["AvailableBalance"] == "10000.00" # def test_create_hit() # def test_delete_hit() # def test_get_assignment(self, amt_client: MTurkClient): # def test_get_hit def test_list_hits(self, amt_client: MTurkClient, amtm: AMTManager): res: ListHITsResponseTypeDef = amt_client.list_hits() assert isinstance(res, dict) assert isinstance(res.get("HITs"), list) for hit in res["HITs"]: assert isinstance(hit, dict) assert isinstance(hit.get("HITId"), str) assert isinstance(hit.get("HITTypeId"), str) assert isinstance(hit.get("HITGroupId"), str) assert isinstance(hit.get("Title"), str) # botocore.errorfactory.RequestError: An error occurred # (RequestError) when calling the DeleteHIT operation: This # HIT is currently in the state 'Assignable'. This operation # can be called with a status of: Reviewing, Reviewable # (1772146028380 s) if hit["HITStatus"] in ["Reviewing", "Reviewable"]: print( f"Deleting HIT with id {hit['HITId']} and status {hit['HITStatus']}" ) amt_client.delete_hit(HITId=hit["HITId"]) elif hit["HITStatus"] == "Assignable": # I don't know... just let expire I guess. To changes it's # status you need to do manage Assignments (I think) # amt_client.update_hit_review_status(HITId=hit["HITId"], Revert=True) pass amtm.expire_all_hits() class TestAMTManager: def test_create_hit_type(self, amtm: AMTManager, hit_type: HitType): assert hit_type.amt_hit_type_id is None amtm.create_hit_type(hit_type=hit_type) assert hit_type.amt_hit_type_id is not None def test_create_hit_with_hit_type( self, amtm: AMTManager, htm: HitTypeManager, hm: HitManager, hit_type_record_with_amt_id: HitType, question_record: HitQuestion, ): sandbox_amt_hit_type_id = hit_type_record_with_amt_id.amt_hit_type_id assert isinstance(sandbox_amt_hit_type_id, str) res = [ x for x in htm.filter_active() if x.amt_hit_type_id == sandbox_amt_hit_type_id ] assert len(res) == 1 sandbox_hit_type = res[0] hit = amtm.create_hit_with_hit_type( hit_type=sandbox_hit_type, question=question_record ) assert hit.amt_hit_id is not None assert hit.id is None hm.create(hit) assert hit.id is not None