From d4dff2cbe9f7575ced1ef8cf3708039e90bed214 Mon Sep 17 00:00:00 2001 From: Artem Kroviakkov Date: Mon, 13 Nov 2023 09:14:46 -0700 Subject: [PATCH 1/3] MT GPU fetch, l0 async --- omniscidb/BufferProvider/BufferProvider.h | 2 +- omniscidb/CudaMgr/CudaMgr.cpp | 13 +- omniscidb/CudaMgr/CudaMgr.h | 9 +- omniscidb/CudaMgr/CudaMgrNoCuda.cpp | 9 +- .../DataMgr/Allocators/DeviceAllocator.h | 1 + omniscidb/DataMgr/Allocators/GpuAllocator.cpp | 4 + omniscidb/DataMgr/Allocators/GpuAllocator.h | 1 + .../BufferMgr/CpuBufferMgr/CpuBuffer.cpp | 3 +- omniscidb/DataMgr/DataMgrBufferProvider.cpp | 10 +- omniscidb/DataMgr/DataMgrBufferProvider.h | 2 +- omniscidb/DataMgr/GpuMgr.h | 7 +- omniscidb/L0Mgr/L0Mgr.cpp | 116 +++++++++- omniscidb/L0Mgr/L0Mgr.h | 51 ++++- omniscidb/L0Mgr/L0MgrNoL0.cpp | 19 ++ omniscidb/QueryEngine/ColumnFetcher.cpp | 21 +- omniscidb/QueryEngine/ColumnFetcher.h | 11 +- omniscidb/QueryEngine/Compiler/Backend.cpp | 4 +- omniscidb/QueryEngine/Execute.cpp | 215 ++++++++++++------ omniscidb/QueryEngine/Execute.h | 2 +- .../QueryEngine/JoinHashTable/HashJoin.cpp | 1 - .../QueryEngine/QueryExecutionContext.cpp | 9 +- omniscidb/QueryEngine/RelAlgExecutor.cpp | 1 - 22 files changed, 385 insertions(+), 126 deletions(-) diff --git a/omniscidb/BufferProvider/BufferProvider.h b/omniscidb/BufferProvider/BufferProvider.h index 15027b48ec..374e40496f 100644 --- a/omniscidb/BufferProvider/BufferProvider.h +++ b/omniscidb/BufferProvider/BufferProvider.h @@ -44,7 +44,7 @@ class BufferProvider { const int8_t* host_ptr, const size_t num_bytes, const int device_id) const = 0; - virtual void synchronizeStream(const int device_id) const = 0; + virtual void synchronizeDeviceDataStream(const int device_id) const = 0; virtual void copyFromDevice(int8_t* host_ptr, const int8_t* device_ptr, const size_t num_bytes, diff --git a/omniscidb/CudaMgr/CudaMgr.cpp b/omniscidb/CudaMgr/CudaMgr.cpp index b3ec362bb3..6ea0a9a9fd 100644 --- a/omniscidb/CudaMgr/CudaMgr.cpp +++ b/omniscidb/CudaMgr/CudaMgr.cpp @@ -112,6 +112,17 @@ void CudaMgr::copyHostToDevice(int8_t* device_ptr, cuMemcpyHtoD(reinterpret_cast(device_ptr), host_ptr, num_bytes)); } +void CudaMgr::copyHostToDeviceAsyncIfPossible(int8_t* device_ptr, + const int8_t* host_ptr, + const size_t num_bytes, + const int device_num) { + if constexpr (async_data_load_available) { + copyHostToDeviceAsync(device_ptr, host_ptr, num_bytes, device_num); + } else { + copyHostToDevice(device_ptr, host_ptr, num_bytes, device_num); + } +} + void CudaMgr::copyHostToDeviceAsync(int8_t* device_ptr, const int8_t* host_ptr, const size_t num_bytes, @@ -120,7 +131,7 @@ void CudaMgr::copyHostToDeviceAsync(int8_t* device_ptr, checkError(cuMemcpyHtoDAsync( reinterpret_cast(device_ptr), host_ptr, num_bytes, stream_)); } -void CudaMgr::synchronizeStream(const int device_num) { +void CudaMgr::synchronizeDeviceDataStream(const int device_num) { setContext(device_num); checkError(cuStreamSynchronize(stream_)); } diff --git a/omniscidb/CudaMgr/CudaMgr.h b/omniscidb/CudaMgr/CudaMgr.h index 17ba40c280..794c018208 100644 --- a/omniscidb/CudaMgr/CudaMgr.h +++ b/omniscidb/CudaMgr/CudaMgr.h @@ -96,12 +96,17 @@ class CudaMgr : public GpuMgr { const size_t num_bytes, const int device_num) override; + void copyHostToDeviceAsyncIfPossible(int8_t* device_ptr, + const int8_t* host_ptr, + const size_t num_bytes, + const int device_num) override; + void copyHostToDeviceAsync(int8_t* device_ptr, const int8_t* host_ptr, const size_t num_bytes, const int device_num) override; - void synchronizeStream(const int 
device_num) override; + void synchronizeDeviceDataStream(const int device_num) override; void copyDeviceToHost(int8_t* host_ptr, const int8_t* device_ptr, @@ -289,7 +294,7 @@ class CudaMgr : public GpuMgr { omnisci::DeviceGroup device_group_; std::vector device_contexts_; mutable std::mutex device_cleanup_mutex_; - static constexpr bool async_data_load_available{true}; + static constexpr bool async_data_load_available{false}; }; } // Namespace CudaMgr_Namespace diff --git a/omniscidb/CudaMgr/CudaMgrNoCuda.cpp b/omniscidb/CudaMgr/CudaMgrNoCuda.cpp index 4a9969e363..342ead5541 100644 --- a/omniscidb/CudaMgr/CudaMgrNoCuda.cpp +++ b/omniscidb/CudaMgr/CudaMgrNoCuda.cpp @@ -43,7 +43,14 @@ void CudaMgr::copyHostToDeviceAsync(int8_t* device_ptr, CHECK(false); } -void CudaMgr::synchronizeStream(const int device_num) { +void CudaMgr::copyHostToDeviceAsyncIfPossible(int8_t* device_ptr, + const int8_t* host_ptr, + const size_t num_bytes, + const int device_num) { + CHECK(false); +} + +void CudaMgr::synchronizeDeviceDataStream(const int device_num) { CHECK(false); } diff --git a/omniscidb/DataMgr/Allocators/DeviceAllocator.h b/omniscidb/DataMgr/Allocators/DeviceAllocator.h index 162deb6449..4d53ee1618 100644 --- a/omniscidb/DataMgr/Allocators/DeviceAllocator.h +++ b/omniscidb/DataMgr/Allocators/DeviceAllocator.h @@ -53,4 +53,5 @@ class DeviceAllocator : public Allocator { virtual void setDeviceMem(int8_t* device_ptr, unsigned char uc, const size_t num_bytes) const = 0; + virtual void sync() = 0; }; diff --git a/omniscidb/DataMgr/Allocators/GpuAllocator.cpp b/omniscidb/DataMgr/Allocators/GpuAllocator.cpp index fb3208f369..1f9f49b4dc 100644 --- a/omniscidb/DataMgr/Allocators/GpuAllocator.cpp +++ b/omniscidb/DataMgr/Allocators/GpuAllocator.cpp @@ -84,3 +84,7 @@ void GpuAllocator::setDeviceMem(int8_t* device_ptr, const size_t num_bytes) const { buffer_provider_->setDeviceMem(device_ptr, uc, num_bytes, device_id_); } + +void GpuAllocator::sync() { + buffer_provider_->synchronizeDeviceDataStream(device_id_); +} \ No newline at end of file diff --git a/omniscidb/DataMgr/Allocators/GpuAllocator.h b/omniscidb/DataMgr/Allocators/GpuAllocator.h index c9757f80f7..7e6040157b 100644 --- a/omniscidb/DataMgr/Allocators/GpuAllocator.h +++ b/omniscidb/DataMgr/Allocators/GpuAllocator.h @@ -58,6 +58,7 @@ class GpuAllocator : public DeviceAllocator { void setDeviceMem(int8_t* device_ptr, unsigned char uc, const size_t num_bytes) const override; + void sync() override; private: std::vector owned_buffers_; diff --git a/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.cpp b/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.cpp index 396daddb07..44128b3bc8 100644 --- a/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.cpp +++ b/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.cpp @@ -48,7 +48,8 @@ void CpuBuffer::readData(int8_t* const dst, memcpy(dst, mem_ + offset, num_bytes); } else if (dst_memory_level == GPU_LEVEL) { CHECK_GE(dst_device_id, 0); - gpu_mgr_->copyHostToDevice(dst, mem_ + offset, num_bytes, dst_device_id); + gpu_mgr_->copyHostToDeviceAsyncIfPossible( + dst, mem_ + offset, num_bytes, dst_device_id); } else { LOG(FATAL) << "Unsupported buffer type"; } diff --git a/omniscidb/DataMgr/DataMgrBufferProvider.cpp b/omniscidb/DataMgr/DataMgrBufferProvider.cpp index 2080a4476d..319741007f 100644 --- a/omniscidb/DataMgr/DataMgrBufferProvider.cpp +++ b/omniscidb/DataMgr/DataMgrBufferProvider.cpp @@ -57,18 +57,14 @@ void DataMgrBufferProvider::copyToDeviceAsyncIfPossible(int8_t* device_ptr, CHECK(data_mgr_); const 
auto gpu_mgr = data_mgr_->getGpuMgr(); CHECK(gpu_mgr); - if (gpu_mgr->canLoadAsync()) { - gpu_mgr->copyHostToDeviceAsync(device_ptr, host_ptr, num_bytes, device_id); - } else { - gpu_mgr->copyHostToDevice(device_ptr, host_ptr, num_bytes, device_id); - } + gpu_mgr->copyHostToDeviceAsyncIfPossible(device_ptr, host_ptr, num_bytes, device_id); } -void DataMgrBufferProvider::synchronizeStream(const int device_num) const { +void DataMgrBufferProvider::synchronizeDeviceDataStream(const int device_num) const { CHECK(data_mgr_); const auto gpu_mgr = data_mgr_->getGpuMgr(); CHECK(gpu_mgr); - gpu_mgr->synchronizeStream(device_num); + gpu_mgr->synchronizeDeviceDataStream(device_num); } void DataMgrBufferProvider::copyFromDevice(int8_t* host_ptr, diff --git a/omniscidb/DataMgr/DataMgrBufferProvider.h b/omniscidb/DataMgr/DataMgrBufferProvider.h index dc1dc9a2f2..459e186271 100644 --- a/omniscidb/DataMgr/DataMgrBufferProvider.h +++ b/omniscidb/DataMgr/DataMgrBufferProvider.h @@ -45,7 +45,7 @@ class DataMgrBufferProvider : public BufferProvider { const int8_t* host_ptr, const size_t num_bytes, const int device_id) const override; - void synchronizeStream(const int device_id) const override; + void synchronizeDeviceDataStream(const int device_id) const override; void copyFromDevice(int8_t* host_ptr, const int8_t* device_ptr, const size_t num_bytes, diff --git a/omniscidb/DataMgr/GpuMgr.h b/omniscidb/DataMgr/GpuMgr.h index e31c65ecfa..ddf473ead9 100644 --- a/omniscidb/DataMgr/GpuMgr.h +++ b/omniscidb/DataMgr/GpuMgr.h @@ -38,7 +38,12 @@ struct GpuMgr { const size_t num_bytes, const int device_num) = 0; - virtual void synchronizeStream(const int device_num) = 0; + virtual void copyHostToDeviceAsyncIfPossible(int8_t* device_ptr, + const int8_t* host_ptr, + const size_t num_bytes, + const int device_num) = 0; + + virtual void synchronizeDeviceDataStream(const int device_num) = 0; virtual void copyDeviceToHost(int8_t* host_ptr, const int8_t* device_ptr, diff --git a/omniscidb/L0Mgr/L0Mgr.cpp b/omniscidb/L0Mgr/L0Mgr.cpp index 091d9d6796..524ee4ddde 100644 --- a/omniscidb/L0Mgr/L0Mgr.cpp +++ b/omniscidb/L0Mgr/L0Mgr.cpp @@ -139,8 +139,87 @@ void* allocate_device_mem(const size_t num_bytes, L0Device& device) { return mem; } -L0Device::L0Device(const L0Driver& driver, ze_device_handle_t device) +L0DataFetcher::L0DataFetcher(const L0Driver& driver, ze_device_handle_t device) : device_(device), driver_(driver) { + ze_command_queue_desc_t command_queue_fetch_desc = { + ZE_STRUCTURE_TYPE_COMMAND_QUEUE_DESC, + nullptr, + 0, + 0, + 0, + ZE_COMMAND_QUEUE_MODE_ASYNCHRONOUS, + ZE_COMMAND_QUEUE_PRIORITY_NORMAL}; + L0_SAFE_CALL(zeCommandQueueCreate( + driver.ctx(), device_, &command_queue_fetch_desc, &queue_handle_)); + current_cl_bytes = {{}, 0}; + L0_SAFE_CALL( + zeCommandListCreate(driver.ctx(), device_, &cl_desc, ¤t_cl_bytes.first)); +} + +L0DataFetcher::~L0DataFetcher() { + zeCommandQueueDestroy(queue_handle_); + zeCommandListDestroy(current_cl_bytes.first); + for (auto& dead_handle : graveyard) { + zeCommandListDestroy(dead_handle); + } + for (auto& cl_handle : recycled) { + zeCommandListDestroy(cl_handle); + } +} + +void L0DataFetcher::recycleGraveyard() { + while (recycled.size() < GRAVEYARD_LIMIT && graveyard.size()) { + recycled.push_back(graveyard.front()); + graveyard.pop_front(); + L0_SAFE_CALL(zeCommandListReset(recycled.back())); + } + for (auto& dead_handle : graveyard) { + L0_SAFE_CALL(zeCommandListDestroy(recycled.back())); + } + graveyard.clear(); +} + +void L0DataFetcher::appendCopyCommand(void* dst, + const 
void* src, + const size_t num_bytes) { + std::unique_lock cl_lock(current_cl_lock); + L0_SAFE_CALL(zeCommandListAppendMemoryCopy( + current_cl_bytes.first, dst, src, num_bytes, nullptr, 0, nullptr)); + current_cl_bytes.second += num_bytes; + if (current_cl_bytes.second >= 128 * 1024 * 1024) { + ze_command_list_handle_t cl_h_copy = current_cl_bytes.first; + graveyard.push_back(current_cl_bytes.first); + current_cl_bytes = {{}, 0}; + if (recycled.size()) { + current_cl_bytes.first = recycled.front(); + recycled.pop_front(); + } else { + L0_SAFE_CALL( + zeCommandListCreate(driver_.ctx(), device_, &cl_desc, ¤t_cl_bytes.first)); + } + cl_lock.unlock(); + L0_SAFE_CALL(zeCommandListClose(cl_h_copy)); + L0_SAFE_CALL( + zeCommandQueueExecuteCommandLists(queue_handle_, 1, &cl_h_copy, nullptr)); + } +} + +void L0DataFetcher::sync() { + if (current_cl_bytes.second) { + L0_SAFE_CALL(zeCommandListClose(current_cl_bytes.first)); + L0_SAFE_CALL(zeCommandQueueExecuteCommandLists( + queue_handle_, 1, ¤t_cl_bytes.first, nullptr)); + } + L0_SAFE_CALL( + zeCommandQueueSynchronize(queue_handle_, std::numeric_limits::max())); + L0_SAFE_CALL(zeCommandListReset(current_cl_bytes.first)); + if (graveyard.size() > GRAVEYARD_LIMIT) { + recycleGraveyard(); + } +} + +L0Device::L0Device(const L0Driver& driver, ze_device_handle_t device) + : device_(device), driver_(driver), data_fetcher(driver, device) { ze_command_queue_handle_t queue_handle; ze_command_queue_desc_t command_queue_desc = {ZE_STRUCTURE_TYPE_COMMAND_QUEUE_DESC, nullptr, @@ -219,7 +298,6 @@ std::shared_ptr L0Device::create_module(uint8_t* code, }; ze_module_handle_t handle; ze_module_build_log_handle_t buildlog = nullptr; - auto status = zeModuleCreate(ctx(), device_, &desc, &handle, &buildlog); if (log) { size_t logSize = 0; @@ -329,6 +407,40 @@ void L0Manager::copyHostToDevice(int8_t* device_ptr, cl->submit(*queue); } +void L0Manager::copyHostToDeviceAsync(int8_t* device_ptr, + const int8_t* host_ptr, + const size_t num_bytes, + const int device_num) { + if (!num_bytes) + return; + CHECK(host_ptr); + CHECK(device_ptr); + CHECK_GT(num_bytes, 0); + CHECK_GE(device_num, 0); + CHECK_LT(device_num, drivers_[0]->devices().size()); + + auto& device = drivers()[0]->devices()[device_num]; + device->data_fetcher.appendCopyCommand(device_ptr, host_ptr, num_bytes); +} + +void L0Manager::copyHostToDeviceAsyncIfPossible(int8_t* device_ptr, + const int8_t* host_ptr, + const size_t num_bytes, + const int device_num) { + if constexpr (async_data_load_available) { + copyHostToDeviceAsync(device_ptr, host_ptr, num_bytes, device_num); + } else { + copyHostToDevice(device_ptr, host_ptr, num_bytes, device_num); + } +} + +void L0Manager::synchronizeDeviceDataStream(const int device_num) { + CHECK_GE(device_num, 0); + CHECK_LT(device_num, drivers_[0]->devices().size()); + auto& device = drivers()[0]->devices()[device_num]; + device->data_fetcher.sync(); +} + void L0Manager::copyDeviceToHost(int8_t* host_ptr, const int8_t* device_ptr, const size_t num_bytes, diff --git a/omniscidb/L0Mgr/L0Mgr.h b/omniscidb/L0Mgr/L0Mgr.h index 6e0e687c48..6dd72a5582 100644 --- a/omniscidb/L0Mgr/L0Mgr.h +++ b/omniscidb/L0Mgr/L0Mgr.h @@ -16,7 +16,9 @@ #pragma once #include +#include #include +#include #include #include "DataMgr/GpuMgr.h" @@ -56,6 +58,35 @@ class L0Kernel; class L0CommandList; class L0CommandQueue; +class L0DataFetcher { +#ifdef HAVE_L0 + static constexpr uint16_t GRAVEYARD_LIMIT{500}; + ze_device_handle_t device_; + ze_command_queue_handle_t queue_handle_; + std::pair 
current_cl_bytes; + std::list graveyard; + std::list recycled; + ze_command_list_desc_t cl_desc = {ZE_STRUCTURE_TYPE_COMMAND_LIST_DESC, + nullptr, + 0, + ZE_COMMAND_LIST_FLAG_MAXIMIZE_THROUGHPUT}; + std::mutex current_cl_lock; +#endif + const L0Driver& driver_; + void recycleGraveyard(); + + public: + void appendCopyCommand(void* dst, const void* src, const size_t num_bytes); + void sync(); + +#ifdef HAVE_L0 + L0DataFetcher(const L0Driver& driver, ze_device_handle_t device); + ~L0DataFetcher(); +#else + L0DataFetcher() = default; +#endif +}; + class L0Device { private: #ifdef HAVE_L0 @@ -68,6 +99,7 @@ class L0Device { std::shared_ptr command_queue_; public: + L0DataFetcher data_fetcher; std::shared_ptr command_queue() const; std::unique_ptr create_command_list() const; @@ -169,7 +201,6 @@ class L0CommandList { public: void copy(void* dst, const void* src, const size_t num_bytes); - template void launch(L0Kernel& kernel, const GroupCount& gc, Args&&... args) { #ifdef HAVE_L0 @@ -202,13 +233,15 @@ class L0Manager : public GpuMgr { void copyHostToDeviceAsync(int8_t* device_ptr, const int8_t* host_ptr, const size_t num_bytes, - const int device_num) override { - CHECK(false); - } - void synchronizeStream(const int device_num) override { - LOG(WARNING) - << "L0 has no async data transfer enabled, synchronizeStream() has no effect"; - } + const int device_num) override; + + void copyHostToDeviceAsyncIfPossible(int8_t* device_ptr, + const int8_t* host_ptr, + const size_t num_bytes, + const int device_num) override; + + void synchronizeDeviceDataStream(const int device_num) override; + void copyHostToDevice(int8_t* device_ptr, const int8_t* host_ptr, const size_t num_bytes, @@ -261,7 +294,7 @@ class L0Manager : public GpuMgr { private: std::vector> drivers_; - static constexpr bool async_data_load_available{false}; + static constexpr bool async_data_load_available{true}; }; } // namespace l0 diff --git a/omniscidb/L0Mgr/L0MgrNoL0.cpp b/omniscidb/L0Mgr/L0MgrNoL0.cpp index f544d4062e..8b14c3c091 100644 --- a/omniscidb/L0Mgr/L0MgrNoL0.cpp +++ b/omniscidb/L0Mgr/L0MgrNoL0.cpp @@ -69,6 +69,25 @@ void L0Manager::copyHostToDevice(int8_t* device_ptr, const int device_num) { CHECK(false); } + +void L0Manager::copyHostToDeviceAsyncIfPossible(int8_t* device_ptr, + const int8_t* host_ptr, + const size_t num_bytes, + const int device_num) { + CHECK(false); +} + +void L0Manager::copyHostToDeviceAsync(int8_t* device_ptr, + const int8_t* host_ptr, + const size_t num_bytes, + const int device_num) { + CHECK(false); +} + +void L0Manager::synchronizeDeviceDataStream(const int device_num) { + CHECK(false); +} + void L0Manager::copyDeviceToHost(int8_t* host_ptr, const int8_t* device_ptr, const size_t num_bytes, diff --git a/omniscidb/QueryEngine/ColumnFetcher.cpp b/omniscidb/QueryEngine/ColumnFetcher.cpp index 62fd71f33a..ba897d90f9 100644 --- a/omniscidb/QueryEngine/ColumnFetcher.cpp +++ b/omniscidb/QueryEngine/ColumnFetcher.cpp @@ -59,7 +59,6 @@ std::pair ColumnFetcher::getOneColumnFragment( const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator* device_allocator, - const size_t thread_idx, std::vector>& chunks_owner, DataProvider* data_provider, ColumnCacheMap& column_cache) { @@ -115,7 +114,6 @@ JoinColumn ColumnFetcher::makeJoinColumn( const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator* device_allocator, - const size_t thread_idx, std::vector>& chunks_owner, std::vector>& malloc_owner, DataProvider* data_provider, @@ -142,7 +140,6 @@ 
JoinColumn ColumnFetcher::makeJoinColumn( effective_mem_lvl, effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id, device_allocator, - thread_idx, chunks_owner, data_provider, column_cache); @@ -364,8 +361,7 @@ const int8_t* ColumnFetcher::linearizeColumnFragments( std::list& chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, - DeviceAllocator* device_allocator, - const size_t thread_idx) const { + DeviceAllocator* device_allocator) const { auto timer = DEBUG_TIMER(__func__); int db_id = col_info->db_id; int table_id = col_info->table_id; @@ -477,8 +473,7 @@ const int8_t* ColumnFetcher::linearizeColumnFragments( total_data_buf_size, total_idx_buf_size, total_num_tuples, - device_allocator, - thread_idx); + device_allocator); } else { CHECK(type->isVarLenArray()); VLOG(2) << "Linearize variable-length multi-frag array column (col_id: " << col_id @@ -496,8 +491,7 @@ const int8_t* ColumnFetcher::linearizeColumnFragments( total_data_buf_size, total_idx_buf_size, total_num_tuples, - device_allocator, - thread_idx); + device_allocator); } } if (type->isString()) { @@ -516,8 +510,7 @@ const int8_t* ColumnFetcher::linearizeColumnFragments( total_data_buf_size, total_idx_buf_size, total_num_tuples, - device_allocator, - thread_idx); + device_allocator); } } CHECK(res.first); // check merged data buffer @@ -573,8 +566,7 @@ MergedChunk ColumnFetcher::linearizeVarLenArrayColFrags( const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, - DeviceAllocator* device_allocator, - const size_t thread_idx) const { + DeviceAllocator* device_allocator) const { // for linearization of varlen col we have to deal with not only data buffer // but also its underlying index buffer which is responsible for offset of varlen value // basically we maintain per-device linearized (data/index) buffer @@ -902,8 +894,7 @@ MergedChunk ColumnFetcher::linearizeFixedLenArrayColFrags( const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, - DeviceAllocator* device_allocator, - const size_t thread_idx) const { + DeviceAllocator* device_allocator) const { int64_t linearization_time_ms = 0; auto clock_begin = timer_start(); // linearize collected fragments diff --git a/omniscidb/QueryEngine/ColumnFetcher.h b/omniscidb/QueryEngine/ColumnFetcher.h index 6a43f56e26..f0a27473ab 100644 --- a/omniscidb/QueryEngine/ColumnFetcher.h +++ b/omniscidb/QueryEngine/ColumnFetcher.h @@ -46,7 +46,6 @@ class ColumnFetcher { const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator* device_allocator, - const size_t thread_idx, std::vector>& chunks_owner, DataProvider* data_provider, ColumnCacheMap& column_cache); @@ -59,7 +58,6 @@ class ColumnFetcher { const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator* device_allocator, - const size_t thread_idx, std::vector>& chunks_owner, std::vector>& malloc_owner, DataProvider* data_provider, @@ -90,8 +88,7 @@ class ColumnFetcher { std::list& chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, - DeviceAllocator* device_allocator, - const size_t thread_idx) const; + DeviceAllocator* device_allocator) const; void freeTemporaryCpuLinearizedIdxBuf(); void freeLinearizedBuf(); @@ -118,8 +115,7 @@ class ColumnFetcher { const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, - DeviceAllocator* device_allocator, - const size_t thread_idx) const; 
+ DeviceAllocator* device_allocator) const; MergedChunk linearizeFixedLenArrayColFrags( std::list>& chunk_holder, @@ -133,8 +129,7 @@ class ColumnFetcher { const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, - DeviceAllocator* device_allocator, - const size_t thread_idx) const; + DeviceAllocator* device_allocator) const; void addMergedChunkIter(const int table_id, const int col_id, diff --git a/omniscidb/QueryEngine/Compiler/Backend.cpp b/omniscidb/QueryEngine/Compiler/Backend.cpp index d51143d843..00b5ccf2a2 100644 --- a/omniscidb/QueryEngine/Compiler/Backend.cpp +++ b/omniscidb/QueryEngine/Compiler/Backend.cpp @@ -865,7 +865,7 @@ std::shared_ptr L0Backend::generateNativeGPUCode( CHECK(module); CHECK(wrapper_func); - DUMP_MODULE(module, "before.linking.spirv.ll") + // DUMP_MODULE(module, "before.linking.spirv.ll") CHECK(exts.find(ExtModuleKinds::spirv_helper_funcs_module) != exts.end()); @@ -884,7 +884,7 @@ std::shared_ptr L0Backend::generateNativeGPUCode( } } - DUMP_MODULE(module, "after.linking.spirv.ll") + // DUMP_MODULE(module, "after.linking.spirv.ll") // set proper calling conv & mangle spirv built-ins for (auto& Fn : *module) { diff --git a/omniscidb/QueryEngine/Execute.cpp b/omniscidb/QueryEngine/Execute.cpp index 156ca07501..f5841f7ce3 100644 --- a/omniscidb/QueryEngine/Execute.cpp +++ b/omniscidb/QueryEngine/Execute.cpp @@ -21,6 +21,7 @@ #include "QueryEngine/Execute.h" #include +#include #include #include @@ -2877,7 +2878,7 @@ std::map> get_table_id_to_frag_offsets( std::pair>, std::vector>> Executor::getRowCountAndOffsetForAllFrags( const RelAlgExecutionUnit& ra_exe_unit, - const CartesianProduct>>& frag_ids_crossjoin, + const std::vector>& frag_ids_crossjoin, const std::vector& input_descs, const std::map& all_tables_fragments) { std::vector> all_num_rows; @@ -2990,80 +2991,151 @@ FetchResult Executor::fetchChunks( std::vector> all_frag_col_buffers; std::vector> all_num_rows; std::vector> all_frag_offsets; - for (const auto& selected_frag_ids : frag_ids_crossjoin) { - std::vector frag_col_buffers( - plan_state_->global_to_local_col_ids_.size()); - for (const auto& col_id : col_global_ids) { - if (interrupted_.load()) { - throw QueryExecutionError(ERR_INTERRUPTED); - } - CHECK(col_id); - if (col_id->isVirtual()) { - continue; - } - const auto fragments_it = all_tables_fragments.find(col_id->getTableRef()); - CHECK(fragments_it != all_tables_fragments.end()); - const auto fragments = fragments_it->second; - auto it = plan_state_->global_to_local_col_ids_.find(*col_id); - CHECK(it != plan_state_->global_to_local_col_ids_.end()); - CHECK_LT(static_cast(it->second), - plan_state_->global_to_local_col_ids_.size()); - const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]]; - if (!fragments->size()) { - return {}; + + auto fetch_column_callback = [&](std::shared_ptr col_id, + const std::vector& selected_frag_ids, + std::vector& frag_col_buffers, + const bool parallelized = + false) -> bool /*empty_frag*/ { + if (interrupted_.load()) { + throw QueryExecutionError(ERR_INTERRUPTED); + } + const auto fragments_it = all_tables_fragments.find(col_id->getTableRef()); + CHECK(fragments_it != all_tables_fragments.end()); + const auto fragments = fragments_it->second; + auto it = plan_state_->global_to_local_col_ids_.find(*col_id); + CHECK(it != plan_state_->global_to_local_col_ids_.end()); + CHECK_LT(static_cast(it->second), + plan_state_->global_to_local_col_ids_.size()); + const size_t frag_id = 
selected_frag_ids[local_col_to_frag_pos[it->second]]; + if (!fragments->size()) { + return true; + } + auto memory_level_for_column = memory_level; + if (plan_state_->columns_to_fetch_.find(*col_id) == + plan_state_->columns_to_fetch_.end()) { + memory_level_for_column = Data_Namespace::CPU_LEVEL; + } + if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) { + // determine if we need special treatment to linearlize multi-frag table + // i.e., a column that is classified as varlen type, i.e., array + // for now, we can support more types in this way + CHECK(!parallelized); // otherwise recursive tbb parallel for with deadlocks + if (needLinearizeAllFragments( + *col_id, ra_exe_unit, selected_fragments, memory_level)) { + bool for_lazy_fetch = false; + if (plan_state_->columns_to_not_fetch_.find(*col_id) != + plan_state_->columns_to_not_fetch_.end()) { + for_lazy_fetch = true; + VLOG(2) << "Try to linearize lazy fetch column (col_id: " << col_id->getColId() + << ")"; + } + frag_col_buffers[it->second] = column_fetcher.linearizeColumnFragments( + col_id->getColInfo(), + all_tables_fragments, + chunks, + chunk_iterators, + for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level, + for_lazy_fetch ? 0 : device_id, + device_allocator); + } else { + frag_col_buffers[it->second] = + column_fetcher.getAllTableColumnFragments(col_id->getColInfo(), + all_tables_fragments, + memory_level_for_column, + device_id, + device_allocator, + thread_idx); } - auto memory_level_for_column = memory_level; - if (plan_state_->columns_to_fetch_.find(*col_id) == - plan_state_->columns_to_fetch_.end()) { - memory_level_for_column = Data_Namespace::CPU_LEVEL; + } else { + frag_col_buffers[it->second] = + column_fetcher.getOneTableColumnFragment(col_id->getColInfo(), + frag_id, + all_tables_fragments, + chunks, + chunk_iterators, + memory_level_for_column, + device_id, + device_allocator); + } + return false; + }; + + // in MT fetching for GPU, we want to preserve "the order of insertion" into + // all_frag_col_buffers + std::vector> selected_frag_ids_vec; + if (memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL) { + std::atomic empty_frags{false}; + tbb::task_arena limitedArena(8); + std::vector idx_frags_to_inearize; + for (const auto& selected_frag_ids : frag_ids_crossjoin) { + selected_frag_ids_vec.push_back(selected_frag_ids); + for (const auto& col_id : col_global_ids) { + CHECK(col_id); + if (!col_id->isVirtual() && + needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) { + idx_frags_to_inearize.push_back(selected_frag_ids_vec.size() - 1); + } } - if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) { - // determine if we need special treatment to linearlize multi-frag table - // i.e., a column that is classified as varlen type, i.e., array - // for now, we can support more types in this way - if (needLinearizeAllFragments( - *col_id, ra_exe_unit, selected_fragments, memory_level)) { - bool for_lazy_fetch = false; - if (plan_state_->columns_to_not_fetch_.find(*col_id) != - plan_state_->columns_to_not_fetch_.end()) { - for_lazy_fetch = true; - VLOG(2) << "Try to linearize lazy fetch column (col_id: " - << col_id->getColId() << ")"; + } + all_frag_col_buffers.resize(selected_frag_ids_vec.size()); + + // Try MT fetching for frags that do not need linearization + limitedArena.execute([&]() { + tbb::parallel_for(0ul, selected_frag_ids_vec.size(), [&](const size_t idx) { + if (std::find(idx_frags_to_inearize.begin(), idx_frags_to_inearize.end(), idx) == + 
idx_frags_to_inearize.end()) { + const auto& selected_frag_ids = selected_frag_ids_vec[idx]; + std::vector frag_col_buffers( + plan_state_->global_to_local_col_ids_.size()); + for (const auto& col_id : col_global_ids) { + CHECK(col_id); + if (!col_id->isVirtual() && + fetch_column_callback( + col_id, selected_frag_ids, frag_col_buffers, true)) { + empty_frags = true; // not virtual, but empty frags + tbb::task::current_context()->cancel_group_execution(); + } } - frag_col_buffers[it->second] = column_fetcher.linearizeColumnFragments( - col_id->getColInfo(), - all_tables_fragments, - chunks, - chunk_iterators, - for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level, - for_lazy_fetch ? 0 : device_id, - device_allocator, - thread_idx); - } else { - frag_col_buffers[it->second] = - column_fetcher.getAllTableColumnFragments(col_id->getColInfo(), - all_tables_fragments, - memory_level_for_column, - device_id, - device_allocator, - thread_idx); + all_frag_col_buffers[idx] = frag_col_buffers; + } + }); + }); + if (empty_frags) { + return {}; + } + for (const size_t idx : + idx_frags_to_inearize) { // linear frags materialization is already + // parallelized, avoid nested tbb + const auto& selected_frag_ids = selected_frag_ids_vec[idx]; + std::vector frag_col_buffers( + plan_state_->global_to_local_col_ids_.size()); + for (const auto& col_id : col_global_ids) { + CHECK(col_id); + if (!col_id->isVirtual() && + fetch_column_callback(col_id, selected_frag_ids, frag_col_buffers)) { + return {}; // not virtual, but empty frags + } + } + all_frag_col_buffers[idx] = frag_col_buffers; + } + } else { + for (const auto& selected_frag_ids : frag_ids_crossjoin) { + std::vector frag_col_buffers( + plan_state_->global_to_local_col_ids_.size()); + for (const auto& col_id : col_global_ids) { + CHECK(col_id); + if (!col_id->isVirtual() && + fetch_column_callback(col_id, selected_frag_ids, frag_col_buffers)) { + return {}; // not virtual, but empty frags } - } else { - frag_col_buffers[it->second] = - column_fetcher.getOneTableColumnFragment(col_id->getColInfo(), - frag_id, - all_tables_fragments, - chunks, - chunk_iterators, - memory_level_for_column, - device_id, - device_allocator); } + selected_frag_ids_vec.push_back(selected_frag_ids); + all_frag_col_buffers.push_back(frag_col_buffers); } - all_frag_col_buffers.push_back(frag_col_buffers); } std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags( - ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments); + ra_exe_unit, selected_frag_ids_vec, ra_exe_unit.input_descs, all_tables_fragments); return {all_frag_col_buffers, all_num_rows, all_frag_offsets}; } @@ -3087,6 +3159,7 @@ FetchResult Executor::fetchUnionChunks( std::vector> all_frag_col_buffers; std::vector> all_num_rows; std::vector> all_frag_offsets; + std::vector> selected_frag_ids_vec; CHECK(!selected_fragments.empty()); CHECK_LE(2u, ra_exe_unit.input_descs.size()); @@ -3185,12 +3258,16 @@ FetchResult Executor::fetchUnionChunks( device_allocator); } } + selected_frag_ids_vec.push_back(selected_frag_ids); all_frag_col_buffers.push_back(frag_col_buffers); } std::vector> num_rows; std::vector> frag_offsets; - std::tie(num_rows, frag_offsets) = getRowCountAndOffsetForAllFrags( - ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments); + std::tie(num_rows, frag_offsets) = + getRowCountAndOffsetForAllFrags(ra_exe_unit, + selected_frag_ids_vec, + ra_exe_unit.input_descs, + all_tables_fragments); all_num_rows.insert(all_num_rows.end(), 
num_rows.begin(), num_rows.end()); all_frag_offsets.insert( all_frag_offsets.end(), frag_offsets.begin(), frag_offsets.end()); diff --git a/omniscidb/QueryEngine/Execute.h b/omniscidb/QueryEngine/Execute.h index 1ba1ef6376..e99c54bc67 100644 --- a/omniscidb/QueryEngine/Execute.h +++ b/omniscidb/QueryEngine/Execute.h @@ -572,7 +572,7 @@ class Executor : public StringDictionaryProvider { std::pair>, std::vector>> getRowCountAndOffsetForAllFrags( const RelAlgExecutionUnit& ra_exe_unit, - const CartesianProduct>>& frag_ids_crossjoin, + const std::vector>& frag_ids_crossjoin, const std::vector& input_descs, const std::map& all_tables_fragments); diff --git a/omniscidb/QueryEngine/JoinHashTable/HashJoin.cpp b/omniscidb/QueryEngine/JoinHashTable/HashJoin.cpp index 74f4f81984..aeb9adb790 100644 --- a/omniscidb/QueryEngine/JoinHashTable/HashJoin.cpp +++ b/omniscidb/QueryEngine/JoinHashTable/HashJoin.cpp @@ -52,7 +52,6 @@ JoinColumn HashJoin::fetchJoinColumn( effective_memory_level, device_id, dev_buff_owner, - /*thread_idx=*/0, chunks_owner, malloc_owner, data_provider_, diff --git a/omniscidb/QueryEngine/QueryExecutionContext.cpp b/omniscidb/QueryEngine/QueryExecutionContext.cpp index aa9cbe4448..8a1f45d13f 100644 --- a/omniscidb/QueryEngine/QueryExecutionContext.cpp +++ b/omniscidb/QueryEngine/QueryExecutionContext.cpp @@ -352,6 +352,7 @@ std::vector QueryExecutionContext::launchGpuCode( << " ms"; launchClock->start(); } + auto timer_kernel = DEBUG_TIMER("Actual kernel"); kernel->launch(ko, kernel_params); if (executor_->getConfig().exec.watchdog.enable_dynamic || allow_runtime_interrupt) { @@ -365,6 +366,7 @@ std::vector QueryExecutionContext::launchGpuCode( gpu_allocator_->copyFromDevice(reinterpret_cast(error_codes.data()), reinterpret_cast(err_desc), error_codes.size() * sizeof(error_codes[0])); + timer_kernel.stop(); *error_code = aggregate_error_codes(error_codes); if (*error_code > 0) { return {}; @@ -858,9 +860,9 @@ QueryExecutionContext::prepareKernelParams( const uint64_t num_fragments = static_cast(col_buffers.size()); const size_t col_count{num_fragments > 0 ? 
col_buffers.front().size() : 0}; + std::vector multifrag_col_dev_buffers; + std::vector flatened_col_buffers; if (col_count) { - std::vector multifrag_col_dev_buffers; - std::vector flatened_col_buffers; std::vector col_buffs_offsets; for (auto& buffers : col_buffers) { flatened_col_buffers.insert( @@ -959,6 +961,7 @@ QueryExecutionContext::prepareKernelParams( reinterpret_cast(kernel_metadata_gpu_buf->getMemoryPtr() + alloc_size)); CHECK_EQ(nullptr, params[GROUPBY_BUF]); - buffer_provider->synchronizeStream(device_id); + buffer_provider->synchronizeDeviceDataStream(device_id); + return {params, kernel_metadata_gpu_buf}; } diff --git a/omniscidb/QueryEngine/RelAlgExecutor.cpp b/omniscidb/QueryEngine/RelAlgExecutor.cpp index 2421fbb855..64288098c9 100644 --- a/omniscidb/QueryEngine/RelAlgExecutor.cpp +++ b/omniscidb/QueryEngine/RelAlgExecutor.cpp @@ -1154,7 +1154,6 @@ std::unique_ptr RelAlgExecutor::createWindowFunctionConte memory_level, 0, nullptr, - /*thread_idx=*/0, chunks_owner, data_provider_, column_cache_map); From c00af13b912b3e4b64b77ba473ebf45ced691a37 Mon Sep 17 00:00:00 2001 From: Artem Kroviakov Date: Fri, 24 Nov 2023 02:43:26 -0700 Subject: [PATCH 2/3] Move l0 fetcher completely into device class --- omniscidb/L0Mgr/L0Mgr.cpp | 111 ++++++++++++++++++++++---------------- omniscidb/L0Mgr/L0Mgr.h | 78 ++++++++++++++++----------- 2 files changed, 110 insertions(+), 79 deletions(-) diff --git a/omniscidb/L0Mgr/L0Mgr.cpp b/omniscidb/L0Mgr/L0Mgr.cpp index 524ee4ddde..6210fffe7a 100644 --- a/omniscidb/L0Mgr/L0Mgr.cpp +++ b/omniscidb/L0Mgr/L0Mgr.cpp @@ -139,8 +139,7 @@ void* allocate_device_mem(const size_t num_bytes, L0Device& device) { return mem; } -L0DataFetcher::L0DataFetcher(const L0Driver& driver, ze_device_handle_t device) - : device_(device), driver_(driver) { +L0Device::L0DataFetcher::L0DataFetcher(L0Device& device) : my_device_(device) { ze_command_queue_desc_t command_queue_fetch_desc = { ZE_STRUCTURE_TYPE_COMMAND_QUEUE_DESC, nullptr, @@ -149,54 +148,64 @@ L0DataFetcher::L0DataFetcher(const L0Driver& driver, ze_device_handle_t device) 0, ZE_COMMAND_QUEUE_MODE_ASYNCHRONOUS, ZE_COMMAND_QUEUE_PRIORITY_NORMAL}; - L0_SAFE_CALL(zeCommandQueueCreate( - driver.ctx(), device_, &command_queue_fetch_desc, &queue_handle_)); - current_cl_bytes = {{}, 0}; - L0_SAFE_CALL( - zeCommandListCreate(driver.ctx(), device_, &cl_desc, ¤t_cl_bytes.first)); -} - -L0DataFetcher::~L0DataFetcher() { + L0_SAFE_CALL(zeCommandQueueCreate(my_device_.driver_.ctx(), + my_device_.device_, + &command_queue_fetch_desc, + &queue_handle_)); + cur_cl_bytes_ = {{}, 0}; + L0_SAFE_CALL(zeCommandListCreate(my_device_.driver_.ctx(), + my_device_.device_, + &cl_desc_, + &cur_cl_bytes_.cl_handle_)); +} + +L0Device::L0DataFetcher::~L0DataFetcher() { zeCommandQueueDestroy(queue_handle_); - zeCommandListDestroy(current_cl_bytes.first); - for (auto& dead_handle : graveyard) { + zeCommandListDestroy(cur_cl_bytes_.cl_handle_); + for (auto& dead_handle : graveyard_) { zeCommandListDestroy(dead_handle); } - for (auto& cl_handle : recycled) { + for (auto& cl_handle : recycled_) { zeCommandListDestroy(cl_handle); } } -void L0DataFetcher::recycleGraveyard() { - while (recycled.size() < GRAVEYARD_LIMIT && graveyard.size()) { - recycled.push_back(graveyard.front()); - graveyard.pop_front(); - L0_SAFE_CALL(zeCommandListReset(recycled.back())); +void L0Device::L0DataFetcher::recycleGraveyard() { + while (recycled_.size() < GRAVEYARD_LIMIT && graveyard_.size()) { + recycled_.push_back(graveyard_.front()); + graveyard_.pop_front(); + 
L0_SAFE_CALL(zeCommandListReset(recycled_.back())); } - for (auto& dead_handle : graveyard) { - L0_SAFE_CALL(zeCommandListDestroy(recycled.back())); + for (auto& dead_handle : graveyard_) { + L0_SAFE_CALL(zeCommandListDestroy(dead_handle)); + } + graveyard_.clear(); +} + +void L0Device::L0DataFetcher::setCLRecycledOrNew() { + cur_cl_bytes_ = {{}, 0}; + if (recycled_.size()) { + cur_cl_bytes_.cl_handle_ = recycled_.front(); + recycled_.pop_front(); + } else { + L0_SAFE_CALL(zeCommandListCreate(my_device_.driver_.ctx(), + my_device_.device_, + &cl_desc_, + &cur_cl_bytes_.cl_handle_)); } - graveyard.clear(); } -void L0DataFetcher::appendCopyCommand(void* dst, - const void* src, - const size_t num_bytes) { - std::unique_lock cl_lock(current_cl_lock); +void L0Device::L0DataFetcher::appendCopyCommand(void* dst, + const void* src, + const size_t num_bytes) { + std::unique_lock cl_lock(cur_cl_lock_); L0_SAFE_CALL(zeCommandListAppendMemoryCopy( - current_cl_bytes.first, dst, src, num_bytes, nullptr, 0, nullptr)); - current_cl_bytes.second += num_bytes; - if (current_cl_bytes.second >= 128 * 1024 * 1024) { - ze_command_list_handle_t cl_h_copy = current_cl_bytes.first; - graveyard.push_back(current_cl_bytes.first); - current_cl_bytes = {{}, 0}; - if (recycled.size()) { - current_cl_bytes.first = recycled.front(); - recycled.pop_front(); - } else { - L0_SAFE_CALL( - zeCommandListCreate(driver_.ctx(), device_, &cl_desc, ¤t_cl_bytes.first)); - } + cur_cl_bytes_.cl_handle_, dst, src, num_bytes, nullptr, 0, nullptr)); + cur_cl_bytes_.bytes_ += num_bytes; + if (cur_cl_bytes_.bytes_ >= CL_BYTES_LIMIT) { + ze_command_list_handle_t cl_h_copy = cur_cl_bytes_.cl_handle_; + graveyard_.push_back(cur_cl_bytes_.cl_handle_); + setCLRecycledOrNew(); cl_lock.unlock(); L0_SAFE_CALL(zeCommandListClose(cl_h_copy)); L0_SAFE_CALL( @@ -204,22 +213,22 @@ void L0DataFetcher::appendCopyCommand(void* dst, } } -void L0DataFetcher::sync() { - if (current_cl_bytes.second) { - L0_SAFE_CALL(zeCommandListClose(current_cl_bytes.first)); +void L0Device::L0DataFetcher::sync() { + if (cur_cl_bytes_.bytes_) { + L0_SAFE_CALL(zeCommandListClose(cur_cl_bytes_.cl_handle_)); L0_SAFE_CALL(zeCommandQueueExecuteCommandLists( - queue_handle_, 1, ¤t_cl_bytes.first, nullptr)); + queue_handle_, 1, &cur_cl_bytes_.cl_handle_, nullptr)); } L0_SAFE_CALL( zeCommandQueueSynchronize(queue_handle_, std::numeric_limits::max())); - L0_SAFE_CALL(zeCommandListReset(current_cl_bytes.first)); - if (graveyard.size() > GRAVEYARD_LIMIT) { + L0_SAFE_CALL(zeCommandListReset(cur_cl_bytes_.cl_handle_)); + if (graveyard_.size() > GRAVEYARD_LIMIT) { recycleGraveyard(); } } L0Device::L0Device(const L0Driver& driver, ze_device_handle_t device) - : device_(device), driver_(driver), data_fetcher(driver, device) { + : device_(device), driver_(driver), data_fetcher_(*this) { ze_command_queue_handle_t queue_handle; ze_command_queue_desc_t command_queue_desc = {ZE_STRUCTURE_TYPE_COMMAND_QUEUE_DESC, nullptr, @@ -271,6 +280,14 @@ unsigned L0Device::maxSharedLocalMemory() const { return compute_props_.maxSharedLocalMemory; } +void L0Device::transferToDevice(void* dst, const void* src, const size_t num_bytes) { + data_fetcher_.appendCopyCommand(dst, src, num_bytes); +} + +void L0Device::syncDataTransfers() { + data_fetcher_.sync(); +} + L0CommandQueue::L0CommandQueue(ze_command_queue_handle_t handle) : handle_(handle) {} ze_command_queue_handle_t L0CommandQueue::handle() const { @@ -420,7 +437,7 @@ void L0Manager::copyHostToDeviceAsync(int8_t* device_ptr, CHECK_LT(device_num, 
drivers_[0]->devices().size()); auto& device = drivers()[0]->devices()[device_num]; - device->data_fetcher.appendCopyCommand(device_ptr, host_ptr, num_bytes); + device->transferToDevice(device_ptr, host_ptr, num_bytes); } void L0Manager::copyHostToDeviceAsyncIfPossible(int8_t* device_ptr, @@ -438,7 +455,7 @@ void L0Manager::synchronizeDeviceDataStream(const int device_num) { CHECK_GE(device_num, 0); CHECK_LT(device_num, drivers_[0]->devices().size()); auto& device = drivers()[0]->devices()[device_num]; - device->data_fetcher.sync(); + device->syncDataTransfers(); } void L0Manager::copyDeviceToHost(int8_t* host_ptr, diff --git a/omniscidb/L0Mgr/L0Mgr.h b/omniscidb/L0Mgr/L0Mgr.h index 6dd72a5582..4cd01cbb16 100644 --- a/omniscidb/L0Mgr/L0Mgr.h +++ b/omniscidb/L0Mgr/L0Mgr.h @@ -58,51 +58,65 @@ class L0Kernel; class L0CommandList; class L0CommandQueue; -class L0DataFetcher { +class L0Device { + protected: + const L0Driver& driver_; + std::shared_ptr command_queue_; #ifdef HAVE_L0 - static constexpr uint16_t GRAVEYARD_LIMIT{500}; ze_device_handle_t device_; - ze_command_queue_handle_t queue_handle_; - std::pair current_cl_bytes; - std::list graveyard; - std::list recycled; - ze_command_list_desc_t cl_desc = {ZE_STRUCTURE_TYPE_COMMAND_LIST_DESC, - nullptr, - 0, - ZE_COMMAND_LIST_FLAG_MAXIMIZE_THROUGHPUT}; - std::mutex current_cl_lock; + ze_device_properties_t props_; + ze_device_compute_properties_t compute_props_; #endif - const L0Driver& driver_; - void recycleGraveyard(); - - public: - void appendCopyCommand(void* dst, const void* src, const size_t num_bytes); - void sync(); - + private: + /* + This component for data fetching to L0 devices is used for + more efficient asynchronous data transfers. It allows to amortize + the costs of data transfers which is especially useful in case of + many relatively small data transfers. 
+ */ + class L0DataFetcher { #ifdef HAVE_L0 - L0DataFetcher(const L0Driver& driver, ze_device_handle_t device); - ~L0DataFetcher(); -#else - L0DataFetcher() = default; + static constexpr size_t GRAVEYARD_LIMIT{500}; + static constexpr size_t CL_BYTES_LIMIT{128 * 1024 * 1024}; + static constexpr ze_command_list_desc_t cl_desc_ = { + ZE_STRUCTURE_TYPE_COMMAND_LIST_DESC, + nullptr, + 0, + ZE_COMMAND_LIST_FLAG_MAXIMIZE_THROUGHPUT}; + struct CLBytesTracker { + ze_command_list_handle_t cl_handle_; + uint64_t bytes_; + }; + CLBytesTracker cur_cl_bytes_; + ze_command_queue_handle_t queue_handle_; + std::list graveyard_; + std::list recycled_; + std::mutex cur_cl_lock_; #endif -}; + L0Device& my_device_; + void recycleGraveyard(); + void setCLRecycledOrNew(); + + public: + void appendCopyCommand(void* dst, const void* src, const size_t num_bytes); + void sync(); -class L0Device { - private: #ifdef HAVE_L0 - ze_device_handle_t device_; - ze_device_properties_t props_; - ze_device_compute_properties_t compute_props_; + L0DataFetcher(L0Device& device); + ~L0DataFetcher(); +#else + L0DataFetcher() = default; #endif - - const L0Driver& driver_; - std::shared_ptr command_queue_; + }; + L0DataFetcher data_fetcher_; public: - L0DataFetcher data_fetcher; std::shared_ptr command_queue() const; std::unique_ptr create_command_list() const; + void transferToDevice(void* dst, const void* src, const size_t num_bytes); + void syncDataTransfers(); + std::shared_ptr create_module(uint8_t* code, size_t len, bool log = false) const; From f6e9626aed787dcc3cd518169340e163702c90d6 Mon Sep 17 00:00:00 2001 From: Artem Kroviakov Date: Fri, 24 Nov 2023 09:36:35 -0700 Subject: [PATCH 3/3] change tbb parallel for version --- omniscidb/QueryEngine/Execute.cpp | 46 +++++++++++++++++++------------ 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/omniscidb/QueryEngine/Execute.cpp b/omniscidb/QueryEngine/Execute.cpp index f5841f7ce3..5500df868a 100644 --- a/omniscidb/QueryEngine/Execute.cpp +++ b/omniscidb/QueryEngine/Execute.cpp @@ -3020,7 +3020,11 @@ FetchResult Executor::fetchChunks( // determine if we need special treatment to linearlize multi-frag table // i.e., a column that is classified as varlen type, i.e., array // for now, we can support more types in this way - CHECK(!parallelized); // otherwise recursive tbb parallel for with deadlocks + + // If fetch_column_callback is called from parallel code region, we get deadlocks + // due to tbb nested parallel_for, the calls below are internally parallelized. 
We + // should land here when called from a sequential code region + CHECK(!parallelized); if (needLinearizeAllFragments( *col_id, ra_exe_unit, selected_fragments, memory_level)) { bool for_lazy_fetch = false; @@ -3082,24 +3086,28 @@ FetchResult Executor::fetchChunks( // Try MT fetching for frags that do not need linearization limitedArena.execute([&]() { - tbb::parallel_for(0ul, selected_frag_ids_vec.size(), [&](const size_t idx) { - if (std::find(idx_frags_to_inearize.begin(), idx_frags_to_inearize.end(), idx) == - idx_frags_to_inearize.end()) { - const auto& selected_frag_ids = selected_frag_ids_vec[idx]; - std::vector frag_col_buffers( - plan_state_->global_to_local_col_ids_.size()); - for (const auto& col_id : col_global_ids) { - CHECK(col_id); - if (!col_id->isVirtual() && - fetch_column_callback( - col_id, selected_frag_ids, frag_col_buffers, true)) { - empty_frags = true; // not virtual, but empty frags - tbb::task::current_context()->cancel_group_execution(); + tbb::parallel_for( + tbb::blocked_range(0, selected_frag_ids_vec.size()), [&](auto r) { + for (size_t idx = r.begin(); idx != r.end(); ++idx) { + if (std::find(idx_frags_to_inearize.begin(), + idx_frags_to_inearize.end(), + idx) == idx_frags_to_inearize.end()) { + const auto& selected_frag_ids = selected_frag_ids_vec[idx]; + std::vector frag_col_buffers( + plan_state_->global_to_local_col_ids_.size()); + for (const auto& col_id : col_global_ids) { + CHECK(col_id); + if (!col_id->isVirtual() && + fetch_column_callback( + col_id, selected_frag_ids, frag_col_buffers, true)) { + empty_frags = true; // not virtual, but empty frags + tbb::task::current_context()->cancel_group_execution(); + } + } + all_frag_col_buffers[idx] = frag_col_buffers; + } } - } - all_frag_col_buffers[idx] = frag_col_buffers; - } - }); + }); }); if (empty_frags) { return {}; @@ -3134,6 +3142,8 @@ FetchResult Executor::fetchChunks( all_frag_col_buffers.push_back(frag_col_buffers); } } + // selected_frag_ids_vec here is just a vector representation of frag_ids_crossjoin + // we need it in this form to have a proper fragment order when doing MT fetching to GPU std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags( ra_exe_unit, selected_frag_ids_vec, ra_exe_unit.input_descs, all_tables_fragments); return {all_frag_col_buffers, all_num_rows, all_frag_offsets};