|
import logging |
|
import sys |
|
import time |
|
|
|
from google.cloud import pubsub |
|
from google.api_core.exceptions import AlreadyExists |
|
|
|
|
|
TOPIC_PATH = 'projects/local-dev/topics/foo' |
|
SUBSCRIPTION_PATH = 'projects/local-dev/subscriptions/bar' |
|
|
|
LOG_FORMAT = '[%(asctime)-15s %(name)s] %(message)s' |
|
logging.basicConfig(format=LOG_FORMAT, stream=sys.stdout, level=logging.DEBUG) |
|
logger = logging.getLogger('Example app') |
|
|
|
|
|
def main(): |
|
initialise_pubsub_mock() |
|
publisher = pubsub.PublisherClient() |
|
|
|
# Start debugger. Once debugger has been entered, pause the pubsub emulator |
|
# (e.g. send it a SIGSTOP) and then continue execution by entering 'c' at |
|
# the debugger. Then once the first call to `publish` has timed out, resume |
|
# the pubsub emulator (e.g. send it a SIGCONT). Two messages will be |
|
# published to the topic, even though the application timed out on the first |
|
# call to `publish`. |
|
import pdb; pdb.set_trace() |
|
|
|
message_future = publisher.publish(TOPIC_PATH, "I'm going to timeout") |
|
|
|
try: |
|
message_future.exception(timeout=3) |
|
except Exception as exception: |
|
logger.info("Exception raised: {}".format(exception)) |
|
|
|
logger.info("About to publish second message") |
|
publisher.publish(TOPIC_PATH, "Second attempt") |
|
logger.info("Returned from second call to publish()") |
|
|
|
|
|
def initialise_pubsub_mock(): |
|
create_pubsub_topic(TOPIC_PATH) |
|
create_pubsub_subscription(TOPIC_PATH, SUBSCRIPTION_PATH) |
|
|
|
|
|
def create_pubsub_topic(topic_path): |
|
publisher = pubsub.PublisherClient() |
|
|
|
try: |
|
topic = publisher.create_topic(topic_path) |
|
except AlreadyExists: |
|
logger.info("Pubsub topic {} already exists, continuing".format(topic_path)) |
|
else: |
|
logger.info("Created pubsub topic {}".format(topic)) |
|
|
|
|
|
def create_pubsub_subscription(topic_path, subscription_path): |
|
subscriber = pubsub.SubscriberClient() |
|
|
|
try: |
|
subscriber.create_subscription(subscription_path, topic_path) |
|
except AlreadyExists: |
|
logger.info( |
|
"Pubsub subscription {} already exists, continuing".format( |
|
subscription_path |
|
) |
|
) |
|
else: |
|
logger.info("Created pubsub subscription {}".format(subscription_path)) |
|
|
|
if __name__ == "__main__": |
|
main() |