diff options
Diffstat (limited to 'tests/managers/thl/test_ledger/test_lm_tx_locks.py')
| -rw-r--r-- | tests/managers/thl/test_ledger/test_lm_tx_locks.py | 371 |
1 files changed, 371 insertions, 0 deletions
diff --git a/tests/managers/thl/test_ledger/test_lm_tx_locks.py b/tests/managers/thl/test_ledger/test_lm_tx_locks.py new file mode 100644 index 0000000..df2611b --- /dev/null +++ b/tests/managers/thl/test_ledger/test_lm_tx_locks.py @@ -0,0 +1,371 @@ +import logging +from datetime import datetime, timezone, timedelta +from decimal import Decimal +from typing import Callable + +import pytest + +from generalresearch.managers.thl.ledger_manager.conditions import ( + generate_condition_mp_payment, +) +from generalresearch.managers.thl.ledger_manager.exceptions import ( + LedgerTransactionCreateLockError, + LedgerTransactionFlagAlreadyExistsError, + LedgerTransactionCreateError, +) +from generalresearch.models import Source +from generalresearch.models.thl.ledger import LedgerTransaction +from generalresearch.models.thl.session import ( + Wall, + Status, + StatusCode1, + Session, + WallAdjustedStatus, +) +from generalresearch.models.thl.user import User +from test_utils.models.conftest import user_factory, session, product_user_wallet_no + +logger = logging.getLogger("LedgerManager") + + +class TestLedgerLocks: + + def test_a( + self, + user_factory, + session_factory, + product_user_wallet_no, + create_main_accounts, + caplog, + thl_lm, + lm, + utc_hour_ago, + currency, + wall_factory, + delete_ledger_db, + ): + """ + TODO: This whole test is confusing a I don't really understand. + It needs to be better documented and explained what we want + it to do and evaluate... + """ + delete_ledger_db() + create_main_accounts() + + user: User = user_factory(product=product_user_wallet_no) + s1 = session_factory( + user=user, + wall_count=3, + wall_req_cpis=[Decimal("1.23"), Decimal("3.21"), Decimal("4")], + wall_statuses=[Status.COMPLETE, Status.COMPLETE, Status.COMPLETE], + ) + + # A User does a Wall Completion in Session=1 + w1 = s1.wall_events[0] + tx = thl_lm.create_tx_task_complete(wall=w1, user=user, created=w1.started) + assert isinstance(tx, LedgerTransaction) + + # A User does another Wall Completion in Session=1 + w2 = s1.wall_events[1] + tx = thl_lm.create_tx_task_complete(wall=w2, user=user, created=w2.started) + assert isinstance(tx, LedgerTransaction) + + # That first Wall Complete was "adjusted" to instead be marked + # as a Failure + w1.update( + adjusted_status=WallAdjustedStatus.ADJUSTED_TO_FAIL, + adjusted_cpi=0, + adjusted_timestamp=utc_hour_ago + timedelta(hours=1), + ) + tx = thl_lm.create_tx_task_adjustment(wall=w1, user=user) + assert isinstance(tx, LedgerTransaction) + + # A User does another! Wall Completion in Session=1; however, we + # don't create a transaction for it + w3 = s1.wall_events[2] + + # Make sure we clear any flags/locks first + lock_key = f"{currency.value}:thl_wall:{w3.uuid}" + lock_name = f"{lm.cache_prefix}:transaction_lock:{lock_key}" + flag_name = f"{lm.cache_prefix}:transaction_flag:{lock_key}" + lm.redis_client.delete(lock_name) + lm.redis_client.delete(flag_name) + + # Despite the + f1 = generate_condition_mp_payment(wall=w1) + f2 = generate_condition_mp_payment(wall=w2) + f3 = generate_condition_mp_payment(wall=w3) + assert f1(lm=lm) is False + assert f2(lm=lm) is False + assert f3(lm=lm) is True + + condition = f3 + create_tx_func = lambda: thl_lm.create_tx_task_complete_(wall=w3, user=user) + assert isinstance(create_tx_func, Callable) + assert f3(lm) is True + + lm.redis_client.delete(flag_name) + lm.redis_client.delete(lock_name) + + tx = thl_lm.create_tx_protected( + lock_key=lock_key, condition=condition, create_tx_func=create_tx_func + ) + assert f3(lm) is False + + # purposely hold the lock open + tx = None + lm.redis_client.set(lock_name, "1") + with caplog.at_level(logging.ERROR): + with pytest.raises(expected_exception=LedgerTransactionCreateLockError): + tx = thl_lm.create_tx_protected( + lock_key=lock_key, + condition=condition, + create_tx_func=create_tx_func, + ) + assert tx is None + assert "Unable to acquire lock within the time specified" in caplog.text + lm.redis_client.delete(lock_name) + + def test_locking( + self, + user_factory, + product_user_wallet_no, + create_main_accounts, + delete_ledger_db, + caplog, + thl_lm, + lm, + ): + delete_ledger_db() + create_main_accounts() + + now = datetime.now(timezone.utc) - timedelta(hours=1) + user: User = user_factory(product=product_user_wallet_no) + + # A User does a Wall complete on Session.id=1 and the transaction is + # logged to the ledger + wall1 = Wall( + user_id=user.user_id, + source=Source.DYNATA, + req_survey_id="xxx", + req_cpi=Decimal("1.23"), + session_id=1, + status=Status.COMPLETE, + status_code_1=StatusCode1.COMPLETE, + started=now, + finished=now + timedelta(seconds=1), + ) + thl_lm.create_tx_task_complete(wall=wall1, user=user, created=wall1.started) + + # A User does a Wall complete on Session.id=1 and the transaction is + # logged to the ledger + wall2 = Wall( + user_id=user.user_id, + source=Source.FULL_CIRCLE, + req_survey_id="yyy", + req_cpi=Decimal("3.21"), + session_id=1, + status=Status.COMPLETE, + status_code_1=StatusCode1.COMPLETE, + started=now, + finished=now + timedelta(seconds=1), + ) + thl_lm.create_tx_task_complete(wall=wall2, user=user, created=wall2.started) + + # An hour later, the first wall complete is adjusted to a Failure and + # it's tracked in the ledger + wall1.update( + adjusted_status=WallAdjustedStatus.ADJUSTED_TO_FAIL, + adjusted_cpi=0, + adjusted_timestamp=now + timedelta(hours=1), + ) + thl_lm.create_tx_task_adjustment(wall=wall1, user=user) + + # A User does a Wall complete on Session.id=1 and the transaction + # IS NOT logged to the ledger + wall3 = Wall( + user_id=user.user_id, + source=Source.DYNATA, + req_survey_id="xxx", + req_cpi=Decimal("4"), + session_id=1, + status=Status.COMPLETE, + status_code_1=StatusCode1.COMPLETE, + started=now, + finished=now + timedelta(seconds=1), + uuid="867a282d8b4d40d2a2093d75b802b629", + ) + + revenue_account = thl_lm.get_account_task_complete_revenue() + assert 0 == thl_lm.get_account_filtered_balance( + account=revenue_account, + metadata_key="thl_wall", + metadata_value=wall3.uuid, + ) + # Make sure we clear any flags/locks first + lock_key = f"test:thl_wall:{wall3.uuid}" + lock_name = f"{lm.cache_prefix}:transaction_lock:{lock_key}" + flag_name = f"{lm.cache_prefix}:transaction_flag:{lock_key}" + lm.redis_client.delete(lock_name) + lm.redis_client.delete(flag_name) + + # Purposely hold the lock open + lm.redis_client.set(name=lock_name, value="1") + with caplog.at_level(logging.DEBUG): + with pytest.raises(expected_exception=LedgerTransactionCreateLockError): + tx = thl_lm.create_tx_task_complete( + wall=wall3, user=user, created=wall3.started + ) + assert isinstance(tx, LedgerTransaction) + assert "Unable to acquire lock within the time specified" in caplog.text + + # Release the lock + lm.redis_client.delete(lock_name) + + # Set the redis flag to indicate it has been run + lm.redis_client.set(flag_name, "1") + # with self.assertLogs(logger=logger, level=logging.DEBUG) as cm2: + with pytest.raises(expected_exception=LedgerTransactionFlagAlreadyExistsError): + tx = thl_lm.create_tx_task_complete( + wall=wall3, user=user, created=wall3.started + ) + # self.assertIn("entered_lock: True, flag_set: True", cm2.output[0]) + + # Unset the flag + lm.redis_client.delete(flag_name) + + assert 0 == lm.get_account_filtered_balance( + account=revenue_account, + metadata_key="thl_wall", + metadata_value=wall3.uuid, + ) + + # Now actually run it + tx = thl_lm.create_tx_task_complete( + wall=wall3, user=user, created=wall3.started + ) + assert tx is not None + + # Run it again, should return None + # Confirm the Exception inheritance works + tx = None + with pytest.raises(expected_exception=LedgerTransactionCreateError): + tx = thl_lm.create_tx_task_complete( + wall=wall3, user=user, created=wall3.started + ) + assert tx is None + + # clear the redis flag, it should query the db + assert lm.redis_client.get(flag_name) is not None + lm.redis_client.delete(flag_name) + assert lm.redis_client.get(flag_name) is None + + with pytest.raises(expected_exception=LedgerTransactionCreateError): + tx = thl_lm.create_tx_task_complete( + wall=wall3, user=user, created=wall3.started + ) + + assert 400 == thl_lm.get_account_filtered_balance( + account=revenue_account, + metadata_key="thl_wall", + metadata_value=wall3.uuid, + ) + + def test_bp_payment_without_locks( + self, user_factory, product_user_wallet_no, create_main_accounts, thl_lm, lm + ): + user: User = user_factory(product=product_user_wallet_no) + wall1 = Wall( + user_id=user.user_id, + source=Source.SAGO, + req_survey_id="xxx", + req_cpi=Decimal("0.50"), + session_id=3, + status=Status.COMPLETE, + status_code_1=StatusCode1.COMPLETE, + started=datetime.now(timezone.utc), + finished=datetime.now(timezone.utc) + timedelta(seconds=1), + ) + + thl_lm.create_tx_task_complete(wall=wall1, user=user, created=wall1.started) + session = Session(started=wall1.started, user=user, wall_events=[wall1]) + status, status_code_1 = session.determine_session_status() + thl_net, commission_amount, bp_pay, user_pay = session.determine_payments() + session.update( + **{ + "status": status, + "status_code_1": status_code_1, + "finished": session.started + timedelta(minutes=10), + "payout": bp_pay, + "user_payout": user_pay, + } + ) + print(thl_net, commission_amount, bp_pay, user_pay) + + # Run it 3 times without any checks, and it gets made three times! + thl_lm.create_tx_bp_payment(session=session, created=wall1.started) + thl_lm.create_tx_bp_payment_(session=session, created=wall1.started) + thl_lm.create_tx_bp_payment_(session=session, created=wall1.started) + + bp_wallet = thl_lm.get_account_or_create_bp_wallet(product=user.product) + assert 48 * 3 == lm.get_account_balance(account=bp_wallet) + assert 48 * 3 == thl_lm.get_account_filtered_balance( + account=bp_wallet, metadata_key="thl_session", metadata_value=session.uuid + ) + assert lm.check_ledger_balanced() + + def test_bp_payment_with_locks( + self, user_factory, product_user_wallet_no, create_main_accounts, thl_lm, lm + ): + user: User = user_factory(product=product_user_wallet_no) + + wall1 = Wall( + user_id=user.user_id, + source=Source.SAGO, + req_survey_id="xxx", + req_cpi=Decimal("0.50"), + session_id=3, + status=Status.COMPLETE, + status_code_1=StatusCode1.COMPLETE, + started=datetime.now(timezone.utc), + finished=datetime.now(timezone.utc) + timedelta(seconds=1), + ) + + thl_lm.create_tx_task_complete(wall1, user, created=wall1.started) + session = Session(started=wall1.started, user=user, wall_events=[wall1]) + status, status_code_1 = session.determine_session_status() + thl_net, commission_amount, bp_pay, user_pay = session.determine_payments() + session.update( + **{ + "status": status, + "status_code_1": status_code_1, + "finished": session.started + timedelta(minutes=10), + "payout": bp_pay, + "user_payout": user_pay, + } + ) + print(thl_net, commission_amount, bp_pay, user_pay) + + # Make sure we clear any flags/locks first + lock_key = f"test:thl_wall:{wall1.uuid}" + lock_name = f"{lm.cache_prefix}:transaction_lock:{lock_key}" + flag_name = f"{lm.cache_prefix}:transaction_flag:{lock_key}" + lm.redis_client.delete(lock_name) + lm.redis_client.delete(flag_name) + + # Run it 3 times with check, and it gets made once! + thl_lm.create_tx_bp_payment(session=session, created=wall1.started) + with pytest.raises(expected_exception=LedgerTransactionCreateError): + thl_lm.create_tx_bp_payment(session=session, created=wall1.started) + + with pytest.raises(expected_exception=LedgerTransactionCreateError): + thl_lm.create_tx_bp_payment(session=session, created=wall1.started) + + bp_wallet = thl_lm.get_account_or_create_bp_wallet(product=user.product) + assert 48 == thl_lm.get_account_balance(bp_wallet) + assert 48 == thl_lm.get_account_filtered_balance( + account=bp_wallet, + metadata_key="thl_session", + metadata_value=session.uuid, + ) + assert lm.check_ledger_balanced() |
