diff options
| author | Max Nanis | 2026-02-21 02:15:52 -0500 |
|---|---|---|
| committer | Max Nanis | 2026-02-21 02:15:52 -0500 |
| commit | 67ab724561e4ceb8fe8fb4031de277168f7d9724 (patch) | |
| tree | 4d85619973491e7239f0e83dc5cdd85618f0f248 /jb/flow | |
| parent | af8057d58ff152f511f5161a7626b0fffa9d661a (diff) | |
| download | amt-jb-67ab724561e4ceb8fe8fb4031de277168f7d9724.tar.gz amt-jb-67ab724561e4ceb8fe8fb4031de277168f7d9724.zip | |
More pytest conf, some views, and defining more attrs on the settings config
Diffstat (limited to 'jb/flow')
| -rw-r--r-- | jb/flow/tasks.py | 103 |
1 files changed, 103 insertions, 0 deletions
diff --git a/jb/flow/tasks.py b/jb/flow/tasks.py new file mode 100644 index 0000000..e7c64b9 --- /dev/null +++ b/jb/flow/tasks.py @@ -0,0 +1,103 @@ +import logging +import time + +from generalresearchutils.config import is_debug + +from jb.decorators import HTM, HM, HQM, pg_config +from jb.flow.maintenance import check_hit_status +from jb.flow.monitoring import write_hit_gauge, emit_hit_event +from jb.managers.amt import AMTManager +from jb.models.definitions import HitStatus +from jb.models.hit import HitType, HitQuestion, Hit + +logging.basicConfig() +logger = logging.getLogger() +logger.setLevel(logging.INFO) + + +def check_stale_hits(): + # Check live hits that haven't been modified in a long time. They may not + # be expired yet, but maybe something is wrong? + res = pg_config.execute_sql_query( + """ + SELECT amt_hit_id, amt_hit_type_id + FROM mtwerk_hit mh + JOIN mtwerk_hittype mht ON mht.id = mh.hit_type_id + WHERE status = %(status)s + ORDER BY modified_at + LIMIT 100;""", + params={"status": HitStatus.Assignable.value}, + ) + for hit in res: + logging.info(f"check_stale_hits: {hit["amt_hit_id"]}") + check_hit_status( + amt_hit_id=hit["amt_hit_id"], + amt_hit_type_id=hit["amt_hit_type_id"], + reason="cleanup", + ) + + +def check_expired_hits(): + # Check live/assignable hits that are expired (based on AMT's expiration time) + res = pg_config.execute_sql_query( + """ + SELECT amt_hit_id, amt_hit_type_id + FROM mtwerk_hit mh + JOIN mtwerk_hittype mht ON mht.id = mh.hit_type_id + WHERE status = %(status)s + AND expiration < now() + LIMIT 100;""", + params={"status": HitStatus.Assignable.value}, + ) + for hit in res: + logging.info(f"check_expired_hits: {hit["amt_hit_id"]}") + check_hit_status( + amt_hit_id=hit["amt_hit_id"], + amt_hit_type_id=hit["amt_hit_type_id"], + reason="expired", + ) + + +def create_hit_from_hittype(hit_type: HitType) -> Hit: + if is_debug(): + raise Exception("Handle AMT Sandbox issues.") + + else: + question = HQM.get_or_create( + HitQuestion(height=800, url="https://jamesbillings67.com/work/") + ) + + hit = AMTManager.create_hit_with_hit_type(hit_type=hit_type, question=question) + HM.create(hit) + emit_hit_event(status=hit.status, amt_hit_type_id=hit.amt_hit_type_id) + return hit + + +def refill_hits() -> None: + for hit_type in HTM.filter_active(): + active_count = HM.get_active_count(hit_type.id) + logging.info( + f"HitType: {hit_type.amt_hit_type_id}, {hit_type.min_active=}, active_count={active_count}" + ) + write_hit_gauge( + status=HitStatus.Assignable, + amt_hit_type_id=hit_type.amt_hit_type_id, + cnt=active_count, + ) + if active_count < hit_type.min_active: + cnt_todo = hit_type.min_active - active_count + logging.info(f"Refilling {cnt_todo} hits") + for _ in range(cnt_todo): + create_hit_from_hittype(hit_type) + + +def refill_hits_task(): + while True: + try: + check_expired_hits() + check_stale_hits() + refill_hits() + except Exception as e: + logging.exception(e) + finally: + time.sleep(5 * 60) |
