from datetime import datetime, timezone
from typing import Optional

from psycopg import sql
from pydantic import NonNegativeInt

from jb.managers import PostgresManager
from jb.models.assignment import AssignmentStub, Assignment
from jb.models.definitions import AssignmentStatus


def _require(condition: bool, message: str) -> None:
    """Raise ``AssertionError(message)`` unless *condition* is true.

    Used instead of the ``assert`` statement so the check still runs when
    Python is started with ``-O`` (which strips ``assert``), while keeping
    the exception type callers already see.
    """
    if not condition:
        raise AssertionError(message)


class AssignmentManager(PostgresManager):
    """Reads and writes rows of the ``mtwerk_assignment`` table."""

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _insert_returning_id(self, query: sql.SQL, data: dict) -> int:
        """Run an ``INSERT ... RETURNING id`` statement, commit, return the id."""
        with self.pg_config.make_connection() as conn:
            with conn.cursor() as c:
                c.execute(query, data)
                # The row is indexed by column name, so the connection is
                # assumed to yield mapping-style rows -- TODO confirm
                # against make_connection().
                pk = c.fetchone()["id"]
            conn.commit()
        return pk

    def _update_exactly_one(self, query: sql.SQL, data: dict) -> None:
        """Run an UPDATE and commit it only if it touched exactly one row.

        Raises:
            AssertionError: if the statement affected any number of rows
                other than one. The error is raised before ``commit()`` is
                called.
        """
        with self.pg_config.make_connection() as conn:
            with conn.cursor() as c:
                c.execute(query, data)
                _require(c.rowcount == 1, f"Expected 1 row, got {c.rowcount}")
            conn.commit()

    def _select_stub_rows(self, amt_assignment_id: str):
        """Return the (at most one) stub row for *amt_assignment_id*."""
        return self.pg_config.execute_sql_query(
            query="""
            SELECT ma.id, ma.hit_id, ma.amt_assignment_id, ma.amt_worker_id,
                   ma.status, ma.created_at, ma.modified_at, mh.amt_hit_id
            FROM mtwerk_assignment ma
            JOIN mtwerk_hit mh ON ma.hit_id = mh.id
            WHERE amt_assignment_id = %(amt_assignment_id)s
            LIMIT 1;
            """,
            params={"amt_assignment_id": amt_assignment_id},
        )

    # ------------------------------------------------------------------
    # Create
    # ------------------------------------------------------------------

    def create_stub(self, stub: AssignmentStub) -> None:
        """Insert *stub* and store the generated primary key on ``stub.id``.

        Raises:
            AssertionError: if ``stub.id`` is already set.
        """
        _require(stub.id is None, "stub.id must be None before insert")
        data = stub.to_postgres()
        query = sql.SQL(
            """
            INSERT INTO mtwerk_assignment
                (amt_assignment_id, amt_worker_id, status,
                 created_at, modified_at, hit_id)
            VALUES
                (%(amt_assignment_id)s, %(amt_worker_id)s, %(status)s,
                 %(created_at)s, %(modified_at)s, %(hit_id)s)
            RETURNING id;
            """
        )
        stub.id = self._insert_returning_id(query, data)

    def create(self, assignment: Assignment) -> None:
        """Insert a full *assignment* row and store its key on ``assignment.id``.

        Raises:
            AssertionError: if ``assignment.id`` is already set.
        """
        # Typically this is NOT used (we'd create the stub when HIT is
        # accepted, then update it once the assignment is submitted), however
        # there may be cases where an assignment is submitted and the stub
        # was never created (probably due to error/downtime/baddie).
        _require(assignment.id is None, "assignment.id must be None before insert")
        data = assignment.to_postgres()
        query = sql.SQL(
            """
            INSERT INTO mtwerk_assignment
                (amt_assignment_id, amt_worker_id, status,
                 created_at, modified_at, hit_id,
                 auto_approval_time, accept_time, submit_time,
                 approval_time, rejection_time, requester_feedback, tsid)
            VALUES
                (%(amt_assignment_id)s, %(amt_worker_id)s, %(status)s,
                 %(created_at)s, %(modified_at)s, %(hit_id)s,
                 %(auto_approval_time)s, %(accept_time)s, %(submit_time)s,
                 %(approval_time)s, %(rejection_time)s,
                 %(requester_feedback)s, %(tsid)s)
            RETURNING id;
            """
        )
        assignment.id = self._insert_returning_id(query, data)

    # ------------------------------------------------------------------
    # Read
    # ------------------------------------------------------------------

    def get_stub(self, amt_assignment_id: str) -> AssignmentStub:
        """Return the stub for *amt_assignment_id*.

        Raises:
            AssertionError: if the query does not return exactly one row.
        """
        res = self._select_stub_rows(amt_assignment_id)
        _require(len(res) == 1, f"Expected 1 row, got {len(res)}")
        return AssignmentStub.model_validate(res[0])

    def get_stub_if_exists(self, amt_assignment_id: str) -> Optional[AssignmentStub]:
        """Return the stub for *amt_assignment_id*, or ``None`` if no row matches."""
        res = self._select_stub_rows(amt_assignment_id)
        # Check the row count directly rather than catching AssertionError
        # from get_stub(): that also works under ``python -O`` and does not
        # hide unrelated assertion failures.
        if len(res) != 1:
            return None
        return AssignmentStub.model_validate(res[0])

    def get(self, amt_assignment_id: str) -> Assignment:
        """Return the full assignment for *amt_assignment_id*.

        Raises:
            AssertionError: if the query does not return exactly one row.
        """
        res = self.pg_config.execute_sql_query(
            query="""
            SELECT mh.amt_hit_id, ma.id, ma.amt_assignment_id,
                   ma.amt_worker_id, ma.status, ma.auto_approval_time,
                   ma.accept_time, ma.submit_time, ma.approval_time,
                   ma.rejection_time, ma.requester_feedback,
                   ma.created_at, ma.modified_at, ma.tsid, ma.hit_id
            FROM mtwerk_assignment ma
            JOIN mtwerk_hit mh ON ma.hit_id = mh.id
            WHERE amt_assignment_id = %(amt_assignment_id)s
            LIMIT 1;
            """,
            params={"amt_assignment_id": amt_assignment_id},
        )
        _require(len(res) == 1, f"Expected 1 row, got {len(res)}")
        return Assignment.model_validate(res[0])

    # ------------------------------------------------------------------
    # Update
    # ------------------------------------------------------------------

    def update_answer(self, assignment: Assignment) -> None:
        """Record the submission details of an existing assignment row.

        Raises:
            AssertionError: if the UPDATE does not affect exactly one row.
        """
        # We're assuming a stub already exists
        # The assignment was submitted, but we haven't made a decision yet
        now = datetime.now(tz=timezone.utc)
        data = {
            "status": assignment.status.value,
            "submit_time": assignment.submit_time,
            "auto_approval_time": assignment.auto_approval_time,
            "tsid": assignment.tsid,
            "amt_assignment_id": assignment.amt_assignment_id,
            "modified_at": now,
        }
        query = sql.SQL(
            """
            UPDATE mtwerk_assignment
            SET submit_time = %(submit_time)s,
                auto_approval_time = %(auto_approval_time)s,
                status = %(status)s,
                tsid = %(tsid)s,
                modified_at = %(modified_at)s
            WHERE amt_assignment_id = %(amt_assignment_id)s
            """
        )
        # We force this to fail if the assignment doesn't already exist in the db
        self._update_exactly_one(query, data)

    def reject(self, assignment: Assignment) -> None:
        """Persist a rejected *assignment* onto its existing row.

        Raises:
            AssertionError: if *assignment* is not in the Rejected state
                with a rejection time, no approval time and requester
                feedback set, or if the UPDATE does not affect exactly
                one row.
        """
        _require(
            assignment.status == AssignmentStatus.Rejected,
            "assignment.status must be Rejected",
        )
        _require(
            assignment.rejection_time is not None,
            "assignment.rejection_time must be set",
        )
        _require(
            assignment.approval_time is None,
            "assignment.approval_time must be None",
        )
        _require(
            assignment.requester_feedback is not None,
            "assignment.requester_feedback must be set",
        )
        now = datetime.now(tz=timezone.utc)
        data = {
            "status": assignment.status.value,
            "submit_time": assignment.submit_time,
            "rejection_time": assignment.rejection_time,
            "requester_feedback": assignment.requester_feedback,
            "amt_assignment_id": assignment.amt_assignment_id,
            "auto_approval_time": assignment.auto_approval_time,
            "accept_time": assignment.accept_time,
            "modified_at": now,
        }
        query = sql.SQL(
            """
            UPDATE mtwerk_assignment
            SET submit_time = %(submit_time)s,
                rejection_time = %(rejection_time)s,
                status = %(status)s,
                requester_feedback = %(requester_feedback)s,
                auto_approval_time = %(auto_approval_time)s,
                accept_time = %(accept_time)s,
                modified_at = %(modified_at)s
            WHERE amt_assignment_id = %(amt_assignment_id)s
            """
        )
        # We force this to fail if the assignment doesn't already exist in the db
        self._update_exactly_one(query, data)

    def approve(self, assignment: Assignment) -> None:
        """Persist an approved *assignment* onto its existing row.

        Raises:
            AssertionError: if *assignment* is not in the Approved state
                with an approval time, no rejection time and requester
                feedback set, or if the UPDATE does not affect exactly
                one row.
        """
        _require(
            assignment.status == AssignmentStatus.Approved,
            "assignment.status must be Approved",
        )
        _require(
            assignment.rejection_time is None,
            "assignment.rejection_time must be None",
        )
        _require(
            assignment.approval_time is not None,
            "assignment.approval_time must be set",
        )
        _require(
            assignment.requester_feedback is not None,
            "assignment.requester_feedback must be set",
        )
        now = datetime.now(tz=timezone.utc)
        data = {
            "status": assignment.status.value,
            "submit_time": assignment.submit_time,
            "approval_time": assignment.approval_time,
            "requester_feedback": assignment.requester_feedback,
            "amt_assignment_id": assignment.amt_assignment_id,
            "auto_approval_time": assignment.auto_approval_time,
            "accept_time": assignment.accept_time,
            "modified_at": now,
        }
        query = sql.SQL(
            """
            UPDATE mtwerk_assignment
            SET submit_time = %(submit_time)s,
                approval_time = %(approval_time)s,
                status = %(status)s,
                requester_feedback = %(requester_feedback)s,
                auto_approval_time = %(auto_approval_time)s,
                accept_time = %(accept_time)s,
                modified_at = %(modified_at)s
            WHERE amt_assignment_id = %(amt_assignment_id)s
            """
        )
        # We force this to fail if the assignment doesn't already exist in the db
        self._update_exactly_one(query, data)

    # ------------------------------------------------------------------
    # Worker statistics
    # ------------------------------------------------------------------

    def missing_tsid_count(
        self, amt_worker_id: str, lookback_hrs: int = 24
    ) -> NonNegativeInt:
        """
        Look at this user's previous N hrs of submitted assignments.
        Count how many assignments they have submitted without a tsid.

        Args:
            amt_worker_id: worker whose assignments are counted.
            lookback_hrs: size of the window, in hours, measured back from
                the database's ``NOW()``.
        """
        res = self.pg_config.execute_sql_query(
            query="""
            SELECT COUNT(1) AS c
            FROM mtwerk_assignment
            WHERE amt_worker_id = %(amt_worker_id)s
              AND submit_time > NOW() - (%(lookback_interval)s)::interval
              AND tsid IS NULL;
            """,
            params={
                "amt_worker_id": amt_worker_id,
                "lookback_interval": f"{lookback_hrs} hour",
            },
        )
        return int(res[0]["c"])

    def rejected_count(
        self, amt_worker_id: str, lookback_hrs: int = 24
    ) -> NonNegativeInt:
        """
        Look at this user's previous N hrs of submitted assignments.
        Count how many rejected assignments they have.

        Args:
            amt_worker_id: worker whose assignments are counted.
            lookback_hrs: size of the window, in hours, measured back from
                the database's ``NOW()``.
        """
        res = self.pg_config.execute_sql_query(
            query="""
            SELECT COUNT(1) AS c
            FROM mtwerk_assignment
            WHERE amt_worker_id = %(amt_worker_id)s
              AND submit_time > NOW() - (%(lookback_interval)s)::interval
              AND status = %(status)s;
            """,
            params={
                "amt_worker_id": amt_worker_id,
                "lookback_interval": f"{lookback_hrs} hour",
                "status": AssignmentStatus.Rejected.value,
            },
        )
        return int(res[0]["c"])