summaryrefslogtreecommitdiff
path: root/jb/models
diff options
context:
space:
mode:
authorMax Nanis2026-02-19 02:43:23 -0500
committerMax Nanis2026-02-19 02:43:23 -0500
commitf0f96f83c2630e890a2cbcab53f77fd4c37e1684 (patch)
treec6d2cb092e76bf5d499e0ea9949508d6b22164fd /jb/models
parent3eaa56f0306ead818f64c3d99fc6d230d9b970a4 (diff)
downloadamt-jb-f0f96f83c2630e890a2cbcab53f77fd4c37e1684.tar.gz
amt-jb-f0f96f83c2630e890a2cbcab53f77fd4c37e1684.zip
Models, Project files, some pytests, requirements.. etcHEADmaster
Diffstat (limited to 'jb/models')
-rw-r--r--jb/models/__init__.py40
-rw-r--r--jb/models/api_response.py17
-rw-r--r--jb/models/assignment.py388
-rw-r--r--jb/models/bonus.py48
-rw-r--r--jb/models/currency.py70
-rw-r--r--jb/models/custom_types.py113
-rw-r--r--jb/models/definitions.py90
-rw-r--r--jb/models/errors.py80
-rw-r--r--jb/models/event.py38
-rw-r--r--jb/models/hit.py251
10 files changed, 1135 insertions, 0 deletions
diff --git a/jb/models/__init__.py b/jb/models/__init__.py
new file mode 100644
index 0000000..0aeae14
--- /dev/null
+++ b/jb/models/__init__.py
@@ -0,0 +1,40 @@
+from decimal import Decimal
+from typing import Optional
+
+from pydantic import BaseModel, Field, ConfigDict
+
+
+class HTTPHeaders(BaseModel):
+ request_id: str = Field(alias="x-amzn-requestid", min_length=36, max_length=36)
+ content_type: str = Field(alias="content-type", min_length=26, max_length=26)
+ # 'content-length': '1255',
+ content_length: str = Field(alias="content-length", min_length=2)
+ # 'Mon, 15 Jan 2024 23:40:32 GMT'
+ date: str = Field()
+
+ connection: Optional[str] = Field(default=None) # 'close'
+
+
+class ResponseMetadata(BaseModel):
+ model_config = ConfigDict(extra="forbid", validate_assignment=True)
+
+ request_id: str = Field(alias="RequestId", min_length=36, max_length=36)
+ status_code: int = Field(alias="HTTPStatusCode", ge=200, le=599)
+ headers: HTTPHeaders = Field(alias="HTTPHeaders")
+ retry_attempts: int = Field(alias="RetryAttempts", ge=0)
+
+
+class AMTAccount(BaseModel):
+ model_config = ConfigDict(extra="ignore", validate_assignment=True)
+
+ # Remaining available AWS Billing usage if you have enabled AWS Billing.
+ available_balance: Decimal = Field()
+ onhold_balance: Decimal = Field(default=Decimal(0))
+
+ # --- Properties ---
+
+ @property
+ def is_healthy(self) -> bool:
+ # A healthy account is one with at least $2,500 worth of
+ # credit available to it
+ return self.available_balance >= 2_500
diff --git a/jb/models/api_response.py b/jb/models/api_response.py
new file mode 100644
index 0000000..6b29e51
--- /dev/null
+++ b/jb/models/api_response.py
@@ -0,0 +1,17 @@
+from pydantic import BaseModel, ConfigDict, Field, model_validator
+
+from jb.models.assignment import Assignment
+from jb.models.hit import Hit
+
+
+class AssignmentResponse(BaseModel):
+ model_config = ConfigDict(extra="forbid", validate_assignment=True)
+
+ assignment: Assignment = Field(alias="Assignment")
+ hit: Hit = Field(alias="HIT")
+
+ @model_validator(mode="after")
+ def check_consistent_hit_id(self) -> "AssignmentResponse":
+ if self.hit.id != self.assignment.hit_id:
+ raise ValueError("Inconsistent Hit IDs")
+ return self
diff --git a/jb/models/assignment.py b/jb/models/assignment.py
new file mode 100644
index 0000000..39ae47c
--- /dev/null
+++ b/jb/models/assignment.py
@@ -0,0 +1,388 @@
+import logging
+from datetime import datetime, timezone
+from typing import Optional, TypedDict
+from xml.etree import ElementTree
+
+from mypy_boto3_mturk.type_defs import AssignmentTypeDef
+from pydantic import (
+ BaseModel,
+ Field,
+ ConfigDict,
+ model_validator,
+ PositiveInt,
+ computed_field,
+ TypeAdapter,
+ ValidationError,
+)
+from typing_extensions import Self
+
+from jb.models.custom_types import AMTBoto3ID, AwareDatetimeISO, UUIDStr
+from jb.models.definitions import AssignmentStatus
+
+
+class AnswerDict(TypedDict):
+ amt_assignment_id: str
+ amt_worker_id: str
+ tsid: str
+
+
+class AssignmentStub(BaseModel):
+ # todo: we need an "AssignmentStub" model that just has
+ # the IDs, this is used when a user accepts an assignment
+ # but hasn't submitted it yet. We want to create it in the db
+ # at that point.
+
+ model_config = ConfigDict(
+ extra="forbid",
+ validate_assignment=True,
+ )
+
+ id: Optional[PositiveInt] = Field(default=None)
+ hit_id: Optional[PositiveInt] = Field(default=None)
+ amt_assignment_id: AMTBoto3ID = Field()
+ amt_hit_id: AMTBoto3ID = Field()
+ amt_worker_id: str = Field(min_length=3, max_length=50)
+
+ status: AssignmentStatus = Field()
+
+ # GRL Specific
+ created_at: AwareDatetimeISO = Field(
+ default_factory=lambda: datetime.now(tz=timezone.utc),
+ description="When this record was saved in the database",
+ )
+
+ modified_at: Optional[AwareDatetimeISO] = Field(
+ default_factory=lambda: datetime.now(tz=timezone.utc),
+ description="When this record was updated / modified in the database",
+ )
+
+ def to_postgres(self):
+ d = self.model_dump(mode="json")
+ return d
+
+
+class Assignment(AssignmentStub):
+ """
+ The Assignment data structure represents a single assignment of a HIT to
+ a Worker. The assignment tracks the Worker's efforts to complete the HIT,
+ and contains the results for later retrieval.
+
+ The Assignment data structure is used as a response element for the
+ following operations:
+
+ GetAssignment
+ GetAssignmentsForHIT
+
+ https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_AssignmentDataStructureArticle.html
+ """
+
+ auto_approval_time: AwareDatetimeISO = Field(
+ description="If results have been submitted, AutoApprovalTime is the "
+ "date and time the results of the assignment results are "
+ "considered Approved automatically if they have not already "
+ "been explicitly approved or rejected by the Requester. "
+ "This value is derived from the auto-approval delay "
+ "specified by the Requester in the HIT. This value is "
+ "omitted from the assignment if the Worker has not yet "
+ "submitted results.",
+ )
+
+ accept_time: AwareDatetimeISO = Field(
+ description="The date and time the Worker accepted the assignment.",
+ )
+
+ submit_time: AwareDatetimeISO = Field(
+ description="The date and time the assignment was submitted. This value "
+ "is omitted from the assignment if the Worker has not yet "
+ "submitted results.",
+ )
+
+ approval_time: Optional[AwareDatetimeISO] = Field(
+ default=None,
+ description="The date and time the Requester approved the results. This "
+ "value is omitted from the assignment if the Requester has "
+ "not yet approved the results.",
+ )
+ rejection_time: Optional[AwareDatetimeISO] = Field(
+ default=None,
+ description="The date and time the Requester rejected the results.",
+ )
+
+ requester_feedback: Optional[str] = Field(
+ # Default: None. This field isn't returned with assignment data by
+ # default. To request this field, specify a response group of
+ # AssignmentFeedback. For information about response groups, see
+ # Common Parameters.
+ default=None,
+ min_length=3,
+ max_length=2_000,
+ help_text="The feedback string included with the call to the "
+ "ApproveAssignment operation or the RejectAssignment "
+ "operation, if the Requester approved or rejected the "
+ "assignment and specified feedback.",
+ )
+
+ answer_xml: Optional[str] = Field(default=None, exclude=True)
+
+ # GRL Specific
+
+ tsid: Optional[UUIDStr] = Field(default=None)
+
+ # --- Validators ---
+
+ @model_validator(mode="before")
+ def set_tsid(cls, values: dict):
+ if values.get("tsid") is None and (answer_xml := values.get("answer_xml")):
+ answer_dict = cls.parse_answer_xml(answer_xml)
+ tsid = answer_dict.get("tsid")
+ try:
+ values["tsid"] = TypeAdapter(UUIDStr).validate_python(tsid)
+ except ValidationError as e:
+ # Don't break the model validation if a baddie messes with the tsid in the answer.
+ logging.warning(e)
+ values["tsid"] = None
+ return values
+
+ @model_validator(mode="after")
+ def check_time_sequences(self) -> Self:
+ if self.accept_time > self.submit_time:
+ raise ValueError("Assignment times invalid")
+
+ return self
+
+ @model_validator(mode="after")
+ def check_answers_alignment(self) -> Self:
+ if self.answers_dict is None:
+ return self
+ if self.amt_worker_id != self.answers_dict["amt_worker_id"]:
+ raise ValueError("Assignment answer invalid worker_id")
+ if self.amt_assignment_id != self.answers_dict["amt_assignment_id"]:
+ raise ValueError("Assignment answer invalid amt_assignment_id")
+ if (
+ self.tsid
+ and self.answers_dict["tsid"]
+ and self.tsid != self.answers_dict["tsid"]
+ ):
+ raise ValueError("Assignment answer invalid tsid")
+ return self
+
+ # --- Properties ---
+
+ @property
+ def answers_dict(self) -> Optional[AnswerDict]:
+ # See https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_AssignmentDataStructureArticle.html
+ # https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMechanicalTurkRequester/Concepts_NotificationsArticle.html
+ if self.answer_xml is None:
+ return None
+
+ return self.parse_answer_xml(self.answer_xml)
+
+ @staticmethod
+ def parse_answer_xml(answer_xml: str):
+ root = ElementTree.fromstring(answer_xml)
+ ns = {
+ "mt": "http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2005-10-01/QuestionFormAnswers.xsd"
+ }
+ res = {}
+
+ for a in root.findall("mt:Answer", ns):
+ name = a.find("mt:QuestionIdentifier", ns).text
+ value = a.find("mt:FreeText", ns).text
+ res[name] = value or ""
+
+ EXPECTED_KEYS = {"amt_assignment_id", "amt_worker_id", "tsid"}
+ # We don't want validation to fail if a baddie inserts or changes url
+ # params, which will result in new or missing keys. Amazon generates the xml
+ # so that should always be correct
+ # assert all(k in res for k in EXPECTED_KEYS), list(res.keys())
+ res = {k: v for k, v in res.items() if k in EXPECTED_KEYS}
+ return res
+
+ @classmethod
+ def from_amt_get_assignment(cls, data: AssignmentTypeDef) -> Self:
+ assignment = cls(
+ amt_assignment_id=data["AssignmentId"],
+ amt_hit_id=data["HITId"],
+ amt_worker_id=data["WorkerId"],
+ status=AssignmentStatus[data["AssignmentStatus"]],
+ auto_approval_time=data["AutoApprovalTime"].astimezone(tz=timezone.utc),
+ accept_time=data["AcceptTime"].astimezone(tz=timezone.utc),
+ submit_time=data["SubmitTime"].astimezone(tz=timezone.utc),
+ approval_time=(
+ data["ApprovalTime"].astimezone(tz=timezone.utc)
+ if data.get("ApprovalTime")
+ else None
+ ),
+ rejection_time=(
+ data["RejectionTime"].astimezone(tz=timezone.utc)
+ if data.get("RejectionTime")
+ else None
+ ),
+ answer_xml=data["Answer"],
+ requester_feedback=data.get("RequesterFeedback"),
+ )
+ return assignment
+
+ def to_stub(self) -> AssignmentStub:
+ return AssignmentStub.model_validate(
+ self.model_dump(include=set(AssignmentStub.model_fields.keys()))
+ )
+
+ # --- Methods ---
+ #
+ # def refresh(self) -> Self:
+ # from tasks.mtwerk.managers.assignment import AssignmentManager
+ # return AssignmentManager.fetch_by_id(self)
+ #
+ # def reject(self, msg: str = REJECT_MESSAGE_UNKNOWN_ASSIGNMENT):
+ # """
+ # Save in the database that the Assignment was rejected, and also
+ # Report to Amazon Mechanical Turk that this Assignment should be
+ # rejected
+ #
+ # TODO: can this only occur when the Assignment is in a certain status?
+ #
+ # :return:
+ # """
+ # now = datetime.now(tz=None)
+ #
+ # MYSQLC.execute_sql_query("""
+ # UPDATE `amt-jb`.`mtwerk_assignment`
+ # SET submit_time = %s, rejection_time = %s, status = %s,
+ # requester_feedback = %s
+ # WHERE assignment_id = %s""",
+ # params=[
+ # now, now,
+ # AssignmentStatus.Rejected.value,
+ # msg, self.id],
+ # commit=True)
+ #
+ # CLIENT.reject_assignment(
+ # AssignmentId=self.id,
+ # RequesterFeedback=msg)
+ #
+ # def approve(self, msg: str = "Approved."):
+ # """
+ # Report to Amazon Mechanical Turk that this Assignment should be
+ # approved
+ #
+ # TODO: can this only occur when the Assignment is in a certain status?
+ #
+ # :return:
+ # """
+ # CLIENT.approve_assignment(
+ # AssignmentId=self.id,
+ # RequesterFeedback=msg)
+ #
+ # def submit_and_complete_request(self) -> Optional[str]:
+ # """
+ # This approves the Assignment and issues the Reward
+ # amount (typically $.05)
+ #
+ # :return:
+ # """
+ # worker = self.worker
+ # amount = DecimalUSDDollars(self.hit.reward)
+ #
+ # # If successful, returns the cashout id, otherwise, returns None
+ # cashout: Optional[dict] = worker.cashout_request(
+ # amount=amount,
+ # cashout_method_id=AMT_ASSIGNMENT_CASHOUT_METHOD)
+ #
+ # if cashout is None or cashout.get('status') != PayoutStatus.PENDING:
+ # return None
+ #
+ # cashout_id: str = cashout[id]
+ #
+ # approval: Optional[dict] = Bonus.manage_pending_cashout(
+ # cashout_id=cashout_id,
+ # action=PayoutStatus.APPROVED)
+ #
+ # if approval is None or approval['status'] != PayoutStatus.APPROVED:
+ # return None
+ #
+ # completion: Optional[dict] = Bonus.manage_pending_cashout(
+ # cashout_id=cashout_id,
+ # action=PayoutStatus.COMPLETE)
+ #
+ # if completion is None or completion['status'] != PayoutStatus.COMPLETE:
+ # return None
+ #
+ # return cashout_id
+ #
+ # # --- ORM ---
+ #
+ # def model_dump_mysql(self, *args, **kwargs) -> dict:
+ # d = self.model_dump(mode='json', *args, **kwargs)
+ #
+ # d['auto_approval_time'] = self.auto_approval_time.replace(tzinfo=None)
+ # d['accept_time'] = self.accept_time.replace(tzinfo=None)
+ # d['submit_time'] = self.submit_time.replace(tzinfo=None)
+ #
+ # if self.approval_time:
+ # d['approval_time'] = self.approval_time.replace(tzinfo=None)
+ #
+ # if self.rejection_time:
+ # d['rejection_time'] = self.rejection_time.replace(tzinfo=None)
+ #
+ # # created is automatically added by the database
+ # d['created'] = self.created.replace(tzinfo=None)
+ #
+ # if self.modified:
+ # d['modified'] = self.modified.replace(tzinfo=None)
+ #
+ # d['tsid'] = self.answers.get('tsid')
+ #
+ # return d
+ #
+ # def save(self) -> bool:
+ # """
+ # Either INSERTS or UPDATES the Assignment instance to a Mysql
+ # record.
+ # """
+ #
+ # # We're modifying the record, so set the time to right now!
+ # self.modified = datetime.now(tz=timezone.utc)
+ #
+ # query = """
+ # INSERT `amt-jb`.`mtwerk_assignment` (
+ # id, worker_id, hit_id, status,
+ # auto_approval_time, accept_time, submit_time,
+ # approval_time, rejection_time,
+ # requester_feedback, created, modified, tsid
+ # )
+ # VALUES (
+ # %(id)s, %(worker_id)s, %(hit_id)s, %(status)s,
+ # %(auto_approval_time)s, %(accept_time)s, %(submit_time)s,
+ # %(approval_time)s, %(rejection_time)s,
+ # %(requester_feedback)s, %(created)s, %(modified)s, %(tsid)s
+ # )
+ # ON DUPLICATE KEY UPDATE
+ # worker_id = %(worker_id)s,
+ # hit_id = %(hit_id)s,
+ # status = %(status)s,
+ #
+ # auto_approval_time = %(auto_approval_time)s,
+ # accept_time = %(accept_time)s,
+ # submit_time = %(submit_time)s,
+ #
+ # approval_time = %(approval_time)s,
+ # rejection_time = %(rejection_time)s,
+ #
+ # requester_feedback = %(requester_feedback)s,
+ # -- Not going to update created just incase it changed
+ # -- in pydantic for some reason
+ # modified = %(modified)s,
+ # tsid = %(tsid)s
+ # """
+ #
+ # try:
+ # MYSQLC.execute_sql_query(query, params=self.model_dump_mysql(), commit=True)
+ # return True
+ #
+ # except Exception as e:
+ # return False
+ #
+
+
+# REJECT_MESSAGE_UNKNOWN_ASSIGNMENT = "Unknown assignment"
diff --git a/jb/models/bonus.py b/jb/models/bonus.py
new file mode 100644
index 0000000..564a32d
--- /dev/null
+++ b/jb/models/bonus.py
@@ -0,0 +1,48 @@
+from typing import Optional, Dict
+
+from pydantic import BaseModel, Field, ConfigDict, PositiveInt
+from typing_extensions import Self
+
+from jb.models.currency import USDCent
+from jb.models.custom_types import AMTBoto3ID, AwareDatetimeISO, UUIDStr
+from jb.models.definitions import PayoutStatus
+
+
+class Bonus(BaseModel):
+ """
+ A Bonus is created (in our DB) ONLY associated with an APPROVED
+ thl-payout-event, AFTER the bonus has actually been sent to
+ the worker.
+ We have the payout_event uuid as the unique request token to make
+ sure it only gets sent once (param in the boto request).
+ """
+
+ model_config = ConfigDict(
+ extra="forbid",
+ validate_assignment=True,
+ )
+ id: Optional[PositiveInt] = Field(default=None)
+ assignment_id: Optional[PositiveInt] = Field(default=None)
+
+ amt_worker_id: str = Field(min_length=3, max_length=50)
+ amt_assignment_id: AMTBoto3ID = Field()
+
+ amount: USDCent = Field()
+ reason: str = Field(min_length=5)
+ grant_time: AwareDatetimeISO = Field()
+
+ # -- GRL Specific ---
+ payout_event_id: UUIDStr = Field()
+ # created: Optional[AwareDatetimeISO] = Field(default=None)
+
+ def to_postgres(self):
+ d = self.model_dump(mode="json")
+ d["amount"] = self.amount.to_usd()
+ return d
+
+ @classmethod
+ def from_postgres(cls, data: Dict) -> Self:
+ data["amount"] = USDCent(round(data["amount"] * 100))
+ fields = set(cls.model_fields.keys())
+ data = {k: v for k, v in data.items() if k in fields}
+ return cls.model_validate(data)
diff --git a/jb/models/currency.py b/jb/models/currency.py
new file mode 100644
index 0000000..3094e2a
--- /dev/null
+++ b/jb/models/currency.py
@@ -0,0 +1,70 @@
+import warnings
+from decimal import Decimal
+from typing import Any
+
+from pydantic import GetCoreSchemaHandler, NonNegativeInt
+from pydantic_core import CoreSchema, core_schema
+
+
+class USDCent(int):
+ def __new__(cls, value, *args, **kwargs):
+
+ if isinstance(value, float):
+ warnings.warn(
+ "USDCent init with a float. Rounding behavior may " "be unexpected"
+ )
+
+ if isinstance(value, Decimal):
+ warnings.warn(
+ "USDCent init with a Decimal. Rounding behavior may " "be unexpected"
+ )
+
+ if value < 0:
+ raise ValueError("USDCent not be less than zero")
+
+ return super(cls, cls).__new__(cls, value)
+
+ def __add__(self, other):
+ assert isinstance(other, USDCent)
+ res = super(USDCent, self).__add__(other)
+ return self.__class__(res)
+
+ def __sub__(self, other):
+ assert isinstance(other, USDCent)
+ res = super(USDCent, self).__sub__(other)
+ return self.__class__(res)
+
+ def __mul__(self, other):
+ assert isinstance(other, USDCent)
+ res = super(USDCent, self).__mul__(other)
+ return self.__class__(res)
+
+ def __abs__(self):
+ res = super(USDCent, self).__abs__()
+ return self.__class__(res)
+
+ def __truediv__(self, other):
+ raise ValueError("Division not allowed for USDCent")
+
+ def __str__(self):
+ return "%d" % int(self)
+
+ def __repr__(self):
+ return "USDCent(%d)" % int(self)
+
+ @classmethod
+ def __get_pydantic_core_schema__(
+ cls, source_type: Any, handler: GetCoreSchemaHandler
+ ) -> CoreSchema:
+ """
+ https://docs.pydantic.dev/latest/concepts/types/#customizing-validation-with-__get_pydantic_core_schema__
+ """
+ return core_schema.no_info_after_validator_function(
+ cls, handler(NonNegativeInt)
+ )
+
+ def to_usd(self) -> Decimal:
+ return Decimal(int(self) / 100).quantize(Decimal(".01"))
+
+ def to_usd_str(self) -> str:
+ return "${:,.2f}".format(float(self.to_usd()))
diff --git a/jb/models/custom_types.py b/jb/models/custom_types.py
new file mode 100644
index 0000000..70bc5c1
--- /dev/null
+++ b/jb/models/custom_types.py
@@ -0,0 +1,113 @@
+import re
+from datetime import datetime, timezone
+from typing import Any, Optional
+from uuid import UUID
+
+from pydantic import (
+ AwareDatetime,
+ StringConstraints,
+ TypeAdapter,
+ HttpUrl,
+)
+from pydantic.functional_serializers import PlainSerializer
+from pydantic.functional_validators import AfterValidator, BeforeValidator
+from pydantic.networks import UrlConstraints
+from pydantic_core import Url
+from typing_extensions import Annotated
+
+
+def convert_datetime_to_iso_8601_with_z_suffix(dt: datetime) -> str:
+ # By default, datetimes are serialized with the %f optional. We don't want that because
+ # then the deserialization fails if the datetime didn't have microseconds.
+ return dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+
+
+def convert_str_dt(v: Any) -> Optional[AwareDatetime]:
+ # By default, pydantic is unable to handle tz-aware isoformat str. Attempt to parse a str
+ # that was dumped using the iso8601 format with Z suffix.
+ if v is not None and type(v) is str:
+ assert v.endswith("Z") and "T" in v, "invalid format"
+ return datetime.strptime(v, "%Y-%m-%dT%H:%M:%S.%fZ").replace(
+ tzinfo=timezone.utc
+ )
+ return v
+
+
+def assert_utc(v: AwareDatetime) -> AwareDatetime:
+ if isinstance(v, datetime):
+ assert v.tzinfo == timezone.utc, "Timezone is not UTC"
+ return v
+
+
+# Our custom AwareDatetime that correctly serializes and deserializes
+# to an ISO8601 str with timezone
+AwareDatetimeISO = Annotated[
+ AwareDatetime,
+ BeforeValidator(convert_str_dt),
+ AfterValidator(assert_utc),
+ PlainSerializer(
+ lambda x: x.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
+ when_used="json-unless-none",
+ ),
+]
+
+# ISO 3166-1 alpha-2 (two-letter codes, lowercase)
+# "Like" b/c it matches the format, but we're not explicitly checking
+# it is one of our supported values. See models.thl.locales for that.
+CountryISOLike = Annotated[
+ str, StringConstraints(max_length=2, min_length=2, pattern=r"^[a-z]{2}$")
+]
+# 3-char ISO 639-2/B, lowercase
+LanguageISOLike = Annotated[
+ str, StringConstraints(max_length=3, min_length=3, pattern=r"^[a-z]{3}$")
+]
+
+
+def check_valid_uuid(v: str) -> str:
+ try:
+ assert UUID(v).hex == v
+ except Exception:
+ raise ValueError("Invalid UUID")
+ return v
+
+
+# Our custom field that stores a UUID4 as the .hex string representation
+UUIDStr = Annotated[
+ str,
+ StringConstraints(min_length=32, max_length=32),
+ AfterValidator(check_valid_uuid),
+]
+# Accepts the non-hex representation and coerces
+UUIDStrCoerce = Annotated[
+ str,
+ StringConstraints(min_length=32, max_length=32),
+ BeforeValidator(lambda value: TypeAdapter(UUID).validate_python(value).hex),
+ AfterValidator(check_valid_uuid),
+]
+
+# Same thing as UUIDStr with HttpUrl field. It is confusing that this
+# is not a str https://github.com/pydantic/pydantic/discussions/6395
+HttpUrlStr = Annotated[
+ str,
+ BeforeValidator(lambda value: str(TypeAdapter(HttpUrl).validate_python(value))),
+]
+
+HttpsUrl = Annotated[Url, UrlConstraints(max_length=2083, allowed_schemes=["https"])]
+HttpsUrlStr = Annotated[
+ str,
+ BeforeValidator(lambda value: str(TypeAdapter(HttpsUrl).validate_python(value))),
+]
+
+
+def check_valid_amt_boto3_id(v: str) -> str:
+ # Test ids from amazon have 20 chars
+ if not re.fullmatch(r"[A-Z0-9]{20}|[A-Z0-9]{30}", v):
+ raise ValueError("Invalid AMT Boto3 ID")
+ return v
+
+
+AMTBoto3ID = Annotated[
+ str,
+ StringConstraints(min_length=20, max_length=30),
+ AfterValidator(check_valid_amt_boto3_id),
+]
diff --git a/jb/models/definitions.py b/jb/models/definitions.py
new file mode 100644
index 0000000..a3d27ba
--- /dev/null
+++ b/jb/models/definitions.py
@@ -0,0 +1,90 @@
+from enum import IntEnum, StrEnum
+
+
+class AssignmentStatus(IntEnum):
+ # boto3.mturk specific
+ Submitted = 0 # same thing as Reviewable
+ Approved = 1
+ Rejected = 2
+
+ # GRL specific
+ Accepted = 3
+ PreviewState = 4
+ # Invalid = 5
+ # NotExist = 6
+
+
+class HitStatus(IntEnum):
+ """
+ https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_HITDataStructureArticle.html
+ """
+
+ # Official boto3.mturk
+ Assignable = 0
+ Unassignable = 1
+ Reviewable = 2
+ Reviewing = 3
+ Disposed = 4
+
+ # GRL Specific
+ NotExist = 5
+
+
+class HitReviewStatus(IntEnum):
+ NotReviewed = 0
+ MarkedForReview = 1
+ ReviewedAppropriate = 2
+ ReviewedInappropriate = 3
+
+
+class PayoutStatus(StrEnum):
+ """These are GRL's payout statuses"""
+
+ # The user has requested a payout. The money is taken from their
+ # wallet. A PENDING request can either be APPROVED, REJECTED, or
+ # CANCELLED. We can also implicitly skip the APPROVED step and go
+ # straight to COMPLETE or FAILED.
+ PENDING = "PENDING"
+ # The request is approved (by us or automatically). Once approved,
+ # it can be FAILED or COMPLETE.
+ APPROVED = "APPROVED"
+ # The request is rejected. The user loses the money.
+ REJECTED = "REJECTED"
+ # The user requests to cancel the request, the money goes back into their wallet.
+ CANCELLED = "CANCELLED"
+ # The payment was approved, but failed within external payment provider.
+ # This is an "error" state, as the money won't have moved anywhere. A
+ # FAILED payment can be tried again and be COMPLETE.
+ FAILED = "FAILED"
+ # The payment was sent successfully and (usually) a fee was charged
+ # to us for it.
+ COMPLETE = "COMPLETE"
+ # Not supported # REFUNDED: I'm not sure if this is possible or
+ # if we'd want to allow it.
+
+
+class ReportValue(IntEnum):
+ """
+ The reason a user reported a task.
+ """
+
+ # Used to indicate the user exited the task without giving feedback
+ REASON_UNKNOWN = 0
+ # Task is in the wrong language/country, unanswerable question, won't proceed to
+ # next question, loading forever, error message
+ TECHNICAL_ERROR = 1
+ # Task ended (completed or failed, and showed the user some dialog
+ # indicating the task was over), but failed to redirect
+ NO_REDIRECT = 2
+ # Asked for full name, home address, identity on another site, cc#
+ PRIVACY_INVASION = 3
+ # Asked about children, employer, medical issues, drug use, STDs, etc.
+ UNCOMFORTABLE_TOPICS = 4
+ # Asked to install software, signup/login to external site, access webcam,
+ # promise to pay using external site, etc.
+ ASKED_FOR_NOT_ALLOWED_ACTION = 5
+ # Task doesn't work well on a mobile device
+ BAD_ON_MOBILE = 6
+ # Too long, too boring, confusing, complicated, too many
+ # open-ended/free-response questions
+ DIDNT_LIKE = 7
diff --git a/jb/models/errors.py b/jb/models/errors.py
new file mode 100644
index 0000000..94f5fbb
--- /dev/null
+++ b/jb/models/errors.py
@@ -0,0 +1,80 @@
+import re
+from enum import Enum
+
+from pydantic import BaseModel, Field, ConfigDict, model_validator
+
+from jb.models import ResponseMetadata
+
+
+class BotoRequestErrorOperation(str, Enum):
+ GET_ASSIGNMENT = "GetAssignment"
+ GET_HIT = "GetHIT"
+
+
+class TurkErrorCode(str, Enum):
+ # Unclear: maybe when it's new?
+ HIT_NOT_EXIST = "AWS.MechanicalTurk.HITDoesNotExist"
+
+ # This seems to be for really old Assignments
+ # Also maybe when it's only a Preview?
+ # Happens 2+ years back, and also from past 24hrs
+ INVALID_ASSIGNEMENT_STATE = "AWS.MechanicalTurk.InvalidAssignmentState"
+
+ # If random assignmentId is used
+ ASSIGNMENT_NOT_EXIST = "AWS.MechanicalTurk.AssignmentDoesNotExist"
+
+
+class BotoRequestErrorResponseErrorCodes(str, Enum):
+ REQUEST_ERROR = "RequestError"
+
+
+class BotoRequestErrorResponseError(BaseModel):
+ model_config = ConfigDict(extra="forbid", validate_assignment=True)
+
+ message: str = Field(alias="Message")
+ code: BotoRequestErrorResponseErrorCodes = Field(alias="Code")
+
+
+class BotoRequestErrorResponse(BaseModel):
+ model_config = ConfigDict(extra="forbid", validate_assignment=True)
+
+ error: BotoRequestErrorResponseError = Field(alias="Error")
+ response_metadata: ResponseMetadata = Field(alias="ResponseMetadata")
+ message: str = Field(alias="Message", min_length=50)
+ error_code: TurkErrorCode = Field(alias="TurkErrorCode")
+
+ @model_validator(mode="after")
+ def check_consistent_hit_id(self) -> "BotoRequestErrorResponse":
+
+ match self.error_code:
+ case TurkErrorCode.HIT_NOT_EXIST:
+ if not re.match(
+ r"Hit [A-Z0-9]{30} does not exist. \(\d{13}\)", self.message
+ ):
+ raise ValueError("Unknown message for TurkErrorCode.HIT_NOT_EXIST")
+
+ case TurkErrorCode.INVALID_ASSIGNEMENT_STATE:
+ if not re.match(
+ r"This operation can be called with a status of: Reviewable,Approved,Rejected \(\d{13}\)",
+ self.message,
+ ):
+ raise ValueError(
+ "Unknown message for TurkErrorCode.INVALID_ASSIGNEMENT_STATE"
+ )
+
+ case TurkErrorCode.ASSIGNMENT_NOT_EXIST:
+ if not re.match(
+ r"Assignment [A-Z0-9]{30} does not exist. \(\d{13}\)", self.message
+ ):
+ raise ValueError(
+ "Unknown message for TurkErrorCode.ASSIGNMENT_NOT_EXIST"
+ )
+
+ return self
+
+
+class BotoRequestError(BaseModel):
+ model_config = ConfigDict(extra="forbid", validate_assignment=True)
+
+ response: BotoRequestErrorResponse = Field()
+ operation_name: BotoRequestErrorOperation = Field()
diff --git a/jb/models/event.py b/jb/models/event.py
new file mode 100644
index 0000000..c357772
--- /dev/null
+++ b/jb/models/event.py
@@ -0,0 +1,38 @@
+from typing import Literal, Dict
+
+from mypy_boto3_mturk.literals import EventTypeType
+from pydantic import BaseModel, Field
+
+from jb.models.custom_types import AwareDatetimeISO, AMTBoto3ID
+
+
+class MTurkEvent(BaseModel):
+ """
+ What AWS SNS will POST to our mturk_notifications endpoint (inside the request body)
+ """
+
+ event_type: EventTypeType = Field(example="AssignmentSubmitted")
+ event_timestamp: AwareDatetimeISO = Field(example="2025-10-16T18:45:51Z")
+ amt_hit_id: AMTBoto3ID = Field(example="12345678901234567890")
+ amt_assignment_id: str = Field(
+ max_length=64, example="1234567890123456789012345678901234567890"
+ )
+ amt_hit_type_id: AMTBoto3ID = Field(example="09876543210987654321")
+
+ @classmethod
+ def from_sns(cls, data: Dict):
+ return cls.model_validate(
+ {
+ "event_type": data["EventType"],
+ "event_timestamp": cls.fix_mturk_timestamp(data["EventTimestamp"]),
+ "amt_hit_id": data["HITId"],
+ "amt_assignment_id": data["AssignmentId"],
+ "amt_hit_type_id": data["HITTypeId"],
+ }
+ )
+
+ @staticmethod
+ def fix_mturk_timestamp(ts: str) -> str:
+ if ts.endswith("Z") and "." not in ts:
+ ts = ts[:-1] + ".000Z"
+ return ts
diff --git a/jb/models/hit.py b/jb/models/hit.py
new file mode 100644
index 0000000..c3734fa
--- /dev/null
+++ b/jb/models/hit.py
@@ -0,0 +1,251 @@
+from datetime import datetime, timezone, timedelta
+from typing import Optional, List, Dict
+from uuid import uuid4
+from xml.etree import ElementTree
+
+from mypy_boto3_mturk.type_defs import HITTypeDef
+from pydantic import (
+ BaseModel,
+ Field,
+ PositiveInt,
+ ConfigDict,
+ NonNegativeInt,
+)
+from typing_extensions import Self
+
+from jb.models.currency import USDCent
+from jb.models.custom_types import AMTBoto3ID, HttpsUrlStr, AwareDatetimeISO
+from jb.models.definitions import HitStatus, HitReviewStatus
+
+
+class HitQuestion(BaseModel):
+ id: Optional[PositiveInt] = Field(default=None)
+
+ url: HttpsUrlStr = Field()
+ height: PositiveInt = Field(default=1_200, ge=100, le=4_000)
+
+ # --- Properties ---
+
+ def to_postgres(self):
+ return self.model_dump(mode="json")
+
+ @property
+ def xml(self) -> str:
+ return f"""<?xml version="1.0" encoding="UTF-8"?>
+ <ExternalQuestion xmlns="http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2006-07-14/ExternalQuestion.xsd">
+ <ExternalURL>{str(self.url)}</ExternalURL>
+ <FrameHeight>{self.height}</FrameHeight>
+ </ExternalQuestion>"""
+
+
+class HitTypeCommon(BaseModel):
+ """
+ Fields on both the HitType and Hit
+ """
+
+ model_config = ConfigDict(
+ extra="forbid", validate_assignment=True, ser_json_timedelta="float"
+ )
+
+ title: str = Field(
+ min_length=3,
+ max_length=200,
+ description="The HIT post title that appears in the listing view",
+ )
+ description: str = Field(
+ min_length=3,
+ max_length=2_000,
+ description="The expand more about textarea, has a max of 2000 characters",
+ )
+ reward: USDCent = Field(
+ description="The amount of money the Requester will pay a Worker for successfully completing the HIT."
+ )
+
+ assignment_duration: timedelta = Field(
+ default=timedelta(minutes=90),
+ description="The amount of time, in seconds, that a Worker has to complete "
+ "the HIT after accepting it.",
+ )
+ auto_approval_delay: timedelta = Field(
+ default=timedelta(days=7),
+ description="The number of seconds after an assignment for the HIT has "
+ "been submitted, after which the assignment is considered "
+ "Approved automatically unless the Requester explicitly "
+ "rejects it.",
+ )
+ keywords: str = Field(min_length=3, max_length=999)
+
+
+class HitType(HitTypeCommon):
+ """
+ https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mturk/client/create_hit_type.html
+ https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_CreateHITTypeOperation.html
+ """
+
+ id: Optional[PositiveInt] = Field(default=None)
+ amt_hit_type_id: Optional[AMTBoto3ID] = Field(default=None)
+
+ # --- GRL Specific ---
+ min_active: NonNegativeInt = Field(default=0, le=100_000)
+
+ def to_api_request_body(self):
+ return dict(
+ AutoApprovalDelayInSeconds=round(self.auto_approval_delay.total_seconds()),
+ AssignmentDurationInSeconds=round(self.assignment_duration.total_seconds()),
+ Reward=str(self.reward.to_usd()),
+ Title=self.title,
+ Keywords=self.keywords,
+ Description=self.description,
+ )
+
+ def to_postgres(self):
+ d = self.model_dump(mode="json")
+ d["reward"] = self.reward.to_usd()
+ return d
+
+ @classmethod
+ def from_postgres(cls, data: Dict) -> Self:
+ data["reward"] = USDCent(round(data["reward"] * 100))
+ return cls.model_validate(data)
+
+ def generate_hit_amt_request(self, question: HitQuestion):
+ d = dict()
+ d["HITTypeId"] = self.amt_hit_type_id
+ d["MaxAssignments"] = 1
+ d["LifetimeInSeconds"] = round(timedelta(days=14).total_seconds())
+ d["Question"] = question.xml
+ d["UniqueRequestToken"] = uuid4().hex
+ return d
+
+
+class Hit(HitTypeCommon):
+ model_config = ConfigDict(
+ extra="forbid",
+ validate_assignment=True,
+ )
+
+ id: Optional[PositiveInt] = Field(default=None)
+ hit_type_id: Optional[PositiveInt] = Field(default=None)
+ question_id: Optional[PositiveInt] = Field(default=None)
+
+ amt_hit_id: AMTBoto3ID = Field()
+ amt_hit_type_id: AMTBoto3ID = Field()
+ amt_group_id: AMTBoto3ID = Field()
+ hit_question_xml: str = Field()
+
+ status: HitStatus = Field()
+ review_status: HitReviewStatus = Field()
+ creation_time: AwareDatetimeISO = Field(default=None, description="From aws")
+ expiration: Optional[AwareDatetimeISO] = Field(default=None)
+
+ # GRL Specific
+ created_at: AwareDatetimeISO = Field(
+ default_factory=lambda: datetime.now(tz=timezone.utc),
+ description="When this record was saved in the database",
+ )
+ modified_at: AwareDatetimeISO = Field(
+ default_factory=lambda: datetime.now(tz=timezone.utc),
+ description="When this record was last modified",
+ )
+
+ # -- Hit specific
+
+ qualification_requirements: Optional[List[Dict]] = Field(default=None)
+ max_assignments: int = Field()
+
+ # # this comes back as expiration. only for the request
+ # lifetime: timedelta = Field(
+ # default=timedelta(days=14),
+ # description="An amount of time, in seconds, after which the HIT is no longer "
+ # "available for users to accept.",
+ # )
+ assignment_pending_count: NonNegativeInt = Field()
+ assignment_available_count: NonNegativeInt = Field()
+ assignment_completed_count: NonNegativeInt = Field()
+
+ @classmethod
+ def from_amt_create_hit(
+ cls, data: HITTypeDef, question: HitQuestion, hit_type: HitType
+ ) -> Self:
+ assert question.id is not None
+ assert hit_type.id is not None
+ assert hit_type.amt_hit_type_id is not None
+
+ h = Hit.model_validate(
+ dict(
+ amt_hit_id=data["HITId"],
+ amt_hit_type_id=data["HITTypeId"],
+ amt_group_id=data["HITGroupId"],
+ status=HitStatus[data["HITStatus"]],
+ review_status=HitReviewStatus[data["HITReviewStatus"]],
+ creation_time=data["CreationTime"].astimezone(tz=timezone.utc),
+ expiration=data["Expiration"].astimezone(tz=timezone.utc),
+ hit_question_xml=data["Question"],
+ qualification_requirements=data["QualificationRequirements"],
+ max_assignments=data["MaxAssignments"],
+ assignment_pending_count=data["NumberOfAssignmentsPending"],
+ assignment_available_count=data["NumberOfAssignmentsAvailable"],
+ assignment_completed_count=data["NumberOfAssignmentsCompleted"],
+ description=data["Description"],
+ keywords=data["Keywords"],
+ reward=USDCent(round(float(data["Reward"]) * 100)),
+ title=data["Title"],
+ question_id=question.id,
+ hit_type_id=hit_type.id,
+ )
+ )
+ return h
+
+ @classmethod
+ def from_amt_get_hit(cls, data: HITTypeDef) -> Self:
+ h = Hit.model_validate(
+ dict(
+ amt_hit_id=data["HITId"],
+ amt_hit_type_id=data["HITTypeId"],
+ amt_group_id=data["HITGroupId"],
+ status=HitStatus[data["HITStatus"]],
+ review_status=HitReviewStatus[data["HITReviewStatus"]],
+ creation_time=data["CreationTime"].astimezone(tz=timezone.utc),
+ expiration=data["Expiration"].astimezone(tz=timezone.utc),
+ hit_question_xml=data["Question"],
+ qualification_requirements=data["QualificationRequirements"],
+ max_assignments=data["MaxAssignments"],
+ assignment_pending_count=data["NumberOfAssignmentsPending"],
+ assignment_available_count=data["NumberOfAssignmentsAvailable"],
+ assignment_completed_count=data["NumberOfAssignmentsCompleted"],
+ description=data["Description"],
+ keywords=data["Keywords"],
+ reward=USDCent(round(float(data["Reward"]) * 100)),
+ title=data["Title"],
+ question_id=None,
+ hit_type_id=None,
+ )
+ )
+ return h
+
+ def to_postgres(self):
+ d = self.model_dump(mode="json")
+ d["reward"] = self.reward.to_usd()
+ return d
+
+ @classmethod
+ def from_postgres(cls, data: Dict) -> Self:
+ data["reward"] = USDCent(round(data["reward"] * 100))
+ return cls.model_validate(data)
+
+ @property
+ def hit_question(self) -> HitQuestion:
+ root = ElementTree.fromstring(self.hit_question_xml)
+
+ ns = {
+ "mt": "http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2006-07-14/ExternalQuestion.xsd"
+ }
+ res = {}
+
+ lookup_table = dict(ExternalURL="url", FrameHeight="height")
+ for a in root.findall("mt:*", ns):
+ key = lookup_table[a.tag.split("}")[1]]
+ val = a.text
+ res[key] = val
+
+ return HitQuestion.model_validate(res, from_attributes=True)