Source code for PyExpLabSys.drivers.sensirion_sps30

"""Driver class for the Sensirion SPS30 particulate matter sensor"""
import time
import serial
import struct
import logging


class SensirionSPS30:
    """Driver for the Sensirion SPS30 particulate matter sensor.

    The sensor is accessed over a serial port. Every exchange is a frame
    delimited by 0x7E with the layout
    ``0x7E, address, command, [state,] length, data..., checksum, 0x7E``
    (``state`` is only present in replies).
    """

    # Frame delimiter and escape byte of the serial protocol.
    _FRAME_BYTE = 0x7E
    _ESCAPE_BYTE = 0x7D
    # Special bytes that may not appear inside a frame, mapped to the byte
    # that follows the escape byte on the wire (and the reverse mapping).
    _STUFFED = {0x7E: 0x5E, 0x7D: 0x5D, 0x11: 0x31, 0x13: 0x33}
    _UNSTUFFED = {0x5E: 0x7E, 0x5D: 0x7D, 0x31: 0x11, 0x33: 0x13}

    def __init__(self, port='/dev/serial0'):
        """Open the serial connection to the sensor.

        Args:
            port: Name of the serial device the sensor is connected to.
        """
        self.serial = serial.Serial(
            port,
            baudrate=115200,
            timeout=1,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            bytesize=serial.EIGHTBITS,
        )

    def _calculate_checksum(self, command_bytes):
        """Return the frame checksum of ``command_bytes``.

        The checksum is the bitwise inverse of the least significant byte of
        the sum of all bytes between (not including) the frame delimiters.

        Args:
            command_bytes: Iterable of integers (list of ints or bytes).
        """
        return 0xFF - (sum(command_bytes) & 0xFF)

    def _stuff(self, raw):
        """Return ``raw`` with the protocol's special bytes escaped."""
        stuffed = bytearray()
        for byte in raw:
            if byte in self._STUFFED:
                stuffed.append(self._ESCAPE_BYTE)
                stuffed.append(self._STUFFED[byte])
            else:
                stuffed.append(byte)
        return bytes(stuffed)

    def _unstuff(self, raw):
        """Return ``raw`` with escaped byte pairs decoded.

        The decoding is done in a single pass. Chained ``bytes.replace``
        calls are not safe here: an escaped 0x7D followed by a literal 0x31
        would first become ``7D 31`` and then wrongly be turned into 0x11.
        """
        decoded = bytearray()
        position = 0
        while position < len(raw):
            byte = raw[position]
            follower = raw[position + 1] if position + 1 < len(raw) else None
            if byte == self._ESCAPE_BYTE and follower in self._UNSTUFFED:
                decoded.append(self._UNSTUFFED[follower])
                position += 2
            else:
                decoded.append(byte)
                position += 1
        return bytes(decoded)

    def _read_byte(self):
        """Read one byte from the serial port and return it as an int.

        Raises:
            TimeoutError: If the port returned no data before its timeout.
        """
        char = self.serial.read(1)
        if not char:
            raise TimeoutError('SPS30: Timeout while waiting for reply')
        return char[0]

    def comm(self, command, data=None):
        """Send a command frame and return the data part of the reply.

        Args:
            command: Command byte.
            data: Optional list of data bytes sent along with the command.

        Returns:
            bytes: The reply with header (address, command, state, length)
            and checksum removed.

        Raises:
            AssertionError: If the reply is too short or its checksum does
                not match. ``read_measurement`` relies on this type to retry.
            TimeoutError: If the device does not answer in time.
        """
        payload = [] if data is None else list(data)
        # Address is always 0 for this device
        body = [0x00, command, len(payload)] + payload
        body.append(self._calculate_checksum(body))
        delimiter = bytes([self._FRAME_BYTE])
        self.serial.write(delimiter + self._stuff(bytes(body)) + delimiter)

        if self._read_byte() != self._FRAME_BYTE:
            logging.warning('First char should have been 0x7e')

        # Collect everything up to the closing frame delimiter
        raw = bytearray()
        byte = self._read_byte()
        while byte != self._FRAME_BYTE:
            raw.append(byte)
            byte = self._read_byte()
        chars = self._unstuff(bytes(raw))

        # Smallest valid reply: address, command, state, length, checksum.
        # Raised explicitly (rather than with `assert`) so the validation
        # also runs when Python is started with -O.
        if len(chars) < 5:
            raise AssertionError('SPS30: Reply too short')
        if self._calculate_checksum(chars[:-1]) != chars[-1]:
            raise AssertionError('SPS30: Checksum mismatch in reply')

        state = chars[2]
        if state == 0x43:
            print('SPS30: Command {} not allowd in current state'.format(hex(command)))
        elif state > 0:
            print('SPS30: Error state: {}'.format(state))

        # Do not return header and checksum
        return chars[4:-1]

    def device_info(self):
        """Return a string with product type and serial number."""
        command = 0xD0
        # Both replies are NUL-terminated strings; strip the terminator
        product_type = self.comm(command, [0x00]).rstrip(b'\x00')
        serial_number = self.comm(command, [0x03]).rstrip(b'\x00')
        info = 'Product type: {}. Serial no.: {}'.format(
            product_type.decode(), serial_number.decode()
        )
        return info

    def clean_fan(self):
        """Placeholder, fan cleaning is not implemented."""
        pass

    def read_version(self):
        """Return a string with the version numbers reported by the device."""
        command = 0xD1
        data = self.comm(command)
        version = 'Firmware: {}.{}; Hardware: {}; SHDC: {}.{}'.format(
            data[0], data[1], data[3], data[5], data[6]
        )
        return version

    def device_status(self):
        """
        Check status of device, only very few errors exists, until one
        of them shows up, this is just an on/off indicator.

        Returns:
            bool: True if every byte of the status reply is zero.
        """
        command = 0xD2  # Device status
        data = [1]  # Clear after reading ([0] would not clear)
        reply = self.comm(command, data)
        return not any(reply)

    def start_measuring(self):
        """Put the device into measurement mode, always returns True."""
        command = 0x00  # Start
        # Protocol, must be 0x01; 0x03: Big-endian IEEE754 float values
        data = [0x01, 0x03]
        self.comm(command, data)  # No reply from this command
        return True

    def read_measurement(self):
        """Read the latest measurement from the device.

        A reply that fails validation is retried, up to 50 attempts in total.

        Returns:
            tuple: 10 floats, or None if no valid reply was received.

        Raises:
            struct.error: If a valid reply does not contain exactly 40 bytes.
        """
        command = 0x03  # Read
        # Todo, assertion error is a bit primitive, we should make
        # a dedicated exception.
        for _ in range(50):
            try:
                data = self.comm(command)
            except AssertionError:
                continue
            break
        else:
            return None
        # Unpack 10 big-endian floats - 40 bytes in total
        return struct.unpack('>ffffffffff', data)
if __name__ == '__main__':
    # Small demonstration: print device information and three measurements.
    dust_sensor = SensirionSPS30()
    print(dust_sensor.device_info())
    print(dust_sensor.read_version())
    print(dust_sensor.device_status())
    dust_sensor.start_measuring()

    # One format string per value, in the order the values are returned
    # by read_measurement().
    value_formats = (
        'MC PM1.0 [μg/m3]: {:.2f}',
        'MC PM2.5 [μg/m3]: {:.2f}',
        'MC PM4.0 [μg/m3]: {:.2f}',
        'MC PM10 [μg/m3]: {:.2f}',
        'NC NM0.5 [#/cm3]: {:.2f}',
        'NC NM1.0 [#/cm3]: {:.2f}',
        'NC NM2.5 [#/cm3]: {:.2f}',
        'NC NM4.0 [#/cm3]: {:.2f}',
        'NC NM10 [#/cm3]: {:.2f}',
        'Typical size: [μm]: {:.2f}',
    )

    for _ in range(3):
        time.sleep(1.1)
        # Todo: Read status to know if measurement is ready
        parsed_data = dust_sensor.read_measurement()
        if parsed_data is None:
            # read_measurement() gives up after repeated invalid replies;
            # indexing None below would raise TypeError.
            print('No valid measurement received')
            continue
        for value_format, value in zip(value_formats, parsed_data):
            print(value_format.format(value))