1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
|
import logging
from datetime import datetime, timezone
from typing import Optional, TypedDict
from xml.etree import ElementTree
from mypy_boto3_mturk.type_defs import AssignmentTypeDef
from pydantic import (
BaseModel,
Field,
ConfigDict,
model_validator,
PositiveInt,
computed_field,
TypeAdapter,
ValidationError,
)
from typing_extensions import Self
from jb.models.custom_types import AMTBoto3ID, AwareDatetimeISO, UUIDStr
from jb.models.definitions import AssignmentStatus
class AnswerDict(TypedDict):
    """
    Shape of the answer values kept from an assignment's answer XML.

    These are the keys that ``Assignment.parse_answer_xml`` retains; all
    values are plain strings.
    """

    # Assignment id as reported inside the submitted answer.
    amt_assignment_id: str
    # Worker id as reported inside the submitted answer.
    amt_worker_id: str
    # GRL specific identifier carried through the answer.
    tsid: str
class AssignmentStub(BaseModel):
    """
    Minimal assignment record holding only the identifiers and status.

    Per the original note on this model: it is meant for the moment a user
    has accepted an assignment but has not submitted it yet, so that the
    assignment can already be created in the db at that point.
    """

    model_config = ConfigDict(
        validate_assignment=True,
        extra="forbid",
    )

    # Optional integer ids (presumably database keys -- confirm against the
    # schema); both default to None.
    id: Optional[PositiveInt] = None
    hit_id: Optional[PositiveInt] = None

    # Required identifiers and status.
    amt_assignment_id: AMTBoto3ID
    amt_hit_id: AMTBoto3ID
    amt_worker_id: str = Field(max_length=50, min_length=3)
    status: AssignmentStatus

    # GRL Specific
    created_at: AwareDatetimeISO = Field(
        description="When this record was saved in the database",
        default_factory=lambda: datetime.now(tz=timezone.utc),
    )
    modified_at: Optional[AwareDatetimeISO] = Field(
        description="When this record was updated / modified in the database",
        default_factory=lambda: datetime.now(tz=timezone.utc),
    )

    def to_postgres(self) -> dict:
        """Return the model dumped in JSON mode (``model_dump(mode="json")``)."""
        return self.model_dump(mode="json")
class Assignment(AssignmentStub):
    """
    The Assignment data structure represents a single assignment of a HIT to
    a Worker. The assignment tracks the Worker's efforts to complete the HIT,
    and contains the results for later retrieval.

    The Assignment data structure is used as a response element for the
    following operations:

        GetAssignment
        GetAssignmentsForHIT

    https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_AssignmentDataStructureArticle.html
    """

    auto_approval_time: AwareDatetimeISO = Field(
        description="If results have been submitted, AutoApprovalTime is the "
        "date and time the results of the assignment results are "
        "considered Approved automatically if they have not already "
        "been explicitly approved or rejected by the Requester. "
        "This value is derived from the auto-approval delay "
        "specified by the Requester in the HIT. This value is "
        "omitted from the assignment if the Worker has not yet "
        "submitted results.",
    )

    accept_time: AwareDatetimeISO = Field(
        description="The date and time the Worker accepted the assignment.",
    )

    submit_time: AwareDatetimeISO = Field(
        description="The date and time the assignment was submitted. This value "
        "is omitted from the assignment if the Worker has not yet "
        "submitted results.",
    )

    approval_time: Optional[AwareDatetimeISO] = Field(
        default=None,
        description="The date and time the Requester approved the results. This "
        "value is omitted from the assignment if the Requester has "
        "not yet approved the results.",
    )

    rejection_time: Optional[AwareDatetimeISO] = Field(
        default=None,
        description="The date and time the Requester rejected the results.",
    )

    requester_feedback: Optional[str] = Field(
        # Default: None. This field isn't returned with assignment data by
        # default. To request this field, specify a response group of
        # AssignmentFeedback. For information about response groups, see
        # Common Parameters.
        default=None,
        min_length=3,
        max_length=2_000,
        # `description` is the pydantic keyword for field documentation;
        # the previous `help_text` keyword is not a Field parameter.
        description="The feedback string included with the call to the "
        "ApproveAssignment operation or the RejectAssignment "
        "operation, if the Requester approved or rejected the "
        "assignment and specified feedback.",
    )

    # Raw answer XML; excluded from model dumps.
    answer_xml: Optional[str] = Field(default=None, exclude=True)

    # GRL Specific
    tsid: Optional[UUIDStr] = Field(default=None)

    # --- Validators ---

    @model_validator(mode="before")
    @classmethod
    def set_tsid(cls, values: dict):
        """
        Fill in ``tsid`` from the answer XML when it was not given explicitly.

        If the tsid found in the answer does not validate as a ``UUIDStr``,
        a warning is logged and ``tsid`` is set to None instead of failing
        the whole model validation.

        :param values: raw input to the model, before field validation.
        :return: the input, possibly with ``tsid`` populated. The caller's
            dict is not modified; a shallow copy is returned when ``tsid``
            is set.
        """
        # A "before" validator is not guaranteed to receive a dict (pydantic
        # may hand over other objects, e.g. a model instance); only dict
        # input can be inspected with `.get`.
        if not isinstance(values, dict):
            return values

        answer_xml = values.get("answer_xml")
        if values.get("tsid") is not None or not answer_xml:
            return values

        raw_tsid = cls.parse_answer_xml(answer_xml).get("tsid")
        try:
            tsid = TypeAdapter(UUIDStr).validate_python(raw_tsid)
        except ValidationError as e:
            # Don't break the model validation if a baddie messes with the
            # tsid in the answer.
            logging.warning(e)
            tsid = None

        return {**values, "tsid": tsid}

    @model_validator(mode="after")
    def check_time_sequences(self) -> Self:
        """
        Ensure the assignment was not submitted before it was accepted.

        :raises ValueError: if ``accept_time`` is later than ``submit_time``.
        """
        if self.accept_time > self.submit_time:
            raise ValueError("Assignment times invalid")
        return self

    @model_validator(mode="after")
    def check_answers_alignment(self) -> Self:
        """
        Cross-check the ids embedded in the answer XML against the model.

        Nothing is checked when there is no answer XML. A worker id or
        assignment id that is missing from the answer is treated the same as
        a mismatching one, so it surfaces as a validation error rather than a
        bare ``KeyError``. The tsid is only compared when both sides have a
        non-empty value.

        :raises ValueError: if the worker id, assignment id or tsid in the
            answer disagrees with the corresponding field.
        """
        # Parse the XML once; every `answers_dict` access re-parses it.
        answers = self.answers_dict
        if answers is None:
            return self

        if self.amt_worker_id != answers.get("amt_worker_id"):
            raise ValueError("Assignment answer invalid worker_id")

        if self.amt_assignment_id != answers.get("amt_assignment_id"):
            raise ValueError("Assignment answer invalid amt_assignment_id")

        answer_tsid = answers.get("tsid")
        if self.tsid and answer_tsid and self.tsid != answer_tsid:
            raise ValueError("Assignment answer invalid tsid")

        return self

    # --- Properties ---

    @property
    def answers_dict(self) -> Optional[AnswerDict]:
        """
        The parsed answer XML, or None when no answer XML is set.

        The XML is parsed on every access.
        """
        # See https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_AssignmentDataStructureArticle.html
        # https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMechanicalTurkRequester/Concepts_NotificationsArticle.html
        if self.answer_xml is None:
            return None
        return self.parse_answer_xml(self.answer_xml)

    @staticmethod
    def parse_answer_xml(answer_xml: str) -> dict:
        """
        Extract the expected free-text answers from a QuestionFormAnswers XML.

        Only the keys ``amt_assignment_id``, ``amt_worker_id`` and ``tsid``
        are kept; any of them may be absent from the result. An empty
        FreeText value is returned as ``""``. ``<Answer>`` elements without a
        QuestionIdentifier or FreeText child are skipped.

        :param answer_xml: the answer XML document as a string.
        :return: mapping of question identifier to free-text value.
        :raises xml.etree.ElementTree.ParseError: if the XML is malformed.
        """
        root = ElementTree.fromstring(answer_xml)
        ns = {
            "mt": "http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2005-10-01/QuestionFormAnswers.xsd"
        }
        expected_keys = {"amt_assignment_id", "amt_worker_id", "tsid"}

        # We don't want validation to fail if a baddie inserts or changes url
        # params, which will result in new or missing keys. Amazon generates
        # the xml so that should always be correct
        res = {}
        for answer in root.findall("mt:Answer", ns):
            identifier = answer.find("mt:QuestionIdentifier", ns)
            free_text = answer.find("mt:FreeText", ns)
            # Compare against None explicitly: an Element without children
            # is falsy.
            if identifier is None or free_text is None:
                continue
            if identifier.text in expected_keys:
                res[identifier.text] = free_text.text or ""
        return res

    @classmethod
    def from_amt_get_assignment(cls, data: AssignmentTypeDef) -> Self:
        """
        Build an Assignment from a boto3 MTurk assignment dict.

        All timestamps are converted to UTC. ``ApprovalTime``,
        ``RejectionTime`` and ``RequesterFeedback`` are optional; the other
        keys read below must be present.

        :param data: assignment structure as returned by boto3.
        :raises KeyError: if a required key is missing from ``data``, or if
            ``AssignmentStatus`` is not a member name of the status enum.
        """

        def _optional_utc(value):
            # Missing / empty timestamps stay None.
            return value.astimezone(tz=timezone.utc) if value else None

        return cls(
            amt_assignment_id=data["AssignmentId"],
            amt_hit_id=data["HITId"],
            amt_worker_id=data["WorkerId"],
            status=AssignmentStatus[data["AssignmentStatus"]],
            auto_approval_time=data["AutoApprovalTime"].astimezone(tz=timezone.utc),
            accept_time=data["AcceptTime"].astimezone(tz=timezone.utc),
            submit_time=data["SubmitTime"].astimezone(tz=timezone.utc),
            approval_time=_optional_utc(data.get("ApprovalTime")),
            rejection_time=_optional_utc(data.get("RejectionTime")),
            answer_xml=data["Answer"],
            requester_feedback=data.get("RequesterFeedback"),
        )

    def to_stub(self) -> AssignmentStub:
        """Return an ``AssignmentStub`` built from this model's stub fields."""
        return AssignmentStub.model_validate(
            self.model_dump(include=set(AssignmentStub.model_fields))
        )
# --- Methods ---
#
# def refresh(self) -> Self:
# from tasks.mtwerk.managers.assignment import AssignmentManager
# return AssignmentManager.fetch_by_id(self)
#
# def reject(self, msg: str = REJECT_MESSAGE_UNKNOWN_ASSIGNMENT):
# """
# Save in the database that the Assignment was rejected, and also
# Report to Amazon Mechanical Turk that this Assignment should be
# rejected
#
# TODO: can this only occur when the Assignment is in a certain status?
#
# :return:
# """
# now = datetime.now(tz=None)
#
# MYSQLC.execute_sql_query("""
# UPDATE `amt-jb`.`mtwerk_assignment`
# SET submit_time = %s, rejection_time = %s, status = %s,
# requester_feedback = %s
# WHERE assignment_id = %s""",
# params=[
# now, now,
# AssignmentStatus.Rejected.value,
# msg, self.id],
# commit=True)
#
# CLIENT.reject_assignment(
# AssignmentId=self.id,
# RequesterFeedback=msg)
#
# def approve(self, msg: str = "Approved."):
# """
# Report to Amazon Mechanical Turk that this Assignment should be
# approved
#
# TODO: can this only occur when the Assignment is in a certain status?
#
# :return:
# """
# CLIENT.approve_assignment(
# AssignmentId=self.id,
# RequesterFeedback=msg)
#
# def submit_and_complete_request(self) -> Optional[str]:
# """
# This approves the Assignment and issues the Reward
# amount (typically $.05)
#
# :return:
# """
# worker = self.worker
# amount = DecimalUSDDollars(self.hit.reward)
#
# # If successful, returns the cashout id, otherwise, returns None
# cashout: Optional[dict] = worker.cashout_request(
# amount=amount,
# cashout_method_id=AMT_ASSIGNMENT_CASHOUT_METHOD)
#
# if cashout is None or cashout.get('status') != PayoutStatus.PENDING:
# return None
#
# cashout_id: str = cashout[id]
#
# approval: Optional[dict] = Bonus.manage_pending_cashout(
# cashout_id=cashout_id,
# action=PayoutStatus.APPROVED)
#
# if approval is None or approval['status'] != PayoutStatus.APPROVED:
# return None
#
# completion: Optional[dict] = Bonus.manage_pending_cashout(
# cashout_id=cashout_id,
# action=PayoutStatus.COMPLETE)
#
# if completion is None or completion['status'] != PayoutStatus.COMPLETE:
# return None
#
# return cashout_id
#
# # --- ORM ---
#
# def model_dump_mysql(self, *args, **kwargs) -> dict:
# d = self.model_dump(mode='json', *args, **kwargs)
#
# d['auto_approval_time'] = self.auto_approval_time.replace(tzinfo=None)
# d['accept_time'] = self.accept_time.replace(tzinfo=None)
# d['submit_time'] = self.submit_time.replace(tzinfo=None)
#
# if self.approval_time:
# d['approval_time'] = self.approval_time.replace(tzinfo=None)
#
# if self.rejection_time:
# d['rejection_time'] = self.rejection_time.replace(tzinfo=None)
#
# # created is automatically added by the database
# d['created'] = self.created.replace(tzinfo=None)
#
# if self.modified:
# d['modified'] = self.modified.replace(tzinfo=None)
#
# d['tsid'] = self.answers.get('tsid')
#
# return d
#
# def save(self) -> bool:
# """
# Either INSERTS or UPDATES the Assignment instance to a Mysql
# record.
# """
#
# # We're modifying the record, so set the time to right now!
# self.modified = datetime.now(tz=timezone.utc)
#
# query = """
# INSERT `amt-jb`.`mtwerk_assignment` (
# id, worker_id, hit_id, status,
# auto_approval_time, accept_time, submit_time,
# approval_time, rejection_time,
# requester_feedback, created, modified, tsid
# )
# VALUES (
# %(id)s, %(worker_id)s, %(hit_id)s, %(status)s,
# %(auto_approval_time)s, %(accept_time)s, %(submit_time)s,
# %(approval_time)s, %(rejection_time)s,
# %(requester_feedback)s, %(created)s, %(modified)s, %(tsid)s
# )
# ON DUPLICATE KEY UPDATE
# worker_id = %(worker_id)s,
# hit_id = %(hit_id)s,
# status = %(status)s,
#
# auto_approval_time = %(auto_approval_time)s,
# accept_time = %(accept_time)s,
# submit_time = %(submit_time)s,
#
# approval_time = %(approval_time)s,
# rejection_time = %(rejection_time)s,
#
# requester_feedback = %(requester_feedback)s,
# -- Not going to update created just in case it changed
# -- in pydantic for some reason
# modified = %(modified)s,
# tsid = %(tsid)s
# """
#
# try:
# MYSQLC.execute_sql_query(query, params=self.model_dump_mysql(), commit=True)
# return True
#
# except Exception as e:
# return False
#
# REJECT_MESSAGE_UNKNOWN_ASSIGNMENT = "Unknown assignment"
|