diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index c21eb913c38..84ee62fe9e8 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -423,10 +423,12 @@ static Result<std::unique_ptr<Message>> ReadMessageInternal(
     ARROW_ASSIGN_OR_RAISE(
         body, file->ReadAt(offset + metadata_length, decoder.next_required_size()));
   }
-  if (body->size() < decoder.next_required_size()) {
-    return Status::IOError("Expected to be able to read ",
-                           decoder.next_required_size(),
-                           " bytes for message body, got ", body->size());
+  if (body->size() != decoder.next_required_size()) {
+    // The streaming decoder got out of sync with the actual advertised
+    // metadata and body size, which signals an invalid IPC file.
+    return Status::IOError("Invalid IPC file: advertised body size is ", body->size(),
+                           ", but message decoder expects to read ",
+                           decoder.next_required_size(), " bytes instead");
   }
   RETURN_NOT_OK(decoder.Consume(body));
   return result;
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index b79fbf6dd71..a4883c25b3c 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -624,6 +624,7 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
   FieldVector filtered_fields;
   std::shared_ptr<Schema> filtered_schema;
 
+  // TODO factor this out?
   for (int i = 0; i < schema->num_fields(); ++i) {
     const Field& field = *schema->field(i);
     if (!inclusion_mask || (*inclusion_mask)[i]) {
@@ -645,6 +646,8 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
     }
   }
 
+  // TODO factor out these steps?
+
   // Dictionary resolution needs to happen on the unfiltered columns,
   // because fields are mapped structurally (by path in the original schema).
   RETURN_NOT_OK(ResolveDictionaries(columns, *context.dictionary_memo,
diff --git a/testing b/testing
index a871ddc17a4..249079a810c 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit a871ddc17a4dd936b7aa43898d59f86a11c3a2b5
+Subproject commit 249079a810caedda6898464003c7ef8a47efeeae