import logging
import time

from generalresearchutils.config import is_debug

from jb.decorators import HTM, HM, HQM, pg_config
from jb.flow.maintenance import check_hit_status
from jb.flow.monitoring import write_hit_gauge, emit_hit_event
from jb.managers.amt import AMTManager
from jb.models.definitions import HitStatus
from jb.models.hit import HitType, HitQuestion, Hit

logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Pause between two maintenance passes of ``refill_hits_task``.
_TASK_INTERVAL_SECONDS = 5 * 60


def check_stale_hits() -> None:
    """Re-check the assignable hits that were modified least recently.

    Selects up to 100 hits whose status is ``HitStatus.Assignable``, ordered
    by ``modified_at`` (oldest first), and passes each one to
    ``check_hit_status`` with ``reason="cleanup"``. The query applies no age
    threshold: the oldest rows are re-checked on every call. They may not be
    expired yet, but maybe something is wrong?
    """
    res = pg_config.execute_sql_query(
        """
        SELECT amt_hit_id, amt_hit_type_id
        FROM mtwerk_hit mh
        JOIN mtwerk_hittype mht ON mht.id = mh.hit_type_id
        WHERE status = %(status)s
        ORDER BY modified_at
        LIMIT 100;""",
        params={"status": HitStatus.Assignable.value},
    )
    for hit in res:
        # Lazy %-style args avoid nesting double quotes inside a double-quoted
        # f-string, which is a syntax error before Python 3.12.
        logger.info("check_stale_hits: %s", hit["amt_hit_id"])
        check_hit_status(
            amt_hit_id=hit["amt_hit_id"],
            amt_hit_type_id=hit["amt_hit_type_id"],
            reason="cleanup",
        )


def check_expired_hits() -> None:
    """Re-check assignable hits whose ``expiration`` is already in the past.

    Selects up to 100 hits whose status is ``HitStatus.Assignable`` and whose
    ``expiration`` is earlier than the database's ``now()``, and passes each
    one to ``check_hit_status`` with ``reason="expired"``.
    """
    # NOTE(review): there is no ORDER BY, so when more than 100 rows match,
    # which 100 are returned is up to the database -- confirm that is intended.
    res = pg_config.execute_sql_query(
        """
        SELECT amt_hit_id, amt_hit_type_id
        FROM mtwerk_hit mh
        JOIN mtwerk_hittype mht ON mht.id = mh.hit_type_id
        WHERE status = %(status)s
        AND expiration < now()
        LIMIT 100;""",
        params={"status": HitStatus.Assignable.value},
    )
    for hit in res:
        logger.info("check_expired_hits: %s", hit["amt_hit_id"])
        check_hit_status(
            amt_hit_id=hit["amt_hit_id"],
            amt_hit_type_id=hit["amt_hit_type_id"],
            reason="expired",
        )


def create_hit_from_hittype(hit_type: HitType) -> Hit:
    """Create one new hit for ``hit_type``, store it and emit a hit event.

    Args:
        hit_type: The hit type the new hit is created with.

    Returns:
        The hit returned by ``AMTManager.create_hit_with_hit_type``.

    Raises:
        Exception: Always, when ``is_debug()`` is true (the sandbox path is
            not handled yet).
    """
    if is_debug():
        raise Exception("Handle AMT Sandbox issues.")

    question = HQM.get_or_create(
        HitQuestion(height=800, url="https://jamesbillings67.com/work/")
    )
    hit = AMTManager.create_hit_with_hit_type(hit_type=hit_type, question=question)
    HM.create(hit)
    emit_hit_event(status=hit.status, amt_hit_type_id=hit.amt_hit_type_id)
    return hit


def refill_hits() -> None:
    """Top every active hit type up to its ``min_active`` number of hits.

    For each hit type from ``HTM.filter_active()`` the current active count
    is logged and written as a gauge; if it is below ``min_active``, the
    missing number of hits is created one by one.
    """
    for hit_type in HTM.filter_active():
        active_count = HM.get_active_count(hit_type.id)
        logger.info(
            "HitType: %s, hit_type.min_active=%r, active_count=%s",
            hit_type.amt_hit_type_id,
            hit_type.min_active,
            active_count,
        )
        write_hit_gauge(
            status=HitStatus.Assignable,
            amt_hit_type_id=hit_type.amt_hit_type_id,
            cnt=active_count,
        )
        if active_count < hit_type.min_active:
            cnt_todo = hit_type.min_active - active_count
            logger.info("Refilling %s hits", cnt_todo)
            for _ in range(cnt_todo):
                create_hit_from_hittype(hit_type)


def refill_hits_task() -> None:
    """Run the maintenance pass forever, pausing five minutes between passes.

    Each pass runs ``check_expired_hits``, ``check_stale_hits`` and
    ``refill_hits`` in that order. Any ``Exception`` raised by a pass is
    logged with its traceback and the loop continues; the remaining steps of
    that pass are skipped.
    """
    while True:
        try:
            check_expired_hits()
            check_stale_hits()
            refill_hits()
        except Exception as e:
            logger.exception(e)
        # Deliberately not in a ``finally``: KeyboardInterrupt / SystemExit
        # must propagate immediately instead of waiting out the pause first.
        time.sleep(_TASK_INTERVAL_SECONDS)