Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit c80c3c8

Browse files
committed
introduce col fetch callback
1 parent 048b75e commit c80c3c8

File tree

1 file changed

+126
-152
lines changed

1 file changed

+126
-152
lines changed

omniscidb/QueryEngine/Execute.cpp

+126-152
Original file line numberDiff line numberDiff line change
@@ -2987,173 +2987,147 @@ FetchResult Executor::fetchChunks(
29872987
std::vector<std::vector<int64_t>> all_num_rows;
29882988
std::vector<std::vector<uint64_t>> all_frag_offsets;
29892989

2990-
// in MT case we want to preserve "the order of insertion" into all_frag_col_buffers
2990+
auto fetch_column_callback = [&](std::shared_ptr<const InputColDescriptor> col_id,
2991+
const std::vector<size_t>& selected_frag_ids,
2992+
std::vector<const int8_t*>& frag_col_buffers,
2993+
const bool parallelized =
2994+
false) -> bool /*empty_frag*/ {
2995+
if (interrupted_.load()) {
2996+
throw QueryExecutionError(ERR_INTERRUPTED);
2997+
}
2998+
const auto fragments_it = all_tables_fragments.find(col_id->getTableRef());
2999+
CHECK(fragments_it != all_tables_fragments.end());
3000+
const auto fragments = fragments_it->second;
3001+
auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3002+
CHECK(it != plan_state_->global_to_local_col_ids_.end());
3003+
CHECK_LT(static_cast<size_t>(it->second),
3004+
plan_state_->global_to_local_col_ids_.size());
3005+
const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
3006+
if (!fragments->size()) {
3007+
return true;
3008+
}
3009+
auto memory_level_for_column = memory_level;
3010+
if (plan_state_->columns_to_fetch_.find(*col_id) ==
3011+
plan_state_->columns_to_fetch_.end()) {
3012+
memory_level_for_column = Data_Namespace::CPU_LEVEL;
3013+
}
3014+
if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
3015+
          // determine if we need special treatment to linearize multi-frag table
3016+
// i.e., a column that is classified as varlen type, i.e., array
3017+
// for now, we can support more types in this way
3018+
CHECK(!parallelized); // otherwise recursive tbb parallel for with deadlocks
3019+
if (needLinearizeAllFragments(
3020+
*col_id, ra_exe_unit, selected_fragments, memory_level)) {
3021+
bool for_lazy_fetch = false;
3022+
if (plan_state_->columns_to_not_fetch_.find(*col_id) !=
3023+
plan_state_->columns_to_not_fetch_.end()) {
3024+
for_lazy_fetch = true;
3025+
VLOG(2) << "Try to linearize lazy fetch column (col_id: " << col_id->getColId()
3026+
<< ")";
3027+
}
3028+
frag_col_buffers[it->second] = column_fetcher.linearizeColumnFragments(
3029+
col_id->getColInfo(),
3030+
all_tables_fragments,
3031+
chunks,
3032+
chunk_iterators,
3033+
for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
3034+
for_lazy_fetch ? 0 : device_id,
3035+
device_allocator);
3036+
} else {
3037+
frag_col_buffers[it->second] =
3038+
column_fetcher.getAllTableColumnFragments(col_id->getColInfo(),
3039+
all_tables_fragments,
3040+
memory_level_for_column,
3041+
device_id,
3042+
device_allocator,
3043+
thread_idx);
3044+
}
3045+
} else {
3046+
frag_col_buffers[it->second] =
3047+
column_fetcher.getOneTableColumnFragment(col_id->getColInfo(),
3048+
frag_id,
3049+
all_tables_fragments,
3050+
chunks,
3051+
chunk_iterators,
3052+
memory_level_for_column,
3053+
device_id,
3054+
device_allocator);
3055+
}
3056+
return false;
3057+
};
3058+
3059+
// in MT fetching for GPU, we want to preserve "the order of insertion" into
3060+
// all_frag_col_buffers
29913061
std::vector<std::vector<size_t>> selected_frag_ids_vec;
2992-
if(memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL){
2993-
std::mutex all_frag;
3062+
if (memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL) {
29943063
std::atomic<bool> empty_frags{false};
2995-
tbb::task_arena limitedArena(1);
3064+
tbb::task_arena limitedArena(8);
3065+
std::vector<size_t> idx_frags_to_inearize;
29963066
for (const auto& selected_frag_ids : frag_ids_crossjoin) {
29973067
selected_frag_ids_vec.push_back(selected_frag_ids);
3068+
for (const auto& col_id : col_global_ids) {
3069+
CHECK(col_id);
3070+
if (!col_id->isVirtual() &&
3071+
needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
3072+
idx_frags_to_inearize.push_back(selected_frag_ids_vec.size() - 1);
3073+
}
3074+
}
29983075
}
29993076
all_frag_col_buffers.resize(selected_frag_ids_vec.size());
30003077

3078+
// Try MT fetching for frags that do not need linearization
30013079
limitedArena.execute([&]() {
3002-
tbb::parallel_for(
3003-
0ul, selected_frag_ids_vec.size(), [&](const size_t idx) {
3004-
const auto& selected_frag_ids = selected_frag_ids_vec[idx];
3005-
std::vector<const int8_t*> frag_col_buffers(
3006-
plan_state_->global_to_local_col_ids_.size());
3007-
for (const auto& col_id : col_global_ids) {
3008-
if (interrupted_.load()) {
3009-
throw QueryExecutionError(ERR_INTERRUPTED);
3010-
}
3011-
CHECK(col_id);
3012-
if (col_id->isVirtual()) {
3013-
continue;
3014-
}
3015-
const auto fragments_it = all_tables_fragments.find(col_id->getTableRef());
3016-
CHECK(fragments_it != all_tables_fragments.end());
3017-
const auto fragments = fragments_it->second;
3018-
auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3019-
CHECK(it != plan_state_->global_to_local_col_ids_.end());
3020-
CHECK_LT(static_cast<size_t>(it->second),
3021-
plan_state_->global_to_local_col_ids_.size());
3022-
const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
3023-
if (!fragments->size()) {
3024-
empty_frags = true;
3025-
tbb::task::current_context()->cancel_group_execution();
3026-
}
3027-
auto memory_level_for_column = memory_level;
3028-
if (plan_state_->columns_to_fetch_.find(*col_id) ==
3029-
plan_state_->columns_to_fetch_.end()) {
3030-
memory_level_for_column = Data_Namespace::CPU_LEVEL;
3031-
}
3032-
if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
3033-
                // determine if we need special treatment to linearize multi-frag table
3034-
// i.e., a column that is classified as varlen type, i.e., array
3035-
// for now, we can support more types in this way
3036-
all_frag.lock();
3037-
if (needLinearizeAllFragments(
3038-
*col_id, ra_exe_unit, selected_fragments, memory_level)) {
3039-
bool for_lazy_fetch = false;
3040-
if (plan_state_->columns_to_not_fetch_.find(*col_id) !=
3041-
plan_state_->columns_to_not_fetch_.end()) {
3042-
for_lazy_fetch = true;
3043-
VLOG(2) << "Try to linearize lazy fetch column (col_id: "
3044-
<< col_id->getColId() << ")";
3045-
}
3046-
frag_col_buffers[it->second] = column_fetcher.linearizeColumnFragments(
3047-
col_id->getColInfo(),
3048-
all_tables_fragments,
3049-
chunks,
3050-
chunk_iterators,
3051-
for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
3052-
for_lazy_fetch ? 0 : device_id,
3053-
device_allocator);
3054-
} else {
3055-
frag_col_buffers[it->second] =
3056-
column_fetcher.getAllTableColumnFragments(col_id->getColInfo(),
3057-
all_tables_fragments,
3058-
memory_level_for_column,
3059-
device_id,
3060-
device_allocator,
3061-
/*thread_idx=*/0);
3062-
}
3063-
all_frag.unlock();
3064-
} else {
3065-
LOG(INFO) << "Pushing to idx " << idx;
3066-
frag_col_buffers[it->second] =
3067-
column_fetcher.getOneTableColumnFragment(col_id->getColInfo(),
3068-
frag_id,
3069-
all_tables_fragments,
3070-
chunks,
3071-
chunk_iterators,
3072-
memory_level_for_column,
3073-
device_id,
3074-
device_allocator);
3075-
}
3080+
tbb::parallel_for(0ul, selected_frag_ids_vec.size(), [&](const size_t idx) {
3081+
if (std::find(idx_frags_to_inearize.begin(), idx_frags_to_inearize.end(), idx) ==
3082+
idx_frags_to_inearize.end()) {
3083+
const auto& selected_frag_ids = selected_frag_ids_vec[idx];
3084+
std::vector<const int8_t*> frag_col_buffers(
3085+
plan_state_->global_to_local_col_ids_.size());
3086+
for (const auto& col_id : col_global_ids) {
3087+
CHECK(col_id);
3088+
if (!col_id->isVirtual() &&
3089+
fetch_column_callback(
3090+
col_id, selected_frag_ids, frag_col_buffers, true)) {
3091+
empty_frags = true; // not virtual, but empty frags
3092+
tbb::task::current_context()->cancel_group_execution();
30763093
}
3077-
LOG(INFO) << "frag_col_buffers size to push: " << frag_col_buffers.size();
3078-
all_frag_col_buffers[idx] = frag_col_buffers;
3079-
});
3094+
}
3095+
all_frag_col_buffers[idx] = frag_col_buffers;
3096+
}
3097+
});
30803098
});
30813099
if (empty_frags) {
30823100
return {};
30833101
}
3102+
for (const size_t idx :
3103+
idx_frags_to_inearize) { // linear frags materialization is already
3104+
// parallelized, avoid nested tbb
3105+
const auto& selected_frag_ids = selected_frag_ids_vec[idx];
3106+
std::vector<const int8_t*> frag_col_buffers(
3107+
plan_state_->global_to_local_col_ids_.size());
3108+
for (const auto& col_id : col_global_ids) {
3109+
CHECK(col_id);
3110+
if (!col_id->isVirtual() &&
3111+
fetch_column_callback(col_id, selected_frag_ids, frag_col_buffers)) {
3112+
return {}; // not virtual, but empty frags
3113+
}
3114+
}
3115+
all_frag_col_buffers[idx] = frag_col_buffers;
3116+
}
30843117
} else {
30853118
for (const auto& selected_frag_ids : frag_ids_crossjoin) {
3086-
std::vector<const int8_t*> frag_col_buffers(
3087-
plan_state_->global_to_local_col_ids_.size());
3088-
for (const auto& col_id : col_global_ids) {
3089-
if (interrupted_.load()) {
3090-
throw QueryExecutionError(ERR_INTERRUPTED);
3091-
}
3092-
CHECK(col_id);
3093-
if (col_id->isVirtual()) {
3094-
continue;
3095-
}
3096-
const auto fragments_it = all_tables_fragments.find(col_id->getTableRef());
3097-
CHECK(fragments_it != all_tables_fragments.end());
3098-
const auto fragments = fragments_it->second;
3099-
auto it = plan_state_->global_to_local_col_ids_.find(*col_id);
3100-
CHECK(it != plan_state_->global_to_local_col_ids_.end());
3101-
CHECK_LT(static_cast<size_t>(it->second),
3102-
plan_state_->global_to_local_col_ids_.size());
3103-
const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]];
3104-
if (!fragments->size()) {
3105-
return{};
3106-
}
3107-
auto memory_level_for_column = memory_level;
3108-
if (plan_state_->columns_to_fetch_.find(*col_id) ==
3109-
plan_state_->columns_to_fetch_.end()) {
3110-
memory_level_for_column = Data_Namespace::CPU_LEVEL;
3111-
}
3112-
if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
3113-
          // determine if we need special treatment to linearize multi-frag table
3114-
// i.e., a column that is classified as varlen type, i.e., array
3115-
// for now, we can support more types in this way
3116-
if (needLinearizeAllFragments(
3117-
*col_id, ra_exe_unit, selected_fragments, memory_level)) {
3118-
bool for_lazy_fetch = false;
3119-
if (plan_state_->columns_to_not_fetch_.find(*col_id) !=
3120-
plan_state_->columns_to_not_fetch_.end()) {
3121-
for_lazy_fetch = true;
3122-
VLOG(2) << "Try to linearize lazy fetch column (col_id: "
3123-
<< col_id->getColId() << ")";
3124-
}
3125-
frag_col_buffers[it->second] = column_fetcher.linearizeColumnFragments(
3126-
col_id->getColInfo(),
3127-
all_tables_fragments,
3128-
chunks,
3129-
chunk_iterators,
3130-
for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
3131-
for_lazy_fetch ? 0 : device_id,
3132-
device_allocator);
3133-
} else {
3134-
frag_col_buffers[it->second] =
3135-
column_fetcher.getAllTableColumnFragments(col_id->getColInfo(),
3136-
all_tables_fragments,
3137-
memory_level_for_column,
3138-
device_id,
3139-
device_allocator,
3140-
thread_idx);
3141-
}
3142-
} else {
3143-
frag_col_buffers[it->second] =
3144-
column_fetcher.getOneTableColumnFragment(col_id->getColInfo(),
3145-
frag_id,
3146-
all_tables_fragments,
3147-
chunks,
3148-
chunk_iterators,
3149-
memory_level_for_column,
3150-
device_id,
3151-
device_allocator);
3152-
}
3153-
}
3154-
selected_frag_ids_vec.push_back(selected_frag_ids);
3155-
all_frag_col_buffers.push_back(frag_col_buffers);
3156-
}
3119+
std::vector<const int8_t*> frag_col_buffers(
3120+
plan_state_->global_to_local_col_ids_.size());
3121+
for (const auto& col_id : col_global_ids) {
3122+
CHECK(col_id);
3123+
if (!col_id->isVirtual() &&
3124+
fetch_column_callback(col_id, selected_frag_ids, frag_col_buffers)) {
3125+
return {}; // not virtual, but empty frags
3126+
}
3127+
}
3128+
selected_frag_ids_vec.push_back(selected_frag_ids);
3129+
all_frag_col_buffers.push_back(frag_col_buffers);
3130+
}
31573131
}
31583132
std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags(
31593133
ra_exe_unit, selected_frag_ids_vec, ra_exe_unit.input_descs, all_tables_fragments);

0 commit comments

Comments
 (0)