import re from enum import Enum from pydantic import BaseModel, Field, ConfigDict, model_validator from jb.models import ResponseMetadata class BotoRequestErrorOperation(str, Enum): GET_ASSIGNMENT = "GetAssignment" GET_HIT = "GetHIT" class TurkErrorCode(str, Enum): # Unclear: maybe when it's new? HIT_NOT_EXIST = "AWS.MechanicalTurk.HITDoesNotExist" # This seems to be for really old Assignments # Also maybe when it's only a Preview? # Happens 2+ years back, and also from past 24hrs INVALID_ASSIGNEMENT_STATE = "AWS.MechanicalTurk.InvalidAssignmentState" # If random assignmentId is used ASSIGNMENT_NOT_EXIST = "AWS.MechanicalTurk.AssignmentDoesNotExist" class BotoRequestErrorResponseErrorCodes(str, Enum): REQUEST_ERROR = "RequestError" class BotoRequestErrorResponseError(BaseModel): model_config = ConfigDict(extra="forbid", validate_assignment=True) message: str = Field(alias="Message") code: BotoRequestErrorResponseErrorCodes = Field(alias="Code") class BotoRequestErrorResponse(BaseModel): model_config = ConfigDict(extra="forbid", validate_assignment=True) error: BotoRequestErrorResponseError = Field(alias="Error") response_metadata: ResponseMetadata = Field(alias="ResponseMetadata") message: str = Field(alias="Message", min_length=50) error_code: TurkErrorCode = Field(alias="TurkErrorCode") @model_validator(mode="after") def check_consistent_hit_id(self) -> "BotoRequestErrorResponse": match self.error_code: case TurkErrorCode.HIT_NOT_EXIST: if not re.match( r"Hit [A-Z0-9]{30} does not exist. \(\d{13}\)", self.message ): raise ValueError("Unknown message for TurkErrorCode.HIT_NOT_EXIST") case TurkErrorCode.INVALID_ASSIGNEMENT_STATE: if not re.match( r"This operation can be called with a status of: Reviewable,Approved,Rejected \(\d{13}\)", self.message, ): raise ValueError( "Unknown message for TurkErrorCode.INVALID_ASSIGNEMENT_STATE" ) case TurkErrorCode.ASSIGNMENT_NOT_EXIST: if not re.match( r"Assignment [A-Z0-9]{30} does not exist. \(\d{13}\)", self.message ): raise ValueError( "Unknown message for TurkErrorCode.ASSIGNMENT_NOT_EXIST" ) return self class BotoRequestError(BaseModel): model_config = ConfigDict(extra="forbid", validate_assignment=True) response: BotoRequestErrorResponse = Field() operation_name: BotoRequestErrorOperation = Field()