diff options
| author | Max Nanis | 2026-03-06 16:49:46 -0500 |
|---|---|---|
| committer | Max Nanis | 2026-03-06 16:49:46 -0500 |
| commit | 91d040211a4ed6e4157896256a762d3854777b5e (patch) | |
| tree | cd95922ea4257dc8d3f4e4cbe8534474709a20dc /test_utils/managers/upk/conftest.py | |
| download | generalresearch-91d040211a4ed6e4157896256a762d3854777b5e.tar.gz generalresearch-91d040211a4ed6e4157896256a762d3854777b5e.zip | |
Initial commit (tag: v3.3.4)
Diffstat (limited to 'test_utils/managers/upk/conftest.py')
| -rw-r--r-- | test_utils/managers/upk/conftest.py | 161 |
1 file changed, 161 insertions, 0 deletions
"""Session-scoped pytest fixtures that seed the UPK marketplace test tables.

Each ``*_data`` fixture bulk-loads one ``marketplace_*`` table from a gzipped
CSV shipped next to this file; ``upk_data`` truncates everything first and
then pulls all the loaders in together.
"""
import os
import time
from typing import Optional
from uuid import UUID

import pandas as pd
import pytest

from generalresearch.pg_helper import PostgresConfig


def insert_data_from_csv(
    thl_web_rw: PostgresConfig,
    table_name: str,
    fp: Optional[str] = None,
    disable_fk_checks: bool = False,
    df: Optional[pd.DataFrame] = None,
):
    """Bulk-insert rows into ``table_name`` from a CSV path or a DataFrame.

    Exactly one of ``fp`` / ``df`` must be supplied.  NaN cells become SQL
    NULLs, and canonical 36-char ``id`` values are normalized to 32-char
    UUID hex before insert.

    :param thl_web_rw: read/write Postgres connection factory.
    :param table_name: target table name.  It is interpolated directly into
        the SQL, so only trusted, test-local names may be passed here.
    :param fp: optional path to a CSV (pandas transparently handles ``.gz``).
    :param disable_fk_checks: defer all FK constraint checks for the
        transaction, allowing rows that reference not-yet-inserted parents.
    :param df: optional pre-built DataFrame to insert instead of reading ``fp``.
    """
    # The original guard had an and/or precedence bug that let callers pass
    # BOTH fp and df (df was then silently clobbered).  Require exactly one.
    assert (fp is None) != (df is None), "pass exactly one of fp or df"
    if fp:
        # dtype=str keeps ids, zip codes, etc. verbatim rather than numeric.
        df = pd.read_csv(fp, dtype=str)
    df = df.where(pd.notnull(df), None)  # NaN -> None so the driver sends NULL
    cols = list(df.columns)
    col_str = ", ".join(cols)
    values_str = ", ".join(["%s"] * len(cols))
    # 36-char UUIDs (dashed canonical form) are stored as 32-char hex.
    # Guard df.empty: .iloc[0] on an empty frame raises IndexError.
    if "id" in df.columns and not df.empty and len(df["id"].iloc[0]) == 36:
        df["id"] = df["id"].map(lambda x: UUID(x).hex)
    args = df.to_dict("tight")["data"]

    with thl_web_rw.make_connection() as conn:
        with conn.cursor() as c:
            if disable_fk_checks:
                c.execute("SET CONSTRAINTS ALL DEFERRED")
            c.executemany(
                f"INSERT INTO {table_name} ({col_str}) VALUES ({values_str})",
                params_seq=args,
            )
        conn.commit()


@pytest.fixture(scope="session")
def category_data(thl_web_rw, category_manager) -> None:
    """Load ``marketplace_category`` and back-fill ``parent_id`` links.

    The CSV has no parent PKs (they are not known before insert), so rows go
    in with FK checks deferred and parents are resolved in a second pass via
    the category cache's path -> id mapping.
    """
    fp = os.path.join(os.path.dirname(__file__), "marketplace_category.csv.gz")
    insert_data_from_csv(
        thl_web_rw,
        fp=fp,
        table_name="marketplace_category",
        disable_fk_checks=True,
    )
    # Don't strictly need to do this, but probably we should
    category_manager.populate_caches()
    cats = category_manager.categories.values()
    path_id = {c.path: c.id for c in cats}
    data = [
        {"id": c.id, "parent_id": path_id[c.parent_path]} for c in cats if c.parent_path
    ]
    query = """
        UPDATE marketplace_category
        SET parent_id = %(parent_id)s
        WHERE id = %(id)s;
    """
    with thl_web_rw.make_connection() as conn:
        with conn.cursor() as c:
            c.executemany(query=query, params_seq=data)
        conn.commit()


@pytest.fixture(scope="session")
def property_data(thl_web_rw) -> None:
    """Load ``marketplace_property`` from its bundled CSV."""
    fp = os.path.join(os.path.dirname(__file__), "marketplace_property.csv.gz")
    insert_data_from_csv(thl_web_rw, fp=fp, table_name="marketplace_property")


@pytest.fixture(scope="session")
def item_data(thl_web_rw) -> None:
    """Load ``marketplace_item`` from its bundled CSV."""
    fp = os.path.join(os.path.dirname(__file__), "marketplace_item.csv.gz")
    insert_data_from_csv(thl_web_rw, fp=fp, table_name="marketplace_item")


@pytest.fixture(scope="session")
def propertycategoryassociation_data(
    thl_web_rw, category_data, property_data, category_manager
) -> None:
    """Load property<->category links, mapping category UUIDs to integer PKs."""
    table_name = "marketplace_propertycategoryassociation"
    fp = os.path.join(os.path.dirname(__file__), f"{table_name}.csv.gz")
    # Need to lookup category pk from uuid
    category_manager.populate_caches()
    df = pd.read_csv(fp, dtype=str)
    df["category_id"] = df["category_id"].map(
        lambda x: category_manager.categories[x].id
    )
    insert_data_from_csv(thl_web_rw, df=df, table_name=table_name)


@pytest.fixture(scope="session")
def propertycountry_data(thl_web_rw, property_data) -> None:
    """Load ``marketplace_propertycountry``; depends on properties existing."""
    fp = os.path.join(os.path.dirname(__file__), "marketplace_propertycountry.csv.gz")
    insert_data_from_csv(thl_web_rw, fp=fp, table_name="marketplace_propertycountry")


@pytest.fixture(scope="session")
def propertymarketplaceassociation_data(thl_web_rw, property_data) -> None:
    """Load property<->marketplace links; depends on properties existing."""
    table_name = "marketplace_propertymarketplaceassociation"
    fp = os.path.join(os.path.dirname(__file__), f"{table_name}.csv.gz")
    insert_data_from_csv(thl_web_rw, fp=fp, table_name=table_name)


@pytest.fixture(scope="session")
def propertyitemrange_data(thl_web_rw, property_data, item_data) -> None:
    """Load property<->item ranges; depends on properties and items existing."""
    table_name = "marketplace_propertyitemrange"
    fp = os.path.join(os.path.dirname(__file__), f"{table_name}.csv.gz")
    insert_data_from_csv(thl_web_rw, fp=fp, table_name=table_name)


@pytest.fixture(scope="session")
def question_data(thl_web_rw) -> None:
    """Load ``marketplace_question`` with FK checks deferred."""
    table_name = "marketplace_question"
    fp = os.path.join(os.path.dirname(__file__), f"{table_name}.csv.gz")
    insert_data_from_csv(
        thl_web_rw, fp=fp, table_name=table_name, disable_fk_checks=True
    )


@pytest.fixture(scope="session")
def clear_upk_tables(thl_web_rw):
    """Truncate every UPK table (CASCADE, identity reset) before loading."""
    tables = [
        "marketplace_propertyitemrange",
        "marketplace_propertymarketplaceassociation",
        "marketplace_propertycategoryassociation",
        "marketplace_category",
        "marketplace_item",
        "marketplace_property",
        "marketplace_propertycountry",
        "marketplace_question",
    ]
    table_str = ", ".join(tables)

    with thl_web_rw.make_connection() as conn:
        with conn.cursor() as c:
            # One statement so CASCADE resolves inter-table FKs in one shot.
            c.execute(f"TRUNCATE {table_str} RESTART IDENTITY CASCADE;")
        conn.commit()


@pytest.fixture(scope="session")
def upk_data(
    clear_upk_tables,
    category_data,
    property_data,
    item_data,
    propertycategoryassociation_data,
    propertycountry_data,
    propertymarketplaceassociation_data,
    propertyitemrange_data,
    question_data,
) -> None:
    """Aggregate fixture: clean slate plus every UPK data loader."""
    # Wait a second to make sure the HarmonizerCache refresh loop pulls these in
    time.sleep(2)


def test_fixtures(upk_data):
    """Smoke test: merely requesting upk_data exercises every loader."""
    pass
