Nifi — когда record-oriented не оправдывает себя

67b3d1b0d76f2fe9bf2484157575a815.png
{
  "type": "record",
  "name": "Mdm_Record",
  "namespace": "any.org",
  "fields": [
    {
      "name": "metadata",
      "type": {
        "type": "record",
        "name": "MessageInfo",
        "namespace": "any.org",
        "fields": [
          {
            "name": "ts",
            "type": {
              "type": "long",
              "logicalType": "timestamp-millis"
            }            
          }
        ]
      }
    },
    {
      "name": "data",
      "type": {
        "type": "record",
        "name": "MDM_Item",
        "fields": [
          {
            "name": "object",
            "type": {
              "type": "record",
              "name": "Node",
              "fields": [
                {
                  "name": "guid",
                  "type": [
                    "null",
                    "string"
                  ],               
                  "default": null
                },
                {
                  "name": "template",
                  "type": [
                    "null",
                    "string"
                  ],                 
                  "default": null
                },
                {
                  "name": "language",
                  "type": [
                    "null",
                    "string"
                  ],                 
                  "default": null
                },
                {
                  "name": "fields",
                  "type": {
                    "type": "map",
                    "values": [
                      "Node",
                      {
                        "type": "record",
                        "name": "Barcode",
                        "fields": [
                          {
                            "name": "provider",
                            "type": {
                              "type": "record",
                              "name": "Provider",
                              "fields": [
                                {
                                  "name": "name",
                                  "type": "string"                                 
                                },
                                {
                                  "name": "city",
                                  "type": [
                                    "null",
                                    "string"
                                  ],                                 
                                  "default": null
                                },
                                {
                                  "name": "taxIdNumber",
                                  "type": [
                                    "null",
                                    "string"
                                  ],
                                  "default": null
                                },
                                {
                                  "name": "taxRegistrationReasonCode",
                                  "type": [
                                    "null",
                                    "string"
                                  ],                                 
                                  "default": null
                                }
                              ]
                            }
                           
                          },
                          {
                            "name": "barcode",
                            "type": [
                              "null",
                              "string"
                            ],                           
                            "default": null
                          },
                          {
                            "name": "supplierArticle",
                            "type": [
                              "null",
                              "string"
                            ],
                           "default": null
                          }
                        ]
                      },
                      {
                        "type": "record",
                        "name": "MediaResource",
                        "fields": [
                          {
                            "name": "link",
                            "type": "string",                           
                          },
                          {
                            "name": "filename",
                            "type": [
                              "null",
                              "string"
                            ],                           
                            "default": null
                          },
                          {
                            "name": "mimetype",
                            "type": [
                              "null",
                              "string"
                            ],                           
                            "default": null
                          },
                          {
                            "name": "fileSize",
                            "type": [
                              "null",
                              "int"
                            ],
                           
                            "default": null
                          },
                          {
                            "name": "format",
                            "type": [
                              "null",
                              "string"
                            ],                            
                            "default": null
                          }
                        ]
                      },
                      {
                        "type": "record",
                        "name": "SofaSpecification",
                        "fields": [
                          {
                            "name": "seats",
                            "type": [
                              "null",
                              "int"
                            ],                           
                            "default": null
                          },
                          {
                            "name": "bedLength",
                            "type": {
                              "type": "record",
                              "name": "QuantityValue",
                              "fields": [
                                {
                                  "name": "value",
                                  "type": "float",
                                 
                                },
                                {
                                  "name": "measureUnit",
                                  "type": [
                                    "null",
                                    {
                                      "type": "record",
                                      "name": "MeasureUnit",
                                      "fields": [
                                        {
                                          "name": "unitName",
                                          "type": "string",
                                        },
                                        {
                                          "name": "unitCode",
                                          "type": "string",
                                        }
                                      ]
                                    }
                                  ],
                                  "default": null
                                }
                              ]
                            }
                          },
                          {
                            "name": "bedWidth",
                            "type": "QuantityValue",
                          },
                          {
                            "name": "backHeightFolded",
                            "type": "QuantityValue",
                          },
                          {
                            "name": "backHeightFoldedOut",
                            "type": "QuantityValue",
                          },
                          {
                            "name": "fullLength",
                            "type": "QuantityValue",
                          }
                        ]
                      },
                      {
                        "type": "record",
                        "name": "MattressSpecification",
                        "fields": [
                          {
                            "name": "bedLength",
                            "type": "QuantityValue",
                          },
                          {
                            "name": "bedWidth",
                            "type": "QuantityValue",
                          },
                          {
                            "name": "height",
                            "type": "QuantityValue",
                          },
                          {
                            "name": "maximumBedLoad",
                            "type": "QuantityValue",
                          }
                        ]
                      },
                      "QuantityValue",
                      "MeasureUnit",
                      {
                        "type": "array",
                        "items": [
                          "Node",
                          "Barcode",
                          {
                            "type": "record",
                            "name": "ItemPack",
                            "fields": [
                              {
                                "name": "packageName",
                                "type": "string",
                              },
                              {
                                "name": "package",
                                "type": [
                                  "null",
                                  {
                                    "type": "record",
                                    "name": "Pack",
                                    "fields": [
                                      {
                                        "name": "packageGuid",
                                        "type": "string",
                                      },
                                      {
                                        "name": "packageType",
                                        "type": [
                                          "null",
                                          "string"
                                        ],
                                        "default": null
                                      },
                                      {
                                        "name": "packaging",
                                        "type": {
                                          "type": "map",
                                          "values": [
                                            {
                                              "type": "record",
                                              "name": "Box",
                                              "fields": [
                                                {
                                                  "name": "length",
                                                  "type": "QuantityValue",
                                                },
                                                {
                                                  "name": "width",
                                                  "type": "QuantityValue",
                                                },
                                                {
                                                  "name": "height",
                                                  "type": "QuantityValue",
                                                }
                                              ]
                                            },
                                            {
                                              "type": "record",
                                              "name": "SoftPackaging",
                                              "fields": [
                                                {
                                                  "name": "twist",
                                                  "type": "boolean",
                                                }
                                              ]
                                            }
                                          ]
                                        },
                                      }
                                    ]
                                  }
                                ],
                                "default": null
                              },
                              {
                                "name": "length",
                                "type": "QuantityValue",
                              },
                              {
                                "name": "width",
                                "type": "QuantityValue",
                              },
                              {
                                "name": "height",
                                "type": "QuantityValue",
                              },
                              {
                                "name": "volume",
                                "type": "QuantityValue",
                              },
                              {
                                "name": "weightNetto",
                                "type": "QuantityValue",
                              },
                              {
                                "name": "weightGross",
                                "type": "QuantityValue",
                              },
                              {
                                "name": "seatsCount",
                                "type": [
                                  "null",
                                  "int"
                                ],
                                "default": null
                              },
                              {
                                "name": "packagedUnitsCount",
                                "type": [
                                  "null",
                                  "int"
                                ],
                                "default": null
                              },
                              {
                                "name": "setSeatsCount",
                                "type": [
                                  "null",
                                  "int"
                                ],
                                "default": null
                              },
                              {
                                "name": "priority",
                                "type": [
                                  "null",
                                  "int"
                                ],
                                "default": null
                              },
                              {
                                "name": "measureUnit",
                                "type": [
                                  "null",
                                  "MeasureUnit"
                                ],
                                "default": null
                              },
                              {
                                "name": "mainLogisticsPackaging",
                                "type": [
                                  "null",
                                  "boolean"
                                ],
                                "default": null
                              },
                              {
                                "name": "barcode",
                                "type": [
                                  "null",
                                  "string"
                                ],
                                "default": null
                              }
                            ]
                          },
                          "MediaResource",
                          "string",
                          "int",
                          "boolean"
                        ]
                      },
                      {
                        "type": "map",
                        "values": [
                          "string",
                          "int",
                          "boolean"
                        ]
                      },
                      "string",
                      "int",
                      "boolean"
                    ]
                  }
                }
              ]
            }
          },
          {
            "name": "stateName",
            "type": "string"
          }
        ]
      }
    }
  ]
}

Так как выбранный ранее вариант с трансформацией Jolt не работал, я решил применить проверенное средство для крайних случаев — скрипт. Есть хороший процессор — ScriptedTransformRecord, позволяющий выполнять обработку по одной записи, получая объект типа Record.

Скрипт получился довольно большим по причине наличия типов map, а также возможности значения «null» в требуемых полях. Так как существует вероятность модификации формата данных либо корректировки S2T, то в будущем потребуется изменять скрипт, что при его большом объёме влечёт увеличение сложности для разработчика. Так что я решил отказаться от скрипта и вернуться к разработанной и отлаженной спецификации, ведь для JSON она работает корректно, ошибка возникает только при работе с записью, когда применяется схема данных. 

 То есть в этом случае я решил отказаться от обработки записи и перейти к обработке контента целиком, что приводит к увеличению количества FlowFile и, соответственно, увеличению задействования оперативной памяти.

 В NiFi можно применить спецификацию Jolt к записям c помощью JoltTransformRecord или воспользоваться процессором JoltTransformJSON, который ожидает на входе JSON и преобразовывает его не как запись, а как единый JSON-файл. Так как происходит обработка всего контента целиком, то для сокращения накладных расходов лучше подавать на вход единичный объект JSON, а не массив. Поэтому предварительно потребовалась разбивка пришедшего FlowFile на фрагменты, где каждый содержал бы один JSON. Это позволило в дальнейшем выполнить трансформацию для единичного объекта быстро, но повлекло генерацию большого количества FlowFile. Для этого применил SplitRecord, где Reader читал Avro-формат, а RecordSetWriter был настроен для записи одного JSON-объекта. 

© Habrahabr.ru