Skip to content

Instantly share code, notes, and snippets.

@atemate
Last active May 28, 2024 21:46
Show Gist options
  • Save atemate/57c29d01ccb4a22fe34290d6d0d49731 to your computer and use it in GitHub Desktop.
Save atemate/57c29d01ccb4a22fe34290d6d0d49731 to your computer and use it in GitHub Desktop.
PubSub request/response with dynamically created subscriptions with filters

Documentation: https://cloud.google.com/pubsub/docs/subscription-message-filter

Usage:

python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Publish:

$ python publisher.py
11237580821027878
11237607528300183
11237851938500420
11237794950102544
11237530420659009
11237682617640967
11237764827784213
11237482585729123
11237609517612153
Published messages to projects/<project-id>/topics/artem-aws-manual.

Subscribe (you can also run subscriber_viewer.py in a separate terminal; it does not acknowledge messages, it only prints them):

$ N=3 python subscriber.py
Created subscription within 5.055 sec: name: "projects/<project-id>/subscriptions/artem-aws-manual-sub-b5f8da6c-4783-4f6b-8c2d-48812264adc5-3"
topic: "projects/<project-id>/topics/artem-aws-manual"
push_config {
}
ack_deadline_seconds: 10
message_retention_duration {
  seconds: 604800
}
expiration_policy {
  ttl {
    seconds: 2678400
  }
}
filter: "attributes.request_id=\"request-3\""
state: ACTIVE

Subscribed within 0.002 sec

Received Message {
  data: b'Message: 10d5b311-c571-40b1-93ed-3cce9b4ec763'
  ordering_key: ''
  attributes: {
    "request_id": "request-3"
  }
}
^C
Subscription deleted within 2.419 sec
Traceback (most recent call last):
  ...
KeyboardInterrupt

Also try running several subscribers with different values of N to emulate multiple pipelines running in parallel.

from google.cloud import pubsub_v1
from uuid import uuid4
# TODO(developer): fill in the dev project id before running.
project_id = "..."  # dev proj id
topic_id = "artem-aws-manual"

publisher = pubsub_v1.PublisherClient()
# Fully qualified topic name: projects/{project_id}/topics/{topic_id}
topic_path = publisher.topic_path(project_id, topic_id)

# Publish one message per request number 1..9. Each message carries a
# `request_id` attribute, which is what the filtered subscriptions match on.
for request_num in range(1, 10):
    # Pub/Sub message data must be a bytestring.
    payload = f"Message: {uuid4()}".encode("utf-8")
    publish_future = publisher.publish(
        topic_path, payload, request_id=f"request-{request_num}"
    )
    # Wait for the publish to complete and print the returned message id.
    print(publish_future.result())

print(f"Published messages to {topic_path}.")
google-cloud-pubsub
import os
from concurrent.futures import TimeoutError
from contextlib import contextmanager
from time import perf_counter
from typing import Callable, Iterator
from uuid import uuid4

from google.cloud import pubsub_v1
# Request number handled by this process. It emulates one pipeline that should
# only receive messages tagged "request-{N}"; read from the environment.
N = os.environ["N"]
PROJECT_ID = "..."  # dev proj id
TOPIC_ID = "artem-aws-manual"
TIMEOUT = None  # None: wait for messages indefinitely
# Random per-run id so that every run gets its own subscription name.
vertex_pipeline_id = uuid4()
SUBSCRIPTION_ID = "-".join((TOPIC_ID, "sub", str(vertex_pipeline_id), N))
@contextmanager
def catchtime() -> Iterator[Callable[[], float]]:
    """Measure the time spent inside a ``with`` block.

    Yields a zero-argument callable. While the block is running it returns
    0.0; once the block exits (normally or by raising) it returns the elapsed
    seconds, measured with ``time.perf_counter``.
    """
    start = end = perf_counter()
    try:
        # The lambda closes over ``end``; rebinding it below updates the result.
        yield lambda: end - start
    finally:
        # Record the stop time even if the body raised, so a caller that
        # handles the exception still gets a meaningful duration.
        end = perf_counter()
# The publisher client is used only to build the fully qualified topic path.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)

# Client that creates, consumes and deletes this run's subscription.
subscriber = pubsub_v1.SubscriberClient()
# Fully qualified form: projects/{project_id}/subscriptions/{subscription_id}
subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)

# Server-side subscription filter: deliver only messages whose `request_id`
# attribute equals "request-{N}".
# NOTE(review): this name shadows the builtin `filter`; kept because other
# module-level code refers to it.
filter = 'attributes.request_id="request-{}"'.format(N)
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    """Print a received message, then acknowledge it."""
    line = f"Received {message}"
    print(line)
    message.ack()
# Create a filtered subscription for this run, stream messages from it until
# timeout / Ctrl+C / error, then always delete the subscription.
with subscriber:
    # Create the subscription *outside* the try/finally: if creation fails
    # there is nothing to delete, and attempting the delete would raise
    # NotFound and hide the original error.
    with catchtime() as t:
        subscription = subscriber.create_subscription(
            request={"name": subscription_path, "topic": topic_path, "filter": filter}
        )
    print(f"Created subscription within {t():.3f} sec: {subscription}")
    try:
        with catchtime() as t:
            streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
        print(f"Subscribed within {t():.3f} sec")
        try:
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            streaming_pull_future.result(timeout=TIMEOUT)
        except TimeoutError:
            print(f"timeout {TIMEOUT} sec")
            streaming_pull_future.cancel()  # Trigger the shutdown.
            streaming_pull_future.result()  # Block until the shutdown is complete.
        except BaseException:
            # Ctrl+C or a stream failure: still stop the background stream so
            # it is not pulling from a subscription that is about to be
            # deleted, then let the exception propagate.
            streaming_pull_future.cancel()
            raise
    finally:
        with catchtime() as t:
            subscriber.delete_subscription(request={"subscription": subscription_path})
        print(f"Subscription deleted within {t():.3f} sec")
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
import os
from time import perf_counter
from contextlib import contextmanager
from typing import Callable
from uuid import uuid4
PROJECT_ID = "..."
TOPIC_ID = "artem-aws-manual"
TIMEOUT = None  # None: wait for messages indefinitely
# Fixed (not per-run) name of the viewer's unfiltered subscription.
SUBSCRIPTION_ID = TOPIC_ID + "-sub-view"
# The publisher client is used only to build the fully qualified topic path.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)

subscriber = pubsub_v1.SubscriberClient()
# Fully qualified form: projects/{project_id}/subscriptions/{subscription_id}
subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    """Print a received message without acknowledging it.

    The message is deliberately left un-acked, so this viewer only observes
    traffic; Pub/Sub may redeliver it to this subscription later.
    """
    line = f"Received (not ack!) {message}"
    print(line)
# Create the unfiltered viewer subscription, print messages until
# timeout / Ctrl+C / error, then always delete the subscription.
with subscriber:
    # SUBSCRIPTION_ID is a fixed name, so the subscription must be removed on
    # exit. Otherwise the next run's create_subscription fails because the
    # subscription already exists (as it did before this cleanup was added).
    subscription = subscriber.create_subscription(
        request={"name": subscription_path, "topic": topic_path}
    )
    try:
        streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
        try:
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            streaming_pull_future.result(timeout=TIMEOUT)
        except TimeoutError:
            print(f"timeout {TIMEOUT} sec")
            streaming_pull_future.cancel()  # Trigger the shutdown.
            streaming_pull_future.result()  # Block until the shutdown is complete.
        except BaseException:
            # Ctrl+C or a stream failure: stop the background stream before
            # the subscription is deleted, then let the exception propagate.
            streaming_pull_future.cancel()
            raise
    finally:
        subscriber.delete_subscription(request={"subscription": subscription_path})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment