from datetime import datetime, timezone, timedelta from typing import Optional, List, Dict from uuid import uuid4 from xml.etree import ElementTree from mypy_boto3_mturk.type_defs import HITTypeDef from pydantic import ( BaseModel, Field, PositiveInt, ConfigDict, NonNegativeInt, ) from typing_extensions import Self from jb.models.currency import USDCent from jb.models.custom_types import AMTBoto3ID, HttpsUrlStr, AwareDatetimeISO from jb.models.definitions import HitStatus, HitReviewStatus class HitQuestion(BaseModel): id: Optional[PositiveInt] = Field(default=None) url: HttpsUrlStr = Field() height: PositiveInt = Field(default=1_200, ge=100, le=4_000) # --- Properties --- def to_postgres(self): return self.model_dump(mode="json") @property def xml(self) -> str: return f""" {str(self.url)} {self.height} """ class HitTypeCommon(BaseModel): """ Fields on both the HitType and Hit """ model_config = ConfigDict( extra="forbid", validate_assignment=True, ser_json_timedelta="float" ) title: str = Field( min_length=3, max_length=200, description="The HIT post title that appears in the listing view", ) description: str = Field( min_length=3, max_length=2_000, description="The expand more about textarea, has a max of 2000 characters", ) reward: USDCent = Field( description="The amount of money the Requester will pay a Worker for successfully completing the HIT." ) assignment_duration: timedelta = Field( default=timedelta(minutes=90), description="The amount of time, in seconds, that a Worker has to complete " "the HIT after accepting it.", ) auto_approval_delay: timedelta = Field( default=timedelta(days=7), description="The number of seconds after an assignment for the HIT has " "been submitted, after which the assignment is considered " "Approved automatically unless the Requester explicitly " "rejects it.", ) keywords: str = Field(min_length=3, max_length=999) class HitType(HitTypeCommon): """ https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mturk/client/create_hit_type.html https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_CreateHITTypeOperation.html """ id: Optional[PositiveInt] = Field(default=None) amt_hit_type_id: Optional[AMTBoto3ID] = Field(default=None) # --- GRL Specific --- min_active: NonNegativeInt = Field(default=0, le=100_000) def to_api_request_body(self): return dict( AutoApprovalDelayInSeconds=round(self.auto_approval_delay.total_seconds()), AssignmentDurationInSeconds=round(self.assignment_duration.total_seconds()), Reward=str(self.reward.to_usd()), Title=self.title, Keywords=self.keywords, Description=self.description, ) def to_postgres(self): d = self.model_dump(mode="json") d["reward"] = self.reward.to_usd() return d @classmethod def from_postgres(cls, data: Dict) -> Self: data["reward"] = USDCent(round(data["reward"] * 100)) return cls.model_validate(data) def generate_hit_amt_request(self, question: HitQuestion): d = dict() d["HITTypeId"] = self.amt_hit_type_id d["MaxAssignments"] = 1 d["LifetimeInSeconds"] = round(timedelta(days=14).total_seconds()) d["Question"] = question.xml d["UniqueRequestToken"] = uuid4().hex return d class Hit(HitTypeCommon): model_config = ConfigDict( extra="forbid", validate_assignment=True, ) id: Optional[PositiveInt] = Field(default=None) hit_type_id: Optional[PositiveInt] = Field(default=None) question_id: Optional[PositiveInt] = Field(default=None) amt_hit_id: AMTBoto3ID = Field() amt_hit_type_id: AMTBoto3ID = Field() amt_group_id: AMTBoto3ID = Field() hit_question_xml: str = Field() status: HitStatus = Field() review_status: HitReviewStatus = Field() creation_time: AwareDatetimeISO = Field(default=None, description="From aws") expiration: Optional[AwareDatetimeISO] = Field(default=None) # GRL Specific created_at: AwareDatetimeISO = Field( default_factory=lambda: datetime.now(tz=timezone.utc), description="When this record was saved in the database", ) modified_at: AwareDatetimeISO = Field( default_factory=lambda: datetime.now(tz=timezone.utc), description="When this record was last modified", ) # -- Hit specific qualification_requirements: Optional[List[Dict]] = Field(default=None) max_assignments: int = Field() # # this comes back as expiration. only for the request # lifetime: timedelta = Field( # default=timedelta(days=14), # description="An amount of time, in seconds, after which the HIT is no longer " # "available for users to accept.", # ) assignment_pending_count: NonNegativeInt = Field() assignment_available_count: NonNegativeInt = Field() assignment_completed_count: NonNegativeInt = Field() @classmethod def from_amt_create_hit( cls, data: HITTypeDef, question: HitQuestion, hit_type: HitType ) -> Self: assert question.id is not None assert hit_type.id is not None assert hit_type.amt_hit_type_id is not None h = Hit.model_validate( dict( amt_hit_id=data["HITId"], amt_hit_type_id=data["HITTypeId"], amt_group_id=data["HITGroupId"], status=HitStatus[data["HITStatus"]], review_status=HitReviewStatus[data["HITReviewStatus"]], creation_time=data["CreationTime"].astimezone(tz=timezone.utc), expiration=data["Expiration"].astimezone(tz=timezone.utc), hit_question_xml=data["Question"], qualification_requirements=data["QualificationRequirements"], max_assignments=data["MaxAssignments"], assignment_pending_count=data["NumberOfAssignmentsPending"], assignment_available_count=data["NumberOfAssignmentsAvailable"], assignment_completed_count=data["NumberOfAssignmentsCompleted"], description=data["Description"], keywords=data["Keywords"], reward=USDCent(round(float(data["Reward"]) * 100)), title=data["Title"], question_id=question.id, hit_type_id=hit_type.id, ) ) return h @classmethod def from_amt_get_hit(cls, data: HITTypeDef) -> Self: h = Hit.model_validate( dict( amt_hit_id=data["HITId"], amt_hit_type_id=data["HITTypeId"], amt_group_id=data["HITGroupId"], status=HitStatus[data["HITStatus"]], review_status=HitReviewStatus[data["HITReviewStatus"]], creation_time=data["CreationTime"].astimezone(tz=timezone.utc), expiration=data["Expiration"].astimezone(tz=timezone.utc), hit_question_xml=data["Question"], qualification_requirements=data["QualificationRequirements"], max_assignments=data["MaxAssignments"], assignment_pending_count=data["NumberOfAssignmentsPending"], assignment_available_count=data["NumberOfAssignmentsAvailable"], assignment_completed_count=data["NumberOfAssignmentsCompleted"], description=data["Description"], keywords=data["Keywords"], reward=USDCent(round(float(data["Reward"]) * 100)), title=data["Title"], question_id=None, hit_type_id=None, ) ) return h def to_postgres(self): d = self.model_dump(mode="json") d["reward"] = self.reward.to_usd() return d @classmethod def from_postgres(cls, data: Dict) -> Self: data["reward"] = USDCent(round(data["reward"] * 100)) return cls.model_validate(data) @property def hit_question(self) -> HitQuestion: root = ElementTree.fromstring(self.hit_question_xml) ns = { "mt": "http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2006-07-14/ExternalQuestion.xsd" } res = {} lookup_table = dict(ExternalURL="url", FrameHeight="height") for a in root.findall("mt:*", ns): key = lookup_table[a.tag.split("}")[1]] val = a.text res[key] = val return HitQuestion.model_validate(res, from_attributes=True)