aboutsummaryrefslogtreecommitdiff
path: root/jb
diff options
context:
space:
mode:
Diffstat (limited to 'jb')
-rw-r--r--jb/__init__.py0
-rw-r--r--jb/decorators.py75
-rw-r--r--jb/flow/__init__.py0
-rw-r--r--jb/flow/events.py126
-rw-r--r--jb/flow/maintenance.py26
-rw-r--r--jb/flow/monitoring.py156
-rw-r--r--jb/main.py59
-rw-r--r--jb/views/tasks.py70
-rw-r--r--jb/views/utils.py15
9 files changed, 527 insertions, 0 deletions
diff --git a/jb/__init__.py b/jb/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/jb/__init__.py
diff --git a/jb/decorators.py b/jb/decorators.py
new file mode 100644
index 0000000..54d36c7
--- /dev/null
+++ b/jb/decorators.py
@@ -0,0 +1,75 @@
+import boto3
+from botocore.config import Config
+from generalresearchutils.pg_helper import PostgresConfig
+from generalresearchutils.redis_helper import RedisConfig
+from influxdb import InfluxDBClient
+from mypy_boto3_mturk import MTurkClient
+from mypy_boto3_sns import SNSClient
+
+from jb.config import settings
+from jb.managers import Permission
+from jb.managers.assignment import AssignmentManager
+from jb.managers.bonus import BonusManager
+from jb.managers.hit import HitTypeManager, HitManager, HitQuestionManager
+
# Connection settings for the shared Redis client. The same timeout value
# (``settings.redis_timeout``) is applied to connecting and to socket I/O.
redis_config = RedisConfig(
    dsn=settings.redis,
    socket_connect_timeout=settings.redis_timeout,
    socket_timeout=settings.redis_timeout,
    decode_responses=True,
)

# Module-level Redis client, created once when this module is imported.
REDIS = redis_config.create_redis_client()
+
# botocore timeouts shared by the AWS clients below. Both values are in
# seconds and replace botocore's 60-second defaults:
#   connect_timeout - how long to wait while establishing a connection
#   read_timeout    - how long to wait while reading from a connection
CLIENT_CONFIG = Config(connect_timeout=1, read_timeout=2.5)

# Raw MTurk client. Do not use it directly; go through the AMTManager wrapper.
AMT_CLIENT: MTurkClient = boto3.client(
    service_name="mturk",
    region_name="us-east-1",
    endpoint_url=str(settings.amt_endpoint),
    aws_access_key_id=settings.amt_access_id,
    aws_secret_access_key=settings.amt_secret_key,
    config=CLIENT_CONFIG,
)

# SNS client. It reuses the MTurk credentials but targets us-east-2.
SNS_CLIENT: SNSClient = boto3.client(
    service_name="sns",
    region_name="us-east-2",
    aws_access_key_id=settings.amt_access_id,
    aws_secret_access_key=settings.amt_secret_key,
    config=CLIENT_CONFIG,
)
+
# Postgres settings shared by every manager below.
# NOTE(review): the timeout units are defined by PostgresConfig - confirm there.
pg_config = PostgresConfig(
    dsn=settings.amt_jb_db,
    connect_timeout=1,
    statement_timeout=1,
)

# One manager per entity. Each gets its own READ + CREATE permission list.
HTM = HitTypeManager(
    pg_config=pg_config,
    permissions=[Permission.READ, Permission.CREATE],
)
HM = HitManager(
    pg_config=pg_config,
    permissions=[Permission.READ, Permission.CREATE],
)
HQM = HitQuestionManager(
    pg_config=pg_config,
    permissions=[Permission.READ, Permission.CREATE],
)
AM = AssignmentManager(
    pg_config=pg_config,
    permissions=[Permission.READ, Permission.CREATE],
)
BM = BonusManager(
    pg_config=pg_config,
    permissions=[Permission.READ, Permission.CREATE],
)
+
# Optional InfluxDB client: stays None when no ``settings.influx_db`` DSN is
# configured, so callers must check it before writing.
influx_client = (
    InfluxDBClient.from_dsn(str(settings.influx_db)) if settings.influx_db else None
)
diff --git a/jb/flow/__init__.py b/jb/flow/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/jb/flow/__init__.py
diff --git a/jb/flow/events.py b/jb/flow/events.py
new file mode 100644
index 0000000..3961a64
--- /dev/null
+++ b/jb/flow/events.py
@@ -0,0 +1,126 @@
+import logging
+import time
+from concurrent import futures
+from concurrent.futures import ThreadPoolExecutor, Executor, as_completed
+from typing import Optional
+
+import redis
+
+from jb.config import (
+ JB_EVENTS_STREAM,
+ CONSUMER_GROUP,
+ CONSUMER_NAME,
+ JB_EVENTS_FAILED_STREAM,
+)
+from jb.decorators import REDIS
+from jb.flow.assignment_tasks import process_assignment_submitted
+from jb.models.event import MTurkEvent
+
+
def process_mturk_events_task():
    """
    Long-running worker loop for the MTurk event stream.

    Creates the consumer group, then forever: drains the stream using a
    5-thread pool, logs any exception instead of exiting, and pauses one
    second before the next pass. Never returns.
    """
    pool = ThreadPoolExecutor(max_workers=5)
    create_consumer_group()
    while True:
        try:
            process_mturk_events(executor=pool)
        except Exception as err:
            # Keep the worker alive; the next pass may succeed.
            logging.exception(err)
        finally:
            time.sleep(1)
+
+
def handle_pending_msgs_task():
    """
    Long-running loop that retries pending stream messages once a minute.

    Each pass calls ``handle_pending_msgs``; any exception is logged and the
    loop continues after a 60-second pause. Never returns.
    """
    while True:
        try:
            handle_pending_msgs()
        except Exception as err:
            # Keep the loop alive; the next pass may succeed.
            logging.exception(err)
        finally:
            time.sleep(60)
+
+
def process_mturk_events(executor: Executor):
    """
    Drain the event stream chunk by chunk.

    Keeps requesting chunks until one comes back empty (``None``) or holds
    fewer than 10 messages, which means the stream has been caught up.
    """
    handled = process_mturk_events_chunk(executor=executor)
    while handled is not None and handled >= 10:
        handled = process_mturk_events_chunk(executor=executor)
+
+
def create_consumer_group():
    """
    Create the consumer group on the events stream if it does not exist yet.

    The stream itself is created when missing (``mkstream=True``) and the
    group starts reading from id "0". A "BUSYGROUP" reply is ignored; any
    other Redis ``ResponseError`` is re-raised.
    """
    try:
        REDIS.xgroup_create(JB_EVENTS_STREAM, CONSUMER_GROUP, id="0", mkstream=True)
    except redis.exceptions.ResponseError as err:
        group_exists = "BUSYGROUP Consumer Group name already exists" in str(err)
        if not group_exists:
            raise
+
+
def process_mturk_events_chunk(executor: Executor) -> Optional[int]:
    """
    Read up to 10 new messages from the events stream and dispatch them.

    "AssignmentSubmitted" events are handed to ``executor``; every other
    event type is logged, acknowledged and deleted. The call waits up to
    60 seconds for the submitted handlers and logs any handler that raised
    or is still running.

    :param executor: pool that runs ``process_assignment_submitted_event``.
    :return: number of messages read, or ``None`` when nothing was available.
    """
    reply = REDIS.xreadgroup(
        groupname=CONSUMER_GROUP,
        consumername=CONSUMER_NAME,
        streams={JB_EVENTS_STREAM: ">"},
        count=10,
    )
    if not reply:
        return None
    msgs = reply[0][1]  # entries of the single stream we read from

    fs = []
    for msg_id, data in msgs:
        try:
            event = MTurkEvent.model_validate_json(data["data"])
        except Exception:
            # One malformed message must not abort the rest of the chunk:
            # every message in it has already been delivered to this consumer.
            # The bad entry is left pending (un-acked) for later inspection.
            logging.exception(
                "Could not parse MTurk event %s; leaving it pending", msg_id
            )
            continue

        if event.event_type == "AssignmentSubmitted":
            fs.append(
                executor.submit(process_assignment_submitted_event, event, msg_id)
            )
        else:
            logging.info(f"Discarding {event}")
            # Acknowledge before deleting; otherwise the id stays in the
            # group's pending list even though the entry is gone.
            REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)
            REDIS.xdel(JB_EVENTS_STREAM, msg_id)

    done, not_done = futures.wait(fs, timeout=60)
    for finished in done:
        # Surface handler failures; they would otherwise vanish with the future.
        failure = finished.exception()
        if failure is not None:
            logging.error(
                "Processing an AssignmentSubmitted event failed", exc_info=failure
            )
    if not_done:
        logging.warning(
            "%d AssignmentSubmitted handler(s) still running after 60s",
            len(not_done),
        )
    return len(msgs)
+
+
def process_assignment_submitted_event(event: MTurkEvent, msg_id: str):
    """
    Handle one "AssignmentSubmitted" event, then ack-and-delete its message.

    The stream entry ``msg_id`` is only acknowledged and removed after
    ``process_assignment_submitted`` returns; if that call raises, the
    message is left un-acked in the consumer group.
    """
    process_assignment_submitted(event)
    REDIS.xackdel(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)
+
+
def handle_pending_msgs():
    """
    Retry messages that were read by a consumer but never acknowledged.

    Looks at up to 10 pending entries of the consumer group. Entries idle
    for more than 10 seconds are claimed for this consumer and handled:

    * "AssignmentSubmitted" events are processed again. If that fails, the
      payload is copied to the failed stream and the original is
      acknowledged so it is not retried forever.
    * Any other event type is logged, acknowledged and deleted.

    NOTE(review): the original author flagged this routine as unverified -
    exercise it against a real Redis before relying on it.
    """
    pending = REDIS.xpending_range(
        JB_EVENTS_STREAM, CONSUMER_GROUP, min="-", max="+", count=10
    )
    for entry in pending:
        # Only take over messages that have been idle for more than 10 s.
        if entry["idle"] <= 10_000:  # milliseconds
            continue

        msg_id = entry["message_id"]
        claimed = REDIS.xclaim(
            JB_EVENTS_STREAM,
            CONSUMER_GROUP,
            CONSUMER_NAME,
            min_idle_time=10_000,
            message_ids=[msg_id],
        )
        for cid, data in claimed:
            if data is None:
                # NOTE(review): redis-py appears to report a claimed id whose
                # entry was already deleted as (None, None) - confirm for the
                # deployed versions. Nothing is left to process, so just
                # clear the id from the pending list.
                REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)
                continue

            event = MTurkEvent.model_validate_json(data["data"])
            if event.event_type != "AssignmentSubmitted":
                logging.info(f"Discarding {event}")
                # Acknowledge as well as delete, so the id does not linger
                # in the pending list after the entry is gone.
                REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid)
                REDIS.xdel(JB_EVENTS_STREAM, cid)
                continue

            # Try to process it again. On success the handler itself
            # acks-and-deletes the message, so no extra ack is needed here.
            try:
                process_assignment_submitted_event(event, cid)
            except Exception as e:
                # Park the payload on the failed stream so it can be fixed
                # and replayed later, then stop retrying the original.
                logging.exception(e)
                REDIS.xadd(JB_EVENTS_FAILED_STREAM, data)
                REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid)
diff --git a/jb/flow/maintenance.py b/jb/flow/maintenance.py
new file mode 100644
index 0000000..5dc9cea
--- /dev/null
+++ b/jb/flow/maintenance.py
@@ -0,0 +1,26 @@
+from typing import Optional
+
+from jb.decorators import HM
+from jb.flow.monitoring import emit_hit_event
+from jb.managers.amt import AMTManager
+from jb.models.definitions import HitStatus
+
+
def check_hit_status(
    amt_hit_id: str, amt_hit_type_id: str, reason: Optional[str] = None
) -> HitStatus:
    """
    (this used to be called "process_hit")
    Ask Amazon for the current status of a HIT and bring local state in line
    with the answer.

    If the reported status is anything other than Assignable, the stored
    status is updated and a hit event (tagged with ``reason`` when given) is
    emitted. The status reported by Amazon is returned either way.
    """
    current_status = AMTManager.get_hit_status(amt_hit_id=amt_hit_id)

    # The caller is assumed to hold this HIT as Assignable in the db (it would
    # not have asked otherwise), so only a different status is a change.
    status_changed = current_status != HitStatus.Assignable
    if status_changed:
        # todo: should update also assignment_pending_count, assignment_available_count, assignment_completed_count
        HM.update_status(amt_hit_id=amt_hit_id, hit_status=current_status)
        emit_hit_event(
            status=current_status,
            amt_hit_type_id=amt_hit_type_id,
            reason=reason,
        )
    return current_status
diff --git a/jb/flow/monitoring.py b/jb/flow/monitoring.py
new file mode 100644
index 0000000..c8432bb
--- /dev/null
+++ b/jb/flow/monitoring.py
@@ -0,0 +1,156 @@
+import socket
+from typing import Optional
+
+from mypy_boto3_mturk.literals import EventTypeType
+
+from jb.config import settings
+from jb.decorators import influx_client
+from jb.models.currency import USDCent
+from jb.models.definitions import HitStatus, AssignmentStatus
+
+
def _point_tags(amt_hit_type_id: str, **extra_tags) -> dict:
    """Build the tag set shared by every amt-jb measurement, plus ``extra_tags``."""
    return {
        "host": socket.gethostname(),  # could be "amt-jb-0"
        "service": "amt-jb",
        **extra_tags,
        "amt_hit_type_id": amt_hit_type_id,
        "debug": settings.debug,
    }


def _write_point(measurement: str, tags: dict, fields: dict) -> None:
    """Write a single point to InfluxDB; does nothing when no client is configured."""
    if influx_client:
        influx_client.write_points(
            [{"measurement": measurement, "tags": tags, "fields": fields}]
        )


def write_hit_gauge(status: HitStatus, amt_hit_type_id: str, cnt: int):
    """
    Record how many HITs of one hit type are currently in ``status``.

    Written to the "amt_jb.hits" measurement with ``cnt`` as the "count" field.
    """
    tags = _point_tags(amt_hit_type_id, status=status.value)
    _write_point("amt_jb.hits", tags, {"count": cnt})


def write_assignment_gauge(status: AssignmentStatus, amt_hit_type_id: str, cnt: int):
    """
    Record how many assignments of one hit type are currently in ``status``.

    Written to the "amt_jb.assignments" measurement with ``cnt`` as the
    "count" field.
    """
    tags = _point_tags(amt_hit_type_id, status=status.value)
    _write_point("amt_jb.assignments", tags, {"count": cnt})


def emit_hit_event(
    status: HitStatus, amt_hit_type_id: str, reason: Optional[str] = None
):
    """
    e.g. a HIT was created, Reviewable, etc. We don't have a "created" HitStatus,
    so it would just be when status=='Assignable'

    ``reason`` is added as a tag only when it is non-empty.
    """
    tags = _point_tags(amt_hit_type_id, status=status.value)
    if reason:
        tags["reason"] = reason
    _write_point("amt_jb.hit_events", tags, {"value": 1})


def emit_assignment_event(
    status: AssignmentStatus, amt_hit_type_id: str, reason: Optional[str] = None
):
    """
    e.g. an Assignment was accepted/approved/rejected

    ``reason`` is added as a tag only when it is non-empty.
    """
    tags = _point_tags(amt_hit_type_id, status=status.value)
    if reason:
        tags["reason"] = reason
    _write_point("amt_jb.assignment_events", tags, {"value": 1})


def emit_mturk_notification_event(event_type: EventTypeType, amt_hit_type_id: str):
    """
    e.g. a Mturk notification was received. We just put it in redis, we haven't processed it yet.
    """
    tags = _point_tags(amt_hit_type_id, event_type=event_type)
    _write_point("amt_jb.mturk_notification_events", tags, {"value": 1})


def emit_error_event(event_type: str, amt_hit_type_id: str):
    """
    Record that an error of kind ``event_type`` occurred for a hit type.

    todo: structure the error_types
    """
    tags = _point_tags(amt_hit_type_id, event_type=event_type)
    _write_point("amt_jb.error_events", tags, {"value": 1})


def emit_bonus_event(amount: USDCent, amt_hit_type_id: str):
    """
    An AMT bonus was awarded. ``amount`` is stored as an integer field.
    """
    tags = _point_tags(amt_hit_type_id)
    _write_point("amt_jb.bonus_events", tags, {"value": 1, "amount": int(amount)})
diff --git a/jb/main.py b/jb/main.py
new file mode 100644
index 0000000..8c1dbed
--- /dev/null
+++ b/jb/main.py
@@ -0,0 +1,59 @@
+from multiprocessing import Process
+
+from fastapi import FastAPI, Request
+from fastapi.responses import HTMLResponse
+from starlette.middleware.cors import CORSMiddleware
+from starlette.middleware.trustedhost import TrustedHostMiddleware
+
+from jb.views.common import common_router
+from jb.settings import BASE_HTML
+from jb.config import settings
+
# The FastAPI application object; the production server is advertised in the
# generated API schema.
app = FastAPI(
    title="jb",
    version="1.0.0",
    servers=[
        {
            "url": "https://jamesbillings67.com/",
            "description": "Production environment",
        },
    ],
)

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive - confirm this is intended for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)

# Every Host header is accepted ("*").
app.add_middleware(TrustedHostMiddleware, allowed_hosts=["*"])

# API routes are registered before the catch-all route defined further down.
app.include_router(router=common_router)
+
+
@app.get("/robots.txt")
@app.get("/sitemap.xml")
@app.get("/favicon.ico")
def return_nothing():
    """
    Answer the usual crawler/browser probes with an empty JSON object, so
    they do not fall through to the catch-all page route.
    """
    return dict()
+
+
@app.get("/{full_path:path}")
def serve_react_app(full_path: str):
    """
    Catch-all route: return the base HTML page for any path not matched by
    an earlier route. ``full_path`` is ignored here; React Router handles
    the routing on the client side.
    """
    page = HTMLResponse(BASE_HTML)
    return page
+
+
def schedule_tasks():
    """
    Start the background workers, each in its own OS process.

    Launches the MTurk event processor and the HIT refill task. The
    pending-message handler is imported but currently not started.
    """
    # Imported inside the function rather than at module top level.
    from jb.flow.events import process_mturk_events_task, handle_pending_msgs_task
    from jb.flow.tasks import refill_hits_task

    # Currently disabled:
    # Process(target=handle_pending_msgs_task).start()
    for worker in (process_mturk_events_task, refill_hits_task):
        Process(target=worker).start()


# Background workers start when this module is imported, unless in debug mode.
if not settings.debug:
    schedule_tasks()
diff --git a/jb/views/tasks.py b/jb/views/tasks.py
new file mode 100644
index 0000000..7176b29
--- /dev/null
+++ b/jb/views/tasks.py
@@ -0,0 +1,70 @@
+from datetime import datetime, timezone, timedelta
+
+from fastapi import Request
+
+from jb.decorators import AM, HM
+from jb.flow.maintenance import check_hit_status
+from jb.flow.monitoring import emit_assignment_event
+from jb.models.assignment import AssignmentStub
+from jb.models.definitions import AssignmentStatus
+
+
def process_request(request: Request) -> None:
    """
    A worker has loaded the HIT (work) page and (probably) accepted the HIT.
    AMT creates an assignment, tied to this hit and this worker.
    Create it in the DB.

    Reads ``assignmentId``, ``hitId`` and ``workerId`` from the query string,
    refreshes the HIT status, emits an "Accepted" assignment event and stores
    an assignment stub unless one already exists for this assignment id.

    :raises ValueError: if ``assignmentId`` is "ASSIGNMENT_ID_NOT_AVAILABLE".
    :raises AssertionError: if a required query parameter is missing, or an
        existing stub disagrees with this request or is older than 90 minutes.
        These are raised explicitly (not via ``assert``) so the checks still
        run under ``python -O``; the exception type is kept for existing
        callers.
    """
    params = request.query_params
    amt_assignment_id = params.get("assignmentId", None)
    if amt_assignment_id == "ASSIGNMENT_ID_NOT_AVAILABLE":
        raise ValueError("shouldn't happen")
    amt_hit_id = params.get("hitId", None)
    amt_worker_id = params.get("workerId", None)
    print(f"process_request: {amt_assignment_id=} {amt_worker_id=} {amt_hit_id=}")
    if not (amt_worker_id and amt_hit_id and amt_assignment_id):
        raise AssertionError(
            "workerId, hitId and assignmentId query parameters are all required"
        )

    # Check that the HIT is still valid
    hit = HM.get_from_amt_id(amt_hit_id=amt_hit_id)
    _ = check_hit_status(amt_hit_id=amt_hit_id, amt_hit_type_id=hit.amt_hit_type_id)
    emit_assignment_event(
        status=AssignmentStatus.Accepted,
        amt_hit_type_id=hit.amt_hit_type_id,
    )
    # I think it won't be assignable anymore? idk
    # assert hit_status == HitStatus.Assignable, f"hit {amt_hit_id} {hit_status=}. Expected Assignable"

    # I would like to verify in the AMT API that this assignment is valid, but there
    # is no way to do that (until the assignment is submitted)

    # # Make an offerwall to create a user account...
    # # todo: GSS: Do we really need to do this???
    # client_ip = get_client_ip(request)
    # url = f"{settings.fsb_host}{settings.product_id}/offerwall/45b7228a7/"
    # _ = requests.get(
    #     url,
    #     {"bpuid": amt_worker_id, "ip": client_ip, "n_bins": 1, "format": "json"},
    # ).json()

    # This assignment shouldn't already exist. If it does, just make sure it
    # is all the same.
    assignment_stub = AM.get_stub_if_exists(amt_assignment_id=amt_assignment_id)
    if assignment_stub:
        print(f"{assignment_stub=}")
        if assignment_stub.amt_worker_id != amt_worker_id:
            raise AssertionError("existing assignment belongs to another worker")
        if assignment_stub.amt_assignment_id != amt_assignment_id:
            raise AssertionError("existing assignment has a different assignment id")
        oldest_allowed = datetime.now(tz=timezone.utc) - timedelta(minutes=90)
        if not assignment_stub.created_at > oldest_allowed:
            raise AssertionError("existing assignment is older than 90 minutes")
        return None

    assignment_stub = AssignmentStub(
        amt_hit_id=amt_hit_id,
        amt_worker_id=amt_worker_id,
        amt_assignment_id=amt_assignment_id,
        status=AssignmentStatus.Accepted,
        hit_id=hit.id,
    )

    AM.create_stub(stub=assignment_stub)

    return None
diff --git a/jb/views/utils.py b/jb/views/utils.py
new file mode 100644
index 0000000..39db5d2
--- /dev/null
+++ b/jb/views/utils.py
@@ -0,0 +1,15 @@
+from fastapi import Request
+
+
def get_client_ip(request: Request) -> str:
    """
    Return the best-guess IP address of the client behind ``request``.

    When an ``X-Forwarded-For`` header is present, its first (left-most)
    entry is returned, so a proxy chain such as "1.2.3.4, 10.0.0.1" yields
    "1.2.3.4". Otherwise the address of the directly connected peer
    (``request.client.host``) is used; under a test client that value is
    'testclient'.

    NOTE(review): ``X-Forwarded-For`` can be set by the caller unless a
    trusted proxy overwrites it - do not treat the result as authenticated.
    """
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        # The header may list several hops; the original client comes first.
        first_hop = forwarded.split(",")[0].strip()
        if first_hop:
            return first_hop
    return request.client.host