In this blog post I describe the difference between the mapred.* and mapreduce.* APIs in Hadoop with respect to custom InputFormats and OutputFormats. I also discuss the impact of having both APIs on the Hadoop ecosystem and related Big Data platforms, such as Apache Flink, Apache Hive and Apache Spark. Finally, I conclude with an outlook on future needs for innovation.
These APIs are of key importance for nearly all Big Data applications based on popular Big Data platforms. They all leverage these APIs to connect to various file-based and non-file-based data sources and sinks. The APIs offer transparent access to any custom or standard file format on any storage, on-premise (e.g. HDFS) or in the cloud (e.g. Amazon S3 or Microsoft Azure Blob Storage). Furthermore, they can be used to transparently decompress files using any given codec, without the file format itself having to support the codec. This allows introducing new compression formats without changing the underlying file format. Note that this is not useful in all situations, e.g. in the case of unsplittable compression formats, which is why ORC and Parquet use a different approach for parallel processing of compressed data.
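To illustrate why decompression can stay independent of the file format, here is a minimal sketch in plain Python rather than the Hadoop API. It picks a codec from the file name suffix, which is loosely analogous to what Hadoop's CompressionCodecFactory does, while the record-reading logic never learns whether the file was compressed. The names `CODECS`, `open_text`, `read_records` and `write_demo_files` are hypothetical and exist only for this illustration.

```python
import bz2
import gzip
import lzma
import os
import tempfile

# Hypothetical suffix-to-opener table (an assumption for this sketch),
# loosely analogous to selecting a compression codec from the file name.
CODECS = {".gz": gzip.open, ".bz2": bz2.open, ".xz": lzma.open}


def open_text(path):
    """Open a file as text, decompressing it if the suffix names a codec."""
    opener = CODECS.get(os.path.splitext(path)[1], open)
    return opener(path, "rt", encoding="utf-8")


def read_records(path):
    """The 'file format': one record per line, unaware of any compression."""
    with open_text(path) as handle:
        return [line.rstrip("\n") for line in handle]


def write_demo_files(directory):
    """Write the same two records once uncompressed and once gzip-compressed."""
    plain = os.path.join(directory, "data.txt")
    packed = os.path.join(directory, "data.txt.gz")
    with open(plain, "wt", encoding="utf-8") as handle:
        handle.write("a\nb\n")
    with gzip.open(packed, "wt", encoding="utf-8") as handle:
        handle.write("a\nb\n")
    return plain, packed


if __name__ == "__main__":
    plain_path, packed_path = write_demo_files(tempfile.mkdtemp())
    print(read_records(plain_path) == read_records(packed_path))
```

Supporting a new compression format in this sketch means adding one entry to the table; `read_records` stays untouched, which is the property described above.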
From a custom input/output format perspective, both APIs provide essentially the same functionality and performance, but the interface is different. The two APIs were introduced merely to provide different ways to express MapReduce jobs, which is not the focus here. Historically, the idea was to move from the older mapred.* API to the new mapreduce.* API. In fact, mapred.* had already been marked as deprecated with the aim of removing it. Then something very rare in the software world happened: the mapred.* API was un-deprecated. Officially, the reason was that the new mapreduce.* API was not yet finished, which led to confusion and deprecation warnings. Interestingly, there was already a plan to deprecate the mapred.* API again, but it has never been executed (status: 10/2017).
Effect on Big Data platforms
I mentioned before that several open and commercial Big Data platforms rely on these APIs, because they want out-of-the-box support for various filesystems and file formats without reinventing the wheel. This is how the coexistence of the APIs affected them.
Naturally, since the mapred.* and mapreduce.* APIs stem from Hadoop MapReduce, both are supported there. However, in the documentation supplied with Hadoop you find only examples using the mapreduce.* API. Popular books, such as "Hadoop: The Definitive Guide" by Tom White, also use only the mapreduce.* API.
Since Hive is a very mature product that was open sourced by Facebook shortly after Hadoop appeared, it uses the mapred.* API. This means that any custom input/output format to be used in Hive (more about this later) needs to support the mapred.* API. If the input/output format supports only the new mapreduce.* API, Hive throws an error. Note that output formats need to implement HiveOutputFormat, which is based on the mapred.* API. You can integrate any mapred.* output format by using the HivePassThroughOutputFormat.
Apache Flink supports custom Hadoop Input and Output formats for the mapred.* and mapreduce.* API.
Additionally, you can implement custom Flink input and output formats. I found the API very concise, and if you have already developed a reusable input/output format that supports the two Hadoop APIs, it is rather trivial to create a custom Flink FileInput and FileOutput format.
Apache Spark supports custom Hadoop input and output formats for the mapred.* and mapreduce.* APIs. For a FileInput format you can use the hadoopRDD (mapred.*) or newAPIHadoopRDD (mapreduce.*) methods of the SparkContext class. For a FileOutput format you can use the saveAsHadoopFile (mapred.*) or saveAsNewAPIHadoopFile (mapreduce.*) methods available on pair RDDs.
These methods load and save data only from and to RDDs in Spark. To use the modern DataFrame (aka Dataset<Row>) API with all its features, such as its compact and highly efficient in-memory representation, you need to convert the RDDs explicitly, which takes time. This is, for example, not the case for Flink, which always uses the highly efficient in-memory representation.
Apache Spark has a DataSource API, which is the Spark counterpart of a custom InputFormat. However, it is not as well designed as the one for Apache Flink. For instance, most data loading/storing operations happen in the driver (the node running the program that coordinates the execution of distributed tasks), which makes the driver a bottleneck. Certain data sources avoided this by calling the hadoopRDD/newAPIHadoopRDD methods of Spark within the data source, which distributes loading to the executors instead of the driver. This is, for instance, how the HadoopCryptoLedger library for reading blockchains, such as Bitcoin, did it.
Hence, at some point the Apache Spark project decided to add certain data sources to the Apache Spark source code itself. These do not use the public data source API, but a dedicated internal data source API that is bound to each Spark release. As a result, these formats did not need to use the hadoopRDD/newAPIHadoopRDD methods and could be more native. Nevertheless, as mentioned before, the Hadoop APIs are still used to read from and write to various filesystems and compressed files.
Other external data sources, such as the one for the Avro format or the HadoopOffice library for reading/writing office documents such as Excel, then also leveraged these internal APIs. This had the disadvantage that they might not work immediately with a new Spark version, because the internal APIs changed arbitrarily.
In the end the Spark developers recognized that this is not an optimal situation and started thinking about a data source V2 API.
Impala supports only a fixed set of input/output formats and no custom input or output format. These fixed input/output formats are based on the Hadoop APIs for the aforementioned reasons. Nevertheless, you can use any custom input/output format by defining the tables in Hive and letting Impala read/write via Hive.
Apache Beam is a programming framework in which you express a computation once and can then execute it on different Big Data engines, such as Apache Apex, Apache Flink, Apache Spark, Google Cloud Dataflow and Apache Gearpump.
This makes it easier to switch between platforms depending on your needs, at the cost that you may not always be able to use the latest features of a specific engine.
Apache Beam supports reading of Hadoop InputFormats using its HadoopInputFormatIO interface. Apache Beam supports writing of Hadoop OutputFormats using its HDFSFileSink interface. Both support only the new mapreduce.* API.
Effect on file formats
Provided as part of Hadoop
|mapreduce.* API|mapred.* API|
|MultipleInputs (covers more than mapred.*)|MultiFileInputFormat|
|MultipleOutputs (covers more than mapred.*)|MultipleOutputFormat|
|MultipleOutputs (covers more than mapred.*)|MultipleSequenceFileOutputFormat|
|MultipleOutputs (covers more than mapred.*)|MultipleTextOutputFormat|
Both APIs are comparable, but there are already differences. For example, MultipleInputs and MultipleOutputs of mapreduce.* are much more flexible than their mapred.* counterparts: you can use different input or output formats at the same time, you can use MultipleOutputs in both map and reduce functions, and you can have different key and value types.
Reading and writing JSON files is already implicitly covered by this API. Usually one uses the TextInputFormat and parses each line as a JSON object, or one uses the NLineInputFormat, which assigns a fixed number of lines to each split, and parses several lines as one JSON object. Similarly, you can write JSON as output.
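The line-per-object approach can be sketched in a few lines. This is plain Python and not a Hadoop mapper: the list of strings stands in for the line values that a line-oriented input format would deliver, and the function names `parse_json_lines` and `to_json_lines` are hypothetical.

```python
import json


def parse_json_lines(lines):
    """Parse each non-empty line as one JSON object, as a mapper might do
    with the line records delivered by a line-oriented input format."""
    records = []
    for number, line in enumerate(lines, start=1):
        text = line.strip()
        if not text:
            continue  # tolerate blank lines between objects
        try:
            records.append(json.loads(text))
        except json.JSONDecodeError as error:
            raise ValueError(f"line {number} is not valid JSON: {error}") from error
    return records


def to_json_lines(records):
    """Serialize each record to exactly one line, the inverse direction."""
    return [json.dumps(record, sort_keys=True) for record in records]


if __name__ == "__main__":
    rows = parse_json_lines(['{"id": 1, "name": "a"}', '{"id": 2, "name": "b"}'])
    for line in to_json_lines(rows):
        print(line)
```

Keeping one object per line matters because a line-oriented format may cut a file into splits at arbitrary line boundaries; an object spread over several lines could otherwise end up in two different splits.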
Hint: Many developers are not aware of all the input and output formats available out of the box in Hadoop, but switching from one of the better-known formats to a lesser-known one can drastically improve performance.
Apache Avro is a compact exchange format between Big Data platforms. It is much more efficient than XML (in terms of space and computational needs) and should be used for exchanging large data volumes of structured data.
Avro supports both APIs and is supported by virtually all Big Data platforms.
Apache ORC is an extremely efficient data query format. It supports predicate pushdown, so that data that is not relevant for a query can be skipped automatically without reading it. This reduces query time significantly. More details can be found here.
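The idea behind predicate pushdown can be sketched with a toy model. The following is not the ORC reader API; it is a simplified illustration in plain Python under the assumption that each block of rows carries min/max statistics for one column, and the names `Stripe`, `write_stripes` and `scan_greater_than` are hypothetical.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Stripe:
    """A hypothetical block of rows with min/max statistics for one column."""
    minimum: int
    maximum: int
    values: List[int]


def write_stripes(values, stripe_size):
    """Group values into stripes and record min/max for each stripe."""
    stripes = []
    for start in range(0, len(values), stripe_size):
        chunk = values[start:start + stripe_size]
        stripes.append(Stripe(min(chunk), max(chunk), chunk))
    return stripes


def scan_greater_than(stripes, threshold):
    """Return the matching values and the number of stripes actually read."""
    matches, stripes_read = [], 0
    for stripe in stripes:
        if stripe.maximum <= threshold:
            continue  # statistics prove that no row can match: skip the stripe
        stripes_read += 1
        matches.extend(v for v in stripe.values if v > threshold)
    return matches, stripes_read


if __name__ == "__main__":
    stripes = write_stripes([1, 2, 3, 4, 5, 6, 7, 8, 9], stripe_size=3)
    print(scan_greater_than(stripes, 6))
```

The benefit depends on how the data is laid out: if the filtered column is sorted or clustered, many stripes can be skipped, whereas randomly distributed values leave wide min/max ranges that rule out very little.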
ORC is supported by virtually all Big Data platforms.
ORC supports both APIs.
Apache Parquet is another efficient data query format. It also supports predicate pushdown.
Parquet is supported by virtually all Big Data platforms.
Apache Parquet supports both APIs, but the mapred.* one is named Deprecated… However, it is not actually marked as deprecated, and thus both APIs are supported.
Apache HBase is a columnar key/value store suitable for frequently reading small data volumes out of a large dataset or for frequently writing small data volumes into a large dataset.
HBase can be read using the TableInputFormat and written using the TableOutputFormat.
HBase supports both APIs.
HadoopCryptoLedger supports both APIs due to the need to cater for many Big Data platforms, but also commercial internal developments using different APIs.
HadoopOffice supports reading/writing of office formats, such as MS Excel, on Big Data platforms. It also supports many Big Data platforms.
HadoopOffice supports both APIs due to the need to cater for many Big Data platforms, but also commercial internal developments using different APIs.
Apache Kudu is a format that supports querying large data volumes and writing small data volumes in large datasets. Its performance is lower than that of HBase for writing and lower than that of ORC or Parquet for querying. The advantage is that it is a single format for both workloads. On the other hand, it has a much larger operational complexity, because several additional non-standard daemons are added to the nodes in a Big Data cluster to manage it.
Apache Kudu only supports the mapreduce.* API.
Apache Mahout (a machine learning library for Hadoop) supports reading XML files using its input format.
It only supports the mapreduce.* API.
For historical reasons, Hadoop has two APIs to describe input and output formats (as well as MapReduce jobs). These formats are leveraged by all Big Data platforms for the aforementioned benefits. However, there are in fact not that many input/output formats. Hence, developers will need to provide their own to process data even more efficiently or simply to process legacy formats.
Basically the advice here is as follows.
- Developers that leverage only existing input/output formats should use the mapreduce.* API wherever possible.
- Developers that create their own custom input/output format should write generic classes that are as independent of the API as possible. Then, they should first provide a mapreduce.* API and afterwards a mapred.* API. The reasons are:
  - Generic classes are highly beneficial for writing Big Data platform specific input/output formats. For example, Flink or Spark can use Hadoop formats, but it can be beneficial to provide them with a "native" format. This has proven highly beneficial for me when writing the HadoopCryptoLedger and HadoopOffice libraries.
  - You will support all important platforms. Although Hive seems to be the only one requiring the mapred.* API, it is a very important one. It is the de facto standard for connecting analytical tools via a SQL API. It has been around for a long time and is used nowadays in many organizations. Additionally, there is a lot of internal (non-Hive-related) code in organizations that may still use the mapred.* API (however, there are no statistics about this).
  - If you have written unit tests for one of the APIs (which is mandatory), it is easy to derive the same unit test cases for the other API.
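The advice above can be sketched as an API-independent core with two thin adapters. This is a language-neutral illustration written in Python, not Hadoop code. The adapter method names only mirror the shape of the two record reader interfaces: in mapred.* the caller passes reusable key and value objects to next(key, value), while in mapreduce.* the reader owns the current pair and exposes it via nextKeyValue(), getCurrentKey() and getCurrentValue(). All class and function names below are hypothetical.

```python
class LineParser:
    """API-independent core: all format logic lives here."""

    def __init__(self, lines):
        self._lines = iter(lines)
        self._position = 0

    def read_next(self):
        """Return (position, record), or None at the end of the input."""
        line = next(self._lines, None)
        if line is None:
            return None
        self._position += 1
        return self._position, line.strip()


class Holder:
    """Mutable box, standing in for a reusable key or value object."""

    def __init__(self):
        self.value = None


class OldStyleReader:
    """Shaped like a mapred.* record reader: the caller owns key and value."""

    def __init__(self, parser):
        self._parser = parser

    def create_key(self):
        return Holder()

    def create_value(self):
        return Holder()

    def next(self, key, value):
        record = self._parser.read_next()
        if record is None:
            return False
        key.value, value.value = record
        return True


class NewStyleReader:
    """Shaped like a mapreduce.* record reader: the reader owns the pair."""

    def __init__(self, parser):
        self._parser = parser
        self._current = None

    def next_key_value(self):
        self._current = self._parser.read_next()
        return self._current is not None

    def get_current_key(self):
        return self._current[0]

    def get_current_value(self):
        return self._current[1]


def drain_old(reader):
    """Consume an old-style reader the way a mapred.* framework loop would."""
    key, value = reader.create_key(), reader.create_value()
    out = []
    while reader.next(key, value):
        out.append((key.value, value.value))
    return out


def drain_new(reader):
    """Consume a new-style reader the way a mapreduce.* framework loop would."""
    out = []
    while reader.next_key_value():
        out.append((reader.get_current_key(), reader.get_current_value()))
    return out
```

Because both adapters delegate to the same core, a test written against one of them can be reused for the other almost verbatim: only the drain loop differs, while the expected records stay the same.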
Nevertheless, there is still a lot of potential for innovation in the area of input/output formats:
- For high-performance processing it would be beneficial to support complex predicate pushdown operations including statistics.
- Another performance improvement would be for the format on disk to be the same as the one in memory, to avoid costly serialization and deserialization. This is currently the case for none of the Big Data platforms.
- Formats supporting legacy formats are rare and many are missing. This means the data needs to be converted locally without the support of a Big Data platform, which introduces additional costs and complexity. Here, artificial intelligence may be used to extract meaningful data out of unknown or not well-specified formats.
- Proof of validation. A lot of data needs more or less complex validations (e.g. validating that a date field is valid or that references to other data are correct). Usually these validations are done in the source system and again in the destination system. A proof of validation would be a cryptographically secured proof that the source system has done the validations, so that the destination system does not have to redo them.
- The complex quest of serving OLTP and OLAP loads with the same format has not been solved.
- Formats supporting data structures other than tables, such as graphs or hash maps, are rare, but could drastically improve performance for certain analytic scenarios.
- Input/output formats supporting encryption at rest and while processing do not exist. For instance, MIT CryptDB allows queries over encrypted data without decrypting it. A similar approach for input/output formats could significantly improve certain security aspects of data.
- New CPU security features, such as Intel SGX, could be leveraged to isolate the in-memory processing of formats from other processes.
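To make the proof-of-validation idea from the list above more concrete, here is a deliberately simplified sketch. It assumes that source and destination share a secret key and uses an HMAC as the proof. That is an assumption made purely for brevity: a real design would more likely rely on digital signatures so that the destination cannot forge proofs itself. All function names are hypothetical.

```python
import hashlib
import hmac
import json
from datetime import date


def is_valid(record):
    """Example validation: the 'day' field must be an ISO calendar date."""
    try:
        date.fromisoformat(record["day"])
        return True
    except (KeyError, TypeError, ValueError):
        return False


def canonical(records):
    """Stable byte representation, so both sides tag exactly the same data."""
    return json.dumps(records, sort_keys=True).encode("utf-8")


def issue_proof(records, key):
    """Source system: validate everything, then tag the validated bytes."""
    if not all(is_valid(record) for record in records):
        raise ValueError("validation failed, no proof issued")
    return hmac.new(key, canonical(records), hashlib.sha256).hexdigest()


def accept(records, proof, key):
    """Destination system: check the tag instead of re-running validations."""
    expected = hmac.new(key, canonical(records), hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, proof)
```

The destination only recomputes a hash over the data, which stays cheap no matter how expensive the original validations were. Any change to the data after validation invalidates the proof.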