diff options
Diffstat (limited to 'jb')
| -rw-r--r-- | jb/__init__.py | 0 | ||||
| -rw-r--r-- | jb/decorators.py | 75 | ||||
| -rw-r--r-- | jb/flow/__init__.py | 0 | ||||
| -rw-r--r-- | jb/flow/events.py | 126 | ||||
| -rw-r--r-- | jb/flow/maintenance.py | 26 | ||||
| -rw-r--r-- | jb/flow/monitoring.py | 156 | ||||
| -rw-r--r-- | jb/main.py | 59 | ||||
| -rw-r--r-- | jb/views/tasks.py | 70 | ||||
| -rw-r--r-- | jb/views/utils.py | 15 |
9 files changed, 527 insertions, 0 deletions
diff --git a/jb/__init__.py b/jb/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/jb/__init__.py diff --git a/jb/decorators.py b/jb/decorators.py new file mode 100644 index 0000000..54d36c7 --- /dev/null +++ b/jb/decorators.py @@ -0,0 +1,75 @@ +import boto3 +from botocore.config import Config +from generalresearchutils.pg_helper import PostgresConfig +from generalresearchutils.redis_helper import RedisConfig +from influxdb import InfluxDBClient +from mypy_boto3_mturk import MTurkClient +from mypy_boto3_sns import SNSClient + +from jb.config import settings +from jb.managers import Permission +from jb.managers.assignment import AssignmentManager +from jb.managers.bonus import BonusManager +from jb.managers.hit import HitTypeManager, HitManager, HitQuestionManager + +redis_config = RedisConfig( + dsn=settings.redis, + decode_responses=True, + socket_timeout=settings.redis_timeout, + socket_connect_timeout=settings.redis_timeout, +) +REDIS = redis_config.create_redis_client() + +CLIENT_CONFIG = Config( + # connect_timeout (float or int) – The time in seconds till a timeout + # exception is thrown when attempting to make a connection. The default + # is 60 seconds. + connect_timeout=1, + # read_timeout (float or int) – The time in seconds till a timeout + # exception is thrown when attempting to read from a connection. The + # default is 60 seconds. + read_timeout=2.5, +) + +# We shouldn't use this directly. Use our AMTManager wrapper +AMT_CLIENT: MTurkClient = boto3.client( + service_name="mturk", + region_name="us-east-1", + endpoint_url=str(settings.amt_endpoint), + aws_access_key_id=settings.amt_access_id, + aws_secret_access_key=settings.amt_secret_key, + config=CLIENT_CONFIG, +) + +SNS_CLIENT: SNSClient = boto3.client( + service_name="sns", + region_name="us-east-2", + aws_access_key_id=settings.amt_access_id, + aws_secret_access_key=settings.amt_secret_key, + config=CLIENT_CONFIG, +) + +pg_config = PostgresConfig( + dsn=settings.amt_jb_db, + connect_timeout=1, + statement_timeout=1, +) + +HTM = HitTypeManager( + pg_config=pg_config, permissions=[Permission.READ, Permission.CREATE] +) +HM = HitManager(pg_config=pg_config, permissions=[Permission.READ, Permission.CREATE]) +HQM = HitQuestionManager( + pg_config=pg_config, permissions=[Permission.READ, Permission.CREATE] +) +AM = AssignmentManager( + pg_config=pg_config, permissions=[Permission.READ, Permission.CREATE] +) + +BM = BonusManager( + pg_config=pg_config, permissions=[Permission.READ, Permission.CREATE] +) + +influx_client = None +if settings.influx_db: + influx_client = InfluxDBClient.from_dsn(str(settings.influx_db)) diff --git a/jb/flow/__init__.py b/jb/flow/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/jb/flow/__init__.py diff --git a/jb/flow/events.py b/jb/flow/events.py new file mode 100644 index 0000000..3961a64 --- /dev/null +++ b/jb/flow/events.py @@ -0,0 +1,126 @@ +import logging +import time +from concurrent import futures +from concurrent.futures import ThreadPoolExecutor, Executor, as_completed +from typing import Optional + +import redis + +from jb.config import ( + JB_EVENTS_STREAM, + CONSUMER_GROUP, + CONSUMER_NAME, + JB_EVENTS_FAILED_STREAM, +) +from jb.decorators import REDIS +from jb.flow.assignment_tasks import process_assignment_submitted +from jb.models.event import MTurkEvent + + +def process_mturk_events_task(): + executor = ThreadPoolExecutor(max_workers=5) + create_consumer_group() + while True: + try: + process_mturk_events(executor=executor) + except Exception as e: + logging.exception(e) + finally: + time.sleep(1) + + +def handle_pending_msgs_task(): + while True: + try: + handle_pending_msgs() + except Exception as e: + logging.exception(e) + finally: + time.sleep(60) + + +def process_mturk_events(executor: Executor): + while True: + n = process_mturk_events_chunk(executor=executor) + if n is None or n < 10: + break + + +def create_consumer_group(): + try: + REDIS.xgroup_create(JB_EVENTS_STREAM, CONSUMER_GROUP, id="0", mkstream=True) + except redis.exceptions.ResponseError as e: + if "BUSYGROUP Consumer Group name already exists" in str(e): + pass # group already exists + else: + raise + + +def process_mturk_events_chunk(executor: Executor) -> Optional[int]: + msgs = REDIS.xreadgroup( + groupname=CONSUMER_GROUP, + consumername=CONSUMER_NAME, + streams={JB_EVENTS_STREAM: ">"}, + count=10, + ) + if not msgs: + return None + msgs = msgs[0][1] # the queue, we only have 1 + + fs = [] + for msg in msgs: + msg_id, data = msg + msg_json = data["data"] + event = MTurkEvent.model_validate_json(msg_json) + if event.event_type == "AssignmentSubmitted": + fs.append( + executor.submit(process_assignment_submitted_event, event, msg_id) + ) + else: + logging.info(f"Discarding {event}") + REDIS.xdel(JB_EVENTS_STREAM, msg_id) + + futures.wait(fs, timeout=60) + return len(msgs) + + +def process_assignment_submitted_event(event: MTurkEvent, msg_id: str): + process_assignment_submitted(event) + REDIS.xackdel(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id) + + +def handle_pending_msgs(): + # Looks in the redis queue for msgs that + # are pending (read by a consumer but not ACK). These prob failed. + # Below is from chatgpt, idk if it works + pending = REDIS.xpending_range( + JB_EVENTS_STREAM, CONSUMER_GROUP, min="-", max="+", count=10 + ) + for entry in pending: + msg_id = entry["message_id"] + # Claim message if idle > 10 sec + if entry["idle"] > 10_000: # milliseconds + claimed = REDIS.xclaim( + JB_EVENTS_STREAM, + CONSUMER_GROUP, + CONSUMER_NAME, + min_idle_time=10_000, + message_ids=[msg_id], + ) + for cid, data in claimed: + msg_json = data["data"] + event = MTurkEvent.model_validate_json(msg_json) + if event.event_type == "AssignmentSubmitted": + # Try to process it again. If it fails, add + # it to the failed stream, so maybe we can fix + # and try again? + try: + process_assignment_submitted_event(event, cid) + REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid) + except Exception as e: + logging.exception(e) + REDIS.xadd(JB_EVENTS_FAILED_STREAM, data) + REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid) + else: + logging.info(f"Discarding {event}") + REDIS.xdel(JB_EVENTS_STREAM, msg_id) diff --git a/jb/flow/maintenance.py b/jb/flow/maintenance.py new file mode 100644 index 0000000..5dc9cea --- /dev/null +++ b/jb/flow/maintenance.py @@ -0,0 +1,26 @@ +from typing import Optional + +from jb.decorators import HM +from jb.flow.monitoring import emit_hit_event +from jb.managers.amt import AMTManager +from jb.models.definitions import HitStatus + + +def check_hit_status( + amt_hit_id: str, amt_hit_type_id: str, reason: Optional[str] = None +) -> HitStatus: + """ + (this used to be called "process_hit") + Request information from Amazon regarding the status of a HIT ID. Update the local state from + that response. + """ + hit_status = AMTManager.get_hit_status(amt_hit_id=amt_hit_id) + # We're assuming that in the db this Hit is marked as Assignable, or else we wouldn't + # have called this function. + if hit_status != HitStatus.Assignable: + # todo: should update also assignment_pending_count, assignment_available_count, assignment_completed_count + HM.update_status(amt_hit_id=amt_hit_id, hit_status=hit_status) + emit_hit_event( + status=hit_status, amt_hit_type_id=amt_hit_type_id, reason=reason + ) + return hit_status diff --git a/jb/flow/monitoring.py b/jb/flow/monitoring.py new file mode 100644 index 0000000..c8432bb --- /dev/null +++ b/jb/flow/monitoring.py @@ -0,0 +1,156 @@ +import socket +from typing import Optional + +from mypy_boto3_mturk.literals import EventTypeType + +from jb.config import settings +from jb.decorators import influx_client +from jb.models.currency import USDCent +from jb.models.definitions import HitStatus, AssignmentStatus + + +def write_hit_gauge(status: HitStatus, amt_hit_type_id: str, cnt: int): + tags = { + "host": socket.gethostname(), # could be "amt-jb-0" + "service": "amt-jb", + "status": status.value, + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + point = { + "measurement": "amt_jb.hits", + "tags": tags, + "fields": {"count": cnt}, + } + if influx_client: + influx_client.write_points([point]) + + +def write_assignment_gauge(status: AssignmentStatus, amt_hit_type_id: str, cnt: int): + tags = { + "host": socket.gethostname(), + "service": "amt-jb", + "status": status.value, + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + point = { + "measurement": "amt_jb.assignments", + "tags": tags, + "fields": {"count": cnt}, + } + if influx_client: + influx_client.write_points([point]) + + +def emit_hit_event( + status: HitStatus, amt_hit_type_id: str, reason: Optional[str] = None +): + """ + e.g. a HIT was created, Reviewable, etc. We don't have a "created" HitStatus, + so it would just be when status=='Assignable' + """ + tags = { + "host": socket.gethostname(), + "service": "amt-jb", + "status": status.value, + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + if reason: + tags["reason"] = reason + point = { + "measurement": "amt_jb.hit_events", + "tags": tags, + "fields": {"value": 1}, + } + + if influx_client: + influx_client.write_points([point]) + + +def emit_assignment_event( + status: AssignmentStatus, amt_hit_type_id: str, reason: Optional[str] = None +): + """ + e.g. an Assignment was accepted/approved/reject + """ + tags = { + "host": socket.gethostname(), + "service": "amt-jb", + "status": status.value, + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + if reason: + tags["reason"] = reason + point = { + "measurement": "amt_jb.assignment_events", + "tags": tags, + "fields": {"value": 1}, + } + + if influx_client: + influx_client.write_points([point]) + + +def emit_mturk_notification_event(event_type: EventTypeType, amt_hit_type_id: str): + """ + e.g. a Mturk notification was received. We just put it in redis, we haven't processed it yet. + """ + tags = { + "host": socket.gethostname(), + "service": "amt-jb", + "event_type": event_type, + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + point = { + "measurement": "amt_jb.mturk_notification_events", + "tags": tags, + "fields": {"value": 1}, + } + + if influx_client: + influx_client.write_points([point]) + + +def emit_error_event(event_type: str, amt_hit_type_id: str): + """ + e.g. todo: structure the error_types + """ + tags = { + "host": socket.gethostname(), + "service": "amt-jb", + "event_type": event_type, + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + point = { + "measurement": "amt_jb.error_events", + "tags": tags, + "fields": {"value": 1}, + } + + if influx_client: + influx_client.write_points([point]) + + +def emit_bonus_event(amount: USDCent, amt_hit_type_id: str): + """ + An AMT bonus was awarded + """ + tags = { + "host": socket.gethostname(), + "service": "amt-jb", + "amt_hit_type_id": amt_hit_type_id, + "debug": settings.debug, + } + point = { + "measurement": "amt_jb.bonus_events", + "tags": tags, + "fields": {"value": 1, "amount": int(amount)}, + } + + if influx_client: + influx_client.write_points([point]) diff --git a/jb/main.py b/jb/main.py new file mode 100644 index 0000000..8c1dbed --- /dev/null +++ b/jb/main.py @@ -0,0 +1,59 @@ +from multiprocessing import Process + +from fastapi import FastAPI, Request +from fastapi.responses import HTMLResponse +from starlette.middleware.cors import CORSMiddleware +from starlette.middleware.trustedhost import TrustedHostMiddleware + +from jb.views.common import common_router +from jb.settings import BASE_HTML +from jb.config import settings + +app = FastAPI( + servers=[ + { + "url": "https://jamesbillings67.com/", + "description": "Production environment", + }, + ], + title="jb", + version="1.0.0", +) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +app.add_middleware(TrustedHostMiddleware, allowed_hosts=["*"]) +app.include_router(router=common_router) + + +@app.get("/robots.txt") +@app.get("/sitemap.xml") +@app.get("/favicon.ico") +def return_nothing(): + return {} + + +@app.get("/{full_path:path}") +def serve_react_app(full_path: str): + # This serves index.html for any unmatched route + # React Router will then handle the client-side routing + return HTMLResponse(BASE_HTML) + + +def schedule_tasks(): + from jb.flow.events import process_mturk_events_task, handle_pending_msgs_task + from jb.flow.tasks import refill_hits_task + + Process(target=process_mturk_events_task).start() + # Process(target=handle_pending_msgs_task).start() + Process(target=refill_hits_task).start() + + +if not settings.debug: + schedule_tasks() diff --git a/jb/views/tasks.py b/jb/views/tasks.py new file mode 100644 index 0000000..7176b29 --- /dev/null +++ b/jb/views/tasks.py @@ -0,0 +1,70 @@ +from datetime import datetime, timezone, timedelta + +from fastapi import Request + +from jb.decorators import AM, HM +from jb.flow.maintenance import check_hit_status +from jb.flow.monitoring import emit_assignment_event +from jb.models.assignment import AssignmentStub +from jb.models.definitions import AssignmentStatus + + +def process_request(request: Request) -> None: + """ + A worker has loaded the HIT (work) page and (probably) accepted the HIT. + AMT creates an assignment, tied to this hit and this worker. + Create it in the DB. + """ + amt_assignment_id = request.query_params.get("assignmentId", None) + if amt_assignment_id == "ASSIGNMENT_ID_NOT_AVAILABLE": + raise ValueError("shouldn't happen") + amt_hit_id = request.query_params.get("hitId", None) + amt_worker_id = request.query_params.get("workerId", None) + print(f"process_request: {amt_assignment_id=} {amt_worker_id=} {amt_hit_id=}") + assert amt_worker_id and amt_hit_id and amt_assignment_id + + # Check that the HIT is still valid + hit = HM.get_from_amt_id(amt_hit_id=amt_hit_id) + _ = check_hit_status(amt_hit_id=amt_hit_id, amt_hit_type_id=hit.amt_hit_type_id) + emit_assignment_event( + status=AssignmentStatus.Accepted, + amt_hit_type_id=hit.amt_hit_type_id, + ) + # I think it won't be assignable anymore? idk + # assert hit_status == HitStatus.Assignable, f"hit {amt_hit_id} {hit_status=}. Expected Assignable" + + # I would like to verify in the AMT API that this assignment is valid, but there + # is no way to do that (until the assignment is submitted) + + # # Make an offerwall to create a user account... + # # todo: GSS: Do we really need to do this??? + # client_ip = get_client_ip(request) + # url = f"{settings.fsb_host}{settings.product_id}/offerwall/45b7228a7/" + # _ = requests.get( + # url, + # {"bpuid": amt_worker_id, "ip": client_ip, "n_bins": 1, "format": "json"}, + # ).json() + + # This assignment shouldn't already exist. If it does, just make sure it + # is all the same. + assignment_stub = AM.get_stub_if_exists(amt_assignment_id=amt_assignment_id) + if assignment_stub: + print(f"{assignment_stub=}") + assert assignment_stub.amt_worker_id == amt_worker_id + assert assignment_stub.amt_assignment_id == amt_assignment_id + assert assignment_stub.created_at > ( + datetime.now(tz=timezone.utc) - timedelta(minutes=90) + ) + return None + + assignment_stub = AssignmentStub( + amt_hit_id=amt_hit_id, + amt_worker_id=amt_worker_id, + amt_assignment_id=amt_assignment_id, + status=AssignmentStatus.Accepted, + hit_id=hit.id, + ) + + AM.create_stub(stub=assignment_stub) + + return None diff --git a/jb/views/utils.py b/jb/views/utils.py new file mode 100644 index 0000000..39db5d2 --- /dev/null +++ b/jb/views/utils.py @@ -0,0 +1,15 @@ +from fastapi import Request + + +def get_client_ip(request: Request) -> str: + """ + Using a testclient, the ip returned is 'testclient'. If so, instead, grab + the ip from the headers + """ + ip = request.headers.get("X-Forwarded-For") + if not ip: + ip = request.client.host + elif ip == "testclient" or ip.startswith("10."): + forwarded = request.headers.get("X-Forwarded-For") + ip = forwarded.split(",")[0].strip() if forwarded else request.client.host + return ip |
