
ML Prediction Service

The Ampra ML service is a standalone Python microservice that trains per-source XGBoost models and generates 7-day, 30-minute-resolution forecasts for solar power, load, battery state of charge, and battery voltage. It uses a physics-aware hybrid approach that blends machine learning predictions with solar physics and historical profiles.


Architecture


Flask API

Location: Ampra.ML/app.py · Port: 5050 · Authentication: X-API-Key header

Endpoints

| Method | Path | Auth | Description |
| --- | --- | --- | --- |
| GET | /health | None | Health check; returns { "status": "ok" } |
| POST | /train | API Key | Train a model for a sun source |
| POST | /predict | API Key | Generate 7-day predictions |
| GET | /status/{job_id} | API Key | Poll job status from Redis |

API Key Authentication

Every endpoint except /health is protected by a decorator that checks the X-API-Key header:

import functools
import os

from flask import jsonify, request

ML_API_KEY = os.environ.get("ML_API_KEY", "changeme-ml-key")

def require_api_key(f):
    """Reject requests whose X-API-Key header does not match ML_API_KEY."""
    @functools.wraps(f)
    def decorated(*args, **kwargs):
        provided = request.headers.get("X-API-Key", "")
        if not provided or provided != ML_API_KEY:
            return jsonify({"error": "Unauthorized"}), 401
        return f(*args, **kwargs)
    return decorated
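
The decorator compares the two keys with `!=`, whose running time can depend on how many leading characters match. As a possible hardening step (a sketch, not something the service is documented to do), the comparison could be made constant-time with the standard library's `hmac.compare_digest`. The helper name `keys_match` is hypothetical.

```python
import hmac

def keys_match(provided: str, expected: str) -> bool:
    """Constant-time comparison of two API keys (hypothetical helper)."""
    if not provided:
        return False
    # Encode to bytes so keys containing non-ASCII characters are accepted too.
    return hmac.compare_digest(provided.encode("utf-8"), expected.encode("utf-8"))
```

Inside the decorator, the condition would then read `if not keys_match(provided, ML_API_KEY):`.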

Job Status Tracking

Job progress is stored in Redis with a 24-hour TTL and updated at each pipeline stage:

JOB_STATUS_TTL = 86400  # 24 hours

def _set_status(job_id: str, status: str, progress: int = 0, **extra):
    data = {
        "jobId": job_id,
        "status": status,
        "progress": progress,  # 0–100
        "updatedAt": datetime.now(timezone.utc).isoformat(),
        **extra,
    }
    rdb.set(redis_ml_job_key(job_id), json.dumps(data), ex=JOB_STATUS_TTL)

Redis key format: ampra:ml:job:{jobId}
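
To illustrate the key format and how a caller might decode a stored status, here is a minimal sketch. It assumes `redis_ml_job_key` simply prepends the documented prefix. `FakeRedis` is a hypothetical in-memory stand-in so the example runs without a Redis server, and the `"training"` status value is illustrative only.

```python
import json
from datetime import datetime, timezone

REDIS_ML_JOB_PREFIX = "ampra:ml:job:"  # matches the documented key format

def redis_ml_job_key(job_id: str) -> str:
    # Assumed implementation: prefix + job id.
    return f"{REDIS_ML_JOB_PREFIX}{job_id}"

class FakeRedis:
    """In-memory stand-in exposing only the set/get calls used here."""
    def __init__(self):
        self._data = {}

    def set(self, key, value, ex=None):
        self._data[key] = value  # the TTL (ex) is ignored by this stand-in

    def get(self, key):
        return self._data.get(key)

def read_status(rdb, job_id: str):
    """Return the decoded status dict, or None if the key is missing or expired."""
    raw = rdb.get(redis_ml_job_key(job_id))
    return json.loads(raw) if raw is not None else None

rdb = FakeRedis()
rdb.set(redis_ml_job_key("job-1"), json.dumps({
    "jobId": "job-1",
    "status": "training",  # illustrative value
    "progress": 40,
    "updatedAt": datetime.now(timezone.utc).isoformat(),
}), ex=86400)
status = read_status(rdb, "job-1")
```

Because the key expires after 24 hours, a poller should treat a missing key as "unknown or expired" rather than as a failure.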

Train Request

{
  "sunSourceId": "guid",
  "jobId": "optional-guid",
  "latitude": 48.85,
  "longitude": 2.35,
  "weatherHistory": [ /* daily weather records */ ],
  "installedCapacityWatts": 5000
}

Predict Request

{
  "sunSourceId": "guid",
  "jobId": "optional-guid",
  "weatherForecast": [ /* 8-day daily forecast from Open-Meteo */ ]
}
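
As a rough illustration of how a client could assemble these bodies, the sketch below builds a train payload and the matching headers using the documented field names. The helper names are hypothetical, and generating the `jobId` on the client (so it can be polled immediately) is an assumption, not a documented requirement.

```python
import json
import uuid

def build_train_payload(sun_source_id, latitude, longitude,
                        weather_history, installed_capacity_watts, job_id=None):
    """Assemble the JSON body for POST /train (hypothetical helper)."""
    return {
        "sunSourceId": sun_source_id,
        # Assumption: create the optional job id client-side so it can be polled.
        "jobId": job_id or str(uuid.uuid4()),
        "latitude": latitude,
        "longitude": longitude,
        "weatherHistory": weather_history,
        "installedCapacityWatts": installed_capacity_watts,
    }

def build_headers(api_key: str) -> dict:
    """Headers expected by the protected endpoints."""
    return {"Content-Type": "application/json", "X-API-Key": api_key}

payload = build_train_payload("source-1", 48.85, 2.35, [], 5000, job_id="job-1")
body = json.dumps(payload)
headers = build_headers("changeme-ml-key")
```

The resulting `body` and `headers` can be passed to any HTTP client. Progress is then available from `/status/job-1`.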

Training Pipeline

Location: Ampra.ML/trainer.py

The training pipeline reads raw telemetry from MongoDB, engineers 50+ features, and trains an XGBoost model that is evaluated with time-series-split cross-validation.

Pipeline Stages

Target Columns

The model predicts four instantaneous values:

TARGET_COLS = [
    "solarPower",      # W, instantaneous PV output
    "loadPower",       # W, instantaneous load consumption
    "stateOfCharge",   # %, battery SOC
    "batteryVoltage",  # V, battery terminal voltage
]
# NOTE: dailyEnergyProduced/Consumed are excluded — they're cumulative
# within-day values that can't be predicted by a memoryless model.
# The predictor derives energy by integrating power × time instead.

Feature Engineering (50+ features)

Features are built from four categories:

1. Cyclical Time Features

def _add_time_features(df):
    df["hour_sin"] = np.sin(2 * np.pi * df["hour"] / 24)
    df["hour_cos"] = np.cos(2 * np.pi * df["hour"] / 24)
    df["month_sin"] = np.sin(2 * np.pi * df["month"] / 12)
    df["month_cos"] = np.cos(2 * np.pi * df["month"] / 12)
    df["dayOfYear_sin"] = np.sin(2 * np.pi * df["dayOfYear"] / 365)
    df["dayOfYear_cos"] = np.cos(2 * np.pi * df["dayOfYear"] / 365)
    # Plus: hour, dayOfWeek, dayOfYear, month, weekOfYear
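
The point of the sine/cosine pairs is that values at the two ends of a cycle end up next to each other: 23:00 is as close to 00:00 as 00:00 is to 01:00, which a raw `hour` column cannot express. A small dependency-free sketch (using `math` instead of NumPy, with the hypothetical helper `encode_cyclical`) makes this concrete:

```python
import math

def encode_cyclical(value, period):
    """Map a periodic value onto the unit circle as (sin, cos)."""
    angle = 2 * math.pi * value / period
    return math.sin(angle), math.cos(angle)

h23, h00, h01 = (encode_cyclical(h, 24) for h in (23, 0, 1))

# Euclidean distance between encoded points
gap_across_midnight = math.dist(h23, h00)
gap_after_midnight = math.dist(h00, h01)
raw_gap_across_midnight = abs(23 - 0)  # what the unencoded hour would suggest
```

Both encoded gaps are equal and small, whereas the raw hour difference across midnight is 23.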

2. Solar Physics Features

Solar elevation angle and clear-sky GHI are computed analytically:

def _solar_elevation(hour_utc, day_of_year, latitude, longitude=0.0):
    """Approximate solar elevation angle in degrees."""
    declination = 23.45 * np.sin(np.radians((284 + day_of_year) / 365 * 360))
    hour_angle = (hour_utc - 12) * 15 + longitude
    # Convert all angles to radians before applying the trigonometric identity
    lat_rad = np.radians(latitude)
    dec_rad = np.radians(declination)
    ha_rad = np.radians(hour_angle)
    sin_elev = (np.sin(lat_rad) * np.sin(dec_rad) +
                np.cos(lat_rad) * np.cos(dec_rad) * np.cos(ha_rad))
    return float(np.degrees(np.arcsin(np.clip(sin_elev, -1, 1))))

def _clear_sky_ghi(elevation_deg):
    """Approximate clear-sky Global Horizontal Irradiance (W/m²)."""
    if elevation_deg <= 0:
        return 0.0
    elev_rad = np.radians(elevation_deg)
    return float(1098 * np.sin(elev_rad) * np.exp(-0.057 / np.sin(elev_rad)))
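
As a sanity check on the two formulas, the following sketch re-implements them with the standard `math` module (to stay dependency-free) and evaluates them at the equator near the March equinox, where the sun should be almost directly overhead at 12:00 UTC on the prime meridian. The function names without the leading underscore are stand-ins, and the radian conversions are written out explicitly.

```python
import math

def solar_elevation(hour_utc, day_of_year, latitude, longitude=0.0):
    """Approximate solar elevation angle in degrees (same formula as above)."""
    declination = 23.45 * math.sin(math.radians((284 + day_of_year) / 365 * 360))
    hour_angle = (hour_utc - 12) * 15 + longitude
    lat_rad, dec_rad, ha_rad = map(math.radians, (latitude, declination, hour_angle))
    sin_elev = (math.sin(lat_rad) * math.sin(dec_rad)
                + math.cos(lat_rad) * math.cos(dec_rad) * math.cos(ha_rad))
    return math.degrees(math.asin(max(-1.0, min(1.0, sin_elev))))

def clear_sky_ghi(elevation_deg):
    """Approximate clear-sky GHI in W/m²; zero when the sun is below the horizon."""
    if elevation_deg <= 0:
        return 0.0
    s = math.sin(math.radians(elevation_deg))
    return 1098 * s * math.exp(-0.057 / s)

noon_elev = solar_elevation(12, 81, 0.0)      # day 81 is close to the equinox
midnight_elev = solar_elevation(0, 81, 0.0)
noon_ghi = clear_sky_ghi(noon_elev)
midnight_ghi = clear_sky_ghi(midnight_elev)
```

At noon the elevation comes out at roughly 90° and the clear-sky GHI at about 1037 W/m² (1098 × e^−0.057). At midnight the elevation is negative and the GHI is 0.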

Derived features:

  • solarElevation — sun angle in degrees
  • clearSkyGHI — theoretical maximum irradiance (W/m²)
  • isNight — binary: 1 if elevation ≤ 0°
  • effectiveIrradiance = clearSkyGHI × (1 - cloudIndex)

3. Weather Features

Daily weather data (from Open-Meteo) is merged onto hourly rows by date:

WEATHER_FEATURES = [
    "temperatureMax", "temperatureMin", "temperatureMean",
    "shortwaveRadiationSum", "uvIndexMax",
    "precipitationSum", "daylightHours", "cloudIndex",
]

The cloudIndex is derived from radiation data: 1 - (shortwaveRadiationSum / 95th_percentile)
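
A minimal sketch of that derivation follows. The radiation values are made up for illustration, the percentile uses linear interpolation (the same convention as NumPy's default), and clipping the result to the range 0 to 1 is an assumption, since the source does not say how days brighter than the 95th percentile are handled.

```python
def percentile(values, q):
    """Linear-interpolation percentile, with q in 0..100."""
    ordered = sorted(values)
    pos = (len(ordered) - 1) * q / 100
    lo = int(pos)
    hi = min(lo + 1, len(ordered) - 1)
    return ordered[lo] + (ordered[hi] - ordered[lo]) * (pos - lo)

def cloud_index(radiation_sum, reference):
    """1 - radiation/reference, clipped to [0, 1] (clipping is an assumption)."""
    if reference <= 0:
        raise ValueError("reference radiation must be positive")
    return min(1.0, max(0.0, 1.0 - radiation_sum / reference))

history = [2.0, 8.0, 12.0, 18.0, 20.0]   # illustrative daily shortwaveRadiationSum values
p95 = percentile(history, 95)            # 18 + 0.8 * (20 - 18), about 19.6
clear_day = cloud_index(20.0, p95)       # brighter than the reference, clipped to 0
dark_day = cloud_index(0.0, p95)         # no radiation at all gives 1
half_day = cloud_index(p95 / 2, p95)     # half the reference gives 0.5
```

Using a high percentile rather than the maximum keeps a single unusually bright day from skewing the reference.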

4. Historical Profiles & Numeric Features

  • {target}_hourly_profile — mean value of each target at each hour (captures daily patterns)
  • All non-target NUMERIC_FIELDS with sufficient data (used as contextual features)
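
The hourly profile can be pictured as a group-by-hour mean. The sketch below uses plain dictionaries rather than pandas so it runs without dependencies. The helper name `hourly_profile` and the row layout are illustrative assumptions.

```python
from collections import defaultdict

def hourly_profile(rows, target):
    """Mean of `target` for each hour of day present in `rows`."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[row["hour"]].append(row[target])
    return {hour: sum(vals) / len(vals) for hour, vals in sorted(buckets.items())}

rows = [
    {"hour": 12, "solarPower": 1000.0},
    {"hour": 12, "solarPower": 800.0},
    {"hour": 0, "solarPower": 0.0},
]
profile = hourly_profile(rows, "solarPower")
```

Here `profile[12]` is the mean midday output across all days in the history. The same lookup supplies the historical fallback used when blending predictions.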

XGBoost Hyperparameters

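from sklearn.multioutput import MultiOutputRegressor
from xgboost import XGBRegressor

# One independent XGBRegressor is fitted per target column
model = MultiOutputRegressor(
    XGBRegressor(
        n_estimators=500,
        max_depth=7,
        learning_rate=0.03,
        subsample=0.85,
        colsample_bytree=0.8,
        reg_alpha=0.1,   # L1 regularization
        reg_lambda=1.0,  # L2 regularization
        min_child_weight=3,
        gamma=0.1,
        random_state=42,
        n_jobs=-1,
    )
)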

Time-Series Cross-Validation

Standard k-fold would leak future data. Instead, TimeSeriesSplit ensures the training set always precedes the validation set:

tscv = TimeSeriesSplit(n_splits=min(5, max(2, len(X) // 48)))
cv_scores = {"mae": [], "rmse": [], "r2": []}

for fold, (train_idx, val_idx) in enumerate(tscv.split(X)):
    X_train, X_val = X[train_idx], X[val_idx]
    y_train, y_val = y[train_idx], y[val_idx]
    model.fit(X_train, y_train)
    y_pred = model.predict(X_val)

    # Record per-fold error metrics
    cv_scores["mae"].append(mean_absolute_error(y_val, y_pred))
    cv_scores["rmse"].append(np.sqrt(mean_squared_error(y_val, y_pred)))
    cv_scores["r2"].append(r2_score(y_val, y_pred))

After cross-validation, the model is retrained on the full dataset.
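
To show what the split looks like, the sketch below reproduces the fold-count rule and an expanding-window split in plain Python. The index arithmetic follows scikit-learn's documented default behaviour for `TimeSeriesSplit` (no `gap`, no `max_train_size`, default `test_size`), but it is a simplified stand-in, not the library code. Both function names are hypothetical.

```python
def choose_n_splits(n_samples):
    """Fold-count rule from the trainer: between 2 and 5, one fold per 48 rows."""
    return min(5, max(2, n_samples // 48))

def expanding_window_splits(n_samples, n_splits):
    """Yield (train_indices, val_indices) with training always preceding validation."""
    test_size = n_samples // (n_splits + 1)
    first_test_start = n_samples - n_splits * test_size
    for start in range(first_test_start, n_samples, test_size):
        yield list(range(start)), list(range(start, start + test_size))

folds = list(expanding_window_splits(12, 3))
```

With 12 samples and 3 splits, the folds are train `[0..2]` / validate `[3..5]`, train `[0..5]` / validate `[6..8]`, and train `[0..8]` / validate `[9..11]`. Each training window grows, and no validation row ever precedes a training row.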

Model Storage

Trained models are serialized with joblib and uploaded to MinIO:

MinIO path: models/{sunSourceId}/model.joblib

The model artefact includes:

  • The trained MultiOutputRegressor object
  • Feature column names and order
  • Target column names
  • Latitude/longitude
  • Hourly profiles for each target
  • Battery capacity estimate
  • Installed panel capacity

Metadata is also persisted to MongoDB (model_metadata collection) with training metrics.


Prediction Pipeline

Location: Ampra.ML/predictor.py

The prediction pipeline generates 7 days of 30-minute-resolution forecasts using a hybrid ML + physics approach.

Pipeline Stages

Forecast Parameters

| Parameter | Value |
| --- | --- |
| Step size | 30 minutes |
| Steps per day | 48 |
| Forecast horizon | 7 days |
| Total predictions | 336 |
| Nominal battery voltage | 24 V |
| SOC minimum (deep-discharge) | 10% |
| SOC maximum | 100% |
| Charge efficiency | 95% |
| Discharge efficiency | 95% |

Physics-Aware Hybrid Blending

The raw ML predictions are post-processed with physics constraints:

Solar Power:

if elev <= 0:
    solar_power = 0.0  # Sun below the horizon: no PV output is possible
else:
    ml_solar = max(0.0, raw.get("solarPower", 0.0))

    # Physics-based estimate
    physics_solar = clear_sky_ghi * panel_factor * (1.0 - cloud_index * 0.75)

    if ml_solar > profile_val * 0.25 and ml_solar < peak_solar * 1.5:
        # ML is credible → 60% ML / 40% physics
        solar_power = ml_solar * 0.6 + physics_solar * 0.4
    else:
        # ML is out of range → trust physics + historical profile
        solar_power = physics_solar * 0.7 + profile_val * 0.3

    solar_power = min(solar_power, peak_solar * 1.2)  # Hard cap at 120% of rated
    solar_power *= 1.0 + rng.uniform(-0.03, 0.03)  # ±3% natural variation
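
To make the blending rule easier to reason about, here is a deterministic sketch of the same logic wrapped in a function. The function name and signature are hypothetical, and the ±3% random variation is deliberately left out so the results are reproducible.

```python
def blend_solar(elev, ml_solar, clear_sky_ghi, panel_factor, cloud_index,
                profile_val, peak_solar):
    """Blend ML, physics, and profile estimates of solar power (noise omitted)."""
    if elev <= 0:
        return 0.0  # sun below the horizon
    ml_solar = max(0.0, ml_solar)
    physics_solar = clear_sky_ghi * panel_factor * (1.0 - cloud_index * 0.75)
    if profile_val * 0.25 < ml_solar < peak_solar * 1.5:
        blended = ml_solar * 0.6 + physics_solar * 0.4      # ML considered credible
    else:
        blended = physics_solar * 0.7 + profile_val * 0.3   # fall back to physics + profile
    return min(blended, peak_solar * 1.2)

# Illustrative inputs: physics estimate 1000 W, profile 900 W, peak 5000 W
credible = blend_solar(30, 800.0, 1000.0, 1.0, 0.0, 900.0, 5000.0)
implausible = blend_solar(30, 10.0, 1000.0, 1.0, 0.0, 900.0, 5000.0)
night = blend_solar(-5, 800.0, 1000.0, 1.0, 0.0, 900.0, 5000.0)
```

With these inputs the credible case gives 0.6 × 800 + 0.4 × 1000 = 880 W, the implausible case gives 0.7 × 1000 + 0.3 × 900 = 970 W, and the night case gives 0 W.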

Load Power:

if ml_load > profile_load * 0.3 and ml_load < profile_load * 3.0:
    # ML is credible → 60% ML / 40% profile
    load_power = ml_load * 0.6 + profile_load * 0.4
else:
    # ML out of range → use historical profile
    load_power = profile_load

load_power = max(15.0, load_power)  # Minimum 15W standby draw
load_power *= 1.0 + rng.uniform(-0.02, 0.02)  # ±2% variation
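
The load rule can be exercised the same way. This sketch uses a hypothetical `blend_load` helper and omits the ±2% random variation so the numbers are reproducible.

```python
def blend_load(ml_load, profile_load):
    """Blend ML and historical-profile load estimates (noise omitted)."""
    if profile_load * 0.3 < ml_load < profile_load * 3.0:
        load = ml_load * 0.6 + profile_load * 0.4   # ML considered credible
    else:
        load = profile_load                         # fall back to the profile
    return max(15.0, load)                          # 15 W standby floor

credible = blend_load(300.0, 200.0)     # within 0.3x to 3x of the profile
implausible = blend_load(2000.0, 200.0) # 10x the profile, so it is rejected
floored = blend_load(5.0, 5.0)          # below the standby floor
```

The three cases evaluate to 260 W, 200 W, and 15 W respectively.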

Battery Simulation

SOC is tracked with a proper energy-balance simulation — not a damped ML output:

STEP_HOURS = 0.5        # 30-minute steps
CHARGE_EFF = 0.95
DISCHARGE_EFF = 0.95

net_power = solar_power - load_power # W (positive = surplus)
energy_wh = net_power * STEP_HOURS # Wh for this step

if energy_wh > 0:
    usable_energy = energy_wh * CHARGE_EFF      # charging: losses reduce stored energy
else:
    usable_energy = energy_wh / DISCHARGE_EFF   # discharging: losses increase the draw

soc_delta = (usable_energy / battery_wh) * 100.0
soc = np.clip(soc + soc_delta, SOC_MIN, SOC_MAX)
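
A single simulation step can be checked by hand. The sketch below wraps the update in a hypothetical `step_soc` function, uses `min`/`max` in place of `np.clip`, and assumes a 5000 Wh battery purely for illustration.

```python
STEP_HOURS = 0.5
CHARGE_EFF = 0.95
DISCHARGE_EFF = 0.95
SOC_MIN, SOC_MAX = 10.0, 100.0

def step_soc(soc, solar_power, load_power, battery_wh):
    """Advance state of charge (%) by one 30-minute step."""
    energy_wh = (solar_power - load_power) * STEP_HOURS
    if energy_wh > 0:
        usable = energy_wh * CHARGE_EFF
    else:
        usable = energy_wh / DISCHARGE_EFF
    return min(SOC_MAX, max(SOC_MIN, soc + usable / battery_wh * 100.0))

charged = step_soc(50.0, 1200.0, 200.0, 5000.0)   # +500 Wh surplus, 475 Wh stored
discharged = step_soc(50.0, 0.0, 950.0, 5000.0)   # -475 Wh deficit, 500 Wh drawn
floored = step_soc(12.0, 0.0, 950.0, 5000.0)      # would reach 2%, held at SOC_MIN
```

The surplus case raises SOC from 50% to 59.5%, the deficit case lowers it to 40%, and the last case is clamped at the 10% deep-discharge limit.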

Battery Voltage Curve

Battery voltage is derived from SOC using a 24V LiFePO4 discharge curve:

def _battery_voltage_from_soc(soc):
    """24V LiFePO4: ~21V empty → ~29.2V full"""
    if soc <= 10:
        return 21.0 + (soc / 10) * 1.5          # 21.0 – 22.5V
    elif soc <= 20:
        return 22.5 + ((soc - 10) / 10) * 1.0   # 22.5 – 23.5V
    elif soc <= 90:
        return 23.5 + ((soc - 20) / 70) * 4.5   # 23.5 – 28.0V (flat plateau)
    else:
        return 28.0 + ((soc - 90) / 10) * 1.2   # 28.0 – 29.2V

Energy Tracking

Daily energy totals are derived by integrating power over time rather than predicting them directly:

daily_energy_produced += solar_power * STEP_HOURS / 1000.0  # kWh
daily_energy_consumed += load_power * STEP_HOURS / 1000.0 # kWh
# Reset at midnight
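
The accumulation and midnight reset can be sketched as a loop over the half-hourly power series. This example assumes the series starts exactly at midnight and uses a hypothetical `daily_energy_kwh` helper.

```python
STEP_HOURS = 0.5
STEPS_PER_DAY = 48

def daily_energy_kwh(power_series_w):
    """Integrate a 30-minute power series (W) into per-day energy totals (kWh)."""
    totals = []
    running = 0.0
    for i, power in enumerate(power_series_w):
        if i > 0 and i % STEPS_PER_DAY == 0:
            totals.append(running)  # close out the previous day
            running = 0.0           # reset at midnight
        running += power * STEP_HOURS / 1000.0
    if power_series_w:
        totals.append(running)      # final (possibly partial) day
    return totals

two_days = daily_energy_kwh([1000.0] * 96)  # constant 1 kW for two full days
```

A constant 1 kW over 24 hours integrates to 24 kWh per day, which is what the function returns for each of the two days.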

Daily Summaries

The 336 half-hourly predictions are aggregated into 7 daily summaries:

summary = {
    "date": "2025-01-15",
    "dayLabel": "Wednesday",
    "solarPowerAvg": 245.5,          # W
    "solarPowerMax": 1200.0,         # W (peak)
    "solarPowerMin": 0.0,            # W (night)
    "loadPowerAvg": 180.3,           # W
    "stateOfChargeAvg": 65.2,        # %
    "stateOfChargeMin": 32.1,        # % (overnight low)
    "batteryVoltageAvg": 25.8,       # V
    "dailyEnergyProducedAvg": 3.45,  # kWh (total for the day)
    "dailyEnergyConsumedAvg": 2.16,  # kWh
    "weather": { /* Open-Meteo daily forecast */ }
}
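
The aggregation step might look roughly like the sketch below, which groups predictions by calendar date and computes a subset of the summary fields. The `timestamp` field name (an ISO 8601 string) and the helper name `summarize_days` are assumptions, since the per-step record layout is not shown in this document.

```python
from collections import defaultdict

def summarize_days(predictions):
    """Group half-hourly predictions by date and compute avg/max/min fields."""
    by_date = defaultdict(list)
    for p in predictions:
        by_date[p["timestamp"][:10]].append(p)  # "YYYY-MM-DD" prefix of the timestamp

    summaries = []
    for date, rows in sorted(by_date.items()):
        solar = [r["solarPower"] for r in rows]
        soc = [r["stateOfCharge"] for r in rows]
        summaries.append({
            "date": date,
            "solarPowerAvg": sum(solar) / len(solar),
            "solarPowerMax": max(solar),
            "solarPowerMin": min(solar),
            "stateOfChargeAvg": sum(soc) / len(soc),
            "stateOfChargeMin": min(soc),
        })
    return summaries

predictions = [
    {"timestamp": "2025-01-15T00:00:00+00:00", "solarPower": 0.0, "stateOfCharge": 40.0},
    {"timestamp": "2025-01-15T12:00:00+00:00", "solarPower": 1200.0, "stateOfCharge": 80.0},
    {"timestamp": "2025-01-16T00:00:00+00:00", "solarPower": 0.0, "stateOfCharge": 60.0},
]
daily = summarize_days(predictions)
```

Applied to the full 336-step forecast, the same grouping would yield one summary per forecast day.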

MongoDB Storage

Predictions are stored as a single document per source (upserted on each run):

prediction_doc = {
    "sunSourceId": sun_source_id,
    "createdAt": datetime.now(timezone.utc),
    "hourly": results,          # 336 half-hourly predictions
    "daily": daily_summaries,   # 7 daily summaries
    "targetColumns": target_cols,
    "generatedAt": datetime.now(timezone.utc).isoformat(),
}
predictions_collection.replace_one(
    {"sunSourceId": sun_source_id},
    prediction_doc,
    upsert=True,
)

Constants

Location: Ampra.ML/constants.py

# MongoDB collections
MONGO_COLLECTION_NORMALIZED_DATA = "normalized_sun_source_data"
MONGO_COLLECTION_PREDICTIONS = "predictions"
MONGO_COLLECTION_MODEL_METADATA = "model_metadata"
MONGO_COLLECTION_WEATHER_DATA = "weather_data"

# Redis key prefix
REDIS_ML_JOB_PREFIX = "ampra:ml:job:"

# MinIO model path template
MINIO_MODEL_PATH_TEMPLATE = "models/{sun_source_id}/model.joblib"

Environment Variables

| Variable | Default | Description |
| --- | --- | --- |
| REDIS_URL | redis://localhost:6379/0 | Redis connection string |
| MONGO_URL | mongodb://...localhost:27017/ampradb | MongoDB connection string |
| MONGO_DB | ampradb | MongoDB database name |
| MINIO_ENDPOINT | localhost:9000 | MinIO endpoint |
| MINIO_ACCESS_KEY | ampra | MinIO access key |
| MINIO_SECRET_KEY | ampra123 | MinIO secret key |
| MINIO_BUCKET | ampra-models | MinIO bucket for model artefacts |
| MINIO_USE_SSL | false | Use TLS for MinIO |
| ML_API_KEY | changeme-ml-key | API key for service authentication |

Dependencies

From requirements.txt:

| Package | Purpose |
| --- | --- |
| Flask | HTTP API framework |
| Redis | Job status tracking |
| PyMongo | MongoDB data access |
| MinIO | S3-compatible model storage |
| XGBoost | Gradient-boosted tree models |
| scikit-learn | Cross-validation, metrics, MultiOutputRegressor |
| pandas / numpy | Data manipulation |
| joblib | Model serialization |
joblibModel serialization