Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions BLE/BLE_Beacon/BLE_Beacon_temperature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""BLE beacon advertising example using aioble and SSD1327 OLED.

Broadcasts a BLE advertisement containing the board name and a live
temperature reading from the WSEN-PADS sensor. The beacon is visible
from any BLE scanner app (nRF Connect, LightBlue, etc.).

Hardware:
- STM32WB55 BLE radio
- SSD1327 128x128 OLED display (round)
- WSEN-PADS pressure + temperature sensor

BLE payload:
- Complete Local Name: "STeaMi-XXXX" (last 2 bytes of MAC address)
- Manufacturer Specific Data: temperature as int16 (x100, in 0.01 C)
"""

import sys

sys.path.insert(0, "/remote")

import bluetooth
import struct
import uasyncio as asyncio

import aioble
import ssd1327
from machine import I2C, SPI, Pin
from steami_screen import GRAY, GREEN, LIGHT, Screen, SSD1327Display, WHITE
from wsen_pads import WSEN_PADS

# === BLE setup ===
ble = bluetooth.BLE()
ble.active(True)

mac_bytes = ble.config("mac")[1]
mac_suffix = "".join(f"{b:02X}" for b in mac_bytes[-2:])
DEVICE_NAME = f"STeaMi-{mac_suffix}"
print("Device name:", DEVICE_NAME)

# === Display ===
spi = SPI(1)
dc = Pin("DATA_COMMAND_DISPLAY")
res = Pin("RST_DISPLAY")
cs = Pin("CS_DISPLAY")
display = SSD1327Display(ssd1327.WS_OLED_128X128_SPI(spi, dc, res, cs))
screen = Screen(display)

# === Sensor ===
i2c = I2C(1)
pads = WSEN_PADS(i2c)

# === BLE parameters ===
ADV_INTERVAL_US = 200_000 # 200 ms between advertisements
ADV_TIMEOUT_MS = 500 # advertise for 500 ms per cycle
SENSOR_INTERVAL_MS = 1000 # read sensor every 1 second

# === Shared state ===
temperature = 0.0


def build_adv_payload(name, temp_raw):
"""Build a BLE advertising payload.

Contains:
- Complete Local Name (type 0x09)
- Manufacturer Specific Data (type 0xFF): int16 temperature x100
"""
payload = bytearray()

# Complete Local Name
name_bytes = name.encode()
payload += bytes((len(name_bytes) + 1, 0x09)) + name_bytes

# Manufacturer Specific Data: temperature encoded as int16 (x100)
man_data = struct.pack("h", temp_raw)
payload += bytes((len(man_data) + 1, 0xFF)) + man_data

return payload


async def sensor_task():
"""Read temperature from WSEN-PADS every second."""
global temperature
while True:
try:
temperature = pads.temperature()
except Exception as e:
print("Sensor error:", e)
await asyncio.sleep_ms(SENSOR_INTERVAL_MS)


async def ble_task():
"""Advertise BLE beacon with device name and temperature."""
while True:
# Encode temperature as int16 (multiply by 100 to keep 2 decimals)
temp_raw = int(temperature * 100)
adv_payload = build_adv_payload(DEVICE_NAME, temp_raw)

try:
await aioble.advertise(
interval_us=ADV_INTERVAL_US,
adv_data=adv_payload,
connectable=False,
timeout_ms=ADV_TIMEOUT_MS,
)
except asyncio.TimeoutError:
pass # Normal: non-connectable advertisement timeout

await asyncio.sleep_ms(100)


async def display_task():
"""Update OLED display with current beacon state."""
while True:
screen.clear()
screen.title("BLE BEACON")
screen.value(f"{temperature:.1f}", unit="C")
screen.subtitle(DEVICE_NAME, "Advertising...")
screen.show()
await asyncio.sleep_ms(500)


async def main():
await asyncio.gather(
sensor_task(),
ble_task(),
display_task(),
)


asyncio.run(main())
219 changes: 219 additions & 0 deletions BLE/BLE_RSSI_Proximity/RSSI_Proximity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
"""BLE RSSI proximity detector example using aioble and SSD1327 OLED.

Two STeaMi boards run this same file:
- Hold button UP during the 10-second startup window → BEACON mode
- Release button UP → SCANNER mode (default)

The scanner reads the RSSI of the beacon and maps it to a proximity level
displayed as a gauge on the round OLED. RSSI is averaged over N samples
to reduce noise while keeping fast response.

Hardware:
- 2 x STM32WB55 BLE radio
- SSD1327 128x128 OLED display (round)
- MCP23009E D-PAD (button UP used for mode selection)

Learning goals:
- BLE scanning with aioble
- RSSI as a proxy for distance
- Signal averaging to reduce noise
"""

import sys

sys.path.insert(0, "/remote")

import bluetooth
import uasyncio as asyncio
from time import sleep_ms

import aioble
import ssd1327
from machine import I2C, SPI, Pin
from mcp23009e import MCP23009E
from mcp23009e.const import (
MCP23009_BTN_UP,
MCP23009_DIR_INPUT,
MCP23009_I2C_ADDR,
MCP23009_LOGIC_LOW,
MCP23009_PULLUP,
)
from steami_screen import GRAY, LIGHT, RED, Screen, SSD1327Display

# === BLE setup ===
ble = bluetooth.BLE()
ble.active(True)

mac_bytes = ble.config("mac")[1]
mac_suffix = "".join(f"{b:02X}" for b in mac_bytes[-2:])
DEVICE_NAME = f"STeaMi-{mac_suffix}"
BEACON_NAME = "STeaMi-BEACON"

# === Display ===
spi = SPI(1)
dc = Pin("DATA_COMMAND_DISPLAY")
res = Pin("RST_DISPLAY")
cs = Pin("CS_DISPLAY")
display = SSD1327Display(ssd1327.WS_OLED_128X128_SPI(spi, dc, res, cs))
screen = Screen(display)

# === Buttons ===
i2c = I2C(1)
reset_expander = Pin("RST_EXPANDER", Pin.OUT)
mcp = MCP23009E(i2c, address=MCP23009_I2C_ADDR, reset_pin=reset_expander)
mcp.setup(MCP23009_BTN_UP, MCP23009_DIR_INPUT, pullup=MCP23009_PULLUP)

# === BLE parameters ===
ADV_INTERVAL_US = 50_000 # 50ms beacon interval for fast detection
ADV_TIMEOUT_MS = 100 # Short advertising window
SCAN_DURATION_MS = 500 # Very short scan bursts for fast response
RSSI_SAMPLES = 2 # Few samples for fast reaction
RSSI_MIN = -90 # dBm considered far
RSSI_MAX = -30 # dBm considered very close

# === Mode selection window ===
MODE_SELECTION_S = 10

# === Shared state ===
rssi_samples = []
current_rssi = RSSI_MIN


def build_adv_payload(name):
"""Build a minimal BLE advertising payload with device name."""
payload = bytearray()
name_bytes = name.encode()
payload += bytes((len(name_bytes) + 1, 0x09)) + name_bytes
return payload


def rssi_to_proximity(rssi):
"""Map RSSI value to a 0-100 proximity percentage."""
clamped = max(RSSI_MIN, min(RSSI_MAX, rssi))
return int((clamped - RSSI_MIN) / (RSSI_MAX - RSSI_MIN) * 100)


def rssi_to_color(proximity):
"""Return display color based on proximity level."""
if proximity > 70:
return RED
elif proximity > 40:
return LIGHT
else:
return GRAY


def select_mode():
"""Show a 10-second countdown and detect button UP to select mode."""
print("Hold UP button for BEACON mode, release for SCANNER mode.")
for remaining in range(MODE_SELECTION_S, 0, -1):
screen.clear()
screen.title("SELECT MODE")
screen.subtitle(
"Hold UP: BEACON",
"Release: SCANNER",
f"Starting in {remaining}s...",
)
screen.show()
sleep_ms(1000)
if mcp.get_level(MCP23009_BTN_UP) == MCP23009_LOGIC_LOW:
print("Button UP held -> BEACON mode")
return True
print("Button UP released -> SCANNER mode")
return False


# =============================================================================
# === BEACON MODE =============================================================
# =============================================================================


async def beacon_ble_task():
"""Advertise as STeaMi-BEACON continuously."""
adv_payload = build_adv_payload(BEACON_NAME)
print(f"Beacon mode: advertising as {BEACON_NAME}")
while True:
try:
await aioble.advertise(
interval_us=ADV_INTERVAL_US,
adv_data=adv_payload,
connectable=False,
timeout_ms=ADV_TIMEOUT_MS,
)
except asyncio.TimeoutError:
pass


async def beacon_display_task():
"""Show beacon status on OLED."""
while True:
screen.clear()
screen.title("BEACON")
screen.subtitle(BEACON_NAME, "Broadcasting...")
screen.show()
await asyncio.sleep_ms(1000)


async def run_beacon():
await asyncio.gather(
beacon_ble_task(),
beacon_display_task(),
)


# =============================================================================
# === SCANNER MODE ============================================================
# =============================================================================


async def scanner_ble_task():
global current_rssi
display_counter = 0
while True:
async with aioble.scan(
SCAN_DURATION_MS,
interval_us=10000,
window_us=10000,
active=True,
) as scanner:
async for result in scanner:
if result.name() == BEACON_NAME:
current_rssi = result.rssi # Valeur brute directe
print(f"RSSI: {current_rssi} dBm")
display_counter += 1
if display_counter >= 3:
display_counter = 0
proximity = rssi_to_proximity(current_rssi)
color = rssi_to_color(proximity)
screen.clear()
screen.title("PROXIMITY")
screen.gauge(proximity, min_val=0, max_val=100, color=color)
screen.value(str(current_rssi), unit="dBm")
screen.subtitle(f"{proximity}%")
screen.show()


async def run_scanner():
await scanner_ble_task()


# =============================================================================
# === ENTRY POINT =============================================================
# =============================================================================

is_beacon = select_mode()

if is_beacon:
screen.clear()
screen.title("BEACON MODE")
screen.subtitle("Starting...")
screen.show()
sleep_ms(1000)
asyncio.run(run_beacon())
else:
screen.clear()
screen.title("SCANNER MODE")
screen.subtitle("Starting...")
screen.show()
sleep_ms(1000)
asyncio.run(run_scanner())