Publish Spark SQL DataFrame and RDD with Spark Thrift Server
The distributed and in-memory nature of the Spark engine makes it an excellent candidate to expose data to clients which expect low latencies. Dashboards, notebooks, BI studios and KPI-based reporting tools commonly speak the JDBC/ODBC protocols and are examples of such clients.
Spark Thrift Server may be used in various fashions. It can run independently as a standalone Spark application or be embedded in an existing Spark driver. It can mount into RAM the data stored inside the Hive Data Warehouse or expose a user-defined DataFrame/RDD of a Spark job.
Spark Thrift Server is a JDBC/ODBC server built on top of Hive’s HiveServer2. With Spark Thrift Server, data is exposed to any JDBC client, such as Hive’s shell called beeline, or to any application supporting the JDBC/ODBC protocol. Datasets stored inside Hive’s Data Warehouse are exposed to applications which leverage the Spark engine through the SQL language. The SQL queries handled by Spark Thrift Server are executed with Spark’s SQL module. This gateway takes advantage of Spark’s distributed in-memory computing capability without writing any Python or Scala code.
The main difference between Spark’s Thrift Server and Hive’s HiveServer2 is that the former enforces usage of Spark by executing SQL queries with Spark’s SQL engine, while the latter may leverage several engines such as MapReduce, Tez or even Spark. Being based on HiveServer2, Spark Thrift Server uses Hive’s Data Warehouse to store persistent data and Hive’s Metastore to describe data (table names, column names in each table, schema, storage formats, partitions and so forth).
Spark is downloaded from its official website. The pre-built Spark releases (2.4.x, 2.3.x, 2.2.x) are available with Hive 1.2.1 support enabled and Spark Thrift Server ready to be run. When building Spark from source code, make sure to add the -Phive and -Phive-thriftserver profiles to the build options. Here’s how to create a small project and install the latest version of Spark:
mkdir spark_thrift_project
cd $_
curl http://mirrors.standaloneinstaller.com/apache/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz -o spark-2.4.0-bin-hadoop2.7.tgz
tar xzf spark-2.4.0-bin-hadoop2.7.tgz
ln -sf spark-2.4.0-bin-hadoop2.7 spark
In the context of this article, Spark will be executed without Hadoop or YARN/Mesos/Kubernetes, but with its own native cluster manager. To run a standalone local cluster, 3 components will be started:
- Spark Master with the Driver program process: central coordinator distributing tasks to execute among workers
- Spark Worker node with executor processes: carrying out the tasks assigned to them by the master and reporting back
- Spark Thrift Server: JDBC/ODBC server allowing Spark SQL to act as a distributed query engine
./spark/sbin/start-master.sh \
  --host 0.0.0.0
./spark/sbin/start-slave.sh \
  spark://localhost:7077
./spark/sbin/start-thriftserver.sh \
  --total-executor-cores 2 \
  --master spark://localhost:7077
The Spark master is now accepting connections on port 7077 at the address spark://localhost:7077. You can also access its WebUI on port 8080 (http://localhost:8080/). Note that, by default, Spark’s Master will not listen on localhost on port 7077, only its WebUI will. Passing the --host option with the value 0.0.0.0 allows connections on both ports. Browsing the WebUI, you should see the Spark Slave service registered as a worker. Spark Thrift Server is also started and marked as a running application.
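Before browsing the WebUI, it can be handy to check from a script that the three services are reachable. The following is a minimal sketch, not part of the original setup: the helper names is_port_open and report are hypothetical, and the port list assumes the values used in this article (7077, 8080 and the Thrift Server port 10000).

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False


# Ports assumed from this article's local standalone setup.
SERVICES = {
    "Spark Master": 7077,
    "Spark Master WebUI": 8080,
    "Spark Thrift Server": 10000,
}


def report(host: str = "localhost") -> dict:
    """Map each service name to whether its port currently accepts connections."""
    return {name: is_port_open(host, port) for name, port in SERVICES.items()}


for name, is_up in report().items():
    print(f"{name}: {'up' if is_up else 'down'}")
```

An open port only shows that something is listening; it does not prove that the worker registered with the master, which the WebUI still confirms.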
By default, the metastore_db directory is created in “./”, but that can be changed to an absolute path by setting the
javax.jdo.option.ConnectionURL property. Also, upon persisting the data later on, spark-warehouse directory will be created in “./” which can be changed by setting the
Logs available in “./spark/logs” provide additional information about each of the 3 components.
For the purpose of this article, the dataset shared by the New York City Taxi & Limousine Commission is used.
curl https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2017-02.csv -o input.csv
head -1 input.csv > header.csv
tail -n +3 input.csv > data.csv
The head command isolates the header names present in the first line into the header.csv file. The tail command extracts the data, starting at the third line, and saves it into the data.csv file.
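On a machine without head and tail, the same split can be done in a few lines of Python. This is a rough equivalent under the assumption that the data starts on the third line, as in the tail command above; the function name split_csv and its parameters are illustrative.

```python
def split_csv(input_path, header_path, data_path, first_data_line=3):
    """Mimic `head -1` and `tail -n +N` on a text file.

    Line 1 goes to header_path, lines from first_data_line onward go to
    data_path. Returns the number of data lines written.
    """
    count = 0
    # newline="" keeps the original line endings untouched.
    with open(input_path, "r", newline="") as src, \
         open(header_path, "w", newline="") as header, \
         open(data_path, "w", newline="") as data:
        for number, line in enumerate(src, start=1):
            if number == 1:
                header.write(line)
            elif number >= first_data_line:
                data.write(line)
                count += 1
    return count
```

Calling split_csv("input.csv", "header.csv", "data.csv") streams the file line by line, so memory use stays flat even for a large monthly extract.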
Being built on Hive, Spark Thrift Server makes it easy to manipulate and expose Hive tables through the JDBC interface without having to define a DataFrame. The SQL queries sent to Spark Thrift Server are interpreted with Spark SQL and processed with the Spark in-memory engine. Internally, Spark’s Thrift Server connects to Hive’s Metastore to retrieve metadata information and fetches the data from Hive’s Data Warehouse.
Once started, Spark Thrift Server exposes a WebUI on port 4040 (http://localhost:4040). No running or completed jobs are displayed yet.
The beeline tool is an interactive shell to write and send SQL queries. First, a connection must be established with Spark Thrift Server:
./spark/bin/beeline \
  -u jdbc:hive2://localhost:10000 \
  -n usr \
  -p pass
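When the connection is scripted rather than typed, the beeline invocation can be assembled programmatically. The sketch below only builds the argument list from the flags shown above (-u, -n, -p, plus -e to pass a statement); the function name beeline_command and its defaults are assumptions made for illustration.

```python
import shlex


def beeline_command(host="localhost", port=10000, user="usr", password="pass",
                    statement=None, beeline="./spark/bin/beeline"):
    """Build the argv list for a beeline call against Spark Thrift Server."""
    url = f"jdbc:hive2://{host}:{port}"
    argv = [beeline, "-u", url, "-n", user, "-p", password]
    if statement is not None:
        # -e runs a single statement and exits instead of opening the shell.
        argv += ["-e", statement]
    return argv


# Print a shell-safe version of the command for inspection.
print(shlex.join(beeline_command(statement="SHOW TABLES;")))
```

The returned list can be handed to subprocess.run(argv, check=True), which avoids shell quoting issues with statements that contain spaces or semicolons.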
Once beeline is connected, it is possible to execute SQL queries. The following code defines a Hive table in Hive’s Data Warehouse and loads data from the CSV file created above.
CREATE TABLE taxi_rides (
  VendorID STRING,
  tpep_pickup_datetime STRING,
  tpep_dropoff_datetime STRING,
  passenger_count STRING,
  trip_distance STRING,
  RatecodeID STRING,
  store_and_fwd_flag STRING,
  PULocationID STRING,
  DOLocationID STRING,
  payment_type STRING,
  fare_amount STRING,
  extra STRING,
  mta_tax STRING,
  tip_amount STRING,
  tolls_amount STRING,
  improvement_surcharge STRING,
  total_amount STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ",";
SHOW TABLES;
LOAD DATA LOCAL INPATH 'data.csv' OVERWRITE INTO TABLE taxi_rides;
At this stage, the dataset is stored and managed by Spark Thrift Server. No Spark job is marked as running or completed because none of the previous actions triggered the usage of Spark.
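The CREATE TABLE statement above lists every column by hand. Since the column names already sit in header.csv, the statement could also be generated from that header line. The following sketch assumes every column is typed as STRING, as in the statement above; the function name create_table_ddl is hypothetical, and it performs no validation or quoting of identifiers.

```python
def create_table_ddl(table, header_line, delimiter=","):
    """Build a CREATE TABLE statement with one STRING column per header name."""
    columns = [name.strip()
               for name in header_line.strip().split(delimiter)
               if name.strip()]
    if not columns:
        raise ValueError("header line contains no column names")
    column_defs = ",\n".join(f"  {name} STRING" for name in columns)
    return (
        f"CREATE TABLE {table} (\n{column_defs}\n) "
        f'ROW FORMAT DELIMITED FIELDS TERMINATED BY "{delimiter}";'
    )


# Shortened header used for illustration only.
header = "VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count"
print(create_table_ddl("taxi_rides", header))
```

In practice the header argument would be the first line read from header.csv, and the resulting statement would be sent through beeline.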
SparkSQL eases the expression of simple RDD operations. For example, counting the number of rows in SQL is as easy as:
SELECT count(*) FROM taxi_rides;
Going back to the WebUI (http://localhost:4040/jobs/), a Spark job has now completed with the description SELECT count(*) FROM taxi_rides run at AccessController.java:0.
The count query illustrates how to read data stored inside the Hive Data Warehouse and process it with Spark’s engine. It does not, however, keep the data in memory, which would reduce the latency of subsequent queries. To achieve that, it is possible to cache a DataFrame, which allows transparent use of SparkSQL processing without having to access the disk for subsequent queries.
The CACHE TABLE SQL instruction loads a table into memory. It is possible to see the storage memory used by an executor from the WebUI’s Executors page (http://localhost:4040/executors/). The “Storage Memory” column indicates no memory associated with the previous queries. Also, the RDD Storage page (http://localhost:4040/storage/) contains no information at the moment. The SQL query to mount the taxi_rides dataset in memory and to expose it as a taxi_rides_cache temporary table is:
CACHE TABLE taxi_rides_cache AS SELECT * FROM taxi_rides;
Note that the table will not be persisted after restarting Spark Thrift Server.
In the WebUI’s Executors page (http://localhost:4040/executors/), used memory from the “Storage Memory” column now displays several MB. The RDD Storage page (http://localhost:4040/storage/) now shows information about stored data.
Let’s compare the performance of processing uncached and cached data from two tables: taxi_rides and taxi_rides_cache. The query below executes a GROUP BY query on payment_type:
SELECT payment_type, count(*) FROM taxi_rides GROUP BY payment_type;
Running the query several times takes on average 4 seconds.
The same query using the taxi_rides_cache table is:
SELECT payment_type, count(*) FROM taxi_rides_cache GROUP BY payment_type;
Running the query takes between 1 and 2 seconds. Execution time is lower because the data is stored in cluster-wide memory, which avoids I/O operations on disk. This level of performance suits use cases such as interactive analysis.
Those measures will become more relevant with larger datasets as well as with more complex queries.
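Timings like the ones quoted above are easier to compare when each query is run several times and the durations are averaged. Here is a small sketch of such a harness; the function name time_query is hypothetical, and run_query stands for any callable that submits one query (for example a beeline call made with subprocess). The time.sleep call below is only a stand-in so that the example runs without a cluster.

```python
import statistics
import time


def time_query(run_query, repeats=5):
    """Call run_query() `repeats` times and summarize the wall-clock durations."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        run_query()
        durations.append(time.perf_counter() - start)
    return {
        "mean": statistics.mean(durations),
        "min": min(durations),
        "max": max(durations),
    }


# Placeholder workload standing in for a real query submission.
stats = time_query(lambda: time.sleep(0.01), repeats=3)
print(f"mean={stats['mean']:.3f}s min={stats['min']:.3f}s max={stats['max']:.3f}s")
```

Measured this way, the figures include client start-up and network time, so they are comparable between the two tables but not a pure measure of Spark’s execution time.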
The UNCACHE TABLE statement releases the taxi_rides_cache table from memory:
UNCACHE TABLE taxi_rides_cache;
The “Storage Memory” column now shows “0 MB / 384.1 MB” since no data is persisted in the workers’ RAM anymore.
It is also possible to expose a DataFrame created by a custom Spark application. Spark Thrift Server can be embedded inside a Spark application, exposing DataFrames through JDBC/ODBC.
If writing your job in Python, the startWithContext() method of the org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 Scala object starts the server programmatically. To import it in Python, the py4j Java gateway is used. The infinite while loop at the end keeps Spark Thrift Server running. Otherwise, the job would complete and the JDBC gateway would no longer be available.
Let’s create a “./thriftserver-in-context.py” script:
from pyspark.sql import SparkSession
from py4j.java_gateway import java_import
import time

spark = SparkSession.builder \
    .appName("Embedding Spark Thrift Server") \
    .config("spark.sql.hive.thriftServer.singleSession", "True") \
    .config("hive.server2.thrift.port", "10001") \
    .config("javax.jdo.option.ConnectionURL", \
            "jdbc:derby:;databaseName=metastore_db2;create=true") \
    .enableHiveSupport() \
    .getOrCreate()

df = spark.read.option("header","true").csv("input.csv").cache()
df.createOrReplaceTempView("taxi_rides")

sc = spark.sparkContext
java_import(sc._gateway.jvm, "")

# Start Spark Thrift Server using the jvm and passing the SparkSession
sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver \
    .HiveThriftServer2.startWithContext(spark._jwrapped)

while True:
    time.sleep(5)
Note that Spark Thrift Server is listening on port 10001 to avoid potential conflicts with another server running on the default port.
Deploy the code to the local cluster with:
./spark/bin/spark-submit \
  --master spark://localhost:7077 \
  --total-executor-cores 2 \
  ./thriftserver-in-context.py
The logs from Thrift Server are now printed to the Spark job stdout.
Neither the metastore_db nor spark-warehouse directories were created in the current working directory. When embedded, Spark Thrift Server no longer creates Hive’s Metastore and Data Warehouse for Spark temporary tables. To confirm the in-memory table is exposed to JDBC clients, launch beeline and execute:
./spark/bin/beeline \
  -u jdbc:hive2://localhost:10001 \
  -n usr -p pass \
  -e 'SHOW TABLES;'
Run the same SQL query as before:
SELECT payment_type, count(*) FROM taxi_rides GROUP BY payment_type;
Running the query takes between 2 and 3 seconds. The completed job is displayed on the WebUI (
After closing the application, the table won’t be available anymore due to the volatile storage. If the underlying storage of the Spark application is persistent, the DataFrame could be persisted to a Hive table in the Data Warehouse by calling also
In a distributed environment, launching Spark Thrift Server in the Spark application context has drawbacks. In such environments, multiple applications coexist on the same host and, unless the Spark job is assigned a dedicated IP as in Kubernetes, there is a risk of port collision. Also, orchestrators such as Kubernetes, Hadoop YARN and Mesos dynamically provision the Spark driver inside one of their managed worker nodes. Thus, the address of Spark Thrift Server is not known in advance. It must be published in some location, such as ZooKeeper or etcd, and fetched by the client. Kubernetes solves this issue elegantly with a service definition and the usage of Ingress routes. Finally, most distributed environments are secured by firewalls and network isolation. Spark applications executed from within the cluster are not accessible from the outside, and an external client cannot connect to Spark Thrift Server without a proxy routing the requests.
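One way to reduce the port collision risk described above is to let the operating system pick a free port and pass it to the hive.server2.thrift.port setting when building the SparkSession. The sketch below shows only the port selection; find_free_port is a hypothetical helper, and the approach carries a small race window, since another process may grab the port between the moment it is released here and the moment Spark Thrift Server binds it.

```python
import socket


def find_free_port(host="127.0.0.1"):
    """Ask the OS for a currently unused TCP port on the given interface."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Binding to port 0 lets the kernel choose an available port.
        sock.bind((host, 0))
        return sock.getsockname()[1]


port = find_free_port()
print(f"candidate port for hive.server2.thrift.port: {port}")
```

The chosen port still has to be published somewhere the clients can read it, which is the discovery problem the paragraph above describes.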
Integrating Spark with Hive on Hadoop creates new challenges in exposing data. Previously, the Hive Metastore was using a local Derby database created automatically by Spark Thrift Server. In a Hadoop environment, there is already a Hive Warehouse with a remote database holding the Metastore. This implies that Spark Thrift Server needs to be configured to use the existing database in order to access data in Hive tables on the cluster.
On a basic Hadoop cluster, a connection to the Hive Metastore on a remote database could be configured manually and a common Warehouse for Spark and Hive could be specified. This is possible since Spark provides basic compatibility with Hive. This approach would allow basic functionalities such as exposing a Hive table in Spark or accessing an existing Hive table from Spark. Thus, Spark Thrift Server could expose both Hive tables in the Hive Warehouse and DataFrames in Spark memory to JDBC clients. However, as of Spark 2.x, this solution doesn’t support modern Hive features because of limited compatibility. An example of such a modern feature is support for ACID tables in Apache Hive.
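As a rough illustration of the manual configuration mentioned above, the snippet below assembles --conf options that could be appended to start-thriftserver.sh or spark-submit. It assumes the Metastore is reachable through a Thrift URI set with hive.metastore.uris (passed to Spark with the spark.hadoop. prefix) and that the shared warehouse location is set with spark.sql.warehouse.dir. The host name, port and HDFS path are placeholders, not values from this article, and the function name metastore_conf_args is hypothetical.

```python
def metastore_conf_args(metastore_uri, warehouse_dir):
    """Build `--conf key=value` arguments pointing Spark at an existing Hive setup."""
    conf = {
        # Remote Hive Metastore service (placeholder value supplied by the caller).
        "spark.hadoop.hive.metastore.uris": metastore_uri,
        # Warehouse directory shared by Spark and Hive.
        "spark.sql.warehouse.dir": warehouse_dir,
    }
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Placeholder endpoint and path, to be replaced with the cluster's own values.
print(metastore_conf_args("thrift://metastore.example.com:9083",
                          "hdfs:///apps/hive/warehouse"))
```

On many clusters the same settings are instead provided through a hive-site.xml file placed in Spark’s conf directory, which keeps them out of the command line.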
To address Spark’s limited Hive compatibility, Hortonworks developed the Hive Warehouse Connector for the Hortonworks Data Platform (HDP). Hive Warehouse Connector is a newer-generation library for reading and writing data between Apache Spark and Apache Hive. From HDP 3.0, the use of Hive Warehouse Connector is the only way to integrate Hive and Spark. In this setting, Spark and Hive have their own independent catalogs and all operations are bridged with HiveWarehouseConnector. An advantage of this approach is the capability to leverage more Hive features.
Without the necessity to program a Spark application, Hive tables can easily be exposed in RAM, using SQL queries via JDBC clients and leveraging the Spark engine. For greater control, custom manipulated DataFrames are accessible through JDBC/ODBC with SQL queries when Spark Thrift Server is embedded inside the Spark application.