diff options
Diffstat (limited to 'jb/views/common.py')
| -rw-r--r-- | jb/views/common.py | 186 |
1 files changed, 186 insertions, 0 deletions
diff --git a/jb/views/common.py b/jb/views/common.py new file mode 100644 index 0000000..46ac608 --- /dev/null +++ b/jb/views/common.py @@ -0,0 +1,186 @@ +import json +from typing import List +from uuid import uuid4 + +import requests +from fastapi import Request, APIRouter, Response, HTTPException, Query +from fastapi.responses import HTMLResponse, JSONResponse +from pydantic import BaseModel, ConfigDict, Field +from starlette.responses import RedirectResponse + +from jb.config import settings, JB_EVENTS_STREAM +from jb.decorators import REDIS, HM +from jb.flow.monitoring import emit_assignment_event, emit_mturk_notification_event +from jb.models.currency import USDCent +from jb.models.definitions import ReportValue, AssignmentStatus +from jb.models.event import MTurkEvent +from jb.settings import BASE_HTML +from jb.config import settings +from jb.views.tasks import process_request + +common_router = APIRouter(prefix="", tags=["API"], include_in_schema=True) + + +class ReportTask(BaseModel): + model_config = ConfigDict(extra="forbid") + + worker_id: str = Field() + + reasons: List[ReportValue] = Field( + examples=[[3, 4]], + default_factory=list, + ) + + notes: str = Field( + default="", examples=["The survey wanted to watch me eat Haejang-guk"] + ) + + +@common_router.post("/report/") +def report(request: Request, data: ReportTask): + url = f"{settings.fsb_host}{settings.product_id}/report/" + params = { + "bpuid": data.worker_id, + "reasons": [x.value for x in data.reasons], + "notes": data.notes, + } + + req = requests.post(url, json=params) + res = req.json() + if res.status_code != 200: + raise HTTPException( + status_code=res.status_code, detail="Failed to submit report" + ) + + return Response(res) + + +@common_router.get(path="/work/", response_class=HTMLResponse) +async def work(request: Request): + """ + HTML Page that the worker lands on in an iFrame. + They can either be previewing the HIT, or have already accepted it. + """ + amt_assignment_id = request.query_params.get("assignmentId", None) + worker_id = request.query_params.get("workerId", None) + amt_hit_id = request.query_params.get("hitId", None) + print(f"work: {amt_assignment_id=} {worker_id=} {amt_hit_id=}") + + if not worker_id: + return RedirectResponse( + url=f"/preview/?{request.url.query}" if request.url.query else "/preview/", + status_code=302, + ) + if amt_assignment_id is None or amt_assignment_id == "ASSIGNMENT_ID_NOT_AVAILABLE": + # Worker is previewing the HIT + amt_hit_type_id = "unknown" + if amt_hit_id: + hit = HM.get_from_amt_id(amt_hit_id=amt_hit_id) + amt_hit_type_id = hit.amt_hit_type_id + emit_assignment_event( + status=AssignmentStatus.PreviewState, amt_hit_type_id=amt_hit_type_id + ) + return RedirectResponse( + url=f"/preview/?{request.url.query}" if request.url.query else "/preview/", + status_code=302, + ) + + # The Worker has accepted the HIT + process_request(request) + + return HTMLResponse(BASE_HTML) + + +@common_router.get(path="/survey/", response_class=JSONResponse) +def survey( + request: Request, + worker_id: str = Query(), + duration: int = Query(default=1200), +): + if not worker_id: + raise HTTPException(status_code=400, detail="Missing worker_id") + + # (1) Check wallet + wallet_url = f"{settings.fsb_host}{settings.product_id}/wallet/" + wallet_res = requests.get(wallet_url, params={"bpuid": worker_id}) + if wallet_res.status_code != 200: + raise HTTPException(status_code=502, detail="Wallet check failed") + + wallet_data = wallet_res.json() + wallet_balance = wallet_data["wallet"]["amount"] + if wallet_balance < -100: + return JSONResponse( + { + "total_surveys": 0, + "link": None, + "duration": None, + "payout": None, + } + ) + + # (2) Get offerwall + client_ip = "69.253.144.55" if settings.debug else request.client.host + offerwall_url = f"{settings.fsb_host}{settings.product_id}/offerwall/d48cce47/" + offerwall_res = requests.get( + offerwall_url, + params={ + "bpuid": worker_id, + "ip": client_ip, + "n_bins": 1, + "duration": duration, + }, + ) + + if offerwall_res.status_code != 200: + raise HTTPException(status_code=502, detail="Offerwall request failed") + + try: + rj = offerwall_res.json() + bucket = rj["offerwall"]["buckets"][0] + return JSONResponse( + { + "total_surveys": rj["offerwall"]["availability_count"], + "link": bucket["uri"], + "duration": round(bucket["duration"]["q2"] / 60), + "payout": USDCent(bucket["payout"]["q2"]).to_usd_str(), + } + ) + except Exception: + return JSONResponse( + { + "total_surveys": 0, + "link": None, + "duration": None, + "payout": None, + } + ) + + +@common_router.post(path=f"/{settings.sns_path}/", include_in_schema=False) +async def mturk_notifications(request: Request): + """ + Our SNS topic will POST to this endpoint whenever we get a new message + """ + message = await request.json() + msg_type = message.get("Type") + + if msg_type == "SubscriptionConfirmation": + subscribe_url = message["SubscribeURL"] + print("Confirming SNS subscription...") + requests.get(subscribe_url) + + elif msg_type == "Notification": + msg = json.loads(message["Message"]) + print("Received MTurk event:", msg) + enqueue_mturk_notifications(msg) + + return {"status": "ok"} + + +def enqueue_mturk_notifications(msg) -> None: + for evt in msg["Events"]: + event = MTurkEvent.from_sns(evt) + emit_mturk_notification_event( + event_type=event.event_type, amt_hit_type_id=event.amt_hit_type_id + ) + REDIS.xadd(JB_EVENTS_STREAM, {"data": event.model_dump_json()}) |
