Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
M
micropython
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Analytics
Analytics
Repository
Value Stream
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Commits
Open sidebar
xpstem
micropython
Commits
89a95b7c
Commit
89a95b7c
authored
Jun 17, 2020
by
Jim Mussared
Committed by
Damien George
Jul 18, 2020
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
examples/bluetooth: Add simple UART demo with central and peripheral.
parent
e152d0c1
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
343 additions
and
0 deletions
+343
-0
examples/bluetooth/ble_simple_central.py
examples/bluetooth/ble_simple_central.py
+245
-0
examples/bluetooth/ble_simple_peripheral.py
examples/bluetooth/ble_simple_peripheral.py
+98
-0
No files found.
examples/bluetooth/ble_simple_central.py
0 → 100644
View file @
89a95b7c
# This example finds and connects to a peripheral running the
# UART service (e.g. ble_simple_peripheral.py).
import
bluetooth
import
random
import
struct
import
time
import
micropython
from
ble_advertising
import
decode_services
,
decode_name
from
micropython
import
const
_IRQ_CENTRAL_CONNECT
=
const
(
1
)
_IRQ_CENTRAL_DISCONNECT
=
const
(
2
)
_IRQ_GATTS_WRITE
=
const
(
3
)
_IRQ_GATTS_READ_REQUEST
=
const
(
4
)
_IRQ_SCAN_RESULT
=
const
(
5
)
_IRQ_SCAN_DONE
=
const
(
6
)
_IRQ_PERIPHERAL_CONNECT
=
const
(
7
)
_IRQ_PERIPHERAL_DISCONNECT
=
const
(
8
)
_IRQ_GATTC_SERVICE_RESULT
=
const
(
9
)
_IRQ_GATTC_SERVICE_DONE
=
const
(
10
)
_IRQ_GATTC_CHARACTERISTIC_RESULT
=
const
(
11
)
_IRQ_GATTC_CHARACTERISTIC_DONE
=
const
(
12
)
_IRQ_GATTC_DESCRIPTOR_RESULT
=
const
(
13
)
_IRQ_GATTC_DESCRIPTOR_DONE
=
const
(
14
)
_IRQ_GATTC_READ_RESULT
=
const
(
15
)
_IRQ_GATTC_READ_DONE
=
const
(
16
)
_IRQ_GATTC_WRITE_DONE
=
const
(
17
)
_IRQ_GATTC_NOTIFY
=
const
(
18
)
_IRQ_GATTC_INDICATE
=
const
(
19
)
_ADV_IND
=
const
(
0x00
)
_ADV_DIRECT_IND
=
const
(
0x01
)
_ADV_SCAN_IND
=
const
(
0x02
)
_ADV_NONCONN_IND
=
const
(
0x03
)
_UART_SERVICE_UUID
=
bluetooth
.
UUID
(
"6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
)
_UART_RX_CHAR_UUID
=
bluetooth
.
UUID
(
"6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
)
_UART_TX_CHAR_UUID
=
bluetooth
.
UUID
(
"6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
)
class
BLESimpleCentral
:
def
__init__
(
self
,
ble
):
self
.
_ble
=
ble
self
.
_ble
.
active
(
True
)
self
.
_ble
.
irq
(
handler
=
self
.
_irq
)
self
.
_reset
()
def
_reset
(
self
):
# Cached name and address from a successful scan.
self
.
_name
=
None
self
.
_addr_type
=
None
self
.
_addr
=
None
# Callbacks for completion of various operations.
# These reset back to None after being invoked.
self
.
_scan_callback
=
None
self
.
_conn_callback
=
None
self
.
_read_callback
=
None
# Persistent callback for when new data is notified from the device.
self
.
_notify_callback
=
None
# Connected device.
self
.
_conn_handle
=
None
self
.
_start_handle
=
None
self
.
_end_handle
=
None
self
.
_tx_handle
=
None
self
.
_rx_handle
=
None
def
_irq
(
self
,
event
,
data
):
if
event
==
_IRQ_SCAN_RESULT
:
addr_type
,
addr
,
adv_type
,
rssi
,
adv_data
=
data
if
adv_type
in
(
_ADV_IND
,
_ADV_DIRECT_IND
,)
and
_UART_SERVICE_UUID
in
decode_services
(
adv_data
):
# Found a potential device, remember it and stop scanning.
self
.
_addr_type
=
addr_type
self
.
_addr
=
bytes
(
addr
)
# Note: addr buffer is owned by caller so need to copy it.
self
.
_name
=
decode_name
(
adv_data
)
or
"?"
self
.
_ble
.
gap_scan
(
None
)
elif
event
==
_IRQ_SCAN_DONE
:
if
self
.
_scan_callback
:
if
self
.
_addr
:
# Found a device during the scan (and the scan was explicitly stopped).
self
.
_scan_callback
(
self
.
_addr_type
,
self
.
_addr
,
self
.
_name
)
self
.
_scan_callback
=
None
else
:
# Scan timed out.
self
.
_scan_callback
(
None
,
None
,
None
)
elif
event
==
_IRQ_PERIPHERAL_CONNECT
:
# Connect successful.
conn_handle
,
addr_type
,
addr
,
=
data
if
addr_type
==
self
.
_addr_type
and
addr
==
self
.
_addr
:
self
.
_conn_handle
=
conn_handle
self
.
_ble
.
gattc_discover_services
(
self
.
_conn_handle
)
elif
event
==
_IRQ_PERIPHERAL_DISCONNECT
:
# Disconnect (either initiated by us or the remote end).
conn_handle
,
_
,
_
,
=
data
if
conn_handle
==
self
.
_conn_handle
:
# If it was initiated by us, it'll already be reset.
self
.
_reset
()
elif
event
==
_IRQ_GATTC_SERVICE_RESULT
:
# Connected device returned a service.
conn_handle
,
start_handle
,
end_handle
,
uuid
=
data
print
(
"service"
,
data
)
if
conn_handle
==
self
.
_conn_handle
and
uuid
==
_UART_SERVICE_UUID
:
self
.
_start_handle
,
self
.
_end_handle
=
start_handle
,
end_handle
elif
event
==
_IRQ_GATTC_SERVICE_DONE
:
# Service query complete.
if
self
.
_start_handle
and
self
.
_end_handle
:
self
.
_ble
.
gattc_discover_characteristics
(
self
.
_conn_handle
,
self
.
_start_handle
,
self
.
_end_handle
)
else
:
print
(
"Failed to find uart service."
)
elif
event
==
_IRQ_GATTC_CHARACTERISTIC_RESULT
:
# Connected device returned a characteristic.
conn_handle
,
def_handle
,
value_handle
,
properties
,
uuid
=
data
if
conn_handle
==
self
.
_conn_handle
and
uuid
==
_UART_RX_CHAR_UUID
:
self
.
_rx_handle
=
value_handle
if
conn_handle
==
self
.
_conn_handle
and
uuid
==
_UART_TX_CHAR_UUID
:
self
.
_tx_handle
=
value_handle
elif
event
==
_IRQ_GATTC_CHARACTERISTIC_DONE
:
# Characteristic query complete.
if
self
.
_tx_handle
is
not
None
and
self
.
_rx_handle
is
not
None
:
# We've finished connecting and discovering device, fire the connect callback.
if
self
.
_conn_callback
:
self
.
_conn_callback
()
else
:
print
(
"Failed to find uart rx characteristic."
)
elif
event
==
_IRQ_GATTC_WRITE_DONE
:
conn_handle
,
value_handle
,
status
=
data
print
(
"TX complete"
)
elif
event
==
_IRQ_GATTC_NOTIFY
:
conn_handle
,
value_handle
,
notify_data
=
data
if
conn_handle
==
self
.
_conn_handle
and
value_handle
==
self
.
_tx_handle
:
if
self
.
_notify_callback
:
self
.
_notify_callback
(
notify_data
)
# Returns true if we've successfully connected and discovered characteristics.
def
is_connected
(
self
):
return
(
self
.
_conn_handle
is
not
None
and
self
.
_tx_handle
is
not
None
and
self
.
_rx_handle
is
not
None
)
# Find a device advertising the environmental sensor service.
def
scan
(
self
,
callback
=
None
):
self
.
_addr_type
=
None
self
.
_addr
=
None
self
.
_scan_callback
=
callback
self
.
_ble
.
gap_scan
(
2000
,
30000
,
30000
)
# Connect to the specified device (otherwise use cached address from a scan).
def
connect
(
self
,
addr_type
=
None
,
addr
=
None
,
callback
=
None
):
self
.
_addr_type
=
addr_type
or
self
.
_addr_type
self
.
_addr
=
addr
or
self
.
_addr
self
.
_conn_callback
=
callback
if
self
.
_addr_type
is
None
or
self
.
_addr
is
None
:
return
False
self
.
_ble
.
gap_connect
(
self
.
_addr_type
,
self
.
_addr
)
return
True
# Disconnect from current device.
def
disconnect
(
self
):
if
not
self
.
_conn_handle
:
return
self
.
_ble
.
gap_disconnect
(
self
.
_conn_handle
)
self
.
_reset
()
# Send data over the UART
def
write
(
self
,
v
,
response
=
False
):
if
not
self
.
is_connected
():
return
self
.
_ble
.
gattc_write
(
self
.
_conn_handle
,
self
.
_rx_handle
,
v
,
1
if
response
else
0
)
# Set handler for when data is received over the UART.
def
on_notify
(
self
,
callback
):
self
.
_notify_callback
=
callback
def
demo
():
ble
=
bluetooth
.
BLE
()
central
=
BLESimpleCentral
(
ble
)
not_found
=
False
def
on_scan
(
addr_type
,
addr
,
name
):
if
addr_type
is
not
None
:
print
(
"Found peripheral:"
,
addr_type
,
addr
,
name
)
central
.
connect
()
else
:
nonlocal
not_found
not_found
=
True
print
(
"No peripheral found."
)
central
.
scan
(
callback
=
on_scan
)
# Wait for connection...
while
not
central
.
is_connected
():
time
.
sleep_ms
(
100
)
if
not_found
:
return
print
(
"Connected"
)
def
on_rx
(
v
):
print
(
"RX"
,
v
)
central
.
on_notify
(
on_rx
)
with_response
=
False
i
=
0
while
central
.
is_connected
():
try
:
v
=
str
(
i
)
+
"_"
print
(
"TX"
,
v
)
central
.
write
(
v
,
with_response
)
except
:
print
(
"TX failed"
)
i
+=
1
time
.
sleep_ms
(
400
if
with_response
else
30
)
print
(
"Disconnected"
)
if
__name__
==
"__main__"
:
demo
()
examples/bluetooth/ble_simple_peripheral.py
0 → 100644
View file @
89a95b7c
# This example demonstrates a UART periperhal.
import
bluetooth
import
random
import
struct
import
time
from
ble_advertising
import
advertising_payload
from
micropython
import
const
_IRQ_CENTRAL_CONNECT
=
const
(
1
)
_IRQ_CENTRAL_DISCONNECT
=
const
(
2
)
_IRQ_GATTS_WRITE
=
const
(
3
)
_UART_UUID
=
bluetooth
.
UUID
(
"6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
)
_UART_TX
=
(
bluetooth
.
UUID
(
"6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
),
bluetooth
.
FLAG_READ
|
bluetooth
.
FLAG_NOTIFY
,
)
_UART_RX
=
(
bluetooth
.
UUID
(
"6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
),
bluetooth
.
FLAG_WRITE
|
bluetooth
.
FLAG_WRITE_NO_RESPONSE
,
)
_UART_SERVICE
=
(
_UART_UUID
,
(
_UART_TX
,
_UART_RX
,),
)
class
BLESimplePeripheral
:
def
__init__
(
self
,
ble
,
name
=
"mpy-uart"
):
self
.
_ble
=
ble
self
.
_ble
.
active
(
True
)
self
.
_ble
.
irq
(
handler
=
self
.
_irq
)
((
self
.
_handle_tx
,
self
.
_handle_rx
,),)
=
self
.
_ble
.
gatts_register_services
(
(
_UART_SERVICE
,)
)
self
.
_connections
=
set
()
self
.
_write_callback
=
None
self
.
_payload
=
advertising_payload
(
name
=
name
,
services
=
[
_UART_UUID
],)
self
.
_advertise
()
def
_irq
(
self
,
event
,
data
):
# Track connections so we can send notifications.
if
event
==
_IRQ_CENTRAL_CONNECT
:
conn_handle
,
_
,
_
,
=
data
print
(
"New connection"
,
conn_handle
)
self
.
_connections
.
add
(
conn_handle
)
elif
event
==
_IRQ_CENTRAL_DISCONNECT
:
conn_handle
,
_
,
_
,
=
data
print
(
"Disconnected"
,
conn_handle
)
self
.
_connections
.
remove
(
conn_handle
)
# Start advertising again to allow a new connection.
self
.
_advertise
()
elif
event
==
_IRQ_GATTS_WRITE
:
conn_handle
,
value_handle
=
data
value
=
self
.
_ble
.
gatts_read
(
value_handle
)
if
value_handle
==
self
.
_handle_rx
and
self
.
_write_callback
:
self
.
_write_callback
(
value
)
def
send
(
self
,
data
):
for
conn_handle
in
self
.
_connections
:
self
.
_ble
.
gatts_notify
(
conn_handle
,
self
.
_handle_tx
,
data
)
def
is_connected
(
self
):
return
len
(
self
.
_connections
)
>
0
def
_advertise
(
self
,
interval_us
=
500000
):
print
(
"Starting advertising"
)
self
.
_ble
.
gap_advertise
(
interval_us
,
adv_data
=
self
.
_payload
)
def
on_write
(
self
,
callback
):
self
.
_write_callback
=
callback
def
demo
():
ble
=
bluetooth
.
BLE
()
p
=
BLESimplePeripheral
(
ble
)
def
on_rx
(
v
):
print
(
"RX"
,
v
)
p
.
on_write
(
on_rx
)
i
=
0
while
True
:
if
p
.
is_connected
():
# Short burst of queued notifications.
for
_
in
range
(
3
):
data
=
str
(
i
)
+
"_"
print
(
"TX"
,
data
)
p
.
send
(
data
)
i
+=
1
time
.
sleep_ms
(
100
)
if
__name__
==
"__main__"
:
demo
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment