Apache NiFi – a whistle-stop tour of how it works, what it’s good for and how to get the most out of it.
Author: Leah Tarbuck
Apache NiFi is a tool used to create Data Pipelines via a drag/drop interface and is designed to automate the flow of data between systems. It fits into the ‘no code/low code’ category of tools, primarily geared towards companies who feel less comfortable writing and managing code or building a solution that requires a significant amount of engineering effort.
What can NiFi be used for?
NiFi is good at reliably transferring data between platforms (e.g. Kafka to Elasticsearch) and allows you to perform some lightweight ETL along the way. It can enrich and prepare data, convert between data formats, rename fields and filter/route data to different locations.
However, it should not be used for complex event processing, distributed processing, joins, rolling windows or aggregation (it is not Apache Spark and doesn’t claim to be! 😎). Therefore it shouldn’t be compared to big data processing tools like AWS EMR or more flexible Open Source ETL Tools like Talend. A more reasonable comparison would be between Apache NiFi and StreamSets.
NiFi can integrate with some AWS services, such as S3, SNS, Lambda, SQS and DynamoDB.
If you need to integrate with streaming services, it’s worth noting that there is more support for the fellow Apache product Kafka than there is for AWS Kinesis. There is currently no ability to read from Kinesis Data Streams, but there is an open PR for the change!
Below are some useful terms specific to constructing NiFi flows which will help in understanding the rest of this article:
FlowFile: the data itself. It consists of content (the data) and attributes (metadata key/value pairs, e.g. ‘creation date’).
Processor: applies a set of transformations and rules to the FlowFiles and generates new FlowFiles. Apache NiFi boasts there are 280+ processors to choose from! Examples include GetKafka, GetFTP, PutKinesisStream, PutHDFS, ConvertRecord, ReplaceText, ValidateCsv etc.
Connector: a queue of all the FlowFiles that are yet to be processed by the next processor (NiFi’s own name for this is a Connection). It defines rules about how FlowFiles are prioritised (which ones go first, e.g. FIFO) or filtered out. Connectors can be used for load balancing and for redirecting ‘failed’ FlowFiles.
Controller Service: a shared service used by processors, e.g. DB connections, AWS credentials, and the Writers and Readers that you can use within conversion processors (to name just a few).
Process Group: acts as an abstraction layer. It is generally good practice to use these as a way of breaking up large data flows.
Templates: can be used if you have a collection of processors performing some logic you’d like to repeat and re-use, e.g. common components you wish to share with others.
Input/Output ports: allow you to transfer data in or out of remote process groups.
UI and NiFi Flows
The UI is very intuitive and simple to use. You drag different components from the top banner onto the canvas and configure accordingly. You can also write labels within the code, a bit like JavaDoc!
An image of the UI is below. You can see a process group in the centre of the canvas, which you can right-click and ‘enter’ to display the flow within.
NiFi Canvas with a process group
An example NiFi flow: reads from S3, converts to JSON and writes to Kinesis Firehose
Here you can see how processors are used in conjunction with connectors to handle the flow of data. Failed files can be redirected to other pathways and handled/logged accordingly. No code is needed; this is all done through configuration. This flow lists CSV files in an S3 bucket (to achieve this you need a combination of ListS3 and FetchS3Object processors), logs any issues via a LogAttribute processor, converts the files to JSON and puts them onto a Kinesis Firehose stream.
Within your root NiFi installation directory (e.g. ‘/Applications/nifi-<version>’), state is stored within a ‘/state’ folder (unless you run as a cluster, in which case ZooKeeper stores it). Logs are stored under ‘/logs’; the LogAttribute processor writes to this directory. By default the flow is saved in ‘/conf/flow.xml.gz’, but you can override this setting, along with many others, by editing the ‘/conf/nifi.properties’ file.
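As a rough illustration of that layout, the helper below looks up a single key in nifi.properties, for example to find where the flow is saved. It is only a sketch: get_nifi_property is a name made up for this example, and it assumes the usual key=value format with nifi.flow.configuration.file as the key holding the flow location.

```shell
# Print the value of one key from a NiFi properties file.
# get_nifi_property is a hypothetical helper, not part of NiFi.
#
# Example call (path taken from the article, adjust to your install):
#   get_nifi_property "/Applications/nifi-<version>/conf/nifi.properties" \
#     nifi.flow.configuration.file
get_nifi_property() {
  props_file="$1"
  key="$2"
  # Split each line on '=', compare the key exactly (commented-out lines
  # never match), then print everything after the first '='.
  awk -F= -v k="$key" '$1 == k { sub(/^[^=]*=/, ""); print; exit }' "$props_file"
}
```

The function prints nothing when the key is missing, so callers can fall back to the default location.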
NiFi directs you to documentation (if available) via the small book icon, for example the View usage option below:
This is also available when adding controller services to a process group.
You can create an AWSCredentialsProviderControllerService (set at process group level) to re-use your AWS credentials within multiple components of your flow. Specify either the location of a credentials file, which states the AWS Access Key and Secret Access Key, or set these keys directly within the properties of the controller service.
Then you can reference this controller service within the configuration of processors that require AWS connection e.g. the PutKinesisFirehose processor properties.
PutKinesisFirehose processor configuration
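For the credentials-file option, here is a minimal sketch of creating such a file. It assumes the properties-style layout with accessKey and secretKey entries, so check the controller service’s usage documentation for your NiFi version before relying on it. The helper name write_aws_credentials_file and any path you pass to it are hypothetical.

```shell
# Write an AWS credentials file in key=value form, readable by the owner only.
# write_aws_credentials_file is a hypothetical helper for this example.
write_aws_credentials_file() {
  target="$1"
  access_key="$2"
  secret_key="$3"
  # Create (or empty) the file and lock down its permissions
  # before any secret is written to it.
  : > "$target" || return 1
  chmod 600 "$target" || return 1
  printf 'accessKey=%s\nsecretKey=%s\n' "$access_key" "$secret_key" > "$target"
}
```

The resulting path is what you would enter in the controller service’s credentials file property.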
Running processors need to be stopped to apply any changes such as new controller services or other property modifications.
The NiFi Registry is an Apache NiFi sub-project that aims to answer people’s deployment and version control needs! It provides a central location to store versioned process group flows. You first have to use the Registry UI to create a ‘bucket’ (buckets are used to organise the version-controlled flows) before adding the new registry client in your local NiFi instance. This article: NiFi-how-do-i-deploy-my-flow? has some nicely written steps detailing how to do this.
NiFi Toolkit and Deployments
The toolkit provides several command line utilities to help set up NiFi in single and clustered environments. Whilst investigating deployment options, I used the CLI tool to connect to the NiFi Registry.
Within your downloaded toolkit bin directory (e.g. /Applications/nifi-toolkit-version/bin) there is a ‘cli.sh’ script which can be used to launch an interactive shell.
For my deployment to an AWS account, I used Terraform to create two EC2 instances (using the m4.large instance type) running within the same VPC. One was installed with NiFi and the other with NiFi Registry. The simple user_data configuration for NiFi is below:
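# Install Docker using the convenience script
curl -fsSL https://get.docker.com | sh
# Pull, build and run the NiFi image, publishing the default port 8080
sudo docker pull apache/nifi:latest
sudo docker build -t apache/nifi:latest .
sudo docker run --name nifi -p 8080:8080 -d apache/nifi:latest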
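... and for NiFi Registry:
# Install Docker using the convenience script
curl -fsSL https://get.docker.com | sh
# Pull, build and run the NiFi Registry image, publishing the default port 18080
sudo docker pull apache/nifi-registry:latest
sudo docker build -t apache/nifi-registry:latest .
sudo docker run --name nifi-registry -p 18080:18080 \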
Both shell scripts install Docker, then pull, build and run the latest images, publishing the default container ports to the instance.
For my ami_id I used a community edition of ubuntu-focal-20.04-amd64-server, which allows you to install Docker on the instance.
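NiFi can take a little while to come up after the container starts, so before pointing any tooling at a fresh instance it can help to wait until the UI responds. The helper below is a sketch of that idea and not part of the setup described here: wait_for_url, its default of 30 attempts and its 10 second delay are all assumptions, and the URL in the comment presumes the HTTP port 8080 published above.

```shell
# Poll a URL until it answers successfully or the attempts run out.
# wait_for_url is a hypothetical helper; attempts and delay are assumptions.
#
# Example call:
#   wait_for_url "http://<public_ip_of_nifi>:8080/nifi" 30 10
wait_for_url() {
  url="$1"
  attempts="${2:-30}"
  delay="${3:-10}"
  i=1
  while [ "$i" -le "$attempts" ]; do
    # -f makes curl fail on HTTP errors, -s silences the progress output.
    if curl -fs -o /dev/null "$url"; then
      return 0
    fi
    sleep "$delay"
    i=$((i + 1))
  done
  # Every attempt failed.
  return 1
}
```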
I had already created a bucket (“leah’s_bucket”) in the remote NiFi Registry and committed a flow (“product_flow”). Now I wanted to see how to deploy this to the remote NiFi instance…
Local version controlled process group
Remote registry with committed versioned flow
Once you’ve launched the interactive shell, you can deploy and start a versioned flow with just a few cli commands:
First you need to create a new registry client in the remote NiFi instance, e.g:
nifi create-reg-client --baseUrl http://<public_ip_of_nifi>:8080 \
--registryClientUrl http://<private_ip_of_registry>:18080 \
--registryClientName <name_of_registry_client>
The baseUrl is the url to execute the command against (as I’m running these cli commands on my local machine, the baseUrl is the public IP address of the NiFi EC2 instance — because I’m connecting over the internet).
The EC2 instance Security Group was modified to allow traffic from my local machine
The registryClientUrl can refer to the private IP of the NiFi Registry, because the two EC2 instances are in the same VPC in AWS and can talk to one another!
The registryClientName is whatever name you want this registry to be called within the NiFi UI.
Next we need to create a process group by importing a versioned flow from the registry, e.g:
nifi pg-import --bucketIdentifier 0455d64e-ea40-4213-bbe4-359985de09f9 --flowIdentifier 6df2b6c1-249b-4af3-8f84-b57f32f6c5aa \
--flowVersion 1 --baseUrl http://<public_ip_of_nifi>:8080
The bucketIdentifier and flowIdentifier can both be found in the screenshot of the committed versioned flow above. Both are unique.
This pg-import command returns a unique id for the process group (processGroupId) — which you need for the next steps!
Enable any controller services you are using, e.g:
nifi pg-enable-services --processGroupId 94a93dc0-0173-1000-fee3-442454b1ecd6 --baseUrl http://<public_ip_of_nifi>:8080
Then you can start the process group, e.g:
nifi pg-start --processGroupId 94a93dc0-0173-1000-fee3-442454b1ecd6 --baseUrl http://<public_ip_of_nifi>:8080
You should be able to see the newly deployed process group running in the UI!
These commands could be scripted to provide a more automated way of deployment.
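One way such a script might look is sketched below. It relies on cli.sh accepting a command as arguments instead of opening the interactive shell, and it assumes that pg-import prints only the new process group id on stdout, so check the output on your toolkit version. The names NIFI_CLI and deploy_flow are made up for this example, and the registry client is assumed to exist already.

```shell
# Path to the toolkit CLI. The default assumes you run this from the
# toolkit's bin directory. NIFI_CLI is a name made up for this sketch.
NIFI_CLI="${NIFI_CLI:-./cli.sh}"

# Import a versioned flow, enable its controller services and start it.
# Prints the new process group id on stdout.
#
# Example call, using the identifiers from the article:
#   deploy_flow "http://<public_ip_of_nifi>:8080" \
#     0455d64e-ea40-4213-bbe4-359985de09f9 \
#     6df2b6c1-249b-4af3-8f84-b57f32f6c5aa 1
deploy_flow() {
  base_url="$1"
  bucket_id="$2"
  flow_id="$3"
  flow_version="$4"

  # Assumption: pg-import prints only the new process group id on stdout.
  pg_id="$("$NIFI_CLI" nifi pg-import \
    --bucketIdentifier "$bucket_id" \
    --flowIdentifier "$flow_id" \
    --flowVersion "$flow_version" \
    --baseUrl "$base_url")" || return 1

  # Send the output of the remaining commands to stderr so that stdout
  # carries nothing but the process group id.
  "$NIFI_CLI" nifi pg-enable-services --processGroupId "$pg_id" \
    --baseUrl "$base_url" >&2 || return 1
  "$NIFI_CLI" nifi pg-start --processGroupId "$pg_id" \
    --baseUrl "$base_url" >&2 || return 1

  echo "$pg_id"
}
```

The function stops at the first failing command, so a failed import never leads to an attempt to start a process group that does not exist.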
If you need to set variables in your deployed NiFi flow, check out this link for more details.
Things to consider when using Apache NiFi
- Unit testing: with most “low code” solutions, unit testing is never easy and often gets overlooked 😢. NiFi provides a way to test individual processors using a TestRunner class, but no way to test a process group. This article discusses the difficulties in testing the application and introduces a library they created for testing a full flow.
- Reviewing the code: as the flow is stored as XML, it would be hard to detect small but potentially impactful modifications (e.g. connector load balance changes) via a code review tool. You’d need a fairly strict review process here, and should consider looking through the Flow Configuration History in the UI (images below). The registry provides a way of version controlling the code, so you could review the configuration locally before promoting it to another environment.
Flow Configuration History
Flow Configuration Action Details for a component
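To make small changes to the flow definition easier to spot, you could compare two saved copies of flow.xml.gz directly. The helper below is a minimal sketch of that idea: diff_flows is a hypothetical name, and a line-based diff is only readable if the XML inside the archives is spread over multiple lines.

```shell
# Show a unified diff between two gzip-compressed flow definitions.
# diff_flows is a hypothetical helper for this example.
# Exit status: 0 = identical, 1 = differences found, 2 = error.
diff_flows() {
  old_flow="$1"
  new_flow="$2"
  work_dir="$(mktemp -d)" || return 2
  gunzip -c "$old_flow" > "$work_dir/old.xml" || return 2
  gunzip -c "$new_flow" > "$work_dir/new.xml" || return 2
  # diff exits with 1 when the files differ, so capture its status
  # and clean up before returning it.
  diff -u "$work_dir/old.xml" "$work_dir/new.xml"
  status=$?
  rm -rf "$work_dir"
  return "$status"
}
```

A change to a connector’s load balance setting would then show up as a pair of removed and added lines in the output.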
Thanks for reading my whistle-stop tour of Apache NiFi! I hope you’ve found it useful 😊
Who contributed to this article
Leah Tarbuck, Senior Software Engineer
Leah is a Senior Software Engineer at BlackCat Technology Solutions. She has worked on multiple Data Lake projects and containerised microservice applications, and has experience in open source ETL and various AWS services. She has a strong work ethic, is self-motivated and very personable. In her spare time she likes to keep active, and enjoys running and practising yoga.