Created
September 4, 2024 23:26
-
-
Save felipecrv/b6eb548a8a5f6d13a102c591c72af224 to your computer and use it in GitHub Desktop.
Streaming a GZIP'd CSV HTTP response into a stream of Arrow RecordBatches
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Stream a gzip-compressed CSV HTTP response as Arrow RecordBatches."""
import gzip
import urllib.request

import pyarrow.csv as csv

CSV_FORMAT = 'text/csv'

# The with-block closes the HTTP connection even if validation or parsing
# raises; GzipFile does not close the fileobj it is handed.
with urllib.request.urlopen('http://localhost:8000') as response:
    content_type = response.headers['Content-Type']
    # Compare only the media type so parameters such as "; charset=utf-8"
    # do not cause a valid CSV response to be rejected.
    media_type = response.headers.get_content_type()
    if media_type != CSV_FORMAT:
        raise ValueError(f"Expected {CSV_FORMAT}, got {content_type}")

    # urllib does not undo Content-Encoding, so decompress here while reading.
    with gzip.GzipFile(fileobj=response, mode="rb") as response_stream:
        csv_stream = csv.open_csv(response_stream)
        # stream the RecordBatches
        for batch in csv_stream:
            print(batch)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gzip
import http.server
from io import BytesIO
class GzipCSVHandler(http.server.BaseHTTPRequestHandler):
    """Request handler that answers GET with a small gzip-compressed CSV."""

    def do_GET(self):
        """Send a fixed sample CSV, gzip-compressed, as the response body."""
        # Sample CSV content
        csv_content = "name,age,city\nJohn Doe,29,New York\nJane Smith,34,Los Angeles\n"

        # Gzip compression (in memory, so the body length is known up front)
        gzip_buffer = BytesIO()
        with gzip.GzipFile(fileobj=gzip_buffer, mode="wb") as gzip_file:
            gzip_file.write(csv_content.encode("utf-8"))

        # Prepare gzipped data; read the buffer only after the GzipFile is
        # closed, since closing is what writes the gzip trailer.
        gzipped_data = gzip_buffer.getvalue()

        # Send response headers
        self.send_response(200)
        self.send_header("Content-type", "text/csv")
        self.send_header("Content-Encoding", "gzip")
        self.send_header("Content-Disposition", "attachment; filename=data.csv.gz")
        self.send_header("Content-Length", str(len(gzipped_data)))
        self.end_headers()

        # Send gzipped CSV data
        self.wfile.write(gzipped_data)
if __name__ == "__main__":
    server_address = ("", 8000)  # Serve on all available interfaces, port 8000
    # The with-block closes the listening socket once serve_forever() exits,
    # including when it is interrupted by an exception such as Ctrl-C.
    with http.server.HTTPServer(server_address, GzipCSVHandler) as httpd:
        print("Serving on port 8000...")
        httpd.serve_forever()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment