diff options
Diffstat (limited to 'tests/managers/thl/test_user_manager')
6 files changed, 515 insertions, 0 deletions
diff --git a/tests/managers/thl/test_user_manager/__init__.py b/tests/managers/thl/test_user_manager/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/managers/thl/test_user_manager/__init__.py diff --git a/tests/managers/thl/test_user_manager/test_base.py b/tests/managers/thl/test_user_manager/test_base.py new file mode 100644 index 0000000..3c7ee38 --- /dev/null +++ b/tests/managers/thl/test_user_manager/test_base.py @@ -0,0 +1,274 @@ +import logging +from datetime import datetime, timezone +from random import randint +from uuid import uuid4 + +import pytest + +from generalresearch.managers.thl.user_manager import ( + UserCreateNotAllowedError, + get_bp_user_create_limit_hourly, +) +from generalresearch.managers.thl.user_manager.rate_limit import ( + RateLimitItemPerHourConstantKey, +) +from generalresearch.models.thl.product import UserCreateConfig, Product +from generalresearch.models.thl.user import User +from test_utils.models.conftest import ( + user, + product, + user_manager, + product_manager, +) + +logger = logging.getLogger() + + +class TestUserManager: + + def test_copying_lru_cache(self, user_manager, user): + # Before adding the deepcopy_return decorator, this would fail b/c the returned user + # is mutable, and it would mutate in the cache + # user_manager = self.get_user_manager() + + user_manager.clear_user_inmemory_cache(user) + u = user_manager.get_user(user_id=user.user_id) + assert not u.blocked + + u.blocked = True + u = user_manager.get_user(user_id=user.user_id) + assert not u.blocked + + def test_get_user_no_inmemory(self, user, user_manager): + user_manager.clear_user_inmemory_cache(user) + user_manager.get_user.__wrapped__.cache_clear() + u = user_manager.get_user(user_id=user.user_id) + # this should hit mysql + assert u == user + + cache_info = user_manager.get_user.__wrapped__.cache_info() + assert cache_info.hits == 0, cache_info + assert cache_info.misses == 1, cache_info + + # this should hit the lru cache + u = user_manager.get_user(user_id=user.user_id) + assert u == user + + cache_info = user_manager.get_user.__wrapped__.cache_info() + assert cache_info.hits == 1, cache_info + assert cache_info.misses == 1, cache_info + + def test_get_user_with_inmemory(self, user_manager, user): + # user_manager = self.get_user_manager() + + user_manager.set_user_inmemory_cache(user) + user_manager.get_user.__wrapped__.cache_clear() + u = user_manager.get_user(user_id=user.user_id) + # this should hit inmemory cache + assert u == user + + cache_info = user_manager.get_user.__wrapped__.cache_info() + assert cache_info.hits == 0, cache_info + assert cache_info.misses == 1, cache_info + + # this should hit the lru cache + u = user_manager.get_user(user_id=user.user_id) + assert u == user + + cache_info = user_manager.get_user.__wrapped__.cache_info() + assert cache_info.hits == 1, cache_info + assert cache_info.misses == 1, cache_info + + +class TestBlockUserManager: + + def test_block_user(self, product, user_manager): + product_user_id = f"user-{uuid4().hex[:10]}" + + # mysql_user_manager to skip user creation limit check + user: User = user_manager.mysql_user_manager.create_user( + product_id=product.id, product_user_id=product_user_id + ) + assert not user.blocked + + # get_user to make sure caches are populated + user = user_manager.get_user( + product_id=product.id, product_user_id=product_user_id + ) + assert not user.blocked + + assert user_manager.block_user(user) is True + assert user.blocked + + user = user_manager.get_user( + product_id=product.id, product_user_id=product_user_id + ) + assert user.blocked + + user = user_manager.get_user(user_id=user.user_id) + assert user.blocked + + def test_block_user_whitelist(self, product, user_manager, thl_web_rw): + product_user_id = f"user-{uuid4().hex[:10]}" + + # mysql_user_manager to skip user creation limit check + user: User = user_manager.mysql_user_manager.create_user( + product_id=product.id, product_user_id=product_user_id + ) + assert not user.blocked + + now = datetime.now(tz=timezone.utc) + # Adds user to whitelist + thl_web_rw.execute_write( + """ + INSERT INTO userprofile_userstat + (user_id, key, value, date) + VALUES (%(user_id)s, 'USER_HEALTH.access_control', 1, %(date)s) + ON CONFLICT (user_id, key) DO UPDATE SET value=1""", + params={"user_id": user.user_id, "date": now}, + ) + assert user_manager.is_whitelisted(user) + assert user_manager.block_user(user) is False + assert not user.blocked + + +class TestCreateUserManager: + + def test_create_user(self, product_manager, thl_web_rw, user_manager): + product: Product = product_manager.create_dummy( + user_create_config=UserCreateConfig( + min_hourly_create_limit=10, max_hourly_create_limit=69 + ), + ) + + product_user_id = f"user-{uuid4().hex[:10]}" + + user: User = user_manager.mysql_user_manager.create_user( + product_id=product.id, product_user_id=product_user_id + ) + assert isinstance(user, User) + assert user.product_id == product.id + assert user.product_user_id == product_user_id + + assert user.user_id is not None + assert user.uuid is not None + + # make sure thl_user row is created + res_thl_user = thl_web_rw.execute_sql_query( + query=f""" + SELECT * + FROM thl_user AS u + WHERE u.id = %s + """, + params=[user.user_id], + ) + + assert len(res_thl_user) == 1 + + u2 = user_manager.get_user( + product_id=product.id, product_user_id=product_user_id + ) + assert u2.user_id == user.user_id + assert u2.uuid == user.uuid + + def test_create_user_integrity_error(self, product_manager, user_manager, caplog): + product: Product = product_manager.create_dummy( + product_id=uuid4().hex, + team_id=uuid4().hex, + name=f"Test Product ID #{uuid4().hex[:6]}", + user_create_config=UserCreateConfig( + min_hourly_create_limit=10, max_hourly_create_limit=69 + ), + ) + + product_user_id = f"user-{uuid4().hex[:10]}" + rand_msg = f"log-{uuid4().hex}" + + with caplog.at_level(logging.INFO): + logger.info(rand_msg) + user1 = user_manager.mysql_user_manager.create_user( + product_id=product.id, product_user_id=product_user_id + ) + + assert len(caplog.records) == 1 + assert caplog.records[0].getMessage() == rand_msg + + # Should cause a constraint error, triggering a lookup instead + with caplog.at_level(logging.INFO): + user2 = user_manager.mysql_user_manager.create_user( + product_id=product.id, product_user_id=product_user_id + ) + + assert len(caplog.records) == 3 + assert caplog.records[0].getMessage() == rand_msg + assert ( + caplog.records[1].getMessage() + == f"mysql_user_manager.create_user_new integrity error: {product.id} {product_user_id}" + ) + assert ( + caplog.records[2].getMessage() + == f"get_user_from_mysql: {product.id}, {product_user_id}, None, None" + ) + + assert user1 == user2 + + def test_raise_allow_user_create(self, product_manager, user_manager): + rand_num = randint(25, 200) + product: Product = product_manager.create_dummy( + product_id=uuid4().hex, + team_id=uuid4().hex, + name=f"Test Product ID #{uuid4().hex[:6]}", + user_create_config=UserCreateConfig( + min_hourly_create_limit=rand_num, + max_hourly_create_limit=rand_num, + ), + ) + + instance: Product = user_manager.product_manager.get_by_uuid( + product_uuid=product.id + ) + + # get_bp_user_create_limit_hourly is dynamically generated, make sure + # we use this value in our tests and not the + # UserCreateConfig.max_hourly_create_limit value + rl_value: int = get_bp_user_create_limit_hourly(product=instance) + + # This is a randomly generated product_id, which means we'll always + # use the global defaults + assert rand_num == instance.user_create_config.min_hourly_create_limit + assert rand_num == instance.user_create_config.max_hourly_create_limit + assert rand_num == rl_value + + rl = RateLimitItemPerHourConstantKey(rl_value) + assert str(rl) == f"{rand_num} per 1 hour" + + key = rl.key_for("thl-grpc", "allow_user_create", instance.id) + assert key == f"LIMITER/thl-grpc/allow_user_create/{instance.id}" + + # make sure we clear the key or subsequent tests will fail + user_manager.user_manager_limiter.storage.clear(key=key) + + n = 0 + with pytest.raises(expected_exception=UserCreateNotAllowedError) as cm: + for n, _ in enumerate(range(rl_value + 5)): + user_manager.user_manager_limiter.raise_allow_user_create( + product=product + ) + assert rl_value == n + + +class TestUserManagerMethods: + + def test_audit_log(self, user_manager, user, audit_log_manager): + from generalresearch.models.thl.userhealth import AuditLog + + res = audit_log_manager.filter_by_user_id(user_id=user.user_id) + assert len(res) == 0 + + msg = uuid4().hex + user_manager.audit_log(user=user, level=30, event_type=msg) + + res = audit_log_manager.filter_by_user_id(user_id=user.user_id) + assert len(res) == 1 + assert isinstance(res[0], AuditLog) + assert res[0].event_type == msg diff --git a/tests/managers/thl/test_user_manager/test_mysql.py b/tests/managers/thl/test_user_manager/test_mysql.py new file mode 100644 index 0000000..0313bbf --- /dev/null +++ b/tests/managers/thl/test_user_manager/test_mysql.py @@ -0,0 +1,25 @@ +from test_utils.models.conftest import user, user_manager + + +class TestUserManagerMysqlNew: + + def test_get_notset(self, user_manager): + assert ( + user_manager.mysql_user_manager.get_user_from_mysql(user_id=-3105) is None + ) + + def test_get_user_id(self, user, user_manager): + assert ( + user_manager.mysql_user_manager.get_user_from_mysql(user_id=user.user_id) + == user + ) + + def test_get_uuid(self, user, user_manager): + u = user_manager.mysql_user_manager.get_user_from_mysql(user_uuid=user.uuid) + assert u == user + + def test_get_ubp(self, user, user_manager): + u = user_manager.mysql_user_manager.get_user_from_mysql( + product_id=user.product_id, product_user_id=user.product_user_id + ) + assert u == user diff --git a/tests/managers/thl/test_user_manager/test_redis.py b/tests/managers/thl/test_user_manager/test_redis.py new file mode 100644 index 0000000..a69519e --- /dev/null +++ b/tests/managers/thl/test_user_manager/test_redis.py @@ -0,0 +1,80 @@ +import pytest + +from generalresearch.managers.base import Permission + + +class TestUserManagerRedis: + + def test_get_notset(self, user_manager, user): + user_manager.clear_user_inmemory_cache(user=user) + assert user_manager.redis_user_manager.get_user(user_id=user.user_id) is None + + def test_get_user_id(self, user_manager, user): + user_manager.redis_user_manager.set_user(user=user) + + assert user_manager.redis_user_manager.get_user(user_id=user.user_id) == user + + def test_get_uuid(self, user_manager, user): + user_manager.redis_user_manager.set_user(user=user) + + assert user_manager.redis_user_manager.get_user(user_uuid=user.uuid) == user + + def test_get_ubp(self, user_manager, user): + user_manager.redis_user_manager.set_user(user=user) + + assert ( + user_manager.redis_user_manager.get_user( + product_id=user.product_id, product_user_id=user.product_user_id + ) + == user + ) + + @pytest.mark.skip(reason="TODO") + def test_set(self): + # I mean, the sets are implicitly tested by the get tests above. no point + pass + + def test_get_with_cache_prefix(self, settings, user, thl_web_rw, thl_web_rr): + """ + Confirm the prefix functionality is working; we do this so it + is easier to migrate between any potentially breaking versions + if we don't want any broken keys; not as important after + pydantic usage... + """ + from generalresearch.managers.thl.user_manager.user_manager import ( + UserManager, + ) + + um1 = UserManager( + pg_config=thl_web_rw, + pg_config_rr=thl_web_rr, + sql_permissions=[Permission.UPDATE, Permission.CREATE], + redis=settings.redis, + redis_timeout=settings.redis_timeout, + ) + + um2 = UserManager( + pg_config=thl_web_rw, + pg_config_rr=thl_web_rr, + sql_permissions=[Permission.UPDATE, Permission.CREATE], + redis=settings.redis, + redis_timeout=settings.redis_timeout, + cache_prefix="user-lookup-v2", + ) + + um1.get_or_create_user( + product_id=user.product_id, product_user_id=user.product_user_id + ) + um2.get_or_create_user( + product_id=user.product_id, product_user_id=user.product_user_id + ) + + res1 = um1.redis_user_manager.client.get(f"user-lookup:user_id:{user.user_id}") + assert res1 is not None + + res2 = um2.redis_user_manager.client.get( + f"user-lookup-v2:user_id:{user.user_id}" + ) + assert res2 is not None + + assert res1 == res2 diff --git a/tests/managers/thl/test_user_manager/test_user_fetch.py b/tests/managers/thl/test_user_manager/test_user_fetch.py new file mode 100644 index 0000000..a4b3d57 --- /dev/null +++ b/tests/managers/thl/test_user_manager/test_user_fetch.py @@ -0,0 +1,48 @@ +from uuid import uuid4 + +import pytest + +from generalresearch.models.thl.user import User +from test_utils.models.conftest import product, user_manager, user_factory + + +class TestUserManagerFetch: + + def test_fetch(self, user_factory, product, user_manager): + user1: User = user_factory(product=product) + user2: User = user_factory(product=product) + res = user_manager.fetch_by_bpuids( + product_id=product.uuid, + product_user_ids=[user1.product_user_id, user2.product_user_id], + ) + assert len(res) == 2 + + res = user_manager.fetch(user_ids=[user1.user_id, user2.user_id]) + assert len(res) == 2 + + res = user_manager.fetch(user_uuids=[user1.uuid, user2.uuid]) + assert len(res) == 2 + + # filter including bogus values + res = user_manager.fetch(user_uuids=[user1.uuid, uuid4().hex]) + assert len(res) == 1 + + res = user_manager.fetch(user_uuids=[uuid4().hex]) + assert len(res) == 0 + + def test_fetch_invalid(self, user_manager): + with pytest.raises(AssertionError) as e: + user_manager.fetch(user_uuids=[], user_ids=None) + assert "Must pass ONE of user_ids, user_uuids" in str(e.value) + + with pytest.raises(AssertionError) as e: + user_manager.fetch(user_uuids=uuid4().hex) + assert "must pass a collection of user_uuids" in str(e.value) + + with pytest.raises(AssertionError) as e: + user_manager.fetch(user_uuids=[uuid4().hex], user_ids=[1, 2, 3]) + assert "Must pass ONE of user_ids, user_uuids" in str(e.value) + + with pytest.raises(AssertionError) as e: + user_manager.fetch(user_ids=list(range(501))) + assert "limit 500 user_ids" in str(e.value) diff --git a/tests/managers/thl/test_user_manager/test_user_metadata.py b/tests/managers/thl/test_user_manager/test_user_metadata.py new file mode 100644 index 0000000..91dc16a --- /dev/null +++ b/tests/managers/thl/test_user_manager/test_user_metadata.py @@ -0,0 +1,88 @@ +from uuid import uuid4 + +import pytest + +from generalresearch.models.thl.user_profile import UserMetadata +from test_utils.models.conftest import user, user_manager, user_factory + + +class TestUserMetadataManager: + + def test_get_notset(self, user, user_manager, user_metadata_manager): + # The row in the db won't exist. It just returns the default obj with everything None (except for the user_id) + um1 = user_metadata_manager.get(user_id=user.user_id) + assert um1 == UserMetadata(user_id=user.user_id) + + def test_create(self, user_factory, product, user_metadata_manager): + from generalresearch.models.thl.user import User + + u1: User = user_factory(product=product) + + email_address = f"{uuid4().hex}@example.com" + um = UserMetadata(user_id=u1.user_id, email_address=email_address) + # This happens in the model itself, nothing to do with the manager (a model_validator) + assert um.email_sha256 is not None + + user_metadata_manager.update(um) + um2 = user_metadata_manager.get(email_address=email_address) + assert um == um2 + + def test_create_no_email(self, product, user_factory, user_metadata_manager): + from generalresearch.models.thl.user import User + + u1: User = user_factory(product=product) + um = UserMetadata(user_id=u1.user_id) + assert um.email_sha256 is None + + user_metadata_manager.update(um) + um2 = user_metadata_manager.get(user_id=u1.user_id) + assert um == um2 + + def test_update(self, product, user_factory, user_metadata_manager): + from generalresearch.models.thl.user import User + + u: User = user_factory(product=product) + + email_address = f"{uuid4().hex}@example1.com" + um = UserMetadata(user_id=u.user_id, email_address=email_address) + user_metadata_manager.update(user_metadata=um) + + um.email_address = email_address.replace("example1", "example2") + user_metadata_manager.update(user_metadata=um) + + um2 = user_metadata_manager.get(email_address=um.email_address) + assert um2.email_address != email_address + + assert um2 == UserMetadata( + user_id=u.user_id, + email_address=email_address.replace("example1", "example2"), + ) + + def test_filter(self, user_factory, product, user_metadata_manager): + from generalresearch.models.thl.user import User + + user1: User = user_factory(product=product) + user2: User = user_factory(product=product) + + email_address = f"{uuid4().hex}@example.com" + res = user_metadata_manager.filter(email_addresses=[email_address]) + assert len(res) == 0 + + # Create 2 user metadata with the same email address + user_metadata_manager.update( + user_metadata=UserMetadata( + user_id=user1.user_id, email_address=email_address + ) + ) + user_metadata_manager.update( + user_metadata=UserMetadata( + user_id=user2.user_id, email_address=email_address + ) + ) + + res = user_metadata_manager.filter(email_addresses=[email_address]) + assert len(res) == 2 + + with pytest.raises(expected_exception=ValueError) as e: + res = user_metadata_manager.get(email_address=email_address) + assert "More than 1 result returned!" in str(e.value) |
