ML Predictions & Forecasting
The ML subsystem provides 7-day energy production, consumption, and battery state-of-charge forecasts using a physics-aware hybrid approach that blends gradient-boosted tree models with solar physics calculations.
Architecture
ML Service (Flask)
| Endpoint | Method | Purpose |
|---|---|---|
| /train | POST | Train an XGBoost model on historical data |
| /predict | POST | Generate 7-day forecast using trained model |
The ML service runs as a standalone Python microservice with Gunicorn (2 workers, 600s timeout).
Training Pipeline
Trigger
POST /api/predictions/train
Body: { "sunSourceId": "..." }
StartTrainingAsync Implementation
The training flow uses Redis as a concurrency guard and fires the ML call asynchronously:
public async Task<PredictionResultDto> StartTrainingAsync(Guid sunSourceId, string userId)
{
var sunSource = await _context.SunSources
.FirstOrDefaultAsync(s => s.Id == sunSourceId && s.UserId == userId);
if (sunSource == null)
throw new NotFoundException("Sun source not found");
// Redis concurrency guard — only one training job per source at a time
var db = _redis.GetDatabase();
var activeKey = RedisKeys.MlActive(sunSourceId, "train");
// Key format: "ampra:ml:active:{sunSourceId}:train"
if (await db.KeyExistsAsync(activeKey))
{
return new PredictionResultDto
{
JobId = (await db.StringGetAsync(activeKey)).ToString(),
Status = "already_running"
};
}
var jobId = Guid.NewGuid().ToString();
// Reserve the active slot (30-min TTL as safety net)
await db.StringSetAsync(activeKey, jobId, TimeSpan.FromMinutes(30));
// Store initial job status in Redis (24h TTL)
var initialStatus = JsonSerializer.Serialize(new
{
jobId, status = "queued", progress = 0,
sunSourceId = sunSourceId.ToString(), type = "train",
updatedAt = DateTime.UtcNow.ToString("o"),
message = "Starting training..."
});
await db.StringSetAsync(RedisKeys.MlJob(jobId), initialStatus, JobStatusTtl);
// Pre-fetch weather data for the ML service
var weatherHistory = await BuildWeatherDataAsync(sunSourceId, sunSource);
// Fire-and-forget: POST to ML /train endpoint
_ = Task.Run(async () =>
{
try
{
var payload = JsonSerializer.Serialize(new
{
sunSourceId = sunSourceId.ToString(), jobId,
latitude = sunSource.Latitude, longitude = sunSource.Longitude,
installedCapacityWatts = sunSource.InstalledCapacityWatts,
weatherHistory
});
var content = new StringContent(payload, Encoding.UTF8, HttpContentTypes.Json);
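var response = await _httpClient.PostAsync(MlEndpoints.Train, content);
// Treat non-2xx responses as failures so the catch block records them in Redis
response.EnsureSuccessStatusCode();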
}
catch (Exception ex)
{
// On failure, update Redis job status so the client sees the error
var errorStatus = JsonSerializer.Serialize(new
{
jobId, status = "failed", progress = 0,
sunSourceId = sunSourceId.ToString(), type = "train",
updatedAt = DateTime.UtcNow.ToString("o"),
error = "ML service is temporarily unavailable"
});
await db.StringSetAsync(RedisKeys.MlJob(jobId), errorStatus, JobStatusTtl);
}
finally
{
// Always release the active slot
await db.KeyDeleteAsync(RedisKeys.MlActive(sunSourceId, "train"));
}
});
return new PredictionResultDto { JobId = jobId, Status = "queued" };
}
Key design decisions:
| Decision | Rationale |
|---|---|
| Redis active key with 30-min TTL | Prevents duplicate concurrent jobs; TTL acts as a safety net if the process crashes |
| Fire-and-forget via Task.Run | Client gets immediate response; ML training can take minutes |
| Weather data pre-fetched | Ensures fresh weather data is available before the ML call |
| Error status written to Redis | Client polling sees the failure instead of hanging indefinitely |
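The listing checks the key with KeyExistsAsync and reserves it with a separate StringSetAsync, so two requests arriving at almost the same moment could both pass the check. The following is a minimal sketch of a reserve-if-absent guard that does both steps in one operation, written in Python against an in-memory stand-in for Redis. `ActiveSlotGuard`, `try_reserve`, `active_key`, and the injectable clock are hypothetical names for illustration; against a real Redis the same effect would come from a single SET with the NX and EX options.

```python
import time
from typing import Callable, Dict, Tuple


class ActiveSlotGuard:
    """In-memory stand-in for a Redis 'set if absent, with TTL' guard (illustrative only)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        # key -> (job_id, expiry timestamp in clock units)
        self._slots: Dict[str, Tuple[str, float]] = {}

    def try_reserve(self, key: str, job_id: str, ttl_seconds: float) -> Tuple[bool, str]:
        """Reserve the slot if it is free or expired; otherwise report the job holding it."""
        now = self._clock()
        current = self._slots.get(key)
        if current is not None and current[1] > now:
            return False, current[0]  # corresponds to status "already_running"
        self._slots[key] = (job_id, now + ttl_seconds)
        return True, job_id  # corresponds to status "queued"

    def release(self, key: str) -> None:
        """Free the slot, as the finally block does once the ML call returns."""
        self._slots.pop(key, None)


def active_key(sun_source_id: str, job_type: str) -> str:
    # Same key format as the "ampra:ml:active:{sunSourceId}:train" comment in the listing.
    return f"ampra:ml:active:{sun_source_id}:{job_type}"
```

A second `try_reserve` on the same key returns `(False, <first job id>)` until the slot is released or its TTL passes, which mirrors the "already_running" response.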
ML Training Process (Python)
Feature Engineering Details
| Category | Features | Description |
|---|---|---|
| Time Cyclicals | hour_sin, hour_cos, month_sin, month_cos, day_of_year_sin, day_of_year_cos | Circular encoding of temporal features |
| Solar Physics | solar_elevation, solar_azimuth, clear_sky_ghi, extraterrestrial_radiation | Calculated from latitude/longitude and timestamp |
| Weather | temperature, shortwave_radiation, uv_index, precipitation, weather_code | From Open-Meteo forecast data |
| Historical Profiles | avg_solar_by_hour, avg_load_by_hour, avg_soc_by_hour | Average values for each hour of day from historical data |
| Rolling Statistics | solar_power_1h, solar_power_3h, load_power_1h, soc_1h | Rolling mean windows |
| Lag Features | solar_power_lag_1, load_power_lag_1, soc_lag_1 | Previous timestep values |
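To make the table concrete, here is a small Python sketch of three of the feature families: cyclical encoding, lag features, and rolling means. The helper names (`cyclical`, `lag`, `rolling_mean`) are hypothetical, and details such as whether the rolling window includes the current sample are assumptions rather than a description of the actual training code.

```python
import math
from typing import List, Optional, Tuple


def cyclical(value: float, period: float) -> Tuple[float, float]:
    """Map a periodic value onto the unit circle so period boundaries stay adjacent."""
    angle = 2.0 * math.pi * value / period
    return math.sin(angle), math.cos(angle)


def lag(series: List[float], steps: int = 1) -> List[Optional[float]]:
    """Shift the series by `steps`; leading positions have no history, so they are None."""
    shifted: List[Optional[float]] = [None] * steps + list(series)
    return shifted[: len(series)]


def rolling_mean(series: List[float], window: int) -> List[float]:
    """Trailing mean over up to `window` samples, including the current one (assumed)."""
    out: List[float] = []
    for i in range(len(series)):
        chunk = series[max(0, i - window + 1): i + 1]
        out.append(sum(chunk) / len(chunk))
    return out


# hour_sin / hour_cos for 23:00 and 00:00 end up close together,
# which a raw hour value (23 vs 0) would not express.
hour_sin_23, hour_cos_23 = cyclical(23, 24)
hour_sin_0, hour_cos_0 = cyclical(0, 24)
```

The same `cyclical` call with a period of 12 or 365 would produce the month and day-of-year pairs.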
Prediction Pipeline
Trigger
POST /api/predictions/predict
Body: { "sunSourceId": "..." }
StartPredictionAsync Implementation
The prediction flow follows the same pattern as training — Redis guard, fire-and-forget, status tracking:
public async Task<PredictionResultDto> StartPredictionAsync(Guid sunSourceId, string userId)
{
var sunSource = await _context.SunSources
.FirstOrDefaultAsync(s => s.Id == sunSourceId && s.UserId == userId);
if (sunSource == null)
throw new NotFoundException("Sun source not found");
var db = _redis.GetDatabase();
var activeKey = RedisKeys.MlActive(sunSourceId, "predict");
// Key: "ampra:ml:active:{sunSourceId}:predict"
if (await db.KeyExistsAsync(activeKey))
{
return new PredictionResultDto
{
JobId = (await db.StringGetAsync(activeKey)).ToString(),
Status = "already_running"
};
}
var jobId = Guid.NewGuid().ToString();
// Reserve active slot (10-min TTL — predictions are faster than training)
await db.StringSetAsync(activeKey, jobId, TimeSpan.FromMinutes(10));
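// Initial status JSON is elided here; it has the same shape as initialStatus in StartTrainingAsync, with type = "predict"
await db.StringSetAsync(RedisKeys.MlJob(jobId), initialStatus, JobStatusTtl);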
var weatherForecast = await BuildWeatherDataAsync(sunSourceId, sunSource);
_ = Task.Run(async () =>
{
try
{
var payload = JsonSerializer.Serialize(new
{
sunSourceId = sunSourceId.ToString(), jobId,
weatherForecast
});
var content = new StringContent(payload, Encoding.UTF8, HttpContentTypes.Json);
await _httpClient.PostAsync(MlEndpoints.Predict, content);
}
catch (Exception ex) { /* Write failed status to Redis */ }
finally { await db.KeyDeleteAsync(activeKey); }
});
return new PredictionResultDto { JobId = jobId, Status = "queued" };
}
Note: The prediction active key uses a 10-minute TTL (vs 30 minutes for training) because predictions are significantly faster.
Physics-Aware Hybrid Approach
The prediction engine does not rely solely on ML output. Instead, it blends the gradient-boosted model's output with solar physics calculations.
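The exact blending rule is not spelled out here. As a minimal sketch under assumed rules (a fixed ML weight, a night-time cutoff based on clear-sky GHI, and a cap at installed capacity), a per-hour blend might look like this in Python. `blend_solar`, `ml_weight`, and the 0.7 default are hypothetical and not taken from the service.

```python
from typing import Optional


def blend_solar(
    ml_watts: Optional[float],
    physics_watts: float,
    clear_sky_ghi: float,
    installed_capacity_watts: float,
    ml_weight: float = 0.7,  # assumed weight, for illustration only
) -> float:
    """Combine an ML estimate with a physics estimate for one hour of solar power."""
    # Physical bound: with no clear-sky irradiance there is no production.
    if clear_sky_ghi <= 0.0:
        return 0.0
    # No trained model yet: fall back to the physics baseline.
    if ml_watts is None:
        blended = physics_watts
    else:
        blended = ml_weight * ml_watts + (1.0 - ml_weight) * physics_watts
    # Keep the result between zero and the installed capacity.
    return max(0.0, min(blended, installed_capacity_watts))
```

With these rules a non-zero ML estimate at night is forced to zero, and a source without a trained model still gets a physics-only forecast.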
Why Physics-Aware?
| Problem | ML-Only Issue | Physics-Aware Solution |
|---|---|---|
| SOC drift | Cumulative prediction errors compound over 7 days | Energy-balance simulation respects conservation of energy |
| Night production | ML may predict non-zero solar at night | Clear-sky GHI is zero after sunset |
| Extreme weather | Limited training data for rare events | Cloud attenuation from weather codes provides physical bounds |
| New installations | Insufficient historical data | Physics model provides baseline even with no training data |
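The energy-balance idea in the SOC drift row can be sketched as a step-by-step simulation: each hour, the battery gains or loses the difference between predicted solar and load, and the result is clamped to the physical 0 to 100 % range. The Python below is an illustrative simplification, not the service's implementation; `simulate_soc` and `battery_capacity_wh` are hypothetical, and charge/discharge efficiency and grid exchange are deliberately ignored.

```python
from typing import List, Sequence


def simulate_soc(
    initial_soc_pct: float,
    solar_watts: Sequence[float],
    load_watts: Sequence[float],
    battery_capacity_wh: float,  # hypothetical parameter
    step_hours: float = 1.0,
) -> List[float]:
    """Propagate state of charge from net energy flow, one step per forecast point."""
    soc = initial_soc_pct
    trajectory: List[float] = []
    for solar, load in zip(solar_watts, load_watts):
        net_wh = (solar - load) * step_hours          # surplus charges, deficit discharges
        soc += 100.0 * net_wh / battery_capacity_wh   # convert Wh to percentage points
        soc = min(100.0, max(0.0, soc))               # a battery cannot exceed its bounds
        trajectory.append(soc)
    return trajectory
```

Because every step is derived from the energy balance rather than predicted independently, errors cannot push the trajectory outside the physical range, however long the horizon.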
Job Tracking
All ML jobs are tracked in Redis. Job status entries expire after 24 hours; the active-slot keys that guard against duplicate jobs use shorter TTLs.
Redis Key Structure
// Ampra.Core.Constants.RedisKeys
public static class RedisKeys
{
public const string MlJobPrefix = "ampra:ml:job:";
public const string MlActivePrefix = "ampra:ml:active:";
public static string MlJob(string jobId) => $"{MlJobPrefix}{jobId}";
public static string MlActive(Guid sunSourceId, string type)
=> $"{MlActivePrefix}{sunSourceId}:{type}";
}
| Key Pattern | Purpose | TTL |
|---|---|---|
| ampra:ml:job:{jobId} | Full job status JSON | 24 hours |
| ampra:ml:active:{sourceId}:train | Prevents duplicate training jobs | 30 minutes |
| ampra:ml:active:{sourceId}:predict | Prevents duplicate prediction jobs | 10 minutes |
Job Status Lifecycle
Polling
GET /api/predictions/status/{jobId}
GetJobStatusAsync Implementation
public async Task<PredictionJobDto?> GetJobStatusAsync(string jobId, string userId)
{
var db = _redis.GetDatabase();
var data = await db.StringGetAsync(RedisKeys.MlJob(jobId));
if (data.IsNullOrEmpty)
return null;
var options = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
var job = JsonSerializer.Deserialize<PredictionJobDto>(data.ToString(), options);
if (job == null) return null;
// Ownership check: user must own the source this job belongs to
var ownsSource = await _context.SunSources
.AnyAsync(s => s.Id == job.SunSourceId && s.UserId == userId);
if (!ownsSource) return null;
return job;
}
Returns the current job state:
{
"jobId": "abc-123",
"status": "completed",
"progress": 100,
"message": "Prediction complete",
"type": "predict",
"sunSourceId": "...",
"updatedAt": "2026-03-09T15:30:00Z",
"predictions": [...],
"metrics": {
"solar_mae": 150.2,
"solar_rmse": 210.5,
"solar_r2": 0.92,
"load_mae": 85.3,
"load_rmse": 120.1,
"load_r2": 0.88
},
"dataPointsUsed": 8640
}
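A client would typically call the status endpoint repeatedly until the job reaches a terminal state. The Python sketch below shows one way to structure that loop. `poll_job`, `fetch_status`, the 2-second interval, and the attempt limit are assumptions for illustration; `fetch_status` stands in for whatever performs the GET request and is expected to return the parsed JSON, or None when the job is unknown or not visible to the caller. Only "completed" and "failed" are treated as terminal, since those are the end states shown in this document.

```python
import time
from typing import Any, Callable, Dict, Optional

TERMINAL_STATUSES = {"completed", "failed"}


def poll_job(
    fetch_status: Callable[[str], Optional[Dict[str, Any]]],
    job_id: str,
    interval_seconds: float = 2.0,   # assumed polling interval
    max_attempts: int = 150,         # assumed upper bound on polls
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Poll until the job completes or fails; raise if it disappears or takes too long."""
    for _ in range(max_attempts):
        job = fetch_status(job_id)
        if job is None:
            raise LookupError(f"job {job_id} not found or expired")
        if job.get("status") in TERMINAL_STATUSES:
            return job
        sleep(interval_seconds)  # any other status is treated as still in progress
    raise TimeoutError(f"job {job_id} did not finish after {max_attempts} polls")
```

Passing `sleep` as a parameter keeps the loop testable without real delays.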
Prediction Results
Latest Prediction
GET /api/predictions/latest/{sunSourceId}
GetLatestPredictionAsync Implementation
public async Task<object?> GetLatestPredictionAsync(Guid sunSourceId, string userId)
{
var sunSource = await _context.SunSources
.FirstOrDefaultAsync(s => s.Id == sunSourceId && s.UserId == userId);
if (sunSource == null)
throw new NotFoundException("Sun source not found");
var collection = _mongoDatabase.GetCollection<BsonDocument>(MongoCollections.Predictions);
var filter = Builders<BsonDocument>.Filter.Eq("sunSourceId", sunSourceId.ToString());
// Always get the most recent prediction
var doc = await collection.Find(filter)
.SortByDescending(d => d["createdAt"])
.FirstOrDefaultAsync();
if (doc == null) return null;
doc.Remove("_id"); // Strip MongoDB internal ID
return BsonTypeMapper.MapToDotNetValue(doc);
}
Returns stored predictions from MongoDB (predictions collection):
{
"exists": true,
"data": {
"predictions": [
{
"timestamp": "2026-03-10T00:00:00Z",
"solarPower": 0.0,
"loadPower": 450.0,
"stateOfCharge": 65.2,
"energyProduced": 0.0,
"energyConsumed": 0.45
},
{
"timestamp": "2026-03-10T01:00:00Z",
"solarPower": 0.0,
"loadPower": 420.0,
"stateOfCharge": 63.8,
"energyProduced": 0.0,
"energyConsumed": 0.42
}
// ... hourly for 7 days (168 data points)
]
}
}
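Consumers of this payload often want daily totals rather than 168 hourly points. The Python sketch below groups the hourly entries by UTC date and sums the two energy fields. `daily_energy_totals` is a hypothetical helper, and the units are an assumption: in the sample, a loadPower of 450.0 alongside an energyConsumed of 0.45 suggests watts and kilowatt-hours per hour, but that is not stated explicitly.

```python
from collections import defaultdict
from typing import Any, Dict, List


def daily_energy_totals(predictions: List[Dict[str, Any]]) -> Dict[str, Dict[str, float]]:
    """Sum energyProduced and energyConsumed per calendar day (UTC date of the timestamp)."""
    totals: Dict[str, Dict[str, float]] = defaultdict(
        lambda: {"produced": 0.0, "consumed": 0.0}
    )
    for point in predictions:
        day = point["timestamp"][:10]  # "YYYY-MM-DD" prefix of the ISO 8601 timestamp
        totals[day]["produced"] += point["energyProduced"]
        totals[day]["consumed"] += point["energyConsumed"]
    return dict(totals)
```

Grouping by the timestamp prefix keeps the sketch dependency-free; a real client would likely convert to the installation's local time zone before bucketing.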
Model Info
GET /api/predictions/model-info/{sunSourceId}
Returns metadata about the trained model from MongoDB (model_metadata collection), including training date, data points used, and accuracy metrics.
Scheduled Predictions
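The DailyPredictionJob runs every 6 hours via Quartz (four times a day, despite its name) and covers every active source whose owner has opted into auto-predictions: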
DailyPredictionJob Implementation
[DisallowConcurrentExecution]
public class DailyPredictionJob : IJob
{
private readonly ApplicationDbContext _dbContext;
private readonly IPredictionService _predictionService;
private readonly ILogger<DailyPredictionJob> _logger;
public async Task Execute(IJobExecutionContext context)
{
// 1. Find users who opted into auto-predictions
var optedInUserIds = await _dbContext.UserSettings
.Where(s => s.SettingType == (int)SettingType.AutoRunPredictions
&& s.Value == "1")
.Select(s => s.UserId)
.ToListAsync();
// 2. Get all active sources for those users
var sunSources = await _dbContext.SunSources
.Where(s => optedInUserIds.Contains(s.UserId) && s.IsActive)
.ToListAsync();
// 3. Run predictions for each source
foreach (var sunSource in sunSources)
{
try
{
await _predictionService.RunScheduledPredictionAsync(sunSource.Id);
_logger.LogInformation("Completed prediction for source {SourceId}", sunSource.Id);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed prediction for source {SourceId}", sunSource.Id);
}
}
}
}
RunScheduledPredictionAsync Implementation
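Unlike the user-triggered StartPredictionAsync, scheduled predictions await the ML call inline (no fire-and-forget), skip the Redis concurrency guard, and perform no ownership check because no user is involved. A non-success response throws, and DailyPredictionJob logs the error and continues with the next source: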
public async Task RunScheduledPredictionAsync(Guid sunSourceId)
{
var sunSource = await _context.SunSources
.FirstOrDefaultAsync(s => s.Id == sunSourceId);
if (sunSource == null)
throw new NotFoundException("Sun source not found");
var weatherForecast = await BuildWeatherDataAsync(sunSourceId, sunSource);
var jobId = $"scheduled-{Guid.NewGuid()}";
var payload = JsonSerializer.Serialize(new
{
sunSourceId = sunSourceId.ToString(), jobId, weatherForecast
});
var content = new StringContent(payload, Encoding.UTF8, HttpContentTypes.Json);
var response = await _httpClient.PostAsync(MlEndpoints.Predict, content);
if (!response.IsSuccessStatusCode)
throw new InvalidOperationException(
"Prediction service encountered an error. Please ensure the model has been trained.");
}
BuildWeatherDataAsync — Weather Data Preparation
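Training, user-triggered predictions, and scheduled predictions all call this method to prepare weather data for the ML service. If no weather data is stored and the source has coordinates, it triggers a refresh first; if the refresh fails, it logs a warning and continues with whatever data is available (possibly an empty list):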
private async Task<List<object>> BuildWeatherDataAsync(Guid sunSourceId, SunSource sunSource)
{
var weatherData = await _weatherService.GetWeatherDataAsync(sunSourceId);
// Auto-refresh weather if none exists and source has coordinates
if (weatherData.Count == 0 && sunSource.Latitude.HasValue && sunSource.Longitude.HasValue)
{
try
{
await _weatherService.RefreshWeatherDataAsync(sunSource);
weatherData = await _weatherService.GetWeatherDataAsync(sunSourceId);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to refresh weather data for {SourceId}", sunSourceId);
}
}
return weatherData.Select(w => (object)new
{
date = w.Date.ToString("yyyy-MM-dd"),
temperatureMax = w.TemperatureMax,
temperatureMin = w.TemperatureMin,
weatherCode = w.WeatherCode,
shortwaveRadiationSum = w.ShortwaveRadiationSum,
precipitationSum = w.PrecipitationSum,
uvIndexMax = w.UvIndexMax,
sunrise = w.Sunrise?.ToString("o"),
sunset = w.Sunset?.ToString("o"),
}).ToList();
}
Weather Data Dependency
Predictions require weather data. The UpdateWeatherDataJob also runs every 6 hours and is triggered before the prediction job in the Quartz schedule, ensuring fresh weather data is available for the ML models.