@asnr
Last active June 26, 2019 18:28
Reproduce Pub/Sub publish even after timeout

To reproduce this behaviour, download all the files in this gist into a single directory and, from that directory, run:

$ docker-compose run --rm app python /app/pubsub_client.py

When the Python debugger prompt appears:

  1. Switch to another terminal and pause the pubsub emulator (e.g. send it a SIGSTOP).
  2. Continue the pubsub_client.py execution by entering 'c' at the debugger prompt.
  3. Wait for the first call to publish to time out (3 seconds).
  4. Resume the pubsub emulator (e.g. send it a SIGCONT).
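
The pause and resume steps above can also be scripted. The following is a minimal sketch, assuming a POSIX system, that the process ID of the emulator is already known, and that the caller is allowed to signal it. The helper names `pause_process` and `resume_process` are hypothetical and are not part of this gist.

```python
import os
import signal


def pause_process(pid):
    """Suspend a process by sending SIGSTOP, which cannot be caught or ignored."""
    os.kill(pid, signal.SIGSTOP)


def resume_process(pid):
    """Let a previously stopped process continue by sending SIGCONT."""
    os.kill(pid, signal.SIGCONT)
```

A stopped process keeps its sockets open, so the client sees a server that accepts data but never answers, which is the condition this reproduction relies on.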

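Both messages end up published to the topic, even though the application timed out on the first call to publish. The timeout passed to exception(timeout=3) only limits how long the application waits on the returned future; it does not cancel the publish request itself.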
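
The general effect can be illustrated without Pub/Sub at all. The sketch below is an analogy built on the standard library's `concurrent.futures` (Python 3; Python 2.7 needs the `futures` backport), not a description of the Pub/Sub client's internals. The names `slow_publish` and `demo` and the `release` event, which stands in for the paused emulator, are all hypothetical.

```python
import concurrent.futures
import threading


def slow_publish(delivered, message, release):
    # Stand-in for a publish call that stalls while the server is paused.
    release.wait()
    delivered.append(message)


def demo():
    delivered = []
    release = threading.Event()  # set() plays the role of SIGCONT
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

    first = executor.submit(slow_publish, delivered, "first", release)
    timed_out = False
    try:
        # Waiting on the future times out, but the work is still queued.
        first.exception(timeout=0.1)
    except concurrent.futures.TimeoutError:
        timed_out = True

    # The application gives up on the first message and sends a second one.
    second = executor.submit(slow_publish, delivered, "second", release)

    release.set()  # the "server" resumes
    first.result(timeout=5)
    second.result(timeout=5)
    executor.shutdown(wait=True)
    return timed_out, delivered
```

Here the caller observes a timeout on the first future, yet both messages are delivered once the stalled worker is released, which mirrors the outcome described above.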

# docker-compose.yml
version: '2'
services:
  pubsub:
    build:
      dockerfile: Dockerfile.pubsub
      context: .
  app:
    build:
      dockerfile: Dockerfile.client
      context: .
    volumes:
      - .:/app
    links:
      - pubsub
    environment:
      PUBSUB_EMULATOR_HOST: "pubsub:8999"
      PUBSUB_PROJECT_ID: local-dev

# Dockerfile.client
FROM python:2.7-jessie
RUN pip install google-cloud-pubsub==0.42.1

# Dockerfile.pubsub
FROM google/cloud-sdk:245.0.0-alpine
RUN apk --update add openjdk8-jre
RUN gcloud components install --quiet beta pubsub-emulator
RUN mkdir -p /var/lib/pubsub/data
VOLUME /var/lib/pubsub/data
CMD [ "gcloud", "beta", "emulators", "pubsub", "start", "--data-dir=/var/lib/pubsub/data", "--host-port=0.0.0.0:8999", "--log-http", "--verbosity=debug", "--user-output-enabled" ]

# pubsub_client.py
import logging
import sys
import time

from google.cloud import pubsub
from google.api_core.exceptions import AlreadyExists

TOPIC_PATH = 'projects/local-dev/topics/foo'
SUBSCRIPTION_PATH = 'projects/local-dev/subscriptions/bar'

LOG_FORMAT = '[%(asctime)-15s %(name)s] %(message)s'
logging.basicConfig(format=LOG_FORMAT, stream=sys.stdout, level=logging.DEBUG)
logger = logging.getLogger('Example app')


def main():
    initialise_pubsub_mock()

    publisher = pubsub.PublisherClient()

    # Start debugger. Once debugger has been entered, pause the pubsub emulator
    # (e.g. send it a SIGSTOP) and then continue execution by entering 'c' at
    # the debugger. Then once the first call to `publish` has timed out, resume
    # the pubsub emulator (e.g. send it a SIGCONT). Two messages will be
    # published to the topic, even though the application timed out on the first
    # call to `publish`.
    import pdb; pdb.set_trace()

    message_future = publisher.publish(TOPIC_PATH, "I'm going to timeout")
    try:
        # Wait at most 3 seconds for the publish to finish; raises on timeout.
        message_future.exception(timeout=3)
    except Exception as exception:
        logger.info("Exception raised: {}".format(exception))

    logger.info("About to publish second message")
    publisher.publish(TOPIC_PATH, "Second attempt")
    logger.info("Returned from second call to publish()")


def initialise_pubsub_mock():
    create_pubsub_topic(TOPIC_PATH)
    create_pubsub_subscription(TOPIC_PATH, SUBSCRIPTION_PATH)


def create_pubsub_topic(topic_path):
    publisher = pubsub.PublisherClient()
    try:
        topic = publisher.create_topic(topic_path)
    except AlreadyExists:
        logger.info("Pubsub topic {} already exists, continuing".format(topic_path))
    else:
        logger.info("Created pubsub topic {}".format(topic))


def create_pubsub_subscription(topic_path, subscription_path):
    subscriber = pubsub.SubscriberClient()
    try:
        subscriber.create_subscription(subscription_path, topic_path)
    except AlreadyExists:
        logger.info(
            "Pubsub subscription {} already exists, continuing".format(
                subscription_path
            )
        )
    else:
        logger.info("Created pubsub subscription {}".format(subscription_path))


if __name__ == "__main__":
    main()
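
To confirm that both messages really reached the topic, the subscription created by the script can be drained after the emulator is resumed. The following is a minimal sketch, assuming the synchronous `pull` and `acknowledge` methods of the pre-2.0 `SubscriberClient` (such as the 0.42.1 release pinned in Dockerfile.client), which take the subscription path as their first argument. The helper name `pull_payloads` is hypothetical and is not part of this gist.

```python
def pull_payloads(subscriber, subscription_path, max_messages=10):
    """Pull up to max_messages, acknowledge them, and return their payloads.

    `subscriber` is expected to behave like a pre-2.0
    google.cloud.pubsub.SubscriberClient (an assumption of this sketch).
    """
    response = subscriber.pull(subscription_path, max_messages=max_messages)
    received = list(response.received_messages)
    if received:
        # Acknowledge so the same messages are not redelivered on the next pull.
        ack_ids = [item.ack_id for item in received]
        subscriber.acknowledge(subscription_path, ack_ids)
    return [item.message.data for item in received]
```

Under those assumptions, calling `pull_payloads(pubsub.SubscriberClient(), SUBSCRIPTION_PATH)` from inside the app container after the run would be expected to return both payloads. Note that `pull` may block until at least one message is available.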