aboutsummaryrefslogtreecommitdiff
path: root/jb/flow/tasks.py
blob: 77825d3089e901b8381bd2142a2c96fb106d110f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import logging
import time
from typing import TypedDict, cast

from generalresearchutils.config import is_debug

from jb.decorators import AMTM, HTM, HM, HQM, pg_config
from jb.flow.maintenance import check_hit_status
from jb.flow.monitoring import write_hit_gauge, emit_hit_event
from jb.models.definitions import HitStatus
from jb.models.hit import HitType, HitQuestion, Hit

# Module-level logging setup (runs on import).
# basicConfig() attaches a default stream handler to the root logger, but
# only if the root logger has no handlers configured yet.
logging.basicConfig()
# getLogger() with no name returns the root logger, so the INFO level set
# below applies process-wide, not just to this module.
logger = logging.getLogger()
logger.setLevel(logging.INFO)


class HitRow(TypedDict, total=True):
    """Row shape used for results of the HIT lookup queries in this module.

    Both keys are required; they correspond to the two columns selected by
    the queries in ``check_stale_hits`` and ``check_expired_hits``.
    """

    # Value of the ``amt_hit_id`` column.
    amt_hit_id: str
    # Value of the ``amt_hit_type_id`` column.
    amt_hit_type_id: str


def check_stale_hits() -> None:
    """Re-check the least recently modified assignable HITs.

    Selects up to 100 rows from ``mtwerk_hit`` (joined to ``mtwerk_hittype``)
    whose status equals ``HitStatus.Assignable.value``, ordered by
    ``modified_at`` ascending, and passes each one to ``check_hit_status``
    with ``reason="cleanup"``.

    These HITs may not be expired yet, but going a long time without being
    modified may mean something is wrong. The query has no age cutoff: every
    call simply takes the 100 oldest rows by ``modified_at``.

    Raises:
        Whatever ``pg_config.execute_sql_query`` or ``check_hit_status``
        raises. Nothing is caught here, so a failure on one HIT skips the
        remainder of the batch.
    """
    rows = pg_config.execute_sql_query(
        """
        SELECT amt_hit_id, amt_hit_type_id
        FROM mtwerk_hit mh
        JOIN mtwerk_hittype mht ON mht.id = mh.hit_type_id
        WHERE status = %(status)s
        ORDER BY modified_at
        LIMIT 100;""",
        params={"status": HitStatus.Assignable.value},
    )
    for row in cast(list[HitRow], rows):
        amt_hit_id = row["amt_hit_id"]
        # %-style arguments instead of an f-string that reuses the outer
        # quote character: that form only parses on Python 3.12+, and lazy
        # arguments skip formatting when the record is filtered out.
        logging.info("check_stale_hits: %s", amt_hit_id)
        check_hit_status(
            amtm=AMTM,
            amt_hit_id=amt_hit_id,
            amt_hit_type_id=row["amt_hit_type_id"],
            reason="cleanup",
        )


def check_expired_hits() -> None:
    """Re-check assignable HITs whose expiration time has already passed.

    Selects up to 100 rows from ``mtwerk_hit`` (joined to ``mtwerk_hittype``)
    whose status equals ``HitStatus.Assignable.value`` and whose
    ``expiration`` is earlier than the database's ``now()``, then passes each
    one to ``check_hit_status`` with ``reason="expired"``.

    NOTE(review): the query has a LIMIT but no ORDER BY, so when more than
    100 rows match, which ones are processed on a given call is up to the
    database.

    Raises:
        Whatever ``pg_config.execute_sql_query`` or ``check_hit_status``
        raises. Nothing is caught here, so a failure on one HIT skips the
        remainder of the batch.
    """
    rows = pg_config.execute_sql_query(
        """
        SELECT amt_hit_id, amt_hit_type_id
        FROM mtwerk_hit mh
        JOIN mtwerk_hittype mht ON mht.id = mh.hit_type_id
        WHERE status = %(status)s
        AND expiration < now()
        LIMIT 100;""",
        params={"status": HitStatus.Assignable.value},
    )
    for row in cast(list[HitRow], rows):
        amt_hit_id = row["amt_hit_id"]
        # %-style arguments instead of an f-string that reuses the outer
        # quote character: that form only parses on Python 3.12+, and lazy
        # arguments skip formatting when the record is filtered out.
        logging.info("check_expired_hits: %s", amt_hit_id)
        check_hit_status(
            amtm=AMTM,
            amt_hit_id=amt_hit_id,
            amt_hit_type_id=row["amt_hit_type_id"],
            reason="expired",
        )


def create_hit_from_hittype(hit_type: HitType) -> Hit:
    """Create one new HIT for ``hit_type`` and return it.

    Steps, in order:
      1. Obtain the question via ``HQM.get_or_create`` (height 800, fixed
         work URL).
      2. Create the HIT with ``AMTM.create_hit_with_hit_type``.
      3. Hand the HIT to ``HM.create``.
      4. Emit a hit event carrying the new HIT's status and AMT hit type id.

    Args:
        hit_type: The HIT type the new HIT is created under.

    Returns:
        The HIT object returned by ``AMTM.create_hit_with_hit_type``.

    Raises:
        Exception: Always, when ``is_debug()`` is true; HIT creation is not
            supported in that mode yet.
    """
    # Guard: refuse to create HITs in debug mode.
    if is_debug():
        raise Exception("Handle AMT Sandbox issues.")

    work_question = HQM.get_or_create(
        HitQuestion(height=800, url="https://jamesbillings67.com/work/")
    )
    new_hit = AMTM.create_hit_with_hit_type(
        hit_type=hit_type,
        question=work_question,
    )
    HM.create(new_hit)
    emit_hit_event(
        status=new_hit.status,
        amt_hit_type_id=new_hit.amt_hit_type_id,
    )
    return new_hit


def refill_hits() -> None:
    """Top every active HIT type back up to its ``min_active`` count.

    For each HIT type yielded by ``HTM.filter_active()``:
      * read the current active count from ``HM.get_active_count``,
      * log it and report it through ``write_hit_gauge`` under
        ``HitStatus.Assignable``,
      * if the count is below ``hit_type.min_active``, create the missing
        HITs one at a time with ``create_hit_from_hittype``.
    """
    for hit_type in HTM.filter_active():
        # Both identifiers must be truthy before they are used below.
        assert hit_type.id
        assert hit_type.amt_hit_type_id

        active_count = HM.get_active_count(hit_type_id=hit_type.id)
        logging.info(
            f"HitType: {hit_type.amt_hit_type_id}, {hit_type.min_active=}, active_count={active_count}"
        )
        write_hit_gauge(
            status=HitStatus.Assignable,
            amt_hit_type_id=hit_type.amt_hit_type_id,
            cnt=active_count,
        )

        # Already at or above the minimum: nothing to create for this type.
        if active_count >= hit_type.min_active:
            continue

        cnt_todo = hit_type.min_active - active_count
        logging.info(f"Refilling {cnt_todo} hits")
        for _ in range(cnt_todo):
            create_hit_from_hittype(hit_type)


def refill_hits_task() -> None:
    """Run the HIT maintenance cycle forever, sleeping five minutes between cycles.

    Each cycle runs, in order, ``check_expired_hits``, ``check_stale_hits``
    and ``refill_hits``. Every step is guarded on its own: an ``Exception``
    raised by one step is logged with its traceback and the remaining steps
    of the cycle still run.

    The sleep is deliberately outside any ``finally`` clause, so that
    ``KeyboardInterrupt`` / ``SystemExit`` raised during a cycle propagate
    immediately rather than after a five-minute delay.

    This function never returns normally.
    """
    while True:
        # Names are looked up on every cycle, in the required order.
        for step in (check_expired_hits, check_stale_hits, refill_hits):
            try:
                step()
            except Exception:
                # Top-level boundary of a long-running task: log and keep
                # going so one failing step cannot stall the others.
                logging.exception("refill_hits_task: %s failed", step.__name__)
        time.sleep(5 * 60)