-
-
Save walkermatt/2871026 to your computer and use it in GitHub Desktop.
from threading import Timer | |
def debounce(wait): | |
""" Decorator that will postpone a functions | |
execution until after wait seconds | |
have elapsed since the last time it was invoked. """ | |
def decorator(fn): | |
def debounced(*args, **kwargs): | |
def call_it(): | |
fn(*args, **kwargs) | |
try: | |
debounced.t.cancel() | |
except(AttributeError): | |
pass | |
debounced.t = Timer(wait, call_it) | |
debounced.t.start() | |
return debounced | |
return decorator |
import unittest | |
import time | |
from debounce import debounce | |
class TestDebounce(unittest.TestCase): | |
@debounce(10) | |
def increment(self): | |
""" Simple function that | |
increments a counter when | |
called, used to test the | |
debounce function decorator """ | |
self.count += 1 | |
def setUp(self): | |
self.count = 0 | |
def test_debounce(self): | |
""" Test that the increment | |
function is being debounced. | |
The counter should only be incremented | |
once 10 seconds after the last call | |
to the function """ | |
self.assertTrue(self.count == 0) | |
self.increment() | |
self.increment() | |
time.sleep(9) | |
self.assertTrue(self.count == 0) | |
self.increment() | |
self.increment() | |
self.increment() | |
self.increment() | |
self.assertTrue(self.count == 0) | |
time.sleep(10) | |
self.assertTrue(self.count == 1) | |
if __name__ == '__main__': | |
unittest.main() |
Didn't know about threading.Timer
, I like this solution!
An optimization would be to protect the Timer
creation using a threading.Lock
. This would prevent problems with debounce
being called between Timer
's initialization and calling .start()
.
Another would be to use functools.wraps
to preserve docstrings.
Hi everyone ! A few months ago I had the same need to have a working debounce annotation, after stumbling upon this discussion I created this open source project: https://github.com/salesforce/decorator-operations
The idea of the project is to regroup useful annotations such as debounce, throttle, filter... There are only 4 annotations available for now, but you're more than welcome to suggest new features or suggestions on how to improve the existing ones :D
👍 thanks for sharing
@KarlPatach that sounds good 👍
// feature 1
In addition to debounce
and throttle
also has a situation, which can has both feature.
if calls too frequently, the executor may delayed indefinitely.
some states cannot be updated in time.
then wish debounce has a max delay time allow execute one time.
// I don't known how to call it, has a debounce time, and a max delay time
// feature 2
if asyncio version has supported, that will be nice.
// finally
I wonder is them thread safe
@KarlPatach That's awesome 🎉
My implementation (fully-typed and thread-safe):
import threading
from typing import Any, Callable, Optional, TypeVar, cast
class Debouncer:
def __init__(self, f: Callable[..., Any], interval: float):
self.f = f
self.interval = interval
self._timer: Optional[threading.Timer] = None
self._lock = threading.Lock()
def __call__(self, *args, **kwargs) -> None:
with self._lock:
if self._timer is not None:
self._timer.cancel()
self._timer = threading.Timer(self.interval, self.f, args, kwargs)
self._timer.start()
VoidFunction = TypeVar("VoidFunction", bound=Callable[..., None])
def debounce(interval: float):
"""
Wait `interval` seconds before calling `f`, and cancel if called again.
The decorated function will return None immediately,
ignoring the delayed return value of `f`.
"""
def decorator(f: VoidFunction) -> VoidFunction:
if interval <= 0:
return f
return cast(VoidFunction, Debouncer(f, interval))
return decorator
@kylebebak here is a great visualization for understanding throttle vs debounce:
http://demo.nimius.net/debounce_throttle/