Created
April 3, 2024 20:26
-
-
Save westonpace/04f33da51931e0a990997d9ac81b623c to your computer and use it in GitHub Desktop.
Parquet Take Implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use arrow_array::RecordBatch; | |
use parquet::arrow::{ | |
arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions, RowSelection, RowSelector}, | |
ProjectionMask, | |
}; | |
struct IndicesToRowSelection<'a, I: Iterator<Item = &'a u32>> { | |
iter: I, | |
start: u32, | |
end: u32, | |
last: u32, | |
next: Option<RowSelector>, | |
} | |
impl<'a, I: Iterator<Item = &'a u32>> Iterator for IndicesToRowSelection<'a, I> { | |
type Item = RowSelector; | |
fn next(&mut self) -> Option<Self::Item> { | |
if let Some(next) = self.next.take() { | |
return Some(next); | |
} | |
loop { | |
let next_idx = self.iter.next(); | |
if let Some(next_idx) = next_idx { | |
if *next_idx < self.start { | |
continue; | |
} | |
if *next_idx >= self.end { | |
return None; | |
} | |
let to_skip = *next_idx - self.last; | |
self.last = *next_idx + 1; | |
if to_skip > 0 { | |
self.next = Some(RowSelector::select(1)); | |
return Some(RowSelector::skip(to_skip as usize)); | |
} else { | |
return Some(RowSelector::select(1)); | |
} | |
} else { | |
return None; | |
} | |
} | |
} | |
} | |
pub mod sync { | |
use std::fs::File; | |
use parquet::{ | |
arrow::arrow_reader::ParquetRecordBatchReaderBuilder, file::reader::ChunkReader, | |
}; | |
use super::*; | |
pub fn take_task<T: ChunkReader + 'static>( | |
file: T, | |
metadata: ArrowReaderMetadata, | |
row_group_number: u32, | |
row_indices: &[u32], | |
column_indices: &[u32], | |
use_selection: bool, | |
) -> Vec<RecordBatch> { | |
let start = if row_group_number == 0 { | |
0 | |
} else { | |
(0..row_group_number) | |
.map(|rg_num| metadata.metadata().row_group(rg_num as usize).num_rows()) | |
.sum() | |
}; | |
let end = start | |
+ metadata | |
.metadata() | |
.row_group(row_group_number as usize) | |
.num_rows(); | |
let selection = IndicesToRowSelection { | |
iter: row_indices.iter(), | |
start: start as u32, | |
end: end as u32, | |
last: start as u32, | |
next: None, | |
}; | |
let selection = RowSelection::from_iter(selection); | |
if !selection.selects_any() { | |
return Vec::new(); | |
} | |
let builder = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata); | |
let parquet_schema = builder.parquet_schema(); | |
let projection = ProjectionMask::roots( | |
parquet_schema, | |
column_indices.iter().map(|col_idx| *col_idx as usize), | |
); | |
if use_selection { | |
let reader = builder | |
.with_limit(row_indices.len()) | |
.with_row_groups(vec![row_group_number as usize]) | |
.with_row_selection(selection) | |
.with_projection(projection) | |
.build() | |
.unwrap(); | |
reader | |
.collect::<std::result::Result<Vec<_>, arrow_schema::ArrowError>>() | |
.unwrap() | |
} else { | |
let reader = builder.with_projection(projection).build().unwrap(); | |
reader | |
.collect::<std::result::Result<Vec<_>, arrow_schema::ArrowError>>() | |
.unwrap() | |
} | |
} | |
pub trait TryClone { | |
fn try_clone(&self) -> std::io::Result<Self> | |
where | |
Self: Sized; | |
} | |
impl TryClone for File { | |
fn try_clone(&self) -> std::io::Result<Self> | |
where | |
Self: Sized, | |
{ | |
self.try_clone() | |
} | |
} | |
pub fn take<T: ChunkReader + TryClone + 'static>( | |
file: T, | |
row_indices: &[u32], | |
column_indices: &[u32], | |
use_selection: bool, | |
metadata: Option<ArrowReaderMetadata>, | |
) -> Vec<RecordBatch> { | |
std::thread::scope(|scope| { | |
let metadata = metadata.unwrap_or_else(|| { | |
let options = ArrowReaderOptions::new().with_page_index(true); | |
ArrowReaderMetadata::load(&file, options).unwrap() | |
}); | |
let task_handles = (0..metadata.metadata().num_row_groups()) | |
.map(|row_group_number| { | |
let file = file.try_clone().unwrap(); | |
let metadata = metadata.clone(); | |
scope.spawn(move || { | |
take_task( | |
file, | |
metadata, | |
row_group_number as u32, | |
row_indices, | |
column_indices, | |
use_selection, | |
) | |
}) | |
}) | |
.collect::<Vec<_>>(); | |
task_handles | |
.into_iter() | |
.flat_map(|handle| handle.join().unwrap().into_iter()) | |
.collect::<Vec<_>>() | |
}) | |
} | |
} | |
pub mod r#async { | |
use async_trait::async_trait; | |
use futures::TryStreamExt; | |
use parquet::arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder}; | |
use super::*; | |
pub async fn take_task<T: AsyncFileReader + Send + TryClone + Unpin + 'static>( | |
file: T, | |
metadata: ArrowReaderMetadata, | |
row_indices: &[u32], | |
column_indices: &[u32], | |
use_selection: bool, | |
) -> Vec<RecordBatch> { | |
let total_num_rows = metadata | |
.metadata() | |
.row_groups() | |
.iter() | |
.map(|rg| rg.num_rows()) | |
.sum::<i64>(); | |
let selection = IndicesToRowSelection { | |
iter: row_indices.iter(), | |
start: 0, | |
end: total_num_rows as u32, | |
last: 0 as u32, | |
next: None, | |
}; | |
let selection = RowSelection::from_iter(selection); | |
if !selection.selects_any() { | |
return Vec::new(); | |
} | |
let builder = ParquetRecordBatchStreamBuilder::new_with_metadata(file, metadata); | |
let parquet_schema = builder.parquet_schema(); | |
let projection = ProjectionMask::roots( | |
parquet_schema, | |
column_indices.iter().map(|col_idx| *col_idx as usize), | |
); | |
if use_selection { | |
let reader = builder | |
.with_limit(row_indices.len()) | |
.with_row_selection(selection) | |
.with_projection(projection) | |
.build() | |
.unwrap(); | |
reader.try_collect::<Vec<_>>().await.unwrap() | |
} else { | |
let reader = builder.with_projection(projection).build().unwrap(); | |
reader.try_collect::<Vec<_>>().await.unwrap() | |
} | |
} | |
#[async_trait] | |
pub trait TryClone { | |
async fn try_clone(&self) -> std::io::Result<Self> | |
where | |
Self: Sized; | |
} | |
#[async_trait] | |
impl TryClone for tokio::fs::File { | |
async fn try_clone(&self) -> std::io::Result<Self> | |
where | |
Self: Sized, | |
{ | |
self.try_clone().await | |
} | |
} | |
pub async fn take<T: AsyncFileReader + Unpin + TryClone + 'static>( | |
mut file: T, | |
row_indices: &[u32], | |
column_indices: &[u32], | |
use_selection: bool, | |
metadata: Option<ArrowReaderMetadata>, | |
) -> Vec<RecordBatch> { | |
let metadata = if metadata.is_some() { | |
metadata.unwrap() | |
} else { | |
let options = ArrowReaderOptions::new().with_page_index(true); | |
ArrowReaderMetadata::load_async(&mut file, options) | |
.await | |
.unwrap() | |
}; | |
take_task(file, metadata, row_indices, column_indices, use_selection).await | |
} | |
} | |
#[cfg(test)] | |
mod tests { | |
use std::path::Path; | |
use super::*; | |
#[tokio::test] | |
async fn test_take_selection_async() { | |
let path_str = "/tmp/input_rgs_100000.parquet"; | |
let path = Path::new(path_str); | |
let file = tokio::fs::OpenOptions::new() | |
.read(true) | |
.open(path) | |
.await | |
.unwrap(); | |
r#async::take(file, &[1], &[3], true, None).await; | |
} | |
#[tokio::test] | |
async fn test_take_no_selection_async() { | |
let path_str = "/tmp/input_rgs_100000.parquet"; | |
let path = Path::new(path_str); | |
let file = tokio::fs::OpenOptions::new() | |
.read(true) | |
.open(path) | |
.await | |
.unwrap(); | |
r#async::take(file, &[1], &[3], false, None).await; | |
} | |
#[test] | |
fn test_take_selection_sync() { | |
let path_str = "/tmp/input_rgs_100000.parquet"; | |
let path = Path::new(path_str); | |
let file = std::fs::OpenOptions::new().read(true).open(path).unwrap(); | |
sync::take(file, &[1], &[3], true, None); | |
} | |
#[test] | |
fn test_take_no_selection_sync() { | |
let path_str = "/tmp/input_rgs_100000.parquet"; | |
let path = Path::new(path_str); | |
let file = std::fs::OpenOptions::new().read(true).open(path).unwrap(); | |
sync::take(file, &[1], &[3], false, None); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment