Динамическое создание UDF в Apache Spark из строк кода: проблемы и решение («костыль»)

Ремарка:

В данной статье представлено, как автор решил свою проблему и не утверждает, что нельзя было сделать это лучше или красивее. Также автор не утверждает, что данную проблему или задачу следует решать именно таким методом и не настаивает на повторении данного подхода. Это лишь один из возможных способов решения, предложенный автором, и он представлен здесь исключительно для ознакомления.

Автор отмечает, что не смог найти информацию по данной проблеме ни на русскоязычных, ни на англоязычных ресурсах, включая Хабр и другие источники. Статья представляет собой дискуссионный материал, и автор будет рад увидеть альтернативные решения в комментариях.

Что я пытаюсь сделать:

Рассмотрим создание UDF blue, который возвращает строку »0000FF» на Scala API.»

object Example1 extends App with Configure {

	spark.udf.register("blue", udf(() => "0000FF")) // создаем и регестрируем udf
	spark.sql("select blue()").show()
	/* вывод:
			+------+
			|blue()|
			+------+
			|0000FF|
			+------+
	 */

}

Рассмотрим ситуацию, когда сама функция UDF поступает в программу в виде строки. Например, у нас есть переменная code, которая содержит "() => "0000FF"". В рантайме необходимо получить объект типа Function0[String] из этой строки и использовать его для создания UDF. Таким образом, моя цель заключается в написании программы, способной обрабатывать сложные лямбды с произвольным количеством аргументов и произвольными выходными данными в виде строк.

например:

  • »(num: Int) => num * num»

  • »(str: String) => str.reverse»

  • »(num1: Double, str: String) => Math.pow (num1, str, size)»

  • и тд

как не получилось делать и какая возникала ошибка:

расмотрим следующий код

import org.apache.spark.sql.functions.udf

import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox

object Example2 extends App with Configure {

	def udfRegister(labdaCode: String): Unit = {

		import universe._
		val toolbox = universe.runtimeMirror(this.getClass.getClassLoader).mkToolBox()

		val tree = toolbox.parse(labdaCode)
		val compiledCode = toolbox.compile(tree)
		val function = compiledCode().asInstanceOf[Function0[String]]
		//сторока успешно конвертируеться в Function0(String)


		println(function()) // вывод: 0000FF
		val udfBlue = udf(() => function()) // успешное создание udf


		spark.udf.register("blue", udfBlue)

	}

	udfRegister("() => \"0000FF\"")
	spark.sql("select blue()").show() // возникает ошибка

}

При запуске задачи возникает следующая ошибка:
cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF

Ошибка указывает на то, что в коде или при работе с Spark возникла попытка использовать SerializedLambda вместо ожидаемого типа scala.Function1 (функция с одним аргументом).

В Spark функции должны быть сериализуемыми, чтобы их можно было передать через кластер. SerializedLambda может возникать при попытке передачи некорректно сериализуемой функции.

В свою очередь SerializedLambda есмь класс, используемый в Java для сериализации лямбда-выражений и методов ссылок. Этот интерфейс генерируется компилятором Java при сериализации лямбда-выражений. (судя по сему данный класс не предоставляет необходимого спарку инструмента для сериализации)

Особенно запутывает то, что данная ошибка не возникает при конвертации полученного объекта в Function0[String], который Spark полностью поддерживает, а также не возникает при создании объекта UDF. Ошибка проявляется уже в рантайме при запуске задачи на кластере.

Однако следующий код снова работает:

import org.apache.spark.sql.functions.udf

import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox

object Example3 extends App with Configure {

	val labdaCode = "() => \"0000FF\""

	import universe._
	val toolbox = universe.runtimeMirror(this.getClass.getClassLoader).mkToolBox()

	val tree = toolbox.parse(labdaCode)
	val compiledCode = toolbox.compile(tree)
	val function = compiledCode().asInstanceOf[Function0[String]]
	//сторока успешно конвертируеться в Function0(String)


	println(function()) // вывод: 0000FF
	val udfBlue = udf(() => function()) // успешное создание udf

	spark.udf.register("blue", udfBlue) // успешное регестрирование udf

	spark.sql("select blue()").show()
	/*вывод:

		+------+
		|blue()|
		+------+
		|0000FF|
		+------+

	 */

}

В сущности в Example3 происходит все тоже самое что и в Example2, но первое работает, а второе нет что вводит еще в большее замешательство

import org.apache.spark.sql.functions.udf

import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox

object Example4 extends App with Configure {

	if (true) {

		val labdaCode = "() => \"0000FF\""

		import universe._
		val toolbox = universe.runtimeMirror(this.getClass.getClassLoader).mkToolBox()

		val tree = toolbox.parse(labdaCode)
		val compiledCode = toolbox.compile(tree)
		val function = compiledCode().asInstanceOf[Function0[String]]
		//сторока успешно конвертируеться в Function0(String)


		println(function()) // вывод: 0000FF
		val udfBlue = udf(() => function()) // успешное создание udf

		spark.udf.register("blue", udfBlue) // успешное регестрирование udf

	}

	spark.sql("select blue()").show() // возникает ошибка

}

как и Example3 Example4 неделает тоже самое, но почему то блок if снова введет к ошибке
так же к ошибкам приводит и использование циклов и монад, Try и тд (но тут я это демонстрировать не буду)

таким образом все сводиться к тому чтобы правильным образом создать function которая бы не являлась бы SerializedLambda и есть следующие ограничения:

  • можно создавть лямбды/функции только в мкетоде main

  • нельзя из создовать в блоках if, try, for, монадах и тд.

хоть примеры и демонстрируют работу с совсем простой функцией blue, моя задача сделать универсальное приложение, которое будет принимать любую лямбду (на самом деле от 0 до 10 поскольку максимальный UDF10) в формате строки

Определимся с тем как программа получает информацию о функциях:

Я хочу чтобы информацию получало в формате JSON и путсь у него будет следующая структура:

[ //массив поскольку функций может прийти много или неприйти вообще
  {
    "name": "blue", //имя бля udf
    "targetType": "String", //тип данных для возращаемый
    "fun": "() => \"00ff00\"", //собственно сама лямбда
    "imports": ["...", "..."] //опциональное, на случай если нужны дополнительные импорты
  },
  // остальные имеют такую же структуру
  {
    "name": "reverse",
    "targetType": "String",
    "fun": "(str:String) => str.reverse"
  },
  {
    "name": "plus",
    "targetType": "Int",
    "fun": "(i1:Int, i2:Int) => i1 + i2"
  },
  {
    "name": "sum3",
    "types": "Int,Int,Int",
    "targetType": "Int",
    "fun": "(i1:Int, i2:Int, i3:Int) => i1 + i2 + i3"
  }
]

Определимся с классом который будет отражать данную структуру Json:

// думаю тут не стоит расписывать какое поле за что отвечает
case class UDFConfig(name: String,
  					 targetType: String,
					 fun: String,
					 imports: List[String] = List.empty)

Для парсинга json файла я буду использовать библиотеку json4s и следующий класс

package Util

import org.json4s.Formats
import org.json4s.native.JsonMethods.parse

//небольшой обьект для ковертации строки в формате json в список обьектов типа JsonType 
object JsonHelper {

	private def jsonToListConvert[JsonType](json: String)(implicit mf: Manifest[JsonType], formats: Formats): List[Either[Throwable, JsonType]] = {
		parse(json)
			.extract[List[JsonType]]
			.map(Right(_))
	}

	implicit class JsonConverter(json: String) {

		def jsonToList[JsonType]()(implicit mf: Manifest[JsonType], formats: Formats): List[Either[Throwable, JsonType]]= {
			jsonToListConvert[JsonType](json)
		}

	}

}

И создадим трейт Configure в котором и определи переменые которые понадобяться для дальнейшей работы

Иimport config.UDFConfig
import org.apache.spark.sql.SparkSession
import org.json4s.DefaultFormats

import scala.io.Source

trait Configure {

	lazy val spark = SparkSession
		.builder()
		.appName("Example UDF runtime compile")
		.master("local[*]")
		.getOrCreate()
	lazy val sc = spark.sparkContext
	// создали спрак сессию и контекст

	lazy val pathUdfsJson = "./UDFs.json" //путь до json c функциями

	implicit lazy val formats = DefaultFormats // список форматов для json4s, в моем случае хватеает дефолтных форматов
	import Util.JsonHelper._

	lazy val udfJson = getJson(pathUdfsJson) // получаем сам json в виде строки
	lazy val udfsConfigs: List[UDFConfig] = udfJson
		.jsonToList[UDFConfig]
		.filter(_.isRight)
		.map {
			case Right(udfconfig) => udfconfig
		} // получаем все корректные udfconfig
		  // этот обект мне и понадобиться

	//метод для чтения файлов
  	private def getJson(path: String): String = {
		val file = Source.fromFile(path)
		try {
			file.getLines().mkString("\n")
		} finally {
			file.close()
		}
	}

}

Определим последний обьект который будет udfconfig переводить в строку которую будет динамически в ратайме превращаться в FunctionN в последствии

package Util

import config.UDFConfig

object UDFHelper {

	private def configToStringConvert(udfConfig: UDFConfig): String = {

		//создадим все импорты для итоговой строки
		val imports = udfConfig
			.imports 																 // достаем импорты из конфига
			.map(_.trim.stripPrefix("import ").trim) // удаляем с лева ключевое слово если оно есть
			.distinct 															 // оставляем уникальные
			.map(imp => f"import ${imp}")            // конкатенируем слева ключевое слово
			.mkString("\n")                          // все соеденяем через отступ

		val Array(params, functionBody) = udfConfig.fun.split("=>", 2).map(_.trim) // Отделяем функциональную часть от переменных

		val paramsTypes: Seq[(String, String)] = params
			.stripPrefix("(") 			// Убираем с лева скобку
			.stripSuffix(")") 			// Убираем с права скобку
			.split(",")       			// Разделяем параметры
			.map(_.trim)      			// Убираем лишние пробелы
			.map {
				case "" => null
				case param =>
					val Array(valueName, valueType) = param.split(":").map(_.trim)
					(valueName, valueType)
			}                 			// разделяем имя переменой от типа данных этой переменной
			.filter(_ != null) 			// отвильтровываем null-ы

		val funcTypes: String = paramsTypes.size match {
			case 0 => udfConfig.targetType
			case _ => f"${List
				.fill(paramsTypes.size)("Any")
				.mkString("", ", ", ", ")}${udfConfig.targetType}"
		} //здесь получаем перечисление через запятую типов данных Function

		val anyParams = paramsTypes.map(_._1.trim + "_any").mkString(", ") //парметры лямда вырожения

		val instances = paramsTypes.map {
			case (valueName, valueType) =>
				f"	val ${valueName}: ${valueType} = ${valueType}_any.asInstanceOf[${valueType}]"
		}.mkString("\n") // тут определяем конвертации парметров люмбды в необхадимые типы

		// собираем все вместе в итоговый стринг
		f"""
			 |${imports}
			 |
			 |val func: Function${paramsTypes.size}[${funcTypes}] = (${anyParams}) => {
			 |
			 |${instances}
			 |
			 |  (
			 |${functionBody}
			 |  ).asInstanceOf[${udfConfig.targetType}]
			 |
			 |}
			 |
			 |func
			 |""".stripMargin

	}

	implicit class Converter(udfConfig: UDFConfig) {

		def configToString(): String = {
			configToStringConvert(udfConfig)
		}

	}

}

По поводу использования типа Any: как я уже упоминала, моя задача заключается в создании универсальной логики, где передаваемая через JSON лямбда или функция может иметь любое количество параметров (на практике от 0 до 10). В условиях, где невозможно динамически преобразовать строку в FunctionN, использование типа Any является единственным выходом (в дальнейшем будет ясно, почему). Иначе пришлось бы создавать множество объектов для каждого возможного числа параметров.

\sum_{i=0}^{10} {9^{i+1}}

что весьма много.
9 поскольку я решила ограничеться 9-тью типами данных: String, Int, Boolean, Byte, Short, Long, Float, Double, Date, Timestamp

Создаем обьекты регистраторы:

Поскольку мне не разрешено использовать конструкции типа if или match, я не могу динамически определять количество параметров в одном объекте UDFRegN и соответственно настраивать его. В результате мне пришлось создавать несколько отдельных объектов UDFRegN, каждый из которых поддерживает определённое количество параметров. Например, для случая с нулевым количеством параметров я создал соответствующий объект UDFRegN.

Это подход необходим из-за ограничений, которые не позволяют динамически адаптировать логику UDF в зависимости от передаваемого числа параметров. Таким образом, в данном случае было бы необходимо создать 11 отдельных объектов UDFRegN для обработки всех возможных комбинаций параметров.

import org.apache.spark.sql.functions.udf

import java.sql.{Date, Timestamp}
import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox

object UDFReg0 extends App with Configure { // как говорилось можно только функцию main использовать

	val indexcode = Integer.parseInt(args(0)) // это единственная возможность получить парметр извне и предполагается что это тндекс в списке UDFsConfig
	
	import Util.UDFHelper._
	val udfConfig = udfsConfigs(indexcode) //получаем udfConfig (из Configure)
	val functionCode = udfConfig.configToString() // преобразовываем его в код
	
	val toolbox = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()

	val tree = toolbox.parse(functionCode)
	val compiledCode = toolbox.compile(tree)
	val function = compiledCode().asInstanceOf[Function0[Any]] // получаем функцию из кода

	val myUDF = udfConfig.targetType match { // выбираем instanceOF в зависимости от типа данных который должна возращать функция
		case "String" => udf(() => function().asInstanceOf[String])
		case "Int" => udf(() => function().asInstanceOf[Int])
		case "Boolean" => udf(() => function().asInstanceOf[Boolean])
		case "Byte" => udf(() => function().asInstanceOf[Byte])
		case "Short" => udf(() => function().asInstanceOf[Short])
		case "Long" => udf(() => function().asInstanceOf[Long])
		case "Float" => udf(() => function().asInstanceOf[Float])
		case "Double" => udf(() => function().asInstanceOf[Double])
		case "Date" => udf(() => function().asInstanceOf[Date])
		case "Timestamp" => udf(() => function().asInstanceOf[Timestamp])
		case _ => throw new IllegalArgumentException(f"Неизвестный тип")
	}

	spark.udf.register(udfConfig.name, myUDF) // регистраця UDF

}

И осталось написать еще 10 однотипных почти точно таких же «регестраторов»
и для этого напишем последний скрипт, чтоб не делать жто в ручную)

import java.io.{File, PrintWriter}

object GenerateRegestrators extends App {

	(0 to 10).toList.map {

		num =>

			val types = (0 to num).toList.map(_ => "Any").mkString(", ")
			val lambdaVaues = (0 until  num).toList.map(n => f"value${n}: Any").mkString(", ")
			val functionValues = (0 until  num).toList.map(n => f"value${n}").mkString(", ")

			f"""
				 |import org.apache.spark.sql.functions.udf
				 |
				 |import java.sql.{Date, Timestamp}
				 |import scala.reflect.runtime.universe
				 |import scala.tools.reflect.ToolBox
				 |
				 |object UDFReg${num} extends App with Configure {
				 |
				 |	val indexcode = Integer.parseInt(args(0))
				 |
				 |	import Util.UDFHelper._
				 |	val udfConfig = udfsConfigs(indexcode)
				 |	val functionCode = udfConfig.configToString()
				 |
				 |	val toolbox = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
				 |
				 |	val tree = toolbox.parse(functionCode)
				 |	val compiledCode = toolbox.compile(tree)
				 |	val function = compiledCode().asInstanceOf[Function${num}[${types}]]
				 |
				 | 	val myUDF = udfConfig.targetType match {
				 |		case "String" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[String])
				 |		case "Int" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Int])
				 |		case "Boolean" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Boolean])
				 |		case "Byte" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Byte])
				 |		case "Short" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Short])
				 |		case "Long" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Long])
				 |		case "Float" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Float])
				 |		case "Double" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Double])
				 |		case "Date" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Date])
				 |		case "Timestamp" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Timestamp])
				 |		case _ => throw new IllegalArgumentException(f"Неизвестный тип")
				 |	}
				 |
				 |	spark.udf.register(udfConfig.name, myUDF)
				 |
				 |}
				 |""".stripMargin


	}
		.zipWithIndex
		.foreach {
			case (str, index) =>
				val file = new File(f"./src/main/scala/UDFReg${index}.scala")
				val writer = new PrintWriter(file)

				try {
					writer.write(str)
				} finally {
					writer.close()
				}
		}


}

тут пояснять нечего, оно просто генерит остальные обьекты регистраторы по аналогии
после запуска скрипта в проекте появляются недомтоющие классы UDFRegN.

собираем все вместе:

Итого остается лишишь написать последний пример где наконец динамическая компиляция UDF из строки заработает:

object Example5 extends App with Configure {

	import Util.UDFHelper._
	udfsConfigs.zipWithIndex.foreach { //индекс нужен чтоб передать в args
		case (udfConfig, index) => 
			udfConfig.fun.split("=>", 2)(0).count(_ == ':') match { 
              //тут и происходит выбор какому регестратору передать
				case 0 => UDFReg0.main(Array( index.toString ))
				case 1 => UDFReg1.main(Array( index.toString ))
				case 2 => UDFReg2.main(Array( index.toString ))
				case 3 => UDFReg3.main(Array( index.toString ))
				case 4 => UDFReg4.main(Array( index.toString ))
				case 5 => UDFReg5.main(Array( index.toString ))
				case 6 => UDFReg6.main(Array( index.toString ))
				case 7 => UDFReg7.main(Array( index.toString ))
				case 8 => UDFReg8.main(Array( index.toString ))
				case 9 => UDFReg9.main(Array( index.toString ))
				case 10 => UDFReg10.main(Array( index.toString ))
				case _ => Unit
			}

	}

	spark.sql("select blue(), reverse('namoW tobor ociP ociP') AS rev, plus(1, 2), sum3(1, 1, 1)").show
    /* вывод
	
		+------+--------------------+----------+-------------+
		|blue()|                 rev|plus(1, 2)|sum3(1, 1, 1)|
		+------+--------------------+----------+-------------+
		|00ff00|Pico Pico robot W...|         3|            3|
		+------+--------------------+----------+-------------+
	
	 */

}

и таки получилось.

P.S. код можно глянуть тут

© Habrahabr.ru