[Перевод] [Перевод] Почему стоит начать писать собственные Spark Native Functions?

Это мой вольный перевод статьи »Why You Should Start Writing Spark Custom Native Functions», которая вдохновила меня на некоторые собстенные изыскания по данной теме. Их результат я планирую опубликовать позже, а пока выношу на ваш суд этот перевод.

Один из первых способов, к которому люди обращаются, когда им нужно сделать что-то, что не поддерживается из коробки в Spark, это написание UDF (user-defined function), которая позволяет им получить нужный функционал. Но является ли это лучшим способом? Каковы последствия для производительности при написании UDF?

В этом посте мы рассмотрим пример по реализации функции, возвращающей UUID, с помощью двух разных подходов: с использованием UDF и написание собственного нативного кода для Spark, и сравним их производительность. Пост будет следовать следующей структуре:

  1. Введение

  2. Реализация UUID с использованием UDF

  3. Реализация UUID с использованием Catalyst Expressions

  4. Сравнение производительности

  5. Заключение

Примечание 1: Мы будем, естестенно, писать код на Scala

Примечание 2: Функция UUID () доступна в Spark с версии 3.0.0, но ее реализация все равно полезна в качестве упражнения из-за ее простоты.

Примечание 3: весь код из данной статьи вы можете найти тут: https://github.com/imdany/spark_catalyst_udf

Введение

Если вы работали с Spark, то знаете, что бывают случаи, когда сам Spark не предоставляет необходимого функционала, и вам приходится его расширять.

Обычно вы можете сделать это, написав UDF, которая выполняет нужную работу. Но знали ли вы, что есть и другой альтернативный способ? Он называется Catalyst Expressions, и, должен признаться, их не так просто написать, но они могут повысить производительность вашего приложения до нового уровня. Итак, приступим к делу!

Реализация UUID с помощью UDF

Реализация генератора UUID с использованием UDF проста. С некоторыми вариациями или другим синтаксисом, один из способов написания этой функции выглядит так:

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

def uuid: UserDefinedFunction = udf(() => java.util.UUID.randomUUID().toString)

spark.sqlContext.udf.register("uuid", uuid)

Таким образом, мы можем использовать функцию UUID как с помощью SQL-выражений, так и с помощью API DataFrame.

Основная идея здесь заключается в том, что мы используем существующую функцию Java, но не в Spark. Чтобы использовать ее из Spark, необходимо обернуть этот код в UDF, а если мы хотим использовать его и в SQL API, то еще и зарегистрировать.

Просто, верно? И вы можете использовать этот подход для многих, многих разных вещей. Это легко сделать, и это работает, но есть и другие способы расширения функциональности Spark.

Реализация UUID с использованием Catalyst Expressions

Написание Catalyst Expressions определенно сложнее, чем UDF, но как вы увидите в следующем разделе, это имеет свои преимущества в части производительности.

Для написания пользовательской функции в Spark нам нужно как минимум два файла:

  • первый будет реализовывать саму логику, расширяя функционал Catalyst

  • второй сделает эту функцию доступной

Catalyst Expression

Файл, содержащий реализацию кода, должен быть создан в определенном пакете, а именно:

org.apache.spark.sql.catalyst.expressions

Следовательно, нам нужно создать файл для нашей функции в этой папке в нашем проекте Spark.

b3b90fe85ae3cf647cabc321b46eb04f.webp

И здесь дела могут стать довольно сложными. Чтобы заставить Spark использовать наши функции, нам нужно расширить доступный интерфейс. Существует множество различных интерфейсов, которые мы могли бы реализовать, и найти подходящий может быть сложным, так как документация на этот счет недостаточна. Для упрощения вещей давайте проанализируем, что мы хотим достичь — функция без входных параметров, возвращающая строку.

Я выяснил, что мне нужно реализовать Leaf Expression, так что мой класс будет начинаться так:

case class Uuid() extends LeafExpression with CodegenFallback {
  override def nullable: Boolean = ???
  override def eval(input: InternalRow): Any = ???
  override def dataType: DataType = ???
}

Итак, давайте заполним эту определенную часть, начиная с простых методов.

Для dataType, так как мы хотим вернуть строку:

override def dataType: DataType = StringType

А для nullable, поскольку мы не хотим возвращать значения null из нашей функции:

override def nullable: Boolean = false

Последний кусочек,»eval», — это фактическое выполнение функции, которая будет генерировать UUID.

override def eval(input: InternalRow): Any = 
  UTF8String.fromString(java.util.UUID.randomUUID().toString)

Вот и все! Единственное необычное, что вы можете заметить, — это UTF8String.fromString (). Если вы попытаетесь запустить код без этого, вы увидите:

java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String

Причина вызова этого метода заключается в том, что Spark использует его для преобразования «внешней строки» в «строку Spark» (https://github.com/apache/spark/…/UTF8String.java#L49)

Окончательный код выглядит следующим образом:

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String

case class Uuid() extends LeafExpression with CodegenFallback {
  override def nullable: Boolean = false
  override def dataType: DataType = StringType
  override def eval(input: InternalRow): Any = UTF8String.fromString(java.util.UUID.randomUUID().toString)
}

Просто, верно? Ну, в этом случае, да, реализация была легкой, но обычно это не так просто.

Function wrapper

Теперь, когда у нас есть выражение Catalyst, мы должны сделать его
доступным для использования в API Dataframe. Для этого нам нужно создать второй
файл. Местоположение этого файла не так важно, как у предыдущего, но
для порядка я обычно помещаю его в org.apache.spark.sql

1a01e8acff7294af0cd0b73f79ff455a.webp

И на этот раз я назвал его CustomFunctions, и он должен содержать следующее содержание:

object CustomFunctions { 
  private def withExpr(expr: Expression): Column = Column(expr) 
  def UUID_CUSTOM(): Column = withExpr { 
    Uuid() 
  } 
}

С помощью этого кода мы делаем функцию Uuid доступной через объект CustomFunctions.

Использование

И последний вопрос: как мы используем эту функцию? Ответ довольно прост! Мы должны импортировать ее, как и любую другую функцию и использовать ее в нашем DataFrame:

import org.apache.spark.sql.CustomFunctions.UUID_CUSTOM
//....
val newDf = df.withColumn("uuid", UUID_CUSTOM())

Сравнение производительности

Вопрос, который вы могли бы себе задать, — стоит ли все это того? Ну, давайте взглянем на цифры. Я запустил один и тот же код с использованием UDF и выражения Catalyst на DataFrame разного размера, и результаты довольно интересны.

// UDF version
val data = spark.range(nRows).toDF("ID").withColumn("uuid_udf", expr("uuid_udf()"))
data.write.format("parquet").mode("overwrite").save(s"/tmp/test/UDF/${runID}")
// --------
// Catalyst Version
val data = spark.range(nRows).toDF("ID").withColumn("uuid", UUID_CUSTOM())
data.write.format("parquet").mode("overwrite").save(s"/tmp/test/catalyst/${runID}")

Я запустил каждую функцию с 4 разными значениями числа строк по 100 раз, затем получил среднее время для каждой, и вот результат:

218f91880450393f821131c535c19456.webp

Для небольших DataFrame разница не очень заметна…, но когда вы увеличиваете размер DataFrame, можно начать замечать, насколько лучше Catalyst Expressions справляется с задачей по сравнению с UDF.

Заключение

Так что стоит ли мне перестать использовать UDF и начать писать Catalyst Expression?

Я не могу ответить на этот вопрос за вас, так как это будет зависеть от многих различных аспектов, таких как время, доступные ресурсы или знания. Но что ясно из результатов тестов, так это то, что если вам нужно быстродействие приложения или сократить время выполнения вашей задачи, вам стоит рассмотреть возможность изучения написания таких Catalyst Expression.

© Habrahabr.ru