Apache Spark
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala and Python, and an optimized engine that supports general execution graphs.
Spark provides fast iterative/functional-like capabilities over large data sets, typically by caching data in memory. As opposed to the rest of the libraries mentioned in this documentation, Apache Spark is a computing framework that is not tied to Map/Reduce itself; however, it does integrate with Hadoop, mainly through HDFS. elasticsearch-hadoop allows Elasticsearch to be used in Spark in two ways: through the dedicated support available since 2.1, or through the Map/Reduce bridge available since 2.0. Spark 2.0 is supported in elasticsearch-hadoop since version 5.0.
Just like the other libraries, elasticsearch-hadoop needs to be available in Spark's classpath. As Spark has multiple deployment modes, this can mean placing the library on the classpath of a single node (as is the case with local mode, which is used throughout this documentation) or on every node, depending on the desired infrastructure.
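For example, the connector jar can be passed to spark-submit when launching an application; the jar name and version below are only illustrative and should match the artifact actually in use:
$ ./bin/spark-submit --jars elasticsearch-hadoop-5.0.0.jar ...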
Added in 2.1.
elasticsearch-hadoop provides native integration between Elasticsearch and Apache Spark, in the form of an RDD (Resilient Distributed Dataset) (or Pair RDD to be precise) that can read data from Elasticsearch. The RDD is offered in two flavors: one for Scala (which returns the data as Tuple2 with Scala collections) and one for Java (which returns the data as Tuple2 containing java.util collections).
Whenever possible, consider using the native integration as it offers the best performance and maximum flexibility.
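As a quick sketch of the read side, the org.elasticsearch.spark package (introduced below) also adds an esRDD method to the SparkContext through an implicit conversion; the local master and the spark/docs index/type are illustrative assumptions:
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

val conf = new SparkConf().setAppName("es-read").setMaster("local[*]")
val sc = new SparkContext(conf)

// each entry is a (document id, document) pair; "spark/docs" is just an example resource
val docs = sc.esRDD("spark/docs")
docs.take(5).foreach(println)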
With elasticsearch-hadoop, any RDD can be saved to Elasticsearch as long as its content can be translated into documents. In practice this means the RDD type needs to be a Map (whether a Scala or a Java one), a JavaBean or a Scala case class. When that is not the case, one can easily transform the data in Spark or plug in a custom ValueWriter.
Scala
When using Scala, simply import the org.elasticsearch.spark package which, through the pimp my library pattern, enriches any RDD with saveToEs methods.
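A minimal sketch, assuming a local master and an illustrative spark/docs index/type:
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

val conf = new SparkConf().setAppName("es-write").setMaster("local[*]")
val sc = new SparkContext(conf)

// each Map becomes one JSON document
val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

// saveToEs is added to any RDD by the org.elasticsearch.spark import above
sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")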
Scala users might be tempted to use Seq and the → notation for declaring root objects (that is, the JSON document) instead of using a Map. While similar, the first notation results in slightly different types that cannot be matched to a JSON document: Seq is an ordered sequence (in other words, a list) while → creates a Tuple, which is more or less an ordered, fixed number of elements. As such, a list of lists cannot be used as a document since it cannot be mapped to a JSON object; however, it can be used freely within one. Hence, in the example above, Map(k→v) was used instead of Seq(k→v).
As an alternative to the implicit import above, one can use elasticsearch-hadoop's Spark support in Scala through EsSpark in the org.elasticsearch.spark.rdd package, which acts as a utility class allowing explicit method invocations. Additionally, instead of Maps (which are convenient but, due to their differing structure, require one mapping per instance), one can use a case class.
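A sketch along these lines, where the Trip case class, its fields and the target index are illustrative (sc is the SparkContext from the previous example):
import org.elasticsearch.spark.rdd.EsSpark

// case class describing the document structure
case class Trip(departure: String, arrival: String)

val upcomingTrip = Trip("OTP", "SFO")
val lastWeekTrip = Trip("MUC", "OTP")

val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
// explicit method invocation through the EsSpark utility object
EsSpark.saveToEs(rdd, "spark/docs")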
For cases where the id (or other metadata fields like ttl or timestamp) of the document needs to be specified, one can do so by setting the appropriate mapping, namely es.mapping.id. Following the previous example, to indicate to Elasticsearch to use the field id as the document id, update the RDD configuration (it is also possible to set the property on the SparkConf, though this is discouraged due to its global effect).
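For illustration, assuming the documents carry an id field (the Airport case class and index name below are made up):
import org.elasticsearch.spark.rdd.EsSpark

case class Airport(id: String, name: String)

val otp = Airport("OTP", "Otopeni")
val muc = Airport("MUC", "Munich")

val airportsRDD = sc.makeRDD(Seq(otp, muc))
// use the "id" field of each document as the Elasticsearch document id
EsSpark.saveToEs(airportsRDD, "airports/2015", Map("es.mapping.id" -> "id"))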
Java
Java users have a dedicated class that provides similar functionality to EsSpark, namely JavaEsSpark in the org.elasticsearch.spark.rdd.api.java package (a package similar to Spark's Java API).
The code can be further simplified by using Java 5 static imports. Additionally, the Map (whose mapping is dynamic due to its loose structure) can be replaced with a JavaBean.
Setting the document id (or other metadata fields like ttl or timestamp) is similar to its Scala counterpart, though potentially a bit more verbose depending on whether you are using the JDK classes or some other utilities (like Guava).
For cases where the data in the RDD is already in JSON, elasticsearch-hadoop allows direct indexing without applying any transformation; the data is taken as is and sent directly to Elasticsearch. As such, in this case, elasticsearch-hadoop expects an RDD containing either String or byte arrays (byte[]/Array[Byte]), assuming each entry represents a JSON document. If the RDD does not have the proper signature, the saveJsonToEs methods cannot be applied (in Scala they will not be available).
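A short sketch, with made-up JSON content and an illustrative target index:
import org.elasticsearch.spark._

// each String is assumed to be a complete JSON document
val json1 = """{"reason" : "business", "airport" : "SFO"}"""
val json2 = """{"participants" : 5, "airport" : "OTP"}"""

// the strings are sent as-is, without any transformation
sc.makeRDD(Seq(json1, json2)).saveJsonToEs("spark/json-trips")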
For cases when the data being written to Elasticsearch needs to be indexed under different buckets (based on the data content), one can use the es.resource.write setting, which accepts a pattern that is resolved from the document content at runtime. Using a media catalogue as an example, one could configure it as shown below.
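A sketch of such a media catalogue, with illustrative documents and index pattern:
import org.elasticsearch.spark._

val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")

// the {media_type} token in the write resource is resolved per document at runtime
sc.makeRDD(Seq(game, book, cd)).saveToEs("my-collection-{media_type}/doc")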
For each document/object about to be written, elasticsearch-hadoop will extract the media_type field and use its value to determine the target resource.
Java
As expected, things in Java are strikingly similar.
Elasticsearch allows each document to have its own metadata. As explained above, through the various mapping options one can customize these parameters so that their values are extracted from the document they belong to. Furthermore, one can even include or exclude which parts of the data are sent to Elasticsearch. In Spark, elasticsearch-hadoop extends this functionality, allowing metadata to be supplied outside the document itself through the use of pair RDDs. In other words, for RDDs containing a key-value tuple, the metadata can be extracted from the key and the value used as the document source.
The metadata is described through the Metadata Java enum within the org.elasticsearch.spark.rdd package, which identifies its type: id, ttl, version, etc. Thus an RDD's keys can be a Map containing the Metadata for each document along with their associated values. If the RDD key is not of type Map, elasticsearch-hadoop will consider the object as representing the document id and use it accordingly. This sounds more complicated than it is, so let us look at some examples.
Scala
Pair RDDs, or simply put, RDDs with the signature RDD[(K,V)], can take advantage of the saveToEsWithMeta methods that are available either through the implicit import of the org.elasticsearch.spark package or through the EsSpark object. To manually specify the id for each document, simply pass in an Object (not of type Map) as the key in your RDD.
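A sketch of both variants; the ids, field values and Metadata entries used are illustrative:
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.Metadata._

val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")

// keys that are not Maps are used directly as the document id
val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc)))
airportsRDD.saveToEsWithMeta("airports/2015")

// alternatively, each key can be a Map of Metadata entries for its document
val otpMeta = Map(ID -> 1, VERSION -> "23")
val mucMeta = Map(ID -> 2, VERSION -> "42")
sc.makeRDD(Seq((otpMeta, otp), (mucMeta, muc))).saveToEsWithMeta("airports/2015")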
Java
In a similar fashion, on the Java side, JavaEsSpark provides saveToEsWithMeta methods that are applied to JavaPairRDD (the Java equivalent of RDD[(K,V)]). Thus, to save documents based on their ids, one passes the ids as the keys of the JavaPairRDD.
Configuration
To configure elasticsearch-hadoop for Apache Spark, one can set the various properties described in the Configuration chapter in the SparkConf object:
Scala
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName(appName).setMaster(master)
conf.set("es.index.auto.create", "true")
Java
import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
conf.set("es.index.auto.create", "true");
Command-line
For those who want to set the properties through the command-line (either directly or by loading them from a file), note that Spark only accepts properties that start with the "spark." prefix and will ignore the rest (and, depending on the version, a warning might be thrown). To work around this limitation, define the elasticsearch-hadoop properties by appending the spark. prefix (thus they become spark.es.) and elasticsearch-hadoop will automatically resolve them:
$ ./bin/spark-submit --conf spark.es.resource=index/type ...