Skip to content

Instantly share code, notes, and snippets.

@plamut
Created May 10, 2019 17:34
Show Gist options
  • Save plamut/b6f6851dc68212b382ddc02b877b577e to your computer and use it in GitHub Desktop.
Save plamut/b6f6851dc68212b382ddc02b877b577e to your computer and use it in GitHub Desktop.
diff --git a/api_core/google/api_core/bidi.py b/api_core/google/api_core/bidi.py
index 053363a5033..90aa1ccf098 100644
--- a/api_core/google/api_core/bidi.py
+++ b/api_core/google/api_core/bidi.py
@@ -535,13 +535,13 @@ class BackgroundConsumer(object):
# Python 2.7.
with self._wake:
if self._paused:
- _LOGGER.debug("paused, waiting for waking.")
+ _LOGGER.debug("\x1b[0;31m paused, waiting for waking.\x1b[0m")
self._wake.wait()
- _LOGGER.debug("woken.")
+ _LOGGER.debug("\x1b[0;31m woken.\x1b[0m")
- _LOGGER.debug("waiting for recv.")
+ _LOGGER.debug("\x1b[1;31m waiting for recv. \x1b[0m")
response = self._bidi_rpc.recv()
- _LOGGER.debug("recved response.")
+ _LOGGER.debug("\x1b[1;31m recved response.\x1b[0m")
self._on_response(response)
except exceptions.GoogleAPICallError as exc:
@@ -609,11 +609,15 @@ class BackgroundConsumer(object):
"""
with self._wake:
self._paused = True
+ ##############
+ _LOGGER.debug("\x1b[1;34mBackground consumer now PAUSED.\x1b[0m")
+ ################
def resume(self):
"""Resumes the response stream."""
with self._wake:
self._paused = False
+ _LOGGER.debug("\x1b[1;34mBackground consumer now RESUMED, notifying threads.\x1b[0m")
self._wake.notifyAll()
@property
diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
index e41341afab3..ca08c7209e5 100644
--- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
+++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
@@ -114,6 +114,9 @@ class Dispatcher(object):
ack_ids = [item.ack_id for item in items]
request = types.StreamingPullRequest(ack_ids=ack_ids)
+ #################
+ _LOGGER.debug(f"\x1b[1;32mdispatcher sending ACK for {len(ack_ids)} messages\x1b[0m")
+ ###################
self._manager.send(request)
# Remove the message from lease management.
diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
index ba4c7673bcb..953261f4d81 100644
--- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
+++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
@@ -217,11 +217,15 @@ class StreamingPullManager(object):
def maybe_pause_consumer(self):
"""Check the current load and pause the consumer if needed."""
+ #############
+ _LOGGER.debug("Current load: %.2f", self.load)
+ ################
with self._pause_resume_lock:
if self.load >= 1.0:
if self._consumer is not None and not self._consumer.is_paused:
_LOGGER.debug(
- "Message backlog over load at %.2f, pausing.", self.load
+ "\x1b[0;36mMessage backlog over load at %.2f, pausing.\x1b[0m",
+ self.load,
)
self._consumer.pause()
@@ -249,10 +253,10 @@ class StreamingPullManager(object):
self._maybe_release_messages()
if self.load < self.flow_control.resume_threshold:
- _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load)
+ _LOGGER.debug("\x1b[0;36mCurrent load is %.2f, resuming consumer.\x1b[0m", self.load)
self._consumer.resume()
else:
- _LOGGER.debug("Did not resume, current load is %.2f.", self.load)
+ _LOGGER.debug("\x1b[0;36mDid not resume, current load is %.2f.\x1b[0m", self.load)
def _maybe_release_messages(self):
"""Release (some of) the held messages if the current load allows for it.
@@ -277,8 +281,8 @@ class StreamingPullManager(object):
[requests.LeaseRequest(ack_id=msg.ack_id, byte_size=msg.size)]
)
_LOGGER.debug(
- "Released held message to leaser, scheduling callback for it, "
- "still on hold %s.",
+ "\x1b[1;37mReleased held message to leaser, scheduling callback for it, "
+ "still on hold %s.\x1b[0m",
self._messages_on_hold.qsize(),
)
self._scheduler.schedule(self._callback, msg)
@@ -477,8 +481,9 @@ class StreamingPullManager(object):
After the messages have all had their ack deadline updated, execute
the callback for each message using the executor.
"""
+
_LOGGER.debug(
- "Processing %s received message(s), currenty on hold %s.",
+ "\x1b[1;37mProcessing %s received message(s), currenty on hold %s.\x1b[0m",
len(response.received_messages),
self._messages_on_hold.qsize(),
)
@@ -511,7 +516,7 @@ class StreamingPullManager(object):
self._messages_on_hold.put(message)
_LOGGER.debug(
- "Scheduling callbacks for %s new messages, new total on hold %s.",
+ "\x1b[1;37mScheduling callbacks for %s new messages, new total on hold %s.\x1b[0m",
len(invoke_callbacks_for),
self._messages_on_hold.qsize(),
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment