Работа с MongoDB Oplog: Как отслеживать изменения документов
Статья, в основном, ориентирована для специалистов, которые столкнулись с проблемой «исчезающих» документов в MongoDB и не понимающих, где найти историю этих документов, была ли она вообще сохранена в коллекции или же была обновлена и удалена, и в какой последовательности происходили все эти действия и в какое время.
MongoDB — это популярная NoSQL база данных, широко используемая для хранения больших объемов данных. Одной из ключевых возможностей MongoDB является механизм Oplog (операционный журнал), который позволяет отслеживать изменения в коллекциях. В этой статье мы рассмотрим, как работать с Oplog, искать документы, преобразовывать временные метки и выводить результаты в читаемом формате, что крайне удобно для аналитиков.
Введение в Oplog
Oplog представляет собой журнал, содержащий записи обо всех операциях, которые изменяют состояние базы данных. Это могут быть операции вставки [i], обновления [u] и удаления [d]. Oplog особенно полезен для репликации данных и отслеживания изменений в реальном времени.
Задача
Предположим, у нас есть коллекция документов в MongoDB, и мы хотим отслеживать изменения статусов этих документов. Наша цель — найти документы с определёнными начальными статусами, а затем найти последующие изменения этих документов на новые статусы. Мы также хотим преобразовать временные метки MongoDB в читаемый формат и вывести результаты.
Пример готового запроса в Oplog
function convertNumberLongToISOString(numberLong) {
var date = new Date(numberLong.toNumber());
return date.toISOString();
}
function convertTimestampToMoscowTime(timestamp) {
var date = new Date(timestamp.getHighBits() * 1000);
var moscowOffset = 3 * 60 * 60 * 1000;
var moscowTime = new Date(date.getTime() + moscowOffset);
var day = ('0' + moscowTime.getDate()).slice(-2);
var month = ('0' + (moscowTime.getMonth() + 1)).slice(-2);
var year = moscowTime.getFullYear();
var hours = ('0' + moscowTime.getHours()).slice(-2);
var minutes = ('0' + moscowTime.getMinutes()).slice(-2);
var seconds = ('0' + moscowTime.getSeconds()).slice(-2);
return day + '-' + month + '-' + year + ' ' + hours + ':' + minutes + ':' + seconds;
}
var initialStatuses = ["PROCESS", "WELL_DONE"]; // может быть множество значений, перечисленных через запятую
var targetStatuses = ["ERROR"]; // может быть множество значений, перечисленных через запятую
var startDate = new Date('2020-03-01T00:00:00Z'); // проставляем нужную нам дату по Москве, начиная с которой будет осуществлятся поиск логов
var startTimestamp = Timestamp(Math.floor(startDate.getTime() / 1000), 0);
var initialDocuments = db.getCollection("oplog.rs").find({
"ui": UUID("1234e321-a6fr-4egv-b2bf-5aedfv5rgv54"), // пишем значение из выполненного запроса в п.1
"o.statusCode": { $in: initialStatuses },
"ts": { $gte: startTimestamp }
}, {
// тут перечисляем все поля из коллекции, которые хотим видеть в ответе запроса
// перед каждым наименованием поля стоит o. - технически необходимо, сокращение от object
"o.statusCode": 1,
"o.cadId": 1,
"o.pupsId": 1,
"o.number": 1,
"o.date": 1,
"o.dateFast": 1,
"o.refactor": 1,
"o.rembo": 1,
"o._id": 1,
"op": 1, // тут не стоит o. , тк это поле относится уже к oplog-данным
"ts": 1 // тут не стоит o. , тк это поле относится уже к oplog-данным
}).toArray();
if (initialDocuments.length > 0) {
var documentIds = initialDocuments.map(function(doc) {
return doc.o._id; // берем конкретный _id документа и смотрим переход по нужным нам статусам
});
var targetDocuments = db.getCollection("oplog.rs").find({
"op": { $in: ["u", "i"] }, // ищем операции u и i (update - обновление и insert - новая), можно оставить только одну из них
"ui": UUID("1234e321-a6fr-4egv-b2bf-5aedfv5rgv54"), // пишем значение из выполненного запроса в п.1
"o._id": { $in: documentIds },
"o.statusCode": { $in: targetStatuses },
"ts": { $gte: startTimestamp }
}, {
// тут перечисляем все поля из коллекции, которые хотим видеть в ответе запроса
// перед каждым наименованием поля стоит o. - технически необходимо, сокращение от object
"o.statusCode": 1,
"o.cadId": 1,
"o.pupsId": 1,
"o.number": 1,
"o.date": 1,
"o.dateFast": 1,
"o.refactor": 1,
"o.rembo": 1,
"o._id": 1,
"op": 1, // тут не стоит o. , тк это поле относится уже к oplog-данным
"ts": 1 // тут не стоит o. , тк это поле относится уже к oplog-данным
}).toArray();
var initialDocMap = {};
initialDocuments.forEach(function(doc) {
initialDocMap[doc.o._id] = doc;
});
var matchCount = 0;
targetDocuments.forEach(function(doc) {
if (matchCount >= 100) return; // остановка вывода результатов после 100 совпадений, чтобы не грохнуть БД) но вы можете снять это ограничение или уменьшить
var prevDoc = initialDocMap[doc.o._id];
if (prevDoc && doc.ts.getHighBits() > prevDoc.ts.getHighBits()) {
print("Document with pupsId" + prevDoc.o.pupsId + ":"); // тут пишем, что хотим видеть в заголовке найденных документов, в данном случае я хочу видеть в заголовке наименование компании
printjson({ // перечисляем поля для красивого вывода, где можно преобразовать наименование поля из коллекции в более удобочитаемое
_id: prevDoc.o._id, // слева пишем наименование поля из коллекции, а справа как хотим, чтобы отображалось в выводе в oplog
refactor: prevDoc.o.refactor,
rembo: prevDoc.o.rembo,
cadId: prevDoc.o.cadId,
pupsId: prevDoc.o.pupsId,
statusCode: prevDoc.o.statusCode,
number: prevDoc.o.number,
dateFast: convertNumberLongToISOString(prevDoc.o.dateFast),
date: prevDoc.o.date,
op: prevDoc.op,
ts: convertTimestampToMoscowTime(prevDoc.ts)
});
print("Document with pupsId" + doc.o.pupsId + ":");
printjson({
_id: prevDoc.o._id, // слева пишем наименование поля из коллекции, а справа как хотим, чтобы отображалось в выводе в oplog
refactor: prevDoc.o.refactor,
rembo: prevDoc.o.rembo,
cadId: prevDoc.o.cadId,
pupsId: prevDoc.o.pupsId,
statusCode: prevDoc.o.statusCode,
number: prevDoc.o.number,
dateFast: convertNumberLongToISOString(prevDoc.o.dateFast),
date: prevDoc.o.date,
op: doc.op,
ts: convertTimestampToMoscowTime(doc.ts)
});
matchCount++;
}
});
if (matchCount === 0) {
print("No matching documents found."); // пишем в свободной форме, какой текст вывести, если документов не нашлось
}
} else {
print("No documents found with initial statuses."); // пишем в свободной форме, какой текст вывести, если документов, с изначальным (initial) статусом не нашлось
}
// Поиск операций удаления
var deleteOperations = db.getCollection("oplog.rs").find({
"op": "d"
}).limit(100).toArray(); // Лимит на выдачу документов
// Вывод операций удаления
deleteOperations.forEach(function(doc) {
print("Delete operation:");
printjson({
_id: doc.o._id,
ts: convertTimestampToMoscowTime(doc.ts),
op: doc.op,
ns: doc.ns,
ui: doc.ui
});
});
Пример ответа
Document with pupsId Ромашка:
{
_id: '1234fgbf4d8d48aa8a2ca44565fds43j', // по этому _id мы нашли 3 записи обновления, которые произошли с этим документом после утсановленной нами даты в запросе 2020-03-01T00:00:00Z
refactor: 'sdgds435g',
rembo: null,
cadId: '444fff',
pupsId: 'UBP9JN',
statusCode: 'ERROR',
number: '692343',
dateFast: '2023-11-15T18:16:59.636Z',
date: '15.11.2023',
op: 'u', // также можно побаловаться запросом и при необходимости преобразовать "u" в "обновление"
ts: '16-11-2023 00:19:02' // наш преобразованный NumberLong в удобочитаемом формате
}
Document with pupsId Ромашка:
{
_id: '1234fgbf4d8d48aa8a2ca44565fds43j',
refactor: 'sdfsdf454ff',
rembo: null,
cadId: '444fff',
pupsId: 'UBP9JN',
statusCode: 'WELL_DONE',
number: '75324324',
dateFast: '2023-11-16T15:02:04.655Z',
date: '16.11.2023',
op: 'u',
ts: '16-11-2023 21:02:05'
}
Document with pupsId Ромашка:
{
_id: '1234fgbf4d8d48aa8a2ca44565fds43j',
refactor: 'sdfsdg4543fg',
rembo: null,
cadId: '444fff',
pupsId: 'UBP9JN',
statusCode: 'ERROR',
number: '75234234',
dateFast: '2023-11-16T15:02:05.655Z',
date: '16.11.2023',
op: 'u',
ts: '16-11-2023 21:04:35'
}
Подробные шаги выполнения запроса выше
Получение UUID коллекции для запроса в Oplog.
Подготовка функций для преобразования временных меток.
Поиск начальных документов с определёнными статусами.
Поиск последующих изменений этих документов.
Вывод результатов в читаемом формате.
Поиск операций удаления [d].
Получение UUID коллекции для запроса в Oplog
Поскольку oplog хранится в БД local и используется сразу для всех коллекций внутри сета серверов, то нам необходимо получить конкретный UUID коллекции, из которой хотим получить хронологию данных по документам.
Запрос вводится в окне query-запроса выбранной коллекции
db.getCollectionInfos()
Пример ответа
Из этого ответа нам понадобится значение из поля «uuid», которое в дальнейшем будем вставлять в поле «ui» в окне query-запроса Oplog.
[
{
"name" : "your-collection-name",
"type" : "collection",
"options" : {
},
"info" : {
"readOnly" : false,
"uuid" : UUID("1234e321-a6fr-4egv-b2bf-5aedfv5rgv54") // необходимое значение
},
"idIndex" : {
"v" : 2.0,
"key" : {
"_id" : 1.0
},
"name" : "_id_"
}
}
]
Подготовка функций для преобразования временных меток
MongoDB хранит временные метки в формате Timestamp
и NumberLong
. Для удобства чтения нам нужно преобразовать их в стандартные временные форматы.
Функция для преобразования NumberLong в ISO-строку
function convertNumberLongToISOString(numberLong) {
var date = new Date(numberLong.toNumber());
return date.toISOString();
}
Функция для преобразования Timestamp в московское время
function convertTimestampToMoscowTime(timestamp) {
var date = new Date(timestamp.getHighBits() * 1000);
var moscowOffset = 3 * 60 * 60 * 1000; // Москва на 3 часа впереди UTC
var moscowTime = new Date(date.getTime() + moscowOffset);
var day = ('0' + moscowTime.getDate()).slice(-2);
var month = ('0' + (moscowTime.getMonth() + 1)).slice(-2);
var year = moscowTime.getFullYear();
var hours = ('0' + moscowTime.getHours()).slice(-2);
var minutes = ('0' + moscowTime.getMinutes()).slice(-2);
var seconds = ('0' + moscowTime.getSeconds()).slice(-2);
return day + '-' + month + '-' + year + ' ' + hours + ':' + minutes + ':' + seconds;
}
Поиск начальных документов с определёнными статусами
Наша задача — найти документы с начальными статусами ERROR
, начиная с определённой даты.
В примере мы используем поле statusCode
, которое имеется в каждом документе нашей коллекции.
Выполняем поиск по логике — initialStatuses
— статус, который был записан первым и targetStatuses
— статус, который обновил предыдущий статус.
Для этого используем следующий запрос:
var initialStatuses = ["PROCESS", "WELL_DONE"]; // может быть множество значений, перечисленных через запятую
var startDate = new Date('2024-03-01T00:00:00Z'); // проставляем нужную нам дату по Москве
var startTimestamp = Timestamp(Math.floor(startDate.getTime() / 1000), 0);
var initialDocuments = db.getCollection("oplog.rs").find({
"ui": UUID("1234e321-a6fr-4egv-b2bf-5aedfv5rgv54"), // пишем значение из выполненного запроса в п.1
"o.statusCode": { $in: initialStatuses },
"ts": { $gte: startTimestamp }
}, { // тут перечисляем все поля из коллекции, которые хотим видеть в ответе запроса
// перед каждым наименованием поля стоит o. - технически необходимо, сокращение от object
"o.statusCode": 1,
"o.cadId": 1,
"o.pupsId": 1,
"o.number": 1,
"o.date": 1,
"o.dateFast": 1,
"o.refactor": 1,
"o.rembo": 1,
"o._id": 1,
"op": 1,
"ts": 1
}).toArray();
Поиск последующих изменений этих документов
После получения начальных документов ищем их изменения на целевые статусы DEBITED
или DOCUMENT_DONE
.
var targetStatuses = ["ERROR"];
if (initialDocuments.length > 0) {
var documentIds = initialDocuments.map(function(doc) {
return doc.o._id;
});
var targetDocuments = db.getCollection("oplog.rs").find({
"op": { $in: ["u", "i"] }, // ищем операции u и i (update - обновление и insert - новая)
"ui": UUID("1234e321-a6fr-4egv-b2bf-5aedfv5rgv54"), // пишем значение из выполненного запроса в п.1
"o._id": { $in: documentIds },
"o.statusCode": { $in: targetStatuses },
"ts": { $gte: startTimestamp }
}, {
"o.statusCode": 1,
"o.cadId": 1,
"o.pupsId": 1,
"o.number": 1,
"o.date": 1,
"o.dateFast": 1,
"o.refactor": 1,
"o.rembo": 1,
"o._id": 1,
"op": 1,
"ts": 1
}).toArray();
Вывод результатов в читаемом формате
Для удобства создадим карту начальных документов, а затем выведем сопоставленные документы.
var initialDocMap = {};
initialDocuments.forEach(function(doc) {
initialDocMap[doc.o._id] = doc;
});
var matchCount = 0;
targetDocuments.forEach(function(doc) {
if (matchCount >= 100) return;
var prevDoc = initialDocMap[doc.o._id];
if (prevDoc && doc.ts.getHighBits() > prevDoc.ts.getHighBits()) {
print("Document with pupsId" + prevDoc.o.pupsId + ":");
printjson({
_id: prevDoc.o._id, // слева пишем наименование поля из коллекции, а справа как хотим, чтобы отображалось в выводе в oplog
refactor: prevDoc.o.refactor,
rembo: prevDoc.o.rembo,
cadId: prevDoc.o.cadId,
pupsId: prevDoc.o.pupsId,
statusCode: prevDoc.o.statusCode,
number: prevDoc.o.number,
dateFast: convertNumberLongToISOString(prevDoc.o.dateFast),
date: prevDoc.o.date,
op: prevDoc.op,
ts: convertTimestampToMoscowTime(prevDoc.ts)
});
print("Document with pupsId" + doc.o.pupsId + ":");
printjson({
_id: prevDoc.o._id, // слева пишем наименование поля из коллекции, а справа как хотим, чтобы отображалось в выводе в oplog
refactor: prevDoc.o.refactor,
rembo: prevDoc.o.rembo,
cadId: prevDoc.o.cadId,
pupsId: prevDoc.o.pupsId,
statusCode: prevDoc.o.statusCode,
number: prevDoc.o.number,
dateFast: convertNumberLongToISOString(prevDoc.o.dateFast),
date: prevDoc.o.date,
op: doc.op,
ts: convertTimestampToMoscowTime(doc.ts)
});
matchCount++;
}
});
if (matchCount === 0) {
print("No matching documents found.");
}
} else {
print("No documents found with initial statuses.");
}
Поиск операций удаления [d]
Мы не можем вставить в начале нашего запроса операцию [d] удаления совместно с операциями [u] (обновление) и [i] (внесение), тк в выводе oplog показывает нужные нам _id
и ts
(timestamp — время) удаленного документа без полей, свойственных для документов в нашей коллекции.
// Поиск операций удаления
var deleteOperations = db.getCollection("oplog.rs").find({
"op": "d"
}).limit(100).toArray(); // Лимит на выдачу документов
// Вывод операций удаления
deleteOperations.forEach(function(doc) {
print("Delete operation:");
printjson({
_id: doc.o._id,
ts: convertTimestampToMoscowTime(doc.ts),
op: doc.op,
ns: doc.ns,
ui: doc.ui
});
});
Этот скрипт выполняет следующие шаги:
Создаёт карту начальных документов для быстрого доступа по идентификатору.
Проходит по каждому документу из целевых изменений и проверяет, есть ли соответствующий начальный документ с той же ID и более поздней временной меткой.
Выводит начальный и изменённый документ в читаемом формате, включая преобразованные временные метки.
Заключение
В этой статье мы рассмотрели, как работать с MongoDB Oplog для отслеживания изменений документов. Мы научились искать документы с определёнными начальными статусами, находить их последующие изменения и выводить результаты в читаемом формате.