Nifi — когда record-oriented не оправдывает себя
{
"type": "record",
"name": "Mdm_Record",
"namespace": "any.org",
"fields": [
{
"name": "metadata",
"type": {
"type": "record",
"name": "MessageInfo",
"namespace": "any.org",
"fields": [
{
"name": "ts",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
]
}
},
{
"name": "data",
"type": {
"type": "record",
"name": "MDM_Item",
"fields": [
{
"name": "object",
"type": {
"type": "record",
"name": "Node",
"fields": [
{
"name": "guid",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "template",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "language",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "fields",
"type": {
"type": "map",
"values": [
"Node",
{
"type": "record",
"name": "Barcode",
"fields": [
{
"name": "provider",
"type": {
"type": "record",
"name": "Provider",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "city",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "taxIdNumber",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "taxRegistrationReasonCode",
"type": [
"null",
"string"
],
"default": null
}
]
}
},
{
"name": "barcode",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "supplierArticle",
"type": [
"null",
"string"
],
"default": null
}
]
},
{
"type": "record",
"name": "MediaResource",
"fields": [
{
"name": "link",
"type": "string",
},
{
"name": "filename",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "mimetype",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "fileSize",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "format",
"type": [
"null",
"string"
],
"default": null
}
]
},
{
"type": "record",
"name": "SofaSpecification",
"fields": [
{
"name": "seats",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "bedLength",
"type": {
"type": "record",
"name": "QuantityValue",
"fields": [
{
"name": "value",
"type": "float",
},
{
"name": "measureUnit",
"type": [
"null",
{
"type": "record",
"name": "MeasureUnit",
"fields": [
{
"name": "unitName",
"type": "string",
},
{
"name": "unitCode",
"type": "string",
}
]
}
],
"default": null
}
]
}
},
{
"name": "bedWidth",
"type": "QuantityValue",
},
{
"name": "backHeightFolded",
"type": "QuantityValue",
},
{
"name": "backHeightFoldedOut",
"type": "QuantityValue",
},
{
"name": "fullLength",
"type": "QuantityValue",
}
]
},
{
"type": "record",
"name": "MattressSpecification",
"fields": [
{
"name": "bedLength",
"type": "QuantityValue",
},
{
"name": "bedWidth",
"type": "QuantityValue",
},
{
"name": "height",
"type": "QuantityValue",
},
{
"name": "maximumBedLoad",
"type": "QuantityValue",
}
]
},
"QuantityValue",
"MeasureUnit",
{
"type": "array",
"items": [
"Node",
"Barcode",
{
"type": "record",
"name": "ItemPack",
"fields": [
{
"name": "packageName",
"type": "string",
},
{
"name": "package",
"type": [
"null",
{
"type": "record",
"name": "Pack",
"fields": [
{
"name": "packageGuid",
"type": "string",
},
{
"name": "packageType",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "packaging",
"type": {
"type": "map",
"values": [
{
"type": "record",
"name": "Box",
"fields": [
{
"name": "length",
"type": "QuantityValue",
},
{
"name": "width",
"type": "QuantityValue",
},
{
"name": "height",
"type": "QuantityValue",
}
]
},
{
"type": "record",
"name": "SoftPackaging",
"fields": [
{
"name": "twist",
"type": "boolean",
}
]
}
]
},
}
]
}
],
"default": null
},
{
"name": "length",
"type": "QuantityValue",
},
{
"name": "width",
"type": "QuantityValue",
},
{
"name": "height",
"type": "QuantityValue",
},
{
"name": "volume",
"type": "QuantityValue",
},
{
"name": "weightNetto",
"type": "QuantityValue",
},
{
"name": "weightGross",
"type": "QuantityValue",
},
{
"name": "seatsCount",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "packagedUnitsCount",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "setSeatsCount",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "priority",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "measureUnit",
"type": [
"null",
"MeasureUnit"
],
"default": null
},
{
"name": "mainLogisticsPackaging",
"type": [
"null",
"boolean"
],
"default": null
},
{
"name": "barcode",
"type": [
"null",
"string"
],
"default": null
}
]
},
"MediaResource",
"string",
"int",
"boolean"
]
},
{
"type": "map",
"values": [
"string",
"int",
"boolean"
]
},
"string",
"int",
"boolean"
]
}
}
]
}
},
{
"name": "stateName",
"type": "string"
}
]
}
}
]
}
Так как выбранный ранее вариант с трансформацией Jolt не работал, я решил применить проверенное средство для крайних случаев — скрипт. Есть хороший процессор — ScriptedTransformRecord, позволяющий выполнять обработку по одной записи, получая объект типа Record.
Скрипт получился довольно большим по причине наличия типов map, а также возможности значения «null» в требуемых полях. Так как существует вероятность модификации формата данных либо корректировки S2T, то в будущем потребуется изменять скрипт, что при его большом объёме влечёт увеличение сложности для разработчика. Так что я решил отказаться от скрипта и вернуться к разработанной и отлаженной спецификации, ведь для JSON она работает корректно, ошибка возникает только при работе с записью, когда применяется схема данных.
То есть в этом случае я решил отказаться от обработки записи и перейти к обработке контента целиком, что приводит к увеличению количества FlowFile и, соответственно, увеличению задействования оперативной памяти.
В NiFi можно применить спецификацию Jolt к записям c помощью JoltTransformRecord или воспользоваться процессором JoltTransformJSON, который ожидает на входе JSON и преобразовывает его не как запись, а как единый JSON-файл. Так как происходит обработка всего контента целиком, то для сокращения накладных расходов лучше подавать на вход единичный объект JSON, а не массив. Поэтому предварительно потребовалась разбивка пришедшего FlowFile на фрагменты, где каждый содержал бы один JSON. Это позволило в дальнейшем выполнить трансформацию для единичного объекта быстро, но повлекло генерацию большого количества FlowFile. Для этого применил SplitRecord, где Reader читал Avro-формат, а RecordSetWriter был настроен для записи одного JSON-объекта.