---
title: Map/Reduce integration
description: For low-level or performance-sensitive environments, elasticsearch-hadoop provides dedicated InputFormat and OutputFormat implementations that can read...
url: https://www.elastic.co/elastic/docs-builder/docs/3028/reference/elasticsearch-hadoop/mapreduce-integration
products:
  - Elasticsearch
  - Elasticsearch for Apache Hadoop
---

# Map/Reduce integration
For low-level or performance-sensitive environments, elasticsearch-hadoop provides dedicated `InputFormat` and `OutputFormat` implementations that can read and write data to Elasticsearch. In Map/Reduce, `Mapper`s and `Reducer`s read and write `Writable` objects, a Hadoop-specific interface optimized for serialization. As such, the elasticsearch-hadoop `InputFormat` and `OutputFormat` return and expect `MapWritable` objects; a map is used for each document being read or written. The map itself can have any internal structure as long as its objects are also `Writable` - it can hold nested maps, numbers or strings in their `Writable` representation. Internally, elasticsearch-hadoop automatically converts the `Map` of `Writable`s to a JSON document and vice versa, so you do not have to deal with low-level parsing or conversion to and from JSON. Moreover, if the data sent to Elasticsearch is already in JSON format, it can be streamed in directly without any conversion to `Writable` objects. Read the rest of the chapter to find out more.
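As an illustration, here is a minimal sketch (the field names and values are made up for the example) of a document with a nested object, built out of `Writable` objects:

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;

// a document with a nested object, expressed as Writables; elasticsearch-hadoop
// serializes it as {"name":"...","albums":252,"location":{"city":"...","state":"..."}}
MapWritable doc = new MapWritable();
doc.put(new Text("name"), new Text("Buckethead"));
doc.put(new Text("albums"), new IntWritable(252));

MapWritable location = new MapWritable();
location.put(new Text("city"), new Text("San Francisco"));
location.put(new Text("state"), new Text("CA"));
doc.put(new Text("location"), location);
```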

## Installation

In order to use elasticsearch-hadoop, the [jar](https://www.elastic.co/elastic/docs-builder/docs/3028/reference/elasticsearch-hadoop/installation) needs to be available to the job class path. At around `250kB` and without any dependencies, the jar can be bundled in the job archive manually, passed through the CLI [Generic Options](http://hadoop.apache.org/docs/r1.2.1/commands_manual.html#GenericOptions) (if your jar implements the [Tool](http://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/util/Tool.html) interface), distributed through Hadoop’s [DistributedCache](http://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html#DistributedCache) or made available by provisioning the cluster manually.
<important>
  All the options above affect *only* the code running on the distributed nodes. If your code that launches the Hadoop job refers to elasticsearch-hadoop, make sure to include the JAR in the `HADOOP_CLASSPATH`: `HADOOP_CLASSPATH="<colon-separated-paths-to-your-jars-including-elasticsearch-hadoop>"`
</important>

For example, using the `-libjars` Generic Option:

```bash
$ bin/hadoop jar myJar.jar -libjars elasticsearch-hadoop.jar
```
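Likewise, a sketch of the `DistributedCache` route, assuming the elasticsearch-hadoop jar has already been uploaded to a (hypothetical) HDFS location:

```java
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

JobConf conf = new JobConf();
// /lib/elasticsearch-hadoop.jar is a hypothetical HDFS path - adjust to your setup
DistributedCache.addFileToClassPath(new Path("/lib/elasticsearch-hadoop.jar"), conf);
```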


## Configuration

When using elasticsearch-hadoop in a Map/Reduce job, one can use Hadoop’s `Configuration` object to configure elasticsearch-hadoop by setting the various options as properties on it. Typically one would set the Elasticsearch host and port (assuming it is not running on the default `localhost:9200`), the target index/type and potentially the query, for example:
```java
Configuration conf = new Configuration();
conf.set("es.nodes", "es-server:9200");    
conf.set("es.resource", "radio/artists");  
...
```

Simply use the configuration object when constructing the Hadoop job and you are all set.
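Any other connector option can be passed the same way; as a sketch (with made-up values), the configuration below also adds a query before being handed to the job:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

Configuration conf = new Configuration();
conf.set("es.nodes", "es-server:9200");   // Elasticsearch node(s) to connect to
conf.set("es.resource", "radio/artists"); // target index/type
conf.set("es.query", "?q=me*");           // optional query
Job job = new Job(conf);                  // the settings now travel with the job
```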

## Writing data to Elasticsearch

With elasticsearch-hadoop, Map/Reduce jobs can write data to Elasticsearch, making it searchable through [indexes](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3028/reference/glossary/#glossary-index). elasticsearch-hadoop supports both the (so-called) [*old*](http://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/mapred/package-use.html) and [*new*](http://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/mapreduce/package-use.html) Hadoop APIs.
`EsOutputFormat` expects a `Map<Writable, Writable>` representing a *document* value that is converted internally into a JSON document and indexed in Elasticsearch. Hadoop `OutputFormat` requires implementations to expect a key and a value; however, since for Elasticsearch only the document (that is, the value) is necessary, `EsOutputFormat` ignores the key.

### *Old* (`org.apache.hadoop.mapred`) API

To write data to Elasticsearch, use `org.elasticsearch.hadoop.mr.EsOutputFormat` on your job along with the relevant configuration [properties](https://www.elastic.co/elastic/docs-builder/docs/3028/reference/elasticsearch-hadoop/configuration):
```java
JobConf conf = new JobConf();
conf.setSpeculativeExecution(false);            // disable speculative execution to avoid duplicate writes
conf.set("es.nodes", "es-server:9200");
conf.set("es.resource", "radio/artists");       // target index/type
conf.setOutputFormat(EsOutputFormat.class);     // use EsOutputFormat to stream documents to Elasticsearch
conf.setMapOutputValueClass(MapWritable.class); // documents are passed as MapWritable objects
conf.setMapperClass(MyMapper.class);
...
JobClient.runJob(conf);
```

A `Mapper` implementation can use `EsOutputFormat` as follows:
```java
public class MyMapper extends MapReduceBase implements Mapper {
  @Override
  public void map(Object key, Object value, OutputCollector output,
                  Reporter reporter) throws IOException {
    // create the MapWritable object
    MapWritable doc = new MapWritable();
    ...
    // write the result to the output collector
    // one can pass whatever value to the key; EsOutputFormat ignores it
    output.collect(NullWritable.get(), doc);
  }
}
```

For cases where the id (or other metadata fields like `ttl` or `timestamp`) of the document needs to be specified, one can do so by setting the appropriate [mapping](/elastic/docs-builder/docs/3028/reference/elasticsearch-hadoop/configuration#cfg-mapping), namely `es.mapping.id`. Thus, assuming the documents contain a field called `radioId` which is unique and suitable as an identifier, one can update the job configuration as follows:
```java
JobConf conf = new JobConf();
conf.set("es.mapping.id", "radioId");
```

At runtime, elasticsearch-hadoop will extract the value from each document and use it accordingly during the bulk call.
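For example, a mapper emitting such a document (the values below are made up) only needs to include the `radioId` field in the map; elasticsearch-hadoop extracts its value and uses it as the document id:

```java
MapWritable doc = new MapWritable();
doc.put(new Text("radioId"), new IntWritable(123)); // the field configured through es.mapping.id
doc.put(new Text("name"), new Text("Radio Paradise"));
output.collect(NullWritable.get(), doc);
```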

### Writing existing JSON to Elasticsearch

For cases where the job input data is already in JSON, elasticsearch-hadoop allows direct indexing *without* applying any transformation; the data is taken as is and sent directly to Elasticsearch. In such cases, one needs to indicate the JSON input by setting the `es.input.json` parameter. elasticsearch-hadoop then expects either a `Text` or a `BytesWritable` (preferred as it requires no `String` conversion) object as output; if these types are not used, the library will simply fall back to the `toString` representation of the target object.

| `Writable`      | Comment                                                             |
|-----------------|---------------------------------------------------------------------|
| `BytesWritable` | use this when the JSON data is represented as a `byte[]` or similar |
| `Text`          | use this if the JSON data is represented as a `String`              |
| *anything else* | make sure the `toString()` returns the desired JSON document        |

<important>
  Make sure the data is properly encoded in `UTF-8`. The job output is considered the final form of the document sent to Elasticsearch.
</important>

```java
JobConf conf = new JobConf();
conf.set("es.input.json", "yes");        
conf.setMapOutputValueClass(Text.class); 
...
JobClient.runJob(conf);
```

The `Mapper` implementation becomes:
```java
public class MyMapper extends MapReduceBase implements Mapper {
  @Override
  public void map(Object key, Object value, OutputCollector output,
                  Reporter reporter) throws IOException {
    // assuming the document is a String called 'source'
    String source = ...
    Text jsonDoc = new Text(source);
    // send the doc directly
    output.collect(NullWritable.get(), jsonDoc);
  }
}
```


### Writing to dynamic/multi-resources

For cases when the data being written to Elasticsearch needs to be indexed under different buckets (based on the data content), one can use the `es.resource.write` setting, which accepts patterns that are resolved from the document content at runtime. Following the aforementioned [media example](/elastic/docs-builder/docs/3028/reference/elasticsearch-hadoop/configuration#cfg-multi-writes), one could configure it as follows:
```java
JobConf conf = new JobConf();
conf.set("es.resource.write","my-collection-{media-type}/doc");
```

If `Writable` objects are used, for each `MapWritable` elasticsearch-hadoop will extract the value under the `media-type` key and use that as the Elasticsearch index suffix. If raw JSON is used, then elasticsearch-hadoop will parse the document, extract the field `media-type` and use its value accordingly.
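For example, a (made-up) document carrying a `media-type` of `video` would be indexed under `my-collection-video/doc`:

```java
MapWritable doc = new MapWritable();
doc.put(new Text("media-type"), new Text("video")); // resolves the {media-type} pattern at runtime
doc.put(new Text("title"), new Text("Introduction to Hadoop"));
```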

### *New* (`org.apache.hadoop.mapreduce`) API

Using the *new* API is strikingly similar - in fact, the exact same class (`org.elasticsearch.hadoop.mr.EsOutputFormat`) is used:
```java
Configuration conf = new Configuration();
conf.setBoolean("mapred.map.tasks.speculative.execution", false);    
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); 
conf.set("es.nodes", "es-server:9200");
conf.set("es.resource", "radio/artists");                            
Job job = new Job(conf);
job.setOutputFormatClass(EsOutputFormat.class);
job.setMapOutputValueClass(MapWritable.class);                       
...
job.waitForCompletion(true);
```

The same goes for the `Mapper` instance:
```java
public class SomeMapper extends Mapper {
  @Override
  protected void map(Object key, Object value, Context context)
        throws IOException, InterruptedException {
    // create the MapWritable object
    MapWritable doc = new MapWritable();
    ...
    context.write(NullWritable.get(), doc);
  }
}
```

Specifying the id or other document [metadata](/elastic/docs-builder/docs/3028/reference/elasticsearch-hadoop/configuration#cfg-mapping) is just as easy:
```java
Configuration conf = new Configuration();
conf.set("es.mapping.id", "radioId");
```


### Writing existing JSON to Elasticsearch

As before, when dealing with JSON directly, under the *new* API the configuration looks as follows:
```java
Configuration conf = new Configuration();
conf.set("es.input.json", "yes");                 
Job job = new Job(conf);
job.setMapOutputValueClass(BytesWritable.class); 
...
job.waitForCompletion(true);
```

```java
public class SomeMapper extends Mapper {
  @Override
  protected void map(Object key, Object value, Context context)
        throws IOException, InterruptedException {
    // assuming the document is stored as bytes
    byte[] source = ...
    BytesWritable jsonDoc = new BytesWritable(source);
    // send the doc directly
    context.write(NullWritable.get(), jsonDoc);
  }
}
```


### Writing to dynamic/multi-resources

As expected, the differences between the *old* and *new* APIs are minimal (read: non-existent) in this case as well:
```java
Configuration conf = new Configuration();
conf.set("es.resource.write","my-collection-{media-type}/doc");
...
```


## Reading data from Elasticsearch

In a similar fashion, to read data from Elasticsearch, one needs to use the `org.elasticsearch.hadoop.mr.EsInputFormat` class. While it can read an entire index, it is much more convenient to use a query - elasticsearch-hadoop will automatically execute the query *in real time* and feed the results back to Hadoop. Since the query is executed against the real data, this acts as a *live* view of the data set.
Just like its counterpart (`EsOutputFormat`), `EsInputFormat` returns a `Map<Writable, Writable>` for each JSON document returned by Elasticsearch. Since an `InputFormat` requires both a key and a value to be returned, `EsInputFormat` returns the document id (inside Elasticsearch) as the key (typically ignored) and the document/map as the value.
<tip>
  If one needs the document structure returned from Elasticsearch to be preserved, consider using `org.elasticsearch.hadoop.mr.LinkedMapWritable`. The class extends Hadoop’s `MapWritable` (and thus can easily replace it) and preserves insertion order; that is, when iterating over the map, the entries are returned in insertion order (as opposed to `MapWritable`, which does *not* maintain it). However, due to the way Hadoop works, one needs to specify `LinkedMapWritable` as the job map output value (instead of `MapWritable`).
</tip>
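As a sketch, switching a job (using the *new* API) over to `LinkedMapWritable` boils down to:

```java
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;

Job job = new Job(conf);
job.setInputFormatClass(EsInputFormat.class);
// preserve the field order of each document returned by Elasticsearch
job.setMapOutputValueClass(LinkedMapWritable.class);
```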


### *Old* (`org.apache.hadoop.mapred`) API

Following our example above on radio artists, to get a hold of all the artists that start with *me*, one could use the following snippet:
```java
JobConf conf = new JobConf();
conf.set("es.resource", "radio/artists");       
conf.set("es.query", "?q=me*");                 
conf.setInputFormat(EsInputFormat.class);       
conf.setMapOutputKeyClass(Text.class);          
conf.setMapOutputValueClass(MapWritable.class); 

...
JobClient.runJob(conf);
```

A `Mapper` using `EsInputFormat` might look as follows:
```java
public class MyMapper extends MapReduceBase implements Mapper {
  @Override
  public void map(Object key, Object value, OutputCollector output,
                  Reporter reporter) throws IOException {
    Text docId = (Text) key;
    MapWritable doc = (MapWritable) value;
    ...
  }
}
```

<note>
  Feel free to use Java 5 generics to avoid the cast above. For clarity and readability, the examples in this chapter do not include generics.
</note>


### *New* (`org.apache.hadoop.mapreduce`) API

As expected, the `mapreduce` API version is quite similar:
```java
Configuration conf = new Configuration();
conf.set("es.resource", "radio/artists/");            
conf.set("es.query", "?q=me*");                       
Job job = new Job(conf);
job.setInputFormatClass(EsInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MapWritable.class);        
...

job.waitForCompletion(true);
```

as well as the `Mapper` implementation:
```java
public class SomeMapper extends Mapper {
  @Override
  protected void map(Object key, Object value, Context context)
        throws IOException, InterruptedException {
    Text docId = (Text) key;
    MapWritable doc = (MapWritable) value;
    ...
  }
}
```


### Reading from Elasticsearch in JSON format

In the case where the results from Elasticsearch need to be in JSON format (typically to be sent down the wire to some other system), one can instruct elasticsearch-hadoop to return the data as is. By setting `es.output.json` to `true`, the connector will parse the response from Elasticsearch, identify the documents and, without converting them, return their content to the user as `Text` objects:
```java
Configuration conf = new Configuration();
conf.set("es.resource", "source/category");
conf.set("es.output.json", "true");
```
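A (hypothetical) mapper consuming such results receives the raw JSON as a `Text` object:

```java
public class SomeMapper extends Mapper {
  @Override
  protected void map(Object key, Object value, Context context)
        throws IOException, InterruptedException {
    // with es.output.json enabled, the value is the document as raw JSON
    Text json = (Text) value;
    ...
  }
}
```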


### Using different indices for reading and writing

Sometimes, one needs to read data from one Elasticsearch resource, process it, and then write it back to a different resource inside the *same* job. The `es.resource` setting is not enough since it implies the same resource as both source and destination. In such cases, one should use `es.resource.read` and `es.resource.write` to differentiate between the two resources (the example below uses the *mapreduce* API):
```java
Configuration conf = new Configuration();
conf.set("es.resource.read", "source/category");
conf.set("es.resource.write", "sink/group");
```
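Putting the two together, a minimal sketch of such a job (reusing the formats introduced earlier) might look like:

```java
Configuration conf = new Configuration();
conf.set("es.resource.read", "source/category"); // resource to read from
conf.set("es.resource.write", "sink/group");     // resource to write to
Job job = new Job(conf);
job.setInputFormatClass(EsInputFormat.class);    // read each document from the source
job.setOutputFormatClass(EsOutputFormat.class);  // index the results into the sink
job.setMapOutputValueClass(MapWritable.class);
...
job.waitForCompletion(true);
```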


## Type conversion

<important>
  If automatic index creation is used, please review [this](/elastic/docs-builder/docs/3028/reference/elasticsearch-hadoop/mapping-types#auto-mapping-type-loss) section for more information.
</important>

elasticsearch-hadoop automatically converts Hadoop built-in `Writable` types to Elasticsearch [field types](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3028/reference/elasticsearch/mapping-reference/field-data-types) (and back) as shown in the table below:

| `Writable`            | Elasticsearch type |
|-----------------------|--------------------|
| `null`                | `null`             |
| `NullWritable`        | `null`             |
| `BooleanWritable`     | `boolean`          |
| `Text`                | `string`           |
| `ByteWritable`        | `byte`             |
| `IntWritable`         | `int`              |
| `VIntWritable`        | `int`              |
| `LongWritable`        | `long`             |
| `VLongWritable`       | `long`             |
| `BytesWritable`       | `binary`           |
| `DoubleWritable`      | `double`           |
| `FloatWritable`       | `float`            |
| `MD5Hash`             | `string`           |
| `ArrayWritable`       | `array`            |
| `AbstractMapWritable` | `map`              |
| `ShortWritable`       | `short`            |

It is worth mentioning that rich data types available only in Elasticsearch, such as [`GeoPoint`](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3028/reference/elasticsearch/mapping-reference/geo-point) or [`GeoShape`](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3028/reference/elasticsearch/mapping-reference/geo-shape), are supported by converting their structure into the primitives available in the table above. For example, depending on its storage, a `geo_point` might be returned as a `Text` (basically a `String`) or an `ArrayWritable`.
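As an illustration, the (made-up) document below mixes several of the types above; elasticsearch-hadoop serializes it as JSON with a boolean, an integer, a string array and a nested object:

```java
MapWritable doc = new MapWritable();
doc.put(new Text("active"), new BooleanWritable(true));     // -> boolean
doc.put(new Text("listeners"), new IntWritable(1000));      // -> int
doc.put(new Text("tags"), new ArrayWritable(
        new String[] { "rock", "indie" }));                 // -> array of strings

MapWritable location = new MapWritable();
location.put(new Text("lat"), new DoubleWritable(41.12));   // -> double
location.put(new Text("lon"), new DoubleWritable(-71.34));  // -> double
doc.put(new Text("location"), location);                    // -> map/object
```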