diff options
| author | Max Nanis | 2026-02-19 02:43:23 -0500 |
|---|---|---|
| committer | Max Nanis | 2026-02-19 02:43:23 -0500 |
| commit | f0f96f83c2630e890a2cbcab53f77fd4c37e1684 (patch) | |
| tree | c6d2cb092e76bf5d499e0ea9949508d6b22164fd /jb/managers | |
| parent | 3eaa56f0306ead818f64c3d99fc6d230d9b970a4 (diff) | |
| download | amt-jb-master.tar.gz amt-jb-master.zip | |
Diffstat (limited to 'jb/managers')
| -rw-r--r-- | jb/managers/__init__.py | 23 | ||||
| -rw-r--r-- | jb/managers/amt.py | 216 | ||||
| -rw-r--r-- | jb/managers/assignment.py | 259 | ||||
| -rw-r--r-- | jb/managers/bonus.py | 54 | ||||
| -rw-r--r-- | jb/managers/hit.py | 338 | ||||
| -rw-r--r-- | jb/managers/worker.py | 16 |
6 files changed, 906 insertions, 0 deletions
diff --git a/jb/managers/__init__.py b/jb/managers/__init__.py new file mode 100644 index 0000000..e2aab6d --- /dev/null +++ b/jb/managers/__init__.py @@ -0,0 +1,23 @@ +from enum import IntEnum +from typing import Collection + +from generalresearchutils.pg_helper import PostgresConfig + + +class Permission(IntEnum): + READ = 1 + UPDATE = 2 + CREATE = 3 + DELETE = 4 + + +class PostgresManager: + def __init__( + self, + pg_config: PostgresConfig, + permissions: Collection[Permission] = None, + **kwargs, + ): + super().__init__(**kwargs) + self.pg_config = pg_config + self.permissions = set(permissions) if permissions else set() diff --git a/jb/managers/amt.py b/jb/managers/amt.py new file mode 100644 index 0000000..79661c7 --- /dev/null +++ b/jb/managers/amt.py @@ -0,0 +1,216 @@ +import logging +from datetime import timezone, datetime +from typing import Tuple, Optional, List + +import botocore.exceptions +from mypy_boto3_mturk.type_defs import AssignmentTypeDef, BonusPaymentTypeDef + +from jb.config import TOPIC_ARN +from jb.decorators import AMT_CLIENT +from jb.models import AMTAccount +from jb.models.assignment import Assignment +from jb.models.bonus import Bonus +from jb.models.currency import USDCent +from jb.models.definitions import HitStatus +from jb.models.hit import HitType, HitQuestion, Hit + +REJECT_MESSAGE_UNKNOWN_ASSIGNMENT = "Unknown assignment" +REJECT_MESSAGE_NO_WORK = "Assignment was submitted with no attempted work." +REJECT_MESSAGE_BADDIE = "Quality has dropped below an acceptable level" +APPROVAL_MESSAGE = "Thank you!" +NO_WORK_APPROVAL_MESSAGE = ( + REJECT_MESSAGE_NO_WORK + " In the future, if you are not sent into a task, " + "please return the assignment, otherwise it will be rejected!" +) +BONUS_MESSAGE = "Great job! Bonus for a survey complete" + + +class AMTManager: + + @staticmethod + def fetch_account() -> AMTAccount: + res = AMT_CLIENT.get_account_balance() + return AMTAccount.model_validate( + { + "available_balance": res["AvailableBalance"], + "onhold_balance": res.get("OnHoldBalance", 0), + } + ) + + @staticmethod + def get_hit_if_exists(amt_hit_id: str) -> Tuple[Optional[Hit], Optional[str]]: + try: + res = AMT_CLIENT.get_hit(HITId=amt_hit_id) + except AMT_CLIENT.exceptions.RequestError as e: + msg = e.response.get("Error", {}).get("Message", "") + return None, msg + hit = Hit.from_amt_get_hit(res["HIT"]) + return hit, None + + @classmethod + def get_hit_status(cls, amt_hit_id: str): + res, msg = cls.get_hit_if_exists(amt_hit_id=amt_hit_id) + if res is None: + if " does not exist. (" in msg: + return HitStatus.Disposed + else: + logging.warning(msg) + return HitStatus.Unassignable + return res.status + + @staticmethod + def create_hit_type(hit_type: HitType): + res = AMT_CLIENT.create_hit_type(**hit_type.to_api_request_body()) + hit_type.amt_hit_type_id = res["HITTypeId"] + AMT_CLIENT.update_notification_settings( + HITTypeId=hit_type.amt_hit_type_id, + Notification={ + "Destination": TOPIC_ARN, + "Transport": "SNS", + "Version": "2006-05-05", + # you can add more events, see mypy_boto3_mturk.literals.EventTypeType + "EventTypes": ["AssignmentSubmitted"], + }, + Active=True, + ) + + return hit_type + + @staticmethod + def create_hit_with_hit_type(hit_type: HitType, question: HitQuestion) -> Hit: + """ + HITTypeId: str + LifetimeInSeconds: int + MaxAssignments: NotRequired[int] + Question: NotRequired[str] + UniqueRequestToken: NotRequired[str] + """ + assert hit_type.id is not None + assert hit_type.amt_hit_type_id is not None + + data = hit_type.generate_hit_amt_request(question=question) + res = AMT_CLIENT.create_hit_with_hit_type(**data) + return Hit.from_amt_create_hit(res["HIT"], hit_type=hit_type, question=question) + + @staticmethod + def get_assignment(amt_assignment_id: str) -> Assignment: + # note, you CANNOT get an assignment if it has been only ACCEPTED (by the worker) + # the api is stupid. it will only show up once it is submitted + res = AMT_CLIENT.get_assignment(AssignmentId=amt_assignment_id) + ass_res: AssignmentTypeDef = res["Assignment"] + assignment = Assignment.from_amt_get_assignment(ass_res) + # to be clear, this has not checked whether it exists in our db + assert assignment.id is None + return assignment + + @classmethod + def get_assignment_if_exists(cls, amt_assignment_id: str) -> Optional[Assignment]: + expected_err_msg = f"Assignment {amt_assignment_id} does not exist" + try: + return cls.get_assignment(amt_assignment_id=amt_assignment_id) + except botocore.exceptions.ClientError as e: + logging.warning(e) + error_code = e.response["Error"]["Code"] + error_msg = e.response["Error"]["Message"] + if error_code == "RequestError" and expected_err_msg in error_msg: + return None + raise e + + @staticmethod + def reject_assignment_if_possible( + amt_assignment_id: str, msg: str = REJECT_MESSAGE_UNKNOWN_ASSIGNMENT + ): + # Unclear to me when this would fail + try: + return AMT_CLIENT.reject_assignment( + AssignmentId=amt_assignment_id, RequesterFeedback=msg + ) + except botocore.exceptions.ClientError as e: + logging.warning(e) + return None + + @staticmethod + def approve_assignment_if_possible( + amt_assignment_id: str, + msg: str = APPROVAL_MESSAGE, + override_rejection: bool = False, + ): + # Unclear to me when this would fail + try: + return AMT_CLIENT.approve_assignment( + AssignmentId=amt_assignment_id, + RequesterFeedback=msg, + OverrideRejection=override_rejection, + ) + except botocore.exceptions.ClientError as e: + logging.warning(e) + return None + + @staticmethod + def update_hit_review_status(amt_hit_id: str, revert: bool = False) -> None: + try: + # Reviewable to Reviewing + AMT_CLIENT.update_hit_review_status(HITId=amt_hit_id, Revert=revert) + except botocore.exceptions.ClientError as e: + logging.warning(f"{amt_hit_id=}, {e}") + error_msg = e.response["Error"]["Message"] + if "does not exist" in error_msg: + raise ValueError(error_msg) + # elif "This HIT is currently in the state 'Reviewing'" in error_msg: + # logging.warning(error_msg) + return None + + @staticmethod + def send_bonus( + amt_worker_id: str, + amount: USDCent, + amt_assignment_id: str, + reason: str, + unique_request_token: str, + ): + try: + return AMT_CLIENT.send_bonus( + WorkerId=amt_worker_id, + BonusAmount=str(amount.to_usd()), + AssignmentId=amt_assignment_id, + Reason=reason, + UniqueRequestToken=unique_request_token, + ) + except botocore.exceptions.ClientError as e: + logging.warning(f"{amt_worker_id=} {amt_assignment_id=}, {e}") + return None + + @staticmethod + def get_bonus(amt_assignment_id: str, payout_event_id: str) -> Optional[Bonus]: + res: List[BonusPaymentTypeDef] = AMT_CLIENT.list_bonus_payments( + AssignmentId=amt_assignment_id + )["BonusPayments"] + assert ( + len(res) <= 1 + ), f"{amt_assignment_id=} Expected 1 or 0 bonuses, got {len(res)}" + d = res[0] if res else None + if d: + return Bonus.model_validate( + { + "amt_worker_id": d["WorkerId"], + "amount": USDCent(round(float(d["BonusAmount"]) * 100)), + "amt_assignment_id": d["AssignmentId"], + "reason": d["Reason"], + "grant_time": d["GrantTime"].astimezone(tz=timezone.utc), + "payout_event_id": payout_event_id, + } + ) + return None + + @staticmethod + def expire_all_hits(): + # used in testing only (or in an emergency I guess) + now = datetime.now(tz=timezone.utc) + paginator = AMT_CLIENT.get_paginator("list_hits") + + for page in paginator.paginate(): + for hit in page["HITs"]: + if hit["HITStatus"] in ("Assignable", "Reviewable", "Reviewing"): + AMT_CLIENT.update_expiration_for_hit( + HITId=hit["HITId"], ExpireAt=now + ) diff --git a/jb/managers/assignment.py b/jb/managers/assignment.py new file mode 100644 index 0000000..fca72e8 --- /dev/null +++ b/jb/managers/assignment.py @@ -0,0 +1,259 @@ +from datetime import datetime, timezone +from typing import Optional + +from psycopg import sql +from pydantic import NonNegativeInt + +from jb.managers import PostgresManager +from jb.models.assignment import AssignmentStub, Assignment +from jb.models.definitions import AssignmentStatus + + +class AssignmentManager(PostgresManager): + + def create_stub(self, stub: AssignmentStub) -> None: + assert stub.id is None + data = stub.to_postgres() + query = sql.SQL( + """ + INSERT INTO mtwerk_assignment + (amt_assignment_id, amt_worker_id, status, created_at, modified_at, hit_id) + VALUES + (%(amt_assignment_id)s, %(amt_worker_id)s, %(status)s, + %(created_at)s, %(modified_at)s, %(hit_id)s) + RETURNING id; + """ + ) + + with self.pg_config.make_connection() as conn: + with conn.cursor() as c: + c.execute(query, data) + pk = c.fetchone()["id"] + conn.commit() + stub.id = pk + return None + + def create(self, assignment: Assignment) -> None: + # Typically this is NOT used (we'd create the stub when HIT is + # accepted, then update it once the assignment is submitted), however + # there may be cases where an assignment is submitted and the stub + # was never created (probably due to error/downtime/baddie). + + assert assignment.id is None + data = assignment.to_postgres() + query = sql.SQL( + """ + INSERT INTO mtwerk_assignment + (amt_assignment_id, amt_worker_id, status, + created_at, modified_at, hit_id, + auto_approval_time, accept_time, submit_time, + approval_time, rejection_time, requester_feedback, + tsid) + VALUES + (%(amt_assignment_id)s, %(amt_worker_id)s, %(status)s, + %(created_at)s, %(modified_at)s, %(hit_id)s, + %(auto_approval_time)s, %(accept_time)s, %(submit_time)s, + %(approval_time)s, %(rejection_time)s, %(requester_feedback)s, + %(tsid)s) + RETURNING id; + """ + ) + + with self.pg_config.make_connection() as conn: + with conn.cursor() as c: + c.execute(query, data) + pk = c.fetchone()["id"] + conn.commit() + assignment.id = pk + return None + + def get_stub(self, amt_assignment_id: str) -> AssignmentStub: + res = self.pg_config.execute_sql_query( + """ + SELECT ma.id, ma.hit_id, + ma.amt_assignment_id, ma.amt_worker_id, + ma.status, ma.created_at, ma.modified_at, + mh.amt_hit_id + FROM mtwerk_assignment ma + JOIN mtwerk_hit mh ON ma.hit_id = mh.id + WHERE amt_assignment_id = %(amt_assignment_id)s + LIMIT 1; + """, + params={"amt_assignment_id": amt_assignment_id}, + ) + assert len(res) == 1 + return AssignmentStub.model_validate(res[0]) + + def get_stub_if_exists(self, amt_assignment_id: str) -> Optional[AssignmentStub]: + try: + return self.get_stub(amt_assignment_id=amt_assignment_id) + except AssertionError: + return None + + def get(self, amt_assignment_id: str) -> Assignment: + res = self.pg_config.execute_sql_query( + query=""" + SELECT mh.amt_hit_id, ma.id, ma.amt_assignment_id, + ma.amt_worker_id, ma.status, ma.auto_approval_time, + ma.accept_time, ma.submit_time, ma.approval_time, + ma.rejection_time, ma.requester_feedback, + ma.created_at, ma.modified_at, ma.tsid, ma.hit_id + FROM mtwerk_assignment ma + JOIN mtwerk_hit mh ON ma.hit_id = mh.id + WHERE amt_assignment_id = %(amt_assignment_id)s + LIMIT 1; + """, + params={"amt_assignment_id": amt_assignment_id}, + ) + assert len(res) == 1 + return Assignment.model_validate(res[0]) + + def update_answer(self, assignment: Assignment) -> None: + # We're assuming a stub already exists + # The assignment was submitted, but we haven't made a decision yet + now = datetime.now(tz=timezone.utc) + data = { + "status": assignment.status.value, + "submit_time": assignment.submit_time, + "auto_approval_time": assignment.auto_approval_time, + "tsid": assignment.tsid, + "amt_assignment_id": assignment.amt_assignment_id, + "modified_at": now, + } + query = sql.SQL( + """ + UPDATE mtwerk_assignment + SET submit_time = %(submit_time)s, + auto_approval_time = %(auto_approval_time)s, + status = %(status)s, + tsid = %(tsid)s, + modified_at = %(modified_at)s + WHERE amt_assignment_id = %(amt_assignment_id)s + """ + ) + # We force this to fail if the assignment doesn't already exist in the db + with self.pg_config.make_connection() as conn: + with conn.cursor() as c: + c.execute(query, data) + assert c.rowcount == 1, f"Expected 1 row, got {c.rowcount}" + conn.commit() + return None + + def reject(self, assignment: Assignment) -> None: + assert assignment.status == AssignmentStatus.Rejected + assert assignment.rejection_time is not None + assert assignment.approval_time is None + assert assignment.requester_feedback is not None + now = datetime.now(tz=timezone.utc) + data = { + "status": assignment.status.value, + "submit_time": assignment.submit_time, + "rejection_time": assignment.rejection_time, + "requester_feedback": assignment.requester_feedback, + "amt_assignment_id": assignment.amt_assignment_id, + "auto_approval_time": assignment.auto_approval_time, + "accept_time": assignment.accept_time, + "modified_at": now, + } + query = sql.SQL( + """ + UPDATE mtwerk_assignment + SET submit_time = %(submit_time)s, + rejection_time = %(rejection_time)s, + status = %(status)s, + requester_feedback = %(requester_feedback)s, + auto_approval_time = %(auto_approval_time)s, + accept_time = %(accept_time)s, + modified_at = %(modified_at)s + WHERE amt_assignment_id = %(amt_assignment_id)s + """ + ) + # We force this to fail if the assignment doesn't already exist in the db + with self.pg_config.make_connection() as conn: + with conn.cursor() as c: + c.execute(query, data) + assert c.rowcount == 1, f"Expected 1 row, got {c.rowcount}" + conn.commit() + return None + + def approve(self, assignment: Assignment) -> None: + assert assignment.status == AssignmentStatus.Approved + assert assignment.rejection_time is None + assert assignment.approval_time is not None + assert assignment.requester_feedback is not None + now = datetime.now(tz=timezone.utc) + data = { + "status": assignment.status.value, + "submit_time": assignment.submit_time, + "approval_time": assignment.approval_time, + "requester_feedback": assignment.requester_feedback, + "amt_assignment_id": assignment.amt_assignment_id, + "auto_approval_time": assignment.auto_approval_time, + "accept_time": assignment.accept_time, + "modified_at": now, + } + query = sql.SQL( + """ + UPDATE mtwerk_assignment + SET submit_time = %(submit_time)s, + approval_time = %(approval_time)s, + status = %(status)s, + requester_feedback = %(requester_feedback)s, + auto_approval_time = %(auto_approval_time)s, + accept_time = %(accept_time)s, + modified_at = %(modified_at)s + WHERE amt_assignment_id = %(amt_assignment_id)s + """ + ) + # We force this to fail if the assignment doesn't already exist in the db + with self.pg_config.make_connection() as conn: + with conn.cursor() as c: + c.execute(query, data) + assert c.rowcount == 1, f"Expected 1 row, got {c.rowcount}" + conn.commit() + return None + + def missing_tsid_count( + self, amt_worker_id: str, lookback_hrs: int = 24 + ) -> NonNegativeInt: + """ + Look at this user's previous N hrs of submitted assignments. + Count how many assignments they have submitted without a tsid. + """ + res = self.pg_config.execute_sql_query( + query=""" + SELECT COUNT(1) AS c + FROM mtwerk_assignment + WHERE amt_worker_id = %(amt_worker_id)s + AND submit_time > NOW() - (%(lookback_interval)s)::interval + AND tsid IS NULL; + """, + params={ + "amt_worker_id": amt_worker_id, + "lookback_interval": f"{lookback_hrs} hour", + }, + ) + return int(res[0]["c"]) + + def rejected_count( + self, amt_worker_id: str, lookback_hrs: int = 24 + ) -> NonNegativeInt: + """ + Look at this user's previous N hrs of submitted assignments. + Count how many rejected assignments they have. + """ + res = self.pg_config.execute_sql_query( + query=""" + SELECT COUNT(1) AS c + FROM mtwerk_assignment + WHERE amt_worker_id = %(amt_worker_id)s + AND submit_time > NOW() - (%(lookback_interval)s)::interval + AND status = %(status)s; + """, + params={ + "amt_worker_id": amt_worker_id, + "lookback_interval": f"{lookback_hrs} hour", + "status": AssignmentStatus.Rejected.value, + }, + ) + return int(res[0]["c"]) diff --git a/jb/managers/bonus.py b/jb/managers/bonus.py new file mode 100644 index 0000000..0cb8b02 --- /dev/null +++ b/jb/managers/bonus.py @@ -0,0 +1,54 @@ +from typing import List + +from psycopg import sql + +from jb.managers import PostgresManager +from jb.models.bonus import Bonus + + +class BonusManager(PostgresManager): + + def create(self, bonus: Bonus) -> None: + assert bonus.id is None + data = bonus.to_postgres() + query = sql.SQL( + """ + INSERT INTO mtwerk_bonus + (payout_event_id, amt_worker_id, amount, grant_time, assignment_id, reason) + VALUES ( + %(payout_event_id)s, + %(amt_worker_id)s, + %(amount)s, + %(grant_time)s, + ( + SELECT id + FROM mtwerk_assignment + WHERE amt_assignment_id = %(amt_assignment_id)s + LIMIT 1 + ), + %(reason)s + ) + RETURNING id, assignment_id; + """ + ) + + with self.pg_config.make_connection() as conn: + with conn.cursor() as c: + c.execute(query, data) + res = c.fetchone() + conn.commit() + bonus.id = res["id"] + bonus.assignment_id = res["assignment_id"] + return None + + def filter(self, amt_assignment_id: str) -> List[Bonus]: + res = self.pg_config.execute_sql_query( + """ + SELECT mb.*, ma.amt_assignment_id + FROM mtwerk_bonus mb + JOIN mtwerk_assignment ma ON ma.id = mb.assignment_id + WHERE amt_assignment_id = %(amt_assignment_id)s; + """, + params={"amt_assignment_id": amt_assignment_id}, + ) + return [Bonus.from_postgres(x) for x in res] diff --git a/jb/managers/hit.py b/jb/managers/hit.py new file mode 100644 index 0000000..3832418 --- /dev/null +++ b/jb/managers/hit.py @@ -0,0 +1,338 @@ +from datetime import datetime, timezone +from typing import Optional, List + +from psycopg import sql + +from jb.managers import PostgresManager +from jb.models.definitions import HitStatus +from jb.models.hit import HitQuestion, HitType, Hit + + +class HitQuestionManager(PostgresManager): + + def create(self, question: HitQuestion) -> None: + assert question.id is None + data = question.to_postgres() + query = sql.SQL( + """ + INSERT INTO mtwerk_question (url, height) + VALUES (%(url)s, %(height)s) + RETURNING id; + """ + ) + + with self.pg_config.make_connection() as conn: + with conn.cursor() as c: + c.execute(query, data) + pk = c.fetchone()["id"] + conn.commit() + question.id = pk + return None + + def get_by_id(self, question_id: int) -> HitQuestion: + res = self.pg_config.execute_sql_query( + """ + SELECT * + FROM mtwerk_question + WHERE id = %(question_id)s + LIMIT 1; + """, + params={"question_id": question_id}, + ) + assert len(res) == 1 + return HitQuestion.model_validate(res[0]) + + def get_by_values_if_exists(self, url: str, height: int) -> Optional[HitQuestion]: + res = self.pg_config.execute_sql_query( + """ + SELECT * + FROM mtwerk_question + WHERE url = %(url)s AND height = %(height)s + LIMIT 2; + """, + params={"url": url, "height": height}, + ) + assert len(res) != 2, "More than 1 result!" + if len(res) == 0: + return None + + return HitQuestion.model_validate(res[0]) + + def get_or_create(self, question: HitQuestion) -> HitQuestion: + res = self.get_by_values_if_exists(url=question.url, height=question.height) + if res: + return res + self.create(question=question) + return question + + +class HitTypeManager(PostgresManager): + def create(self, hit_type: HitType) -> None: + assert hit_type.amt_hit_type_id is not None + data = hit_type.to_postgres() + query = sql.SQL( + """ + INSERT INTO mtwerk_hittype ( + amt_hit_type_id, + title, + description, + reward, + assignment_duration, + auto_approval_delay, + keywords, + min_active + ) + VALUES ( + %(amt_hit_type_id)s, + %(title)s, + %(description)s, + %(reward)s, + %(assignment_duration)s, + %(auto_approval_delay)s, + %(keywords)s, + %(min_active)s + ) + RETURNING id; + """ + ) + + with self.pg_config.make_connection() as conn: + with conn.cursor() as c: + c.execute(query, data) + pk = c.fetchone()["id"] + conn.commit() + hit_type.id = pk + return None + + def filter_active(self) -> List[HitType]: + res = self.pg_config.execute_sql_query( + """ + SELECT * + FROM mtwerk_hittype + WHERE min_active > 0 + LIMIT 50 + """ + ) + + if len(res) == 50: + raise ValueError("Too many HitTypes!") + + return [HitType.from_postgres(i) for i in res] + + def get(self, amt_hit_type_id: str) -> HitType: + res = self.pg_config.execute_sql_query( + """ + SELECT * + FROM mtwerk_hittype + WHERE amt_hit_type_id = %(amt_hit_type_id)s + """, + params={"amt_hit_type_id": amt_hit_type_id}, + ) + assert len(res) == 1 + return HitType.from_postgres(res[0]) + + def get_if_exists(self, amt_hit_type_id: str) -> Optional[HitType]: + try: + return self.get(amt_hit_type_id=amt_hit_type_id) + except AssertionError: + return None + + def get_or_create(self, hit_type: HitType) -> None: + res = self.get_if_exists(amt_hit_type_id=hit_type.amt_hit_type_id) + if res: + hit_type.id = res.id + if res is None: + self.create(hit_type=hit_type) + return None + + def set_min_active(self, hit_type: HitType) -> None: + assert hit_type.id, "must be in the db first!" + query = sql.SQL( + """ + UPDATE mtwerk_hittype + SET min_active = %(min_active)s + WHERE id = %(id)s + """ + ) + data = {"id": hit_type.id, "min_active": hit_type.min_active} + with self.pg_config.make_connection() as conn: + with conn.cursor() as c: + c.execute(query, data) + conn.commit() + assert c.rowcount == 1, c.rowcount + return None + + +class HitManager(PostgresManager): + def create(self, hit: Hit): + assert hit.amt_hit_id is not None + assert hit.id is None + data = hit.to_postgres() + query = sql.SQL( + """ + INSERT INTO mtwerk_hit ( + amt_hit_id, + hit_type_id, + amt_group_id, + status, + review_status, + creation_time, + expiration, + question_id, + created_at, + modified_at, + max_assignments, + assignment_pending_count, + assignment_completed_count, + assignment_available_count + ) + VALUES ( + %(amt_hit_id)s, + %(hit_type_id)s, + %(amt_group_id)s, + %(status)s, + %(review_status)s, + %(creation_time)s, + %(expiration)s, + %(question_id)s, + %(created_at)s, + %(modified_at)s, + %(max_assignments)s, + %(assignment_pending_count)s, + %(assignment_completed_count)s, + %(assignment_available_count)s + ) + RETURNING id; + """ + ) + + with self.pg_config.make_connection() as conn: + with conn.cursor() as c: + c.execute(query, data) + pk = c.fetchone()["id"] + conn.commit() + hit.id = pk + return hit + + def update_status(self, amt_hit_id: str, hit_status: HitStatus): + now = datetime.now(tz=timezone.utc) + query = sql.SQL( + """ + UPDATE mtwerk_hit + SET status = %(status)s, modified_at = %(modified_at)s + WHERE amt_hit_id = %(amt_hit_id)s; + """ + ) + + data = { + "amt_hit_id": amt_hit_id, + "status": hit_status.value, + "modified_at": now, + } + with self.pg_config.make_connection() as conn: + with conn.cursor() as c: + c.execute(query, data) + conn.commit() + assert c.rowcount == 1, c.rowcount + return None + + def update_hit(self, hit: Hit): + hit.modified_at = datetime.now(tz=timezone.utc) + # fields expected to change: + fields = { + "status", + "review_status", + "assignment_pending_count", + "assignment_available_count", + "assignment_completed_count", + "amt_hit_id", + "modified_at", + } + query = sql.SQL( + """ + UPDATE mtwerk_hit + SET status = %(status)s, review_status = %(review_status)s, + assignment_pending_count = %(assignment_pending_count)s, + assignment_available_count = %(assignment_available_count)s, + assignment_completed_count = %(assignment_completed_count)s, + modified_at = %(modified_at)s + WHERE amt_hit_id = %(amt_hit_id)s + RETURNING id; + """ + ) + + data = hit.model_dump(mode="json", include=fields) + with self.pg_config.make_connection() as conn: + with conn.cursor() as c: + c.execute(query, data) + conn.commit() + assert c.rowcount == 1, c.rowcount + hit.id = c.fetchone()["id"] + return None + + def get_from_amt_id(self, amt_hit_id: str) -> Hit: + res = self.pg_config.execute_sql_query( + """ + SELECT + mh.id, + mh.amt_hit_id, + mh.amt_group_id, + mh.status, + mh.review_status, + mh.creation_time, + mh.expiration, + mh.created_at, + mh.modified_at, + mh.assignment_available_count, + mh.assignment_completed_count, + mh.assignment_pending_count, + mh.max_assignments, + mht.amt_hit_type_id, + mht.title, + mht.description, + mht.reward, + mht.assignment_duration, + mht.auto_approval_delay, + mht.keywords, + mq.id as question_id, + mq.height, + mq.url + FROM mtwerk_hit mh + JOIN mtwerk_hittype mht ON mh.hit_type_id = mht.id + JOIN mtwerk_question mq ON mh.question_id = mq.id + WHERE amt_hit_id = %(amt_hit_id)s + LIMIT 2; + """, + params={"amt_hit_id": amt_hit_id}, + ) + assert len(res) == 1 + res = res[0] + question_xml = HitQuestion.model_validate( + {"height": res.pop("height"), "url": res.pop("url")} + ).xml + res["question_id"] = res["question_id"] + res["hit_question_xml"] = question_xml + + return Hit.from_postgres(res) + + def get_active_count(self, hit_type_id: int): + return self.pg_config.execute_sql_query( + """ + SELECT COUNT(1) as active_count + FROM mtwerk_hit + WHERE status = %(status)s + AND hit_type_id = %(hit_type_id)s; + """, + params={"status": HitStatus.Assignable, "hit_type_id": hit_type_id}, + )[0]["active_count"] + + def filter_active_ids(self, hit_type_id: int): + res = self.pg_config.execute_sql_query( + """ + SELECT mh.amt_hit_id + FROM mtwerk_hit mh + WHERE hit_type_id = %(hit_type_id)s; + """, + params={"hit_type_id": hit_type_id}, + ) + return {x["amt_hit_id"] for x in res} diff --git a/jb/managers/worker.py b/jb/managers/worker.py new file mode 100644 index 0000000..e2d7237 --- /dev/null +++ b/jb/managers/worker.py @@ -0,0 +1,16 @@ +from typing import List + +from mypy_boto3_mturk.type_defs import WorkerBlockTypeDef + +from jb.decorators import AMT_CLIENT + + +class WorkerManager: + + @staticmethod + def fetch_worker_blocks() -> List[WorkerBlockTypeDef]: + p = AMT_CLIENT.get_paginator("list_worker_blocks") + res: List[WorkerBlockTypeDef] = [] + for item in p.paginate(): + res.extend(item["WorkerBlocks"]) + return res |
