aboutsummaryrefslogtreecommitdiff
path: root/tests/managers/thl/test_user_manager
diff options
context:
space:
mode:
Diffstat (limited to 'tests/managers/thl/test_user_manager')
-rw-r--r--tests/managers/thl/test_user_manager/__init__.py0
-rw-r--r--tests/managers/thl/test_user_manager/test_base.py274
-rw-r--r--tests/managers/thl/test_user_manager/test_mysql.py25
-rw-r--r--tests/managers/thl/test_user_manager/test_redis.py80
-rw-r--r--tests/managers/thl/test_user_manager/test_user_fetch.py48
-rw-r--r--tests/managers/thl/test_user_manager/test_user_metadata.py88
6 files changed, 515 insertions, 0 deletions
diff --git a/tests/managers/thl/test_user_manager/__init__.py b/tests/managers/thl/test_user_manager/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/managers/thl/test_user_manager/__init__.py
diff --git a/tests/managers/thl/test_user_manager/test_base.py b/tests/managers/thl/test_user_manager/test_base.py
new file mode 100644
index 0000000..3c7ee38
--- /dev/null
+++ b/tests/managers/thl/test_user_manager/test_base.py
@@ -0,0 +1,274 @@
+import logging
+from datetime import datetime, timezone
+from random import randint
+from uuid import uuid4
+
+import pytest
+
+from generalresearch.managers.thl.user_manager import (
+ UserCreateNotAllowedError,
+ get_bp_user_create_limit_hourly,
+)
+from generalresearch.managers.thl.user_manager.rate_limit import (
+ RateLimitItemPerHourConstantKey,
+)
+from generalresearch.models.thl.product import UserCreateConfig, Product
+from generalresearch.models.thl.user import User
+from test_utils.models.conftest import (
+ user,
+ product,
+ user_manager,
+ product_manager,
+)
+
+logger = logging.getLogger()
+
+
+class TestUserManager:
+
+ def test_copying_lru_cache(self, user_manager, user):
+ # Before adding the deepcopy_return decorator, this would fail b/c the returned user
+ # is mutable, and it would mutate in the cache
+ # user_manager = self.get_user_manager()
+
+ user_manager.clear_user_inmemory_cache(user)
+ u = user_manager.get_user(user_id=user.user_id)
+ assert not u.blocked
+
+ u.blocked = True
+ u = user_manager.get_user(user_id=user.user_id)
+ assert not u.blocked
+
+ def test_get_user_no_inmemory(self, user, user_manager):
+ user_manager.clear_user_inmemory_cache(user)
+ user_manager.get_user.__wrapped__.cache_clear()
+ u = user_manager.get_user(user_id=user.user_id)
+ # this should hit mysql
+ assert u == user
+
+ cache_info = user_manager.get_user.__wrapped__.cache_info()
+ assert cache_info.hits == 0, cache_info
+ assert cache_info.misses == 1, cache_info
+
+ # this should hit the lru cache
+ u = user_manager.get_user(user_id=user.user_id)
+ assert u == user
+
+ cache_info = user_manager.get_user.__wrapped__.cache_info()
+ assert cache_info.hits == 1, cache_info
+ assert cache_info.misses == 1, cache_info
+
+ def test_get_user_with_inmemory(self, user_manager, user):
+ # user_manager = self.get_user_manager()
+
+ user_manager.set_user_inmemory_cache(user)
+ user_manager.get_user.__wrapped__.cache_clear()
+ u = user_manager.get_user(user_id=user.user_id)
+ # this should hit inmemory cache
+ assert u == user
+
+ cache_info = user_manager.get_user.__wrapped__.cache_info()
+ assert cache_info.hits == 0, cache_info
+ assert cache_info.misses == 1, cache_info
+
+ # this should hit the lru cache
+ u = user_manager.get_user(user_id=user.user_id)
+ assert u == user
+
+ cache_info = user_manager.get_user.__wrapped__.cache_info()
+ assert cache_info.hits == 1, cache_info
+ assert cache_info.misses == 1, cache_info
+
+
+class TestBlockUserManager:
+
+ def test_block_user(self, product, user_manager):
+ product_user_id = f"user-{uuid4().hex[:10]}"
+
+ # mysql_user_manager to skip user creation limit check
+ user: User = user_manager.mysql_user_manager.create_user(
+ product_id=product.id, product_user_id=product_user_id
+ )
+ assert not user.blocked
+
+ # get_user to make sure caches are populated
+ user = user_manager.get_user(
+ product_id=product.id, product_user_id=product_user_id
+ )
+ assert not user.blocked
+
+ assert user_manager.block_user(user) is True
+ assert user.blocked
+
+ user = user_manager.get_user(
+ product_id=product.id, product_user_id=product_user_id
+ )
+ assert user.blocked
+
+ user = user_manager.get_user(user_id=user.user_id)
+ assert user.blocked
+
+ def test_block_user_whitelist(self, product, user_manager, thl_web_rw):
+ product_user_id = f"user-{uuid4().hex[:10]}"
+
+ # mysql_user_manager to skip user creation limit check
+ user: User = user_manager.mysql_user_manager.create_user(
+ product_id=product.id, product_user_id=product_user_id
+ )
+ assert not user.blocked
+
+ now = datetime.now(tz=timezone.utc)
+ # Adds user to whitelist
+ thl_web_rw.execute_write(
+ """
+ INSERT INTO userprofile_userstat
+ (user_id, key, value, date)
+ VALUES (%(user_id)s, 'USER_HEALTH.access_control', 1, %(date)s)
+ ON CONFLICT (user_id, key) DO UPDATE SET value=1""",
+ params={"user_id": user.user_id, "date": now},
+ )
+ assert user_manager.is_whitelisted(user)
+ assert user_manager.block_user(user) is False
+ assert not user.blocked
+
+
+class TestCreateUserManager:
+
+ def test_create_user(self, product_manager, thl_web_rw, user_manager):
+ product: Product = product_manager.create_dummy(
+ user_create_config=UserCreateConfig(
+ min_hourly_create_limit=10, max_hourly_create_limit=69
+ ),
+ )
+
+ product_user_id = f"user-{uuid4().hex[:10]}"
+
+ user: User = user_manager.mysql_user_manager.create_user(
+ product_id=product.id, product_user_id=product_user_id
+ )
+ assert isinstance(user, User)
+ assert user.product_id == product.id
+ assert user.product_user_id == product_user_id
+
+ assert user.user_id is not None
+ assert user.uuid is not None
+
+ # make sure thl_user row is created
+ res_thl_user = thl_web_rw.execute_sql_query(
+ query=f"""
+ SELECT *
+ FROM thl_user AS u
+ WHERE u.id = %s
+ """,
+ params=[user.user_id],
+ )
+
+ assert len(res_thl_user) == 1
+
+ u2 = user_manager.get_user(
+ product_id=product.id, product_user_id=product_user_id
+ )
+ assert u2.user_id == user.user_id
+ assert u2.uuid == user.uuid
+
+ def test_create_user_integrity_error(self, product_manager, user_manager, caplog):
+ product: Product = product_manager.create_dummy(
+ product_id=uuid4().hex,
+ team_id=uuid4().hex,
+ name=f"Test Product ID #{uuid4().hex[:6]}",
+ user_create_config=UserCreateConfig(
+ min_hourly_create_limit=10, max_hourly_create_limit=69
+ ),
+ )
+
+ product_user_id = f"user-{uuid4().hex[:10]}"
+ rand_msg = f"log-{uuid4().hex}"
+
+ with caplog.at_level(logging.INFO):
+ logger.info(rand_msg)
+ user1 = user_manager.mysql_user_manager.create_user(
+ product_id=product.id, product_user_id=product_user_id
+ )
+
+ assert len(caplog.records) == 1
+ assert caplog.records[0].getMessage() == rand_msg
+
+ # Should cause a constraint error, triggering a lookup instead
+ with caplog.at_level(logging.INFO):
+ user2 = user_manager.mysql_user_manager.create_user(
+ product_id=product.id, product_user_id=product_user_id
+ )
+
+ assert len(caplog.records) == 3
+ assert caplog.records[0].getMessage() == rand_msg
+ assert (
+ caplog.records[1].getMessage()
+ == f"mysql_user_manager.create_user_new integrity error: {product.id} {product_user_id}"
+ )
+ assert (
+ caplog.records[2].getMessage()
+ == f"get_user_from_mysql: {product.id}, {product_user_id}, None, None"
+ )
+
+ assert user1 == user2
+
+ def test_raise_allow_user_create(self, product_manager, user_manager):
+ rand_num = randint(25, 200)
+ product: Product = product_manager.create_dummy(
+ product_id=uuid4().hex,
+ team_id=uuid4().hex,
+ name=f"Test Product ID #{uuid4().hex[:6]}",
+ user_create_config=UserCreateConfig(
+ min_hourly_create_limit=rand_num,
+ max_hourly_create_limit=rand_num,
+ ),
+ )
+
+ instance: Product = user_manager.product_manager.get_by_uuid(
+ product_uuid=product.id
+ )
+
+ # get_bp_user_create_limit_hourly is dynamically generated, make sure
+ # we use this value in our tests and not the
+ # UserCreateConfig.max_hourly_create_limit value
+ rl_value: int = get_bp_user_create_limit_hourly(product=instance)
+
+ # This is a randomly generated product_id, which means we'll always
+ # use the global defaults
+ assert rand_num == instance.user_create_config.min_hourly_create_limit
+ assert rand_num == instance.user_create_config.max_hourly_create_limit
+ assert rand_num == rl_value
+
+ rl = RateLimitItemPerHourConstantKey(rl_value)
+ assert str(rl) == f"{rand_num} per 1 hour"
+
+ key = rl.key_for("thl-grpc", "allow_user_create", instance.id)
+ assert key == f"LIMITER/thl-grpc/allow_user_create/{instance.id}"
+
+ # make sure we clear the key or subsequent tests will fail
+ user_manager.user_manager_limiter.storage.clear(key=key)
+
+ n = 0
+ with pytest.raises(expected_exception=UserCreateNotAllowedError) as cm:
+ for n, _ in enumerate(range(rl_value + 5)):
+ user_manager.user_manager_limiter.raise_allow_user_create(
+ product=product
+ )
+ assert rl_value == n
+
+
+class TestUserManagerMethods:
+
+ def test_audit_log(self, user_manager, user, audit_log_manager):
+ from generalresearch.models.thl.userhealth import AuditLog
+
+ res = audit_log_manager.filter_by_user_id(user_id=user.user_id)
+ assert len(res) == 0
+
+ msg = uuid4().hex
+ user_manager.audit_log(user=user, level=30, event_type=msg)
+
+ res = audit_log_manager.filter_by_user_id(user_id=user.user_id)
+ assert len(res) == 1
+ assert isinstance(res[0], AuditLog)
+ assert res[0].event_type == msg
diff --git a/tests/managers/thl/test_user_manager/test_mysql.py b/tests/managers/thl/test_user_manager/test_mysql.py
new file mode 100644
index 0000000..0313bbf
--- /dev/null
+++ b/tests/managers/thl/test_user_manager/test_mysql.py
@@ -0,0 +1,25 @@
+from test_utils.models.conftest import user, user_manager
+
+
+class TestUserManagerMysqlNew:
+
+ def test_get_notset(self, user_manager):
+ assert (
+ user_manager.mysql_user_manager.get_user_from_mysql(user_id=-3105) is None
+ )
+
+ def test_get_user_id(self, user, user_manager):
+ assert (
+ user_manager.mysql_user_manager.get_user_from_mysql(user_id=user.user_id)
+ == user
+ )
+
+ def test_get_uuid(self, user, user_manager):
+ u = user_manager.mysql_user_manager.get_user_from_mysql(user_uuid=user.uuid)
+ assert u == user
+
+ def test_get_ubp(self, user, user_manager):
+ u = user_manager.mysql_user_manager.get_user_from_mysql(
+ product_id=user.product_id, product_user_id=user.product_user_id
+ )
+ assert u == user
diff --git a/tests/managers/thl/test_user_manager/test_redis.py b/tests/managers/thl/test_user_manager/test_redis.py
new file mode 100644
index 0000000..a69519e
--- /dev/null
+++ b/tests/managers/thl/test_user_manager/test_redis.py
@@ -0,0 +1,80 @@
+import pytest
+
+from generalresearch.managers.base import Permission
+
+
+class TestUserManagerRedis:
+
+ def test_get_notset(self, user_manager, user):
+ user_manager.clear_user_inmemory_cache(user=user)
+ assert user_manager.redis_user_manager.get_user(user_id=user.user_id) is None
+
+ def test_get_user_id(self, user_manager, user):
+ user_manager.redis_user_manager.set_user(user=user)
+
+ assert user_manager.redis_user_manager.get_user(user_id=user.user_id) == user
+
+ def test_get_uuid(self, user_manager, user):
+ user_manager.redis_user_manager.set_user(user=user)
+
+ assert user_manager.redis_user_manager.get_user(user_uuid=user.uuid) == user
+
+ def test_get_ubp(self, user_manager, user):
+ user_manager.redis_user_manager.set_user(user=user)
+
+ assert (
+ user_manager.redis_user_manager.get_user(
+ product_id=user.product_id, product_user_id=user.product_user_id
+ )
+ == user
+ )
+
+ @pytest.mark.skip(reason="TODO")
+ def test_set(self):
+ # I mean, the sets are implicitly tested by the get tests above. no point
+ pass
+
+ def test_get_with_cache_prefix(self, settings, user, thl_web_rw, thl_web_rr):
+ """
+ Confirm the prefix functionality is working; we do this so it
+ is easier to migrate between any potentially breaking versions
+ if we don't want any broken keys; not as important after
+ pydantic usage...
+ """
+ from generalresearch.managers.thl.user_manager.user_manager import (
+ UserManager,
+ )
+
+ um1 = UserManager(
+ pg_config=thl_web_rw,
+ pg_config_rr=thl_web_rr,
+ sql_permissions=[Permission.UPDATE, Permission.CREATE],
+ redis=settings.redis,
+ redis_timeout=settings.redis_timeout,
+ )
+
+ um2 = UserManager(
+ pg_config=thl_web_rw,
+ pg_config_rr=thl_web_rr,
+ sql_permissions=[Permission.UPDATE, Permission.CREATE],
+ redis=settings.redis,
+ redis_timeout=settings.redis_timeout,
+ cache_prefix="user-lookup-v2",
+ )
+
+ um1.get_or_create_user(
+ product_id=user.product_id, product_user_id=user.product_user_id
+ )
+ um2.get_or_create_user(
+ product_id=user.product_id, product_user_id=user.product_user_id
+ )
+
+ res1 = um1.redis_user_manager.client.get(f"user-lookup:user_id:{user.user_id}")
+ assert res1 is not None
+
+ res2 = um2.redis_user_manager.client.get(
+ f"user-lookup-v2:user_id:{user.user_id}"
+ )
+ assert res2 is not None
+
+ assert res1 == res2
diff --git a/tests/managers/thl/test_user_manager/test_user_fetch.py b/tests/managers/thl/test_user_manager/test_user_fetch.py
new file mode 100644
index 0000000..a4b3d57
--- /dev/null
+++ b/tests/managers/thl/test_user_manager/test_user_fetch.py
@@ -0,0 +1,48 @@
+from uuid import uuid4
+
+import pytest
+
+from generalresearch.models.thl.user import User
+from test_utils.models.conftest import product, user_manager, user_factory
+
+
+class TestUserManagerFetch:
+
+ def test_fetch(self, user_factory, product, user_manager):
+ user1: User = user_factory(product=product)
+ user2: User = user_factory(product=product)
+ res = user_manager.fetch_by_bpuids(
+ product_id=product.uuid,
+ product_user_ids=[user1.product_user_id, user2.product_user_id],
+ )
+ assert len(res) == 2
+
+ res = user_manager.fetch(user_ids=[user1.user_id, user2.user_id])
+ assert len(res) == 2
+
+ res = user_manager.fetch(user_uuids=[user1.uuid, user2.uuid])
+ assert len(res) == 2
+
+ # filter including bogus values
+ res = user_manager.fetch(user_uuids=[user1.uuid, uuid4().hex])
+ assert len(res) == 1
+
+ res = user_manager.fetch(user_uuids=[uuid4().hex])
+ assert len(res) == 0
+
+ def test_fetch_invalid(self, user_manager):
+ with pytest.raises(AssertionError) as e:
+ user_manager.fetch(user_uuids=[], user_ids=None)
+ assert "Must pass ONE of user_ids, user_uuids" in str(e.value)
+
+ with pytest.raises(AssertionError) as e:
+ user_manager.fetch(user_uuids=uuid4().hex)
+ assert "must pass a collection of user_uuids" in str(e.value)
+
+ with pytest.raises(AssertionError) as e:
+ user_manager.fetch(user_uuids=[uuid4().hex], user_ids=[1, 2, 3])
+ assert "Must pass ONE of user_ids, user_uuids" in str(e.value)
+
+ with pytest.raises(AssertionError) as e:
+ user_manager.fetch(user_ids=list(range(501)))
+ assert "limit 500 user_ids" in str(e.value)
diff --git a/tests/managers/thl/test_user_manager/test_user_metadata.py b/tests/managers/thl/test_user_manager/test_user_metadata.py
new file mode 100644
index 0000000..91dc16a
--- /dev/null
+++ b/tests/managers/thl/test_user_manager/test_user_metadata.py
@@ -0,0 +1,88 @@
+from uuid import uuid4
+
+import pytest
+
+from generalresearch.models.thl.user_profile import UserMetadata
+from test_utils.models.conftest import user, user_manager, user_factory
+
+
+class TestUserMetadataManager:
+
+ def test_get_notset(self, user, user_manager, user_metadata_manager):
+ # The row in the db won't exist. It just returns the default obj with everything None (except for the user_id)
+ um1 = user_metadata_manager.get(user_id=user.user_id)
+ assert um1 == UserMetadata(user_id=user.user_id)
+
+ def test_create(self, user_factory, product, user_metadata_manager):
+ from generalresearch.models.thl.user import User
+
+ u1: User = user_factory(product=product)
+
+ email_address = f"{uuid4().hex}@example.com"
+ um = UserMetadata(user_id=u1.user_id, email_address=email_address)
+ # This happens in the model itself, nothing to do with the manager (a model_validator)
+ assert um.email_sha256 is not None
+
+ user_metadata_manager.update(um)
+ um2 = user_metadata_manager.get(email_address=email_address)
+ assert um == um2
+
+ def test_create_no_email(self, product, user_factory, user_metadata_manager):
+ from generalresearch.models.thl.user import User
+
+ u1: User = user_factory(product=product)
+ um = UserMetadata(user_id=u1.user_id)
+ assert um.email_sha256 is None
+
+ user_metadata_manager.update(um)
+ um2 = user_metadata_manager.get(user_id=u1.user_id)
+ assert um == um2
+
+ def test_update(self, product, user_factory, user_metadata_manager):
+ from generalresearch.models.thl.user import User
+
+ u: User = user_factory(product=product)
+
+ email_address = f"{uuid4().hex}@example1.com"
+ um = UserMetadata(user_id=u.user_id, email_address=email_address)
+ user_metadata_manager.update(user_metadata=um)
+
+ um.email_address = email_address.replace("example1", "example2")
+ user_metadata_manager.update(user_metadata=um)
+
+ um2 = user_metadata_manager.get(email_address=um.email_address)
+ assert um2.email_address != email_address
+
+ assert um2 == UserMetadata(
+ user_id=u.user_id,
+ email_address=email_address.replace("example1", "example2"),
+ )
+
+ def test_filter(self, user_factory, product, user_metadata_manager):
+ from generalresearch.models.thl.user import User
+
+ user1: User = user_factory(product=product)
+ user2: User = user_factory(product=product)
+
+ email_address = f"{uuid4().hex}@example.com"
+ res = user_metadata_manager.filter(email_addresses=[email_address])
+ assert len(res) == 0
+
+ # Create 2 user metadata with the same email address
+ user_metadata_manager.update(
+ user_metadata=UserMetadata(
+ user_id=user1.user_id, email_address=email_address
+ )
+ )
+ user_metadata_manager.update(
+ user_metadata=UserMetadata(
+ user_id=user2.user_id, email_address=email_address
+ )
+ )
+
+ res = user_metadata_manager.filter(email_addresses=[email_address])
+ assert len(res) == 2
+
+ with pytest.raises(expected_exception=ValueError) as e:
+ res = user_metadata_manager.get(email_address=email_address)
+ assert "More than 1 result returned!" in str(e.value)