Implement ConfigEntry async_wait_for_states (#81771)

* Implement async_wait_for_states

* stale docstr, remove hints

* Assert return states for tests
This commit is contained in:
Jan Bouwhuis
2022-11-08 17:18:40 +01:00
committed by GitHub
parent 7d768dc3a6
commit 3cc9ecf1dc
2 changed files with 140 additions and 1 deletions

View File

@@ -12,6 +12,8 @@ from types import MappingProxyType, MethodType
from typing import TYPE_CHECKING, Any, Optional, TypeVar, cast
import weakref
import async_timeout
from . import data_entry_flow, loader
from .backports.enum import StrEnum
from .components import persistent_notification
@@ -19,7 +21,7 @@ from .const import EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP, Platfo
from .core import CALLBACK_TYPE, CoreState, Event, HomeAssistant, callback
from .exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady, HomeAssistantError
from .helpers import device_registry, entity_registry, storage
from .helpers.dispatcher import async_dispatcher_send
from .helpers.dispatcher import async_dispatcher_connect, async_dispatcher_send
from .helpers.event import async_call_later
from .helpers.frame import report
from .helpers.typing import UNDEFINED, ConfigType, DiscoveryInfoType, UndefinedType
@@ -1239,6 +1241,36 @@ class ConfigEntries:
await entry.async_setup(self.hass, integration=integration)
return True
async def async_wait_for_states(
self, entry: ConfigEntry, states: set[ConfigEntryState], timeout: float = 60.0
) -> ConfigEntryState:
"""Wait for the setup of an entry to reach one of the supplied states state.
Returns the state the entry reached or raises asyncio.TimeoutError if the
entry did not reach one of the supplied states within the timeout.
"""
state_reached_future: asyncio.Future[ConfigEntryState] = asyncio.Future()
@callback
def _async_entry_changed(
change: ConfigEntryChange, event_entry: ConfigEntry
) -> None:
if (
event_entry is entry
and change is ConfigEntryChange.UPDATED
and entry.state in states
):
state_reached_future.set_result(entry.state)
unsub = async_dispatcher_connect(
self.hass, SIGNAL_CONFIG_ENTRY_CHANGED, _async_entry_changed
)
try:
async with async_timeout.timeout(timeout):
return await state_reached_future
finally:
unsub()
async def async_unload_platforms(
self, entry: ConfigEntry, platforms: Iterable[Platform | str]
) -> bool: