import json from typing import Dict, Any import requests from fastapi import Request, APIRouter, HTTPException from fastapi.responses import HTMLResponse from starlette.responses import RedirectResponse from jb.config import settings, JB_EVENTS_STREAM from jb.decorators import REDIS, HM from jb.flow.monitoring import emit_assignment_event, emit_mturk_notification_event from jb.models.definitions import AssignmentStatus from jb.models.event import MTurkEvent from jb.settings import BASE_HTML from jb.config import settings from jb.views.tasks import process_request common_router = APIRouter(prefix="", tags=["API"], include_in_schema=True) @common_router.get(path="/work/", response_class=HTMLResponse) async def work(request: Request): """ HTML Page that the worker lands on in an iFrame. They can either be previewing the HIT, or have already accepted it. """ amt_assignment_id = request.query_params.get("assignmentId", None) worker_id = request.query_params.get("workerId", None) amt_hit_id = request.query_params.get("hitId", None) print(f"work: {amt_assignment_id=} {worker_id=} {amt_hit_id=}") if not worker_id: return RedirectResponse( url=f"/preview/?{request.url.query}" if request.url.query else "/preview/", status_code=302, ) if amt_assignment_id is None or amt_assignment_id == "ASSIGNMENT_ID_NOT_AVAILABLE": # Worker is previewing the HIT amt_hit_type_id = "unknown" if amt_hit_id: hit = HM.get_from_amt_id(amt_hit_id=amt_hit_id) amt_hit_type_id = hit.amt_hit_type_id emit_assignment_event( status=AssignmentStatus.PreviewState, amt_hit_type_id=amt_hit_type_id ) return RedirectResponse( url=f"/preview/?{request.url.query}" if request.url.query else "/preview/", status_code=302, ) try: # The Worker has accepted the HIT process_request(request) except Exception: raise HTTPException(status_code=500, detail="Error processing request") return HTMLResponse(BASE_HTML) @common_router.post(path=f"/{settings.sns_path}/", include_in_schema=False) async def mturk_notifications(request: Request): """ Our SNS topic will POST to this endpoint whenever we get a new message """ try: message = await request.json() except Exception: raise HTTPException(status_code=500, detail="No POST data") match message.get("Type"): case "SubscriptionConfirmation": subscribe_url = message["SubscribeURL"] print("Confirming SNS subscription...") requests.get(subscribe_url) case "Notification": msg = json.loads(message["Message"]) print("Received MTurk event:", msg) enqueue_mturk_notifications(msg) case _: raise HTTPException(status_code=500, detail="Invalid JSON") return {"status": "ok"} def enqueue_mturk_notifications(msg: Dict[str, Any]) -> None: for evt in msg["Events"]: event = MTurkEvent.from_sns(evt) emit_mturk_notification_event( event_type=event.event_type, amt_hit_type_id=event.amt_hit_type_id ) REDIS.xadd(JB_EVENTS_STREAM, {"data": event.model_dump_json()})