Higher-order Functions, Avro and Custom Serializers

sparklyr 1.3 is now available on CRAN, with the following major new features:

  • Higher-order Functions to easily manipulate arrays and structs
  • Support for Apache Avro, a row-oriented data serialization framework
  • Custom Serialization using R functions to read and write any data format
  • Other Improvements such as compatibility with EMR 6.0 & Spark 3.0, and initial support for the Flint time series library

To install sparklyr 1.3 from CRAN, run

install.packages("sparklyr")

In this post, we highlight some major new features introduced in sparklyr 1.3 and showcase scenarios where they come in handy. A number of enhancements and bug fixes (especially those related to spark_apply(), Apache Arrow, and secondary Spark connections) were also an important part of this release, but they are not the topic of this post; it will be an easy exercise for the reader to find out more about them from the sparklyr NEWS file.

Higher-order Functions

Higher-order functions are built-in Spark SQL constructs that allow user-defined lambda expressions to be applied efficiently to complex data types such as arrays and structs. As a quick demo to see why higher-order functions are useful, let’s say one day Scrooge McDuck dove into his huge vault of money and found large quantities of pennies, nickels, dimes, and quarters. Having an impeccable taste in data structures, he decided to store the quantities and face values of everything into two Spark SQL array columns:


library(sparklyr)
sc <- spark_connect(master = "local", version = "2.4.5")
coins_tbl <- copy_to(sc, tibble::tibble(
    quantities = list(c(4000, 3000, 2000, 1000)),
    values = list(c(1, 5, 10, 25))
))

He thus declared his net worth to be 4k pennies, 3k nickels, 2k dimes, and 1k quarters. To help Scrooge McDuck calculate the total value of each type of coin in sparklyr 1.3 or above, we can apply hof_zip_with(), the sparklyr equivalent of ZIP_WITH, to the quantities column and the values column, combining pairs of elements from the arrays in both columns. As you might have guessed, we also need to specify how to combine those elements, and what better way to accomplish that than a concise one-sided formula   ~ .x * .y   in R, which says we want (quantity * value) for each type of coin? So, we have the following:

result_tbl <- coins_tbl %>%
  hof_zip_with(~ .x * .y, dest_col = total_values) %>%
  dplyr::select(total_values)

result_tbl %>% dplyr::pull(total_values)
[1]  4000 15000 20000 25000

The result 4000 15000 20000 25000 (in cents) tells us there are in total $40 worth of pennies, $150 worth of nickels, $200 worth of dimes, and $250 worth of quarters, as expected.

Using another sparklyr function named hof_aggregate(), which performs an AGGREGATE operation in Spark, we can then compute the net worth of Scrooge McDuck based on result_tbl, storing the result in a new column named total. Notice that for this aggregate operation to work, we need to ensure the starting value of the aggregation has a data type (namely, BIGINT) that is consistent with the data type of total_values (which is ARRAY<BIGINT>), as shown below:

result_tbl %>%
  dplyr::mutate(zero = dplyr::sql("CAST (0 AS BIGINT)")) %>%
  hof_aggregate(start = zero, ~ .x + .y, expr = total_values, dest_col = total) %>%
  dplyr::select(total) %>%
  dplyr::pull(total)
[1] 64000

So Scrooge McDuck’s net worth is 64000 cents, or $640.
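As a sanity check on the arithmetic above, the same two steps can be sketched in plain R without a Spark connection. This is only a local analogue of what ZIP_WITH and AGGREGATE compute, not how sparklyr or Spark evaluates them; the variable names simply mirror the column names used above.

```r
# Plain-R analogue of the two higher-order functions used above (no Spark needed).
quantities <- c(4000, 3000, 2000, 1000)  # pennies, nickels, dimes, quarters
values <- c(1, 5, 10, 25)                # face value of each coin, in cents

# ZIP_WITH: combine both arrays element by element with (x, y) -> x * y
total_values <- mapply(function(x, y) x * y, quantities, values)
print(total_values)
#> [1]  4000 15000 20000 25000

# AGGREGATE: fold the array into a single value, starting from 0,
# with (acc, v) -> acc + v
total <- Reduce(function(acc, v) acc + v, total_values, 0)
print(total)
#> [1] 64000

# Convert cents to dollars
print(total / 100)
#> [1] 640
```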

Other higher-order functions supported by Spark SQL so far include transform, filter, and exists, as documented in the Spark SQL function reference. Similar to the example above, their counterparts (namely, hof_transform(), hof_filter(), and hof_exists()) all exist in sparklyr 1.3, so that they can be integrated with other dplyr verbs in an idiomatic manner in R.
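A minimal sketch of how these three functions might be chained is shown below. It assumes they accept the same expr and dest_col arguments as the hof_aggregate() call above; the table name coin_values and the destination column names are made up for illustration, and the snippet needs a local Spark 2.4 installation to run.

```r
library(sparklyr)

sc <- spark_connect(master = "local", version = "2.4.5")

# A single-row table holding one array column of coin face values (in cents).
# The table name "coin_values" is an arbitrary choice for this sketch.
values_tbl <- copy_to(
  sc,
  tibble::tibble(values = list(c(1, 5, 10, 25))),
  name = "coin_values",
  overwrite = TRUE
)

more_tbl <- values_tbl %>%
  # TRANSFORM: convert each face value from cents to dollars
  hof_transform(~ .x / 100, expr = values, dest_col = dollar_values) %>%
  # FILTER: keep only face values of at least 10 cents
  hof_filter(~ .x >= 10, expr = values, dest_col = large_values) %>%
  # EXISTS: check whether any coin is worth exactly 25 cents
  hof_exists(~ .x == 25, expr = values, dest_col = has_quarter)

# Bring the result back into R as a tibble with list and logical columns
dplyr::collect(more_tbl)
```

Because each call only adds a column, the result remains an ordinary Spark data frame that other dplyr verbs can be piped onto.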


Avro

Another highlight of the sparklyr 1.3 release is its built-in support for Avro data sources. Apache Avro is a widely used data serialization protocol that combines the efficiency of a binary data format with the flexibility of JSON schema definitions. To make working with Avro data sources simpler, in sparklyr 1.3, as soon as a Spark connection is instantiated with spark_connect(..., package = "avro"), sparklyr will automatically figure out which version of the spark-avro package to use with that connection, saving sparklyr users the headache of determining the correct version of spark-avro by themselves. Similar to how spark_read_csv() and spark_write_csv() are in place to work with CSV data, spark_read_avro() and spark_write_avro() were implemented in sparklyr 1.3 to facilitate reading and writing Avro files through an Avro-capable Spark connection, as illustrated in the example below:


library(sparklyr)

# The `package = "avro"` option is only supported in Spark 2.4 or higher
sc <- spark_connect(master = "local", version = "2.4.5", package = "avro")

sdf <- sdf_copy_to(
  sc,
  tibble::tibble(
    a = c(1, NaN, 3, 4, NaN),
    b = c(-2L, 0L, 1L, 3L, 2L),
    c = c("a", "b", "c", "", "d")
  )
)

# This example Avro schema is a JSON string that essentially says all columns
# ("a", "b", "c") of `sdf` are nullable.
avro_schema <- jsonlite::toJSON(list(
  type = "record",
  name = "topLevelRecord",
  fields = list(
    list(name = "a", type = list("double", "null")),
    list(name = "b", type = list("int", "null")),
    list(name = "c", type = list("string", "null"))
  )
), auto_unbox = TRUE)

# persist the Spark data frame from above in Avro format
spark_write_avro(sdf, "/tmp/data.avro", as.character(avro_schema))

# and then read the same data frame back
spark_read_avro(sc, "/tmp/data.avro")
# Source: spark<data> [?? x 3]
      a     b c
  <dbl> <int> <chr>
  1     1    -2 "a"
  2   NaN     0 "b"
  3     3     1 "c"
  4     4     3 ""
  5   NaN     2 "d"
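To see what the schema string handed to spark_write_avro() actually contains, the same structure can be built and inspected locally with jsonlite alone. The sketch below uses a small helper, nullable_field(), which is hypothetical and not part of sparklyr or jsonlite; it only captures the Avro convention that a nullable field is a union of its value type and "null".

```r
library(jsonlite)

# Hypothetical helper (not part of sparklyr): in Avro, a nullable field is
# declared as a union of the value type and "null".
nullable_field <- function(name, type) {
  list(name = name, type = list(type, "null"))
}

avro_schema <- toJSON(list(
  type = "record",
  name = "topLevelRecord",
  fields = list(
    nullable_field("a", "double"),
    nullable_field("b", "int"),
    nullable_field("c", "string")
  )
), auto_unbox = TRUE)

# Pretty-print the JSON so the union types are easy to read
print(prettify(avro_schema))

# Parse the string back to confirm it is well-formed JSON with three fields
parsed <- fromJSON(as.character(avro_schema), simplifyVector = FALSE)
length(parsed$fields)
```

Passing as.character(avro_schema) rather than the jsonlite object itself, as the example above does, hands Spark a plain character string.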

Custom Serialization

In addition to commonly used data serialization formats such as CSV, JSON, Parquet, and Avro, starting from sparklyr 1.3, customized data frame serialization and deserialization procedures implemented in R can also be run on Spark workers via the newly implemented spark_read() and spark_write() methods. We can see both of them in action through a quick example below, where saveRDS() is called from a user-defined writer function to save all rows within a Spark data frame into 2 RDS files on disk, and readRDS() is called from a user-defined reader function to read the data from the RDS files back to Spark:


library(sparklyr)

sc <- spark_connect(master = "local")
sdf <- sdf_len(sc, 7)
paths <- c("/tmp/file1.RDS", "/tmp/file2.RDS")

spark_write(sdf, writer = function(df, path) saveRDS(df, path), paths = paths)
spark_read(sc, paths, reader = function(path) readRDS(path), columns = c(id = "integer"))
# Source: spark<?> [?? x 1]
     id
  <int>
1     1
2     2
3     3
4     4
5     5
6     6
7     7
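The contract between the writer and reader functions can be tried out locally before involving Spark. The following sketch mimics the round trip with base R only: the way rows are split across the two files is an assumption made for illustration (Spark decides the real partitioning), and temporary files are used in place of the /tmp paths above.

```r
# Local, Spark-free sketch of the writer/reader round trip shown above.
df <- data.frame(id = 1:7)
paths <- file.path(tempdir(), c("file1.RDS", "file2.RDS"))

# Same shape as the functions passed to spark_write() and spark_read()
writer <- function(df, path) saveRDS(df, path)
reader <- function(path) readRDS(path)

# Split the rows into one chunk per output file. This chunking rule is an
# assumption for illustration; it yields rows 1-4 and rows 5-7.
chunk_ids <- sort(rep_len(seq_along(paths), nrow(df)))
chunks <- split(df, chunk_ids)

# Write each chunk to its own file, then read every file back and stack them
invisible(Map(writer, chunks, paths))
restored <- do.call(rbind, lapply(paths, reader))

print(restored$id)
#> [1] 1 2 3 4 5 6 7
```

Any pair of R functions with these signatures can be swapped in, which is what allows spark_write() and spark_read() to handle arbitrary file formats.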

Other Improvements


sparklyr.flint

sparklyr.flint is a sparklyr extension that aims to make functionalities from the Flint time-series library easily accessible from R. It is currently under active development. One piece of good news is that, while the original Flint library was designed to work with Spark 2.x, a slightly modified fork of it works well with Spark 3.0 and within the existing sparklyr extension framework. sparklyr.flint can automatically determine which version of the Flint library to load based on the version of Spark it is connected to. Another bit of good news is that, as previously mentioned, sparklyr.flint does not know too much about its own destiny yet. Maybe you can play an active part in shaping its future!

EMR 6.0

This release also features a small but important change that allows sparklyr to correctly connect to the version of Spark 2.4 that is included in Amazon EMR 6.0.

Previously, sparklyr automatically assumed any Spark 2.x it was connecting to was built with Scala 2.11, and tried to load any required Scala artifacts built with Scala 2.11 as well. This became problematic when connecting to Spark 2.4 from Amazon EMR 6.0, which is built with Scala 2.12. Starting from sparklyr 1.3, this problem can be fixed by simply specifying scala_version = "2.12" when calling spark_connect() (e.g., spark_connect(master = "yarn-client", scala_version = "2.12")).

Spark 3.0

Last but not least, it is worthwhile to mention that sparklyr 1.3.0 is known to be fully compatible with the recently released Spark 3.0. We highly recommend upgrading your copy of sparklyr to 1.3.0 if you plan to have Spark 3.0 as part of your data workflow in the future.


Acknowledgement

In chronological order, we would like to thank the following individuals for submitting pull requests towards sparklyr 1.3:

We are also grateful for valuable input on the sparklyr 1.3 roadmap, #2434, and #2551 from @javierluraschi, and great spiritual advice on #1773 and #2514 from @mattpollock and @benmwhite.

Please note that if you believe you are missing from the acknowledgement above, it may be because your contribution has been considered part of the next sparklyr release rather than part of the current release. We do make every effort to ensure all contributors are mentioned in this section. In case you believe there is a mistake, please feel free to contact the author of this blog post via e-mail (yitao at rstudio dot com) and request a correction.

If you wish to learn more about sparklyr, we recommend some of the previous release posts, such as sparklyr 1.2 and sparklyr 1.1.

Thanks for reading!
