Hi Jean-Sébastien,

 

Sorry you’re running into trouble. NiFi can have a bit of a learning curve at first, but once you are comfortable with its built-in components and how to use them effectively, tasks like your example become much faster to accomplish.

 

Here are a few pointers to help you get started:

 
1. Rather than splitting parts of your JSON object into separate flow files, keep the entire object together. From there, take advantage of processors that are designed to interpret (and manipulate) the flow file contents as JSON. Look at the JSON processors (you can filter processors by name, e.g., “Json”, when adding one to the canvas), such as JoltTransformJSON, EvaluateJsonPath, FlattenJson, SplitJson, and JsonPathReader. A rough JoltTransformJSON sketch follows this list.
2. Processing multiple JSON records at a time can often be more efficient than a single JSON object per flow file. If you have or can construct flow files in this manner (e.g., a single flow file containing an array of JSON elements), then from that point on you can use the record-oriented processors. Record processors (again, you can find them by searching/filtering for “Record” when adding processors or controller services) require defining a schema to apply to your data, but once that is done, you can read/write/modify various record formats, including JSON. Here is an example of doing JSON record enrichment in this style [1]; see the schema sketch after the link below.
3. Lastly, in general, don’t worry too much about reducing the amount of data going around. Unless you modify the flow file contents, all content data is passed by reference in NiFi, meaning there is only one physical copy of it (stored in the flow file content repository) in NiFi’s storage at any time. In most cases this is very efficient, and you do not need to optimize the data for NiFi; it will try to do the intelligent thing for you. If you start to experience system resource exhaustion (e.g., running out of storage in the content repository), or if copying data becomes a performance bottleneck in your flow, then take the time to optimize that aspect or tune the system to meet the demands of your data flow.
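
As a rough illustration of the first pointer (just a sketch, not something I have run against your data, and the attribute name is made up): if you captured the detected language into a flow file attribute, say detected.language, instead of a separate flow file, a JoltTransformJSON “default” spec could fold it back into the original object. If I remember correctly, the Jolt specification property accepts expression language, so the attribute can be referenced directly in the spec:

    [
      {
        "operation": "default",
        "spec": {
          "language": "${detected.language}"
        }
      }
    ]

With the “default” operation, the language field is only added when the incoming JSON does not already contain one, so the rest of the original document passes through untouched.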
 

[1] https://community.hortonworks.com/articles/138632/data-flow-enrichment-with-nifi-lookuprecord-proces.html
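
If you go the record-oriented route shown in [1], the schema is just an Avro schema registered in something like an AvroSchemaRegistry. A minimal sketch for your case might look like this (the id and text field names are only guesses at your data):

    {
      "type": "record",
      "name": "document",
      "fields": [
        { "name": "id", "type": "string" },
        { "name": "text", "type": "string" },
        { "name": "language", "type": ["null", "string"], "default": null }
      ]
    }

A JsonTreeReader and JsonRecordSetWriter configured with that schema let processors such as LookupRecord or UpdateRecord fill in the language field for every record in a flow file, rather than producing separate flow files that need to be merged back together.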

 

Hope this helps!

Kevin

 

From: Jean-Sebastien Vachon <[EMAIL PROTECTED]>
Reply-To: <[EMAIL PROTECTED]>
Date: Thursday, July 12, 2018 at 12:12
To: "[EMAIL PROTECTED]" <[EMAIL PROTECTED]>
Subject: Merging output of multiple processors

 

Hi,

 

I am pretty new to NiFi and I’m struggling with something that (in my mind) should be very easy to do 😉

My flow consists of a JSON file being processed by different processors to extract different information and enrich the data. Each processor has been implemented as an ExecuteStreamCommand and outputs the extracted information as a JSON-like element. As an example, one of the modules determines the language of one of the fields in the original JSON and will output something like:

 

{ "language" : "en" }

 

Every module extracts a different piece of information, and my goal was to reduce the amount of data going around.

 

What would be the best way of merging the responses from all modules back into my JSON once everything has been processed? The resulting JSON will then continue in the flow for further processing.

 

I tried using the MergeContent processor, but the output format cannot be JSON, so I’m a bit stuck. Right now, the merge strategy is set to “Bin-Packing Algorithm” with the Correlation Attribute Name set to ${filename}. The minimum and maximum number of entries are set to the expected number of elements to merge (6 in this case).

 

I tried the “Defragment” strategy as well, but the system complained about a missing fragment.index attribute (which I tried to provide through an UpdateAttribute processor, but that does not seem to work either).

 

Thanks
Jean-Sébastien Vachon

[EMAIL PROTECTED]

[EMAIL PROTECTED]

www.brizodata.com