aboutsummaryrefslogtreecommitdiff
path: root/jb/views
diff options
context:
space:
mode:
Diffstat (limited to 'jb/views')
-rw-r--r--jb/views/common.py186
1 files changed, 186 insertions, 0 deletions
diff --git a/jb/views/common.py b/jb/views/common.py
new file mode 100644
index 0000000..46ac608
--- /dev/null
+++ b/jb/views/common.py
@@ -0,0 +1,186 @@
+import json
+from typing import List
+from uuid import uuid4
+
+import requests
+from fastapi import Request, APIRouter, Response, HTTPException, Query
+from fastapi.responses import HTMLResponse, JSONResponse
+from pydantic import BaseModel, ConfigDict, Field
+from starlette.responses import RedirectResponse
+
+from jb.config import settings, JB_EVENTS_STREAM
+from jb.decorators import REDIS, HM
+from jb.flow.monitoring import emit_assignment_event, emit_mturk_notification_event
+from jb.models.currency import USDCent
+from jb.models.definitions import ReportValue, AssignmentStatus
+from jb.models.event import MTurkEvent
+from jb.settings import BASE_HTML
+from jb.config import settings
+from jb.views.tasks import process_request
+
+common_router = APIRouter(prefix="", tags=["API"], include_in_schema=True)
+
+
+class ReportTask(BaseModel):
+ model_config = ConfigDict(extra="forbid")
+
+ worker_id: str = Field()
+
+ reasons: List[ReportValue] = Field(
+ examples=[[3, 4]],
+ default_factory=list,
+ )
+
+ notes: str = Field(
+ default="", examples=["The survey wanted to watch me eat Haejang-guk"]
+ )
+
+
+@common_router.post("/report/")
+def report(request: Request, data: ReportTask):
+ url = f"{settings.fsb_host}{settings.product_id}/report/"
+ params = {
+ "bpuid": data.worker_id,
+ "reasons": [x.value for x in data.reasons],
+ "notes": data.notes,
+ }
+
+ req = requests.post(url, json=params)
+ res = req.json()
+ if res.status_code != 200:
+ raise HTTPException(
+ status_code=res.status_code, detail="Failed to submit report"
+ )
+
+ return Response(res)
+
+
+@common_router.get(path="/work/", response_class=HTMLResponse)
+async def work(request: Request):
+ """
+ HTML Page that the worker lands on in an iFrame.
+ They can either be previewing the HIT, or have already accepted it.
+ """
+ amt_assignment_id = request.query_params.get("assignmentId", None)
+ worker_id = request.query_params.get("workerId", None)
+ amt_hit_id = request.query_params.get("hitId", None)
+ print(f"work: {amt_assignment_id=} {worker_id=} {amt_hit_id=}")
+
+ if not worker_id:
+ return RedirectResponse(
+ url=f"/preview/?{request.url.query}" if request.url.query else "/preview/",
+ status_code=302,
+ )
+ if amt_assignment_id is None or amt_assignment_id == "ASSIGNMENT_ID_NOT_AVAILABLE":
+ # Worker is previewing the HIT
+ amt_hit_type_id = "unknown"
+ if amt_hit_id:
+ hit = HM.get_from_amt_id(amt_hit_id=amt_hit_id)
+ amt_hit_type_id = hit.amt_hit_type_id
+ emit_assignment_event(
+ status=AssignmentStatus.PreviewState, amt_hit_type_id=amt_hit_type_id
+ )
+ return RedirectResponse(
+ url=f"/preview/?{request.url.query}" if request.url.query else "/preview/",
+ status_code=302,
+ )
+
+ # The Worker has accepted the HIT
+ process_request(request)
+
+ return HTMLResponse(BASE_HTML)
+
+
+@common_router.get(path="/survey/", response_class=JSONResponse)
+def survey(
+ request: Request,
+ worker_id: str = Query(),
+ duration: int = Query(default=1200),
+):
+ if not worker_id:
+ raise HTTPException(status_code=400, detail="Missing worker_id")
+
+ # (1) Check wallet
+ wallet_url = f"{settings.fsb_host}{settings.product_id}/wallet/"
+ wallet_res = requests.get(wallet_url, params={"bpuid": worker_id})
+ if wallet_res.status_code != 200:
+ raise HTTPException(status_code=502, detail="Wallet check failed")
+
+ wallet_data = wallet_res.json()
+ wallet_balance = wallet_data["wallet"]["amount"]
+ if wallet_balance < -100:
+ return JSONResponse(
+ {
+ "total_surveys": 0,
+ "link": None,
+ "duration": None,
+ "payout": None,
+ }
+ )
+
+ # (2) Get offerwall
+ client_ip = "69.253.144.55" if settings.debug else request.client.host
+ offerwall_url = f"{settings.fsb_host}{settings.product_id}/offerwall/d48cce47/"
+ offerwall_res = requests.get(
+ offerwall_url,
+ params={
+ "bpuid": worker_id,
+ "ip": client_ip,
+ "n_bins": 1,
+ "duration": duration,
+ },
+ )
+
+ if offerwall_res.status_code != 200:
+ raise HTTPException(status_code=502, detail="Offerwall request failed")
+
+ try:
+ rj = offerwall_res.json()
+ bucket = rj["offerwall"]["buckets"][0]
+ return JSONResponse(
+ {
+ "total_surveys": rj["offerwall"]["availability_count"],
+ "link": bucket["uri"],
+ "duration": round(bucket["duration"]["q2"] / 60),
+ "payout": USDCent(bucket["payout"]["q2"]).to_usd_str(),
+ }
+ )
+ except Exception:
+ return JSONResponse(
+ {
+ "total_surveys": 0,
+ "link": None,
+ "duration": None,
+ "payout": None,
+ }
+ )
+
+
+@common_router.post(path=f"/{settings.sns_path}/", include_in_schema=False)
+async def mturk_notifications(request: Request):
+ """
+ Our SNS topic will POST to this endpoint whenever we get a new message
+ """
+ message = await request.json()
+ msg_type = message.get("Type")
+
+ if msg_type == "SubscriptionConfirmation":
+ subscribe_url = message["SubscribeURL"]
+ print("Confirming SNS subscription...")
+ requests.get(subscribe_url)
+
+ elif msg_type == "Notification":
+ msg = json.loads(message["Message"])
+ print("Received MTurk event:", msg)
+ enqueue_mturk_notifications(msg)
+
+ return {"status": "ok"}
+
+
+def enqueue_mturk_notifications(msg) -> None:
+ for evt in msg["Events"]:
+ event = MTurkEvent.from_sns(evt)
+ emit_mturk_notification_event(
+ event_type=event.event_type, amt_hit_type_id=event.amt_hit_type_id
+ )
+ REDIS.xadd(JB_EVENTS_STREAM, {"data": event.model_dump_json()})