Blog

Rumble, an engine to run JSONiq on top of Spark

Ghislain Fourny, 06 June 2019

The increasing amount of data available for processing, as well as the ever-growing gap between storage capacity on the one hand and throughput and latency on the other, has forced the database community to come up with new querying paradigms over the last two decades.

MapReduce has become very popular thanks to its simple yet general abstraction: key-value pairs are processed in two phases, mapping and reducing, separated by a shuffling phase. Apache Spark has more recently generalized the MapReduce paradigm from two phases to full DAGs of transformations over so-called RDDs (Resilient Distributed Datasets).

Yet, this is not the end of the road: MapReduce and Apache Spark are tools, not goals. Writing a program to query data with Spark in Java, Python or Scala feels more like writing assembly instructions (the Spark transformations) than high-level code.
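To make the contrast concrete, here is a minimal PySpark sketch (not part of the original post) of the classic word count written directly against the RDD API; the input and output paths are placeholders:

from pyspark import SparkContext

sc = SparkContext(appName="wordcount-sketch")

counts = (
    sc.textFile("books.txt")                # placeholder input path
      .flatMap(lambda line: line.split())   # map phase: split lines into words
      .map(lambda word: (word, 1))          # emit key-value pairs
      .reduceByKey(lambda a, b: a + b)      # reduce phase, after the shuffle
)

counts.saveAsTextFile("wordcounts")         # placeholder output path

Every step spells out a transformation explicitly; nothing in the program says what the result means, only how to compute it.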

Data independence

A truly data-independent system, in the sense envisioned by Edgar Codd in 1970, must expose the data to the technical user via a functional, declarative query language accompanied by a clean data model. Such languages can then be executed on top of MapReduce or Spark in a way transparent to the user, that is, hiding underlying key-value pairs or RDDs.

A declarative language is a language in which the user says what they want, not how to get it. Rather than giving a set of instructions, like a cooking recipe (break the eggs, add the sugar, heat up the oven, etc.), the user states what they want (“Mom, can I have a lemon cake?”).

A functional language is a language in which one manipulates mathematical objects. A program is made of an arrangement of expressions that take and return these objects, most often deterministically.

A functional and declarative language leaves room for interpretation and optimization to the engine: it is then open whether the query runs locally, whether it is spread across cores, or across machines in a cluster, or whether (who knows) it runs on DNA. A query optimizer can also rewrite inefficient code, cache intermediate values when it makes sense, etc.

Is the Spark API functional?

Some argue that the Spark API, whether in Java, Scala or Python, is already declarative and functional. This is true to some extent, in the sense that transformations are functions that manipulate RDDs. However, there are a few differences from functional languages in the classical sense:

  1. From a productivity perspective, the functions are explicitly called; in other words, the program is full of parentheses. A functional language (think Haskell, SQL, …) typically hides the underlying functions behind a natural, easy-to-use, English-like syntax.

  2. From a performance perspective, the chain of transformations is the query plan: Spark does not optimize it further, but executes it exactly as the user wrote it.

  3. The manipulated instances (RDDs) cohabit with objects of the host language (Java objects, etc.), which adds extra complexity to the programming experience.

Enter data frames

DataFrames were a major improvement in Spark. When RDDs are highly structured (think rows with the same attributes and types), the underlying memory layout can be optimized, and Spark SQL can be used instead of explicit transformations. This makes Apache Spark more data-independent, with a direct performance benefit: SQL is declarative and (to some extent) functional, so the underlying engine is free to find an optimal query plan, relying on decades of research done in the area of relational databases.

For example, if we have an input dataset that looks like so:

{ "Name" : "Peter", "Year" : 2015 }
{ "Name" : "John", "Year" : 2018 }
{ "Name" : "Helen", "Year" : 2016 }

It can be imported into a dataframe like so:

Name (string)   Year (integer)
Peter           2015
John            2018
Helen           2016

And queried like so:

SELECT Year, COUNT(Name) AS count
FROM input
GROUP BY Year
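For completeness, here is a minimal PySpark sketch (not part of the original post) of how such a dataset could be loaded and queried; the file name is a placeholder, assuming one JSON object per line:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dataframe-sketch").getOrCreate()

# Placeholder file name: one JSON object per line, as shown above.
df = spark.read.json("people.json")
df.createOrReplaceTempView("input")

spark.sql("""
    SELECT Year, COUNT(Name) AS count
    FROM input
    GROUP BY Year
""").show()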

Heterogeneous and nested data

But in real life, data is not always highly structured.

First, data can be nested. Here is an example with arrays, but nesting can also occur with objects, and to arbitrary depths.

{ "Name" : "Peter", "Year" : [ 2015, 2014 ] }
{ "Name" : "John", "Year" : [ 2013, 2018 ] }
{ "Name" : "Helen", "Year" : [ 2012, 2017, 2019 ] }

The above input can be read into a dataframe like so:

Name (string)   Year (array of integers)
Peter           [ 2015, 2014 ]
John            [ 2013, 2018 ]
Helen           [ 2012, 2017, 2019 ]

Nesting can be handled by Spark SQL: object lookup is done with dots, and array lookup with the EXPLODE function, like so:

SELECT Name, EXPLODE(Year) AS Year
FROM input

yielding the following dataframe:

Name (string)   Year (integer)
Peter           2015
Peter           2014
John            2013
John            2018
Helen           2012
Helen           2017
Helen           2019
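The same result can also be produced programmatically; here is a minimal PySpark sketch (not part of the original post) using the explode function of the DataFrame API, with a placeholder file name:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

spark = SparkSession.builder.appName("explode-sketch").getOrCreate()

# Placeholder file name: one JSON object per line, as in the nested example above.
nested = spark.read.json("people-nested.json")

# explode() unnests the array: one output row per (Name, Year) combination.
nested.select(col("Name"), explode(col("Year")).alias("Year")).show()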

Even though having to perform this conversion explicitly adds complexity, it may not yet feel like trying to fit a square peg (trees) into a round hole (a table).

But this becomes more apparent when the data is heterogeneous, like so:

{ "Name" : "Peter", "Year" : 2015 }
{ "Name" : "John", "Year" : 2013 }
{ "Name" : "Helen", "Year" : [ 2012, 2017, 2019 ] }

where the Year field is forced into an attribute with the least-common-denominator type, string (quotes are used to make explicit that we do not have two integers and an array, but really three strings):

Name (string)   Year (string)
Peter           "2015"
John            "2013"
Helen           "[ 2012, 2017, 2019 ]"

This puts a lot of burden on the user, who has to manually parse these strings back to whichever types make sense to them.
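To make this burden concrete, here is a minimal PySpark sketch (an illustration under assumptions, not part of the original post): the conflicting Year field is inferred as a string column, and a hypothetical user-defined function parses it back; the file name and helper are placeholders.

import json

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

spark = SparkSession.builder.appName("heterogeneous-sketch").getOrCreate()

# Placeholder file name; the conflicting Year field is inferred as a string.
mixed = spark.read.json("people-mixed.json")

# Hypothetical helper: parse the string back, whether it holds a single
# number or a JSON array, and return a list of integers.
def parse_years(value):
    parsed = json.loads(value)
    return parsed if isinstance(parsed, list) else [parsed]

parse_years_udf = udf(parse_years, ArrayType(IntegerType()))

mixed.select("Name", parse_years_udf("Year").alias("Years")).show(truncate=False)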

In the real world, data can be heterogeneous: the rows may not have the same fields, some fields may be missing, some may be extra, and the types may differ. This happens, for example, when datasets are created over decades of data gathering. The schema evolves, fields are added or removed, types are changed, but past data does not, or cannot, get converted to the new schema.

JSONiq: a language that natively processes nested, heterogeneous data

JSONiq, which inherits 95% of its features from XQuery, a W3C standard, addresses these problems. Its most useful expression, FLWOR, is the counterpart of SQL’s SELECT FROM WHERE, but more flexible.

With a data model based on sequences of items, which can be homogeneous or heterogeneous, it deals natively with nested and heterogeneous data.

For example, we can query the above heterogeneous dataset even though the Year field may consist of integers or arrays of integers, like so:

for $person in json-file("people")
let $years := flatten($person.Year)
return { "Name" : $person.Name, "NumberOfYears" : count($years) }

This gives the following result:

{ "Name" : "Peter", "NumberOfYears" : 1 }
{ "Name" : "John", "NumberOfYears" : 1 }
{ "Name" : "Helen", "NumberOfYears" : 3 }

FLWOR expressions support projection, selection, grouping, ordering, joining, counting (aka zipping), etc.

Rumble: pushing down the computations to Spark

Rumble is a JSONiq engine that processes heterogeneous, nested datasets with billions of JSON objects by dynamically pushing down computations to Spark. The query above, for example, will be executed with map() and flatMap() transformations, but this is all transparent to the user.
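For intuition only, here is a minimal PySpark sketch of the kind of hand-written RDD code that the query above replaces; it is not the code Rumble actually generates, and the input path and helper are placeholders.

import json

from pyspark import SparkContext

sc = SparkContext(appName="rumble-intuition-sketch")

# Placeholder path: one JSON object per line, as in the heterogeneous example.
people = sc.textFile("people")

# Hypothetical helper: normalize the Year field to a list, whether it is a
# single integer or an array of integers.
def years_of(person):
    year = person["Year"]
    return year if isinstance(year, list) else [year]

result = (
    people.map(json.loads)
          .map(lambda p: {"Name": p["Name"],
                          "NumberOfYears": len(years_of(p))})
)

for obj in result.collect():
    print(json.dumps(obj))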

We are now actively working on pushing down more features.

For example, the following queries, even though they do not use FLWOR expressions, are automatically pushed down to Spark; they are significantly simpler and easier to write than the equivalent Spark RDD or Spark SQL code for this kind of dataset:

Counting the number of objects in the dataset:

count(json-file("people"))

Discovering all the top-level keys in the dataset:

keys(json-file("people"))

Discovering all the distinct values of the field Name:

distinct-values(json-file("people").Name)

Discovering all the distinct values of the years, whether nested in an array or not:

distinct-values(flatten(json-file("people").Year))
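For comparison, here is a rough PySpark sketch (an illustration only, not a verbatim equivalent) of what the last one-liner would take to write by hand against the RDD API; the input path and helper are placeholders.

import json

from pyspark import SparkContext

sc = SparkContext(appName="distinct-years-sketch")

# Hypothetical helper: return the years of an object as a list, whether the
# field is missing, a single integer, or an array of integers.
def years_of(obj):
    year = obj.get("Year")
    if year is None:
        return []
    return year if isinstance(year, list) else [year]

distinct_years = (
    sc.textFile("people")   # placeholder path, one JSON object per line
      .map(json.loads)
      .flatMap(years_of)
      .distinct()
      .collect()
)

print(distinct_years)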

Open-source download

Rumble is available for download under the Apache 2.0 license at http://rumbledb.org. If you find a bug, or miss a feature, a built-in function or a pushdown, let us know on the GitHub issues page and we will gladly help.

Written by Ghislain Fourny