diff options
| author | Max Nanis | 2026-02-19 02:43:23 -0500 |
|---|---|---|
| committer | Max Nanis | 2026-02-19 02:43:23 -0500 |
| commit | f0f96f83c2630e890a2cbcab53f77fd4c37e1684 (patch) | |
| tree | c6d2cb092e76bf5d499e0ea9949508d6b22164fd /jb/models | |
| parent | 3eaa56f0306ead818f64c3d99fc6d230d9b970a4 (diff) | |
| download | amt-jb-master.tar.gz amt-jb-master.zip | |
Diffstat (limited to 'jb/models')
| -rw-r--r-- | jb/models/__init__.py | 40 | ||||
| -rw-r--r-- | jb/models/api_response.py | 17 | ||||
| -rw-r--r-- | jb/models/assignment.py | 388 | ||||
| -rw-r--r-- | jb/models/bonus.py | 48 | ||||
| -rw-r--r-- | jb/models/currency.py | 70 | ||||
| -rw-r--r-- | jb/models/custom_types.py | 113 | ||||
| -rw-r--r-- | jb/models/definitions.py | 90 | ||||
| -rw-r--r-- | jb/models/errors.py | 80 | ||||
| -rw-r--r-- | jb/models/event.py | 38 | ||||
| -rw-r--r-- | jb/models/hit.py | 251 |
10 files changed, 1135 insertions, 0 deletions
diff --git a/jb/models/__init__.py b/jb/models/__init__.py new file mode 100644 index 0000000..0aeae14 --- /dev/null +++ b/jb/models/__init__.py @@ -0,0 +1,40 @@ +from decimal import Decimal +from typing import Optional + +from pydantic import BaseModel, Field, ConfigDict + + +class HTTPHeaders(BaseModel): + request_id: str = Field(alias="x-amzn-requestid", min_length=36, max_length=36) + content_type: str = Field(alias="content-type", min_length=26, max_length=26) + # 'content-length': '1255', + content_length: str = Field(alias="content-length", min_length=2) + # 'Mon, 15 Jan 2024 23:40:32 GMT' + date: str = Field() + + connection: Optional[str] = Field(default=None) # 'close' + + +class ResponseMetadata(BaseModel): + model_config = ConfigDict(extra="forbid", validate_assignment=True) + + request_id: str = Field(alias="RequestId", min_length=36, max_length=36) + status_code: int = Field(alias="HTTPStatusCode", ge=200, le=599) + headers: HTTPHeaders = Field(alias="HTTPHeaders") + retry_attempts: int = Field(alias="RetryAttempts", ge=0) + + +class AMTAccount(BaseModel): + model_config = ConfigDict(extra="ignore", validate_assignment=True) + + # Remaining available AWS Billing usage if you have enabled AWS Billing. 
+ available_balance: Decimal = Field() + onhold_balance: Decimal = Field(default=Decimal(0)) + + # --- Properties --- + + @property + def is_healthy(self) -> bool: + # A healthy account is one with at least $2,500 worth of + # credit available to it + return self.available_balance >= 2_500 diff --git a/jb/models/api_response.py b/jb/models/api_response.py new file mode 100644 index 0000000..6b29e51 --- /dev/null +++ b/jb/models/api_response.py @@ -0,0 +1,17 @@ +from pydantic import BaseModel, ConfigDict, Field, model_validator + +from jb.models.assignment import Assignment +from jb.models.hit import Hit + + +class AssignmentResponse(BaseModel): + model_config = ConfigDict(extra="forbid", validate_assignment=True) + + assignment: Assignment = Field(alias="Assignment") + hit: Hit = Field(alias="HIT") + + @model_validator(mode="after") + def check_consistent_hit_id(self) -> "AssignmentResponse": + if self.hit.id != self.assignment.hit_id: + raise ValueError("Inconsistent Hit IDs") + return self diff --git a/jb/models/assignment.py b/jb/models/assignment.py new file mode 100644 index 0000000..39ae47c --- /dev/null +++ b/jb/models/assignment.py @@ -0,0 +1,388 @@ +import logging +from datetime import datetime, timezone +from typing import Optional, TypedDict +from xml.etree import ElementTree + +from mypy_boto3_mturk.type_defs import AssignmentTypeDef +from pydantic import ( + BaseModel, + Field, + ConfigDict, + model_validator, + PositiveInt, + computed_field, + TypeAdapter, + ValidationError, +) +from typing_extensions import Self + +from jb.models.custom_types import AMTBoto3ID, AwareDatetimeISO, UUIDStr +from jb.models.definitions import AssignmentStatus + + +class AnswerDict(TypedDict): + amt_assignment_id: str + amt_worker_id: str + tsid: str + + +class AssignmentStub(BaseModel): + # todo: we need an "AssignmentStub" model that just has + # the IDs, this is used when a user accepts an assignment + # but hasn't submitted it yet. 
We want to create it in the db + # at that point. + + model_config = ConfigDict( + extra="forbid", + validate_assignment=True, + ) + + id: Optional[PositiveInt] = Field(default=None) + hit_id: Optional[PositiveInt] = Field(default=None) + amt_assignment_id: AMTBoto3ID = Field() + amt_hit_id: AMTBoto3ID = Field() + amt_worker_id: str = Field(min_length=3, max_length=50) + + status: AssignmentStatus = Field() + + # GRL Specific + created_at: AwareDatetimeISO = Field( + default_factory=lambda: datetime.now(tz=timezone.utc), + description="When this record was saved in the database", + ) + + modified_at: Optional[AwareDatetimeISO] = Field( + default_factory=lambda: datetime.now(tz=timezone.utc), + description="When this record was updated / modified in the database", + ) + + def to_postgres(self): + d = self.model_dump(mode="json") + return d + + +class Assignment(AssignmentStub): + """ + The Assignment data structure represents a single assignment of a HIT to + a Worker. The assignment tracks the Worker's efforts to complete the HIT, + and contains the results for later retrieval. + + The Assignment data structure is used as a response element for the + following operations: + + GetAssignment + GetAssignmentsForHIT + + https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_AssignmentDataStructureArticle.html + """ + + auto_approval_time: AwareDatetimeISO = Field( + description="If results have been submitted, AutoApprovalTime is the " + "date and time the results of the assignment results are " + "considered Approved automatically if they have not already " + "been explicitly approved or rejected by the Requester. " + "This value is derived from the auto-approval delay " + "specified by the Requester in the HIT. 
This value is " + "omitted from the assignment if the Worker has not yet " + "submitted results.", + ) + + accept_time: AwareDatetimeISO = Field( + description="The date and time the Worker accepted the assignment.", + ) + + submit_time: AwareDatetimeISO = Field( + description="The date and time the assignment was submitted. This value " + "is omitted from the assignment if the Worker has not yet " + "submitted results.", + ) + + approval_time: Optional[AwareDatetimeISO] = Field( + default=None, + description="The date and time the Requester approved the results. This " + "value is omitted from the assignment if the Requester has " + "not yet approved the results.", + ) + rejection_time: Optional[AwareDatetimeISO] = Field( + default=None, + description="The date and time the Requester rejected the results.", + ) + + requester_feedback: Optional[str] = Field( + # Default: None. This field isn't returned with assignment data by + # default. To request this field, specify a response group of + # AssignmentFeedback. For information about response groups, see + # Common Parameters. + default=None, + min_length=3, + max_length=2_000, + help_text="The feedback string included with the call to the " + "ApproveAssignment operation or the RejectAssignment " + "operation, if the Requester approved or rejected the " + "assignment and specified feedback.", + ) + + answer_xml: Optional[str] = Field(default=None, exclude=True) + + # GRL Specific + + tsid: Optional[UUIDStr] = Field(default=None) + + # --- Validators --- + + @model_validator(mode="before") + def set_tsid(cls, values: dict): + if values.get("tsid") is None and (answer_xml := values.get("answer_xml")): + answer_dict = cls.parse_answer_xml(answer_xml) + tsid = answer_dict.get("tsid") + try: + values["tsid"] = TypeAdapter(UUIDStr).validate_python(tsid) + except ValidationError as e: + # Don't break the model validation if a baddie messes with the tsid in the answer. 
@staticmethod
def parse_answer_xml(answer_xml: str) -> dict:
    """
    Extract the expected answer fields from an AMT QuestionFormAnswers document.

    Only the ``amt_assignment_id``, ``amt_worker_id`` and ``tsid`` answers
    are kept; any other answer is dropped. A key is absent from the result
    when the document does not contain it, so callers must not assume all
    three are present.

    :param answer_xml: the raw QuestionFormAnswers XML string
    :return: mapping of expected question identifier -> free-text value
        (``""`` when the answer has no text)
    :raises xml.etree.ElementTree.ParseError: if ``answer_xml`` is not
        well-formed XML
    """
    # NOTE(review): the answer values originate from the worker's submission
    # and ElementTree is not hardened against maliciously constructed XML.
    # Confirm the XML envelope is always generated by AMT.
    ns = {
        "mt": "http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2005-10-01/QuestionFormAnswers.xsd"
    }
    expected_keys = {"amt_assignment_id", "amt_worker_id", "tsid"}

    root = ElementTree.fromstring(answer_xml)
    res = {}
    for answer in root.findall("mt:Answer", ns):
        identifier = answer.find("mt:QuestionIdentifier", ns)
        # We don't want validation to fail if a baddie inserts or changes url
        # params, which will result in new or missing keys: skip anything
        # that is unnamed or unexpected instead of raising.
        if identifier is None or identifier.text not in expected_keys:
            continue

        # An answer may carry no FreeText element at all, or an empty one
        # (text is None). Both normalise to "".
        free_text = answer.find("mt:FreeText", ns)
        value = free_text.text if free_text is not None else None
        res[identifier.text] = value or ""

    return res
+ # + # :return: + # """ + # now = datetime.now(tz=None) + # + # MYSQLC.execute_sql_query(""" + # UPDATE `amt-jb`.`mtwerk_assignment` + # SET submit_time = %s, rejection_time = %s, status = %s, + # requester_feedback = %s + # WHERE assignment_id = %s""", + # params=[ + # now, now, + # AssignmentStatus.Rejected.value, + # msg, self.id], + # commit=True) + # + # CLIENT.reject_assignment( + # AssignmentId=self.id, + # RequesterFeedback=msg) + # + # def approve(self, msg: str = "Approved."): + # """ + # Report to Amazon Mechanical Turk that this Assignment should be + # approved + # + # TODO: can this only occur when the Assignment is in a certain status? + # + # :return: + # """ + # CLIENT.approve_assignment( + # AssignmentId=self.id, + # RequesterFeedback=msg) + # + # def submit_and_complete_request(self) -> Optional[str]: + # """ + # This approves the Assignment and issues the Reward + # amount (typically $.05) + # + # :return: + # """ + # worker = self.worker + # amount = DecimalUSDDollars(self.hit.reward) + # + # # If successful, returns the cashout id, otherwise, returns None + # cashout: Optional[dict] = worker.cashout_request( + # amount=amount, + # cashout_method_id=AMT_ASSIGNMENT_CASHOUT_METHOD) + # + # if cashout is None or cashout.get('status') != PayoutStatus.PENDING: + # return None + # + # cashout_id: str = cashout[id] + # + # approval: Optional[dict] = Bonus.manage_pending_cashout( + # cashout_id=cashout_id, + # action=PayoutStatus.APPROVED) + # + # if approval is None or approval['status'] != PayoutStatus.APPROVED: + # return None + # + # completion: Optional[dict] = Bonus.manage_pending_cashout( + # cashout_id=cashout_id, + # action=PayoutStatus.COMPLETE) + # + # if completion is None or completion['status'] != PayoutStatus.COMPLETE: + # return None + # + # return cashout_id + # + # # --- ORM --- + # + # def model_dump_mysql(self, *args, **kwargs) -> dict: + # d = self.model_dump(mode='json', *args, **kwargs) + # + # d['auto_approval_time'] = 
self.auto_approval_time.replace(tzinfo=None) + # d['accept_time'] = self.accept_time.replace(tzinfo=None) + # d['submit_time'] = self.submit_time.replace(tzinfo=None) + # + # if self.approval_time: + # d['approval_time'] = self.approval_time.replace(tzinfo=None) + # + # if self.rejection_time: + # d['rejection_time'] = self.rejection_time.replace(tzinfo=None) + # + # # created is automatically added by the database + # d['created'] = self.created.replace(tzinfo=None) + # + # if self.modified: + # d['modified'] = self.modified.replace(tzinfo=None) + # + # d['tsid'] = self.answers.get('tsid') + # + # return d + # + # def save(self) -> bool: + # """ + # Either INSERTS or UPDATES the Assignment instance to a Mysql + # record. + # """ + # + # # We're modifying the record, so set the time to right now! + # self.modified = datetime.now(tz=timezone.utc) + # + # query = """ + # INSERT `amt-jb`.`mtwerk_assignment` ( + # id, worker_id, hit_id, status, + # auto_approval_time, accept_time, submit_time, + # approval_time, rejection_time, + # requester_feedback, created, modified, tsid + # ) + # VALUES ( + # %(id)s, %(worker_id)s, %(hit_id)s, %(status)s, + # %(auto_approval_time)s, %(accept_time)s, %(submit_time)s, + # %(approval_time)s, %(rejection_time)s, + # %(requester_feedback)s, %(created)s, %(modified)s, %(tsid)s + # ) + # ON DUPLICATE KEY UPDATE + # worker_id = %(worker_id)s, + # hit_id = %(hit_id)s, + # status = %(status)s, + # + # auto_approval_time = %(auto_approval_time)s, + # accept_time = %(accept_time)s, + # submit_time = %(submit_time)s, + # + # approval_time = %(approval_time)s, + # rejection_time = %(rejection_time)s, + # + # requester_feedback = %(requester_feedback)s, + # -- Not going to update created just incase it changed + # -- in pydantic for some reason + # modified = %(modified)s, + # tsid = %(tsid)s + # """ + # + # try: + # MYSQLC.execute_sql_query(query, params=self.model_dump_mysql(), commit=True) + # return True + # + # except Exception as e: + # 
class Bonus(BaseModel):
    """
    A Bonus is created (in our DB) ONLY associated with an APPROVED
    thl-payout-event, AFTER the bonus has actually been sent to
    the worker.
    We have the payout_event uuid as the unique request token to make
    sure it only gets sent once (param in the boto request).
    """

    model_config = ConfigDict(
        extra="forbid",
        validate_assignment=True,
    )
    id: Optional[PositiveInt] = Field(default=None)
    assignment_id: Optional[PositiveInt] = Field(default=None)

    amt_worker_id: str = Field(min_length=3, max_length=50)
    amt_assignment_id: AMTBoto3ID = Field()

    amount: USDCent = Field()
    reason: str = Field(min_length=5)
    grant_time: AwareDatetimeISO = Field()

    # -- GRL Specific ---
    payout_event_id: UUIDStr = Field()
    # created: Optional[AwareDatetimeISO] = Field(default=None)

    def to_postgres(self):
        """
        Dump the model in JSON mode for storage.

        ``amount`` is replaced by its dollar value (``USDCent.to_usd()``),
        so the stored amount is in dollars rather than cents.
        """
        d = self.model_dump(mode="json")
        d["amount"] = self.amount.to_usd()
        return d

    @classmethod
    def from_postgres(cls, data: Dict) -> Self:
        """
        Build a Bonus from a database row.

        Keys that are not model fields are dropped, and the dollar
        ``amount`` is converted back to cents. The caller's ``data`` mapping
        is left untouched.

        :param data: row mapping; must contain ``amount`` (in dollars)
        :raises KeyError: if ``amount`` is missing from ``data``
        """
        fields = set(cls.model_fields.keys())
        # Work on a filtered copy so the caller's row is not modified.
        row = {k: v for k, v in data.items() if k in fields}
        row["amount"] = USDCent(round(data["amount"] * 100))
        return cls.model_validate(row)
class USDCent(int):
    """
    A non-negative amount of money expressed as a whole number of US cents.

    Arithmetic (``+``, ``-``, ``*``) is only permitted between two USDCent
    values and always yields a USDCent, so a result that would be negative
    raises ``ValueError``. True division is refused outright.
    """

    def __new__(cls, value, *args, **kwargs):
        if isinstance(value, float):
            warnings.warn(
                "USDCent init with a float. Rounding behavior may " "be unexpected"
            )

        if isinstance(value, Decimal):
            warnings.warn(
                "USDCent init with a Decimal. Rounding behavior may " "be unexpected"
            )

        if value < 0:
            raise ValueError("USDCent not be less than zero")

        # Zero-argument super() is anchored to USDCent itself. The previous
        # super(cls, cls) form resolved back to this very method for any
        # subclass and recursed forever.
        return super().__new__(cls, value)

    @staticmethod
    def _require_usd_cent(other) -> None:
        # Explicit raise (not `assert`) so the check survives `python -O`;
        # AssertionError is kept so existing handlers still match.
        if not isinstance(other, USDCent):
            raise AssertionError("USDCent arithmetic requires a USDCent operand")

    def __add__(self, other):
        self._require_usd_cent(other)
        res = super().__add__(other)
        return self.__class__(res)

    def __sub__(self, other):
        self._require_usd_cent(other)
        res = super().__sub__(other)
        return self.__class__(res)

    def __mul__(self, other):
        self._require_usd_cent(other)
        res = super().__mul__(other)
        return self.__class__(res)

    def __abs__(self):
        res = super().__abs__()
        return self.__class__(res)

    def __truediv__(self, other):
        raise ValueError("Division not allowed for USDCent")

    def __str__(self):
        return "%d" % int(self)

    def __repr__(self):
        return "USDCent(%d)" % int(self)

    @classmethod
    def __get_pydantic_core_schema__(
        cls, source_type: Any, handler: "GetCoreSchemaHandler"
    ) -> "CoreSchema":
        """
        https://docs.pydantic.dev/latest/concepts/types/#customizing-validation-with-__get_pydantic_core_schema__

        Validate the input as a NonNegativeInt, then wrap it in ``cls``.
        """
        return core_schema.no_info_after_validator_function(
            cls, handler(NonNegativeInt)
        )

    def to_usd(self) -> Decimal:
        """Return the amount in dollars as a Decimal with two decimal places."""
        # Divide in Decimal space: going through float (int / 100) silently
        # loses cents once the amount exceeds float precision.
        return (Decimal(int(self)) / Decimal(100)).quantize(Decimal(".01"))

    def to_usd_str(self) -> str:
        """Return the amount formatted like ``$1,234.56``."""
        # Decimal supports the ",.2f" format spec directly, so no float
        # round-trip is needed.
        return "${:,.2f}".format(self.to_usd())
def convert_str_dt(v: Any) -> "Optional[AwareDatetime]":
    """
    Parse an ISO 8601 string with a ``Z`` suffix into a UTC-aware datetime.

    Only strings of the form ``%Y-%m-%dT%H:%M:%S.%fZ`` are parsed (the
    fractional-seconds part is required). Any non-string value, including
    ``None``, is returned unchanged for later validation.

    :param v: the raw value handed to the validator
    :return: a datetime with ``tzinfo=timezone.utc`` for string input,
        otherwise ``v`` itself
    :raises ValueError: if ``v`` is a string that is not in the expected format
    """
    if not isinstance(v, str):
        return v

    # Raise explicitly rather than `assert`, which is stripped under
    # `python -O`.
    if not (v.endswith("Z") and "T" in v):
        raise ValueError("invalid format")

    return datetime.strptime(v, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
def check_valid_uuid(v: str) -> str:
    """
    Validate that ``v`` is a UUID in its canonical ``.hex`` form.

    The value must equal ``UUID(v).hex`` exactly, i.e. 32 lowercase hex
    characters with no hyphens, braces or ``urn:`` prefix.

    :param v: candidate UUID string
    :return: ``v`` unchanged when valid
    :raises ValueError: if ``v`` cannot be parsed as a UUID or is not in
        canonical hex form
    """
    try:
        canonical = UUID(v).hex
    except (ValueError, TypeError, AttributeError) as exc:
        # UUID() raises ValueError for malformed strings and
        # TypeError/AttributeError for non-string input.
        raise ValueError("Invalid UUID") from exc

    # Explicit comparison instead of `assert`: under `python -O` an assert
    # would vanish and non-canonical forms would be accepted.
    if canonical != v:
        raise ValueError("Invalid UUID")
    return v
specific + Accepted = 3 + PreviewState = 4 + # Invalid = 5 + # NotExist = 6 + + +class HitStatus(IntEnum): + """ + https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_HITDataStructureArticle.html + """ + + # Official boto3.mturk + Assignable = 0 + Unassignable = 1 + Reviewable = 2 + Reviewing = 3 + Disposed = 4 + + # GRL Specific + NotExist = 5 + + +class HitReviewStatus(IntEnum): + NotReviewed = 0 + MarkedForReview = 1 + ReviewedAppropriate = 2 + ReviewedInappropriate = 3 + + +class PayoutStatus(StrEnum): + """These are GRL's payout statuses""" + + # The user has requested a payout. The money is taken from their + # wallet. A PENDING request can either be APPROVED, REJECTED, or + # CANCELLED. We can also implicitly skip the APPROVED step and go + # straight to COMPLETE or FAILED. + PENDING = "PENDING" + # The request is approved (by us or automatically). Once approved, + # it can be FAILED or COMPLETE. + APPROVED = "APPROVED" + # The request is rejected. The user loses the money. + REJECTED = "REJECTED" + # The user requests to cancel the request, the money goes back into their wallet. + CANCELLED = "CANCELLED" + # The payment was approved, but failed within external payment provider. + # This is an "error" state, as the money won't have moved anywhere. A + # FAILED payment can be tried again and be COMPLETE. + FAILED = "FAILED" + # The payment was sent successfully and (usually) a fee was charged + # to us for it. + COMPLETE = "COMPLETE" + # Not supported # REFUNDED: I'm not sure if this is possible or + # if we'd want to allow it. + + +class ReportValue(IntEnum): + """ + The reason a user reported a task. 
+ """ + + # Used to indicate the user exited the task without giving feedback + REASON_UNKNOWN = 0 + # Task is in the wrong language/country, unanswerable question, won't proceed to + # next question, loading forever, error message + TECHNICAL_ERROR = 1 + # Task ended (completed or failed, and showed the user some dialog + # indicating the task was over), but failed to redirect + NO_REDIRECT = 2 + # Asked for full name, home address, identity on another site, cc# + PRIVACY_INVASION = 3 + # Asked about children, employer, medical issues, drug use, STDs, etc. + UNCOMFORTABLE_TOPICS = 4 + # Asked to install software, signup/login to external site, access webcam, + # promise to pay using external site, etc. + ASKED_FOR_NOT_ALLOWED_ACTION = 5 + # Task doesn't work well on a mobile device + BAD_ON_MOBILE = 6 + # Too long, too boring, confusing, complicated, too many + # open-ended/free-response questions + DIDNT_LIKE = 7 diff --git a/jb/models/errors.py b/jb/models/errors.py new file mode 100644 index 0000000..94f5fbb --- /dev/null +++ b/jb/models/errors.py @@ -0,0 +1,80 @@ +import re +from enum import Enum + +from pydantic import BaseModel, Field, ConfigDict, model_validator + +from jb.models import ResponseMetadata + + +class BotoRequestErrorOperation(str, Enum): + GET_ASSIGNMENT = "GetAssignment" + GET_HIT = "GetHIT" + + +class TurkErrorCode(str, Enum): + # Unclear: maybe when it's new? + HIT_NOT_EXIST = "AWS.MechanicalTurk.HITDoesNotExist" + + # This seems to be for really old Assignments + # Also maybe when it's only a Preview? 
+ # Happens 2+ years back, and also from past 24hrs + INVALID_ASSIGNEMENT_STATE = "AWS.MechanicalTurk.InvalidAssignmentState" + + # If random assignmentId is used + ASSIGNMENT_NOT_EXIST = "AWS.MechanicalTurk.AssignmentDoesNotExist" + + +class BotoRequestErrorResponseErrorCodes(str, Enum): + REQUEST_ERROR = "RequestError" + + +class BotoRequestErrorResponseError(BaseModel): + model_config = ConfigDict(extra="forbid", validate_assignment=True) + + message: str = Field(alias="Message") + code: BotoRequestErrorResponseErrorCodes = Field(alias="Code") + + +class BotoRequestErrorResponse(BaseModel): + model_config = ConfigDict(extra="forbid", validate_assignment=True) + + error: BotoRequestErrorResponseError = Field(alias="Error") + response_metadata: ResponseMetadata = Field(alias="ResponseMetadata") + message: str = Field(alias="Message", min_length=50) + error_code: TurkErrorCode = Field(alias="TurkErrorCode") + + @model_validator(mode="after") + def check_consistent_hit_id(self) -> "BotoRequestErrorResponse": + + match self.error_code: + case TurkErrorCode.HIT_NOT_EXIST: + if not re.match( + r"Hit [A-Z0-9]{30} does not exist. \(\d{13}\)", self.message + ): + raise ValueError("Unknown message for TurkErrorCode.HIT_NOT_EXIST") + + case TurkErrorCode.INVALID_ASSIGNEMENT_STATE: + if not re.match( + r"This operation can be called with a status of: Reviewable,Approved,Rejected \(\d{13}\)", + self.message, + ): + raise ValueError( + "Unknown message for TurkErrorCode.INVALID_ASSIGNEMENT_STATE" + ) + + case TurkErrorCode.ASSIGNMENT_NOT_EXIST: + if not re.match( + r"Assignment [A-Z0-9]{30} does not exist. 
class MTurkEvent(BaseModel):
    """
    What AWS SNS will POST to our mturk_notifications endpoint (inside the request body)
    """

    # `examples=[...]` is the pydantic v2 keyword; the singular `example=`
    # is an extra Field kwarg, which v2 deprecates.
    event_type: EventTypeType = Field(examples=["AssignmentSubmitted"])
    event_timestamp: AwareDatetimeISO = Field(examples=["2025-10-16T18:45:51Z"])
    amt_hit_id: AMTBoto3ID = Field(examples=["12345678901234567890"])
    amt_assignment_id: str = Field(
        max_length=64, examples=["1234567890123456789012345678901234567890"]
    )
    amt_hit_type_id: AMTBoto3ID = Field(examples=["09876543210987654321"])

    @classmethod
    def from_sns(cls, data: Dict) -> "MTurkEvent":
        """
        Build an event from one SNS event mapping.

        :param data: mapping with the keys ``EventType``, ``EventTimestamp``,
            ``HITId``, ``AssignmentId`` and ``HITTypeId``
        :raises KeyError: if any of those keys is missing
        """
        return cls.model_validate(
            {
                "event_type": data["EventType"],
                "event_timestamp": cls.fix_mturk_timestamp(data["EventTimestamp"]),
                "amt_hit_id": data["HITId"],
                "amt_assignment_id": data["AssignmentId"],
                "amt_hit_type_id": data["HITTypeId"],
            }
        )

    @staticmethod
    def fix_mturk_timestamp(ts: str) -> str:
        """
        Add a ``.000`` fractional part to a ``Z``-suffixed timestamp that has none.

        Timestamps that already contain a ``.`` or do not end in ``Z`` are
        returned unchanged.
        """
        if ts.endswith("Z") and "." not in ts:
            ts = ts[:-1] + ".000Z"
        return ts
class HitType(HitTypeCommon):
    """
    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mturk/client/create_hit_type.html
    https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_CreateHITTypeOperation.html
    """

    id: Optional[PositiveInt] = Field(default=None)
    amt_hit_type_id: Optional[AMTBoto3ID] = Field(default=None)

    # --- GRL Specific ---
    min_active: NonNegativeInt = Field(default=0, le=100_000)

    def to_api_request_body(self):
        """
        Return the HIT type as a dict of AMT-style request parameters.

        Durations are rounded to whole seconds and the reward is sent as a
        dollar string.
        """
        return dict(
            AutoApprovalDelayInSeconds=round(self.auto_approval_delay.total_seconds()),
            AssignmentDurationInSeconds=round(self.assignment_duration.total_seconds()),
            Reward=str(self.reward.to_usd()),
            Title=self.title,
            Keywords=self.keywords,
            Description=self.description,
        )

    def to_postgres(self):
        """
        Dump the model in JSON mode for storage.

        ``reward`` is replaced by its dollar value (``USDCent.to_usd()``).
        """
        d = self.model_dump(mode="json")
        d["reward"] = self.reward.to_usd()
        return d

    @classmethod
    def from_postgres(cls, data: Dict) -> Self:
        """
        Build a HitType from a database row.

        The dollar ``reward`` is converted back to cents. The caller's
        ``data`` mapping is left untouched.

        :param data: row mapping; must contain ``reward`` (in dollars)
        :raises KeyError: if ``reward`` is missing from ``data``
        """
        # Shallow copy so the caller's row is not modified.
        row = dict(data)
        row["reward"] = USDCent(round(row["reward"] * 100))
        return cls.model_validate(row)

    def generate_hit_amt_request(self, question: HitQuestion):
        """
        Return request parameters for creating one HIT of this type.

        The HIT is limited to a single assignment, has a 14 day lifetime,
        and carries a fresh ``UniqueRequestToken`` on every call.
        """
        # NOTE(review): amt_hit_type_id is Optional, so HITTypeId may be None
        # here. Confirm callers only use this on a registered HitType.
        d = dict()
        d["HITTypeId"] = self.amt_hit_type_id
        d["MaxAssignments"] = 1
        d["LifetimeInSeconds"] = round(timedelta(days=14).total_seconds())
        d["Question"] = question.xml
        d["UniqueRequestToken"] = uuid4().hex
        return d
Optional[PositiveInt] = Field(default=None) + hit_type_id: Optional[PositiveInt] = Field(default=None) + question_id: Optional[PositiveInt] = Field(default=None) + + amt_hit_id: AMTBoto3ID = Field() + amt_hit_type_id: AMTBoto3ID = Field() + amt_group_id: AMTBoto3ID = Field() + hit_question_xml: str = Field() + + status: HitStatus = Field() + review_status: HitReviewStatus = Field() + creation_time: AwareDatetimeISO = Field(default=None, description="From aws") + expiration: Optional[AwareDatetimeISO] = Field(default=None) + + # GRL Specific + created_at: AwareDatetimeISO = Field( + default_factory=lambda: datetime.now(tz=timezone.utc), + description="When this record was saved in the database", + ) + modified_at: AwareDatetimeISO = Field( + default_factory=lambda: datetime.now(tz=timezone.utc), + description="When this record was last modified", + ) + + # -- Hit specific + + qualification_requirements: Optional[List[Dict]] = Field(default=None) + max_assignments: int = Field() + + # # this comes back as expiration. 
only for the request + # lifetime: timedelta = Field( + # default=timedelta(days=14), + # description="An amount of time, in seconds, after which the HIT is no longer " + # "available for users to accept.", + # ) + assignment_pending_count: NonNegativeInt = Field() + assignment_available_count: NonNegativeInt = Field() + assignment_completed_count: NonNegativeInt = Field() + + @classmethod + def from_amt_create_hit( + cls, data: HITTypeDef, question: HitQuestion, hit_type: HitType + ) -> Self: + assert question.id is not None + assert hit_type.id is not None + assert hit_type.amt_hit_type_id is not None + + h = Hit.model_validate( + dict( + amt_hit_id=data["HITId"], + amt_hit_type_id=data["HITTypeId"], + amt_group_id=data["HITGroupId"], + status=HitStatus[data["HITStatus"]], + review_status=HitReviewStatus[data["HITReviewStatus"]], + creation_time=data["CreationTime"].astimezone(tz=timezone.utc), + expiration=data["Expiration"].astimezone(tz=timezone.utc), + hit_question_xml=data["Question"], + qualification_requirements=data["QualificationRequirements"], + max_assignments=data["MaxAssignments"], + assignment_pending_count=data["NumberOfAssignmentsPending"], + assignment_available_count=data["NumberOfAssignmentsAvailable"], + assignment_completed_count=data["NumberOfAssignmentsCompleted"], + description=data["Description"], + keywords=data["Keywords"], + reward=USDCent(round(float(data["Reward"]) * 100)), + title=data["Title"], + question_id=question.id, + hit_type_id=hit_type.id, + ) + ) + return h + + @classmethod + def from_amt_get_hit(cls, data: HITTypeDef) -> Self: + h = Hit.model_validate( + dict( + amt_hit_id=data["HITId"], + amt_hit_type_id=data["HITTypeId"], + amt_group_id=data["HITGroupId"], + status=HitStatus[data["HITStatus"]], + review_status=HitReviewStatus[data["HITReviewStatus"]], + creation_time=data["CreationTime"].astimezone(tz=timezone.utc), + expiration=data["Expiration"].astimezone(tz=timezone.utc), + hit_question_xml=data["Question"], + 
qualification_requirements=data["QualificationRequirements"], + max_assignments=data["MaxAssignments"], + assignment_pending_count=data["NumberOfAssignmentsPending"], + assignment_available_count=data["NumberOfAssignmentsAvailable"], + assignment_completed_count=data["NumberOfAssignmentsCompleted"], + description=data["Description"], + keywords=data["Keywords"], + reward=USDCent(round(float(data["Reward"]) * 100)), + title=data["Title"], + question_id=None, + hit_type_id=None, + ) + ) + return h + + def to_postgres(self): + d = self.model_dump(mode="json") + d["reward"] = self.reward.to_usd() + return d + + @classmethod + def from_postgres(cls, data: Dict) -> Self: + data["reward"] = USDCent(round(data["reward"] * 100)) + return cls.model_validate(data) + + @property + def hit_question(self) -> HitQuestion: + root = ElementTree.fromstring(self.hit_question_xml) + + ns = { + "mt": "http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2006-07-14/ExternalQuestion.xsd" + } + res = {} + + lookup_table = dict(ExternalURL="url", FrameHeight="height") + for a in root.findall("mt:*", ns): + key = lookup_table[a.tag.split("}")[1]] + val = a.text + res[key] = val + + return HitQuestion.model_validate(res, from_attributes=True) |
