Serializers for Classes in Datasets

Apache Spark powers a lot of modern big data processing pipelines at software companies today. At FullContact, we’ve found its Dataset API to be particularly useful, since it combines the type-safe, expressive, functional style of the older RDD API with the efficiency of Spark SQL and its Catalyst optimizer.

However, the Dataset API is only easy to use with a limited set of types: primitive types, tuples, and case classes.

I was faced with this exact limitation when a colleague's DM slid into view with a familiar brush-knock sound:

Question for you -- I'm trying to get kryo working in zeppelin by setting "spark.serializer" and "spark.kryo.registrator" on the spark interpreter. I should NOT have to do this:
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[CustomMessage]
if kryo is working correctly, right? The error I get is:
java.lang.UnsupportedOperationException: No Encoder found for com.fullcontact.publish.CustomMessage
- field (class: "com.fullcontact.publish.CustomMessage", name: "_1")
- root class: "scala.Tuple2"

 

His actual code looked a little like this:

implicit val myObjEncoder =
  org.apache.spark.sql.Encoders.kryo[CustomMessage]

val inputs: Dataset[InputRecord] =
  spark.read.csv("s3://...").as[InputRecord]
val messages: Dataset[(String, CustomMessage)] =
  inputs.map(keyMsgById)

messages.foreach(publishMsgToKafka)

 

By defining a Kryo encoder in that way, he intended to allow the Dataset class to accept CustomMessage (a Java type) as a type parameter.

So why was his code still throwing that exception when he had clearly provided an Encoder? Normally, Spark uses several features of Scala’s robust type system to make the Encoder system invisible to the programmer, but that invisibility comes with the drawback of the type limitations I mentioned earlier. Trying to make it work outside those preferred types requires the programmer to be very particular with how they set up their Encoder to ensure Spark uses it properly.

This post will cover:

  1. How Spark uses Scala’s implicits and type system to construct Encoders
  2. How Spark exposes an "escape hatch" for custom type Encoders
  3. Why custom type Encoders don’t work well with the automatic Encoder system, which led to my coworker’s problem

How Spark Finds An Encoder For Your Class

By default, Spark reads records in a Dataset as Row objects (DataFrame is an alias for the Dataset[Row] type in recent Spark releases). But Row objects are unwieldy. They cannot know the structure of your data at compile time, so you have to either access fields by index without type information or inspect the carried schema at runtime to find the proper type casts.

It is possible to get a statically-typed Dataset via the method Dataset.as[_]. This lets you have typed access to the data in your records. Rather than interacting through the spark-sql API:

val df: DataFrame =
  spark.read.csv("s3://my-bucket/my-csv.csv")
df.select($"email", explode($"contacts") as "contact")
  .where($"sent" > 1000)
  .where($"received" < 10)

 

You can use maps, flatmaps, reduces, folds, etc., just like in the RDD API:

val ds: Dataset[EmailAcc] =
  spark.read.csv("s3://my-bucket/my-csv.csv").as[EmailAcc]
ds.filter(acc => acc.sent > 1000 && acc.received < 10)
  .flatMap(acc => acc.contacts.map((acc.email, _)))

 

You lose some performance because the Catalyst optimizer has to assume you need every column of your rows to deserialize your objects, but you can often gain readability compared to complex spark-sql calls or UDFs.

Let’s take a look at the definition of .as[_]:

 /**
   * :: Experimental ::
   * Returns a new Dataset where each record has been mapped on to the specified type. The
   * method used to map columns depend on the type of `U`:
   *  - When `U` is a class, fields for the class will be mapped to columns of the same name
   *    (case sensitivity is determined by `spark.sql.caseSensitive`).
   *  - When `U` is a tuple, the columns will be mapped by ordinal (i.e. the first column will
   *    be assigned to `_1`).
   *  - When `U` is a primitive type (i.e. String, Int, etc), then the first column of the
   *    `DataFrame` will be used.
   *
   * If the schema of the Dataset does not match the desired `U` type, you can use `select`
   * along with `alias` or `as` to rearrange or rename as required.
   *
   * @group basic
   * @since 1.6.0
   */
  @Experimental
  @InterfaceStability.Evolving
  def as[U : Encoder]: Dataset[U] =
    Dataset[U](sparkSession, logicalPlan)

 

The method appears to take no arguments, but the syntax in the type parameters list is actually hiding an implicit parameter list. This syntax is called a context bound and during compilation is expanded to:

def as[U](implicit evidence: Encoder[U]): Dataset[U]

 

By including this implicit parameter to its signature, .as[_] is doing two things:

  1. Requiring that there exist an object of type Encoder[U]
  2. Accepting that object via the implicit scope at its call site

This is an implementation of the "type class" pattern in Scala. Type classes enable ad-hoc polymorphism, meaning methods on Dataset can use different code depending on the type they contain, but the choice of which code to use is deferred to some time after the Dataset class itself is implemented. In fact, the necessary code path is not chosen until the programmer’s code is compiled!
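To make the pattern concrete, here is a minimal sketch of a type class in plain Scala, with no Spark involved. The ToyEncoder trait, its two instances, and the describe methods are hypothetical names invented for illustration; only the context-bound mechanics mirror what .as[_] does.

```scala
object TypeClassDemo {
  // Hypothetical type class: knows the column type name for a T.
  trait ToyEncoder[T] {
    def columnType: String
  }

  object ToyEncoder {
    // Instances placed in the companion object are found by implicit search.
    implicit val intEncoder: ToyEncoder[Int] =
      new ToyEncoder[Int] { def columnType: String = "int" }

    implicit val stringEncoder: ToyEncoder[String] =
      new ToyEncoder[String] { def columnType: String = "string" }
  }

  // Context-bound form, shaped like `def as[U : Encoder]`.
  def describe[U: ToyEncoder]: String =
    implicitly[ToyEncoder[U]].columnType

  // The same method with the implicit parameter list written out.
  def describeExpanded[U](implicit evidence: ToyEncoder[U]): String =
    evidence.columnType
}
```

Calling TypeClassDemo.describe[Int] selects the Int instance at compile time, while a call such as describe[java.util.Date] would fail to compile because no ToyEncoder[java.util.Date] exists in implicit scope.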

The necessary code does exist, though. It is chosen in such a way that if the programmer is using the basic supported types, they never need to mention the Encoder type by name:

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

// A single-column CSV with integer values
val numbers: DataFrame =
  spark.read.csv("s3://my-bucket/numbers.csv")

val numbersDS: Dataset[Int] = numbers.as[Int]

 

As we are about to see, importing spark.implicits._ makes several implicit values available to the compiler. These implicit values make it possible for the Dataset to have the necessary Encoder for a type the Dataset did not know existed until the code is compiled.
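As a rough model of how an import can deliver type class instances, the sketch below packages implicit providers in an object and imports them at the point of use. Everything here (ToyEncoder, toyImplicits, columnTypeOf) is a hypothetical stand-in written in plain Scala, not Spark's actual code.

```scala
object ImportedImplicitsDemo {
  // Hypothetical type class standing in for Encoder.
  trait ToyEncoder[T] {
    def columnType: String
  }

  // Hypothetical holder of instances, standing in for spark.implicits.
  object toyImplicits {
    implicit def newIntEncoder: ToyEncoder[Int] =
      new ToyEncoder[Int] { def columnType: String = "int" }

    // One provider covers tuples and case classes, since they all extend Product.
    implicit def newProductEncoder[T <: Product]: ToyEncoder[T] =
      new ToyEncoder[T] { def columnType: String = "struct" }
  }

  def columnTypeOf[T](implicit encoder: ToyEncoder[T]): String =
    encoder.columnType

  // Without this import the two calls below would not compile,
  // because no ToyEncoder instance would be in implicit scope.
  import toyImplicits._

  val forInt: String = columnTypeOf[Int]
  val forTuple: String = columnTypeOf[(Int, String)]
}
```

The compiler picks newIntEncoder for Int and newProductEncoder for the tuple, and the calling code never mentions ToyEncoder by name.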

spark.implicits is an object which extends SQLImplicits, which contains the actual definitions we’re interested in:

// Primitives

  /** @since 1.6.0 */
  implicit def newIntEncoder: Encoder[Int] =
    Encoders.scalaInt

  /** @since 1.6.0 */
  implicit def newLongEncoder: Encoder[Long] =
    Encoders.scalaLong

  /** @since 1.6.0 */
  implicit def newDoubleEncoder: Encoder[Double] =
    Encoders.scalaDouble

... and so on ...

 

These implicit methods provide values of their return type into implicit scope wherever they are visible. The import statement from before brings all of them into scope, thereby making Encoders available for all the listed types: Int, Long, Double, and so on. There is also a provider for any Scala Product type, which covers tuples and case classes. Examining the definitions for the examples above, we see:

  def scalaInt: Encoder[Int] = ExpressionEncoder()

  def scalaLong: Encoder[Long] = ExpressionEncoder()

  def scalaDouble: Encoder[Double] = ExpressionEncoder()

... and so on ...

 

They are all using the same ExpressionEncoder.apply method, and relying on the compiler’s type inference to sort out what type the apply call should be parameterized with. The type parameter list for apply once again uses a context bound, but for a different purpose this time:

def apply[T : TypeTag](): ExpressionEncoder[T] = {
  // We convert the not-serializable TypeTag into StructType and ClassTag.
  val mirror = ScalaReflection.mirror
  val tpe = typeTag[T].in(mirror).tpe
  (... method implementation ...)
}

 

The context bound generates a TypeTag to circumvent the usual erasure restriction on the JVM. That way, the Encoder implementation can leverage Scala’s Reflection APIs to inspect the type the Encoder was built for to know how to convert between JVM values and Catalyst expressions for Spark’s internal Row format. These conversions are taken from a large static mapping between Scala types and Catalyst expressions; primitive types are supported directly while tuple and case class types are supported via recursive definitions.
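The two mechanics described here, inferring the type parameter from the declared return type and recovering the full type through a TypeTag, can be sketched without Spark. This assumes Scala 2 with scala-reflect on the classpath; ToyEncoder and pairEncoder are hypothetical names, and the real ExpressionEncoder does far more with the reflected type.

```scala
import scala.reflect.runtime.universe._

object TypeTagDemo {
  // Hypothetical stand-in for ExpressionEncoder: it only records
  // the type arguments it was able to see through reflection.
  final class ToyEncoder[T](val fieldTypes: List[Type])

  object ToyEncoder {
    // The context bound makes the compiler pass a TypeTag[T] at each call
    // site, so the type arguments of T are still available at runtime.
    def apply[T: TypeTag](): ToyEncoder[T] =
      new ToyEncoder[T](typeOf[T].typeArgs)
  }

  // T is inferred from the declared return type, in the same way as
  // `def scalaInt: Encoder[Int] = ExpressionEncoder()`.
  def pairEncoder: ToyEncoder[(String, Int)] = ToyEncoder()
}
```

Here pairEncoder.fieldTypes holds the reflected String and Int types of the tuple, which the erased runtime class scala.Tuple2 alone could not provide.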

The actual implementation of ExpressionEncoder that combines the Catalyst expressions is an impressive stack of Scala reflection code, too complex to review in-depth here, yet still relevant to the investigation of why the custom Encoder wasn’t working. But first, a brief look at how Spark lets you create an Encoder for non-tuple, non-case class types.

How You Can Create An Encoder For “Non-Supported” Types

In addition to definitions of Encoders for the supported types, the Encoders object has methods to create Encoders using other Encoders (for tuples), using java serialization, using kryo serialization, and using reflection on Java beans.

The java and kryo serializers work very similarly. They serialize the entire object to bytes, and then store that in a single column of data of type binary:

// Assume we have a Bean class with fields 's' and 'i'
val beans = Seq(
  new Bean("a", 0),
  new Bean("b", 1),
  new Bean("c", 2)
)

implicit val kryoEncoder: Encoder[Bean] = Encoders.kryo[Bean]

val beanRdd: RDD[Bean] = sc.parallelize(beans)
val beanDS: Dataset[Bean] = beanRdd.toDS
// beanDS: Dataset[Bean] = [value: binary]

 

This is generally not what you want, because you now have to pay the overhead cost of going through a full serialization layer instead of letting the Catalyst engine optimize the plan for your data. And, you can’t do anything with your data except to deserialize it into JVM objects -- even if you wanted to start interacting with it in the SparkSQL API, you couldn’t.

The Encoder specialized for Java beans makes the data columns more available:

// Assume we have a Bean class with fields 's' and 'i'
val beans = Seq(
  new Bean("a", 0),
  new Bean("b", 1),
  new Bean("c", 2)
)

implicit val beanEncoder: Encoder[Bean] = Encoders.bean(classOf[Bean])

val beanRdd: RDD[Bean] = sc.parallelize(beans)
val beanDS: Dataset[Bean] = beanRdd.toDS
// beanDS: Dataset[Bean] = [i: int, s: string]

 

But it also comes with the major drawback of requiring your data type to adhere to the bean spec: a no-args constructor, with getters and setters for every data field.
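The difference between the two approaches can be checked without starting a cluster, because an Encoder exposes its schema directly. The sketch below assumes the Spark SQL library is on the classpath; the Bean class is a hypothetical example written in Scala, with @BeanProperty generating the getters and setters the bean spec asks for.

```scala
import scala.beans.BeanProperty
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType

object EncoderSchemaDemo {
  // Hypothetical class following the bean spec: a no-args constructor plus
  // a getter and setter per field (generated here by @BeanProperty).
  class Bean(@BeanProperty var s: String, @BeanProperty var i: Int)
      extends Serializable {
    def this() = this("", 0)
  }

  // Kryo: the whole object becomes a single opaque binary column.
  val kryoSchema: StructType = Encoders.kryo[Bean].schema

  // Bean: one column per bean property.
  val beanSchema: StructType = Encoders.bean(classOf[Bean]).schema
}
```

Printing kryoSchema.fieldNames and beanSchema.fieldNames should show a lone value column for Kryo and the i and s columns for the bean Encoder, matching the Dataset schemas shown in the comments above.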

The tuple methods are interesting because they do something we haven’t seen in other Encoders yet: they accept pre-existing Encoders:

def tuple[T1, T2](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2]): ExpressionEncoder[(T1, T2)] =
  tuple(Seq(e1, e2)).asInstanceOf[ExpressionEncoder[(T1, T2)]]

 

The tuple method pulls existing serializers from the supplied Encoders and composes them together without reflection. These tuple methods are not called when Spark sets up Encoders for a tuple of supported types, though. This leads to precisely the problem my colleague was running into when he Slacked me for help.
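As a small illustration of that composition, the public Encoders.tuple method can combine a built-in Encoder with a Kryo one. This is a sketch that assumes the Spark SQL library is on the classpath; CustomMessage here is a hypothetical plain class, not my colleague's actual type.

```scala
import org.apache.spark.sql.{Encoder, Encoders}

object TupleEncoderDemo {
  // Hypothetical plain class that is neither a case class nor a bean.
  class CustomMessage(val body: String) extends Serializable

  val messageEncoder: Encoder[CustomMessage] = Encoders.kryo[CustomMessage]

  // Encoders.tuple is handed both Encoders explicitly, so nothing has to
  // be derived for CustomMessage by reflection.
  val tupleEncoder: Encoder[(String, CustomMessage)] =
    Encoders.tuple(Encoders.STRING, messageEncoder)

  val tupleFieldNames: List[String] = tupleEncoder.schema.fieldNames.toList
}
```

The resulting schema has one column per tuple position, with the Kryo-encoded message stored as binary in the second column.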

Why Custom Encoders Don’t Work With Tuple Datasets

If we look at the definition of ExpressionEncoder.apply again, we can see that it does not accept any pre-existing Encoder objects:

def apply[T : TypeTag](): ExpressionEncoder[T] = {
  // We convert the not-serializable TypeTag into StructType and ClassTag.
  val mirror = ScalaReflection.mirror
  val tpe = typeTag[T].in(mirror).tpe
  ... method implementation ...
}

 

In addition, the tuple methods we saw earlier are not called when generating Encoders for a tuple-typed Dataset. Instead, these methods are how we reach the necessary ExpressionEncoder constructor:

// org.apache.spark.sql.LowPrioritySQLImplicits
implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] =
  Encoders.product[T]

// org.apache.spark.sql.Encoders
def product[T <: Product : TypeTag]: Encoder[T] =
  ExpressionEncoder()

 

If T is a tuple type, and one of its fields is not one of the default supported types, the ExpressionEncoder generated has no way of knowing whether an Encoder already exists for that field, much less use that Encoder. Instead, what will happen is that at runtime the reflection code inside ExpressionEncoder will run, try to generate Catalyst expressions for the unsupported field, and fail because the type is not supported in its mapping of supported expressions! It then throws exactly the exception my coworker was seeing!
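The failure mode can be modeled in a few lines of plain Scala. The sketch below is a toy, not Spark's implementation: ToySchema, the supported map, derive, and tuple are all hypothetical names. It contrasts a derivation that only consults a closed mapping with a combinator that accepts pieces built elsewhere.

```scala
object ClosedMappingDemo {
  // Hypothetical stand-in for an Encoder: just a list of column types.
  final case class ToySchema(columns: List[String])

  // Hypothetical stand-in for the static mapping of supported types.
  private val supported: Map[Class[_], String] = Map(
    classOf[String] -> "string",
    classOf[java.lang.Integer] -> "int"
  )

  // Mimics reflective derivation: it only looks at the closed mapping and
  // has no way to learn about an Encoder defined elsewhere.
  def derive(fieldClasses: Class[_]*): ToySchema =
    ToySchema(fieldClasses.toList.map { c =>
      supported.getOrElse(
        c,
        throw new UnsupportedOperationException(
          s"No Encoder found for ${c.getName}"))
    })

  // Mimics the tuple methods: it composes schemas passed in explicitly.
  def tuple(first: ToySchema, second: ToySchema): ToySchema =
    ToySchema(first.columns ++ second.columns)

  // Hypothetical unsupported type and a hand-built schema for it.
  final class CustomMessage
  val customMessageSchema: ToySchema = ToySchema(List("binary"))
}
```

Calling derive(classOf[String], classOf[CustomMessage]) throws even though customMessageSchema exists, whereas tuple(derive(classOf[String]), customMessageSchema) succeeds because the custom piece is passed in by hand.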

The solution my colleague went with was to fall back to the older RDD API. He needed most of the data fields from the objects he was working with anyway, which made it more practical to accept a performance hit from using the RDD API rather than attempt to build up a mapping to an analogous case class. Had he wanted to keep using his custom models in tuples in a Dataset, the solution would have been to supply evidence of an Encoder for the tuple type himself rather than rely on the type-inferred Encoder system:

implicit val myObjEncoder =
  Encoders.kryo[CustomMessage]
implicit val tupleEncoder =
  Encoders.tuple(Encoders.STRING, myObjEncoder)

val inputs: Dataset[InputRecord] =
  spark.read.csv("s3://...").as[InputRecord]
val messages: Dataset[(String, CustomMessage)] =
  inputs.map(keyMsgById)

messages.foreach(publishMsgToKafka)

 

Key Points

After digging into the implementation of Spark’s Encoder system, I understood it well enough that I could answer my colleague’s need for a way to use his custom data type in Spark. The process of unraveling the system led me to several valuable concepts:

  • Type classes are a very powerful way to generalize code at a level beyond just type parameters. I had some knowledge of the pattern beforehand, but had always struggled to generalize it without leaning on a variation of Haskell’s Show. Seeing a concrete example in the Encoder system helped solidify my understanding of it greatly.
  • The way Spark leverages Scala implicits is impressive, abstracting nearly the entire Encoder system away from the user such that they often never even know it’s there. But this abstraction also makes the system difficult to understand and debug. When something goes wrong, the cause can be hard to find; it took me several hours of reading, testing, and code-stepping to identify the issue! It is no surprise to me that implicits are getting a big overhaul in Scala 3.
  • When absolutely necessary, Spark offers some "side entrances" to work with types it is not optimal for. The java, kryo, and java-bean Encoders all offer a way to have Spark’s Dataset operations work on types that don’t map nicely onto Catalyst expressions. However, they carry restrictions on how the programmer can interact with the data or how the type must be structured. These special encoders should be used sparingly and with good reason.
  • Encoders for nested types are constructed in one go, so if a field in a type is not compatible with the default Catalyst expression mapping, Spark will reject the enclosing type. Specifically, a type that requires one of the special encoders won’t work when nested inside a tuple (as my colleague discovered). Dedicated tuple methods that accept Encoder evidence must be used instead.

Unraveling how an Encoder is made was like solving a fascinating puzzle, and ended with a satisfying conclusion and greater understanding. I was impressed by the engineering talent which has gone into building Apache Spark even in just this one small corner of the library. I’m sure the deeper knowledge of Spark I gained will be helpful in making sure we are using the library effectively to solve problems at FullContact, and I hope sharing what I learned in this dive into its internals is helpful to others outside of FullContact as well!
