import logging
from datetime import datetime, timezone
from typing import Any, Optional, TypedDict
from xml.etree import ElementTree

from mypy_boto3_mturk.type_defs import AssignmentTypeDef
from pydantic import (
    BaseModel,
    Field,
    ConfigDict,
    model_validator,
    PositiveInt,
    computed_field,
    TypeAdapter,
    ValidationError,
)
from typing_extensions import Self

from jb.models.custom_types import AMTBoto3ID, AwareDatetimeISO, UUIDStr
from jb.models.definitions import AssignmentStatus

logger = logging.getLogger(__name__)

# XML namespace used by AMT QuestionFormAnswers documents.
_ANSWER_NS = {
    "mt": "http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2005-10-01/QuestionFormAnswers.xsd"
}

# The only answer keys we keep from the answer XML; anything else is dropped.
_EXPECTED_ANSWER_KEYS = frozenset({"amt_assignment_id", "amt_worker_id", "tsid"})

# Reused validator for the tsid pulled out of the answer XML (building a
# TypeAdapter is comparatively expensive, so do it once).
_UUID_STR_ADAPTER = TypeAdapter(UUIDStr)


class AnswerDict(TypedDict):
    """Answer values extracted from an assignment's answer XML.

    Note: ``Assignment.parse_answer_xml`` may return only a subset of these
    keys if the submitted answer was missing some of them.
    """

    amt_assignment_id: str
    amt_worker_id: str
    tsid: str


class AssignmentStub(BaseModel):
    """Minimal, ID-only view of an assignment.

    Used when a worker has accepted an assignment but has not submitted it
    yet, so that a database record can be created at that point.
    """

    model_config = ConfigDict(
        extra="forbid",
        validate_assignment=True,
    )

    id: Optional[PositiveInt] = Field(default=None)
    hit_id: Optional[PositiveInt] = Field(default=None)

    amt_assignment_id: AMTBoto3ID = Field()
    amt_hit_id: AMTBoto3ID = Field()
    amt_worker_id: str = Field(min_length=3, max_length=50)

    status: AssignmentStatus = Field()

    # GRL Specific
    created_at: AwareDatetimeISO = Field(
        default_factory=lambda: datetime.now(tz=timezone.utc),
        description="When this record was saved in the database",
    )

    modified_at: Optional[AwareDatetimeISO] = Field(
        default_factory=lambda: datetime.now(tz=timezone.utc),
        description="When this record was updated / modified in the database",
    )

    def to_postgres(self) -> dict:
        """Return a JSON-compatible dict of this model's fields."""
        d = self.model_dump(mode="json")
        return d


class Assignment(AssignmentStub):
    """A single assignment of a HIT to a Worker.

    The assignment tracks the Worker's efforts to complete the HIT and
    contains the results for later retrieval.

    The Assignment data structure is used as a response element for the
    following operations:
        GetAssignment
        GetAssignmentsForHIT

    https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_AssignmentDataStructureArticle.html
    """

    auto_approval_time: AwareDatetimeISO = Field(
        description="If results have been submitted, AutoApprovalTime is the "
        "date and time the results of the assignment results are "
        "considered Approved automatically if they have not already "
        "been explicitly approved or rejected by the Requester. "
        "This value is derived from the auto-approval delay "
        "specified by the Requester in the HIT. This value is "
        "omitted from the assignment if the Worker has not yet "
        "submitted results.",
    )

    accept_time: AwareDatetimeISO = Field(
        description="The date and time the Worker accepted the assignment.",
    )

    submit_time: AwareDatetimeISO = Field(
        description="The date and time the assignment was submitted. This value "
        "is omitted from the assignment if the Worker has not yet "
        "submitted results.",
    )

    approval_time: Optional[AwareDatetimeISO] = Field(
        default=None,
        description="The date and time the Requester approved the results. This "
        "value is omitted from the assignment if the Requester has "
        "not yet approved the results.",
    )

    rejection_time: Optional[AwareDatetimeISO] = Field(
        default=None,
        description="The date and time the Requester rejected the results.",
    )

    requester_feedback: Optional[str] = Field(
        # Default: None. This field isn't returned with assignment data by
        # default. To request this field, specify a response group of
        # AssignmentFeedback. For information about response groups, see
        # Common Parameters.
        default=None,
        min_length=3,
        max_length=2_000,
        # `description`, not `help_text`: pydantic v2 treats unknown Field
        # kwargs as deprecated extras, so `help_text` never became the
        # field's schema description.
        description="The feedback string included with the call to the "
        "ApproveAssignment operation or the RejectAssignment "
        "operation, if the Requester approved or rejected the "
        "assignment and specified feedback.",
    )

    answer_xml: Optional[str] = Field(default=None, exclude=True)

    # GRL Specific
    tsid: Optional[UUIDStr] = Field(default=None)

    # --- Validators ---

    @model_validator(mode="before")
    @classmethod
    def set_tsid(cls, values: Any) -> Any:
        """Fill ``tsid`` from the answer XML when it was not given explicitly.

        An invalid tsid in the answer is logged and replaced with None rather
        than failing validation, since the answer content is worker-supplied.
        The caller's input mapping is never mutated; a copy is returned when a
        value has to be set.
        """
        if not isinstance(values, dict):
            # Non-dict input (e.g. an existing model instance): nothing to do.
            return values

        answer_xml = values.get("answer_xml")
        if values.get("tsid") is not None or not answer_xml:
            return values

        tsid = cls.parse_answer_xml(answer_xml).get("tsid")
        # Copy so that e.g. `Assignment.model_validate(d)` doesn't add a
        # "tsid" key to the caller's dict `d`.
        values = dict(values)
        try:
            values["tsid"] = _UUID_STR_ADAPTER.validate_python(tsid)
        except ValidationError as e:
            # Don't break the model validation if a baddie messes with the tsid in the answer.
            logger.warning(e)
            values["tsid"] = None

        return values

    @model_validator(mode="after")
    def check_time_sequences(self) -> Self:
        """Reject assignments that were submitted before they were accepted."""
        if self.accept_time > self.submit_time:
            raise ValueError("Assignment times invalid")

        return self

    @model_validator(mode="after")
    def check_answers_alignment(self) -> Self:
        """Ensure the IDs embedded in the answer XML match this assignment.

        A missing worker/assignment id in the answer is treated as a mismatch
        (ValueError, surfaced as a ValidationError) instead of crashing with a
        KeyError. The tsid is only compared when both sides have one.
        """
        # Parse the XML once; `answers_dict` re-parses on every access.
        answers = self.answers_dict
        if answers is None:
            return self

        if self.amt_worker_id != answers.get("amt_worker_id"):
            raise ValueError("Assignment answer invalid worker_id")

        if self.amt_assignment_id != answers.get("amt_assignment_id"):
            raise ValueError("Assignment answer invalid amt_assignment_id")

        answer_tsid = answers.get("tsid")
        if self.tsid and answer_tsid and self.tsid != answer_tsid:
            raise ValueError("Assignment answer invalid tsid")

        return self

    # --- Properties ---

    @property
    def answers_dict(self) -> Optional[AnswerDict]:
        """Parsed answer values, or None when there is no answer XML.

        The XML is parsed on every access.
        """
        # See https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_AssignmentDataStructureArticle.html
        # https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMechanicalTurkRequester/Concepts_NotificationsArticle.html
        if self.answer_xml is None:
            return None

        return self.parse_answer_xml(self.answer_xml)

    @staticmethod
    def parse_answer_xml(answer_xml: str) -> dict:
        """Extract the expected answer keys from a QuestionFormAnswers XML.

        :param answer_xml: The ``Answer`` XML string returned by AMT.
        :return: A dict mapping question identifier -> free-text answer,
            limited to ``amt_assignment_id``, ``amt_worker_id`` and ``tsid``.
            Keys absent from the XML are absent from the result; an answer
            with no text maps to ``""``.
        :raises xml.etree.ElementTree.ParseError: if ``answer_xml`` is not
            well-formed XML.
        """
        root = ElementTree.fromstring(answer_xml)

        res = {}
        for answer in root.findall("mt:Answer", _ANSWER_NS):
            name = answer.findtext("mt:QuestionIdentifier", namespaces=_ANSWER_NS)
            # We don't want validation to fail if a baddie inserts or changes url
            # params, which will result in new or missing keys, so unknown keys
            # (and answers without an identifier) are simply skipped. Amazon
            # generates the xml so its structure should otherwise be correct.
            if name not in _EXPECTED_ANSWER_KEYS:
                continue
            # findtext returns "" for an element with no text and the default
            # when the FreeText element is missing entirely.
            value = answer.findtext("mt:FreeText", default="", namespaces=_ANSWER_NS)
            res[name] = value or ""

        return res

    @classmethod
    def from_amt_get_assignment(cls, data: AssignmentTypeDef) -> Self:
        """Build an Assignment from a boto3 MTurk ``Assignment`` response dict.

        All timestamps are converted to UTC.
        """
        assignment = cls(
            amt_assignment_id=data["AssignmentId"],
            amt_hit_id=data["HITId"],
            amt_worker_id=data["WorkerId"],
            status=AssignmentStatus[data["AssignmentStatus"]],
            auto_approval_time=data["AutoApprovalTime"].astimezone(tz=timezone.utc),
            accept_time=data["AcceptTime"].astimezone(tz=timezone.utc),
            submit_time=data["SubmitTime"].astimezone(tz=timezone.utc),
            approval_time=(
                data["ApprovalTime"].astimezone(tz=timezone.utc)
                if data.get("ApprovalTime")
                else None
            ),
            rejection_time=(
                data["RejectionTime"].astimezone(tz=timezone.utc)
                if data.get("RejectionTime")
                else None
            ),
            answer_xml=data["Answer"],
            requester_feedback=data.get("RequesterFeedback"),
        )
        return assignment

    def to_stub(self) -> AssignmentStub:
        """Return an AssignmentStub holding only this assignment's stub fields."""
        return AssignmentStub.model_validate(
            self.model_dump(include=set(AssignmentStub.model_fields.keys()))
        )

    # NOTE: The methods below are legacy MySQL-backed implementations kept for
    # reference only; they are commented out and not part of the model.

    # --- Methods ---
    #
    # def refresh(self) -> Self:
    #     from tasks.mtwerk.managers.assignment import AssignmentManager
    #     return AssignmentManager.fetch_by_id(self)
    #
    # def reject(self, msg: str = REJECT_MESSAGE_UNKNOWN_ASSIGNMENT):
    #     """
    #     Save in the database that the Assignment was rejected, and also
    #     Report to Amazon Mechanical Turk that this Assignment should be
    #     rejected
    #
    #     TODO: can this only occur when the Assignment is in a certain status?
    #
    #     :return:
    #     """
    #     now = datetime.now(tz=None)
    #
    #     MYSQLC.execute_sql_query("""
    #         UPDATE `amt-jb`.`mtwerk_assignment`
    #         SET submit_time = %s, rejection_time = %s, status = %s,
    #             requester_feedback = %s
    #         WHERE assignment_id = %s""",
    #         params=[
    #             now, now,
    #             AssignmentStatus.Rejected.value,
    #             msg, self.id],
    #         commit=True)
    #
    #     CLIENT.reject_assignment(
    #         AssignmentId=self.id,
    #         RequesterFeedback=msg)
    #
    # def approve(self, msg: str = "Approved."):
    #     """
    #     Report to Amazon Mechanical Turk that this Assignment should be
    #     approved
    #
    #     TODO: can this only occur when the Assignment is in a certain status?
    #
    #     :return:
    #     """
    #     CLIENT.approve_assignment(
    #         AssignmentId=self.id,
    #         RequesterFeedback=msg)
    #
    # def submit_and_complete_request(self) -> Optional[str]:
    #     """
    #     This approves the Assignment and issues the Reward
    #     amount (typically $.05)
    #
    #     :return:
    #     """
    #     worker = self.worker
    #     amount = DecimalUSDDollars(self.hit.reward)
    #
    #     # If successful, returns the cashout id, otherwise, returns None
    #     cashout: Optional[dict] = worker.cashout_request(
    #         amount=amount,
    #         cashout_method_id=AMT_ASSIGNMENT_CASHOUT_METHOD)
    #
    #     if cashout is None or cashout.get('status') != PayoutStatus.PENDING:
    #         return None
    #
    #     cashout_id: str = cashout[id]
    #
    #     approval: Optional[dict] = Bonus.manage_pending_cashout(
    #         cashout_id=cashout_id,
    #         action=PayoutStatus.APPROVED)
    #
    #     if approval is None or approval['status'] != PayoutStatus.APPROVED:
    #         return None
    #
    #     completion: Optional[dict] = Bonus.manage_pending_cashout(
    #         cashout_id=cashout_id,
    #         action=PayoutStatus.COMPLETE)
    #
    #     if completion is None or completion['status'] != PayoutStatus.COMPLETE:
    #         return None
    #
    #     return cashout_id
    #
    # # --- ORM ---
    #
    # def model_dump_mysql(self, *args, **kwargs) -> dict:
    #     d = self.model_dump(mode='json', *args, **kwargs)
    #
    #     d['auto_approval_time'] = self.auto_approval_time.replace(tzinfo=None)
    #     d['accept_time'] = self.accept_time.replace(tzinfo=None)
    #     d['submit_time'] = self.submit_time.replace(tzinfo=None)
    #
    #     if self.approval_time:
    #         d['approval_time'] = self.approval_time.replace(tzinfo=None)
    #
    #     if self.rejection_time:
    #         d['rejection_time'] = self.rejection_time.replace(tzinfo=None)
    #
    #     # created is automatically added by the database
    #     d['created'] = self.created.replace(tzinfo=None)
    #
    #     if self.modified:
    #         d['modified'] = self.modified.replace(tzinfo=None)
    #
    #     d['tsid'] = self.answers.get('tsid')
    #
    #     return d
    #
    # def save(self) -> bool:
    #     """
    #     Either INSERTS or UPDATES the Assignment instance to a Mysql
    #     record.
    #     """
    #
    #     # We're modifying the record, so set the time to right now!
    #     self.modified = datetime.now(tz=timezone.utc)
    #
    #     query = """
    #         INSERT `amt-jb`.`mtwerk_assignment` (
    #             id, worker_id, hit_id, status,
    #             auto_approval_time, accept_time, submit_time,
    #             approval_time, rejection_time,
    #             requester_feedback, created, modified, tsid
    #         )
    #         VALUES (
    #             %(id)s, %(worker_id)s, %(hit_id)s, %(status)s,
    #             %(auto_approval_time)s, %(accept_time)s, %(submit_time)s,
    #             %(approval_time)s, %(rejection_time)s,
    #             %(requester_feedback)s, %(created)s, %(modified)s, %(tsid)s
    #         )
    #         ON DUPLICATE KEY UPDATE
    #             worker_id = %(worker_id)s,
    #             hit_id = %(hit_id)s,
    #             status = %(status)s,
    #
    #             auto_approval_time = %(auto_approval_time)s,
    #             accept_time = %(accept_time)s,
    #             submit_time = %(submit_time)s,
    #
    #             approval_time = %(approval_time)s,
    #             rejection_time = %(rejection_time)s,
    #
    #             requester_feedback = %(requester_feedback)s,
    #             -- Not going to update created just incase it changed
    #             -- in pydantic for some reason
    #             modified = %(modified)s,
    #             tsid = %(tsid)s
    #     """
    #
    #     try:
    #         MYSQLC.execute_sql_query(query, params=self.model_dump_mysql(), commit=True)
    #         return True
    #
    #     except Exception as e:
    #         return False


REJECT_MESSAGE_UNKNOWN_ASSIGNMENT = "Unknown assignment"