Add helper to calculate statistic period start and end (#82493)
* Add helper to calculate statistic period start and end * Don't parse values in resolve_period * Add specific test for resolve_period * Improve typing * Move to recorder/util.py * Extract period schema
This commit is contained in:
@@ -24,8 +24,10 @@ from sqlalchemy.orm.query import Query
|
||||
from sqlalchemy.orm.session import Session
|
||||
from sqlalchemy.sql.lambdas import StatementLambdaElement
|
||||
from typing_extensions import Concatenate, ParamSpec
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers import config_validation as cv
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
from .const import DATA_INSTANCE, SQLITE_URL_PREFIX, SupportedDialect
|
||||
@@ -35,7 +37,7 @@ from .db_schema import (
|
||||
TABLES_TO_CHECK,
|
||||
RecorderRuns,
|
||||
)
|
||||
from .models import UnsupportedDialect, process_timestamp
|
||||
from .models import StatisticPeriod, UnsupportedDialect, process_timestamp
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from . import Recorder
|
||||
@@ -604,3 +606,83 @@ def get_instance(hass: HomeAssistant) -> Recorder:
|
||||
"""Get the recorder instance."""
|
||||
instance: Recorder = hass.data[DATA_INSTANCE]
|
||||
return instance
|
||||
|
||||
|
||||
PERIOD_SCHEMA = vol.Schema(
|
||||
{
|
||||
vol.Exclusive("calendar", "period"): vol.Schema(
|
||||
{
|
||||
vol.Required("period"): vol.Any("hour", "day", "week", "month", "year"),
|
||||
vol.Optional("offset"): int,
|
||||
}
|
||||
),
|
||||
vol.Exclusive("fixed_period", "period"): vol.Schema(
|
||||
{
|
||||
vol.Optional("start_time"): vol.All(cv.datetime, dt_util.as_utc),
|
||||
vol.Optional("end_time"): vol.All(cv.datetime, dt_util.as_utc),
|
||||
}
|
||||
),
|
||||
vol.Exclusive("rolling_window", "period"): vol.Schema(
|
||||
{
|
||||
vol.Required("duration"): cv.time_period_dict,
|
||||
vol.Optional("offset"): cv.time_period_dict,
|
||||
}
|
||||
),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def resolve_period(
|
||||
period_def: StatisticPeriod,
|
||||
) -> tuple[datetime | None, datetime | None]:
|
||||
"""Return start and end datetimes for a statistic period definition."""
|
||||
start_time = None
|
||||
end_time = None
|
||||
|
||||
if "calendar" in period_def:
|
||||
calendar_period = period_def["calendar"]["period"]
|
||||
start_of_day = dt_util.start_of_local_day()
|
||||
cal_offset = period_def["calendar"].get("offset", 0)
|
||||
if calendar_period == "hour":
|
||||
start_time = dt_util.now().replace(minute=0, second=0, microsecond=0)
|
||||
start_time += timedelta(hours=cal_offset)
|
||||
end_time = start_time + timedelta(hours=1)
|
||||
elif calendar_period == "day":
|
||||
start_time = start_of_day
|
||||
start_time += timedelta(days=cal_offset)
|
||||
end_time = start_time + timedelta(days=1)
|
||||
elif calendar_period == "week":
|
||||
start_time = start_of_day - timedelta(days=start_of_day.weekday())
|
||||
start_time += timedelta(days=cal_offset * 7)
|
||||
end_time = start_time + timedelta(weeks=1)
|
||||
elif calendar_period == "month":
|
||||
start_time = start_of_day.replace(day=28)
|
||||
# This works for up to 48 months of offset
|
||||
start_time = (start_time + timedelta(days=cal_offset * 31)).replace(day=1)
|
||||
end_time = (start_time + timedelta(days=31)).replace(day=1)
|
||||
else: # calendar_period = "year"
|
||||
start_time = start_of_day.replace(month=12, day=31)
|
||||
# This works for 100+ years of offset
|
||||
start_time = (start_time + timedelta(days=cal_offset * 366)).replace(
|
||||
month=1, day=1
|
||||
)
|
||||
end_time = (start_time + timedelta(days=365)).replace(day=1)
|
||||
|
||||
start_time = dt_util.as_utc(start_time)
|
||||
end_time = dt_util.as_utc(end_time)
|
||||
|
||||
elif "fixed_period" in period_def:
|
||||
start_time = period_def["fixed_period"].get("start_time")
|
||||
end_time = period_def["fixed_period"].get("end_time")
|
||||
|
||||
elif "rolling_window" in period_def:
|
||||
duration = period_def["rolling_window"]["duration"]
|
||||
now = dt_util.utcnow()
|
||||
start_time = now - duration
|
||||
end_time = start_time + duration
|
||||
|
||||
if offset := period_def["rolling_window"].get("offset"):
|
||||
start_time += offset
|
||||
end_time += offset
|
||||
|
||||
return (start_time, end_time)
|
||||
|
||||
Reference in New Issue
Block a user