Skip to content

Instantly share code, notes, and snippets.

@turicas
Created November 7, 2011 08:06
Show Gist options
  • Save turicas/1344429 to your computer and use it in GitHub Desktop.
Save turicas/1344429 to your computer and use it in GitHub Desktop.
Non-blocking file-like class for read() (Python)
import threading
class NonBlocking(threading.Thread):
    """Wrap a blocking file-like object so reads return immediately.

    A background thread (started by the constructor) pulls one item at a
    time from ``fp`` with ``fp.read(1)`` and stores it in ``read_buffer``.
    ``read()`` and ``readline()`` only ever look at that buffer, so they
    never block waiting for ``fp``; they return ``None`` when the requested
    data is not available yet.

    Buffered items are joined with ``''``, so ``fp.read(1)`` is expected to
    return ``str`` values.
    """

    def __init__(self, fp):
        """Start reading from ``fp`` in the background.

        Args:
            fp: object exposing ``read(n)`` and ``close()``.
        """
        threading.Thread.__init__(self)
        self.fp = fp
        self.read_buffer = []
        self.running = True
        # Guards read_buffer: the reader thread appends to it while
        # consumer threads remove items from it.
        self._lock = threading.Lock()
        self.start()

    def run(self):
        """Reader loop: buffer items until EOF or until ``close()`` is called.

        ``fp`` is closed when the loop ends, including when ``fp.read``
        raises, and ``running`` is set to ``False``.
        """
        try:
            while self.running:
                byte_read = self.fp.read(1)
                if not byte_read:
                    # Empty result means end of stream. Without this check
                    # the loop would spin forever appending empty strings.
                    break
                with self._lock:
                    self.read_buffer.append(byte_read)
        finally:
            self.running = False
            self.fp.close()

    def read(self, size=None):
        """Return up to ``size`` buffered items joined into one string.

        Args:
            size: maximum number of items to return. ``None``, ``0`` or a
                negative value means "everything currently buffered".

        Returns:
            The joined string, or ``None`` if nothing is buffered.
        """
        with self._lock:
            if not self.read_buffer:
                return None
            if not size or size < 0:
                size = len(self.read_buffer)
            value = ''.join(self.read_buffer[:size])
            # Delete in place (instead of rebinding the attribute) so the
            # reader thread keeps appending to the same list object.
            del self.read_buffer[:size]
        return value

    def readline(self):
        """Return the next complete line, including its trailing ``'\\n'``.

        Returns:
            The line, or ``None`` if no ``'\\n'`` has been buffered yet.
            A trailing partial line is left in the buffer and can be
            fetched with ``read()``.
        """
        with self._lock:
            try:
                line_feed_index = self.read_buffer.index('\n')
            except ValueError:
                return None
            end = line_feed_index + 1
            value = ''.join(self.read_buffer[:end])
            del self.read_buffer[:end]
        return value

    def close(self):
        """Ask the reader thread to stop.

        The flag is only checked between reads, so the thread exits (and
        closes ``fp``) after the ``fp.read(1)`` call in progress returns.
        """
        self.running = False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment