diff --git a/cpp/test/tools/command_e2e_test.cc b/cpp/test/tools/command_e2e_test.cc index de03cf782..97da629e3 100644 --- a/cpp/test/tools/command_e2e_test.cc +++ b/cpp/test/tools/command_e2e_test.cc @@ -255,6 +255,41 @@ TEST(CliE2E, MetaReportsFileSummary) { EXPECT_NE(out.str().find("\ttable\t"), std::string::npos); } +TEST(CliE2E, SketchPrintsHumanReadableLayout) { + Fixture f; + std::ostringstream out; + std::ostringstream err; + int code = tsfile_cli::run_cli({"sketch", f.path}, out, err); + EXPECT_EQ(code, 0) << err.str(); + EXPECT_TRUE(err.str().empty()); + EXPECT_NE(out.str().find("TsFile Sketch"), std::string::npos); + EXPECT_NE(out.str().find("POSITION|\tCONTENT"), std::string::npos); + EXPECT_NE(out.str().find("[magic head] TsFile"), std::string::npos); + EXPECT_NE(out.str().find("[ChunkGroup]"), std::string::npos); + EXPECT_NE(out.str().find("[Chunk Header]"), std::string::npos); + EXPECT_NE(out.str().find("[Page Header]"), std::string::npos); + EXPECT_NE(out.str().find("[TimeseriesMetadata]"), std::string::npos); + EXPECT_NE(out.str().find("[ChunkMetadata]"), std::string::npos); + EXPECT_NE(out.str().find("[TsFileMetadata]"), std::string::npos); + EXPECT_NE(out.str().find("[Bloom Filter Size]"), std::string::npos); + EXPECT_NE(out.str().find("[Bloom Filter]"), std::string::npos); + EXPECT_NE(out.str().find("END of TsFile"), std::string::npos); + EXPECT_EQ(out.str().find("[TsFileProperties]"), std::string::npos); + EXPECT_NE(out.str().find("IndexOfTimerseriesIndex Tree"), + std::string::npos); +} + +TEST(CliE2E, SketchRejectsFormatFlag) { + Fixture f; + std::ostringstream out; + std::ostringstream err; + int code = tsfile_cli::run_cli({"sketch", "-f", "json", f.path}, out, err); + EXPECT_EQ(code, 1); + EXPECT_NE(err.str().find("-f/--format is not valid for sketch"), + std::string::npos) + << err.str(); +} + TEST(CliE2E, CountReportsSeriesCountsAndTotal) { Fixture f; std::ostringstream out; diff --git a/cpp/tools/cli/run_cli.cc b/cpp/tools/cli/run_cli.cc index 27ca751f0..c77fa7a59 100644 --- a/cpp/tools/cli/run_cli.cc +++ b/cpp/tools/cli/run_cli.cc @@ -57,6 +57,7 @@ void print_usage(std::ostream& os) { " cat all rows of a device/table\n" " count number of rows (per series, plus a total)\n" " sample deterministic sample rows (use -n and --seed)\n" + " sketch human-readable file layout/self-check view\n" " write import CSV/TSV rows into a new table tsfile " "(--table, --columns, -o)\n" "Options:\n" @@ -89,9 +90,9 @@ void print_usage(std::ostream& os) { } bool is_known_command(const std::string& c) { - static const std::set kCmds = {"ls", "schema", "meta", - "stats", "head", "cat", - "count", "sample", "write"}; + static const std::set kCmds = { + "ls", "schema", "meta", "stats", "head", + "cat", "count", "sample", "write", "sketch"}; return kCmds.find(c) != kCmds.end(); } @@ -188,8 +189,34 @@ bool validate_write_flags(const ParsedArgs& p, std::ostream& err) { bool validate_read_flag_applicability(const ParsedArgs& p, std::ostream& err) { const std::string& c = p.command; const bool is_row = (c == "head" || c == "cat" || c == "sample"); - const bool scoped = - is_row || c == "schema" || c == "stats" || c == "count"; + const bool scoped = is_row || c == "schema" || c == "stats" || c == "count"; + + if (c == "sketch") { + if (p.format != ParsedArgs::Format::kAuto) { + err << "Error: -f/--format is not valid for sketch\n"; + return false; + } + if (!p.device.empty()) { + err << "Error: -d/--device is not valid for sketch\n"; + return false; + } + if (!p.table.empty()) { + err << "Error: -t/--table is not valid for sketch\n"; + return false; + } + if (!p.measurements.empty()) { + err << "Error: -m/--measurements is not valid for sketch\n"; + return false; + } + if (p.no_header) { + err << "Error: --no-header is not valid for sketch\n"; + return false; + } + if (!p.model.empty()) { + err << "Error: --model is not valid for sketch\n"; + return false; + } + } if (!p.output.empty()) { err << "Error: -o/--output is only valid for write\n"; @@ -299,6 +326,10 @@ int run_cli(const std::vector& args, std::ostream& out, } storage::libtsfile_init(); + if (p.command == "sketch") { + return cmd_sketch(p, out, err); + } + storage::TsFileReader reader; int open_ret = reader.open(p.file); if (open_ret != 0) { diff --git a/cpp/tools/commands/cmd_sketch.cc b/cpp/tools/commands/cmd_sketch.cc new file mode 100644 index 000000000..c6d001c79 --- /dev/null +++ b/cpp/tools/commands/cmd_sketch.cc @@ -0,0 +1,547 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cli/exit_codes.h" +#include "commands/commands.h" +#include "commands/sketch_layout.h" +#include "common/allocator/byte_stream.h" +#include "format/output_format.h" +#include "utils/errno_define.h" + +namespace tsfile_cli { +namespace { + +std::string to_string_i64(int64_t v) { + std::ostringstream ss; + ss << v; + return ss.str(); +} + +std::string to_string_u32(uint32_t v) { + std::ostringstream ss; + ss << v; + return ss.str(); +} + +std::string metadata_node_type_name(storage::MetaIndexNodeType type) { + switch (type) { + case storage::INTERNAL_DEVICE: + return "INTERNAL_DEVICE"; + case storage::LEAF_DEVICE: + return "LEAF_DEVICE"; + case storage::INTERNAL_MEASUREMENT: + return "INTERNAL_MEASUREMENT"; + case storage::LEAF_MEASUREMENT: + return "LEAF_MEASUREMENT"; + default: + return "INVALID_META_NODE_TYPE"; + } +} + +std::string column_category_name(common::ColumnCategory category) { + switch (category) { + case common::ColumnCategory::TAG: + return "TAG"; + case common::ColumnCategory::FIELD: + return "FIELD"; + case common::ColumnCategory::ATTRIBUTE: + return "ATTRIBUTE"; + case common::ColumnCategory::TIME: + return "TIME"; + default: + return "UNKNOWN"; + } +} + +std::string entry_key(const std::shared_ptr& entry) { + if (entry == nullptr) { + return ""; + } + if (entry->is_device_level() && entry->get_device_id() != nullptr) { + return entry->get_device_id()->get_device_name(); + } + return entry->get_name().to_std_string(); +} + +uint32_t java_uvar_int_size(uint32_t value) { + uint32_t position = 1; + while ((value & 0xFFFFFF80) != 0) { + value >>= 7; + position++; + } + return position; +} + +uint32_t entry_serialized_size( + const std::shared_ptr& entry) { + if (entry == nullptr) { + return 0; + } + common::ByteStream bs(128, common::MOD_TSFILE_READER); + if (entry->serialize_to(bs) != common::E_OK) { + return 0; + } + return bs.total_size(); +} + +int signed_marker(char marker) { return static_cast(marker); } + +bool is_one_page_chunk(char chunk_type) { + return (static_cast(chunk_type) & 0x3F) == + static_cast( + storage::ONLY_ONE_PAGE_CHUNK_HEADER_MARKER); +} + +class SketchPrinter { + public: + int run(const ParsedArgs& args, std::ostream& out, std::ostream& err) { + int ret = layout_.load_file(args.file); + if (ret != common::E_OK) { + print_java_load_error(err); + layout_.close(); + return kExitFile; + } + int metadata_ret = common::E_OK; + if (layout_.metadata_loaded_) { + metadata_ret = layout_.collect_metadata_layout(); + if (metadata_ret != common::E_OK) { + layout_.add_blocker("metadata layout parsing stopped: " + + layout_.error_text(metadata_ret)); + } + } + int scan_ret = layout_.scan_data_area(); + if (scan_ret != common::E_OK) { + layout_.add_blocker("data area scan stopped: " + + layout_.error_text(scan_ret)); + } + print(out); + layout_.close(); + if (metadata_ret != common::E_OK || scan_ret != common::E_OK || + !layout_.blockers_.empty()) { + print_java_load_error(err); + return kExitFile; + } + return kExitOk; + } + + private: + void print_java_load_error(std::ostream& err) { + err << "Cannot load file " << layout_.path_ + << " because the file has crashed.\n"; + } + + void print(std::ostream& out) { + out << "-------------------------------- TsFile Sketch " + "--------------------------------\n"; + out << "file path: " << layout_.path_ << "\n"; + out << "file length: " << layout_.file_size_ << "\n"; + print_file_info(out); + print_chunks(out); + print_separator(out); + print_metadata_and_timeseries(out); + print_tsfile_metadata(out); + print_line(out, layout_.file_size_, "END of TsFile"); + print_index_tree(out); + print_blockers(out); + out << "---------------------------------- TsFile Sketch End " + "----------------------------------\n"; + } + + void print_line(std::ostream& out, int64_t pos, + const std::string& content) const { + out << std::setw(20) << pos << "|\t" << content << "\n"; + } + + void print_split(std::ostream& out, const std::string& content) const { + out << "||||||||||||||||||||| " << content << "\n"; + } + + void print_file_info(std::ostream& out) { + out << "\n"; + out << std::setw(20) << "POSITION" + << "|\tCONTENT\n"; + out << std::setw(20) << "--------" + << " \t-------\n"; + print_line(out, 0, "[magic head] " + layout_.head_magic_); + print_line( + out, storage::MAGIC_STRING_TSFILE_LEN, + "[version number] " + + to_string_i64(static_cast(layout_.version_))); + } + + void print_chunks(std::ostream& out) { + for (const ChunkGroupSketch& group : layout_.groups_) { + print_split( + out, + "[ChunkGroup] of " + group.device + ", num of Chunks:" + + to_string_u32(static_cast(group.chunks.size()))); + print_line(out, group.offset, "[ChunkGroup Header]"); + print_line( + out, group.offset, + "\t[marker] " + to_string_i64(static_cast( + storage::CHUNK_GROUP_HEADER_MARKER))); + print_line(out, group.offset + 1, + "\t[deviceID] " + group.device + + " size=" + to_string_u32(group.device_id_size)); + for (const ChunkSketch& chunk : group.chunks) { + print_chunk(out, chunk); + } + print_split(out, "[ChunkGroup] of " + group.device + " ends"); + } + } + + void print_chunk(std::ostream& out, const ChunkSketch& chunk) { + std::string line = "[Chunk] of " + chunk.path; + if (!chunk.statistics.empty()) { + line += ", " + chunk.statistics; + } + print_line(out, chunk.offset, line); + + std::ostringstream header; + header << "\t[Chunk Header] marker=" + << signed_marker(chunk.header.chunk_type_) + << ", measurementID=" << chunk.header.measurement_name_ + << ", dataSize=" << chunk.header.data_size_ + << ", dataType=" << tsdatatype_name(chunk.header.data_type_) + << ", compressionType=" + << compression_name(chunk.header.compression_type_) + << ", encodingType=" + << tsencoding_name(chunk.header.encoding_type_) + << ", size=" << chunk.header_size; + print_line(out, chunk.offset, header.str()); + + int64_t page_pos = chunk.offset + chunk.header_size; + const bool one_page = is_one_page_chunk(chunk.header.chunk_type_); + for (const PageSketch& page : chunk.pages) { + std::ostringstream page_header; + if (one_page) { + page_header << "\t\t[Page Header] " + << " HeaderSize:" << page.header_size + << ", UncompressedSize:" << page.uncompressed_size + << ", CompressedSize:" << page.compressed_size; + print_line(out, page_pos, page_header.str()); + page_pos += page.header_size; + print_line(out, page_pos, + "\t\t[Page Data] Size:" + + to_string_u32(page.compressed_size)); + } else { + const int java_page_id = page.page_id + 1; + page_header << "\t\t[PageHeader-" << java_page_id << "] " + << " HeaderSize:" << page.header_size + << ", UncompressedSize:" << page.uncompressed_size + << ", CompressedSize:" << page.compressed_size; + if (page.has_statistics && !page.statistics.empty()) { + page_header << ", " << page.statistics; + } + print_line(out, page_pos, page_header.str()); + page_pos += page.header_size; + std::ostringstream page_data; + page_data << "\t\t[Page-" << java_page_id + << "] , CompressedSize:" << page.compressed_size; + if (page.has_statistics && !page.statistics.empty()) { + page_data << ", " << page.statistics; + } + print_line(out, page_pos, page_data.str()); + } + page_pos += page.compressed_size; + } + } + + void print_separator(std::ostream& out) { + print_line(out, layout_.separator_offset_, + "[marker] " + to_string_i64(layout_.separator_marker_)); + } + + struct LayoutItem { + int64_t offset = 0; + int kind = 0; // 0 node, 1 timeseries + size_t index = 0; + }; + + void print_metadata_and_timeseries(std::ostream& out) { + std::vector items; + for (size_t i = 0; i < layout_.metadata_nodes_.size(); ++i) { + if (layout_.metadata_nodes_[i].offset >= 0 && + layout_.metadata_nodes_[i].offset < + layout_.file_metadata_pos_) { + LayoutItem item; + item.offset = layout_.metadata_nodes_[i].offset; + item.kind = 0; + item.index = i; + items.push_back(item); + } + } + for (std::map::const_iterator it = + layout_.timeseries_by_offset_.begin(); + it != layout_.timeseries_by_offset_.end(); ++it) { + LayoutItem item; + item.offset = it->first; + item.kind = 1; + item.index = 0; + items.push_back(item); + } + std::sort(items.begin(), items.end(), + [](const LayoutItem& a, const LayoutItem& b) { + if (a.offset != b.offset) { + return a.offset < b.offset; + } + return a.kind < b.kind; + }); + for (const LayoutItem& item : items) { + if (item.kind == 0) { + print_metadata_node(out, + layout_.metadata_nodes_[item.index].offset, + *layout_.metadata_nodes_[item.index].node); + } else { + std::map::const_iterator it = + layout_.timeseries_by_offset_.find(item.offset); + if (it != layout_.timeseries_by_offset_.end()) { + print_timeseries(out, it->second); + } + } + } + } + + void print_timeseries(std::ostream& out, const TimeseriesSketch& ts) { + std::ostringstream ss; + ss << "[TimeseriesMetadata] of " << ts.path + << ", tsDataType:" << ts.data_type + << ", sizeWithoutChunkMetadata:" << ts.size_without_chunk_metadata; + if (!ts.statistics.empty()) { + ss << ", " << ts.statistics; + } + print_line(out, ts.offset, ss.str()); + int64_t chunk_pos = ts.offset + ts.size_without_chunk_metadata; + for (size_t i = 0; i < ts.chunks.size(); ++i) { + std::ostringstream chunk_line; + chunk_line << "\t[ChunkMetadata] offset=" << ts.chunks[i].offset + << ", size=" << ts.chunks[i].size; + print_line(out, chunk_pos, chunk_line.str()); + chunk_pos += ts.chunks[i].size; + } + } + + int64_t print_metadata_node(std::ostream& out, int64_t offset, + const storage::MetaIndexNode& node) { + print_line(out, offset, "[MetadataIndexNode]"); + print_line(out, offset, + "\t childrenCnt=" + to_string_u32(static_cast( + node.children_.size()))); + int64_t pos = + offset + + java_uvar_int_size(static_cast(node.children_.size())); + for (const std::shared_ptr& child : + node.children_) { + std::ostringstream child_line; + child_line << "\t<" << entry_key(child) << ", " + << child->get_offset() << ">"; + print_line(out, pos, child_line.str()); + pos += entry_serialized_size(child); + } + print_line(out, pos, "\tendOffset=" + to_string_i64(node.end_offset_)); + pos += 8; + print_line(out, pos, + "\tnodeType=" + metadata_node_type_name(node.node_type_)); + pos += 1; + return pos; + } + + void print_tsfile_metadata(std::ostream& out) { + print_split(out, "[TsFileMetadata] begins"); + if (!layout_.metadata_loaded_) { + print_line(out, layout_.file_metadata_pos_, + "[TsFileMetadata] "); + print_line(out, layout_.file_size_ - kSketchFileTailSize, + "[TsFileMetadataSize] " + + to_string_u32(layout_.file_metadata_size_)); + print_line(out, + layout_.file_size_ - storage::MAGIC_STRING_TSFILE_LEN, + "[magic tail] " + layout_.tail_magic_); + return; + } + print_line(out, layout_.footer_.table_index_count_offset, + "TableIndexRootCnt=" + + to_string_u32(layout_.footer_.table_index_count)); + for (const FooterRootSketch& root : layout_.footer_.roots) { + print_line(out, root.table_name_offset, + "[Table Name] " + root.table + + ", size=" + to_string_u32(root.table_name_size)); + if (root.node != nullptr) { + print_metadata_node(out, root.node_offset, *root.node); + } + } + + print_line(out, layout_.footer_.table_schema_count_offset, + "TableSchemaCnt=" + + to_string_u32(layout_.footer_.table_schema_count)); + for (const FooterSchemaSketch& schema : layout_.footer_.schemas) { + if (schema.schema != nullptr) { + print_line( + out, schema.schema_offset, + "[TableSchema] " + + table_schema_string(schema.table, *schema.schema) + + ", size=" + to_string_u32(schema.schema_size)); + } + } + + print_line(out, layout_.footer_.meta_offset_offset, + "[Meta Offset] " + + to_string_i64(layout_.footer_.meta_offset_value)); + print_line(out, layout_.footer_.bloom_filter_size_offset, + "[Bloom Filter Size] bit vector byte array length=" + + to_string_u32(layout_.footer_.bloom_filter_data_size) + + to_string_u32(layout_.footer_.bloom_filter_hash_count)); + std::ostringstream bloom; + bloom << "[Bloom Filter] , filterCapacity=" + << layout_.footer_.bloom_filter_size << ", hashFunctionSize=" + << layout_.footer_.bloom_filter_hash_count; + print_line(out, layout_.footer_.bloom_filter_offset, bloom.str()); + + print_split(out, "[TsFileMetadata] ends"); + print_line(out, layout_.file_size_ - kSketchFileTailSize, + "[TsFileMetadataSize] " + + to_string_u32(layout_.file_metadata_size_)); + print_line(out, layout_.file_size_ - storage::MAGIC_STRING_TSFILE_LEN, + "[magic tail] " + layout_.tail_magic_); + } + + std::string measurement_schema_string( + const std::shared_ptr& measurement) { + if (measurement == nullptr) { + return "[]"; + } + std::ostringstream ss; + ss << "[" << measurement->measurement_name_ << "," + << tsdatatype_name(measurement->data_type_) << "," + << tsencoding_name(measurement->encoding_) << ","; + if (!measurement->props_.empty()) { + ss << "{"; + bool first = true; + for (const std::pair& prop : + measurement->props_) { + if (!first) { + ss << ", "; + } + first = false; + ss << prop.first << "=" << prop.second; + } + ss << "}"; + } + ss << "," << compression_name(measurement->compression_type_) << "]"; + return ss.str(); + } + + std::string table_schema_string(const std::string& table, + storage::TableSchema& schema) { + std::ostringstream ss; + ss << "TableSchema{tableName='" << table << "', columnSchemas=["; + std::vector> measurements = + schema.get_measurement_schemas(); + std::vector categories = + schema.get_column_categories(); + for (size_t i = 0; i < measurements.size(); ++i) { + if (i != 0) { + ss << ", "; + } + ss << measurement_schema_string(measurements[i]); + } + ss << "], columnTypes=["; + for (size_t i = 0; i < categories.size(); ++i) { + if (i != 0) { + ss << ", "; + } + ss << column_category_name(categories[i]); + } + ss << "]}"; + return ss.str(); + } + + void print_index_tree(std::ostream& out) { + out << "---------------------------- IndexOfTimerseriesIndex Tree " + "-----------------------------\n"; + for (const FooterRootSketch& root : layout_.footer_.roots) { + if (root.node == nullptr) { + continue; + } + out << root.table << "\n"; + print_tree_node(out, root.node, 0); + } + } + + void print_blockers(std::ostream& out) { + if (layout_.blockers_.empty()) { + return; + } + print_split(out, "[Blockers]"); + out << "Blockers\n"; + for (const std::string& blocker : layout_.blockers_) { + print_line(out, -1, "[BLOCKER] " + blocker); + } + } + + void print_tree_node(std::ostream& out, + const std::shared_ptr& node, + int depth) { + if (node == nullptr) { + return; + } + out << tree_indent(depth) + << "[MetadataIndex:" << metadata_node_type_name(node->node_type_) + << "]\n"; + for (const std::shared_ptr& child : + node->children_) { + out << tree_indent(depth) << "└──────[" << entry_key(child) << "," + << child->get_offset() << "]\n"; + std::map>::iterator + child_node = layout_.node_by_offset_.find(child->get_offset()); + if (node->node_type_ != storage::LEAF_MEASUREMENT && + child_node != layout_.node_by_offset_.end()) { + print_tree_node(out, child_node->second, depth + 1); + } + } + } + + std::string tree_indent(int depth) const { + std::string result = "\t"; + for (int i = 0; i < depth; ++i) { + result += "\t\t"; + } + return result; + } + + SketchLayout layout_; +}; + +} // namespace + +int cmd_sketch(const ParsedArgs& args, std::ostream& out, std::ostream& err) { + SketchPrinter printer; + return printer.run(args, out, err); +} + +} // namespace tsfile_cli diff --git a/cpp/tools/commands/commands.h b/cpp/tools/commands/commands.h index ea7038ff5..6f539b806 100644 --- a/cpp/tools/commands/commands.h +++ b/cpp/tools/commands/commands.h @@ -70,6 +70,7 @@ int cmd_cat(const ParsedArgs& args, storage::TsFileReader& reader, int cmd_sample(const ParsedArgs& args, storage::TsFileReader& reader, OutputFormat fmt, std::ostream& out, std::ostream& err); int cmd_write(const ParsedArgs& args, std::ostream& out, std::ostream& err); +int cmd_sketch(const ParsedArgs& args, std::ostream& out, std::ostream& err); } // namespace tsfile_cli diff --git a/cpp/tools/commands/sketch_layout.cc b/cpp/tools/commands/sketch_layout.cc new file mode 100644 index 000000000..6116572f8 --- /dev/null +++ b/cpp/tools/commands/sketch_layout.cc @@ -0,0 +1,941 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "commands/sketch_layout.h" + +#include +#include +#include +#include + +#include "common/allocator/byte_stream.h" +#include "common/device_id.h" +#include "format/output_format.h" +#include "utils/errno_define.h" + +namespace tsfile_cli { +namespace { + +constexpr int32_t kMinChunkHeaderSize = 7; +constexpr int32_t kMaxHeaderProbeBytes = 1 << 20; +constexpr int32_t kDeviceHeaderProbeBytes = 64 << 10; +const char* const kMetadataMissing = ""; + +std::string to_string_i64(int64_t v) { + std::ostringstream ss; + ss << v; + return ss.str(); +} + +std::string java_bool_string(bool value) { return value ? "true" : "false"; } + +std::string java_floating_string(double value) { + std::ostringstream ss; + ss << std::setprecision(15) << value; + std::string result = ss.str(); + if (result.find('.') == std::string::npos && + result.find('E') == std::string::npos && + result.find('e') == std::string::npos) { + result += ".0"; + } + return result; +} + +std::string java_stat_base(storage::Statistic* stat) { + std::ostringstream ss; + ss << "startTime: " << stat->start_time_ << " endTime: " << stat->end_time_ + << " count: " << stat->count_; + return ss.str(); +} + +template +std::string java_numeric_stat_string(storage::Statistic* stat, T min_value, + T max_value, T first_value, T last_value, + const std::string& sum_value) { + std::ostringstream ss; + ss << java_stat_base(stat) << " [minValue:" << min_value + << ",maxValue:" << max_value << ",firstValue:" << first_value + << ",lastValue:" << last_value << ",sumValue:" << sum_value << "]"; + return ss.str(); +} + +std::string statistic_string(storage::Statistic* stat) { + if (stat == nullptr) { + return std::string(); + } + switch (stat->get_type()) { + case common::VECTOR: + return java_stat_base(stat); + case common::INT32: + case common::DATE: { + storage::Int32Statistic* typed = + dynamic_cast(stat); + return typed == nullptr + ? stat->to_string() + : java_numeric_stat_string( + stat, typed->min_value_, typed->max_value_, + typed->first_value_, typed->last_value_, + to_string_i64(typed->sum_value_)); + } + case common::INT64: + case common::TIMESTAMP: { + storage::Int64Statistic* typed = + dynamic_cast(stat); + return typed == nullptr + ? stat->to_string() + : java_numeric_stat_string( + stat, typed->min_value_, typed->max_value_, + typed->first_value_, typed->last_value_, + java_floating_string(typed->sum_value_)); + } + case common::FLOAT: { + storage::FloatStatistic* typed = + dynamic_cast(stat); + return typed == nullptr + ? stat->to_string() + : java_numeric_stat_string( + stat, java_floating_string(typed->min_value_), + java_floating_string(typed->max_value_), + java_floating_string(typed->first_value_), + java_floating_string(typed->last_value_), + java_floating_string(typed->sum_value_)); + } + case common::DOUBLE: { + storage::DoubleStatistic* typed = + dynamic_cast(stat); + return typed == nullptr + ? stat->to_string() + : java_numeric_stat_string( + stat, java_floating_string(typed->min_value_), + java_floating_string(typed->max_value_), + java_floating_string(typed->first_value_), + java_floating_string(typed->last_value_), + java_floating_string(typed->sum_value_)); + } + case common::BOOLEAN: { + storage::BooleanStatistic* typed = + dynamic_cast(stat); + if (typed == nullptr) { + return stat->to_string(); + } + std::ostringstream ss; + ss << java_stat_base(stat) + << " [firstValue=" << java_bool_string(typed->first_value_) + << ", lastValue=" << java_bool_string(typed->last_value_) + << ", sumValue=" << typed->sum_value_ << "]"; + return ss.str(); + } + case common::TEXT: { + storage::TextStatistic* typed = + dynamic_cast(stat); + if (typed == nullptr) { + return stat->to_string(); + } + std::ostringstream ss; + ss << java_stat_base(stat) + << " [firstValue:" << typed->first_value_.to_std_string() + << ",lastValue:" << typed->last_value_.to_std_string() << "]"; + return ss.str(); + } + case common::STRING: { + storage::StringStatistic* typed = + dynamic_cast(stat); + if (typed == nullptr) { + return stat->to_string(); + } + std::ostringstream ss; + ss << java_stat_base(stat) + << " [firstValue:" << typed->first_value_.to_std_string() + << ", lastValue:" << typed->last_value_.to_std_string() + << ", minValue:" << typed->min_value_.to_std_string() + << ", maxValue:" << typed->max_value_.to_std_string() << "]"; + return ss.str(); + } + case common::BLOB: + return "BlobStatistics{}"; + default: + return stat->to_string(); + } +} + +bool is_chunk_marker(unsigned char marker) { + const unsigned char marker_type = marker & 0x3F; + return marker == static_cast(storage::CHUNK_HEADER_MARKER) || + marker == static_cast( + storage::ONLY_ONE_PAGE_CHUNK_HEADER_MARKER) || + marker_type == + static_cast(storage::CHUNK_HEADER_MARKER) || + marker_type == static_cast( + storage::ONLY_ONE_PAGE_CHUNK_HEADER_MARKER); +} + +bool is_one_page_chunk(char chunk_type) { + return (static_cast(chunk_type) & 0x3F) == + static_cast( + storage::ONLY_ONE_PAGE_CHUNK_HEADER_MARKER); +} + +int64_t read_i64_from_big_endian(char* buffer) { + return static_cast(common::SerializationUtil::read_ui64(buffer)); +} + +uint32_t java_int_string_size(const std::string& s) { + return static_cast(4 + s.size()); +} + +uint32_t java_var_int_size(uint32_t value) { + uint32_t u_value = value << 1; + uint32_t position = 1; + while ((u_value & 0xFFFFFF80) != 0) { + u_value >>= 7; + position++; + } + return position; +} + +uint32_t unsigned_var_int_size(uint32_t value) { + uint32_t position = 1; + while ((value & 0xFFFFFF80) != 0) { + value >>= 7; + position++; + } + return position; +} + +} // namespace + +SketchLayout::SketchLayout() + : pa_(common::MOD_TSFILE_READER), tsfile_meta_(&pa_) { + pa_.init(4096, common::MOD_TSFILE_READER); +} + +SketchLayout::~SketchLayout() { + node_by_offset_.clear(); + metadata_nodes_.clear(); + footer_.roots.clear(); +} + +int SketchLayout::load_file(const std::string& path) { + path_ = path; + int ret = file_.open(path_); + if (ret != common::E_OK) { + return ret; + } + file_size_ = file_.file_size(); + if (file_size_ < kSketchFileHeaderSize + kSketchFileTailSize) { + return common::E_TSFILE_CORRUPTED; + } + + std::vector head; + ret = read_block(0, kSketchFileHeaderSize, &head); + if (ret != common::E_OK) { + return ret; + } + head_magic_.assign(head.data(), storage::MAGIC_STRING_TSFILE_LEN); + version_ = head[storage::MAGIC_STRING_TSFILE_LEN]; + + std::vector tail; + ret = read_block(file_size_ - kSketchFileTailSize, kSketchFileTailSize, + &tail); + if (ret != common::E_OK) { + return ret; + } + file_metadata_size_ = common::SerializationUtil::read_ui32(tail.data()); + tail_magic_.assign(tail.data() + 4, storage::MAGIC_STRING_TSFILE_LEN); + file_metadata_pos_ = file_size_ - + static_cast(file_metadata_size_) - + kSketchFileTailSize; + if (file_metadata_size_ == 0 || + file_metadata_pos_ < kSketchFileHeaderSize || + file_metadata_pos_ >= file_size_) { + return common::E_TSFILE_CORRUPTED; + } + if (file_metadata_size_ > + static_cast(std::numeric_limits::max())) { + add_blocker( + "TsFileMetadata is larger than the C++ sketch reader " + "can buffer; metadata output is truncated."); + return common::E_OK; + } + + ret = read_block(file_metadata_pos_, file_metadata_size_, &metadata_buf_); + if (ret != common::E_OK) { + return ret; + } + common::ByteStream metadata_stream; + metadata_stream.wrap_from(metadata_buf_.data(), + static_cast(metadata_buf_.size())); + ret = tsfile_meta_.deserialize_from(metadata_stream); + if (ret != common::E_OK) { + return ret; + } + if (metadata_stream.read_pos() > metadata_buf_.size()) { + return common::E_TSFILE_CORRUPTED; + } + ret = trace_footer_layout(); + if (ret == common::E_OK) { + metadata_loaded_ = true; + } + return ret; +} + +int SketchLayout::collect_metadata_layout() { + for (auto& root_entry : tsfile_meta_.table_metadata_index_node_map_) { + const std::string& table = root_entry.first; + int64_t offset = -1; + int64_t end_offset = -1; + auto off = root_offsets_by_table_.find(table); + if (off != root_offsets_by_table_.end()) { + offset = off->second; + } + auto end = root_ends_by_table_.find(table); + if (end != root_ends_by_table_.end()) { + end_offset = end->second; + } + add_metadata_node(table, "", offset, end_offset, true, + root_entry.second); + int ret = traverse_index_node(table, root_entry.second, true, + std::shared_ptr()); + if (ret != common::E_OK) { + return ret; + } + } + return common::E_OK; +} + +int SketchLayout::scan_data_area() { + int64_t pos = kSketchFileHeaderSize; + ChunkGroupSketch* current_group = nullptr; + while (pos < file_metadata_pos_) { + unsigned char marker = 0; + int ret = read_marker(pos, &marker); + if (ret != common::E_OK) { + return ret; + } + if (marker == static_cast(storage::SEPARATOR_MARKER)) { + separator_offset_ = pos; + separator_marker_ = marker; + break; + } + if (marker == + static_cast(storage::CHUNK_GROUP_HEADER_MARKER)) { + ChunkGroupSketch group; + group.offset = pos; + ret = parse_chunk_group_header(pos, &group); + if (ret != common::E_OK) { + return ret; + } + groups_.push_back(group); + current_group = &groups_.back(); + pos += 1 + group.device_id_size; + continue; + } + if (marker == + static_cast(storage::OPERATION_INDEX_RANGE)) { + OperationIndexRangeSketch op; + op.offset = pos; + ret = parse_operation_index_range(pos, &op); + if (ret != common::E_OK) { + return ret; + } + operation_ranges_.push_back(op); + pos += 1 + 16; + continue; + } + if (is_chunk_marker(marker)) { + if (current_group == nullptr) { + return common::E_TSFILE_CORRUPTED; + } + ChunkSketch chunk; + ret = parse_chunk(pos, current_group->device, &chunk); + if (ret != common::E_OK) { + return ret; + } + pos += chunk.header_size + chunk.header.data_size_; + current_group->chunks.push_back(chunk); + continue; + } + return common::E_TSFILE_CORRUPTED; + } + if (separator_offset_ < 0) { + return common::E_TSFILE_CORRUPTED; + } + return common::E_OK; +} + +void SketchLayout::close() { file_.close(); } + +void SketchLayout::add_blocker(const std::string& blocker) { + if (std::find(blockers_.begin(), blockers_.end(), blocker) == + blockers_.end()) { + blockers_.push_back(blocker); + } +} + +std::string SketchLayout::error_text(int ret) const { + return std::string(error_code_message(ret)) + " (code " + + to_string_i64(ret) + ")"; +} + +int SketchLayout::read_block(int64_t offset, int64_t len, + std::vector* out) { + if (len < 0) { + return common::E_OUT_OF_RANGE; + } + if (len > static_cast(std::numeric_limits::max())) { + add_blocker("read range [" + to_string_i64(offset) + ", " + + to_string_i64(offset + len) + + ") exceeds the C++ sketch reader buffer limit; output " + "is truncated."); + return common::E_OUT_OF_RANGE; + } + out->assign(static_cast(len), '\0'); + if (len == 0) { + return common::E_OK; + } + int32_t read_len = 0; + int ret = + file_.read(offset, out->data(), static_cast(len), read_len); + if (ret != common::E_OK) { + return ret; + } + return read_len == static_cast(len) ? common::E_OK + : common::E_TSFILE_CORRUPTED; +} + +int SketchLayout::read_marker(int64_t offset, unsigned char* marker) { + char ch = 0; + int32_t read_len = 0; + int ret = file_.read(offset, &ch, 1, read_len); + if (ret != common::E_OK) { + return ret; + } + if (read_len != 1) { + return common::E_TSFILE_CORRUPTED; + } + *marker = static_cast(ch); + return common::E_OK; +} + +int SketchLayout::trace_footer_layout() { + common::ByteStream bs; + bs.wrap_from(metadata_buf_.data(), + static_cast(metadata_buf_.size())); + + int ret = common::E_OK; + footer_.table_index_count_offset = file_metadata_pos_ + bs.read_pos(); + if (RET_FAIL(common::SerializationUtil::read_var_uint( + footer_.table_index_count, bs))) { + return ret; + } + + int64_t java_pos = footer_.table_index_count_offset + 4; + common::PageArena trace_pa(common::MOD_TSFILE_READER); + trace_pa.init(4096, common::MOD_TSFILE_READER); + for (uint32_t i = 0; i < footer_.table_index_count; ++i) { + FooterRootSketch root; + if (RET_FAIL(common::SerializationUtil::read_var_str(root.table, bs))) { + return ret; + } + root.table_name_offset = java_pos; + root.table_name_size = java_int_string_size(root.table); + java_pos += root.table_name_size; + + const int64_t actual_node_offset = file_metadata_pos_ + bs.read_pos(); + root.node_offset = java_pos; + storage::MetaIndexNode trace_node(&trace_pa); + if (RET_FAIL(trace_node.device_deserialize_from(bs))) { + return ret; + } + root.node_size = static_cast( + file_metadata_pos_ + bs.read_pos() - actual_node_offset); + java_pos += root.node_size; + auto node_it = + tsfile_meta_.table_metadata_index_node_map_.find(root.table); + if (node_it != tsfile_meta_.table_metadata_index_node_map_.end()) { + root.node = node_it->second; + } + root_offsets_by_table_[root.table] = root.node_offset; + root_ends_by_table_[root.table] = root.node_offset + root.node_size; + footer_.roots.push_back(root); + } + + footer_.table_schema_count_offset = java_pos; + if (RET_FAIL(common::SerializationUtil::read_var_uint( + footer_.table_schema_count, bs))) { + return ret; + } + java_pos += 4; + for (uint32_t i = 0; i < footer_.table_schema_count; ++i) { + FooterSchemaSketch schema; + if (RET_FAIL( + common::SerializationUtil::read_var_str(schema.table, bs))) { + return ret; + } + schema.table_name_offset = java_pos; + schema.table_name_size = java_int_string_size(schema.table); + const uint32_t actual_schema_start = bs.read_pos(); + schema.schema_offset = java_pos; + storage::TableSchema trace_schema; + if (RET_FAIL(trace_schema.deserialize(bs))) { + return ret; + } + schema.schema_size = bs.read_pos() - actual_schema_start; + java_pos += schema.table_name_size + schema.schema_size; + auto schema_it = tsfile_meta_.table_schemas_.find(schema.table); + if (schema_it != tsfile_meta_.table_schemas_.end()) { + schema.schema = schema_it->second; + } + footer_.schemas.push_back(schema); + } + + footer_.meta_offset_offset = java_pos; + if (RET_FAIL(common::SerializationUtil::read_i64(footer_.meta_offset_value, + bs))) { + return ret; + } + java_pos += 8; + + footer_.bloom_filter_size_offset = java_pos; + if (RET_FAIL(common::SerializationUtil::read_var_uint( + footer_.bloom_filter_data_size, bs))) { + return ret; + } + java_pos += 4; + footer_.bloom_filter_offset = java_pos; + if (bs.remaining_size() < footer_.bloom_filter_data_size) { + return common::E_TSFILE_CORRUPTED; + } + bs.wrapped_buf_advance_read_pos(footer_.bloom_filter_data_size); + java_pos += footer_.bloom_filter_data_size; + if (RET_FAIL(common::SerializationUtil::read_var_uint( + footer_.bloom_filter_size, bs))) { + return ret; + } + if (RET_FAIL(common::SerializationUtil::read_var_uint( + footer_.bloom_filter_hash_count, bs))) { + return ret; + } + footer_.bloom_filter_serialized_size = footer_.bloom_filter_data_size + 4; + + footer_.properties_offset = file_metadata_pos_ + bs.read_pos(); + if (RET_FAIL(common::SerializationUtil::read_var_int( + footer_.properties_count, bs))) { + return ret; + } + for (int32_t i = 0; i < footer_.properties_count; ++i) { + FooterPropertySketch prop; + prop.offset = file_metadata_pos_ + bs.read_pos(); + std::string* value = nullptr; + if (RET_FAIL(common::SerializationUtil::read_var_str(prop.key, bs))) { + return ret; + } + if (RET_FAIL(common::SerializationUtil::read_var_char_ptr(value, bs))) { + delete value; + return ret; + } + if (value != nullptr) { + prop.value = *value; + delete value; + } + footer_.properties.push_back(prop); + } + + if (bs.read_pos() != metadata_buf_.size()) { + return common::E_TSFILE_CORRUPTED; + } + return common::E_OK; +} + +void SketchLayout::add_metadata_node( + const std::string& table, const std::string& device, int64_t offset, + int64_t end_offset, bool device_entries, + std::shared_ptr node) { + MetadataNodeSketch rec; + rec.table = table; + rec.device = device; + rec.offset = offset; + rec.end_offset = end_offset; + rec.device_entries = device_entries; + rec.node = node; + metadata_nodes_.push_back(rec); + if (offset >= 0) { + node_by_offset_[offset] = node; + } +} + +int SketchLayout::traverse_index_node( + const std::string& table, + const std::shared_ptr& node, bool device_entries, + std::shared_ptr device_id) { + if (node == nullptr) { + return common::E_OK; + } + if (node->node_type_ == storage::LEAF_MEASUREMENT) { + return collect_timeseries_index(table, node, device_id); + } + + for (size_t i = 0; i < node->children_.size(); ++i) { + const std::shared_ptr& child = + node->children_[i]; + int64_t child_end = node->end_offset_; + if (i + 1 < node->children_.size()) { + child_end = node->children_[i + 1]->get_offset(); + } + std::shared_ptr next_device = device_id; + bool child_device_entries = false; + if (node->node_type_ == storage::LEAF_DEVICE) { + next_device = child->get_device_id(); + child_device_entries = false; + } else if (node->node_type_ == storage::INTERNAL_DEVICE) { + child_device_entries = true; + } else if (node->node_type_ == storage::INTERNAL_MEASUREMENT) { + child_device_entries = false; + } else { + return common::E_TSFILE_CORRUPTED; + } + + std::shared_ptr child_node; + int ret = read_index_node(child->get_offset(), child_end, + child_device_entries, &child_node); + if (ret != common::E_OK) { + return ret; + } + std::string device_name = + next_device == nullptr ? "" : next_device->get_device_name(); + add_metadata_node(table, device_name, child->get_offset(), child_end, + child_device_entries, child_node); + ret = traverse_index_node(table, child_node, child_device_entries, + next_device); + if (ret != common::E_OK) { + return ret; + } + } + return common::E_OK; +} + +int SketchLayout::read_index_node( + int64_t offset, int64_t end_offset, bool device_entries, + std::shared_ptr* out) { + if (end_offset <= offset) { + return common::E_TSFILE_CORRUPTED; + } + std::vector buf; + int ret = read_block(offset, end_offset - offset, &buf); + if (ret != common::E_OK) { + return ret; + } + void* node_buf = pa_.alloc(sizeof(storage::MetaIndexNode)); + if (node_buf == nullptr) { + return common::E_OOM; + } + storage::MetaIndexNode* node = new (node_buf) storage::MetaIndexNode(&pa_); + std::shared_ptr holder( + node, storage::MetaIndexNode::self_deleter); + common::ByteStream bs; + bs.wrap_from(buf.data(), static_cast(buf.size())); + ret = device_entries ? node->device_deserialize_from(bs) + : node->deserialize_from(bs); + if (ret != common::E_OK) { + return ret; + } + *out = holder; + return common::E_OK; +} + +int SketchLayout::collect_timeseries_index( + const std::string& table, + const std::shared_ptr& node, + const std::shared_ptr& device_id) { + (void)table; + for (size_t i = 0; i < node->children_.size(); ++i) { + const std::shared_ptr& child = + node->children_[i]; + int64_t start = child->get_offset(); + int64_t end = node->end_offset_; + if (i + 1 < node->children_.size()) { + end = node->children_[i + 1]->get_offset(); + } + if (end <= start) { + return common::E_TSFILE_CORRUPTED; + } + std::vector buf; + int ret = read_block(start, end - start, &buf); + if (ret != common::E_OK) { + return ret; + } + common::ByteStream bs; + bs.wrap_from(buf.data(), static_cast(buf.size())); + while (bs.has_remaining()) { + uint32_t before = bs.read_pos(); + void* ts_buf = pa_.alloc(sizeof(storage::TimeseriesIndex)); + if (ts_buf == nullptr) { + return common::E_OOM; + } + storage::TimeseriesIndex* ts_index = + new (ts_buf) storage::TimeseriesIndex(); + ret = ts_index->deserialize_from(bs, &pa_); + if (ret != common::E_OK) { + return ret; + } + if (bs.read_pos() <= before) { + return common::E_TSFILE_CORRUPTED; + } + add_timeseries_record(start + before, start + bs.read_pos(), + device_id, ts_index); + } + } + return common::E_OK; +} + +void SketchLayout::add_timeseries_record( + int64_t offset, int64_t end_offset, + const std::shared_ptr& device_id, + storage::TimeseriesIndex* ts_index) { + std::string measurement = ts_index->get_measurement_name().to_std_string(); + std::string device = + device_id == nullptr ? std::string() : device_id->get_device_name(); + std::string path = + device.empty() ? measurement : device + "." + measurement; + + TimeseriesSketch rec; + rec.offset = offset; + rec.end_offset = end_offset; + rec.device = device; + rec.measurement = measurement; + rec.path = path; + rec.data_type = tsdatatype_name(ts_index->get_data_type()); + rec.statistics = statistic_string(ts_index->get_statistic()); + rec.size_without_chunk_metadata = + static_cast(end_offset - offset); + common::SimpleList* chunks = + ts_index->get_chunk_meta_list(); + rec.chunk_count = chunks == nullptr ? 0 : chunks->size(); + + if (chunks == nullptr) { + timeseries_by_offset_[offset] = rec; + return; + } + uint32_t chunk_index = 0; + for (common::SimpleList::Iterator it = chunks->begin(); + it != chunks->end(); it++, chunk_index++) { + storage::ChunkMeta* chunk_meta = it.get(); + if (chunk_meta == nullptr) { + continue; + } + ChunkMetadataSketch chunk_rec; + chunk_rec.offset = chunk_meta->offset_of_chunk_header_; + chunk_rec.size = + serialized_chunk_metadata_size(chunk_meta, chunk_index != 0); + if (rec.size_without_chunk_metadata >= chunk_rec.size) { + rec.size_without_chunk_metadata -= chunk_rec.size; + } + rec.chunks.push_back(chunk_rec); + std::string stat = statistic_string(chunk_meta->statistic_); + if (stat.empty() && rec.chunk_count == 1) { + stat = rec.statistics; + } + chunk_path_by_offset_[chunk_meta->offset_of_chunk_header_] = path; + chunk_stat_by_offset_[chunk_meta->offset_of_chunk_header_] = stat; + } + timeseries_by_offset_[offset] = rec; +} + +uint32_t SketchLayout::serialized_chunk_metadata_size( + storage::ChunkMeta* chunk_meta, bool include_statistics) { + if (include_statistics && chunk_meta->statistic_ == nullptr) { + include_statistics = false; + } + common::ByteStream bs(128, common::MOD_TSFILE_READER); + if (chunk_meta->serialize_to(bs, include_statistics) != common::E_OK) { + return 0; + } + return bs.total_size(); +} + +int SketchLayout::parse_chunk_group_header(int64_t marker_offset, + ChunkGroupSketch* group) { + const int64_t remaining = file_metadata_pos_ - marker_offset - 1; + const int32_t read_size = static_cast( + std::min(remaining, kDeviceHeaderProbeBytes)); + if (read_size <= 0) { + return common::E_TSFILE_CORRUPTED; + } + std::vector buf; + int ret = read_block(marker_offset + 1, read_size, &buf); + if (ret != common::E_OK) { + return ret; + } + common::ByteStream bs; + bs.wrap_from(buf.data(), read_size); + std::shared_ptr device_id = + std::make_shared(); + ret = device_id->deserialize(bs); + if (ret != common::E_OK) { + return ret; + } + group->device = device_id->get_device_name(); + group->device_id_size = bs.read_pos(); + if (group->device_id_size == 0) { + return common::E_TSFILE_CORRUPTED; + } + return common::E_OK; +} + +int SketchLayout::parse_operation_index_range(int64_t marker_offset, + OperationIndexRangeSketch* op) { + std::vector buf; + int ret = read_block(marker_offset + 1, 16, &buf); + if (ret != common::E_OK) { + return ret; + } + op->min_plan_index = read_i64_from_big_endian(buf.data()); + op->max_plan_index = read_i64_from_big_endian(buf.data() + 8); + return common::E_OK; +} + +int SketchLayout::parse_chunk(int64_t chunk_offset, const std::string& device, + ChunkSketch* chunk) { + (void)device; + int ret = parse_chunk_header(chunk_offset, chunk); + if (ret != common::E_OK) { + return ret; + } + const int64_t data_offset = chunk_offset + chunk->header_size; + const int64_t total_end = data_offset + chunk->header.data_size_; + if (total_end > file_metadata_pos_) { + return common::E_TSFILE_CORRUPTED; + } + std::vector data; + if (chunk->header.data_size_ > + static_cast(std::numeric_limits::max())) { + add_blocker("chunk data at offset " + to_string_i64(data_offset) + + " exceeds the C++ sketch reader buffer limit; page " + "output is truncated."); + } else { + ret = read_block(data_offset, chunk->header.data_size_, &data); + if (ret != common::E_OK) { + return ret; + } + ret = parse_pages(data_offset, data, chunk); + if (ret != common::E_OK) { + return ret; + } + } + + std::map::const_iterator path_it = + chunk_path_by_offset_.find(chunk_offset); + if (path_it != chunk_path_by_offset_.end()) { + chunk->path = path_it->second; + } else { + chunk->path = kMetadataMissing; + add_blocker("chunk metadata missing for chunk header offset " + + to_string_i64(chunk_offset)); + } + std::map::const_iterator stat_it = + chunk_stat_by_offset_.find(chunk_offset); + if (stat_it != chunk_stat_by_offset_.end()) { + chunk->statistics = stat_it->second; + } + return common::E_OK; +} + +int SketchLayout::parse_chunk_header(int64_t chunk_offset, ChunkSketch* chunk) { + const int64_t remaining = file_metadata_pos_ - chunk_offset; + const int32_t max_probe = static_cast( + std::min(remaining, kMaxHeaderProbeBytes)); + if (max_probe < kMinChunkHeaderSize) { + return common::E_TSFILE_CORRUPTED; + } + int32_t probe = std::min(4096, max_probe); + probe = std::max(probe, kMinChunkHeaderSize); + while (probe <= max_probe) { + std::vector buf; + int ret = read_block(chunk_offset, probe, &buf); + if (ret != common::E_OK) { + return ret; + } + common::ByteStream bs; + bs.wrap_from(buf.data(), probe); + storage::ChunkHeader header; + ret = header.deserialize_from(bs); + if (ret == common::E_OK) { + chunk->offset = chunk_offset; + chunk->header = header; + chunk->header_size = bs.read_pos(); + return common::E_OK; + } + if (probe == max_probe) { + return common::E_TSFILE_CORRUPTED; + } + probe = std::min(max_probe, probe * 2); + } + return common::E_TSFILE_CORRUPTED; +} + +int SketchLayout::parse_pages(int64_t data_offset, + const std::vector& data, + ChunkSketch* chunk) { + common::ByteStream bs; + bs.wrap_from(data.data(), static_cast(data.size())); + const bool one_page = is_one_page_chunk(chunk->header.chunk_type_); + int page_id = 0; + while (bs.has_remaining()) { + const uint32_t header_start = bs.read_pos(); + storage::PageHeader page_header; + int ret = page_header.deserialize_from(bs, !one_page, + chunk->header.data_type_); + if (ret != common::E_OK) { + return ret; + } + const uint32_t header_end = bs.read_pos(); + if (bs.remaining_size() < page_header.compressed_size_) { + return common::E_TSFILE_CORRUPTED; + } + PageSketch page; + page.page_id = page_id++; + page.header_offset = data_offset + header_start; + page.data_offset = data_offset + header_end; + page.uncompressed_size = page_header.uncompressed_size_; + page.compressed_size = page_header.compressed_size_; + const uint32_t actual_header_size = header_end - header_start; + const uint32_t actual_size_prefix = + unsigned_var_int_size(page.uncompressed_size) + + unsigned_var_int_size(page.compressed_size); + const uint32_t java_size_prefix = + java_var_int_size(page.uncompressed_size) + + java_var_int_size(page.compressed_size); + page.header_size = + actual_header_size - actual_size_prefix + java_size_prefix; + page.statistics = statistic_string(page_header.statistic_); + page.has_statistics = page_header.statistic_ != nullptr; + chunk->pages.push_back(page); + bs.wrapped_buf_advance_read_pos(page_header.compressed_size_); + if (one_page) { + break; + } + } + chunk->header.num_of_pages_ = static_cast(chunk->pages.size()); + if (bs.has_remaining()) { + return common::E_TSFILE_CORRUPTED; + } + return common::E_OK; +} + +} // namespace tsfile_cli diff --git a/cpp/tools/commands/sketch_layout.h b/cpp/tools/commands/sketch_layout.h new file mode 100644 index 000000000..26d1d6d14 --- /dev/null +++ b/cpp/tools/commands/sketch_layout.h @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef TSFILE_CLI_SKETCH_LAYOUT_H +#define TSFILE_CLI_SKETCH_LAYOUT_H + +#include +#include +#include +#include +#include + +#include "common/allocator/page_arena.h" +#include "common/constant/tsfile_constant.h" +#include "common/schema.h" +#include "common/tsfile_common.h" +#include "file/read_file.h" + +namespace tsfile_cli { + +constexpr int kSketchFileHeaderSize = storage::MAGIC_STRING_TSFILE_LEN + 1; +constexpr int kSketchFileTailSize = 4 + storage::MAGIC_STRING_TSFILE_LEN; + +struct ChunkMetadataSketch { + int64_t offset = 0; + uint32_t size = 0; +}; + +struct PageSketch { + int page_id = 0; + int64_t header_offset = 0; + int64_t data_offset = 0; + uint32_t header_size = 0; + uint32_t uncompressed_size = 0; + uint32_t compressed_size = 0; + std::string statistics; + bool has_statistics = false; +}; + +struct ChunkSketch { + int64_t offset = 0; + storage::ChunkHeader header; + uint32_t header_size = 0; + std::string path; + std::string statistics; + std::vector pages; +}; + +struct ChunkGroupSketch { + int64_t offset = 0; + std::string device; + uint32_t device_id_size = 0; + std::vector chunks; +}; + +struct OperationIndexRangeSketch { + int64_t offset = 0; + int64_t min_plan_index = 0; + int64_t max_plan_index = 0; +}; + +struct TimeseriesSketch { + int64_t offset = 0; + int64_t end_offset = 0; + uint32_t size_without_chunk_metadata = 0; + std::string device; + std::string measurement; + std::string path; + std::string data_type; + std::string statistics; + uint32_t chunk_count = 0; + std::vector chunks; +}; + +struct MetadataNodeSketch { + int64_t offset = 0; + int64_t end_offset = 0; + std::string table; + std::string device; + bool device_entries = false; + std::shared_ptr node; +}; + +struct FooterRootSketch { + std::string table; + int64_t table_name_offset = 0; + uint32_t table_name_size = 0; + int64_t node_offset = 0; + uint32_t node_size = 0; + std::shared_ptr node; +}; + +struct FooterSchemaSketch { + std::string table; + int64_t table_name_offset = 0; + uint32_t table_name_size = 0; + int64_t schema_offset = 0; + uint32_t schema_size = 0; + std::shared_ptr schema; +}; + +struct FooterPropertySketch { + std::string key; + std::string value; + int64_t offset = 0; +}; + +struct FooterSketch { + int64_t table_index_count_offset = 0; + uint32_t table_index_count = 0; + std::vector roots; + int64_t table_schema_count_offset = 0; + uint32_t table_schema_count = 0; + std::vector schemas; + int64_t meta_offset_offset = 0; + int64_t meta_offset_value = 0; + int64_t bloom_filter_size_offset = 0; + int64_t bloom_filter_offset = 0; + uint32_t bloom_filter_data_size = 0; + uint32_t bloom_filter_size = 0; + uint32_t bloom_filter_hash_count = 0; + uint32_t bloom_filter_serialized_size = 0; + int64_t properties_offset = 0; + int32_t properties_count = 0; + std::vector properties; +}; + +class SketchLayout { + public: + SketchLayout(); + ~SketchLayout(); + + int load_file(const std::string& path); + int collect_metadata_layout(); + int scan_data_area(); + void close(); + + void add_blocker(const std::string& blocker); + std::string error_text(int ret) const; + + std::string path_; + int64_t file_size_ = 0; + std::string head_magic_; + std::string tail_magic_; + char version_ = 0; + uint32_t file_metadata_size_ = 0; + int64_t file_metadata_pos_ = 0; + int64_t separator_offset_ = -1; + unsigned char separator_marker_ = 0; + bool metadata_loaded_ = false; + FooterSketch footer_; + std::vector groups_; + std::vector operation_ranges_; + std::vector metadata_nodes_; + std::vector blockers_; + std::map timeseries_by_offset_; + std::map> node_by_offset_; + + private: + int read_block(int64_t offset, int64_t len, std::vector* out); + int read_marker(int64_t offset, unsigned char* marker); + int trace_footer_layout(); + void add_metadata_node(const std::string& table, const std::string& device, + int64_t offset, int64_t end_offset, + bool device_entries, + std::shared_ptr node); + int traverse_index_node(const std::string& table, + const std::shared_ptr& node, + bool device_entries, + std::shared_ptr device_id); + int read_index_node(int64_t offset, int64_t end_offset, bool device_entries, + std::shared_ptr* out); + int collect_timeseries_index( + const std::string& table, + const std::shared_ptr& node, + const std::shared_ptr& device_id); + void add_timeseries_record( + int64_t offset, int64_t end_offset, + const std::shared_ptr& device_id, + storage::TimeseriesIndex* ts_index); + uint32_t serialized_chunk_metadata_size(storage::ChunkMeta* chunk_meta, + bool include_statistics); + int parse_chunk_group_header(int64_t marker_offset, + ChunkGroupSketch* group); + int parse_operation_index_range(int64_t marker_offset, + OperationIndexRangeSketch* op); + int parse_chunk(int64_t chunk_offset, const std::string& device, + ChunkSketch* chunk); + int parse_chunk_header(int64_t chunk_offset, ChunkSketch* chunk); + int parse_pages(int64_t data_offset, const std::vector& data, + ChunkSketch* chunk); + + common::PageArena pa_; + storage::TsFileMeta tsfile_meta_; + storage::ReadFile file_; + std::vector metadata_buf_; + std::map chunk_path_by_offset_; + std::map chunk_stat_by_offset_; + std::map root_offsets_by_table_; + std::map root_ends_by_table_; +}; + +} // namespace tsfile_cli + +#endif // TSFILE_CLI_SKETCH_LAYOUT_H