diff --git a/examples/NimBLE_Stream_Client/NimBLE_Stream_Client.ino b/examples/NimBLE_Stream_Client/NimBLE_Stream_Client.ino index cd9e7ca60..541fad15a 100644 --- a/examples/NimBLE_Stream_Client/NimBLE_Stream_Client.ino +++ b/examples/NimBLE_Stream_Client/NimBLE_Stream_Client.ino @@ -15,7 +15,6 @@ #include #include -#include // Service and Characteristic UUIDs (must match the server) #define SERVICE_UUID "6E400001-B5A3-F393-E0A9-E50E24DCCA9E" @@ -64,7 +63,7 @@ class ClientCallbacks : public NimBLEClientCallbacks { void onDisconnect(NimBLEClient* pClient, int reason) override { Serial.printf("Disconnected from server, reason: %d\n", reason); connected = false; - bleStream.deinit(); + bleStream.end(); // Restart scanning Serial.println("Restarting scan..."); @@ -118,20 +117,12 @@ bool connectToServer() { * Initialize the stream client with the remote characteristic * subscribeNotify=true means we'll receive notifications in the RX buffer */ - if (!bleStream.init(pRemoteCharacteristic, true)) { + if (!bleStream.begin(pRemoteCharacteristic, true)) { Serial.println("Failed to initialize BLE stream!"); pClient->disconnect(); return false; } - /** Start the stream (begins internal buffers and tasks) */ - if (!bleStream.begin()) { - Serial.println("Failed to start BLE stream!"); - bleStream.deinit(); - pClient->disconnect(); - return false; - } - Serial.println("BLE Stream initialized successfully!"); connected = true; return true; diff --git a/examples/NimBLE_Stream_Client/README.md b/examples/NimBLE_Stream_Client/README.md index 5b91330fb..ffd348450 100644 --- a/examples/NimBLE_Stream_Client/README.md +++ b/examples/NimBLE_Stream_Client/README.md @@ -7,7 +7,7 @@ This example demonstrates how to use the `NimBLEStreamClient` class to connect t - Uses Arduino Stream interface (print, println, read, available, etc.) - Automatic server discovery and connection - Bidirectional communication -- Buffered TX/RX using FreeRTOS ring buffers +- Buffered TX/RX using ring buffers - Automatic reconnection on disconnect - Similar usage to Serial communication diff --git a/examples/NimBLE_Stream_Echo/NimBLE_Stream_Echo.ino b/examples/NimBLE_Stream_Echo/NimBLE_Stream_Echo.ino index 66d818e43..f64a5a2ae 100644 --- a/examples/NimBLE_Stream_Echo/NimBLE_Stream_Echo.ino +++ b/examples/NimBLE_Stream_Echo/NimBLE_Stream_Echo.ino @@ -3,7 +3,7 @@ * * A minimal example demonstrating NimBLEStreamServer. * Echoes back any data received from BLE clients. - * + * * This is the simplest way to use the NimBLE Stream interface, * showing just the essential setup and read/write operations. * @@ -13,7 +13,6 @@ #include #include -#include NimBLEStreamServer bleStream; @@ -25,12 +24,22 @@ void setup() { NimBLEDevice::init("BLE-Echo"); NimBLEDevice::createServer(); - // Initialize stream with default UUIDs, allow writes - bleStream.init(NimBLEUUID(uint16_t(0xc0de)), // Service UUID - NimBLEUUID(uint16_t(0xfeed)), // Characteristic UUID - true, // canWrite - false); // secure - bleStream.begin(); + /** + * Initialize the stream server with: + * - Service UUID + * - Characteristic UUID + * - txBufSize: 1024 bytes for outgoing data (notifications) + * - rxBufSize: 1024 bytes for incoming data (writes) + * - secure: false (no encryption required - set to true for secure connections) + */ + if (!bleStream.begin(NimBLEUUID(uint16_t(0xc0de)), // Service UUID + NimBLEUUID(uint16_t(0xfeed)), // Characteristic UUID + 1024, // txBufSize + 1024, // rxBufSize + false)) { // secure + Serial.println("Failed to initialize BLE stream"); + return; + } // Start advertising NimBLEDevice::getAdvertising()->start(); @@ -39,12 +48,12 @@ void setup() { void loop() { // Echo any received data back to the client - if (bleStream.hasSubscriber() && bleStream.available()) { + if (bleStream.ready() && bleStream.available()) { Serial.print("Echo: "); while (bleStream.available()) { char c = bleStream.read(); Serial.write(c); - bleStream.write(c); // Echo back + bleStream.write(c); // Echo back } Serial.println(); } diff --git a/examples/NimBLE_Stream_Server/NimBLE_Stream_Server.ino b/examples/NimBLE_Stream_Server/NimBLE_Stream_Server.ino index 6257c54ea..086954551 100644 --- a/examples/NimBLE_Stream_Server/NimBLE_Stream_Server.ino +++ b/examples/NimBLE_Stream_Server/NimBLE_Stream_Server.ino @@ -3,8 +3,8 @@ * * Demonstrates using NimBLEStreamServer to create a BLE GATT server * that behaves like a serial port using the Arduino Stream interface. - * - * This allows you to use familiar methods like print(), println(), + * + * This allows you to use familiar methods like print(), println(), * read(), and available() over BLE, similar to how you would use Serial. * * Created: November 2025 @@ -13,7 +13,6 @@ #include #include -#include // Create the stream server instance NimBLEStreamServer bleStream; @@ -48,8 +47,8 @@ void setup() { /** Initialize NimBLE and set the device name */ NimBLEDevice::init("NimBLE-Stream"); - /** - * Create the BLE server and set callbacks + /** + * Create the BLE server and set callbacks * Note: The stream will create its own service and characteristic */ NimBLEServer* pServer = NimBLEDevice::createServer(); @@ -58,26 +57,22 @@ void setup() { /** * Initialize the stream server with: * - Service UUID - * - Characteristic UUID - * - canWrite: true (allows clients to write data to us) + * - Characteristic UUID + * - txBufSize: 1024 bytes for outgoing data (notifications) + * - rxBufSize: 1024 bytes for incoming data (writes) * - secure: false (no encryption required - set to true for secure connections) */ - if (!bleStream.init(NimBLEUUID(SERVICE_UUID), - NimBLEUUID(CHARACTERISTIC_UUID), - true, // canWrite - allow receiving data - false)) // secure + if (!bleStream.begin(NimBLEUUID(SERVICE_UUID), + NimBLEUUID(CHARACTERISTIC_UUID), + 1024, // txBufSize + 1024, // rxBufSize + false)) // secure { Serial.println("Failed to initialize BLE stream!"); return; } - /** Start the stream (begins internal buffers and tasks) */ - if (!bleStream.begin()) { - Serial.println("Failed to start BLE stream!"); - return; - } - - /** + /** * Create advertising instance and add service UUID * This makes the stream service discoverable */ @@ -94,32 +89,31 @@ void setup() { void loop() { // Check if a client is subscribed (connected and listening) - if (bleStream.hasSubscriber()) { - + if (bleStream.ready()) { // Send a message every 2 seconds using Stream methods static unsigned long lastSend = 0; if (millis() - lastSend > 2000) { lastSend = millis(); - + // Using familiar Serial-like methods! bleStream.print("Hello from BLE Server! Time: "); bleStream.println(millis()); - + // You can also use printf bleStream.printf("Free heap: %d bytes\n", ESP.getFreeHeap()); - + Serial.println("Sent data to client via BLE stream"); } // Check if we received any data from the client if (bleStream.available()) { Serial.print("Received from client: "); - + // Read all available data (just like Serial.read()) while (bleStream.available()) { char c = bleStream.read(); - Serial.write(c); // Echo to Serial - bleStream.write(c); // Echo back to BLE client + Serial.write(c); // Echo to Serial + bleStream.write(c); // Echo back to BLE client } Serial.println(); } diff --git a/examples/NimBLE_Stream_Server/README.md b/examples/NimBLE_Stream_Server/README.md index 5e0a582c7..90ce5e1e2 100644 --- a/examples/NimBLE_Stream_Server/README.md +++ b/examples/NimBLE_Stream_Server/README.md @@ -7,7 +7,7 @@ This example demonstrates how to use the `NimBLEStreamServer` class to create a - Uses Arduino Stream interface (print, println, read, available, etc.) - Automatic connection management - Bidirectional communication -- Buffered TX/RX using FreeRTOS ring buffers +- Buffered TX/RX using ring buffers - Similar usage to Serial communication ## How it Works diff --git a/examples/STREAM_EXAMPLES.md b/examples/STREAM_EXAMPLES.md deleted file mode 100644 index 34f6a9dcc..000000000 --- a/examples/STREAM_EXAMPLES.md +++ /dev/null @@ -1,145 +0,0 @@ -# NimBLE Stream Examples - -This document describes the new Stream-based examples that demonstrate using BLE characteristics with the familiar Arduino Stream interface. - -## Overview - -The NimBLE Stream classes (`NimBLEStreamServer` and `NimBLEStreamClient`) provide a simple way to use BLE characteristics like serial ports. You can use familiar methods like `print()`, `println()`, `read()`, and `available()` just like you would with `Serial`. - -## Available Examples - -### 1. NimBLE_Stream_Echo -**Difficulty:** Beginner -**Purpose:** Introduction to the Stream API - -This is the simplest example, perfect for getting started. It creates a BLE server that echoes back any data it receives. Uses default UUIDs for quick setup. - -**Key Features:** -- Minimal setup code -- Simple echo functionality -- Good starting point for learning - -**Use Case:** Testing BLE connectivity, learning the basics - ---- - -### 2. NimBLE_Stream_Server -**Difficulty:** Intermediate -**Purpose:** Full-featured server implementation - -A complete BLE server example that demonstrates all the major features of `NimBLEStreamServer`. Shows connection management, bidirectional communication, and proper server setup. - -**Key Features:** -- Custom service and characteristic UUIDs -- Connection callbacks -- Periodic message sending -- Echo functionality -- MTU negotiation -- Connection parameter updates - -**Use Case:** Building custom BLE services, wireless data logging, remote monitoring - ---- - -### 3. NimBLE_Stream_Client -**Difficulty:** Intermediate -**Purpose:** Full-featured client implementation - -Demonstrates how to scan for, connect to, and communicate with a BLE server using `NimBLEStreamClient`. Pairs with the NimBLE_Stream_Server example. - -**Key Features:** -- Automatic server discovery -- Connection management -- Reconnection on disconnect -- Bidirectional communication -- Serial passthrough (type in Serial monitor to send via BLE) - -**Use Case:** Building BLE client applications, data collection from BLE devices - -## Quick Start - -### Running the Echo Example - -1. Upload `NimBLE_Stream_Echo` to your ESP32 -2. Open Serial monitor (115200 baud) -3. Use a BLE app (like nRF Connect) to connect -4. Find service UUID `0xc0de`, characteristic `0xfeed` -5. Subscribe to notifications -6. Write data to see it echoed back - -### Running Server + Client Examples - -1. Upload `NimBLE_Stream_Server` to one ESP32 -2. Upload `NimBLE_Stream_Client` to another ESP32 -3. Open Serial monitors for both (115200 baud) -4. Client will automatically find and connect to server -5. Observe bidirectional communication -6. Type in either Serial monitor to send data - -## Common Use Cases - -- **Wireless Serial Communication:** Replace physical serial connections with BLE -- **Data Logging:** Stream sensor data to a mobile device or another microcontroller -- **Remote Control:** Send commands between devices using familiar print/println -- **Debugging:** Use BLE as a wireless alternative to USB serial debugging -- **IoT Applications:** Simple data exchange between ESP32 devices - -## Stream Interface Methods - -The examples demonstrate these familiar methods: - -**Output (Sending Data):** -- `print(value)` - Print data without newline -- `println(value)` - Print data with newline -- `printf(format, ...)` - Formatted output -- `write(data)` - Write raw bytes - -**Input (Receiving Data):** -- `available()` - Check if data is available -- `read()` - Read a single byte -- `peek()` - Preview next byte without removing it - -**Status:** -- `hasSubscriber()` - Check if a client is connected (server only) -- `ready()` or `operator bool()` - Check if stream is ready - -## Configuration - -Both server and client support configuration before calling `begin()`: - -```cpp -bleStream.setTxBufSize(2048); // TX buffer size -bleStream.setRxBufSize(2048); // RX buffer size -bleStream.setTxTaskStackSize(4096); // Task stack size -bleStream.setTxTaskPriority(1); // Task priority -``` - -## Compatibility - -These examples work with: -- ESP32 (all variants: ESP32, ESP32-C3, ESP32-S3, etc.) -- Nordic chips (with n-able Arduino core) -- Any BLE client supporting GATT characteristics with notifications - -## Additional Resources - -- [Arduino Stream Reference](https://www.arduino.cc/reference/en/language/functions/communication/stream/) -- [NimBLE-Arduino Documentation](https://h2zero.github.io/NimBLE-Arduino/) -- Each example includes a detailed README.md file - -## Troubleshooting - -**Client can't find server:** -- Check that both devices are powered on -- Verify UUIDs match between server and client -- Ensure server advertising is started - -**Data not received:** -- Verify client has subscribed to notifications -- Check that buffers aren't full (increase buffer sizes) -- Ensure both `init()` and `begin()` were called - -**Connection drops:** -- Check for interference -- Reduce connection interval if needed -- Ensure devices are within BLE range diff --git a/src/NimBLEDevice.h b/src/NimBLEDevice.h index 28baadf63..df63b3dc0 100644 --- a/src/NimBLEDevice.h +++ b/src/NimBLEDevice.h @@ -245,7 +245,7 @@ class NimBLEDevice { # endif # ifdef ESP_PLATFORM -# if NIMBLE_CPP_SCAN_DUPL_ENABLED +# if NIMBLE_CPP_SCAN_DUPL_ENABLED static uint16_t m_scanDuplicateSize; static uint8_t m_scanFilterMode; static uint16_t m_scanDuplicateResetTime; @@ -306,6 +306,7 @@ class NimBLEDevice { # if MYNEWT_VAL(BLE_ROLE_CENTRAL) || MYNEWT_VAL(BLE_ROLE_PERIPHERAL) # include "NimBLEConnInfo.h" +# include "NimBLEStream.h" # endif # include "NimBLEAddress.h" diff --git a/src/NimBLEStream.cpp b/src/NimBLEStream.cpp index 8a74db0fc..c57f1afdb 100644 --- a/src/NimBLEStream.cpp +++ b/src/NimBLEStream.cpp @@ -15,18 +15,207 @@ * limitations under the License. */ -#ifdef ESP_PLATFORM -# include "NimBLEStream.h" -# if CONFIG_BT_NIMBLE_ENABLED && (MYNEWT_VAL(BLE_ROLE_PERIPHERAL) || MYNEWT_VAL(BLE_ROLE_CENTRAL)) - -# include "NimBLEDevice.h" -# include "rom/uart.h" +#include "NimBLEStream.h" +#if CONFIG_BT_NIMBLE_ENABLED && (MYNEWT_VAL(BLE_ROLE_PERIPHERAL) || MYNEWT_VAL(BLE_ROLE_CENTRAL)) + +# include "NimBLEDevice.h" +# include "NimBLELog.h" +# if defined(CONFIG_NIMBLE_CPP_IDF) +# include "os/os_mbuf.h" +# include "nimble/nimble_port.h" +# else +# include "nimble/porting/nimble/include/os/os_mbuf.h" +# include "nimble/porting/nimble/include/nimble/nimble_port.h" +# endif +# include +# include +# include +# include static const char* LOG_TAG = "NimBLEStream"; +struct NimBLEStream::ByteRingBuffer { + /** @brief Guard for ByteRingBuffer to manage locking. */ + struct Guard { + const ByteRingBuffer& _b; + bool _locked; + Guard(const ByteRingBuffer& b) : _b(b), _locked(b.lock()) {} + ~Guard() { + if (_locked) _b.unlock(); + } + operator bool() const { return _locked; } // Allows: if (Guard g{*this}) { ... } + }; + + /** @brief Construct a ByteRingBuffer with the specified capacity. */ + explicit ByteRingBuffer(size_t capacity) : m_capacity(capacity) { + memset(&m_mutex, 0, sizeof(m_mutex)); + auto rc = ble_npl_mutex_init(&m_mutex); + if (rc != BLE_NPL_OK) { + NIMBLE_LOGE(LOG_TAG, "Failed to initialize ring buffer mutex, error: %d", rc); + return; + } + + m_buf = static_cast(malloc(capacity)); + if (!m_buf) { + NIMBLE_LOGE(LOG_TAG, "Failed to allocate ring buffer memory"); + return; + } + } + + /** @brief Destroy the ByteRingBuffer and release resources. */ + ~ByteRingBuffer() { + if (m_buf) { + free(m_buf); + } + ble_npl_mutex_deinit(&m_mutex); + } + + /** @brief Check if the ByteRingBuffer is valid. */ + bool valid() const { return m_buf != nullptr; } + + /** @brief Get the capacity of the ByteRingBuffer. */ + size_t capacity() const { return m_capacity; } + + /** @brief Get the current size of the ByteRingBuffer. */ + size_t size() const { + Guard g(*this); + return g ? m_size : 0; + } + + /** @brief Get the available free space in the ByteRingBuffer. */ + size_t freeSize() const { + Guard g(*this); + return g ? m_capacity - m_size : 0; + } + + /** + * @brief Write data to the ByteRingBuffer. + * @param data Pointer to the data to write. + * @param len Length of the data to write. + * @returns the number of bytes actually written, which may be less than len if the buffer does not have enough free space. + */ + size_t write(const uint8_t* data, size_t len) { + if (!data || len == 0) { + return 0; + } + + Guard g(*this); + if (!g || m_size >= m_capacity) { + return 0; + } + + size_t count = std::min(len, m_capacity - m_size); + size_t first = std::min(count, m_capacity - m_head); + memcpy(m_buf + m_head, data, first); + size_t remain = count - first; + if (remain > 0) { + memcpy(m_buf, data + first, remain); + } + + m_head = (m_head + count) % m_capacity; + m_size += count; + return count; + } + + /** + * @brief Read data from the ByteRingBuffer. + * @param out Pointer to the buffer where read data will be stored. + * @param len Maximum number of bytes to read. + * @returns the number of bytes actually read. + */ + size_t read(uint8_t* out, size_t len) { + if (!out || len == 0) { + return 0; + } + + Guard g(*this); + if (!g || m_size == 0) { + return 0; + } + + size_t count = std::min(len, m_size); + size_t first = std::min(count, m_capacity - m_tail); + memcpy(out, m_buf + m_tail, first); + size_t remain = count - first; + if (remain > 0) { + memcpy(out + first, m_buf, remain); + } + + m_tail = (m_tail + count) % m_capacity; + m_size -= count; + return count; + } + + /** + * @brief Peek at data in the ByteRingBuffer without removing it. + * @param out Pointer to the buffer where peeked data will be stored. + * @param len Maximum number of bytes to peek. + * @returns the number of bytes actually peeked. + */ + size_t peek(uint8_t* out, size_t len) { + if (!out || len == 0) { + return 0; + } + + Guard g(*this); + if (!g || m_size == 0) { + return 0; + } + + size_t count = std::min(len, m_size); + size_t first = std::min(count, m_capacity - m_tail); + memcpy(out, m_buf + m_tail, first); + size_t remain = count - first; + if (remain > 0) { + memcpy(out + first, m_buf, remain); + } + + return count; + } + + /** + * @brief Drop data from the ByteRingBuffer without reading it. + * @param len Maximum number of bytes to drop. + * @returns the number of bytes actually dropped. + */ + size_t drop(size_t len) { + if (len == 0) { + return 0; + } + + Guard g(*this); + if (!g || m_size == 0) { + return 0; + } + size_t count = std::min(len, m_size); + m_tail = (m_tail + count) % m_capacity; + m_size -= count; + return count; + } + + private: + /** + * @brief Lock the ByteRingBuffer for exclusive access. + * @return true if the lock was successfully acquired, false otherwise. + */ + bool lock() const { return valid() && ble_npl_mutex_pend(&m_mutex, BLE_NPL_TIME_FOREVER) == BLE_NPL_OK; } + + /** + * @brief Unlock the ByteRingBuffer after exclusive access. + */ + void unlock() const { ble_npl_mutex_release(&m_mutex); } + + uint8_t* m_buf{nullptr}; + size_t m_capacity{0}; + size_t m_head{0}; + size_t m_tail{0}; + size_t m_size{0}; + mutable ble_npl_mutex m_mutex; +}; + // Stub Print/Stream implementations when Arduino not available -# if !NIMBLE_CPP_ARDUINO_STRING_AVAILABLE -# include +# if !NIMBLE_CPP_ARDUINO_STRING_AVAILABLE +# include size_t Print::print(const char* s) { if (!s) return 0; @@ -72,272 +261,310 @@ size_t Print::printf(const char* fmt, ...) { free(buf); return ret; } -# endif - -void NimBLEStream::txTask(void* arg) { - NimBLEStream* pStream = static_cast(arg); - for (;;) { - size_t itemSize = 0; - void* item = xRingbufferReceive(pStream->m_txBuf, &itemSize, portMAX_DELAY); - if (item) { - pStream->send(reinterpret_cast(item), itemSize); - vRingbufferReturnItem(pStream->m_txBuf, item); - } - } -} +# endif +/** + * @brief Initialize the NimBLEStream, creating TX and RX buffers and setting up events. + * @return true if initialization was successful, false otherwise. + */ bool NimBLEStream::begin() { - if (m_txBuf || m_rxBuf || m_txTask) { - NIMBLE_UART_LOGW(LOG_TAG, "Already initialized"); + if (m_txBuf || m_rxBuf) { + NIMBLE_LOGW(LOG_TAG, "Already initialized"); return true; } + if (m_txBufSize == 0 && m_rxBufSize == 0) { + NIMBLE_LOGE(LOG_TAG, "Cannot initialize stream with both TX and RX buffer sizes set to 0"); + return false; + } + + if (ble_npl_callout_init(&m_txDrainCallout, nimble_port_get_dflt_eventq(), NimBLEStream::txDrainCalloutCb, this) != 0) { + NIMBLE_LOGE(LOG_TAG, "Failed to initialize TX drain callout"); + return false; + } + m_coInitialized = true; + + ble_npl_event_init(&m_txDrainEvent, NimBLEStream::txDrainEventCb, this); + m_eventInitialized = true; + if (m_txBufSize) { - m_txBuf = xRingbufferCreate(m_txBufSize, RINGBUF_TYPE_BYTEBUF); - if (!m_txBuf) { - NIMBLE_UART_LOGE(LOG_TAG, "Failed to create TX ringbuffer"); + m_txBuf = new ByteRingBuffer(m_txBufSize); + if (!m_txBuf || !m_txBuf->valid()) { + NIMBLE_LOGE(LOG_TAG, "Failed to create TX ringbuffer"); + end(); return false; } } if (m_rxBufSize) { - m_rxBuf = xRingbufferCreate(m_rxBufSize, RINGBUF_TYPE_BYTEBUF); - if (!m_rxBuf) { - NIMBLE_UART_LOGE(LOG_TAG, "Failed to create RX ringbuffer"); - if (m_txBuf) { - vRingbufferDelete(m_txBuf); - m_txBuf = nullptr; - } + m_rxBuf = new ByteRingBuffer(m_rxBufSize); + if (!m_rxBuf || !m_rxBuf->valid()) { + NIMBLE_LOGE(LOG_TAG, "Failed to create RX ringbuffer"); + end(); return false; } } - if (xTaskCreate(txTask, "NimBLEStreamTx", m_txTaskStackSize, this, m_txTaskPriority, &m_txTask) != pdPASS) { - NIMBLE_UART_LOGE(LOG_TAG, "Failed to create stream tx task"); - if (m_rxBuf) { - vRingbufferDelete(m_rxBuf); - m_rxBuf = nullptr; - } - if (m_txBuf) { - vRingbufferDelete(m_txBuf); - m_txBuf = nullptr; - } - return false; - } - return true; } -bool NimBLEStream::end() { - // Release any buffered RX item - if (m_rxState.item && m_rxBuf) { - vRingbufferReturnItem(m_rxBuf, m_rxState.item); - m_rxState.item = nullptr; +/** + * @brief Clean up the NimBLEStream, stopping events and deleting buffers. + */ +void NimBLEStream::end() { + if (m_coInitialized) { + ble_npl_callout_stop(&m_txDrainCallout); + ble_npl_callout_deinit(&m_txDrainCallout); + m_coInitialized = false; } - m_rxState.itemSize = 0; - m_rxState.offset = 0; - if (m_txTask) { - vTaskDelete(m_txTask); - m_txTask = nullptr; + if (m_eventInitialized) { + ble_npl_eventq_remove(nimble_port_get_dflt_eventq(), &m_txDrainEvent); + ble_npl_event_deinit(&m_txDrainEvent); + m_eventInitialized = false; } if (m_txBuf) { - vRingbufferDelete(m_txBuf); + delete m_txBuf; m_txBuf = nullptr; } if (m_rxBuf) { - vRingbufferDelete(m_rxBuf); + delete m_rxBuf; m_rxBuf = nullptr; } - - return true; } +/** + * @brief Write data to the stream, which will be sent over BLE. + * @param data Pointer to the data to write. + * @param len Length of the data to write. + * @return the number of bytes actually written to the stream buffer. + */ size_t NimBLEStream::write(const uint8_t* data, size_t len) { - if (!m_txBuf || !data || len == 0) { + if (!m_txBuf) { return 0; } - ble_npl_time_t timeout = 0; - ble_npl_time_ms_to_ticks(getTimeout(), &timeout); - size_t chunk = std::min(len, xRingbufferGetCurFreeSize(m_txBuf)); - if (xRingbufferSend(m_txBuf, data, chunk, static_cast(timeout)) != pdTRUE) { - return 0; - } - return chunk; + auto written = m_txBuf->write(data, len); + drainTx(); + return written; } +/** + * @brief Get the available free space in the stream's TX buffer. + * @return the number of bytes that can be written to the stream without blocking. + */ size_t NimBLEStream::availableForWrite() const { - return m_txBuf ? xRingbufferGetCurFreeSize(m_txBuf) : 0; + return m_txBuf ? m_txBuf->freeSize() : 0; } -void NimBLEStream::flush(uint32_t timeout_ms) { - if (!m_txBuf) { +/** + * @brief Schedule the stream to attempt to send data from the TX buffer. + * @details This should be called whenever new data is written to the TX buffer + * or when a send attempt fails due to lack of BLE buffers. + */ +void NimBLEStream::drainTx() { + if (!m_txBuf || m_txBuf->size() == 0) { return; } - ble_npl_time_t deadline = timeout_ms > 0 ? ble_npl_time_get() + ble_npl_time_ms_to_ticks32(timeout_ms) : 0; + ble_npl_eventq_put(nimble_port_get_dflt_eventq(), &m_txDrainEvent); +} - // Wait until TX ring is drained - while (m_txBuf && xRingbufferGetCurFreeSize(m_txBuf) < m_txBufSize) { - if (deadline > 0 && ble_npl_time_get() >= deadline) { - break; - } - ble_npl_time_delay(ble_npl_time_ms_to_ticks32(1)); +/** + * @brief Event callback for when the stream is scheduled to drain the TX buffer. + * @param ev Pointer to the event that triggered the callback. + * @details This will attempt to send data from the TX buffer. If sending fails due to + * lack of BLE buffers, it will reschedule itself to try again after a short delay. + */ +void NimBLEStream::txDrainEventCb(struct ble_npl_event* ev) { + if (!ev) { + return; + } + + auto* stream = static_cast(ble_npl_event_get_arg(ev)); + if (!stream) { + return; + } + + if (stream->send()) { + // Schedule a short delayed retry to give the stack time to free buffers, use 5ms for now + // TODO: consider options for the delay time and retry strategy if the stack is persistently out of buffers + ble_npl_callout_reset(&stream->m_txDrainCallout, ble_npl_time_ms_to_ticks32(5)); } } +/** + * @brief Callout callback for when the stream is scheduled to retry draining the TX buffer. + * @param ev Pointer to the event that triggered the callback. + * @details This will call drainTx() to attempt to send data from the TX buffer again. + */ +void NimBLEStream::txDrainCalloutCb(struct ble_npl_event* ev) { + if (!ev) { + return; + } + + auto* stream = static_cast(ble_npl_event_get_arg(ev)); + if (!stream) { + return; + } + + stream->drainTx(); +} + +/** + * @brief Get the number of bytes available to read from the stream. + * @return the number of bytes that can be read from the stream. + */ int NimBLEStream::available() { if (!m_rxBuf) { - NIMBLE_UART_LOGE(LOG_TAG, "Invalid RX buffer"); return 0; } - // Count buffered RX item remainder - size_t buffered = m_rxState.itemSize > m_rxState.offset ? m_rxState.itemSize - m_rxState.offset : 0; - - // Query items in RX ring - UBaseType_t waiting = 0; - vRingbufferGetInfo(m_rxBuf, nullptr, nullptr, nullptr, nullptr, &waiting); - - return static_cast(buffered + waiting); + return static_cast(m_rxBuf->size()); } +/** + * @brief Read a single byte from the stream. + * @return the byte read as an int, or -1 if no data is available. + */ int NimBLEStream::read() { if (!m_rxBuf) { return -1; } - // Return from buffered item if available - if (m_rxState.item && m_rxState.offset < m_rxState.itemSize) { - uint8_t byte = m_rxState.item[m_rxState.offset++]; - - // Release item if we've consumed it all - if (m_rxState.offset >= m_rxState.itemSize) { - vRingbufferReturnItem(m_rxBuf, m_rxState.item); - m_rxState.item = nullptr; - m_rxState.itemSize = 0; - m_rxState.offset = 0; - } - - return static_cast(byte); - } - - // Fetch next item from ringbuffer - size_t itemSize = 0; - uint8_t* item = static_cast(xRingbufferReceive(m_rxBuf, &itemSize, 0)); - if (!item || itemSize == 0) { + uint8_t byte = 0; + if (m_rxBuf->read(&byte, 1) == 0) { return -1; } - // Store in buffer state and return first byte - m_rxState.item = item; - m_rxState.itemSize = itemSize; - m_rxState.offset = 1; // Already consumed first byte - - return static_cast(item[0]); + return static_cast(byte); } +/** + * @brief Peek at the next byte in the stream without removing it. + * @return the byte peeked as an int, or -1 if no data is available. + */ int NimBLEStream::peek() { if (!m_rxBuf) { return -1; } - // Return from buffered item if available - if (m_rxState.item && m_rxState.offset < m_rxState.itemSize) { - return static_cast(m_rxState.item[m_rxState.offset]); + uint8_t byte = 0; + if (m_rxBuf->peek(&byte, 1) == 0) { + return -1; } - // Fetch next item from ringbuffer if not already buffered - if (!m_rxState.item) { - size_t itemSize = 0; - uint8_t* item = static_cast(xRingbufferReceive(m_rxBuf, &itemSize, 0)); - if (!item || itemSize == 0) { - return -1; - } + return static_cast(byte); +} - // Store in buffer state - m_rxState.item = item; - m_rxState.itemSize = itemSize; - m_rxState.offset = 0; +/** + * @brief Read data from the stream into a buffer. + * @param buffer Pointer to the buffer where read data will be stored. + * @param len Maximum number of bytes to read. + * @return the number of bytes actually read from the stream. + */ +size_t NimBLEStream::read(uint8_t* buffer, size_t len) { + if (!m_rxBuf) { + return 0; } - return static_cast(m_rxState.item[m_rxState.offset]); + return m_rxBuf->read(buffer, len); } -size_t NimBLEStream::read(uint8_t* buffer, size_t len) { - if (!m_rxBuf || !buffer || len == 0) { +/** + * @brief Push received data into the stream's RX buffer. + * @param data Pointer to the data to push into the RX buffer. + * @param len Length of the data to push. + * @return the number of bytes actually pushed into the RX buffer, which may be less than + * len if the buffer does not have enough free space. + */ +size_t NimBLEStream::pushRx(const uint8_t* data, size_t len) { + if (!m_rxBuf) { return 0; } - size_t total = 0; + if (m_rxBuf->freeSize() < len) { + NIMBLE_LOGE(LOG_TAG, "RX buffer full, dropping data"); + return 0; + } - // First, consume any buffered RX item remainder - if (m_rxState.item && m_rxState.offset < m_rxState.itemSize) { - size_t available = m_rxState.itemSize - m_rxState.offset; - size_t copyLen = std::min(len, available); - memcpy(buffer, m_rxState.item + m_rxState.offset, copyLen); - m_rxState.offset += copyLen; - total += copyLen; + return m_rxBuf->write(data, len); +} - // Release item if fully consumed - if (m_rxState.offset >= m_rxState.itemSize) { - vRingbufferReturnItem(m_rxBuf, m_rxState.item); - m_rxState.item = nullptr; - m_rxState.itemSize = 0; - m_rxState.offset = 0; - } +# if MYNEWT_VAL(BLE_ROLE_PERIPHERAL) +/** + * @brief Initialize the NimBLEStreamServer with an existing characteristic. + * @param pChr Pointer to the existing NimBLECharacteristic to use for the stream. + * @param txBufSize Size of the TX buffer. + * @param rxBufSize Size of the RX buffer. + * @return true if initialization was successful, false otherwise. + * @details The provided characteristic must have the NOTIFY property set for write operations + * to work and the WRITE or WRITE_NR property set for read operations to work. + * The stream will manage setting its own callbacks on the characteristic, but will save and call + * any existing user callbacks if they were set. + * The RX buffer will only be created if the characteristic has WRITE or WRITE_NR properties. + * The TX buffer will only be created if the characteristic has NOTIFY properties. + */ +bool NimBLEStreamServer::begin(NimBLECharacteristic* pChr, uint32_t txBufSize, uint32_t rxBufSize) { + if (!NimBLEDevice::isInitialized()) { + return false; + } - if (total >= len) { - return total; - } + if (m_pChr) { + NIMBLE_LOGW(LOG_TAG, "Already initialized with a characteristic"); + return true; } - // Drain additional RX ringbuffer items - while (total < len) { - size_t itemSize = 0; - uint8_t* item = static_cast(xRingbufferReceive(m_rxBuf, &itemSize, 0)); - if (!item || itemSize == 0) { - break; - } + if (!pChr) { + NIMBLE_LOGE(LOG_TAG, "Characteristic is null"); + return false; + } - size_t copyLen = std::min(len - total, itemSize); - memcpy(buffer + total, item, copyLen); - total += copyLen; + auto props = pChr->getProperties(); + bool canWrite = (props & NIMBLE_PROPERTY::WRITE) || (props & NIMBLE_PROPERTY::WRITE_NR); + if (!canWrite && rxBufSize > 0) { + NIMBLE_LOGW(LOG_TAG, "Characteristic does not support WRITE, ignoring RX buffer size"); + } - // If we didn't consume the entire item, buffer it - if (copyLen < itemSize) { - m_rxState.item = item; - m_rxState.itemSize = itemSize; - m_rxState.offset = copyLen; - } else { - // Item fully consumed - vRingbufferReturnItem(m_rxBuf, item); - } + bool canNotify = props & NIMBLE_PROPERTY::NOTIFY; + if (!canNotify && txBufSize > 0) { + NIMBLE_LOGW(LOG_TAG, "Characteristic does not support NOTIFY, ignoring TX buffer size"); } - return total; -} + m_rxBufSize = canWrite ? rxBufSize : 0; // disable RX if not writable + m_txBufSize = canNotify ? txBufSize : 0; // disable TX if notifications not supported -size_t NimBLEStream::pushRx(const uint8_t* data, size_t len) { - if (!m_rxBuf || !data || len == 0) { - NIMBLE_UART_LOGE(LOG_TAG, "Invalid RX buffer or data"); - return 0; + if (!NimBLEStream::begin()) { + NIMBLE_LOGE(LOG_TAG, "Failed to initialize stream buffers"); + return false; } - if (xRingbufferSend(m_rxBuf, data, len, 0) != pdTRUE) { - NIMBLE_UART_LOGE(LOG_TAG, "RX buffer full, dropping %zu bytes", len); - return 0; - } - return len; + m_charCallbacks.m_userCallbacks = pChr->getCallbacks(); + pChr->setCallbacks(&m_charCallbacks); + m_pChr = pChr; + return true; } -# if MYNEWT_VAL(BLE_ROLE_PERIPHERAL) -bool NimBLEStreamServer::init(const NimBLEUUID& svcUuid, const NimBLEUUID& chrUuid, bool canWrite, bool secure) { +/** + * @brief Initialize the NimBLEStreamServer, creating a BLE service and characteristic for streaming. + * @param svcUuid UUID of the BLE service to create. + * @param chrUuid UUID of the BLE characteristic to create. + * @param txBufSize Size of the TX buffer, set to 0 to disable TX and create a write-only characteristic. + * @param rxBufSize Size of the RX buffer, set to 0 to disable RX and create a notify-only characteristic. + * @param secure Whether the characteristic requires encryption. + * @return true if initialization was successful, false otherwise. + */ +bool NimBLEStreamServer::begin( + const NimBLEUUID& svcUuid, const NimBLEUUID& chrUuid, uint32_t txBufSize, uint32_t rxBufSize, bool secure) { if (!NimBLEDevice::isInitialized()) { - NIMBLE_UART_LOGE(LOG_TAG, "NimBLEDevice not initialized"); + NIMBLE_LOGE(LOG_TAG, "NimBLEDevice not initialized"); + return false; + } + + if (m_pChr != nullptr) { + NIMBLE_LOGE(LOG_TAG, "NimBLEStreamServer already initialized;"); return false; } @@ -346,62 +573,92 @@ bool NimBLEStreamServer::init(const NimBLEUUID& svcUuid, const NimBLEUUID& chrUu pServer = NimBLEDevice::createServer(); } - NimBLEService* pSvc = pServer->getServiceByUUID(svcUuid); - if (!pSvc) { - pSvc = pServer->createService(svcUuid); - } - + auto pSvc = pServer->createService(svcUuid); if (!pSvc) { - NIMBLE_UART_LOGE(LOG_TAG, "Failed to create service"); return false; } // Create characteristic with notify + write properties for bidirectional stream - uint32_t props = NIMBLE_PROPERTY::NOTIFY; - if (secure) { - props |= NIMBLE_PROPERTY::READ_ENC; + uint32_t props = 0; + if (txBufSize > 0) { + props |= NIMBLE_PROPERTY::NOTIFY; + if (secure) { + props |= NIMBLE_PROPERTY::READ_ENC; + } } - if (canWrite) { + if (rxBufSize > 0) { props |= NIMBLE_PROPERTY::WRITE | NIMBLE_PROPERTY::WRITE_NR; if (secure) { props |= NIMBLE_PROPERTY::WRITE_ENC; } - } else { - m_rxBufSize = 0; // disable RX if not writable } - m_pChr = pSvc->getCharacteristic(chrUuid); - if (!m_pChr) { - m_pChr = pSvc->createCharacteristic(chrUuid, props); + auto pChr = pSvc->createCharacteristic(chrUuid, props); + if (!pChr) { + NIMBLE_LOGE(LOG_TAG, "Failed to create characteristic"); + goto error; + } + + m_deleteSvcOnEnd = true; // mark service for deletion on end since we created it here + + if (!pSvc->start()) { + NIMBLE_LOGE(LOG_TAG, "Failed to start service"); + goto error; } - if (!m_pChr) { - NIMBLE_UART_LOGE(LOG_TAG, "Failed to create characteristic"); - return false; + if (!begin(pChr, txBufSize, rxBufSize)) { + NIMBLE_LOGE(LOG_TAG, "Failed to initialize stream with characteristic"); + goto error; } - m_pChr->setCallbacks(&m_charCallbacks); - return pSvc->start(); + return true; + +error: + pServer->removeService(pSvc, true); // delete service and all its characteristics + m_pChr = nullptr; // reset characteristic pointer as it's now invalid after service removal + end(); + return false; } -void NimBLEStreamServer::deinit() { +/** + * @brief Stop the NimBLEStreamServer + * @details This will stop the stream and delete the service created if it was created by this class. + */ +void NimBLEStreamServer::end() { if (m_pChr) { - NimBLEService* pSvc = m_pChr->getService(); - if (pSvc) { - pSvc->removeCharacteristic(m_pChr, true); + if (m_deleteSvcOnEnd) { + auto pSvc = m_pChr->getService(); + if (pSvc) { + auto pServer = pSvc->getServer(); + if (pServer) { + pServer->removeService(pSvc, true); + } + } + } else { + m_pChr->setCallbacks(m_charCallbacks.m_userCallbacks); // restore any user callbacks } - m_pChr = nullptr; } + + m_pChr = nullptr; + m_charCallbacks.m_peerHandle = BLE_HS_CONN_HANDLE_NONE; + m_charCallbacks.m_userCallbacks = nullptr; + m_deleteSvcOnEnd = false; NimBLEStream::end(); } +/** + * @brief Write data to the stream, which will be sent as BLE notifications. + * @param data Pointer to the data to write. + * @param len Length of the data to write. + * @return the number of bytes actually written to the stream buffer. + */ size_t NimBLEStreamServer::write(const uint8_t* data, size_t len) { - if (!m_pChr || len == 0 || !hasSubscriber()) { + if (!m_pChr || len == 0 || !ready()) { return 0; } -# if MYNEWT_VAL(NIMBLE_CPP_LOG_LEVEL) >= 4 +# if MYNEWT_VAL(NIMBLE_CPP_LOG_LEVEL) >= 4 // Skip server gap events to avoid log recursion static const char filterStr[] = "handleGapEvent"; constexpr size_t filterLen = sizeof(filterStr) - 1; @@ -412,163 +669,266 @@ size_t NimBLEStreamServer::write(const uint8_t* data, size_t len) { } } } -# endif +# endif return NimBLEStream::write(data, len); } -bool NimBLEStreamServer::send(const uint8_t* data, size_t len) { - if (!m_pChr || !len || !hasSubscriber()) { +/** + * @brief Attempt to send data from the TX buffer as BLE notifications. + * @return true if a retry should be scheduled, false otherwise. + * @details This will try to send as much data as possible from the TX buffer in chunks + * that fit within the current BLE MTU. If sending fails due to lack of BLE buffers, it will return true + * to indicate that a retry should be scheduled, but it will not drop any data from the TX buffer. + * For other errors or if all data is sent, it returns false. + */ +bool NimBLEStreamServer::send() { + if (!m_pChr || !m_txBuf || !ready()) { + return false; + } + + size_t mtu = ble_att_mtu(getPeerHandle()); + if (mtu < 23) { return false; } - size_t offset = 0; - while (offset < len) { - size_t chunkLen = std::min(len - offset, getMaxLength()); - while (!m_pChr->notify(data + offset, chunkLen, getPeerHandle())) { - // Retry on ENOMEM (mbuf shortage) + size_t maxDataLen = std::min(mtu - 3, sizeof(m_txChunkBuf)); + + while (m_txBuf->size()) { + size_t chunkLen = m_txBuf->peek(m_txChunkBuf, maxDataLen); + if (!chunkLen) { + break; + } + + if (!m_pChr->notify(m_txChunkBuf, chunkLen, getPeerHandle())) { if (m_rc == BLE_HS_ENOMEM || os_msys_num_free() <= 2) { - ble_npl_time_delay(ble_npl_time_ms_to_ticks32(8)); // wait for a minimum connection event time - continue; + // NimBLE stack out of buffers, likely due to pending notifications/indications + // Don't drop data, but wait for stack to free buffers and try again later + return true; } - return false; + + return false; // disconnect or other error don't retry send, preserve data for next attempt } - offset += chunkLen; + m_txBuf->drop(chunkLen); } - return true; + + return false; // no more data to send } -void NimBLEStreamServer::ChrCallbacks::onWrite(NimBLECharacteristic* pCharacteristic, NimBLEConnInfo& connInfo) { - // Push received data into RX buffer - auto val = pCharacteristic->getValue(); - if (val.size() > 0) { - m_parent->pushRx(val.data(), val.size()); +/** + * @brief Check if the stream is ready to send/receive data, which requires an active BLE connection. + * @return true if the stream is ready, false otherwise. + */ +bool NimBLEStreamServer::ready() const { + if (m_charCallbacks.m_peerHandle == BLE_HS_CONN_HANDLE_NONE) { + return false; } - if (m_userCallbacks) { - m_userCallbacks->onWrite(pCharacteristic, connInfo); - } + return ble_gap_conn_find(m_charCallbacks.m_peerHandle, NULL) == 0; } -void NimBLEStreamServer::ChrCallbacks::onSubscribe(NimBLECharacteristic* pCharacteristic, - NimBLEConnInfo& connInfo, - uint16_t subValue) { - // only one subscriber supported - if (m_peerHandle != BLE_HS_CONN_HANDLE_NONE && subValue) { - return; +/** + * @brief Callback for when the characteristic is written to by a client. + * @param pChr Pointer to the characteristic that was written to. + * @param connInfo Information about the connection that performed the write. + * @details This will push the received data into the RX buffer and call any user-defined callbacks. + */ +void NimBLEStreamServer::ChrCallbacks::onWrite(NimBLECharacteristic* pChr, NimBLEConnInfo& connInfo) { + // Push received data into RX buffer + auto val = pChr->getValue(); + m_parent->pushRx(val.data(), val.size()); + + if (m_userCallbacks) { + m_userCallbacks->onWrite(pChr, connInfo); } +} - m_peerHandle = subValue ? connInfo.getConnHandle() : BLE_HS_CONN_HANDLE_NONE; +/** + * @brief Callback for when a client subscribes or unsubscribes to notifications/indications. + * @param pChr Pointer to the characteristic that was subscribed/unsubscribed. + * @param connInfo Information about the connection that performed the subscribe/unsubscribe. + * @param subValue The new subscription value (0 for unsubscribe, non-zero for subscribe). + * @details This will track the subscriber's connection handle and call any user-defined callbacks. + * Only one subscriber is supported; if another client tries to subscribe while one is already subscribed, it will be ignored. + */ +void NimBLEStreamServer::ChrCallbacks::onSubscribe(NimBLECharacteristic* pChr, NimBLEConnInfo& connInfo, uint16_t subValue) { + // If we have a stored peer handle, ensure it still refers to an active connection. + // If the connection has gone away without an explicit unsubscribe, clear it so a new + // subscriber can be accepted. if (m_peerHandle != BLE_HS_CONN_HANDLE_NONE) { - m_maxLen = ble_att_mtu(m_peerHandle) - 3; - if (!m_parent->begin()) { - NIMBLE_UART_LOGE(LOG_TAG, "NimBLEStreamServer failed to begin"); + if (ble_gap_conn_find(m_peerHandle, nullptr) != 0) { + m_peerHandle = BLE_HS_CONN_HANDLE_NONE; } + } + + // only one subscriber supported + if (subValue && m_peerHandle != BLE_HS_CONN_HANDLE_NONE) { + NIMBLE_LOGI(LOG_TAG, + "Already have a subscriber, rejecting new subscription from conn handle %d", + connInfo.getConnHandle()); return; } - m_parent->end(); + m_peerHandle = subValue ? connInfo.getConnHandle() : BLE_HS_CONN_HANDLE_NONE; if (m_userCallbacks) { - m_userCallbacks->onSubscribe(pCharacteristic, connInfo, subValue); + m_userCallbacks->onSubscribe(pChr, connInfo, subValue); } } -void NimBLEStreamServer::ChrCallbacks::onStatus(NimBLECharacteristic* pCharacteristic, int code) { +/** + * @brief Callback for when the connection status changes (e.g. disconnect). + * @param pChr Pointer to the characteristic associated with the status change. + * @param connInfo Information about the connection that changed status. + * @param code The new status code (e.g. success or error code). + * @details The code is used to track when the stack is out of buffers (BLE_HS_ENOMEM) + * to trigger retries without dropping data. User-defined callbacks will also be called if set. + */ +void NimBLEStreamServer::ChrCallbacks::onStatus(NimBLECharacteristic* pChr, NimBLEConnInfo& connInfo, int code) { m_parent->m_rc = code; if (m_userCallbacks) { - m_userCallbacks->onStatus(pCharacteristic, code); + m_userCallbacks->onStatus(pChr, connInfo, code); } } -# endif // MYNEWT_VAL(BLE_ROLE_PERIPHERAL) +# endif // MYNEWT_VAL(BLE_ROLE_PERIPHERAL) + +# if MYNEWT_VAL(BLE_ROLE_CENTRAL) +/** + * @brief Initialize the NimBLEStreamClient, setting up the remote characteristic and subscribing to notifications if requested. + * @param pChr Pointer to the remote characteristic to use for streaming. + * @param subscribe Whether to subscribe to notifications/indications from the characteristic for RX. + * @param txBufSize Size of the TX buffer. + * @param rxBufSize Size of the RX buffer. + * @return true if initialization was successful, false otherwise. + * @details The characteristic must support write without response for TX. + * If subscribe is true, it will subscribe to notifications/indications for RX. + * If subscribe is false, the RX buffer will not be created and no notifications will be received. + */ +bool NimBLEStreamClient::begin(NimBLERemoteCharacteristic* pChr, bool subscribe, uint32_t txBufSize, uint32_t rxBufSize) { + if (!NimBLEDevice::isInitialized()) { + NIMBLE_LOGE(LOG_TAG, "NimBLE stack not initialized, call NimBLEDevice::init() first"); + return false; + } + + if (m_pChr) { + NIMBLE_LOGW(LOG_TAG, "Already initialized, must end() first"); + return true; + } -# if MYNEWT_VAL(BLE_ROLE_CENTRAL) -bool NimBLEStreamClient::init(NimBLERemoteCharacteristic* pChr, bool subscribe) { if (!pChr) { + NIMBLE_LOGE(LOG_TAG, "Remote characteristic is null"); return false; } - m_pChr = pChr; - m_writeWithRsp = !pChr->canWriteNoResponse(); + if (!pChr->canWriteNoResponse()) { + NIMBLE_LOGE(LOG_TAG, "Characteristic does not support write without response"); + return false; + } + + if (subscribe && !pChr->canNotify() && !pChr->canIndicate()) { + NIMBLE_LOGW(LOG_TAG, "Characteristic does not support subscriptions, RX disabled"); + subscribe = false; // disable subscribe if not supported + } + + m_txBufSize = txBufSize; + m_rxBufSize = subscribe ? rxBufSize : 0; // disable RX buffer if not subscribing + + if (!NimBLEStream::begin()) { + NIMBLE_LOGE(LOG_TAG, "Failed to initialize stream buffers"); + return false; + } // Subscribe to notifications/indications for RX if requested - if (subscribe && (pChr->canNotify() || pChr->canIndicate())) { + if (subscribe) { using namespace std::placeholders; if (!pChr->subscribe(pChr->canNotify(), std::bind(&NimBLEStreamClient::notifyCallback, this, _1, _2, _3, _4))) { - NIMBLE_UART_LOGE(LOG_TAG, "Failed to subscribe for notifications"); + NIMBLE_LOGE(LOG_TAG, "Failed to subscribe for %s", pChr->canNotify() ? "notifications" : "indications"); + end(); + return false; } } - if (!subscribe) { - m_rxBufSize = 0; // disable RX if not subscribing - } - + m_pChr = pChr; return true; } -void NimBLEStreamClient::deinit() { +/** + * @brief Clean up the NimBLEStreamClient, unsubscribing from notifications and clearing the remote characteristic reference. + */ +void NimBLEStreamClient::end() { if (m_pChr && (m_pChr->canNotify() || m_pChr->canIndicate())) { m_pChr->unsubscribe(); } - NimBLEStream::end(); + m_pChr = nullptr; + NimBLEStream::end(); } -bool NimBLEStreamClient::send(const uint8_t* data, size_t len) { - if (!m_pChr || !data || len == 0) { +/** + * @brief Write data to the stream, which will be sent as BLE writes to the remote characteristic. + * @return True if a retry should be scheduled due to lack of BLE buffers, false otherwise. + * @details This will try to send as much data as possible from the TX buffer in chunks + * that fit within the memory buffers. If sending fails due to lack of BLE buffers, it will return true + * to indicate that a retry should be scheduled, but it will not drop any data from the TX buffer. + * For other errors or if all data is sent, it returns false. + */ +bool NimBLEStreamClient::send() { + if (!ready() || !m_txBuf) { return false; } - return m_pChr->writeValue(data, len, m_writeWithRsp); -} -void NimBLEStreamClient::notifyCallback(NimBLERemoteCharacteristic* pChar, uint8_t* pData, size_t len, bool isNotify) { - if (pData && len > 0) { - pushRx(pData, len); + auto mtu = m_pChr->getClient()->getMTU(); + if (mtu < 23) { + return false; } - if (m_userNotifyCallback) { - m_userNotifyCallback(pChar, pData, len, isNotify); - } -} + size_t maxDataLen = std::min(mtu - 3, sizeof(m_txChunkBuf)); -// UART logging support -int NimBLEStream::uart_log_printfv(const char* format, va_list arg) { - static char loc_buf[64]; - char* temp = loc_buf; - uint32_t len; - va_list copy; - va_copy(copy, arg); - len = vsnprintf(NULL, 0, format, copy); - va_end(copy); - if (len >= sizeof(loc_buf)) { - temp = (char*)malloc(len + 1); - if (temp == NULL) { - return 0; + while (m_txBuf->size()) { + size_t chunkLen = m_txBuf->peek(m_txChunkBuf, maxDataLen); + if (!chunkLen) { + break; } - } - int wlen = vsnprintf(temp, len + 1, format, arg); - for (int i = 0; i < wlen; i++) { - uart_tx_one_char(temp[i]); - } + if (!m_pChr->writeValue(m_txChunkBuf, chunkLen, false)) { + if (os_msys_num_free() <= 2) { + // NimBLE stack out of buffers, likely due to pending writes + // Don't drop data, wait for stack to free buffers and try again later + return true; + } + + break; // preserve data, no retry + } - if (len >= sizeof(loc_buf)) { - free(temp); + m_txBuf->drop(chunkLen); } - return wlen; + return false; // don't retry, it's either sent or we are disconnected } -int NimBLEStream::uart_log_printf(const char* format, ...) { - int len; - va_list arg; - va_start(arg, format); - len = NimBLEStream::uart_log_printfv(format, arg); - va_end(arg); - return len; +/** + * @brief Check if the stream is ready for communication. + * @return true if the stream is ready, false otherwise. + * @details The stream is considered ready if the remote characteristic is set and the client connection is active. + */ +bool NimBLEStreamClient::ready() const { + return m_pChr != nullptr && m_pChr->getClient()->isConnected(); } -# endif // MYNEWT_VAL(BLE_ROLE_CENTRAL) -# endif // CONFIG_BT_NIMBLE_ENABLED && (MYNEWT_VAL(BLE_ROLE_PERIPHERAL) || MYNEWT_VAL(BLE_ROLE_CENTRAL)) -#endif // ESP_PLATFORM \ No newline at end of file +/** + * @brief Callback for when a notification or indication is received from the remote characteristic. + * @param pChar Pointer to the characteristic that sent the notification/indication. + * @param pData Pointer to the data received in the notification/indication. + * @param len Length of the data received. + * @param isNotify True if the data was received as a notification, false if it was received as an indication. + * @details This will push the received data into the RX buffer and call any user-defined callbacks. + */ +void NimBLEStreamClient::notifyCallback(NimBLERemoteCharacteristic* pChar, uint8_t* pData, size_t len, bool isNotify) { + pushRx(pData, len); + if (m_userNotifyCallback) { + m_userNotifyCallback(pChar, pData, len, isNotify); + } +} +# endif // MYNEWT_VAL(BLE_ROLE_CENTRAL) +#endif // CONFIG_BT_NIMBLE_ENABLED && (MYNEWT_VAL(BLE_ROLE_PERIPHERAL) || MYNEWT_VAL(BLE_ROLE_CENTRAL)) diff --git a/src/NimBLEStream.h b/src/NimBLEStream.h index 206b46979..b955fb4b7 100644 --- a/src/NimBLEStream.h +++ b/src/NimBLEStream.h @@ -15,21 +15,25 @@ * limitations under the License. */ -#ifdef ESP_PLATFORM -# ifndef NIMBLE_CPP_STREAM_H -# define NIMBLE_CPP_STREAM_H +#ifndef NIMBLE_CPP_STREAM_H +#define NIMBLE_CPP_STREAM_H -# include "syscfg/syscfg.h" -# if CONFIG_BT_NIMBLE_ENABLED && (MYNEWT_VAL(BLE_ROLE_PERIPHERAL) || MYNEWT_VAL(BLE_ROLE_CENTRAL)) +#include "syscfg/syscfg.h" +#if CONFIG_BT_NIMBLE_ENABLED && (MYNEWT_VAL(BLE_ROLE_PERIPHERAL) || MYNEWT_VAL(BLE_ROLE_CENTRAL)) -# include "NimBLEUUID.h" -# include -# include -# include +# if defined(CONFIG_NIMBLE_CPP_IDF) +# include "nimble/nimble_npl.h" +# else +# include "nimble/nimble/include/nimble/nimble_npl.h" +# endif + +# include +# include + +# if NIMBLE_CPP_ARDUINO_STRING_AVAILABLE +# include +# else -# if NIMBLE_CPP_ARDUINO_STRING_AVAILABLE -# include -# else // Minimal Stream/Print stubs when Arduino not available class Print { public: @@ -46,194 +50,158 @@ class Stream : public Print { virtual int available() = 0; virtual int read() = 0; virtual int peek() = 0; + virtual void flush() {} void setTimeout(unsigned long timeout) { m_timeout = timeout; } unsigned long getTimeout() const { return m_timeout; } protected: unsigned long m_timeout{0}; }; -# endif +# endif class NimBLEStream : public Stream { public: NimBLEStream() = default; virtual ~NimBLEStream() { end(); } - bool begin(); - bool end(); - - // Configure TX/RX buffer sizes and task parameters before begin() - void setTxBufSize(uint32_t size) { m_txBufSize = size; } - void setRxBufSize(uint32_t size) { m_rxBufSize = size; } - void setTxTaskStackSize(uint32_t size) { m_txTaskStackSize = size; } - void setTxTaskPriority(uint32_t priority) { m_txTaskPriority = priority; } - - // These logging macros exist to provide log output over UART so that the stream class can - // be used to redirect logs without causing recursion issues. - static int uart_log_printfv(const char* format, va_list arg); - static int uart_log_printf(const char* format, ...); - // Print/Stream TX methods virtual size_t write(const uint8_t* data, size_t len) override; virtual size_t write(uint8_t data) override { return write(&data, 1); } // Template for other integral types (char, int, long, etc.) - template ::value && !std::is_same::value, int>::type = 0> - size_t write(T data) { + template + typename std::enable_if::value && !std::is_same::value, size_t>::type write(T data) { return write(static_cast(data)); } - size_t availableForWrite() const; - void flush(uint32_t timeout_ms = 0); + virtual void flush() override {} - // Stream RX methods - virtual int available() override; - virtual int read() override; - virtual int peek() override; + size_t availableForWrite() const; // Read up to len bytes into buffer (non-blocking) size_t read(uint8_t* buffer, size_t len); - // Serial-like helpers - bool ready() const { return isReady(); } - operator bool() const { return ready(); } + // Stream RX methods + virtual int available() override; + virtual int read() override; + virtual int peek() override; + virtual bool ready() const = 0; + + operator bool() const { return ready(); } using Print::write; + struct ByteRingBuffer; + protected: - static void txTask(void* arg); - virtual bool send(const uint8_t* data, size_t len) = 0; - virtual bool isReady() const = 0; - - // Push received data into RX ring (called by subclass callbacks) - size_t pushRx(const uint8_t* data, size_t len); - - // RX buffering state: avoids requeueing/fragmentation - struct RxState { - uint8_t* item{nullptr}; - size_t itemSize{0}; - size_t offset{0}; - }; - - RingbufHandle_t m_txBuf{nullptr}; - RingbufHandle_t m_rxBuf{nullptr}; - TaskHandle_t m_txTask{nullptr}; - uint32_t m_txTaskStackSize{4096}; - uint32_t m_txTaskPriority{tskIDLE_PRIORITY + 1}; + bool begin(); + void drainTx(); + size_t pushRx(const uint8_t* data, size_t len); + virtual void end(); + virtual bool send() = 0; + static void txDrainEventCb(struct ble_npl_event* ev); + static void txDrainCalloutCb(struct ble_npl_event* ev); + + ByteRingBuffer* m_txBuf{nullptr}; + ByteRingBuffer* m_rxBuf{nullptr}; + uint8_t m_txChunkBuf[MYNEWT_VAL(BLE_ATT_PREFERRED_MTU)]; uint32_t m_txBufSize{1024}; uint32_t m_rxBufSize{1024}; - - mutable RxState m_rxState{}; // Track current RX item to avoid requeueing + ble_npl_event m_txDrainEvent{NULL}; + ble_npl_callout m_txDrainCallout{NULL}; + bool m_coInitialized{false}; + bool m_eventInitialized{false}; }; -# if MYNEWT_VAL(BLE_ROLE_PERIPHERAL) -# include "NimBLECharacteristic.h" +# if MYNEWT_VAL(BLE_ROLE_PERIPHERAL) +# include "NimBLECharacteristic.h" class NimBLEStreamServer : public NimBLEStream { public: NimBLEStreamServer() : m_charCallbacks(this) {} - ~NimBLEStreamServer() = default; + ~NimBLEStreamServer() override { end(); } + // non-copyable NimBLEStreamServer(const NimBLEStreamServer&) = delete; NimBLEStreamServer& operator=(const NimBLEStreamServer&) = delete; - bool init(const NimBLEUUID& svcUuid = NimBLEUUID(uint16_t(0xc0de)), - const NimBLEUUID& chrUuid = NimBLEUUID(uint16_t(0xfeed)), - bool canWrite = false, - bool secure = false); - void deinit(); + bool begin(NimBLECharacteristic* chr, uint32_t txBufSize = 1024, uint32_t rxBufSize = 1024); + + // Convenience overload to create service/characteristic internally; service will be deleted on end() + bool begin(const NimBLEUUID& svcUuid, + const NimBLEUUID& chrUuid, + uint32_t txBufSize = 1024, + uint32_t rxBufSize = 1024, + bool secure = false); + + void end() override; size_t write(const uint8_t* data, size_t len) override; uint16_t getPeerHandle() const { return m_charCallbacks.m_peerHandle; } - bool hasSubscriber() const { return m_charCallbacks.m_peerHandle != BLE_HS_CONN_HANDLE_NONE; } - size_t getMaxLength() const { return m_charCallbacks.m_maxLen; } void setCallbacks(NimBLECharacteristicCallbacks* pCallbacks) { m_charCallbacks.m_userCallbacks = pCallbacks; } + bool ready() const override; using NimBLEStream::write; // Inherit template write overloads - private: - bool send(const uint8_t* data, size_t len) override; - bool isReady() const override { return hasSubscriber(); } + protected: + bool send() override; struct ChrCallbacks : public NimBLECharacteristicCallbacks { ChrCallbacks(NimBLEStreamServer* parent) - : m_parent(parent), m_userCallbacks(nullptr), m_peerHandle(BLE_HS_CONN_HANDLE_NONE), m_maxLen(0) {} + : m_parent(parent), m_userCallbacks(nullptr), m_peerHandle(BLE_HS_CONN_HANDLE_NONE) {} void onWrite(NimBLECharacteristic* pCharacteristic, NimBLEConnInfo& connInfo) override; void onSubscribe(NimBLECharacteristic* pCharacteristic, NimBLEConnInfo& connInfo, uint16_t subValue) override; - void onStatus(NimBLECharacteristic* pCharacteristic, int code) override; + void onStatus(NimBLECharacteristic* pCharacteristic, NimBLEConnInfo& connInfo, int code) override; // override this to avoid recursion when debug logs are enabled - void onStatus(NimBLECharacteristic* pCharacteristic, NimBLEConnInfo& connInfo, int code) { - if (m_userCallbacks) { - m_userCallbacks->onStatus(pCharacteristic, connInfo, code); + void onStatus(NimBLECharacteristic* pCharacteristic, int code) override { + if (m_userCallbacks != nullptr) { + m_userCallbacks->onStatus(pCharacteristic, code); } } NimBLEStreamServer* m_parent; NimBLECharacteristicCallbacks* m_userCallbacks; uint16_t m_peerHandle; - uint16_t m_maxLen; } m_charCallbacks; NimBLECharacteristic* m_pChr{nullptr}; int m_rc{0}; + // Whether to delete the BLE service when end() is called; set to false if service is managed externally + bool m_deleteSvcOnEnd{false}; }; -# endif // BLE_ROLE_PERIPHERAL +# endif // BLE_ROLE_PERIPHERAL -# if MYNEWT_VAL(BLE_ROLE_CENTRAL) -# include "NimBLERemoteCharacteristic.h" +# if MYNEWT_VAL(BLE_ROLE_CENTRAL) +# include "NimBLERemoteCharacteristic.h" class NimBLEStreamClient : public NimBLEStream { public: - NimBLEStreamClient() = default; - ~NimBLEStreamClient() = default; + NimBLEStreamClient() = default; + ~NimBLEStreamClient() override { end(); } + // non-copyable NimBLEStreamClient(const NimBLEStreamClient&) = delete; NimBLEStreamClient& operator=(const NimBLEStreamClient&) = delete; // Attach a discovered remote characteristic; app owns discovery/connection. // Set subscribeNotify=true to receive notifications into RX buffer. - bool init(NimBLERemoteCharacteristic* pChr, bool subscribeNotify = false); - void deinit(); - void setWriteWithResponse(bool useWithRsp) { m_writeWithRsp = useWithRsp; } + bool begin(NimBLERemoteCharacteristic* pChr, + bool subscribeNotify = false, + uint32_t txBufSize = 1024, + uint32_t rxBufSize = 1024); + void end() override; void setNotifyCallback(NimBLERemoteCharacteristic::notify_callback cb) { m_userNotifyCallback = cb; } + bool ready() const override; using NimBLEStream::write; // Inherit template write overloads - private: - bool send(const uint8_t* data, size_t len) override; - bool isReady() const override { return m_pChr != nullptr; } + protected: + bool send() override; void notifyCallback(NimBLERemoteCharacteristic* pChar, uint8_t* pData, size_t len, bool isNotify); NimBLERemoteCharacteristic* m_pChr{nullptr}; - bool m_writeWithRsp{false}; NimBLERemoteCharacteristic::notify_callback m_userNotifyCallback{nullptr}; }; -# endif // BLE_ROLE_CENTRAL - -# endif // CONFIG_BT_NIMBLE_ENABLED && (MYNEWT_VAL(BLE_ROLE_PERIPHERAL) || MYNEWT_VAL(BLE_ROLE_CENTRAL)) - -# if MYNEWT_VAL(NIMBLE_CPP_LOG_LEVEL) >= 4 -# define NIMBLE_UART_LOGD(tag, format, ...) NimBLEStream::uart_log_printf("D %s: " format "\n", tag, ##__VA_ARGS__) -# else -# define NIMBLE_UART_LOGD(tag, format, ...) (void)tag -# endif - -# if MYNEWT_VAL(NIMBLE_CPP_LOG_LEVEL) >= 3 -# define NIMBLE_UART_LOGI(tag, format, ...) NimBLEStream::uart_log_printf("I %s: " format "\n", tag, ##__VA_ARGS__) -# else -# define NIMBLE_UART_LOGI(tag, format, ...) (void)tag -# endif - -# if MYNEWT_VAL(NIMBLE_CPP_LOG_LEVEL) >= 2 -# define NIMBLE_UART_LOGW(tag, format, ...) NimBLEStream::uart_log_printf("W %s: " format "\n", tag, ##__VA_ARGS__) -# else -# define NIMBLE_UART_LOGW(tag, format, ...) (void)tag -# endif - -# if MYNEWT_VAL(NIMBLE_CPP_LOG_LEVEL) >= 1 -# define NIMBLE_UART_LOGE(tag, format, ...) NimBLEStream::uart_log_printf("E %s: " format "\n", tag, ##__VA_ARGS__) -# else -# define NIMBLE_UART_LOGE(tag, format, ...) (void)tag -# endif - -# endif // NIMBLE_CPP_STREAM_H -#endif // ESP_PLATFORM +# endif // BLE_ROLE_CENTRAL + +#endif // CONFIG_BT_NIMBLE_ENABLED && (MYNEWT_VAL(BLE_ROLE_PERIPHERAL) || MYNEWT_VAL(BLE_ROLE_CENTRAL)) +#endif // NIMBLE_CPP_STREAM_H diff --git a/src/nimble/console/console.h b/src/nimble/console/console.h index 4b32c8b15..33aae03d8 100644 --- a/src/nimble/console/console.h +++ b/src/nimble/console/console.h @@ -5,9 +5,10 @@ #ifdef __cplusplus extern "C" { -#endif - +#define console_printf(_fmt, ...) ::printf(_fmt, ##__VA_ARGS__) +#else #define console_printf(_fmt, ...) printf(_fmt, ##__VA_ARGS__) +#endif #ifdef __cplusplus } diff --git a/src/nimconfig.h b/src/nimconfig.h index 5b230f0ab..72a8a7068 100644 --- a/src/nimconfig.h +++ b/src/nimconfig.h @@ -180,12 +180,12 @@ # define CONFIG_BT_NIMBLE_USE_ESP_TIMER (1) # endif -#define MYNEWT_VAL_BLE_USE_ESP_TIMER (CONFIG_BT_NIMBLE_USE_ESP_TIMER) +# define MYNEWT_VAL_BLE_USE_ESP_TIMER (CONFIG_BT_NIMBLE_USE_ESP_TIMER) -#ifdef CONFIG_BT_NIMBLE_EXT_ADV // Workaround for PlatformIO build flags causing redefinition warnings -# undef MYNEWT_VAL_BLE_EXT_ADV -# define MYNEWT_VAL_BLE_EXT_ADV (CONFIG_BT_NIMBLE_EXT_ADV) -#endif +# ifdef CONFIG_BT_NIMBLE_EXT_ADV // Workaround for PlatformIO build flags causing redefinition warnings +# undef MYNEWT_VAL_BLE_EXT_ADV +# define MYNEWT_VAL_BLE_EXT_ADV (CONFIG_BT_NIMBLE_EXT_ADV) +# endif #else // !ESP_PLATFORM # if defined(NRF51)