Initial commit: EMS — Energie Management System

Complete self-consumption optimisation system for 7 kWp PV installation:
- Prometheus collector (grid power, SOC, PV, per-phase, compressor)
- Pure decision engine with SOC gates, hysteresis, priority ordering
- Shelly Gen1/Gen2 actuator (SHA-256 Digest auth, PM power readback)
- Viessmann OAuth2 client for DHW temperature control
- PV forecast integration (forecast.solar)
- Wallbox mutual exclusion (VX3 4.6 kW AC output constraint)
- Car-not-charging detection via Shelly PM
- Compressor idle → early SG-Ready release
- Per-phase grid power for single-phase wallbox decisions
- Manual override detection and web UI with override buttons
- Full unit test coverage for decision engine
- systemd service, Makefile, complete documentation

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-06 10:02:16 +02:00
commit 99613c52ae
19 changed files with 3924 additions and 0 deletions

15
.gitignore vendored Normal file

@@ -0,0 +1,15 @@
# Build output
/ems
# IDE
.idea/
.vscode/
*.swp
# Secrets — never commit real credentials
/etc/
viessmann-token.json
*-token.json
# Generated HTML docs
ems-documentation.html

76
CLAUDE.md Normal file

@@ -0,0 +1,76 @@
# EMS — Energie Management System
## Project overview
Energy management system that optimises self-consumption for a home PV installation.
It reads state data from an existing Prometheus instance, makes switching decisions
based on SOC thresholds and hysteresis, and controls Shelly actuators via HTTP.
## Architecture
```
Prometheus (192.168.0.23:9090) → Collector (PromQL) → Decision Engine → Actuator → Shelly HTTP
Prometheus Exporter (:9099)
```
### Layers
- **Collector** (`internal/collector/`): Reads system state via PromQL instant queries from existing Prometheus
- **Engine** (`internal/engine/`): Pure decision logic — SOC gates, hysteresis, priority ordering. No side effects, fully testable.
- **Actuator** (`internal/actuator/`): Executes switching via Shelly Gen1/Gen2 HTTP APIs
- **Metrics** (`internal/metrics/`): Exports EMS decisions as Prometheus metrics on :9099
### Key design decisions
- Data reads come from Prometheus (already scraped every 2min by custom Viessmann exporter)
- Data writes (Shelly switching, Viessmann API for WW) are done directly by the EMS
- `pcc_transfer_power_exchange_value`: positive = grid import, negative = grid export
- Decision engine is pure: `func Decide(state, now) → []Action` — easy to unit test
- All thresholds configurable via YAML (`configs/ems-config.yaml`)
## Hardware setup
| Component | Details |
|---|---|
| PV | 7 kWp, Trina VX3 4.6 hybrid inverter |
| Battery | 8 kWh, managed autonomously by VX3 |
| Heat pump | Viessmann Vitocal 200-S AWB-E-AC |
| SG-Ready | Shelly 1 Gen1 @ 192.168.42.90 → heating buffer +5°C |
| Wallbox A | 2kW, Shelly Plus 1 PM Gen2 @ 192.168.42.185 |
| Wallbox B | 4kW, Shelly Plus 1 Gen2 @ 192.168.42.51 |
| WW control | Viessmann API (OAuth2, refresh token) — NOT YET IMPLEMENTED |
## Prometheus metrics (read from existing instance)
- `pcc_transfer_power_exchange_value` — grid power (pos=import, neg=export)
- `ess_stateOfCharge_value` — battery SOC (0-100%)
- `photovoltaic_production_current_value` — current PV production (W)
- `ess_power_value` — battery charge/discharge (W)
- `heating_compressors_0_power_value` — heat pump compressor power (W)
- `heating_sensors_temperature_outside_value` — outdoor temp (°C)
## Decision logic summary
1. SOC gates: <50% all blocked, 50-70% SG-Ready only, 70-90% +WB_A, >90% all
2. Export threshold + hysteresis (4min on, 6min off)
3. Priority: SG-Ready (heating only) → WB_A (2kW) → WB_B (4kW)
4. Shutdown: reverse order, respecting min runtimes (15min WB, 30min SG-Ready)
5. Emergency brake: SOC drops below threshold → immediate shutdown
## TODO / open items
- [ ] Viessmann API OAuth2 write integration (WW-Solltemperatur)
- [ ] PV Forecast integration (Solcast/forecast.solar) for strategic layer
- [ ] Modbus TCP/RS485 for faster PCC data (Phase 3)
- [ ] Grafana dashboard for EMS metrics
- [ ] go.sum generation (`go mod tidy`)
- [ ] Integration tests with mock Prometheus / mock Shelly endpoints
## Commands
```bash
make build # Build binary
make test # Run unit tests
make dry-run # Run without switching (log only)
make run # Run with live switching
make install # Install as systemd service
```
## Code style
- Go 1.23+, structured logging via `log/slog`
- Idiomatic Go: no frameworks, standard library where possible
- Engine is pure (no I/O) — all side effects in Collector and Actuator
- Config via YAML, parsed with gopkg.in/yaml.v3
- Prometheus metrics via prometheus/client_golang

26
Makefile Normal file

@@ -0,0 +1,26 @@
BINARY := ems
CONFIG := configs/ems-config.yaml

.PHONY: build test dry-run run install lint

build:
	go build -o $(BINARY) .

test:
	go test ./...

dry-run: build
	./$(BINARY) -config $(CONFIG) -dry-run

run: build
	./$(BINARY) -config $(CONFIG)

install: build
	install -Dm755 $(BINARY) /usr/local/bin/$(BINARY)
	install -Dm644 $(CONFIG) /etc/ems/ems-config.yaml
	install -Dm644 systemd/ems.service /etc/systemd/system/ems.service
	systemctl daemon-reload
	@echo "Run: systemctl enable --now ems"

lint:
	go vet ./...

405
README.md Normal file

@@ -0,0 +1,405 @@
# EMS — Energie Management System
Self-consumption optimisation for a home PV installation with battery storage, heat pump, and EV wallboxes. Reads live data from Prometheus, makes switching decisions based on SOC thresholds and export surplus, and controls Shelly actuators via HTTP.
---
## Table of Contents
1. [System Overview](#system-overview)
2. [Hardware Setup](#hardware-setup)
3. [Architecture](#architecture)
4. [Decision Logic](#decision-logic)
5. [Configuration Reference](#configuration-reference)
6. [Deployment](#deployment)
7. [Web Interface](#web-interface)
8. [Prometheus Metrics](#prometheus-metrics)
9. [Branch Strategy](#branch-strategy)
10. [Development](#development)
---
## System Overview
```
PV panels (7 kWp)
Trina VX3 4.6 hybrid inverter ──► Battery (8 kWh)
▼ AC output (max 4.6 kW)
Grid connection (PCC)
├──► House loads (heat pump, household)
├──► SG-Ready relay (Shelly Gen1) ──► Viessmann Vitocal 200-S
├──► Wallbox A 2kW (Shelly Plus 1 PM) ──► EV charging
└──► Wallbox B 4kW (Shelly Plus 1) ──► EV charging
```
The EMS runs on a Raspberry Pi on the local network. Every 2 minutes it:
1. Reads system state from Prometheus (grid power, battery SOC, PV production, per-phase grid power, compressor power)
2. Reads actual relay states from Shelly devices (with live power from PM variants)
3. Fetches a daily PV forecast (forecast.solar)
4. Runs the decision engine → produces a list of switching actions
5. Executes actions via Shelly HTTP API and Viessmann cloud API
---
## Hardware Setup
| Component | Details |
|-----------|---------|
| PV | 7 kWp, Trina VX3 4.6 hybrid inverter |
| Battery | 8 kWh, managed autonomously by VX3 |
| Heat pump | Viessmann Vitocal 200-S AWB-E-AC |
| SG-Ready | Shelly 1 Gen1 @ 192.168.42.90 — activates heating buffer boost (+5°C) |
| Wallbox A | 2 kW single-phase, Shelly Plus 1 PM Gen2 @ 192.168.42.185 |
| Wallbox B | 4 kW 3-phase, Shelly Plus 1 Gen2 @ 192.168.42.51 |
| WW control | Viessmann API (OAuth2) — sets DHW target temperature |
| Data source | Prometheus @ 192.168.0.23:9090 (custom Viessmann exporter) |
| EMS host | Raspberry Pi, runs as systemd service |
### VX3 AC output constraint
The inverter's AC output is limited to **4.6 kW**. Only one wallbox may be active at a time:
- Wallbox A (2 kW) OR Wallbox B (4 kW) — never both simultaneously.
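
The exclusion follows from simple arithmetic on the rated powers. A minimal sketch, assuming a hypothetical helper `fitsInverter` (not a name from this codebase) and the rated values from the hardware table:

```go
package main

import "fmt"

// inverterLimitW is the VX3 AC output limit stated above (4.6 kW).
const inverterLimitW = 4600

// fitsInverter reports whether adding a candidate load keeps the sum of
// rated powers within the inverter limit. Hypothetical helper for illustration.
func fitsInverter(activeW []int, candidateW int) bool {
	total := candidateW
	for _, w := range activeW {
		total += w
	}
	return total <= inverterLimitW
}

func main() {
	fmt.Println("B alone:", fitsInverter(nil, 4000))
	fmt.Println("B while A is active:", fitsInverter([]int{2000}, 4000))
}
```

With both wallboxes on, the rated load would be 2000 W + 4000 W = 6000 W, which exceeds the 4600 W limit; either one alone fits.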
---
## Architecture
```
Prometheus (192.168.0.23:9090)
│ PromQL instant queries (every 2 min)
internal/collector/ — SystemState{GridPowerW, BatterySOC, PVProductionW,
PhaseL1/2/3PowerW, CompressorPowerW, ...}
internal/engine/ — Pure decision logic (no I/O, fully unit-tested)
│ Decide(state, now, wwBoostC) → []Action
internal/actuator/ — Executes switching
├── Shelly Gen1: GET /relay/0?turn=on|off
├── Shelly Gen2: POST /rpc/Switch.Set (SHA-256 Digest auth)
│ + reads live apower from /rpc/Switch.GetStatus
└── Viessmann API: OAuth2, sets DHW temperature via IoT API
internal/forecast/ — Daily PV forecast from forecast.solar (cached)
internal/viessmann/ — OAuth2 token lifecycle + DHW temperature API
internal/status/ — Web status page (HTML) + manual override handler
internal/metrics/ — Prometheus exporter on :9099
internal/config/ — YAML config loader
```
### Key design decisions
- **Data reads** come from Prometheus (already scraped every 2 min by the Viessmann exporter). No direct Modbus/RS485.
- **Data writes** (Shelly switching, Viessmann DHW temp) are done directly by the EMS.
- **Engine is pure**: `Decide()` has no side effects — easy to unit test without mocks.
- **Shelly state read-back every cycle**: detects manual switches (Shelly app, physical button) and applies an override lockout.
- **Per-device resilience**: if one Shelly is unreachable, the others still work.
- **Wallbox PM integration**: Shelly Plus 1 PM reports live power — used to detect when a car stops charging and release the wallbox slot.
- **Compressor idle detection**: if SG-Ready is active but the heat pump compressor drops to near-zero power, SG-Ready is released early (before its minimum runtime) since the boost is no longer being used.
---
## Decision Logic
### SOC Gates
Battery SOC determines which consumers are allowed at all:
| SOC range | Allowed consumers |
|-----------|-------------------|
| < 50% | None (all blocked) |
| 50–70% | SG-Ready, WW boost |
| 70–90% | SG-Ready, WW boost, Wallbox A |
| ≥ 90% | All consumers |
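
The table can be read as a single lookup from SOC to the set of permitted consumers. The sketch below is illustrative only: the type and function names are assumptions rather than the engine's real API, and it assumes each range includes its lower bound (so exactly 70% already permits Wallbox A).

```go
package main

import "fmt"

// Consumer names are hypothetical labels for this sketch.
type Consumer string

const (
	SGReady  Consumer = "sg_ready"
	WW       Consumer = "ww"
	WallboxA Consumer = "wallbox_a"
	WallboxB Consumer = "wallbox_b"
)

// SOCThresholds mirrors the three boundaries of the table (percent).
type SOCThresholds struct {
	BlockAll     float64 // below this nothing runs
	SGReadyOnly  float64 // below this only SG-Ready and WW boost
	AllConsumers float64 // at or above this everything is allowed
}

// allowed returns the consumers permitted at the given SOC.
// Assumption: every range includes its lower bound.
func allowed(soc float64, t SOCThresholds) []Consumer {
	switch {
	case soc < t.BlockAll:
		return nil
	case soc < t.SGReadyOnly:
		return []Consumer{SGReady, WW}
	case soc < t.AllConsumers:
		return []Consumer{SGReady, WW, WallboxA}
	default:
		return []Consumer{SGReady, WW, WallboxA, WallboxB}
	}
}

func main() {
	th := SOCThresholds{BlockAll: 50, SGReadyOnly: 70, AllConsumers: 90}
	for _, soc := range []float64{42, 55, 80, 95} {
		fmt.Printf("SOC %.0f%%: %v\n", soc, allowed(soc, th))
	}
}
```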
### Turn-on sequence (priority order)
Each cycle, if grid power is negative (exporting):
1. **SG-Ready** (heating season Oct–Apr only): export > 500 W for 4 min continuously
2. **WW boost** (12:30–18:00 time window + PV forecast required): export > 500 W for 4 min
3. **Wallbox A** (2 kW, single-phase): export > 1500 W on best phase for 4 min. Blocked if Wallbox B is active.
4. **Wallbox B** (4 kW, 3-phase): total export > 3800 W for 4 min. Blocked if Wallbox A is active.
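
The "for 4 min continuously" condition in the steps above can be modelled as a timer that resets whenever the export drops below the threshold. The following is a sketch under assumptions: `persistTimer` and `simulate` are hypothetical names, and the samples are taken at the 2-minute poll interval.

```go
package main

import (
	"fmt"
	"time"
)

// persistTimer tracks how long a condition has held without interruption.
type persistTimer struct {
	since time.Time // zero while the condition is not met
}

// update records one observation and reports whether the condition has now
// held for at least hold. Any sample that fails the condition resets the timer.
func (p *persistTimer) update(met bool, now time.Time, hold time.Duration) bool {
	if !met {
		p.since = time.Time{}
		return false
	}
	if p.since.IsZero() {
		p.since = now
	}
	return now.Sub(p.since) >= hold
}

// simulate feeds grid power samples (W, negative = export) taken every
// 2 minutes through the timer and returns, per sample, whether an export
// above 500 W has persisted for 4 minutes.
func simulate(gridW []float64) []bool {
	const (
		poll       = 2 * time.Minute
		hold       = 4 * time.Minute
		thresholdW = -500.0
	)
	var timer persistTimer
	now := time.Date(2026, 4, 6, 12, 0, 0, 0, time.UTC)
	out := make([]bool, 0, len(gridW))
	for _, w := range gridW {
		out = append(out, timer.update(w < thresholdW, now, hold))
		now = now.Add(poll)
	}
	return out
}

func main() {
	fmt.Println(simulate([]float64{-600, -700, -800, 100, -600}))
}
```

At a 2-minute poll, the condition first becomes true on the third consecutive exporting sample; a single importing sample starts the wait over.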
### Shutdown sequence (reverse priority)
If grid import > 200 W for 6 min continuously:
Wallbox B → Wallbox A → WW → SG-Ready (one consumer per cycle, respecting minimum runtimes)
Minimum runtimes: Wallbox 15 min, SG-Ready 30 min.
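
One way to picture the shutdown selection is a walk through the reverse priority list that stops at the first eligible consumer. This sketch makes two assumptions that the text does not confirm: a consumer still inside its minimum runtime is skipped in favour of the next one, and WW boost has no minimum runtime. All names are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// runtimes maps an active consumer to how long it has been switched on.
type runtimes map[string]time.Duration

// shutdownOrder is the reverse of the turn-on priority.
var shutdownOrder = []string{"wallbox_b", "wallbox_a", "ww", "sg_ready"}

// minRuntime holds the minimum on-times from the text; a missing entry means none.
var minRuntime = map[string]time.Duration{
	"wallbox_a": 15 * time.Minute,
	"wallbox_b": 15 * time.Minute,
	"sg_ready":  30 * time.Minute,
}

// minutes is a small convenience constructor.
func minutes(n int) time.Duration { return time.Duration(n) * time.Minute }

// nextShutdown returns the single consumer to switch off this cycle,
// or "" if no active consumer has completed its minimum runtime.
func nextShutdown(on runtimes) string {
	for _, name := range shutdownOrder {
		d, active := on[name]
		if !active {
			continue
		}
		if d >= minRuntime[name] {
			return name
		}
	}
	return ""
}

func main() {
	fmt.Println(nextShutdown(runtimes{"wallbox_b": minutes(20), "sg_ready": minutes(40)}))
	fmt.Println(nextShutdown(runtimes{"wallbox_a": minutes(5), "sg_ready": minutes(40)}))
}
```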
### Emergency brake
If SOC drops below its allowed threshold, the consumer is shut off immediately (min-runtime ignored).
### WW temperature setpoint
| PV forecast | DHW target |
|-------------|-----------|
| < 15 kWh | 48°C (base) |
| 15–25 kWh | 51°C (+3°C) |
| ≥ 25 kWh | 53°C (+5°C) |
The heat pump's own hysteresis fires the compressor when DHW drops 5°C below target.
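
The setpoint table amounts to a base temperature plus a forecast-dependent boost. A minimal sketch, assuming a hypothetical `Strategic` struct that mirrors the `strategic` config keys and treating both thresholds as inclusive, as the table does:

```go
package main

import "fmt"

// Strategic mirrors the forecast-related config values (hypothetical struct).
type Strategic struct {
	ForecastHighKWh float64
	ForecastMidKWh  float64
	WWBaseC         float64
	WWBoostHighC    float64
	WWBoostMidC     float64
}

// dhwTarget returns the DHW setpoint in °C for a given daily PV forecast.
func dhwTarget(forecastKWh float64, s Strategic) float64 {
	switch {
	case forecastKWh >= s.ForecastHighKWh:
		return s.WWBaseC + s.WWBoostHighC
	case forecastKWh >= s.ForecastMidKWh:
		return s.WWBaseC + s.WWBoostMidC
	default:
		return s.WWBaseC
	}
}

func main() {
	s := Strategic{ForecastHighKWh: 25, ForecastMidKWh: 15, WWBaseC: 48, WWBoostHighC: 5, WWBoostMidC: 3}
	for _, kwh := range []float64{10, 20, 30} {
		fmt.Printf("forecast %.0f kWh: %.0f °C\n", kwh, dhwTarget(kwh, s))
	}
}
```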
### Early release conditions
- **SG-Ready**: released early if `CompressorPowerW < 50 W` for 3 consecutive cycles (~6 min). The heat pump has finished its boost cycle.
- **Wallbox A/B**: released early if Shelly PM reads `< 50 W` for 3 consecutive cycles. Car is full or unplugged.
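
Both early-release rules reduce to counting consecutive low-power readings. The sketch below uses hypothetical names (`idleCounter`, `firstRelease`) and hard-codes the 50 W threshold and 3 cycles from the text; it is not the engine's actual implementation.

```go
package main

import "fmt"

// idleCounter counts consecutive cycles in which a consumer drew less than
// the idle threshold.
type idleCounter struct {
	n int
}

// observe records one power reading and reports whether the consumer has been
// idle for the required number of consecutive cycles. Any reading at or above
// the threshold resets the count.
func (c *idleCounter) observe(powerW, idleBelowW float64, cycles int) bool {
	if powerW >= idleBelowW {
		c.n = 0
		return false
	}
	c.n++
	return c.n >= cycles
}

// firstRelease returns the index of the first sample at which an early
// release would fire (50 W threshold, 3 cycles), or -1 if it never does.
func firstRelease(samplesW []float64) int {
	var c idleCounter
	for i, w := range samplesW {
		if c.observe(w, 50, 3) {
			return i
		}
	}
	return -1
}

func main() {
	fmt.Println(firstRelease([]float64{1200, 30, 20, 1500, 10, 5, 0}))
}
```

In the sample series the first idle streak is interrupted by a 1500 W reading, so the release only fires after the second streak reaches three cycles.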
### Manual override detection
Every cycle the EMS reads back actual Shelly relay states. If the hardware state differs from the engine's expected state, a manual override is detected. The consumer is locked from EMS control for 1 hour (configurable). Shown on the status page.
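
The read-back comparison and lockout can be sketched as a small tracker keyed by consumer. Everything here is an assumption for illustration (`overrideTracker`, `observe`, `locked`, and the helper `at`); the real engine may store this state differently.

```go
package main

import (
	"fmt"
	"time"
)

// overrideTracker remembers until when each consumer is locked from EMS control.
type overrideTracker struct {
	timeout     time.Duration
	lockedUntil map[string]time.Time
}

// newOverrideTracker creates a tracker with the lockout given in minutes.
func newOverrideTracker(timeoutMin int) *overrideTracker {
	return &overrideTracker{
		timeout:     time.Duration(timeoutMin) * time.Minute,
		lockedUntil: make(map[string]time.Time),
	}
}

// observe compares the engine's expected relay state with the state read back
// from the device. A mismatch counts as a manual override and starts the lockout.
func (o *overrideTracker) observe(name string, expected, actual bool, now time.Time) bool {
	if expected == actual {
		return false
	}
	o.lockedUntil[name] = now.Add(o.timeout)
	return true
}

// locked reports whether the consumer is still inside its lockout window.
func (o *overrideTracker) locked(name string, now time.Time) bool {
	return now.Before(o.lockedUntil[name])
}

// at returns a fixed reference time plus m minutes (demo helper).
func at(m int) time.Time {
	return time.Date(2026, 4, 6, 12, 0, 0, 0, time.UTC).Add(time.Duration(m) * time.Minute)
}

func main() {
	o := newOverrideTracker(60)
	fmt.Println("override detected:", o.observe("wallbox_a", false, true, at(0)))
	fmt.Println("locked after 30 min:", o.locked("wallbox_a", at(30)))
	fmt.Println("locked after 61 min:", o.locked("wallbox_a", at(61)))
}
```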
---
## Configuration Reference
All parameters are in `configs/ems-config.yaml` (or `/etc/ems/ems-config.yaml` when installed).
```yaml
prometheus:
  url: "http://<host>:9090"
  metrics:
    grid_power_exchange: "pcc_transfer_power_exchange_value"  # +import/-export (W)
    battery_soc: "ess_stateOfCharge_value"                    # 0–100%
    pv_production: "photovoltaic_production_current_value"    # kW (multiplied by 1000)
    battery_power: "ess_power_value"                          # W
    compressor_power: "heating_compressors_0_power_value"     # W
    ambient_temp: "heating_sensors_temperature_outside_value" # °C
    phase_l1_power: "pcc_ac_active_power_phaseOne"            # W
    phase_l2_power: "pcc_ac_active_power_phaseTwo"            # W
    phase_l3_power: "pcc_ac_active_power_phaseThree"          # W

shelly:
  sg_ready:
    ip: "<ip>"
    gen: 1          # Gen1: relay endpoint
  wallbox_a:
    ip: "<ip>"
    gen: 2          # Gen2: rpc endpoint
    power_w: 2000   # rated power (informational)
    password: ""    # Shelly admin password (SHA-256 Digest auth)
  wallbox_b:
    ip: "<ip>"
    gen: 2
    power_w: 4000
    password: ""

viessmann:
  token_file: "/etc/ems/viessmann-token.json"
  client_id: "<oauth2 client id>"
  installation_id: "<id>"
  gateway_serial: "<serial>"
  device_id: "0"

soc_thresholds:
  block_all: 50        # below this: nothing runs
  sg_ready_only: 70    # 50–70%: SG-Ready + WW only
  plus_wallbox_a: 90   # 70–90%: + Wallbox A
  all_consumers: 90    # ≥90%: all consumers

hysteresis:
  export_on_duration: "4m"      # export must persist before switching on
  import_off_duration: "6m"     # import must persist before switching off
  min_runtime_wallbox: "15m"    # wallbox minimum on-time before shutdown
  min_runtime_sg_ready: "30m"   # SG-Ready minimum on-time before shutdown

thresholds:
  sg_ready_export_w: -500           # export needed for SG-Ready (negative = export)
  ww_export_w: -500                 # export needed for WW boost
  wallbox_a_export_w: -1800         # total export fallback for Wallbox A
  wallbox_a_phase_export_w: -1500   # per-phase export needed for single-phase Wallbox A
  wallbox_b_export_w: -3800         # total export needed for Wallbox B
  import_off_w: 200                 # import level that starts shutdown timer

consumers:
  compressor_idle_w: 50      # compressor below this = heat pump idle (W)
  wallbox_min_charge_w: 50   # wallbox below this = car not charging (W)
  idle_cycles: 3             # consecutive idle cycles before early release

strategic:
  forecast_high_kwh: 25      # high-forecast day threshold
  forecast_mid_kwh: 15       # medium-forecast day threshold
  ww_base_c: 48              # normal DHW setpoint (°C)
  ww_boost_high_c: 5         # additional °C on high-forecast days
  ww_boost_mid_c: 3          # additional °C on medium-forecast days
  ww_window_start: "12:30"   # WW boost only within this window
  ww_window_end: "18:00"
  schedule_on: "08:00"
  schedule_off: "20:00"

season:
  heating_start_month: 10   # October
  heating_end_month: 4      # April (wrap-around handled)

forecast:
  enabled: true
  lat: <latitude>
  lon: <longitude>
  declination: <panel tilt degrees>
  azimuth: <degrees from south: 0=south, 90=west, -90=east>
  kwp: <installed peak power kWp>

ems:
  poll_interval: "2m"
  listen_addr: ":9099"
  log_level: "info"                 # debug | info | warn | error
  state_file: "/run/ems/heartbeat"
  recovery_timeout: "1h"            # ignore Shelly state if EMS was down longer than this
  override_timeout: "1h"            # how long a manual Shelly change is respected
```
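
The `season` block notes that the October to April range wraps around the year end. A sketch of how such a check could look, with `inHeatingSeason` as a hypothetical helper name:

```go
package main

import (
	"fmt"
	"time"
)

// inHeatingSeason reports whether month m lies in the inclusive range
// start..end. When start is later in the year than end (for example
// October to April), the range wraps around the year boundary.
func inHeatingSeason(m, start, end time.Month) bool {
	if start <= end {
		return m >= start && m <= end
	}
	return m >= start || m <= end
}

func main() {
	for _, m := range []time.Month{time.January, time.April, time.July, time.October} {
		fmt.Printf("%s: %v\n", m, inHeatingSeason(m, time.October, time.April))
	}
}
```

The same function also handles a non-wrapping range such as May to September, so the config values need no special casing.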
---
## Deployment
### Prerequisites
- Go 1.23+
- Prometheus instance scraping the Viessmann exporter
- Shelly devices on the local network
- Viessmann API credentials (OAuth2 refresh token in `/etc/ems/viessmann-token.json`)
### Install
```bash
# Build and install as systemd service
sudo make install
sudo systemctl enable --now ems
# Check status
sudo systemctl status ems
sudo journalctl -u ems -f
```
### Makefile targets
```bash
make build # Build binary → ./ems
make test # Run unit tests
make dry-run # Run without executing actions (log only)
make run # Run with live switching (foreground)
make install # Build + install binary, config, systemd unit
```
### Config file locations (when installed)
| File | Purpose |
|------|---------|
| `/etc/ems/ems-config.yaml` | Main configuration |
| `/etc/ems/viessmann-token.json` | OAuth2 refresh token |
| `/run/ems/heartbeat` | Written each cycle — used for state recovery on restart |
### Viessmann token file format
```json
{
"access_token": "...",
"refresh_token": "...",
"expires_in": 3600,
"validToTimeDate": "2024-01-01T12:00:00Z"
}
```
The EMS automatically refreshes the token when it expires. The updated token is written back to the file.
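
As an illustration of the expiry check, the token file can be decoded with `encoding/json` and compared against the current time. This is a sketch under assumptions: the struct, the function names, and the one-minute safety margin are invented here, and the real client may decide on refresh differently.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// token mirrors the fields of the token file shown above.
type token struct {
	AccessToken  string    `json:"access_token"`
	RefreshToken string    `json:"refresh_token"`
	ExpiresIn    int       `json:"expires_in"`
	ValidTo      time.Time `json:"validToTimeDate"`
}

// refreshMargin is an assumed safety margin, not a value from the project.
const refreshMargin = time.Minute

// needsRefresh parses the token file content and reports whether the access
// token is expired or about to expire at the given time.
func needsRefresh(raw []byte, now time.Time) (bool, error) {
	var t token
	if err := json.Unmarshal(raw, &t); err != nil {
		return false, fmt.Errorf("parsing token file: %w", err)
	}
	return !now.Before(t.ValidTo.Add(-refreshMargin)), nil
}

// needsRefreshAt is a convenience wrapper taking the time as an RFC 3339 string.
func needsRefreshAt(raw, nowRFC3339 string) (bool, error) {
	now, err := time.Parse(time.RFC3339, nowRFC3339)
	if err != nil {
		return false, err
	}
	return needsRefresh([]byte(raw), now)
}

// sample is the example file content with placeholder token values.
const sample = `{"access_token":"...","refresh_token":"...","expires_in":3600,"validToTimeDate":"2024-01-01T12:00:00Z"}`

func main() {
	for _, ts := range []string{"2024-01-01T11:00:00Z", "2024-01-01T11:59:30Z"} {
		refresh, err := needsRefreshAt(sample, ts)
		fmt.Println(ts, refresh, err)
	}
}
```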
---
## Web Interface
Available at `http://<pi-ip>:9099/`
Shows:
- Current grid power, battery SOC, PV production
- Per-phase grid power (L1/L2/L3)
- Each consumer: state, manual override status, Shelly PM live power
- Today's PV forecast
- Manual override buttons for Shelly consumers (SG-Ready, Wallbox A, Wallbox B)
Manual overrides via the web UI switch the hardware directly. The EMS detects the change next cycle via state read-back and applies the override lockout automatically.
---
## Prometheus Metrics
The EMS exports its own metrics on `:9099/metrics`:
| Metric | Description |
|--------|-------------|
| `ems_grid_power_watts` | Current grid power (positive=import, negative=export) |
| `ems_battery_soc_percent` | Battery SOC (0–100%) |
| `ems_pv_production_watts` | Current PV production |
| `ems_grid_import_kwh_total` | Cumulative grid import estimate |
| `ems_grid_export_kwh_total` | Cumulative grid export estimate |
| `ems_consumer_active{consumer}` | 1 if consumer is active, 0 otherwise |
| `ems_switch_cycles_total{consumer,action}` | Total switching events |
| `ems_decision_duration_seconds` | Engine decision latency |
---
## Branch Strategy
This repository uses separate branches per installation:
| Branch | Purpose |
|--------|---------|
| `main` | Core code + generic config template |
| `house/lutz` | Lutz's installation (7 kWp, Vitocal 200-S, 2+4 kW wallboxes) |
| `house/son` | Son's installation (hardware TBD) |
**Workflow:**
- Bug fixes and new features are developed on `main`
- Each house branch is kept up to date by merging from `main`: `git merge main`
- House-specific config changes stay on the house branch and are never merged back to `main`
- The config in `main` contains placeholder values — it is a template, not a working config
---
## Development
### Running tests
```bash
make test
# or
go test ./...
```
The engine package has full unit test coverage. Tests run without any hardware or network — the engine is a pure function and all dependencies are injected via structs.
### Adding a new consumer
1. Add a `ConsumerXxx Consumer = iota` constant in `internal/engine/engine.go`
2. Add its `String()` case
3. Add a `ShellyDevice` (or other actuator) entry to `internal/config/config.go`
4. Add the switch case in `internal/actuator/actuator.go` `executeOne()`
5. Add it to the priority order in `Decide()` and `shutdownLastConsumer()`
6. Add it to `ReadAllStates()` in the actuator
7. Update `configs/ems-config.yaml`
### Project structure
```
.
├── main.go # Entry point, control loop, HTTP server
├── configs/
│ └── ems-config.yaml # Configuration (template on main, real values on house branches)
├── internal/
│ ├── actuator/ # Shelly HTTP control + Viessmann DHW API
│ ├── collector/ # Prometheus PromQL queries → SystemState
│ ├── config/ # YAML config types and loader
│ ├── engine/ # Pure decision logic (fully unit-tested)
│ ├── forecast/ # forecast.solar daily PV forecast (cached)
│ ├── metrics/ # Prometheus exporter
│ ├── status/ # Web status page + override handler
│ └── viessmann/ # OAuth2 token management + IoT API client
└── systemd/
└── ems.service # systemd unit file
```

106
configs/ems-config.yaml Normal file

@@ -0,0 +1,106 @@
# EMS Configuration
# All thresholds and timing parameters are configurable here.

prometheus:
  url: "http://192.168.0.23:9090"
  # Metric names from the custom Viessmann exporter
  metrics:
    grid_power_exchange: "pcc_transfer_power_exchange_value"  # positive = import, negative = export
    battery_soc: "ess_stateOfCharge_value"                    # 0-100%
    pv_production: "photovoltaic_production_current_value"    # watts
    battery_power: "ess_power_value"                          # watts
    compressor_power: "heating_compressors_0_power_value"     # watts
    ambient_temp: "heating_sensors_temperature_outside_value" # °C
    phase_l1_power: "pcc_ac_active_power_phaseOne"            # per-phase grid power L1 (W)
    phase_l2_power: "pcc_ac_active_power_phaseTwo"            # per-phase grid power L2 (W)
    phase_l3_power: "pcc_ac_active_power_phaseThree"          # per-phase grid power L3 (W)

# Shelly actuators
shelly:
  sg_ready:
    ip: "192.168.42.90"
    gen: 1          # Gen1: /relay/0?turn=on|off
  wallbox_a:
    ip: "192.168.42.185"
    gen: 2          # Gen2: /rpc/Switch.Set
    power_w: 2000
    password: ""    # set Shelly admin password if auth is enabled
  wallbox_b:
    ip: "192.168.42.51"
    gen: 2          # Gen2: /rpc/Switch.Set
    power_w: 4000
    password: ""    # set Shelly admin password if auth is enabled

# Viessmann API (write access for WW temperature)
viessmann:
  token_file: "/etc/ems/viessmann-token.json"  # OAuth2 refresh token
  client_id: "5d1235548dfaa01ce37db8f865d2df4d"
  installation_id: "1253124"
  gateway_serial: "7637415006796210"
  device_id: "0"

# SOC thresholds: which consumers are allowed at which battery level
soc_thresholds:
  block_all: 50        # below 50%: no consumers
  sg_ready_only: 70    # 50-70%: only SG-Ready (heating period)
  plus_wallbox_a: 90   # 70-90%: + Wallbox A (2kW)
  all_consumers: 90    # above 90%: all consumers

# Hysteresis settings
hysteresis:
  export_on_duration: "4m"    # export must persist for 4 min before switching on
  import_off_duration: "6m"   # import must persist for 6 min before switching off
  min_runtime_wallbox: "15m"
  min_runtime_sg_ready: "30m"

# Power thresholds (grid_power_exchange is positive=import, so we check for negative values)
# These are the NEGATIVE thresholds (export) that trigger consumers
thresholds:
  sg_ready_export_w: -500           # export > 500W → SG-Ready
  ww_export_w: -500                 # export > 500W → WW boost
  wallbox_a_export_w: -1800         # export > 1800W → Wallbox A (total, fallback if no phase data)
  wallbox_a_phase_export_w: -1500   # per-phase export needed for single-phase Wallbox A
  wallbox_b_export_w: -3800         # export > 3800W → Wallbox B
  import_off_w: 200                 # import > 200W → start shutdown timer

# Strategic layer (PV forecast based, daily)
strategic:
  forecast_high_kwh: 25
  forecast_mid_kwh: 15
  ww_base_c: 48              # normal WW setpoint (°C)
  ww_boost_high_c: 5         # +5°C on high-forecast days (>25 kWh)
  ww_boost_mid_c: 3          # +3°C on medium-forecast days (>15 kWh)
  ww_window_start: "12:30"   # WW boost only allowed from 12:30
  ww_window_end: "18:00"     # ... until 18:00
  schedule_on: "08:00"
  schedule_off: "20:00"

# Consumer behavior: idle detection thresholds
consumers:
  compressor_idle_w: 50      # heat pump compressor below this = idle (W)
  wallbox_min_charge_w: 50   # wallbox below this = car not charging (W)
  idle_cycles: 3             # consecutive idle cycles before early release (~6 min at 2-min poll)

# Season detection
season:
  heating_start_month: 10   # October
  heating_end_month: 4      # April

# PV forecast: forecast.solar (free, no API key required)
forecast:
  enabled: true
  lat: 48.7839
  lon: 9.3889
  declination: 20   # panel tilt in degrees
  azimuth: 5        # degrees from south (south=0, west=90, east=-90)
  kwp: 7.0          # installed peak power

# EMS operational settings
ems:
  poll_interval: "2m"
  listen_addr: ":9099"               # Prometheus metrics endpoint
  log_level: "info"
  state_file: "/run/ems/heartbeat"   # written each cycle for state recovery
  recovery_timeout: "1h"             # ignore Shelly state if EMS was down longer
  override_timeout: "1h"             # how long a manual Shelly change is respected

19
go.mod Normal file

@@ -0,0 +1,19 @@
module github.com/tb/ems
go 1.23
require (
github.com/prometheus/client_golang v1.19.1
gopkg.in/yaml.v3 v3.0.1
)
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
golang.org/x/sys v0.17.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
)

32
go.sum Normal file

@@ -0,0 +1,32 @@
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE=
github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=


@@ -0,0 +1,295 @@
package actuator

import (
	"context"
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"regexp"
	"strings"
	"time"

	"github.com/tb/ems/internal/config"
	"github.com/tb/ems/internal/engine"
	"github.com/tb/ems/internal/viessmann"
)

// Actuator executes switching decisions on physical devices.
type Actuator struct {
	client    *http.Client
	cfg       *config.Config
	viessmann *viessmann.Client // nil if not configured
	logger    *slog.Logger
}

// NewActuator creates a new actuator.
// vc may be nil if Viessmann integration is not configured.
func NewActuator(cfg *config.Config, vc *viessmann.Client, logger *slog.Logger) *Actuator {
	return &Actuator{
		client: &http.Client{
			Timeout: 5 * time.Second,
		},
		cfg:       cfg,
		viessmann: vc,
		logger:    logger,
	}
}

// Execute performs a list of switching actions.
// Failures are logged per action and do not abort the remaining actions,
// so the returned error is currently always nil.
func (a *Actuator) Execute(ctx context.Context, actions []engine.Action) error {
	for _, action := range actions {
		if err := a.executeOne(ctx, action); err != nil {
			a.logger.Error("action failed",
				"consumer", action.Consumer,
				"turn_on", action.TurnOn,
				"error", err,
			)
			// Continue with other actions even if one fails
			continue
		}
		a.logger.Info("action executed",
			"consumer", action.Consumer,
			"turn_on", action.TurnOn,
			"reason", action.Reason,
		)
	}
	return nil
}

func (a *Actuator) executeOne(ctx context.Context, action engine.Action) error {
	switch action.Consumer {
	case engine.ConsumerSGReady:
		return a.switchShellyGen1(ctx, a.cfg.Shelly.SGReady, action.TurnOn)
	case engine.ConsumerWW:
		return a.setDHWTemperature(ctx, action.TargetTempC)
	case engine.ConsumerWallboxA:
		return a.switchShellyGen2(ctx, a.cfg.Shelly.WallboxA, action.TurnOn)
	case engine.ConsumerWallboxB:
		return a.switchShellyGen2(ctx, a.cfg.Shelly.WallboxB, action.TurnOn)
	default:
		return fmt.Errorf("unknown consumer: %v", action.Consumer)
	}
}

// setDHWTemperature sets the WW temperature via the Viessmann API.
func (a *Actuator) setDHWTemperature(ctx context.Context, tempC float64) error {
	if a.viessmann == nil {
		return fmt.Errorf("Viessmann client not configured")
	}
	return a.viessmann.SetDHWTemperature(ctx, tempC)
}

// switchShellyGen1 controls a Shelly Gen1 device (relay endpoint).
// API: http://<ip>/relay/0?turn=on|off
func (a *Actuator) switchShellyGen1(ctx context.Context, dev config.ShellyDevice, turnOn bool) error {
	state := "off"
	if turnOn {
		state = "on"
	}
	url := fmt.Sprintf("http://%s/relay/0?turn=%s", dev.IP, state)
	a.logger.Debug("shelly gen1 request", "url", url)

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return fmt.Errorf("creating request: %w", err)
	}
	resp, err := a.client.Do(req)
	if err != nil {
		return fmt.Errorf("shelly gen1 %s: %w", dev.IP, err)
	}
	defer resp.Body.Close()
	// Drain the body so the underlying connection can be reused.
	_, _ = io.Copy(io.Discard, resp.Body)

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("shelly gen1 %s returned %d", dev.IP, resp.StatusCode)
	}
	return nil
}

// switchShellyGen2 controls a Shelly Gen2/Plus device (RPC endpoint).
// API: http://<ip>/rpc/Switch.Set {"id":0,"on":true|false}
func (a *Actuator) switchShellyGen2(ctx context.Context, dev config.ShellyDevice, turnOn bool) error {
	payload := fmt.Sprintf(`{"id":0,"on":%t}`, turnOn)
	body, err := a.gen2Request(ctx, dev, "/rpc/Switch.Set", payload)
	if err != nil {
		return err
	}
	var result struct {
		WasOn bool `json:"was_on"`
	}
	if err := json.Unmarshal(body, &result); err != nil {
		// The device already answered with HTTP 200, so an unparsable body is only logged.
		a.logger.Warn("could not parse shelly gen2 response", "body", string(body), "error", err)
	}
	return nil
}

// gen2Request performs a POST to a Shelly Gen2 RPC endpoint, handling Digest auth
// transparently when a password is configured on the device.
func (a *Actuator) gen2Request(ctx context.Context, dev config.ShellyDevice, path, payload string) ([]byte, error) {
	url := "http://" + dev.IP + path
	a.logger.Debug("shelly gen2 request", "url", url)

	do := func(authHeader string) (*http.Response, error) {
		req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, strings.NewReader(payload))
		if err != nil {
			return nil, err
		}
		req.Header.Set("Content-Type", "application/json")
		if authHeader != "" {
			req.Header.Set("Authorization", authHeader)
		}
		return a.client.Do(req)
	}

	resp, err := do("")
	if err != nil {
		return nil, fmt.Errorf("shelly gen2 %s: %w", dev.IP, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusUnauthorized && dev.Password != "" {
		// Digest auth: parse challenge, compute response, retry
		challenge := resp.Header.Get("WWW-Authenticate")
		authHeader := digestAuthHeader("admin", dev.Password, http.MethodPost, path, challenge)
		resp2, err := do(authHeader)
		if err != nil {
			return nil, fmt.Errorf("shelly gen2 %s: %w", dev.IP, err)
		}
		defer resp2.Body.Close()
		if resp2.StatusCode != http.StatusOK {
			b, _ := io.ReadAll(resp2.Body)
			return nil, fmt.Errorf("shelly gen2 %s returned %d: %s", dev.IP, resp2.StatusCode, b)
		}
		return io.ReadAll(resp2.Body)
	}

	if resp.StatusCode != http.StatusOK {
		b, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("shelly gen2 %s returned %d: %s", dev.IP, resp.StatusCode, b)
	}
	return io.ReadAll(resp.Body)
}

// digestAuthHeader computes an HTTP Digest Authorization header.
// Shelly Gen2 requires SHA-256 with qop=auth (RFC 7616).
func digestAuthHeader(username, password, method, uri, challenge string) string {
	realm := digestParam(challenge, "realm")
	nonce := digestParam(challenge, "nonce")

	// Fixed nc/cnonce: one request per nonce is sufficient for our use case
	const nc = "00000001"
	const cnonce = "ems00001"

	ha1 := sha256hex(username + ":" + realm + ":" + password)
	ha2 := sha256hex(method + ":" + uri)
	response := sha256hex(ha1 + ":" + nonce + ":" + nc + ":" + cnonce + ":auth:" + ha2)

	return fmt.Sprintf(
		`Digest username="%s", realm="%s", nonce="%s", uri="%s", algorithm=SHA-256, qop=auth, nc=%s, cnonce="%s", response="%s"`,
		username, realm, nonce, uri, nc, cnonce, response,
	)
}

// digestParamRe matches quoted key="value" pairs in a WWW-Authenticate challenge.
var digestParamRe = regexp.MustCompile(`(\w+)="([^"]*)"`)

// digestParam returns the quoted value for key from the challenge header, or "" if absent.
func digestParam(header, key string) string {
	for _, m := range digestParamRe.FindAllStringSubmatch(header, -1) {
		if m[1] == key {
			return m[2]
		}
	}
	return ""
}

// sha256hex returns the lowercase hex encoding of the SHA-256 digest of s.
func sha256hex(s string) string {
	h := sha256.Sum256([]byte(s))
	return fmt.Sprintf("%x", h)
}
// ReadAllStates reads the current relay state (and power, if available) from every
// configured Shelly device. Unreachable devices are logged and skipped — only
// successfully read devices are returned, so callers should not assume all consumers
// are present in the map.
func (a *Actuator) ReadAllStates(ctx context.Context) (map[engine.Consumer]engine.DeviceStatus, error) {
states := make(map[engine.Consumer]engine.DeviceStatus)
type entry struct {
consumer engine.Consumer
label string
read func() (engine.DeviceStatus, error)
}
devices := []entry{
{engine.ConsumerSGReady, "sg_ready", func() (engine.DeviceStatus, error) {
on, err := a.ReadShellyGen1State(ctx, a.cfg.Shelly.SGReady)
return engine.DeviceStatus{On: on}, err
}},
{engine.ConsumerWallboxA, "wallbox_a", func() (engine.DeviceStatus, error) {
return a.ReadShellyGen2Status(ctx, a.cfg.Shelly.WallboxA)
}},
{engine.ConsumerWallboxB, "wallbox_b", func() (engine.DeviceStatus, error) {
return a.ReadShellyGen2Status(ctx, a.cfg.Shelly.WallboxB)
}},
}
for _, d := range devices {
status, err := d.read()
if err != nil {
a.logger.Warn("could not read Shelly state", "device", d.label, "error", err)
continue
}
states[d.consumer] = status
}
if len(states) == 0 {
return nil, fmt.Errorf("all Shelly devices unreachable")
}
return states, nil
}
// ReadShellyGen1State reads the current state of a Shelly Gen1 relay.
func (a *Actuator) ReadShellyGen1State(ctx context.Context, dev config.ShellyDevice) (bool, error) {
url := fmt.Sprintf("http://%s/relay/0", dev.IP)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return false, fmt.Errorf("creating request: %w", err)
}
resp, err := a.client.Do(req)
if err != nil {
return false, fmt.Errorf("reading shelly gen1 %s: %w", dev.IP, err)
}
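defer resp.Body.Close()
// Reject non-200 replies explicitly so an error page is not decoded as relay state.
if resp.StatusCode != http.StatusOK {
return false, fmt.Errorf("shelly gen1 %s returned %d", dev.IP, resp.StatusCode)
}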
var state struct {
IsOn bool `json:"ison"`
}
if err := json.NewDecoder(resp.Body).Decode(&state); err != nil {
return false, fmt.Errorf("decoding shelly state: %w", err)
}
return state.IsOn, nil
}
// ReadShellyGen2Status reads the current state and active power of a Shelly Gen2 switch.
// Power is only meaningful when the relay is on; it is 0 for devices without a power meter.
func (a *Actuator) ReadShellyGen2Status(ctx context.Context, dev config.ShellyDevice) (engine.DeviceStatus, error) {
body, err := a.gen2Request(ctx, dev, "/rpc/Switch.GetStatus", `{"id":0}`)
if err != nil {
return engine.DeviceStatus{}, err
}
var resp struct {
Output bool `json:"output"`
APower float64 `json:"apower"` // active power in W; present on PM variants
}
if err := json.Unmarshal(body, &resp); err != nil {
return engine.DeviceStatus{}, fmt.Errorf("decoding shelly gen2 status: %w", err)
}
return engine.DeviceStatus{On: resp.Output, PowerW: resp.APower}, nil
}
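
digestAuthHeader above uses a fixed nc and cnonce. As a minimal sketch of the same RFC 7616 qop=auth computation with a fresh random cnonce per request, the following standalone program may help. The helper names (`sha256Hex`, `newCnonce`, `digestResponse`) and the realm, nonce, and password values are assumptions made for illustration, not part of the EMS code.

```go
package main

import (
	"crypto/rand"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// sha256Hex returns the lowercase hex SHA-256 digest of s.
func sha256Hex(s string) string {
	sum := sha256.Sum256([]byte(s))
	return hex.EncodeToString(sum[:])
}

// newCnonce returns 8 random bytes encoded as 16 hex characters.
func newCnonce() (string, error) {
	b := make([]byte, 8)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

// digestResponse computes the RFC 7616 "response" value for qop=auth:
// H(H(user:realm:password) : nonce : nc : cnonce : "auth" : H(method:uri)).
func digestResponse(user, realm, password, method, uri, nonce, nc, cnonce string) string {
	ha1 := sha256Hex(user + ":" + realm + ":" + password)
	ha2 := sha256Hex(method + ":" + uri)
	return sha256Hex(ha1 + ":" + nonce + ":" + nc + ":" + cnonce + ":auth:" + ha2)
}

func main() {
	cnonce, err := newCnonce()
	if err != nil {
		panic(err)
	}
	// Realm, nonce, and password are made-up example values.
	resp := digestResponse("admin", "shelly-example", "secret",
		"POST", "/rpc/Switch.Set", "abc123", "00000001", cnonce)
	fmt.Println("cnonce:", cnonce)
	fmt.Println("response:", resp)
}
```

Because a new challenge is requested for every call, the fixed values in the original are workable; a random cnonce is simply the more conventional choice.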


@@ -0,0 +1,179 @@
package actuator
import (
"context"
"encoding/json"
"log/slog"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strings"
"testing"
"github.com/tb/ems/internal/config"
"github.com/tb/ems/internal/engine"
)
// mockShelly simulates both Gen1 and Gen2 Shelly HTTP APIs.
type mockShelly struct {
state bool // current relay state
calls []string
}
func (m *mockShelly) handler() http.Handler {
mux := http.NewServeMux()
// Gen1: GET /relay/0?turn=on|off or GET /relay/0 (read state)
mux.HandleFunc("/relay/0", func(w http.ResponseWriter, r *http.Request) {
m.calls = append(m.calls, r.Method+" "+r.URL.String())
if turn := r.URL.Query().Get("turn"); turn != "" {
m.state = turn == "on"
}
json.NewEncoder(w).Encode(map[string]interface{}{"ison": m.state})
})
// Gen2: POST /rpc/Switch.Set or POST /rpc/Switch.GetStatus
mux.HandleFunc("/rpc/Switch.Set", func(w http.ResponseWriter, r *http.Request) {
m.calls = append(m.calls, r.Method+" "+r.URL.Path)
var body struct {
On bool `json:"on"`
}
json.NewDecoder(r.Body).Decode(&body)
wasOn := m.state
m.state = body.On
json.NewEncoder(w).Encode(map[string]interface{}{"was_on": wasOn})
})
mux.HandleFunc("/rpc/Switch.GetStatus", func(w http.ResponseWriter, r *http.Request) {
m.calls = append(m.calls, r.Method+" "+r.URL.Path)
json.NewEncoder(w).Encode(map[string]interface{}{"output": m.state})
})
return mux
}
func (m *mockShelly) start(t *testing.T) *httptest.Server {
t.Helper()
srv := httptest.NewServer(m.handler())
t.Cleanup(srv.Close)
return srv
}
// ipFrom extracts host:port from an httptest server URL.
func ipFrom(srv *httptest.Server) string {
u, _ := url.Parse(srv.URL)
return u.Host
}
func testActuator(t *testing.T, sgReadyIP, wallboxAIP, wallboxBIP string) *Actuator {
t.Helper()
cfg := &config.Config{
Shelly: config.ShellyConfig{
SGReady: config.ShellyDevice{IP: sgReadyIP, Gen: 1},
WallboxA: config.ShellyDevice{IP: wallboxAIP, Gen: 2},
WallboxB: config.ShellyDevice{IP: wallboxBIP, Gen: 2},
},
}
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelWarn}))
return NewActuator(cfg, nil, logger)
}
func TestExecuteTurnOnSGReady(t *testing.T) {
sg := &mockShelly{}
sgSrv := sg.start(t)
dummy := &mockShelly{}
dummySrv := dummy.start(t)
act := testActuator(t, ipFrom(sgSrv), ipFrom(dummySrv), ipFrom(dummySrv))
err := act.Execute(context.Background(), []engine.Action{
{Consumer: engine.ConsumerSGReady, TurnOn: true, Reason: "test"},
})
if err != nil {
t.Fatalf("Execute failed: %v", err)
}
if !sg.state {
t.Error("SG-Ready should be ON after TurnOn action")
}
if len(sg.calls) != 1 || !strings.Contains(sg.calls[0], "turn=on") {
t.Errorf("expected one GET /relay/0?turn=on call, got %v", sg.calls)
}
}
func TestExecuteTurnOffWallboxA(t *testing.T) {
wbA := &mockShelly{state: true} // starts ON
wbASrv := wbA.start(t)
dummy := &mockShelly{}
dummySrv := dummy.start(t)
act := testActuator(t, ipFrom(dummySrv), ipFrom(wbASrv), ipFrom(dummySrv))
err := act.Execute(context.Background(), []engine.Action{
{Consumer: engine.ConsumerWallboxA, TurnOn: false, Reason: "import"},
})
if err != nil {
t.Fatalf("Execute failed: %v", err)
}
if wbA.state {
t.Error("Wallbox A should be OFF after TurnOff action")
}
}
func TestReadAllStates(t *testing.T) {
sg := &mockShelly{state: true}
sgSrv := sg.start(t)
wbA := &mockShelly{state: false}
wbASrv := wbA.start(t)
wbB := &mockShelly{state: true}
wbBSrv := wbB.start(t)
act := testActuator(t, ipFrom(sgSrv), ipFrom(wbASrv), ipFrom(wbBSrv))
states, err := act.ReadAllStates(context.Background())
if err != nil {
t.Fatalf("ReadAllStates failed: %v", err)
}
if !states[engine.ConsumerSGReady].On {
t.Error("SG-Ready should be ON")
}
if states[engine.ConsumerWallboxA].On {
t.Error("Wallbox A should be OFF")
}
if !states[engine.ConsumerWallboxB].On {
t.Error("Wallbox B should be ON")
}
}
func TestExecuteMultipleActions(t *testing.T) {
sg := &mockShelly{}
sgSrv := sg.start(t)
wbA := &mockShelly{}
wbASrv := wbA.start(t)
wbB := &mockShelly{}
wbBSrv := wbB.start(t)
act := testActuator(t, ipFrom(sgSrv), ipFrom(wbASrv), ipFrom(wbBSrv))
err := act.Execute(context.Background(), []engine.Action{
{Consumer: engine.ConsumerSGReady, TurnOn: true},
{Consumer: engine.ConsumerWallboxA, TurnOn: true},
{Consumer: engine.ConsumerWallboxB, TurnOn: true},
})
if err != nil {
t.Fatalf("Execute failed: %v", err)
}
if !sg.state || !wbA.state || !wbB.state {
t.Error("all consumers should be ON")
}
}


@@ -0,0 +1,195 @@
package collector
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"net/url"
"strconv"
"time"
"github.com/tb/ems/internal/config"
)
// SystemState represents the current state of the energy system,
// collected from Prometheus metrics.
type SystemState struct {
Timestamp time.Time
GridPowerW float64 // positive = import, negative = export
BatterySOC float64 // 0-100
PVProductionW float64 // current PV production in watts
BatteryPowerW float64 // battery charge/discharge power
CompressorPowerW float64 // heat pump compressor power
AmbientTempC float64 // outdoor temperature
PhaseL1PowerW float64 // per-phase grid power L1 (positive=import, negative=export)
PhaseL2PowerW float64 // per-phase grid power L2
PhaseL3PowerW float64 // per-phase grid power L3
}
// IsExporting returns true if the system is exporting to the grid.
func (s SystemState) IsExporting() bool {
return s.GridPowerW < 0
}
// ExportW returns the export power as a positive number, or 0 if importing.
func (s SystemState) ExportW() float64 {
if s.GridPowerW < 0 {
return -s.GridPowerW
}
return 0
}
// ImportW returns the import power as a positive number, or 0 if exporting.
func (s SystemState) ImportW() float64 {
if s.GridPowerW > 0 {
return s.GridPowerW
}
return 0
}
// Collector reads the current system state from a Prometheus instance.
type Collector struct {
client *http.Client
baseURL string
metrics map[string]string
logger *slog.Logger
}
// NewCollector creates a new Prometheus collector.
func NewCollector(cfg *config.Config, logger *slog.Logger) *Collector {
return &Collector{
client: &http.Client{
Timeout: 10 * time.Second,
},
baseURL: cfg.Prometheus.URL,
metrics: cfg.Prometheus.Metrics,
logger: logger,
}
}
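// Collect queries Prometheus for the current system state.
// Metrics that are not configured or fail to query are logged and left at zero;
// the returned error is currently always nil.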
func (c *Collector) Collect(ctx context.Context) (SystemState, error) {
state := SystemState{
Timestamp: time.Now(),
}
type metricTarget struct {
name string
dest *float64
scale float64 // unit conversion multiplier (1.0 = no conversion)
}
targets := []metricTarget{
{"grid_power_exchange", &state.GridPowerW, 1},
{"battery_soc", &state.BatterySOC, 1},
{"pv_production", &state.PVProductionW, 1000}, // kW → W
{"battery_power", &state.BatteryPowerW, 1},
{"compressor_power", &state.CompressorPowerW, 1},
{"ambient_temp", &state.AmbientTempC, 1},
{"phase_l1_power", &state.PhaseL1PowerW, 1},
{"phase_l2_power", &state.PhaseL2PowerW, 1},
{"phase_l3_power", &state.PhaseL3PowerW, 1},
}
for _, t := range targets {
metricName, ok := c.metrics[t.name]
if !ok {
c.logger.Warn("metric not configured", "key", t.name)
continue
}
val, err := c.queryInstant(ctx, metricName)
if err != nil {
c.logger.Error("failed to query metric",
"key", t.name,
"metric", metricName,
"error", err,
)
continue
}
*t.dest = val * t.scale
}
c.logger.Info("collected system state",
"grid_w", state.GridPowerW,
"soc", state.BatterySOC,
"pv_w", state.PVProductionW,
"compressor_w", state.CompressorPowerW,
"l1_w", state.PhaseL1PowerW,
"l2_w", state.PhaseL2PowerW,
"l3_w", state.PhaseL3PowerW,
)
return state, nil
}
// promResponse represents the Prometheus API response for instant queries.
type promResponse struct {
Status string `json:"status"`
Data promData `json:"data"`
}
type promData struct {
ResultType string `json:"resultType"`
Result []promResult `json:"result"`
}
type promResult struct {
Metric map[string]string `json:"metric"`
Value [2]interface{} `json:"value"` // [timestamp, "value"]
}
// queryInstant performs a Prometheus instant query and returns the value of the
// first sample in the result vector.
func (c *Collector) queryInstant(ctx context.Context, query string) (float64, error) {
u, err := url.Parse(c.baseURL + "/api/v1/query")
if err != nil {
return 0, fmt.Errorf("parsing URL: %w", err)
}
q := u.Query()
q.Set("query", query)
u.RawQuery = q.Encode()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
return 0, fmt.Errorf("creating request: %w", err)
}
resp, err := c.client.Do(req)
if err != nil {
return 0, fmt.Errorf("querying prometheus: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return 0, fmt.Errorf("prometheus returned status %d", resp.StatusCode)
}
var promResp promResponse
if err := json.NewDecoder(resp.Body).Decode(&promResp); err != nil {
return 0, fmt.Errorf("decoding response: %w", err)
}
if promResp.Status != "success" {
return 0, fmt.Errorf("prometheus query failed: %s", promResp.Status)
}
if len(promResp.Data.Result) == 0 {
return 0, fmt.Errorf("no data for query %q", query)
}
// Value is [timestamp, "string_value"]
valStr, ok := promResp.Data.Result[0].Value[1].(string)
if !ok {
return 0, fmt.Errorf("unexpected value type for %q", query)
}
val, err := strconv.ParseFloat(valStr, 64)
if err != nil {
return 0, fmt.Errorf("parsing value %q: %w", valStr, err)
}
return val, nil
}
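
An instant query returns a vector of samples, each carrying a `[timestamp, "value"]` pair in which the value is a JSON string. The decoding steps in queryInstant can be tried in isolation with the sketch below. The type and function names (`response`, `sample`, `firstValue`) and the metric name `grid_power` in the sample payload are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// sample is one element of an instant-query result vector.
type sample struct {
	Value [2]interface{} `json:"value"` // [unix timestamp, "value as string"]
}

// response mirrors the parts of the Prometheus query response used here.
type response struct {
	Status string `json:"status"`
	Data   struct {
		ResultType string   `json:"resultType"`
		Result     []sample `json:"result"`
	} `json:"data"`
}

// firstValue decodes a query response body and returns the first sample's value.
func firstValue(body []byte) (float64, error) {
	var r response
	if err := json.Unmarshal(body, &r); err != nil {
		return 0, fmt.Errorf("decoding response: %w", err)
	}
	if r.Status != "success" {
		return 0, fmt.Errorf("query failed: %s", r.Status)
	}
	if len(r.Data.Result) == 0 {
		return 0, fmt.Errorf("empty result")
	}
	s, ok := r.Data.Result[0].Value[1].(string)
	if !ok {
		return 0, fmt.Errorf("value is not a string")
	}
	return strconv.ParseFloat(s, 64)
}

func main() {
	// Hand-written example payload; the metric name is illustrative only.
	body := []byte(`{"status":"success","data":{"resultType":"vector","result":[
		{"metric":{"__name__":"grid_power"},"value":[1712390536.123,"-1850.5"]}]}}`)
	v, err := firstValue(body)
	fmt.Println(v, err)
}
```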

203
internal/config/config.go Normal file

@@ -0,0 +1,203 @@
package config
import (
"fmt"
"os"
"time"
"gopkg.in/yaml.v3"
)
// Config is the top-level EMS configuration.
type Config struct {
Prometheus PrometheusConfig `yaml:"prometheus"`
Shelly ShellyConfig `yaml:"shelly"`
Viessmann ViessmannConfig `yaml:"viessmann"`
SOC SOCThresholds `yaml:"soc_thresholds"`
Hysteresis HysteresisConfig `yaml:"hysteresis"`
Thresholds PowerThresholds `yaml:"thresholds"`
Consumers ConsumersConfig `yaml:"consumers"`
Strategic StrategicConfig `yaml:"strategic"`
Season SeasonConfig `yaml:"season"`
Forecast ForecastConfig `yaml:"forecast"`
EMS EMSConfig `yaml:"ems"`
}
// PrometheusConfig holds Prometheus connection settings.
type PrometheusConfig struct {
URL string `yaml:"url"`
Metrics map[string]string `yaml:"metrics"`
}
// ShellyConfig holds all Shelly actuator addresses.
type ShellyConfig struct {
SGReady ShellyDevice `yaml:"sg_ready"`
WallboxA ShellyDevice `yaml:"wallbox_a"`
WallboxB ShellyDevice `yaml:"wallbox_b"`
}
// ShellyDevice represents a single Shelly device.
type ShellyDevice struct {
IP string `yaml:"ip"`
Gen int `yaml:"gen"`
PowerW int `yaml:"power_w"`
Password string `yaml:"password"` // optional; Gen2 uses HTTP Digest auth
}
// ViessmannConfig holds Viessmann API credentials (write access for the WW,
// i.e. domestic hot water, temperature).
type ViessmannConfig struct {
TokenFile string `yaml:"token_file"`
ClientID string `yaml:"client_id"`
InstallationID string `yaml:"installation_id"`
GatewaySerial string `yaml:"gateway_serial"`
DeviceID string `yaml:"device_id"`
}
// SOCThresholds defines the battery SOC levels that gate consumers.
type SOCThresholds struct {
BlockAll float64 `yaml:"block_all"`
SGReadyOnly float64 `yaml:"sg_ready_only"`
PlusWallboxA float64 `yaml:"plus_wallbox_a"`
AllConsumers float64 `yaml:"all_consumers"`
}
// HysteresisConfig defines timing parameters for switching decisions.
type HysteresisConfig struct {
ExportOnDuration string `yaml:"export_on_duration"`
ImportOffDuration string `yaml:"import_off_duration"`
MinRuntimeWallbox string `yaml:"min_runtime_wallbox"`
MinRuntimeSGReady string `yaml:"min_runtime_sg_ready"`
}
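// ExportOnDurationParsed returns ExportOnDuration as a time.Duration.
// The parse error is discarded, so a missing or malformed value yields 0.
func (h *HysteresisConfig) ExportOnDurationParsed() time.Duration {
d, _ := time.ParseDuration(h.ExportOnDuration)
return d
}
// ImportOffDurationParsed returns ImportOffDuration as a time.Duration (0 if unparsable).
func (h *HysteresisConfig) ImportOffDurationParsed() time.Duration {
d, _ := time.ParseDuration(h.ImportOffDuration)
return d
}
// MinRuntimeWallboxParsed returns MinRuntimeWallbox as a time.Duration (0 if unparsable).
func (h *HysteresisConfig) MinRuntimeWallboxParsed() time.Duration {
d, _ := time.ParseDuration(h.MinRuntimeWallbox)
return d
}
// MinRuntimeSGReadyParsed returns MinRuntimeSGReady as a time.Duration (0 if unparsable).
func (h *HysteresisConfig) MinRuntimeSGReadyParsed() time.Duration {
d, _ := time.ParseDuration(h.MinRuntimeSGReady)
return d
}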
// PowerThresholds defines the grid power levels that trigger switching.
// Export thresholds are negative (grid exports = negative grid power).
type PowerThresholds struct {
SGReadyExportW float64 `yaml:"sg_ready_export_w"`
WWExportW float64 `yaml:"ww_export_w"`
WallboxAExportW float64 `yaml:"wallbox_a_export_w"`
WallboxAPhaseExportW float64 `yaml:"wallbox_a_phase_export_w"` // per-phase export for single-phase WallboxA
WallboxBExportW float64 `yaml:"wallbox_b_export_w"`
ImportOffW float64 `yaml:"import_off_w"`
}
// ConsumersConfig holds per-consumer behavior thresholds.
type ConsumersConfig struct {
CompressorIdleW int `yaml:"compressor_idle_w"` // below this = heat pump compressor idle (W)
WallboxMinChargeW int `yaml:"wallbox_min_charge_w"` // below this = car not charging (W)
IdleCycles int `yaml:"idle_cycles"` // consecutive idle cycles before early release
}
// StrategicConfig holds PV forecast based strategic settings.
type StrategicConfig struct {
ForecastHighKWh float64 `yaml:"forecast_high_kwh"`
ForecastMidKWh float64 `yaml:"forecast_mid_kwh"`
WWBoostHighC float64 `yaml:"ww_boost_high_c"`
WWBoostMidC float64 `yaml:"ww_boost_mid_c"`
WWBaseC float64 `yaml:"ww_base_c"` // normal WW setpoint (°C)
WWWindowStart string `yaml:"ww_window_start"` // e.g. "12:30"
WWWindowEnd string `yaml:"ww_window_end"` // e.g. "18:00"
ScheduleOn string `yaml:"schedule_on"`
ScheduleOff string `yaml:"schedule_off"`
}
// SeasonConfig defines the heating season by month range.
type SeasonConfig struct {
HeatingStartMonth int `yaml:"heating_start_month"`
HeatingEndMonth int `yaml:"heating_end_month"`
}
// ForecastConfig holds forecast.solar API parameters.
type ForecastConfig struct {
Enabled bool `yaml:"enabled"`
Lat float64 `yaml:"lat"`
Lon float64 `yaml:"lon"`
Declination int `yaml:"declination"` // panel tilt in degrees
Azimuth int `yaml:"azimuth"` // degrees from south (south=0, west=90)
KWp float64 `yaml:"kwp"` // installed peak power
}
// EMSConfig holds operational settings for the EMS daemon.
type EMSConfig struct {
PollInterval string `yaml:"poll_interval"`
ListenAddr string `yaml:"listen_addr"`
LogLevel string `yaml:"log_level"`
StateFile string `yaml:"state_file"`
RecoveryTimeout string `yaml:"recovery_timeout"`
OverrideTimeout string `yaml:"override_timeout"`
}
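// PollIntervalParsed returns PollInterval as a time.Duration.
// validate rejects malformed values at load time, so the parse error is ignored here.
func (e *EMSConfig) PollIntervalParsed() time.Duration {
d, _ := time.ParseDuration(e.PollInterval)
return d
}
// RecoveryTimeoutParsed returns RecoveryTimeout, falling back to one hour
// if the value is missing or malformed.
func (e *EMSConfig) RecoveryTimeoutParsed() time.Duration {
d, err := time.ParseDuration(e.RecoveryTimeout)
if err != nil {
return time.Hour // safe default
}
return d
}
// OverrideTimeoutParsed returns OverrideTimeout, falling back to one hour
// if the value is missing or malformed.
func (e *EMSConfig) OverrideTimeoutParsed() time.Duration {
d, err := time.ParseDuration(e.OverrideTimeout)
if err != nil {
return time.Hour // safe default
}
return d
}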
// Load reads and parses the YAML config file at the given path.
func Load(path string) (*Config, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("reading config file: %w", err)
}
var cfg Config
if err := yaml.Unmarshal(data, &cfg); err != nil {
return nil, fmt.Errorf("parsing config file: %w", err)
}
if err := cfg.validate(); err != nil {
return nil, fmt.Errorf("invalid config: %w", err)
}
return &cfg, nil
}
func (c *Config) validate() error {
if c.Prometheus.URL == "" {
return fmt.Errorf("prometheus.url is required")
}
if c.EMS.PollInterval == "" {
return fmt.Errorf("ems.poll_interval is required")
}
if _, err := time.ParseDuration(c.EMS.PollInterval); err != nil {
return fmt.Errorf("ems.poll_interval %q: %w", c.EMS.PollInterval, err)
}
if c.EMS.ListenAddr == "" {
return fmt.Errorf("ems.listen_addr is required")
}
return nil
}
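
validate above checks only ems.poll_interval; the hysteresis durations are parsed later with their errors discarded, so a typo there silently becomes 0. One possible way to catch this at load time is sketched below. The `durationField` type, the `validateDurations` helper, and the rule that negative durations are rejected are assumptions for illustration, not part of the EMS code.

```go
package main

import (
	"fmt"
	"time"
)

// durationField pairs a config key with its raw string value.
type durationField struct {
	Key   string
	Value string
}

// validateDurations returns an error for the first field that does not parse
// as a Go duration or that is negative. Fields are checked in slice order.
func validateDurations(fields []durationField) error {
	for _, f := range fields {
		d, err := time.ParseDuration(f.Value)
		if err != nil {
			return fmt.Errorf("%s %q: %w", f.Key, f.Value, err)
		}
		if d < 0 {
			return fmt.Errorf("%s must not be negative, got %s", f.Key, d)
		}
	}
	return nil
}

func main() {
	// Example values; "2 minutes" is not valid time.ParseDuration syntax.
	fields := []durationField{
		{"hysteresis.export_on_duration", "5m"},
		{"hysteresis.import_off_duration", "2 minutes"},
	}
	if err := validateDurations(fields); err != nil {
		fmt.Println("invalid config:", err)
	}
}
```

A slice is used instead of a map so that the first reported error is deterministic.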

622
internal/engine/engine.go Normal file

@@ -0,0 +1,622 @@
package engine
import (
"fmt"
"log/slog"
"time"
"github.com/tb/ems/internal/collector"
"github.com/tb/ems/internal/config"
)
// Consumer identifies a controllable load.
type Consumer int
const (
ConsumerSGReady Consumer = iota
ConsumerWW // domestic hot water boost via Viessmann API
ConsumerWallboxA
ConsumerWallboxB
)
func (c Consumer) String() string {
switch c {
case ConsumerSGReady:
return "sg_ready"
case ConsumerWW:
return "ww"
case ConsumerWallboxA:
return "wallbox_a"
case ConsumerWallboxB:
return "wallbox_b"
default:
return "unknown"
}
}
// Action represents a switching decision.
type Action struct {
Consumer Consumer
TurnOn bool
Reason string
TargetTempC float64 // non-zero for ConsumerWW: the absolute temperature to set
}
// DeviceStatus holds the hardware-reported state of a consumer device,
// as read back from the physical device each cycle.
type DeviceStatus struct {
On bool
PowerW float64 // measured active power; 0 if device has no power meter
}
// ConsumerState tracks the runtime state of a single consumer.
type ConsumerState struct {
Active bool
ActivatedAt time.Time // when it was last turned on
ManualOverride bool
OverrideUntil time.Time
LowPowerCycles int // consecutive cycles with power below minimum threshold
}
// OverrideInfo is returned to callers that need to display or record override state.
type OverrideInfo struct {
Active bool
Until time.Time
}
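// HysteresisState tracks the timestamps used for hysteresis decisions.
type HysteresisState struct {
// Per consumer: when export first reached the on-threshold in the current
// uninterrupted run. A missing key means no hold timer is running for that consumer.
ExportSinceAbove map[Consumer]time.Time
// When import first exceeded the off-threshold in the current uninterrupted run.
// The zero time means no import timer is running.
ImportSinceAbove time.Time
}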
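// Engine is the EMS decision engine.
// It makes no network calls: given a state snapshot and the current time, Decide returns actions.
// It does keep consumer and hysteresis state between calls, so results depend on call history;
// tests can drive it deterministically by supplying the timestamps.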
type Engine struct {
cfg *config.Config
consumers map[Consumer]*ConsumerState
hyst HysteresisState
logger *slog.Logger
}
// NewEngine creates a new decision engine.
func NewEngine(cfg *config.Config, logger *slog.Logger) *Engine {
return &Engine{
cfg: cfg,
consumers: map[Consumer]*ConsumerState{
ConsumerSGReady: {},
ConsumerWW: {},
ConsumerWallboxA: {},
ConsumerWallboxB: {},
},
hyst: HysteresisState{
ExportSinceAbove: make(map[Consumer]time.Time),
},
logger: logger,
}
}
// Decide evaluates the current system state and returns a list of actions.
// wwBoostC is the WW temperature boost in °C derived from the PV forecast
// (0 = no forecast / forecast too low to warrant boosting).
func (e *Engine) Decide(state collector.SystemState, now time.Time, wwBoostC float64) []Action {
var actions []Action
soc := state.BatterySOC
gridW := state.GridPowerW // positive = import, negative = export
heatingPeriod := e.isHeatingPeriod(now)
wwWindow := e.isWWWindow(now)
allowed := e.allowedConsumers(soc)
e.logger.Debug("decision input",
"grid_w", gridW,
"soc", soc,
"heating_period", heatingPeriod,
"ww_window", wwWindow,
"ww_boost_c", wwBoostC,
"allowed", allowed,
)
// --- SOC emergency brake ---
actions = append(actions, e.socEmergencyBrake(soc, now)...)
// --- WW window shutdown ---
// If WW is active but we're outside the allowed time window, reset immediately.
if cs := e.consumers[ConsumerWW]; cs.Active && !wwWindow {
e.logger.Info("WW window ended, resetting DHW temperature")
cs.Active = false
actions = append(actions, Action{
Consumer: ConsumerWW,
TurnOn: false,
TargetTempC: e.cfg.Strategic.WWBaseC,
Reason: "WW time window ended",
})
}
// --- Shutdown logic (reverse priority order) ---
if gridW > e.cfg.Thresholds.ImportOffW {
if e.hyst.ImportSinceAbove.IsZero() {
e.hyst.ImportSinceAbove = now
}
importDuration := now.Sub(e.hyst.ImportSinceAbove)
if importDuration >= e.cfg.Hysteresis.ImportOffDurationParsed() {
if a := e.shutdownLastConsumer(now); a != nil {
actions = append(actions, *a)
e.hyst.ImportSinceAbove = time.Time{}
}
}
} else {
e.hyst.ImportSinceAbove = time.Time{}
}
// --- Compressor idle: release SG-Ready early if heat pump stopped ---
if cs := e.consumers[ConsumerSGReady]; cs.Active {
if state.CompressorPowerW < float64(e.cfg.Consumers.CompressorIdleW) {
cs.LowPowerCycles++
e.logger.Debug("SG-Ready: compressor idle cycle",
"compressor_w", state.CompressorPowerW,
"idle_cycles", cs.LowPowerCycles,
)
if cs.LowPowerCycles >= e.cfg.Consumers.IdleCycles {
e.logger.Info("SG-Ready released early: compressor idle",
"idle_cycles", cs.LowPowerCycles,
"compressor_w", state.CompressorPowerW,
)
cs.Active = false
cs.LowPowerCycles = 0
actions = append(actions, Action{
Consumer: ConsumerSGReady,
TurnOn: false,
Reason: fmt.Sprintf("compressor idle for %d cycles", e.cfg.Consumers.IdleCycles),
})
}
} else {
cs.LowPowerCycles = 0
}
}
// --- Car not charging: release wallbox if Shelly PM shows no draw ---
for _, wb := range []Consumer{ConsumerWallboxA, ConsumerWallboxB} {
cs := e.consumers[wb]
if !cs.Active {
continue
}
if cs.LowPowerCycles >= e.cfg.Consumers.IdleCycles {
e.logger.Info("wallbox released: car not charging",
"consumer", wb,
"low_power_cycles", cs.LowPowerCycles,
)
cs.Active = false
cs.LowPowerCycles = 0
actions = append(actions, Action{
Consumer: wb,
TurnOn: false,
Reason: fmt.Sprintf("car not charging for %d cycles", e.cfg.Consumers.IdleCycles),
})
}
}
// --- Turn-on logic (priority order) ---
if gridW <= 0 {
// P1: SG-Ready (heating period only)
if heatingPeriod {
actions = append(actions, e.evaluateTurnOn(
ConsumerSGReady, gridW, e.cfg.Thresholds.SGReadyExportW,
allowed, now,
)...)
}
// P2: WW boost (time window + forecast required)
if wwWindow && wwBoostC > 0 {
actions = append(actions, e.evaluateWWTurnOn(gridW, wwBoostC, allowed, now)...)
}
// P3: Wallbox A (2kW, single-phase) — only if Wallbox B is not active.
// Uses per-phase export check if available, otherwise falls back to total.
// Re-reads Active state directly so a same-cycle activation of WallboxB blocks WallboxA.
if !e.consumers[ConsumerWallboxB].Active {
phaseGridW := gridW // fallback: total grid power
if state.PhaseL1PowerW != 0 || state.PhaseL2PowerW != 0 || state.PhaseL3PowerW != 0 {
phaseGridW = min3(state.PhaseL1PowerW, state.PhaseL2PowerW, state.PhaseL3PowerW)
}
threshold := e.cfg.Thresholds.WallboxAExportW
if e.cfg.Thresholds.WallboxAPhaseExportW != 0 {
threshold = e.cfg.Thresholds.WallboxAPhaseExportW
}
actions = append(actions, e.evaluateTurnOn(
ConsumerWallboxA, phaseGridW, threshold,
allowed, now,
)...)
}
// P4: Wallbox B (4kW, 3-phase) — only if Wallbox A is not active.
// Re-reads Active state so a same-cycle activation of WallboxA blocks WallboxB.
if !e.consumers[ConsumerWallboxA].Active {
actions = append(actions, e.evaluateTurnOn(
ConsumerWallboxB, gridW, e.cfg.Thresholds.WallboxBExportW,
allowed, now,
)...)
}
}
return actions
}
// min3 returns the minimum of three float64 values.
func min3(a, b, c float64) float64 {
if b < a {
a = b
}
if c < a {
return c
}
return a
}
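// allowedConsumers returns which consumers are allowed based on SOC.
// SOC >= AllConsumers allows everything; SOC >= SGReadyOnly allows all but Wallbox B;
// SOC >= BlockAll allows only SG-Ready and WW; below BlockAll nothing is allowed.
// The PlusWallboxA threshold is not consulted here.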
func (e *Engine) allowedConsumers(soc float64) map[Consumer]bool {
allowed := make(map[Consumer]bool)
if soc >= e.cfg.SOC.AllConsumers {
allowed[ConsumerSGReady] = true
allowed[ConsumerWW] = true
allowed[ConsumerWallboxA] = true
allowed[ConsumerWallboxB] = true
} else if soc >= e.cfg.SOC.SGReadyOnly {
allowed[ConsumerSGReady] = true
allowed[ConsumerWW] = true
allowed[ConsumerWallboxA] = true
} else if soc >= e.cfg.SOC.BlockAll {
allowed[ConsumerSGReady] = true
allowed[ConsumerWW] = true
}
// below BlockAll: nothing allowed
return allowed
}
// evaluateTurnOn checks if a consumer should be turned on.
func (e *Engine) evaluateTurnOn(
consumer Consumer,
gridW float64,
threshold float64,
allowed map[Consumer]bool,
now time.Time,
) []Action {
cs := e.consumers[consumer]
// Already active — nothing to do
if cs.Active {
return nil
}
// Manually overridden to OFF — respect until timeout
if cs.ManualOverride && now.Before(cs.OverrideUntil) {
delete(e.hyst.ExportSinceAbove, consumer)
return nil
}
// Not allowed by SOC
if !allowed[consumer] {
delete(e.hyst.ExportSinceAbove, consumer)
return nil
}
// Check if export reaches the threshold.
// gridW is negative for export, threshold is negative (e.g. -1800).
// Export of at least 1800 W means gridW <= -1800.
if gridW > threshold {
// Not enough export
delete(e.hyst.ExportSinceAbove, consumer)
return nil
}
// Export is at or above the threshold: track how long
if _, ok := e.hyst.ExportSinceAbove[consumer]; !ok {
e.hyst.ExportSinceAbove[consumer] = now
}
exportDuration := now.Sub(e.hyst.ExportSinceAbove[consumer])
if exportDuration < e.cfg.Hysteresis.ExportOnDurationParsed() {
// Not long enough yet
return nil
}
// All conditions met — turn on
e.logger.Info("turning on consumer",
"consumer", consumer,
"grid_w", gridW,
"threshold", threshold,
"export_duration", exportDuration,
)
cs.Active = true
cs.ActivatedAt = now
delete(e.hyst.ExportSinceAbove, consumer)
return []Action{{
Consumer: consumer,
TurnOn: true,
Reason: fmt.Sprintf(
"export %.0fW > %.0fW for %s",
-gridW, -threshold, exportDuration,
),
}}
}
// shutdownLastConsumer turns off the lowest-priority active consumer
// that has exceeded its minimum runtime.
func (e *Engine) shutdownLastConsumer(now time.Time) *Action {
// Reverse priority: WallboxB → WallboxA → WW → SGReady
order := []Consumer{ConsumerWallboxB, ConsumerWallboxA, ConsumerWW, ConsumerSGReady}
for _, c := range order {
cs := e.consumers[c]
if !cs.Active {
continue
}
// Manually overridden to ON — don't shut down until override expires
if cs.ManualOverride && now.Before(cs.OverrideUntil) {
e.logger.Debug("skipping shutdown, consumer is manually overridden",
"consumer", c,
"override_until", cs.OverrideUntil.Format("15:04"),
)
continue
}
minRuntime := e.minRuntime(c)
runtime := now.Sub(cs.ActivatedAt)
if runtime < minRuntime {
e.logger.Debug("skipping shutdown, min runtime not reached",
"consumer", c,
"runtime", runtime,
"min_runtime", minRuntime,
)
continue
}
e.logger.Info("shutting down consumer",
"consumer", c,
"runtime", runtime,
)
cs.Active = false
a := &Action{
Consumer: c,
TurnOn: false,
Reason: fmt.Sprintf("import detected, runtime %s", runtime),
}
if c == ConsumerWW {
a.TargetTempC = e.cfg.Strategic.WWBaseC
}
return a
}
return nil
}
// socEmergencyBrake immediately shuts off consumers whose SOC threshold
// is no longer met, ignoring minimum runtimes.
func (e *Engine) socEmergencyBrake(soc float64, now time.Time) []Action {
var actions []Action
allowed := e.allowedConsumers(soc)
for c, cs := range e.consumers {
if !cs.Active {
continue
}
if allowed[c] {
continue
}
e.logger.Warn("SOC emergency brake",
"consumer", c,
"soc", soc,
)
cs.Active = false
a := Action{
Consumer: c,
TurnOn: false,
Reason: fmt.Sprintf("SOC emergency brake: %.0f%%", soc),
}
if c == ConsumerWW {
a.TargetTempC = e.cfg.Strategic.WWBaseC
}
actions = append(actions, a)
}
return actions
}
// isHeatingPeriod returns true if the current month is within the heating season.
func (e *Engine) isHeatingPeriod(now time.Time) bool {
month := int(now.Month())
start := e.cfg.Season.HeatingStartMonth
end := e.cfg.Season.HeatingEndMonth
// Handles wrap-around: e.g. October(10) to April(4)
if start > end {
return month >= start || month <= end
}
return month >= start && month <= end
}
// minRuntime returns the minimum runtime for a consumer.
func (e *Engine) minRuntime(c Consumer) time.Duration {
switch c {
case ConsumerSGReady:
return e.cfg.Hysteresis.MinRuntimeSGReadyParsed()
case ConsumerWallboxA, ConsumerWallboxB:
return e.cfg.Hysteresis.MinRuntimeWallboxParsed()
default:
return 0 // WW has no minimum runtime
}
}
// evaluateWWTurnOn checks whether WW boost should be activated.
// Prerequisites (time window and forecast) are already verified by the caller.
func (e *Engine) evaluateWWTurnOn(gridW, wwBoostC float64, allowed map[Consumer]bool, now time.Time) []Action {
cs := e.consumers[ConsumerWW]
if cs.Active {
return nil
}
if cs.ManualOverride && now.Before(cs.OverrideUntil) {
delete(e.hyst.ExportSinceAbove, ConsumerWW)
return nil
}
if !allowed[ConsumerWW] {
delete(e.hyst.ExportSinceAbove, ConsumerWW)
return nil
}
if gridW > e.cfg.Thresholds.WWExportW {
delete(e.hyst.ExportSinceAbove, ConsumerWW)
return nil
}
if _, ok := e.hyst.ExportSinceAbove[ConsumerWW]; !ok {
e.hyst.ExportSinceAbove[ConsumerWW] = now
}
exportDuration := now.Sub(e.hyst.ExportSinceAbove[ConsumerWW])
if exportDuration < e.cfg.Hysteresis.ExportOnDurationParsed() {
return nil
}
targetTemp := e.cfg.Strategic.WWBaseC + wwBoostC
e.logger.Info("activating WW boost",
"grid_w", gridW,
"ww_boost_c", wwBoostC,
"target_temp_c", targetTemp,
"export_duration", exportDuration,
)
cs.Active = true
cs.ActivatedAt = now
delete(e.hyst.ExportSinceAbove, ConsumerWW)
return []Action{{
Consumer: ConsumerWW,
TurnOn: true,
TargetTempC: targetTemp,
Reason: fmt.Sprintf("export %.0fW for %s, WW boost +%.0f°C", -gridW, exportDuration, wwBoostC),
}}
}
// isWWWindow reports whether now lies strictly between the configured WW boost
// window start and end. It returns false if either bound fails to parse.
func (e *Engine) isWWWindow(now time.Time) bool {
start, err1 := parseTimeOfDay(e.cfg.Strategic.WWWindowStart, now)
end, err2 := parseTimeOfDay(e.cfg.Strategic.WWWindowEnd, now)
if err1 != nil || err2 != nil {
return false
}
return now.After(start) && now.Before(end)
}
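// parseTimeOfDay parses "HH:MM" and returns a time.Time on the same day as ref.
// Out-of-range values such as "25:99" are rejected instead of being normalised
// into the following day by time.Date.
func parseTimeOfDay(s string, ref time.Time) (time.Time, error) {
var h, m int
if _, err := fmt.Sscanf(s, "%d:%d", &h, &m); err != nil {
return time.Time{}, fmt.Errorf("invalid time-of-day %q: %w", s, err)
}
if h < 0 || h > 23 || m < 0 || m > 59 {
return time.Time{}, fmt.Errorf("invalid time-of-day %q: out of range", s)
}
return time.Date(ref.Year(), ref.Month(), ref.Day(), h, m, 0, 0, ref.Location()), nil
}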
// ConsumerStates returns a snapshot of all consumer states (for metrics).
func (e *Engine) ConsumerStates() map[Consumer]bool {
states := make(map[Consumer]bool)
for c, cs := range e.consumers {
states[c] = cs.Active
}
return states
}
// RecoverState injects externally-read consumer states on startup.
// Does not set override flags — startup state is treated as the EMS baseline.
func (e *Engine) RecoverState(states map[Consumer]DeviceStatus) {
for c, status := range states {
if cs, ok := e.consumers[c]; ok {
cs.Active = status.On
cs.ManualOverride = false
cs.OverrideUntil = time.Time{}
cs.LowPowerCycles = 0
// ActivatedAt left as zero: unknown start time means the consumer
// is always considered to have exceeded its minimum runtime.
cs.ActivatedAt = time.Time{}
}
}
e.logger.Info("consumer state recovered from Shelly read-back",
"sg_ready", states[ConsumerSGReady].On,
"wallbox_a", states[ConsumerWallboxA].On,
"wallbox_b", states[ConsumerWallboxB].On,
)
}
// SyncHardwareState compares live hardware states against the engine's internal state.
// Discrepancies indicate an external change (manual override via Shelly app etc.).
// On mismatch: engine state is updated to match hardware, and the consumer is locked
// from EMS control for overrideTimeout.
// On match: expired overrides are cleared, resuming normal EMS control.
// Power readings (from PM-capable devices) update the low-power cycle counter for
// car-not-charging detection.
func (e *Engine) SyncHardwareState(states map[Consumer]DeviceStatus, now time.Time, overrideTimeout time.Duration) {
for c, status := range states {
cs, ok := e.consumers[c]
if !ok {
continue
}
if cs.Active != status.On {
// External change detected
e.logger.Info("manual override detected — external state change",
"consumer", c,
"engine_state", cs.Active,
"hardware_state", status.On,
"override_until", now.Add(overrideTimeout).Format("15:04"),
)
cs.Active = status.On
cs.ManualOverride = true
cs.OverrideUntil = now.Add(overrideTimeout)
cs.LowPowerCycles = 0
if !status.On {
cs.ActivatedAt = time.Time{}
} else {
cs.ActivatedAt = now
}
} else if cs.ManualOverride && now.After(cs.OverrideUntil) {
// Override expired and state matches — resume EMS control
cs.ManualOverride = false
cs.OverrideUntil = time.Time{}
e.logger.Info("manual override expired, resuming EMS control", "consumer", c)
}
// Track low-power cycles for car-not-charging detection (PM devices only).
// PowerW == 0 is treated as "no reading" (no power meter, or relay off), so an
// exact 0 W reading leaves the counter unchanged: it neither increments nor resets.
if status.PowerW > 0 {
if cs.Active && status.PowerW < float64(e.cfg.Consumers.WallboxMinChargeW) {
cs.LowPowerCycles++
e.logger.Debug("wallbox low power cycle",
"consumer", c,
"power_w", status.PowerW,
"low_power_cycles", cs.LowPowerCycles,
)
} else {
cs.LowPowerCycles = 0
}
}
}
}
// Overrides returns current override info for all consumers that are overridden.
func (e *Engine) Overrides() map[Consumer]OverrideInfo {
result := make(map[Consumer]OverrideInfo)
for c, cs := range e.consumers {
if cs.ManualOverride {
result[c] = OverrideInfo{Active: true, Until: cs.OverrideUntil}
}
}
return result
}

@@ -0,0 +1,386 @@
package engine
import (
"log/slog"
"os"
"testing"
"time"
"github.com/tb/ems/internal/collector"
"github.com/tb/ems/internal/config"
)
func testConfig() *config.Config {
return &config.Config{
SOC: config.SOCThresholds{
BlockAll: 50,
SGReadyOnly: 70,
PlusWallboxA: 90,
AllConsumers: 90,
},
Hysteresis: config.HysteresisConfig{
ExportOnDuration: "4m",
ImportOffDuration: "6m",
MinRuntimeWallbox: "15m",
MinRuntimeSGReady: "30m",
},
Thresholds: config.PowerThresholds{
SGReadyExportW: -500,
WWExportW: -500,
WallboxAExportW: -1800,
WallboxBExportW: -3800,
ImportOffW: 200,
},
Consumers: config.ConsumersConfig{
CompressorIdleW: 50,
WallboxMinChargeW: 50,
IdleCycles: 3,
},
Season: config.SeasonConfig{
HeatingStartMonth: 10,
HeatingEndMonth: 4,
},
Strategic: config.StrategicConfig{
WWBaseC: 50,
WWWindowStart: "12:30",
WWWindowEnd: "18:00",
},
}
}
func testLogger() *slog.Logger {
return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelWarn}))
}
func TestSOCBlocksAll(t *testing.T) {
eng := NewEngine(testConfig(), testLogger())
now := time.Date(2025, 1, 15, 12, 0, 0, 0, time.UTC) // January = heating period
state := collector.SystemState{
GridPowerW: -3000, // 3kW export
BatterySOC: 40, // below 50% → all blocked
}
actions := eng.Decide(state, now, 0)
if len(actions) != 0 {
t.Errorf("expected no actions with SOC 40%%, got %d actions", len(actions))
}
}
func TestSOCAllowsSGReady(t *testing.T) {
eng := NewEngine(testConfig(), testLogger())
// Simulate export for >4 minutes to pass hysteresis
base := time.Date(2025, 1, 15, 12, 0, 0, 0, time.UTC) // January = heating
state := collector.SystemState{
GridPowerW: -600, // 600W export, above SG-Ready threshold
BatterySOC: 60, // 50-70% → SG-Ready only
}
// First call — starts hysteresis timer
actions := eng.Decide(state, base, 0)
if len(actions) != 0 {
t.Errorf("expected no actions on first call (hysteresis), got %d", len(actions))
}
// Second call after 5 minutes — hysteresis passed
actions = eng.Decide(state, base.Add(5*time.Minute), 0)
if len(actions) != 1 {
t.Fatalf("expected 1 action after hysteresis, got %d", len(actions))
}
if actions[0].Consumer != ConsumerSGReady {
t.Errorf("expected SG-Ready, got %v", actions[0].Consumer)
}
if !actions[0].TurnOn {
t.Error("expected TurnOn=true")
}
}
func TestSOCBlocksWallboxAt60(t *testing.T) {
eng := NewEngine(testConfig(), testLogger())
base := time.Date(2025, 1, 15, 12, 0, 0, 0, time.UTC)
state := collector.SystemState{
GridPowerW: -5000, // massive export
BatterySOC: 60, // only SG-Ready allowed
}
// Pass hysteresis
eng.Decide(state, base, 0)
actions := eng.Decide(state, base.Add(5*time.Minute), 0)
// Should only get SG-Ready, no wallboxes
for _, a := range actions {
if a.Consumer == ConsumerWallboxA || a.Consumer == ConsumerWallboxB {
t.Errorf("wallbox should not be activated at SOC 60%%, got %v", a.Consumer)
}
}
}
func TestSGReadyOnlyInHeatingPeriod(t *testing.T) {
eng := NewEngine(testConfig(), testLogger())
// July = NOT heating period
base := time.Date(2025, 7, 15, 12, 0, 0, 0, time.UTC)
state := collector.SystemState{
GridPowerW: -600,
BatterySOC: 95, // all consumers allowed
}
// Pass hysteresis
eng.Decide(state, base, 0)
actions := eng.Decide(state, base.Add(5*time.Minute), 0)
for _, a := range actions {
if a.Consumer == ConsumerSGReady {
t.Error("SG-Ready should not activate outside heating period")
}
}
}
func TestSOCEmergencyBrake(t *testing.T) {
eng := NewEngine(testConfig(), testLogger())
base := time.Date(2025, 1, 15, 12, 0, 0, 0, time.UTC)
// First, activate SG-Ready with high SOC
state := collector.SystemState{
GridPowerW: -600,
BatterySOC: 95,
}
eng.Decide(state, base, 0)
eng.Decide(state, base.Add(5*time.Minute), 0)
// Now SOC drops below threshold
state.BatterySOC = 45
state.GridPowerW = -600 // still exporting, but SOC is too low
actions := eng.Decide(state, base.Add(10*time.Minute), 0)
foundBrake := false
for _, a := range actions {
if a.Consumer == ConsumerSGReady && !a.TurnOn {
foundBrake = true
}
}
if !foundBrake {
t.Error("expected SOC emergency brake to shut off SG-Ready")
}
}
func TestShutdownReverseOrder(t *testing.T) {
cfg := testConfig()
cfg.Hysteresis.MinRuntimeWallbox = "0s"
cfg.Hysteresis.MinRuntimeSGReady = "0s"
cfg.Hysteresis.ImportOffDuration = "0s"
eng := NewEngine(cfg, testLogger())
base := time.Date(2025, 1, 15, 12, 0, 0, 0, time.UTC)
// Inject WallboxB + SG-Ready as active (simulating recovery from a previous run
// where WallboxB was switched on manually, bypassing the mutex).
eng.RecoverState(map[Consumer]DeviceStatus{
ConsumerWallboxB: {On: true},
ConsumerSGReady: {On: true},
})
// Import detected — WallboxB should be shut down first (reverse priority order)
state := collector.SystemState{
GridPowerW: 500, // importing
BatterySOC: 95,
}
actions := eng.Decide(state, base, 0)
if len(actions) == 0 {
t.Fatal("expected shutdown action")
}
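// The first turn-off action must target WallboxB (lowest priority goes first).
var firstOff *Action
for i := range actions {
if !actions[i].TurnOn {
firstOff = &actions[i]
break
}
}
if firstOff == nil || firstOff.Consumer != ConsumerWallboxB {
t.Error("expected WallboxB to be shut down first")
}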
}
func TestHeatingPeriodDetection(t *testing.T) {
eng := NewEngine(testConfig(), testLogger())
tests := []struct {
month time.Month
expected bool
}{
{time.January, true},
{time.February, true},
{time.March, true},
{time.April, true},
{time.May, false},
{time.June, false},
{time.July, false},
{time.August, false},
{time.September, false},
{time.October, true},
{time.November, true},
{time.December, true},
}
for _, tt := range tests {
t.Run(tt.month.String(), func(t *testing.T) {
date := time.Date(2025, tt.month, 15, 12, 0, 0, 0, time.UTC)
if got := eng.isHeatingPeriod(date); got != tt.expected {
t.Errorf("month %s: got %v, want %v", tt.month, got, tt.expected)
}
})
}
}
func TestWallboxMutualExclusion(t *testing.T) {
cfg := testConfig()
cfg.Hysteresis.ExportOnDuration = "0s"
cfg.Hysteresis.ImportOffDuration = "0s"
eng := NewEngine(cfg, testLogger())
base := time.Date(2025, 7, 15, 12, 0, 0, 0, time.UTC) // summer (no SG-Ready)
// Massive export — enough to meet both wallbox thresholds
state := collector.SystemState{
GridPowerW: -5000,
BatterySOC: 95,
}
// First Decide: WallboxA should activate (P3), WallboxB must be blocked (mutex)
actions := eng.Decide(state, base, 0)
var wbAOn, wbBOn bool
for _, a := range actions {
if a.Consumer == ConsumerWallboxA && a.TurnOn {
wbAOn = true
}
if a.Consumer == ConsumerWallboxB && a.TurnOn {
wbBOn = true
}
}
if !wbAOn {
t.Error("expected WallboxA to activate")
}
if wbBOn {
t.Error("WallboxB must not activate while WallboxA is active (mutex)")
}
// Second Decide with WallboxA still active: WallboxB must still be blocked
actions = eng.Decide(state, base.Add(2*time.Minute), 0)
for _, a := range actions {
if a.Consumer == ConsumerWallboxB && a.TurnOn {
t.Error("WallboxB must not activate while WallboxA is active (second cycle)")
}
}
}
func TestCarNotChargingReleasesWallbox(t *testing.T) {
cfg := testConfig()
cfg.Hysteresis.ExportOnDuration = "0s"
cfg.Consumers.IdleCycles = 3
cfg.Consumers.WallboxMinChargeW = 50
eng := NewEngine(cfg, testLogger())
base := time.Date(2025, 7, 15, 12, 0, 0, 0, time.UTC)
// Activate WallboxA
state := collector.SystemState{GridPowerW: -2000, BatterySOC: 95}
actions := eng.Decide(state, base, 0)
if len(actions) != 1 || actions[0].Consumer != ConsumerWallboxA || !actions[0].TurnOn {
t.Fatalf("expected WallboxA to activate, got %v", actions)
}
// Simulate 3 cycles with Shelly PM reading near zero (car not charging / unplugged)
lowPower := DeviceStatus{On: true, PowerW: 10} // 10W < 50W threshold
for i := 0; i < 3; i++ {
eng.SyncHardwareState(
map[Consumer]DeviceStatus{ConsumerWallboxA: lowPower},
base.Add(time.Duration(i+1)*2*time.Minute),
time.Hour,
)
}
// Decide should now release WallboxA
actions = eng.Decide(state, base.Add(8*time.Minute), 0)
found := false
for _, a := range actions {
if a.Consumer == ConsumerWallboxA && !a.TurnOn {
found = true
}
}
if !found {
t.Error("expected WallboxA to be turned off after 3 low-power cycles")
}
}
func TestCompressorIdleReleasesSGReady(t *testing.T) {
cfg := testConfig()
cfg.Hysteresis.ExportOnDuration = "0s"
cfg.Hysteresis.MinRuntimeSGReady = "30m" // long min-runtime
cfg.Consumers.IdleCycles = 3
cfg.Consumers.CompressorIdleW = 50
eng := NewEngine(cfg, testLogger())
base := time.Date(2025, 1, 15, 12, 0, 0, 0, time.UTC) // January = heating period
// Activate SG-Ready
state := collector.SystemState{
GridPowerW: -600,
BatterySOC: 95,
CompressorPowerW: 1500, // compressor running
}
actions := eng.Decide(state, base, 0)
if len(actions) != 1 || actions[0].Consumer != ConsumerSGReady || !actions[0].TurnOn {
t.Fatalf("expected SG-Ready to activate, got %v", actions)
}
// Compressor drops to idle — 3 consecutive cycles
state.CompressorPowerW = 10 // below idle threshold
for i := 1; i <= 3; i++ {
actions = eng.Decide(state, base.Add(time.Duration(i)*2*time.Minute), 0)
}
// After 3 idle cycles, SG-Ready should be released despite min-runtime not reached
found := false
for _, a := range actions {
if a.Consumer == ConsumerSGReady && !a.TurnOn {
found = true
}
}
if !found {
t.Error("expected SG-Ready to be released early when compressor is idle for 3 cycles")
}
}
func TestMinRuntimeRespected(t *testing.T) {
cfg := testConfig()
cfg.Hysteresis.ExportOnDuration = "0s"
cfg.Hysteresis.MinRuntimeWallbox = "15m"
eng := NewEngine(cfg, testLogger())
base := time.Date(2025, 7, 15, 12, 0, 0, 0, time.UTC) // summer
// Activate Wallbox A
state := collector.SystemState{
GridPowerW: -2000,
BatterySOC: 95,
}
eng.Decide(state, base, 0)
// Try to shut down after 8 minutes of runtime (< 15 min minimum)
state.GridPowerW = 500
eng.Decide(state, base.Add(1*time.Minute), 0) // start import timer
actions := eng.Decide(state, base.Add(8*time.Minute), 0) // import for >6min
for _, a := range actions {
if a.Consumer == ConsumerWallboxA && !a.TurnOn {
t.Error("Wallbox A should not be shut down before 15 min runtime")
}
}
}

@@ -0,0 +1,171 @@
package forecast
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"sync"
"time"
"github.com/tb/ems/internal/config"
)
// Result holds the fetched forecast for a single day.
type Result struct {
Date time.Time
TotalKWh float64
FetchedAt time.Time
}
// Quality returns a human-readable label based on the configured high and mid
// thresholds; below those, a fixed 5 kWh floor separates "Mittel" from "Schwach".
func (r Result) Quality(cfg config.StrategicConfig) string {
switch {
case r.TotalKWh >= cfg.ForecastHighKWh:
return "Sehr gut"
case r.TotalKWh >= cfg.ForecastMidKWh:
return "Gut"
case r.TotalKWh >= 5:
return "Mittel"
default:
return "Schwach"
}
}
// QualityIcon returns a weather icon for the forecast quality, using the same
// thresholds as Quality.
func (r Result) QualityIcon(cfg config.StrategicConfig) string {
switch {
case r.TotalKWh >= cfg.ForecastHighKWh:
return "☀️"
case r.TotalKWh >= cfg.ForecastMidKWh:
return "🌤️"
case r.TotalKWh >= 5:
return "⛅"
default:
return "☁️"
}
}
// Client fetches daily PV forecasts from forecast.solar and caches the result.
type Client struct {
cfg config.ForecastConfig
strategic config.StrategicConfig
httpClient *http.Client
logger *slog.Logger
mu sync.RWMutex
cached *Result
}
// NewClient creates a new forecast client.
func NewClient(cfg config.ForecastConfig, strategic config.StrategicConfig, logger *slog.Logger) *Client {
return &Client{
cfg: cfg,
strategic: strategic,
httpClient: &http.Client{
Timeout: 10 * time.Second,
},
logger: logger,
}
}
// Today returns the forecast for today, fetching from the API if needed.
// Returns a zero Result and no error if forecasting is disabled.
func (c *Client) Today(ctx context.Context) (Result, error) {
if !c.cfg.Enabled {
return Result{}, nil
}
c.mu.RLock()
cached := c.cached
c.mu.RUnlock()
now := time.Now()
today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
if cached != nil && cached.Date.Equal(today) {
return *cached, nil
}
result, err := c.fetch(ctx, today)
if err != nil {
// Return stale cache rather than nothing, if we have it
if cached != nil {
c.logger.Warn("forecast fetch failed, using stale cache",
"error", err,
"cached_date", cached.Date.Format("2006-01-02"),
)
return *cached, nil
}
return Result{}, err
}
c.mu.Lock()
c.cached = &result
c.mu.Unlock()
return result, nil
}
// forecastResponse is the forecast.solar API response structure.
type forecastResponse struct {
Result struct {
WattHoursDay map[string]float64 `json:"watt_hours_day"`
} `json:"result"`
Message struct {
Code int `json:"code"`
Text string `json:"text"`
} `json:"message"`
}
func (c *Client) fetch(ctx context.Context, day time.Time) (Result, error) {
url := fmt.Sprintf(
"https://api.forecast.solar/estimate/%.4f/%.4f/%d/%d/%.1f",
c.cfg.Lat, c.cfg.Lon,
c.cfg.Declination, c.cfg.Azimuth,
c.cfg.KWp,
)
c.logger.Debug("fetching forecast", "url", url)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return Result{}, fmt.Errorf("creating request: %w", err)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return Result{}, fmt.Errorf("fetching forecast: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return Result{}, fmt.Errorf("forecast.solar returned status %d", resp.StatusCode)
}
var fr forecastResponse
if err := json.NewDecoder(resp.Body).Decode(&fr); err != nil {
return Result{}, fmt.Errorf("decoding response: %w", err)
}
dateKey := day.Format("2006-01-02")
wh, ok := fr.Result.WattHoursDay[dateKey]
if !ok {
return Result{}, fmt.Errorf("no forecast data for %s", dateKey)
}
result := Result{
Date: day,
TotalKWh: wh / 1000.0,
FetchedAt: time.Now(),
}
c.logger.Info("forecast fetched",
"date", dateKey,
"kwh", result.TotalKWh,
"quality", result.Quality(c.strategic),
)
return result, nil
}

120
internal/metrics/metrics.go Normal file
@@ -0,0 +1,120 @@
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/tb/ems/internal/engine"
)
// Metrics holds all EMS Prometheus metrics.
type Metrics struct {
// Current system state
GridPowerW prometheus.Gauge
BatterySOC prometheus.Gauge
PVProductionW prometheus.Gauge
// Consumer states
ConsumerActive *prometheus.GaugeVec
// Decision tracking
SwitchCyclesTotal *prometheus.CounterVec
DecisionDuration prometheus.Histogram
// Actuator health
ActuatorErrors *prometheus.CounterVec
APILatency *prometheus.HistogramVec
// Cumulative
SelfConsumedKWh prometheus.Counter
GridExportKWh prometheus.Counter
GridImportKWh prometheus.Counter
}
// NewMetrics creates and registers all EMS metrics.
func NewMetrics(reg prometheus.Registerer) *Metrics {
m := &Metrics{
GridPowerW: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "ems_grid_power_watts",
Help: "Current grid power exchange in watts (positive=import, negative=export)",
}),
BatterySOC: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "ems_battery_soc_percent",
Help: "Current battery state of charge in percent",
}),
PVProductionW: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "ems_pv_production_watts",
Help: "Current PV production in watts",
}),
ConsumerActive: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "ems_consumer_active",
Help: "Whether a consumer is currently active (1=on, 0=off)",
}, []string{"consumer"}),
SwitchCyclesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "ems_switch_cycles_total",
Help: "Total number of switch cycles per consumer",
}, []string{"consumer", "action"}),
DecisionDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "ems_decision_duration_seconds",
Help: "Time taken for the decision engine to evaluate",
Buckets: prometheus.DefBuckets,
}),
ActuatorErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "ems_actuator_errors_total",
Help: "Total actuator errors per consumer",
}, []string{"consumer"}),
APILatency: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "ems_api_latency_seconds",
Help: "API call latency per source",
Buckets: []float64{0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10},
}, []string{"source"}),
SelfConsumedKWh: prometheus.NewCounter(prometheus.CounterOpts{
Name: "ems_self_consumed_kwh_total",
Help: "Total energy self-consumed in kWh (estimated)",
}),
GridExportKWh: prometheus.NewCounter(prometheus.CounterOpts{
Name: "ems_grid_export_kwh_total",
Help: "Total energy exported to grid in kWh (estimated)",
}),
GridImportKWh: prometheus.NewCounter(prometheus.CounterOpts{
Name: "ems_grid_import_kwh_total",
Help: "Total energy imported from grid in kWh (estimated)",
}),
}
reg.MustRegister(
m.GridPowerW,
m.BatterySOC,
m.PVProductionW,
m.ConsumerActive,
m.SwitchCyclesTotal,
m.DecisionDuration,
m.ActuatorErrors,
m.APILatency,
m.SelfConsumedKWh,
m.GridExportKWh,
m.GridImportKWh,
)
return m
}
// RecordActions records switching actions in the metrics.
func (m *Metrics) RecordActions(actions []engine.Action) {
for _, a := range actions {
action := "on"
if !a.TurnOn {
action = "off"
}
m.SwitchCyclesTotal.WithLabelValues(a.Consumer.String(), action).Inc()
}
}
// UpdateConsumerStates updates the consumer active gauges.
func (m *Metrics) UpdateConsumerStates(states map[engine.Consumer]bool) {
for c, active := range states {
val := 0.0
if active {
val = 1.0
}
m.ConsumerActive.WithLabelValues(c.String()).Set(val)
}
}

531
internal/status/status.go Normal file
@@ -0,0 +1,531 @@
package status
import (
"fmt"
"html/template"
"net/http"
"sync"
"time"
"github.com/tb/ems/internal/collector"
"github.com/tb/ems/internal/config"
"github.com/tb/ems/internal/engine"
"github.com/tb/ems/internal/forecast"
)
// consumerMeta holds static display info for each consumer.
var consumerMeta = map[engine.Consumer]struct{ Label, Icon string }{
engine.ConsumerSGReady: {"Wärmepumpe Boost (SG-Ready)", "🔥"},
engine.ConsumerWW: {"Warmwasser Boost", "🌡️"},
engine.ConsumerWallboxA: {"Wallbox A (2 kW)", "🔌"},
engine.ConsumerWallboxB: {"Wallbox B (4 kW)", "🔌"},
}
// consumerOrder defines the display order of consumers.
var consumerOrder = []engine.Consumer{
engine.ConsumerSGReady,
engine.ConsumerWW,
engine.ConsumerWallboxA,
engine.ConsumerWallboxB,
}
// consumerRecord tracks the state of a single consumer across cycles.
type consumerRecord struct {
active bool
since time.Time
reason string
manualOverride bool
overrideUntil time.Time
}
// Store holds the latest EMS snapshot and is safe for concurrent use.
type Store struct {
mu sync.RWMutex
lastUpdate time.Time
errMsg string
state collector.SystemState
dryRun bool
wwConfigured bool
strategic config.StrategicConfig
consumers map[engine.Consumer]*consumerRecord
fcResult *forecast.Result
}
// NewStore creates a new status store.
func NewStore(dryRun bool, wwConfigured bool, strategic config.StrategicConfig) *Store {
consumers := make(map[engine.Consumer]*consumerRecord, len(consumerOrder))
for _, c := range consumerOrder {
consumers[c] = &consumerRecord{}
}
return &Store{
dryRun: dryRun,
wwConfigured: wwConfigured,
strategic: strategic,
consumers: consumers,
}
}
// Update records the latest state, actions, overrides, and forecast for this cycle.
func (s *Store) Update(state collector.SystemState, actions []engine.Action, overrides map[engine.Consumer]engine.OverrideInfo, fcResult *forecast.Result, err error) {
s.mu.Lock()
defer s.mu.Unlock()
s.lastUpdate = time.Now()
s.state = state
if err != nil {
s.errMsg = err.Error()
} else {
s.errMsg = ""
}
if fcResult != nil {
s.fcResult = fcResult
}
for _, a := range actions {
rec, ok := s.consumers[a.Consumer]
if !ok {
continue
}
rec.active = a.TurnOn
rec.since = time.Now()
rec.reason = a.Reason
}
// Sync override state for all consumers
for c, rec := range s.consumers {
if info, overridden := overrides[c]; overridden {
rec.manualOverride = info.Active
rec.overrideUntil = info.Until
} else {
rec.manualOverride = false
rec.overrideUntil = time.Time{}
}
}
}
// --- Template data types ---
type consumerView struct {
Icon string
Label string
ConsumerKey string // e.g. "wallbox_a" — used for the override form
Active bool
Since time.Time
Reason string
Unconfigured bool
CanOverride bool // true for Shelly consumers (hardware read-back available)
ManualOverride bool
OverrideUntil time.Time
}
type forecastView struct {
Available bool
KWh float64
Icon string
Quality string
}
type pageData struct {
LastUpdate time.Time
ErrMsg string
DryRun bool
BatterySOC float64
GridPowerW float64
PVProductionW float64
AmbientTempC float64
IsExporting bool
AbsGridW float64
Forecast forecastView
Consumers []consumerView
}
// SyncConsumerStates updates active flags for all consumers directly from the
// engine's current state. Called every cycle so the status page reflects reality
// even when no Action was produced (e.g. after a manual override is detected).
func (s *Store) SyncConsumerStates(states map[engine.Consumer]bool) {
s.mu.Lock()
defer s.mu.Unlock()
for c, active := range states {
if rec, ok := s.consumers[c]; ok {
rec.active = active
}
}
}
// Handler returns an HTTP handler that renders the status page.
func (s *Store) Handler() http.HandlerFunc {
tmpl := template.Must(template.New("status").Funcs(template.FuncMap{
"formatW": formatW,
"formatSince": func(t time.Time) string {
if t.IsZero() {
return ""
}
d := time.Since(t).Round(time.Minute)
if d < time.Minute {
return "gerade eben"
}
if d < time.Hour {
return fmt.Sprintf("seit %d min", int(d.Minutes()))
}
h := int(d.Hours())
m := int(d.Minutes()) % 60
if m == 0 {
return fmt.Sprintf("seit %d h", h)
}
return fmt.Sprintf("seit %d h %d min", h, m)
},
"socColor": func(soc float64) string {
switch {
case soc >= 80:
return "#16a34a"
case soc >= 50:
return "#d97706"
default:
return "#dc2626"
}
},
}).Parse(htmlTemplate))
return func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
http.NotFound(w, r)
return
}
s.mu.RLock()
data := pageData{
LastUpdate: s.lastUpdate,
ErrMsg: s.errMsg,
DryRun: s.dryRun,
BatterySOC: s.state.BatterySOC,
GridPowerW: s.state.GridPowerW,
PVProductionW: s.state.PVProductionW,
AmbientTempC: s.state.AmbientTempC,
IsExporting: s.state.GridPowerW < 0,
AbsGridW: abs(s.state.GridPowerW),
}
if s.fcResult != nil {
data.Forecast = forecastView{
Available: true,
KWh: s.fcResult.TotalKWh,
Icon: s.fcResult.QualityIcon(s.strategic),
Quality: s.fcResult.Quality(s.strategic),
}
}
for _, c := range consumerOrder {
rec := s.consumers[c]
meta := consumerMeta[c]
cv := consumerView{
Icon: meta.Icon,
Label: meta.Label,
ConsumerKey: c.String(),
Active: rec.active,
Since: rec.since,
Reason: rec.reason,
CanOverride: c != engine.ConsumerWW,
ManualOverride: rec.manualOverride,
OverrideUntil: rec.overrideUntil,
}
if c == engine.ConsumerWW && !s.wwConfigured {
cv.Unconfigured = true
}
data.Consumers = append(data.Consumers, cv)
}
s.mu.RUnlock()
w.Header().Set("Content-Type", "text/html; charset=utf-8")
if err := tmpl.Execute(w, data); err != nil {
http.Error(w, "template error", http.StatusInternalServerError)
}
}
}
func abs(v float64) float64 {
if v < 0 {
return -v
}
return v
}
func formatW(w float64) string {
if w >= 1000 || w <= -1000 {
return fmt.Sprintf("%.1f kW", w/1000)
}
return fmt.Sprintf("%.0f W", w)
}
const htmlTemplate = `<!DOCTYPE html>
<html lang="de">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta http-equiv="refresh" content="30">
<title>Solar Status</title>
<style>
*, *::before, *::after { box-sizing: border-box; margin: 0; padding: 0; }
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: #f0fdf4;
color: #1a1a1a;
min-height: 100vh;
padding: 1.25rem 1rem 2rem;
max-width: 480px;
margin: 0 auto;
}
header {
display: flex;
justify-content: space-between;
align-items: baseline;
margin-bottom: 1.25rem;
}
h1 { font-size: 1.4rem; font-weight: 700; color: #14532d; }
.updated { font-size: 0.75rem; color: #6b7280; }
.banner {
border-radius: 10px;
padding: 0.6rem 0.9rem;
font-size: 0.85rem;
margin-bottom: 1rem;
}
.banner.warn { background: #fef9c3; color: #854d0e; border: 1px solid #fde68a; }
.banner.error { background: #fee2e2; color: #991b1b; border: 1px solid #fca5a5; }
.grid {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 0.75rem;
margin-bottom: 1.25rem;
}
.card {
background: #ffffff;
border-radius: 14px;
padding: 1rem;
box-shadow: 0 1px 4px rgba(0,0,0,0.07);
}
.card-label {
font-size: 0.7rem;
text-transform: uppercase;
letter-spacing: 0.06em;
color: #9ca3af;
margin-bottom: 0.4rem;
}
.card-value {
font-size: 1.8rem;
font-weight: 700;
line-height: 1;
}
.card-sub {
font-size: 0.78rem;
color: #6b7280;
margin-top: 0.3rem;
}
/* SOC bar */
.soc-bar {
background: #e5e7eb;
border-radius: 999px;
height: 8px;
margin-top: 0.6rem;
overflow: hidden;
}
.soc-fill {
height: 100%;
border-radius: 999px;
transition: width 0.4s ease;
}
/* Forecast card — full width */
.card.full { grid-column: 1 / -1; }
.forecast-row {
display: flex;
align-items: baseline;
gap: 0.5rem;
}
.forecast-icon { font-size: 1.4rem; line-height: 1; }
.forecast-quality { font-size: 0.85rem; color: #6b7280; margin-top: 0.2rem; }
/* Grid card direction arrow */
.arrow { font-size: 1.1rem; margin-right: 0.1rem; }
/* Consumer list */
.section-title {
font-size: 0.7rem;
text-transform: uppercase;
letter-spacing: 0.06em;
color: #9ca3af;
margin-bottom: 0.6rem;
}
.consumer {
background: #ffffff;
border-radius: 14px;
padding: 0.85rem 1rem;
box-shadow: 0 1px 4px rgba(0,0,0,0.07);
display: flex;
align-items: center;
gap: 0.85rem;
margin-bottom: 0.6rem;
}
.consumer:last-child { margin-bottom: 0; }
.consumer.unconfigured { opacity: 0.45; }
.indicator {
width: 13px;
height: 13px;
border-radius: 50%;
flex-shrink: 0;
}
.indicator.on {
background: #22c55e;
box-shadow: 0 0 0 3px #bbf7d0;
}
.indicator.off { background: #d1d5db; }
.indicator.override {
background: #f59e0b;
box-shadow: 0 0 0 3px #fde68a;
}
.consumer-icon { font-size: 1.2rem; flex-shrink: 0; }
.consumer-body { flex: 1; min-width: 0; }
.consumer-name { font-weight: 600; font-size: 0.9rem; }
.consumer-detail {
font-size: 0.78rem;
color: #6b7280;
margin-top: 0.15rem;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
.consumer-detail.active { color: #16a34a; font-weight: 500; }
.consumer-detail.override { color: #d97706; font-weight: 500; }
.override-btn {
border: none;
border-radius: 8px;
padding: 0.35rem 0.75rem;
font-size: 0.78rem;
font-weight: 600;
cursor: pointer;
flex-shrink: 0;
transition: opacity 0.15s;
}
.override-btn:active { opacity: 0.7; }
.override-btn.turn-on { background: #dcfce7; color: #15803d; }
.override-btn.turn-off { background: #fee2e2; color: #b91c1c; }
</style>
</head>
<body>
<header>
<h1>☀️ Solar Status</h1>
{{if not .LastUpdate.IsZero}}
<span class="updated">{{.LastUpdate.Format "15:04:05"}}</span>
{{end}}
</header>
{{if .DryRun}}
<div class="banner warn">⚠️ Testmodus — keine echten Schaltvorgänge</div>
{{end}}
{{if .ErrMsg}}
<div class="banner error">⚠️ {{.ErrMsg}}</div>
{{end}}
<div class="grid">
<!-- Battery -->
<div class="card">
<div class="card-label">Batterie</div>
<div class="card-value" style="color: {{socColor .BatterySOC}}">
{{printf "%.0f" .BatterySOC}}<span style="font-size:1rem;font-weight:400"> %</span>
</div>
<div class="soc-bar">
<div class="soc-fill" style="width:{{printf "%.0f" .BatterySOC}}%; background:{{socColor .BatterySOC}}"></div>
</div>
</div>
<!-- Grid -->
<div class="card">
<div class="card-label">Netz</div>
{{if .IsExporting}}
<div class="card-value" style="color:#16a34a">
<span class="arrow">↑</span>{{formatW .AbsGridW}}
</div>
<div class="card-sub">Einspeisung</div>
{{else}}
<div class="card-value" style="color:#dc2626">
<span class="arrow">↓</span>{{formatW .AbsGridW}}
</div>
<div class="card-sub">Bezug</div>
{{end}}
</div>
<!-- PV -->
<div class="card">
<div class="card-label">PV-Leistung</div>
<div class="card-value" style="color:#d97706">{{formatW .PVProductionW}}</div>
</div>
<!-- Temperature -->
<div class="card">
<div class="card-label">Außentemperatur</div>
<div class="card-value">{{printf "%.1f" .AmbientTempC}}<span style="font-size:1rem;font-weight:400"> °C</span></div>
</div>
<!-- Forecast (full width) -->
{{if .Forecast.Available}}
<div class="card full">
<div class="card-label">Prognose heute</div>
<div class="forecast-row">
<span class="forecast-icon">{{.Forecast.Icon}}</span>
<span class="card-value" style="color:#d97706">{{printf "%.1f" .Forecast.KWh}} kWh</span>
</div>
<div class="forecast-quality">{{.Forecast.Quality}}</div>
</div>
{{end}}
</div>
<div class="section-title">Verbraucher</div>
{{range .Consumers}}
<div class="consumer{{if .Unconfigured}} unconfigured{{end}}">
<div class="indicator {{if .ManualOverride}}override{{else if .Active}}on{{else}}off{{end}}"></div>
<div class="consumer-icon">{{.Icon}}</div>
<div class="consumer-body">
<div class="consumer-name">{{.Label}}</div>
<div class="consumer-detail{{if and .Active (not .ManualOverride)}} active{{end}}{{if .ManualOverride}} override{{end}}">
{{if .Unconfigured}}
nicht konfiguriert
{{else if .ManualOverride}}
Manuell bis {{.OverrideUntil.Format "15:04"}}
{{else if .Active}}
{{formatSince .Since}}
{{else if .Reason}}
{{.Reason}}
{{else}}
Ausgeschaltet
{{end}}
</div>
</div>
{{if .CanOverride}}
<form method="post" action="/override">
<input type="hidden" name="consumer" value="{{.ConsumerKey}}">
{{if .Active}}
<input type="hidden" name="state" value="off">
<button type="submit" class="override-btn turn-off">Aus</button>
{{else}}
<input type="hidden" name="state" value="on">
<button type="submit" class="override-btn turn-on">Ein</button>
{{end}}
</form>
{{end}}
</div>
{{end}}
</body>
</html>`


@@ -0,0 +1,171 @@
package viessmann
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"
"github.com/tb/ems/internal/config"
)
const (
tokenEndpoint = "https://iam.viessmann-climatesolutions.com/idp/v3/token"
apiBase = "https://api.viessmann.com/iot/v2"
)
// tokenFile mirrors the JSON structure stored on disk.
type tokenFile struct {
AccessToken string `json:"access_token"`
RefreshToken string `json:"refresh_token"`
TokenType string `json:"token_type"`
ExpiresIn int `json:"expires_in"`
ValidToTimeDate int64 `json:"validToTimeDate"` // Unix milliseconds
}
// Client manages Viessmann OAuth2 tokens and sends commands to the IoT API.
type Client struct {
cfg config.ViessmannConfig
httpClient *http.Client
logger *slog.Logger
mu sync.Mutex
token tokenFile
}
// NewClient creates a new Viessmann client and loads the token from disk.
func NewClient(cfg config.ViessmannConfig, logger *slog.Logger) (*Client, error) {
c := &Client{
cfg: cfg,
httpClient: &http.Client{Timeout: 15 * time.Second},
logger: logger,
}
if err := c.loadToken(); err != nil {
return nil, fmt.Errorf("loading token: %w", err)
}
return c, nil
}
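// loadToken reads the persisted OAuth2 token from cfg.TokenFile into c.token.
func (c *Client) loadToken() error {
data, err := os.ReadFile(c.cfg.TokenFile)
if err != nil {
return fmt.Errorf("reading %s: %w", c.cfg.TokenFile, err)
}
if err := json.Unmarshal(data, &c.token); err != nil {
return fmt.Errorf("parsing %s: %w", c.cfg.TokenFile, err)
}
return nil
}
// saveToken persists the current token to cfg.TokenFile with mode 0600.
// Failures are logged rather than returned: the in-memory token stays usable.
func (c *Client) saveToken() {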
data, err := json.MarshalIndent(c.token, "", " ")
if err != nil {
c.logger.Warn("could not marshal token", "error", err)
return
}
if err := os.WriteFile(c.cfg.TokenFile, data, 0600); err != nil {
c.logger.Warn("could not save token file", "error", err)
}
}
// ensureToken refreshes the access token if it expires within 5 minutes.
// Caller must hold c.mu.
func (c *Client) ensureToken(ctx context.Context) error {
remaining := time.Until(time.UnixMilli(c.token.ValidToTimeDate))
if remaining > 5*time.Minute {
return nil
}
c.logger.Info("refreshing Viessmann access token", "remaining", remaining.Round(time.Second))
form := url.Values{}
form.Set("grant_type", "refresh_token")
form.Set("client_id", c.cfg.ClientID)
form.Set("refresh_token", c.token.RefreshToken)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, tokenEndpoint,
strings.NewReader(form.Encode()))
if err != nil {
return fmt.Errorf("creating token request: %w", err)
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
resp, err := c.httpClient.Do(req)
if err != nil {
return fmt.Errorf("token refresh: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("token refresh returned %d", resp.StatusCode)
}
var fresh struct {
AccessToken string `json:"access_token"`
RefreshToken string `json:"refresh_token"`
TokenType string `json:"token_type"`
ExpiresIn int `json:"expires_in"`
}
if err := json.NewDecoder(resp.Body).Decode(&fresh); err != nil {
return fmt.Errorf("decoding token response: %w", err)
}
c.token.AccessToken = fresh.AccessToken
if fresh.RefreshToken != "" {
c.token.RefreshToken = fresh.RefreshToken
}
c.token.TokenType = fresh.TokenType
c.token.ExpiresIn = fresh.ExpiresIn
c.token.ValidToTimeDate = time.Now().Add(time.Duration(fresh.ExpiresIn) * time.Second).UnixMilli()
c.saveToken()
c.logger.Info("Viessmann token refreshed")
return nil
}
// featureURL builds the IoT API URL for a device feature.
func (c *Client) featureURL(feature string) string {
return fmt.Sprintf(
"%s/features/installations/%s/gateways/%s/devices/%s/features/%s",
apiBase,
c.cfg.InstallationID,
c.cfg.GatewaySerial,
c.cfg.DeviceID,
feature,
)
}
// SetDHWTemperature sets the domestic hot water target temperature via the Viessmann API.
func (c *Client) SetDHWTemperature(ctx context.Context, tempC float64) error {
c.mu.Lock()
defer c.mu.Unlock()
if err := c.ensureToken(ctx); err != nil {
return fmt.Errorf("ensuring token: %w", err)
}
cmdURL := c.featureURL("heating.dhw.temperature.main") + "/commands/setTargetTemperature"
body := fmt.Sprintf(`{"temperature":%g}`, tempC)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, cmdURL, strings.NewReader(body))
if err != nil {
return fmt.Errorf("creating request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+c.token.AccessToken)
resp, err := c.httpClient.Do(req)
if err != nil {
return fmt.Errorf("API call: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
return fmt.Errorf("Viessmann API returned %d", resp.StatusCode)
}
c.logger.Info("DHW temperature set", "temp_c", tempC)
return nil
}

345
main.go Normal file

@@ -0,0 +1,345 @@
package main
import (
"context"
"flag"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/tb/ems/internal/actuator"
"github.com/tb/ems/internal/collector"
"github.com/tb/ems/internal/config"
"github.com/tb/ems/internal/engine"
"github.com/tb/ems/internal/forecast"
"github.com/tb/ems/internal/metrics"
"github.com/tb/ems/internal/status"
"github.com/tb/ems/internal/viessmann"
)
func main() {
configPath := flag.String("config", "configs/ems-config.yaml", "Path to config file")
dryRun := flag.Bool("dry-run", false, "Run without executing actions (log only)")
flag.Parse()
// Load configuration
cfg, err := config.Load(*configPath)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to load config: %v\n", err)
os.Exit(1)
}
// Setup structured logging
logLevel := slog.LevelInfo
switch cfg.EMS.LogLevel {
case "debug":
logLevel = slog.LevelDebug
case "warn":
logLevel = slog.LevelWarn
case "error":
logLevel = slog.LevelError
}
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: logLevel,
}))
logger.Info("starting EMS",
"config", *configPath,
"dry_run", *dryRun,
"poll_interval", cfg.EMS.PollInterval,
"listen_addr", cfg.EMS.ListenAddr,
)
// Initialize components
coll := collector.NewCollector(cfg, logger)
eng := engine.NewEngine(cfg, logger)
fc := forecast.NewClient(cfg.Forecast, cfg.Strategic, logger)
// Viessmann client — optional, only if credentials are configured
var vc *viessmann.Client
if cfg.Viessmann.InstallationID != "" && cfg.Viessmann.ClientID != "" {
var err error
vc, err = viessmann.NewClient(cfg.Viessmann, logger)
if err != nil {
logger.Warn("Viessmann client init failed, WW boost disabled", "error", err)
} else {
logger.Info("Viessmann client initialized")
}
}
act := actuator.NewActuator(cfg, vc, logger)
// Startup: attempt state recovery from Shelly read-back
recoverCtx, recoverCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer recoverCancel()
if shouldRecover(cfg.EMS.StateFile, cfg.EMS.RecoveryTimeoutParsed(), logger) {
if states, err := act.ReadAllStates(recoverCtx); err != nil {
logger.Warn("state recovery failed, starting with all consumers off", "error", err)
} else {
eng.RecoverState(states)
}
} else {
logger.Info("state recovery skipped (downtime exceeded threshold or first run)")
}
// Prometheus metrics
reg := prometheus.NewRegistry()
m := metrics.NewMetrics(reg)
// Status store (shared between HTTP handler and control loop)
// WW boost counts as configured only if the Viessmann client actually initialized.
wwConfigured := vc != nil
statusStore := status.NewStore(*dryRun, wwConfigured, cfg.Strategic)
// HTTP server: Prometheus metrics, health check, manual override, status page
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
})
mux.HandleFunc("/override", overrideHandler(act, logger))
mux.HandleFunc("/", statusStore.Handler())
srv := &http.Server{
Addr: cfg.EMS.ListenAddr,
Handler: mux,
}
go func() {
logger.Info("metrics server listening", "addr", cfg.EMS.ListenAddr)
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
logger.Error("metrics server error", "error", err)
}
}()
// Graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
// Main control loop
ticker := time.NewTicker(cfg.EMS.PollIntervalParsed())
defer ticker.Stop()
logger.Info("EMS control loop started")
// Run once immediately
runCycle(ctx, coll, eng, act, fc, m, statusStore, cfg, cfg.EMS.StateFile, logger, *dryRun)
for {
select {
case <-ticker.C:
runCycle(ctx, coll, eng, act, fc, m, statusStore, cfg, cfg.EMS.StateFile, logger, *dryRun)
case sig := <-sigCh:
logger.Info("received signal, shutting down", "signal", sig)
cancel()
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
if err := srv.Shutdown(shutdownCtx); err != nil {
logger.Warn("HTTP server shutdown failed", "error", err)
}
return
}
}
}
func runCycle(
ctx context.Context,
coll *collector.Collector,
eng *engine.Engine,
act *actuator.Actuator,
fc *forecast.Client,
m *metrics.Metrics,
store *status.Store,
cfg *config.Config,
stateFile string,
logger *slog.Logger,
dryRun bool,
) {
now := time.Now()
// Step 1: Collect current state from Prometheus
state, err := coll.Collect(ctx)
if err != nil {
logger.Error("collection failed", "error", err)
store.Update(collector.SystemState{}, nil, nil, nil, err)
return
}
// Write heartbeat so next startup can assess downtime
writeHeartbeat(stateFile, logger)
// Step 1b: Read Shelly states and sync to engine (detects manual overrides).
// Partial results are fine — unreachable devices are logged inside ReadAllStates.
if shellyStates, err := act.ReadAllStates(ctx); err != nil {
logger.Warn("all Shelly devices unreachable, skipping override detection", "error", err)
} else {
eng.SyncHardwareState(shellyStates, now, cfg.EMS.OverrideTimeoutParsed())
}
// Update metrics with current state
m.GridPowerW.Set(state.GridPowerW)
m.BatterySOC.Set(state.BatterySOC)
m.PVProductionW.Set(state.PVProductionW)
// Track energy flow (rough estimate: power is assumed constant over one poll interval)
intervalHours := cfg.EMS.PollIntervalParsed().Hours()
if state.GridPowerW > 0 {
m.GridImportKWh.Add(state.GridPowerW / 1000.0 * intervalHours)
} else {
m.GridExportKWh.Add(-state.GridPowerW / 1000.0 * intervalHours)
}
// Step 2: Fetch forecast (cached; only hits the API once per day)
var fcResult *forecast.Result
if r, err := fc.Today(ctx); err == nil && r.TotalKWh > 0 {
fcResult = &r
}
// Step 3: Run decision engine
wwBoostC := computeWWBoost(fcResult, cfg)
decisionStart := time.Now()
actions := eng.Decide(state, now, wwBoostC)
m.DecisionDuration.Observe(time.Since(decisionStart).Seconds())
// Update consumer state metrics
m.UpdateConsumerStates(eng.ConsumerStates())
// Update status page
store.Update(state, actions, eng.Overrides(), fcResult, nil)
store.SyncConsumerStates(eng.ConsumerStates())
if len(actions) == 0 {
logger.Debug("no actions this cycle",
"grid_w", state.GridPowerW,
"soc", state.BatterySOC,
)
return
}
// Step 4: Execute actions
m.RecordActions(actions)
if dryRun {
for _, a := range actions {
logger.Info("[DRY RUN] would execute",
"consumer", a.Consumer,
"turn_on", a.TurnOn,
"reason", a.Reason,
)
}
return
}
if err := act.Execute(ctx, actions); err != nil {
logger.Error("execution failed", "error", err)
}
}
// shouldRecover checks the heartbeat file to decide whether to read back
// Shelly states on startup. Returns false if the file is missing (first run)
// or older than the recovery timeout.
func shouldRecover(stateFile string, timeout time.Duration, logger *slog.Logger) bool {
if stateFile == "" {
return false
}
info, err := os.Stat(stateFile)
if err != nil {
return false // first run or file was cleaned up
}
age := time.Since(info.ModTime())
if age > timeout {
logger.Info("heartbeat too old, skipping state recovery",
"age", age.Round(time.Second),
"timeout", timeout,
)
return false
}
return true
}
// overrideHandler handles manual on/off requests from the status page.
// For Shelly consumers, it switches the hardware directly; the existing
// SyncHardwareState mechanism detects the change next cycle and applies
// the override lockout automatically.
func overrideHandler(act *actuator.Actuator, logger *slog.Logger) http.HandlerFunc {
consumerByKey := map[string]engine.Consumer{
"sg_ready": engine.ConsumerSGReady,
"wallbox_a": engine.ConsumerWallboxA,
"wallbox_b": engine.ConsumerWallboxB,
}
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Redirect(w, r, "/", http.StatusSeeOther)
return
}
consumerKey := r.FormValue("consumer")
stateVal := r.FormValue("state")
consumer, ok := consumerByKey[consumerKey]
if !ok {
http.Error(w, "unknown consumer", http.StatusBadRequest)
return
}
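// Reject anything other than "on"/"off" instead of silently treating it as "off".
if stateVal != "on" && stateVal != "off" {
http.Error(w, "invalid state", http.StatusBadRequest)
return
}
turnOn := stateVal == "on"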
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
action := engine.Action{
Consumer: consumer,
TurnOn: turnOn,
Reason: "manual override via web UI",
}
if err := act.Execute(ctx, []engine.Action{action}); err != nil {
logger.Error("web UI override failed", "consumer", consumerKey, "state", stateVal, "error", err)
http.Error(w, "switch failed — check logs", http.StatusInternalServerError)
return
}
logger.Info("web UI override executed", "consumer", consumerKey, "state", stateVal)
http.Redirect(w, r, "/", http.StatusSeeOther)
}
}
// computeWWBoost derives the WW (Warmwasser, i.e. domestic hot water) temperature
// boost in °C from the daily PV forecast.
// Returns 0 if the forecast is unavailable or below the mid threshold.
func computeWWBoost(fc *forecast.Result, cfg *config.Config) float64 {
if fc == nil || fc.TotalKWh == 0 {
return 0
}
switch {
case fc.TotalKWh >= cfg.Strategic.ForecastHighKWh:
return cfg.Strategic.WWBoostHighC
case fc.TotalKWh >= cfg.Strategic.ForecastMidKWh:
return cfg.Strategic.WWBoostMidC
default:
return 0
}
}
// writeHeartbeat updates the heartbeat file timestamp each cycle.
func writeHeartbeat(stateFile string, logger *slog.Logger) {
if stateFile == "" {
return
}
if err := os.MkdirAll(filepath.Dir(stateFile), 0755); err != nil {
logger.Warn("could not create heartbeat directory", "error", err)
return
}
if err := os.WriteFile(stateFile, []byte(time.Now().Format(time.RFC3339)), 0644); err != nil {
logger.Warn("could not write heartbeat file", "error", err)
}
}

27
systemd/ems.service Normal file

@@ -0,0 +1,27 @@
[Unit]
Description=EMS — Energie Management System
Documentation=https://github.com/tb/ems
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
ExecStart=/usr/local/bin/ems -config /etc/ems/ems-config.yaml
Restart=on-failure
RestartSec=30s
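# Creates /run/ems/ automatically (used for the heartbeat file).
# Preserve it across stops and restarts: by default systemd removes the
# runtime directory when the service stops, which would delete the heartbeat.
RuntimeDirectory=ems
RuntimeDirectoryMode=0755
RuntimeDirectoryPreserve=yes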
# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=ems
# Basic hardening
NoNewPrivileges=true
PrivateTmp=true
[Install]
WantedBy=multi-user.target