Building a Big Data Processing Pipeline – Chapter 3

Juan Irabedra
August 10, 2022

In the previous article we focused on getting started with Apache Kafka. We will now focus on Apache Flink: the core technology in our pipeline. We will set up a Flink cluster and learn how to build and submit an app. We will also take a quick look at some Scala and the Scala tooling needed to build this app properly.

Remember what the pipeline looks like: events flow from a Kafka topic into a Flink job, which writes the results to a Cassandra table.

A little Flink architecture

As with Kafka, Flink's architecture could be explained in great detail. Given that the purpose of this guide is to show an easy and quick setup for getting started with Flink, we will only mention the Job Manager and the Task Managers.

The Job Manager is the process responsible for coordinating the execution of applications, in particular distributed ones. It schedules jobs, mediates access to resources, reacts to critical events (such as node failures), and so on.

The Task Managers are responsible for the actual operations on the data. Each Task Manager offers a number of task slots, which host the actual processing work.

There is a lot more to Flink, but we do not expect more than an intuitive understanding of its architecture. Flink has some great documentation on its architecture, and it is recommended reading! Check it out here.

Setting up a local Flink Cluster

It is worth mentioning that Flink 1.15.0 runs on Scala 2.12. If we have a different Scala version installed, we can configure the version our application uses through sbt.
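
As a preview, pinning the Scala version is a single setting in build.sbt. We will build the full file later; the exact patch release below is just an example:

// build.sbt – any Scala 2.12.x release compatible with Flink 1.15 will do
scalaVersion := "2.12.15"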

Before we get our hands on the Flink job, we have to get a cluster up and running. The very first task is installing Flink. As with Kafka, Flink can be downloaded from the Apache Flink downloads site.

The Flink installation folder has two subfolders worth pointing out: bin and conf. Both serve the same purpose as their Kafka counterparts. There is one more folder worth mentioning: log. It gets populated once the cluster launches: logs are written there when the cluster is started or stopped, as well as when jobs are executed.

In order to launch the Flink cluster, open a new terminal tab in the Flink installation folder and run:

./bin/start-cluster.sh

The cluster should now be up and running, ready to receive jobs. We can check that it is running by accessing the Flink web UI at localhost:8081.
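
If you prefer the terminal, the same information is exposed through Flink's REST API on the same port. As a quick sanity check (assuming the default port 8081), something along these lines should return a small JSON summary of the cluster:

curl http://localhost:8081/overview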

It is time to create a simple Flink application to run as a job. The idea is to consume the records we fed into our Kafka topic and write their values to a Cassandra table.

Building a Scala application with Flink

In order to get started with Scala easily, we recommend installing sbt, the de facto build tool of the Scala community. sbt can be easily installed through Homebrew on Mac, Chocolatey on Windows or rpm on Linux. Find out more details here.

sbt offers some template projects for getting started with Scala. To create the simplest one, open a terminal tab in the directory where you would like to create the project, then run:

sbt new scala/hello-world.g8

You will be prompted to name your project. After that, you are ready to go! Go to your project root directory (the one that has a build.sbt file) and try running:

sbt run

The project will now be compiled and executed, and you should soon see its greeting printed to the console.


Who said getting started with Scala was hard?
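
For reference, the source generated by the scala/hello-world.g8 template is essentially a one-liner; it should look roughly like this (exact contents may vary between template versions):

// src/main/scala/Main.scala – generated by the hello-world template
object Main extends App {
  println("Hello, World!")
}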

SBT Configuration

Before moving on to sbt-specific syntax, there is one detail we have to take into consideration. Flink jobs are submitted as JAR files. Most of the time we need dependencies that are not native to Flink (for instance, connectors), and this information must live within the jar. A jar that carries all of its dependencies is known as a fat jar or uber jar. In order to build a fat jar we need a specific plugin, sbt-assembly, which plays a role similar to Maven's dependency shading.

According to the sbt plugin documentation, a valid approach to installing plugins within a project is to declare them in a plugins.sbt file located in the project/ directory under the project root. The only plugin we need for now is the assembly plugin, so our plugins.sbt should look like this:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")

As mentioned before, Flink requires Scala 2.12.x to run. All project-level configuration for Scala will be made in the build.sbt file.

Each entry in this file is called a setting expression. Each setting expression consists of a key, an operator and a value. The most common setting expression is the dependency expression. Let us see how they look:

libraryDependencies += "org.apache.flink" % "flink-core" % "1.15.0"

This line adds the Flink base library to our project. Note how it matches the definition we just gave: libraryDependencies is a key, += is an operator and "org.apache.flink" % "flink-core" % "1.15.0" is a value. The three strings are the Maven groupId, artifactId and version.
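
One more detail before we look at the full file: a double %% between the group ID and the artifact ID tells sbt to append the project's Scala binary version to the artifact name, which is how Scala-specific artifacts such as the Flink Scala API are resolved. To illustrate, with Scala 2.12 the following two lines resolve the same artifact:

libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.15.0"
libraryDependencies += "org.apache.flink" % "flink-streaming-scala_2.12" % "1.15.0"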

With this brief introduction, let us see the code we need in our build.sbt file in order to get started:

scalaVersion := "2.12.15"

name := "FlinkApp"
organization := "MontevideoLabs"
version := "1.0"

libraryDependencies += "org.scala-lang.modules" %% "scala-parser-combinators" % "2.1.1"
libraryDependencies += "org.apache.flink" % "flink-core" % "1.15.0"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.15.0"
libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "1.15.0"
libraryDependencies += "org.apache.flink" %% "flink-connector-cassandra" % "1.15.0"
libraryDependencies += "org.apache.flink" % "flink-clients" % "1.15.0"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

This code is pretty straightforward. First, we specify the Scala version; remember we need Scala 2.12.x in order to run Flink. The second chunk is jar metadata. The third chunk lists the dependencies we need. The fourth chunk tells the assembly plugin how to resolve conflicts between files that appear in more than one dependency.

More information on sbt can be found in the official documentation.

The Scala app

The following code snippet is the most basic code (with virtually no actual data processing) needed to sink events from a Kafka topic into a Cassandra table. In this context, sinking data is just a (rather fancy) term for pushing data from one system into another.

This code should live in the src/main/scala directory. We called the file Main.scala, but you can choose any other name as long as the file name matches the name of the Scala object. Make sure your Scala object extends the App trait.

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.cassandra.CassandraSink
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._

object Main extends App {
  // Environment init. Checkpointing configuration.
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

  // Building the KafkaSource.
  val source: KafkaSource[String] = KafkaSource.builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("flink-input")
    .setGroupId("group1")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build()

  // CassandraSink only works with Tuples or POJOs. In Scala, it only works with Tuples.
  val tuples = env.fromSource(source, WatermarkStrategy.noWatermarks(), "KafkaSource")
    .flatMap(new FlatMapFunction[String, Tuple1[String]] {
      override def flatMap(t: String, collector: Collector[Tuple1[String]]): Unit =
        collector.collect(Tuple1[String](t))
    })

  // The actual sinking.
  CassandraSink.addSink(tuples)
    .setHost("127.0.0.1")
    .setQuery("INSERT INTO cassandraKafkaSink.messages (payload) values (?);")
    .build()
    .name("flinkTestSink")

  // Executing the Flink job.
  env.execute("Flink test")
}

There is a lot going on here, so let us break it down a little. First, we obtain the streaming execution environment and enable checkpointing every 60 seconds with exactly-once semantics. Then we build a KafkaSource that reads string values from the flink-input topic, starting from the earliest offset. Because CassandraSink only accepts tuples (or POJOs in Java), the flatMap wraps every incoming string in a Tuple1 before the sink writes it into the cassandraKafkaSink.messages table. Finally, env.execute submits the job.
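
Once the application compiles, we can build the fat jar and hand it to the running cluster. A minimal sketch, assuming the project name and version from our build.sbt and the default sbt-assembly output location:

# from the project root: build the fat jar (lands in target/scala-2.12/ by default)
sbt assembly

# from the Flink installation folder: submit the jar to the local cluster
./bin/flink run /path/to/FlinkApp/target/scala-2.12/FlinkApp-assembly-1.0.jar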

Troubleshooting

Debugging Flink applications can be tricky. Here is a personal tip for Unix-like OS users: to quickly check the most recent Flink job logs, change directory to the Flink installation folder and run:

tail log/flink-*-taskexecutor-*.out

This will print the most recent output from your TaskExecutor.
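
If the job does not even show up in the web UI, the JobManager log is usually the place to look. In a standalone setup started with start-cluster.sh, its file name follows a similar pattern (the exact name includes your user and host):

tail log/flink-*-standalonesession-*.log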

Although the Flink app is complete, it cannot be tested on its own: we still need a Cassandra instance running to sink values into. The next entry in this series of articles will tackle exactly that.

Before moving on, can you think of a similar implementation of this pipeline using some other technology for data stream processing? What advantages and disadvantages would it offer? We will leave one idea for you to get started:

Source code: The Flink app can be found within the source code repository.

Missed any of our previous articles on this pipeline? Check them here:
