summaryrefslogtreecommitdiff
path: root/jb/models/assignment.py
diff options
context:
space:
mode:
authorMax Nanis2026-02-19 02:43:23 -0500
committerMax Nanis2026-02-19 02:43:23 -0500
commitf0f96f83c2630e890a2cbcab53f77fd4c37e1684 (patch)
treec6d2cb092e76bf5d499e0ea9949508d6b22164fd /jb/models/assignment.py
parent3eaa56f0306ead818f64c3d99fc6d230d9b970a4 (diff)
downloadamt-jb-f0f96f83c2630e890a2cbcab53f77fd4c37e1684.tar.gz
amt-jb-f0f96f83c2630e890a2cbcab53f77fd4c37e1684.zip
Models, Project files, some pytests, requirements.. etcHEADmaster
Diffstat (limited to 'jb/models/assignment.py')
-rw-r--r--jb/models/assignment.py388
1 files changed, 388 insertions, 0 deletions
diff --git a/jb/models/assignment.py b/jb/models/assignment.py
new file mode 100644
index 0000000..39ae47c
--- /dev/null
+++ b/jb/models/assignment.py
@@ -0,0 +1,388 @@
+import logging
+from datetime import datetime, timezone
+from typing import Optional, TypedDict
+from xml.etree import ElementTree
+
+from mypy_boto3_mturk.type_defs import AssignmentTypeDef
+from pydantic import (
+ BaseModel,
+ Field,
+ ConfigDict,
+ model_validator,
+ PositiveInt,
+ computed_field,
+ TypeAdapter,
+ ValidationError,
+)
+from typing_extensions import Self
+
+from jb.models.custom_types import AMTBoto3ID, AwareDatetimeISO, UUIDStr
+from jb.models.definitions import AssignmentStatus
+
+
+class AnswerDict(TypedDict):
+ amt_assignment_id: str
+ amt_worker_id: str
+ tsid: str
+
+
+class AssignmentStub(BaseModel):
+ # todo: we need an "AssignmentStub" model that just has
+ # the IDs, this is used when a user accepts an assignment
+ # but hasn't submitted it yet. We want to create it in the db
+ # at that point.
+
+ model_config = ConfigDict(
+ extra="forbid",
+ validate_assignment=True,
+ )
+
+ id: Optional[PositiveInt] = Field(default=None)
+ hit_id: Optional[PositiveInt] = Field(default=None)
+ amt_assignment_id: AMTBoto3ID = Field()
+ amt_hit_id: AMTBoto3ID = Field()
+ amt_worker_id: str = Field(min_length=3, max_length=50)
+
+ status: AssignmentStatus = Field()
+
+ # GRL Specific
+ created_at: AwareDatetimeISO = Field(
+ default_factory=lambda: datetime.now(tz=timezone.utc),
+ description="When this record was saved in the database",
+ )
+
+ modified_at: Optional[AwareDatetimeISO] = Field(
+ default_factory=lambda: datetime.now(tz=timezone.utc),
+ description="When this record was updated / modified in the database",
+ )
+
+ def to_postgres(self):
+ d = self.model_dump(mode="json")
+ return d
+
+
+class Assignment(AssignmentStub):
+ """
+ The Assignment data structure represents a single assignment of a HIT to
+ a Worker. The assignment tracks the Worker's efforts to complete the HIT,
+ and contains the results for later retrieval.
+
+ The Assignment data structure is used as a response element for the
+ following operations:
+
+ GetAssignment
+ GetAssignmentsForHIT
+
+ https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_AssignmentDataStructureArticle.html
+ """
+
+ auto_approval_time: AwareDatetimeISO = Field(
+ description="If results have been submitted, AutoApprovalTime is the "
+ "date and time the results of the assignment results are "
+ "considered Approved automatically if they have not already "
+ "been explicitly approved or rejected by the Requester. "
+ "This value is derived from the auto-approval delay "
+ "specified by the Requester in the HIT. This value is "
+ "omitted from the assignment if the Worker has not yet "
+ "submitted results.",
+ )
+
+ accept_time: AwareDatetimeISO = Field(
+ description="The date and time the Worker accepted the assignment.",
+ )
+
+ submit_time: AwareDatetimeISO = Field(
+ description="The date and time the assignment was submitted. This value "
+ "is omitted from the assignment if the Worker has not yet "
+ "submitted results.",
+ )
+
+ approval_time: Optional[AwareDatetimeISO] = Field(
+ default=None,
+ description="The date and time the Requester approved the results. This "
+ "value is omitted from the assignment if the Requester has "
+ "not yet approved the results.",
+ )
+ rejection_time: Optional[AwareDatetimeISO] = Field(
+ default=None,
+ description="The date and time the Requester rejected the results.",
+ )
+
+ requester_feedback: Optional[str] = Field(
+ # Default: None. This field isn't returned with assignment data by
+ # default. To request this field, specify a response group of
+ # AssignmentFeedback. For information about response groups, see
+ # Common Parameters.
+ default=None,
+ min_length=3,
+ max_length=2_000,
+ help_text="The feedback string included with the call to the "
+ "ApproveAssignment operation or the RejectAssignment "
+ "operation, if the Requester approved or rejected the "
+ "assignment and specified feedback.",
+ )
+
+ answer_xml: Optional[str] = Field(default=None, exclude=True)
+
+ # GRL Specific
+
+ tsid: Optional[UUIDStr] = Field(default=None)
+
+ # --- Validators ---
+
+ @model_validator(mode="before")
+ def set_tsid(cls, values: dict):
+ if values.get("tsid") is None and (answer_xml := values.get("answer_xml")):
+ answer_dict = cls.parse_answer_xml(answer_xml)
+ tsid = answer_dict.get("tsid")
+ try:
+ values["tsid"] = TypeAdapter(UUIDStr).validate_python(tsid)
+ except ValidationError as e:
+ # Don't break the model validation if a baddie messes with the tsid in the answer.
+ logging.warning(e)
+ values["tsid"] = None
+ return values
+
+ @model_validator(mode="after")
+ def check_time_sequences(self) -> Self:
+ if self.accept_time > self.submit_time:
+ raise ValueError("Assignment times invalid")
+
+ return self
+
+ @model_validator(mode="after")
+ def check_answers_alignment(self) -> Self:
+ if self.answers_dict is None:
+ return self
+ if self.amt_worker_id != self.answers_dict["amt_worker_id"]:
+ raise ValueError("Assignment answer invalid worker_id")
+ if self.amt_assignment_id != self.answers_dict["amt_assignment_id"]:
+ raise ValueError("Assignment answer invalid amt_assignment_id")
+ if (
+ self.tsid
+ and self.answers_dict["tsid"]
+ and self.tsid != self.answers_dict["tsid"]
+ ):
+ raise ValueError("Assignment answer invalid tsid")
+ return self
+
+ # --- Properties ---
+
+ @property
+ def answers_dict(self) -> Optional[AnswerDict]:
+ # See https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_AssignmentDataStructureArticle.html
+ # https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMechanicalTurkRequester/Concepts_NotificationsArticle.html
+ if self.answer_xml is None:
+ return None
+
+ return self.parse_answer_xml(self.answer_xml)
+
+ @staticmethod
+ def parse_answer_xml(answer_xml: str):
+ root = ElementTree.fromstring(answer_xml)
+ ns = {
+ "mt": "http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2005-10-01/QuestionFormAnswers.xsd"
+ }
+ res = {}
+
+ for a in root.findall("mt:Answer", ns):
+ name = a.find("mt:QuestionIdentifier", ns).text
+ value = a.find("mt:FreeText", ns).text
+ res[name] = value or ""
+
+ EXPECTED_KEYS = {"amt_assignment_id", "amt_worker_id", "tsid"}
+ # We don't want validation to fail if a baddie inserts or changes url
+ # params, which will result in new or missing keys. Amazon generates the xml
+ # so that should always be correct
+ # assert all(k in res for k in EXPECTED_KEYS), list(res.keys())
+ res = {k: v for k, v in res.items() if k in EXPECTED_KEYS}
+ return res
+
+ @classmethod
+ def from_amt_get_assignment(cls, data: AssignmentTypeDef) -> Self:
+ assignment = cls(
+ amt_assignment_id=data["AssignmentId"],
+ amt_hit_id=data["HITId"],
+ amt_worker_id=data["WorkerId"],
+ status=AssignmentStatus[data["AssignmentStatus"]],
+ auto_approval_time=data["AutoApprovalTime"].astimezone(tz=timezone.utc),
+ accept_time=data["AcceptTime"].astimezone(tz=timezone.utc),
+ submit_time=data["SubmitTime"].astimezone(tz=timezone.utc),
+ approval_time=(
+ data["ApprovalTime"].astimezone(tz=timezone.utc)
+ if data.get("ApprovalTime")
+ else None
+ ),
+ rejection_time=(
+ data["RejectionTime"].astimezone(tz=timezone.utc)
+ if data.get("RejectionTime")
+ else None
+ ),
+ answer_xml=data["Answer"],
+ requester_feedback=data.get("RequesterFeedback"),
+ )
+ return assignment
+
+ def to_stub(self) -> AssignmentStub:
+ return AssignmentStub.model_validate(
+ self.model_dump(include=set(AssignmentStub.model_fields.keys()))
+ )
+
+ # --- Methods ---
+ #
+ # def refresh(self) -> Self:
+ # from tasks.mtwerk.managers.assignment import AssignmentManager
+ # return AssignmentManager.fetch_by_id(self)
+ #
+ # def reject(self, msg: str = REJECT_MESSAGE_UNKNOWN_ASSIGNMENT):
+ # """
+ # Save in the database that the Assignment was rejected, and also
+ # Report to Amazon Mechanical Turk that this Assignment should be
+ # rejected
+ #
+ # TODO: can this only occur when the Assignment is in a certain status?
+ #
+ # :return:
+ # """
+ # now = datetime.now(tz=None)
+ #
+ # MYSQLC.execute_sql_query("""
+ # UPDATE `amt-jb`.`mtwerk_assignment`
+ # SET submit_time = %s, rejection_time = %s, status = %s,
+ # requester_feedback = %s
+ # WHERE assignment_id = %s""",
+ # params=[
+ # now, now,
+ # AssignmentStatus.Rejected.value,
+ # msg, self.id],
+ # commit=True)
+ #
+ # CLIENT.reject_assignment(
+ # AssignmentId=self.id,
+ # RequesterFeedback=msg)
+ #
+ # def approve(self, msg: str = "Approved."):
+ # """
+ # Report to Amazon Mechanical Turk that this Assignment should be
+ # approved
+ #
+ # TODO: can this only occur when the Assignment is in a certain status?
+ #
+ # :return:
+ # """
+ # CLIENT.approve_assignment(
+ # AssignmentId=self.id,
+ # RequesterFeedback=msg)
+ #
+ # def submit_and_complete_request(self) -> Optional[str]:
+ # """
+ # This approves the Assignment and issues the Reward
+ # amount (typically $.05)
+ #
+ # :return:
+ # """
+ # worker = self.worker
+ # amount = DecimalUSDDollars(self.hit.reward)
+ #
+ # # If successful, returns the cashout id, otherwise, returns None
+ # cashout: Optional[dict] = worker.cashout_request(
+ # amount=amount,
+ # cashout_method_id=AMT_ASSIGNMENT_CASHOUT_METHOD)
+ #
+ # if cashout is None or cashout.get('status') != PayoutStatus.PENDING:
+ # return None
+ #
+ # cashout_id: str = cashout[id]
+ #
+ # approval: Optional[dict] = Bonus.manage_pending_cashout(
+ # cashout_id=cashout_id,
+ # action=PayoutStatus.APPROVED)
+ #
+ # if approval is None or approval['status'] != PayoutStatus.APPROVED:
+ # return None
+ #
+ # completion: Optional[dict] = Bonus.manage_pending_cashout(
+ # cashout_id=cashout_id,
+ # action=PayoutStatus.COMPLETE)
+ #
+ # if completion is None or completion['status'] != PayoutStatus.COMPLETE:
+ # return None
+ #
+ # return cashout_id
+ #
+ # # --- ORM ---
+ #
+ # def model_dump_mysql(self, *args, **kwargs) -> dict:
+ # d = self.model_dump(mode='json', *args, **kwargs)
+ #
+ # d['auto_approval_time'] = self.auto_approval_time.replace(tzinfo=None)
+ # d['accept_time'] = self.accept_time.replace(tzinfo=None)
+ # d['submit_time'] = self.submit_time.replace(tzinfo=None)
+ #
+ # if self.approval_time:
+ # d['approval_time'] = self.approval_time.replace(tzinfo=None)
+ #
+ # if self.rejection_time:
+ # d['rejection_time'] = self.rejection_time.replace(tzinfo=None)
+ #
+ # # created is automatically added by the database
+ # d['created'] = self.created.replace(tzinfo=None)
+ #
+ # if self.modified:
+ # d['modified'] = self.modified.replace(tzinfo=None)
+ #
+ # d['tsid'] = self.answers.get('tsid')
+ #
+ # return d
+ #
+ # def save(self) -> bool:
+ # """
+ # Either INSERTS or UPDATES the Assignment instance to a Mysql
+ # record.
+ # """
+ #
+ # # We're modifying the record, so set the time to right now!
+ # self.modified = datetime.now(tz=timezone.utc)
+ #
+ # query = """
+ # INSERT `amt-jb`.`mtwerk_assignment` (
+ # id, worker_id, hit_id, status,
+ # auto_approval_time, accept_time, submit_time,
+ # approval_time, rejection_time,
+ # requester_feedback, created, modified, tsid
+ # )
+ # VALUES (
+ # %(id)s, %(worker_id)s, %(hit_id)s, %(status)s,
+ # %(auto_approval_time)s, %(accept_time)s, %(submit_time)s,
+ # %(approval_time)s, %(rejection_time)s,
+ # %(requester_feedback)s, %(created)s, %(modified)s, %(tsid)s
+ # )
+ # ON DUPLICATE KEY UPDATE
+ # worker_id = %(worker_id)s,
+ # hit_id = %(hit_id)s,
+ # status = %(status)s,
+ #
+ # auto_approval_time = %(auto_approval_time)s,
+ # accept_time = %(accept_time)s,
+ # submit_time = %(submit_time)s,
+ #
+ # approval_time = %(approval_time)s,
+ # rejection_time = %(rejection_time)s,
+ #
+ # requester_feedback = %(requester_feedback)s,
+ # -- Not going to update created just incase it changed
+ # -- in pydantic for some reason
+ # modified = %(modified)s,
+ # tsid = %(tsid)s
+ # """
+ #
+ # try:
+ # MYSQLC.execute_sql_query(query, params=self.model_dump_mysql(), commit=True)
+ # return True
+ #
+ # except Exception as e:
+ # return False
+ #
+
+
+# REJECT_MESSAGE_UNKNOWN_ASSIGNMENT = "Unknown assignment"