﻿---
title: Apache Spark support
description: Spark provides fast iterative/functional-like capabilities over large data sets, typically by caching data in memory. As opposed to the rest of the libraries...
url: https://www.elastic.co/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/apache-spark-support
products:
  - Elasticsearch
  - Elasticsearch for Apache Hadoop
---

# Apache Spark support
Spark provides fast iterative/functional-like capabilities over large data sets, typically by *caching* data in memory. As opposed to the rest of the libraries mentioned in this documentation, Apache Spark is a computing framework that is not tied to Map/Reduce itself; however, it does integrate with Hadoop, mainly with HDFS. elasticsearch-hadoop allows Elasticsearch to be used in Spark in two ways: through the dedicated support available since 2.1 or through the Map/Reduce bridge available since 2.0. Spark 2.0 is supported in elasticsearch-hadoop since version 5.0.

## Installation

Just like other libraries, elasticsearch-hadoop needs to be available in Spark’s classpath. As Spark has multiple deployment modes, this can translate to the target classpath being on a single node (as is the case with the local mode, which is used throughout the documentation) or on each node, depending on the desired infrastructure.
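For instance, when submitting a job, the connector jar can be supplied to the driver and executors through `spark-submit`; a minimal sketch (the jar name and version placeholder below should be adjusted to your deployment):
```bash
# make the elasticsearch-hadoop jar available on the job's classpath
$ ./bin/spark-submit --jars elasticsearch-hadoop-<version>.jar ...
```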

### Native RDD support

elasticsearch-hadoop provides *native* integration between Elasticsearch and Apache Spark, in the form of an `RDD` (Resilient Distributed Dataset) (or *Pair* `RDD` to be precise) that can read data from Elasticsearch. The `RDD` is offered in two *flavors*: one for Scala (which returns the data as `Tuple2` with Scala collections) and one for Java (which returns the data as `Tuple2` containing `java.util` collections).
<important>
  Whenever possible, consider using the *native* integration as it offers the best performance and maximum flexibility.
</important>


#### Configuration

To configure elasticsearch-hadoop for Apache Spark, one can set the various properties described in the [*Configuration*](https://www.elastic.co/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/configuration) chapter in the [`SparkConf`](http://spark.apache.org/docs/1.6.2/programming-guide.md#initializing-spark) object:
```scala
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName(appName).setMaster(master)
conf.set("es.index.auto.create", "true")
```

```java
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
conf.set("es.index.auto.create", "true");
```

For those that want to set the properties through the command-line (either directly or by loading them from a file), note that Spark *only* accepts those that start with the "spark." prefix and will *ignore* the rest (and depending on the version a warning might be thrown). To work around this limitation, define the elasticsearch-hadoop properties by appending the `spark.` prefix (thus they become `spark.es.`) and elasticsearch-hadoop will automatically resolve them:
```bash
$ ./bin/spark-submit --conf spark.es.resource=index/type ... 
```
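The same `spark.`-prefixed properties can also be loaded from a properties file; a minimal sketch (the file name and values are placeholders):
```bash
$ cat es-spark.conf
spark.es.resource index/type
spark.es.nodes    someNode

# spark-submit picks up the spark.es.* entries and strips the spark. prefix
$ ./bin/spark-submit --properties-file es-spark.conf ...
```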


#### Writing data to Elasticsearch

With elasticsearch-hadoop, any `RDD` can be saved to Elasticsearch as long as its content can be translated into documents. In practice this means the `RDD` type needs to be a `Map` (whether a Scala or a Java one), a [`JavaBean`](http://docs.oracle.com/javase/tutorial/javabeans/) or a Scala [case class](http://docs.scala-lang.org/tutorials/tour/case-classes.html). When that is not the case, one can easily *transform* the data in Spark or plug-in their own custom [`ValueWriter`](/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/configuration#configuration-serialization).

##### Scala

When using Scala, simply import the `org.elasticsearch.spark` package which, through the [*pimp my library*](http://www.artima.com/weblogs/viewpost.jsp?thread=179766) pattern, enriches *any* `RDD` API with `saveToEs` methods:
```scala
import org.apache.spark.SparkContext    
import org.apache.spark.SparkContext._

import org.elasticsearch.spark._        

...

val conf = ...
val sc = new SparkContext(conf)         

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

sc.makeRDD( 
  Seq(numbers, airports)
).saveToEs("spark/docs") 
```

<note>
  Scala users might be tempted to use `Seq` and the `→` notation for declaring *root* objects (that is, the JSON document) instead of using a `Map`. While similar, the first notation results in slightly different types that cannot be matched to a JSON document: `Seq` is an ordered sequence (in other words, a list) while `→` creates a `Tuple`, which is more or less an ordered, fixed number of elements. As such, a list of lists cannot be used as a document since it cannot be mapped to a JSON object; however, it can be used freely within one. Hence, in the example above, `Map(k→v)` was used instead of `Seq(k→v)`.
</note>

As an alternative to the *implicit* import above, one can use elasticsearch-hadoop Spark support in Scala through `EsSpark` in the `org.elasticsearch.spark.rdd` package, which acts as a utility class allowing explicit method invocations. Additionally, instead of `Map`s (which are convenient but require one mapping per instance due to their structural differences), use a *case class*:
```scala
import org.apache.spark.SparkContext
import org.elasticsearch.spark.rdd.EsSpark                        

// define a case class
case class Trip(departure: String, arrival: String)               

val upcomingTrip = Trip("OTP", "SFO")
val lastWeekTrip = Trip("MUC", "OTP")

val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))             
EsSpark.saveToEs(rdd, "spark/docs")                               
```

For cases where the id (or other metadata fields like `ttl` or `timestamp`) of the document needs to be specified, one can do so by setting the appropriate [mapping](/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/configuration#cfg-mapping) namely `es.mapping.id`. Following the previous example, to indicate to Elasticsearch to use the field `id` as the document id, update the `RDD` configuration (it is also possible to set the property on the `SparkConf` though due to its global effect it is discouraged):
```scala
EsSpark.saveToEs(rdd, "spark/docs", Map("es.mapping.id" -> "id"))
```


##### Java

Java users have a dedicated class that provides similar functionality to `EsSpark`, namely `JavaEsSpark` in the `org.elasticsearch.spark.rdd.api.java` package (similar to Spark’s [Java API](https://spark.apache.org/docs/1.0.1/api/java/index.md?org/apache/spark/api/java/package-summary.md)):
```java
import org.apache.spark.api.java.JavaSparkContext;                              
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;

import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;                        
...

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);                              

Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);                   
Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
JavaEsSpark.saveToEs(javaRDD, "spark/docs");                                    
```

The code can be further simplified by using Java 5 *static* imports. Additionally, the `Map` (whose mapping is dynamic due to its *loose* structure) can be replaced with a `JavaBean`:
```java
public class TripBean implements Serializable {
   private String departure, arrival;

   public TripBean(String departure, String arrival) {
       setDeparture(departure);
       setArrival(arrival);
   }

   public TripBean() {}

   public String getDeparture() { return departure; }
   public String getArrival() { return arrival; }
   public void setDeparture(String dep) { departure = dep; }
   public void setArrival(String arr) { arrival = arr; }
}
```

```java
import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.*;
...

TripBean upcoming = new TripBean("OTP", "SFO");
TripBean lastWeek = new TripBean("MUC", "OTP");

JavaRDD<TripBean> javaRDD = jsc.parallelize(
                            ImmutableList.of(upcoming, lastWeek));        
saveToEs(javaRDD, "spark/docs");                                          
```

Setting the document id (or other metadata fields like `ttl` or `timestamp`) is similar to its Scala counterpart, though potentially a bit more verbose depending on whether you are using the JDK classes or some other utilities (like Guava):
```java
JavaEsSpark.saveToEs(javaRDD, "spark/docs", ImmutableMap.of("es.mapping.id", "id"));
```


#### Writing existing JSON to Elasticsearch

For cases where the data in the `RDD` is already in JSON, elasticsearch-hadoop allows direct indexing *without* applying any transformation; the data is taken as is and sent directly to Elasticsearch. As such, in this case, elasticsearch-hadoop expects either an `RDD` containing `String` or byte arrays (`byte[]`/`Array[Byte]`), assuming each entry represents a JSON document. If the `RDD` does not have the proper signature, the `saveJsonToEs` methods cannot be applied (in Scala they will not be available).

##### Scala

```scala
val json1 = """{"reason" : "business", "airport" : "SFO"}"""      
val json2 = """{"participants" : 5, "airport" : "OTP"}"""

new SparkContext(conf).makeRDD(Seq(json1, json2))
                      .saveJsonToEs("spark/json-trips") 
```


##### Java

```java
String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";  
String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";

JavaSparkContext jsc = ...
JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2)); 
JavaEsSpark.saveJsonToEs(stringRDD, "spark/json-trips");             
```


#### Writing to dynamic/multi-resources

For cases when the data being written to Elasticsearch needs to be indexed under different buckets (based on the data content) one can use the `es.resource.write` field which accepts a pattern that is resolved from the document content, at runtime. Following the aforementioned [media example](/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/configuration#cfg-multi-writes), one could configure it as follows:

##### Scala

```scala
val game = Map(
  "media_type" -> "game", 
       "title" -> "FF VI",
        "year" -> "1994")
val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010")
val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien")

sc.makeRDD(Seq(game, book, cd)).saveToEs("my-collection-{media_type}/doc")  
```

For each document/object about to be written, elasticsearch-hadoop will extract the `media_type` field and use its value to determine the target resource.

##### Java

As expected, things in Java are strikingly similar:
```java
Map<String, ?> game =
  ImmutableMap.of("media_type", "game", "title", "FF VI", "year", "1994");
Map<String, ?> book = ...
Map<String, ?> cd = ...

JavaRDD<Map<String, ?>> javaRDD =
                jsc.parallelize(ImmutableList.of(game, book, cd));
saveToEs(javaRDD, "my-collection-{media_type}/doc");  
```


#### Handling document metadata

Elasticsearch allows each document to have its own [metadata](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3016/reference/elasticsearch/mapping-reference/document-metadata-fields). As explained above, through the various [mapping](/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/configuration#cfg-mapping) options one can customize these parameters so that their values are extracted from the document to which they belong. Furthermore, one can even include/exclude what parts of the data are sent back to Elasticsearch. In Spark, elasticsearch-hadoop extends this functionality allowing metadata to be supplied *outside* the document itself through the use of [*pair* `RDD`s](http://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs). In other words, for `RDD`s containing a key-value tuple, the metadata can be extracted from the key and the value used as the document source.
The metadata is described through the `Metadata` Java [enum](http://docs.oracle.com/javase/tutorial/java/javaOO/enum.md) within the `org.elasticsearch.spark.rdd` package which identifies its type - `id`, `ttl`, `version`, etc… Thus an `RDD`'s keys can be a `Map` containing the `Metadata` for each document and its associated values. If the `RDD` key is not of type `Map`, elasticsearch-hadoop will consider the object as representing the document id and use it accordingly. This sounds more complicated than it is, so let us see some examples.

##### Scala

Pair `RDD`s, or simply put, `RDD`s with the signature `RDD[(K,V)]`, can take advantage of the `saveToEsWithMeta` methods that are available either through the *implicit* import of the `org.elasticsearch.spark` package or the `EsSpark` object. To manually specify the id for each document, simply pass in the key as an `Object` (not of type `Map`) in your `RDD`:
```scala
val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

// instance of SparkContext
val sc = ...

val airportsRDD = 
  sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))  
airportsRDD.saveToEsWithMeta("airports/2015")    
```

When more than just the id needs to be specified, one should use a `scala.collection.Map` with keys of type `org.elasticsearch.spark.rdd.Metadata`:
```scala
import org.elasticsearch.spark.rdd.Metadata._          

val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

// metadata for each document
// note it's not required for them to have the same structure
val otpMeta = Map(ID -> 1, TTL -> "3h")                
val mucMeta = Map(ID -> 2, VERSION -> "23")            
val sfoMeta = Map(ID -> 3)                             

// instance of SparkContext
val sc = ...

val airportsRDD = sc.makeRDD( 
  Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
airportsRDD.saveToEsWithMeta("airports/2015") 
```


##### Java

In a similar fashion, on the Java side, `JavaEsSpark` provides `saveToEsWithMeta` methods that are applied to `JavaPairRDD` (the equivalent in Java of `RDD[(K,V)]`). Thus to save documents based on their ids one can use:
```java
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

// data to be saved
Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
Map<String, ?> jfk = ImmutableMap.of("iata", "JFK", "name", "JFK NYC");

JavaSparkContext jsc = ...

// create a pair RDD between the id and the docs
JavaPairRDD<?, ?> pairRDD = jsc.parallelizePairs(ImmutableList.of( 
        new Tuple2<Object, Object>(1, otp),          
        new Tuple2<Object, Object>(2, jfk)));        
JavaEsSpark.saveToEsWithMeta(pairRDD, target);       
```

When more than just the id needs to be specified, one can choose to use a `java.util.Map` populated with keys of type `org.elasticsearch.spark.rdd.Metadata`:
```java
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.elasticsearch.spark.rdd.Metadata;          

import static org.elasticsearch.spark.rdd.Metadata.*; 

// data to be saved
Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran");

// metadata for each document
// note it's not required for them to have the same structure
Map<Metadata, Object> otpMeta = ImmutableMap.<Metadata, Object>of(ID, 1, TTL, "1d"); 
Map<Metadata, Object> sfoMeta = ImmutableMap.<Metadata, Object> of(ID, "2", VERSION, "23"); 

JavaSparkContext jsc = ...

// create a pair RDD between the id and the docs
JavaPairRDD<?, ?> pairRDD = jsc.parallelizePairs(ImmutableList.of(
        new Tuple2<Object, Object>(otpMeta, otp),    
        new Tuple2<Object, Object>(sfoMeta, sfo)));  
JavaEsSpark.saveToEsWithMeta(pairRDD, target);       
```


#### Reading data from Elasticsearch

For reading, one should define the Elasticsearch `RDD` that *streams* data from Elasticsearch to Spark.

##### Scala

Similar to writing, the `org.elasticsearch.spark` package enriches the `SparkContext` API with `esRDD` methods:
```scala
import org.apache.spark.SparkContext    
import org.apache.spark.SparkContext._

import org.elasticsearch.spark._        

...

val conf = ...
val sc = new SparkContext(conf)         

val RDD = sc.esRDD("radio/artists")     
```

The method is overloaded, allowing one to specify an additional query or even a configuration `Map` (overriding `SparkConf`):
```scala
...
import org.elasticsearch.spark._

...
val conf = ...
val sc = new SparkContext(conf)

sc.esRDD("radio/artists", "?q=me*") 
```

The documents from Elasticsearch are returned, by default, as a `Tuple2` containing as the first element the document id and as the second element the actual document represented through Scala [collections](http://docs.scala-lang.org/overviews/collections/overview.html), namely a `Map[String, Any]` where the keys represent the field names and the values their respective values.
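As a quick illustration (reusing the `radio/artists` resource above and assuming the documents have a `name` field), the tuples can be consumed like any other Scala pair:
```scala
// each entry is a (documentId, document) tuple; the document is a Map[String, Any]
val artists = sc.esRDD("radio/artists")
artists.collect().foreach { case (id, doc) =>
  println(s"$id -> ${doc.getOrElse("name", "unknown")}")
}
```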

##### Java

Java users have a dedicated `JavaPairRDD` that works the same as its Scala counterpart; however, the returned `Tuple2` values (that is, the second element) contain the documents as native `java.util` collections.
```java
import org.apache.spark.api.java.JavaSparkContext;               
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;             
...

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);               

JavaPairRDD<String, Map<String, Object>> esRDD =
                        JavaEsSpark.esRDD(jsc, "radio/artists"); 
```

In a similar fashion one can use the overloaded `esRDD` methods to specify a query or pass a `Map` object for advanced configuration. Let us see how this looks, but this time around using [Java static imports](http://docs.oracle.com/javase/1.5.0/docs/guide/language/static-import.md). Furthermore, let us discard the document ids and retrieve only the `RDD` values:
```java
import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.*;   

...
JavaRDD<Map<String, Object>> rdd =
        esRDD(jsc, "radio/artists", "?q=me*")  
            .values(); 
```

By using the `JavaEsSpark` API, one gets a hold of Spark’s dedicated `JavaPairRDD`, which is better suited in Java environments than the base `RDD` (due to its Scala signatures). Moreover, the dedicated `RDD` returns Elasticsearch documents as proper Java collections so one does not have to deal with Scala collections (which is typically the case with `RDD`s). This is particularly powerful when using Java 8, which we strongly recommend as its [lambda expressions](http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.md) make collection processing *extremely* concise.
To wit, let us assume one wants to filter the documents from the `RDD` and return only those that contain a value that contains `mega` (please ignore the fact one can and should do the filtering directly through Elasticsearch).
In versions prior to Java 8, the code would look something like this:
```java
JavaRDD<Map<String, Object>> esRDD =
                        esRDD(jsc, "radio/artists", "?q=me*").values();
JavaRDD<Map<String, Object>> filtered = esRDD.filter(
    new Function<Map<String, Object>, Boolean>() {
      @Override
      public Boolean call(Map<String, Object> map) throws Exception {
          return map.containsValue("mega");
      }
    });
```

With Java 8, the filtering becomes a one-liner:
```java
JavaRDD<Map<String, Object>> esRDD =
                        esRDD(jsc, "radio/artists", "?q=me*").values();
JavaRDD<Map<String, Object>> filtered = esRDD.filter(doc ->
                                                doc.containsValue("mega"));
```


##### Reading data in JSON format

For cases where the results from Elasticsearch need to be in JSON format (typically to be sent down the wire to some other system), one can use the dedicated `esJsonRDD` methods. In this case, the connector will return the documents' content as it is received from Elasticsearch, without any processing, as an `RDD[(String, String)]` in Scala or a `JavaPairRDD[String, String]` in Java, with the keys representing the document id and the values the actual document content in JSON format.
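A minimal Scala sketch, reusing the `radio/artists` resource and the `sc` context from the previous examples:
```scala
import org.elasticsearch.spark._

// each entry is a (documentId, rawJson) tuple - the JSON is returned verbatim
val jsonRDD = sc.esJsonRDD("radio/artists", "?q=me*")
jsonRDD.take(5).foreach { case (id, json) => println(s"$id -> $json") }
```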

#### Type conversion

<important>
  When dealing with multi-value/array fields, please see [this](/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/mapping-types#mapping-multi-values) section and in particular [these](/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/configuration#cfg-field-info) configuration options. IMPORTANT: If automatic index creation is used, please review [this](/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/mapping-types#auto-mapping-type-loss) section for more information.
</important>

elasticsearch-hadoop automatically converts Spark built-in types to Elasticsearch [types](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3016/reference/elasticsearch/mapping-reference/field-data-types) (and back) as shown in the table below:

| Scala type    | Elasticsearch type         |
|---------------|----------------------------|
| `None`        | `null`                     |
| `Unit`        | `null`                     |
| `Nil`         | empty `array`              |
| `Some[T]`     | `T` according to the table |
| `Map`         | `object`                   |
| `Traversable` | `array`                    |
| *case class*  | `object` (see `Map`)       |
| `Product`     | `array`                    |

In addition, the following *implied* conversions apply for Java types:

| Java type             | Elasticsearch type                      |
|-----------------------|-----------------------------------------|
| `null`                | `null`                                  |
| `String`              | `string`                                |
| `Boolean`             | `boolean`                               |
| `Byte`                | `byte`                                  |
| `Short`               | `short`                                 |
| `Integer`             | `int`                                   |
| `Long`                | `long`                                  |
| `Double`              | `double`                                |
| `Float`               | `float`                                 |
| `Number`              | `float` or `double` (depending on size) |
| `java.util.Calendar`  | `date`  (`string` format)               |
| `java.util.Date`      | `date`  (`string` format)               |
| `java.util.Timestamp` | `date`  (`string` format)               |
| `byte[]`              | `string` (BASE64)                       |
| `Object[]`            | `array`                                 |
| `Iterable`            | `array`                                 |
| `Map`                 | `object`                                |
| *Java Bean*           | `object` (see `Map`)                    |

The conversion is done as a *best* effort; built-in Java and Scala types are guaranteed to be properly converted, however there are no guarantees for user types, whether in Java or Scala. As mentioned in the tables above, when a `case` class is encountered in Scala or a `JavaBean` in Java, the converters will try to `unwrap` its content and save it as an `object`. Note this works only for top-level user objects - if the user object has other user objects nested within, the conversion is likely to fail since the converter does not perform nested `unwrapping`. This is done on purpose since the converter has to *serialize* and *deserialize* the data, and user types introduce ambiguity due to data loss; this could be addressed through some type of mapping, however that would take the project too close to the realm of ORMs and arguably introduce too much complexity for little to no gain. Thanks to the processing functionality in Spark and the pluggability of elasticsearch-hadoop, one can easily transform objects into other types, if needed, with minimal effort and maximum control.
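For example, a nested user type can be flattened into a `Map` before saving; the sketch below is only an illustration (the `Traveller`/`Location` classes and the target resource are made up, and an existing `SparkContext` `sc` is assumed):
```scala
import org.elasticsearch.spark._

case class Location(city: String, country: String)
case class Traveller(name: String, location: Location)

val travellers = Seq(Traveller("John", Location("OTP", "RO")))

// flatten the nested Location into top-level fields, since nested
// user objects are not unwrapped automatically
val docs = sc.makeRDD(travellers).map { t =>
  Map("name" -> t.name,
      "city" -> t.location.city,
      "country" -> t.location.country)
}
docs.saveToEs("spark/travellers")
```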
It is worth mentioning that rich data types available only in Elasticsearch, such as [`GeoPoint`](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3016/reference/elasticsearch/mapping-reference/geo-point) or [`GeoShape`](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3016/reference/elasticsearch/mapping-reference/geo-shape) are supported by converting their structure into the primitives available in the table above. For example, based on its storage a `geo_point` might be returned as a `String` or a `Traversable`.

## Spark Streaming support

<note>
  Added in 5.0.
</note>

Spark Streaming is an extension on top of the core Spark functionality that allows near real time processing of stream data. Spark Streaming works around the idea of `DStream`s, or *Discretized Streams*. `DStreams` operate by collecting newly arrived records into a small `RDD` and executing it. This repeats every few seconds with a new `RDD` in a process called *microbatching*. The `DStream` API includes many of the same processing operations as the `RDD` API, plus a few streaming-specific methods. elasticsearch-hadoop provides native integration with Spark Streaming as of version 5.0.
When using the elasticsearch-hadoop Spark Streaming support, Elasticsearch can be targeted as an output location to index data into from a Spark Streaming job in the same way that one might persist the results from an `RDD`. Note, however, that unlike `RDD`s, you are unable to read data out of Elasticsearch using a `DStream` due to its continuous nature.
<important>
  Spark Streaming support provides special optimizations to allow for conservation of network resources on Spark executors when running jobs with very small processing windows. For this reason, one should prefer to use this integration instead of invoking `saveToEs` on `RDD`s returned from the `foreachRDD` call on `DStream`.
</important>


#### Writing `DStream` to Elasticsearch

Like `RDD`s, any `DStream` can be saved to Elasticsearch as long as its content can be translated into documents. In practice this means the `DStream` type needs to be a `Map` (either a Scala or a Java one), a [`JavaBean`](http://docs.oracle.com/javase/tutorial/javabeans/) or a Scala [case class](http://docs.scala-lang.org/tutorials/tour/case-classes.html). When that is not the case, one can easily *transform* the data in Spark or plug-in their own custom [`ValueWriter`](/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/configuration#configuration-serialization).

##### Scala

When using Scala, simply import the `org.elasticsearch.spark.streaming` package which, through the [*pimp my library*](http://www.artima.com/weblogs/viewpost.jsp?thread=179766) pattern, enriches the `DStream` API with `saveToEs` methods:
```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._               
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._

import org.elasticsearch.spark.streaming._           

...

val conf = ...
val sc = new SparkContext(conf)                      
val ssc = new StreamingContext(sc, Seconds(1))       

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

val rdd = sc.makeRDD(Seq(numbers, airports))
val microbatches = mutable.Queue(rdd)                

ssc.queueStream(microbatches).saveToEs("spark/docs") 

ssc.start()
ssc.awaitTermination() 
```

As an alternative to the *implicit* import above, one can use elasticsearch-hadoop Spark Streaming support in Scala through `EsSparkStreaming` in the `org.elasticsearch.spark.streaming` package, which acts as a utility class allowing explicit method invocations. Additionally, instead of `Map`s (which are convenient but require one mapping per instance due to their structural differences), use a *case class*:
```scala
import org.apache.spark.SparkContext
import org.elasticsearch.spark.streaming.EsSparkStreaming         

// define a case class
case class Trip(departure: String, arrival: String)               

val upcomingTrip = Trip("OTP", "SFO")
val lastWeekTrip = Trip("MUC", "OTP")

val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
val microbatches = mutable.Queue(rdd)                             
val dstream = ssc.queueStream(microbatches)

EsSparkStreaming.saveToEs(dstream, "spark/docs")                  

ssc.start()                                                       
```

<important>
  Once a `StreamingContext` is started, no new `DStream`s can be added or configured. Once a context has stopped, it cannot be restarted. There can only be one active `StreamingContext` at a time per JVM. Also note that when stopping a `StreamingContext` programmatically, it stops the underlying `SparkContext` unless instructed not to.
</important>

For cases where the id (or other metadata fields like `ttl` or `timestamp`) of the document needs to be specified, one can do so by setting the appropriate [mapping](/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/configuration#cfg-mapping) namely `es.mapping.id`. Following the previous example, to indicate to Elasticsearch to use the field `id` as the document id, update the `DStream` configuration (it is also possible to set the property on the `SparkConf` though due to its global effect it is discouraged):
```scala
EsSparkStreaming.saveToEs(dstream, "spark/docs", Map("es.mapping.id" -> "id"))
```


##### Java

Java users have a dedicated class that provides similar functionality to `EsSparkStreaming`, namely `JavaEsSparkStreaming` in the package `org.elasticsearch.spark.streaming.api.java` (a package similar to Spark’s [Java API](https://spark.apache.org/docs/1.6.1/api/java/index.md?org/apache/spark/streaming/api/java/package-summary.md)):
```java
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;                                              
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;

import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;         
...

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);                              
JavaStreamingContext jssc = new JavaStreamingContext(jsc, Seconds.apply(1));

Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);                   
Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
Queue<JavaRDD<Map<String, ?>>> microbatches = new LinkedList<>();
microbatches.add(javaRDD);                                                      
JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches);

JavaEsSparkStreaming.saveToEs(javaDStream, "spark/docs");                       

jssc.start();
```

The code can be further simplified by using Java 5 *static* imports. Additionally, the `Map` (whose mapping is dynamic due to its *loose* structure) can be replaced with a `JavaBean`:
```java
public class TripBean implements Serializable {
   private String departure, arrival;

   public TripBean(String departure, String arrival) {
       setDeparture(departure);
       setArrival(arrival);
   }

   public TripBean() {}

   public String getDeparture() { return departure; }
   public String getArrival() { return arrival; }
   public void setDeparture(String dep) { departure = dep; }
   public void setArrival(String arr) { arrival = arr; }
}
```

```java
import static org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming.*;
...

TripBean upcoming = new TripBean("OTP", "SFO");
TripBean lastWeek = new TripBean("MUC", "OTP");

JavaRDD<TripBean> javaRDD = jsc.parallelize(ImmutableList.of(upcoming, lastWeek));
Queue<JavaRDD<TripBean>> microbatches = new LinkedList<JavaRDD<TripBean>>();
microbatches.add(javaRDD);
JavaDStream<TripBean> javaDStream = jssc.queueStream(microbatches);       

saveToEs(javaDStream, "spark/docs");                                          

jssc.start();
```

Setting the document id (or other metadata fields like `ttl` or `timestamp`) is similar to its Scala counterpart, though potentially a bit more verbose depending on whether you are using the JDK classes or some other utilities (like Guava):
```java
JavaEsSparkStreaming.saveToEs(javaDStream, "spark/docs", ImmutableMap.of("es.mapping.id", "id"));
```


#### Writing Existing JSON to Elasticsearch

For cases where the data being streamed by the `DStream` is already serialized as JSON, elasticsearch-hadoop allows direct indexing *without* applying any transformation; the data is taken as is and sent directly to Elasticsearch. As such, in this case, elasticsearch-hadoop expects either a `DStream` containing `String` or byte arrays (`byte[]`/`Array[Byte]`), assuming each entry represents a JSON document. If the `DStream` does not have the proper signature, the `saveJsonToEs` methods cannot be applied (in Scala they will not be available).

##### Scala

```scala
val json1 = """{"reason" : "business", "airport" : "SFO"}"""      
val json2 = """{"participants" : 5, "airport" : "OTP"}"""

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))

val rdd = sc.makeRDD(Seq(json1, json2))
val microbatch = mutable.Queue(rdd)
ssc.queueStream(microbatch).saveJsonToEs("spark/json-trips")      

ssc.start()                                                       
```


##### Java

```java
String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";  
String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";

JavaSparkContext jsc = ...
JavaStreamingContext jssc = ...
JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2));
Queue<JavaRDD<String>> microbatches = new LinkedList<JavaRDD<String>>();      
microbatches.add(stringRDD);
JavaDStream<String> stringDStream = jssc.queueStream(microbatches);  

JavaEsSparkStreaming.saveJsonToEs(stringDStream, "spark/json-trips");

jssc.start();
```


#### Writing to dynamic/multi-resources

For cases when the data being written to Elasticsearch needs to be indexed under different buckets (based on the data content) one can use the `es.resource.write` field which accepts a pattern that is resolved from the document content, at runtime. Following the aforementioned [media example](/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/configuration#cfg-multi-writes), one could configure it as follows:

##### Scala

```scala
val game = Map(
  "media_type" -> "game", 
       "title" -> "FF VI",
        "year" -> "1994")
val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010")
val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien")

val batch = sc.makeRDD(Seq(game, book, cd))
val microbatches = mutable.Queue(batch)
ssc.queueStream(microbatches).saveToEs("my-collection-{media_type}/doc")  
ssc.start()
```

For each document/object about to be written, elasticsearch-hadoop will extract the `media_type` field and use its value to determine the target resource.

##### Java

As expected, things in Java are strikingly similar:
```java
Map<String, ?> game =
  ImmutableMap.of("media_type", "game", "title", "FF VI", "year", "1994");
Map<String, ?> book = ...
Map<String, ?> cd = ...

JavaRDD<Map<String, ?>> javaRDD =
                jsc.parallelize(ImmutableList.of(game, book, cd));
Queue<JavaRDD<Map<String, ?>>> microbatches = ...
JavaDStream<Map<String, ?>> javaDStream =
                jssc.queueStream(microbatches);

saveToEs(javaDStream, "my-collection-{media_type}/doc");  
jssc.start();
```


#### Handling document metadata

Elasticsearch allows each document to have its own [metadata](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3016/reference/elasticsearch/mapping-reference/document-metadata-fields). As explained above, through the various [mapping](/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/configuration#cfg-mapping) options one can customize these parameters so that their values are extracted from the document to which they belong. Furthermore, one can even include/exclude what parts of the data are sent back to Elasticsearch. In Spark, elasticsearch-hadoop extends this functionality allowing metadata to be supplied *outside* the document itself through the use of [*pair* `RDD`s](http://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs).
This is no different in Spark Streaming. For `DStream`s containing a key-value tuple, the metadata can be extracted from the key and the value used as the document source.
The metadata is described through the `Metadata` Java [enum](http://docs.oracle.com/javase/tutorial/java/javaOO/enum.md) within the `org.elasticsearch.spark.rdd` package which identifies its type - `id`, `ttl`, `version`, etc… Thus a `DStream`'s keys can be a `Map` containing the `Metadata` for each document and its associated values. If the `DStream` key is not of type `Map`, elasticsearch-hadoop will consider the object as representing the document id and use it accordingly. This sounds more complicated than it is, so let us see some examples.

##### Scala

Pair `DStream`s, or simply put, `DStream`s with the signature `DStream[(K,V)]`, can take advantage of the `saveToEsWithMeta` methods that are available either through the *implicit* import of the `org.elasticsearch.spark.streaming` package or the `EsSparkStreaming` object. To manually specify the id for each document, simply pass in the key as an `Object` (not of type `Map`) in your `DStream`:
```scala
val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

// instance of SparkContext
val sc = ...
// instance of StreamingContext
val ssc = ...

val airportsRDD = 
  sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))  
val microbatches = mutable.Queue(airportsRDD)

ssc.queueStream(microbatches)        
  .saveToEsWithMeta("airports/2015") 
ssc.start()
```

When more than just the id needs to be specified, one should use a `scala.collection.Map` with keys of type `org.elasticsearch.spark.rdd.Metadata`:
```scala
import org.elasticsearch.spark.rdd.Metadata._          

val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

// metadata for each document
// note it's not required for them to have the same structure
val otpMeta = Map(ID -> 1, TTL -> "3h")                
val mucMeta = Map(ID -> 2, VERSION -> "23")            
val sfoMeta = Map(ID -> 3)                             

// instance of SparkContext
val sc = ...
// instance of StreamingContext
val ssc = ...

val airportsRDD = sc.makeRDD( 
  Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
val microbatches = mutable.Queue(airportsRDD)

ssc.queueStream(microbatches)        
  .saveToEsWithMeta("airports/2015") 
ssc.start()
```


##### Java

In a similar fashion, on the Java side, `JavaEsSparkStreaming` provides `saveToEsWithMeta` methods that are applied to `JavaPairDStream` (the equivalent in Java of `DStream[(K,V)]`).
This tends to involve a little more work due to the Java API’s limitations. For instance, you cannot create a `JavaPairDStream` directly from a queue of `JavaPairRDD`s. Instead, you must create a regular `JavaDStream` of `Tuple2` objects and convert the `JavaDStream` into a `JavaPairDStream`. This sounds complex, but it’s a simple workaround for a limitation of the API.
First, we’ll create a pair function that takes a `Tuple2` object in and returns it right back to the framework:
```java
public static class ExtractTuples implements PairFunction<Tuple2<Object, Object>, Object, Object>, Serializable {
    @Override
    public Tuple2<Object, Object> call(Tuple2<Object, Object> tuple2) throws Exception {
        return tuple2;
    }
}
```

Then we’ll apply the pair function to a `JavaDStream` of `Tuple2`s to create a `JavaPairDStream` and save it:
```java
import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;

// data to be saved
Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
Map<String, ?> jfk = ImmutableMap.of("iata", "JFK", "name", "JFK NYC");

JavaSparkContext jsc = ...
JavaStreamingContext jssc = ...

// create an RDD of tuples pairing each id with its doc
JavaRDD<Tuple2<?, ?>> rdd = jsc.parallelize(         
      ImmutableList.of(
        new Tuple2<Object, Object>(1, otp),          
        new Tuple2<Object, Object>(2, jfk)));        

Queue<JavaRDD<Tuple2<?, ?>>> microbatches = ...
JavaDStream<Tuple2<?, ?>> dStream = jssc.queueStream(microbatches); 

JavaPairDStream<?, ?> pairDStream = dStream.mapToPair(new ExtractTuples()); 

JavaEsSparkStreaming.saveToEsWithMeta(pairDStream, target);       
jssc.start();
```

When more than just the id needs to be specified, one can choose to use a `java.util.Map` populated with keys of type `org.elasticsearch.spark.rdd.Metadata`. We’ll use the same typing trick to repack the `JavaDStream` as a `JavaPairDStream`:
```java
import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;
import org.elasticsearch.spark.rdd.Metadata;          

import static org.elasticsearch.spark.rdd.Metadata.*; 

// data to be saved
Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran");

// metadata for each document
// note it's not required for them to have the same structure
Map<Metadata, Object> otpMeta = ImmutableMap.<Metadata, Object>of(ID, 1, TTL, "1d"); 
Map<Metadata, Object> sfoMeta = ImmutableMap.<Metadata, Object> of(ID, "2", VERSION, "23"); 

JavaSparkContext jsc = ...
JavaStreamingContext jssc = ...

// create an RDD of tuples pairing the metadata and the docs
JavaRDD<Tuple2<?, ?>> rdd = jsc.parallelize(ImmutableList.of(
        new Tuple2<Object, Object>(otpMeta, otp),    
        new Tuple2<Object, Object>(sfoMeta, sfo)));  

Queue<JavaRDD<Tuple2<?, ?>>> microbatches = ...
JavaDStream<Tuple2<?, ?>> dStream = jssc.queueStream(microbatches); 

JavaPairDStream<?, ?> pairDStream = dStream.mapToPair(new ExtractTuples()); 

JavaEsSparkStreaming.saveToEsWithMeta(pairDStream, target);       
jssc.start();
```


#### Spark Streaming Type Conversion

The elasticsearch-hadoop Spark Streaming support leverages the same type mapping as the regular Spark type mapping. The mappings are repeated here for consistency:

| Scala type    | Elasticsearch type         |
|---------------|----------------------------|
| `None`        | `null`                     |
| `Unit`        | `null`                     |
| `Nil`         | empty `array`              |
| `Some[T]`     | `T` according to the table |
| `Map`         | `object`                   |
| `Traversable` | `array`                    |
| *case class*  | `object` (see `Map`)       |
| `Product`     | `array`                    |

In addition, the following *implied* conversions apply for Java types:

| Java type             | Elasticsearch type                      |
|-----------------------|-----------------------------------------|
| `null`                | `null`                                  |
| `String`              | `string`                                |
| `Boolean`             | `boolean`                               |
| `Byte`                | `byte`                                  |
| `Short`               | `short`                                 |
| `Integer`             | `int`                                   |
| `Long`                | `long`                                  |
| `Double`              | `double`                                |
| `Float`               | `float`                                 |
| `Number`              | `float` or `double` (depending on size) |
| `java.util.Calendar`  | `date`  (`string` format)               |
| `java.util.Date`      | `date`  (`string` format)               |
| `java.util.Timestamp` | `date`  (`string` format)               |
| `byte[]`              | `string` (BASE64)                       |
| `Object[]`            | `array`                                 |
| `Iterable`            | `array`                                 |
| `Map`                 | `object`                                |
| *Java Bean*           | `object` (see `Map`)                    |

It is worth re-mentioning that rich data types available only in Elasticsearch, such as [`GeoPoint`](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3016/reference/elasticsearch/mapping-reference/geo-point) or [`GeoShape`](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3016/reference/elasticsearch/mapping-reference/geo-shape) are supported by converting their structure into the primitives available in the table above. For example, based on its storage a `geo_point` might be returned as a `String` or a `Traversable`.

## Spark SQL support

<note>
  Added in 2.1.
</note>

On top of the core Spark support, elasticsearch-hadoop also provides integration with Spark SQL. In other words, Elasticsearch becomes a *native* source for Spark SQL so that data can be indexed and queried from Spark SQL *transparently*.
<important>
  Spark SQL works with *structured* data - in other words, all entries are expected to have the *same* structure (same number of fields, of the same type and name). Using unstructured data (documents with different structures) is *not* supported and will cause problems. For such cases, use `PairRDD`s.
</important>


#### Supported Spark SQL versions

Spark SQL, while becoming a mature component, is still going through significant changes between releases. Spark SQL became a stable component in version 1.3, however it is [**not** backwards compatible](https://spark.apache.org/docs/latest/sql-programming-guide.html#migration-guide) with the previous releases. Furthermore, Spark 2.0 introduced significant changes which broke backwards compatibility through the `Dataset` API. elasticsearch-hadoop supports both Spark SQL 1.3-1.6 and Spark SQL 2.0 through two different jars: `elasticsearch-spark-1.x-<version>.jar` and `elasticsearch-hadoop-<version>.jar` support Spark SQL 1.3-1.6 (or higher) while `elasticsearch-spark-2.0-<version>.jar` supports Spark SQL 2.0. In other words, unless you are using Spark 2.0, use `elasticsearch-spark-1.x-<version>.jar`.
Spark SQL support is available under the `org.elasticsearch.spark.sql` package.
From the elasticsearch-hadoop user's perspective, the differences between Spark SQL 1.3-1.6 and Spark 2.0 are fairly consolidated. This [document](http://spark.apache.org/docs/2.0.0/sql-programming-guide.md#upgrading-from-spark-sql-16-to-20) describes the differences at length; they are briefly mentioned below:
<definitions>
  <definition term="DataFrame vs Dataset">
    The core unit of Spark SQL in 1.3+ is a `DataFrame`. This API remains in Spark 2.0; however, underneath it is based on a `Dataset`.
  </definition>
  <definition term="Unified API vs dedicated Java/Scala APIs">
    In Spark SQL 2.0, the APIs are further [unified](http://spark.apache.org/docs/2.0.0/sql-programming-guide.md#datasets-and-dataframes) by introducing `SparkSession` and by using the same backing code for `Dataset`s, `DataFrame`s and `RDD`s.
  </definition>
</definitions>

Since, conceptually, a `DataFrame` is a `Dataset[Row]`, the documentation below will focus on Spark SQL 1.3-1.6.

#### Writing `DataFrame` (Spark SQL 1.3+) to Elasticsearch

With elasticsearch-hadoop, `DataFrame`s (or any `Dataset` for that matter) can be indexed to Elasticsearch.

##### Scala

In Scala, simply import the `org.elasticsearch.spark.sql` package which enriches the given `DataFrame` class with `saveToEs` methods; while these have the same signature as those in the `org.elasticsearch.spark` package, they are designed for `DataFrame` implementations:
```scala
// reusing the example from Spark SQL documentation

import org.apache.spark.sql.SQLContext    
import org.apache.spark.sql.SQLContext._

import org.elasticsearch.spark.sql._      

...

// sc = existing SparkContext
val sqlContext = new SQLContext(sc)

// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)

//  create DataFrame
val people = sc.textFile("people.txt")    
        .map(_.split(","))
        .map(p => Person(p(0), p(1), p(2).trim.toInt))
        .toDF()

people.saveToEs("spark/people")           
```

<note>
  By default, elasticsearch-hadoop will ignore null values in favor of not writing any field at all. Since a `DataFrame` is meant to be treated as structured tabular data, you can enable writing nulls as null-valued fields for `DataFrame` objects only, by setting `es.spark.dataframe.write.null` to `true`.
</note>
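For instance, reusing the `people` `DataFrame` from the example above, the setting can be passed per invocation; a minimal sketch:
```scala
// write null-valued fields for this DataFrame only
people.saveToEs("spark/people", Map("es.spark.dataframe.write.null" -> "true"))
```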


##### Java

In a similar fashion, for Java usage the dedicated package `org.elasticsearch.spark.sql.api.java` provides similar functionality through `JavaEsSparkSQL`:
```java
import org.apache.spark.sql.api.java.*;                      
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;  
...

DataFrame people = ...
JavaEsSparkSQL.saveToEs(people, "spark/people");                     
```

Again, with Java 5 *static* imports this can be further simplified to:
```java
import static org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL.*; 
...
saveToEs(people, "spark/people");
```

<important>
  For maximum control over the mapping of your `DataFrame` in Elasticsearch, it is highly recommended to create the mapping before hand. See [this](https://www.elastic.co/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/mapping-types) chapter for more information.
</important>


#### Writing existing JSON to Elasticsearch

When using Spark SQL, if the input data is in JSON format, simply convert it to a `DataFrame` (in Spark SQL 1.3) or a `Dataset` (for Spark SQL 2.0) (as described in Spark [documentation](https://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets)) through `SQLContext`/`JavaSQLContext` `jsonFile` methods.
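A minimal sketch, assuming Spark 1.4 or higher where `read.json` is available (`jsonFile` in 1.3), an existing `SparkContext` `sc`, and a placeholder input file:
```scala
import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark.sql._

val sqlContext = new SQLContext(sc)
// each line of trips.json is expected to be a JSON document
val trips = sqlContext.read.json("trips.json")
trips.saveToEs("spark/json-trips")
```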

#### Using pure SQL to read from Elasticsearch

<important>
  The index and its mapping have to exist prior to creating the temporary table.
</important>

Spark SQL 1.2 [introduced](http://spark.apache.org/releases/spark-release-1-2-0.html) a new [API](https://github.com/apache/spark/pull/2475) for reading from external data sources, which is supported by elasticsearch-hadoop, simplifying the SQL configuration needed for interacting with Elasticsearch. Furthermore, behind the scenes it understands the operations executed by Spark and thus can optimize the data and queries made (such as filtering or pruning), improving performance.

#### Data Sources in Spark SQL

When using Spark SQL, elasticsearch-hadoop allows access to Elasticsearch through `SQLContext` `load` method. In other words, to create a `DataFrame`/`Dataset` backed by Elasticsearch in a *declarative* manner:
```scala
val sql = new SQLContext...
// Spark 1.3 style
val df = sql.load( 
  "spark/index",   
  "org.elasticsearch.spark.sql") 
```

In Spark 1.4, one would use the following similar API calls:
```scala
// Spark 1.4 style
val df = sql.read      
  .format("org.elasticsearch.spark.sql") 
  .load("spark/index") 
```

In Spark 1.5, this can be further simplified to:
```scala
// Spark 1.5 style
val df = sql.read.format("es")<1>
  .load("spark/index")
```

1. Use `es` as an alias instead of the full package name for the `DataSource` provider

Whatever API is used, once created, the `DataFrame` can be accessed freely to manipulate the data.
The *sources* declaration also allows specific options to be passed in, namely:

| Name                          | Default value | Description                                                               |
|-------------------------------|---------------|---------------------------------------------------------------------------|
| `path`                        | *required*    | Elasticsearch index/type                                                  |
| `pushdown`                    | `true`        | Whether to translate (*push-down*) Spark SQL into Elasticsearch Query DSL |
| `strict`                      | `false`       | Whether to use *exact* (not analyzed) matching or not (analyzed)          |
| Usable in Spark 1.6 or higher |               |                                                                           |
| `double.filtering`            | `true`        | Whether to tell Spark to apply its own filtering on the filters pushed down |

These options are explained in the next section. To specify them (including the generic elasticsearch-hadoop ones), one simply passes a `Map` to the aforementioned methods. For example:
```scala
val sql = new SQLContext...
// options for Spark 1.3 need to include the target path/resource
val options13 = Map("path" -> "spark/index",
                    "pushdown" -> "true",     
                    "es.nodes" -> "someNode", 
                     "es.port" -> "9200")

// Spark 1.3 style
val spark13DF = sql.load("org.elasticsearch.spark.sql", options13) 

// options for Spark 1.4 - the path/resource is specified separately
val options = Map("pushdown" -> "true",     
                  "es.nodes" -> "someNode", 
                   "es.port" -> "9200")

// Spark 1.4 style
val spark14DF = sql.read.format("org.elasticsearch.spark.sql")
                        .options(options) 
                        .load("spark/index")
```

```scala
sqlContext.sql(
   "CREATE TEMPORARY TABLE myIndex    " + 
   "USING org.elasticsearch.spark.sql " + 
   "OPTIONS (resource 'spark/index', nodes 'someNode')" ) " 
```

Do note that due to the SQL parser, the `.` (among other common characters used for delimiting) is not allowed; the connector tries to work around it by appending the `es.` prefix automatically, however this works only for configuration options with one `.` (like `es.nodes` above). Because of this, if properties with multiple `.` are needed, one should use the `SQLContext.load` or `SQLContext.read` methods above and pass the properties as a `Map`.
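For instance, security-related settings such as `es.net.http.auth.user` contain multiple dots and thus should be passed programmatically; a hedged sketch (the credentials below are placeholders):
```scala
val df = sqlContext.read
  .format("org.elasticsearch.spark.sql")
  .options(Map("es.net.http.auth.user" -> "spark",
               "es.net.http.auth.pass" -> "secret"))
  .load("spark/index")
```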

#### Push-Down operations

An important *hidden* feature of using elasticsearch-hadoop as a Spark `source` is that the connector understands the operations performed within the `DataFrame`/SQL and, by default, will *translate* them into the appropriate [QueryDSL](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3016/explore-analyze/query-filter/languages/querydsl). In other words, the connector *pushes* down the operations directly to the source, where the data is efficiently filtered out so that *only* the required data is streamed back to Spark. This significantly improves query performance and minimizes the CPU, memory and I/O on both the Spark and Elasticsearch clusters, as only the needed data is returned (as opposed to returning the data in bulk only to be processed and discarded by Spark). Note the push down operations apply even when one specifies a query - the connector will *enhance* it according to the specified SQL.
As a side note, elasticsearch-hadoop supports *all* the `Filter`s available in Spark (1.3.0 and higher) while retaining backwards binary-compatibility with Spark 1.3.0, pushing the SQL operations down to Elasticsearch to the full extent without any user interference.
The operators that have been optimized as pushdown filters are:

| SQL syntax                 | ES 1.x/2.x syntax | ES 5.x syntax             |
|----------------------------|-------------------|---------------------------|
| = null , is_null           | missing           | must_not.exists           |
| = (strict)                 | term              | term                      |
| = (not strict)             | match             | match                     |
| > , < , >= , <=            | range             | range                     |
| is_not_null                | exists            | exists                    |
| in (strict)                | terms             | terms                     |
| in (not strict)            | or.filters        | bool.should               |
| and                        | and.filters       | bool.filter               |
| or                         | or.filters        | bool.should [bool.filter] |
| not                        | not.filter        | bool.must_not             |
| StringStartsWith           | wildcard(arg*)    | wildcard(arg*)            |
| StringEndsWith             | wildcard(*arg)    | wildcard(*arg)            |
| StringContains             | wildcard(*arg*)   | wildcard(*arg*)           |
| EqualNullSafe (strict)     | term              | term                      |
| EqualNullSafe (not strict) | match             | match                     |

To wit, consider the following Spark SQL:
```scala
// as a DataFrame
val df = sqlContext.read.format("org.elasticsearch.spark.sql").load("spark/trips")

df.printSchema()
// root
//|-- departure: string (nullable = true)
//|-- arrival: string (nullable = true)
//|-- days: long (nullable = true)

val filter = df.filter(df("arrival").equalTo("OTP").and(df("days").gt(3)))
```

or in pure SQL:
```sql
CREATE TEMPORARY TABLE trips USING org.elasticsearch.spark.sql OPTIONS (path "spark/trips")
SELECT departure FROM trips WHERE arrival = "OTP" and days > 3
```

The connector translates the query into:
```json
{
  "query" : {
    "filtered" : {
      "query" : {
        "match_all" : {}

      },
      "filter" : {
        "and" : [{
            "query" : {
              "match" : {
                "arrival" : "OTP"
              }
            }
          }, {
            "days" : {
              "gt" : 3
            }
          }
        ]
      }
    }
  }
}
```

Furthermore, the pushdown filters can work on `analyzed` terms (the default) or can be configured to be *strict* and provide `exact` matches (which work only on `not-analyzed` fields). Unless one manually specifies the mapping, it is highly recommended to leave the defaults as they are. This and other topics are discussed at length in the Elasticsearch [Reference Documentation](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3016/reference/query-languages/query-dsl/query-dsl-term-query).
Note that `double.filtering`, available since elasticsearch-hadoop 2.2 for Spark 1.6 or higher, controls whether filters that are already pushed down to Elasticsearch are also processed/evaluated by Spark (the default) or not. Turning this feature off speeds things up, especially when dealing with large data sets. However, one should pay attention to the semantics, as turning it off might return different results (depending on how the data is indexed, `analyzed` vs `not_analyzed`). In general, when turning *strict* on, one can disable `double.filtering` as well.
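As a rough sketch, the `pushdown`, `strict` and `double.filtering` switches discussed above can be set as regular data source options on the reader (the values below are illustrative):
```scala
// illustrative: tune push-down behavior through the data source options
val tuned = sqlContext.read.format("org.elasticsearch.spark.sql")
  .option("pushdown", "true")           // translate Spark filters into Query DSL (default)
  .option("strict", "true")             // prefer exact (not-analyzed) matches
  .option("double.filtering", "false")  // do not re-evaluate pushed-down filters in Spark
  .load("spark/trips")
```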

#### Data Sources as tables

Available since Spark SQL 1.2, one can also access a data source by declaring it as a Spark temporary table (backed by elasticsearch-hadoop):
```scala
sqlContext.sql(
   "CREATE TEMPORARY TABLE myIndex    " + 
   "USING org.elasticsearch.spark.sql " + 
   "OPTIONS (resource 'spark/index', " + 
            "scroll_size '20')" ) 
```

Once defined, the schema is picked up automatically, so one can issue queries right away:
```scala
val all = sqlContext.sql("SELECT * FROM myIndex WHERE id <= 10")
```

As elasticsearch-hadoop is aware of the queries being made, it can *optimize* the requests done to Elasticsearch. For example, given the following query:
```scala
val names = sqlContext.sql("SELECT name FROM myIndex WHERE id >=1 AND id <= 10")
```

it knows only the `name` and `id` fields are required (the first to be returned to the user, the second for Spark’s internal filtering) and thus will ask *only* for this data, making the queries quite efficient.

#### Reading `DataFrame`s (Spark SQL 1.3) from Elasticsearch

As you might have guessed, one can define a `DataFrame` backed by Elasticsearch documents. Or even better, have them backed by a query result, effectively creating dynamic, real-time *views* over your data.

##### Scala

Through the `org.elasticsearch.spark.sql` package, `esDF` methods are available on the `SQLContext` API:
```scala
import org.apache.spark.sql.SQLContext        

import org.elasticsearch.spark.sql._          
...

val sql = new SQLContext(sc)

val people = sql.esDF("spark/people")         

// check the associated schema
println(people.schema.treeString)             
// root
//  |-- name: string (nullable = true)
//  |-- surname: string (nullable = true)
//  |-- age: long (nullable = true)           
```

And just as with the Spark *core* support, additional parameters can be specified such as a query. This is quite a *powerful* concept as one can filter the data at the source (Elasticsearch) and use Spark only on the results:
```scala
// get only the Smiths
val smiths = sqlContext.esDF("spark/people","?q=Smith") 
```

In some cases, especially when the index in Elasticsearch contains a lot of fields, it is desirable to create a `DataFrame` that contains only a *subset* of them. While one can modify the `DataFrame` (by working on its backing `RDD`) through the official Spark API or through dedicated queries, elasticsearch-hadoop allows the user to specify what fields to include and exclude from Elasticsearch when creating the `DataFrame`.
Through the `es.read.field.include` and `es.read.field.exclude` properties, one can indicate what fields to include or exclude from the index mapping. The syntax is similar to that of Elasticsearch [include/exclude](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3016/reference/elasticsearch/rest-apis/retrieve-selected-fields#source-filtering). Multiple values can be specified by using a comma. By default, no value is specified, meaning all properties/fields are included and no properties/fields are excluded. Note that these properties can include leading and trailing wildcards. Including part of a hierarchy of fields without a trailing wildcard does not imply that the entire hierarchy is included. However, in most cases it does not make sense to include only part of a hierarchy, so a trailing wildcard should be included.
For example:
```ini
# include
es.read.field.include = *name, address.*
# exclude
es.read.field.exclude = *.created
```
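The same properties can also be set when creating the `DataFrame`; a short sketch (the field patterns are illustrative) looks as follows:
```scala
// illustrative: restrict the fields elasticsearch-hadoop reads back from the index
val slim = sqlContext.read.format("org.elasticsearch.spark.sql")
  .option("es.read.field.include", "*name, address.*")
  .option("es.read.field.exclude", "*.created")
  .load("spark/people")

// only the included (and not excluded) fields show up in the schema
slim.printSchema()
```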

<important>
  Due to the way SparkSQL works with a `DataFrame` schema, elasticsearch-hadoop needs to be aware of what fields are returned from Elasticsearch *before* executing the actual queries. While one can restrict the fields manually through the underlying Elasticsearch query, elasticsearch-hadoop is unaware of this and the results are likely to be different or worse, errors will occur. Use the properties above instead, which Elasticsearch will properly use alongside the user query.
</important>


##### Java

For Java users, a dedicated API exists through `JavaEsSparkSQL`. It is strikingly similar to `EsSparkSQL`, however it allows configuration options to be passed in through Java collections instead of Scala ones; other than that, using the two is exactly the same:
```java
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.DataFrame;
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;   
...
SQLContext sql = new SQLContext(sc);

DataFrame people = JavaEsSparkSQL.esDF(sql, "spark/people");  
```

Better yet, the `DataFrame` can be backed by a query result:
```java
DataFrame people = JavaEsSparkSQL.esDF(sql, "spark/people", "?q=Smith"); 
```


#### Spark SQL Type conversion

<important>
  When dealing with multi-value/array fields, please see [this](/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/mapping-types#mapping-multi-values) section and in particular [these](/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/configuration#cfg-field-info) configuration options. IMPORTANT: If automatic index creation is used, please review [this](/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/mapping-types#auto-mapping-type-loss) section for more information.
</important>

elasticsearch-hadoop automatically converts Spark built-in types to Elasticsearch [types](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3016/reference/elasticsearch/mapping-reference/field-data-types) (and back) as shown in the table below.
While Spark SQL [`DataType`s](https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types) have an equivalent in both Scala and Java and thus the [RDD](#spark-type-conversion) conversion can apply, there are slightly different semantics - in particular with the `java.sql` types due to the way Spark SQL handles them:

| Spark SQL `DataType` | Elasticsearch type       |
|----------------------|--------------------------|
| `null`               | `null`                   |
| `ByteType`           | `byte`                   |
| `ShortType`          | `short`                  |
| `IntegerType`        | `int`                    |
| `LongType`           | `long`                   |
| `FloatType`          | `float`                  |
| `DoubleType`         | `double`                 |
| `StringType`         | `string`                 |
| `BinaryType`         | `string` (BASE64)        |
| `BooleanType`        | `boolean`                |
| `DateType`           | `date` (`string` format) |
| `TimestampType`      | `long` (unix time)       |
| `ArrayType`          | `array`                  |
| `MapType`            | `object`                 |
| `StructType`         | `object`                 |

In addition to the table above, for Spark SQL 1.3 or higher, elasticsearch-hadoop performs automatic schema detection for geo types, namely Elasticsearch `geo_point` and `geo_shape`. Since each type allows multiple formats (`geo_point` accepts latitude and longitude to be specified in 4 different ways, while `geo_shape` allows a variety of types (currently 9)) and the mapping does not provide such information, elasticsearch-hadoop will *sample* the determined geo fields at startup and retrieve an arbitrary document that contains all the relevant fields; it will parse it and thus determine the necessary schema (so for example it can tell whether a `geo_point` is specified as a `StringType` or as an `ArrayType`).
<important>
  Since Spark SQL is strongly-typed, each geo field needs to have the same format across *all* documents. Shy of that, the returned data will not fit the detected schema and thus lead to errors.
</important>
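A quick way to check what the connector detected is simply to load the index and inspect the schema; in this hedged sketch the `myGeoIndex/doc` resource and its `location` field are hypothetical:
```scala
// illustrative: verify the schema detected for a sampled geo field
val geoDF = sqlContext.read.format("org.elasticsearch.spark.sql").load("myGeoIndex/doc")
geoDF.printSchema()
// depending on the format of the sampled document, "location" may show up for
// example as an array of doubles or as a string - the format must be consistent
// across all documents, otherwise schema errors will occur
```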


## Spark Structured Streaming support

<note>
  Added in 6.0.
</note>

Released as an experimental feature in Spark 2.0, Spark Structured Streaming provides a unified streaming and batch interface built into the Spark SQL integration. As of elasticsearch-hadoop 6.0, we provide native functionality to index streaming data into Elasticsearch.
<important>
  Like Spark SQL, Structured Streaming works with *structured* data. All entries are expected to have the *same* structure (same number of fields, of the same type and name). Using unstructured data (documents with different structures) is *not* supported and will cause problems. For such cases, use `DStream`s.
</important>


#### Supported Spark Structured Streaming versions

Spark Structured Streaming is considered *generally available* as of Spark v2.2.0. As such, elasticsearch-hadoop support for Structured Streaming (available in elasticsearch-hadoop 6.0+) is only compatible with Spark versions 2.2.0 and onward. Similar to Spark SQL before it, Structured Streaming may be subject to significant changes between releases before its interfaces are considered *stable*.
Spark Structured Streaming support is available under the `org.elasticsearch.spark.sql` and `org.elasticsearch.spark.sql.streaming` packages. It shares a unified interface with Spark SQL in the form of the `Dataset[_]` api. Clients can interact with streaming `Dataset`s in almost exactly the same way as regular batch `Dataset`s with only a [few exceptions](http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations).

#### Writing Streaming `Datasets` (Spark SQL 2.0+) to Elasticsearch

With elasticsearch-hadoop, Stream-backed `Dataset`s can be indexed to Elasticsearch.

##### Scala

In Scala, to save your streaming based `Dataset`s and `DataFrame`s to Elasticsearch, simply configure the stream to write out using the `"es"` format, like so:
```scala
import org.apache.spark.sql.SparkSession    

...

val spark = SparkSession.builder()
   .appName("EsStreamingExample")           
   .getOrCreate()

// required for the implicit Encoders used by the map transformations below
import spark.implicits._

// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)

//  create DataFrame
val people = spark.readStream                   
        .textFile("/path/to/people/files/*")    
        .map(_.split(","))
        .map(p => Person(p(0), p(1), p(2).trim.toInt))

people.writeStream
      .option("checkpointLocation", "/save/location")      
      .format("es")
      .start("spark/people")                               
```

<warning>
  Spark makes no type-based differentiation between batch and streaming based `Dataset`s. While you may be able to import the `org.elasticsearch.spark.sql` package to add `saveToEs` methods to your `Dataset` or `DataFrame`, it will throw an illegal argument exception if those methods are called on streaming based `Dataset`s or `DataFrame`s.
</warning>


##### Java

In a similar fashion, the `"es"` format is available for Java usage as well:
```java
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.api.java.function.MapFunction;

...

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")     
  .getOrCreate();

// java bean style class
public static class PersonBean {
  private String name;
  private String surname;                        
  private int age;

  ...
}

Dataset<PersonBean> people = spark.readStream()         
        .textFile("/path/to/people/files/*")
        .map(new MapFunction<String, PersonBean>() {
            @Override
            public PersonBean call(String value) throws Exception {
                return someFunctionThatParsesStringToJavaBeans(value.split(","));         
            }
        }, Encoders.<PersonBean>bean(PersonBean.class));

people.writeStream()
    .option("checkpointLocation", "/save/location")       
    .format("es")
    .start("spark/people");                               
```


#### Writing existing JSON to Elasticsearch

When using Spark SQL, if the input data is in JSON format, simply convert it to a `Dataset` (for Spark SQL 2.0) (as described in the Spark [documentation](http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources)) through the `DataStreamReader`'s `json` format.
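For example, a minimal sketch (the input path and schema are hypothetical) of streaming JSON files straight into Elasticsearch without mapping them onto a case class first:
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("EsJsonStreamingExample").getOrCreate()

// file-based streaming sources require an explicit schema
val schema = new StructType()
  .add("name", StringType)
  .add("surname", StringType)
  .add("age", IntegerType)

val people = spark.readStream
  .schema(schema)
  .json("/path/to/people/json/*")   // hypothetical input directory

people.writeStream
  .option("checkpointLocation", "/save/location")
  .format("es")
  .start("spark/people")
```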

#### Sink commit log in Spark Structured Streaming

Spark Structured Streaming advertises an end-to-end fault-tolerant exactly-once processing model that is made possible through the usage of offset checkpoints and maintaining commit logs for each streaming query. When executing a streaming query, most sources and sinks require you to specify a "checkpointLocation" in order to persist the state of your job. In the event of an interruption, launching a new streaming query with the same checkpoint location will recover the state of the job and pick up where it left off. We maintain a commit log for elasticsearch-hadoop’s Elasticsearch sink implementation in a special directory under the configured checkpoint location:
```sh
$> ls /path/to/checkpoint/location
metadata  offsets/  sinks/
$> ls /path/to/checkpoint/location/sinks
elasticsearch/
$> ls /path/to/checkpoint/location/sinks/elasticsearch
12.compact  13  14  15 16  17  18
```

Each file in the commit log directory corresponds to a batch id that has been committed. The log implementation periodically compacts the logs down to avoid clutter. You can set the location for the log directory in a number of ways:
1. Set the explicit log location with `es.spark.sql.streaming.sink.log.path` (see below).
2. If that is not set, then the path specified by `checkpointLocation` will be used.
3. If that is not set, then a path will be constructed by combining the value of `spark.sql.streaming.checkpointLocation` from the SparkSession with the `Dataset`'s given query name.
4. If no query name is present, then a random UUID will be used in the above case instead of the query name.
5. If none of the above settings are provided, then the `start` call will throw an exception.

Here is a list of configurations that affect the behavior of Elasticsearch's commit log:
<definitions>
  <definition term="es.spark.sql.streaming.sink.log.enabled (default true)">
    Enables or disables the commit log for a streaming job. By default, the log is enabled, and output batches with the same batch id will be skipped to avoid double-writes. When this is set to `false`, the commit log is disabled, and all outputs will be sent to Elasticsearch, regardless if they have been sent in a previous execution.
  </definition>
  <definition term="es.spark.sql.streaming.sink.log.path">
    Sets the location to store the log data for this streaming query. If this value is not set, then the Elasticsearch sink will store its commit logs under the path given in `checkpointLocation`. Any HDFS Client compatible URI is acceptable.
  </definition>
  <definition term="es.spark.sql.streaming.sink.log.cleanupDelay (default 10m)">
    The commit log is managed through Spark’s HDFS Client. Some HDFS compatible filesystems (like Amazon’s S3) propagate file changes in an asynchronous manner. To get around this, after a set of log files have been compacted, the client will wait for this amount of time before cleaning up the old files.
  </definition>
  <definition term="es.spark.sql.streaming.sink.log.deletion (default true)">
    Determines if the log should delete old logs that are no longer needed. After every batch is committed, the client will check to see if there are any commit logs that have been compacted and are safe to be removed. If set to `false`, the log will skip this cleanup step, leaving behind a commit file for each batch.
  </definition>
  <definition term="es.spark.sql.streaming.sink.log.compactInterval (default 10)">
    Sets the number of batches to process before compacting the log files. By default, every 10 batches the commit log will be compacted down into a single file that contains all previously committed batch ids.
  </definition>
</definitions>
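These settings are passed like any other option on the streaming writer; a short sketch (reusing the streaming `people` Dataset from the earlier example, with illustrative paths and values):
```scala
// illustrative: point the Elasticsearch sink commit log at an explicit location
// and compact it less frequently
people.writeStream
      .option("checkpointLocation", "/save/location")
      .option("es.spark.sql.streaming.sink.log.path", "hdfs://namenode:8020/es-sink-logs")
      .option("es.spark.sql.streaming.sink.log.compactInterval", "20")
      .format("es")
      .start("spark/people")
```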


#### Spark Structured Streaming Type conversion

Structured Streaming uses the exact same type conversion rules as the Spark SQL integration.
<important>
  When dealing with multi-value/array fields, please see [this](/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/mapping-types#mapping-multi-values) section and in particular [these](/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/configuration#cfg-field-info) configuration options.
</important>

<important>
  If automatic index creation is used, please review [this](/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/mapping-types#auto-mapping-type-loss) section for more information.
</important>

elasticsearch-hadoop automatically converts Spark built-in types to Elasticsearch [types](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3016/reference/elasticsearch/mapping-reference/field-data-types) as shown in the table below.
While Spark SQL [`DataType`s](https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types) have an equivalent in both Scala and Java and thus the [RDD](#spark-type-conversion) conversion can apply, there are slightly different semantics - in particular with the `java.sql` types due to the way Spark SQL handles them:

| Spark SQL `DataType` | Elasticsearch type       |
|----------------------|--------------------------|
| `null`               | `null`                   |
| `ByteType`           | `byte`                   |
| `ShortType`          | `short`                  |
| `IntegerType`        | `int`                    |
| `LongType`           | `long`                   |
| `FloatType`          | `float`                  |
| `DoubleType`         | `double`                 |
| `StringType`         | `string`                 |
| `BinaryType`         | `string` (BASE64)        |
| `BooleanType`        | `boolean`                |
| `DateType`           | `date` (`string` format) |
| `TimestampType`      | `long` (unix time)       |
| `ArrayType`          | `array`                  |
| `MapType`            | `object`                 |
| `StructType`         | `object`                 |


### Using the Map/Reduce layer

Another way of using Spark with Elasticsearch is through the Map/Reduce layer, that is by leveraging the dedicated `Input/OutputFormat` in elasticsearch-hadoop. However, unless one is stuck on elasticsearch-hadoop 2.0, we *strongly* recommend using the native integration as it offers significantly better performance and flexibility.

#### Configuration

Through elasticsearch-hadoop, Spark can integrate with Elasticsearch through its dedicated `InputFormat`, and in the case of writing, through `OutputFormat`. These are described at length in the [Map/Reduce](https://www.elastic.co/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/mapreduce-integration) chapter, so please refer to that for an in-depth explanation.
In short, one needs to set up a basic Hadoop `Configuration` object with the target Elasticsearch cluster and index, potentially a query, and she’s good to go.
From Spark’s perspective, the only thing required is setting up serialization - Spark relies by default on Java serialization which is convenient but fairly inefficient. This is the reason why Hadoop itself introduced its own serialization mechanism and its own types - namely `Writable`s. As such, `InputFormat` and `OutputFormat`s are required to return `Writables` which, out of the box, Spark does not understand. The good news is, one can easily enable a different serialization ([Kryo](https://github.com/EsotericSoftware/kryo)) which handles the conversion automatically and also does this quite efficiently.
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.KryoSerializer;

SparkConf sc = new SparkConf(); //.setMaster("local");
sc.set("spark.serializer", KryoSerializer.class.getName()); 

// needed only when using the Java API
JavaSparkContext jsc = new JavaSparkContext(sc);
```

Or if you prefer Scala
```scala
import org.apache.spark.serializer.KryoSerializer

val sc = new SparkConf(...)
sc.set("spark.serializer", classOf[KryoSerializer].getName)
```

Note that Kryo serialization is used as a work-around for dealing with `Writable` types; one can choose to convert the types directly (from `Writable` to `Serializable` types), which is fine, however for getting started the one-liner above seems to be the most effective.

#### Reading data from Elasticsearch

To read data, simply pass in the `org.elasticsearch.hadoop.mr.EsInputFormat` class - since it supports both the `old` and the `new` Map/Reduce APIs, you are free to use either of `SparkContext`'s methods, `hadoopRDD` (which we recommend for conciseness reasons) or `newAPIHadoopRDD`. Whichever you choose, stick with it to avoid confusion and problems down the road.

##### *Old* (`org.apache.hadoop.mapred`) API

```java
JobConf conf = new JobConf();                             
conf.set("es.resource", "radio/artists");                 
conf.set("es.query", "?q=me*");                           

JavaPairRDD esRDD = jsc.hadoopRDD(conf, EsInputFormat.class,
                          Text.class, MapWritable.class); 
long docCount = esRDD.count();
```

The Scala version is below:
```scala
val conf = new JobConf()                                   
conf.set("es.resource", "radio/artists")                   
conf.set("es.query", "?q=me*")                             
val esRDD = sc.hadoopRDD(conf,
                classOf[EsInputFormat[Text, MapWritable]], 
                classOf[Text], classOf[MapWritable])
val docCount = esRDD.count()
```


##### *New* (`org.apache.hadoop.mapreduce`) API

As expected, the `mapreduce` API version is strikingly similar - replace `hadoopRDD` with `newAPIHadoopRDD` and `JobConf` with `Configuration`. That’s about it.
```java
Configuration conf = new Configuration();       
conf.set("es.resource", "radio/artists");       
conf.set("es.query", "?q=me*");                 

JavaPairRDD esRDD = jsc.newAPIHadoopRDD(conf, EsInputFormat.class,
                Text.class, MapWritable.class); 
long docCount = esRDD.count();
```

The Scala version is below:
```scala
val conf = new Configuration()                             
conf.set("es.resource", "radio/artists")                   
conf.set("es.query", "?q=me*")                             
val esRDD = sc.newAPIHadoopRDD(conf,
                classOf[EsInputFormat[Text, MapWritable]], 
                classOf[Text], classOf[MapWritable])
val docCount = esRDD.count()
```
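Since the Map/Reduce layer returns Hadoop `Writable`s, a typical follow-up is to unwrap them before doing further processing; a hedged sketch (the `name` field is hypothetical):
```scala
import org.apache.hadoop.io.{MapWritable, Text}

// illustrative: extract a single field from the MapWritable documents;
// missing fields are returned as an empty string
val names = esRDD.map { case (_, doc) =>
  Option(doc.get(new Text("name"))).map(_.toString).getOrElse("")
}
names.take(5).foreach(println)
```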


## Using the connector from PySpark

Thanks to its [Map/Reduce](https://www.elastic.co/elastic/docs-builder/docs/3016/reference/elasticsearch-hadoop/mapreduce-integration) layer, elasticsearch-hadoop can be used from PySpark as well to both read and write data to Elasticsearch. To wit, below is a snippet from the [Spark documentation](https://spark.apache.org/docs/1.5.1/programming-guide.md#external-datasets) (make sure to switch to the Python snippet):
```python
$ ./bin/pyspark --driver-class-path=/path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"}  
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
    "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
>>> rdd.first()        
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})
```

The SQL loader can be used as well:
```python
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type")
df.printSchema()
```