1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
import logging
import time
from typing import TypedDict, cast
from generalresearchutils.config import is_debug
from jb.decorators import AMTM, HTM, HM, HQM, pg_config
from jb.flow.maintenance import check_hit_status
from jb.flow.monitoring import write_hit_gauge, emit_hit_event
from jb.models.definitions import HitStatus
from jb.models.hit import HitType, HitQuestion, Hit
# Import-time logging setup. basicConfig() attaches a default stream handler
# to the root logger unless the root logger already has handlers configured.
logging.basicConfig()
# getLogger() without a name returns the root logger, so the level set below
# applies to the whole process, not only to this module.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
class HitRow(TypedDict):
    """Row shape used for the HIT lookup query results in this module.

    Both keys correspond to the columns selected by the queries
    (``SELECT amt_hit_id, amt_hit_type_id``).
    """

    # Identifier of the HIT, read from the ``amt_hit_id`` column.
    amt_hit_id: str
    # Identifier of the HIT's type, read from the ``amt_hit_type_id`` column.
    amt_hit_type_id: str
def check_stale_hits() -> None:
    """Re-check the status of the least recently modified assignable HITs.

    Selects up to 100 HITs whose status is ``HitStatus.Assignable``, ordered
    by ``modified_at``, and hands each one to ``check_hit_status`` with
    ``reason="cleanup"``.
    """
    # Check live hits that haven't been modified in a long time. They may not
    # be expired yet, but maybe something is wrong?
    res = pg_config.execute_sql_query(
        """
        SELECT amt_hit_id, amt_hit_type_id
        FROM mtwerk_hit mh
        JOIN mtwerk_hittype mht ON mht.id = mh.hit_type_id
        WHERE status = %(status)s
        ORDER BY modified_at
        LIMIT 100;""",
        params={"status": HitStatus.Assignable.value},
    )
    for hit in cast(list[HitRow], res):
        # Lazy %-style arguments instead of an f-string: reusing the enclosing
        # quote character inside an f-string replacement field is only valid
        # syntax on Python 3.12+ (PEP 701). The emitted message is unchanged.
        logging.info("check_stale_hits: %s", hit["amt_hit_id"])
        check_hit_status(
            amtm=AMTM,
            amt_hit_id=hit["amt_hit_id"],
            amt_hit_type_id=hit["amt_hit_type_id"],
            reason="cleanup",
        )
def check_expired_hits() -> None:
    """Re-check the status of assignable HITs whose expiration has passed.

    Selects up to 100 HITs whose status is ``HitStatus.Assignable`` and whose
    ``expiration`` is earlier than the database's ``now()``, and hands each
    one to ``check_hit_status`` with ``reason="expired"``.
    """
    # Check live/assignable hits that are expired (based on AMT's expiration time)
    res = pg_config.execute_sql_query(
        """
        SELECT amt_hit_id, amt_hit_type_id
        FROM mtwerk_hit mh
        JOIN mtwerk_hittype mht ON mht.id = mh.hit_type_id
        WHERE status = %(status)s
        AND expiration < now()
        LIMIT 100;""",
        params={"status": HitStatus.Assignable.value},
    )
    for hit in cast(list[HitRow], res):
        # Lazy %-style arguments instead of an f-string: reusing the enclosing
        # quote character inside an f-string replacement field is only valid
        # syntax on Python 3.12+ (PEP 701). The emitted message is unchanged.
        logging.info("check_expired_hits: %s", hit["amt_hit_id"])
        check_hit_status(
            amtm=AMTM,
            amt_hit_id=hit["amt_hit_id"],
            amt_hit_type_id=hit["amt_hit_type_id"],
            reason="expired",
        )
def create_hit_from_hittype(hit_type: HitType) -> Hit:
    """Create one HIT of the given type and return it.

    The HIT is built from a fixed ``HitQuestion`` (height 800, work URL)
    obtained through ``HQM.get_or_create``, created via
    ``AMTM.create_hit_with_hit_type``, passed to ``HM.create`` and announced
    with ``emit_hit_event``.

    Raises:
        Exception: when ``is_debug()`` is true; no HIT is created in that case.
    """
    # Guard clause: debug mode is not supported here yet.
    if is_debug():
        raise Exception("Handle AMT Sandbox issues.")

    work_question = HQM.get_or_create(
        HitQuestion(height=800, url="https://jamesbillings67.com/work/")
    )
    new_hit = AMTM.create_hit_with_hit_type(
        hit_type=hit_type,
        question=work_question,
    )
    HM.create(new_hit)
    emit_hit_event(
        status=new_hit.status,
        amt_hit_type_id=new_hit.amt_hit_type_id,
    )
    return new_hit
def refill_hits() -> None:
    """Top up every active HIT type to its ``min_active`` count.

    For each hit type yielded by ``HTM.filter_active()``: read the active
    count from ``HM.get_active_count``, log it, report it through
    ``write_hit_gauge``, and when the count is below ``min_active`` create
    the missing number of HITs with ``create_hit_from_hittype``.
    """
    for hit_type in HTM.filter_active():
        assert hit_type.id
        assert hit_type.amt_hit_type_id

        active_count = HM.get_active_count(hit_type_id=hit_type.id)
        logging.info(
            f"HitType: {hit_type.amt_hit_type_id}, {hit_type.min_active=}, active_count={active_count}"
        )
        write_hit_gauge(
            status=HitStatus.Assignable,
            amt_hit_type_id=hit_type.amt_hit_type_id,
            cnt=active_count,
        )

        # Nothing to do for this hit type when enough HITs are already active.
        if not (active_count < hit_type.min_active):
            continue

        shortfall = hit_type.min_active - active_count
        logging.info(f"Refilling {shortfall} hits")
        for _ in range(shortfall):
            create_hit_from_hittype(hit_type)
def refill_hits_task() -> None:
    """Run the HIT maintenance cycle forever, sleeping five minutes per cycle.

    Each cycle calls ``check_expired_hits``, ``check_stale_hits`` and
    ``refill_hits`` in that order. Any ``Exception`` raised by a step is
    logged with its traceback and ends the current cycle; the loop then
    sleeps and starts the next cycle. This function never returns normally.
    """
    while True:
        try:
            check_expired_hits()
            check_stale_hits()
            refill_hits()
        except Exception as e:
            # Top-level boundary of the task: log and keep the loop alive.
            logging.exception(e)
        # The pause is deliberately NOT in a `finally` clause. `except
        # Exception` already covers every ordinary failure, so this line is
        # reached after both successful and failed cycles. In a `finally`, the
        # sleep would also run for KeyboardInterrupt / SystemExit and delay
        # shutdown by up to five minutes.
        time.sleep(5 * 60)
|