aboutsummaryrefslogtreecommitdiff
path: root/tests/managers/thl/test_ledger/test_thl_lm_bp_payout.py
diff options
context:
space:
mode:
authorMax Nanis2026-03-06 16:49:46 -0500
committerMax Nanis2026-03-06 16:49:46 -0500
commit91d040211a4ed6e4157896256a762d3854777b5e (patch)
treecd95922ea4257dc8d3f4e4cbe8534474709a20dc /tests/managers/thl/test_ledger/test_thl_lm_bp_payout.py
downloadgeneralresearch-91d040211a4ed6e4157896256a762d3854777b5e.tar.gz
generalresearch-91d040211a4ed6e4157896256a762d3854777b5e.zip
Initial commitv3.3.4
Diffstat (limited to 'tests/managers/thl/test_ledger/test_thl_lm_bp_payout.py')
-rw-r--r--tests/managers/thl/test_ledger/test_thl_lm_bp_payout.py516
1 files changed, 516 insertions, 0 deletions
diff --git a/tests/managers/thl/test_ledger/test_thl_lm_bp_payout.py b/tests/managers/thl/test_ledger/test_thl_lm_bp_payout.py
new file mode 100644
index 0000000..294d092
--- /dev/null
+++ b/tests/managers/thl/test_ledger/test_thl_lm_bp_payout.py
@@ -0,0 +1,516 @@
+import logging
+from datetime import datetime, timezone, timedelta
+from decimal import Decimal
+from random import randint
+from uuid import uuid4
+
+import pytest
+import redis
+from pydantic import RedisDsn
+from redis.lock import Lock
+
+from generalresearch.currency import USDCent
+from generalresearch.managers.base import Permission
+from generalresearch.managers.thl.ledger_manager.thl_ledger import ThlLedgerManager
+from generalresearch.managers.thl.ledger_manager.exceptions import (
+ LedgerTransactionFlagAlreadyExistsError,
+ LedgerTransactionConditionFailedError,
+ LedgerTransactionReleaseLockError,
+ LedgerTransactionCreateError,
+)
+from generalresearch.managers.thl.ledger_manager.ledger import LedgerTransaction
+from generalresearch.models import Source
+from generalresearch.models.thl.definitions import PayoutStatus
+from generalresearch.models.thl.ledger import Direction, TransactionType
+from generalresearch.models.thl.session import (
+ Wall,
+ Status,
+ StatusCode1,
+ Session,
+)
+from generalresearch.models.thl.user import User
+from generalresearch.models.thl.wallet import PayoutType
+from generalresearch.redis_helper import RedisConfig
+
+
+def broken_acquire(self, *args, **kwargs):
+ raise redis.exceptions.TimeoutError("Simulated timeout during acquire")
+
+
+def broken_release(self, *args, **kwargs):
+ raise redis.exceptions.TimeoutError("Simulated timeout during release")
+
+
+class TestThlLedgerManagerBPPayout:
+
+ def test_create_tx_with_bp_payment(
+ self,
+ user_factory,
+ product_user_wallet_no,
+ create_main_accounts,
+ caplog,
+ thl_lm,
+ delete_ledger_db,
+ ):
+ delete_ledger_db()
+ create_main_accounts()
+
+ now = datetime.now(timezone.utc) - timedelta(hours=1)
+ user: User = user_factory(product=product_user_wallet_no)
+
+ wall1 = Wall(
+ user_id=user.user_id,
+ source=Source.DYNATA,
+ req_survey_id="xxx",
+ req_cpi=Decimal("6.00"),
+ session_id=1,
+ status=Status.COMPLETE,
+ status_code_1=StatusCode1.COMPLETE,
+ started=now,
+ finished=now + timedelta(seconds=1),
+ )
+ tx = thl_lm.create_tx_task_complete(
+ wall=wall1, user=user, created=wall1.started
+ )
+ assert isinstance(tx, LedgerTransaction)
+
+ session = Session(started=wall1.started, user=user, wall_events=[wall1])
+ status, status_code_1 = session.determine_session_status()
+ thl_net, commission_amount, bp_pay, user_pay = session.determine_payments()
+ session.update(
+ **{
+ "status": status,
+ "status_code_1": status_code_1,
+ "finished": now + timedelta(minutes=10),
+ "payout": bp_pay,
+ "user_payout": user_pay,
+ }
+ )
+ thl_lm.create_tx_bp_payment(session=session, created=wall1.started)
+
+ lock_key = f"test:bp_payout:{user.product.id}"
+ flag_name = f"{thl_lm.cache_prefix}:transaction_flag:{lock_key}"
+ thl_lm.redis_client.delete(flag_name)
+
+ payoutevent_uuid = uuid4().hex
+ thl_lm.create_tx_bp_payout(
+ product=user.product,
+ amount=USDCent(200),
+ created=now,
+ payoutevent_uuid=payoutevent_uuid,
+ )
+
+ payoutevent_uuid = uuid4().hex
+ thl_lm.create_tx_bp_payout(
+ product=user.product,
+ amount=USDCent(200),
+ created=now + timedelta(minutes=2),
+ skip_one_per_day_check=True,
+ payoutevent_uuid=payoutevent_uuid,
+ )
+
+ cash = thl_lm.get_account_cash()
+ bp_wallet_account = thl_lm.get_account_or_create_bp_wallet(user.product)
+ assert 170 == thl_lm.get_account_balance(bp_wallet_account)
+ assert 200 == thl_lm.get_account_balance(cash)
+
+ with pytest.raises(expected_exception=LedgerTransactionFlagAlreadyExistsError):
+ thl_lm.create_tx_bp_payout(
+ user.product,
+ amount=USDCent(200),
+ created=now + timedelta(minutes=2),
+ skip_one_per_day_check=False,
+ skip_wallet_balance_check=False,
+ payoutevent_uuid=payoutevent_uuid,
+ )
+
+ payoutevent_uuid = uuid4().hex
+ with caplog.at_level(logging.INFO):
+ with pytest.raises(LedgerTransactionConditionFailedError):
+ thl_lm.create_tx_bp_payout(
+ user.product,
+ amount=USDCent(10_000),
+ created=now + timedelta(minutes=2),
+ skip_one_per_day_check=True,
+ skip_wallet_balance_check=False,
+ payoutevent_uuid=payoutevent_uuid,
+ )
+ assert "failed condition check balance:" in caplog.text
+
+ thl_lm.create_tx_bp_payout(
+ product=user.product,
+ amount=USDCent(10_00),
+ created=now + timedelta(minutes=2),
+ skip_one_per_day_check=True,
+ skip_wallet_balance_check=True,
+ payoutevent_uuid=payoutevent_uuid,
+ )
+ assert 170 - 1000 == thl_lm.get_account_balance(bp_wallet_account)
+
+ def test_create_tx(self, product, caplog, thl_lm, currency):
+ rand_amount: USDCent = USDCent(randint(100, 1_000))
+ payoutevent_uuid = uuid4().hex
+
+ # Create a BP Payout for a Product without any activity. By issuing,
+ # the skip_* checks, we should be able to force it to work, and will
+ # then ultimately result in a negative balance
+ tx = thl_lm.create_tx_bp_payout(
+ product=product,
+ amount=rand_amount,
+ payoutevent_uuid=payoutevent_uuid,
+ created=datetime.now(tz=timezone.utc),
+ skip_wallet_balance_check=True,
+ skip_one_per_day_check=True,
+ skip_flag_check=True,
+ )
+
+ # Check the basic attributes
+ assert isinstance(tx, LedgerTransaction)
+ assert tx.ext_description == "BP Payout"
+ assert (
+ tx.tag
+ == f"{currency.value}:{TransactionType.BP_PAYOUT.value}:{payoutevent_uuid}"
+ )
+ assert tx.entries[0].amount == rand_amount
+ assert tx.entries[1].amount == rand_amount
+
+ # Check the Product's balance, it should be negative the amount that was
+ # paid out. That's because the Product earned nothing.. and then was
+ # sent something.
+ balance = thl_lm.get_account_balance(
+ account=thl_lm.get_account_or_create_bp_wallet(product=product)
+ )
+ assert balance == int(rand_amount) * -1
+
+ # Test some basic assertions
+ with caplog.at_level(logging.INFO):
+ with pytest.raises(expected_exception=Exception):
+ thl_lm.create_tx_bp_payout(
+ product=product,
+ amount=rand_amount,
+ payoutevent_uuid=uuid4().hex,
+ created=datetime.now(tz=timezone.utc),
+ skip_wallet_balance_check=False,
+ skip_one_per_day_check=False,
+ skip_flag_check=False,
+ )
+ assert "failed condition check >1 tx per day" in caplog.text
+
+ def test_create_tx_redis_failure(self, product, thl_web_rw, thl_lm):
+ rand_amount: USDCent = USDCent(randint(100, 1_000))
+ payoutevent_uuid = uuid4().hex
+ now = datetime.now(tz=timezone.utc)
+
+ thl_lm.create_tx_plug_bp_wallet(
+ product, rand_amount, now, direction=Direction.CREDIT
+ )
+
+ # Non routable IP address. Redis will fail
+ thl_lm_redis_0 = ThlLedgerManager(
+ pg_config=thl_web_rw,
+ permissions=[
+ Permission.CREATE,
+ Permission.READ,
+ Permission.UPDATE,
+ Permission.DELETE,
+ ],
+ testing=True,
+ redis_config=RedisConfig(
+ dsn=RedisDsn("redis://10.255.255.1:6379"),
+ socket_connect_timeout=0.1,
+ ),
+ )
+
+ with pytest.raises(expected_exception=Exception) as e:
+ tx = thl_lm_redis_0.create_tx_bp_payout(
+ product=product,
+ amount=rand_amount,
+ payoutevent_uuid=payoutevent_uuid,
+ created=datetime.now(tz=timezone.utc),
+ )
+ assert e.type is redis.exceptions.TimeoutError
+ # No txs were created
+ bp_wallet_account = thl_lm.get_account_or_create_bp_wallet(product=product)
+ txs = thl_lm.get_tx_filtered_by_account(account_uuid=bp_wallet_account.uuid)
+ txs = [tx for tx in txs if tx.metadata["tx_type"] != "plug"]
+ assert len(txs) == 0
+
+ def test_create_tx_multiple_per_day(self, product, thl_lm):
+ rand_amount: USDCent = USDCent(randint(100, 1_000))
+ payoutevent_uuid = uuid4().hex
+ now = datetime.now(tz=timezone.utc)
+
+ thl_lm.create_tx_plug_bp_wallet(
+ product, rand_amount * USDCent(2), now, direction=Direction.CREDIT
+ )
+
+ tx = thl_lm.create_tx_bp_payout(
+ product=product,
+ amount=rand_amount,
+ payoutevent_uuid=payoutevent_uuid,
+ created=datetime.now(tz=timezone.utc),
+ )
+
+ # Try to create another
+ # Will fail b/c it has the same payout event uuid
+ with pytest.raises(expected_exception=Exception) as e:
+ tx = thl_lm.create_tx_bp_payout(
+ product=product,
+ amount=rand_amount,
+ payoutevent_uuid=payoutevent_uuid,
+ created=datetime.now(tz=timezone.utc),
+ )
+ assert e.type is LedgerTransactionFlagAlreadyExistsError
+
+ # Try to create another
+ # Will fail due to multiple per day
+ payoutevent_uuid2 = uuid4().hex
+ with pytest.raises(expected_exception=Exception) as e:
+ tx = thl_lm.create_tx_bp_payout(
+ product=product,
+ amount=rand_amount,
+ payoutevent_uuid=payoutevent_uuid2,
+ created=datetime.now(tz=timezone.utc),
+ )
+ assert e.type is LedgerTransactionConditionFailedError
+ assert str(e.value) == ">1 tx per day"
+
+ # Make it run by skipping one per day check
+ tx = thl_lm.create_tx_bp_payout(
+ product=product,
+ amount=rand_amount,
+ payoutevent_uuid=payoutevent_uuid2,
+ created=datetime.now(tz=timezone.utc),
+ skip_one_per_day_check=True,
+ )
+
+ def test_create_tx_redis_lock_release_error(self, product, thl_lm):
+ rand_amount: USDCent = USDCent(randint(100, 1_000))
+ payoutevent_uuid = uuid4().hex
+ now = datetime.now(tz=timezone.utc)
+ bp_wallet_account = thl_lm.get_account_or_create_bp_wallet(product=product)
+
+ thl_lm.create_tx_plug_bp_wallet(
+ product, rand_amount * USDCent(2), now, direction=Direction.CREDIT
+ )
+
+ original_acquire = Lock.acquire
+ original_release = Lock.release
+ Lock.acquire = broken_acquire
+
+ # Create TX will fail on lock enter, no tx will actually get created
+ with pytest.raises(expected_exception=Exception) as e:
+ tx = thl_lm.create_tx_bp_payout(
+ product=product,
+ amount=rand_amount,
+ payoutevent_uuid=payoutevent_uuid,
+ created=datetime.now(tz=timezone.utc),
+ )
+ assert e.type is LedgerTransactionCreateError
+ assert str(e.value) == "Redis error: Simulated timeout during acquire"
+ txs = thl_lm.get_tx_filtered_by_account(account_uuid=bp_wallet_account.uuid)
+ txs = [tx for tx in txs if tx.metadata["tx_type"] != "plug"]
+ assert len(txs) == 0
+
+ Lock.acquire = original_acquire
+ Lock.release = broken_release
+
+ # Create TX will fail on lock exit, after the tx was created!
+ with pytest.raises(expected_exception=Exception) as e:
+ tx = thl_lm.create_tx_bp_payout(
+ product=product,
+ amount=rand_amount,
+ payoutevent_uuid=payoutevent_uuid,
+ created=datetime.now(tz=timezone.utc),
+ )
+ assert e.type is LedgerTransactionReleaseLockError
+ assert str(e.value) == "Redis error: Simulated timeout during release"
+
+ # Transaction was still created!
+ txs = thl_lm.get_tx_filtered_by_account(account_uuid=bp_wallet_account.uuid)
+ txs = [tx for tx in txs if tx.metadata["tx_type"] != "plug"]
+ assert len(txs) == 1
+ Lock.release = original_release
+
+
+class TestPayoutEventManagerBPPayout:
+
+ def test_create(self, product, thl_lm, brokerage_product_payout_event_manager):
+ rand_amount: USDCent = USDCent(randint(100, 1_000))
+ now = datetime.now(tz=timezone.utc)
+ bp_wallet_account = thl_lm.get_account_or_create_bp_wallet(product=product)
+ assert thl_lm.get_account_balance(bp_wallet_account) == 0
+ thl_lm.create_tx_plug_bp_wallet(
+ product, rand_amount, now, direction=Direction.CREDIT
+ )
+ assert thl_lm.get_account_balance(bp_wallet_account) == rand_amount
+ brokerage_product_payout_event_manager.set_account_lookup_table(thl_lm=thl_lm)
+
+ pe = brokerage_product_payout_event_manager.create_bp_payout_event(
+ thl_ledger_manager=thl_lm,
+ product=product,
+ created=now,
+ amount=rand_amount,
+ payout_type=PayoutType.ACH,
+ )
+ assert brokerage_product_payout_event_manager.check_for_ledger_tx(
+ thl_ledger_manager=thl_lm,
+ product_id=product.id,
+ amount=rand_amount,
+ payout_event=pe,
+ )
+ assert thl_lm.get_account_balance(bp_wallet_account) == 0
+
+ def test_create_with_redis_error(
+ self, product, caplog, thl_lm, brokerage_product_payout_event_manager
+ ):
+ caplog.set_level("WARNING")
+ original_acquire = Lock.acquire
+ original_release = Lock.release
+
+ rand_amount: USDCent = USDCent(randint(100, 1_000))
+ now = datetime.now(tz=timezone.utc)
+ bp_wallet_account = thl_lm.get_account_or_create_bp_wallet(product=product)
+ assert thl_lm.get_account_balance(bp_wallet_account) == 0
+ thl_lm.create_tx_plug_bp_wallet(
+ product, rand_amount, now, direction=Direction.CREDIT
+ )
+ assert thl_lm.get_account_balance(bp_wallet_account) == rand_amount
+ brokerage_product_payout_event_manager.set_account_lookup_table(thl_lm=thl_lm)
+
+ # Will fail on lock enter, no tx will actually get created
+ Lock.acquire = broken_acquire
+ with pytest.raises(expected_exception=Exception) as e:
+ pe = brokerage_product_payout_event_manager.create_bp_payout_event(
+ thl_ledger_manager=thl_lm,
+ product=product,
+ created=now,
+ amount=rand_amount,
+ payout_type=PayoutType.ACH,
+ )
+ assert e.type is LedgerTransactionCreateError
+ assert str(e.value) == "Redis error: Simulated timeout during acquire"
+ assert any(
+ "Simulated timeout during acquire. No ledger tx was created" in m
+ for m in caplog.messages
+ )
+
+ txs = thl_lm.get_tx_filtered_by_account(account_uuid=bp_wallet_account.uuid)
+ txs = [tx for tx in txs if tx.metadata["tx_type"] != "plug"]
+ # One payout event is created, status is failed, and no ledger txs exist
+ assert len(txs) == 0
+ pes = (
+ brokerage_product_payout_event_manager.get_bp_bp_payout_events_for_products(
+ thl_ledger_manager=thl_lm, product_uuids=[product.id]
+ )
+ )
+ assert len(pes) == 1
+ assert pes[0].status == PayoutStatus.FAILED
+ pe = pes[0]
+
+ # Fix the redis method
+ Lock.acquire = original_acquire
+
+ # Try to fix the failed payout, by trying ledger tx again
+ brokerage_product_payout_event_manager.retry_create_bp_payout_event_tx(
+ product=product, thl_ledger_manager=thl_lm, payout_event_uuid=pe.uuid
+ )
+ txs = thl_lm.get_tx_filtered_by_account(account_uuid=bp_wallet_account.uuid)
+ txs = [tx for tx in txs if tx.metadata["tx_type"] != "plug"]
+ assert len(txs) == 1
+ assert thl_lm.get_account_balance(bp_wallet_account) == 0
+
+ # And then try to run it again, it'll fail because a payout event with the same info exists
+ with pytest.raises(expected_exception=Exception) as e:
+ pe = brokerage_product_payout_event_manager.create_bp_payout_event(
+ thl_ledger_manager=thl_lm,
+ product=product,
+ created=now,
+ amount=rand_amount,
+ payout_type=PayoutType.ACH,
+ )
+ assert e.type is ValueError
+ assert "Payout event already exists!" in str(e.value)
+
+ # We wouldn't do this in practice, because this is paying out the BP again, but
+ # we can if want to.
+ # Change the timestamp so it'll create a new payout event
+ now = datetime.now(tz=timezone.utc)
+ with pytest.raises(LedgerTransactionConditionFailedError) as e:
+ pe = brokerage_product_payout_event_manager.create_bp_payout_event(
+ thl_ledger_manager=thl_lm,
+ product=product,
+ created=now,
+ amount=rand_amount,
+ payout_type=PayoutType.ACH,
+ )
+ # But it will fail due to 1 per day check
+ assert str(e.value) == ">1 tx per day"
+ pe = brokerage_product_payout_event_manager.get_by_uuid(e.value.pe_uuid)
+ assert pe.status == PayoutStatus.FAILED
+
+ # And if we really want to, we can make it again
+ now = datetime.now(tz=timezone.utc)
+ pe = brokerage_product_payout_event_manager.create_bp_payout_event(
+ thl_ledger_manager=thl_lm,
+ product=product,
+ created=now,
+ amount=rand_amount,
+ payout_type=PayoutType.ACH,
+ skip_one_per_day_check=True,
+ skip_wallet_balance_check=True,
+ )
+
+ txs = thl_lm.get_tx_filtered_by_account(account_uuid=bp_wallet_account.uuid)
+ txs = [tx for tx in txs if tx.metadata["tx_type"] != "plug"]
+ assert len(txs) == 2
+ # since they were paid twice
+ assert thl_lm.get_account_balance(bp_wallet_account) == 0 - rand_amount
+
+ Lock.release = original_release
+ Lock.acquire = original_acquire
+
+ def test_create_with_redis_error_release(
+ self, product, caplog, thl_lm, brokerage_product_payout_event_manager
+ ):
+ caplog.set_level("WARNING")
+
+ original_release = Lock.release
+
+ rand_amount: USDCent = USDCent(randint(100, 1_000))
+ now = datetime.now(tz=timezone.utc)
+ bp_wallet_account = thl_lm.get_account_or_create_bp_wallet(product=product)
+ brokerage_product_payout_event_manager.set_account_lookup_table(thl_lm=thl_lm)
+
+ assert thl_lm.get_account_balance(bp_wallet_account) == 0
+ thl_lm.create_tx_plug_bp_wallet(
+ product, rand_amount, now, direction=Direction.CREDIT
+ )
+ assert thl_lm.get_account_balance(bp_wallet_account) == rand_amount
+
+ # Will fail on lock exit, after the tx was created!
+ # But it'll see that the tx was created and so everything will be fine
+ Lock.release = broken_release
+ pe = brokerage_product_payout_event_manager.create_bp_payout_event(
+ thl_ledger_manager=thl_lm,
+ product=product,
+ created=now,
+ amount=rand_amount,
+ payout_type=PayoutType.ACH,
+ )
+ assert any(
+ "Simulated timeout during release but ledger tx exists" in m
+ for m in caplog.messages
+ )
+
+ txs = thl_lm.get_tx_filtered_by_account(account_uuid=bp_wallet_account.uuid)
+ txs = [tx for tx in txs if tx.metadata["tx_type"] != "plug"]
+ assert len(txs) == 1
+ pes = (
+ brokerage_product_payout_event_manager.get_bp_bp_payout_events_for_products(
+ thl_ledger_manager=thl_lm, product_uuids=[product.uuid]
+ )
+ )
+ assert len(pes) == 1
+ assert pes[0].status == PayoutStatus.COMPLETE
+ Lock.release = original_release