From 8c1940445503fd6678d0961600f2be81622793a2 Mon Sep 17 00:00:00 2001 From: Max Nanis Date: Tue, 24 Feb 2026 17:26:15 -0500 Subject: Extensive use of type checking. Movement of pytest conf towards handling managers (for db agnostic unittest). Starting to organize pytests. --- jb/flow/setup_tasks.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 jb/flow/setup_tasks.py (limited to 'jb/flow/setup_tasks.py') diff --git a/jb/flow/setup_tasks.py b/jb/flow/setup_tasks.py new file mode 100644 index 0000000..4664374 --- /dev/null +++ b/jb/flow/setup_tasks.py @@ -0,0 +1,36 @@ +from jb.config import TOPIC_ARN, SUBSCRIPTION +from jb.decorators import SNS_CLIENT, AMT_CLIENT +from jb.config import settings + + +def initial_setup(): + # Run once for initial setup. Not on each server start or anything + subscription = SNS_CLIENT.subscribe( # type: ignore + TopicArn=TOPIC_ARN, + Protocol="https", + Endpoint=f"https://jamesbillings67.com/{settings.sns_path}/", + ReturnSubscriptionArn=True, + ) + + +def check_sns_configuration(): + SNS_CLIENT.get_topic_attributes(TopicArn=TOPIC_ARN) + + # check this TOPIC_ARN exists + # (doesnt have permission, dont need this anyways) + # res = SNS_CLIENT.list_topics() + # arns = {x["TopicArn"] for x in res["Topics"]} + # assert TOPIC_ARN in arns, f"SNS Topic {TOPIC_ARN} doesn't exist!" + + subs = SNS_CLIENT.list_subscriptions_by_topic(TopicArn=TOPIC_ARN) + assert SUBSCRIPTION in subs["Subscriptions"] + + AMT_CLIENT.send_test_event_notification( + Notification={ + "Destination": TOPIC_ARN, + "Transport": "SNS", + "Version": "2006-05-05", + "EventTypes": ["AssignmentSubmitted"], + }, + TestEventType="AssignmentSubmitted", + ) -- cgit v1.2.3