ML Prediction Service
The Ampra ML service is a standalone Python microservice that trains per-source XGBoost models and generates 7-day, 30-minute-resolution forecasts for solar power, load, battery state of charge, and battery voltage. It uses a physics-aware hybrid approach that blends machine learning predictions with solar physics and historical profiles.
Architecture
Flask API
Location: Ampra.ML/app.py · Port: 5050 · Authentication: X-API-Key header
Endpoints
| Method | Path | Auth | Description |
|---|---|---|---|
| GET | /health | None | Health check; returns `{ "status": "ok" }` |
| POST | /train | API Key | Train a model for a sun source |
| POST | /predict | API Key | Generate 7-day predictions |
| GET | /status/{job_id} | API Key | Poll job status from Redis |
API Key Authentication
Every endpoint except /health is protected by a decorator that checks the X-API-Key header:
```python
import functools
import os
from flask import jsonify, request

ML_API_KEY = os.environ.get("ML_API_KEY", "changeme-ml-key")

def require_api_key(f):
    @functools.wraps(f)  # keep the view's name so Flask endpoint names stay unique
    def decorated(*args, **kwargs):
        provided = request.headers.get("X-API-Key", "")
        if not provided or provided != ML_API_KEY:
            return jsonify({"error": "Unauthorized"}), 401
        return f(*args, **kwargs)
    return decorated
```
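The `!=` check stops at the first differing character, which in principle leaks timing information about the key. A possible hardening, sketched here as a suggestion rather than the service's current code, swaps in the standard library's `hmac.compare_digest`; the helper name `is_authorized` is hypothetical.

```python
import hmac


def is_authorized(provided: str, expected: str) -> bool:
    """Constant-time API key check (hypothetical helper, not part of app.py)."""
    if not provided or not expected:
        # Reject empty keys outright, mirroring the `not provided` guard above
        return False
    # compare_digest's running time does not depend on where the inputs differ
    return hmac.compare_digest(provided.encode("utf-8"), expected.encode("utf-8"))
```

Inside `decorated`, the condition would then read `if not is_authorized(provided, ML_API_KEY):`.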
Job Status Tracking
Job progress is stored in Redis with a 24-hour TTL and updated at each pipeline stage:
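```python
import json
from datetime import datetime, timezone

JOB_STATUS_TTL = 86400  # 24 hours

# rdb (Redis client) and redis_ml_job_key() are defined elsewhere in the service
def _set_status(job_id: str, status: str, progress: int = 0, **extra):
    data = {
        "jobId": job_id,
        "status": status,
        "progress": progress,  # 0–100
        "updatedAt": datetime.now(timezone.utc).isoformat(),
        **extra,
    }
    # Overwrites any previous status and restarts the 24-hour expiry
    rdb.set(redis_ml_job_key(job_id), json.dumps(data), ex=JOB_STATUS_TTL)
```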
Redis key format: ampra:ml:job:{jobId}
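A sketch of the read side that `/status/{job_id}` needs, assuming the key format above. `get_job_status` and the dict-backed `FakeRedis` are hypothetical stand-ins so the example runs without a Redis server. With redis-py, `get` returns bytes (or `None` once the key is missing or its TTL has expired), and `json.loads` accepts bytes directly; a handler could map `None` to a 404.

```python
import json
from typing import Optional

REDIS_ML_JOB_PREFIX = "ampra:ml:job:"


class FakeRedis:
    """Minimal in-memory stand-in for a Redis client (get/set only, TTL ignored)."""

    def __init__(self):
        self.store = {}

    def set(self, key, value, ex=None):
        self.store[key] = value.encode("utf-8") if isinstance(value, str) else value

    def get(self, key):
        return self.store.get(key)


def get_job_status(rdb, job_id: str) -> Optional[dict]:
    """Return the stored status dict, or None if the job is unknown or expired."""
    raw = rdb.get(f"{REDIS_ML_JOB_PREFIX}{job_id}")
    if raw is None:
        return None
    return json.loads(raw)


rdb = FakeRedis()
rdb.set("ampra:ml:job:42", json.dumps({"jobId": "42", "progress": 40}), ex=86400)
print(get_job_status(rdb, "42"))
```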
Train Request
```json
{
  "sunSourceId": "guid",
  "jobId": "optional-guid",
  "latitude": 48.85,
  "longitude": 2.35,
  "weatherHistory": [ /* daily weather records */ ],
  "installedCapacityWatts": 5000
}
```
Predict Request
```json
{
  "sunSourceId": "guid",
  "jobId": "optional-guid",
  "weatherForecast": [ /* 8-day daily forecast from Open-Meteo */ ]
}
```
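A minimal client-side sketch using only the standard library, assuming the service listens on port 5050 as stated above. `build_request` is a hypothetical helper: it only constructs the `urllib.request.Request`, and sending it with `urllib.request.urlopen` is left to the caller.

```python
import json
import urllib.request


def build_request(base_url: str, path: str, api_key: str, payload: dict) -> urllib.request.Request:
    """Build a JSON POST for /train or /predict with the X-API-Key header set."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}{path}",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "X-API-Key": api_key},
    )


req = build_request("http://localhost:5050", "/predict", "changeme-ml-key",
                    {"sunSourceId": "guid", "weatherForecast": []})
# To send it: with urllib.request.urlopen(req) as resp: print(json.load(resp))
```

Note that `urllib` normalizes stored header names (`X-API-Key` is kept as `X-api-key`); HTTP header names are case-insensitive, so the Flask side still finds it.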
Training Pipeline
Location: Ampra.ML/trainer.py
The training pipeline takes raw telemetry from MongoDB, engineers 50+ features, scores an XGBoost model with time-series cross-validation, and then retrains it on the full dataset.
Pipeline Stages
Target Columns
The model predicts four instantaneous values:
```python
TARGET_COLS = [
    "solarPower",      # W, instantaneous PV output
    "loadPower",       # W, instantaneous load consumption
    "stateOfCharge",   # %, battery SOC
    "batteryVoltage",  # V, battery terminal voltage
]
# NOTE: dailyEnergyProduced/Consumed are excluded. They're cumulative
# within-day values that can't be predicted by a memoryless model.
# The predictor derives energy by integrating power × time instead.
```
Feature Engineering (50+ features)
Features are built from four categories:
1. Cyclical Time Features
```python
import numpy as np

def _add_time_features(df):
    # Encode periodic columns as (sin, cos) pairs on the unit circle
    df["hour_sin"] = np.sin(2 * np.pi * df["hour"] / 24)
    df["hour_cos"] = np.cos(2 * np.pi * df["hour"] / 24)
    df["month_sin"] = np.sin(2 * np.pi * df["month"] / 12)
    df["month_cos"] = np.cos(2 * np.pi * df["month"] / 12)
    df["dayOfYear_sin"] = np.sin(2 * np.pi * df["dayOfYear"] / 365)
    df["dayOfYear_cos"] = np.cos(2 * np.pi * df["dayOfYear"] / 365)
    # Plus: hour, dayOfWeek, dayOfYear, month, weekOfYear
    return df
```
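The reason for the sine/cosine pairs: in the raw `hour` column, 23 and 0 are 23 units apart even though they are one hour apart. A small sketch (the helper names are illustrative) shows that the encoded points keep that adjacency:

```python
import math


def encode_cyclic(value: float, period: float) -> tuple:
    """Map a periodic value onto the unit circle, as _add_time_features does."""
    angle = 2 * math.pi * value / period
    return (math.sin(angle), math.cos(angle))


def encoded_distance(a: float, b: float, period: float) -> float:
    """Euclidean distance between two values after cyclical encoding."""
    (s1, c1), (s2, c2) = encode_cyclic(a, period), encode_cyclic(b, period)
    return math.hypot(s1 - s2, c1 - c2)


# 23:00 -> 00:00 is exactly as close as 00:00 -> 01:00 ...
print(encoded_distance(23, 0, 24), encoded_distance(0, 1, 24))
# ... while opposite hours sit on opposite sides of the circle (distance 2)
print(encoded_distance(0, 12, 24))
```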
2. Solar Physics Features
Solar elevation angle and clear-sky GHI are computed analytically:
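```python
import numpy as np

def _solar_elevation(hour_utc, day_of_year, latitude, longitude=0.0):
    """Approximate solar elevation angle in degrees."""
    # Declination (Cooper's approximation) and hour angle, both in degrees;
    # longitude is east-positive, so adding it shifts UTC to local solar time
    declination = 23.45 * np.sin(np.radians((284 + day_of_year) / 365 * 360))
    hour_angle = (hour_utc - 12) * 15 + longitude
    lat_rad = np.radians(latitude)
    dec_rad = np.radians(declination)
    ha_rad = np.radians(hour_angle)
    sin_elev = (np.sin(lat_rad) * np.sin(dec_rad) +
                np.cos(lat_rad) * np.cos(dec_rad) * np.cos(ha_rad))
    return float(np.degrees(np.arcsin(np.clip(sin_elev, -1, 1))))

def _clear_sky_ghi(elevation_deg):
    """Approximate clear-sky Global Horizontal Irradiance (W/m²), Haurwitz model."""
    if elevation_deg <= 0:
        return 0.0
    elev_rad = np.radians(elevation_deg)  # sin(elevation) = cos(zenith)
    return float(1098 * np.sin(elev_rad) * np.exp(-0.057 / np.sin(elev_rad)))
```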
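In equation form, with latitude φ, longitude λ (east positive), day of year n, UTC hour t, declination δ, hour angle h and elevation α, the two functions compute:

$$
\begin{aligned}
\delta &= 23.45^\circ\,\sin\!\left(\frac{360^\circ\,(284 + n)}{365}\right), \qquad h = 15^\circ\,(t - 12) + \lambda,\\
\sin\alpha &= \sin\varphi\,\sin\delta + \cos\varphi\,\cos\delta\,\cos h,\\
\mathrm{GHI}_{\mathrm{clear}} &= 1098\,\sin\alpha\,\exp\!\left(-\frac{0.057}{\sin\alpha}\right) \quad (\alpha > 0).
\end{aligned}
$$

As a worked check for Paris (φ = 48.85°, λ = 2.35°) on day 172, δ ≈ 23.45°. Solar noon (h = 0) falls at t = 12 − 2.35/15 ≈ 11.84 UTC, where the elevation formula reduces to sin α = cos(φ − δ), so α = 90° − 25.40° ≈ 64.6° and the clear-sky model gives roughly 930 W/m².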
Derived features:
- `solarElevation`: sun angle in degrees
- `clearSkyGHI`: theoretical maximum irradiance (W/m²)
- `isNight`: binary, 1 if elevation ≤ 0°
- `effectiveIrradiance`: `clearSkyGHI × (1 - cloudIndex)`
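Given an elevation and the day's `cloudIndex`, the four derived features follow directly. A sketch for one row; the function name `derive_solar_features` is hypothetical, and the clear-sky formula is repeated inline so the block runs on its own:

```python
import math


def derive_solar_features(elevation_deg: float, cloud_index: float) -> dict:
    """Compute the derived solar features for one row (illustrative helper)."""
    if elevation_deg <= 0:
        ghi = 0.0  # sun below the horizon
    else:
        s = math.sin(math.radians(elevation_deg))
        ghi = 1098 * s * math.exp(-0.057 / s)  # same formula as _clear_sky_ghi
    return {
        "solarElevation": elevation_deg,
        "clearSkyGHI": ghi,
        "isNight": 1 if elevation_deg <= 0 else 0,
        "effectiveIrradiance": ghi * (1 - cloud_index),
    }


print(derive_solar_features(elevation_deg=64.6, cloud_index=0.3))
```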
3. Weather Features
Daily weather data (from Open-Meteo) is merged onto hourly rows by date:
```python
WEATHER_FEATURES = [
    "temperatureMax", "temperatureMin", "temperatureMean",
    "shortwaveRadiationSum", "uvIndexMax",
    "precipitationSum", "daylightHours", "cloudIndex",
]
```
The `cloudIndex` is derived from radiation data as `1 - (shortwaveRadiationSum / p95)`, where `p95` is the 95th percentile of `shortwaveRadiationSum`.
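A NumPy sketch of that formula. Three details are assumptions rather than documented behaviour: the percentile is taken over the values passed in (for example, the training weather history), the result is clipped to [0, 1] so days brighter than the 95th percentile do not get a negative index, and an all-zero input is treated as fully overcast.

```python
import numpy as np


def cloud_index(shortwave_radiation_sum) -> np.ndarray:
    """cloudIndex = 1 - radiation / p95(radiation); clipping to [0, 1] is an assumption."""
    radiation = np.asarray(shortwave_radiation_sum, dtype=float)
    p95 = np.percentile(radiation, 95)
    if p95 <= 0:
        # Degenerate input (no radiation at all): assume fully overcast
        return np.ones_like(radiation)
    return np.clip(1.0 - radiation / p95, 0.0, 1.0)


print(cloud_index([0.0, 5.0, 10.0, 20.0, 22.0]))
```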
4. Historical Profiles & Numeric Features
- `{target}_hourly_profile`: mean value of each target at each hour (captures daily patterns)
- All non-target `NUMERIC_FIELDS` with sufficient data (used as contextual features)
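The hourly profile is a per-hour mean of a target over the training history. A dependency-free sketch (`hourly_profile` is a hypothetical name); in a pandas pipeline the same result is typically `df.groupby("hour")[target].mean()`, mapped back onto each row by its hour.

```python
from collections import defaultdict


def hourly_profile(hours, values) -> dict:
    """Mean target value for each hour of day seen in the data."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for hour, value in zip(hours, values):
        sums[hour] += value
        counts[hour] += 1
    return {hour: sums[hour] / counts[hour] for hour in sums}


# Two days of readings at 06:00 and 12:00
profile = hourly_profile([6, 12, 6, 12], [10.0, 900.0, 30.0, 1100.0])
# Each row's {target}_hourly_profile feature is then profile[row_hour]
print(profile)
```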
XGBoost Hyperparameters
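```python
from sklearn.multioutput import MultiOutputRegressor
from xgboost import XGBRegressor

# MultiOutputRegressor fits one independent XGBRegressor per target column
model = MultiOutputRegressor(
    XGBRegressor(
        n_estimators=500,
        max_depth=7,
        learning_rate=0.03,
        subsample=0.85,        # fraction of rows sampled per tree
        colsample_bytree=0.8,  # fraction of features sampled per tree
        reg_alpha=0.1,         # L1 regularization
        reg_lambda=1.0,        # L2 regularization
        min_child_weight=3,
        gamma=0.1,             # minimum loss reduction required to split
        random_state=42,
        n_jobs=-1,
    )
)
```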
Time-Series Cross-Validation
Standard k-fold would leak future data: some folds train on rows recorded after the rows they are validated on. Instead, `TimeSeriesSplit` ensures the training set always precedes the validation set:
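```python
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import TimeSeriesSplit

# X, y: chronologically ordered NumPy arrays; len(X) // 48 folds, clamped to [2, 5]
tscv = TimeSeriesSplit(n_splits=min(5, max(2, len(X) // 48)))
cv_scores = {"mae": [], "rmse": [], "r2": []}
for fold, (train_idx, val_idx) in enumerate(tscv.split(X)):
    X_train, X_val = X[train_idx], X[val_idx]
    y_train, y_val = y[train_idx], y[val_idx]
    model.fit(X_train, y_train)
    y_pred = model.predict(X_val)
    cv_scores["mae"].append(mean_absolute_error(y_val, y_pred))
    cv_scores["rmse"].append(np.sqrt(mean_squared_error(y_val, y_pred)))
    cv_scores["r2"].append(r2_score(y_val, y_pred))
```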
After cross-validation, the model is retrained on the full dataset.
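To make the fold logic concrete, here is a pure-Python re-implementation, for illustration only, of the `n_splits` rule together with the expanding-window splits that scikit-learn's `TimeSeriesSplit` produces with its defaults (no `gap`, no `max_train_size`, `test_size` unset). With hourly rows, every 48 rows (two days of data) adds a split until the cap of 5.

```python
def choose_n_splits(n_rows: int) -> int:
    """Same rule as the trainer: one split per 48 rows, clamped to [2, 5]."""
    return min(5, max(2, n_rows // 48))


def expanding_window_splits(n_rows: int, n_splits: int):
    """Yield (train_indices, val_indices) like TimeSeriesSplit's default behaviour."""
    test_size = n_rows // (n_splits + 1)
    if test_size == 0:
        raise ValueError("not enough rows for the requested number of splits")
    first_test_start = n_rows - n_splits * test_size
    for test_start in range(first_test_start, n_rows, test_size):
        train = list(range(test_start))  # every row before the validation window
        val = list(range(test_start, test_start + test_size))
        yield train, val


for n in (60, 150, 1000):
    print(n, "rows ->", choose_n_splits(n), "splits")

for train, val in expanding_window_splits(12, 2):
    print(f"train 0..{train[-1]}  val {val[0]}..{val[-1]}")
```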
Model Storage
Trained models are serialized with joblib and uploaded to MinIO:
MinIO path: models/{sunSourceId}/model.joblib
The model artefact includes:
- The trained `MultiOutputRegressor` object
- Feature column names and order
- Target column names
- Latitude/longitude
- Hourly profiles for each target
- Battery capacity estimate
- Installed panel capacity
Metadata is also persisted to MongoDB (model_metadata collection) with training metrics.
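A sketch of the serialize-and-upload step, assuming the artefact is a plain dict. The example dict keys and the helper `upload_model` are hypothetical. `Minio.put_object(bucket_name, object_name, data, length, content_type=...)` is the MinIO Python SDK call; the example passes a recording stand-in client so it runs without a server.

```python
import io

import joblib

MINIO_MODEL_PATH_TEMPLATE = "models/{sun_source_id}/model.joblib"


def upload_model(client, bucket: str, sun_source_id: str, artifact: dict) -> str:
    """Serialize `artifact` with joblib and upload it; returns the object path."""
    buffer = io.BytesIO()
    joblib.dump(artifact, buffer)  # joblib accepts an open file object
    payload = buffer.getvalue()
    object_name = MINIO_MODEL_PATH_TEMPLATE.format(sun_source_id=sun_source_id)
    client.put_object(
        bucket,
        object_name,
        io.BytesIO(payload),
        length=len(payload),
        content_type="application/octet-stream",
    )
    return object_name


class RecordingClient:
    """Stand-in for minio.Minio that keeps uploaded bytes in memory."""

    def __init__(self):
        self.objects = {}

    def put_object(self, bucket_name, object_name, data, length, content_type=None):
        self.objects[(bucket_name, object_name)] = data.read(length)


client = RecordingClient()
path = upload_model(client, "ampra-models", "abc", {"feature_cols": ["hour_sin"], "latitude": 48.85})
restored = joblib.load(io.BytesIO(client.objects[("ampra-models", path)]))
```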
Prediction Pipeline
Location: Ampra.ML/predictor.py
The prediction pipeline generates 7 days of 30-minute-resolution forecasts using a hybrid ML + physics approach.
Pipeline Stages
Forecast Parameters
| Parameter | Value |
|---|---|
| Step size | 30 minutes |
| Steps per day | 48 |
| Forecast horizon | 7 days |
| Total predictions | 336 |
| Nominal battery voltage | 24V |
| SOC minimum (deep-discharge) | 10% |
| SOC maximum | 100% |
| Charge efficiency | 95% |
| Discharge efficiency | 95% |
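These parameters imply a fixed timestamp grid. A sketch of generating it; the helper name `forecast_timestamps` and the example start at a UTC midnight are assumptions:

```python
from datetime import datetime, timedelta, timezone

STEP_MINUTES = 30
STEPS_PER_DAY = 24 * 60 // STEP_MINUTES  # 48
FORECAST_DAYS = 7
TOTAL_STEPS = STEPS_PER_DAY * FORECAST_DAYS  # 336


def forecast_timestamps(start: datetime) -> list:
    """Return the 336 half-hourly timestamps of a 7-day horizon beginning at `start`."""
    step = timedelta(minutes=STEP_MINUTES)
    return [start + i * step for i in range(TOTAL_STEPS)]


stamps = forecast_timestamps(datetime(2025, 1, 15, tzinfo=timezone.utc))
print(len(stamps), stamps[0].isoformat(), stamps[-1].isoformat())
```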
Physics-Aware Hybrid Blending
The raw ML predictions are post-processed with physics constraints:
Solar Power:
```python
# Per 30-minute step. elev, raw (ML output dict), clear_sky_ghi, panel_factor,
# cloud_index, profile_val, peak_solar and rng come from the surrounding loop.
if elev <= 0:
    solar_power = 0.0  # Sun below the horizon: PV output is physically impossible
else:
    ml_solar = max(0.0, raw.get("solarPower", 0.0))
    # Physics-based estimate
    physics_solar = clear_sky_ghi * panel_factor * (1.0 - cloud_index * 0.75)
    if ml_solar > profile_val * 0.25 and ml_solar < peak_solar * 1.5:
        # ML is credible → 60% ML / 40% physics
        solar_power = ml_solar * 0.6 + physics_solar * 0.4
    else:
        # ML is out of range → trust physics + historical profile
        solar_power = physics_solar * 0.7 + profile_val * 0.3
    solar_power = min(solar_power, peak_solar * 1.2)  # Hard cap at 120% of rated
    solar_power *= 1.0 + rng.uniform(-0.03, 0.03)  # ±3% natural variation
```
Load Power:
```python
# ml_load: raw ML loadPower; profile_load: historical profile value for this time of day
if ml_load > profile_load * 0.3 and ml_load < profile_load * 3.0:
    # ML is credible → 60% ML / 40% profile
    load_power = ml_load * 0.6 + profile_load * 0.4
else:
    # ML out of range → use historical profile
    load_power = profile_load
load_power = max(15.0, load_power)  # Minimum 15W standby draw
load_power *= 1.0 + rng.uniform(-0.02, 0.02)  # ±2% variation
```
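Both rules share one pattern: accept the ML value only inside a plausibility band, blend it with a reference when accepted, and fall back otherwise. A sketch that factors this out; the function name and parameters are hypothetical, the example numbers are made up, and the random variation is omitted so the output is deterministic:

```python
def gated_blend(ml_value, lower, upper, ml_weight, reference, fallback):
    """Blend ML with a reference when lower < ml_value < upper, else use the fallback."""
    if lower < ml_value < upper:
        return ml_value * ml_weight + reference * (1.0 - ml_weight)
    return fallback


# Solar (daytime step): band (0.25·profile, 1.5·peak), blended with physics, capped at 1.2·peak
profile_val, peak_solar, physics_solar = 800.0, 5000.0, 1000.0
solar = min(
    gated_blend(1200.0, profile_val * 0.25, peak_solar * 1.5, 0.6,
                physics_solar, physics_solar * 0.7 + profile_val * 0.3),
    peak_solar * 1.2,
)

# Load: band (0.3·profile, 3·profile), blended with the profile, floored at 15 W
profile_load = 200.0
load = max(15.0, gated_blend(900.0, profile_load * 0.3, profile_load * 3.0, 0.6,
                             profile_load, profile_load))
print(solar, load)
```

Here the solar value is accepted and blended, while the 900 W load estimate falls outside its band and is replaced by the profile.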
Battery Simulation
SOC is not taken from a damped ML output; it is tracked with an explicit energy-balance simulation:
```python
import numpy as np

STEP_HOURS = 0.5  # 30-minute steps
CHARGE_EFF = 0.95
DISCHARGE_EFF = 0.95

# Per step; battery_wh is the battery capacity in Wh
net_power = solar_power - load_power  # W (positive = surplus)
energy_wh = net_power * STEP_HOURS  # Wh for this step
if energy_wh > 0:
    usable_energy = energy_wh * CHARGE_EFF  # charging losses: less is stored
else:
    usable_energy = energy_wh / DISCHARGE_EFF  # discharge losses: more is drawn
soc_delta = (usable_energy / battery_wh) * 100.0
soc = np.clip(soc + soc_delta, SOC_MIN, SOC_MAX)  # SOC_MIN = 10, SOC_MAX = 100
```
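Wrapped in a function, the update can be run step by step. A runnable sketch; `step_soc` is a hypothetical name, plain `min`/`max` stand in for `np.clip`, the constants come from the parameter table, and the 5 kWh capacity and power values are illustrative:

```python
STEP_HOURS = 0.5
CHARGE_EFF = 0.95
DISCHARGE_EFF = 0.95
SOC_MIN, SOC_MAX = 10.0, 100.0


def step_soc(soc: float, solar_power: float, load_power: float, battery_wh: float) -> float:
    """Advance SOC (%) by one 30-minute step using the energy balance above."""
    energy_wh = (solar_power - load_power) * STEP_HOURS
    if energy_wh > 0:
        usable_energy = energy_wh * CHARGE_EFF  # only 95% of the surplus is stored
    else:
        usable_energy = energy_wh / DISCHARGE_EFF  # battery supplies extra to cover losses
    soc_delta = usable_energy / battery_wh * 100.0
    return min(max(soc + soc_delta, SOC_MIN), SOC_MAX)


battery_wh = 5000.0
soc = step_soc(50.0, solar_power=1200.0, load_power=200.0, battery_wh=battery_wh)  # 1000 W surplus
soc = step_soc(soc, solar_power=0.0, load_power=500.0, battery_wh=battery_wh)      # 500 W deficit
print(round(soc, 2))
```

A 1000 W surplus for half an hour stores 500 Wh × 0.95 = 475 Wh, which is 9.5% of a 5 kWh battery.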
Battery Voltage Curve
Battery voltage is derived from SOC using a 24V LiFePO4 discharge curve:
```python
def _battery_voltage_from_soc(soc):
    """24V LiFePO4: ~21V empty → ~29.2V full"""
    if soc <= 10:
        return 21.0 + (soc / 10) * 1.5  # 21.0 – 22.5V
    elif soc <= 20:
        return 22.5 + ((soc - 10) / 10) * 1.0  # 22.5 – 23.5V
    elif soc <= 90:
        return 23.5 + ((soc - 20) / 70) * 4.5  # 23.5 – 28.0V (flat plateau)
    else:
        return 28.0 + ((soc - 90) / 10) * 1.2  # 28.0 – 29.2V
```
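Because the curve is piecewise linear, it can equivalently be written as a breakpoint table plus `np.interp`, which makes it easier to retune for a different pack. A sketch, equivalent to the function above for SOC between 0 and 100% (outside that range `np.interp` clamps instead of extrapolating):

```python
import numpy as np

# (SOC %, voltage V) breakpoints of the 24V LiFePO4 curve described above
SOC_POINTS = [0.0, 10.0, 20.0, 90.0, 100.0]
VOLTAGE_POINTS = [21.0, 22.5, 23.5, 28.0, 29.2]


def battery_voltage_from_soc(soc):
    """Linear interpolation between the breakpoints; works on scalars and arrays."""
    return np.interp(soc, SOC_POINTS, VOLTAGE_POINTS)


print(battery_voltage_from_soc([10, 55, 95]))
```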
Energy Tracking
Daily energy totals are derived by integrating power over time rather than predicting them directly:
```python
daily_energy_produced += solar_power * STEP_HOURS / 1000.0  # kWh
daily_energy_consumed += load_power * STEP_HOURS / 1000.0  # kWh
# Reset at midnight
```
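A sketch of the integration with the midnight reset made explicit: keying the accumulators by calendar date gives the same per-day totals as resetting at midnight. The input format, a list of `(timestamp, solar_w, load_w)` tuples, and the function name are assumptions.

```python
from collections import defaultdict
from datetime import datetime, timedelta

STEP_HOURS = 0.5


def daily_energy_kwh(steps):
    """Sum power × step duration per calendar day; returns {date: (produced, consumed)} in kWh."""
    produced = defaultdict(float)
    consumed = defaultdict(float)
    for ts, solar_w, load_w in steps:
        day = ts.date()  # a new date starts a fresh total, i.e. the midnight reset
        produced[day] += solar_w * STEP_HOURS / 1000.0
        consumed[day] += load_w * STEP_HOURS / 1000.0
    return {day: (produced[day], consumed[day]) for day in produced}


start = datetime(2025, 1, 15)
# Two days of constant 1000 W production and 250 W load
steps = [(start + timedelta(minutes=30 * i), 1000.0, 250.0) for i in range(96)]
totals = daily_energy_kwh(steps)
print(totals)
```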
Daily Summaries
The 336 half-hourly predictions are aggregated into 7 daily summaries:
```python
summary = {
    "date": "2025-01-15",
    "dayLabel": "Wednesday",
    "solarPowerAvg": 245.5,           # W
    "solarPowerMax": 1200.0,          # W (peak)
    "solarPowerMin": 0.0,             # W (night)
    "loadPowerAvg": 180.3,            # W
    "stateOfChargeAvg": 65.2,         # %
    "stateOfChargeMin": 32.1,         # % (overnight low)
    "batteryVoltageAvg": 25.8,        # V
    "dailyEnergyProducedAvg": 3.45,   # kWh (total for the day)
    "dailyEnergyConsumedAvg": 2.16,   # kWh
    "weather": {},                    # Open-Meteo daily forecast (omitted here)
}
```
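A sketch of the aggregation for the power, SOC and voltage fields, assuming each half-hourly prediction is a dict with a `timestamp` (datetime) plus the four target columns. That record layout and the function name are assumptions, and the energy and weather fields are left out.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from statistics import mean


def summarize_days(predictions):
    """Group half-hourly predictions by date and compute a subset of the summary fields."""
    by_day = defaultdict(list)
    for p in predictions:
        by_day[p["timestamp"].date()].append(p)
    summaries = []
    for day in sorted(by_day):
        rows = by_day[day]
        solar = [r["solarPower"] for r in rows]
        soc = [r["stateOfCharge"] for r in rows]
        summaries.append({
            "date": day.isoformat(),
            "dayLabel": day.strftime("%A"),  # weekday name in the current locale
            "solarPowerAvg": mean(solar),
            "solarPowerMax": max(solar),
            "solarPowerMin": min(solar),
            "loadPowerAvg": mean(r["loadPower"] for r in rows),
            "stateOfChargeAvg": mean(soc),
            "stateOfChargeMin": min(soc),
            "batteryVoltageAvg": mean(r["batteryVoltage"] for r in rows),
        })
    return summaries


start = datetime(2025, 1, 15)
predictions = [
    {"timestamp": start + timedelta(minutes=30 * i), "solarPower": float(i % 48),
     "loadPower": 100.0, "stateOfCharge": 50.0, "batteryVoltage": 25.0}
    for i in range(96)
]
for s in summarize_days(predictions):
    print(s["date"], s["dayLabel"], s["solarPowerAvg"], s["solarPowerMax"])
```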
MongoDB Storage
Predictions are stored as a single document per source (upserted on each run):
```python
prediction_doc = {
    "sunSourceId": sun_source_id,
    "createdAt": datetime.now(timezone.utc),
    "hourly": results,          # 336 half-hourly predictions (despite the field name)
    "daily": daily_summaries,   # 7 daily summaries
    "targetColumns": target_cols,
    "generatedAt": datetime.now(timezone.utc).isoformat(),
}
# upsert=True inserts the document if this source has no prediction yet
predictions_collection.replace_one(
    {"sunSourceId": sun_source_id},
    prediction_doc,
    upsert=True,
)
```
Constants
Location: Ampra.ML/constants.py
```python
# MongoDB collections
MONGO_COLLECTION_NORMALIZED_DATA = "normalized_sun_source_data"
MONGO_COLLECTION_PREDICTIONS = "predictions"
MONGO_COLLECTION_MODEL_METADATA = "model_metadata"
MONGO_COLLECTION_WEATHER_DATA = "weather_data"
# Redis key prefix
REDIS_ML_JOB_PREFIX = "ampra:ml:job:"
# MinIO model path template
MINIO_MODEL_PATH_TEMPLATE = "models/{sun_source_id}/model.joblib"
```
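Helpers that expand these templates might look like the following. `redis_ml_job_key` is called in the job-status code above, but its body is not shown, so this version is an assumption; `minio_model_path` is a hypothetical name.

```python
REDIS_ML_JOB_PREFIX = "ampra:ml:job:"
MINIO_MODEL_PATH_TEMPLATE = "models/{sun_source_id}/model.joblib"


def redis_ml_job_key(job_id: str) -> str:
    """Redis key holding a job's status document (assumed implementation)."""
    return f"{REDIS_ML_JOB_PREFIX}{job_id}"


def minio_model_path(sun_source_id: str) -> str:
    """Object path of a source's model artefact inside MINIO_BUCKET (hypothetical helper)."""
    return MINIO_MODEL_PATH_TEMPLATE.format(sun_source_id=sun_source_id)
```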
Environment Variables
| Variable | Default | Description |
|---|---|---|
| REDIS_URL | redis://localhost:6379/0 | Redis connection string |
| MONGO_URL | mongodb://...localhost:27017/ampradb | MongoDB connection string |
| MONGO_DB | ampradb | MongoDB database name |
| MINIO_ENDPOINT | localhost:9000 | MinIO endpoint |
| MINIO_ACCESS_KEY | ampra | MinIO access key |
| MINIO_SECRET_KEY | ampra123 | MinIO secret key |
| MINIO_BUCKET | ampra-models | MinIO bucket for model artefacts |
| MINIO_USE_SSL | false | Use TLS for MinIO |
| ML_API_KEY | changeme-ml-key | API key for service authentication |
Dependencies
From requirements.txt:
| Package | Purpose |
|---|---|
| Flask | HTTP API framework |
| Redis | Job status tracking |
| PyMongo | MongoDB data access |
| MinIO | S3-compatible model storage |
| XGBoost | Gradient-boosted tree models |
| scikit-learn | Cross-validation, metrics, MultiOutputRegressor |
| pandas / numpy | Data manipulation |
| joblib | Model serialization |