What's new in Apache Spark 2.3?

By César BEREZOWSKI

May 23, 2018

Let’s dive into the new features offered by the 2.3 distribution of Apache Spark.

This article draws on the following talks given at the DataWorks Summit 2018, along with additional research:

  • Apache Spark 2.3 boosts advanced analytics & deep learning by Yanbo Liang, Staff Software Engineer @ Hortonworks
  • ORC Improvement in Apache Spark 2.3 by Dongjoon Hyun, Principal Software Engineer @ Hortonworks Data Science Team

Summary

Spark 2.0 introduced many major updates that improved performance by more than 10 times.

Spark 2.3 is the latest release of the 2.X line, adding the following features:

  • Support for Pandas / Vectorized UDFs in PySpark
  • Support for image representation & reader in DataFrame & Dataset APIs
  • Parallel Machine Learning model tuning
  • New streaming mode “Continuous processing” (experimental)
  • Stream-to-stream joins
  • New Datasource API V2 (beta)
  • Native ORC Support
  • Spark on Kubernetes (experimental)

Advanced analytics UDFs

The Spark community worked a lot on Python UDFs.

Since Spark 0.7, Python UDFs applied transformations to data one row at a time, which led to high serialization and invocation overhead. This had a huge performance impact on PySpark jobs and changed the way Data Scientists developed them: either use Scala and learn a new language, work in pair programming with a developer, or accept poor performance.

With Spark 2.3 you can now manipulate your data with the Pandas library inside a PySpark UDF and get proper performance. These UDFs are called Vectorized UDFs (or Pandas UDFs). They are the result of two complementary efforts from the development team:

  • UDFs are now vectorized: they process a batch of rows per call instead of a single row.
  • DataFrames are now serialized using Apache Arrow, which allows a much faster conversion between Spark and Pandas formats (about 10 to several 100 times faster).

The resulting Pandas UDFs are split into two categories:

  • scalar: apply a transformation to every value of the column, e.g. v => v + 1.
  • grouped map: apply a “split - apply - combine” pattern on a DataFrame that you previously grouped, e.g. v => v - mean(v) (for each value of each group, subtract the mean of the group’s values).
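
The difference between the two categories can be sketched in plain Python. This is only an illustration of the semantics: lists stand in for the pandas.Series and pandas.DataFrame objects that Spark hands to a real Pandas UDF, and every function name below is hypothetical rather than part of PySpark's API.

```python
from collections import defaultdict
from statistics import mean


def scalar_plus_one(values):
    """Scalar: receives a whole batch of column values, returns one output per input."""
    return [v + 1 for v in values]


def subtract_group_mean(rows):
    """Grouped map: receives every (key, value) row of one group, returns transformed rows."""
    group_mean = mean(v for _, v in rows)
    return [(k, v - group_mean) for k, v in rows]


def apply_grouped_map(rows, func):
    """Split rows by key, apply func to each group, then combine the results."""
    groups = defaultdict(list)
    for key, value in rows:
        groups[key].append((key, value))
    combined = []
    for key in sorted(groups):
        combined.extend(func(groups[key]))
    return combined


rows = [("a", 1.0), ("a", 3.0), ("b", 10.0)]
print(scalar_plus_one([v for _, v in rows]))         # [2.0, 4.0, 11.0]
print(apply_grouped_map(rows, subtract_group_mean))  # [('a', -1.0), ('a', 1.0), ('b', 0.0)]
```

In both cases the function is called once per batch or per group rather than once per row, which is where the reduced invocation overhead comes from.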

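Pandas UDFs rely on Arrow on their own. To also use Arrow when converting between Spark and Pandas DataFrames (toPandas() and createDataFrame()), set the following Spark property: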

spark.sql.execution.arrow.enabled = true

More on the subject here.

Learning

Deep Learning can be defined as a “set of Machine Learning techniques that can learn useful representations of features directly from images, text and sound”.

MLlib is a simple and concise API allowing you to create pipelines for Machine Learning that leverage Spark’s capacity to scale and compute huge workloads.

Support for image representation and reader in DataFrame & Dataset APIs

MLlib is a Machine Learning library; however, you can also use it to run a TensorFlow graph or a Keras model as a Transformer, or even as a Spark SQL function, and so use it for deep learning.

Databricks open sourced a Spark Package (spark-deep-learning) to do just that and recently released version 0.3.0.

However, to process images for Deep Learning you first have to be able to read them, which is exactly what SPARK-21866 is about. As of version 2.3, Spark can read images from a path and load them into a DataFrame with the following schema:

root 
├── image: struct (nullable = true)
│   ├── origin: string (nullable = true)
│   ├── height: integer (nullable = false)
│   ├── width: integer (nullable = false)
│   ├── nChannels: integer (nullable = false)
│   ├── mode: string (nullable = false)
│   └── data: binary (nullable = false)

The DataFrame contains an image struct holding the image’s metadata and its binary data, laid out following OpenCV conventions, to which you can apply trained models.
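
To make the layout of the data field more concrete, here is a minimal sketch of reading one pixel out of such a struct. It assumes 8-bit unsigned channels stored row by row with interleaved channels in BGR order, which is OpenCV's usual layout for 3-channel images. A plain dict with only the fields the sketch needs stands in for the Spark row, and pixel_at is a hypothetical helper, not a Spark function.

```python
def pixel_at(image, row, col):
    """Return the channel values of one pixel from a row-major, interleaved byte buffer."""
    if not (0 <= row < image["height"] and 0 <= col < image["width"]):
        raise IndexError("pixel outside the image")
    n = image["nChannels"]
    start = (row * image["width"] + col) * n
    return tuple(image["data"][start:start + n])


# A hypothetical 1x2 image: one pure-blue pixel, then one pure-red pixel (BGR order).
image = {
    "height": 1,
    "width": 2,
    "nChannels": 3,
    "data": bytes([255, 0, 0, 0, 0, 255]),
}
print(pixel_at(image, 0, 0))  # (255, 0, 0) -> blue
print(pixel_at(image, 0, 1))  # (0, 0, 255) -> red
```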

Parallel Machine Learning model tuning

Up until Spark 2.3, when tuning a Machine Learning model with CrossValidator or TrainValidationSplit, Spark trained the same model sequentially with each set of parameters to determine which one gives the best performance. Each individual training was distributed, but only one ran at a time. SPARK-19357 introduces a parallelism parameter on both algorithms that defines the number of training jobs to run at the same time, which allows a better usage of a Spark cluster’s resources.
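
The effect of such a parameter can be sketched with a thread pool in plain Python. This is an analogy under stated assumptions, not Spark's implementation: train_and_score is a hypothetical stand-in for fitting and evaluating one model, and tune is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor


def train_and_score(params):
    """Hypothetical stand-in for fitting one model and returning its validation score."""
    # Toy score that peaks at reg=0.1, max_iter=20.
    return -abs(params["reg"] - 0.1) - abs(params["max_iter"] - 20) / 100


def tune(param_grid, parallelism=1):
    """Evaluate every parameter combination, running up to `parallelism` trainings at once."""
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        scores = list(pool.map(train_and_score, param_grid))
    best_score, best_params = max(zip(scores, param_grid), key=lambda pair: pair[0])
    return best_params, best_score


param_grid = [{"reg": r, "max_iter": m} for r in (0.01, 0.1, 1.0) for m in (10, 20)]
best_params, best_score = tune(param_grid, parallelism=3)
print(best_params)  # {'reg': 0.1, 'max_iter': 20}
```

With parallelism=1 the sketch falls back to the sequential behaviour; the selected parameters are the same either way, only the wall-clock time changes.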

More on the subject here and here.

Streaming

Spark 2.3 introduces several streaming features that greatly extend its capabilities and bring it a bit closer to what Apache Flink currently offers.

Continuous processing (experimental)

Until now, Spark streaming queries ran as micro-batches, with latencies around 100 ms and exactly-once fault-tolerance guarantees. The new experimental trigger mode introduced in 2.3, Continuous Processing, allows much lower latencies (~1 ms) with at-least-once fault-tolerance guarantees.

Since Continuous Processing is a triggering mode and not a specific new engine, its checkpoints are compatible with micro-batch mode.

A Continuous Processing query supports only map-like operations (select, where, map, filter, …) and any Spark SQL function other than aggregations.

Some things to note:

  • A Continuous Processing query runs as many parallel Spark tasks as the number of partitions it reads from, so you need to provide enough cores on your cluster (e.g. 10 Kafka partitions = 10 cores).
  • Stopping a Continuous Processing stream may produce (ignorable) task termination warnings.
  • There’s no automatic retry for failed tasks: any failure fails the query, which then has to be restarted manually from a checkpoint.
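
The combination of at-least-once guarantees and manual restarts from a checkpoint can be illustrated with a toy model. It is a deliberately simplified sketch: run_query and its parameters are hypothetical, and Spark's actual checkpointing mechanism is not modelled. It only shows why a record emitted after the last checkpoint is emitted again after a restart.

```python
def run_query(records, start_offset, checkpoint_every, fail_at=None):
    """Process records from start_offset; return (emitted, last_checkpoint, failed)."""
    emitted = []
    checkpoint = start_offset
    for offset in range(start_offset, len(records)):
        if offset == fail_at:
            return emitted, checkpoint, True  # a task failure fails the whole query
        emitted.append(records[offset])       # the result is written to the sink
        if (offset + 1) % checkpoint_every == 0:
            checkpoint = offset + 1           # progress only becomes durable here
    return emitted, len(records), False


records = ["r0", "r1", "r2", "r3", "r4", "r5"]
first, checkpoint, failed = run_query(records, 0, checkpoint_every=2, fail_at=3)
# 'r2' was emitted but not yet covered by a checkpoint when the query failed.
second, _, _ = run_query(records, checkpoint, checkpoint_every=2)  # manual restart
print(first + second)  # ['r0', 'r1', 'r2', 'r2', 'r3', 'r4', 'r5']
```

Every record reaches the sink at least once, and 'r2' reaches it twice, so a downstream consumer has to tolerate duplicates.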

Stream-stream join

Spark 2.0 introduced stream-static joins, which join a stream with a static DataFrame/Dataset (think reference table). Spark 2.3 now allows joining two data streams.

To guarantee correct join results at any point in time, past streaming state is buffered so that any received row can be matched with future rows from the other stream. To keep this state from saturating Spark’s memory, a watermark delay must be defined on both inputs so that the engine knows how late the input can be, and a time constraint must be set on the JOIN query (either a time range condition or an event-time window).

The watermark + time constraint is optional for INNER JOIN queries but mandatory for LEFT and RIGHT OUTER JOIN. FULL OUTER JOIN queries are not currently supported.

Outer joins come with caveats: unmatched rows (NULL results) are only generated once the watermark has expired and, in micro-batch mode, data with an expired watermark is only cleared when the next batch is launched (i.e. when new data arrives), so an outer result may be delayed longer than the watermark.
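
The interplay between buffering, watermark and time constraint can be sketched with a small simulation of an inner join. It makes several simplifying assumptions that are not taken from Spark: a single watermark shared by both inputs, integer event times, a time range condition of the form left.time <= right.time <= left.time + max_gap, and a hypothetical stream_join function.

```python
def stream_join(events, delay, max_gap):
    """Inner-join 'left' and 'right' events on key, evicting state behind the watermark."""
    state = {"left": [], "right": []}
    max_time = 0
    results = []
    for side, key, time in events:
        if time < max_time - delay:
            continue  # too late: the state it could have matched may already be gone
        other = "right" if side == "left" else "left"
        for other_key, other_time in state[other]:
            left_t, right_t = (time, other_time) if side == "left" else (other_time, time)
            if key == other_key and left_t <= right_t <= left_t + max_gap:
                results.append((key, left_t, right_t))
        state[side].append((key, time))
        max_time = max(max_time, time)
        watermark = max_time - delay
        # Keep only the rows that a future on-time row could still match.
        state["left"] = [(k, t) for k, t in state["left"] if t + max_gap >= watermark]
        state["right"] = [(k, t) for k, t in state["right"] if t >= watermark]
    return results, state


events = [
    ("left", "ad1", 10),
    ("right", "ad1", 12),   # matches the first row: 10 <= 12 <= 15
    ("left", "ad2", 20),
    ("right", "ad2", 40),   # outside the time range of the 'ad2' left row
    ("right", "ad1", 14),   # arrives behind the watermark and is dropped
]
results, state = stream_join(events, delay=5, max_gap=5)
print(results)  # [('ad1', 10, 12)]
print(state)    # {'left': [], 'right': [('ad2', 40)]}
```

Without the watermark and the time range, neither eviction rule could be written and both buffers would grow with every received row.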

More on Continuous Processing and Stream-stream joins in Spark’s Structured Streaming programming guide.

Datasource API V2 (beta)

Spark’s Datasource API lets it integrate with a large number of input sources (Hive, Avro, CSV, Parquet, …) with finer-grained interactions than a simple read-and-load process.

However, V1 has its weaknesses: it leaks upper-level APIs (DataFrame and SQLContext) into the data source, offers no transaction guarantees when writing, and is hard to extend when you need to add optimizations. This is what motivated the Databricks team to design a V2 API with the following goals in mind:

  • Written in Java.
  • No dependency on upper-level APIs (DataFrame, RDD, …).
  • Easy to extend, can add new optimizations while keeping backward compatibility.
  • Can report physical information like size, partitioning, …
  • Supports streaming source/sink.
  • Write API is powerful, transactional and flexible.
  • No change to end users.

In the Spark 2.3 release, the following features are available to test (the API is still under development and more will come with 2.4):

  • Support for row-based scan and columnar scan.
  • Column pruning and filter push-down.
  • Can report basic statistics and data partitioning.
  • Transactional write API.
  • Streaming source and sink support for micro-batch and continuous mode.
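
Column pruning and filter push-down can be illustrated with a toy reader. The real V2 API is a set of Java interfaces; the class below is a hypothetical Python sketch of the idea only, and none of its names come from Spark. The engine tells the source which columns it needs and which filters it would like applied, and the source hands back the filters it cannot evaluate itself.

```python
class InMemoryReader:
    """Hypothetical source that accepts column pruning and filter push-down."""

    def __init__(self, rows):
        self.rows = rows
        self.columns = None  # None means "all columns"
        self.filters = []

    def prune_columns(self, columns):
        self.columns = list(columns)

    def push_filters(self, filters):
        """Keep the filters this source can evaluate; return the rest to the engine."""
        supported = [f for f in filters if f[1] in ("=", ">")]
        self.filters = supported
        return [f for f in filters if f not in supported]

    def scan(self):
        for row in self.rows:
            if all(self._matches(row, f) for f in self.filters):
                cols = self.columns or list(row)
                yield {c: row[c] for c in cols}

    @staticmethod
    def _matches(row, flt):
        column, op, value = flt
        return row[column] == value if op == "=" else row[column] > value


reader = InMemoryReader([
    {"id": 1, "country": "FR", "amount": 10},
    {"id": 2, "country": "CA", "amount": 30},
    {"id": 3, "country": "FR", "amount": 50},
])
reader.prune_columns(["id", "amount"])
remaining = reader.push_filters([("country", "=", "FR"), ("amount", "like", "5%")])
print(remaining)            # [('amount', 'like', '5%')]
print(list(reader.scan()))  # [{'id': 1, 'amount': 10}, {'id': 3, 'amount': 50}]
```

The filter on country is applied inside the source even though the column is pruned from the output, so fewer rows and fewer columns reach the engine.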

A more in-depth analysis of Datasource API V2 is available here.

ORC improvements

Before the Apache ORC project, Spark (since version 1.4) used legacy code based on Hive 1.2.1 to read and write ORC files. When the project came out, the reader was improved, but Spark still depended on Hive and some issues caused mismatches between what ORC should be and what Spark produced (unicode characters or dots in column names, schema evolution, empty ORC files on empty DataFrame partitions, …).

Spark 2.3 changes all of this by adding a new native vectorized ORC reader in the sql/core module that is fully independent from Hive. The new OrcFileFormat (see SPARK-20682) includes a writer and a vectorized reader which improves performance by a factor of 2 to 5.

To use it, either set spark.sql.orc.impl=native and specify the source/target format as orc, or set the source/target format to org.apache.spark.sql.execution.datasources.orc (see SPARK-20728). The default ORC implementation remains Hive until Spark 2.4, to maintain compatibility with old data.

Spark is also able to convert existing datasource tables to use the vectorized reader: just set spark.sql.hive.convertMetastoreOrc=true (default = false, native reader required).

Finally, the reader supports schema evolution. Instead of dropping and recreating a table as you had to do previously, you can add new columns at the end, hide columns, and change a column’s type or position.
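
What schema evolution means for a reader can be sketched as follows. This is an illustration of the idea, not the ORC reader's implementation: it assumes columns are matched by name, read_with_schema is a hypothetical helper, and it does not model which type changes are actually safe.

```python
def read_with_schema(rows, table_schema):
    """Project rows from an older file onto the table's current schema, matching by name."""
    result = []
    for row in rows:
        projected = {}
        for name, cast in table_schema:
            value = row.get(name)  # a column added later is missing from old files
            projected[name] = None if value is None else cast(value)
        result.append(projected)
    return result


old_file_rows = [{"id": 1, "price": 5, "internal_note": "x"}]
# Current schema: 'price' changed to float, 'internal_note' hidden, 'currency' added at the end.
table_schema = [("id", int), ("price", float), ("currency", str)]
print(read_with_schema(old_file_rows, table_schema))
# [{'id': 1, 'price': 5.0, 'currency': None}]
```

The old file is never rewritten; only the way it is read changes, which is why the table no longer has to be dropped and recreated.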

However, the reader does not support ACID features (there’s no activity on the subject in the Spark community) or bucketing (Spark’s implementation differs from Hive’s, merging the two is difficult, and there has not been much progress on the subject).

Spark on Kubernetes (experimental)

There’s a new experimental scheduler that lets you submit Spark jobs to a cluster managed by Kubernetes instead of YARN, Mesos or Standalone Spark. The driver and executors are deployed in Kubernetes Pods using a Docker image specified with the configuration parameter spark.kubernetes.container.image=/path/to/img (a Dockerfile is shipped with Spark 2.3). Application dependencies should be either added to a custom-built Docker image or referenced remotely.
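
As a rough sketch, a submission to Kubernetes comes down to a spark-submit call with a k8s:// master URL, cluster deploy mode and the container image property. The helper below only assembles that command line; the helper itself, the API server address, the image name, the jar path and the class name are all hypothetical placeholders.

```python
import shlex


def spark_submit_command(api_server, image, app_resource, main_class, executors=2):
    """Build a spark-submit argument list targeting a Kubernetes cluster."""
    return [
        "bin/spark-submit",
        "--master", f"k8s://{api_server}",
        "--deploy-mode", "cluster",
        "--class", main_class,
        "--conf", f"spark.executor.instances={executors}",
        "--conf", f"spark.kubernetes.container.image={image}",
        app_resource,
    ]


command = spark_submit_command(
    api_server="https://k8s.example.com:6443",                # hypothetical API server
    image="registry.example.com/spark:2.3.0",                 # hypothetical image
    app_resource="local:///opt/spark/examples/jars/app.jar",  # hypothetical jar inside the image
    main_class="com.example.App",                             # hypothetical class
)
print(" ".join(shlex.quote(arg) for arg in command))
```

The local:// scheme points at a file that already exists inside the Docker image, which matches the first of the two dependency options mentioned above.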

See more in SPARK-18278 and the documentation.
