1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
from jb.managers.amt import AMTManager
from jb.models.hit import HitType, HitQuestion
from jb.managers.hit import HitQuestionManager, HitTypeManager, HitManager
from mypy_boto3_mturk import MTurkClient
from mypy_boto3_mturk.type_defs import (
GetAssignmentResponseTypeDef,
GetAccountBalanceResponseTypeDef,
ListHITsResponseTypeDef,
)
class TestMTurkClient:
    """Tests that talk to MTurk directly through the ``amt_client`` fixture."""

    def test_sandbox(self, amt_client: MTurkClient):
        """Check the client exists, targets the sandbox, and reports the expected balance."""
        assert amt_client is not None

        endpoint_url = amt_client.meta.endpoint_url
        assert "mturk-requester-sandbox" in endpoint_url

        balance: GetAccountBalanceResponseTypeDef = amt_client.get_account_balance()
        assert balance["AvailableBalance"] == "10000.00"

    # Not covered yet:
    #   test_create_hit
    #   test_delete_hit
    #   test_get_assignment(self, amt_client: MTurkClient)
    #   test_get_hit

    def test_list_hits(self, amt_client: MTurkClient, amtm: AMTManager):
        """Validate the shape of ``list_hits`` output and clean up listed HITs.

        Every listed HIT must carry string ``HITId``, ``HITTypeId``,
        ``HITGroupId`` and ``Title`` fields. HITs whose status is
        ``Reviewing`` or ``Reviewable`` are deleted; afterwards the manager
        is asked to expire all HITs.
        """
        response: ListHITsResponseTypeDef = amt_client.list_hits()
        assert isinstance(response, dict)

        hits = response.get("HITs")
        assert isinstance(hits, list)

        # DeleteHIT is rejected for any other status, e.g.:
        #   botocore.errorfactory.RequestError: An error occurred
        #   (RequestError) when calling the DeleteHIT operation: This
        #   HIT is currently in the state 'Assignable'. This operation
        #   can be called with a status of: Reviewing, Reviewable
        #   (1772146028380 s)
        deletable_statuses = ["Reviewing", "Reviewable"]

        for hit in hits:
            assert isinstance(hit, dict)
            for key in ("HITId", "HITTypeId", "HITGroupId", "Title"):
                assert isinstance(hit.get(key), str)

            status = hit["HITStatus"]
            if status not in deletable_statuses:
                # Nothing to do here. For an 'Assignable' HIT we just let it
                # expire; changing its status would (probably) require
                # managing its Assignments, e.g. something like:
                #   amt_client.update_hit_review_status(HITId=hit["HITId"], Revert=True)
                continue

            print(
                f"Deleting HIT with id {hit['HITId']} and status {hit['HITStatus']}"
            )
            amt_client.delete_hit(HITId=hit["HITId"])

        amtm.expire_all_hits()
class TestAMTManager:
    """Tests for ``AMTManager`` operations, driven through pytest fixtures."""

    def test_create_hit_type(self, amtm: AMTManager, hit_type: HitType):
        """``create_hit_type`` must set ``amt_hit_type_id`` on the passed hit type."""
        # The fixture is expected to start without an AMT id.
        assert hit_type.amt_hit_type_id is None

        amtm.create_hit_type(hit_type=hit_type)

        assert hit_type.amt_hit_type_id is not None

    def test_create_hit_with_hit_type(
        self,
        amtm: AMTManager,
        htm: HitTypeManager,
        hm: HitManager,
        hit_type_record_with_amt_id: HitType,
        question_record: HitQuestion,
    ):
        """Create a HIT from an active hit type, then save it via ``hm.create``.

        The hit type is looked up among ``htm.filter_active()`` by the AMT id
        of ``hit_type_record_with_amt_id``; exactly one match is expected.
        The created HIT must already have ``amt_hit_id`` set, while ``id`` is
        only set once ``hm.create`` has been called.
        """
        expected_amt_id = hit_type_record_with_amt_id.amt_hit_type_id
        assert isinstance(expected_amt_id, str)

        # Collect every active hit type that carries the expected AMT id.
        matching_hit_types = []
        for candidate in htm.filter_active():
            if candidate.amt_hit_type_id == expected_amt_id:
                matching_hit_types.append(candidate)
        assert len(matching_hit_types) == 1

        active_hit_type = matching_hit_types[0]
        hit = amtm.create_hit_with_hit_type(
            hit_type=active_hit_type,
            question=question_record,
        )
        assert hit.amt_hit_id is not None
        assert hit.id is None

        hm.create(hit)
        assert hit.id is not None
|