diff options
| author | Max Nanis | 2026-02-19 02:43:23 -0500 |
|---|---|---|
| committer | Max Nanis | 2026-02-19 02:43:23 -0500 |
| commit | f0f96f83c2630e890a2cbcab53f77fd4c37e1684 (patch) | |
| tree | c6d2cb092e76bf5d499e0ea9949508d6b22164fd /jb/models/errors.py | |
| parent | 3eaa56f0306ead818f64c3d99fc6d230d9b970a4 (diff) | |
| download | amt-jb-f0f96f83c2630e890a2cbcab53f77fd4c37e1684.tar.gz amt-jb-f0f96f83c2630e890a2cbcab53f77fd4c37e1684.zip | |
Diffstat (limited to 'jb/models/errors.py')
| -rw-r--r-- | jb/models/errors.py | 80 |
1 files changed, 80 insertions, 0 deletions
diff --git a/jb/models/errors.py b/jb/models/errors.py new file mode 100644 index 0000000..94f5fbb --- /dev/null +++ b/jb/models/errors.py @@ -0,0 +1,80 @@ +import re +from enum import Enum + +from pydantic import BaseModel, Field, ConfigDict, model_validator + +from jb.models import ResponseMetadata + + +class BotoRequestErrorOperation(str, Enum): + GET_ASSIGNMENT = "GetAssignment" + GET_HIT = "GetHIT" + + +class TurkErrorCode(str, Enum): + # Unclear: maybe when it's new? + HIT_NOT_EXIST = "AWS.MechanicalTurk.HITDoesNotExist" + + # This seems to be for really old Assignments + # Also maybe when it's only a Preview? + # Happens 2+ years back, and also from past 24hrs + INVALID_ASSIGNEMENT_STATE = "AWS.MechanicalTurk.InvalidAssignmentState" + + # If random assignmentId is used + ASSIGNMENT_NOT_EXIST = "AWS.MechanicalTurk.AssignmentDoesNotExist" + + +class BotoRequestErrorResponseErrorCodes(str, Enum): + REQUEST_ERROR = "RequestError" + + +class BotoRequestErrorResponseError(BaseModel): + model_config = ConfigDict(extra="forbid", validate_assignment=True) + + message: str = Field(alias="Message") + code: BotoRequestErrorResponseErrorCodes = Field(alias="Code") + + +class BotoRequestErrorResponse(BaseModel): + model_config = ConfigDict(extra="forbid", validate_assignment=True) + + error: BotoRequestErrorResponseError = Field(alias="Error") + response_metadata: ResponseMetadata = Field(alias="ResponseMetadata") + message: str = Field(alias="Message", min_length=50) + error_code: TurkErrorCode = Field(alias="TurkErrorCode") + + @model_validator(mode="after") + def check_consistent_hit_id(self) -> "BotoRequestErrorResponse": + + match self.error_code: + case TurkErrorCode.HIT_NOT_EXIST: + if not re.match( + r"Hit [A-Z0-9]{30} does not exist. \(\d{13}\)", self.message + ): + raise ValueError("Unknown message for TurkErrorCode.HIT_NOT_EXIST") + + case TurkErrorCode.INVALID_ASSIGNEMENT_STATE: + if not re.match( + r"This operation can be called with a status of: Reviewable,Approved,Rejected \(\d{13}\)", + self.message, + ): + raise ValueError( + "Unknown message for TurkErrorCode.INVALID_ASSIGNEMENT_STATE" + ) + + case TurkErrorCode.ASSIGNMENT_NOT_EXIST: + if not re.match( + r"Assignment [A-Z0-9]{30} does not exist. \(\d{13}\)", self.message + ): + raise ValueError( + "Unknown message for TurkErrorCode.ASSIGNMENT_NOT_EXIST" + ) + + return self + + +class BotoRequestError(BaseModel): + model_config = ConfigDict(extra="forbid", validate_assignment=True) + + response: BotoRequestErrorResponse = Field() + operation_name: BotoRequestErrorOperation = Field() |
