diff options
Diffstat (limited to 'jb/models/assignment.py')
| -rw-r--r-- | jb/models/assignment.py | 388 |
1 files changed, 388 insertions, 0 deletions
diff --git a/jb/models/assignment.py b/jb/models/assignment.py new file mode 100644 index 0000000..39ae47c --- /dev/null +++ b/jb/models/assignment.py @@ -0,0 +1,388 @@ +import logging +from datetime import datetime, timezone +from typing import Optional, TypedDict +from xml.etree import ElementTree + +from mypy_boto3_mturk.type_defs import AssignmentTypeDef +from pydantic import ( + BaseModel, + Field, + ConfigDict, + model_validator, + PositiveInt, + computed_field, + TypeAdapter, + ValidationError, +) +from typing_extensions import Self + +from jb.models.custom_types import AMTBoto3ID, AwareDatetimeISO, UUIDStr +from jb.models.definitions import AssignmentStatus + + +class AnswerDict(TypedDict): + amt_assignment_id: str + amt_worker_id: str + tsid: str + + +class AssignmentStub(BaseModel): + # todo: we need an "AssignmentStub" model that just has + # the IDs, this is used when a user accepts an assignment + # but hasn't submitted it yet. We want to create it in the db + # at that point. + + model_config = ConfigDict( + extra="forbid", + validate_assignment=True, + ) + + id: Optional[PositiveInt] = Field(default=None) + hit_id: Optional[PositiveInt] = Field(default=None) + amt_assignment_id: AMTBoto3ID = Field() + amt_hit_id: AMTBoto3ID = Field() + amt_worker_id: str = Field(min_length=3, max_length=50) + + status: AssignmentStatus = Field() + + # GRL Specific + created_at: AwareDatetimeISO = Field( + default_factory=lambda: datetime.now(tz=timezone.utc), + description="When this record was saved in the database", + ) + + modified_at: Optional[AwareDatetimeISO] = Field( + default_factory=lambda: datetime.now(tz=timezone.utc), + description="When this record was updated / modified in the database", + ) + + def to_postgres(self): + d = self.model_dump(mode="json") + return d + + +class Assignment(AssignmentStub): + """ + The Assignment data structure represents a single assignment of a HIT to + a Worker. The assignment tracks the Worker's efforts to complete the HIT, + and contains the results for later retrieval. + + The Assignment data structure is used as a response element for the + following operations: + + GetAssignment + GetAssignmentsForHIT + + https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_AssignmentDataStructureArticle.html + """ + + auto_approval_time: AwareDatetimeISO = Field( + description="If results have been submitted, AutoApprovalTime is the " + "date and time the results of the assignment results are " + "considered Approved automatically if they have not already " + "been explicitly approved or rejected by the Requester. " + "This value is derived from the auto-approval delay " + "specified by the Requester in the HIT. This value is " + "omitted from the assignment if the Worker has not yet " + "submitted results.", + ) + + accept_time: AwareDatetimeISO = Field( + description="The date and time the Worker accepted the assignment.", + ) + + submit_time: AwareDatetimeISO = Field( + description="The date and time the assignment was submitted. This value " + "is omitted from the assignment if the Worker has not yet " + "submitted results.", + ) + + approval_time: Optional[AwareDatetimeISO] = Field( + default=None, + description="The date and time the Requester approved the results. This " + "value is omitted from the assignment if the Requester has " + "not yet approved the results.", + ) + rejection_time: Optional[AwareDatetimeISO] = Field( + default=None, + description="The date and time the Requester rejected the results.", + ) + + requester_feedback: Optional[str] = Field( + # Default: None. This field isn't returned with assignment data by + # default. To request this field, specify a response group of + # AssignmentFeedback. For information about response groups, see + # Common Parameters. + default=None, + min_length=3, + max_length=2_000, + help_text="The feedback string included with the call to the " + "ApproveAssignment operation or the RejectAssignment " + "operation, if the Requester approved or rejected the " + "assignment and specified feedback.", + ) + + answer_xml: Optional[str] = Field(default=None, exclude=True) + + # GRL Specific + + tsid: Optional[UUIDStr] = Field(default=None) + + # --- Validators --- + + @model_validator(mode="before") + def set_tsid(cls, values: dict): + if values.get("tsid") is None and (answer_xml := values.get("answer_xml")): + answer_dict = cls.parse_answer_xml(answer_xml) + tsid = answer_dict.get("tsid") + try: + values["tsid"] = TypeAdapter(UUIDStr).validate_python(tsid) + except ValidationError as e: + # Don't break the model validation if a baddie messes with the tsid in the answer. + logging.warning(e) + values["tsid"] = None + return values + + @model_validator(mode="after") + def check_time_sequences(self) -> Self: + if self.accept_time > self.submit_time: + raise ValueError("Assignment times invalid") + + return self + + @model_validator(mode="after") + def check_answers_alignment(self) -> Self: + if self.answers_dict is None: + return self + if self.amt_worker_id != self.answers_dict["amt_worker_id"]: + raise ValueError("Assignment answer invalid worker_id") + if self.amt_assignment_id != self.answers_dict["amt_assignment_id"]: + raise ValueError("Assignment answer invalid amt_assignment_id") + if ( + self.tsid + and self.answers_dict["tsid"] + and self.tsid != self.answers_dict["tsid"] + ): + raise ValueError("Assignment answer invalid tsid") + return self + + # --- Properties --- + + @property + def answers_dict(self) -> Optional[AnswerDict]: + # See https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_AssignmentDataStructureArticle.html + # https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMechanicalTurkRequester/Concepts_NotificationsArticle.html + if self.answer_xml is None: + return None + + return self.parse_answer_xml(self.answer_xml) + + @staticmethod + def parse_answer_xml(answer_xml: str): + root = ElementTree.fromstring(answer_xml) + ns = { + "mt": "http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2005-10-01/QuestionFormAnswers.xsd" + } + res = {} + + for a in root.findall("mt:Answer", ns): + name = a.find("mt:QuestionIdentifier", ns).text + value = a.find("mt:FreeText", ns).text + res[name] = value or "" + + EXPECTED_KEYS = {"amt_assignment_id", "amt_worker_id", "tsid"} + # We don't want validation to fail if a baddie inserts or changes url + # params, which will result in new or missing keys. Amazon generates the xml + # so that should always be correct + # assert all(k in res for k in EXPECTED_KEYS), list(res.keys()) + res = {k: v for k, v in res.items() if k in EXPECTED_KEYS} + return res + + @classmethod + def from_amt_get_assignment(cls, data: AssignmentTypeDef) -> Self: + assignment = cls( + amt_assignment_id=data["AssignmentId"], + amt_hit_id=data["HITId"], + amt_worker_id=data["WorkerId"], + status=AssignmentStatus[data["AssignmentStatus"]], + auto_approval_time=data["AutoApprovalTime"].astimezone(tz=timezone.utc), + accept_time=data["AcceptTime"].astimezone(tz=timezone.utc), + submit_time=data["SubmitTime"].astimezone(tz=timezone.utc), + approval_time=( + data["ApprovalTime"].astimezone(tz=timezone.utc) + if data.get("ApprovalTime") + else None + ), + rejection_time=( + data["RejectionTime"].astimezone(tz=timezone.utc) + if data.get("RejectionTime") + else None + ), + answer_xml=data["Answer"], + requester_feedback=data.get("RequesterFeedback"), + ) + return assignment + + def to_stub(self) -> AssignmentStub: + return AssignmentStub.model_validate( + self.model_dump(include=set(AssignmentStub.model_fields.keys())) + ) + + # --- Methods --- + # + # def refresh(self) -> Self: + # from tasks.mtwerk.managers.assignment import AssignmentManager + # return AssignmentManager.fetch_by_id(self) + # + # def reject(self, msg: str = REJECT_MESSAGE_UNKNOWN_ASSIGNMENT): + # """ + # Save in the database that the Assignment was rejected, and also + # Report to Amazon Mechanical Turk that this Assignment should be + # rejected + # + # TODO: can this only occur when the Assignment is in a certain status? + # + # :return: + # """ + # now = datetime.now(tz=None) + # + # MYSQLC.execute_sql_query(""" + # UPDATE `amt-jb`.`mtwerk_assignment` + # SET submit_time = %s, rejection_time = %s, status = %s, + # requester_feedback = %s + # WHERE assignment_id = %s""", + # params=[ + # now, now, + # AssignmentStatus.Rejected.value, + # msg, self.id], + # commit=True) + # + # CLIENT.reject_assignment( + # AssignmentId=self.id, + # RequesterFeedback=msg) + # + # def approve(self, msg: str = "Approved."): + # """ + # Report to Amazon Mechanical Turk that this Assignment should be + # approved + # + # TODO: can this only occur when the Assignment is in a certain status? + # + # :return: + # """ + # CLIENT.approve_assignment( + # AssignmentId=self.id, + # RequesterFeedback=msg) + # + # def submit_and_complete_request(self) -> Optional[str]: + # """ + # This approves the Assignment and issues the Reward + # amount (typically $.05) + # + # :return: + # """ + # worker = self.worker + # amount = DecimalUSDDollars(self.hit.reward) + # + # # If successful, returns the cashout id, otherwise, returns None + # cashout: Optional[dict] = worker.cashout_request( + # amount=amount, + # cashout_method_id=AMT_ASSIGNMENT_CASHOUT_METHOD) + # + # if cashout is None or cashout.get('status') != PayoutStatus.PENDING: + # return None + # + # cashout_id: str = cashout[id] + # + # approval: Optional[dict] = Bonus.manage_pending_cashout( + # cashout_id=cashout_id, + # action=PayoutStatus.APPROVED) + # + # if approval is None or approval['status'] != PayoutStatus.APPROVED: + # return None + # + # completion: Optional[dict] = Bonus.manage_pending_cashout( + # cashout_id=cashout_id, + # action=PayoutStatus.COMPLETE) + # + # if completion is None or completion['status'] != PayoutStatus.COMPLETE: + # return None + # + # return cashout_id + # + # # --- ORM --- + # + # def model_dump_mysql(self, *args, **kwargs) -> dict: + # d = self.model_dump(mode='json', *args, **kwargs) + # + # d['auto_approval_time'] = self.auto_approval_time.replace(tzinfo=None) + # d['accept_time'] = self.accept_time.replace(tzinfo=None) + # d['submit_time'] = self.submit_time.replace(tzinfo=None) + # + # if self.approval_time: + # d['approval_time'] = self.approval_time.replace(tzinfo=None) + # + # if self.rejection_time: + # d['rejection_time'] = self.rejection_time.replace(tzinfo=None) + # + # # created is automatically added by the database + # d['created'] = self.created.replace(tzinfo=None) + # + # if self.modified: + # d['modified'] = self.modified.replace(tzinfo=None) + # + # d['tsid'] = self.answers.get('tsid') + # + # return d + # + # def save(self) -> bool: + # """ + # Either INSERTS or UPDATES the Assignment instance to a Mysql + # record. + # """ + # + # # We're modifying the record, so set the time to right now! + # self.modified = datetime.now(tz=timezone.utc) + # + # query = """ + # INSERT `amt-jb`.`mtwerk_assignment` ( + # id, worker_id, hit_id, status, + # auto_approval_time, accept_time, submit_time, + # approval_time, rejection_time, + # requester_feedback, created, modified, tsid + # ) + # VALUES ( + # %(id)s, %(worker_id)s, %(hit_id)s, %(status)s, + # %(auto_approval_time)s, %(accept_time)s, %(submit_time)s, + # %(approval_time)s, %(rejection_time)s, + # %(requester_feedback)s, %(created)s, %(modified)s, %(tsid)s + # ) + # ON DUPLICATE KEY UPDATE + # worker_id = %(worker_id)s, + # hit_id = %(hit_id)s, + # status = %(status)s, + # + # auto_approval_time = %(auto_approval_time)s, + # accept_time = %(accept_time)s, + # submit_time = %(submit_time)s, + # + # approval_time = %(approval_time)s, + # rejection_time = %(rejection_time)s, + # + # requester_feedback = %(requester_feedback)s, + # -- Not going to update created just incase it changed + # -- in pydantic for some reason + # modified = %(modified)s, + # tsid = %(tsid)s + # """ + # + # try: + # MYSQLC.execute_sql_query(query, params=self.model_dump_mysql(), commit=True) + # return True + # + # except Exception as e: + # return False + # + + +# REJECT_MESSAGE_UNKNOWN_ASSIGNMENT = "Unknown assignment" |
