aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore6
-rw-r--r--Jenkinsfile95
-rw-r--r--LICENSE7
-rw-r--r--README.md26
-rw-r--r--__init__.py0
-rw-r--r--carer/README.md20
-rw-r--r--carer/__init__.py0
-rw-r--r--carer/carer/__init__.py0
-rw-r--r--carer/carer/mtwerk/__init__.py0
-rw-r--r--carer/carer/mtwerk/migrations/0001_initial.py157
-rw-r--r--carer/carer/mtwerk/migrations/__init__.py0
-rw-r--r--carer/carer/mtwerk/models.py130
-rw-r--r--carer/manage.py22
-rw-r--r--carer/requirements.txt4
-rw-r--r--jb/__init__.py0
-rw-r--r--jb/decorators.py75
-rw-r--r--jb/flow/__init__.py0
-rw-r--r--jb/flow/events.py126
-rw-r--r--jb/flow/maintenance.py26
-rw-r--r--jb/flow/monitoring.py156
-rw-r--r--jb/main.py59
-rw-r--r--jb/views/tasks.py70
-rw-r--r--jb/views/utils.py15
-rw-r--r--telegraf.conf100
-rw-r--r--tests/http/__init__.py0
-rw-r--r--tests/http/conftest.py50
-rw-r--r--tests/http/test_basic.py35
-rw-r--r--tests/http/test_work.py24
-rw-r--r--tests_sandbox/__init__.py0
-rw-r--r--tests_sandbox/test_flow.py29
-rw-r--r--tests_sandbox/utils.py0
31 files changed, 1217 insertions, 15 deletions
diff --git a/.gitignore b/.gitignore
index eb40667..c732fb1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -147,6 +147,12 @@ static-src/node_modules
/static-src/.sentryclirc
/static-src/package-lock.json
+# Settings
+.env*
+
+# Carer
+/carer/carer/settings/*
+
# dependencies
/jb-ui/node_modules
/jb-ui/.pnp
diff --git a/Jenkinsfile b/Jenkinsfile
new file mode 100644
index 0000000..265e6c6
--- /dev/null
+++ b/Jenkinsfile
@@ -0,0 +1,95 @@
+/* This is intended to test the dev & master branches of the gr-api project,
+ where the tests are entirely self contained
+*/
+
+pipeline {
+ agent any
+
+ triggers {
+ cron('H */6 * * *')
+ pollSCM('H */3 * * *')
+ }
+
+ environment {
+ DATA_SRC = "${env.WORKSPACE}/mnt/"
+
+ AMT_JB_CARER_VENV = "${env.WORKSPACE}/amt-jb-carer-venv"
+ AMT_JB_VENV = "${env.WORKSPACE}/amt-jb-venv"
+ }
+
+ stages {
+ stage('Setup DB'){
+ steps {
+ script {
+ env.DB_NAME = "unittest-amt-jb-" + UUID.randomUUID().toString().replace("-", "").take(12)
+ env.AMT_JB_DB = "postgres://jenkins:123456789@unittest-postgresql.fmt2.grl.internal/${env.DB_NAME}"
+ echo "Using database: ${env.DB_NAME}"
+ }
+
+ sh """
+ PGPASSWORD=123456789 psql -h unittest-postgresql.fmt2.grl.internal -U jenkins -d postgres <<EOF
+ CREATE DATABASE "${env.DB_NAME}" WITH TEMPLATE = template0 ENCODING = 'UTF8';
+ EOF
+ """
+ }
+ }
+
+ stage('setup:amt-jb-carer') {
+ steps {
+ checkout scmGit(
+ branches: [[name: 'master']],
+ extensions: [ cloneOption(shallow: true) ],
+ userRemoteConfigs: [
+ [credentialsId: 'abdeb570-b708-44f3-b857-8a6b06ed9822',
+ url: 'ssh://code.g-r-l.com:6611/panels/amt-jb']
+ ],
+ )
+
+ dir("carer/") {
+ sh 'python3.13 -m venv $AMT_JB_CARER_VENV'
+ sh '$AMT_JB_CARER_VENV/bin/pip install -U setuptools wheel pip'
+ sh '$AMT_JB_CARER_VENV/bin/pip install -r requirements.txt'
+ sh '$AMT_JB_CARER_VENV/bin/python3.13 manage.py migrate --settings=carer.settings.unittest'
+ }
+ }
+ }
+
+ /* Okay, finally we can setup the virtual environment for the actual
+ project itself. gr-api FastAPI doesn't manage any of its own
+ database so it doesn't need to do any migrations or anything.
+ */
+ stage('setup:amt-jb') {
+ steps {
+ sh 'python3.13 -m venv $AMT_JB_VENV'
+ sh '$AMT_JB_VENV/bin/pip install -U setuptools wheel pip'
+ withCredentials([sshUserPrivateKey(
+ credentialsId: 'abdeb570-b708-44f3-b857-8a6b06ed9822',
+ keyFileVariable: 'SSH_PRIVATE_KEY')]) {
+
+ sh """
+ eval \$(ssh-agent) && ssh-add ${SSH_PRIVATE_KEY} && \
+ ${AMT_JB_VENV}/bin/pip install -r requirements.txt
+ """
+ }
+ }
+ }
+
+ stage('tests') {
+ steps {
+ sh '$AMT_JB_VENV/bin/pytest -v tests'
+ }
+ }
+ }
+ post {
+ always {
+ echo 'One way or another, I have finished'
+ deleteDir() /* clean up our workspace */
+
+ sh """
+ PGPASSWORD=123456789 psql -h unittest-postgresql.fmt2.grl.internal -U jenkins -d postgres <<EOF
+ DROP DATABASE "${env.DB_NAME}";
+ EOF
+ """
+ }
+ }
+}
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..521f8ee
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,7 @@
+Copyright 2026 General Research Laboratories, LLC
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file
diff --git a/README.md b/README.md
index 361ffaa..64a61e8 100644
--- a/README.md
+++ b/README.md
@@ -1,22 +1,23 @@
-# AMT James Billings
+# [AMT James Billings](https://git.generalresearch.com/panels/amt-jb/) &middot; [![license](https://cdn.generalresearch.com/buildStatus/icon?subject=license&status=MIT&color=blue)](./LICENSE) ![master (app)](https://cdn.generalresearch.com/buildStatus/icon?subject=master%20(api)&job=amt-jb-app%2Fmaster) ![master (ui)](https://cdn.generalresearch.com/buildStatus/icon?subject=master%20(ui)&job=amt-jb%2Fmaster) ![python](https://cdn.generalresearch.com/buildStatus/icon?subject=Python&status=3.13.7&color=blue)
+
## Infrastructure
### FastAPI
- Handles HIT acceptance
-- Handles interface b/w react and backend: work, preview, report.
+- Handles interface between React and FastAPI: work, preview, report.
- Handling of submitted assignments:
- - mturk_notifications view: SNS will POST to this endpoint whenever a user submits an assignment
+ - mturk_notifications view: SNS will POST to this endpoint whenever a user submits an Assignment
- The message gets added to our event stream
- process_mturk_events (which calls process_assignment_submitted)
+- Refill Hits, check for expired hits
### React
-### AirFlow
-- Refill Hits, check for expired hits
## Network
+
## System Environment
### Debian 12
@@ -80,14 +81,16 @@ systemctl status postgresql
vim /etc/postgresql/18/main/pg_hba.conf
```
-#### Setup DNS Cache (LXC)
+#### Setup DNS Cache
```
apt install dnsutils dnsmasq -y
# /etc/dnsmasq.conf
-server=10.16.2.2
-server=10.16.2.3
+server=x.x.x.x
+server=x.x.x.x
+server=x.x.x.x
+server=x.x.x.x
min-cache-ttl=30
no-resolv
listen-address=::1,127.0.0.1
@@ -120,13 +123,6 @@ service nginx restart
```
-
-Confirm uvicorn is running properly
-
-```shell
-curl -i http://localhost:8000/headers/
-```
-
## Telegraf
```shell
diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/__init__.py
diff --git a/carer/README.md b/carer/README.md
new file mode 100644
index 0000000..e5f9123
--- /dev/null
+++ b/carer/README.md
@@ -0,0 +1,20 @@
+# amt-jb
+
+This refers to the Django carer that handles DB models
+
+## Environment
+
+- amt-jb-carer: (amt-jb-carer-venv) Python 3.13.7
+
+
+```shell
+
+python3.13 -m venv /root/amt-jb-carer-venv
+source /root/amt-jb-carer-venv/bin/activate
+pip install -U pip setuptools wheel
+cd ~/amt-jb/carer/
+pip install -r requirements.txt
+
+python3 manage.py makemigrations mtwerk --settings=carer.settings.production
+python3 manage.py migrate mtwerk --settings=carer.settings.production
+``` \ No newline at end of file
diff --git a/carer/__init__.py b/carer/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/carer/__init__.py
diff --git a/carer/carer/__init__.py b/carer/carer/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/carer/carer/__init__.py
diff --git a/carer/carer/mtwerk/__init__.py b/carer/carer/mtwerk/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/carer/carer/mtwerk/__init__.py
diff --git a/carer/carer/mtwerk/migrations/0001_initial.py b/carer/carer/mtwerk/migrations/0001_initial.py
new file mode 100644
index 0000000..d5c856a
--- /dev/null
+++ b/carer/carer/mtwerk/migrations/0001_initial.py
@@ -0,0 +1,157 @@
+# Generated by Django 5.2.7 on 2025-10-28 22:06
+
+import django.db.models.deletion
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ initial = True
+
+ dependencies = [
+ ]
+
+ operations = [
+ migrations.CreateModel(
+ name='Hit',
+ fields=[
+ ('id', models.BigAutoField(primary_key=True, serialize=False)),
+ ('amt_hit_id', models.CharField(max_length=30, unique=True)),
+ ('amt_group_id', models.CharField(max_length=30)),
+ ('status', models.SmallIntegerField()),
+ ('review_status', models.SmallIntegerField()),
+ ('creation_time', models.DateTimeField()),
+ ('expiration', models.DateTimeField()),
+ ('created_at', models.DateTimeField(auto_now_add=True)),
+ ('modified_at', models.DateTimeField(auto_now=True)),
+ ('max_assignments', models.IntegerField(default=1)),
+ ('assignment_pending_count', models.IntegerField(default=0)),
+ ('assignment_available_count', models.IntegerField(default=0)),
+ ('assignment_completed_count', models.IntegerField(default=0)),
+ ],
+ options={
+ 'db_table': 'mtwerk_hit',
+ },
+ ),
+ migrations.CreateModel(
+ name='Assignment',
+ fields=[
+ ('id', models.BigAutoField(primary_key=True, serialize=False)),
+ ('amt_assignment_id', models.CharField(max_length=30, unique=True)),
+ ('amt_worker_id', models.CharField(max_length=30)),
+ ('status', models.SmallIntegerField()),
+ ('auto_approval_time', models.DateTimeField(null=True)),
+ ('accept_time', models.DateTimeField(null=True)),
+ ('submit_time', models.DateTimeField(null=True)),
+ ('approval_time', models.DateTimeField(null=True)),
+ ('rejection_time', models.DateTimeField(null=True)),
+ ('requester_feedback', models.CharField(max_length=1024, null=True)),
+ ('created_at', models.DateTimeField(auto_now_add=True)),
+ ('modified_at', models.DateTimeField(auto_now=True)),
+ ('tsid', models.UUIDField(null=True)),
+ ('hit', models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, to='mtwerk.hit')),
+ ],
+ options={
+ 'db_table': 'mtwerk_assignment',
+ },
+ ),
+ migrations.CreateModel(
+ name='HitType',
+ fields=[
+ ('id', models.AutoField(primary_key=True, serialize=False)),
+ ('amt_hit_type_id', models.CharField(max_length=30, unique=True)),
+ ('title', models.CharField(max_length=200)),
+ ('description', models.CharField(max_length=2000)),
+ ('reward', models.DecimalField(decimal_places=2, max_digits=8)),
+ ('assignment_duration', models.PositiveIntegerField()),
+ ('auto_approval_delay', models.PositiveIntegerField()),
+ ('keywords', models.CharField(max_length=256)),
+ ('min_active', models.IntegerField(default=0)),
+ ],
+ options={
+ 'db_table': 'mtwerk_hittype',
+ 'indexes': [models.Index(fields=['amt_hit_type_id'], name='mtwerk_hitt_amt_hit_05dd10_idx'), models.Index(fields=['min_active'], name='mtwerk_hitt_min_act_e18222_idx')],
+ },
+ ),
+ migrations.AddField(
+ model_name='hit',
+ name='hit_type',
+ field=models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, to='mtwerk.hittype'),
+ ),
+ migrations.CreateModel(
+ name='Question',
+ fields=[
+ ('id', models.AutoField(primary_key=True, serialize=False)),
+ ('url', models.URLField()),
+ ('height', models.IntegerField(default=1200)),
+ ],
+ options={
+ 'db_table': 'mtwerk_question',
+ 'unique_together': {('url', 'height')},
+ },
+ ),
+ migrations.AddField(
+ model_name='hit',
+ name='question',
+ field=models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, to='mtwerk.question'),
+ ),
+ migrations.CreateModel(
+ name='Bonus',
+ fields=[
+ ('id', models.BigAutoField(primary_key=True, serialize=False)),
+ ('payout_event_id', models.UUIDField(unique=True)),
+ ('amt_worker_id', models.CharField(max_length=30, null=True)),
+ ('amount', models.DecimalField(decimal_places=2, max_digits=10)),
+ ('grant_time', models.DateTimeField()),
+ ('reason', models.CharField(max_length=1024)),
+ ('assignment', models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, to='mtwerk.assignment')),
+ ],
+ options={
+ 'indexes': [models.Index(fields=['amt_worker_id'], name='mtwerk_bonu_amt_wor_77d514_idx'), models.Index(fields=['grant_time'], name='mtwerk_bonu_grant_t_d43c6c_idx')],
+ },
+ ),
+ migrations.AddIndex(
+ model_name='assignment',
+ index=models.Index(fields=['amt_worker_id'], name='mtwerk_assi_amt_wor_7dbe42_idx'),
+ ),
+ migrations.AddIndex(
+ model_name='assignment',
+ index=models.Index(fields=['status'], name='mtwerk_assi_status_4e01de_idx'),
+ ),
+ migrations.AddIndex(
+ model_name='assignment',
+ index=models.Index(fields=['submit_time'], name='mtwerk_assi_submit__429d3e_idx'),
+ ),
+ migrations.AddIndex(
+ model_name='assignment',
+ index=models.Index(fields=['created_at'], name='mtwerk_assi_created_9292fa_idx'),
+ ),
+ migrations.AddIndex(
+ model_name='assignment',
+ index=models.Index(fields=['modified_at'], name='mtwerk_assi_modifie_44f1d0_idx'),
+ ),
+ migrations.AddIndex(
+ model_name='assignment',
+ index=models.Index(fields=['tsid'], name='mtwerk_assi_tsid_bd796d_idx'),
+ ),
+ migrations.AddIndex(
+ model_name='hit',
+ index=models.Index(fields=['status'], name='mtwerk_hit_status_ae726f_idx'),
+ ),
+ migrations.AddIndex(
+ model_name='hit',
+ index=models.Index(fields=['review_status'], name='mtwerk_hit_review__a9f420_idx'),
+ ),
+ migrations.AddIndex(
+ model_name='hit',
+ index=models.Index(fields=['creation_time'], name='mtwerk_hit_creatio_48e0fd_idx'),
+ ),
+ migrations.AddIndex(
+ model_name='hit',
+ index=models.Index(fields=['created_at'], name='mtwerk_hit_created_f1614b_idx'),
+ ),
+ migrations.AddIndex(
+ model_name='hit',
+ index=models.Index(fields=['modified_at'], name='mtwerk_hit_modifie_45c2ed_idx'),
+ ),
+ ]
diff --git a/carer/carer/mtwerk/migrations/__init__.py b/carer/carer/mtwerk/migrations/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/carer/carer/mtwerk/migrations/__init__.py
diff --git a/carer/carer/mtwerk/models.py b/carer/carer/mtwerk/models.py
new file mode 100644
index 0000000..5cb026d
--- /dev/null
+++ b/carer/carer/mtwerk/models.py
@@ -0,0 +1,130 @@
+from django.db import models
+
+
+class Question(models.Model):
+ """The iFrame location and details for a HIT"""
+
+ id = models.AutoField(primary_key=True)
+
+ url = models.URLField(max_length=200)
+ height = models.IntegerField(default=1200)
+
+ class Meta:
+ db_table = "mtwerk_question"
+ unique_together = ["url", "height"]
+
+
+class HitType(models.Model):
+ id = models.AutoField(primary_key=True)
+ amt_hit_type_id = models.CharField(max_length=30, unique=True)
+
+ title = models.CharField(max_length=200)
+ description = models.CharField(max_length=2000)
+ reward = models.DecimalField(decimal_places=2, max_digits=8)
+ assignment_duration = models.PositiveIntegerField()
+ auto_approval_delay = models.PositiveIntegerField()
+ keywords = models.CharField(max_length=256)
+
+ # If set, don't let the active HIT count drop below this amount
+ min_active = models.IntegerField(null=False, default=0)
+
+ class Meta:
+ db_table = "mtwerk_hittype"
+ indexes = [
+ models.Index(fields=["amt_hit_type_id"]),
+ models.Index(fields=["min_active"]),
+ ]
+
+
+class Hit(models.Model):
+ id = models.BigAutoField(primary_key=True)
+ amt_hit_id = models.CharField(max_length=30, unique=True)
+
+ hit_type = models.ForeignKey(HitType, on_delete=models.PROTECT)
+
+ amt_group_id = models.CharField(max_length=30)
+
+ status = models.SmallIntegerField(null=False)
+ review_status = models.SmallIntegerField(null=False)
+ creation_time = models.DateTimeField(null=False)
+ expiration = models.DateTimeField(null=False)
+
+ question = models.ForeignKey(Question, on_delete=models.PROTECT)
+ created_at = models.DateTimeField(null=False, auto_now_add=True)
+ modified_at = models.DateTimeField(null=False, auto_now=True)
+
+ max_assignments = models.IntegerField(null=False, default=1)
+ assignment_pending_count = models.IntegerField(null=False, default=0)
+ assignment_available_count = models.IntegerField(null=False, default=0)
+ assignment_completed_count = models.IntegerField(null=False, default=0)
+
+ class Meta:
+ db_table = "mtwerk_hit"
+ indexes = [
+ models.Index(fields=["status"]),
+ models.Index(fields=["review_status"]),
+ models.Index(fields=["creation_time"]),
+ models.Index(fields=["created_at"]),
+ models.Index(fields=["modified_at"]),
+ ]
+
+
+class Assignment(models.Model):
+ id = models.BigAutoField(primary_key=True)
+ amt_assignment_id = models.CharField(max_length=30, unique=True)
+
+ amt_worker_id = models.CharField(max_length=30, null=False)
+
+ hit = models.ForeignKey(Hit, on_delete=models.PROTECT)
+
+ status = models.SmallIntegerField(null=False)
+
+ auto_approval_time = models.DateTimeField(null=True)
+ accept_time = models.DateTimeField(null=True)
+ submit_time = models.DateTimeField(null=True)
+ approval_time = models.DateTimeField(null=True)
+ rejection_time = models.DateTimeField(null=True)
+
+ # Don't need this
+ # answer = models.CharField
+
+ requester_feedback = models.CharField(null=True, max_length=1024)
+
+ # Internal tracking
+ created_at = models.DateTimeField(null=False, auto_now_add=True)
+ modified_at = models.DateTimeField(null=False, auto_now=True)
+ tsid = models.UUIDField(null=True)
+
+ class Meta:
+ db_table = "mtwerk_assignment"
+ indexes = [
+ models.Index(fields=["amt_worker_id"]),
+ models.Index(fields=["status"]),
+ models.Index(fields=["submit_time"]),
+ models.Index(fields=["created_at"]),
+ models.Index(fields=["modified_at"]),
+ models.Index(fields=["tsid"]),
+ ]
+
+
+class Bonus(models.Model):
+ """Issued bonuses to workers for supplemental payments"""
+
+ id = models.BigAutoField(primary_key=True)
+ payout_event_id = models.UUIDField(null=False, unique=True)
+
+ amt_worker_id = models.CharField(max_length=30, null=True)
+
+ # Not unique=True because maybe multiple Bonuses will be associated
+ # with a single Assignment
+ assignment = models.ForeignKey(Assignment, null=False, on_delete=models.PROTECT)
+
+ amount = models.DecimalField(max_digits=10, decimal_places=2)
+ grant_time = models.DateTimeField(null=False)
+ reason = models.CharField(max_length=1024)
+
+ class Meta:
+ indexes = [
+ models.Index(fields=["amt_worker_id"]),
+ models.Index(fields=["grant_time"]),
+ ]
diff --git a/carer/manage.py b/carer/manage.py
new file mode 100644
index 0000000..091b39b
--- /dev/null
+++ b/carer/manage.py
@@ -0,0 +1,22 @@
+#!/usr/bin/env python
+"""Django's command-line utility for administrative tasks."""
+import os
+import sys
+
+
+def main():
+ """Run administrative tasks."""
+ os.environ.setdefault("DJANGO_SETTINGS_MODULE", "carer.settings")
+ try:
+ from django.core.management import execute_from_command_line
+ except ImportError as exc:
+ raise ImportError(
+ "Couldn't import Django. Are you sure it's installed and "
+ "available on your PYTHONPATH environment variable? Did you "
+ "forget to activate a virtual environment?"
+ ) from exc
+ execute_from_command_line(sys.argv)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/carer/requirements.txt b/carer/requirements.txt
new file mode 100644
index 0000000..f4f866e
--- /dev/null
+++ b/carer/requirements.txt
@@ -0,0 +1,4 @@
+asgiref==3.10.0
+Django==5.2.7
+psycopg==3.2.10
+sqlparse==0.5.3
diff --git a/jb/__init__.py b/jb/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/jb/__init__.py
diff --git a/jb/decorators.py b/jb/decorators.py
new file mode 100644
index 0000000..54d36c7
--- /dev/null
+++ b/jb/decorators.py
@@ -0,0 +1,75 @@
+import boto3
+from botocore.config import Config
+from generalresearchutils.pg_helper import PostgresConfig
+from generalresearchutils.redis_helper import RedisConfig
+from influxdb import InfluxDBClient
+from mypy_boto3_mturk import MTurkClient
+from mypy_boto3_sns import SNSClient
+
+from jb.config import settings
+from jb.managers import Permission
+from jb.managers.assignment import AssignmentManager
+from jb.managers.bonus import BonusManager
+from jb.managers.hit import HitTypeManager, HitManager, HitQuestionManager
+
+redis_config = RedisConfig(
+ dsn=settings.redis,
+ decode_responses=True,
+ socket_timeout=settings.redis_timeout,
+ socket_connect_timeout=settings.redis_timeout,
+)
+REDIS = redis_config.create_redis_client()
+
+CLIENT_CONFIG = Config(
+ # connect_timeout (float or int) – The time in seconds till a timeout
+ # exception is thrown when attempting to make a connection. The default
+ # is 60 seconds.
+ connect_timeout=1,
+ # read_timeout (float or int) – The time in seconds till a timeout
+ # exception is thrown when attempting to read from a connection. The
+ # default is 60 seconds.
+ read_timeout=2.5,
+)
+
+# We shouldn't use this directly. Use our AMTManager wrapper
+AMT_CLIENT: MTurkClient = boto3.client(
+ service_name="mturk",
+ region_name="us-east-1",
+ endpoint_url=str(settings.amt_endpoint),
+ aws_access_key_id=settings.amt_access_id,
+ aws_secret_access_key=settings.amt_secret_key,
+ config=CLIENT_CONFIG,
+)
+
+SNS_CLIENT: SNSClient = boto3.client(
+ service_name="sns",
+ region_name="us-east-2",
+ aws_access_key_id=settings.amt_access_id,
+ aws_secret_access_key=settings.amt_secret_key,
+ config=CLIENT_CONFIG,
+)
+
+pg_config = PostgresConfig(
+ dsn=settings.amt_jb_db,
+ connect_timeout=1,
+ statement_timeout=1,
+)
+
+HTM = HitTypeManager(
+ pg_config=pg_config, permissions=[Permission.READ, Permission.CREATE]
+)
+HM = HitManager(pg_config=pg_config, permissions=[Permission.READ, Permission.CREATE])
+HQM = HitQuestionManager(
+ pg_config=pg_config, permissions=[Permission.READ, Permission.CREATE]
+)
+AM = AssignmentManager(
+ pg_config=pg_config, permissions=[Permission.READ, Permission.CREATE]
+)
+
+BM = BonusManager(
+ pg_config=pg_config, permissions=[Permission.READ, Permission.CREATE]
+)
+
+influx_client = None
+if settings.influx_db:
+ influx_client = InfluxDBClient.from_dsn(str(settings.influx_db))
diff --git a/jb/flow/__init__.py b/jb/flow/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/jb/flow/__init__.py
diff --git a/jb/flow/events.py b/jb/flow/events.py
new file mode 100644
index 0000000..3961a64
--- /dev/null
+++ b/jb/flow/events.py
@@ -0,0 +1,126 @@
+import logging
+import time
+from concurrent import futures
+from concurrent.futures import ThreadPoolExecutor, Executor, as_completed
+from typing import Optional
+
+import redis
+
+from jb.config import (
+ JB_EVENTS_STREAM,
+ CONSUMER_GROUP,
+ CONSUMER_NAME,
+ JB_EVENTS_FAILED_STREAM,
+)
+from jb.decorators import REDIS
+from jb.flow.assignment_tasks import process_assignment_submitted
+from jb.models.event import MTurkEvent
+
+
+def process_mturk_events_task():
+ executor = ThreadPoolExecutor(max_workers=5)
+ create_consumer_group()
+ while True:
+ try:
+ process_mturk_events(executor=executor)
+ except Exception as e:
+ logging.exception(e)
+ finally:
+ time.sleep(1)
+
+
+def handle_pending_msgs_task():
+ while True:
+ try:
+ handle_pending_msgs()
+ except Exception as e:
+ logging.exception(e)
+ finally:
+ time.sleep(60)
+
+
+def process_mturk_events(executor: Executor):
+ while True:
+ n = process_mturk_events_chunk(executor=executor)
+ if n is None or n < 10:
+ break
+
+
+def create_consumer_group():
+ try:
+ REDIS.xgroup_create(JB_EVENTS_STREAM, CONSUMER_GROUP, id="0", mkstream=True)
+ except redis.exceptions.ResponseError as e:
+ if "BUSYGROUP Consumer Group name already exists" in str(e):
+ pass # group already exists
+ else:
+ raise
+
+
+def process_mturk_events_chunk(executor: Executor) -> Optional[int]:
+ msgs = REDIS.xreadgroup(
+ groupname=CONSUMER_GROUP,
+ consumername=CONSUMER_NAME,
+ streams={JB_EVENTS_STREAM: ">"},
+ count=10,
+ )
+ if not msgs:
+ return None
+ msgs = msgs[0][1] # the queue, we only have 1
+
+ fs = []
+ for msg in msgs:
+ msg_id, data = msg
+ msg_json = data["data"]
+ event = MTurkEvent.model_validate_json(msg_json)
+ if event.event_type == "AssignmentSubmitted":
+ fs.append(
+ executor.submit(process_assignment_submitted_event, event, msg_id)
+ )
+ else:
+ logging.info(f"Discarding {event}")
+ REDIS.xdel(JB_EVENTS_STREAM, msg_id)
+
+ futures.wait(fs, timeout=60)
+ return len(msgs)
+
+
+def process_assignment_submitted_event(event: MTurkEvent, msg_id: str):
+ process_assignment_submitted(event)
+ REDIS.xackdel(JB_EVENTS_STREAM, CONSUMER_GROUP, msg_id)
+
+
+def handle_pending_msgs():
+ # Looks in the redis queue for msgs that
+    # are pending (read by a consumer but not ACKed). These probably failed.
+    # NOTE(review): this claim/retry logic is unverified — confirm it reclaims and reprocesses correctly.
+ pending = REDIS.xpending_range(
+ JB_EVENTS_STREAM, CONSUMER_GROUP, min="-", max="+", count=10
+ )
+ for entry in pending:
+ msg_id = entry["message_id"]
+ # Claim message if idle > 10 sec
+ if entry["idle"] > 10_000: # milliseconds
+ claimed = REDIS.xclaim(
+ JB_EVENTS_STREAM,
+ CONSUMER_GROUP,
+ CONSUMER_NAME,
+ min_idle_time=10_000,
+ message_ids=[msg_id],
+ )
+ for cid, data in claimed:
+ msg_json = data["data"]
+ event = MTurkEvent.model_validate_json(msg_json)
+ if event.event_type == "AssignmentSubmitted":
+ # Try to process it again. If it fails, add
+ # it to the failed stream, so maybe we can fix
+ # and try again?
+ try:
+ process_assignment_submitted_event(event, cid)
+ REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid)
+ except Exception as e:
+ logging.exception(e)
+ REDIS.xadd(JB_EVENTS_FAILED_STREAM, data)
+ REDIS.xack(JB_EVENTS_STREAM, CONSUMER_GROUP, cid)
+ else:
+ logging.info(f"Discarding {event}")
+ REDIS.xdel(JB_EVENTS_STREAM, msg_id)
diff --git a/jb/flow/maintenance.py b/jb/flow/maintenance.py
new file mode 100644
index 0000000..5dc9cea
--- /dev/null
+++ b/jb/flow/maintenance.py
@@ -0,0 +1,26 @@
+from typing import Optional
+
+from jb.decorators import HM
+from jb.flow.monitoring import emit_hit_event
+from jb.managers.amt import AMTManager
+from jb.models.definitions import HitStatus
+
+
+def check_hit_status(
+ amt_hit_id: str, amt_hit_type_id: str, reason: Optional[str] = None
+) -> HitStatus:
+ """
+ (this used to be called "process_hit")
+ Request information from Amazon regarding the status of a HIT ID. Update the local state from
+ that response.
+ """
+ hit_status = AMTManager.get_hit_status(amt_hit_id=amt_hit_id)
+ # We're assuming that in the db this Hit is marked as Assignable, or else we wouldn't
+ # have called this function.
+ if hit_status != HitStatus.Assignable:
+ # todo: should update also assignment_pending_count, assignment_available_count, assignment_completed_count
+ HM.update_status(amt_hit_id=amt_hit_id, hit_status=hit_status)
+ emit_hit_event(
+ status=hit_status, amt_hit_type_id=amt_hit_type_id, reason=reason
+ )
+ return hit_status
diff --git a/jb/flow/monitoring.py b/jb/flow/monitoring.py
new file mode 100644
index 0000000..c8432bb
--- /dev/null
+++ b/jb/flow/monitoring.py
@@ -0,0 +1,156 @@
+import socket
+from typing import Optional
+
+from mypy_boto3_mturk.literals import EventTypeType
+
+from jb.config import settings
+from jb.decorators import influx_client
+from jb.models.currency import USDCent
+from jb.models.definitions import HitStatus, AssignmentStatus
+
+
+def write_hit_gauge(status: HitStatus, amt_hit_type_id: str, cnt: int):
+ tags = {
+ "host": socket.gethostname(), # could be "amt-jb-0"
+ "service": "amt-jb",
+ "status": status.value,
+ "amt_hit_type_id": amt_hit_type_id,
+ "debug": settings.debug,
+ }
+ point = {
+ "measurement": "amt_jb.hits",
+ "tags": tags,
+ "fields": {"count": cnt},
+ }
+ if influx_client:
+ influx_client.write_points([point])
+
+
+def write_assignment_gauge(status: AssignmentStatus, amt_hit_type_id: str, cnt: int):
+ tags = {
+ "host": socket.gethostname(),
+ "service": "amt-jb",
+ "status": status.value,
+ "amt_hit_type_id": amt_hit_type_id,
+ "debug": settings.debug,
+ }
+ point = {
+ "measurement": "amt_jb.assignments",
+ "tags": tags,
+ "fields": {"count": cnt},
+ }
+ if influx_client:
+ influx_client.write_points([point])
+
+
+def emit_hit_event(
+ status: HitStatus, amt_hit_type_id: str, reason: Optional[str] = None
+):
+ """
+ e.g. a HIT was created, Reviewable, etc. We don't have a "created" HitStatus,
+ so it would just be when status=='Assignable'
+ """
+ tags = {
+ "host": socket.gethostname(),
+ "service": "amt-jb",
+ "status": status.value,
+ "amt_hit_type_id": amt_hit_type_id,
+ "debug": settings.debug,
+ }
+ if reason:
+ tags["reason"] = reason
+ point = {
+ "measurement": "amt_jb.hit_events",
+ "tags": tags,
+ "fields": {"value": 1},
+ }
+
+ if influx_client:
+ influx_client.write_points([point])
+
+
+def emit_assignment_event(
+ status: AssignmentStatus, amt_hit_type_id: str, reason: Optional[str] = None
+):
+ """
+ e.g. an Assignment was accepted/approved/reject
+ """
+ tags = {
+ "host": socket.gethostname(),
+ "service": "amt-jb",
+ "status": status.value,
+ "amt_hit_type_id": amt_hit_type_id,
+ "debug": settings.debug,
+ }
+ if reason:
+ tags["reason"] = reason
+ point = {
+ "measurement": "amt_jb.assignment_events",
+ "tags": tags,
+ "fields": {"value": 1},
+ }
+
+ if influx_client:
+ influx_client.write_points([point])
+
+
+def emit_mturk_notification_event(event_type: EventTypeType, amt_hit_type_id: str):
+ """
+ e.g. a Mturk notification was received. We just put it in redis, we haven't processed it yet.
+ """
+ tags = {
+ "host": socket.gethostname(),
+ "service": "amt-jb",
+ "event_type": event_type,
+ "amt_hit_type_id": amt_hit_type_id,
+ "debug": settings.debug,
+ }
+ point = {
+ "measurement": "amt_jb.mturk_notification_events",
+ "tags": tags,
+ "fields": {"value": 1},
+ }
+
+ if influx_client:
+ influx_client.write_points([point])
+
+
+def emit_error_event(event_type: str, amt_hit_type_id: str):
+ """
+ e.g. todo: structure the error_types
+ """
+ tags = {
+ "host": socket.gethostname(),
+ "service": "amt-jb",
+ "event_type": event_type,
+ "amt_hit_type_id": amt_hit_type_id,
+ "debug": settings.debug,
+ }
+ point = {
+ "measurement": "amt_jb.error_events",
+ "tags": tags,
+ "fields": {"value": 1},
+ }
+
+ if influx_client:
+ influx_client.write_points([point])
+
+
+def emit_bonus_event(amount: USDCent, amt_hit_type_id: str):
+ """
+ An AMT bonus was awarded
+ """
+ tags = {
+ "host": socket.gethostname(),
+ "service": "amt-jb",
+ "amt_hit_type_id": amt_hit_type_id,
+ "debug": settings.debug,
+ }
+ point = {
+ "measurement": "amt_jb.bonus_events",
+ "tags": tags,
+ "fields": {"value": 1, "amount": int(amount)},
+ }
+
+ if influx_client:
+ influx_client.write_points([point])
diff --git a/jb/main.py b/jb/main.py
new file mode 100644
index 0000000..8c1dbed
--- /dev/null
+++ b/jb/main.py
@@ -0,0 +1,59 @@
+from multiprocessing import Process
+
+from fastapi import FastAPI, Request
+from fastapi.responses import HTMLResponse
+from starlette.middleware.cors import CORSMiddleware
+from starlette.middleware.trustedhost import TrustedHostMiddleware
+
+from jb.views.common import common_router
+from jb.settings import BASE_HTML
+from jb.config import settings
+
+app = FastAPI(
+ servers=[
+ {
+ "url": "https://jamesbillings67.com/",
+ "description": "Production environment",
+ },
+ ],
+ title="jb",
+ version="1.0.0",
+)
+
+app.add_middleware(
+ CORSMiddleware,
+ allow_origins=["*"],
+ allow_credentials=True,
+ allow_methods=["*"],
+ allow_headers=["*"],
+)
+
+app.add_middleware(TrustedHostMiddleware, allowed_hosts=["*"])
+app.include_router(router=common_router)
+
+
+@app.get("/robots.txt")
+@app.get("/sitemap.xml")
+@app.get("/favicon.ico")
+def return_nothing():
+ return {}
+
+
+@app.get("/{full_path:path}")
+def serve_react_app(full_path: str):
+ # This serves index.html for any unmatched route
+ # React Router will then handle the client-side routing
+ return HTMLResponse(BASE_HTML)
+
+
+def schedule_tasks():
+ from jb.flow.events import process_mturk_events_task, handle_pending_msgs_task
+ from jb.flow.tasks import refill_hits_task
+
+ Process(target=process_mturk_events_task).start()
+ # Process(target=handle_pending_msgs_task).start()
+ Process(target=refill_hits_task).start()
+
+
+if not settings.debug:
+ schedule_tasks()
diff --git a/jb/views/tasks.py b/jb/views/tasks.py
new file mode 100644
index 0000000..7176b29
--- /dev/null
+++ b/jb/views/tasks.py
@@ -0,0 +1,70 @@
+from datetime import datetime, timezone, timedelta
+
+from fastapi import Request
+
+from jb.decorators import AM, HM
+from jb.flow.maintenance import check_hit_status
+from jb.flow.monitoring import emit_assignment_event
+from jb.models.assignment import AssignmentStub
+from jb.models.definitions import AssignmentStatus
+
+
+def process_request(request: Request) -> None:
+    """
+    Record a worker's acceptance of a HIT.
+
+    A worker has loaded the HIT (work) page and (probably) accepted the HIT.
+    AMT creates an assignment, tied to this hit and this worker.
+    Create it in the DB.
+
+    Reads ``assignmentId``, ``hitId`` and ``workerId`` from the request's
+    query string.  Raises ValueError on the AMT preview sentinel, and
+    AssertionError when a required parameter is missing or an existing stub
+    disagrees with the incoming parameters.
+    """
+    amt_assignment_id = request.query_params.get("assignmentId", None)
+    # "ASSIGNMENT_ID_NOT_AVAILABLE" is AMT's preview-mode sentinel; this
+    # endpoint should only be hit after acceptance.
+    if amt_assignment_id == "ASSIGNMENT_ID_NOT_AVAILABLE":
+        raise ValueError("shouldn't happen")
+    amt_hit_id = request.query_params.get("hitId", None)
+    amt_worker_id = request.query_params.get("workerId", None)
+    print(f"process_request: {amt_assignment_id=} {amt_worker_id=} {amt_hit_id=}")
+    # All three identifiers are required for the rest of the flow.
+    assert amt_worker_id and amt_hit_id and amt_assignment_id
+
+    # Check that the HIT is still valid
+    hit = HM.get_from_amt_id(amt_hit_id=amt_hit_id)
+    _ = check_hit_status(amt_hit_id=amt_hit_id, amt_hit_type_id=hit.amt_hit_type_id)
+    # Emit a monitoring event for the acceptance regardless of HIT status.
+    emit_assignment_event(
+        status=AssignmentStatus.Accepted,
+        amt_hit_type_id=hit.amt_hit_type_id,
+    )
+    # I think it won't be assignable anymore? idk
+    # assert hit_status == HitStatus.Assignable, f"hit {amt_hit_id} {hit_status=}. Expected Assignable"
+
+    # I would like to verify in the AMT API that this assignment is valid, but there
+    # is no way to do that (until the assignment is submitted)
+
+    # # Make an offerwall to create a user account...
+    # # todo: GSS: Do we really need to do this???
+    # client_ip = get_client_ip(request)
+    # url = f"{settings.fsb_host}{settings.product_id}/offerwall/45b7228a7/"
+    # _ = requests.get(
+    #     url,
+    #     {"bpuid": amt_worker_id, "ip": client_ip, "n_bins": 1, "format": "json"},
+    # ).json()
+
+    # This assignment shouldn't already exist. If it does, just make sure it
+    # is all the same.
+    assignment_stub = AM.get_stub_if_exists(amt_assignment_id=amt_assignment_id)
+    if assignment_stub:
+        print(f"{assignment_stub=}")
+        assert assignment_stub.amt_worker_id == amt_worker_id
+        assert assignment_stub.amt_assignment_id == amt_assignment_id
+        # The stub must be recent (a page reload), not a stale leftover.
+        assert assignment_stub.created_at > (
+            datetime.now(tz=timezone.utc) - timedelta(minutes=90)
+        )
+        return None
+
+    # First sighting: persist a stub in Accepted state.
+    assignment_stub = AssignmentStub(
+        amt_hit_id=amt_hit_id,
+        amt_worker_id=amt_worker_id,
+        amt_assignment_id=amt_assignment_id,
+        status=AssignmentStatus.Accepted,
+        hit_id=hit.id,
+    )
+
+    AM.create_stub(stub=assignment_stub)
+
+    return None
diff --git a/jb/views/utils.py b/jb/views/utils.py
new file mode 100644
index 0000000..39db5d2
--- /dev/null
+++ b/jb/views/utils.py
@@ -0,0 +1,15 @@
+from fastapi import Request
+
+
+def get_client_ip(request: Request) -> str:
+ """
+ Using a testclient, the ip returned is 'testclient'. If so, instead, grab
+ the ip from the headers
+ """
+ ip = request.headers.get("X-Forwarded-For")
+ if not ip:
+ ip = request.client.host
+ elif ip == "testclient" or ip.startswith("10."):
+ forwarded = request.headers.get("X-Forwarded-For")
+ ip = forwarded.split(",")[0].strip() if forwarded else request.client.host
+ return ip
diff --git a/telegraf.conf b/telegraf.conf
new file mode 100644
index 0000000..f00e8ad
--- /dev/null
+++ b/telegraf.conf
@@ -0,0 +1,100 @@
+# Telegraf Configuration
+
+# Global tags can be specified here in key="value" format.
+[global_tags]
+ dc = "fmt2"
+ rack = "1500.1.59"
+ host_kind = "amt-jb"
+ ## Environment variables can be used as tags, and throughout the config file
+ # user = "$USER"
+
+# Configuration for telegraf agent
+[agent]
+ ## Default data collection interval for all inputs
+ interval = "10s"
+ ## Rounds collection interval to 'interval'
+ ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
+ round_interval = true
+
+ ## Telegraf will send metrics to outputs in batches of at most
+ ## metric_batch_size metrics.
+ ## This controls the size of writes that Telegraf sends to output plugins.
+ metric_batch_size = 1000
+
+ ## Maximum number of unwritten metrics per output. Increasing this value
+ ## allows for longer periods of output downtime without dropping metrics at the
+ ## cost of higher maximum memory usage.
+ metric_buffer_limit = 10000
+
+ ## Collection jitter is used to jitter the collection by a random amount.
+ ## Each plugin will sleep for a random time within jitter before collecting.
+ ## This can be used to avoid many plugins querying things like sysfs at the
+ ## same time, which can have a measurable effect on the system.
+ collection_jitter = "0s"
+
+ ## Collection offset is used to shift the collection by the given amount.
+  ## This can be used to avoid many plugins querying constrained devices
+ ## at the same time by manually scheduling them in time.
+ # collection_offset = "0s"
+
+ ## Default flushing interval for all outputs. Maximum flush_interval will be
+ ## flush_interval + flush_jitter
+ flush_interval = "1s"
+ ## Jitter the flush interval by a random amount. This is primarily to avoid
+ ## large write spikes for users running a large number of telegraf instances.
+ ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
+ flush_jitter = "0s"
+
+ ## Collected metrics are rounded to the precision specified. Precision is
+ ## specified as an interval with an integer + unit (e.g. 0s, 10ms, 2us, 4s).
+ ## Valid time units are "ns", "us" (or "µs"), "ms", "s".
+ ##
+ ## By default or when set to "0s", precision will be set to the same
+ ## timestamp order as the collection interval, with the maximum being 1s:
+ ## ie, when interval = "10s", precision will be "1s"
+ ## when interval = "250ms", precision will be "1ms"
+ ##
+ ## Precision will NOT be used for service inputs. It is up to each individual
+ ## service input to set the timestamp at the appropriate precision.
+ precision = "0s"
+
+
+# Configuration for sending metrics to InfluxDB
+[[outputs.influxdb]]
+ urls = ["http://influxdb.fmt2.grl.internal:8086"]
+ username = "$INFLUX_USER"
+ password = "$INFLUX_PASS"
+
+# [[outputs.file]]
+# files = ["stdout"]
+
+###############################################################################
+# PROCESSOR PLUGINS #
+###############################################################################
+
+[[processors.converter]]
+ [processors.converter.fields]
+ float = ["request_time", "upstream_response_time"]
+
+###############################################################################
+# SERVICE INPUT PLUGINS #
+###############################################################################
+
+[[inputs.cpu]]
+ report_active = true
+
+[[inputs.net]]
+[[inputs.mem]]
+[[inputs.disk]]
+
+
+[[inputs.tail]]
+ name_override = "nginx_access"
+ files = ["/var/log/nginx/access.log"]
+ initial_read_offset = "saved-or-end"
+ data_format = "json"
+ json_strict = false
+ json_time_key = "time"
+ json_time_format = "unix"
+ tag_keys = ["status", "method", "upstream_route", "upstream_cache_hit", "product_id"]
+ json_string_fields = ["request_time", "upstream_response_time"]
diff --git a/tests/http/__init__.py b/tests/http/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/http/__init__.py
diff --git a/tests/http/conftest.py b/tests/http/conftest.py
new file mode 100644
index 0000000..200bf1c
--- /dev/null
+++ b/tests/http/conftest.py
@@ -0,0 +1,50 @@
+import httpx
+import pytest
+import requests_mock
+from asgi_lifespan import LifespanManager
+from httpx import AsyncClient, ASGITransport
+
+from jb.main import app
+
+
+@pytest.fixture(scope="session")
+def anyio_backend():
+ return "asyncio"
+
+
+@pytest.fixture(scope="session")
+async def httpxclient():
+ # limiter.enabled = True
+ # limiter.reset()
+ app.testing = True
+
+ async with LifespanManager(app):
+ # await FastAPICache.clear()
+ transport = ASGITransport(app=app)
+ async with AsyncClient(
+ transport=transport, base_url="http://127.0.0.1:8001/"
+ ) as client:
+ yield client
+ await client.aclose()
+
+
+@pytest.fixture()
+def no_limit():
+ """Fixture to execute asserts before and after a test is run"""
+ # limiter.enabled = False
+ yield # this is where the testing happens
+ # limiter.enabled = True
+
+
+@pytest.fixture()
+def httpxclient_ip(httpxclient):
+    """
+    Rebind the shared client to a transport that reports a fixed remote
+    address (1.2.3.4) for the test, then restore a default transport after.
+    """
+    # NOTE(review): this reaches into the private ``_transport`` attribute of
+    # httpx.AsyncClient, which may break on httpx upgrades — confirm there is
+    # no public API for swapping the transport.
+    httpxclient._transport = httpx.ASGITransport(app=app, client=("1.2.3.4", 8001))
+    yield httpxclient  # this is where the testing happens
+    httpxclient._transport = httpx.ASGITransport(app=app)
+
+
+@pytest.fixture
+def mock_requests():
+ with requests_mock.Mocker() as m:
+ yield m
diff --git a/tests/http/test_basic.py b/tests/http/test_basic.py
new file mode 100644
index 0000000..7b03a1e
--- /dev/null
+++ b/tests/http/test_basic.py
@@ -0,0 +1,35 @@
+import pytest
+from httpx import AsyncClient
+
+
+@pytest.mark.anyio
+async def test_base(httpxclient: AsyncClient):
+ client = httpxclient
+ res = await client.get("/")
+ # actually returns 404. old test expects 401. idk what is should be
+ print(res.text)
+ # assert res.status_code == 404
+ assert res.status_code == 200
+
+
+@pytest.mark.anyio
+async def test_static_file_alias(httpxclient: AsyncClient):
+ client = httpxclient
+ """
+ These are here for site crawlers and stuff..
+ """
+ for p in ["/robots.txt", "/sitemap.xml", "/favicon.ico"]:
+ res = await client.get(p)
+ assert res.status_code == 200, p
+ assert res.json() == {}
+
+
+@pytest.mark.anyio
+async def test_health(httpxclient: AsyncClient):
+ client = httpxclient
+ """
+ These are here for site crawlers and stuff..
+ """
+ res = await client.get("/health/")
+ assert res.status_code == 200
+ assert res.json() == {}
diff --git a/tests/http/test_work.py b/tests/http/test_work.py
new file mode 100644
index 0000000..59b8830
--- /dev/null
+++ b/tests/http/test_work.py
@@ -0,0 +1,24 @@
+import pytest
+from httpx import AsyncClient
+
+from jb.models.hit import Hit
+
+
+@pytest.mark.skip(reason="hits live api, need to look at this")
+async def test_work(
+ httpxclient: AsyncClient,
+ no_limit,
+ amt_worker_id,
+ amt_assignment_id,
+ hit_in_amt: Hit,
+):
+ client = httpxclient
+ params = {
+ "workerId": amt_worker_id,
+ "assignmentId": amt_assignment_id,
+ "hitId": hit_in_amt.amt_hit_id,
+ }
+ res = await client.get(f"/work/", params=params)
+ assert res.status_code == 200
+ # the response is an html page
+ assert res.headers["content-type"] == "text/html; charset=utf-8"
diff --git a/tests_sandbox/__init__.py b/tests_sandbox/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests_sandbox/__init__.py
diff --git a/tests_sandbox/test_flow.py b/tests_sandbox/test_flow.py
new file mode 100644
index 0000000..7925d92
--- /dev/null
+++ b/tests_sandbox/test_flow.py
@@ -0,0 +1,29 @@
+from jb.decorators import HM
+from jb.flow.tasks import refill_hits, check_stale_hits, check_expired_hits
+
+
+def test_refill_hits(
+ set_hit_types_in_db_min_active_0, hit_type_in_db, expire_all_hits, amt_manager
+):
+
+ assert HM.get_active_count(hit_type_in_db.id) == 0
+ assert hit_type_in_db.min_active > 0
+ refill_hits()
+ assert HM.get_active_count(hit_type_in_db.id) == hit_type_in_db.min_active
+
+ amt_hit_ids = HM.filter_active_ids(hit_type_id=hit_type_in_db.id)
+ amt_hit_id = list(amt_hit_ids)[0]
+ hit, _ = amt_manager.get_hit_if_exists(amt_hit_id=amt_hit_id)
+ assert hit
+
+
+def test_check_stale_hits():
+ # todo: I'd have to create some purposely stale hits.
+ # just make sure it runs for now
+ check_stale_hits()
+
+
+def test_check_expired_hits():
+ # todo: I'd have to create some purposely expired hits.
+ # just make sure it runs for now
+ check_expired_hits()
diff --git a/tests_sandbox/utils.py b/tests_sandbox/utils.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests_sandbox/utils.py