Commit 24678fe4 authored by Jim Mussared's avatar Jim Mussared

drivers: Remove drivers that are now in micropython-lib.

Signed-off-by: default avatarJim Mussared <jim.mussared@gmail.com>
parent d84c6ef0
This directory contains drivers for specific hardware. The drivers are This directory contains C drivers for specific hardware. The drivers are
intended to work across multiple ports. intended to work across multiple ports.
module("wm8960.py", opt=3)
This diff is collapsed.
# DHT11/DHT22 driver for MicroPython on ESP8266
# MIT license; Copyright (c) 2016 Damien P. George
import sys
if sys.platform.startswith("esp"):
from esp import dht_readinto
elif sys.platform == "mimxrt":
from mimxrt import dht_readinto
elif sys.platform == "rp2":
from rp2 import dht_readinto
elif sys.platform == "pyboard":
from pyb import dht_readinto
else:
from machine import dht_readinto
class DHTBase:
def __init__(self, pin):
self.pin = pin
self.buf = bytearray(5)
def measure(self):
buf = self.buf
dht_readinto(self.pin, buf)
if (buf[0] + buf[1] + buf[2] + buf[3]) & 0xFF != buf[4]:
raise Exception("checksum error")
class DHT11(DHTBase):
def humidity(self):
return self.buf[0]
def temperature(self):
return self.buf[2]
class DHT22(DHTBase):
def humidity(self):
return (self.buf[0] << 8 | self.buf[1]) * 0.1
def temperature(self):
t = ((self.buf[2] & 0x7F) << 8 | self.buf[3]) * 0.1
if self.buf[2] & 0x80:
t = -t
return t
module("dht.py", opt=3)
This diff is collapsed.
# Driver test for official MicroPython LCD160CR display
# MIT license; Copyright (c) 2017 Damien P. George
import time, math, framebuf, lcd160cr
def get_lcd(lcd):
if type(lcd) is str:
lcd = lcd160cr.LCD160CR(lcd)
return lcd
def show_adc(lcd, adc):
data = [adc.read_core_temp(), adc.read_core_vbat(), 3.3]
try:
data[2] = adc.read_vref()
except:
pass
for i in range(3):
lcd.set_text_color((825, 1625, 1600)[i], 0)
if lcd.h == 160:
lcd.set_font(2)
lcd.set_pos(0, 100 + i * 16)
else:
lcd.set_font(2, trans=1)
lcd.set_pos(0, lcd.h - 60 + i * 16)
lcd.write("%4s: " % ("TEMP", "VBAT", "VREF")[i])
if i > 0:
s = "%6.3fV" % data[i]
else:
s = "%5.1f°C" % data[i]
if lcd.h == 160:
lcd.set_font(1, bold=0, scale=1)
else:
lcd.set_font(1, bold=0, scale=1, trans=1)
lcd.set_pos(45, lcd.h - 60 + i * 16)
lcd.write(s)
def test_features(lcd, orient=lcd160cr.PORTRAIT):
# if we run on pyboard then use ADC and RTC features
try:
import pyb
adc = pyb.ADCAll(12, 0xF0000)
rtc = pyb.RTC()
except:
adc = None
rtc = None
# set orientation and clear screen
lcd = get_lcd(lcd)
lcd.set_orient(orient)
lcd.set_pen(0, 0)
lcd.erase()
# create M-logo
mlogo = framebuf.FrameBuffer(bytearray(17 * 17 * 2), 17, 17, framebuf.RGB565)
mlogo.fill(0)
mlogo.fill_rect(1, 1, 15, 15, 0xFFFFFF)
mlogo.vline(4, 4, 12, 0)
mlogo.vline(8, 1, 12, 0)
mlogo.vline(12, 4, 12, 0)
mlogo.vline(14, 13, 2, 0)
# create inline framebuf
offx = 14
offy = 19
w = 100
h = 75
fbuf = framebuf.FrameBuffer(bytearray(w * h * 2), w, h, framebuf.RGB565)
lcd.set_spi_win(offx, offy, w, h)
# initialise loop parameters
tx = ty = 0
t0 = time.ticks_us()
for i in range(300):
# update position of cross-hair
t, tx2, ty2 = lcd.get_touch()
if t:
tx2 -= offx
ty2 -= offy
if tx2 >= 0 and ty2 >= 0 and tx2 < w and ty2 < h:
tx, ty = tx2, ty2
else:
tx = (tx + 1) % w
ty = (ty + 1) % h
# create and show the inline framebuf
fbuf.fill(lcd.rgb(128 + int(64 * math.cos(0.1 * i)), 128, 192))
fbuf.line(
w // 2,
h // 2,
w // 2 + int(40 * math.cos(0.2 * i)),
h // 2 + int(40 * math.sin(0.2 * i)),
lcd.rgb(128, 255, 64),
)
fbuf.hline(0, ty, w, lcd.rgb(64, 64, 64))
fbuf.vline(tx, 0, h, lcd.rgb(64, 64, 64))
fbuf.rect(tx - 3, ty - 3, 7, 7, lcd.rgb(64, 64, 64))
for phase in (-0.2, 0, 0.2):
x = w // 2 - 8 + int(50 * math.cos(0.05 * i + phase))
y = h // 2 - 8 + int(32 * math.sin(0.05 * i + phase))
fbuf.blit(mlogo, x, y)
for j in range(-3, 3):
fbuf.text(
"MicroPython",
5,
h // 2 + 9 * j + int(20 * math.sin(0.1 * (i + j))),
lcd.rgb(128 + 10 * j, 0, 128 - 10 * j),
)
lcd.show_framebuf(fbuf)
# show results from the ADC
if adc:
show_adc(lcd, adc)
# show the time
if rtc:
lcd.set_pos(2, 0)
lcd.set_font(1)
t = rtc.datetime()
lcd.write(
"%4d-%02d-%02d %2d:%02d:%02d.%01d"
% (t[0], t[1], t[2], t[4], t[5], t[6], t[7] // 100000)
)
# compute the frame rate
t1 = time.ticks_us()
dt = time.ticks_diff(t1, t0)
t0 = t1
# show the frame rate
lcd.set_pos(2, 9)
lcd.write("%.2f fps" % (1000000 / dt))
def test_mandel(lcd, orient=lcd160cr.PORTRAIT):
# set orientation and clear screen
lcd = get_lcd(lcd)
lcd.set_orient(orient)
lcd.set_pen(0, 0xFFFF)
lcd.erase()
# function to compute Mandelbrot pixels
def in_set(c):
z = 0
for i in range(32):
z = z * z + c
if abs(z) > 100:
return i
return 0
# cache width and height of LCD
w = lcd.w
h = lcd.h
# create the buffer for each line and set SPI parameters
line = bytearray(w * 2)
lcd.set_spi_win(0, 0, w, h)
spi = lcd.fast_spi()
# draw the Mandelbrot set line-by-line
hh = (h - 1) / 3.2
ww = (w - 1) / 2.4
for v in range(h):
for u in range(w):
c = in_set((v / hh - 2.3) + (u / ww - 1.2) * 1j)
if c < 16:
rgb = c << 12 | c << 6
else:
rgb = 0xF800 | c << 6
line[2 * u] = rgb
line[2 * u + 1] = rgb >> 8
spi.write(line)
def test_all(lcd, orient=lcd160cr.PORTRAIT):
lcd = get_lcd(lcd)
test_features(lcd, orient)
test_mandel(lcd, orient)
print("To run all tests: test_all(<lcd>)")
print("Individual tests are: test_features, test_mandel")
print('<lcd> argument should be a connection, eg "X", or an LCD160CR object')
# TODO: Split these into separate directories with their own manifests.
options.defaults(lcd160cr=False, ssd1306=False, test=False)
if options.lcd160cr:
module("lcd160cr.py", opt=3)
if options.test:
module("lcd160cr_test.py", opt=3)
if options.ssd1306:
module("ssd1306.py", opt=3)
# MicroPython SSD1306 OLED driver, I2C and SPI interfaces
from micropython import const
import framebuf
# register definitions
SET_CONTRAST = const(0x81)
SET_ENTIRE_ON = const(0xA4)
SET_NORM_INV = const(0xA6)
SET_DISP = const(0xAE)
SET_MEM_ADDR = const(0x20)
SET_COL_ADDR = const(0x21)
SET_PAGE_ADDR = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP = const(0xA0)
SET_MUX_RATIO = const(0xA8)
SET_IREF_SELECT = const(0xAD)
SET_COM_OUT_DIR = const(0xC0)
SET_DISP_OFFSET = const(0xD3)
SET_COM_PIN_CFG = const(0xDA)
SET_DISP_CLK_DIV = const(0xD5)
SET_PRECHARGE = const(0xD9)
SET_VCOM_DESEL = const(0xDB)
SET_CHARGE_PUMP = const(0x8D)
# Subclassing FrameBuffer provides support for graphics primitives
# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html
class SSD1306(framebuf.FrameBuffer):
def __init__(self, width, height, external_vcc):
self.width = width
self.height = height
self.external_vcc = external_vcc
self.pages = self.height // 8
self.buffer = bytearray(self.pages * self.width)
super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB)
self.init_display()
def init_display(self):
for cmd in (
SET_DISP, # display off
# address setting
SET_MEM_ADDR,
0x00, # horizontal
# resolution and layout
SET_DISP_START_LINE, # start at line 0
SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0
SET_MUX_RATIO,
self.height - 1,
SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0
SET_DISP_OFFSET,
0x00,
SET_COM_PIN_CFG,
0x02 if self.width > 2 * self.height else 0x12,
# timing and driving scheme
SET_DISP_CLK_DIV,
0x80,
SET_PRECHARGE,
0x22 if self.external_vcc else 0xF1,
SET_VCOM_DESEL,
0x30, # 0.83*Vcc
# display
SET_CONTRAST,
0xFF, # maximum
SET_ENTIRE_ON, # output follows RAM contents
SET_NORM_INV, # not inverted
SET_IREF_SELECT,
0x30, # enable internal IREF during display on
# charge pump
SET_CHARGE_PUMP,
0x10 if self.external_vcc else 0x14,
SET_DISP | 0x01, # display on
): # on
self.write_cmd(cmd)
self.fill(0)
self.show()
def poweroff(self):
self.write_cmd(SET_DISP)
def poweron(self):
self.write_cmd(SET_DISP | 0x01)
def contrast(self, contrast):
self.write_cmd(SET_CONTRAST)
self.write_cmd(contrast)
def invert(self, invert):
self.write_cmd(SET_NORM_INV | (invert & 1))
def rotate(self, rotate):
self.write_cmd(SET_COM_OUT_DIR | ((rotate & 1) << 3))
self.write_cmd(SET_SEG_REMAP | (rotate & 1))
def show(self):
x0 = 0
x1 = self.width - 1
if self.width != 128:
# narrow displays use centred columns
col_offset = (128 - self.width) // 2
x0 += col_offset
x1 += col_offset
self.write_cmd(SET_COL_ADDR)
self.write_cmd(x0)
self.write_cmd(x1)
self.write_cmd(SET_PAGE_ADDR)
self.write_cmd(0)
self.write_cmd(self.pages - 1)
self.write_data(self.buffer)
class SSD1306_I2C(SSD1306):
def __init__(self, width, height, i2c, addr=0x3C, external_vcc=False):
self.i2c = i2c
self.addr = addr
self.temp = bytearray(2)
self.write_list = [b"\x40", None] # Co=0, D/C#=1
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.temp[0] = 0x80 # Co=1, D/C#=0
self.temp[1] = cmd
self.i2c.writeto(self.addr, self.temp)
def write_data(self, buf):
self.write_list[1] = buf
self.i2c.writevto(self.addr, self.write_list)
class SSD1306_SPI(SSD1306):
def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
self.rate = 10 * 1024 * 1024
dc.init(dc.OUT, value=0)
res.init(res.OUT, value=0)
cs.init(cs.OUT, value=1)
self.spi = spi
self.dc = dc
self.res = res
self.cs = cs
import time
self.res(1)
time.sleep_ms(1)
self.res(0)
time.sleep_ms(10)
self.res(1)
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs(1)
self.dc(0)
self.cs(0)
self.spi.write(bytearray([cmd]))
self.cs(1)
def write_data(self, buf):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs(1)
self.dc(1)
self.cs(0)
self.spi.write(buf)
self.cs(1)
"""
The MIT License (MIT)
Copyright (c) 2013-2022 Ibrahim Abdelkader <iabdalkader@openmv.io>
Copyright (c) 2013-2022 Kwabena W. Agyeman <kwagyeman@openmv.io>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
HTS221 driver driver for MicroPython.
Original source: https://github.com/ControlEverythingCommunity/HTS221/blob/master/Python/HTS221.py
Example usage:
import time
import hts221
from machine import Pin, I2C
bus = I2C(1, scl=Pin(15), sda=Pin(14))
hts = hts221.HTS221(bus)
while (True):
rH = hts.humidity()
temp = hts.temperature()
print ("rH: %.2f%% T: %.2fC" %(rH, temp))
time.sleep_ms(100)
"""
import struct
class HTS221:
def __init__(self, i2c, data_rate=1, address=0x5F):
self.bus = i2c
self.odr = data_rate
self.slv_addr = address
# Set configuration register
# Humidity and temperature average configuration
self.bus.writeto_mem(self.slv_addr, 0x10, b"\x1B")
# Set control register
# PD | BDU | ODR
cfg = 0x80 | 0x04 | (self.odr & 0x3)
self.bus.writeto_mem(self.slv_addr, 0x20, bytes([cfg]))
# Read Calibration values from non-volatile memory of the device
# Humidity Calibration values
self.H0 = self._read_reg(0x30, 1) / 2
self.H1 = self._read_reg(0x31, 1) / 2
self.H2 = self._read_reg(0x36, 2)
self.H3 = self._read_reg(0x3A, 2)
# Temperature Calibration values
raw = self._read_reg(0x35, 1)
self.T0 = ((raw & 0x03) * 256) + self._read_reg(0x32, 1)
self.T1 = ((raw & 0x0C) * 64) + self._read_reg(0x33, 1)
self.T2 = self._read_reg(0x3C, 2)
self.T3 = self._read_reg(0x3E, 2)
def _read_reg(self, reg_addr, size):
fmt = "B" if size == 1 else "H"
reg_addr = reg_addr if size == 1 else reg_addr | 0x80
return struct.unpack(fmt, self.bus.readfrom_mem(self.slv_addr, reg_addr, size))[0]
def humidity(self):
rH = self._read_reg(0x28, 2)
return (self.H1 - self.H0) * (rH - self.H2) / (self.H3 - self.H2) + self.H0
def temperature(self):
temp = self._read_reg(0x2A, 2)
if temp > 32767:
temp -= 65536
return ((self.T1 - self.T0) / 8.0) * (temp - self.T2) / (self.T3 - self.T2) + (
self.T0 / 8.0
)
module("hts221.py", opt=3)
"""
The MIT License (MIT)
Copyright (c) 2016-2019 shaoziyang <shaoziyang@micropython.org.cn>
Copyright (c) 2022 Ibrahim Abdelkader <iabdalkader@openmv.io>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
LPS22HB/HH pressure sensor driver for MicroPython.
Example usage:
import time
from lps22h import LPS22H
from machine import Pin, I2C
bus = I2C(1, scl=Pin(15), sda=Pin(14))
lps = LPS22H(bus, oneshot=False)
while (True):
print("Pressure: %.2f hPa Temperature: %.2f C"%(lps.pressure(), lps.temperature()))
time.sleep_ms(10)
"""
import machine
_LPS22_CTRL_REG1 = const(0x10)
_LPS22_CTRL_REG2 = const(0x11)
_LPS22_STATUS = const(0x27)
_LPS22_TEMP_OUT_L = const(0x2B)
_LPS22_PRESS_OUT_XL = const(0x28)
_LPS22_PRESS_OUT_L = const(0x29)
class LPS22H:
def __init__(self, i2c, address=0x5C, oneshot=False):
self.i2c = i2c
self.addr = address
self.oneshot = oneshot
self.buf = bytearray(1)
# ODR=1 EN_LPFP=1 BDU=1
self._write_reg(_LPS22_CTRL_REG1, 0x1A)
self.set_oneshot_mode(self.oneshot)
def _int16(self, d):
return d if d < 0x8000 else d - 0x10000
def _write_reg(self, reg, dat):
self.buf[0] = dat
self.i2c.writeto_mem(self.addr, reg, self.buf)
def _read_reg(self, reg, width=8):
self.i2c.readfrom_mem_into(self.addr, reg, self.buf)
val = self.buf[0]
if width == 16:
val |= self._read_reg(reg + 1) << 8
return val
def _tigger_oneshot(self, b):
if self.oneshot:
self._write_reg(_LPS22_CTRL_REG2, self._read_reg(_LPS22_CTRL_REG2) | 0x01)
self._read_reg(0x28 + b * 2)
while True:
if self._read_reg(_LPS22_STATUS) & b:
return
machine.idle()
def set_oneshot_mode(self, oneshot):
self._read_reg(_LPS22_CTRL_REG1)
self.oneshot = oneshot
if oneshot:
self.buf[0] &= 0x0F
else:
self.buf[0] |= 0x10
self._write_reg(_LPS22_CTRL_REG1, self.buf[0])
def pressure(self):
if self.oneshot:
self._tigger_oneshot(1)
return (
self._read_reg(_LPS22_PRESS_OUT_XL) + self._read_reg(_LPS22_PRESS_OUT_L, 16) * 256
) / 4096
def temperature(self):
if self.oneshot:
self._tigger_oneshot(2)
return self._int16(self._read_reg(_LPS22_TEMP_OUT_L, 16)) / 100
def altitude(self):
return (
(((1013.25 / self.pressure()) ** (1 / 5.257)) - 1.0)
* (self.temperature() + 273.15)
/ 0.0065
)
module("lps22h.py", opt=3)
"""
LSM6DSOX STMicro driver for MicroPython based on LSM9DS1:
Source repo: https://github.com/hoihu/projects/tree/master/raspi-hat
The MIT License (MIT)
Copyright (c) 2021 Damien P. George
Copyright (c) 2021-2022 Ibrahim Abdelkader <iabdalkader@openmv.io>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
Basic example usage:
import time
from lsm6dsox import LSM6DSOX
from machine import Pin, SPI, I2C
# Init in I2C mode.
lsm = LSM6DSOX(I2C(0, scl=Pin(13), sda=Pin(12)))
# Or init in SPI mode.
#lsm = LSM6DSOX(SPI(5), cs_pin=Pin(10))
while (True):
print('Accelerometer: x:{:>8.3f} y:{:>8.3f} z:{:>8.3f}'.format(*lsm.read_accel()))
print('Gyroscope: x:{:>8.3f} y:{:>8.3f} z:{:>8.3f}'.format(*lsm.read_gyro()))
print("")
time.sleep_ms(100)
"""
import array
from micropython import const
class LSM6DSOX:
_CTRL3_C = const(0x12)
_CTRL1_XL = const(0x10)
_CTRL8_XL = const(0x17)
_CTRL9_XL = const(0x18)
_CTRL2_G = const(0x11)
_CTRL7_G = const(0x16)
_OUTX_L_G = const(0x22)
_OUTX_L_XL = const(0x28)
_MLC_STATUS = const(0x38)
_DEFAULT_ADDR = const(0x6A)
_WHO_AM_I_REG = const(0x0F)
_FUNC_CFG_ACCESS = const(0x01)
_FUNC_CFG_BANK_USER = const(0)
_FUNC_CFG_BANK_HUB = const(1)
_FUNC_CFG_BANK_EMBED = const(2)
_MLC0_SRC = const(0x70)
_MLC_INT1 = const(0x0D)
_TAP_CFG0 = const(0x56)
_EMB_FUNC_EN_A = const(0x04)
_EMB_FUNC_EN_B = const(0x05)
def __init__(
self,
bus,
cs_pin=None,
address=_DEFAULT_ADDR,
gyro_odr=104,
accel_odr=104,
gyro_scale=2000,
accel_scale=4,
ucf=None,
):
"""Initalizes Gyro and Accelerator.
accel_odr: (0, 1.6Hz, 3.33Hz, 6.66Hz, 12.5Hz, 26Hz, 52Hz, 104Hz, 208Hz, 416Hz, 888Hz)
gyro_odr: (0, 1.6Hz, 3.33Hz, 6.66Hz, 12.5Hz, 26Hz, 52Hz, 104Hz, 208Hz, 416Hz, 888Hz)
gyro_scale: (245dps, 500dps, 1000dps, 2000dps)
accel_scale: (+/-2g, +/-4g, +/-8g, +-16g)
ucf: MLC program to load.
"""
self.bus = bus
self.cs_pin = cs_pin
self.address = address
self._use_i2c = hasattr(self.bus, "readfrom_mem")
if not self._use_i2c and cs_pin is None:
raise ValueError("A CS pin must be provided in SPI mode")
# check the id of the Accelerometer/Gyro
if self.__read_reg(_WHO_AM_I_REG) != 108:
raise OSError("No LSM6DS device was found at address 0x%x" % (self.address))
# allocate scratch buffer for efficient conversions and memread op's
self.scratch_int = array.array("h", [0, 0, 0])
SCALE_GYRO = {250: 0, 500: 1, 1000: 2, 2000: 3}
SCALE_ACCEL = {2: 0, 4: 2, 8: 3, 16: 1}
# XL_HM_MODE = 0 by default. G_HM_MODE = 0 by default.
ODR = {
0: 0x00,
1.6: 0x08,
3.33: 0x09,
6.66: 0x0A,
12.5: 0x01,
26: 0x02,
52: 0x03,
104: 0x04,
208: 0x05,
416: 0x06,
888: 0x07,
}
gyro_odr = round(gyro_odr, 2)
accel_odr = round(accel_odr, 2)
# Sanity checks
if not gyro_odr in ODR:
raise ValueError("Invalid sampling rate: %d" % accel_odr)
if not gyro_scale in SCALE_GYRO:
raise ValueError("invalid gyro scaling: %d" % gyro_scale)
if not accel_odr in ODR:
raise ValueError("Invalid sampling rate: %d" % accel_odr)
if not accel_scale in SCALE_ACCEL:
raise ValueError("invalid accelerometer scaling: %d" % accel_scale)
# Soft-reset the device.
self.reset()
# Load and configure MLC if UCF file is provided
if ucf != None:
self.load_mlc(ucf)
# Set Gyroscope datarate and scale.
# Note output from LPF2 second filtering stage is selected. See Figure 18.
self.__write_reg(_CTRL1_XL, (ODR[accel_odr] << 4) | (SCALE_ACCEL[accel_scale] << 2) | 2)
# Enable LPF2 and HPF fast-settling mode, ODR/4
self.__write_reg(_CTRL8_XL, 0x09)
# Set Gyroscope datarate and scale.
self.__write_reg(_CTRL2_G, (ODR[gyro_odr] << 4) | (SCALE_GYRO[gyro_scale] << 2) | 0)
self.gyro_scale = 32768 / gyro_scale
self.accel_scale = 32768 / accel_scale
def __read_reg(self, reg, size=1):
if self._use_i2c:
buf = self.bus.readfrom_mem(self.address, reg, size)
else:
try:
self.cs_pin(0)
self.bus.write(bytes([reg | 0x80]))
buf = self.bus.read(size)
finally:
self.cs_pin(1)
if size == 1:
return int(buf[0])
return [int(x) for x in buf]
def __write_reg(self, reg, val):
if self._use_i2c:
self.bus.writeto_mem(self.address, reg, bytes([val]))
else:
try:
self.cs_pin(0)
self.bus.write(bytes([reg, val]))
finally:
self.cs_pin(1)
def __read_reg_into(self, reg, buf):
if self._use_i2c:
self.bus.readfrom_mem_into(self.address, reg, buf)
else:
try:
self.cs_pin(0)
self.bus.write(bytes([reg | 0x80]))
self.bus.readinto(buf)
finally:
self.cs_pin(1)
def reset(self):
self.__write_reg(_CTRL3_C, self.__read_reg(_CTRL3_C) | 0x1)
for i in range(0, 10):
if (self.__read_reg(_CTRL3_C) & 0x01) == 0:
return
time.sleep_ms(10)
raise OSError("Failed to reset LSM6DS device.")
def set_mem_bank(self, bank):
cfg = self.__read_reg(_FUNC_CFG_ACCESS) & 0x3F
self.__write_reg(_FUNC_CFG_ACCESS, cfg | (bank << 6))
def set_embedded_functions(self, enable, emb_ab=None):
self.set_mem_bank(_FUNC_CFG_BANK_EMBED)
if enable:
self.__write_reg(_EMB_FUNC_EN_A, emb_ab[0])
self.__write_reg(_EMB_FUNC_EN_B, emb_ab[1])
else:
emb_a = self.__read_reg(_EMB_FUNC_EN_A)
emb_b = self.__read_reg(_EMB_FUNC_EN_B)
self.__write_reg(_EMB_FUNC_EN_A, (emb_a & 0xC7))
self.__write_reg(_EMB_FUNC_EN_B, (emb_b & 0xE6))
emb_ab = (emb_a, emb_b)
self.set_mem_bank(_FUNC_CFG_BANK_USER)
return emb_ab
def load_mlc(self, ucf):
# Load MLC config from file
with open(ucf, "r") as ucf_file:
for l in ucf_file:
if l.startswith("Ac"):
v = [int(v, 16) for v in l.strip().split(" ")[1:3]]
self.__write_reg(v[0], v[1])
emb_ab = self.set_embedded_functions(False)
# Disable I3C interface
self.__write_reg(_CTRL9_XL, self.__read_reg(_CTRL9_XL) | 0x01)
# Enable Block Data Update
self.__write_reg(_CTRL3_C, self.__read_reg(_CTRL3_C) | 0x40)
# Route signals on interrupt pin 1
self.set_mem_bank(_FUNC_CFG_BANK_EMBED)
self.__write_reg(_MLC_INT1, self.__read_reg(_MLC_INT1) & 0x01)
self.set_mem_bank(_FUNC_CFG_BANK_USER)
# Configure interrupt pin mode
self.__write_reg(_TAP_CFG0, self.__read_reg(_TAP_CFG0) | 0x41)
self.set_embedded_functions(True, emb_ab)
def read_mlc_output(self):
buf = None
if self.__read_reg(_MLC_STATUS) & 0x1:
self.__read_reg(0x1A, size=12)
self.set_mem_bank(_FUNC_CFG_BANK_EMBED)
buf = self.__read_reg(_MLC0_SRC, 8)
self.set_mem_bank(_FUNC_CFG_BANK_USER)
return buf
def read_gyro(self):
"""Returns gyroscope vector in degrees/sec."""
mv = memoryview(self.scratch_int)
f = self.gyro_scale
self.__read_reg_into(_OUTX_L_G, mv)
return (mv[0] / f, mv[1] / f, mv[2] / f)
def read_accel(self):
"""Returns acceleration vector in gravity units (9.81m/s^2)."""
mv = memoryview(self.scratch_int)
f = self.accel_scale
self.__read_reg_into(_OUTX_L_XL, mv)
return (mv[0] / f, mv[1] / f, mv[2] / f)
# LSM6DSOX Basic Example.
import time
from lsm6dsox import LSM6DSOX
from machine import Pin, I2C
lsm = LSM6DSOX(I2C(0, scl=Pin(13), sda=Pin(12)))
# Or init in SPI mode.
# lsm = LSM6DSOX(SPI(5), cs_pin=Pin(10))
while True:
print("Accelerometer: x:{:>8.3f} y:{:>8.3f} z:{:>8.3f}".format(*lsm.read_accel()))
print("Gyroscope: x:{:>8.3f} y:{:>8.3f} z:{:>8.3f}".format(*lsm.read_gyro()))
print("")
time.sleep_ms(100)
# LSM6DSOX IMU MLC (Machine Learning Core) Example.
# Download the raw UCF file, copy to storage and reset.
# NOTE: The pre-trained models (UCF files) for the examples can be found here:
# https://github.com/STMicroelectronics/STMems_Machine_Learning_Core/tree/master/application_examples/lsm6dsox
import time
from lsm6dsox import LSM6DSOX
from machine import Pin, I2C
INT_MODE = True # Run in interrupt mode.
INT_FLAG = False # Set True on interrupt.
def imu_int_handler(pin):
global INT_FLAG
INT_FLAG = True
if INT_MODE == True:
int_pin = Pin(24)
int_pin.irq(handler=imu_int_handler, trigger=Pin.IRQ_RISING)
i2c = I2C(0, scl=Pin(13), sda=Pin(12))
# Vibration detection example
UCF_FILE = "lsm6dsox_vibration_monitoring.ucf"
UCF_LABELS = {0: "no vibration", 1: "low vibration", 2: "high vibration"}
# NOTE: Selected data rate and scale must match the MLC data rate and scale.
lsm = LSM6DSOX(i2c, gyro_odr=26, accel_odr=26, gyro_scale=2000, accel_scale=4, ucf=UCF_FILE)
# Head gestures example
# UCF_FILE = "lsm6dsox_head_gestures.ucf"
# UCF_LABELS = {0:"Nod", 1:"Shake", 2:"Stationary", 3:"Swing", 4:"Walk"}
# NOTE: Selected data rate and scale must match the MLC data rate and scale.
# lsm = LSM6DSOX(i2c, gyro_odr=26, accel_odr=26, gyro_scale=250, accel_scale=2, ucf=UCF_FILE)
print("MLC configured...")
while True:
if INT_MODE:
if INT_FLAG:
INT_FLAG = False
print(UCF_LABELS[lsm.read_mlc_output()[0]])
else:
buf = lsm.read_mlc_output()
if buf != None:
print(UCF_LABELS[buf[0]])
module("lsm6dsox.py", opt=3)
"""
The MIT License (MIT)
Copyright (c) 2013, 2014 Damien P. George
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
LSM9DS1 - 9DOF inertial sensor of STMicro driver for MicroPython.
The sensor contains an accelerometer / gyroscope / magnetometer
Uses the internal FIFO to store up to 16 gyro/accel data, use the iter_accel_gyro generator to access it.
Example usage:
import time
from lsm9ds1 import LSM9DS1
from machine import Pin, I2C
lsm = LSM9DS1(I2C(1, scl=Pin(15), sda=Pin(14)))
while (True):
#for g,a in lsm.iter_accel_gyro(): print(g,a) # using fifo
print('Accelerometer: x:{:>8.3f} y:{:>8.3f} z:{:>8.3f}'.format(*lsm.accel()))
print('Magnetometer: x:{:>8.3f} y:{:>8.3f} z:{:>8.3f}'.format(*lsm.magnet()))
print('Gyroscope: x:{:>8.3f} y:{:>8.3f} z:{:>8.3f}'.format(*lsm.gyro()))
print("")
time.sleep_ms(100)
"""
import array
_WHO_AM_I = const(0xF)
_CTRL_REG1_G = const(0x10)
_INT_GEN_SRC_G = const(0x14)
_OUT_TEMP = const(0x15)
_OUT_G = const(0x18)
_CTRL_REG4_G = const(0x1E)
_STATUS_REG = const(0x27)
_OUT_XL = const(0x28)
_FIFO_CTRL_REG = const(0x2E)
_FIFO_SRC = const(0x2F)
_OFFSET_REG_X_M = const(0x05)
_CTRL_REG1_M = const(0x20)
_OUT_M = const(0x28)
_SCALE_GYRO = const(((245, 0), (500, 1), (2000, 3)))
_SCALE_ACCEL = const(((2, 0), (4, 2), (8, 3), (16, 1)))
class LSM9DS1:
def __init__(self, i2c, address_gyro=0x6B, address_magnet=0x1E):
self.i2c = i2c
self.address_gyro = address_gyro
self.address_magnet = address_magnet
# check id's of accelerometer/gyro and magnetometer
if (self.magent_id() != b"=") or (self.gyro_id() != b"h"):
raise OSError(
"Invalid LSM9DS1 device, using address {}/{}".format(address_gyro, address_magnet)
)
# allocate scratch buffer for efficient conversions and memread op's
self.scratch = array.array("B", [0, 0, 0, 0, 0, 0])
self.scratch_int = array.array("h", [0, 0, 0])
self.init_gyro_accel()
self.init_magnetometer()
def init_gyro_accel(self, sample_rate=6, scale_gyro=0, scale_accel=0):
"""Initalizes Gyro and Accelerator.
sample rate: 0-6 (off, 14.9Hz, 59.5Hz, 119Hz, 238Hz, 476Hz, 952Hz)
scale_gyro: 0-2 (245dps, 500dps, 2000dps )
scale_accel: 0-3 (+/-2g, +/-4g, +/-8g, +-16g)
"""
assert sample_rate <= 6, "invalid sampling rate: %d" % sample_rate
assert scale_gyro <= 2, "invalid gyro scaling: %d" % scale_gyro
assert scale_accel <= 3, "invalid accelerometer scaling: %d" % scale_accel
i2c = self.i2c
addr = self.address_gyro
mv = memoryview(self.scratch)
# angular control registers 1-3 / Orientation
mv[0] = ((sample_rate & 0x07) << 5) | ((_SCALE_GYRO[scale_gyro][1] & 0x3) << 3)
mv[1:4] = b"\x00\x00\x00"
i2c.writeto_mem(addr, _CTRL_REG1_G, mv[:5])
# ctrl4 - enable x,y,z, outputs, no irq latching, no 4D
# ctrl5 - enable all axes, no decimation
# ctrl6 - set scaling and sample rate of accel
# ctrl7,8 - leave at default values
# ctrl9 - FIFO enabled
mv[0] = mv[1] = 0x38
mv[2] = ((sample_rate & 7) << 5) | ((_SCALE_ACCEL[scale_accel][1] & 0x3) << 3)
mv[3] = 0x00
mv[4] = 0x4
mv[5] = 0x2
i2c.writeto_mem(addr, _CTRL_REG4_G, mv[:6])
# fifo: use continous mode (overwrite old data if overflow)
i2c.writeto_mem(addr, _FIFO_CTRL_REG, b"\x00")
i2c.writeto_mem(addr, _FIFO_CTRL_REG, b"\xc0")
self.scale_gyro = 32768 / _SCALE_GYRO[scale_gyro][0]
self.scale_accel = 32768 / _SCALE_ACCEL[scale_accel][0]
def init_magnetometer(self, sample_rate=7, scale_magnet=0):
"""
sample rates = 0-7 (0.625, 1.25, 2.5, 5, 10, 20, 40, 80Hz)
scaling = 0-3 (+/-4, +/-8, +/-12, +/-16 Gauss)
"""
assert sample_rate < 8, "invalid sample rate: %d (0-7)" % sample_rate
assert scale_magnet < 4, "invalid scaling: %d (0-3)" % scale_magnet
i2c = self.i2c
addr = self.address_magnet
mv = memoryview(self.scratch)
mv[0] = 0x40 | (sample_rate << 2) # ctrl1: high performance mode
mv[1] = scale_magnet << 5 # ctrl2: scale, normal mode, no reset
mv[2] = 0x00 # ctrl3: continous conversion, no low power, I2C
mv[3] = 0x08 # ctrl4: high performance z-axis
mv[4] = 0x00 # ctr5: no fast read, no block update
i2c.writeto_mem(addr, _CTRL_REG1_M, mv[:5])
self.scale_factor_magnet = 32768 / ((scale_magnet + 1) * 4)
def calibrate_magnet(self, offset):
"""
offset is a magnet vecor that will be substracted by the magnetometer
for each measurement. It is written to the magnetometer's offset register
"""
offset = [int(i * self.scale_factor_magnet) for i in offset]
mv = memoryview(self.scratch)
mv[0] = offset[0] & 0xFF
mv[1] = offset[0] >> 8
mv[2] = offset[1] & 0xFF
mv[3] = offset[1] >> 8
mv[4] = offset[2] & 0xFF
mv[5] = offset[2] >> 8
self.i2c.writeto_mem(self.address_magnet, _OFFSET_REG_X_M, mv[:6])
def gyro_id(self):
return self.i2c.readfrom_mem(self.address_gyro, _WHO_AM_I, 1)
def magent_id(self):
return self.i2c.readfrom_mem(self.address_magnet, _WHO_AM_I, 1)
def magnet(self):
"""Returns magnetometer vector in gauss.
raw_values: if True, the non-scaled adc values are returned
"""
mv = memoryview(self.scratch_int)
f = self.scale_factor_magnet
self.i2c.readfrom_mem_into(self.address_magnet, _OUT_M | 0x80, mv)
return (mv[0] / f, mv[1] / f, mv[2] / f)
def gyro(self):
"""Returns gyroscope vector in degrees/sec."""
mv = memoryview(self.scratch_int)
f = self.scale_gyro
self.i2c.readfrom_mem_into(self.address_gyro, _OUT_G | 0x80, mv)
return (mv[0] / f, mv[1] / f, mv[2] / f)
def accel(self):
"""Returns acceleration vector in gravity units (9.81m/s^2)."""
mv = memoryview(self.scratch_int)
f = self.scale_accel
self.i2c.readfrom_mem_into(self.address_gyro, _OUT_XL | 0x80, mv)
return (mv[0] / f, mv[1] / f, mv[2] / f)
def iter_accel_gyro(self):
"""A generator that returns tuples of (gyro,accelerometer) data from the fifo."""
while True:
fifo_state = int.from_bytes(
self.i2c.readfrom_mem(self.address_gyro, _FIFO_SRC, 1), "big"
)
if fifo_state & 0x3F:
# print("Available samples=%d" % (fifo_state & 0x1f))
yield self.gyro(), self.accel()
else:
break
module("lsm9ds1.py", opt=3)
module("neopixel.py", opt=3)
# NeoPixel driver for MicroPython
# MIT license; Copyright (c) 2016 Damien P. George, 2021 Jim Mussared
from machine import bitstream
class NeoPixel:
# G R B W
ORDER = (1, 0, 2, 3)
def __init__(self, pin, n, bpp=3, timing=1):
self.pin = pin
self.n = n
self.bpp = bpp
self.buf = bytearray(n * bpp)
self.pin.init(pin.OUT)
# Timing arg can either be 1 for 800kHz or 0 for 400kHz,
# or a user-specified timing ns tuple (high_0, low_0, high_1, low_1).
self.timing = (
((400, 850, 800, 450) if timing else (800, 1700, 1600, 900))
if isinstance(timing, int)
else timing
)
def __len__(self):
return self.n
def __setitem__(self, i, v):
offset = i * self.bpp
for i in range(self.bpp):
self.buf[offset + self.ORDER[i]] = v[i]
def __getitem__(self, i):
offset = i * self.bpp
return tuple(self.buf[offset + self.ORDER[i]] for i in range(self.bpp))
def fill(self, v):
b = self.buf
l = len(self.buf)
bpp = self.bpp
for i in range(bpp):
c = v[i]
j = self.ORDER[i]
while j < l:
b[j] = c
j += bpp
def write(self):
# BITSTREAM_TYPE_HIGH_LOW = 0
bitstream(self.pin, 0, self.timing, self.buf)
module("nrf24l01.py", opt=3)
"""NRF24L01 driver for MicroPython
"""
from micropython import const
import utime
# nRF24L01+ registers
CONFIG = const(0x00)
EN_RXADDR = const(0x02)
SETUP_AW = const(0x03)
SETUP_RETR = const(0x04)
RF_CH = const(0x05)
RF_SETUP = const(0x06)
STATUS = const(0x07)
RX_ADDR_P0 = const(0x0A)
TX_ADDR = const(0x10)
RX_PW_P0 = const(0x11)
FIFO_STATUS = const(0x17)
DYNPD = const(0x1C)
# CONFIG register
EN_CRC = const(0x08) # enable CRC
CRCO = const(0x04) # CRC encoding scheme; 0=1 byte, 1=2 bytes
PWR_UP = const(0x02) # 1=power up, 0=power down
PRIM_RX = const(0x01) # RX/TX control; 0=PTX, 1=PRX
# RF_SETUP register
POWER_0 = const(0x00) # -18 dBm
POWER_1 = const(0x02) # -12 dBm
POWER_2 = const(0x04) # -6 dBm
POWER_3 = const(0x06) # 0 dBm
SPEED_1M = const(0x00)
SPEED_2M = const(0x08)
SPEED_250K = const(0x20)
# STATUS register
RX_DR = const(0x40) # RX data ready; write 1 to clear
TX_DS = const(0x20) # TX data sent; write 1 to clear
MAX_RT = const(0x10) # max retransmits reached; write 1 to clear
# FIFO_STATUS register
RX_EMPTY = const(0x01) # 1 if RX FIFO is empty
# constants for instructions
R_RX_PL_WID = const(0x60) # read RX payload width
R_RX_PAYLOAD = const(0x61) # read RX payload
W_TX_PAYLOAD = const(0xA0) # write TX payload
FLUSH_TX = const(0xE1) # flush TX FIFO
FLUSH_RX = const(0xE2) # flush RX FIFO
NOP = const(0xFF) # use to read STATUS register
class NRF24L01:
def __init__(self, spi, cs, ce, channel=46, payload_size=16):
assert payload_size <= 32
self.buf = bytearray(1)
# store the pins
self.spi = spi
self.cs = cs
self.ce = ce
# init the SPI bus and pins
self.init_spi(4000000)
# reset everything
ce.init(ce.OUT, value=0)
cs.init(cs.OUT, value=1)
self.payload_size = payload_size
self.pipe0_read_addr = None
utime.sleep_ms(5)
# set address width to 5 bytes and check for device present
self.reg_write(SETUP_AW, 0b11)
if self.reg_read(SETUP_AW) != 0b11:
raise OSError("nRF24L01+ Hardware not responding")
# disable dynamic payloads
self.reg_write(DYNPD, 0)
# auto retransmit delay: 1750us
# auto retransmit count: 8
self.reg_write(SETUP_RETR, (6 << 4) | 8)
# set rf power and speed
self.set_power_speed(POWER_3, SPEED_250K) # Best for point to point links
# init CRC
self.set_crc(2)
# clear status flags
self.reg_write(STATUS, RX_DR | TX_DS | MAX_RT)
# set channel
self.set_channel(channel)
# flush buffers
self.flush_rx()
self.flush_tx()
def init_spi(self, baudrate):
try:
master = self.spi.MASTER
except AttributeError:
self.spi.init(baudrate=baudrate, polarity=0, phase=0)
else:
self.spi.init(master, baudrate=baudrate, polarity=0, phase=0)
def reg_read(self, reg):
self.cs(0)
self.spi.readinto(self.buf, reg)
self.spi.readinto(self.buf)
self.cs(1)
return self.buf[0]
def reg_write_bytes(self, reg, buf):
self.cs(0)
self.spi.readinto(self.buf, 0x20 | reg)
self.spi.write(buf)
self.cs(1)
return self.buf[0]
def reg_write(self, reg, value):
self.cs(0)
self.spi.readinto(self.buf, 0x20 | reg)
ret = self.buf[0]
self.spi.readinto(self.buf, value)
self.cs(1)
return ret
def flush_rx(self):
self.cs(0)
self.spi.readinto(self.buf, FLUSH_RX)
self.cs(1)
def flush_tx(self):
self.cs(0)
self.spi.readinto(self.buf, FLUSH_TX)
self.cs(1)
# power is one of POWER_x defines; speed is one of SPEED_x defines
def set_power_speed(self, power, speed):
setup = self.reg_read(RF_SETUP) & 0b11010001
self.reg_write(RF_SETUP, setup | power | speed)
# length in bytes: 0, 1 or 2
def set_crc(self, length):
config = self.reg_read(CONFIG) & ~(CRCO | EN_CRC)
if length == 0:
pass
elif length == 1:
config |= EN_CRC
else:
config |= EN_CRC | CRCO
self.reg_write(CONFIG, config)
def set_channel(self, channel):
self.reg_write(RF_CH, min(channel, 125))
# address should be a bytes object 5 bytes long
def open_tx_pipe(self, address):
assert len(address) == 5
self.reg_write_bytes(RX_ADDR_P0, address)
self.reg_write_bytes(TX_ADDR, address)
self.reg_write(RX_PW_P0, self.payload_size)
# address should be a bytes object 5 bytes long
# pipe 0 and 1 have 5 byte address
# pipes 2-5 use same 4 most-significant bytes as pipe 1, plus 1 extra byte
def open_rx_pipe(self, pipe_id, address):
assert len(address) == 5
assert 0 <= pipe_id <= 5
if pipe_id == 0:
self.pipe0_read_addr = address
if pipe_id < 2:
self.reg_write_bytes(RX_ADDR_P0 + pipe_id, address)
else:
self.reg_write(RX_ADDR_P0 + pipe_id, address[0])
self.reg_write(RX_PW_P0 + pipe_id, self.payload_size)
self.reg_write(EN_RXADDR, self.reg_read(EN_RXADDR) | (1 << pipe_id))
def start_listening(self):
self.reg_write(CONFIG, self.reg_read(CONFIG) | PWR_UP | PRIM_RX)
self.reg_write(STATUS, RX_DR | TX_DS | MAX_RT)
if self.pipe0_read_addr is not None:
self.reg_write_bytes(RX_ADDR_P0, self.pipe0_read_addr)
self.flush_rx()
self.flush_tx()
self.ce(1)
utime.sleep_us(130)
def stop_listening(self):
self.ce(0)
self.flush_tx()
self.flush_rx()
# returns True if any data available to recv
def any(self):
return not bool(self.reg_read(FIFO_STATUS) & RX_EMPTY)
def recv(self):
# get the data
self.cs(0)
self.spi.readinto(self.buf, R_RX_PAYLOAD)
buf = self.spi.read(self.payload_size)
self.cs(1)
# clear RX ready flag
self.reg_write(STATUS, RX_DR)
return buf
# blocking wait for tx complete
def send(self, buf, timeout=500):
self.send_start(buf)
start = utime.ticks_ms()
result = None
while result is None and utime.ticks_diff(utime.ticks_ms(), start) < timeout:
result = self.send_done() # 1 == success, 2 == fail
if result == 2:
raise OSError("send failed")
# non-blocking tx
def send_start(self, buf):
# power up
self.reg_write(CONFIG, (self.reg_read(CONFIG) | PWR_UP) & ~PRIM_RX)
utime.sleep_us(150)
# send the data
self.cs(0)
self.spi.readinto(self.buf, W_TX_PAYLOAD)
self.spi.write(buf)
if len(buf) < self.payload_size:
self.spi.write(b"\x00" * (self.payload_size - len(buf))) # pad out data
self.cs(1)
# enable the chip so it can send the data
self.ce(1)
utime.sleep_us(15) # needs to be >10us
self.ce(0)
# returns None if send still in progress, 1 for success, 2 for fail
def send_done(self):
if not (self.reg_read(STATUS) & (TX_DS | MAX_RT)):
return None # tx not finished
# either finished or failed: get and clear status flags, power down
status = self.reg_write(STATUS, RX_DR | TX_DS | MAX_RT)
self.reg_write(CONFIG, self.reg_read(CONFIG) & ~PWR_UP)
return 1 if status & TX_DS else 2
"""Test for nrf24l01 module. Portable between MicroPython targets."""
import usys
import ustruct as struct
import utime
from machine import Pin, SPI
from nrf24l01 import NRF24L01
from micropython import const
# Slave pause between receiving data and checking for further packets.
_RX_POLL_DELAY = const(15)
# Slave pauses an additional _SLAVE_SEND_DELAY ms after receiving data and before
# transmitting to allow the (remote) master time to get into receive mode. The
# master may be a slow device. Value tested with Pyboard, ESP32 and ESP8266.
_SLAVE_SEND_DELAY = const(10)
if usys.platform == "pyboard":
cfg = {"spi": 2, "miso": "Y7", "mosi": "Y8", "sck": "Y6", "csn": "Y5", "ce": "Y4"}
elif usys.platform == "esp8266": # Hardware SPI
cfg = {"spi": 1, "miso": 12, "mosi": 13, "sck": 14, "csn": 4, "ce": 5}
elif usys.platform == "esp32": # Software SPI
cfg = {"spi": -1, "miso": 32, "mosi": 33, "sck": 25, "csn": 26, "ce": 27}
else:
raise ValueError("Unsupported platform {}".format(usys.platform))
# Addresses are in little-endian format. They correspond to big-endian
# 0xf0f0f0f0e1, 0xf0f0f0f0d2
pipes = (b"\xe1\xf0\xf0\xf0\xf0", b"\xd2\xf0\xf0\xf0\xf0")
def master():
csn = Pin(cfg["csn"], mode=Pin.OUT, value=1)
ce = Pin(cfg["ce"], mode=Pin.OUT, value=0)
if cfg["spi"] == -1:
spi = SPI(-1, sck=Pin(cfg["sck"]), mosi=Pin(cfg["mosi"]), miso=Pin(cfg["miso"]))
nrf = NRF24L01(spi, csn, ce, payload_size=8)
else:
nrf = NRF24L01(SPI(cfg["spi"]), csn, ce, payload_size=8)
nrf.open_tx_pipe(pipes[0])
nrf.open_rx_pipe(1, pipes[1])
nrf.start_listening()
num_needed = 16
num_successes = 0
num_failures = 0
led_state = 0
print("NRF24L01 master mode, sending %d packets..." % num_needed)
while num_successes < num_needed and num_failures < num_needed:
# stop listening and send packet
nrf.stop_listening()
millis = utime.ticks_ms()
led_state = max(1, (led_state << 1) & 0x0F)
print("sending:", millis, led_state)
try:
nrf.send(struct.pack("ii", millis, led_state))
except OSError:
pass
# start listening again
nrf.start_listening()
# wait for response, with 250ms timeout
start_time = utime.ticks_ms()
timeout = False
while not nrf.any() and not timeout:
if utime.ticks_diff(utime.ticks_ms(), start_time) > 250:
timeout = True
if timeout:
print("failed, response timed out")
num_failures += 1
else:
# recv packet
(got_millis,) = struct.unpack("i", nrf.recv())
# print response and round-trip delay
print(
"got response:",
got_millis,
"(delay",
utime.ticks_diff(utime.ticks_ms(), got_millis),
"ms)",
)
num_successes += 1
# delay then loop
utime.sleep_ms(250)
print("master finished sending; successes=%d, failures=%d" % (num_successes, num_failures))
def slave():
csn = Pin(cfg["csn"], mode=Pin.OUT, value=1)
ce = Pin(cfg["ce"], mode=Pin.OUT, value=0)
if cfg["spi"] == -1:
spi = SPI(-1, sck=Pin(cfg["sck"]), mosi=Pin(cfg["mosi"]), miso=Pin(cfg["miso"]))
nrf = NRF24L01(spi, csn, ce, payload_size=8)
else:
nrf = NRF24L01(SPI(cfg["spi"]), csn, ce, payload_size=8)
nrf.open_tx_pipe(pipes[1])
nrf.open_rx_pipe(1, pipes[0])
nrf.start_listening()
print("NRF24L01 slave mode, waiting for packets... (ctrl-C to stop)")
while True:
if nrf.any():
while nrf.any():
buf = nrf.recv()
millis, led_state = struct.unpack("ii", buf)
print("received:", millis, led_state)
for led in leds:
if led_state & 1:
led.on()
else:
led.off()
led_state >>= 1
utime.sleep_ms(_RX_POLL_DELAY)
# Give master time to get into receive mode.
utime.sleep_ms(_SLAVE_SEND_DELAY)
nrf.stop_listening()
try:
nrf.send(struct.pack("i", millis))
except OSError:
pass
print("sent response")
nrf.start_listening()
try:
import pyb
leds = [pyb.LED(i + 1) for i in range(4)]
except:
leds = []
print("NRF24L01 test module loaded")
print("NRF24L01 pinout for test:")
print(" CE on", cfg["ce"])
print(" CSN on", cfg["csn"])
print(" SCK on", cfg["sck"])
print(" MISO on", cfg["miso"])
print(" MOSI on", cfg["mosi"])
print("run nrf24l01test.slave() on slave, then nrf24l01test.master() on master")
# DS18x20 temperature sensor driver for MicroPython.
# MIT license; Copyright (c) 2016 Damien P. George
from micropython import const
_CONVERT = const(0x44)
_RD_SCRATCH = const(0xBE)
_WR_SCRATCH = const(0x4E)
class DS18X20:
def __init__(self, onewire):
self.ow = onewire
self.buf = bytearray(9)
def scan(self):
return [rom for rom in self.ow.scan() if rom[0] in (0x10, 0x22, 0x28)]
def convert_temp(self):
self.ow.reset(True)
self.ow.writebyte(self.ow.SKIP_ROM)
self.ow.writebyte(_CONVERT)
def read_scratch(self, rom):
self.ow.reset(True)
self.ow.select_rom(rom)
self.ow.writebyte(_RD_SCRATCH)
self.ow.readinto(self.buf)
if self.ow.crc8(self.buf):
raise Exception("CRC error")
return self.buf
def write_scratch(self, rom, buf):
self.ow.reset(True)
self.ow.select_rom(rom)
self.ow.writebyte(_WR_SCRATCH)
self.ow.write(buf)
def read_temp(self, rom):
buf = self.read_scratch(rom)
if rom[0] == 0x10:
if buf[1]:
t = buf[0] >> 1 | 0x80
t = -((~t + 1) & 0xFF)
else:
t = buf[0] >> 1
return t - 0.25 + (buf[7] - buf[6]) / buf[7]
else:
t = buf[1] << 8 | buf[0]
if t & 0x8000: # sign bit set
t = -((t ^ 0xFFFF) + 1)
return t / 16
options.defaults(ds18x20=False)
module("onewire.py", opt=3)
if options.ds18x20:
module("ds18x20.py", opt=3)
# 1-Wire driver for MicroPython
# MIT license; Copyright (c) 2016 Damien P. George
import _onewire as _ow
class OneWireError(Exception):
pass
class OneWire:
SEARCH_ROM = 0xF0
MATCH_ROM = 0x55
SKIP_ROM = 0xCC
def __init__(self, pin):
self.pin = pin
self.pin.init(pin.OPEN_DRAIN, pin.PULL_UP)
def reset(self, required=False):
reset = _ow.reset(self.pin)
if required and not reset:
raise OneWireError
return reset
def readbit(self):
return _ow.readbit(self.pin)
def readbyte(self):
return _ow.readbyte(self.pin)
def readinto(self, buf):
for i in range(len(buf)):
buf[i] = _ow.readbyte(self.pin)
def writebit(self, value):
return _ow.writebit(self.pin, value)
def writebyte(self, value):
return _ow.writebyte(self.pin, value)
def write(self, buf):
for b in buf:
_ow.writebyte(self.pin, b)
def select_rom(self, rom):
self.reset()
self.writebyte(self.MATCH_ROM)
self.write(rom)
def scan(self):
devices = []
diff = 65
rom = False
for i in range(0xFF):
rom, diff = self._search_rom(rom, diff)
if rom:
devices += [rom]
if diff == 0:
break
return devices
def _search_rom(self, l_rom, diff):
if not self.reset():
return None, 0
self.writebyte(self.SEARCH_ROM)
if not l_rom:
l_rom = bytearray(8)
rom = bytearray(8)
next_diff = 0
i = 64
for byte in range(8):
r_b = 0
for bit in range(8):
b = self.readbit()
if self.readbit():
if b: # there are no devices or there is an error on the bus
return None, 0
else:
if not b: # collision, two devices with different bit meaning
if diff > i or ((l_rom[byte] & (1 << bit)) and diff != i):
b = 1
next_diff = i
self.writebit(b)
if b:
r_b |= 1 << bit
i -= 1
rom[byte] = r_b
return rom, next_diff
def crc8(self, data):
return _ow.crc8(data)
module("sdcard.py", opt=3)
"""
MicroPython driver for SD cards using SPI bus.
Requires an SPI bus and a CS pin. Provides readblocks and writeblocks
methods so the device can be mounted as a filesystem.
Example usage on pyboard:
import pyb, sdcard, os
sd = sdcard.SDCard(pyb.SPI(1), pyb.Pin.board.X5)
pyb.mount(sd, '/sd2')
os.listdir('/')
Example usage on ESP8266:
import machine, sdcard, os
sd = sdcard.SDCard(machine.SPI(1), machine.Pin(15))
os.mount(sd, '/sd')
os.listdir('/')
"""
from micropython import const
import time
_CMD_TIMEOUT = const(100)
_R1_IDLE_STATE = const(1 << 0)
# R1_ERASE_RESET = const(1 << 1)
_R1_ILLEGAL_COMMAND = const(1 << 2)
# R1_COM_CRC_ERROR = const(1 << 3)
# R1_ERASE_SEQUENCE_ERROR = const(1 << 4)
# R1_ADDRESS_ERROR = const(1 << 5)
# R1_PARAMETER_ERROR = const(1 << 6)
_TOKEN_CMD25 = const(0xFC)
_TOKEN_STOP_TRAN = const(0xFD)
_TOKEN_DATA = const(0xFE)
class SDCard:
def __init__(self, spi, cs, baudrate=1320000):
self.spi = spi
self.cs = cs
self.cmdbuf = bytearray(6)
self.dummybuf = bytearray(512)
self.tokenbuf = bytearray(1)
for i in range(512):
self.dummybuf[i] = 0xFF
self.dummybuf_memoryview = memoryview(self.dummybuf)
# initialise the card
self.init_card(baudrate)
def init_spi(self, baudrate):
try:
master = self.spi.MASTER
except AttributeError:
# on ESP8266
self.spi.init(baudrate=baudrate, phase=0, polarity=0)
else:
# on pyboard
self.spi.init(master, baudrate=baudrate, phase=0, polarity=0)
def init_card(self, baudrate):
# init CS pin
self.cs.init(self.cs.OUT, value=1)
# init SPI bus; use low data rate for initialisation
self.init_spi(100000)
# clock card at least 100 cycles with cs high
for i in range(16):
self.spi.write(b"\xff")
# CMD0: init card; should return _R1_IDLE_STATE (allow 5 attempts)
for _ in range(5):
if self.cmd(0, 0, 0x95) == _R1_IDLE_STATE:
break
else:
raise OSError("no SD card")
# CMD8: determine card version
r = self.cmd(8, 0x01AA, 0x87, 4)
if r == _R1_IDLE_STATE:
self.init_card_v2()
elif r == (_R1_IDLE_STATE | _R1_ILLEGAL_COMMAND):
self.init_card_v1()
else:
raise OSError("couldn't determine SD card version")
# get the number of sectors
# CMD9: response R2 (R1 byte + 16-byte block read)
if self.cmd(9, 0, 0, 0, False) != 0:
raise OSError("no response from SD card")
csd = bytearray(16)
self.readinto(csd)
if csd[0] & 0xC0 == 0x40: # CSD version 2.0
self.sectors = ((csd[8] << 8 | csd[9]) + 1) * 1024
elif csd[0] & 0xC0 == 0x00: # CSD version 1.0 (old, <=2GB)
c_size = (csd[6] & 0b11) << 10 | csd[7] << 2 | csd[8] >> 6
c_size_mult = (csd[9] & 0b11) << 1 | csd[10] >> 7
read_bl_len = csd[5] & 0b1111
capacity = (c_size + 1) * (2 ** (c_size_mult + 2)) * (2**read_bl_len)
self.sectors = capacity // 512
else:
raise OSError("SD card CSD format not supported")
# print('sectors', self.sectors)
# CMD16: set block length to 512 bytes
if self.cmd(16, 512, 0) != 0:
raise OSError("can't set 512 block size")
# set to high data rate now that it's initialised
self.init_spi(baudrate)
def init_card_v1(self):
for i in range(_CMD_TIMEOUT):
time.sleep_ms(50)
self.cmd(55, 0, 0)
if self.cmd(41, 0, 0) == 0:
# SDSC card, uses byte addressing in read/write/erase commands
self.cdv = 512
# print("[SDCard] v1 card")
return
raise OSError("timeout waiting for v1 card")
def init_card_v2(self):
for i in range(_CMD_TIMEOUT):
time.sleep_ms(50)
self.cmd(58, 0, 0, 4)
self.cmd(55, 0, 0)
if self.cmd(41, 0x40000000, 0) == 0:
self.cmd(58, 0, 0, -4) # 4-byte response, negative means keep the first byte
ocr = self.tokenbuf[0] # get first byte of response, which is OCR
if not ocr & 0x40:
# SDSC card, uses byte addressing in read/write/erase commands
self.cdv = 512
else:
# SDHC/SDXC card, uses block addressing in read/write/erase commands
self.cdv = 1
# print("[SDCard] v2 card")
return
raise OSError("timeout waiting for v2 card")
def cmd(self, cmd, arg, crc, final=0, release=True, skip1=False):
self.cs(0)
# create and send the command
buf = self.cmdbuf
buf[0] = 0x40 | cmd
buf[1] = arg >> 24
buf[2] = arg >> 16
buf[3] = arg >> 8
buf[4] = arg
buf[5] = crc
self.spi.write(buf)
if skip1:
self.spi.readinto(self.tokenbuf, 0xFF)
# wait for the response (response[7] == 0)
for i in range(_CMD_TIMEOUT):
self.spi.readinto(self.tokenbuf, 0xFF)
response = self.tokenbuf[0]
if not (response & 0x80):
# this could be a big-endian integer that we are getting here
# if final<0 then store the first byte to tokenbuf and discard the rest
if final < 0:
self.spi.readinto(self.tokenbuf, 0xFF)
final = -1 - final
for j in range(final):
self.spi.write(b"\xff")
if release:
self.cs(1)
self.spi.write(b"\xff")
return response
# timeout
self.cs(1)
self.spi.write(b"\xff")
return -1
def readinto(self, buf):
self.cs(0)
# read until start byte (0xff)
for i in range(_CMD_TIMEOUT):
self.spi.readinto(self.tokenbuf, 0xFF)
if self.tokenbuf[0] == _TOKEN_DATA:
break
time.sleep_ms(1)
else:
self.cs(1)
raise OSError("timeout waiting for response")
# read data
mv = self.dummybuf_memoryview
if len(buf) != len(mv):
mv = mv[: len(buf)]
self.spi.write_readinto(mv, buf)
# read checksum
self.spi.write(b"\xff")
self.spi.write(b"\xff")
self.cs(1)
self.spi.write(b"\xff")
def write(self, token, buf):
self.cs(0)
# send: start of block, data, checksum
self.spi.read(1, token)
self.spi.write(buf)
self.spi.write(b"\xff")
self.spi.write(b"\xff")
# check the response
if (self.spi.read(1, 0xFF)[0] & 0x1F) != 0x05:
self.cs(1)
self.spi.write(b"\xff")
return
# wait for write to finish
while self.spi.read(1, 0xFF)[0] == 0:
pass
self.cs(1)
self.spi.write(b"\xff")
def write_token(self, token):
self.cs(0)
self.spi.read(1, token)
self.spi.write(b"\xff")
# wait for write to finish
while self.spi.read(1, 0xFF)[0] == 0x00:
pass
self.cs(1)
self.spi.write(b"\xff")
def readblocks(self, block_num, buf):
nblocks = len(buf) // 512
assert nblocks and not len(buf) % 512, "Buffer length is invalid"
if nblocks == 1:
# CMD17: set read address for single block
if self.cmd(17, block_num * self.cdv, 0, release=False) != 0:
# release the card
self.cs(1)
raise OSError(5) # EIO
# receive the data and release card
self.readinto(buf)
else:
# CMD18: set read address for multiple blocks
if self.cmd(18, block_num * self.cdv, 0, release=False) != 0:
# release the card
self.cs(1)
raise OSError(5) # EIO
offset = 0
mv = memoryview(buf)
while nblocks:
# receive the data and release card
self.readinto(mv[offset : offset + 512])
offset += 512
nblocks -= 1
if self.cmd(12, 0, 0xFF, skip1=True):
raise OSError(5) # EIO
def writeblocks(self, block_num, buf):
nblocks, err = divmod(len(buf), 512)
assert nblocks and not err, "Buffer length is invalid"
if nblocks == 1:
# CMD24: set write address for single block
if self.cmd(24, block_num * self.cdv, 0) != 0:
raise OSError(5) # EIO
# send the data
self.write(_TOKEN_DATA, buf)
else:
# CMD25: set write address for first block
if self.cmd(25, block_num * self.cdv, 0) != 0:
raise OSError(5) # EIO
# send the data
offset = 0
mv = memoryview(buf)
while nblocks:
self.write(_TOKEN_CMD25, mv[offset : offset + 512])
offset += 512
nblocks -= 1
self.write_token(_TOKEN_STOP_TRAN)
def ioctl(self, op, arg):
if op == 4: # get number of blocks
return self.sectors
if op == 5: # get block size in bytes
return 512
# Test for sdcard block protocol
# Peter hinch 30th Jan 2016
import os, sdcard, machine
def sdtest():
spi = machine.SPI(1)
spi.init() # Ensure right baudrate
sd = sdcard.SDCard(spi, machine.Pin.board.X21) # Compatible with PCB
vfs = os.VfsFat(sd)
os.mount(vfs, "/fc")
print("Filesystem check")
print(os.listdir("/fc"))
line = "abcdefghijklmnopqrstuvwxyz\n"
lines = line * 200 # 5400 chars
short = "1234567890\n"
fn = "/fc/rats.txt"
print()
print("Multiple block read/write")
with open(fn, "w") as f:
n = f.write(lines)
print(n, "bytes written")
n = f.write(short)
print(n, "bytes written")
n = f.write(lines)
print(n, "bytes written")
with open(fn, "r") as f:
result1 = f.read()
print(len(result1), "bytes read")
fn = "/fc/rats1.txt"
print()
print("Single block read/write")
with open(fn, "w") as f:
n = f.write(short) # one block
print(n, "bytes written")
with open(fn, "r") as f:
result2 = f.read()
print(len(result2), "bytes read")
os.umount("/fc")
print()
print("Verifying data read back")
success = True
if result1 == "".join((lines, short, lines)):
print("Large file Pass")
else:
print("Large file Fail")
success = False
if result2 == short:
print("Small file Pass")
else:
print("Small file Fail")
success = False
print()
print("Tests", "passed" if success else "failed")
import utime
try:
import usocket as socket
except:
import socket
try:
import ustruct as struct
except:
import struct
# The NTP host can be configured at runtime by doing: ntptime.host = 'myhost.org'
host = "pool.ntp.org"
def time():
NTP_QUERY = bytearray(48)
NTP_QUERY[0] = 0x1B
addr = socket.getaddrinfo(host, 123)[0][-1]
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.settimeout(1)
res = s.sendto(NTP_QUERY, addr)
msg = s.recv(48)
finally:
s.close()
val = struct.unpack("!I", msg[40:44])[0]
EPOCH_YEAR = utime.gmtime(0)[0]
if EPOCH_YEAR == 2000:
# (date(2000, 1, 1) - date(1900, 1, 1)).days * 24*60*60
NTP_DELTA = 3155673600
elif EPOCH_YEAR == 1970:
# (date(1970, 1, 1) - date(1900, 1, 1)).days * 24*60*60
NTP_DELTA = 2208988800
else:
raise Exception("Unsupported epoch: {}".format(EPOCH_YEAR))
return val - NTP_DELTA
# There's currently no timezone support in MicroPython, and the RTC is set in UTC time.
def settime():
t = time()
import machine
tm = utime.gmtime(t)
machine.RTC().datetime((tm[0], tm[1], tm[2], tm[6] + 1, tm[3], tm[4], tm[5], 0))
module("webrepl.py", opt=3)
module("webrepl_setup.py", opt=3)
# This module should be imported from REPL, not run from command line.
import binascii
import hashlib
import network
import os
import socket
import sys
import websocket
import _webrepl
listen_s = None
client_s = None
DEBUG = 0
_DEFAULT_STATIC_HOST = const("https://micropython.org/webrepl/")
static_host = _DEFAULT_STATIC_HOST
def server_handshake(cl):
req = cl.makefile("rwb", 0)
# Skip HTTP GET line.
l = req.readline()
if DEBUG:
sys.stdout.write(repr(l))
webkey = None
upgrade = False
websocket = False
while True:
l = req.readline()
if not l:
# EOF in headers.
return False
if l == b"\r\n":
break
if DEBUG:
sys.stdout.write(l)
h, v = [x.strip() for x in l.split(b":", 1)]
if DEBUG:
print((h, v))
if h == b"Sec-WebSocket-Key":
webkey = v
elif h == b"Connection" and b"Upgrade" in v:
upgrade = True
elif h == b"Upgrade" and v == b"websocket":
websocket = True
if not (upgrade and websocket and webkey):
return False
if DEBUG:
print("Sec-WebSocket-Key:", webkey, len(webkey))
d = hashlib.sha1(webkey)
d.update(b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
respkey = d.digest()
respkey = binascii.b2a_base64(respkey)[:-1]
if DEBUG:
print("respkey:", respkey)
cl.send(
b"""\
HTTP/1.1 101 Switching Protocols\r
Upgrade: websocket\r
Connection: Upgrade\r
Sec-WebSocket-Accept: """
)
cl.send(respkey)
cl.send("\r\n\r\n")
return True
def send_html(cl):
cl.send(
b"""\
HTTP/1.0 200 OK\r
\r
<base href=\""""
)
cl.send(static_host)
cl.send(
b"""\"></base>\r
<script src="webrepl_content.js"></script>\r
"""
)
cl.close()
def setup_conn(port, accept_handler):
global listen_s
listen_s = socket.socket()
listen_s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ai = socket.getaddrinfo("0.0.0.0", port)
addr = ai[0][4]
listen_s.bind(addr)
listen_s.listen(1)
if accept_handler:
listen_s.setsockopt(socket.SOL_SOCKET, 20, accept_handler)
for i in (network.AP_IF, network.STA_IF):
iface = network.WLAN(i)
if iface.active():
print("WebREPL server started on http://%s:%d/" % (iface.ifconfig()[0], port))
return listen_s
def accept_conn(listen_sock):
global client_s
cl, remote_addr = listen_sock.accept()
if not server_handshake(cl):
send_html(cl)
return False
prev = os.dupterm(None)
os.dupterm(prev)
if prev:
print("\nConcurrent WebREPL connection from", remote_addr, "rejected")
cl.close()
return False
print("\nWebREPL connection from:", remote_addr)
client_s = cl
ws = websocket.websocket(cl, True)
ws = _webrepl._webrepl(ws)
cl.setblocking(False)
# notify REPL on socket incoming data (ESP32/ESP8266-only)
if hasattr(os, "dupterm_notify"):
cl.setsockopt(socket.SOL_SOCKET, 20, os.dupterm_notify)
os.dupterm(ws)
return True
def stop():
global listen_s, client_s
os.dupterm(None)
if client_s:
client_s.close()
if listen_s:
listen_s.close()
def start(port=8266, password=None, accept_handler=accept_conn):
global static_host
stop()
webrepl_pass = password
if webrepl_pass is None:
try:
import webrepl_cfg
webrepl_pass = webrepl_cfg.PASS
if hasattr(webrepl_cfg, "BASE"):
static_host = webrepl_cfg.BASE
except:
print("WebREPL is not configured, run 'import webrepl_setup'")
_webrepl.password(webrepl_pass)
s = setup_conn(port, accept_handler)
if accept_handler is None:
print("Starting webrepl in foreground mode")
# Run accept_conn to serve HTML until we get a websocket connection.
while not accept_conn(s):
pass
elif password is None:
print("Started webrepl in normal mode")
else:
print("Started webrepl in manual override mode")
def start_foreground(port=8266, password=None):
start(port, password, None)
import sys
import os
import machine
RC = "./boot.py"
CONFIG = "./webrepl_cfg.py"
def input_choice(prompt, choices):
while 1:
resp = input(prompt)
if resp in choices:
return resp
def getpass(prompt):
return input(prompt)
def input_pass():
while 1:
passwd1 = getpass("New password (4-9 chars): ")
if len(passwd1) < 4 or len(passwd1) > 9:
print("Invalid password length")
continue
passwd2 = getpass("Confirm password: ")
if passwd1 == passwd2:
return passwd1
print("Passwords do not match")
def exists(fname):
try:
with open(fname):
pass
return True
except OSError:
return False
def get_daemon_status():
with open(RC) as f:
for l in f:
if "webrepl" in l:
if l.startswith("#"):
return False
return True
return None
def change_daemon(action):
LINES = ("import webrepl", "webrepl.start()")
with open(RC) as old_f, open(RC + ".tmp", "w") as new_f:
found = False
for l in old_f:
for patt in LINES:
if patt in l:
found = True
if action and l.startswith("#"):
l = l[1:]
elif not action and not l.startswith("#"):
l = "#" + l
new_f.write(l)
if not found:
new_f.write("import webrepl\nwebrepl.start()\n")
# FatFs rename() is not POSIX compliant, will raise OSError if
# dest file exists.
os.remove(RC)
os.rename(RC + ".tmp", RC)
def main():
status = get_daemon_status()
print("WebREPL daemon auto-start status:", "enabled" if status else "disabled")
print("\nWould you like to (E)nable or (D)isable it running on boot?")
print("(Empty line to quit)")
resp = input("> ").upper()
if resp == "E":
if exists(CONFIG):
resp2 = input_choice(
"Would you like to change WebREPL password? (y/n) ", ("y", "n", "")
)
else:
print("To enable WebREPL, you must set password for it")
resp2 = "y"
if resp2 == "y":
passwd = input_pass()
with open(CONFIG, "w") as f:
f.write("PASS = %r\n" % passwd)
if resp not in ("D", "E") or (resp == "D" and not status) or (resp == "E" and status):
print("No further action required")
sys.exit()
change_daemon(resp == "E")
print("Changes will be activated after reboot")
resp = input_choice("Would you like to reboot now? (y/n) ", ("y", "n", ""))
if resp == "y":
machine.reset()
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment