aboutsummaryrefslogtreecommitdiff
path: root/jb/models/errors.py
diff options
context:
space:
mode:
Diffstat (limited to 'jb/models/errors.py')
-rw-r--r--jb/models/errors.py80
1 files changed, 80 insertions, 0 deletions
diff --git a/jb/models/errors.py b/jb/models/errors.py
new file mode 100644
index 0000000..94f5fbb
--- /dev/null
+++ b/jb/models/errors.py
@@ -0,0 +1,80 @@
+import re
+from enum import Enum
+
+from pydantic import BaseModel, Field, ConfigDict, model_validator
+
+from jb.models import ResponseMetadata
+
+
+class BotoRequestErrorOperation(str, Enum):
+ GET_ASSIGNMENT = "GetAssignment"
+ GET_HIT = "GetHIT"
+
+
+class TurkErrorCode(str, Enum):
+ # Unclear: maybe when it's new?
+ HIT_NOT_EXIST = "AWS.MechanicalTurk.HITDoesNotExist"
+
+ # This seems to be for really old Assignments
+ # Also maybe when it's only a Preview?
+ # Happens 2+ years back, and also from past 24hrs
+ INVALID_ASSIGNEMENT_STATE = "AWS.MechanicalTurk.InvalidAssignmentState"
+
+ # If random assignmentId is used
+ ASSIGNMENT_NOT_EXIST = "AWS.MechanicalTurk.AssignmentDoesNotExist"
+
+
+class BotoRequestErrorResponseErrorCodes(str, Enum):
+ REQUEST_ERROR = "RequestError"
+
+
+class BotoRequestErrorResponseError(BaseModel):
+ model_config = ConfigDict(extra="forbid", validate_assignment=True)
+
+ message: str = Field(alias="Message")
+ code: BotoRequestErrorResponseErrorCodes = Field(alias="Code")
+
+
+class BotoRequestErrorResponse(BaseModel):
+ model_config = ConfigDict(extra="forbid", validate_assignment=True)
+
+ error: BotoRequestErrorResponseError = Field(alias="Error")
+ response_metadata: ResponseMetadata = Field(alias="ResponseMetadata")
+ message: str = Field(alias="Message", min_length=50)
+ error_code: TurkErrorCode = Field(alias="TurkErrorCode")
+
+ @model_validator(mode="after")
+ def check_consistent_hit_id(self) -> "BotoRequestErrorResponse":
+
+ match self.error_code:
+ case TurkErrorCode.HIT_NOT_EXIST:
+ if not re.match(
+ r"Hit [A-Z0-9]{30} does not exist. \(\d{13}\)", self.message
+ ):
+ raise ValueError("Unknown message for TurkErrorCode.HIT_NOT_EXIST")
+
+ case TurkErrorCode.INVALID_ASSIGNEMENT_STATE:
+ if not re.match(
+ r"This operation can be called with a status of: Reviewable,Approved,Rejected \(\d{13}\)",
+ self.message,
+ ):
+ raise ValueError(
+ "Unknown message for TurkErrorCode.INVALID_ASSIGNEMENT_STATE"
+ )
+
+ case TurkErrorCode.ASSIGNMENT_NOT_EXIST:
+ if not re.match(
+ r"Assignment [A-Z0-9]{30} does not exist. \(\d{13}\)", self.message
+ ):
+ raise ValueError(
+ "Unknown message for TurkErrorCode.ASSIGNMENT_NOT_EXIST"
+ )
+
+ return self
+
+
+class BotoRequestError(BaseModel):
+ model_config = ConfigDict(extra="forbid", validate_assignment=True)
+
+ response: BotoRequestErrorResponse = Field()
+ operation_name: BotoRequestErrorOperation = Field()