Source code for powersensor_local.virtual_household

"""Abstraction for producing a household view."""

import sys
from pathlib import Path

from powersensor_local.EventBuffer import EventBuffer

project_root = str(Path(__file__).parents[1])
if project_root not in sys.path:
    sys.path.append(project_root)

from powersensor_local.async_event_emitter import AsyncEventEmitter
from dataclasses import dataclass
from typing import Optional

KEY_DUR_S = 'duration_s'
KEY_RESET = 'summation_resettime_utc'
KEY_START = 'starttime_utc'
KEY_SUM_J = 'summation_joules'
KEY_WATTS = 'watts'

@dataclass
class InstantaneousValues:
    starttime_utc: int
    solar_watts: float
    housenet_watts: float
    duration_s: int

@dataclass
class SummationValues:
    starttime_utc: int
    solar_summation: float
    solar_resettime: int
    housenet_summation: float
    housenet_resettime: int

@dataclass
class SummationDeltas:
    solar_generation: float
    to_grid: float
    from_grid: float
    home_use: float

def same_duration(ev1: dict, ev2: dict):
    """Close-enough matching of duration_s in events."""
    dur = KEY_DUR_S
    if not dur in ev1 or not dur in ev2:
        return False
    # We don't care about sub-second differences
    d1 = round(ev1[dur], 0)
    d2 = round(ev2[dur], 0)
    return d1 == d2

def matching_instants(starttime_utc: int, solar_events: EventBuffer, housenet_events: EventBuffer) -> Optional[InstantaneousValues]:
    """Attempts to match and merge solar+housenet average_power events."""
    solar = solar_events.find_by_key(KEY_START, starttime_utc)
    housenet = housenet_events.find_by_key(KEY_START, starttime_utc)
    if solar is not None and housenet is not None and same_duration(solar, housenet):
        return InstantaneousValues(
            starttime_utc = starttime_utc,
            solar_watts = solar[KEY_WATTS],
            housenet_watts = housenet[KEY_WATTS],
            duration_s = round(solar[KEY_DUR_S], 0),
        )
    else:
        return None

def make_instant_housenet(ev: dict) -> Optional[InstantaneousValues]:
    """Helper for case where no solar merge is expected."""
    if ev is None:
        return None
    return InstantaneousValues(
        starttime_utc = ev[KEY_START],
        solar_watts = 0,
        housenet_watts = ev[KEY_WATTS],
        duration_s = round(ev[KEY_DUR_S], 0)
    )

def matching_summations(starttime_utc: int, solar_events: EventBuffer, housenet_events: EventBuffer) -> Optional[SummationValues]:
    """Attempts to match and merge solar+housenet summation events."""
    solar = solar_events.find_by_key(KEY_START, starttime_utc)
    housenet = housenet_events.find_by_key(KEY_START, starttime_utc)
    if solar is not None and housenet is not None:
        return SummationValues(
            starttime_utc = starttime_utc,
            solar_summation =solar[KEY_SUM_J],
            solar_resettime = solar[KEY_RESET],
            housenet_summation = housenet[KEY_SUM_J],
            housenet_resettime = housenet[KEY_RESET],
        )
    else:
        return None

def make_summation_housenet(ev: dict) -> Optional[SummationValues]:
    """Helper for case where no solar merge is expected."""
    if ev is None:
        return None
    return SummationValues(
        starttime_utc = ev[KEY_START],
        solar_summation = 0,
        solar_resettime = 0,
        housenet_summation = ev[KEY_SUM_J],
        housenet_resettime = ev[KEY_RESET]
    )



[docs] class VirtualHousehold(AsyncEventEmitter): """ Class for processing average_power and summation_energy events into to/from grid, solar generation, and home usage events. To use, simply feed the appropriate PlugApi events to the process_average_power_event and process_summation_event member functions. Point-in-time power flow events include: * home_usage * from_grid * to_grid (only for solar kits) * solar_generation (only for solar kits) These all have an event payload in the form: { timestamp_utc: , watts: } Energy summation events include: * home_usage_summation * from_grid_summation * to_grid_summation (only for solar kits) * solar_generation_summation (only for solar kits) These all have an event payload in the form: { timestamp_utc: , summation_resettime_utc: , summation_joules: } Summations may reset at any time. Track the summation_resettime_utc field to take note of summation resets. """ def __init__(self, with_solar: bool): """Constructor. with_solar True if it's already known that solar exists. Will be automatically enabled upon encountering a solar event during processing, but until such a time may generate incorrect values for home usage. Similarly, if this is set to True but no solar exists, no events may be generated. """ super().__init__() self._expect_solar = with_solar self._summation = self.SummationInfo(0, 0, 0, 0) self._counters = self.Counters(0, 0, 0, 0, 0) self._solar_instants = EventBuffer(31) self._housenet_instants = EventBuffer(31) self._solar_summations = EventBuffer(5) self._housenet_summations = EventBuffer(5)
[docs] async def process_average_power_event(self, ev: dict): """Ingests an event of type 'average_power'.""" if not KEY_START in ev: return starttime_utc = int(ev[KEY_START]) if 'role' in ev: role = ev['role'] if role == 'house-net': self._housenet_instants.append(ev) await self._process_instants(starttime_utc) elif role == 'solar': if not self._expect_solar: self._expect_solar = True self._solar_instants.append(ev) await self._process_instants(starttime_utc)
[docs] async def process_summation_event(self, ev: dict): """Ingests an event of type 'summation_energy'.""" if not KEY_START in ev: return starttime_utc = int(ev[KEY_START]) if 'role' in ev: role = ev['role'] if role == 'house-net': self._housenet_summations.append(ev) await self._process_summations(starttime_utc) elif role == 'solar': if not self._expect_solar: self._expect_solar = True self._solar_summations.append(ev) await self._process_summations(starttime_utc)
async def _process_instants(self, starttime_utc: int): if self._expect_solar: v = matching_instants(starttime_utc, self._solar_instants, self._housenet_instants) else: v = make_instant_housenet(self._housenet_instants.find_by_key(KEY_START, starttime_utc)) if v is None: return self._solar_instants.evict_older(KEY_START, starttime_utc) self._housenet_instants.evict_older(KEY_START, starttime_utc) await self.emit('from_grid', { 'timestamp_utc': v.starttime_utc, 'watts': v.housenet_watts if v.housenet_watts > 0 else 0, }) await self.emit('home_usage', { 'timestamp_utc': v.starttime_utc, 'watts': max(v.housenet_watts - v.solar_watts, 0), }) if self._expect_solar: await self.emit('solar_generation', { 'timestamp_utc': v.starttime_utc, 'watts': max(-v.solar_watts, 0), }) await self.emit('to_grid', { 'timestamp_utc': v.starttime_utc, 'watts': -v.housenet_watts if v.housenet_watts < 0 else 0, }) async def _process_summations(self, starttime_utc: int): if self._expect_solar: v = matching_summations(starttime_utc, self._solar_summations, self._housenet_summations) else: v = make_summation_housenet(self._housenet_summations.find_by_key(KEY_START, starttime_utc)) if v is None: return self._solar_summations.evict_older(KEY_START, starttime_utc) self._housenet_summations.evict_older(KEY_START, starttime_utc) if not self._resettime_validation(v, starttime_utc): return deltas = self._calculate_summation_deltas(v) self._increment_counters(deltas) await self.emit('from_grid_summation', { 'timestamp_utc': starttime_utc, 'summation_resettime_utc': self._counters.resettime_utc, 'summation_joules': self._counters.from_grid, }) await self.emit('home_usage_summation', { 'timestamp_utc': starttime_utc, 'summation_resettime_utc': self._counters.resettime_utc, 'summation_joules': self._counters.home_use, }) if self._expect_solar: await self.emit('solar_generation_summation', { 'timestamp_utc': starttime_utc, 'summation_resettime_utc': self._counters.resettime_utc, 'summation_joules': self._counters.solar_generation, }) await self.emit('to_grid_summation', { 'timestamp_utc': starttime_utc, 'summation_resettime_utc': self._counters.resettime_utc, 'summation_joules': self._counters.to_grid, }) def _resettime_validation(self, v: SummationValues, starttime_utc: int) -> bool: res = True summ = self._summation if v.solar_resettime != summ.solar_resettime: summ.solar_resettime = v.solar_resettime summ.solar_last = v.solar_summation res = False if v.housenet_resettime != summ.housenet_resettime: summ.housenet_resettime = v.housenet_resettime summ.housenet_last = v.housenet_summation res = False if not res: self._clear_counters(starttime_utc) return res def _clear_counters(self, resettime_utc: int): self._counters = self.Counters(resettime_utc, 0, 0, 0, 0) def _calculate_summation_deltas(self, v: SummationValues) -> SummationDeltas: summ = self._summation solar_delta = v.solar_summation - summ.solar_last summ.solar_last = v.solar_summation housenet_delta = v.housenet_summation - summ.housenet_last summ.housenet_last = v.housenet_summation return SummationDeltas( solar_generation = max(-solar_delta, 0), to_grid = -housenet_delta if housenet_delta < 0 else 0, from_grid = housenet_delta if housenet_delta > 0 else 0, home_use = max(housenet_delta - solar_delta, 0) ) def _increment_counters(self, d: SummationDeltas): self._counters.solar_generation += d.solar_generation self._counters.to_grid += d.to_grid self._counters.from_grid += d.from_grid self._counters.home_use += d.home_use
[docs] @dataclass class SummationInfo: solar_resettime: int solar_last: float housenet_resettime: int housenet_last: float
[docs] @dataclass class Counters: resettime_utc: int solar_generation: float to_grid: float from_grid: float home_use: float