Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ static unique_ptr<Expression> BindMakeTypeFunctionExpression(FunctionBindExpress
auto type_name = args.front().second.GetValue<string>();
auto qualified_name = QualifiedName::Parse(type_name);

auto unbound_type = LogicalType::UNBOUND(make_uniq<TypeExpression>(qualified_name.catalog, qualified_name.schema,
qualified_name.name, std::move(type_args)));
auto unbound_type = LogicalType::UNBOUND(make_uniq<TypeExpression>(
qualified_name.Catalog(), qualified_name.Schema(), qualified_name.Name(), std::move(type_args)));

// Bind the unbound type
auto binder = Binder::CreateBinder(input.context);
Expand Down
38 changes: 36 additions & 2 deletions src/duckdb/extension/parquet/column_reader.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "column_reader.hpp"

#include "duckdb/common/vector/flat_vector.hpp"
#include "duckdb/common/vector/constant_vector.hpp"

#include <algorithm>
#include <memory>
Expand Down Expand Up @@ -206,6 +207,19 @@ idx_t ColumnReader::GroupRowsAvailable() {
return group_rows_available;
}

bool ColumnReader::AllValuesAreNull() const {
	// Repeated columns are excluded up front: their null_count/num_values statistics do not reliably say
	// that every value is NULL (num_values counts leaf slots, not rows).
	if (MaxRepeat() != 0) {
		return false;
	}
	// Without a column chunk, or without its metadata, there are no statistics to consult.
	if (!chunk || !chunk->__isset.meta_data) {
		return false;
	}
	const auto &meta = chunk->meta_data;
	const auto &stats = meta.statistics;
	// The null count is only trusted when both the statistics and the null_count field are flagged as set.
	const bool has_null_count = meta.__isset.statistics && stats.__isset.null_count;
	// All-NULL exactly when the recorded null count equals the chunk's value count.
	return has_null_count && stats.null_count == meta.num_values;
}

// PlainSkip as defined on ColumnReader itself: ignores all of its arguments and unconditionally throws
// NotImplementedException.
// NOTE(review): presumably readers that can skip PLAIN-encoded values supply their own version - confirm.
void ColumnReader::PlainSkip(ByteBuffer &plain_data, uint8_t *defines, idx_t num_values) {
throw NotImplementedException("PlainSkip not implemented");
}
Expand Down Expand Up @@ -731,6 +745,20 @@ void ColumnReader::ReadData(idx_t read_now, data_ptr_t define_out, data_ptr_t re
}
// read the defines/repeats
const auto all_valid = PrepareRead(read_now, define_out, repeat_out, result_offset);
if (!IsRoot() && AllValuesAreNull()) {
// every value is NULL: the parent still needs the define/repeat levels we just read, but there are no
// values to decode - set the result to NULL and skip the encoding read
if (result_offset == 0) {
// we own the entire vector - emit a constant NULL
ConstantVector::SetNull(result, count_t(read_now));
} else {
for (idx_t i = 0; i < read_now; i++) {
FlatVector::SetNull(result, result_offset + i, true);
}
}
page_rows_available -= read_now;
return;
}
// read the data according to the encoder
const auto define_ptr = all_valid ? nullptr : static_cast<uint8_t *>(define_out);
switch (encoding) {
Expand Down Expand Up @@ -790,14 +818,20 @@ idx_t ColumnReader::ReadInternal(ColumnReaderInput &input, Vector &result) {
}

idx_t ColumnReader::Read(ColumnReaderInput &input, Vector &result) {
	// Only a top-level reader may short-circuit here; nested readers take the regular path because they
	// still have to emit their define/repeat levels (their all-NULL case is handled in ReadData).
	const bool entirely_null_root = IsRoot() && AllValuesAreNull();
	if (!entirely_null_root) {
		BeginRead(input.define_out, input.repeat_out);
		return ReadInternal(input, result);
	}
	// Entirely-NULL top-level column: produce a constant NULL vector without reading anything.
	ConstantVector::SetNull(result, count_t(input.num_values));
	return input.num_values;
}

void ColumnReader::Select(ColumnReaderInput &input, Vector &result, const SelectionVector &sel,
idx_t approved_tuple_count) {
auto &num_values = input.num_values;
if (SupportsDirectSelect() && approved_tuple_count < num_values) {
if (SupportsDirectSelect() && approved_tuple_count < num_values && !(IsRoot() && AllValuesAreNull())) {
DirectSelect(input, result, sel, approved_tuple_count);
return;
}
Expand Down Expand Up @@ -834,7 +868,7 @@ void ColumnReader::Filter(ColumnReaderInput &input, Vector &result, const TableF
TableFilterState &filter_state, SelectionVector &sel, idx_t &approved_tuple_count,
bool is_first_filter) {
auto &num_values = input.num_values;
if (SupportsDirectFilter() && is_first_filter) {
if (SupportsDirectFilter() && is_first_filter && !(IsRoot() && AllValuesAreNull())) {
DirectFilter(input, result, filter, filter_state, sel, approved_tuple_count);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

#include "column_reader.hpp"
#include "parquet_reader.hpp"
#include "duckdb/common/exception.hpp"
#include "duckdb/common/helper.hpp"
#include "duckdb/common/operator/multiply.hpp"
#include "duckdb/common/unique_ptr.hpp"
#include "duckdb/common/vector.hpp"
#include "parquet_column_schema.hpp"
Expand All @@ -26,7 +28,12 @@ void DeltaByteArrayDecoder::ReadDbpData(Allocator &allocator, ResizeableBuffer &
auto decoder = make_uniq<DbpDecoder>(buffer.ptr, buffer.len);
value_count = decoder->TotalValues();
result_buffer.reset();
result_buffer.resize(allocator, sizeof(uint32_t) * value_count);
// value_count is read from the file, so the buffer size can overflow on a corrupt input
idx_t result_size;
if (!TryMultiplyOperator::Operation<idx_t, idx_t, idx_t>(value_count, sizeof(uint32_t), result_size)) {
throw InvalidInputException("DELTA_BYTE_ARRAY value count is too large - corrupt file?");
}
result_buffer.resize(allocator, result_size);
decoder->GetBatch<uint32_t>(result_buffer.ptr, value_count);
decoder->Finalize();
buffer.inc(buffer.len - decoder->BufferPtr().len);
Expand Down
12 changes: 12 additions & 0 deletions src/duckdb/extension/parquet/include/column_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,16 @@ class ColumnReader {
inline bool IsSkipped() const {
	// a reader is considered skipped when no column chunk is attached to it
	return chunk == nullptr;
}
//! Set the parent reader (the composite reader this reader is a child of). Top-level readers have no parent.
//! Stores 'parent_p' in the 'parent' member; once set, IsRoot() is expected to report false.
void SetParent(ColumnReader &parent_p) {
parent = parent_p;
}
//! Whether this is a top-level reader (i.e. it has no parent reader).
//! True for as long as the 'parent' member is unset.
bool IsRoot() const {
return !parent;
}
//! Whether every value in the current row group's column chunk is NULL (according to its statistics)
bool AllValuesAreNull() const;

void InitializeCryptoMetadata(const duckdb_parquet::EncryptionAlgorithm &encryption_algorithm,
idx_t row_group_ordinal_p) {
Expand Down Expand Up @@ -364,6 +374,8 @@ class ColumnReader {
void DecompressInternal(CompressionCodec::type codec, const_data_ptr_t src, idx_t src_size, data_ptr_t dst,
idx_t dst_size);
const ColumnChunk *chunk = nullptr;
//! The composite reader this reader is a child of (struct/list/variant/expression). Null for top-level readers.
optional_ptr<ColumnReader> parent;

TProtocol *protocol;
idx_t page_rows_available;
Expand Down
30 changes: 14 additions & 16 deletions src/duckdb/extension/parquet/include/parquet_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,28 +188,21 @@ struct ParquetPrefetchMetrics {
}
};

//! Where the scan is in its async execution.
enum class ParquetScanState : uint8_t {
SCHEDULE, //! schedule the next row group's I/O
PROCESS, //! process the current row group into an output chunk
RESUME_PAYLOAD, //! resume decoding the payload columns after the filter-column I/O blocked
FINISHED //! the scan is done
};

struct ParquetReaderScanState {
public:
ColumnReader &GetColumnReader(idx_t i);

public:
vector<idx_t> group_idx_list;
int64_t current_group;
//! The row group index this scan state decodes
idx_t group_index;
idx_t offset_in_group;
idx_t group_offset;
shared_ptr<CachingFileHandle> file_handle;
vector<unique_ptr<ColumnReader>> column_readers;
duckdb_base_std::unique_ptr<duckdb_apache::thrift::protocol::TProtocol> thrift_file_proto;

ParquetScanState scan_state;
//! Set while resuming payload-column decode after the filter-column I/O blocked (vs a fresh row-group pass)
bool resuming_payload = false;
SelectionVector sel;

ResizeableBuffer define_buf;
Expand Down Expand Up @@ -334,14 +327,15 @@ class ParquetReader : public BaseFileReader {
LocalTableFunctionState &lstate) override;
void PrepareScan(ClientContext &context, GlobalTableFunctionState &gstate_p,
LocalTableFunctionState &lstate_p) override;
AsyncResult ScheduleIO(ClientContext &context, GlobalTableFunctionState &gstate,
LocalTableFunctionState &lstate) override;
AsyncResult Scan(ClientContext &context, GlobalTableFunctionState &global_state,
LocalTableFunctionState &local_state, DataChunk &chunk) override;
void FinishFile(ClientContext &context, GlobalTableFunctionState &gstate_p) override;
double GetProgressInFile(ClientContext &context) override;

public:
void InitializeScan(ClientContext &context, ParquetReaderScanState &state, vector<idx_t> groups_to_read) const;
AsyncResult Scan(ClientContext &context, ParquetReaderScanState &state, DataChunk &output);
void InitializeScan(ClientContext &context, ParquetReaderScanState &state, idx_t group_to_read) const;

idx_t NumRows() const;
idx_t NumRowGroups() const;
Expand Down Expand Up @@ -410,10 +404,14 @@ class ParquetReader : public BaseFileReader {
ParquetPrefetchStrategy ColumnWisePrefetch(ParquetReaderScanState &state, ThriftFileTransport &trans,
const duckdb_parquet::RowGroup &group, bool filters_look_unselective,
bool log_prefetch) const;
//! Switch to the next row group and schedule its I/O (prepare column buffers, prefetch the bytes).
AsyncResult Schedule(ClientContext &context, ParquetReaderScanState &state, DataChunk &result, bool log_prefetch);
//! Register the read-heads to fetch, and select prefetch strategy
ParquetPrefetchStrategy RegisterRowGroupReads(ClientContext &context, ParquetReaderScanState &state);
//! Build the async I/O tasks for the registered read-heads
AsyncResult ScheduleRowGroupReads(ParquetReaderScanState &state, ParquetPrefetchStrategy strategy);
//! Process up to STANDARD_VECTOR_SIZE rows of the current row group into result.
AsyncResult Process(ParquetReaderScanState &state, DataChunk &result, bool log_prefetch);
AsyncResult Process(ClientContext &context, ParquetReaderScanState &state, DataChunk &result);
//! Log and finalize the row group's prefetch metrics
void FinishRowGroup(ClientContext &context, ParquetReaderScanState &state, bool log_prefetch);
//! Process filters
AsyncResult ProcessFilters(ParquetReaderScanState &state, DataChunk &result, idx_t scan_count, uint8_t *define_ptr,
uint8_t *repeat_ptr, bool log_prefetch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ struct ShreddedGroupView {
bool has_typed_value = false;
ParquetGroupKind kind = ParquetGroupKind::LEAF;
LogicalType typed_type;
//! The raw 'typed_value' Vector (LEAF primitive / OBJECT struct / ARRAY list) - used to Reference it
//! directly into a shredded output (see ParquetVariantConversion::ConvertToShredded)
optional_ptr<Vector> typed_value_vec;

//! LEAF: the typed primitive values (type-erased; read typed via GetData<T> where T is known)
UnifiedVectorFormat leaf_format;
Expand Down Expand Up @@ -222,6 +225,13 @@ class ParquetVariantIterator {
//! The (lazily-decoded) Variant metadata of the current row
const VariantMetadata &GetMetadata() const;

//! The recursive view of the Parquet group tree (used by the shredded-conversion path).
//! Returns a const reference to this iterator's own 'root_view' member, so the reference is only
//! usable while the iterator is alive.
const ShreddedGroupView &GetRootView() const {
return root_view;
}
//! Emit the binary value in ['data', 'end') of the current row into the builder (BeginRow must precede)
void EmitBinary(const_data_ptr_t data, const_data_ptr_t end, VariantBuilder &builder) const;

private:
ShreddedGroupView root_view;

Expand All @@ -231,20 +241,11 @@ class ParquetVariantIterator {
mutable unique_ptr<VariantMetadata> current_metadata;
};

//! BuildVariant source wrapping a ParquetVariantIterator (mirrors VariantIteratorSource in core).
//! The iterator is held by reference, so it must outlive this source object.
struct ParquetVariantIteratorSource {
explicit ParquetVariantIteratorSource(ParquetVariantIterator &iterator) : iterator(iterator) {
}
//! Defined out of line; presumably emits the value of row 'row' into 'builder'.
//! NOTE(review): the meaning of the bool result is not visible from this declaration - confirm.
bool Emit(idx_t row, VariantBuilder &builder);

//! The wrapped iterator (stored as a reference, not a copy)
ParquetVariantIterator &iterator;
};

//! Convert a shredded Parquet VARIANT (metadata + group) into the canonical VARIANT 'result' in a single
//! pass through the shared VariantBuilder
//! Convert a Parquet VARIANT (metadata + group) into DuckDB's SHREDDED VARIANT format: the Parquet
//! typed_value columns are referenced directly where they map exactly, and leftover/binary 'value' data
//! (including the entire value when there is no 'typed_value' at all) goes into the unshredded component.
class ParquetVariantConversion {
public:
static void Convert(Vector &metadata, Vector &group, Vector &result, idx_t count);
//! Convert binary Variant values (each row being the metadata blob followed by the value blob) into the
//! canonical VARIANT 'result' in a single pass
static void ConvertBinary(Vector &metadata_and_value, Vector &result, idx_t count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "column_reader.hpp"
#include "reader/templated_column_reader.hpp"
#include "duckdb/common/types/data_chunk.hpp"

namespace duckdb {

Expand All @@ -33,13 +34,21 @@ class VariantColumnReader : public ColumnReader {

void Skip(idx_t num_values) override;
idx_t GroupRowsAvailable() override;
void Convert(Vector &metadata, Vector &group, Vector &result, idx_t count);
void PrepareChunk(DataChunk &chunk, idx_t &capacity, const vector<LogicalType> &types, idx_t count);
uint64_t TotalCompressedSize() override;
void RegisterPrefetch(ThriftFileTransport &transport, bool allow_merge) override;
static bool TypedValueLayoutToType(const LogicalType &typed_value, LogicalType &logical_type);

protected:
idx_t metadata_reader_idx;
idx_t value_reader_idx;

DataChunk intermediate_chunk;
idx_t intermediate_capacity = 0;

DataChunk shredded_chunk;
idx_t shredded_capacity = 0;
};

} // namespace duckdb
24 changes: 14 additions & 10 deletions src/duckdb/extension/parquet/parquet_multi_file_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,17 @@ struct ParquetReadBindData : public TableFunctionData {
};

struct ParquetReadGlobalState : public GlobalTableFunctionState {
explicit ParquetReadGlobalState(optional_ptr<const PhysicalOperator> op_p)
: row_group_index(0), batch_index(0), op(op_p) {
explicit ParquetReadGlobalState(optional_ptr<const PhysicalOperator> op_p) : row_group_index(0), op(op_p) {
}
//! Index of row group within file currently up for scanning
idx_t row_group_index;
//! Batch index of the next row group to be scanned
idx_t batch_index;
//! (Optional) pointer to physical operator performing the scan
optional_ptr<const PhysicalOperator> op;
};

//! Local state of the Parquet table function: the reader scan state plus the row group assigned to it.
//! NOTE(review): 'group_indexes' and 'group_index' both record the row group picked in TryInitializeScan -
//! confirm whether both members are still needed.
struct ParquetReadLocalState : public LocalTableFunctionState {
//! Scan state passed on to InitializeScan / Process
ParquetReaderScanState scan_state;
//! Row group indexes to scan; TryInitializeScan stores a single-element list here
vector<idx_t> group_indexes;
//! Index of the row group to scan; TryInitializeScan copies it from the global row_group_index
idx_t group_index;
};

static void ParseFileRowNumberOption(MultiFileReaderBindData &bind_data, ParquetOptions &options,
Expand Down Expand Up @@ -763,15 +760,24 @@ bool ParquetReader::TryInitializeScan(ClientContext &context, GlobalTableFunctio
return false;
}
// The current reader has rowgroups left to be scanned
lstate.group_indexes = {gstate.row_group_index};
lstate.group_index = gstate.row_group_index;
gstate.row_group_index++;
return true;
}

void ParquetReader::PrepareScan(ClientContext &context, GlobalTableFunctionState &gstate_p,
                                LocalTableFunctionState &lstate_p) {
	const auto &global = gstate_p.Cast<ParquetReadGlobalState>();
	auto &local = lstate_p.Cast<ParquetReadLocalState>();
	auto &scan_state = local.scan_state;
	// hand the (optional) physical operator to the scan state before the scan is initialized
	scan_state.op = global.op;
	// set the scan state up for the single row group this local state was assigned
	InitializeScan(context, scan_state, local.group_index);
}

// Registers the reads for the local scan state's row group and returns the async I/O tasks built for them.
// 'gstate_p' is not used by this function.
// NOTE(review): PrepareScan already calls InitializeScan with 'lstate.group_index'; the InitializeScan call
// below on 'lstate.group_indexes' looks like a leftover from the previous version - confirm it is intended.
AsyncResult ParquetReader::ScheduleIO(ClientContext &context, GlobalTableFunctionState &gstate_p,
LocalTableFunctionState &lstate_p) {
auto &lstate = lstate_p.Cast<ParquetReadLocalState>();
InitializeScan(context, lstate.scan_state, lstate.group_indexes);
// the strategy returned by RegisterRowGroupReads is what ScheduleRowGroupReads builds the I/O tasks from
auto strategy = RegisterRowGroupReads(context, lstate.scan_state);
return ScheduleRowGroupReads(lstate.scan_state, strategy);
}

void ParquetReader::FinishFile(ClientContext &context, GlobalTableFunctionState &gstate_p) {
Expand All @@ -789,10 +795,8 @@ AsyncResult ParquetReader::Scan(ClientContext &context, GlobalTableFunctionState
}
}
#endif
auto &gstate = gstate_p.Cast<ParquetReadGlobalState>();
auto &local_state = local_state_p.Cast<ParquetReadLocalState>();
local_state.scan_state.op = gstate.op;
return Scan(context, local_state.scan_state, chunk);
return Process(context, local_state.scan_state, chunk);
}

unique_ptr<MultiFileReaderInterface> ParquetMultiFileInfo::Copy() {
Expand Down
Loading
Loading