Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions src/AsyncWebSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,11 +357,12 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time) {
}

void AsyncWebSocketClient::_onPoll() {
asyncsrv::unique_lock_type lock(_lock);

if (!_client) {
return;
}

asyncsrv::unique_lock_type lock(_lock);
if (_client && _client->canSend() && (!_controlQueue.empty() || !_messageQueue.empty())) {
_runQueue();
} else if (_keepAlivePeriod > 0 && (millis() - _lastMessageTime) >= _keepAlivePeriod && (_controlQueue.empty() && _messageQueue.empty())) {
Expand Down Expand Up @@ -444,12 +445,12 @@ bool AsyncWebSocketClient::canSend() const {
}

bool AsyncWebSocketClient::_queueControl(uint8_t opcode, const uint8_t *data, size_t len, bool mask) {
asyncsrv::lock_guard_type lock(_lock);

if (!_client) {
return false;
}

asyncsrv::lock_guard_type lock(_lock);

_controlQueue.emplace_back(opcode, data, len, mask);
async_ws_log_v("[%s][%" PRIu32 "] QUEUE CTRL (%u) << %" PRIu8, _server->url(), _clientId, _controlQueue.size(), opcode);

Expand All @@ -461,12 +462,12 @@ bool AsyncWebSocketClient::_queueControl(uint8_t opcode, const uint8_t *data, si
}

bool AsyncWebSocketClient::_queueMessage(AsyncWebSocketSharedBuffer buffer, uint8_t opcode, bool mask) {
if (!_client || buffer->size() == 0 || _status != WS_CONNECTED) {
asyncsrv::unique_lock_type lock(_lock);

if (!_client || !buffer || buffer->empty() || _status != WS_CONNECTED) {
return false;
}

asyncsrv::unique_lock_type lock(_lock);

if (_messageQueue.size() >= WS_MAX_QUEUED_MESSAGES) {
if (closeWhenFull) {
_status = WS_DISCONNECTED;
Expand Down Expand Up @@ -545,6 +546,7 @@ void AsyncWebSocketClient::_onError(int8_t err) {
}

void AsyncWebSocketClient::_onTimeout(uint32_t time) {
asyncsrv::lock_guard_type lock(_lock);
if (!_client) {
return;
}
Expand All @@ -553,7 +555,9 @@ void AsyncWebSocketClient::_onTimeout(uint32_t time) {
}

void AsyncWebSocketClient::_onDisconnect() {
asyncsrv::lock_guard_type lock(_lock);
async_ws_log_v("[%s][%" PRIu32 "] DISCONNECT", _server->url(), _clientId);
_status = WS_DISCONNECTED;
_client = nullptr;
_server->_handleDisconnect(this);
}
Expand Down Expand Up @@ -947,6 +951,8 @@ bool AsyncWebSocketClient::binary(const __FlashStringHelper *data, size_t len) {
#endif

IPAddress AsyncWebSocketClient::remoteIP() const {
asyncsrv::lock_guard_type lock(_lock);

if (!_client) {
return IPAddress((uint32_t)0U);
}
Expand All @@ -955,6 +961,8 @@ IPAddress AsyncWebSocketClient::remoteIP() const {
}

uint16_t AsyncWebSocketClient::remotePort() const {
asyncsrv::lock_guard_type lock(_lock);

if (!_client) {
return 0;
}
Expand Down Expand Up @@ -1031,6 +1039,7 @@ AsyncWebSocketClient *AsyncWebSocket::client(uint32_t id) {
}

void AsyncWebSocket::close(uint32_t id, uint16_t code, const char *message) {
asyncsrv::lock_guard_type lock(_lock);
if (AsyncWebSocketClient *c = client(id)) {
c->close(code, message);
}
Expand Down Expand Up @@ -1062,6 +1071,7 @@ void AsyncWebSocket::cleanupClients(uint16_t maxClients) {
}

bool AsyncWebSocket::ping(uint32_t id, const uint8_t *data, size_t len) {
asyncsrv::lock_guard_type lock(_lock);
AsyncWebSocketClient *c = client(id);
return c && c->ping(data, len);
}
Expand All @@ -1081,6 +1091,7 @@ AsyncWebSocket::SendStatus AsyncWebSocket::pingAll(const uint8_t *data, size_t l
}

bool AsyncWebSocket::text(uint32_t id, const uint8_t *message, size_t len) {
asyncsrv::lock_guard_type lock(_lock);
AsyncWebSocketClient *c = client(id);
return c && c->text(makeSharedBuffer(message, len));
}
Expand Down Expand Up @@ -1127,6 +1138,7 @@ bool AsyncWebSocket::text(uint32_t id, AsyncWebSocketMessageBuffer *buffer) {
return enqueued;
}
bool AsyncWebSocket::text(uint32_t id, AsyncWebSocketSharedBuffer buffer) {
asyncsrv::lock_guard_type lock(_lock);
AsyncWebSocketClient *c = client(id);
return c && c->text(buffer);
}
Expand Down Expand Up @@ -1190,6 +1202,7 @@ AsyncWebSocket::SendStatus AsyncWebSocket::textAll(AsyncWebSocketSharedBuffer bu
}

bool AsyncWebSocket::binary(uint32_t id, const uint8_t *message, size_t len) {
asyncsrv::lock_guard_type lock(_lock);
AsyncWebSocketClient *c = client(id);
return c && c->binary(makeSharedBuffer(message, len));
}
Expand Down Expand Up @@ -1226,6 +1239,7 @@ bool AsyncWebSocket::binary(uint32_t id, AsyncWebSocketMessageBuffer *buffer) {
return enqueued;
}
bool AsyncWebSocket::binary(uint32_t id, AsyncWebSocketSharedBuffer buffer) {
asyncsrv::lock_guard_type lock(_lock);
AsyncWebSocketClient *c = client(id);
return c && c->binary(buffer);
}
Expand Down
1 change: 1 addition & 0 deletions src/AsyncWebSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ class AsyncWebSocketClient {
uint16_t remotePort() const;

bool shouldBeDeleted() const {
asyncsrv::lock_guard_type lock(_lock);
return !_client;
}

Expand Down
Loading