Let’s dive into the new features offered by the 2.3 distribution of Apache Spark.
This article combines the following talks seen at the DataWorks Summit 2018 with additional research:
- Apache Spark 2.3 boosts advanced analytics & deep learning by Yanbo Liang, Staff Software Engineer @ Hortonworks
- ORC Improvement in Apache Spark 2.3 by Dongjoon Hyun, Principal Software Engineer @ Hortonworks Data Science Team
Spark 2.0 introduced many major updates that improved performance more than tenfold.
Spark 2.3 is the latest release of the 2.X line, adding the following features:
- Support for Pandas / Vectorized UDFs in PySpark
- Support for image representation & reader in DataFrame & Dataset APIs
- Parallel Machine Learning model tuning
- New streaming mode “Continuous processing” (experimental)
- Enable stream-to-stream joins
- New Datasource API V2 (beta)
- Native ORC Support
- Spark on Kubernetes (experimental)
The Spark community has worked a lot on Python UDFs.
Since Spark 0.7, Python UDFs have applied transformations to data one row at a time, which led to high serialization and invocation overhead. This had a huge performance impact on PySpark jobs and changed the way Data Scientists developed them: they had to either learn Scala, work in pair programming with a developer, or accept poor job performance.
With Spark 2.3 you can now manipulate your data with the Pandas library inside a PySpark UDF and get good performance. These UDFs are called Vectorized UDFs (or Pandas UDFs). They are the result of two complementary efforts from the development team:
- SPARK-21187: Use Apache Arrow for data type conversion.
- SPARK-22216: Improve PySpark/Pandas interoperability.
The resulting Pandas UDFs are split into two categories:
- scalar: apply a transformation to every value of the column, e.g. v => v + 1.
- grouped map: apply a “split - apply - combine” pattern on a DataFrame that you previously grouped, e.g. v => v - mean(v) (for each value of each group, subtract the mean of the group’s values).
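Pandas UDFs rely on Apache Arrow, so PyArrow must be installed. To also use Arrow when converting between Spark and Pandas DataFrames (toPandas(), createDataFrame()), set the following Spark property: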
spark.sql.execution.arrow.enabled = true
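As a minimal sketch of the two categories, assuming pandas, PyArrow and PySpark 2.3 are installed: the column names `id` and `v`, the sample rows and the helper `run_on_spark` are illustrative and not taken from the talks. The first two functions are plain pandas code; Spark calls them on whole batches or groups instead of single rows.

```python
import pandas as pd


def plus_one(v):
    """Scalar logic: receives a pandas Series (a batch of column values)."""
    return v + 1


def subtract_mean(pdf):
    """Grouped map logic: receives one whole group as a pandas DataFrame."""
    return pdf.assign(v=pdf.v - pdf.v.mean())


def run_on_spark():
    """Wrap the functions above as Pandas UDFs (needs a Spark 2.3 install)."""
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    spark = (SparkSession.builder
             .config("spark.sql.execution.arrow.enabled", "true")  # property above
             .getOrCreate())
    # Illustrative data: two groups identified by `id`.
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0)], ("id", "v"))

    plus_one_udf = pandas_udf(plus_one, "double", PandasUDFType.SCALAR)
    subtract_mean_udf = pandas_udf(
        subtract_mean, "id long, v double", PandasUDFType.GROUPED_MAP)

    df.select(plus_one_udf(df.v)).show()
    df.groupby("id").apply(subtract_mean_udf).show()
```

Keeping the pandas logic in ordinary functions means it can be unit-tested locally without a cluster before being wrapped with `pandas_udf`.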
More on the subject in Databricks’ article on Vectorized UDFs for PySpark (see the references below).
Deep Learning can be defined as a “set of Machine Learning techniques that can learn useful representations of features directly from images, text and sound”.
MLlib is a simple and concise API allowing you to create pipelines for Machine Learning that leverage Spark’s capacity to scale and compute huge workloads.
MLlib is a Machine Learning library, however you could also use it to process a TensorFlow graph or Keras Model as a Transformer or even a SparkSQL function, and use it for deep learning.
Databricks open sourced a Spark Package (spark-deep-learning) to do just that and recently released version 0.3.0.
However, to be able to process images for Deep Learning, you have to be able to read them, which is exactly what SPARK-21866 is about. From version 2.3 Spark can read an image from a path and parse it as a DataFrame with the following schema:
root
├── image: struct (nullable = true)
│   ├── origin: string (nullable = true)
│   ├── height: integer (nullable = false)
│   ├── width: integer (nullable = false)
│   ├── nChannels: integer (nullable = false)
│   ├── mode: integer (nullable = false)
│   └── data: binary (nullable = false)
The DataFrame contains an image Structure with the image’s metadata and the binary data based on OpenCV conventions on which you can apply trained Models.
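The sketch below shows one way to load images and look inside the binary data, assuming PySpark 2.3 (where the reader is exposed as `ImageSchema.readImages`), 8 bits per channel and row-wise storage as in OpenCV. The helper names `pixel_at` and `read_first_pixel` are hypothetical.

```python
def pixel_at(data, width, n_channels, row, col):
    """Return the channel values of one pixel from the raw `data` bytes.

    Assumes 8 bits per channel and row-wise storage (OpenCV convention),
    so a 3-channel pixel comes back in (blue, green, red) order.
    """
    offset = (row * width + col) * n_channels
    return tuple(bytearray(data[offset:offset + n_channels]))


def read_first_pixel(spark, path):
    """Load images from `path` and return the top-left pixel of the first one.

    Assumes `path` contains at least one readable image.
    """
    from pyspark.ml.image import ImageSchema

    images = ImageSchema.readImages(path)   # DataFrame with one `image` column
    img = images.select("image.*").first()  # flatten the struct into columns
    return pixel_at(img.data, img.width, img.nChannels, 0, 0)
```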
Up until Spark 2.3, when training and tuning a Machine Learning model with CrossValidator or TrainValidationSplit, Spark would sequentially train the same model with multiple sets of parameters to determine the one that gives the best performance. Each training was distributed across the cluster, but the trainings themselves ran one after another. SPARK-19357 introduces a parallelism parameter to these algorithms that defines the number of training jobs to run at a time and, in doing so, allows better use of a Spark cluster’s resources.
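A minimal sketch of how the parameter might be used from PySpark 2.3 follows. The estimator, parameter maps and evaluator are supplied by the caller, and the helper names `models_to_fit` and `build_cross_validator` as well as the default values are illustrative.

```python
def models_to_fit(n_param_maps, num_folds):
    """Number of models a cross-validation trains (before the final refit)."""
    return n_param_maps * num_folds


def build_cross_validator(estimator, param_maps, evaluator, parallelism=4):
    """Create a CrossValidator that trains up to `parallelism` models at once."""
    from pyspark.ml.tuning import CrossValidator

    return CrossValidator(
        estimator=estimator,
        estimatorParamMaps=param_maps,
        evaluator=evaluator,
        numFolds=3,
        parallelism=parallelism,  # number of training jobs run concurrently
    )
```

With 6 parameter maps and 3 folds, `models_to_fit(6, 3)` gives 18 trainings; a higher `parallelism` only helps if the cluster has spare resources to run several of them side by side.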
Spark 2.3 introduces a number of streaming novelties that greatly enhance its capabilities and bring it a bit closer to what Apache Flink currently offers.
Spark Streaming used to be limited to micro-batch processing, with low latency (~100ms) and exactly-once fault-tolerance guarantees. The new experimental triggering mode introduced in 2.3, Continuous Processing, allows for lower latency (~1ms) with at-least-once fault-tolerance guarantees.
Since Continuous Processing is a triggering mode and not a specific new engine, its checkpoints are compatible with micro-batch mode.
A Continuous Processing stream supports only map-like operations (select, where, map, filter, …) and any SparkSQL function other than aggregations.
Some things to note:
- A Continuous Processing query runs as many parallel Spark tasks as the number of partitions it reads from, so you need to provide enough cores on your cluster (e.g. 10 Kafka partitions = 10 cores).
- Stopping a Continuous Processing stream may produce (ignorable) task termination warnings.
- There’s no automatic retry for failed tasks, thus resulting in a failed query that has to be restarted manually from a checkpoint.
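As a hedged sketch, a Continuous Processing query in PySpark 2.3 could look like the following. It uses the built-in `rate` test source and `console` sink; the function name, filter and checkpoint path are illustrative. Only the trigger differs from a micro-batch query.

```python
def start_continuous_query(spark, checkpoint_dir):
    """Start a map-like streaming query using the continuous trigger."""
    stream = (spark.readStream
              .format("rate")                # built-in test source
              .option("rowsPerSecond", "10")
              .load()
              .where("value % 2 = 0"))       # map-like operations only

    return (stream.writeStream
            .format("console")
            .option("checkpointLocation", checkpoint_dir)
            # "1 second" is the checkpoint interval, not a batch interval
            .trigger(continuous="1 second")
            .start())
```

Because a failed query has to be restarted manually, keeping the same `checkpoint_dir` across restarts is what lets it resume from the last recorded progress.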
Spark 2.0 introduced Stream-static joins, allowing you to join a stream with a static DataFrame/DataSet (think reference table). Spark 2.3 now allows joining two data streams.
To guarantee correct joins at any point in time, past streaming states are buffered so that any received row can be matched with future rows from the other stream. To avoid saturating Spark’s buffering memory, a watermark delay must be defined on both inputs so that the engine knows how late the input can be, and a time constraint must be set on the JOIN query (either a time range condition or an event-time window).
The watermark and time constraint are optional for INNER JOIN queries but mandatory for LEFT and RIGHT OUTER JOIN queries. FULL OUTER JOIN queries are not currently supported.
Outer joins come with a caveat: unmatched rows (NULL results) are only generated once the watermark expires. In micro-batch mode, data of the current batch with an expired watermark is only cleared when the next batch is launched (i.e. when new data arrives), so an outer result may be delayed longer than the watermark.
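The watermark and time constraint can be sketched as follows, loosely following the ad impressions and clicks example from Spark’s programming guide. The column names, delays and helper names are illustrative assumptions; both inputs are expected to be streaming DataFrames.

```python
def time_range_condition(max_delay="1 hour"):
    """SQL join condition: a click must follow its impression within `max_delay`."""
    return ("clickAdId = impressionAdId AND "
            "clickTime >= impressionTime AND "
            "clickTime <= impressionTime + interval " + max_delay)


def join_streams(impressions, clicks):
    """Left outer join of two streams; watermarks are mandatory for outer joins."""
    from pyspark.sql.functions import expr

    # Tell the engine how late each input is allowed to be.
    impressions_wm = impressions.withWatermark("impressionTime", "2 hours")
    clicks_wm = clicks.withWatermark("clickTime", "3 hours")

    return impressions_wm.join(
        clicks_wm, expr(time_range_condition()), "leftOuter")
```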
More on Continuous Processing and Stream-stream joins in Spark’s Structured Streaming programming guide.
Spark’s Datasource API lets it integrate with a large number of input sources (Hive, Avro, CSV, Parquet, …) with finer interactions than just a read and load process.
However, V1 has its weaknesses: it leaks upper-level APIs (DataFrame, SQLContext) into the data source, offers no transactional guarantees when writing, and is hard to extend when you need to add an optimization. This is what motivated the Databricks team to design a V2 API with the following goals in mind:
- Written in Java.
- No dependency on upper-level APIs (DataFrame, RDD, …).
- Easy to extend, can add new optimizations while keeping backward compatibility.
- Can report physical information like size, partitioning, …
- Supports streaming source/sink.
- Write API is powerful, transactional and flexible.
- No change to end users.
For Spark’s 2.3 release, the following features are available to test (it is still undergoing development and more will come with 2.4):
- Support for row-based scan and columnar scan.
- Column pruning and filter push-down.
- Can report basic statistics and data partitioning.
- Transactional write API.
- Streaming source and sink support for micro-batch and continuous mode.
A more in-depth analysis of Datasource API V2 is available in IBM’s introduction to Datasource API V2 (see the references below).
Before the Apache ORC project, Spark (since version 1.4) used legacy code based on Hive 1.2.1 to read and write ORC files. When the project came out, the reader was improved; however, there was still a dependency on Hive and some issues resulting in mismatches between what ORC should be and what Spark produced (unicode / dots in column names, schema evolutions, empty ORC files on empty DataFrame partitions, …).
Spark 2.3 changes all of this by adding a new native vectorized ORC reader in the sql/core module that is fully independent from Hive. The new OrcFileFormat (see SPARK-20682) includes a writer and a vectorized reader which improves performance by a factor of 2 to 5.
To use it, either set spark.sql.orc.impl=native and specify the source/target format as orc, or set the source/target format to org.apache.spark.sql.execution.datasources.orc (see SPARK-20728). Until Spark 2.4, the default ORC implementation remains Hive to maintain compatibility with old data.
Spark is also able to convert existing datasource tables to use the vectorized reader: just set spark.sql.hive.convertMetastoreOrc=true (default = false, native reader required).
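A minimal sketch of these settings from PySpark, assuming an existing SparkSession and DataFrame are passed in. The function names and the `overwrite` save mode are illustrative choices.

```python
def enable_native_orc(spark, convert_metastore=False):
    """Switch the session to the native ORC implementation."""
    spark.conf.set("spark.sql.orc.impl", "native")
    if convert_metastore:
        # Also read Hive metastore ORC tables with the native reader.
        spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")


def orc_round_trip(spark, df, path):
    """Write `df` as ORC to `path`, then read it back with the native reader."""
    enable_native_orc(spark)
    df.write.format("orc").mode("overwrite").save(path)
    return spark.read.format("orc").load(path)
```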
Finally, the reader supports schema evolution. Instead of dropping and recreating a table as you had to do previously, you can add new columns at the end, hide columns, and change a column’s type and position.
However, the reader does not support ACID features (there’s no activity on the subject among the Spark community) or bucketing (Spark’s implementation differs from Hive’s and merging the two is difficult, with not much progress on the subject).
There’s a new experimental scheduler that lets you submit Spark jobs to a cluster managed by Kubernetes instead of YARN, Mesos or Standalone Spark. The driver and executors are deployed in Kubernetes Pods using a Docker image specified with the configuration parameter spark.kubernetes.container.image=/path/to/img (a Dockerfile is shipped with Spark 2.3). Application dependencies should be either added to a custom-built Docker image or referenced remotely.
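As a rough illustration, the helper below assembles a `spark-submit` command line for Kubernetes cluster mode. The function name is hypothetical, and the API server address, image name, main class and jar path are placeholders supplied by the caller.

```python
def spark_submit_command(api_server, image, main_class, app_jar, executors=2):
    """Build a `spark-submit` argument list for Kubernetes cluster mode."""
    return [
        "bin/spark-submit",
        "--master", "k8s://" + api_server,   # Kubernetes API server URL
        "--deploy-mode", "cluster",
        "--class", main_class,
        "--conf", "spark.executor.instances={}".format(executors),
        "--conf", "spark.kubernetes.container.image=" + image,
        app_jar,  # a local:// URI points to a file inside the Docker image
    ]
```

The resulting list can be passed to `subprocess.run` from a machine that has the Spark 2.3 distribution and access to the cluster.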
- Apache Arrow
- Spark JIRA 21187
- Spark JIRA 22216
- Databricks’ article on Vectorized UDFs for PySpark
- Databricks’ Spark package for Deep Learning
- Spark JIRA 18278
- Spark JIRA 19357
- Spark JIRA 20682
- Spark JIRA 20728
- Spark JIRA 21866
- Spark JIRA 23355
- Bryan Cutler’s article on parallel model training
- Spark’s Structured Streaming programming guide
- Spark’s Upcoming 2.3 features slides
- IBM’s introduction to Datasource API V2
- Running Spark on Kubernetes