Machine Learning & Big Data Blog

How To Write Apache Spark Data to ElasticSearch Using Python

Walker Rowe

Here we explain how to write Apache Spark data to ElasticSearch (ES) using Python. We will write Apache log data into ES.

This topic can seem complicated because so many of the examples on the internet are poor and convoluted. Here we make it easy.

One complicating factor is that native Spark support for writing to ElasticSearch exists for Scala and Java but not for Python. So you need to download ES-Hadoop, which is written by Elastic, the company behind ElasticSearch, and is available here.

Set PySpark to use Python 3 by exporting this variable before you start pyspark:

export PYSPARK_PYTHON=/usr/bin/python3

Then start pyspark with the ES-Hadoop jar so that it is available to your code:

pyspark --jars elasticsearch-hadoop-6.4.1.jar

The key to writing to ElasticSearch from PySpark is that, while ES is a JSON database, ES-Hadoop has one requirement: each element of the RDD must be a pair made of a document ID followed by the JSON document, in this format:

("id", '{ the rest of your json }')

Below we show how to make that transformation.

The complete code is at the bottom of this post, and it is also online here. First we explain it in sections.

Parsing Apache Log Files

We read an Apache log into a Spark RDD. We then write a parse() function that splits each line into regular expression groups, picks the fields we want, and returns them as a dictionary:

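import re

# Read the Apache log file into an RDD, one element per line.
# (sc is the SparkContext that the pyspark shell creates for you.)
rdd = sc.textFile("/home/ubuntu/walker/apache_logs")

# Raw string, so Python does not treat \S, \[ and so on as string escapes.
regex = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'
p = re.compile(regex)

def parse(line):
    # Split one log line into regex groups and keep the fields we want.
    s = p.match(line)
    d = {}
    d['ip'] = s.group(1)         # client IP address
    d['date'] = s.group(4)       # timestamp, including time zone
    d['operation'] = s.group(5)  # HTTP method, e.g. GET
    d['uri'] = s.group(6)        # requested path
    return d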

When we first read the log file into an RDD, each element is one raw line, like this:

['83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"']

Then we use the RDD map() function to pass each line to parse(), which yields this:

rdd2 = rdd.map(parse)
rdd2.take(1)
[{'date': '17/May/2015:10:05:03 +0000', 'ip': '83.149.9.216', 'operation': 'GET', 'uri': '/presentations/logstash-monitorama-2013/images/kibana-search.png'}]
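To check the parser without a Spark cluster, you can run it on the sample line in plain Python. The sketch below is a variant of parse(), not the code used in this post: it rewrites the same regex with named groups (a standard feature of Python's re module) so it is clear which fields are kept, and it returns None for lines that do not match instead of raising an error. The names LOG_PATTERN, parse_line and SAMPLE are hypothetical.

```python
import re

# The regex from parse() above, rewritten with named groups for the fields we keep.
LOG_PATTERN = re.compile(
    r'^(?P<ip>\S+) (\S+) (\S+) \[(?P<date>[\w:/]+\s[+\-]\d{4})\] '
    r'"(?P<operation>\S+)\s?(?P<uri>\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)'
    r'\s?"?([^"]*)"?\s?"?([^"]*)?"?$'
)

FIELDS = ("ip", "date", "operation", "uri")


def parse_line(line):
    """Return the selected fields as a dict, or None if the line does not match."""
    m = LOG_PATTERN.match(line)
    if m is None:
        return None
    return {name: m.group(name) for name in FIELDS}


# The sample log line shown above.
SAMPLE = ('83.149.9.216 - - [17/May/2015:10:05:03 +0000] '
          '"GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" '
          '200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" '
          '"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 '
          '(KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"')

print(parse_line(SAMPLE))
print(parse_line("not a log line"))  # None
```

In Spark you could then drop non-matching lines with rdd.map(parse_line).filter(lambda d: d is not None).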

Now, that looks like JSON, but it is a Python dictionary, not JSON text yet. To convert it we use json.dumps(), which the Python documentation describes as follows: "Serialize obj to a JSON formatted str."
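The difference is easy to see in plain Python. The record below is the first parsed line from above, written out as a dict; json.dumps() turns it into a str in JSON syntax (double quotes), and json.loads() turns that string back into an equal dict.

```python
import json

# The first parsed record from above, as a Python dict.
record = {
    "date": "17/May/2015:10:05:03 +0000",
    "ip": "83.149.9.216",
    "operation": "GET",
    "uri": "/presentations/logstash-monitorama-2013/images/kibana-search.png",
}

# json.dumps() returns a str holding the JSON text.
as_json = json.dumps(record)
print(as_json)
# {"date": "17/May/2015:10:05:03 +0000", "ip": "83.149.9.216", "operation": "GET", "uri": "/presentations/logstash-monitorama-2013/images/kibana-search.png"}

# Parsing the text again gives back an equal dict.
print(json.loads(as_json) == record)  # True
```

This is also why the ID below is computed from json.dumps(data): the digest is taken over the JSON text of the record.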

We also add an ID. In the ES configuration below we tell ES which field will be the unique document identifier: “es.mapping.id”: “doc_id”.

To create that ID, we first compute a SHA-224 digest over the whole JSON document. Its hex string is effectively unique: only identical records produce the same ID.

The results look like this. The ID is the long SHA-224 hex string at the front, followed by the JSON document:

rdd3.take(1)
[('a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c', '{"date": "17/May/2015:10:05:03 +0000", "ip": "83.149.9.216", "operation": "GET", "doc_id": "a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c", "uri": "/presentations/logstash-monitorama-2013/images/kibana-search.png"}')]
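The ID step can also be tried outside Spark. The sketch below repeats addId() from the complete code at the bottom of this post and runs it on one made-up record. It shows that the key is a 56-character SHA-224 hex digest, that the same value is stored in the document as doc_id, and that identical records get identical IDs.

```python
import hashlib
import json


def addId(data):
    """Return (doc_id, json_string), where doc_id is a SHA-224 digest of the record."""
    # Hash the JSON text of the record before the ID field is added.
    j = json.dumps(data).encode('ascii', 'ignore')
    data['doc_id'] = hashlib.sha224(j).hexdigest()
    # The pair ES-Hadoop receives: the ID first, then the document as JSON.
    return (data['doc_id'], json.dumps(data))


# A made-up record for illustration.
record = {"ip": "83.149.9.216", "operation": "GET", "uri": "/favicon.ico"}

# Pass a copy, because addId() adds doc_id to the dict it is given.
key, doc = addId(dict(record))

print(len(key))                           # 56
print(json.loads(doc)["doc_id"] == key)   # True
print(addId(dict(record))[0] == key)      # True: same record, same ID
```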

Now we specify the ElasticSearch configuration. The important items to note are:

“es.resource”: ‘walker/apache’. Here “walker” is the index and “apache” is the type; the whole string is often called “the index.”
“es.mapping.id”: “doc_id”. Here we tell ES which field to use as the document ID, which is the same as saying the _id field.

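The rest are self-explanatory: es.nodes and es.port tell ES-Hadoop where ES is running, and es.input.json tells it that the values in the RDD are already JSON strings.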
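If you write to several indexes, it can help to build this dictionary in one place. The helper below, make_es_write_conf, is hypothetical and is not part of ES-Hadoop or PySpark; it only assembles the keys used in this post's configuration and, like that configuration, passes every value as a string. Mapping types were deprecated in ElasticSearch 7, so on newer clusters you may want es.resource to be just the index name; that is why doc_type is optional here.

```python
def make_es_write_conf(index, doc_type=None, nodes="localhost", port=9200,
                       id_field="doc_id"):
    """Assemble an ES-Hadoop write configuration like the one in this post.

    Hypothetical helper: the keys come from the post; the function itself
    is not part of ES-Hadoop or PySpark.
    """
    # "index/type" as in the post, or just "index" when no type is given.
    resource = index if doc_type is None else index + "/" + doc_type
    return {
        "es.nodes": nodes,
        "es.port": str(port),        # every value is passed as a string
        "es.resource": resource,
        "es.input.json": "yes",      # the RDD values are already JSON strings
        "es.mapping.id": id_field,   # field whose value becomes the ES _id
    }


es_write_conf = make_es_write_conf("walker", "apache")
print(es_write_conf["es.resource"])  # walker/apache
```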

Then we use the saveAsNewAPIHadoopFile() method to save rdd3 to ES. There is little to study there: the call is the same every time you write to ES, so you do not need to understand every argument. The code also includes the addId() function that builds rdd3.

import hashlib
import json

def addId(data):
    # Hash the record's JSON text to get a document ID (SHA-224, 56 hex characters).
    j = json.dumps(data).encode('ascii', 'ignore')
    data['doc_id'] = hashlib.sha224(j).hexdigest()
    # Return the (ID, JSON document) pair that ES-Hadoop expects.
    return (data['doc_id'], json.dumps(data))

rdd3 = rdd2.map(addId)

es_write_conf = {
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : 'walker/apache',   # index/type
    "es.input.json": "yes",            # values are already JSON strings
    "es.mapping.id": "doc_id"          # field to use as the document _id
}

rdd3.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf)
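A malformed element only shows up as an error partway through the write job, so it can be worth checking a few elements first, for example the output of rdd3.take(5). The check_pairs function below is a hypothetical helper, not part of PySpark or ES-Hadoop. It confirms that each element is a (key, value) pair, that the value is a JSON object, and that its doc_id field matches the key, as the es.mapping.id setting above expects.

```python
import json


def check_pairs(pairs, id_field="doc_id"):
    """Return a list of problems found in (id, json_string) pairs; empty if none.

    Hypothetical helper for sanity-checking a sample such as rdd3.take(5).
    """
    problems = []
    for i, pair in enumerate(pairs):
        if not (isinstance(pair, tuple) and len(pair) == 2):
            problems.append(f"element {i}: not a (key, value) pair")
            continue
        key, value = pair
        try:
            doc = json.loads(value)
        except (TypeError, ValueError):
            # TypeError: value is not str/bytes; ValueError: invalid JSON text.
            problems.append(f"element {i}: value is not a JSON string")
            continue
        if not isinstance(doc, dict):
            problems.append(f"element {i}: JSON value is not an object")
        elif doc.get(id_field) != key:
            problems.append(f"element {i}: {id_field} does not match the key")
    return problems


# Usage in the pyspark shell:
# print(check_pairs(rdd3.take(5)))   # [] means the sample looks right
```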

Now we can query ES from the command line and look at one document:

curl 'http://localhost:9200/walker/apache/_search?pretty=true&q=*'

The response lists the matching documents under hits. Here is one of them:

{
  "_index" : "walker",
  "_type" : "apache",
  "_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2",
  "_score" : 1.0,
  "_source" : {
    "date" : "17/May/2015:10:05:32 +0000",
    "ip" : "91.177.205.119",
    "operation" : "GET",
    "doc_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2",
    "uri" : "/favicon.ico"
  }
}
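The same query can be issued from Python with only the standard library. In the sketch below, search_url, first_sources and search are hypothetical helper names. They assume ES is listening on localhost:9200 as in the configuration above, and they rely on the usual shape of an ES search response, where matching documents sit in hits.hits and each carries the original JSON in _source. Note that urlencode() escapes * as %2A, which ES decodes back to *.

```python
import json
import urllib.parse
import urllib.request


def search_url(index, doc_type, query="*", host="localhost", port=9200):
    """Build a URI-search URL like the curl command above."""
    params = urllib.parse.urlencode({"pretty": "true", "q": query})
    return f"http://{host}:{port}/{index}/{doc_type}/_search?{params}"


def first_sources(response, n=1):
    """Return the _source of the first n hits from a parsed search response."""
    return [hit["_source"] for hit in response["hits"]["hits"][:n]]


def search(index, doc_type, query="*"):
    """Run the search against a live ES node and return the parsed JSON."""
    with urllib.request.urlopen(search_url(index, doc_type, query)) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Usage against a running cluster:
# print(first_sources(search("walker", "apache")))
```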

Here is the complete code:

# Run inside the pyspark shell started with --jars elasticsearch-hadoop-6.4.1.jar,
# which provides the SparkContext sc.
import hashlib
import json
import re

# Raw string, so Python does not treat \S, \[ and so on as string escapes.
regex = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'
p = re.compile(regex)

def parse(line):
    # Keep the IP, date, HTTP method and URI from one Apache log line.
    s = p.match(line)
    d = {}
    d['ip'] = s.group(1)
    d['date'] = s.group(4)
    d['operation'] = s.group(5)
    d['uri'] = s.group(6)
    return d

def addId(data):
    # Use a SHA-224 digest of the record's JSON as its document ID.
    j = json.dumps(data).encode('ascii', 'ignore')
    data['doc_id'] = hashlib.sha224(j).hexdigest()
    return (data['doc_id'], json.dumps(data))

rdd = sc.textFile("/home/ubuntu/walker/apache_logs")
rdd2 = rdd.map(parse)
rdd3 = rdd2.map(addId)

es_write_conf = {
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : 'walker/apache',
    "es.input.json": "yes",
    "es.mapping.id": "doc_id"
}

rdd3.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf)

These postings are my own and do not necessarily represent BMC's position, strategies, or opinion.

About the author

Walker Rowe

Walker Rowe is an American freelance tech writer and programmer living in Cyprus. He writes tutorials on analytics and big data and specializes in documenting SDKs and APIs. He is the founder of the Hypatia Academy Cyprus, an online school to teach secondary school children programming. You can find Walker here and here.