How to parse Spark SQL faster

Wubaoqi
4 min readMar 30, 2022

During my daily work, we built a data analysis product, deeply based on Apache Spark. A common task is to analyze Spark SQL.

After some web search, most articles are about using ANTLR to implement SQL Parser.

But this solution has some limitations: first, it can not support all features provided by Spark. The other is: it would be hard to maintain.

Let’s use Spark directly to parse its own SQL.

Use SparkSession to parse SQL

For example, assume we need to parse the following SQL expression:

(count(distinct col1) + 12345) as c1

Due to the environment used for SQL parsing, is not always the environment having the matching data files. To get a complete analysis, we can fake a full SQL like:

select (count(distinct col1) + 12345) as c1
from (
select cast(null as string) as col1
)

To make this happen, we need to know the column names and data types. (This is not a problem normally)

Then, we can parse this SQL using a standard way.

public static void main(String[] args) throws Exception {
String sqlString = "select (count(distinct col1) + 12345) as c1 from (select cast(null as string) as col1) ";
SparkSession sparkSession = SparkSession.builder()
.master("local[1]")
.getOrCreate();
LogicalPlan logicalPlan = sparkSession.sessionState().sqlParser().parsePlan(sqlString);;LogicalPlan analyzedPlan = sparkSession.sessionState().analyzer().executeAndCheck(logicalPlan, new QueryPlanningTracker());System.out.println(analyzedPlan.analyzed());
System.out.println(analyzedPlan.toJSON());
sparkSession.close();
}

For Spark SQL, it has the following stages:

For our SQL Analysis, we need to read the “Logical Plan” in chart. (To be more accurate, it should be “Resolved Logical Plan”). At this stage, Spark will check whether this column exist, deduce the data type, and check whether all functions are registered.

After run this program, we can get the following output:

And get the Abstract Syntax Tree JSON:

Although we can get the correct result, but we need to start a complete SparkSession. This has a larger cost, whether we can have some more lightweight solution?

A lightweight solution

Luckily, Spark has many kinds of Unit Tests. Maybe we can find some solutions inside those facilities. For example, for our scenario (mainly Analyzer related), there is a similar one under “src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala” file:

/**
* A trivial [[Analyzer]] with a dummy [[SessionCatalog]], [[EmptyFunctionRegistry]] and
* [[EmptyTableFunctionRegistry]]. Used for testing when all relations are already filled
* in and the analyzer needs only to resolve attribute references.
*/
object SimpleAnalyzer extends Analyzer(
new CatalogManager(
FakeV2SessionCatalog,
new SessionCatalog(
new InMemoryCatalog,
EmptyFunctionRegistry,
EmptyTableFunctionRegistry) {
override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {}
})) {
override def resolver: Resolver = caseSensitiveResolution
}

This SimpleAnalyzer is almost what we need, expect, it used EmptyFunctionRegistry, while we need all Spark builtin functions.

Then, we can create a similar class. (Due to SimpleAnalyzer used many classes which can only accessed under “org.apache.spark.sql” package), we can create new class under “org.apache.spark.sql” package:

package org.apache.spark.sql.catalyst.analysisimport org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.connector.catalog.CatalogManager
object MySimpleAnalyzer extends Analyzer(
new CatalogManager(
FakeV2SessionCatalog,
new SessionCatalog(
new InMemoryCatalog,
FunctionRegistry.builtin,
TableFunctionRegistry.builtin) {
override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {}
})) {
override def resolver: Resolver = caseSensitiveResolution
}

Then, Use our new simple Analyzer, we can implement SQL Parser as following:

public static void main(String[] args) {
String sqlString = "select (count(distinct col1) + 12345) as c1 from (select cast(null as string) as col1) ";
SparkSqlParser parser = new SparkSqlParser();LogicalPlan logicalPlan = parser.parsePlan(sqlString);LogicalPlan analyzedPlan = MySimpleAnalyzer.executeAndCheck(logicalPlan, new QueryPlanningTracker());System.out.println(analyzedPlan.analyzed());
System.out.println(analyzedPlan.toJSON());
}

After run this program, the console output is like:

The output is more clean now, and it also run faster. Once we have our Spark SQL Parser ready, we have a lot more cool features to implement now!

--

--