-
-
Save plamut/b6f6851dc68212b382ddc02b877b577e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/api_core/google/api_core/bidi.py b/api_core/google/api_core/bidi.py | |
index 053363a5033..90aa1ccf098 100644 | |
--- a/api_core/google/api_core/bidi.py | |
+++ b/api_core/google/api_core/bidi.py | |
@@ -535,13 +535,13 @@ class BackgroundConsumer(object): | |
# Python 2.7. | |
with self._wake: | |
if self._paused: | |
- _LOGGER.debug("paused, waiting for waking.") | |
+ _LOGGER.debug("\x1b[0;31m paused, waiting for waking.\x1b[0m") | |
self._wake.wait() | |
- _LOGGER.debug("woken.") | |
+ _LOGGER.debug("\x1b[0;31m woken.\x1b[0m") | |
- _LOGGER.debug("waiting for recv.") | |
+ _LOGGER.debug("\x1b[1;31m waiting for recv. \x1b[0m") | |
response = self._bidi_rpc.recv() | |
- _LOGGER.debug("recved response.") | |
+ _LOGGER.debug("\x1b[1;31m recved response.\x1b[0m") | |
self._on_response(response) | |
except exceptions.GoogleAPICallError as exc: | |
@@ -609,11 +609,15 @@ class BackgroundConsumer(object): | |
""" | |
with self._wake: | |
self._paused = True | |
+ ############## | |
+ _LOGGER.debug("\x1b[1;34mBackground consumer now PAUSED.\x1b[0m") | |
+ ################ | |
def resume(self): | |
"""Resumes the response stream.""" | |
with self._wake: | |
self._paused = False | |
+ _LOGGER.debug("\x1b[1;34mBackground consumer now RESUMED, notifying threads.\x1b[0m") | |
self._wake.notifyAll() | |
@property | |
diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py | |
index e41341afab3..ca08c7209e5 100644 | |
--- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py | |
+++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py | |
@@ -114,6 +114,9 @@ class Dispatcher(object): | |
ack_ids = [item.ack_id for item in items] | |
request = types.StreamingPullRequest(ack_ids=ack_ids) | |
+ ################# | |
+ _LOGGER.debug(f"\x1b[1;32mdispatcher sending ACK for {len(ack_ids)} messages\x1b[0m") | |
+ ################### | |
self._manager.send(request) | |
# Remove the message from lease management. | |
diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py | |
index ba4c7673bcb..953261f4d81 100644 | |
--- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py | |
+++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py | |
@@ -217,11 +217,15 @@ class StreamingPullManager(object): | |
def maybe_pause_consumer(self): | |
"""Check the current load and pause the consumer if needed.""" | |
+ ############# | |
+ _LOGGER.debug("Current load: %.2f", self.load) | |
+ ################ | |
with self._pause_resume_lock: | |
if self.load >= 1.0: | |
if self._consumer is not None and not self._consumer.is_paused: | |
_LOGGER.debug( | |
- "Message backlog over load at %.2f, pausing.", self.load | |
+ "\x1b[0;36mMessage backlog over load at %.2f, pausing.\x1b[0m", | |
+ self.load, | |
) | |
self._consumer.pause() | |
@@ -249,10 +253,10 @@ class StreamingPullManager(object): | |
self._maybe_release_messages() | |
if self.load < self.flow_control.resume_threshold: | |
- _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load) | |
+ _LOGGER.debug("\x1b[0;36mCurrent load is %.2f, resuming consumer.\x1b[0m", self.load) | |
self._consumer.resume() | |
else: | |
- _LOGGER.debug("Did not resume, current load is %.2f.", self.load) | |
+ _LOGGER.debug("\x1b[0;36mDid not resume, current load is %.2f.\x1b[0m", self.load) | |
def _maybe_release_messages(self): | |
"""Release (some of) the held messages if the current load allows for it. | |
@@ -277,8 +281,8 @@ class StreamingPullManager(object): | |
[requests.LeaseRequest(ack_id=msg.ack_id, byte_size=msg.size)] | |
) | |
_LOGGER.debug( | |
- "Released held message to leaser, scheduling callback for it, " | |
- "still on hold %s.", | |
+ "\x1b[1;37mReleased held message to leaser, scheduling callback for it, " | |
+ "still on hold %s.\x1b[0m", | |
self._messages_on_hold.qsize(), | |
) | |
self._scheduler.schedule(self._callback, msg) | |
@@ -477,8 +481,9 @@ class StreamingPullManager(object): | |
After the messages have all had their ack deadline updated, execute | |
the callback for each message using the executor. | |
""" | |
+ | |
_LOGGER.debug( | |
- "Processing %s received message(s), currenty on hold %s.", | |
+ "\x1b[1;37mProcessing %s received message(s), currenty on hold %s.\x1b[0m", | |
len(response.received_messages), | |
self._messages_on_hold.qsize(), | |
) | |
@@ -511,7 +516,7 @@ class StreamingPullManager(object): | |
self._messages_on_hold.put(message) | |
_LOGGER.debug( | |
- "Scheduling callbacks for %s new messages, new total on hold %s.", | |
+ "\x1b[1;37mScheduling callbacks for %s new messages, new total on hold %s.\x1b[0m", | |
len(invoke_callbacks_for), | |
self._messages_on_hold.qsize(), | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment