aboutsummaryrefslogtreecommitdiff
path: root/generalresearch/managers/thl/profiling/uqa.py
diff options
context:
space:
mode:
Diffstat (limited to 'generalresearch/managers/thl/profiling/uqa.py')
-rw-r--r--generalresearch/managers/thl/profiling/uqa.py211
1 files changed, 211 insertions, 0 deletions
diff --git a/generalresearch/managers/thl/profiling/uqa.py b/generalresearch/managers/thl/profiling/uqa.py
new file mode 100644
index 0000000..6800d32
--- /dev/null
+++ b/generalresearch/managers/thl/profiling/uqa.py
@@ -0,0 +1,211 @@
+import logging
+from datetime import datetime, timedelta, timezone
+from typing import Collection, List, Optional
+
+from generalresearch.managers.base import PostgresManagerWithRedis
+from generalresearch.models.thl.profiling.user_question_answer import (
+ DUMMY_UQA,
+ UserQuestionAnswer,
+)
+from generalresearch.models.thl.user import User
+
+logger = logging.getLogger()
+
+
+class UQAManager(PostgresManagerWithRedis):
+
+ CACHE_PREFIX = "thl-grpc:uqa-cache-v2"
+
+ def redis_key(self, user: User) -> str:
+ return f"{self.CACHE_PREFIX}:{user.user_id}"
+
+ def redis_lock_key(self, user: User) -> str:
+ return self.redis_key(user) + ":lock"
+
+ def update_cache(
+ self,
+ user: User,
+ uqas: List[UserQuestionAnswer],
+ ):
+ """
+ Adds new answers to the redis cache for this user. If the cache
+ doesn't exist, does a db query to populate the cache.
+ """
+ REDIS = self.redis_client
+ redis_key = self.redis_key(user)
+ redis_lock = self.redis_lock_key(user)
+ json_answers = [uqa.model_dump_json() for uqa in uqas]
+ with REDIS.lock(redis_lock, timeout=2):
+ # Append the new answers into the cache.
+ res = REDIS.rpushx(redis_key, *json_answers)
+ if res == 0:
+ # If the cache doesn't exist, we need to make it from scratch
+ uqas = self.get_from_db(user)
+ all_json_answers = {uqa.model_dump_json() for uqa in uqas}
+ # And then make sure thew new answers are in it (b/c we query from the RR)
+ all_json_answers.update(set(json_answers))
+ REDIS.rpush(redis_key, *all_json_answers)
+ REDIS.expire(redis_key, 3600)
+ self.clear_user_demographic_cache(user)
+ return res
+
+ def clear_cache(self, user: User) -> None:
+ self.redis_client.delete(self.redis_key(user))
+
+ def recreate_cache(self, user: User) -> Collection[UserQuestionAnswer]:
+ REDIS = self.redis_client
+ redis_key = self.redis_key(user)
+ redis_lock = self.redis_lock_key(user)
+ with REDIS.lock(redis_lock, timeout=2):
+ # once we've acquired the lock, we need to check again if someone else just made it
+ if REDIS.exists(redis_key):
+ values = REDIS.lrange(redis_key, 0, -1)
+ uqas = [UserQuestionAnswer.model_validate_json(x) for x in values]
+ else:
+ uqas = self.get_from_db(user)
+ if not uqas:
+ # We can't set this to an empty list in redis (no
+ # difference between None and []).
+ #
+ # I don't know what is the best thing to do here, so I'm
+ # gonna push in a "dummy" UQA, just to indicate this has
+ # been set and we shouldn't query the db anymore unless
+ # something changes...
+ REDIS.rpush(redis_key, DUMMY_UQA.model_dump_json())
+ else:
+ REDIS.rpush(redis_key, *[uqa.model_dump_json() for uqa in uqas])
+ REDIS.expire(redis_key, 3600 * 3 * 24)
+
+ uqas = self._dedupe_and_clean_uqas(uqas)
+ self.clear_user_demographic_cache(user)
+ return uqas
+
+ def _dedupe_and_clean_uqas(
+ self,
+ uqas: List[UserQuestionAnswer],
+ ) -> List[UserQuestionAnswer]:
+ # Remove anything older than 30 days
+ uqas = [uqa for uqa in uqas if not uqa.is_stale()]
+
+ # Dedupe, latest answer per question
+ new_uqas = set()
+ seen_question_ids = set()
+ uqas = sorted(uqas, key=lambda x: x.timestamp, reverse=True)
+ for uqa in uqas:
+ if uqa.question_id not in seen_question_ids:
+ seen_question_ids.add(uqa.question_id)
+ new_uqas.add(uqa)
+
+ return sorted(new_uqas, key=lambda x: x.timestamp, reverse=True)
+
+ def get(self, user: User) -> List[UserQuestionAnswer]:
+ uqas = self.get_from_cache(user=user)
+
+ if uqas is None:
+ uqas = self.recreate_cache(user)
+ return self._dedupe_and_clean_uqas(uqas)
+
+ def get_from_cache(self, user: User) -> Optional[List[UserQuestionAnswer]]:
+ redis_key = self.redis_key(user)
+
+ # Do the exists check and the list retrieval in a single transaction
+ with self.redis_client.pipeline() as pipe:
+ exists = pipe.exists(redis_key)
+ if exists:
+ pipe.lrange(redis_key, 0, -1)
+ result = pipe.execute()
+ exists = result[0]
+ values = result[1]
+ if not exists:
+ logger.info(f"{redis_key} doesn't exist")
+ return None
+ uqas = [UserQuestionAnswer.model_validate_json(x) for x in values]
+ logger.info(f"{redis_key} exists")
+ return uqas
+
+ def get_from_db(self, user: User) -> List[UserQuestionAnswer]:
+ logger.info(f"get_uqa_from_db: {user.user_id}")
+ # Only store the latest row per question_id. We don't need it multiple times.
+ since = datetime.now(tz=timezone.utc) - timedelta(days=30)
+
+ # We CAN use the RR, b/c either
+ # 1) the cache expired and the user hasn't sent an answer recently
+ # or 2) The user just sent an answer, so we'll make sure it gets put into the results
+ # after this query runs.
+ query = f"""
+ WITH ranked AS (
+ SELECT
+ uqa.*,
+ ROW_NUMBER() OVER (
+ PARTITION BY question_id
+ ORDER BY created DESC
+ ) AS rn
+ FROM marketplace_userquestionanswer uqa
+ WHERE uqa.user_id = %(user_id)s
+ AND uqa.created > %(since)s
+ )
+ SELECT
+ r.question_id::uuid,
+ r.created::timestamptz AS timestamp,
+ r.calc_answer::jsonb AS calc_answers,
+ r.answer::jsonb,
+ r.user_id,
+ mq.property_code,
+ mq.country_iso,
+ mq.language_iso
+ FROM ranked r
+ JOIN marketplace_question mq
+ ON r.question_id = mq.id
+ WHERE rn = 1
+ ORDER BY r.created;
+ """
+ res = self.pg_config.execute_sql_query(
+ query=query,
+ params={"user_id": user.user_id, "since": since},
+ )
+ uqas = [UserQuestionAnswer.model_validate(x) for x in res]
+ return uqas
+
+ def clear_user_demographic_cache(
+ self,
+ user: User,
+ ) -> None:
+ # this will get regenerated by thl-grpc when an offerwall call is made
+ redis_key = f"thl-grpc:user-demographics:{user.user_id}"
+ self.redis_client.delete(redis_key)
+
+ return None
+
+ def create(
+ self,
+ user: User,
+ uqas: List[UserQuestionAnswer],
+ session_id: Optional[str] = None,
+ ):
+ for uqa in uqas:
+ if uqa.user_id is None:
+ uqa.user_id = user.user_id
+ else:
+ assert uqa.user_id == user.user_id
+ self.create_in_db(uqas=uqas, session_id=session_id)
+ self.update_cache(user=user, uqas=uqas)
+ return None
+
+ def create_in_db(
+ self, uqas: List[UserQuestionAnswer], session_id: Optional[str] = None
+ ):
+ values = [uqa.model_dump_mysql(session_id=session_id) for uqa in uqas]
+ query = """
+ INSERT INTO marketplace_userquestionanswer
+ (created, session_id, answer, question_id, user_id, calc_answer)
+ VALUES (
+ %(created)s, %(session_id)s, %(answer)s,
+ %(question_id)s, %(user_id)s, %(calc_answer)s
+ );
+ """
+
+ with self.pg_config.make_connection() as conn:
+ with conn.cursor() as c:
+ c.executemany(query=query, params_seq=values)
+ conn.commit()
+ return None