Last active
August 29, 2015 14:20
-
-
Save kzinglzy/a0e7ddcc8e775fd00480 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# !/usr/bin/env python3 | |
# coding: utf-8 | |
import asyncio | |
import time | |
import fcntl | |
import os | |
import ctypes | |
import errno | |
""" Async file wrapper for asyncio. | |
Just a fork from https://gist.github.com/l04m33/1aa059b1a85c73bc7222. Thanks a lot. | |
""" | |
def is_async(result): | |
return asyncio.iscoroutine(result) or isinstance(result, asyncio.Future) | |
class AsyncFile: | |
""" Async file wrapper. | |
""" | |
MAX_BLOCK_SIZE = 8192 | |
def __init__(self, *, loop=None, file=None, mode='rb'): | |
if 'b' not in file.mode: | |
raise RuntimeError('Only binary mode is supported') | |
fl = fcntl.fcntl(file, fcntl.F_GETFL) | |
if fcntl.fcntl(file, fcntl.F_SETFL, fl | os.O_NONBLOCK) != 0: | |
errcode = ctypes.get_errno() | |
raise OSError((errcode, errno.errorcode[errcode])) | |
self.file = file | |
self.loop = loop or asyncio.get_event_loop() | |
self.buffer = bytearray() | |
def seek(self, offset, whence=None): | |
return self.file.seek(offset) if whence is None else \ | |
self.file.seek(offset, whence) | |
@asyncio.coroutine | |
def read(self, n=-1): | |
future = asyncio.Future(loop=self.loop) | |
if n == 0: | |
future.set_result(b'') | |
else: | |
max_size = self.MAX_BLOCK_SIZE | |
read_size = min(max_size, n) if n >= 0 else max_size | |
self.buffer.clear() | |
self.read_handler = self.loop.call_soon(self._read, | |
future, read_size, n) | |
return future | |
def _read(self, future, n, total): | |
try: | |
res = self.file.read(n) | |
except Exception as exc: | |
future.set_exception(exc) | |
else: | |
if res is None: # Blocked | |
self.read_handler = self.loop.call_soon(self._read, | |
future, n, total) | |
elif not res: # EOF | |
future.set_result(bytes(self.buffer)) | |
else: | |
self.buffer.extend(res) | |
if total > 0: | |
read_more = total - len(self.buffer) | |
if read_more <= 0: # Enough | |
res, self.buffer = self.buffer[:n], self.buffer[n:] | |
future.set_result(bytes(res)) | |
else: | |
read_more_size = min(self.MAX_BLOCK_SIZE, read_more) | |
self.read_handler = self.loop.call_soon( | |
self._read, future, read_more_size, total) | |
else: | |
self.read_handler = self.loop.call_soon( | |
self._read, future, self.MAX_BLOCK_SIZE, | |
total) | |
@asyncio.coroutine | |
def write(self, data): | |
future = asyncio.Future(loop=self.loop) | |
if len(data) == 0: | |
future.set_result(0) | |
else: | |
self.write_handler = self.loop.call_soon(self._write, future, | |
data, 0) | |
return future | |
def _write(self, future, data, written): | |
try: | |
size = self.file.write(data) | |
except BlockingIOError: | |
self.write_handler = self.loop.call_soon(self._write, future, | |
data, written) | |
except Exception as exc: | |
future.set_exception(exc) | |
else: | |
total = written + size | |
if size < len(data): | |
data = data[size:] | |
self.write_handler = self.loop.call_soon(self._write, future, | |
data, total) | |
else: | |
future.set_result(total) | |
@asyncio.coroutine | |
def copy_to(self, dest, copy_len=-1): | |
copied_size = 0 | |
max_size = self.MAX_BLOCK_SIZE | |
while copy_len != 0: | |
read_size = min(copy_len, max_size) if copy_len > 0 else max_size | |
rcontent = yield from self.read(read_size) | |
rlen = len(rcontent) | |
if rlen <= 0: | |
break | |
write_res = dest.write(rcontent) | |
if is_async(write_res): | |
yield from write_res | |
copied_size += rlen | |
copy_len = copy_len - len(rcontent) if copy_len > 0 else copy_len | |
return copied_size | |
def close(self): | |
self.file.close() | |
if hasattr(self, 'read_handler'): | |
self.read_handler.cancel() | |
if hasattr(self, 'write_handler'): | |
self.write_handler.cancel() | |
def __enter__(self): | |
return self | |
def __exit__(self, exc_type, exc_value, trace): | |
self.close() | |
def async_file_read(file_path, n=0, loop=None): | |
""" Async read a file. | |
""" | |
def _read(file, n, output): | |
with open(file_path, 'rb') as f: | |
output.append(f.read(n)) | |
buffer = [] | |
loop = loop or asyncio.get_event_loop() | |
yield from loop.run_in_executor(None, _read, file_path, | |
n or 8192, buffer) | |
return buffer[0] | |
# ------------- TEST --------------- | |
@asyncio.coroutine | |
def test_async_file(): | |
name = '/web/tmp/test.txt' | |
with open(name, 'rb') as f: | |
f = AsyncFile(file=f) | |
res = yield from f.read() | |
return res | |
@asyncio.coroutine | |
def test_async_read(): | |
name = '/web/tmp/test.txt' | |
return (yield from async_file_read(name)) | |
if __name__ == '__main__': | |
loop = asyncio.get_event_loop() | |
t = time.time() | |
for i in range(1000): | |
# loop.run_until_complete(test_async_file()) | |
loop.run_until_complete(test_async_read()) | |
print(time.time() - t) | |
#---------Simple result---------- | |
# Usetime AsyncFile: 1.4s | |
# Usetime async_file_read: 2.4s |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment