Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/batcontrol_config_dummy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ battery_control_expert:
soften_price_difference_on_charging: False # enable earlier charging based on a more relaxed calculation
# future_price <= current_price-min_price_difference/soften_price_difference_on_charging_factor
soften_price_difference_on_charging_factor: 5
enable_precharge_overhang: false # Feature flag for charging overflow before the price turning point
max_charge_loss_factor: 0.1 # Assume up to 10% charging losses for slot planning
round_price_digits: 4 # round price to n digits after the comma
production_offset_percent: 1.0 # Adjust production forecast by a percentage (1.0 = 100%, 0.8 = 80%, etc.)
# Useful for winter mode when solar panels are covered with snow
Expand Down
3 changes: 2 additions & 1 deletion src/batcontrol/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,8 @@ def run(self):
self.max_charging_from_grid_limit,
self.min_price_difference,
self.min_price_difference_rel,
self.get_max_capacity()
self.get_max_capacity(),
self.inverter.max_grid_charge_rate
)

self.last_logic_instance = this_logic_run
Expand Down
52 changes: 52 additions & 0 deletions src/batcontrol/logic/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ def __init__(self, timezone: datetime.timezone = datetime.timezone.utc,
self.round_price_digits = 4 # Default rounding for prices
self.soften_price_difference_on_charging = False
self.soften_price_difference_on_charging_factor = 5.0 # Default factor
self.max_charge_loss_factor = 0.1
self.enable_precharge_overhang = False
self.timezone = timezone
self.interval_minutes = interval_minutes
self.common = CommonLogic.get_instance()
Expand Down Expand Up @@ -341,6 +343,7 @@ def __get_required_recharge_energy(self, calc_input: CalculationInput ,
min_price_difference = self.calculation_parameters.min_price_difference
min_dynamic_price_difference = self.__calculate_min_dynamic_price_difference(
current_price)
turning_point_hour = None

# evaluation period until price is first time lower then current price
for h in range(1, max_hour):
Expand All @@ -356,6 +359,7 @@ def __get_required_recharge_energy(self, calc_input: CalculationInput ,

if found_lower_price:
max_hour = h
turning_point_hour = h
break

# get high price hours
Expand Down Expand Up @@ -416,10 +420,58 @@ def __get_required_recharge_energy(self, calc_input: CalculationInput ,
else:
# We are adding that minimum charge energy here, so that we are not stuck between limits.
recharge_energy = recharge_energy + self.common.min_charge_energy
if self.enable_precharge_overhang:
recharge_energy = self.__get_recharge_overhang_energy(
recharge_energy,
turning_point_hour,
current_price,
prices,
min_price_difference
)

self.calculation_output.required_recharge_energy = recharge_energy
return recharge_energy

def __get_recharge_overhang_energy(self, recharge_energy: float, turning_point_hour: Optional[int],
current_price: float, prices: dict,
min_price_difference: float) -> float:
""" Return recharge overhang if more than one charging slot is needed """
if turning_point_hour is None or turning_point_hour < 1:
return recharge_energy

max_grid_charge_rate = self.calculation_parameters.max_grid_charge_rate
if max_grid_charge_rate <= 0:
return recharge_energy

slot_hours = self.interval_minutes / 60.0
usable_charge_per_slot = max_grid_charge_rate * slot_hours * max(0.0, 1.0-self.max_charge_loss_factor)
if usable_charge_per_slot <= 0:
return recharge_energy

required_slots = int(np.ceil(recharge_energy / usable_charge_per_slot))
if required_slots <= 1:
return recharge_energy

next_lowest_price = min(prices[h] for h in range(1, len(prices)))
allowed_price_distance = min_price_difference / 2
if abs(current_price - next_lowest_price) > allowed_price_distance:
logger.debug(
"[Rule] Skip recharge overhang before turning point. Current price %.4f is not within %.4f of next lowest price %.4f.",
current_price,
allowed_price_distance,
next_lowest_price
)
return recharge_energy

overhang_energy = recharge_energy - usable_charge_per_slot
logger.debug(
"[Rule] Recharge overhang detected (%0.1f Wh, %d slots). Charging overhang before turning point in hour %d.",
overhang_energy,
required_slots,
turning_point_hour
)
return max(overhang_energy, 0.0)

def __calculate_min_dynamic_price_difference(self, price: float) -> float:
""" Calculate the dynamic limit for the current price """
return round(
Expand Down
2 changes: 2 additions & 0 deletions src/batcontrol/logic/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def create_logic(config: dict, timezone) -> LogicInterface:
'soften_price_difference_on_charging_factor',
'round_price_digits',
'charge_rate_multiplier',
'max_charge_loss_factor',
'enable_precharge_overhang',
]
for attribute in attribute_list:
if attribute in battery_control_expert:
Expand Down
1 change: 1 addition & 0 deletions src/batcontrol/logic/logic_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class CalculationParameters:
min_price_difference: float
min_price_difference_rel: float
max_capacity: float # Maximum capacity of the battery in Wh (excludes MAX_SOC)
max_grid_charge_rate: float = float('inf') # Maximum grid charge rate in W

@dataclass
class CalculationOutput:
Expand Down
120 changes: 120 additions & 0 deletions tests/batcontrol/logic/test_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,5 +326,125 @@ def test_charge_rate_calculation_with_remaining_time(self):
self.assertGreater(result.charge_rate, expected_charge_rate_before_multiplier,
"Charge rate should be adjusted by charge_rate_multiplier")

def test_recharge_overhang_is_charged_before_turning_point(self):
"""Test that only the overhang is charged when multiple slots are required."""
self.logic.enable_precharge_overhang = True
self.logic.max_charge_loss_factor = 0.1
self.logic.set_calculation_parameters(
CalculationParameters(
max_charging_from_grid_limit=0.79,
min_price_difference=0.05,
min_price_difference_rel=0.2,
max_capacity=self.max_capacity,
max_grid_charge_rate=2000
)
)

stored_energy = 1000
stored_usable_energy, free_capacity = self._calculate_battery_values(
stored_energy,
self.max_capacity
)

calc_input = CalculationInput(
consumption=np.array([100, 3500, 100]),
production=np.array([0, 0, 0]),
prices={0: 0.12, 1: 0.40, 2: 0.10},
stored_energy=stored_energy,
stored_usable_energy=stored_usable_energy,
free_capacity=free_capacity,
)

calc_timestamp = datetime.datetime(2025, 6, 20, 12, 0, 0, tzinfo=datetime.timezone.utc)
self.assertTrue(self.logic.calculate(calc_input, calc_timestamp))
calc_output = self.logic.get_calculation_output()

self.assertAlmostEqual(
calc_output.required_recharge_energy,
1300.0,
delta=0.1,
msg="Expected to charge only the overhang before the turning point"
)

def test_recharge_overhang_is_skipped_when_current_price_too_high(self):
"""Test that overhang precharge is skipped if current price is too far from next low."""
self.logic.enable_precharge_overhang = True
self.logic.max_charge_loss_factor = 0.1
self.logic.set_calculation_parameters(
CalculationParameters(
max_charging_from_grid_limit=0.79,
min_price_difference=0.05,
min_price_difference_rel=0.2,
max_capacity=self.max_capacity,
max_grid_charge_rate=2000
)
)

stored_energy = 1000
stored_usable_energy, free_capacity = self._calculate_battery_values(
stored_energy,
self.max_capacity
)

calc_input = CalculationInput(
consumption=np.array([100, 3500, 100]),
production=np.array([0, 0, 0]),
prices={0: 0.20, 1: 0.40, 2: 0.10},
stored_energy=stored_energy,
stored_usable_energy=stored_usable_energy,
free_capacity=free_capacity,
)

calc_timestamp = datetime.datetime(2025, 6, 20, 12, 0, 0, tzinfo=datetime.timezone.utc)
self.assertTrue(self.logic.calculate(calc_input, calc_timestamp))
calc_output = self.logic.get_calculation_output()

self.assertAlmostEqual(
calc_output.required_recharge_energy,
3100.0,
delta=0.1,
msg="Expected full recharge amount when current price is still too expensive"
)

def test_recharge_overhang_feature_flag_disables_behavior(self):
"""Test that disabling the feature flag keeps full recharge amount."""
self.logic.enable_precharge_overhang = False
self.logic.max_charge_loss_factor = 0.1
self.logic.set_calculation_parameters(
CalculationParameters(
max_charging_from_grid_limit=0.79,
min_price_difference=0.05,
min_price_difference_rel=0.2,
max_capacity=self.max_capacity,
max_grid_charge_rate=2000
)
)

stored_energy = 1000
stored_usable_energy, free_capacity = self._calculate_battery_values(
stored_energy,
self.max_capacity
)

calc_input = CalculationInput(
consumption=np.array([100, 3500, 100]),
production=np.array([0, 0, 0]),
prices={0: 0.20, 1: 0.40, 2: 0.10},
stored_energy=stored_energy,
stored_usable_energy=stored_usable_energy,
free_capacity=free_capacity,
)

calc_timestamp = datetime.datetime(2025, 6, 20, 12, 0, 0, tzinfo=datetime.timezone.utc)
self.assertTrue(self.logic.calculate(calc_input, calc_timestamp))
calc_output = self.logic.get_calculation_output()

self.assertAlmostEqual(
calc_output.required_recharge_energy,
3100.0,
delta=0.1,
msg="Expected full recharge amount when overhang precharge feature is disabled"
)

if __name__ == '__main__':
unittest.main()