Fix concurrent stat insertions causing duplicate key errors

Resolves issue where multiple monitors updating statistics simultaneously
can cause "Duplicate entry" database errors for the same monitor_id and
timestamp combination in stat_hourly and stat_daily tables.

Changes:
- Add database-specific upsert logic for SQLite and MariaDB
- Replace R.store() calls with atomic upsert operations
- Add fallback to original R.store() if upsert fails
- Initialize default values for new stat beans to prevent null conflicts
- Use ON CONFLICT/ON DUPLICATE KEY UPDATE for atomic stat updates

This fix is particularly important for high-volume monitoring scenarios
with 400+ monitors where concurrent heartbeats can trigger race conditions
in the stat insertion process.

Fixes #5357
This commit is contained in:
undaunt 2025-06-17 21:27:17 -07:00
parent 443d5cf554
commit 19889a57b2

View file

@ -3,6 +3,7 @@ const { UP, MAINTENANCE, DOWN, PENDING } = require("../src/util");
const { LimitQueue } = require("./utils/limit-queue"); const { LimitQueue } = require("./utils/limit-queue");
const { log } = require("../src/util"); const { log } = require("../src/util");
const { R } = require("redbean-node"); const { R } = require("redbean-node");
const { Database } = require("./database");
/** /**
* Calculates the uptime of a monitor. * Calculates the uptime of a monitor.
@ -299,14 +300,23 @@ class UptimeCalculator {
dailyStatBean.ping = dailyData.avgPing; dailyStatBean.ping = dailyData.avgPing;
dailyStatBean.pingMin = dailyData.minPing; dailyStatBean.pingMin = dailyData.minPing;
dailyStatBean.pingMax = dailyData.maxPing; dailyStatBean.pingMax = dailyData.maxPing;
let dailyExtras = null;
{ {
// eslint-disable-next-line no-unused-vars // eslint-disable-next-line no-unused-vars
const { up, down, avgPing, minPing, maxPing, timestamp, ...extras } = dailyData; const { up, down, avgPing, minPing, maxPing, timestamp, ...extras } = dailyData;
if (Object.keys(extras).length > 0) { if (Object.keys(extras).length > 0) {
dailyStatBean.extras = JSON.stringify(extras); dailyStatBean.extras = JSON.stringify(extras);
dailyExtras = JSON.stringify(extras);
} }
} }
await R.store(dailyStatBean); try {
await this.upsertStat("stat_daily", this.monitorID, dailyKey,
dailyData.up, dailyData.down, dailyData.avgPing,
dailyData.minPing, dailyData.maxPing, dailyExtras);
} catch (error) {
log.warn("uptime-calc", `Upsert failed for daily stat, falling back to R.store(): ${error.message}`);
await R.store(dailyStatBean);
}
let currentDate = this.getCurrentDate(); let currentDate = this.getCurrentDate();
@ -319,14 +329,23 @@ class UptimeCalculator {
hourlyStatBean.ping = hourlyData.avgPing; hourlyStatBean.ping = hourlyData.avgPing;
hourlyStatBean.pingMin = hourlyData.minPing; hourlyStatBean.pingMin = hourlyData.minPing;
hourlyStatBean.pingMax = hourlyData.maxPing; hourlyStatBean.pingMax = hourlyData.maxPing;
let hourlyExtras = null;
{ {
// eslint-disable-next-line no-unused-vars // eslint-disable-next-line no-unused-vars
const { up, down, avgPing, minPing, maxPing, timestamp, ...extras } = hourlyData; const { up, down, avgPing, minPing, maxPing, timestamp, ...extras } = hourlyData;
if (Object.keys(extras).length > 0) { if (Object.keys(extras).length > 0) {
hourlyStatBean.extras = JSON.stringify(extras); hourlyStatBean.extras = JSON.stringify(extras);
hourlyExtras = JSON.stringify(extras);
} }
} }
await R.store(hourlyStatBean); try {
await this.upsertStat("stat_hourly", this.monitorID, hourlyKey,
hourlyData.up, hourlyData.down, hourlyData.avgPing,
hourlyData.minPing, hourlyData.maxPing, hourlyExtras);
} catch (error) {
log.warn("uptime-calc", `Upsert failed for hourly stat, falling back to R.store(): ${error.message}`);
await R.store(hourlyStatBean);
}
} }
// For migration mode, we don't need to store old hourly and minutely data, but we need 24-hour's minutely data // For migration mode, we don't need to store old hourly and minutely data, but we need 24-hour's minutely data
@ -338,14 +357,23 @@ class UptimeCalculator {
minutelyStatBean.ping = minutelyData.avgPing; minutelyStatBean.ping = minutelyData.avgPing;
minutelyStatBean.pingMin = minutelyData.minPing; minutelyStatBean.pingMin = minutelyData.minPing;
minutelyStatBean.pingMax = minutelyData.maxPing; minutelyStatBean.pingMax = minutelyData.maxPing;
let minutelyExtras = null;
{ {
// eslint-disable-next-line no-unused-vars // eslint-disable-next-line no-unused-vars
const { up, down, avgPing, minPing, maxPing, timestamp, ...extras } = minutelyData; const { up, down, avgPing, minPing, maxPing, timestamp, ...extras } = minutelyData;
if (Object.keys(extras).length > 0) { if (Object.keys(extras).length > 0) {
minutelyStatBean.extras = JSON.stringify(extras); minutelyStatBean.extras = JSON.stringify(extras);
minutelyExtras = JSON.stringify(extras);
} }
} }
await R.store(minutelyStatBean); try {
await this.upsertStat("stat_minutely", this.monitorID, divisionKey,
minutelyData.up, minutelyData.down, minutelyData.avgPing,
minutelyData.minPing, minutelyData.maxPing, minutelyExtras);
} catch (error) {
log.warn("uptime-calc", `Upsert failed for minutely stat, falling back to R.store(): ${error.message}`);
await R.store(minutelyStatBean);
}
} }
// No need to remove old data in migration mode // No need to remove old data in migration mode
@ -386,6 +414,11 @@ class UptimeCalculator {
bean = R.dispense("stat_daily"); bean = R.dispense("stat_daily");
bean.monitor_id = this.monitorID; bean.monitor_id = this.monitorID;
bean.timestamp = timestamp; bean.timestamp = timestamp;
bean.up = 0;
bean.down = 0;
bean.ping = 0;
bean.pingMin = 0;
bean.pingMax = 0;
} }
this.lastDailyStatBean = bean; this.lastDailyStatBean = bean;
@ -411,6 +444,11 @@ class UptimeCalculator {
bean = R.dispense("stat_hourly"); bean = R.dispense("stat_hourly");
bean.monitor_id = this.monitorID; bean.monitor_id = this.monitorID;
bean.timestamp = timestamp; bean.timestamp = timestamp;
bean.up = 0;
bean.down = 0;
bean.ping = 0;
bean.pingMin = 0;
bean.pingMax = 0;
} }
this.lastHourlyStatBean = bean; this.lastHourlyStatBean = bean;
@ -436,6 +474,11 @@ class UptimeCalculator {
bean = R.dispense("stat_minutely"); bean = R.dispense("stat_minutely");
bean.monitor_id = this.monitorID; bean.monitor_id = this.monitorID;
bean.timestamp = timestamp; bean.timestamp = timestamp;
bean.up = 0;
bean.down = 0;
bean.ping = 0;
bean.pingMin = 0;
bean.pingMax = 0;
} }
this.lastMinutelyStatBean = bean; this.lastMinutelyStatBean = bean;
@ -516,6 +559,59 @@ class UptimeCalculator {
return dailyKey; return dailyKey;
} }
/**
* Upsert stat data using database-specific logic to handle concurrent insertions
* @param {string} table The stat table name (stat_daily, stat_hourly, stat_minutely)
* @param {number} monitorId The monitor ID
* @param {number} timestamp The timestamp key
* @param {number} up Up count
* @param {number} down Down count
* @param {number} ping Average ping
* @param {number} pingMin Minimum ping
* @param {number} pingMax Maximum ping
* @param {string|null} extras JSON string of extra data
* @returns {Promise<void>}
*/
async upsertStat(table, monitorId, timestamp, up, down, ping, pingMin, pingMax, extras = null) {
const dbType = Database.dbConfig.type;
try {
if (dbType === "sqlite") {
await R.exec(`
INSERT INTO ${table} (monitor_id, timestamp, up, down, ping, pingMin, pingMax, extras)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(monitor_id, timestamp) DO UPDATE SET
up = ?,
down = ?,
ping = ?,
pingMin = ?,
pingMax = ?,
extras = ?
`, [
monitorId, timestamp, up, down, ping, pingMin, pingMax, extras,
up, down, ping, pingMin, pingMax, extras
]);
} else if (dbType.endsWith("mariadb")) {
await R.exec(`
INSERT INTO ${table} (monitor_id, timestamp, up, down, ping, pingMin, pingMax, extras)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
up = VALUES(up),
down = VALUES(down),
ping = VALUES(ping),
pingMin = VALUES(pingMin),
pingMax = VALUES(pingMax),
extras = VALUES(extras)
`, [monitorId, timestamp, up, down, ping, pingMin, pingMax, extras]);
} else {
throw new Error(`Unsupported database type: ${dbType}`);
}
} catch (error) {
log.debug("uptime-calc", `Failed to upsert ${table} for monitor ${monitorId}: ${error.message}`);
throw error;
}
}
/** /**
* Convert timestamp to key * Convert timestamp to key
* @param {dayjs.Dayjs} datetime Datetime * @param {dayjs.Dayjs} datetime Datetime