Работа с MongoDB Oplog: Как отслеживать изменения документов

ea170fa13f9a01f1d6c0bd327470777a

Статья, в основном, ориентирована для специалистов, которые столкнулись с проблемой «исчезающих» документов в MongoDB и не понимающих, где найти историю этих документов, была ли она вообще сохранена в коллекции или же была обновлена и удалена, и в какой последовательности происходили все эти действия и в какое время.

MongoDB — это популярная NoSQL база данных, широко используемая для хранения больших объемов данных. Одной из ключевых возможностей MongoDB является механизм Oplog (операционный журнал), который позволяет отслеживать изменения в коллекциях. В этой статье мы рассмотрим, как работать с Oplog, искать документы, преобразовывать временные метки и выводить результаты в читаемом формате, что крайне удобно для аналитиков.

Введение в Oplog

Oplog представляет собой журнал, содержащий записи обо всех операциях, которые изменяют состояние базы данных. Это могут быть операции вставки [i], обновления [u] и удаления [d]. Oplog особенно полезен для репликации данных и отслеживания изменений в реальном времени.

Задача

Предположим, у нас есть коллекция документов в MongoDB, и мы хотим отслеживать изменения статусов этих документов. Наша цель — найти документы с определёнными начальными статусами, а затем найти последующие изменения этих документов на новые статусы. Мы также хотим преобразовать временные метки MongoDB в читаемый формат и вывести результаты.

Пример готового запроса в Oplog

function convertNumberLongToISOString(numberLong) {
    var date = new Date(numberLong.toNumber());
    return date.toISOString();
}

function convertTimestampToMoscowTime(timestamp) {
    var date = new Date(timestamp.getHighBits() * 1000);
    var moscowOffset = 3 * 60 * 60 * 1000;
    var moscowTime = new Date(date.getTime() + moscowOffset);

    var day = ('0' + moscowTime.getDate()).slice(-2);
    var month = ('0' + (moscowTime.getMonth() + 1)).slice(-2);
    var year = moscowTime.getFullYear();
    var hours = ('0' + moscowTime.getHours()).slice(-2);
    var minutes = ('0' + moscowTime.getMinutes()).slice(-2);
    var seconds = ('0' + moscowTime.getSeconds()).slice(-2);

    return day + '-' + month + '-' + year + ' ' + hours + ':' + minutes + ':' + seconds;
}

var initialStatuses = ["PROCESS", "WELL_DONE"]; // может быть множество значений, перечисленных через запятую
var targetStatuses = ["ERROR"]; // может быть множество значений, перечисленных через запятую

var startDate = new Date('2020-03-01T00:00:00Z'); // проставляем нужную нам дату по Москве, начиная с которой будет осуществлятся поиск логов
var startTimestamp = Timestamp(Math.floor(startDate.getTime() / 1000), 0);

var initialDocuments = db.getCollection("oplog.rs").find({
    "ui": UUID("1234e321-a6fr-4egv-b2bf-5aedfv5rgv54"), // пишем значение из выполненного запроса в п.1
    "o.statusCode": { $in: initialStatuses },
    "ts": { $gte: startTimestamp }
}, {
     // тут перечисляем все поля из коллекции, которые хотим видеть в ответе запроса
     // перед каждым наименованием поля стоит o. - технически необходимо, сокращение от object
    "o.statusCode": 1,
    "o.cadId": 1,
    "o.pupsId": 1,
    "o.number": 1,
    "o.date": 1,
    "o.dateFast": 1,
    "o.refactor": 1,
    "o.rembo": 1,
    "o._id": 1,
    "op": 1, // тут не стоит o. , тк это поле относится уже к oplog-данным
    "ts": 1 // тут не стоит o. , тк это поле относится уже к oplog-данным
}).toArray();

if (initialDocuments.length > 0) {
    var documentIds = initialDocuments.map(function(doc) {
        return doc.o._id; // берем конкретный _id документа и смотрим переход по нужным нам статусам
    });

    var targetDocuments = db.getCollection("oplog.rs").find({
        "op": { $in: ["u", "i"] }, // ищем операции u и i (update - обновление и insert - новая), можно оставить только одну из них
        "ui": UUID("1234e321-a6fr-4egv-b2bf-5aedfv5rgv54"), // пишем значение из выполненного запроса в п.1
        "o._id": { $in: documentIds },
        "o.statusCode": { $in: targetStatuses },
        "ts": { $gte: startTimestamp }
    }, {
         // тут перечисляем все поля из коллекции, которые хотим видеть в ответе запроса
         // перед каждым наименованием поля стоит o. - технически необходимо, сокращение от object
       "o.statusCode": 1,
       "o.cadId": 1,
       "o.pupsId": 1,
       "o.number": 1,         
       "o.date": 1,
       "o.dateFast": 1,
       "o.refactor": 1,
       "o.rembo": 1,
       "o._id": 1,
       "op": 1, // тут не стоит o. , тк это поле относится уже к oplog-данным
       "ts": 1 // тут не стоит o. , тк это поле относится уже к oplog-данным
    }).toArray();

    var initialDocMap = {};
    initialDocuments.forEach(function(doc) {
        initialDocMap[doc.o._id] = doc;
    });

    var matchCount = 0;
    targetDocuments.forEach(function(doc) {
        if (matchCount >= 100) return; //  остановка вывода результатов после 100 совпадений, чтобы не грохнуть БД) но вы можете снять это ограничение или уменьшить

        var prevDoc = initialDocMap[doc.o._id];
        if (prevDoc && doc.ts.getHighBits() > prevDoc.ts.getHighBits()) {
            print("Document with pupsId" + prevDoc.o.pupsId + ":"); // тут пишем, что хотим видеть в заголовке найденных документов, в данном случае я хочу видеть в заголовке наименование компании
            printjson({ // перечисляем поля для красивого вывода, где можно преобразовать наименование поля из коллекции в более удобочитаемое
                _id: prevDoc.o._id, // слева пишем наименование поля из коллекции, а справа как хотим, чтобы отображалось в выводе в oplog
                refactor: prevDoc.o.refactor,
                rembo: prevDoc.o.rembo,
                cadId: prevDoc.o.cadId,
                pupsId: prevDoc.o.pupsId,
                statusCode: prevDoc.o.statusCode,
                number: prevDoc.o.number,
                dateFast: convertNumberLongToISOString(prevDoc.o.dateFast),
                date: prevDoc.o.date,
                op: prevDoc.op,
                ts: convertTimestampToMoscowTime(prevDoc.ts)
            });

            print("Document with pupsId" + doc.o.pupsId + ":");
            printjson({
                _id: prevDoc.o._id, // слева пишем наименование поля из коллекции, а справа как хотим, чтобы отображалось в выводе в oplog
                refactor: prevDoc.o.refactor,
                rembo: prevDoc.o.rembo,
                cadId: prevDoc.o.cadId,
                pupsId: prevDoc.o.pupsId,
                statusCode: prevDoc.o.statusCode,
                number: prevDoc.o.number,
                dateFast: convertNumberLongToISOString(prevDoc.o.dateFast),
                date: prevDoc.o.date,
                op: doc.op,
                ts: convertTimestampToMoscowTime(doc.ts)
            });

            matchCount++;
        }
    });

    if (matchCount === 0) {
        print("No matching documents found."); // пишем в свободной форме, какой текст вывести, если документов не нашлось
    }
} else {
    print("No documents found with initial statuses."); // пишем в свободной форме, какой текст вывести, если документов, с изначальным (initial) статусом не нашлось
}

// Поиск операций удаления
var deleteOperations = db.getCollection("oplog.rs").find({
    "op": "d"
}).limit(100).toArray(); // Лимит на выдачу документов

// Вывод операций удаления
deleteOperations.forEach(function(doc) {
    print("Delete operation:");
    printjson({
        _id: doc.o._id,
        ts: convertTimestampToMoscowTime(doc.ts),
        op: doc.op,
        ns: doc.ns,
        ui: doc.ui
    });
});

Пример ответа

Document with pupsId Ромашка:
{
  _id: '1234fgbf4d8d48aa8a2ca44565fds43j', // по этому _id мы нашли 3 записи обновления, которые произошли с этим документом после утсановленной нами даты в запросе 2020-03-01T00:00:00Z 
  refactor: 'sdgds435g',
  rembo: null,
  cadId: '444fff',
  pupsId: 'UBP9JN',
  statusCode: 'ERROR',
  number: '692343',
  dateFast: '2023-11-15T18:16:59.636Z',
  date: '15.11.2023',
  op: 'u', // также можно побаловаться запросом и при необходимости преобразовать "u" в "обновление"
  ts: '16-11-2023 00:19:02' // наш преобразованный NumberLong в удобочитаемом формате
}
Document with pupsId Ромашка:
{
  _id: '1234fgbf4d8d48aa8a2ca44565fds43j',
  refactor: 'sdfsdf454ff',
  rembo: null,
  cadId: '444fff',
  pupsId: 'UBP9JN',
  statusCode: 'WELL_DONE',
  number: '75324324',
  dateFast: '2023-11-16T15:02:04.655Z',
  date: '16.11.2023',
  op: 'u',
  ts: '16-11-2023 21:02:05'
}
Document with pupsId Ромашка:
{
  _id: '1234fgbf4d8d48aa8a2ca44565fds43j',
  refactor: 'sdfsdg4543fg',
  rembo: null,
  cadId: '444fff',
  pupsId: 'UBP9JN',
  statusCode: 'ERROR',
  number: '75234234',
  dateFast: '2023-11-16T15:02:05.655Z',
  date: '16.11.2023',
  op: 'u',
  ts: '16-11-2023 21:04:35'
}

Подробные шаги выполнения запроса выше

  1. Получение UUID коллекции для запроса в Oplog.

  2. Подготовка функций для преобразования временных меток.

  3. Поиск начальных документов с определёнными статусами.

  4. Поиск последующих изменений этих документов.

  5. Вывод результатов в читаемом формате.

  6. Поиск операций удаления [d].

Получение UUID коллекции для запроса в Oplog

Поскольку oplog хранится в БД local и используется сразу для всех коллекций внутри сета серверов, то нам необходимо получить конкретный UUID коллекции, из которой хотим получить хронологию данных по документам.

Запрос вводится в окне query-запроса выбранной коллекции

db.getCollectionInfos()

Пример ответа

Из этого ответа нам понадобится значение из поля «uuid», которое в дальнейшем будем вставлять в поле «ui» в окне query-запроса Oplog.

[
    {
        "name" : "your-collection-name",
        "type" : "collection",
        "options" : {

        },
        "info" : {
            "readOnly" : false,
            "uuid" : UUID("1234e321-a6fr-4egv-b2bf-5aedfv5rgv54") // необходимое значение
        },
        "idIndex" : {
            "v" : 2.0,
            "key" : {
                "_id" : 1.0
            },
            "name" : "_id_"
        }
    }
]

Подготовка функций для преобразования временных меток

MongoDB хранит временные метки в формате Timestamp и NumberLong. Для удобства чтения нам нужно преобразовать их в стандартные временные форматы.

Функция для преобразования NumberLong в ISO-строку

function convertNumberLongToISOString(numberLong) {
    var date = new Date(numberLong.toNumber());
    return date.toISOString();
}

Функция для преобразования Timestamp в московское время

function convertTimestampToMoscowTime(timestamp) {
    var date = new Date(timestamp.getHighBits() * 1000);
    var moscowOffset = 3 * 60 * 60 * 1000; // Москва на 3 часа впереди UTC
    var moscowTime = new Date(date.getTime() + moscowOffset);

    var day = ('0' + moscowTime.getDate()).slice(-2);
    var month = ('0' + (moscowTime.getMonth() + 1)).slice(-2);
    var year = moscowTime.getFullYear();
    var hours = ('0' + moscowTime.getHours()).slice(-2);
    var minutes = ('0' + moscowTime.getMinutes()).slice(-2);
    var seconds = ('0' + moscowTime.getSeconds()).slice(-2);

    return day + '-' + month + '-' + year + ' ' + hours + ':' + minutes + ':' + seconds;
}

Поиск начальных документов с определёнными статусами

Наша задача — найти документы с начальными статусами ERROR, начиная с определённой даты.

В примере мы используем поле statusCode, которое имеется в каждом документе нашей коллекции.

Выполняем поиск по логике — initialStatuses — статус, который был записан первым и targetStatuses — статус, который обновил предыдущий статус.

Для этого используем следующий запрос:

var initialStatuses = ["PROCESS", "WELL_DONE"]; // может быть множество значений, перечисленных через запятую
var startDate = new Date('2024-03-01T00:00:00Z'); // проставляем нужную нам дату по Москве
var startTimestamp = Timestamp(Math.floor(startDate.getTime() / 1000), 0);

var initialDocuments = db.getCollection("oplog.rs").find({
    "ui": UUID("1234e321-a6fr-4egv-b2bf-5aedfv5rgv54"), // пишем значение из выполненного запроса в п.1
    "o.statusCode": { $in: initialStatuses },
    "ts": { $gte: startTimestamp }
}, { // тут перечисляем все поля из коллекции, которые хотим видеть в ответе запроса
     // перед каждым наименованием поля стоит o. - технически необходимо, сокращение от object
    "o.statusCode": 1,
    "o.cadId": 1,
    "o.pupsId": 1,
    "o.number": 1,         
    "o.date": 1,
    "o.dateFast": 1,
    "o.refactor": 1,
    "o.rembo": 1,
    "o._id": 1,
    "op": 1,
    "ts": 1
}).toArray();

Поиск последующих изменений этих документов

После получения начальных документов ищем их изменения на целевые статусы DEBITED или DOCUMENT_DONE.

var targetStatuses = ["ERROR"];

if (initialDocuments.length > 0) {
    var documentIds = initialDocuments.map(function(doc) {
        return doc.o._id;
    });

    var targetDocuments = db.getCollection("oplog.rs").find({
        "op": { $in: ["u", "i"] }, // ищем операции u и i (update - обновление и insert - новая)
        "ui": UUID("1234e321-a6fr-4egv-b2bf-5aedfv5rgv54"), // пишем значение из выполненного запроса в п.1
        "o._id": { $in: documentIds },
        "o.statusCode": { $in: targetStatuses },
        "ts": { $gte: startTimestamp }
    }, {
       "o.statusCode": 1,
       "o.cadId": 1,
       "o.pupsId": 1,
       "o.number": 1,         
       "o.date": 1,
       "o.dateFast": 1,
       "o.refactor": 1,
       "o.rembo": 1,
       "o._id": 1,
       "op": 1,
       "ts": 1
    }).toArray();

Вывод результатов в читаемом формате

Для удобства создадим карту начальных документов, а затем выведем сопоставленные документы.

    var initialDocMap = {};
    initialDocuments.forEach(function(doc) {
        initialDocMap[doc.o._id] = doc;
    });

    var matchCount = 0;
    targetDocuments.forEach(function(doc) {
        if (matchCount >= 100) return;

        var prevDoc = initialDocMap[doc.o._id];
        if (prevDoc && doc.ts.getHighBits() > prevDoc.ts.getHighBits()) {
            print("Document with pupsId" + prevDoc.o.pupsId + ":");
            printjson({
                _id: prevDoc.o._id, // слева пишем наименование поля из коллекции, а справа как хотим, чтобы отображалось в выводе в oplog
                refactor: prevDoc.o.refactor,
                rembo: prevDoc.o.rembo,
                cadId: prevDoc.o.cadId,
                pupsId: prevDoc.o.pupsId,
                statusCode: prevDoc.o.statusCode,
                number: prevDoc.o.number,
                dateFast: convertNumberLongToISOString(prevDoc.o.dateFast),
                date: prevDoc.o.date,
                op: prevDoc.op,
                ts: convertTimestampToMoscowTime(prevDoc.ts)
            });

            print("Document with pupsId" + doc.o.pupsId + ":");
            printjson({
                _id: prevDoc.o._id, // слева пишем наименование поля из коллекции, а справа как хотим, чтобы отображалось в выводе в oplog
                refactor: prevDoc.o.refactor,
                rembo: prevDoc.o.rembo,
                cadId: prevDoc.o.cadId,
                pupsId: prevDoc.o.pupsId,
                statusCode: prevDoc.o.statusCode,
                number: prevDoc.o.number,
                dateFast: convertNumberLongToISOString(prevDoc.o.dateFast),
                date: prevDoc.o.date,
                op: doc.op,
                ts: convertTimestampToMoscowTime(doc.ts)
            });

            matchCount++;
        }
    });

    if (matchCount === 0) {
        print("No matching documents found.");
    }
} else {
    print("No documents found with initial statuses.");
}

Поиск операций удаления [d]

Мы не можем вставить в начале нашего запроса операцию [d] удаления совместно с операциями [u] (обновление) и [i] (внесение), тк в выводе oplog показывает нужные нам _id и ts (timestamp — время) удаленного документа без полей, свойственных для документов в нашей коллекции.

// Поиск операций удаления
var deleteOperations = db.getCollection("oplog.rs").find({
    "op": "d"
}).limit(100).toArray(); // Лимит на выдачу документов

// Вывод операций удаления
deleteOperations.forEach(function(doc) {
    print("Delete operation:");
    printjson({
        _id: doc.o._id,
        ts: convertTimestampToMoscowTime(doc.ts),
        op: doc.op,
        ns: doc.ns,
        ui: doc.ui
    });
});

Этот скрипт выполняет следующие шаги:

  1. Создаёт карту начальных документов для быстрого доступа по идентификатору.

  2. Проходит по каждому документу из целевых изменений и проверяет, есть ли соответствующий начальный документ с той же ID и более поздней временной меткой.

  3. Выводит начальный и изменённый документ в читаемом формате, включая преобразованные временные метки.

Заключение

В этой статье мы рассмотрели, как работать с MongoDB Oplog для отслеживания изменений документов. Мы научились искать документы с определёнными начальными статусами, находить их последующие изменения и выводить результаты в читаемом формате.

© Habrahabr.ru