diff options
Diffstat (limited to 'jb/managers/assignment.py')
| -rw-r--r-- | jb/managers/assignment.py | 259 |
1 files changed, 259 insertions, 0 deletions
diff --git a/jb/managers/assignment.py b/jb/managers/assignment.py new file mode 100644 index 0000000..fca72e8 --- /dev/null +++ b/jb/managers/assignment.py @@ -0,0 +1,259 @@ +from datetime import datetime, timezone +from typing import Optional + +from psycopg import sql +from pydantic import NonNegativeInt + +from jb.managers import PostgresManager +from jb.models.assignment import AssignmentStub, Assignment +from jb.models.definitions import AssignmentStatus + + +class AssignmentManager(PostgresManager): + + def create_stub(self, stub: AssignmentStub) -> None: + assert stub.id is None + data = stub.to_postgres() + query = sql.SQL( + """ + INSERT INTO mtwerk_assignment + (amt_assignment_id, amt_worker_id, status, created_at, modified_at, hit_id) + VALUES + (%(amt_assignment_id)s, %(amt_worker_id)s, %(status)s, + %(created_at)s, %(modified_at)s, %(hit_id)s) + RETURNING id; + """ + ) + + with self.pg_config.make_connection() as conn: + with conn.cursor() as c: + c.execute(query, data) + pk = c.fetchone()["id"] + conn.commit() + stub.id = pk + return None + + def create(self, assignment: Assignment) -> None: + # Typically this is NOT used (we'd create the stub when HIT is + # accepted, then update it once the assignment is submitted), however + # there may be cases where an assignment is submitted and the stub + # was never created (probably due to error/downtime/baddie). + + assert assignment.id is None + data = assignment.to_postgres() + query = sql.SQL( + """ + INSERT INTO mtwerk_assignment + (amt_assignment_id, amt_worker_id, status, + created_at, modified_at, hit_id, + auto_approval_time, accept_time, submit_time, + approval_time, rejection_time, requester_feedback, + tsid) + VALUES + (%(amt_assignment_id)s, %(amt_worker_id)s, %(status)s, + %(created_at)s, %(modified_at)s, %(hit_id)s, + %(auto_approval_time)s, %(accept_time)s, %(submit_time)s, + %(approval_time)s, %(rejection_time)s, %(requester_feedback)s, + %(tsid)s) + RETURNING id; + """ + ) + + with self.pg_config.make_connection() as conn: + with conn.cursor() as c: + c.execute(query, data) + pk = c.fetchone()["id"] + conn.commit() + assignment.id = pk + return None + + def get_stub(self, amt_assignment_id: str) -> AssignmentStub: + res = self.pg_config.execute_sql_query( + """ + SELECT ma.id, ma.hit_id, + ma.amt_assignment_id, ma.amt_worker_id, + ma.status, ma.created_at, ma.modified_at, + mh.amt_hit_id + FROM mtwerk_assignment ma + JOIN mtwerk_hit mh ON ma.hit_id = mh.id + WHERE amt_assignment_id = %(amt_assignment_id)s + LIMIT 1; + """, + params={"amt_assignment_id": amt_assignment_id}, + ) + assert len(res) == 1 + return AssignmentStub.model_validate(res[0]) + + def get_stub_if_exists(self, amt_assignment_id: str) -> Optional[AssignmentStub]: + try: + return self.get_stub(amt_assignment_id=amt_assignment_id) + except AssertionError: + return None + + def get(self, amt_assignment_id: str) -> Assignment: + res = self.pg_config.execute_sql_query( + query=""" + SELECT mh.amt_hit_id, ma.id, ma.amt_assignment_id, + ma.amt_worker_id, ma.status, ma.auto_approval_time, + ma.accept_time, ma.submit_time, ma.approval_time, + ma.rejection_time, ma.requester_feedback, + ma.created_at, ma.modified_at, ma.tsid, ma.hit_id + FROM mtwerk_assignment ma + JOIN mtwerk_hit mh ON ma.hit_id = mh.id + WHERE amt_assignment_id = %(amt_assignment_id)s + LIMIT 1; + """, + params={"amt_assignment_id": amt_assignment_id}, + ) + assert len(res) == 1 + return Assignment.model_validate(res[0]) + + def update_answer(self, assignment: Assignment) -> None: + # We're assuming a stub already exists + # The assignment was submitted, but we haven't made a decision yet + now = datetime.now(tz=timezone.utc) + data = { + "status": assignment.status.value, + "submit_time": assignment.submit_time, + "auto_approval_time": assignment.auto_approval_time, + "tsid": assignment.tsid, + "amt_assignment_id": assignment.amt_assignment_id, + "modified_at": now, + } + query = sql.SQL( + """ + UPDATE mtwerk_assignment + SET submit_time = %(submit_time)s, + auto_approval_time = %(auto_approval_time)s, + status = %(status)s, + tsid = %(tsid)s, + modified_at = %(modified_at)s + WHERE amt_assignment_id = %(amt_assignment_id)s + """ + ) + # We force this to fail if the assignment doesn't already exist in the db + with self.pg_config.make_connection() as conn: + with conn.cursor() as c: + c.execute(query, data) + assert c.rowcount == 1, f"Expected 1 row, got {c.rowcount}" + conn.commit() + return None + + def reject(self, assignment: Assignment) -> None: + assert assignment.status == AssignmentStatus.Rejected + assert assignment.rejection_time is not None + assert assignment.approval_time is None + assert assignment.requester_feedback is not None + now = datetime.now(tz=timezone.utc) + data = { + "status": assignment.status.value, + "submit_time": assignment.submit_time, + "rejection_time": assignment.rejection_time, + "requester_feedback": assignment.requester_feedback, + "amt_assignment_id": assignment.amt_assignment_id, + "auto_approval_time": assignment.auto_approval_time, + "accept_time": assignment.accept_time, + "modified_at": now, + } + query = sql.SQL( + """ + UPDATE mtwerk_assignment + SET submit_time = %(submit_time)s, + rejection_time = %(rejection_time)s, + status = %(status)s, + requester_feedback = %(requester_feedback)s, + auto_approval_time = %(auto_approval_time)s, + accept_time = %(accept_time)s, + modified_at = %(modified_at)s + WHERE amt_assignment_id = %(amt_assignment_id)s + """ + ) + # We force this to fail if the assignment doesn't already exist in the db + with self.pg_config.make_connection() as conn: + with conn.cursor() as c: + c.execute(query, data) + assert c.rowcount == 1, f"Expected 1 row, got {c.rowcount}" + conn.commit() + return None + + def approve(self, assignment: Assignment) -> None: + assert assignment.status == AssignmentStatus.Approved + assert assignment.rejection_time is None + assert assignment.approval_time is not None + assert assignment.requester_feedback is not None + now = datetime.now(tz=timezone.utc) + data = { + "status": assignment.status.value, + "submit_time": assignment.submit_time, + "approval_time": assignment.approval_time, + "requester_feedback": assignment.requester_feedback, + "amt_assignment_id": assignment.amt_assignment_id, + "auto_approval_time": assignment.auto_approval_time, + "accept_time": assignment.accept_time, + "modified_at": now, + } + query = sql.SQL( + """ + UPDATE mtwerk_assignment + SET submit_time = %(submit_time)s, + approval_time = %(approval_time)s, + status = %(status)s, + requester_feedback = %(requester_feedback)s, + auto_approval_time = %(auto_approval_time)s, + accept_time = %(accept_time)s, + modified_at = %(modified_at)s + WHERE amt_assignment_id = %(amt_assignment_id)s + """ + ) + # We force this to fail if the assignment doesn't already exist in the db + with self.pg_config.make_connection() as conn: + with conn.cursor() as c: + c.execute(query, data) + assert c.rowcount == 1, f"Expected 1 row, got {c.rowcount}" + conn.commit() + return None + + def missing_tsid_count( + self, amt_worker_id: str, lookback_hrs: int = 24 + ) -> NonNegativeInt: + """ + Look at this user's previous N hrs of submitted assignments. + Count how many assignments they have submitted without a tsid. + """ + res = self.pg_config.execute_sql_query( + query=""" + SELECT COUNT(1) AS c + FROM mtwerk_assignment + WHERE amt_worker_id = %(amt_worker_id)s + AND submit_time > NOW() - (%(lookback_interval)s)::interval + AND tsid IS NULL; + """, + params={ + "amt_worker_id": amt_worker_id, + "lookback_interval": f"{lookback_hrs} hour", + }, + ) + return int(res[0]["c"]) + + def rejected_count( + self, amt_worker_id: str, lookback_hrs: int = 24 + ) -> NonNegativeInt: + """ + Look at this user's previous N hrs of submitted assignments. + Count how many rejected assignments they have. + """ + res = self.pg_config.execute_sql_query( + query=""" + SELECT COUNT(1) AS c + FROM mtwerk_assignment + WHERE amt_worker_id = %(amt_worker_id)s + AND submit_time > NOW() - (%(lookback_interval)s)::interval + AND status = %(status)s; + """, + params={ + "amt_worker_id": amt_worker_id, + "lookback_interval": f"{lookback_hrs} hour", + "status": AssignmentStatus.Rejected.value, + }, + ) + return int(res[0]["c"]) |
