Files
core/homeassistant/components/numato/__init__.py
Ville Skyttä b4bac0f7a0 Exception chaining and wrapping improvements (#39320)
* Remove unnecessary exception re-wraps

* Preserve exception chains on re-raise

We slap "from cause" to almost all possible cases here. In some cases it
could conceivably be better to do "from None" if we really want to hide
the cause. However those should be in the minority, and "from cause"
should be an improvement over the corresponding raise without a "from"
in all cases anyway.

The only case where we raise from None here is in plex, where the
exception for an original invalid SSL cert is not the root cause for
failure to validate a newly fetched one.

Follow local convention on exception variable names if there is a
consistent one, otherwise `err` to match with majority of codebase.

* Fix mistaken re-wrap in homematicip_cloud/hap.py

Missed the difference between HmipConnectionError and
HmipcConnectionError.

* Do not hide original error on plex new cert validation error

Original is not the cause for the new one, but showing old in the
traceback is useful nevertheless.
2020-08-28 13:50:32 +02:00

249 lines
8.0 KiB
Python

"""Support for controlling GPIO pins of a Numato Labs USB GPIO expander."""
import logging
import numato_gpio as gpio
import voluptuous as vol
from homeassistant.const import (
CONF_BINARY_SENSORS,
CONF_ID,
CONF_NAME,
CONF_SENSORS,
CONF_SWITCHES,
EVENT_HOMEASSISTANT_START,
EVENT_HOMEASSISTANT_STOP,
)
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.discovery import load_platform
# Module-level logger for the numato integration.
_LOGGER = logging.getLogger(__name__)

# Integration domain; used as the top-level configuration key and as the key
# under which setup() stores the integration's data in hass.data.
DOMAIN = "numato"

# Configuration keys referenced by the schemas in this module.
CONF_INVERT_LOGIC = "invert_logic"
CONF_DISCOVER = "discover"
CONF_DEVICES = "devices"
CONF_DEVICE_ID = "id"
CONF_PORTS = "ports"
CONF_SRC_RANGE = "source_range"
CONF_DST_RANGE = "destination_range"
CONF_DST_UNIT = "unit"

# Defaults applied by the schemas when the optional keys are omitted.
DEFAULT_INVERT_LOGIC = False
DEFAULT_SRC_RANGE = [0, 1024]
DEFAULT_DST_RANGE = [0.0, 100.0]
DEFAULT_UNIT = "%"
# Default discovery candidates: /dev/ttyACM0 through /dev/ttyACM9.
DEFAULT_DEV = [f"/dev/ttyACM{i}" for i in range(10)]
# range(1, 8) covers ports 1 through 7, the same bounds adc_port_number()
# enforces and reports in its error message.
PORT_RANGE = range(1, 8)  # ports 1-7 are ADC capable

# Keys used inside hass.data[DOMAIN]; setup() stores the NumatoAPI instance
# under DATA_API.
DATA_PORTS_IN_USE = "ports_in_use"
DATA_API = "api"
def int_range(rng):
    """Validate that the input describes a range by two integers.

    Args:
        rng: Candidate range, expected to be a two-element list or tuple of
            integers with the lower bound first.

    Returns:
        The unchanged input when it is valid.

    Raises:
        vol.Invalid: If the input is not a two-element list/tuple, contains
            a non-integer value, or has its bounds in descending order.
    """
    # Check the shape before touching the elements so malformed input (too
    # short, or not a sequence at all) is reported as vol.Invalid instead of
    # escaping as an IndexError/TypeError from the element access.
    if not isinstance(rng, (list, tuple)) or len(rng) != 2:
        raise vol.Invalid(f"Only two numbers allowed in a range: {rng}")
    lower, upper = rng
    if not (isinstance(lower, int) and isinstance(upper, int)):
        raise vol.Invalid(f"Only integers are allowed: {rng}")
    if lower > upper:
        raise vol.Invalid(f"Lower range bound must come first: {rng}")
    return rng
def float_range(rng):
    """Validate that the input describes a range by two float-like values.

    Args:
        rng: Candidate range, expected to be a two-element list or tuple
            whose items can be converted with float(), lower bound first.

    Returns:
        The unchanged input when it is valid (items are not converted).

    Raises:
        vol.Invalid: If the input is not a two-element list/tuple, an item
            cannot be converted to float, or the bounds are in descending
            order.
    """
    # Check the shape first so a short or non-sequence input is reported as
    # vol.Invalid rather than an IndexError/TypeError from indexing.
    if not isinstance(rng, (list, tuple)) or len(rng) != 2:
        raise vol.Invalid(f"Only two numbers allowed in a range: {rng}")
    try:
        lower, upper = float(rng[0]), float(rng[1])
    except (TypeError, ValueError) as err:
        raise vol.Invalid(f"Only int or float values are allowed: {rng}") from err
    # Compare the converted numbers: comparing the raw items would order
    # numeric strings lexicographically (e.g. "10" < "9.5").
    if lower > upper:
        raise vol.Invalid(f"Lower range bound must come first: {rng}")
    return rng
def adc_port_number(num):
    """Validate that the input is the number of an ADC enabled port.

    Args:
        num: Candidate port number; anything int() accepts.

    Returns:
        The port number converted to int.

    Raises:
        vol.Invalid: If the value cannot be converted to an integer or is
            outside the ADC capable ports 1 to 7.
    """
    try:
        port = int(num)
    except (TypeError, ValueError) as err:
        # int() raises TypeError (not ValueError) for None, lists, dicts, ...
        raise vol.Invalid(f"Port numbers must be integers: {num}") from err
    # Same bounds as the module-level PORT_RANGE constant.
    if port not in range(1, 8):
        raise vol.Invalid(f"Only port numbers from 1 to 7 are ADC capable: {port}")
    return port
# Schema for a single ADC port entry (used for the ports of the sensors
# section in DEVICE_SCHEMA): a required name plus optional source range,
# destination range and unit, each falling back to its module-level default.
ADC_SCHEMA = vol.Schema(
{
vol.Required(CONF_NAME): cv.string,
vol.Optional(CONF_SRC_RANGE, default=DEFAULT_SRC_RANGE): int_range,
vol.Optional(CONF_DST_RANGE, default=DEFAULT_DST_RANGE): float_range,
vol.Optional(CONF_DST_UNIT, default=DEFAULT_UNIT): cv.string,
}
)
# Mapping of positive integer port numbers to strings.
# NOTE(review): the string is presumably the entity name - confirm against the
# platform modules.
PORTS_SCHEMA = vol.Schema({cv.positive_int: cv.string})
# Schema shared by the binary_sensors and switches sections: a required port
# mapping and an optional invert_logic flag.
IO_PORTS_SCHEMA = vol.Schema(
{
vol.Required(CONF_PORTS): PORTS_SCHEMA,
vol.Optional(CONF_INVERT_LOGIC, default=DEFAULT_INVERT_LOGIC): cv.boolean,
}
)
# Schema for one device entry: a required positive integer id and the
# per-platform sections. Sensor ports are keyed by adc_port_number, so only
# ADC capable port numbers validate there.
DEVICE_SCHEMA = vol.Schema(
{
vol.Required(CONF_ID): cv.positive_int,
CONF_BINARY_SENSORS: IO_PORTS_SCHEMA,
CONF_SWITCHES: IO_PORTS_SCHEMA,
CONF_SENSORS: {CONF_PORTS: {adc_port_number: ADC_SCHEMA}},
}
)
# Top-level configuration schema for the integration's domain key.
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: {
# A single device entry is wrapped into a list by cv.ensure_list.
CONF_DEVICES: vol.All(cv.ensure_list, [DEVICE_SCHEMA]),
# Device paths to probe during discovery; defaults to DEFAULT_DEV.
vol.Optional(CONF_DISCOVER, default=DEFAULT_DEV): vol.All(
cv.ensure_list, [cv.string]
),
},
},
# Keys other than DOMAIN are allowed at the top level.
extra=vol.ALLOW_EXTRA,
)
def setup(hass, config):
    """Set up the numato integration.

    Runs device discovery on the configured device paths, stores a NumatoAPI
    instance in hass.data and loads the binary_sensor, sensor and switch
    platforms.

    Returns False when device discovery fails (e.g. duplicate ID), True
    otherwise. The platforms are initialized on a best effort basis and
    handle their errors locally, so no exceptions are expected from here.
    """
    domain_config = config[DOMAIN]
    hass.data[DOMAIN] = domain_config

    try:
        gpio.discover(domain_config[CONF_DISCOVER])
    except gpio.NumatoGpioError as err:
        _LOGGER.info("Error discovering Numato devices: %s", err)
        gpio.cleanup()
        return False

    discovered_ids = ", ".join(map(str, gpio.devices))
    _LOGGER.info(
        "Initializing Numato 32 port USB GPIO expanders with IDs: %s",
        discovered_ids,
    )

    hass.data[DOMAIN][DATA_API] = NumatoAPI()

    def cleanup_gpio(event):
        """Release the GPIO library and forget all registered ports."""
        _LOGGER.debug("Clean up Numato GPIO")
        gpio.cleanup()
        domain_data = hass.data[DOMAIN]
        if DATA_API in domain_data:
            domain_data[DATA_API].ports_registered.clear()

    def prepare_gpio(event):
        """Arrange for the GPIO cleanup to run once when Home Assistant stops."""
        _LOGGER.debug("Setup cleanup at stop for Numato GPIO")
        hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, cleanup_gpio)

    # The stop handler is only registered once Home Assistant has started.
    hass.bus.listen_once(EVENT_HOMEASSISTANT_START, prepare_gpio)

    for platform in ("binary_sensor", "sensor", "switch"):
        load_platform(hass, platform, DOMAIN, {}, config)

    return True
# pylint: disable=no-self-use
class NumatoAPI:
    """Home-Assistant specific API for numato device access.

    Keeps track of which (device_id, port) pairs have been claimed and for
    which direction, and guards every device access with the matching checks.
    """

    def __init__(self):
        """Initialize API state."""
        # Maps (device_id, port) -> direction the port was registered with.
        self.ports_registered = {}

    def check_port_free(self, device_id, port, direction):
        """Register a port for the given direction if it is still free.

        Raises:
            gpio.NumatoGpioError: If the port has already been registered.
        """
        key = (device_id, port)
        if key in self.ports_registered:
            in_use_as = "input" if self.ports_registered[key] == gpio.IN else "output"
            raise gpio.NumatoGpioError(
                f"Device {device_id} port {port} already in use as {in_use_as}."
            )
        self.ports_registered[key] = direction

    def check_device_id(self, device_id):
        """Check whether a device has been discovered.

        Raises:
            gpio.NumatoGpioError: If the device is not in gpio.devices.
        """
        if device_id not in gpio.devices:
            raise gpio.NumatoGpioError(f"Device {device_id} not available.")

    def check_port(self, device_id, port, direction):
        """Raise an error if the port setup doesn't match the direction.

        Raises:
            gpio.NumatoGpioError: If the device is unknown, the port has not
                been set up, or it was set up for the other direction.
        """
        self.check_device_id(device_id)
        if (device_id, port) not in self.ports_registered:
            raise gpio.NumatoGpioError(
                f"Port {port} is not set up for numato device {device_id}."
            )
        if self.ports_registered[(device_id, port)] != direction:
            msg = {
                gpio.OUT: f"Trying to write to device {device_id} port {port} set up as input.",
                gpio.IN: f"Trying to read from device {device_id} port {port} set up as output.",
            }
            raise gpio.NumatoGpioError(msg[direction])

    def _setup_port(self, device_id, port, direction):
        """Claim a port and configure the hardware for the given direction."""
        self.check_device_id(device_id)
        # Claim the port before touching the hardware, so a conflicting
        # request fails without reconfiguring a pin that is already in use.
        self.check_port_free(device_id, port, direction)
        try:
            gpio.devices[device_id].setup(port, direction)
        except Exception:
            # The hardware was not configured; release the claim again.
            self.ports_registered.pop((device_id, port), None)
            raise

    def setup_output(self, device_id, port):
        """Set up a GPIO as output."""
        self._setup_port(device_id, port, gpio.OUT)

    def setup_input(self, device_id, port):
        """Set up a GPIO as input."""
        self._setup_port(device_id, port, gpio.IN)

    def write_output(self, device_id, port, value):
        """Write a value to a GPIO."""
        self.check_port(device_id, port, gpio.OUT)
        gpio.devices[device_id].write(port, value)

    def read_input(self, device_id, port):
        """Read a value from a GPIO."""
        self.check_port(device_id, port, gpio.IN)
        return gpio.devices[device_id].read(port)

    def read_adc_input(self, device_id, port):
        """Read an ADC value from a GPIO ADC port."""
        # check_port() already verifies that the device is available.
        self.check_port(device_id, port, gpio.IN)
        return gpio.devices[device_id].adc_read(port)

    def edge_detect(self, device_id, port, event_callback):
        """Add detection for RISING and FALLING events."""
        self.check_port(device_id, port, gpio.IN)
        gpio.devices[device_id].add_event_detect(port, event_callback, gpio.BOTH)
        gpio.devices[device_id].notify = True