aboutsummaryrefslogtreecommitdiff
path: root/jb/managers
diff options
context:
space:
mode:
Diffstat (limited to 'jb/managers')
-rw-r--r--jb/managers/__init__.py23
-rw-r--r--jb/managers/amt.py216
-rw-r--r--jb/managers/assignment.py259
-rw-r--r--jb/managers/bonus.py54
-rw-r--r--jb/managers/hit.py338
-rw-r--r--jb/managers/worker.py16
6 files changed, 906 insertions, 0 deletions
diff --git a/jb/managers/__init__.py b/jb/managers/__init__.py
new file mode 100644
index 0000000..e2aab6d
--- /dev/null
+++ b/jb/managers/__init__.py
@@ -0,0 +1,23 @@
+from enum import IntEnum
+from typing import Collection
+
+from generalresearchutils.pg_helper import PostgresConfig
+
+
+class Permission(IntEnum):
+ READ = 1
+ UPDATE = 2
+ CREATE = 3
+ DELETE = 4
+
+
+class PostgresManager:
+ def __init__(
+ self,
+ pg_config: PostgresConfig,
+ permissions: Collection[Permission] = None,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.pg_config = pg_config
+ self.permissions = set(permissions) if permissions else set()
diff --git a/jb/managers/amt.py b/jb/managers/amt.py
new file mode 100644
index 0000000..79661c7
--- /dev/null
+++ b/jb/managers/amt.py
@@ -0,0 +1,216 @@
+import logging
+from datetime import timezone, datetime
+from typing import Tuple, Optional, List
+
+import botocore.exceptions
+from mypy_boto3_mturk.type_defs import AssignmentTypeDef, BonusPaymentTypeDef
+
+from jb.config import TOPIC_ARN
+from jb.decorators import AMT_CLIENT
+from jb.models import AMTAccount
+from jb.models.assignment import Assignment
+from jb.models.bonus import Bonus
+from jb.models.currency import USDCent
+from jb.models.definitions import HitStatus
+from jb.models.hit import HitType, HitQuestion, Hit
+
+REJECT_MESSAGE_UNKNOWN_ASSIGNMENT = "Unknown assignment"
+REJECT_MESSAGE_NO_WORK = "Assignment was submitted with no attempted work."
+REJECT_MESSAGE_BADDIE = "Quality has dropped below an acceptable level"
+APPROVAL_MESSAGE = "Thank you!"
+NO_WORK_APPROVAL_MESSAGE = (
+ REJECT_MESSAGE_NO_WORK + " In the future, if you are not sent into a task, "
+ "please return the assignment, otherwise it will be rejected!"
+)
+BONUS_MESSAGE = "Great job! Bonus for a survey complete"
+
+
+class AMTManager:
+
+ @staticmethod
+ def fetch_account() -> AMTAccount:
+ res = AMT_CLIENT.get_account_balance()
+ return AMTAccount.model_validate(
+ {
+ "available_balance": res["AvailableBalance"],
+ "onhold_balance": res.get("OnHoldBalance", 0),
+ }
+ )
+
+ @staticmethod
+ def get_hit_if_exists(amt_hit_id: str) -> Tuple[Optional[Hit], Optional[str]]:
+ try:
+ res = AMT_CLIENT.get_hit(HITId=amt_hit_id)
+ except AMT_CLIENT.exceptions.RequestError as e:
+ msg = e.response.get("Error", {}).get("Message", "")
+ return None, msg
+ hit = Hit.from_amt_get_hit(res["HIT"])
+ return hit, None
+
+ @classmethod
+ def get_hit_status(cls, amt_hit_id: str):
+ res, msg = cls.get_hit_if_exists(amt_hit_id=amt_hit_id)
+ if res is None:
+ if " does not exist. (" in msg:
+ return HitStatus.Disposed
+ else:
+ logging.warning(msg)
+ return HitStatus.Unassignable
+ return res.status
+
+ @staticmethod
+ def create_hit_type(hit_type: HitType):
+ res = AMT_CLIENT.create_hit_type(**hit_type.to_api_request_body())
+ hit_type.amt_hit_type_id = res["HITTypeId"]
+ AMT_CLIENT.update_notification_settings(
+ HITTypeId=hit_type.amt_hit_type_id,
+ Notification={
+ "Destination": TOPIC_ARN,
+ "Transport": "SNS",
+ "Version": "2006-05-05",
+ # you can add more events, see mypy_boto3_mturk.literals.EventTypeType
+ "EventTypes": ["AssignmentSubmitted"],
+ },
+ Active=True,
+ )
+
+ return hit_type
+
+ @staticmethod
+ def create_hit_with_hit_type(hit_type: HitType, question: HitQuestion) -> Hit:
+ """
+ HITTypeId: str
+ LifetimeInSeconds: int
+ MaxAssignments: NotRequired[int]
+ Question: NotRequired[str]
+ UniqueRequestToken: NotRequired[str]
+ """
+ assert hit_type.id is not None
+ assert hit_type.amt_hit_type_id is not None
+
+ data = hit_type.generate_hit_amt_request(question=question)
+ res = AMT_CLIENT.create_hit_with_hit_type(**data)
+ return Hit.from_amt_create_hit(res["HIT"], hit_type=hit_type, question=question)
+
+ @staticmethod
+ def get_assignment(amt_assignment_id: str) -> Assignment:
+ # note, you CANNOT get an assignment if it has been only ACCEPTED (by the worker)
+ # the api is stupid. it will only show up once it is submitted
+ res = AMT_CLIENT.get_assignment(AssignmentId=amt_assignment_id)
+ ass_res: AssignmentTypeDef = res["Assignment"]
+ assignment = Assignment.from_amt_get_assignment(ass_res)
+ # to be clear, this has not checked whether it exists in our db
+ assert assignment.id is None
+ return assignment
+
+ @classmethod
+ def get_assignment_if_exists(cls, amt_assignment_id: str) -> Optional[Assignment]:
+ expected_err_msg = f"Assignment {amt_assignment_id} does not exist"
+ try:
+ return cls.get_assignment(amt_assignment_id=amt_assignment_id)
+ except botocore.exceptions.ClientError as e:
+ logging.warning(e)
+ error_code = e.response["Error"]["Code"]
+ error_msg = e.response["Error"]["Message"]
+ if error_code == "RequestError" and expected_err_msg in error_msg:
+ return None
+ raise e
+
+ @staticmethod
+ def reject_assignment_if_possible(
+ amt_assignment_id: str, msg: str = REJECT_MESSAGE_UNKNOWN_ASSIGNMENT
+ ):
+ # Unclear to me when this would fail
+ try:
+ return AMT_CLIENT.reject_assignment(
+ AssignmentId=amt_assignment_id, RequesterFeedback=msg
+ )
+ except botocore.exceptions.ClientError as e:
+ logging.warning(e)
+ return None
+
+ @staticmethod
+ def approve_assignment_if_possible(
+ amt_assignment_id: str,
+ msg: str = APPROVAL_MESSAGE,
+ override_rejection: bool = False,
+ ):
+ # Unclear to me when this would fail
+ try:
+ return AMT_CLIENT.approve_assignment(
+ AssignmentId=amt_assignment_id,
+ RequesterFeedback=msg,
+ OverrideRejection=override_rejection,
+ )
+ except botocore.exceptions.ClientError as e:
+ logging.warning(e)
+ return None
+
+ @staticmethod
+ def update_hit_review_status(amt_hit_id: str, revert: bool = False) -> None:
+ try:
+ # Reviewable to Reviewing
+ AMT_CLIENT.update_hit_review_status(HITId=amt_hit_id, Revert=revert)
+ except botocore.exceptions.ClientError as e:
+ logging.warning(f"{amt_hit_id=}, {e}")
+ error_msg = e.response["Error"]["Message"]
+ if "does not exist" in error_msg:
+ raise ValueError(error_msg)
+ # elif "This HIT is currently in the state 'Reviewing'" in error_msg:
+ # logging.warning(error_msg)
+ return None
+
+ @staticmethod
+ def send_bonus(
+ amt_worker_id: str,
+ amount: USDCent,
+ amt_assignment_id: str,
+ reason: str,
+ unique_request_token: str,
+ ):
+ try:
+ return AMT_CLIENT.send_bonus(
+ WorkerId=amt_worker_id,
+ BonusAmount=str(amount.to_usd()),
+ AssignmentId=amt_assignment_id,
+ Reason=reason,
+ UniqueRequestToken=unique_request_token,
+ )
+ except botocore.exceptions.ClientError as e:
+ logging.warning(f"{amt_worker_id=} {amt_assignment_id=}, {e}")
+ return None
+
+ @staticmethod
+ def get_bonus(amt_assignment_id: str, payout_event_id: str) -> Optional[Bonus]:
+ res: List[BonusPaymentTypeDef] = AMT_CLIENT.list_bonus_payments(
+ AssignmentId=amt_assignment_id
+ )["BonusPayments"]
+ assert (
+ len(res) <= 1
+ ), f"{amt_assignment_id=} Expected 1 or 0 bonuses, got {len(res)}"
+ d = res[0] if res else None
+ if d:
+ return Bonus.model_validate(
+ {
+ "amt_worker_id": d["WorkerId"],
+ "amount": USDCent(round(float(d["BonusAmount"]) * 100)),
+ "amt_assignment_id": d["AssignmentId"],
+ "reason": d["Reason"],
+ "grant_time": d["GrantTime"].astimezone(tz=timezone.utc),
+ "payout_event_id": payout_event_id,
+ }
+ )
+ return None
+
+ @staticmethod
+ def expire_all_hits():
+ # used in testing only (or in an emergency I guess)
+ now = datetime.now(tz=timezone.utc)
+ paginator = AMT_CLIENT.get_paginator("list_hits")
+
+ for page in paginator.paginate():
+ for hit in page["HITs"]:
+ if hit["HITStatus"] in ("Assignable", "Reviewable", "Reviewing"):
+ AMT_CLIENT.update_expiration_for_hit(
+ HITId=hit["HITId"], ExpireAt=now
+ )
diff --git a/jb/managers/assignment.py b/jb/managers/assignment.py
new file mode 100644
index 0000000..fca72e8
--- /dev/null
+++ b/jb/managers/assignment.py
@@ -0,0 +1,259 @@
+from datetime import datetime, timezone
+from typing import Optional
+
+from psycopg import sql
+from pydantic import NonNegativeInt
+
+from jb.managers import PostgresManager
+from jb.models.assignment import AssignmentStub, Assignment
+from jb.models.definitions import AssignmentStatus
+
+
+class AssignmentManager(PostgresManager):
+
+ def create_stub(self, stub: AssignmentStub) -> None:
+ assert stub.id is None
+ data = stub.to_postgres()
+ query = sql.SQL(
+ """
+ INSERT INTO mtwerk_assignment
+ (amt_assignment_id, amt_worker_id, status, created_at, modified_at, hit_id)
+ VALUES
+ (%(amt_assignment_id)s, %(amt_worker_id)s, %(status)s,
+ %(created_at)s, %(modified_at)s, %(hit_id)s)
+ RETURNING id;
+ """
+ )
+
+ with self.pg_config.make_connection() as conn:
+ with conn.cursor() as c:
+ c.execute(query, data)
+ pk = c.fetchone()["id"]
+ conn.commit()
+ stub.id = pk
+ return None
+
+ def create(self, assignment: Assignment) -> None:
+ # Typically this is NOT used (we'd create the stub when HIT is
+ # accepted, then update it once the assignment is submitted), however
+ # there may be cases where an assignment is submitted and the stub
+ # was never created (probably due to error/downtime/baddie).
+
+ assert assignment.id is None
+ data = assignment.to_postgres()
+ query = sql.SQL(
+ """
+ INSERT INTO mtwerk_assignment
+ (amt_assignment_id, amt_worker_id, status,
+ created_at, modified_at, hit_id,
+ auto_approval_time, accept_time, submit_time,
+ approval_time, rejection_time, requester_feedback,
+ tsid)
+ VALUES
+ (%(amt_assignment_id)s, %(amt_worker_id)s, %(status)s,
+ %(created_at)s, %(modified_at)s, %(hit_id)s,
+ %(auto_approval_time)s, %(accept_time)s, %(submit_time)s,
+ %(approval_time)s, %(rejection_time)s, %(requester_feedback)s,
+ %(tsid)s)
+ RETURNING id;
+ """
+ )
+
+ with self.pg_config.make_connection() as conn:
+ with conn.cursor() as c:
+ c.execute(query, data)
+ pk = c.fetchone()["id"]
+ conn.commit()
+ assignment.id = pk
+ return None
+
+ def get_stub(self, amt_assignment_id: str) -> AssignmentStub:
+ res = self.pg_config.execute_sql_query(
+ """
+ SELECT ma.id, ma.hit_id,
+ ma.amt_assignment_id, ma.amt_worker_id,
+ ma.status, ma.created_at, ma.modified_at,
+ mh.amt_hit_id
+ FROM mtwerk_assignment ma
+ JOIN mtwerk_hit mh ON ma.hit_id = mh.id
+ WHERE amt_assignment_id = %(amt_assignment_id)s
+ LIMIT 1;
+ """,
+ params={"amt_assignment_id": amt_assignment_id},
+ )
+ assert len(res) == 1
+ return AssignmentStub.model_validate(res[0])
+
+ def get_stub_if_exists(self, amt_assignment_id: str) -> Optional[AssignmentStub]:
+ try:
+ return self.get_stub(amt_assignment_id=amt_assignment_id)
+ except AssertionError:
+ return None
+
+ def get(self, amt_assignment_id: str) -> Assignment:
+ res = self.pg_config.execute_sql_query(
+ query="""
+ SELECT mh.amt_hit_id, ma.id, ma.amt_assignment_id,
+ ma.amt_worker_id, ma.status, ma.auto_approval_time,
+ ma.accept_time, ma.submit_time, ma.approval_time,
+ ma.rejection_time, ma.requester_feedback,
+ ma.created_at, ma.modified_at, ma.tsid, ma.hit_id
+ FROM mtwerk_assignment ma
+ JOIN mtwerk_hit mh ON ma.hit_id = mh.id
+ WHERE amt_assignment_id = %(amt_assignment_id)s
+ LIMIT 1;
+ """,
+ params={"amt_assignment_id": amt_assignment_id},
+ )
+ assert len(res) == 1
+ return Assignment.model_validate(res[0])
+
+ def update_answer(self, assignment: Assignment) -> None:
+ # We're assuming a stub already exists
+ # The assignment was submitted, but we haven't made a decision yet
+ now = datetime.now(tz=timezone.utc)
+ data = {
+ "status": assignment.status.value,
+ "submit_time": assignment.submit_time,
+ "auto_approval_time": assignment.auto_approval_time,
+ "tsid": assignment.tsid,
+ "amt_assignment_id": assignment.amt_assignment_id,
+ "modified_at": now,
+ }
+ query = sql.SQL(
+ """
+ UPDATE mtwerk_assignment
+ SET submit_time = %(submit_time)s,
+ auto_approval_time = %(auto_approval_time)s,
+ status = %(status)s,
+ tsid = %(tsid)s,
+ modified_at = %(modified_at)s
+ WHERE amt_assignment_id = %(amt_assignment_id)s
+ """
+ )
+ # We force this to fail if the assignment doesn't already exist in the db
+ with self.pg_config.make_connection() as conn:
+ with conn.cursor() as c:
+ c.execute(query, data)
+ assert c.rowcount == 1, f"Expected 1 row, got {c.rowcount}"
+ conn.commit()
+ return None
+
+ def reject(self, assignment: Assignment) -> None:
+ assert assignment.status == AssignmentStatus.Rejected
+ assert assignment.rejection_time is not None
+ assert assignment.approval_time is None
+ assert assignment.requester_feedback is not None
+ now = datetime.now(tz=timezone.utc)
+ data = {
+ "status": assignment.status.value,
+ "submit_time": assignment.submit_time,
+ "rejection_time": assignment.rejection_time,
+ "requester_feedback": assignment.requester_feedback,
+ "amt_assignment_id": assignment.amt_assignment_id,
+ "auto_approval_time": assignment.auto_approval_time,
+ "accept_time": assignment.accept_time,
+ "modified_at": now,
+ }
+ query = sql.SQL(
+ """
+ UPDATE mtwerk_assignment
+ SET submit_time = %(submit_time)s,
+ rejection_time = %(rejection_time)s,
+ status = %(status)s,
+ requester_feedback = %(requester_feedback)s,
+ auto_approval_time = %(auto_approval_time)s,
+ accept_time = %(accept_time)s,
+ modified_at = %(modified_at)s
+ WHERE amt_assignment_id = %(amt_assignment_id)s
+ """
+ )
+ # We force this to fail if the assignment doesn't already exist in the db
+ with self.pg_config.make_connection() as conn:
+ with conn.cursor() as c:
+ c.execute(query, data)
+ assert c.rowcount == 1, f"Expected 1 row, got {c.rowcount}"
+ conn.commit()
+ return None
+
+ def approve(self, assignment: Assignment) -> None:
+ assert assignment.status == AssignmentStatus.Approved
+ assert assignment.rejection_time is None
+ assert assignment.approval_time is not None
+ assert assignment.requester_feedback is not None
+ now = datetime.now(tz=timezone.utc)
+ data = {
+ "status": assignment.status.value,
+ "submit_time": assignment.submit_time,
+ "approval_time": assignment.approval_time,
+ "requester_feedback": assignment.requester_feedback,
+ "amt_assignment_id": assignment.amt_assignment_id,
+ "auto_approval_time": assignment.auto_approval_time,
+ "accept_time": assignment.accept_time,
+ "modified_at": now,
+ }
+ query = sql.SQL(
+ """
+ UPDATE mtwerk_assignment
+ SET submit_time = %(submit_time)s,
+ approval_time = %(approval_time)s,
+ status = %(status)s,
+ requester_feedback = %(requester_feedback)s,
+ auto_approval_time = %(auto_approval_time)s,
+ accept_time = %(accept_time)s,
+ modified_at = %(modified_at)s
+ WHERE amt_assignment_id = %(amt_assignment_id)s
+ """
+ )
+ # We force this to fail if the assignment doesn't already exist in the db
+ with self.pg_config.make_connection() as conn:
+ with conn.cursor() as c:
+ c.execute(query, data)
+ assert c.rowcount == 1, f"Expected 1 row, got {c.rowcount}"
+ conn.commit()
+ return None
+
+ def missing_tsid_count(
+ self, amt_worker_id: str, lookback_hrs: int = 24
+ ) -> NonNegativeInt:
+ """
+ Look at this user's previous N hrs of submitted assignments.
+ Count how many assignments they have submitted without a tsid.
+ """
+ res = self.pg_config.execute_sql_query(
+ query="""
+ SELECT COUNT(1) AS c
+ FROM mtwerk_assignment
+ WHERE amt_worker_id = %(amt_worker_id)s
+ AND submit_time > NOW() - (%(lookback_interval)s)::interval
+ AND tsid IS NULL;
+ """,
+ params={
+ "amt_worker_id": amt_worker_id,
+ "lookback_interval": f"{lookback_hrs} hour",
+ },
+ )
+ return int(res[0]["c"])
+
+ def rejected_count(
+ self, amt_worker_id: str, lookback_hrs: int = 24
+ ) -> NonNegativeInt:
+ """
+ Look at this user's previous N hrs of submitted assignments.
+ Count how many rejected assignments they have.
+ """
+ res = self.pg_config.execute_sql_query(
+ query="""
+ SELECT COUNT(1) AS c
+ FROM mtwerk_assignment
+ WHERE amt_worker_id = %(amt_worker_id)s
+ AND submit_time > NOW() - (%(lookback_interval)s)::interval
+ AND status = %(status)s;
+ """,
+ params={
+ "amt_worker_id": amt_worker_id,
+ "lookback_interval": f"{lookback_hrs} hour",
+ "status": AssignmentStatus.Rejected.value,
+ },
+ )
+ return int(res[0]["c"])
diff --git a/jb/managers/bonus.py b/jb/managers/bonus.py
new file mode 100644
index 0000000..0cb8b02
--- /dev/null
+++ b/jb/managers/bonus.py
@@ -0,0 +1,54 @@
+from typing import List
+
+from psycopg import sql
+
+from jb.managers import PostgresManager
+from jb.models.bonus import Bonus
+
+
+class BonusManager(PostgresManager):
+
+ def create(self, bonus: Bonus) -> None:
+ assert bonus.id is None
+ data = bonus.to_postgres()
+ query = sql.SQL(
+ """
+ INSERT INTO mtwerk_bonus
+ (payout_event_id, amt_worker_id, amount, grant_time, assignment_id, reason)
+ VALUES (
+ %(payout_event_id)s,
+ %(amt_worker_id)s,
+ %(amount)s,
+ %(grant_time)s,
+ (
+ SELECT id
+ FROM mtwerk_assignment
+ WHERE amt_assignment_id = %(amt_assignment_id)s
+ LIMIT 1
+ ),
+ %(reason)s
+ )
+ RETURNING id, assignment_id;
+ """
+ )
+
+ with self.pg_config.make_connection() as conn:
+ with conn.cursor() as c:
+ c.execute(query, data)
+ res = c.fetchone()
+ conn.commit()
+ bonus.id = res["id"]
+ bonus.assignment_id = res["assignment_id"]
+ return None
+
+ def filter(self, amt_assignment_id: str) -> List[Bonus]:
+ res = self.pg_config.execute_sql_query(
+ """
+ SELECT mb.*, ma.amt_assignment_id
+ FROM mtwerk_bonus mb
+ JOIN mtwerk_assignment ma ON ma.id = mb.assignment_id
+ WHERE amt_assignment_id = %(amt_assignment_id)s;
+ """,
+ params={"amt_assignment_id": amt_assignment_id},
+ )
+ return [Bonus.from_postgres(x) for x in res]
diff --git a/jb/managers/hit.py b/jb/managers/hit.py
new file mode 100644
index 0000000..3832418
--- /dev/null
+++ b/jb/managers/hit.py
@@ -0,0 +1,338 @@
+from datetime import datetime, timezone
+from typing import Optional, List
+
+from psycopg import sql
+
+from jb.managers import PostgresManager
+from jb.models.definitions import HitStatus
+from jb.models.hit import HitQuestion, HitType, Hit
+
+
+class HitQuestionManager(PostgresManager):
+
+ def create(self, question: HitQuestion) -> None:
+ assert question.id is None
+ data = question.to_postgres()
+ query = sql.SQL(
+ """
+ INSERT INTO mtwerk_question (url, height)
+ VALUES (%(url)s, %(height)s)
+ RETURNING id;
+ """
+ )
+
+ with self.pg_config.make_connection() as conn:
+ with conn.cursor() as c:
+ c.execute(query, data)
+ pk = c.fetchone()["id"]
+ conn.commit()
+ question.id = pk
+ return None
+
+ def get_by_id(self, question_id: int) -> HitQuestion:
+ res = self.pg_config.execute_sql_query(
+ """
+ SELECT *
+ FROM mtwerk_question
+ WHERE id = %(question_id)s
+ LIMIT 1;
+ """,
+ params={"question_id": question_id},
+ )
+ assert len(res) == 1
+ return HitQuestion.model_validate(res[0])
+
+ def get_by_values_if_exists(self, url: str, height: int) -> Optional[HitQuestion]:
+ res = self.pg_config.execute_sql_query(
+ """
+ SELECT *
+ FROM mtwerk_question
+ WHERE url = %(url)s AND height = %(height)s
+ LIMIT 2;
+ """,
+ params={"url": url, "height": height},
+ )
+ assert len(res) != 2, "More than 1 result!"
+ if len(res) == 0:
+ return None
+
+ return HitQuestion.model_validate(res[0])
+
+ def get_or_create(self, question: HitQuestion) -> HitQuestion:
+ res = self.get_by_values_if_exists(url=question.url, height=question.height)
+ if res:
+ return res
+ self.create(question=question)
+ return question
+
+
+class HitTypeManager(PostgresManager):
+ def create(self, hit_type: HitType) -> None:
+ assert hit_type.amt_hit_type_id is not None
+ data = hit_type.to_postgres()
+ query = sql.SQL(
+ """
+ INSERT INTO mtwerk_hittype (
+ amt_hit_type_id,
+ title,
+ description,
+ reward,
+ assignment_duration,
+ auto_approval_delay,
+ keywords,
+ min_active
+ )
+ VALUES (
+ %(amt_hit_type_id)s,
+ %(title)s,
+ %(description)s,
+ %(reward)s,
+ %(assignment_duration)s,
+ %(auto_approval_delay)s,
+ %(keywords)s,
+ %(min_active)s
+ )
+ RETURNING id;
+ """
+ )
+
+ with self.pg_config.make_connection() as conn:
+ with conn.cursor() as c:
+ c.execute(query, data)
+ pk = c.fetchone()["id"]
+ conn.commit()
+ hit_type.id = pk
+ return None
+
+ def filter_active(self) -> List[HitType]:
+ res = self.pg_config.execute_sql_query(
+ """
+ SELECT *
+ FROM mtwerk_hittype
+ WHERE min_active > 0
+ LIMIT 50
+ """
+ )
+
+ if len(res) == 50:
+ raise ValueError("Too many HitTypes!")
+
+ return [HitType.from_postgres(i) for i in res]
+
+ def get(self, amt_hit_type_id: str) -> HitType:
+ res = self.pg_config.execute_sql_query(
+ """
+ SELECT *
+ FROM mtwerk_hittype
+ WHERE amt_hit_type_id = %(amt_hit_type_id)s
+ """,
+ params={"amt_hit_type_id": amt_hit_type_id},
+ )
+ assert len(res) == 1
+ return HitType.from_postgres(res[0])
+
+ def get_if_exists(self, amt_hit_type_id: str) -> Optional[HitType]:
+ try:
+ return self.get(amt_hit_type_id=amt_hit_type_id)
+ except AssertionError:
+ return None
+
+ def get_or_create(self, hit_type: HitType) -> None:
+ res = self.get_if_exists(amt_hit_type_id=hit_type.amt_hit_type_id)
+ if res:
+ hit_type.id = res.id
+ if res is None:
+ self.create(hit_type=hit_type)
+ return None
+
+ def set_min_active(self, hit_type: HitType) -> None:
+ assert hit_type.id, "must be in the db first!"
+ query = sql.SQL(
+ """
+ UPDATE mtwerk_hittype
+ SET min_active = %(min_active)s
+ WHERE id = %(id)s
+ """
+ )
+ data = {"id": hit_type.id, "min_active": hit_type.min_active}
+ with self.pg_config.make_connection() as conn:
+ with conn.cursor() as c:
+ c.execute(query, data)
+ conn.commit()
+ assert c.rowcount == 1, c.rowcount
+ return None
+
+
+class HitManager(PostgresManager):
+ def create(self, hit: Hit):
+ assert hit.amt_hit_id is not None
+ assert hit.id is None
+ data = hit.to_postgres()
+ query = sql.SQL(
+ """
+ INSERT INTO mtwerk_hit (
+ amt_hit_id,
+ hit_type_id,
+ amt_group_id,
+ status,
+ review_status,
+ creation_time,
+ expiration,
+ question_id,
+ created_at,
+ modified_at,
+ max_assignments,
+ assignment_pending_count,
+ assignment_completed_count,
+ assignment_available_count
+ )
+ VALUES (
+ %(amt_hit_id)s,
+ %(hit_type_id)s,
+ %(amt_group_id)s,
+ %(status)s,
+ %(review_status)s,
+ %(creation_time)s,
+ %(expiration)s,
+ %(question_id)s,
+ %(created_at)s,
+ %(modified_at)s,
+ %(max_assignments)s,
+ %(assignment_pending_count)s,
+ %(assignment_completed_count)s,
+ %(assignment_available_count)s
+ )
+ RETURNING id;
+ """
+ )
+
+ with self.pg_config.make_connection() as conn:
+ with conn.cursor() as c:
+ c.execute(query, data)
+ pk = c.fetchone()["id"]
+ conn.commit()
+ hit.id = pk
+ return hit
+
+ def update_status(self, amt_hit_id: str, hit_status: HitStatus):
+ now = datetime.now(tz=timezone.utc)
+ query = sql.SQL(
+ """
+ UPDATE mtwerk_hit
+ SET status = %(status)s, modified_at = %(modified_at)s
+ WHERE amt_hit_id = %(amt_hit_id)s;
+ """
+ )
+
+ data = {
+ "amt_hit_id": amt_hit_id,
+ "status": hit_status.value,
+ "modified_at": now,
+ }
+ with self.pg_config.make_connection() as conn:
+ with conn.cursor() as c:
+ c.execute(query, data)
+ conn.commit()
+ assert c.rowcount == 1, c.rowcount
+ return None
+
+ def update_hit(self, hit: Hit):
+ hit.modified_at = datetime.now(tz=timezone.utc)
+ # fields expected to change:
+ fields = {
+ "status",
+ "review_status",
+ "assignment_pending_count",
+ "assignment_available_count",
+ "assignment_completed_count",
+ "amt_hit_id",
+ "modified_at",
+ }
+ query = sql.SQL(
+ """
+ UPDATE mtwerk_hit
+ SET status = %(status)s, review_status = %(review_status)s,
+ assignment_pending_count = %(assignment_pending_count)s,
+ assignment_available_count = %(assignment_available_count)s,
+ assignment_completed_count = %(assignment_completed_count)s,
+ modified_at = %(modified_at)s
+ WHERE amt_hit_id = %(amt_hit_id)s
+ RETURNING id;
+ """
+ )
+
+ data = hit.model_dump(mode="json", include=fields)
+ with self.pg_config.make_connection() as conn:
+ with conn.cursor() as c:
+ c.execute(query, data)
+ conn.commit()
+ assert c.rowcount == 1, c.rowcount
+ hit.id = c.fetchone()["id"]
+ return None
+
+ def get_from_amt_id(self, amt_hit_id: str) -> Hit:
+ res = self.pg_config.execute_sql_query(
+ """
+ SELECT
+ mh.id,
+ mh.amt_hit_id,
+ mh.amt_group_id,
+ mh.status,
+ mh.review_status,
+ mh.creation_time,
+ mh.expiration,
+ mh.created_at,
+ mh.modified_at,
+ mh.assignment_available_count,
+ mh.assignment_completed_count,
+ mh.assignment_pending_count,
+ mh.max_assignments,
+ mht.amt_hit_type_id,
+ mht.title,
+ mht.description,
+ mht.reward,
+ mht.assignment_duration,
+ mht.auto_approval_delay,
+ mht.keywords,
+ mq.id as question_id,
+ mq.height,
+ mq.url
+ FROM mtwerk_hit mh
+ JOIN mtwerk_hittype mht ON mh.hit_type_id = mht.id
+ JOIN mtwerk_question mq ON mh.question_id = mq.id
+ WHERE amt_hit_id = %(amt_hit_id)s
+ LIMIT 2;
+ """,
+ params={"amt_hit_id": amt_hit_id},
+ )
+ assert len(res) == 1
+ res = res[0]
+ question_xml = HitQuestion.model_validate(
+ {"height": res.pop("height"), "url": res.pop("url")}
+ ).xml
+ res["question_id"] = res["question_id"]
+ res["hit_question_xml"] = question_xml
+
+ return Hit.from_postgres(res)
+
+ def get_active_count(self, hit_type_id: int):
+ return self.pg_config.execute_sql_query(
+ """
+ SELECT COUNT(1) as active_count
+ FROM mtwerk_hit
+ WHERE status = %(status)s
+ AND hit_type_id = %(hit_type_id)s;
+ """,
+ params={"status": HitStatus.Assignable, "hit_type_id": hit_type_id},
+ )[0]["active_count"]
+
+ def filter_active_ids(self, hit_type_id: int):
+ res = self.pg_config.execute_sql_query(
+ """
+ SELECT mh.amt_hit_id
+ FROM mtwerk_hit mh
+ WHERE hit_type_id = %(hit_type_id)s;
+ """,
+ params={"hit_type_id": hit_type_id},
+ )
+ return {x["amt_hit_id"] for x in res}
diff --git a/jb/managers/worker.py b/jb/managers/worker.py
new file mode 100644
index 0000000..e2d7237
--- /dev/null
+++ b/jb/managers/worker.py
@@ -0,0 +1,16 @@
+from typing import List
+
+from mypy_boto3_mturk.type_defs import WorkerBlockTypeDef
+
+from jb.decorators import AMT_CLIENT
+
+
+class WorkerManager:
+
+ @staticmethod
+ def fetch_worker_blocks() -> List[WorkerBlockTypeDef]:
+ p = AMT_CLIENT.get_paginator("list_worker_blocks")
+ res: List[WorkerBlockTypeDef] = []
+ for item in p.paginate():
+ res.extend(item["WorkerBlocks"])
+ return res