Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 208 additions & 24 deletions src/board_controller/ble_lib_board.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,166 @@
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <string.h>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>

#ifdef _WIN32
#include <objbase.h>
#ifdef _MSC_VER
#pragma comment(lib, "ole32.lib")
#endif
#endif

#include "ble_lib_board.h"

#include "bluetooth_types.h"
#include "get_dll_dir.h"

namespace
{
#ifdef _WIN32
class BLEThreadDispatcher
{
public:
BLEThreadDispatcher ()
: stop (false), started (false), worker (&BLEThreadDispatcher::worker_loop, this)
{
std::unique_lock<std::mutex> lock (mutex);
cv.wait (lock, [this] { return started; });
}

~BLEThreadDispatcher ()
{
{
std::lock_guard<std::mutex> lock (mutex);
stop = true;
}
cv.notify_one ();
if (worker.joinable ())
{
worker.join ();
}
}

template <typename Func>
typename std::enable_if<!std::is_void<typename std::result_of<Func ()>::type>::value,
typename std::result_of<Func ()>::type>::type
run (Func func)
{
typedef typename std::result_of<Func ()>::type result_type;
if (is_worker_thread ())
{
return func ();
}

std::shared_ptr<std::packaged_task<result_type ()>> task (
new std::packaged_task<result_type ()> (std::move (func)));
std::future<result_type> future = task->get_future ();
{
std::lock_guard<std::mutex> lock (mutex);
queue.push_back ([task] () { (*task) (); });
}
cv.notify_one ();
return future.get ();
}

template <typename Func>
typename std::enable_if<std::is_void<typename std::result_of<Func ()>::type>::value,
void>::type
run (Func func)
{
if (is_worker_thread ())
{
func ();
return;
}

std::shared_ptr<std::packaged_task<void ()>> task (
new std::packaged_task<void ()> (std::move (func)));
std::future<void> future = task->get_future ();
{
std::lock_guard<std::mutex> lock (mutex);
queue.push_back ([task] () { (*task) (); });
}
cv.notify_one ();
future.get ();
}

private:
void worker_loop ()
{
HRESULT coinit_result = CoInitializeEx (NULL, COINIT_MULTITHREADED);
bool co_initialized = SUCCEEDED (coinit_result);

{
std::lock_guard<std::mutex> lock (mutex);
worker_id = std::this_thread::get_id ();
started = true;
}
cv.notify_one ();

for (;;)
{
std::function<void ()> task;
{
std::unique_lock<std::mutex> lock (mutex);
cv.wait (lock, [this] { return stop || !queue.empty (); });
if (stop && queue.empty ())
{
break;
}
task = queue.front ();
queue.pop_front ();
}
task ();
}

if (co_initialized)
{
CoUninitialize ();
}
}

bool is_worker_thread ()
{
std::lock_guard<std::mutex> lock (mutex);
return std::this_thread::get_id () == worker_id;
}

std::mutex mutex;
std::condition_variable cv;
std::deque<std::function<void ()>> queue;
bool stop;
bool started;
std::thread worker;
std::thread::id worker_id;
};

BLEThreadDispatcher &get_ble_thread_dispatcher ()
{
static BLEThreadDispatcher dispatcher;
return dispatcher;
}

template <typename Func>
typename std::result_of<Func ()>::type run_on_simpleble_thread (Func func)
{
return get_ble_thread_dispatcher ().run (std::move (func));
}
#else
template <typename Func>
typename std::result_of<Func ()>::type run_on_simpleble_thread (Func func)
{
return func ();
}
#endif
} // namespace

#ifndef STATIC_SIMPLEBLE
DLLLoader *BLELibBoard::dll_loader = NULL;
std::mutex BLELibBoard::mutex;
Expand Down Expand Up @@ -449,9 +604,10 @@ simpleble_err_t BLELibBoard::simpleble_peripheral_connect (simpleble_peripheral_
return SIMPLEBLE_FAILURE;
}

return func (handle);
return run_on_simpleble_thread ([func, handle] () { return func (handle); });
#else
return ::simpleble_peripheral_connect (handle);
return run_on_simpleble_thread (
[handle] () { return ::simpleble_peripheral_connect (handle); });
Comment on lines +607 to +610
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Route every peripheral API call through dispatcher

This patch moves only a subset of simpleble_peripheral_* calls onto run_on_simpleble_thread; simpleble_peripheral_address and simpleble_peripheral_identifier still run on the caller thread, but their handles are later used by dispatched calls like connect/read/write. In current call flows (for example scan callbacks in src/board_controller/openbci/ganglion_native.cpp and src/board_controller/brainalive/brainalive.cpp), the same simpleble_peripheral_t is touched from callback thread(s) and then from the dispatcher thread, which can still violate Windows COM/thread-affinity expectations and cause the same class of failures this change is intended to prevent.

Useful? React with 👍 / 👎.

#endif
}

Expand All @@ -474,9 +630,10 @@ simpleble_err_t BLELibBoard::simpleble_peripheral_disconnect (simpleble_peripher
return SIMPLEBLE_FAILURE;
}

return func (handle);
return run_on_simpleble_thread ([func, handle] () { return func (handle); });
#else
return ::simpleble_peripheral_disconnect (handle);
return run_on_simpleble_thread (
[handle] () { return ::simpleble_peripheral_disconnect (handle); });
#endif
}

Expand All @@ -499,9 +656,10 @@ void BLELibBoard::simpleble_peripheral_release_handle (simpleble_peripheral_t ha
return;
}

func (handle);
run_on_simpleble_thread ([func, handle] () { func (handle); });
#else
return ::simpleble_peripheral_release_handle (handle);
return run_on_simpleble_thread (
[handle] () { return ::simpleble_peripheral_release_handle (handle); });
#endif
}

Expand All @@ -524,9 +682,10 @@ size_t BLELibBoard::simpleble_peripheral_services_count (simpleble_peripheral_t
return 0;
}

return func (handle);
return run_on_simpleble_thread ([func, handle] () { return func (handle); });
#else
return ::simpleble_peripheral_services_count (handle);
return run_on_simpleble_thread (
[handle] () { return ::simpleble_peripheral_services_count (handle); });
#endif
}

Expand All @@ -550,9 +709,11 @@ simpleble_err_t BLELibBoard::simpleble_peripheral_services_get (
return SIMPLEBLE_FAILURE;
}

return func (handle, index, services);
return run_on_simpleble_thread (
[func, handle, index, services] () { return func (handle, index, services); });
#else
return ::simpleble_peripheral_services_get (handle, index, services);
return run_on_simpleble_thread ([handle, index, services] ()
{ return ::simpleble_peripheral_services_get (handle, index, services); });
#endif
}

Expand All @@ -578,10 +739,15 @@ simpleble_err_t BLELibBoard::simpleble_peripheral_write_request (simpleble_perip
return SIMPLEBLE_FAILURE;
}

return func (handle, service, characteristic, data, data_length);
return run_on_simpleble_thread ([func, handle, service, characteristic, data, data_length] ()
{ return func (handle, service, characteristic, data, data_length); });
#else
return ::simpleble_peripheral_write_request (
handle, service, characteristic, data, data_length);
return run_on_simpleble_thread (
[handle, service, characteristic, data, data_length] ()
{
return ::simpleble_peripheral_write_request (
handle, service, characteristic, data, data_length);
});
#endif
}

Expand All @@ -606,10 +772,15 @@ simpleble_err_t BLELibBoard::simpleble_peripheral_write_command (simpleble_perip
return SIMPLEBLE_FAILURE;
}

return func (handle, service, characteristic, data, data_length);
return run_on_simpleble_thread ([func, handle, service, characteristic, data, data_length] ()
{ return func (handle, service, characteristic, data, data_length); });
#else
return ::simpleble_peripheral_write_command (
handle, service, characteristic, data, data_length);
return run_on_simpleble_thread (
[handle, service, characteristic, data, data_length] ()
{
return ::simpleble_peripheral_write_command (
handle, service, characteristic, data, data_length);
});
#endif
}

Expand Down Expand Up @@ -640,9 +811,14 @@ simpleble_err_t BLELibBoard::simpleble_peripheral_notify (simpleble_peripheral_t
return SIMPLEBLE_FAILURE;
}

return func (handle, service, characteristic, callback, userdata);
return run_on_simpleble_thread ([func, handle, service, characteristic, callback, userdata] ()
{ return func (handle, service, characteristic, callback, userdata); });
#else
return ::simpleble_peripheral_notify (handle, service, characteristic, callback, userdata);
return run_on_simpleble_thread (
[handle, service, characteristic, callback, userdata] () {
return ::simpleble_peripheral_notify (
handle, service, characteristic, callback, userdata);
});
#endif
}

Expand All @@ -666,9 +842,11 @@ simpleble_err_t BLELibBoard::simpleble_peripheral_unsubscribe (
return SIMPLEBLE_FAILURE;
}

return func (handle, service, characteristic);
return run_on_simpleble_thread ([func, handle, service, characteristic] ()
{ return func (handle, service, characteristic); });
#else
return ::simpleble_peripheral_unsubscribe (handle, service, characteristic);
return run_on_simpleble_thread ([handle, service, characteristic] ()
{ return ::simpleble_peripheral_unsubscribe (handle, service, characteristic); });
#endif
}

Expand Down Expand Up @@ -744,9 +922,13 @@ simpleble_err_t BLELibBoard::simpleble_peripheral_read (simpleble_peripheral_t h
return SIMPLEBLE_FAILURE;
}

return func (handle, service, characteristic, data, data_length);
return run_on_simpleble_thread ([func, handle, service, characteristic, data, data_length] ()
{ return func (handle, service, characteristic, data, data_length); });
#else
return ::simpleble_peripheral_read (handle, service, characteristic, data, data_length);
return run_on_simpleble_thread (
[handle, service, characteristic, data, data_length] () {
return ::simpleble_peripheral_read (handle, service, characteristic, data, data_length);
});
#endif
}

Expand All @@ -770,8 +952,10 @@ simpleble_err_t BLELibBoard::simpleble_peripheral_is_connected (
return SIMPLEBLE_FAILURE;
}

return func (handle, connected);
return run_on_simpleble_thread (
[func, handle, connected] () { return func (handle, connected); });
#else
return ::simpleble_peripheral_is_connected (handle, connected);
return run_on_simpleble_thread (
[handle, connected] () { return ::simpleble_peripheral_is_connected (handle, connected); });
#endif
}
Loading