summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md148
-rw-r--r--amt-jb.conf12
-rw-r--r--jb/managers/__init__.py23
-rw-r--r--jb/managers/amt.py216
-rw-r--r--jb/managers/assignment.py259
-rw-r--r--jb/managers/bonus.py54
-rw-r--r--jb/managers/hit.py338
-rw-r--r--jb/managers/worker.py16
-rw-r--r--jb/models/__init__.py40
-rw-r--r--jb/models/api_response.py17
-rw-r--r--jb/models/assignment.py388
-rw-r--r--jb/models/bonus.py48
-rw-r--r--jb/models/currency.py70
-rw-r--r--jb/models/custom_types.py113
-rw-r--r--jb/models/definitions.py90
-rw-r--r--jb/models/errors.py80
-rw-r--r--jb/models/event.py38
-rw-r--r--jb/models/hit.py251
-rw-r--r--jb/views/__init__.py0
-rw-r--r--nginx.conf39
-rw-r--r--nginx_amt-jb.conf43
-rw-r--r--nginx_amt-jb_proxy_pass.conf4
-rw-r--r--requirements.txt105
-rw-r--r--run.py6
-rw-r--r--templates/base.html16
-rw-r--r--tests/amt/__init__.py0
-rw-r--r--tests/amt/conftest.py0
-rw-r--r--tests/amt/test_models.py41
-rw-r--r--tests/flow/__init__.py0
-rw-r--r--tests/flow/test_tasks.py534
-rw-r--r--tests/managers/__init__.py0
-rw-r--r--tests/managers/amt.py23
-rw-r--r--tests/managers/hit.py18
33 files changed, 3030 insertions, 0 deletions
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..361ffaa
--- /dev/null
+++ b/README.md
@@ -0,0 +1,148 @@
+# AMT James Billings
+
+## Infrastructure
+
+### FastAPI
+- Handles HIT acceptance
+- Handles the interface between React and the backend: work, preview, report.
+- Handling of submitted assignments:
+ - mturk_notifications view: SNS will POST to this endpoint whenever a user submits an assignment
+ - The message gets added to our event stream
+ - process_mturk_events (which calls process_assignment_submitted)
+
+### React
+
+### AirFlow
+- Refills HITs and checks for expired HITs
+
+## Network
+
+## System Environment
+
+### Debian 12
+
+#### Update and install system requirements
+
+```shell
+apt-get update
+apt-get upgrade
+
+apt install -y build-essential libssl-dev zlib1g-dev libbz2-dev \
+ libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev libncursesw5-dev \
+ xz-utils tk-dev libffi-dev liblzma-dev python3-openssl git supervisor python3-dev \
+ libmemcached-dev libpq-dev htop supervisor rsync
+
+wget https://www.python.org/ftp/python/3.13.7/Python-3.13.7.tgz
+tar -xf Python-3.13.7.tgz
+
+cd Python-3.13.7
+./configure --enable-optimizations
+make -j 4
+make altinstall
+python3.13 --version
+
+python3.13 -m venv /root/amt-jb-venv
+```
+
+#### Git pull projects
+
+```shell
+cd ~
+git clone ssh://code.g-r-l.com/panels/amt-jb
+cd amt-jb
+source /root/amt-jb-venv/bin/activate
+pip install -U pip setuptools wheel
+pip install -r requirements.txt
+```
+
+#### Supervisor
+
+```shell
+cd /etc/supervisor/conf.d/
+ln -s /root/amt-jb/amt-jb.conf .
+supervisorctl reread
+supervisorctl reload
+```
+
+#### Database
+
+```shell
+apt install -y postgresql-common
+bash /usr/share/postgresql-common/pgdg/apt.postgresql.org.sh
+apt update
+apt-cache policy postgresql
+apt install postgresql -y
+
+systemctl enable postgresql
+systemctl start postgresql
+systemctl status postgresql
+
+vim /etc/postgresql/18/main/pg_hba.conf
+```
+
+#### Setup DNS Cache (LXC)
+
+```
+apt install dnsutils dnsmasq -y
+
+# /etc/dnsmasq.conf
+server=10.16.2.2
+server=10.16.2.3
+min-cache-ttl=30
+no-resolv
+listen-address=::1,127.0.0.1
+
+systemctl restart dnsmasq
+
+lxc dns: ::1 (and reboot)
+```
+
+#### NGINX
+
+```shell
+cd /etc/nginx/
+mv nginx.conf nginx.conf.bkp
+ln -s /root/amt-jb/nginx.conf .
+ln -s /root/amt-jb/nginx_amt-jb_proxy_pass.conf .
+
+cd sites-enabled
+rm default
+ln -s /root/amt-jb/nginx_amt-jb.conf .
+
+gpasswd -a www-data root
+chmod g+x /root && chmod g+x /root/amt-jb && chmod g+x /root/amt-jb/static
+chmod g+x /root/amt-jb/static/js
+
+service nginx restart
+
+rm /var/log/nginx/*
+service nginx restart
+
+```
+
+
+Confirm uvicorn is running properly
+
+```shell
+curl -i http://localhost:8000/headers/
+```
+
+## Telegraf
+
+```shell
+# influxdata-archive_compat.key GPG fingerprint:
+# 9D53 9D90 D332 8DC7 D6C8 D3B9 D8FF 8E1F 7DF8 B07E
+wget -q https://repos.influxdata.com/influxdata-archive_compat.key
+echo '393e8779c89ac8d958f81f942f9ad7fb82a25e133faddaf92e15b16e6ac9ce4c influxdata-archive_compat.key' | sha256sum -c && cat influxdata-archive_compat.key | gpg --dearmor | tee /etc/apt/trusted.gpg.d/influxdata-archive_compat.gpg > /dev/null
+echo 'deb [signed-by=/etc/apt/trusted.gpg.d/influxdata-archive_compat.gpg] https://repos.influxdata.com/debian stable main' | tee /etc/apt/sources.list.d/influxdata.list
+apt-get update && apt-get install telegraf
+
+cp /root/amt-jb/telegraf.conf /etc/telegraf/telegraf.conf
+chown telegraf:telegraf /etc/telegraf/telegraf.conf
+chmod 644 /etc/telegraf/telegraf.conf
+# Add the telegraf user to the adm group (for access to /var/log/nginx/)
+sudo usermod -aG adm telegraf
+systemctl restart telegraf
+
+# sudo -u telegraf telegraf --config /etc/telegraf/telegraf.conf --test
+``` \ No newline at end of file
diff --git a/amt-jb.conf b/amt-jb.conf
new file mode 100644
index 0000000..c6edf5f
--- /dev/null
+++ b/amt-jb.conf
@@ -0,0 +1,12 @@
+[fcgi-program:amt-jb]
+socket=tcp://localhost:8000
+command=/root/amt-jb-venv/bin/uvicorn --fd 0 --timeout-keep-alive 35 --env-file .env jb.main:app
+directory=/root/amt-jb
+user=root
+priority=999
+autostart=true
+autorestart=true
+numprocs=2
+numprocs_start=1
+process_name=uvicorn-%(process_num)d
+environment=LD_LIBRARY_PATH="/usr/local/lib:" \ No newline at end of file
diff --git a/jb/managers/__init__.py b/jb/managers/__init__.py
new file mode 100644
index 0000000..e2aab6d
--- /dev/null
+++ b/jb/managers/__init__.py
@@ -0,0 +1,23 @@
+from enum import IntEnum
+from typing import Collection
+
+from generalresearchutils.pg_helper import PostgresConfig
+
+
class Permission(IntEnum):
    """Permission levels that can be granted to a ``PostgresManager``.

    Members are integers (``IntEnum``), so each compares equal to its
    numeric value.
    """

    READ = 1
    UPDATE = 2
    CREATE = 3
    DELETE = 4
+
+
class PostgresManager:
    """Base class for managers that run queries through a ``PostgresConfig``.

    Attributes:
        pg_config: the configuration object used to open connections and run
            queries.
        permissions: the set of ``Permission`` members granted to this
            manager; empty when none were supplied.
    """

    def __init__(
        self,
        pg_config: PostgresConfig,
        # ``None`` is an accepted value, so the annotation says so explicitly
        # rather than relying on an implicit Optional.
        permissions: Collection[Permission] | None = None,
        **kwargs,
    ):
        # Forward any remaining keyword arguments along the MRO so the class
        # can be combined with other cooperative base classes.
        super().__init__(**kwargs)
        self.pg_config = pg_config
        # Both ``None`` and an empty collection normalise to an empty set.
        self.permissions = set(permissions) if permissions else set()
diff --git a/jb/managers/amt.py b/jb/managers/amt.py
new file mode 100644
index 0000000..79661c7
--- /dev/null
+++ b/jb/managers/amt.py
@@ -0,0 +1,216 @@
+import logging
+from datetime import timezone, datetime
+from typing import Tuple, Optional, List
+
+import botocore.exceptions
+from mypy_boto3_mturk.type_defs import AssignmentTypeDef, BonusPaymentTypeDef
+
+from jb.config import TOPIC_ARN
+from jb.decorators import AMT_CLIENT
+from jb.models import AMTAccount
+from jb.models.assignment import Assignment
+from jb.models.bonus import Bonus
+from jb.models.currency import USDCent
+from jb.models.definitions import HitStatus
+from jb.models.hit import HitType, HitQuestion, Hit
+
# Canned feedback / reason strings used when deciding on AMT assignments.
REJECT_MESSAGE_UNKNOWN_ASSIGNMENT = "Unknown assignment"
REJECT_MESSAGE_NO_WORK = "Assignment was submitted with no attempted work."
REJECT_MESSAGE_BADDIE = "Quality has dropped below an acceptable level"
APPROVAL_MESSAGE = "Thank you!"
# Same opening sentence as REJECT_MESSAGE_NO_WORK, followed by a warning.
NO_WORK_APPROVAL_MESSAGE = (
    f"{REJECT_MESSAGE_NO_WORK} In the future, if you are not sent into a task, "
    "please return the assignment, otherwise it will be rejected!"
)
BONUS_MESSAGE = "Great job! Bonus for a survey complete"
+
+
class AMTManager:
    """Stateless helpers around the module-level ``AMT_CLIENT`` MTurk client.

    Every method is a static or class method; the class keeps no state.
    """

    @staticmethod
    def fetch_account() -> AMTAccount:
        """Return the balances reported by ``get_account_balance``.

        ``OnHoldBalance`` is treated as 0 when the response omits it.
        """
        res = AMT_CLIENT.get_account_balance()
        return AMTAccount.model_validate(
            {
                "available_balance": res["AvailableBalance"],
                "onhold_balance": res.get("OnHoldBalance", 0),
            }
        )

    @staticmethod
    def get_hit_if_exists(amt_hit_id: str) -> Tuple[Optional[Hit], Optional[str]]:
        """Fetch a HIT from AMT.

        Returns:
            ``(hit, None)`` on success, or ``(None, message)`` when the API
            raises ``RequestError``; ``message`` is the error message from the
            response ("" when the response carries none).
        """
        try:
            res = AMT_CLIENT.get_hit(HITId=amt_hit_id)
        except AMT_CLIENT.exceptions.RequestError as e:
            msg = e.response.get("Error", {}).get("Message", "")
            return None, msg
        return Hit.from_amt_get_hit(res["HIT"]), None

    @classmethod
    def get_hit_status(cls, amt_hit_id: str):
        """Return the HIT's status, mapping lookup failures to a status.

        A "does not exist" error maps to ``HitStatus.Disposed``; any other
        lookup error is logged and mapped to ``HitStatus.Unassignable``.
        """
        hit, msg = cls.get_hit_if_exists(amt_hit_id=amt_hit_id)
        if hit is not None:
            return hit.status
        if " does not exist. (" in msg:
            return HitStatus.Disposed
        logging.warning(msg)
        return HitStatus.Unassignable

    @staticmethod
    def create_hit_type(hit_type: HitType) -> HitType:
        """Create the HIT type on AMT and subscribe it to SNS notifications.

        The id returned by AMT is written to ``hit_type.amt_hit_type_id`` and
        the same ``hit_type`` object is returned.
        """
        res = AMT_CLIENT.create_hit_type(**hit_type.to_api_request_body())
        hit_type.amt_hit_type_id = res["HITTypeId"]
        AMT_CLIENT.update_notification_settings(
            HITTypeId=hit_type.amt_hit_type_id,
            Notification={
                "Destination": TOPIC_ARN,
                "Transport": "SNS",
                "Version": "2006-05-05",
                # you can add more events, see mypy_boto3_mturk.literals.EventTypeType
                "EventTypes": ["AssignmentSubmitted"],
            },
            Active=True,
        )
        return hit_type

    @staticmethod
    def create_hit_with_hit_type(hit_type: HitType, question: HitQuestion) -> Hit:
        """Create a HIT on AMT for an already-created HIT type.

        The request body is produced by ``hit_type.generate_hit_amt_request``;
        the API accepts these fields:

            HITTypeId: str
            LifetimeInSeconds: int
            MaxAssignments: NotRequired[int]
            Question: NotRequired[str]
            UniqueRequestToken: NotRequired[str]

        Both ``hit_type.id`` and ``hit_type.amt_hit_type_id`` must be set.
        """
        assert hit_type.id is not None
        assert hit_type.amt_hit_type_id is not None

        data = hit_type.generate_hit_amt_request(question=question)
        res = AMT_CLIENT.create_hit_with_hit_type(**data)
        return Hit.from_amt_create_hit(res["HIT"], hit_type=hit_type, question=question)

    @staticmethod
    def get_assignment(amt_assignment_id: str) -> Assignment:
        """Fetch an assignment from AMT and convert it to an ``Assignment``.

        Note: an assignment that has only been ACCEPTED by the worker cannot
        be fetched; it only shows up once it has been submitted.
        """
        res = AMT_CLIENT.get_assignment(AssignmentId=amt_assignment_id)
        ass_res: AssignmentTypeDef = res["Assignment"]
        assignment = Assignment.from_amt_get_assignment(ass_res)
        # to be clear, this has not checked whether it exists in our db
        assert assignment.id is None
        return assignment

    @classmethod
    def get_assignment_if_exists(cls, amt_assignment_id: str) -> Optional[Assignment]:
        """Like ``get_assignment`` but return None for an unknown assignment.

        Only a ``RequestError`` whose message says the assignment does not
        exist is swallowed; every other ``ClientError`` is re-raised.
        """
        expected_err_msg = f"Assignment {amt_assignment_id} does not exist"
        try:
            return cls.get_assignment(amt_assignment_id=amt_assignment_id)
        except botocore.exceptions.ClientError as e:
            logging.warning(e)
            error_code = e.response["Error"]["Code"]
            error_msg = e.response["Error"]["Message"]
            if error_code == "RequestError" and expected_err_msg in error_msg:
                return None
            # Bare raise keeps the original traceback intact.
            raise

    @staticmethod
    def reject_assignment_if_possible(
        amt_assignment_id: str, msg: str = REJECT_MESSAGE_UNKNOWN_ASSIGNMENT
    ):
        """Reject the assignment; log and return None on any ``ClientError``."""
        # Unclear to me when this would fail
        try:
            return AMT_CLIENT.reject_assignment(
                AssignmentId=amt_assignment_id, RequesterFeedback=msg
            )
        except botocore.exceptions.ClientError as e:
            logging.warning(e)
            return None

    @staticmethod
    def approve_assignment_if_possible(
        amt_assignment_id: str,
        msg: str = APPROVAL_MESSAGE,
        override_rejection: bool = False,
    ):
        """Approve the assignment; log and return None on any ``ClientError``."""
        # Unclear to me when this would fail
        try:
            return AMT_CLIENT.approve_assignment(
                AssignmentId=amt_assignment_id,
                RequesterFeedback=msg,
                OverrideRejection=override_rejection,
            )
        except botocore.exceptions.ClientError as e:
            logging.warning(e)
            return None

    @staticmethod
    def update_hit_review_status(amt_hit_id: str, revert: bool = False) -> None:
        """Update the HIT's review status (Reviewable to Reviewing).

        Raises:
            ValueError: when the API reports that the HIT does not exist.
                Any other ``ClientError`` is only logged.
        """
        try:
            AMT_CLIENT.update_hit_review_status(HITId=amt_hit_id, Revert=revert)
        except botocore.exceptions.ClientError as e:
            logging.warning(f"{amt_hit_id=}, {e}")
            error_msg = e.response["Error"]["Message"]
            if "does not exist" in error_msg:
                # Chain the API error so the root cause stays in the traceback.
                raise ValueError(error_msg) from e
        return None

    @staticmethod
    def send_bonus(
        amt_worker_id: str,
        amount: USDCent,
        amt_assignment_id: str,
        reason: str,
        unique_request_token: str,
    ):
        """Send a bonus; log and return None on any ``ClientError``.

        ``amount`` is converted with ``to_usd()`` and sent as a string.
        """
        try:
            return AMT_CLIENT.send_bonus(
                WorkerId=amt_worker_id,
                BonusAmount=str(amount.to_usd()),
                AssignmentId=amt_assignment_id,
                Reason=reason,
                UniqueRequestToken=unique_request_token,
            )
        except botocore.exceptions.ClientError as e:
            logging.warning(f"{amt_worker_id=} {amt_assignment_id=}, {e}")
            return None

    @staticmethod
    def get_bonus(amt_assignment_id: str, payout_event_id: str) -> Optional[Bonus]:
        """Return the bonus AMT has recorded for the assignment, if any.

        Raises:
            AssertionError: when AMT lists more than one bonus payment.
        """
        from decimal import Decimal

        payments: List[BonusPaymentTypeDef] = AMT_CLIENT.list_bonus_payments(
            AssignmentId=amt_assignment_id
        )["BonusPayments"]
        # Raised explicitly so the check survives ``python -O``.
        if len(payments) > 1:
            raise AssertionError(
                f"{amt_assignment_id=} Expected 1 or 0 bonuses, got {len(payments)}"
            )
        if not payments:
            return None

        d = payments[0]
        # Convert dollars to cents with exact decimal arithmetic rather than
        # binary floats.
        cents = int((Decimal(str(d["BonusAmount"])) * 100).to_integral_value())
        return Bonus.model_validate(
            {
                "amt_worker_id": d["WorkerId"],
                "amount": USDCent(cents),
                "amt_assignment_id": d["AssignmentId"],
                "reason": d["Reason"],
                "grant_time": d["GrantTime"].astimezone(tz=timezone.utc),
                "payout_event_id": payout_event_id,
            }
        )

    @staticmethod
    def expire_all_hits():
        """Set the expiration of every open HIT to now.

        Used in testing only (or in an emergency). Only HITs whose status is
        Assignable, Reviewable or Reviewing are touched.
        """
        now = datetime.now(tz=timezone.utc)
        paginator = AMT_CLIENT.get_paginator("list_hits")

        for page in paginator.paginate():
            for hit in page["HITs"]:
                if hit["HITStatus"] in ("Assignable", "Reviewable", "Reviewing"):
                    AMT_CLIENT.update_expiration_for_hit(
                        HITId=hit["HITId"], ExpireAt=now
                    )
diff --git a/jb/managers/assignment.py b/jb/managers/assignment.py
new file mode 100644
index 0000000..fca72e8
--- /dev/null
+++ b/jb/managers/assignment.py
@@ -0,0 +1,259 @@
+from datetime import datetime, timezone
+from typing import Optional
+
+from psycopg import sql
+from pydantic import NonNegativeInt
+
+from jb.managers import PostgresManager
+from jb.models.assignment import AssignmentStub, Assignment
+from jb.models.definitions import AssignmentStatus
+
+
class AssignmentManager(PostgresManager):
    """Reads and writes rows of the ``mtwerk_assignment`` table.

    Row-count checks raise ``AssertionError`` explicitly (not via ``assert``)
    so they keep working when Python runs with ``-O``.
    """

    def _insert_returning_id(self, query, data):
        """Run an ``INSERT ... RETURNING id``, commit, and return the new id."""
        with self.pg_config.make_connection() as conn:
            with conn.cursor() as c:
                c.execute(query, data)
                pk = c.fetchone()["id"]
            conn.commit()
        return pk

    def _update_exactly_one(self, query, data) -> None:
        """Run an UPDATE and commit only if exactly one row was affected.

        Raises:
            AssertionError: when the statement touched zero or several rows.
                The error is raised before ``commit`` is called.
        """
        with self.pg_config.make_connection() as conn:
            with conn.cursor() as c:
                c.execute(query, data)
                if c.rowcount != 1:
                    raise AssertionError(f"Expected 1 row, got {c.rowcount}")
            conn.commit()

    def create_stub(self, stub: AssignmentStub) -> None:
        """Insert a stub row and store the generated primary key on ``stub.id``."""
        assert stub.id is None
        data = stub.to_postgres()
        query = sql.SQL(
            """
            INSERT INTO mtwerk_assignment
                (amt_assignment_id, amt_worker_id, status, created_at, modified_at, hit_id)
            VALUES
                (%(amt_assignment_id)s, %(amt_worker_id)s, %(status)s,
                 %(created_at)s, %(modified_at)s, %(hit_id)s)
            RETURNING id;
            """
        )
        stub.id = self._insert_returning_id(query, data)
        return None

    def create(self, assignment: Assignment) -> None:
        """Insert a full assignment row and store the new key on ``assignment.id``.

        Typically this is NOT used (we'd create the stub when the HIT is
        accepted, then update it once the assignment is submitted), however
        there may be cases where an assignment is submitted and the stub
        was never created (probably due to error/downtime/baddie).
        """
        assert assignment.id is None
        data = assignment.to_postgres()
        query = sql.SQL(
            """
            INSERT INTO mtwerk_assignment
                (amt_assignment_id, amt_worker_id, status,
                 created_at, modified_at, hit_id,
                 auto_approval_time, accept_time, submit_time,
                 approval_time, rejection_time, requester_feedback,
                 tsid)
            VALUES
                (%(amt_assignment_id)s, %(amt_worker_id)s, %(status)s,
                 %(created_at)s, %(modified_at)s, %(hit_id)s,
                 %(auto_approval_time)s, %(accept_time)s, %(submit_time)s,
                 %(approval_time)s, %(rejection_time)s, %(requester_feedback)s,
                 %(tsid)s)
            RETURNING id;
            """
        )
        assignment.id = self._insert_returning_id(query, data)
        return None

    def get_stub(self, amt_assignment_id: str) -> AssignmentStub:
        """Load the stub columns for one assignment, joined with its HIT.

        Raises:
            AssertionError: when no row matches ``amt_assignment_id``.
        """
        res = self.pg_config.execute_sql_query(
            """
            SELECT ma.id, ma.hit_id,
                   ma.amt_assignment_id, ma.amt_worker_id,
                   ma.status, ma.created_at, ma.modified_at,
                   mh.amt_hit_id
            FROM mtwerk_assignment ma
            JOIN mtwerk_hit mh ON ma.hit_id = mh.id
            WHERE amt_assignment_id = %(amt_assignment_id)s
            LIMIT 1;
            """,
            params={"amt_assignment_id": amt_assignment_id},
        )
        if len(res) != 1:
            raise AssertionError(
                f"Expected 1 assignment for {amt_assignment_id=}, got {len(res)}"
            )
        return AssignmentStub.model_validate(res[0])

    def get_stub_if_exists(self, amt_assignment_id: str) -> Optional[AssignmentStub]:
        """Return the stub, or None when ``get_stub`` finds no matching row."""
        try:
            return self.get_stub(amt_assignment_id=amt_assignment_id)
        except AssertionError:
            return None

    def get(self, amt_assignment_id: str) -> Assignment:
        """Load the full assignment row, joined with its HIT.

        Raises:
            AssertionError: when no row matches ``amt_assignment_id``.
        """
        res = self.pg_config.execute_sql_query(
            query="""
            SELECT mh.amt_hit_id, ma.id, ma.amt_assignment_id,
                   ma.amt_worker_id, ma.status, ma.auto_approval_time,
                   ma.accept_time, ma.submit_time, ma.approval_time,
                   ma.rejection_time, ma.requester_feedback,
                   ma.created_at, ma.modified_at, ma.tsid, ma.hit_id
            FROM mtwerk_assignment ma
            JOIN mtwerk_hit mh ON ma.hit_id = mh.id
            WHERE amt_assignment_id = %(amt_assignment_id)s
            LIMIT 1;
            """,
            params={"amt_assignment_id": amt_assignment_id},
        )
        if len(res) != 1:
            raise AssertionError(
                f"Expected 1 assignment for {amt_assignment_id=}, got {len(res)}"
            )
        return Assignment.model_validate(res[0])

    def update_answer(self, assignment: Assignment) -> None:
        """Record a submitted assignment for which no decision was made yet.

        Assumes a stub already exists; raises ``AssertionError`` otherwise.
        """
        data = {
            "status": assignment.status.value,
            "submit_time": assignment.submit_time,
            "auto_approval_time": assignment.auto_approval_time,
            "tsid": assignment.tsid,
            "amt_assignment_id": assignment.amt_assignment_id,
            "modified_at": datetime.now(tz=timezone.utc),
        }
        query = sql.SQL(
            """
            UPDATE mtwerk_assignment
            SET submit_time = %(submit_time)s,
                auto_approval_time = %(auto_approval_time)s,
                status = %(status)s,
                tsid = %(tsid)s,
                modified_at = %(modified_at)s
            WHERE amt_assignment_id = %(amt_assignment_id)s
            """
        )
        # We force this to fail if the assignment doesn't already exist in the db
        self._update_exactly_one(query, data)
        return None

    def reject(self, assignment: Assignment) -> None:
        """Persist a rejection decision for an existing assignment row.

        Raises:
            AssertionError: when ``assignment`` is not in a consistent
                rejected state, or when no single row was updated.
        """
        assert assignment.status == AssignmentStatus.Rejected
        assert assignment.rejection_time is not None
        assert assignment.approval_time is None
        assert assignment.requester_feedback is not None
        data = {
            "status": assignment.status.value,
            "submit_time": assignment.submit_time,
            "rejection_time": assignment.rejection_time,
            "requester_feedback": assignment.requester_feedback,
            "amt_assignment_id": assignment.amt_assignment_id,
            "auto_approval_time": assignment.auto_approval_time,
            "accept_time": assignment.accept_time,
            "modified_at": datetime.now(tz=timezone.utc),
        }
        query = sql.SQL(
            """
            UPDATE mtwerk_assignment
            SET submit_time = %(submit_time)s,
                rejection_time = %(rejection_time)s,
                status = %(status)s,
                requester_feedback = %(requester_feedback)s,
                auto_approval_time = %(auto_approval_time)s,
                accept_time = %(accept_time)s,
                modified_at = %(modified_at)s
            WHERE amt_assignment_id = %(amt_assignment_id)s
            """
        )
        # We force this to fail if the assignment doesn't already exist in the db
        self._update_exactly_one(query, data)
        return None

    def approve(self, assignment: Assignment) -> None:
        """Persist an approval decision for an existing assignment row.

        Raises:
            AssertionError: when ``assignment`` is not in a consistent
                approved state, or when no single row was updated.
        """
        assert assignment.status == AssignmentStatus.Approved
        assert assignment.rejection_time is None
        assert assignment.approval_time is not None
        assert assignment.requester_feedback is not None
        data = {
            "status": assignment.status.value,
            "submit_time": assignment.submit_time,
            "approval_time": assignment.approval_time,
            "requester_feedback": assignment.requester_feedback,
            "amt_assignment_id": assignment.amt_assignment_id,
            "auto_approval_time": assignment.auto_approval_time,
            "accept_time": assignment.accept_time,
            "modified_at": datetime.now(tz=timezone.utc),
        }
        query = sql.SQL(
            """
            UPDATE mtwerk_assignment
            SET submit_time = %(submit_time)s,
                approval_time = %(approval_time)s,
                status = %(status)s,
                requester_feedback = %(requester_feedback)s,
                auto_approval_time = %(auto_approval_time)s,
                accept_time = %(accept_time)s,
                modified_at = %(modified_at)s
            WHERE amt_assignment_id = %(amt_assignment_id)s
            """
        )
        # We force this to fail if the assignment doesn't already exist in the db
        self._update_exactly_one(query, data)
        return None

    def missing_tsid_count(
        self, amt_worker_id: str, lookback_hrs: int = 24
    ) -> NonNegativeInt:
        """
        Look at this user's previous N hrs of submitted assignments.
        Count how many assignments they have submitted without a tsid.
        """
        res = self.pg_config.execute_sql_query(
            query="""
            SELECT COUNT(1) AS c
            FROM mtwerk_assignment
            WHERE amt_worker_id = %(amt_worker_id)s
              AND submit_time > NOW() - (%(lookback_interval)s)::interval
              AND tsid IS NULL;
            """,
            params={
                "amt_worker_id": amt_worker_id,
                "lookback_interval": f"{lookback_hrs} hour",
            },
        )
        return int(res[0]["c"])

    def rejected_count(
        self, amt_worker_id: str, lookback_hrs: int = 24
    ) -> NonNegativeInt:
        """
        Look at this user's previous N hrs of submitted assignments.
        Count how many rejected assignments they have.
        """
        res = self.pg_config.execute_sql_query(
            query="""
            SELECT COUNT(1) AS c
            FROM mtwerk_assignment
            WHERE amt_worker_id = %(amt_worker_id)s
              AND submit_time > NOW() - (%(lookback_interval)s)::interval
              AND status = %(status)s;
            """,
            params={
                "amt_worker_id": amt_worker_id,
                "lookback_interval": f"{lookback_hrs} hour",
                "status": AssignmentStatus.Rejected.value,
            },
        )
        return int(res[0]["c"])
diff --git a/jb/managers/bonus.py b/jb/managers/bonus.py
new file mode 100644
index 0000000..0cb8b02
--- /dev/null
+++ b/jb/managers/bonus.py
@@ -0,0 +1,54 @@
+from typing import List
+
+from psycopg import sql
+
+from jb.managers import PostgresManager
+from jb.models.bonus import Bonus
+
+
class BonusManager(PostgresManager):
    """Reads and writes rows of the ``mtwerk_bonus`` table."""

    def create(self, bonus: Bonus) -> None:
        """Insert ``bonus`` and copy the generated keys back onto it.

        The row's ``assignment_id`` is resolved inside the INSERT by looking
        up ``amt_assignment_id`` in ``mtwerk_assignment``. After the insert,
        ``bonus.id`` and ``bonus.assignment_id`` are set from the returned row.
        """
        assert bonus.id is None
        params = bonus.to_postgres()
        insert_stmt = sql.SQL(
            """
            INSERT INTO mtwerk_bonus
                (payout_event_id, amt_worker_id, amount, grant_time, assignment_id, reason)
            VALUES (
                %(payout_event_id)s,
                %(amt_worker_id)s,
                %(amount)s,
                %(grant_time)s,
                (
                    SELECT id
                    FROM mtwerk_assignment
                    WHERE amt_assignment_id = %(amt_assignment_id)s
                    LIMIT 1
                ),
                %(reason)s
            )
            RETURNING id, assignment_id;
            """
        )

        with self.pg_config.make_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(insert_stmt, params)
                row = cur.fetchone()
            conn.commit()

        bonus.id = row["id"]
        bonus.assignment_id = row["assignment_id"]

    def filter(self, amt_assignment_id: str) -> List[Bonus]:
        """Return every stored bonus attached to the given AMT assignment id."""
        rows = self.pg_config.execute_sql_query(
            """
            SELECT mb.*, ma.amt_assignment_id
            FROM mtwerk_bonus mb
            JOIN mtwerk_assignment ma ON ma.id = mb.assignment_id
            WHERE amt_assignment_id = %(amt_assignment_id)s;
            """,
            params={"amt_assignment_id": amt_assignment_id},
        )
        return list(map(Bonus.from_postgres, rows))
diff --git a/jb/managers/hit.py b/jb/managers/hit.py
new file mode 100644
index 0000000..3832418
--- /dev/null
+++ b/jb/managers/hit.py
@@ -0,0 +1,338 @@
+from datetime import datetime, timezone
+from typing import Optional, List
+
+from psycopg import sql
+
+from jb.managers import PostgresManager
+from jb.models.definitions import HitStatus
+from jb.models.hit import HitQuestion, HitType, Hit
+
+
class HitQuestionManager(PostgresManager):
    """Reads and writes rows of the ``mtwerk_question`` table.

    Row-count checks raise ``AssertionError`` explicitly (not via ``assert``)
    so they keep working when Python runs with ``-O``.
    """

    def create(self, question: HitQuestion) -> None:
        """Insert ``question`` and store the generated key on ``question.id``."""
        assert question.id is None
        data = question.to_postgres()
        query = sql.SQL(
            """
            INSERT INTO mtwerk_question (url, height)
            VALUES (%(url)s, %(height)s)
            RETURNING id;
            """
        )

        with self.pg_config.make_connection() as conn:
            with conn.cursor() as c:
                c.execute(query, data)
                pk = c.fetchone()["id"]
            conn.commit()
        question.id = pk
        return None

    def get_by_id(self, question_id: int) -> HitQuestion:
        """Load one question by primary key.

        Raises:
            AssertionError: when no row has that id.
        """
        res = self.pg_config.execute_sql_query(
            """
            SELECT *
            FROM mtwerk_question
            WHERE id = %(question_id)s
            LIMIT 1;
            """,
            params={"question_id": question_id},
        )
        if len(res) != 1:
            raise AssertionError(
                f"Expected 1 question for {question_id=}, got {len(res)}"
            )
        return HitQuestion.model_validate(res[0])

    def get_by_values_if_exists(self, url: str, height: int) -> Optional[HitQuestion]:
        """Return the question matching ``url`` and ``height``, or None.

        Raises:
            AssertionError: when more than one row matches.
        """
        # LIMIT 2 is enough to tell "exactly one" apart from "several".
        res = self.pg_config.execute_sql_query(
            """
            SELECT *
            FROM mtwerk_question
            WHERE url = %(url)s AND height = %(height)s
            LIMIT 2;
            """,
            params={"url": url, "height": height},
        )
        if len(res) > 1:
            raise AssertionError("More than 1 result!")
        if not res:
            return None

        return HitQuestion.model_validate(res[0])

    def get_or_create(self, question: HitQuestion) -> HitQuestion:
        """Return the stored question with the same url/height, creating it if absent.

        When a matching row exists the stored object is returned; otherwise
        ``question`` itself is inserted and returned.
        """
        res = self.get_by_values_if_exists(url=question.url, height=question.height)
        if res:
            return res
        self.create(question=question)
        return question
+
+
class HitTypeManager(PostgresManager):
    """Reads and writes rows of the ``mtwerk_hittype`` table.

    Row-count checks raise ``AssertionError`` explicitly (not via ``assert``)
    so they keep working when Python runs with ``-O``.
    """

    def create(self, hit_type: HitType) -> None:
        """Insert ``hit_type`` and store the generated key on ``hit_type.id``.

        The HIT type must already have an ``amt_hit_type_id``.
        """
        assert hit_type.amt_hit_type_id is not None
        data = hit_type.to_postgres()
        query = sql.SQL(
            """
            INSERT INTO mtwerk_hittype (
                amt_hit_type_id,
                title,
                description,
                reward,
                assignment_duration,
                auto_approval_delay,
                keywords,
                min_active
            )
            VALUES (
                %(amt_hit_type_id)s,
                %(title)s,
                %(description)s,
                %(reward)s,
                %(assignment_duration)s,
                %(auto_approval_delay)s,
                %(keywords)s,
                %(min_active)s
            )
            RETURNING id;
            """
        )

        with self.pg_config.make_connection() as conn:
            with conn.cursor() as c:
                c.execute(query, data)
                pk = c.fetchone()["id"]
            conn.commit()
        hit_type.id = pk
        return None

    def filter_active(self) -> List[HitType]:
        """Return every HIT type whose ``min_active`` is greater than zero.

        Raises:
            ValueError: when the query hits its 50-row limit.
        """
        res = self.pg_config.execute_sql_query(
            """
            SELECT *
            FROM mtwerk_hittype
            WHERE min_active > 0
            LIMIT 50
            """
        )

        # Reaching the LIMIT means the result may have been truncated.
        if len(res) == 50:
            raise ValueError("Too many HitTypes!")

        return [HitType.from_postgres(i) for i in res]

    def get(self, amt_hit_type_id: str) -> HitType:
        """Load one HIT type by its AMT id.

        Raises:
            AssertionError: when the id matches zero or several rows.
        """
        res = self.pg_config.execute_sql_query(
            """
            SELECT *
            FROM mtwerk_hittype
            WHERE amt_hit_type_id = %(amt_hit_type_id)s
            """,
            params={"amt_hit_type_id": amt_hit_type_id},
        )
        if len(res) != 1:
            raise AssertionError(
                f"Expected 1 hit type for {amt_hit_type_id=}, got {len(res)}"
            )
        return HitType.from_postgres(res[0])

    def get_if_exists(self, amt_hit_type_id: str) -> Optional[HitType]:
        """Return the HIT type, or None when ``get`` raises ``AssertionError``."""
        try:
            return self.get(amt_hit_type_id=amt_hit_type_id)
        except AssertionError:
            return None

    def get_or_create(self, hit_type: HitType) -> None:
        """Ensure ``hit_type`` is stored and that ``hit_type.id`` is set.

        An existing row only contributes its id; a missing row is inserted.
        """
        res = self.get_if_exists(amt_hit_type_id=hit_type.amt_hit_type_id)
        if res is None:
            self.create(hit_type=hit_type)
        else:
            hit_type.id = res.id
        return None

    def set_min_active(self, hit_type: HitType) -> None:
        """Write ``hit_type.min_active`` to the row identified by ``hit_type.id``.

        Raises:
            AssertionError: when ``hit_type.id`` is unset, or when the UPDATE
                did not affect exactly one row. The row-count error is raised
                before ``commit`` is called.
        """
        assert hit_type.id, "must be in the db first!"
        query = sql.SQL(
            """
            UPDATE mtwerk_hittype
            SET min_active = %(min_active)s
            WHERE id = %(id)s
            """
        )
        data = {"id": hit_type.id, "min_active": hit_type.min_active}
        with self.pg_config.make_connection() as conn:
            with conn.cursor() as c:
                c.execute(query, data)
                # Validate before committing so a bad update is not persisted.
                if c.rowcount != 1:
                    raise AssertionError(c.rowcount)
            conn.commit()
        return None
+
+
class HitManager(PostgresManager):
    """Reads and writes rows of the ``mtwerk_hit`` table.

    Row-count checks raise ``AssertionError`` explicitly (not via ``assert``)
    so they keep working when Python runs with ``-O``. UPDATEs are validated
    before ``commit`` is called.
    """

    def create(self, hit: Hit) -> Hit:
        """Insert ``hit``, store the generated key on ``hit.id`` and return it.

        ``hit`` must have an ``amt_hit_id`` and must not have an ``id`` yet.
        """
        assert hit.amt_hit_id is not None
        assert hit.id is None
        data = hit.to_postgres()
        query = sql.SQL(
            """
            INSERT INTO mtwerk_hit (
                amt_hit_id,
                hit_type_id,
                amt_group_id,
                status,
                review_status,
                creation_time,
                expiration,
                question_id,
                created_at,
                modified_at,
                max_assignments,
                assignment_pending_count,
                assignment_completed_count,
                assignment_available_count
            )
            VALUES (
                %(amt_hit_id)s,
                %(hit_type_id)s,
                %(amt_group_id)s,
                %(status)s,
                %(review_status)s,
                %(creation_time)s,
                %(expiration)s,
                %(question_id)s,
                %(created_at)s,
                %(modified_at)s,
                %(max_assignments)s,
                %(assignment_pending_count)s,
                %(assignment_completed_count)s,
                %(assignment_available_count)s
            )
            RETURNING id;
            """
        )

        with self.pg_config.make_connection() as conn:
            with conn.cursor() as c:
                c.execute(query, data)
                pk = c.fetchone()["id"]
            conn.commit()
        hit.id = pk
        return hit

    def update_status(self, amt_hit_id: str, hit_status: HitStatus):
        """Set the status (and ``modified_at``) of the HIT with ``amt_hit_id``.

        Raises:
            AssertionError: when the UPDATE did not affect exactly one row.
        """
        query = sql.SQL(
            """
            UPDATE mtwerk_hit
            SET status = %(status)s, modified_at = %(modified_at)s
            WHERE amt_hit_id = %(amt_hit_id)s;
            """
        )
        data = {
            "amt_hit_id": amt_hit_id,
            "status": hit_status.value,
            "modified_at": datetime.now(tz=timezone.utc),
        }
        with self.pg_config.make_connection() as conn:
            with conn.cursor() as c:
                c.execute(query, data)
                # Validate before committing so a bad update is not persisted.
                if c.rowcount != 1:
                    raise AssertionError(c.rowcount)
            conn.commit()
        return None

    def update_hit(self, hit: Hit):
        """Write the mutable fields of ``hit`` to its row, matched by ``amt_hit_id``.

        ``hit.modified_at`` is set to the current UTC time, and ``hit.id`` is
        set from the updated row.

        Raises:
            AssertionError: when the UPDATE did not affect exactly one row.
        """
        hit.modified_at = datetime.now(tz=timezone.utc)
        # fields expected to change (plus the key used in the WHERE clause):
        fields = {
            "status",
            "review_status",
            "assignment_pending_count",
            "assignment_available_count",
            "assignment_completed_count",
            "amt_hit_id",
            "modified_at",
        }
        query = sql.SQL(
            """
            UPDATE mtwerk_hit
            SET status = %(status)s, review_status = %(review_status)s,
                assignment_pending_count = %(assignment_pending_count)s,
                assignment_available_count = %(assignment_available_count)s,
                assignment_completed_count = %(assignment_completed_count)s,
                modified_at = %(modified_at)s
            WHERE amt_hit_id = %(amt_hit_id)s
            RETURNING id;
            """
        )

        data = hit.model_dump(mode="json", include=fields)
        with self.pg_config.make_connection() as conn:
            with conn.cursor() as c:
                c.execute(query, data)
                # Validate before committing so a bad update is not persisted.
                if c.rowcount != 1:
                    raise AssertionError(c.rowcount)
                pk = c.fetchone()["id"]
            conn.commit()
        hit.id = pk
        return None

    def get_from_amt_id(self, amt_hit_id: str) -> Hit:
        """Load a HIT together with its HIT type and question columns.

        The question's ``height`` and ``url`` are removed from the row and
        replaced by ``hit_question_xml``, built from ``HitQuestion(...).xml``.

        Raises:
            AssertionError: when the id matches zero or several rows.
        """
        # LIMIT 2 is enough to tell "exactly one" apart from "several".
        res = self.pg_config.execute_sql_query(
            """
            SELECT
                mh.id,
                mh.amt_hit_id,
                mh.amt_group_id,
                mh.status,
                mh.review_status,
                mh.creation_time,
                mh.expiration,
                mh.created_at,
                mh.modified_at,
                mh.assignment_available_count,
                mh.assignment_completed_count,
                mh.assignment_pending_count,
                mh.max_assignments,
                mht.amt_hit_type_id,
                mht.title,
                mht.description,
                mht.reward,
                mht.assignment_duration,
                mht.auto_approval_delay,
                mht.keywords,
                mq.id as question_id,
                mq.height,
                mq.url
            FROM mtwerk_hit mh
            JOIN mtwerk_hittype mht ON mh.hit_type_id = mht.id
            JOIN mtwerk_question mq ON mh.question_id = mq.id
            WHERE amt_hit_id = %(amt_hit_id)s
            LIMIT 2;
            """,
            params={"amt_hit_id": amt_hit_id},
        )
        if len(res) != 1:
            raise AssertionError(f"Expected 1 hit for {amt_hit_id=}, got {len(res)}")
        row = res[0]
        row["hit_question_xml"] = HitQuestion.model_validate(
            {"height": row.pop("height"), "url": row.pop("url")}
        ).xml

        return Hit.from_postgres(row)

    def get_active_count(self, hit_type_id: int):
        """Count the HITs of ``hit_type_id`` whose status is Assignable."""
        return self.pg_config.execute_sql_query(
            """
            SELECT COUNT(1) as active_count
            FROM mtwerk_hit
            WHERE status = %(status)s
              AND hit_type_id = %(hit_type_id)s;
            """,
            # Pass the enum's value, matching how update_status writes it.
            params={"status": HitStatus.Assignable.value, "hit_type_id": hit_type_id},
        )[0]["active_count"]

    def filter_active_ids(self, hit_type_id: int):
        """Return the set of ``amt_hit_id`` values stored for ``hit_type_id``.

        NOTE(review): despite the name, the query applies no status filter,
        so every HIT of the type is returned. Confirm this is intended.
        """
        res = self.pg_config.execute_sql_query(
            """
            SELECT mh.amt_hit_id
            FROM mtwerk_hit mh
            WHERE hit_type_id = %(hit_type_id)s;
            """,
            params={"hit_type_id": hit_type_id},
        )
        return {x["amt_hit_id"] for x in res}
diff --git a/jb/managers/worker.py b/jb/managers/worker.py
new file mode 100644
index 0000000..e2d7237
--- /dev/null
+++ b/jb/managers/worker.py
@@ -0,0 +1,16 @@
+from typing import List
+
+from mypy_boto3_mturk.type_defs import WorkerBlockTypeDef
+
+from jb.decorators import AMT_CLIENT
+
+
class WorkerManager:
    """Worker-related lookups against the shared ``AMT_CLIENT``."""

    @staticmethod
    def fetch_worker_blocks() -> List[WorkerBlockTypeDef]:
        """Return every worker block, collected across all result pages."""
        pages = AMT_CLIENT.get_paginator("list_worker_blocks").paginate()
        blocks: List[WorkerBlockTypeDef] = [
            block for page in pages for block in page["WorkerBlocks"]
        ]
        return blocks
diff --git a/jb/models/__init__.py b/jb/models/__init__.py
new file mode 100644
index 0000000..0aeae14
--- /dev/null
+++ b/jb/models/__init__.py
@@ -0,0 +1,40 @@
+from decimal import Decimal
+from typing import Optional
+
+from pydantic import BaseModel, Field, ConfigDict
+
+
+class HTTPHeaders(BaseModel):
+    """
+    The HTTP response headers nested inside ``ResponseMetadata`` (under its
+    ``HTTPHeaders`` alias). Each field is read from its lower-case
+    header-name alias.
+    """
+
+    # Must be exactly 36 characters long
+    request_id: str = Field(alias="x-amzn-requestid", min_length=36, max_length=36)
+    # Must be exactly 26 characters long
+    # NOTE(review): presumably pins one specific content-type value -- confirm
+    content_type: str = Field(alias="content-type", min_length=26, max_length=26)
+    # 'content-length': '1255',
+    # Kept as a str; at least two characters are required
+    content_length: str = Field(alias="content-length", min_length=2)
+    # 'Mon, 15 Jan 2024 23:40:32 GMT'
+    # Stored as the raw header string; it is not parsed into a datetime
+    date: str = Field()
+
+    connection: Optional[str] = Field(default=None)  # 'close'
+
+
+class ResponseMetadata(BaseModel):
+    """
+    The ``ResponseMetadata`` block of a response, populated through its
+    CamelCase aliases. Unknown keys are rejected (``extra="forbid"``).
+    """
+
+    model_config = ConfigDict(extra="forbid", validate_assignment=True)
+
+    # Must be exactly 36 characters long
+    request_id: str = Field(alias="RequestId", min_length=36, max_length=36)
+    # Accepted range is 200 through 599 inclusive
+    status_code: int = Field(alias="HTTPStatusCode", ge=200, le=599)
+    headers: HTTPHeaders = Field(alias="HTTPHeaders")
+    # Cannot be negative
+    retry_attempts: int = Field(alias="RetryAttempts", ge=0)
+
+
+class AMTAccount(BaseModel):
+    """
+    Balance figures for the AMT account. Keys other than the declared
+    fields are ignored (``extra="ignore"``).
+    """
+
+    model_config = ConfigDict(extra="ignore", validate_assignment=True)
+
+    # Remaining available AWS Billing usage if you have enabled AWS Billing.
+    available_balance: Decimal = Field()
+    # Defaults to zero when not supplied
+    onhold_balance: Decimal = Field(default=Decimal(0))
+
+    # --- Properties ---
+
+    @property
+    def is_healthy(self) -> bool:
+        """True when ``available_balance`` is at least 2,500."""
+        # A healthy account is one with at least $2,500 worth of
+        # credit available to it
+        return self.available_balance >= 2_500
diff --git a/jb/models/api_response.py b/jb/models/api_response.py
new file mode 100644
index 0000000..6b29e51
--- /dev/null
+++ b/jb/models/api_response.py
@@ -0,0 +1,17 @@
+from pydantic import BaseModel, ConfigDict, Field, model_validator
+
+from jb.models.assignment import Assignment
+from jb.models.hit import Hit
+
+
+class AssignmentResponse(BaseModel):
+    """
+    An ``Assignment`` together with its ``Hit``, read from the
+    ``Assignment`` and ``HIT`` aliases. Unknown keys are rejected.
+    """
+
+    model_config = ConfigDict(extra="forbid", validate_assignment=True)
+
+    assignment: Assignment = Field(alias="Assignment")
+    hit: Hit = Field(alias="HIT")
+
+    @model_validator(mode="after")
+    def check_consistent_hit_id(self) -> "AssignmentResponse":
+        """Reject the pair unless ``hit.id`` equals ``assignment.hit_id``."""
+        ids_match = self.hit.id == self.assignment.hit_id
+        if not ids_match:
+            raise ValueError("Inconsistent Hit IDs")
+        return self
diff --git a/jb/models/assignment.py b/jb/models/assignment.py
new file mode 100644
index 0000000..39ae47c
--- /dev/null
+++ b/jb/models/assignment.py
@@ -0,0 +1,388 @@
+import logging
+from datetime import datetime, timezone
+from typing import Optional, TypedDict
+from xml.etree import ElementTree
+
+from mypy_boto3_mturk.type_defs import AssignmentTypeDef
+from pydantic import (
+ BaseModel,
+ Field,
+ ConfigDict,
+ model_validator,
+ PositiveInt,
+ computed_field,
+ TypeAdapter,
+ ValidationError,
+)
+from typing_extensions import Self
+
+from jb.models.custom_types import AMTBoto3ID, AwareDatetimeISO, UUIDStr
+from jb.models.definitions import AssignmentStatus
+
+
+class AnswerDict(TypedDict):
+    """
+    The keys kept from an assignment's answer XML (see the
+    ``EXPECTED_KEYS`` filter in ``Assignment.parse_answer_xml``).
+    """
+
+    amt_assignment_id: str
+    amt_worker_id: str
+    tsid: str
+
+
+class AssignmentStub(BaseModel):
+    """
+    The identifying fields and status of an assignment, without any of the
+    submission details.
+
+    Per the original note on this class: it is meant for the moment a user
+    has accepted an assignment but hasn't submitted it yet, so that a
+    record can be created in the db at that point.
+    """
+
+    model_config = ConfigDict(extra="forbid", validate_assignment=True)
+
+    # Database ids; None until known
+    id: Optional[PositiveInt] = Field(default=None)
+    hit_id: Optional[PositiveInt] = Field(default=None)
+
+    # Amazon-side identifiers
+    amt_assignment_id: AMTBoto3ID = Field()
+    amt_hit_id: AMTBoto3ID = Field()
+    amt_worker_id: str = Field(min_length=3, max_length=50)
+
+    status: AssignmentStatus = Field()
+
+    # GRL Specific
+    created_at: AwareDatetimeISO = Field(
+        description="When this record was saved in the database",
+        default_factory=lambda: datetime.now(tz=timezone.utc),
+    )
+
+    modified_at: Optional[AwareDatetimeISO] = Field(
+        description="When this record was updated / modified in the database",
+        default_factory=lambda: datetime.now(tz=timezone.utc),
+    )
+
+    def to_postgres(self):
+        """Return the model dumped in JSON mode."""
+        return self.model_dump(mode="json")
+
+
+class Assignment(AssignmentStub):
+    """
+    The Assignment data structure represents a single assignment of a HIT to
+    a Worker. The assignment tracks the Worker's efforts to complete the HIT,
+    and contains the results for later retrieval.
+
+    The Assignment data structure is used as a response element for the
+    following operations:
+
+        GetAssignment
+        GetAssignmentsForHIT
+
+    https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_AssignmentDataStructureArticle.html
+    """
+
+    auto_approval_time: AwareDatetimeISO = Field(
+        description="If results have been submitted, AutoApprovalTime is the "
+        "date and time the results of the assignment results are "
+        "considered Approved automatically if they have not already "
+        "been explicitly approved or rejected by the Requester. "
+        "This value is derived from the auto-approval delay "
+        "specified by the Requester in the HIT. This value is "
+        "omitted from the assignment if the Worker has not yet "
+        "submitted results.",
+    )
+
+    accept_time: AwareDatetimeISO = Field(
+        description="The date and time the Worker accepted the assignment.",
+    )
+
+    submit_time: AwareDatetimeISO = Field(
+        description="The date and time the assignment was submitted. This value "
+        "is omitted from the assignment if the Worker has not yet "
+        "submitted results.",
+    )
+
+    approval_time: Optional[AwareDatetimeISO] = Field(
+        default=None,
+        description="The date and time the Requester approved the results. This "
+        "value is omitted from the assignment if the Requester has "
+        "not yet approved the results.",
+    )
+    rejection_time: Optional[AwareDatetimeISO] = Field(
+        default=None,
+        description="The date and time the Requester rejected the results.",
+    )
+
+    requester_feedback: Optional[str] = Field(
+        # Default: None. This field isn't returned with assignment data by
+        # default. To request this field, specify a response group of
+        # AssignmentFeedback. For information about response groups, see
+        # Common Parameters.
+        default=None,
+        min_length=3,
+        max_length=2_000,
+        # "description" is the pydantic keyword used by every other field
+        # here; "help_text" is not a Field argument.
+        description="The feedback string included with the call to the "
+        "ApproveAssignment operation or the RejectAssignment "
+        "operation, if the Requester approved or rejected the "
+        "assignment and specified feedback.",
+    )
+
+    # Excluded from dumps; only used to derive/validate the answer values
+    answer_xml: Optional[str] = Field(default=None, exclude=True)
+
+    # GRL Specific
+
+    tsid: Optional[UUIDStr] = Field(default=None)
+
+    # --- Validators ---
+
+    @model_validator(mode="before")
+    @classmethod
+    def set_tsid(cls, values: dict):
+        """
+        Fill in ``tsid`` from the answer XML when it was not given explicitly.
+
+        A tsid from the XML that fails ``UUIDStr`` validation is logged and
+        replaced with None rather than failing the whole model.
+        """
+        # A "before" validator sees the raw input, which is not guaranteed
+        # to be a dict; anything else is handed on untouched.
+        if not isinstance(values, dict):
+            return values
+
+        if values.get("tsid") is None and (answer_xml := values.get("answer_xml")):
+            tsid = cls.parse_answer_xml(answer_xml).get("tsid")
+            try:
+                values["tsid"] = TypeAdapter(UUIDStr).validate_python(tsid)
+            except ValidationError as e:
+                # Don't break the model validation if a baddie messes with the tsid in the answer.
+                logging.warning(e)
+                values["tsid"] = None
+        return values
+
+    @model_validator(mode="after")
+    def check_time_sequences(self) -> Self:
+        """Reject an assignment whose accept time is after its submit time."""
+        if self.accept_time > self.submit_time:
+            raise ValueError("Assignment times invalid")
+
+        return self
+
+    @model_validator(mode="after")
+    def check_answers_alignment(self) -> Self:
+        """
+        Check that the ids inside the answer XML agree with the model fields.
+
+        Nothing is checked when there is no ``answer_xml``. The tsid is only
+        compared when both the field and the answer carry a value.
+
+        :raises ValueError: on a missing or mismatching worker id or
+            assignment id, or on a mismatching tsid
+        """
+        # The property re-parses the XML on every access, so read it once.
+        answers = self.answers_dict
+        if answers is None:
+            return self
+
+        # parse_answer_xml does not guarantee any key is present, so use
+        # .get(): a missing id becomes a ValueError (reported by pydantic
+        # as a validation error) instead of an uncaught KeyError.
+        if self.amt_worker_id != answers.get("amt_worker_id"):
+            raise ValueError("Assignment answer invalid worker_id")
+        if self.amt_assignment_id != answers.get("amt_assignment_id"):
+            raise ValueError("Assignment answer invalid amt_assignment_id")
+
+        answer_tsid = answers.get("tsid")
+        if self.tsid and answer_tsid and self.tsid != answer_tsid:
+            raise ValueError("Assignment answer invalid tsid")
+        return self
+
+    # --- Properties ---
+
+    @property
+    def answers_dict(self) -> Optional[AnswerDict]:
+        """The parsed answer values, or None when there is no ``answer_xml``."""
+        # See https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_AssignmentDataStructureArticle.html
+        # https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMechanicalTurkRequester/Concepts_NotificationsArticle.html
+        if self.answer_xml is None:
+            return None
+
+        return self.parse_answer_xml(self.answer_xml)
+
+    @staticmethod
+    def parse_answer_xml(answer_xml: str):
+        """
+        Extract the expected answers from a QuestionFormAnswers document.
+
+        :param answer_xml: the XML string to parse
+        :return: dict holding whichever of ``amt_assignment_id``,
+            ``amt_worker_id`` and ``tsid`` appear in the document; an answer
+            without text is stored as ""
+        """
+        root = ElementTree.fromstring(answer_xml)
+        ns = {
+            "mt": "http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2005-10-01/QuestionFormAnswers.xsd"
+        }
+        EXPECTED_KEYS = {"amt_assignment_id", "amt_worker_id", "tsid"}
+        # We don't want validation to fail if a baddie inserts or changes url
+        # params, which will result in new or missing keys. Amazon generates the xml
+        # so that should always be correct
+        res = {}
+
+        for a in root.findall("mt:Answer", ns):
+            # findtext() yields None when the child element is absent, so an
+            # incomplete <Answer> is skipped instead of raising AttributeError.
+            name = a.findtext("mt:QuestionIdentifier", default=None, namespaces=ns)
+            if name not in EXPECTED_KEYS:
+                continue
+            res[name] = a.findtext("mt:FreeText", default=None, namespaces=ns) or ""
+
+        return res
+
+    @classmethod
+    def from_amt_get_assignment(cls, data: AssignmentTypeDef) -> Self:
+        """
+        Build an Assignment from an AMT assignment dict.
+
+        Every timestamp is converted to UTC; ``ApprovalTime``,
+        ``RejectionTime`` and ``RequesterFeedback`` are optional in ``data``.
+        """
+
+        def as_utc(key: str):
+            # Optional timestamps: absent (or falsy) values become None
+            return data[key].astimezone(tz=timezone.utc) if data.get(key) else None
+
+        return cls(
+            amt_assignment_id=data["AssignmentId"],
+            amt_hit_id=data["HITId"],
+            amt_worker_id=data["WorkerId"],
+            status=AssignmentStatus[data["AssignmentStatus"]],
+            auto_approval_time=data["AutoApprovalTime"].astimezone(tz=timezone.utc),
+            accept_time=data["AcceptTime"].astimezone(tz=timezone.utc),
+            submit_time=data["SubmitTime"].astimezone(tz=timezone.utc),
+            approval_time=as_utc("ApprovalTime"),
+            rejection_time=as_utc("RejectionTime"),
+            answer_xml=data["Answer"],
+            requester_feedback=data.get("RequesterFeedback"),
+        )
+
+    def to_stub(self) -> AssignmentStub:
+        """Return an ``AssignmentStub`` holding only the stub's own fields."""
+        return AssignmentStub.model_validate(
+            self.model_dump(include=set(AssignmentStub.model_fields.keys()))
+        )
+
+ # --- Methods ---
+ #
+ # def refresh(self) -> Self:
+ # from tasks.mtwerk.managers.assignment import AssignmentManager
+ # return AssignmentManager.fetch_by_id(self)
+ #
+ # def reject(self, msg: str = REJECT_MESSAGE_UNKNOWN_ASSIGNMENT):
+ # """
+ # Save in the database that the Assignment was rejected, and also
+ # Report to Amazon Mechanical Turk that this Assignment should be
+ # rejected
+ #
+ # TODO: can this only occur when the Assignment is in a certain status?
+ #
+ # :return:
+ # """
+ # now = datetime.now(tz=None)
+ #
+ # MYSQLC.execute_sql_query("""
+ # UPDATE `amt-jb`.`mtwerk_assignment`
+ # SET submit_time = %s, rejection_time = %s, status = %s,
+ # requester_feedback = %s
+ # WHERE assignment_id = %s""",
+ # params=[
+ # now, now,
+ # AssignmentStatus.Rejected.value,
+ # msg, self.id],
+ # commit=True)
+ #
+ # CLIENT.reject_assignment(
+ # AssignmentId=self.id,
+ # RequesterFeedback=msg)
+ #
+ # def approve(self, msg: str = "Approved."):
+ # """
+ # Report to Amazon Mechanical Turk that this Assignment should be
+ # approved
+ #
+ # TODO: can this only occur when the Assignment is in a certain status?
+ #
+ # :return:
+ # """
+ # CLIENT.approve_assignment(
+ # AssignmentId=self.id,
+ # RequesterFeedback=msg)
+ #
+ # def submit_and_complete_request(self) -> Optional[str]:
+ # """
+ # This approves the Assignment and issues the Reward
+ # amount (typically $.05)
+ #
+ # :return:
+ # """
+ # worker = self.worker
+ # amount = DecimalUSDDollars(self.hit.reward)
+ #
+ # # If successful, returns the cashout id, otherwise, returns None
+ # cashout: Optional[dict] = worker.cashout_request(
+ # amount=amount,
+ # cashout_method_id=AMT_ASSIGNMENT_CASHOUT_METHOD)
+ #
+ # if cashout is None or cashout.get('status') != PayoutStatus.PENDING:
+ # return None
+ #
+ # cashout_id: str = cashout[id]
+ #
+ # approval: Optional[dict] = Bonus.manage_pending_cashout(
+ # cashout_id=cashout_id,
+ # action=PayoutStatus.APPROVED)
+ #
+ # if approval is None or approval['status'] != PayoutStatus.APPROVED:
+ # return None
+ #
+ # completion: Optional[dict] = Bonus.manage_pending_cashout(
+ # cashout_id=cashout_id,
+ # action=PayoutStatus.COMPLETE)
+ #
+ # if completion is None or completion['status'] != PayoutStatus.COMPLETE:
+ # return None
+ #
+ # return cashout_id
+ #
+ # # --- ORM ---
+ #
+ # def model_dump_mysql(self, *args, **kwargs) -> dict:
+ # d = self.model_dump(mode='json', *args, **kwargs)
+ #
+ # d['auto_approval_time'] = self.auto_approval_time.replace(tzinfo=None)
+ # d['accept_time'] = self.accept_time.replace(tzinfo=None)
+ # d['submit_time'] = self.submit_time.replace(tzinfo=None)
+ #
+ # if self.approval_time:
+ # d['approval_time'] = self.approval_time.replace(tzinfo=None)
+ #
+ # if self.rejection_time:
+ # d['rejection_time'] = self.rejection_time.replace(tzinfo=None)
+ #
+ # # created is automatically added by the database
+ # d['created'] = self.created.replace(tzinfo=None)
+ #
+ # if self.modified:
+ # d['modified'] = self.modified.replace(tzinfo=None)
+ #
+ # d['tsid'] = self.answers.get('tsid')
+ #
+ # return d
+ #
+ # def save(self) -> bool:
+ # """
+ # Either INSERTS or UPDATES the Assignment instance to a Mysql
+ # record.
+ # """
+ #
+ # # We're modifying the record, so set the time to right now!
+ # self.modified = datetime.now(tz=timezone.utc)
+ #
+ # query = """
+ # INSERT `amt-jb`.`mtwerk_assignment` (
+ # id, worker_id, hit_id, status,
+ # auto_approval_time, accept_time, submit_time,
+ # approval_time, rejection_time,
+ # requester_feedback, created, modified, tsid
+ # )
+ # VALUES (
+ # %(id)s, %(worker_id)s, %(hit_id)s, %(status)s,
+ # %(auto_approval_time)s, %(accept_time)s, %(submit_time)s,
+ # %(approval_time)s, %(rejection_time)s,
+ # %(requester_feedback)s, %(created)s, %(modified)s, %(tsid)s
+ # )
+ # ON DUPLICATE KEY UPDATE
+ # worker_id = %(worker_id)s,
+ # hit_id = %(hit_id)s,
+ # status = %(status)s,
+ #
+ # auto_approval_time = %(auto_approval_time)s,
+ # accept_time = %(accept_time)s,
+ # submit_time = %(submit_time)s,
+ #
+ # approval_time = %(approval_time)s,
+ # rejection_time = %(rejection_time)s,
+ #
+ # requester_feedback = %(requester_feedback)s,
+ # -- Not going to update created just incase it changed
+ # -- in pydantic for some reason
+ # modified = %(modified)s,
+ # tsid = %(tsid)s
+ # """
+ #
+ # try:
+ # MYSQLC.execute_sql_query(query, params=self.model_dump_mysql(), commit=True)
+ # return True
+ #
+ # except Exception as e:
+ # return False
+ #
+
+
+# REJECT_MESSAGE_UNKNOWN_ASSIGNMENT = "Unknown assignment"
diff --git a/jb/models/bonus.py b/jb/models/bonus.py
new file mode 100644
index 0000000..564a32d
--- /dev/null
+++ b/jb/models/bonus.py
@@ -0,0 +1,48 @@
+from typing import Optional, Dict
+
+from pydantic import BaseModel, Field, ConfigDict, PositiveInt
+from typing_extensions import Self
+
+from jb.models.currency import USDCent
+from jb.models.custom_types import AMTBoto3ID, AwareDatetimeISO, UUIDStr
+from jb.models.definitions import PayoutStatus
+
+
+class Bonus(BaseModel):
+    """
+    A Bonus is created (in our DB) ONLY associated with an APPROVED
+    thl-payout-event, AFTER the bonus has actually been sent to
+    the worker.
+    We have the payout_event uuid as the unique request token to make
+    sure it only gets sent once (param in the boto request).
+    """
+
+    model_config = ConfigDict(
+        extra="forbid",
+        validate_assignment=True,
+    )
+    # Database ids; None until known
+    id: Optional[PositiveInt] = Field(default=None)
+    assignment_id: Optional[PositiveInt] = Field(default=None)
+
+    amt_worker_id: str = Field(min_length=3, max_length=50)
+    amt_assignment_id: AMTBoto3ID = Field()
+
+    # Held in cents on the model; converted to dollars for postgres
+    amount: USDCent = Field()
+    reason: str = Field(min_length=5)
+    grant_time: AwareDatetimeISO = Field()
+
+    # -- GRL Specific ---
+    payout_event_id: UUIDStr = Field()
+    # created: Optional[AwareDatetimeISO] = Field(default=None)
+
+    def to_postgres(self):
+        """Dump in JSON mode, with ``amount`` replaced by its dollar Decimal."""
+        d = self.model_dump(mode="json")
+        d["amount"] = self.amount.to_usd()
+        return d
+
+    @classmethod
+    def from_postgres(cls, data: Dict) -> Self:
+        """
+        Build a Bonus from a database row.
+
+        ``data["amount"]`` is in dollars and is converted to ``USDCent``;
+        keys that are not model fields are dropped.
+
+        :raises KeyError: if ``data`` has no ``amount``
+        """
+        fields = set(cls.model_fields.keys())
+        # Work on a filtered copy so the caller's row is not modified.
+        row = {k: v for k, v in data.items() if k in fields}
+        row["amount"] = USDCent(round(data["amount"] * 100))
+        return cls.model_validate(row)
diff --git a/jb/models/currency.py b/jb/models/currency.py
new file mode 100644
index 0000000..3094e2a
--- /dev/null
+++ b/jb/models/currency.py
@@ -0,0 +1,70 @@
+import warnings
+from decimal import Decimal
+from typing import Any
+
+from pydantic import GetCoreSchemaHandler, NonNegativeInt
+from pydantic_core import CoreSchema, core_schema
+
+
+class USDCent(int):
+    """
+    A non-negative whole number of US cents.
+
+    Addition, subtraction and multiplication require another ``USDCent``
+    and return a ``USDCent``; a negative result raises ``ValueError``.
+    True division is not allowed.
+    """
+
+    def __new__(cls, value, *args, **kwargs):
+        """
+        :param value: the amount in cents; a float or Decimal is accepted
+            with a warning
+        :raises ValueError: if ``value`` is negative
+        """
+        if isinstance(value, float):
+            warnings.warn(
+                "USDCent init with a float. Rounding behavior may be unexpected"
+            )
+
+        if isinstance(value, Decimal):
+            warnings.warn(
+                "USDCent init with a Decimal. Rounding behavior may be unexpected"
+            )
+
+        if value < 0:
+            raise ValueError("USDCent not be less than zero")
+
+        # Zero-argument super() always resolves relative to USDCent;
+        # super(cls, cls) would resolve back to this method for any
+        # subclass and recurse without end.
+        return super().__new__(cls, value)
+
+    def __add__(self, other):
+        assert isinstance(other, USDCent)
+        return self.__class__(super().__add__(other))
+
+    def __sub__(self, other):
+        assert isinstance(other, USDCent)
+        # A negative difference is rejected by __new__
+        return self.__class__(super().__sub__(other))
+
+    def __mul__(self, other):
+        assert isinstance(other, USDCent)
+        return self.__class__(super().__mul__(other))
+
+    def __abs__(self):
+        return self.__class__(super().__abs__())
+
+    def __truediv__(self, other):
+        raise ValueError("Division not allowed for USDCent")
+
+    def __str__(self):
+        return "%d" % int(self)
+
+    def __repr__(self):
+        return "USDCent(%d)" % int(self)
+
+    @classmethod
+    def __get_pydantic_core_schema__(
+        cls, source_type: Any, handler: GetCoreSchemaHandler
+    ) -> CoreSchema:
+        """
+        Validate as a non-negative int, then wrap the result in this class.
+
+        https://docs.pydantic.dev/latest/concepts/types/#customizing-validation-with-__get_pydantic_core_schema__
+        """
+        return core_schema.no_info_after_validator_function(
+            cls, handler(NonNegativeInt)
+        )
+
+    def to_usd(self) -> Decimal:
+        """Return the amount in dollars as a Decimal with two decimal places."""
+        # Divide in Decimal rather than float so large amounts keep their cents.
+        return (Decimal(int(self)) / 100).quantize(Decimal(".01"))
+
+    def to_usd_str(self) -> str:
+        """Return the amount formatted like ``$1,234.56``."""
+        # Decimal supports this format spec directly; no float round-trip.
+        return "${:,.2f}".format(self.to_usd())
diff --git a/jb/models/custom_types.py b/jb/models/custom_types.py
new file mode 100644
index 0000000..70bc5c1
--- /dev/null
+++ b/jb/models/custom_types.py
@@ -0,0 +1,113 @@
+import re
+from datetime import datetime, timezone
+from typing import Any, Optional
+from uuid import UUID
+
+from pydantic import (
+ AwareDatetime,
+ StringConstraints,
+ TypeAdapter,
+ HttpUrl,
+)
+from pydantic.functional_serializers import PlainSerializer
+from pydantic.functional_validators import AfterValidator, BeforeValidator
+from pydantic.networks import UrlConstraints
+from pydantic_core import Url
+from typing_extensions import Annotated
+
+
+def convert_datetime_to_iso_8601_with_z_suffix(dt: datetime) -> str:
+    """
+    Format ``dt`` as ``YYYY-MM-DDTHH:MM:SS.ffffffZ``.
+
+    The microseconds are always written: the default serialization treats
+    them as optional, and deserialization then fails for a datetime that
+    had none. The trailing "Z" is a literal; no timezone conversion is done.
+    """
+    iso_z_format = "%Y-%m-%dT%H:%M:%S.%fZ"
+    return dt.strftime(iso_z_format)
+
+
+def convert_str_dt(v: Any) -> Optional[AwareDatetime]:
+    """
+    Parse a str dumped in the ISO 8601 "Z"-suffix format into a UTC datetime.
+
+    Anything that is not exactly a ``str`` (None included) is returned
+    unchanged. A str must end with "Z", contain "T" and carry fractional
+    seconds, otherwise the assertion or ``strptime`` fails.
+    """
+    # By default, pydantic is unable to handle tz-aware isoformat str.
+    if type(v) is not str:
+        return v
+
+    assert v.endswith("Z") and "T" in v, "invalid format"
+    parsed = datetime.strptime(v, "%Y-%m-%dT%H:%M:%S.%fZ")
+    return parsed.replace(tzinfo=timezone.utc)
+
+
+def assert_utc(v: AwareDatetime) -> AwareDatetime:
+    """
+    Return ``v`` unchanged, asserting first that a datetime's tzinfo equals
+    ``timezone.utc``. Values that are not datetimes are not checked.
+    """
+    if not isinstance(v, datetime):
+        return v
+    assert v.tzinfo == timezone.utc, "Timezone is not UTC"
+    return v
+
+
+# Our custom AwareDatetime that correctly serializes and deserializes
+# to an ISO8601 str with timezone.
+# Order of operations: convert_str_dt parses a "Z"-suffixed str first,
+# assert_utc then requires the result to be UTC, and on JSON dumps a
+# non-None value is written back out in the same "%f" + "Z" format.
+AwareDatetimeISO = Annotated[
+    AwareDatetime,
+    BeforeValidator(convert_str_dt),
+    AfterValidator(assert_utc),
+    PlainSerializer(
+        lambda x: x.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
+        when_used="json-unless-none",
+    ),
+]
+
+# ISO 3166-1 alpha-2 (two-letter codes, lowercase)
+# "Like" b/c it matches the format, but we're not explicitly checking
+# it is one of our supported values. See models.thl.locales for that.
+CountryISOLike = Annotated[
+    str, StringConstraints(max_length=2, min_length=2, pattern=r"^[a-z]{2}$")
+]
+# 3-char ISO 639-2/B, lowercase
+# Format check only, same as CountryISOLike above.
+LanguageISOLike = Annotated[
+    str, StringConstraints(max_length=3, min_length=3, pattern=r"^[a-z]{3}$")
+]
+
+
+def check_valid_uuid(v: str) -> str:
+    """
+    Return ``v`` if it is exactly the ``.hex`` form of a UUID (32 lowercase
+    hex characters, no dashes).
+
+    :raises ValueError: ("Invalid UUID") for any other input, including
+        values ``UUID()`` cannot parse at all
+    """
+    try:
+        is_canonical_hex = UUID(v).hex == v
+    except Exception:
+        # Whatever UUID() raises for this input, report it the same way.
+        is_canonical_hex = False
+
+    # An explicit comparison instead of ``assert``: assert statements are
+    # removed under ``python -O``, which would disable this check entirely.
+    if not is_canonical_hex:
+        raise ValueError("Invalid UUID")
+    return v
+
+
+# Our custom field that stores a UUID4 as the .hex string representation
+# NOTE(review): check_valid_uuid only compares against UUID(v).hex; it does
+# not check the UUID version -- confirm whether "UUID4" should be enforced.
+UUIDStr = Annotated[
+    str,
+    StringConstraints(min_length=32, max_length=32),
+    AfterValidator(check_valid_uuid),
+]
+# Accepts the non-hex representation and coerces
+# (the before-validator parses the value as a UUID and replaces it with
+# its .hex before the length constraint and check_valid_uuid are applied)
+UUIDStrCoerce = Annotated[
+    str,
+    StringConstraints(min_length=32, max_length=32),
+    BeforeValidator(lambda value: TypeAdapter(UUID).validate_python(value).hex),
+    AfterValidator(check_valid_uuid),
+]
+
+# Same thing as UUIDStr with HttpUrl field. It is confusing that this
+# is not a str https://github.com/pydantic/pydantic/discussions/6395
+# The value is validated as an HttpUrl and then stored as its str() form.
+HttpUrlStr = Annotated[
+    str,
+    BeforeValidator(lambda value: str(TypeAdapter(HttpUrl).validate_python(value))),
+]
+
+# A Url restricted to the https scheme and at most 2083 characters
+HttpsUrl = Annotated[Url, UrlConstraints(max_length=2083, allowed_schemes=["https"])]
+# Validated as an HttpsUrl and then stored as its str() form
+HttpsUrlStr = Annotated[
+    str,
+    BeforeValidator(lambda value: str(TypeAdapter(HttpsUrl).validate_python(value))),
+]
+
+
+def check_valid_amt_boto3_id(v: str) -> str:
+    """
+    Return ``v`` if it consists of exactly 20 or exactly 30 characters from
+    ``A-Z`` / ``0-9``.
+
+    :raises ValueError: ("Invalid AMT Boto3 ID") otherwise
+    """
+    # Test ids from amazon have 20 chars
+    id_pattern = r"[A-Z0-9]{20}|[A-Z0-9]{30}"
+    if re.fullmatch(id_pattern, v) is None:
+        raise ValueError("Invalid AMT Boto3 ID")
+    return v
+
+
+# An AMT identifier string. The length constraint admits 20-30 characters,
+# and check_valid_amt_boto3_id then narrows that to exactly 20 or exactly
+# 30 characters of A-Z / 0-9.
+AMTBoto3ID = Annotated[
+    str,
+    StringConstraints(min_length=20, max_length=30),
+    AfterValidator(check_valid_amt_boto3_id),
+]
diff --git a/jb/models/definitions.py b/jb/models/definitions.py
new file mode 100644
index 0000000..a3d27ba
--- /dev/null
+++ b/jb/models/definitions.py
@@ -0,0 +1,90 @@
+from enum import IntEnum, StrEnum
+
+
+class AssignmentStatus(IntEnum):
+    """
+    Integer codes for the status of an assignment: the statuses reported
+    by boto3.mturk plus the ones GRL adds on top.
+    """
+
+    # boto3.mturk specific
+    Submitted = 0  # same thing as Reviewable
+    Approved = 1
+    Rejected = 2
+
+    # GRL specific
+    Accepted = 3
+    PreviewState = 4
+    # Invalid = 5
+    # NotExist = 6
+
+
+class HitStatus(IntEnum):
+    """
+    Integer codes for the status of a HIT: the official statuses plus one
+    GRL-specific value.
+
+    https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_HITDataStructureArticle.html
+    """
+
+    # Official boto3.mturk
+    Assignable = 0
+    Unassignable = 1
+    Reviewable = 2
+    Reviewing = 3
+    Disposed = 4
+
+    # GRL Specific
+    NotExist = 5
+
+
+class HitReviewStatus(IntEnum):
+    """Integer codes for the review status of a HIT."""
+
+    NotReviewed = 0
+    MarkedForReview = 1
+    ReviewedAppropriate = 2
+    ReviewedInappropriate = 3
+
+
+class PayoutStatus(StrEnum):
+    """
+    These are GRL's payout statuses.
+
+    Each member's value is its own name as a string.
+    """
+
+    # The user has requested a payout. The money is taken from their
+    # wallet. A PENDING request can either be APPROVED, REJECTED, or
+    # CANCELLED. We can also implicitly skip the APPROVED step and go
+    # straight to COMPLETE or FAILED.
+    PENDING = "PENDING"
+    # The request is approved (by us or automatically). Once approved,
+    # it can be FAILED or COMPLETE.
+    APPROVED = "APPROVED"
+    # The request is rejected. The user loses the money.
+    REJECTED = "REJECTED"
+    # The user requests to cancel the request, the money goes back into their wallet.
+    CANCELLED = "CANCELLED"
+    # The payment was approved, but failed within external payment provider.
+    # This is an "error" state, as the money won't have moved anywhere. A
+    # FAILED payment can be tried again and be COMPLETE.
+    FAILED = "FAILED"
+    # The payment was sent successfully and (usually) a fee was charged
+    # to us for it.
+    COMPLETE = "COMPLETE"
+    # Not supported: REFUNDED. I'm not sure if this is possible or
+    # if we'd want to allow it.
+
+
+class ReportValue(IntEnum):
+    """
+    The reason a user reported a task.
+
+    Stored as an integer code; the comment above each member describes
+    the situations it covers.
+    """
+
+    # Used to indicate the user exited the task without giving feedback
+    REASON_UNKNOWN = 0
+    # Task is in the wrong language/country, unanswerable question, won't proceed to
+    # next question, loading forever, error message
+    TECHNICAL_ERROR = 1
+    # Task ended (completed or failed, and showed the user some dialog
+    # indicating the task was over), but failed to redirect
+    NO_REDIRECT = 2
+    # Asked for full name, home address, identity on another site, cc#
+    PRIVACY_INVASION = 3
+    # Asked about children, employer, medical issues, drug use, STDs, etc.
+    UNCOMFORTABLE_TOPICS = 4
+    # Asked to install software, signup/login to external site, access webcam,
+    # promise to pay using external site, etc.
+    ASKED_FOR_NOT_ALLOWED_ACTION = 5
+    # Task doesn't work well on a mobile device
+    BAD_ON_MOBILE = 6
+    # Too long, too boring, confusing, complicated, too many
+    # open-ended/free-response questions
+    DIDNT_LIKE = 7
diff --git a/jb/models/errors.py b/jb/models/errors.py
new file mode 100644
index 0000000..94f5fbb
--- /dev/null
+++ b/jb/models/errors.py
@@ -0,0 +1,80 @@
+import re
+from enum import Enum
+
+from pydantic import BaseModel, Field, ConfigDict, model_validator
+
+from jb.models import ResponseMetadata
+
+
+class BotoRequestErrorOperation(str, Enum):
+    """The operation names accepted in ``BotoRequestError.operation_name``."""
+
+    GET_ASSIGNMENT = "GetAssignment"
+    GET_HIT = "GetHIT"
+
+
+class TurkErrorCode(str, Enum):
+    """The ``TurkErrorCode`` strings this package recognises."""
+
+    # Unclear: maybe when it's new?
+    HIT_NOT_EXIST = "AWS.MechanicalTurk.HITDoesNotExist"
+
+    # This seems to be for really old Assignments
+    # Also maybe when it's only a Preview?
+    # Happens 2+ years back, and also from past 24hrs
+    INVALID_ASSIGNEMENT_STATE = "AWS.MechanicalTurk.InvalidAssignmentState"
+    # Correctly spelled alias of the member above. The misspelled name stays
+    # the canonical one, so existing references and ``.name`` are unchanged
+    # and iteration still yields three members.
+    INVALID_ASSIGNMENT_STATE = "AWS.MechanicalTurk.InvalidAssignmentState"
+
+    # If random assignmentId is used
+    ASSIGNMENT_NOT_EXIST = "AWS.MechanicalTurk.AssignmentDoesNotExist"
+
+
+class BotoRequestErrorResponseErrorCodes(str, Enum):
+    """The values accepted for ``BotoRequestErrorResponseError.code``."""
+
+    REQUEST_ERROR = "RequestError"
+
+
+class BotoRequestErrorResponseError(BaseModel):
+    """
+    The ``Error`` entry of an error response, read from the ``Message``
+    and ``Code`` aliases. Unknown keys are rejected.
+    """
+
+    model_config = ConfigDict(extra="forbid", validate_assignment=True)
+
+    message: str = Field(alias="Message")
+    code: BotoRequestErrorResponseErrorCodes = Field(alias="Code")
+
+
+class BotoRequestErrorResponse(BaseModel):
+    """
+    An error response body, read from the ``Error``, ``ResponseMetadata``,
+    ``Message`` and ``TurkErrorCode`` aliases. Unknown keys are rejected.
+    """
+
+    model_config = ConfigDict(extra="forbid", validate_assignment=True)
+
+    error: BotoRequestErrorResponseError = Field(alias="Error")
+    response_metadata: ResponseMetadata = Field(alias="ResponseMetadata")
+    # At least 50 characters long
+    message: str = Field(alias="Message", min_length=50)
+    error_code: TurkErrorCode = Field(alias="TurkErrorCode")
+
+    @model_validator(mode="after")
+    def check_consistent_hit_id(self) -> "BotoRequestErrorResponse":
+        """
+        Check that ``message`` starts with the text expected for ``error_code``.
+
+        Despite the name, no hit ids are compared here; only the message
+        format is checked.
+
+        :raises ValueError: when the message does not match
+        """
+        # error code -> (pattern the message must start with, failure text)
+        expectations = {
+            TurkErrorCode.HIT_NOT_EXIST: (
+                r"Hit [A-Z0-9]{30} does not exist. \(\d{13}\)",
+                "Unknown message for TurkErrorCode.HIT_NOT_EXIST",
+            ),
+            TurkErrorCode.INVALID_ASSIGNEMENT_STATE: (
+                r"This operation can be called with a status of: Reviewable,Approved,Rejected \(\d{13}\)",
+                "Unknown message for TurkErrorCode.INVALID_ASSIGNEMENT_STATE",
+            ),
+            TurkErrorCode.ASSIGNMENT_NOT_EXIST: (
+                r"Assignment [A-Z0-9]{30} does not exist. \(\d{13}\)",
+                "Unknown message for TurkErrorCode.ASSIGNMENT_NOT_EXIST",
+            ),
+        }
+
+        expected = expectations.get(self.error_code)
+        if expected is not None:
+            pattern, failure_text = expected
+            if re.match(pattern, self.message) is None:
+                raise ValueError(failure_text)
+
+        return self
+
+
+class BotoRequestError(BaseModel):
+    """
+    An error response paired with the name of the operation it belongs
+    to. Unknown keys are rejected.
+    """
+
+    model_config = ConfigDict(extra="forbid", validate_assignment=True)
+
+    response: BotoRequestErrorResponse = Field()
+    operation_name: BotoRequestErrorOperation = Field()
diff --git a/jb/models/event.py b/jb/models/event.py
new file mode 100644
index 0000000..c357772
--- /dev/null
+++ b/jb/models/event.py
@@ -0,0 +1,38 @@
+from typing import Literal, Dict
+
+from mypy_boto3_mturk.literals import EventTypeType
+from pydantic import BaseModel, Field
+
+from jb.models.custom_types import AwareDatetimeISO, AMTBoto3ID
+
+
+class MTurkEvent(BaseModel):
+    """
+    What AWS SNS will POST to our mturk_notifications endpoint (inside the request body)
+    """
+
+    # ``examples=[...]`` is the pydantic v2 Field argument; a bare
+    # ``example=`` keyword is a deprecated extra kwarg.
+    event_type: EventTypeType = Field(examples=["AssignmentSubmitted"])
+    # The example is the raw form without fractional seconds; from_sns()
+    # passes it through fix_mturk_timestamp() before validation.
+    event_timestamp: AwareDatetimeISO = Field(examples=["2025-10-16T18:45:51Z"])
+    amt_hit_id: AMTBoto3ID = Field(examples=["12345678901234567890"])
+    amt_assignment_id: str = Field(
+        max_length=64, examples=["1234567890123456789012345678901234567890"]
+    )
+    amt_hit_type_id: AMTBoto3ID = Field(examples=["09876543210987654321"])
+
+    @classmethod
+    def from_sns(cls, data: Dict):
+        """
+        Build an event from the CamelCase keys of a notification dict.
+
+        :raises KeyError: if any of ``EventType``, ``EventTimestamp``,
+            ``HITId``, ``AssignmentId`` or ``HITTypeId`` is missing
+        """
+        return cls.model_validate(
+            {
+                "event_type": data["EventType"],
+                "event_timestamp": cls.fix_mturk_timestamp(data["EventTimestamp"]),
+                "amt_hit_id": data["HITId"],
+                "amt_assignment_id": data["AssignmentId"],
+                "amt_hit_type_id": data["HITTypeId"],
+            }
+        )
+
+    @staticmethod
+    def fix_mturk_timestamp(ts: str) -> str:
+        """
+        Insert ``.000`` before the trailing "Z" of a timestamp that has no
+        fractional part; any other string is returned unchanged.
+        """
+        if ts.endswith("Z") and "." not in ts:
+            ts = ts[:-1] + ".000Z"
+        return ts
diff --git a/jb/models/hit.py b/jb/models/hit.py
new file mode 100644
index 0000000..c3734fa
--- /dev/null
+++ b/jb/models/hit.py
@@ -0,0 +1,251 @@
+from datetime import datetime, timezone, timedelta
+from typing import Optional, List, Dict
+from uuid import uuid4
+from xml.etree import ElementTree
+
+from mypy_boto3_mturk.type_defs import HITTypeDef
+from pydantic import (
+ BaseModel,
+ Field,
+ PositiveInt,
+ ConfigDict,
+ NonNegativeInt,
+)
+from typing_extensions import Self
+
+from jb.models.currency import USDCent
+from jb.models.custom_types import AMTBoto3ID, HttpsUrlStr, AwareDatetimeISO
+from jb.models.definitions import HitStatus, HitReviewStatus
+
+
+class HitQuestion(BaseModel):
+ id: Optional[PositiveInt] = Field(default=None)
+
+ url: HttpsUrlStr = Field()
+ height: PositiveInt = Field(default=1_200, ge=100, le=4_000)
+
+ # --- Properties ---
+
+ def to_postgres(self):
+ return self.model_dump(mode="json")
+
+ @property
+ def xml(self) -> str:
+ return f"""<?xml version="1.0" encoding="UTF-8"?>
+ <ExternalQuestion xmlns="http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2006-07-14/ExternalQuestion.xsd">
+ <ExternalURL>{str(self.url)}</ExternalURL>
+ <FrameHeight>{self.height}</FrameHeight>
+ </ExternalQuestion>"""
+
+
+class HitTypeCommon(BaseModel):
+ """
+ Fields on both the HitType and Hit
+ """
+
+ model_config = ConfigDict(
+ extra="forbid", validate_assignment=True, ser_json_timedelta="float"
+ )
+
+ title: str = Field(
+ min_length=3,
+ max_length=200,
+ description="The HIT post title that appears in the listing view",
+ )
+ description: str = Field(
+ min_length=3,
+ max_length=2_000,
+ description="The expand more about textarea, has a max of 2000 characters",
+ )
+ reward: USDCent = Field(
+ description="The amount of money the Requester will pay a Worker for successfully completing the HIT."
+ )
+
+ assignment_duration: timedelta = Field(
+ default=timedelta(minutes=90),
+ description="The amount of time, in seconds, that a Worker has to complete "
+ "the HIT after accepting it.",
+ )
+ auto_approval_delay: timedelta = Field(
+ default=timedelta(days=7),
+ description="The number of seconds after an assignment for the HIT has "
+ "been submitted, after which the assignment is considered "
+ "Approved automatically unless the Requester explicitly "
+ "rejects it.",
+ )
+ keywords: str = Field(min_length=3, max_length=999)
+
+
+class HitType(HitTypeCommon):
+ """
+ https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mturk/client/create_hit_type.html
+ https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_CreateHITTypeOperation.html
+ """
+
+ id: Optional[PositiveInt] = Field(default=None)
+ amt_hit_type_id: Optional[AMTBoto3ID] = Field(default=None)
+
+ # --- GRL Specific ---
+ min_active: NonNegativeInt = Field(default=0, le=100_000)
+
+ def to_api_request_body(self):
+ return dict(
+ AutoApprovalDelayInSeconds=round(self.auto_approval_delay.total_seconds()),
+ AssignmentDurationInSeconds=round(self.assignment_duration.total_seconds()),
+ Reward=str(self.reward.to_usd()),
+ Title=self.title,
+ Keywords=self.keywords,
+ Description=self.description,
+ )
+
+ def to_postgres(self):
+ d = self.model_dump(mode="json")
+ d["reward"] = self.reward.to_usd()
+ return d
+
+ @classmethod
+ def from_postgres(cls, data: Dict) -> Self:
+ data["reward"] = USDCent(round(data["reward"] * 100))
+ return cls.model_validate(data)
+
+ def generate_hit_amt_request(self, question: HitQuestion):
+ d = dict()
+ d["HITTypeId"] = self.amt_hit_type_id
+ d["MaxAssignments"] = 1
+ d["LifetimeInSeconds"] = round(timedelta(days=14).total_seconds())
+ d["Question"] = question.xml
+ d["UniqueRequestToken"] = uuid4().hex
+ return d
+
+
+class Hit(HitTypeCommon):
+ model_config = ConfigDict(
+ extra="forbid",
+ validate_assignment=True,
+ )
+
+ id: Optional[PositiveInt] = Field(default=None)
+ hit_type_id: Optional[PositiveInt] = Field(default=None)
+ question_id: Optional[PositiveInt] = Field(default=None)
+
+ amt_hit_id: AMTBoto3ID = Field()
+ amt_hit_type_id: AMTBoto3ID = Field()
+ amt_group_id: AMTBoto3ID = Field()
+ hit_question_xml: str = Field()
+
+ status: HitStatus = Field()
+ review_status: HitReviewStatus = Field()
+ creation_time: AwareDatetimeISO = Field(default=None, description="From aws")
+ expiration: Optional[AwareDatetimeISO] = Field(default=None)
+
+ # GRL Specific
+ created_at: AwareDatetimeISO = Field(
+ default_factory=lambda: datetime.now(tz=timezone.utc),
+ description="When this record was saved in the database",
+ )
+ modified_at: AwareDatetimeISO = Field(
+ default_factory=lambda: datetime.now(tz=timezone.utc),
+ description="When this record was last modified",
+ )
+
+ # -- Hit specific
+
+ qualification_requirements: Optional[List[Dict]] = Field(default=None)
+ max_assignments: int = Field()
+
+ # # this comes back as expiration. only for the request
+ # lifetime: timedelta = Field(
+ # default=timedelta(days=14),
+ # description="An amount of time, in seconds, after which the HIT is no longer "
+ # "available for users to accept.",
+ # )
+ assignment_pending_count: NonNegativeInt = Field()
+ assignment_available_count: NonNegativeInt = Field()
+ assignment_completed_count: NonNegativeInt = Field()
+
+ @classmethod
+ def from_amt_create_hit(
+ cls, data: HITTypeDef, question: HitQuestion, hit_type: HitType
+ ) -> Self:
+ assert question.id is not None
+ assert hit_type.id is not None
+ assert hit_type.amt_hit_type_id is not None
+
+ h = Hit.model_validate(
+ dict(
+ amt_hit_id=data["HITId"],
+ amt_hit_type_id=data["HITTypeId"],
+ amt_group_id=data["HITGroupId"],
+ status=HitStatus[data["HITStatus"]],
+ review_status=HitReviewStatus[data["HITReviewStatus"]],
+ creation_time=data["CreationTime"].astimezone(tz=timezone.utc),
+ expiration=data["Expiration"].astimezone(tz=timezone.utc),
+ hit_question_xml=data["Question"],
+ qualification_requirements=data["QualificationRequirements"],
+ max_assignments=data["MaxAssignments"],
+ assignment_pending_count=data["NumberOfAssignmentsPending"],
+ assignment_available_count=data["NumberOfAssignmentsAvailable"],
+ assignment_completed_count=data["NumberOfAssignmentsCompleted"],
+ description=data["Description"],
+ keywords=data["Keywords"],
+ reward=USDCent(round(float(data["Reward"]) * 100)),
+ title=data["Title"],
+ question_id=question.id,
+ hit_type_id=hit_type.id,
+ )
+ )
+ return h
+
+ @classmethod
+ def from_amt_get_hit(cls, data: HITTypeDef) -> Self:
+ h = Hit.model_validate(
+ dict(
+ amt_hit_id=data["HITId"],
+ amt_hit_type_id=data["HITTypeId"],
+ amt_group_id=data["HITGroupId"],
+ status=HitStatus[data["HITStatus"]],
+ review_status=HitReviewStatus[data["HITReviewStatus"]],
+ creation_time=data["CreationTime"].astimezone(tz=timezone.utc),
+ expiration=data["Expiration"].astimezone(tz=timezone.utc),
+ hit_question_xml=data["Question"],
+ qualification_requirements=data["QualificationRequirements"],
+ max_assignments=data["MaxAssignments"],
+ assignment_pending_count=data["NumberOfAssignmentsPending"],
+ assignment_available_count=data["NumberOfAssignmentsAvailable"],
+ assignment_completed_count=data["NumberOfAssignmentsCompleted"],
+ description=data["Description"],
+ keywords=data["Keywords"],
+ reward=USDCent(round(float(data["Reward"]) * 100)),
+ title=data["Title"],
+ question_id=None,
+ hit_type_id=None,
+ )
+ )
+ return h
+
+ def to_postgres(self):
+ d = self.model_dump(mode="json")
+ d["reward"] = self.reward.to_usd()
+ return d
+
+ @classmethod
+ def from_postgres(cls, data: Dict) -> Self:
+ data["reward"] = USDCent(round(data["reward"] * 100))
+ return cls.model_validate(data)
+
+ @property
+ def hit_question(self) -> HitQuestion:
+ root = ElementTree.fromstring(self.hit_question_xml)
+
+ ns = {
+ "mt": "http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2006-07-14/ExternalQuestion.xsd"
+ }
+ res = {}
+
+ lookup_table = dict(ExternalURL="url", FrameHeight="height")
+ for a in root.findall("mt:*", ns):
+ key = lookup_table[a.tag.split("}")[1]]
+ val = a.text
+ res[key] = val
+
+ return HitQuestion.model_validate(res, from_attributes=True)
diff --git a/jb/views/__init__.py b/jb/views/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/jb/views/__init__.py
diff --git a/nginx.conf b/nginx.conf
new file mode 100644
index 0000000..d58f4b1
--- /dev/null
+++ b/nginx.conf
@@ -0,0 +1,39 @@
+user www-data;
+worker_processes 1;
+pid /run/nginx.pid;
+include /etc/nginx/modules-enabled/*.conf;
+events {
+ worker_connections 512;
+}
+http {
+ include /etc/nginx/conf.d/*.conf;
+ include /etc/nginx/sites-enabled/*;
+
+    # `$upstream_http_x_route` and `$upstream_http_x_cache_hit` are custom response headers set by the upstream (uvicorn/FastAPI) app
+    # All values need to be in quotes b/c if a variable is empty, nginx writes '-' which is not valid JSON
+ log_format json_logs escape=json '{'
+ '"time_local":"$time_local",'
+ '"time":"$msec",'
+ '"client":"$remote_addr",'
+ '"forwarded_for":"$http_x_forwarded_for",'
+ '"method":"$request_method",'
+ '"request":"$request",'
+ '"request_length":"$request_length",'
+ '"status":"$status",'
+ '"bytes_sent":"$bytes_sent",'
+ '"referer":"$http_referer",'
+ '"user_agent":"$http_user_agent",'
+ '"upstream_status":"$upstream_status",'
+ '"request_time":"$request_time",'
+ '"upstream_response_time":"$upstream_response_time",'
+ '"upstream_connect_time":"$upstream_connect_time",'
+ '"upstream_header_time":"$upstream_header_time",'
+ '"upstream_route":"$upstream_http_x_route",'
+ '"upstream_cache_hit":"$upstream_http_x_cache_hit",'
+ '"host":"$host",'
+ '"http_host":"$http_host"'
+ '}';
+
+ access_log /var/log/nginx/access.log json_logs;
+ error_log /var/log/nginx/error.log;
+} \ No newline at end of file
diff --git a/nginx_amt-jb.conf b/nginx_amt-jb.conf
new file mode 100644
index 0000000..1bc770a
--- /dev/null
+++ b/nginx_amt-jb.conf
@@ -0,0 +1,43 @@
+upstream uvicorn {
+ server 127.0.0.1:8000 max_conns=64;
+}
+
+server {
+ listen 8080;
+ listen [::]:8080;
+ server_name jamesbillings67.com www.jamesbillings67.com;
+ charset utf-8;
+ real_ip_header X-Forwarded-For;
+ real_ip_recursive on;
+
+ # max upload size
+ client_max_body_size 1M; # adjust to taste
+
+    # Permanently redirect any dot-free path that lacks a trailing slash to the same path with a trailing slash
+ rewrite ^([^.]*[^/])$ https://jamesbillings67.com$1/ permanent;
+
+ location / {
+ include nginx_amt-jb_proxy_pass.conf;
+ proxy_pass http://uvicorn;
+ }
+
+ location = /favicon.ico {
+ access_log off;
+ return 200 '{}';
+ }
+
+ location = /nginx_health/ {
+ access_log off;
+ add_header 'Content-Type' 'application/json';
+ return 200 '{"status":"ok"}';
+ }
+}
+
+server {
+ listen 8080;
+ server_name cdn.jamesbillings67.com;
+
+ location / {
+ alias /root/amt-jb/static/;
+ }
+}
diff --git a/nginx_amt-jb_proxy_pass.conf b/nginx_amt-jb_proxy_pass.conf
new file mode 100644
index 0000000..ab092a9
--- /dev/null
+++ b/nginx_amt-jb_proxy_pass.conf
@@ -0,0 +1,4 @@
+proxy_set_header Host $host;
+proxy_set_header X-Real-IP $remote_addr;
+proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+proxy_set_header X-Forwarded-Proto $scheme; \ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..04e12ab
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,105 @@
+git+ssh://code.g-r-l.com:6611/py-utils@v3.2.2
+aiohappyeyeballs==2.6.1
+aiohttp==3.13.0
+aiosignal==1.4.0
+annotated-types==0.7.0
+anyio==4.11.0
+asgi-lifespan==2.1.0
+attrs==25.4.0
+black==25.9.0
+boto3==1.40.51
+boto3-stubs==1.40.51
+botocore==1.40.51
+botocore-stubs==1.40.51
+cachetools==6.2.1
+certifi==2025.10.5
+charset-normalizer==3.4.3
+click==8.3.0
+cloudpickle==3.1.1
+dask==2025.7.0
+decorator==5.2.1
+Deprecated==1.2.18
+distributed==2025.7.0
+dnspython==2.8.0
+email-validator==2.3.0
+Faker==37.11.0
+fastapi==0.119.0
+frozenlist==1.8.0
+fsspec==2025.9.0
+geoip2==5.1.0
+h11==0.16.0
+httpcore==1.0.9
+httpx==0.28.1
+idna==3.11
+influxdb==5.3.2
+iniconfig==2.1.0
+Jinja2==3.1.6
+jmespath==1.0.1
+limits==5.6.0
+locket==1.0.0
+MarkupSafe==3.0.3
+maxminddb==2.8.2
+more-itertools==10.8.0
+msgpack==1.1.2
+multidict==6.7.0
+mypy-boto3-mturk==1.40.20
+mypy-boto3-sns==1.40.1
+mypy_extensions==1.1.0
+numpy==2.3.3
+packaging==25.0
+pandas==2.3.3
+pandera==0.26.1
+partd==1.4.2
+pathspec==0.12.1
+platformdirs==4.5.0
+pluggy==1.6.0
+propcache==0.4.1
+protobuf==6.32.1
+psutil==7.1.1
+psycopg==3.2.10
+pycountry==24.6.1
+pydantic==2.12.0
+pydantic-extra-types==2.10.6
+pydantic-settings==2.11.0
+pydantic_core==2.41.1
+Pygments==2.19.2
+pylibmc==1.6.3
+pymemcache==4.0.0
+PyMySQL==1.1.2
+pytest==8.4.2
+python-dateutil==2.9.0.post0
+python-dotenv==1.1.1
+pytokens==0.1.10
+pytz==2025.2
+PyYAML==6.0.3
+redis==6.4.0
+requests==2.32.5
+requests-mock==1.12.1
+s3transfer==0.14.0
+scipy==1.16.2
+sentry-sdk==2.41.0
+setuptools==80.9.0
+six==1.17.0
+slackclient==2.9.4
+sniffio==1.3.1
+sortedcontainers==2.4.0
+starlette==0.48.0
+tblib==3.2.0
+toolz==1.1.0
+tornado==6.5.2
+typeguard==4.4.4
+types-awscrt==0.28.1
+types-s3transfer==0.14.0
+typing-inspect==0.9.0
+typing-inspection==0.4.2
+typing_extensions==4.15.0
+tzdata==2025.2
+ua-parser==1.0.1
+ua-parser-builtins==0.18.0.post1
+urllib3==2.5.0
+user-agents==2.2.0
+uvicorn==0.37.0
+wheel==0.45.1
+wrapt==1.17.3
+yarl==1.22.0
+zict==3.0.0
diff --git a/run.py b/run.py
new file mode 100644
index 0000000..ebf2e4b
--- /dev/null
+++ b/run.py
@@ -0,0 +1,6 @@
+import uvicorn
+
+from jb.main import app
+
+if __name__ == "__main__":
+ uvicorn.run(app, host="127.0.0.1", port=8081, timeout_keep_alive=180)
diff --git a/templates/base.html b/templates/base.html
new file mode 100644
index 0000000..c17972e
--- /dev/null
+++ b/templates/base.html
@@ -0,0 +1,16 @@
+<!doctype html>
+<html lang="en-US">
+<head>
+ <meta charset="utf-8">
+ <meta name="viewport"
+ content="width=device-width, initial-scale=1.0, height=device-height, minimum-scale=1.0, user-scalable=1">
+ <title>James Billings loves you.</title>
+ <link rel="stylesheet"
+ href="https://cdn.jamesbillings67.com/james-has-style.css"
+ type="text/css">
+</head>
+<body>
+<div id="root"></div>
+<script src="https://cdn.jamesbillings67.com/james-billings.js"></script>
+</body>
+</html> \ No newline at end of file
diff --git a/tests/amt/__init__.py b/tests/amt/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/amt/__init__.py
diff --git a/tests/amt/conftest.py b/tests/amt/conftest.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/amt/conftest.py
diff --git a/tests/amt/test_models.py b/tests/amt/test_models.py
new file mode 100644
index 0000000..c2a61b5
--- /dev/null
+++ b/tests/amt/test_models.py
@@ -0,0 +1,41 @@
+import copy
+
+import pytest
+
+from jb.models.assignment import Assignment
+
+
+@pytest.fixture
+def get_assignment_response_bad_tsid(
+ get_assignment_response, amt_worker_id, amt_assignment_id
+):
+ res = copy.deepcopy(get_assignment_response)
+ res["Assignment"]["Answer"] = (
+ '<?xml version="1.0" encoding="UTF-8"?>\n'
+ '<QuestionFormAnswers xmlns="http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2005-10-01/QuestionFormAnswers.xsd">\n '
+ "<Answer>\n <QuestionIdentifier>amt_worker_id</QuestionIdentifier>\n "
+ f" <FreeText>{amt_worker_id}</FreeText>\n </Answer>\n <Answer>\n "
+ " <QuestionIdentifier>amt_assignment_id</QuestionIdentifier>\n "
+ f" <FreeText>{amt_assignment_id}</FreeText>\n </Answer>\n "
+ f" <Answer>\n <QuestionIdentifier>tsid</QuestionIdentifier>\n abc123 <FreeText></FreeText>\n </Answer>\n"
+ f"</QuestionFormAnswers>"
+ )
+ return res
+
+def test_get_assignment(get_assignment_response):
+ assignment = Assignment.from_amt_get_assignment(
+ get_assignment_response["Assignment"]
+ )
+ assert assignment.tsid is not None
+
+def test_get_assignment_no_tsid(get_assignment_response_no_tsid):
+ assignment = Assignment.from_amt_get_assignment(
+ get_assignment_response_no_tsid["Assignment"]
+ )
+ assert assignment.tsid is None
+
+def test_get_assignment_bad_tsid(get_assignment_response_bad_tsid):
+ assignment = Assignment.from_amt_get_assignment(
+ get_assignment_response_bad_tsid["Assignment"]
+ )
+ assert assignment.tsid is None \ No newline at end of file
diff --git a/tests/flow/__init__.py b/tests/flow/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/flow/__init__.py
diff --git a/tests/flow/test_tasks.py b/tests/flow/test_tasks.py
new file mode 100644
index 0000000..37391d1
--- /dev/null
+++ b/tests/flow/test_tasks.py
@@ -0,0 +1,534 @@
+import logging
+from contextlib import contextmanager
+from datetime import timezone, datetime
+from typing import Dict
+from uuid import uuid4
+
+import pytest
+import requests
+from botocore.stub import Stubber
+from generalresearchutils.models.thl.payout import UserPayoutEvent
+from generalresearchutils.models.thl.wallet import PayoutType
+from generalresearchutils.models.thl.wallet.cashout_method import (
+ CashoutRequestResponse,
+ CashoutRequestInfo,
+)
+
+from jb.config import settings
+from jb.decorators import AMT_CLIENT, AM, BM
+from jb.flow.assignment_tasks import process_assignment_submitted
+from jb.managers.amt import (
+ AMTManager,
+ APPROVAL_MESSAGE,
+ REJECT_MESSAGE_BADDIE,
+ REJECT_MESSAGE_UNKNOWN_ASSIGNMENT,
+ REJECT_MESSAGE_NO_WORK,
+ BONUS_MESSAGE,
+ NO_WORK_APPROVAL_MESSAGE,
+)
+from jb.models.currency import USDCent
+from jb.models.definitions import AssignmentStatus, PayoutStatus
+from jb.models.event import MTurkEvent
+
+
+@contextmanager
+def amt_stub_context(responses):
+ # ty chatgpt for this
+ with Stubber(AMT_CLIENT) as stub:
+ for r in responses:
+ stub.add_response(
+ r["operation"],
+ r.get("response", {}),
+ r.get("expected_params", {}),
+ )
+ yield stub
+
+
+@pytest.fixture
+def approved_assignment_stubs(
+ get_assignment_response,
+ get_assignment_response_approved,
+ amt_assignment_id,
+ amt_hit_id,
+ get_hit_response_reviewing,
+):
+ # These are the AMT_CLIENT stubs/mocks that need to be set when running
+ # process_assignment_submitted() which will result in an approved
+ # assignment and sent bonus
+ def _approved_assignment_stubs(
+ feedback: str = APPROVAL_MESSAGE,
+ override_response=None,
+ override_approve_response=None,
+ ):
+ response = override_response or get_assignment_response
+ approve_response = (
+ override_approve_response or get_assignment_response_approved(feedback)
+ )
+ return [
+ {
+ "operation": "get_assignment",
+ "response": response,
+ "expected_params": {"AssignmentId": amt_assignment_id},
+ },
+ {
+ "operation": "approve_assignment",
+ "response": {},
+ "expected_params": {
+ "AssignmentId": amt_assignment_id,
+ "RequesterFeedback": feedback,
+ "OverrideRejection": False,
+ },
+ },
+ {
+ "operation": "get_assignment",
+ "response": approve_response,
+ "expected_params": {"AssignmentId": amt_assignment_id},
+ },
+ {
+ "operation": "update_hit_review_status",
+ "response": {},
+ "expected_params": {"HITId": amt_hit_id, "Revert": False},
+ },
+ {
+ "operation": "get_hit",
+ "response": get_hit_response_reviewing,
+ "expected_params": {"HITId": amt_hit_id},
+ },
+ ]
+
+ return _approved_assignment_stubs
+
+
+@pytest.fixture
+def approved_assignment_stubs_w_bonus(
+ approved_assignment_stubs, amt_worker_id, amt_assignment_id, pe_id
+):
+ now = datetime.now(tz=timezone.utc)
+ stubs = approved_assignment_stubs().copy()
+ stubs.append(
+ {
+ "operation": "send_bonus",
+ "response": {},
+ "expected_params": {
+ "WorkerId": amt_worker_id,
+ "BonusAmount": "0.07",
+ "AssignmentId": amt_assignment_id,
+ "Reason": BONUS_MESSAGE,
+ "UniqueRequestToken": pe_id,
+ },
+ }
+ )
+ stubs.append(
+ {
+ "operation": "list_bonus_payments",
+ "response": {
+ "BonusPayments": [
+ {
+ "WorkerId": amt_worker_id,
+ "BonusAmount": "0.07",
+ "AssignmentId": amt_assignment_id,
+ "Reason": BONUS_MESSAGE,
+ "GrantTime": now,
+ }
+ ]
+ },
+ "expected_params": {"AssignmentId": amt_assignment_id},
+ }
+ )
+ return stubs
+
+
+@pytest.fixture
+def rejected_assignment_stubs(
+ get_assignment_response,
+ get_assignment_response_rejected,
+ amt_assignment_id,
+ amt_hit_id,
+ get_hit_response_reviewing,
+):
+ # These are the AMT_CLIENT stubs/mocks that need to be set when running
+ # process_assignment_submitted() which will result in a rejected
+ # assignment
+ def _rejected_assignment_stubs(
+ reject_reason: str, override_response=None, override_reject_response=None
+ ):
+ response = override_response or get_assignment_response
+ reject_response = override_reject_response or get_assignment_response_rejected(
+ reject_reason
+ )
+ return [
+ {
+ "operation": "get_assignment",
+ "response": response,
+ "expected_params": {"AssignmentId": amt_assignment_id},
+ },
+ {
+ "operation": "reject_assignment",
+ "response": {},
+ "expected_params": {
+ "AssignmentId": amt_assignment_id,
+ "RequesterFeedback": reject_reason,
+ },
+ },
+ {
+ "operation": "get_assignment",
+ "response": reject_response,
+ "expected_params": {"AssignmentId": amt_assignment_id},
+ },
+ {
+ "operation": "update_hit_review_status",
+ "response": {},
+ "expected_params": {"HITId": amt_hit_id, "Revert": False},
+ },
+ {
+ "operation": "get_hit",
+ "response": get_hit_response_reviewing,
+ "expected_params": {"HITId": amt_hit_id},
+ },
+ ]
+
+ return _rejected_assignment_stubs
+
+
+@pytest.fixture
+def mock_thl_responses(monkeypatch, amt_worker_id, tsid, pe_id):
+ original_get = requests.get
+ original_post = requests.post
+
+ class MockThlCashoutRequestResponse:
+ def json(self):
+ return CashoutRequestResponse(
+ status="success",
+ cashout=CashoutRequestInfo(
+ id=pe_id,
+ description="amt something",
+ status=PayoutStatus.PENDING,
+ ),
+ ).model_dump(mode="json")
+
+ def _apply_mock(
+ user_blocked: bool = False,
+ status_finished: bool = True,
+ status_complete: bool = False,
+ wallet_redeemable_amount: int = 10,
+ ):
+
+ def mock_get(url, *args, **kwargs):
+ profile_url = f"{settings.fsb_host}{settings.product_id}/user/{amt_worker_id}/profile/"
+ status_url = f"{settings.fsb_host}{settings.product_id}/status/{tsid}/"
+ cashout_request_url = f"{settings.fsb_host}{settings.product_id}/cashout/"
+ wallet_url = f"{settings.fsb_host}{settings.product_id}/wallet/"
+ if url == profile_url:
+
+ class MockThlProfileResponse:
+ def json(self):
+ return {"user-profile": {"user": {"blocked": user_blocked}}}
+
+ return MockThlProfileResponse()
+
+ elif url == wallet_url:
+
+ class MockThlWalletResponse:
+ def json(self):
+ return {
+ "wallet": {"redeemable_amount": wallet_redeemable_amount}
+ }
+
+ return MockThlWalletResponse()
+
+ elif url == status_url:
+
+ class MockThlStatusResponse:
+ def json(self):
+ return {
+ "tsid": tsid,
+ "product_id": str(settings.product_id),
+ "product_user_id": amt_worker_id,
+ "started": "2020-06-02T00:30:35.036398Z",
+ "finished": (
+ "2020-06-02T00:31:35.036398Z"
+ if status_finished
+ else None
+ ),
+ "status": 3 if status_complete else 2,
+ "payout": 10 if status_complete else 0,
+ "user_payout": 10 if status_complete else 0,
+ "status_code_1": "BUYER_FAIL",
+ "status_code_2": None,
+ }
+
+ return MockThlStatusResponse()
+ elif url == cashout_request_url:
+ return MockThlCashoutRequestResponse()
+ else:
+ raise ValueError(f"unhandled call: {url=} {args=} {kwargs=}")
+ return original_get(url, *args, **kwargs)
+
+ def mock_post(url, *args, **kwargs):
+ cashout_request_url = f"{settings.fsb_host}{settings.product_id}/cashout/"
+ manage_cashout_request_url = f"{settings.fsb_host}{settings.fsb_host_private_route}/thl/manage_cashout/"
+
+ if url == cashout_request_url:
+ return MockThlCashoutRequestResponse()
+
+ elif url == manage_cashout_request_url:
+ json = kwargs["json"]
+ print(json)
+ payout_id = json["payout_id"]
+ new_status = json["new_status"]
+
+ class MockThlManageCashoutResponse:
+ def json(self):
+ return UserPayoutEvent(
+ uuid=payout_id,
+ status=new_status,
+ amount=USDCent(5),
+ debit_account_uuid=uuid4().hex,
+ cashout_method_uuid=uuid4().hex,
+ payout_type=PayoutType.AMT,
+ ).model_dump(mode="json")
+
+ return MockThlManageCashoutResponse()
+ return original_post(url, *args, **kwargs)
+
+ monkeypatch.setattr(requests, "get", mock_get)
+ monkeypatch.setattr(requests, "post", mock_post)
+
+ return _apply_mock
+
+
+@pytest.fixture
+def mturk_event(amt_assignment_id, amt_hit_id, amt_hit_type_id):
+ now = datetime.now(tz=timezone.utc)
+ return MTurkEvent(
+ event_type="AssignmentSubmitted",
+ event_timestamp=now,
+ amt_assignment_id=amt_assignment_id,
+ amt_hit_type_id=amt_hit_type_id,
+ amt_hit_id=amt_hit_id,
+ )
+
+
+def test_fake_get_assignment(
+ amt_assignment_id, amt_worker_id, get_assignment_response: Dict
+):
+ # Testing just that this boto stubber works (we fake a response using the real boto client)
+ fake_response = get_assignment_response.copy()
+ with Stubber(AMT_CLIENT) as stub:
+ expected_params = {"AssignmentId": amt_assignment_id}
+ stub.add_response("get_assignment", fake_response, expected_params)
+
+ assignment = AMTManager.get_assignment_if_exists(
+ amt_assignment_id=amt_assignment_id
+ )
+ assert assignment.amt_assignment_id == amt_assignment_id
+ assert assignment.amt_worker_id == amt_worker_id
+
+ # Optionally, ensure all queued responses were used:
+ stub.assert_no_pending_responses()
+
+
+class TestProcessAssignmentSubmitted:
+
+ def test_no_assignment_in_db(
+ self,
+ mturk_event,
+ amt_assignment_id,
+ get_assignment_response: Dict,
+ caplog,
+ hit_in_db,
+ rejected_assignment_stubs,
+ ):
+ # An assignment is submitted. The hit exists in the DB. The amt assignment id is valid,
+ # but the assignment stub is not in our db. Reject it and write the assignment to the db.
+
+ amt_stubs = rejected_assignment_stubs(
+ reject_reason=REJECT_MESSAGE_UNKNOWN_ASSIGNMENT
+ )
+
+ with amt_stub_context(amt_stubs) as stub, caplog.at_level(logging.WARNING):
+ process_assignment_submitted(mturk_event)
+ stub.assert_no_pending_responses()
+
+ assert f"No assignment found in DB: {amt_assignment_id}" in caplog.text
+ assert f"Rejected assignment doesn't exist in DB. Creating ... " in caplog.text
+ assert f"Rejected assignment: " in caplog.text
+ stub.assert_no_pending_responses()
+
+ ass = AM.get(amt_assignment_id=amt_assignment_id)
+ assert ass.status == AssignmentStatus.Rejected
+ assert ass.requester_feedback == REJECT_MESSAGE_UNKNOWN_ASSIGNMENT
+
+ def test_assignment_in_db_user_doesnt_exist(
+ self,
+ mturk_event,
+ amt_assignment_id,
+ assignment_stub_in_db,
+ caplog,
+ mock_thl_responses,
+ rejected_assignment_stubs,
+ ):
+ # An assignment is submitted. The hit and assignment stub exist in the DB. We think we're going to
+ # approve the assignment, but the user-profile / check blocked call on THL shows the user doesn't
+ # exist (same thing would happen if the user does exist and is blocked). So we reject.
+
+ _ = assignment_stub_in_db # we need this to make the assignment stub in the db
+
+ amt_stubs = rejected_assignment_stubs(reject_reason=REJECT_MESSAGE_BADDIE)
+
+ mock_thl_responses(user_blocked=True)
+
+ with amt_stub_context(amt_stubs) as stub, caplog.at_level(logging.WARNING):
+ process_assignment_submitted(mturk_event)
+ stub.assert_no_pending_responses()
+
+ assert f"No assignment found in DB: {amt_assignment_id}" not in caplog.text
+ assert f"blocked or not exists" in caplog.text
+ assert f"Rejected assignment: " in caplog.text
+
+ ass = AM.get(amt_assignment_id=amt_assignment_id)
+ assert ass.status == AssignmentStatus.Rejected
+ assert ass.requester_feedback == REJECT_MESSAGE_BADDIE
+
+ def test_no_work_w_warning(
+ self,
+ mturk_event,
+ amt_assignment_id,
+ assignment_stub_in_db,
+ caplog,
+ mock_thl_responses,
+ approved_assignment_stubs,
+ get_assignment_response_approved_no_tsid,
+ get_assignment_response_no_tsid,
+ ):
+ # An assignment is submitted. The hit and assignment stub exist in the DB.
+ # The assignment has no tsid.
+ # We APPROVE this assignment b/c we are very nice and give users a couple
+ # chances, with an explanation, before rejecting.
+
+ _ = assignment_stub_in_db # we need this to make the assignment stub in the db
+
+ # Simulate that the AMT.get_assignment call returns the assignment, but the answers xml
+ # has no tsid.
+ amt_stubs = approved_assignment_stubs(
+ feedback=NO_WORK_APPROVAL_MESSAGE,
+ override_response=get_assignment_response_no_tsid,
+ override_approve_response=get_assignment_response_approved_no_tsid,
+ )
+
+ mock_thl_responses(user_blocked=False)
+
+ with amt_stub_context(amt_stubs) as stub, caplog.at_level(logging.WARNING):
+ process_assignment_submitted(mturk_event)
+ stub.assert_no_pending_responses()
+
+ assert f"No assignment found in DB: {amt_assignment_id}" not in caplog.text
+ assert f"Assignment submitted with no tsid" in caplog.text
+ assert f"Approved assignment: " in caplog.text
+
+ ass = AM.get(amt_assignment_id=amt_assignment_id)
+ assert ass.status == AssignmentStatus.Approved
+ assert ass.requester_feedback == NO_WORK_APPROVAL_MESSAGE
+ assert AM.missing_tsid_count(amt_worker_id=ass.amt_worker_id) == 1
+
+ def test_no_work_no_warning(
+ self,
+ mturk_event,
+ amt_assignment_id,
+ assignment_stub_in_db,
+ caplog,
+ mock_thl_responses,
+ rejected_assignment_stubs,
+ get_assignment_response_rejected_no_tsid,
+ get_assignment_response_no_tsid,
+ assignment_in_db_factory,
+ hit_in_db,
+ amt_worker_id,
+ ):
+ # An assignment is submitted. The hit and assignment stub exist in the DB.
+ # The assignment has no tsid.
+
+ # Going to create and submit 3 assignments w no work
+ # (all on the same hit, which we don't do in JB for real,
+ # but doesn't matter here)
+ a1 = assignment_in_db_factory(hit_id=hit_in_db.id, amt_worker_id=amt_worker_id)
+ a2 = assignment_in_db_factory(hit_id=hit_in_db.id, amt_worker_id=amt_worker_id)
+ a3 = assignment_in_db_factory(hit_id=hit_in_db.id, amt_worker_id=amt_worker_id)
+ assert AM.missing_tsid_count(amt_worker_id=amt_worker_id) == 3
+ # So now, we'll reject, b/c they've already gotten 3 warnings
+
+ _ = assignment_stub_in_db # we need this to make the assignment stub in the db
+
+ # Simulate that the AMT.get_assignment call returns the assignment, but the answers xml
+ # has no tsid.
+ amt_stubs = rejected_assignment_stubs(
+ reject_reason=REJECT_MESSAGE_NO_WORK,
+ override_response=get_assignment_response_no_tsid,
+ override_reject_response=get_assignment_response_rejected_no_tsid(
+ REJECT_MESSAGE_NO_WORK
+ ),
+ )
+
+ mock_thl_responses(user_blocked=False)
+
+ with amt_stub_context(amt_stubs) as stub, caplog.at_level(logging.WARNING):
+ process_assignment_submitted(mturk_event)
+ stub.assert_no_pending_responses()
+
+ assert f"No assignment found in DB: {amt_assignment_id}" not in caplog.text
+ assert f"Assignment submitted with no tsid" in caplog.text
+ assert f"Rejected assignment: " in caplog.text
+
+ # It will exist in the db since we can validate the model.
+ ass = AM.get(amt_assignment_id=amt_assignment_id)
+ assert ass.status == AssignmentStatus.Rejected
+ assert ass.requester_feedback == REJECT_MESSAGE_NO_WORK
+
+ def test_assignment_submitted_no_bonus(
+ self,
+ mturk_event,
+ amt_assignment_id,
+ assignment_stub_in_db,
+ caplog,
+ mock_thl_responses,
+ approved_assignment_stubs,
+ ):
+ _ = assignment_stub_in_db # we need this to make the assignment stub in the db
+ # The "send bonus" stuff will still run, even if the user didn't get a complete,
+ # because all we do is check the user's wallet balance (if an assignment is approved)
+ # and they may have money in their wallet from a prev event or bribe
+ # So mock the wallet balance as 1cent, so no bonus will be triggered
+ mock_thl_responses(status_complete=False, wallet_redeemable_amount=1)
+ with amt_stub_context(approved_assignment_stubs()) as stub, caplog.at_level(
+ logging.WARNING
+ ):
+ process_assignment_submitted(mturk_event)
+ stub.assert_no_pending_responses()
+
+ ass = AM.get(amt_assignment_id=amt_assignment_id)
+ assert ass.status == AssignmentStatus.Approved
+ assert ass.requester_feedback == APPROVAL_MESSAGE
+
+ def test_assignment_submitted_w_bonus(
+ self,
+ mturk_event,
+ amt_assignment_id,
+ assignment_stub_in_db,
+ caplog,
+ mock_thl_responses,
+ approved_assignment_stubs_w_bonus,
+ ):
+ _ = assignment_stub_in_db # we need this to make the assignment stub in the db
+ mock_thl_responses(status_complete=True, wallet_redeemable_amount=10)
+ with amt_stub_context(
+ approved_assignment_stubs_w_bonus
+ ) as stub, caplog.at_level(logging.WARNING):
+ process_assignment_submitted(mturk_event)
+ stub.assert_no_pending_responses()
+
+ ass = AM.get(amt_assignment_id=amt_assignment_id)
+ assert ass.status == AssignmentStatus.Approved
+ assert ass.requester_feedback == APPROVAL_MESSAGE
+
+ bonus = BM.filter(amt_assignment_id=amt_assignment_id)[0]
+ assert bonus.amount == USDCent(7)
diff --git a/tests/managers/__init__.py b/tests/managers/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/managers/__init__.py
diff --git a/tests/managers/amt.py b/tests/managers/amt.py
new file mode 100644
index 0000000..0b2e501
--- /dev/null
+++ b/tests/managers/amt.py
@@ -0,0 +1,23 @@
+from jb.decorators import HTM, HM, HQM
+from jb.managers.amt import AMTManager
+
+
+def test_create_hit_type(hit_type):
+ assert hit_type.amt_hit_type_id is None
+ AMTManager.create_hit_type(hit_type=hit_type)
+ assert hit_type.amt_hit_type_id is not None
+
+
+def test_create_hit_with_hit_type(hit_type_with_amt_id, question):
+ question = HQM.get_or_create(question)
+
+ hit_type = hit_type_with_amt_id
+ hit_type = [
+ x for x in HTM.filter_active() if x.amt_hit_type_id == hit_type.amt_hit_type_id
+ ][0]
+
+ hit = AMTManager.create_hit_with_hit_type(hit_type=hit_type, question=question)
+ assert hit.amt_hit_id is not None
+ assert hit.id is None
+ HM.create(hit)
+ assert hit.id is not None
diff --git a/tests/managers/hit.py b/tests/managers/hit.py
new file mode 100644
index 0000000..8fcd673
--- /dev/null
+++ b/tests/managers/hit.py
@@ -0,0 +1,18 @@
+from jb.decorators import HTM
+
+
+class TestHitTypeManager:
+
+ def test_create(self, hit_type_with_amt_id):
+ assert hit_type_with_amt_id.id is None
+ HTM.create(hit_type_with_amt_id)
+ assert hit_type_with_amt_id.id is not None
+
+ res = HTM.filter_active()
+ assert len(res) == 1
+
+ hit_type_with_amt_id.min_active = 0
+ HTM.set_min_active(hit_type_with_amt_id)
+
+ res = HTM.filter_active()
+ assert len(res) == 0