
Reading Streaming Twitter feeds into Apache Spark

Walker Rowe

In part 1 of this blog post we explained how to read Tweets streaming off Twitter into Apache Kafka. Here we explain how to read that data from Kafka into Apache Spark. We broke this document into two pieces, because this second piece is considerably more complicated.

Prerequisites

First, install Kafka as shown in Part 1 and verify that you can retrieve tweets from Twitter. Then start Kafka and run the Python program documented there, which downloads Tweets into Kafka. Keep it running while you run the code below, or just remember to start it before you do.

It is difficult to get this code working, mainly because you have to get the right versions of the jar files for the versions of Spark and Kafka you are using. Getting those mismatched will generate all kinds of errors that you will spend hours debugging. It is not always clear which version to use. Adding to the confusion, there are code samples on the Apache Spark website written for versions that conflict with other versions.

So if you want to avoid those kinds of errors, use the exact versions shown below. Or forge ahead with whatever newer versions are released after this blog post is written, taking care to get compatible versions of each.

Now, install Kafka and Spark:

  • Kafka 2.12-0.10.2.1
  • Kafka-clients-0.10.2.1.jar (Gets installed when you install Kafka.)
  • Spark-streaming-kafka-0-10_2.11-2.1.1.jar (You need to download this as explained below.)
  • Apache Spark 2.1.1
  • Scala 2.11.8 (You do not need to install Scala. It is installed when you install Spark.)

We will also be using sbt, a tool for compiling Scala code and resolving all dependencies, so you need to install that as well. Sbt is very complicated, but you do not have to master it. Just know that you need to put the code into this folder structure:

/ProjectHome/build.sbt (the sbt build file, explained below)
/ProjectHome/src/main/scala/ (the Scala code, explained below)

Copy jar files

Now, the build.sbt file we write below will resolve all the import statements so that your code compiles. But it will not install the Kafka and Spark-streaming-kafka jar files where the running job needs them. The easiest way to fix that is to copy them to $SPARK_HOME/jars, since Spark can obviously find them there, in its own folder.

One of these files you copy from the Kafka libs folder:

cp /usr/share/kafka/kafka_2.12-0.10.2.1/libs/kafka-clients-0.10.2.1.jar $SPARK_HOME/jars

The second one you download from the internet, as it is part of neither Spark nor Kafka. You can get it from Maven Central:

wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.11/2.1.1/spark-streaming-kafka-0-10_2.11-2.1.1.jar

Now, in /ProjectHome create the file build.sbt and copy in the text shown below. It is not necessary to understand everything it means, but you are free to investigate. Basically it indicates the external dependencies required and their version numbers. If you are a Scala or Java programmer, you will know that those files are available on the internet, for example at Maven Central. That is where you get the syntax for the sbt dependencies, which looks something like this:

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.1.1"

import AssemblyKeys._

name := "TwitterStream"

version := "1.0"

libraryDependencies ++= {
  val sparkVer = "2.1.1"
  Seq(
    "org.apache.spark" %% "spark-core" % sparkVer,
    "org.apache.spark" %% "spark-mllib" % sparkVer,
    "org.apache.spark" %% "spark-streaming" % sparkVer,
    "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % sparkVer,
    "org.apache.kafka" % "kafka-clients" % "0.10.2.1"
  )
}

scalaVersion := "2.11.8"

assemblySettings

mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")           => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")       => MergeStrategy.discard
  case "log4j.properties"                                   => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/")  => MergeStrategy.filterDistinctLines
  case "reference.conf"                                     => MergeStrategy.concat
  case _                                                    => MergeStrategy.first
}
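
A note on the assembly-related lines: import AssemblyKeys._, assemblySettings, and mergeStrategy in assembly come from the sbt-assembly plugin, which has to be declared under /ProjectHome/project/ for the build to load. Since we only run sbt package below, and not sbt assembly, a trimmed build.sbt without those lines also works. This variant is our own simplification, not part of the original setup:

// Trimmed build.sbt (a hedged alternative): same dependencies, no sbt-assembly
// settings, which are only needed if you run sbt assembly to build a fat jar.
name := "TwitterStream"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= {
  val sparkVer = "2.1.1"
  Seq(
    "org.apache.spark" %% "spark-core" % sparkVer,
    "org.apache.spark" %% "spark-mllib" % sparkVer,
    "org.apache.spark" %% "spark-streaming" % sparkVer,
    "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % sparkVer,
    "org.apache.kafka" % "kafka-clients" % "0.10.2.1"
  )
}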

The Scala code

Here we explain each section of the code, which we store in
/ProjectHome/src/main/scala/TwitterStream.scala.

The first part is the imports. Again, it is important to use these exact imports so that they match the dependencies declared in build.sbt and you avoid compile errors.

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.SparkConf
import org.apache.spark.streaming._

Here we create an object called TwitterStream. Sbt will create a jar file that we will run with spark-submit. We need to give spark-submit that class name to use, which in this case is TwitterStream.

kafkaParams gives the name(s) of the server(s) where Kafka is running and which port it is listening on. The group id is supposed to have some value, so we just take the value from an Apache example. The other entries tell it which classes to use to deserialize the data coming out of Kafka, which in this case is org.apache.kafka.common.serialization.StringDeserializer.

The first values we create are a SparkConf and a StreamingContext. We don't need to create a SparkContext, as spark-submit will do that for us.

object TwitterStream {
  def main(args: Array[String]) {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val sparkConf = new SparkConf().setAppName("tweeter")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

Now, in part 1 we decided to get Tweets that had the topic trump, as there are obviously plenty of those. The topic names have to be in an Array to pass to the KafkaUtils.createDirectStream function.

That function creates a DStream, which in Spark Streaming is the object designed to contain streaming data. You cannot actually do much with a DStream until you turn it into an RDD, which we do below. This points to the irony that while these types of products, like Spark, are called streaming, they obviously have to take some discrete, fixed chunk of data to work on. That is what an RDD is. In fact, the DStream will create multiple RDDs.

    val topics = Array("trump")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

KafkaUtils.createDirectStream creates a org.apache.spark.streaming.dstream.DStream.

To work with that data we use the DStream.foreachRDD() method to pluck off RDDs as they are created. Then we use RDD.foreach() to get each object in the RDD. Those objects are Kafka ConsumerRecords, i.e., org.apache.kafka.clients.consumer.ConsumerRecord. You use the ConsumerRecord.value() method to read the message retrieved from the Kafka topic.

The Twitter messages are in JSON format, whose contents are described in the Twitter API documentation. If you are doing analytics on this data, you would not print it out. But we do that below to show that our sample is working.

ssc.start() and ssc.awaitTermination() start the streaming job and keep it running continually until you kill it with Ctrl-C.

    stream.foreachRDD { rdd =>
      rdd.foreach { record =>
        val value = record.value()
        val tweet = scala.util.parsing.json.JSON.parseFull(value)
        val map: Map[String, Any] = tweet.get.asInstanceOf[Map[String, Any]]
        println(map.get("text"))
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
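
If you want to do something slightly more analytic than printing, here is a small variation on the foreachRDD block. This is our own sketch, not part of the original program, and it assumes the standard Tweet JSON layout with a top-level "text" field. It would go in the same place as the block above, before ssc.start(), and it counts how many tweets in each two-second batch contain a hashtag:

    // A hedged sketch, not from the original post: count the tweets in each
    // micro-batch that contain a hashtag, instead of printing the raw text.
    stream.foreachRDD { rdd =>
      // Parse each Kafka record as JSON and keep only the "text" field, if present.
      val texts = rdd.flatMap { record =>
        scala.util.parsing.json.JSON.parseFull(record.value()) match {
          case Some(tweet) =>
            // Same cast as in the program above; the parsed JSON is a Map[String, Any].
            val map = tweet.asInstanceOf[Map[String, Any]]
            map.get("text").collect { case s: String => s }
          case None => None
        }
      }
      // count() is an action, so this runs once per batch on the driver.
      println("Tweets with a hashtag in this batch: " + texts.filter(_.contains("#")).count())
    }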

To compile this code go to /ProjectHome and run:

sbt package

The first time you do that it will take quite a long time, as it will download many items from the internet.

Run the job in Spark

Now you run the job using $SPARK_HOME/bin/spark-submit, passing it the class name and the name of the jar file that sbt created, which will be /ProjectHome/target/scala-2.11/twitterstream_2.11-1.0.jar. local[x] means use x CPUs; * means use all of them.

spark-submit \
--class TwitterStream \
--master local[*] \
/home/walker/Documents/target/scala-2.11/twitterstream_2.11-1.0.jar

Now the program will run and output the Tweet text in a continuous stream. As you can see below it is creating RDDs as it runs too:

Some(RT @Cernovich: That baby ran away crying when @CassandraRules and I put cameras on him during the DNC. Totally irrelevant and has… )
Some(Trump Approval Rating Tanks: Majority Says He’s ‘Too Friendly With Russia’ https://t.co/gUSH4oI2Rc via Christine Beswick)
17/06/10 12:08:39 INFO Executor: Finished task 0.0 in stage 3.0 (TID 3). 998 bytes result sent to driver
17/06/10 12:08:39 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 1344 ms on localhost (executor driver) (1/1)
17/06/10 12:12:59 INFO TaskSchedulerImpl: Removed TaskSet 14.0, whose tasks have all completed, from pool
17/06/10 12:12:59 INFO DAGScheduler: ResultStage 14 (foreach at TwitterStream.scala:32) finished in 1.147 s
17/06/10 12:12:59 INFO DAGScheduler: Job 14 finished: foreach at TwitterStream.scala:32, took 1.156825 s
17/06/10 12:12:59 INFO JobScheduler: Finished job streaming job 1497111178000 ms.0 from job set of time 1497111178000 ms
17/06/10 12:12:59 INFO JobScheduler: Total delay: 1.172 s for time 1497111178000 ms (execution: 1.166 s)
17/06/10 12:12:59 INFO KafkaRDD: Removing RDD 13 from persistence list


About the author

Walker Rowe

Walker Rowe is an American freelance tech writer and programmer living in Cyprus. He writes tutorials on analytics and big data and specializes in documenting SDKs and APIs. He is the founder of the Hypatia Academy Cyprus, an online school that teaches secondary school children programming. You can find Walker here and here.