aboutsummaryrefslogtreecommitdiff
path: root/jb/flow
diff options
context:
space:
mode:
Diffstat (limited to 'jb/flow')
-rw-r--r--jb/flow/tasks.py103
1 files changed, 103 insertions, 0 deletions
diff --git a/jb/flow/tasks.py b/jb/flow/tasks.py
new file mode 100644
index 0000000..e7c64b9
--- /dev/null
+++ b/jb/flow/tasks.py
@@ -0,0 +1,103 @@
+import logging
+import time
+
+from generalresearchutils.config import is_debug
+
+from jb.decorators import HTM, HM, HQM, pg_config
+from jb.flow.maintenance import check_hit_status
+from jb.flow.monitoring import write_hit_gauge, emit_hit_event
+from jb.managers.amt import AMTManager
+from jb.models.definitions import HitStatus
+from jb.models.hit import HitType, HitQuestion, Hit
+
+logging.basicConfig()
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+
+
+def check_stale_hits():
+ # Check live hits that haven't been modified in a long time. They may not
+ # be expired yet, but maybe something is wrong?
+ res = pg_config.execute_sql_query(
+ """
+ SELECT amt_hit_id, amt_hit_type_id
+ FROM mtwerk_hit mh
+ JOIN mtwerk_hittype mht ON mht.id = mh.hit_type_id
+ WHERE status = %(status)s
+ ORDER BY modified_at
+ LIMIT 100;""",
+ params={"status": HitStatus.Assignable.value},
+ )
+ for hit in res:
+ logging.info(f"check_stale_hits: {hit["amt_hit_id"]}")
+ check_hit_status(
+ amt_hit_id=hit["amt_hit_id"],
+ amt_hit_type_id=hit["amt_hit_type_id"],
+ reason="cleanup",
+ )
+
+
+def check_expired_hits():
+ # Check live/assignable hits that are expired (based on AMT's expiration time)
+ res = pg_config.execute_sql_query(
+ """
+ SELECT amt_hit_id, amt_hit_type_id
+ FROM mtwerk_hit mh
+ JOIN mtwerk_hittype mht ON mht.id = mh.hit_type_id
+ WHERE status = %(status)s
+ AND expiration < now()
+ LIMIT 100;""",
+ params={"status": HitStatus.Assignable.value},
+ )
+ for hit in res:
+ logging.info(f"check_expired_hits: {hit["amt_hit_id"]}")
+ check_hit_status(
+ amt_hit_id=hit["amt_hit_id"],
+ amt_hit_type_id=hit["amt_hit_type_id"],
+ reason="expired",
+ )
+
+
+def create_hit_from_hittype(hit_type: HitType) -> Hit:
+ if is_debug():
+ raise Exception("Handle AMT Sandbox issues.")
+
+ else:
+ question = HQM.get_or_create(
+ HitQuestion(height=800, url="https://jamesbillings67.com/work/")
+ )
+
+ hit = AMTManager.create_hit_with_hit_type(hit_type=hit_type, question=question)
+ HM.create(hit)
+ emit_hit_event(status=hit.status, amt_hit_type_id=hit.amt_hit_type_id)
+ return hit
+
+
+def refill_hits() -> None:
+ for hit_type in HTM.filter_active():
+ active_count = HM.get_active_count(hit_type.id)
+ logging.info(
+ f"HitType: {hit_type.amt_hit_type_id}, {hit_type.min_active=}, active_count={active_count}"
+ )
+ write_hit_gauge(
+ status=HitStatus.Assignable,
+ amt_hit_type_id=hit_type.amt_hit_type_id,
+ cnt=active_count,
+ )
+ if active_count < hit_type.min_active:
+ cnt_todo = hit_type.min_active - active_count
+ logging.info(f"Refilling {cnt_todo} hits")
+ for _ in range(cnt_todo):
+ create_hit_from_hittype(hit_type)
+
+
+def refill_hits_task():
+ while True:
+ try:
+ check_expired_hits()
+ check_stale_hits()
+ refill_hits()
+ except Exception as e:
+ logging.exception(e)
+ finally:
+ time.sleep(5 * 60)