Pull to refresh

Нелинейная регрессия в Apache Spark. Разрабатываем своими руками

Reading time 11 min
Views 6.9K


При решении задач обработки сигналов часто применяют метод аппроксимации сырых данных моделью регрессии. Исходя из структуры, модели можно разделить на три типа – линейные, сводящиеся к линейным и нелинейные. В модуле машинного обучения «Spark ML» Apache Spark функционал для первых двух типов представлен классами LinearRegression и GeneralizedLinearRegression соответственно. Обучение нелинейных моделей в стандартной библиотеке не представлено и требует самостоятельной разработки.

Сначала мы кратко рассмотрим теоретические основы построения нелинейных моделей, а затем перейдём к практической разработке расширения Spark ML.

Немного математики


Обучение нелинейных моделей по сравнению с линейными задача более сложная. Это может быть связано с наличием более одного экстремума и / или «овражным» характером поверхности отклика. Главным стимулом использования нелинейных функции служит возможность получения более компактных моделей. Следует также заметить, что многие аналитические уравнения из области физики и инженерии исходно имеют нелинейный вид, поэтому использование соответствующих моделей может быть вынужденным.

Для обучения нелинейных моделей существует достаточно широкий спектр инструментов, выбор которых зависит от конкретного вида функции, наличия и вида применяемых ограничений и т. п. В статье будет использована широко представленная в литературе связка квадратичной функции ошибки и метода Ньютона-Гаусса – алгоритма первого порядка квази-ньютоновского типа. Этот алгоритм обладает достаточно хорошей сходимостью в большинстве случае.

Шаг итерации в методе Ньютона-Гаусса определяется соотношением:

d=-(J^T  J)^{-1} J^T  r, где J – матрица Якоби, r – вектор-столбец остатков r_i=y_i-f(w; x_i).

Указанная формула логически состоит из двух частей: аппроксимация матрицы Гессе H \approx (J^T  J) и аппроксимация градиента \nabla f \approx J^T  r.

Количество строк матрицы Якоби определяется количеством обучающих примеров n, количество столбцов – размером вектора весов m. Как показано в работе [1], аппроксимацию матрицы Гессе можно рассчитать, считывая по две строки матрицы матрицы Якоби и перемножая их. Полученные n^2 матриц размером m \times m остаётся только сложить. Предложенная перестановка операций не изменяет общей вычислительной сложности, но позволяет не загружать в память матрицу Якоби целиком и вести расчёт параллельно. Аналогичным образом проводится расчёт аппроксимации градиента только сложению подлежат n векторов длиной m. Обращение полученной матрицы Гессе не представляет большой сложности в виду относительно небольшого размера. Для обеспечения сходимости алгоритма необходимо следить за положительной определённостью рассчитанной матрицы (J^T  J), что реализуется расчётом собственных чисел и векторов.

В статьях [2, 3] была предложена общая схема применения описанного выше подхода для Apache Spark. Единственным, на мой взгляд, недочётом этих работ является отсутствие чёткой связи с существующим Spark ML API. Мы попытаемся восполнить этот пробел в следующем разделе.

Реализация Spark ML API


Для успешной реализации нелинейных моделей нам потребуется разобраться в структуре и назначении базовых классов. API машинного обучения в системе Spark имеет две версии: 1.x расположен внутри пакета mllib, 2.x в пакете ml. В документации к модулю Spark ML [4] указано, что переход с API версии 1.x на версию 2.x имеет целью обеспечения возможности встраивания в цепочки (Pipelines) и работы с типизированной структурой DataFrame. В нашем примере мы будем использовать более новую структуру классов, но при необходимости она довольно просто может быть реализована и под старую структуру.

Важные классы Spark API


  • класс org.apache.spark.ml.feature.Instance описывает экземпляр обучающего примера, включающий вещественную метку, весовой коэффициент примера и вектор значений признаков;
  • org.apache.spark.ml.regression.{Regressor, RegressionModel} – ключевые классы, которые требуется расширять. Первый представляет собой построитель (builder) модели, а второй уже обученную модель.
  • При помощи введённого нами трейта org.apache.spark.ml.regression.NonLinearFunction определяется контракт нелинейной функции, для которой ставится цель подобрать вектор весов. В контракте всего 3 метода: eval возвращает значение функции в точке, grad — вектор градиента, dim — размерность модели (длину вектора весов).
  • Библиотека линейной алгебры Breeze [5] берет на себя операции с матрицами и имеет готовые реализации функций оптимизации. Чтобы использовать алгоритм Ньютона-Гаусса или другой алгоритм первого порядка, потребуется реализовать в нашей функции потерь трейт breeze.optimize.DiffFunction. Метод calculate по переданному вектору коэффициентов должен возвращать кортеж из двух величин: значения штрафной функции и вектора её градиента в точке.

Расширение класса Regressor


Класс org.apache.spark.ml.regression.NonLinearRegression расширяет контракт Regressor и в результате обучения возвращает экземпляр NonLinearRegressionModel. Для создания нелинейной модели требуется задать уникальный строковый идентификатор и функцию ядра модели NonLinearFunction. Из опциональных параметров можно перечислить: максимальное количество итераций обучения, начальное приближение вектора коэффициентов и требуемая точность. Нелинейные функции часто имеют множество экстремумов и выбор начального приближения, исходя из априорных представлений о поведении конкретной ядерной функции, позволяет направить поиск именно в область глобального экстремума. Стоит отметить использование готовой реализации алгоритма Бройдена-Флетчера-Гольдфарба-Шанно с ограниченным расходом памяти (LBFGS) из библиотеки Breeze. Код обучения модели приведен ниже.

Код org.apache.spark.ml.regression.NonLinearRegression#train
override protected def train(dataset: Dataset[_]): NonLinearRegressionModel = {
  // set instance weight to 1 if not defined the column
  val instanceWeightCol: Column = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
  val instances: RDD[Instance] = dataset.select(
    col($(labelCol)).cast(DoubleType), instanceWeightCol, col($(featuresCol))).rdd.map {
    case Row(label: Double, weight: Double, features: Vector) => Instance(label, weight, features)
  }

  // persists dataset if defined any storage level
  val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
  if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

  val costFunc = new SquaresLossFunctionRdd(kernel, instances)
  val optimizer = new LBFGS[BDV[Double]]($(maxIter), 10, $(tol))

  // checks and assigns the initial coefficients
  val initial = {
    if (!isDefined(initCoeffs) || $(initCoeffs).length != kernel.dim) {
      if ($(initCoeffs).length != kernel.dim)
        logWarning(s"Provided initial coefficients vector (${$(initCoeffs)}) not corresponds with model dimensionality equals to ${kernel.dim}")
      BDV.zeros[Double](kernel.dim)
    } else
      BDV($(initCoeffs).clone())
  }

  val states = optimizer.iterations(new CachedDiffFunction[BDV[Double]](costFunc), initial)
  val (coefficients, objectiveHistory) = {
    val builder = mutable.ArrayBuilder.make[Double]
    var state: optimizer.State = null
    while (states.hasNext) {
      state = states.next
      builder += state.adjustedValue
    }

    // checks is method failed
    if (state == null) {
      val msg = s"${optimizer.getClass.getName} failed."
      logError(msg)
      throw new SparkException(msg)
    }

    (Vectors.dense(state.x.toArray.clone()).compressed, builder.result)
  }
  // unpersists the instances RDD
  if (handlePersistence) instances.unpersist()

  // produces the model and saves training summary
  val model = copyValues(new NonLinearRegressionModel(uid, kernel, coefficients))
  val (summaryModel: NonLinearRegressionModel, predictionColName: String) = model.findSummaryModelAndPredictionCol()
  val trainingSummary = new NonLinearRegressionSummary(
    summaryModel.transform(dataset), predictionColName, $(labelCol), $(featuresCol), objectiveHistory, model
  )
  model.setSummary(trainingSummary)
}


Приведенный код метода train можно разделить на три части: получение обучающих примеров из набора данных; инициирование штрафной функции и поиск оптимального решения; сохранение вектора коэффициентов и результатов обучения в экземпляре модели.

Расширение класса RegressionModel


Реализация класса org.apache.spark.ml.regression.NonLinearRegressionModel довольно тривиальна. Метод predict использует функцию ядра для получения значения в точке:

override protected def predict(features: Vector): Double = {
  kernel.eval(coefficients.asBreeze.toDenseVector, features.asBreeze.toDenseVector)
}

Единственное требование Spark ML API, на которое стоит обратить внимание – требование к сериализуемости модели. Нужное поведение обеспечивается в объекте-компаньоне за счет расширения абстрактных классов org.apache.spark.ml.util.{MLReader, MLWriter} [6]. Состояние обученной модели состоит из двух частей: вектор коэффициентов и функция ядра. Если с вектором коэффициентов всё уже придумано до нас, то с функцией ядра несколько сложнее. Напрямую функцию ядра сохранить в DataFrame не получится, но есть несколько альтернативных вариантов.

Для простоты был выбран вариант бинарной сериализации функции в строку Base64. К недостаткам можно отнести недоступность прочтения человеком полученной строки, а также необходимость поддержки версионирования реализаций.

Гораздо более перспективным представляется подход к сохранению функции в символьном виде. Это может быть сделано по образу объектов класса formula пакета stats в языке R, например, log(y) ~ a + log(x). Этот метод сложнее первого, но решает ряд проблем: человеко-читаемое представление функций и возможность десериализации разными версиями парсеров при сохранении обратной совместимости. Значительной сложностью здесь представляется разработка достаточно гибкого парсера символьных выражений функций.

Возможно, полезным улучшением станет возможность выбора шага для численного дифференцирования функции ядра. На сложность сохранения модели это существенно не влияет.

Реализация квадратичной функции потерь


Последним элементом, необходимым для обучения, является функция потерь. В нашем примере используется квадратичная функция потерь в виде двух реализаций. В одной обучающие примеры задаются в виде матрицы Breeze, в другом – в виде Spark-структуры RDD[Instance]. Первая реализация проста для понимания (непосредственно использует матричные выражения) и подходит для небольших обучающих наборов. Она служит для нас тестовым эталоном.

Код org.apache.spark.ml.regression.SquaresLossFunctionBreeze
package org.apache.spark.ml.regression

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

/**
  * Breeze implementation for the squares loss function.
  *
  * @param fitmodel concrete model implementation
  * @param xydata   labeled data combined into the matrix:
  *                 the first n-th columns consist of feature values, (n+1)-th columns contains labels
  */
class SquaresLossFunctionBreeze(val fitmodel: NonLinearFunction, xydata: BDM[Double])
  extends SquaresLossFunction {

  /**
    * The number of instances.
    */
  val instanceCount: Int = xydata.rows
  /**
    * The number of features.
    */
  val featureCount: Int = xydata.cols - 1
  /**
    * Feature matrix.
    */
  val X: BDM[Double] = xydata(::, 0 until featureCount)
  /**
    * Label vector.
    */
  val y: BDV[Double] = xydata(::, featureCount)

  /**
    * The model dimensionality (the number of weights).
    *
    * @return dimensionality
    */
  override def dim: Int = fitmodel.dim

  /**
    * Evaluates loss function value and the gradient vector
    *
    * @param weights weights
    * @return (loss function value, gradient vector)
    */
  override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = {
    val r: BDV[Double] = diff(weights)
    (0.5 * (r.t * r), gradient(weights))
  }

  /**
    * Calculates a positive definite approximation of the Hessian matrix.
    *
    * @param weights weights
    * @return Hessian matrix approximation
    */
  override def hessian(weights: BDV[Double]): BDM[Double] = {
    val J: BDM[Double] = jacobian(weights)
    posDef(J.t * J)
  }

  /**
    * Calculates the Jacobian matrix
    *
    * @param weights weights
    * @return the Jacobian
    */
  def jacobian(weights: BDV[Double]): BDM[Double] = {
    val gradData = (0 until instanceCount) map { i => fitmodel.grad(weights, X(i, ::).t).toArray }
    BDM(gradData: _*)
  }

  /**
    * Calculates the difference vector between the label and the approximated values.
    *
    * @param weights weights
    * @return difference vector
    */
  def diff(weights: BDV[Double]): BDV[Double] = {
    val diff = (0 until instanceCount) map (i => fitmodel.eval(weights, X(i, ::).t) - y(i))
    BDV(diff.toArray)
  }

  /**
    * Calculates the gradient vector
    *
    * @param weights weights
    * @return gradient vector
    */
  def gradient(weights: BDV[Double]): BDV[Double] = {
    val J: BDM[Double] = jacobian(weights)
    val r = diff(weights)
    2.0 * J.t * r
  }
}


Второй вариант предназначен для выполнения в распределённой среде. Для расчёта используется функция RDD.treeAggregate, позволяющая реализовать алгоритм в стиле «Map-Reduce».

Код org.apache.spark.ml.regression.SquaresLossFunctionRdd
package org.apache.spark.ml.regression

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.feature.Instance
import org.apache.spark.rdd.RDD

/**
  * Spark RDD implementation for the squares loss function.
  *
  * @param fitmodel concrete model implementation
  * @param xydata   RDD with instances
  */
class SquaresLossFunctionRdd(val fitmodel: NonLinearFunction, val xydata: RDD[Instance])
  extends SquaresLossFunction {

  /**
    * The model dimensionality (the number of weights).
    *
    * @return dimensionality
    */
  override def dim: Int = fitmodel.dim

  /**
    * Evaluates loss function value and the gradient vector
    *
    * @param weights weights
    * @return (loss function value, gradient vector)
    */
  override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = {
    val bcW: Broadcast[BDV[Double]] = xydata.context.broadcast(weights)

    val (f: Double, grad: BDV[Double]) = xydata.treeAggregate((0.0, BDV.zeros[Double](dim)))(
      seqOp = (comb, item) => (comb, item) match {
        case ((loss, oldGrad), Instance(label, _, features)) =>
          val featuresBdv = features.asBreeze.toDenseVector
          val w: BDV[Double] = bcW.value
          val prediction = fitmodel.eval(w, featuresBdv)
          val addedLoss: Double = 0.5 * math.pow(label - prediction, 2)
          val addedGrad: BDV[Double] = 2.0 * (prediction - label) * fitmodel.grad(w, featuresBdv)
          (loss + addedLoss, oldGrad + addedGrad)
      },
      combOp = (comb1, comb2) => (comb1, comb2) match {
        case ((loss1, grad1: BDV[Double]), (loss2, grad2: BDV[Double])) => (loss1 + loss2, grad1 + grad2)
      })

    (f, grad)
  }

  /**
    * Calculates a positive definite approximation of the Hessian matrix.
    *
    * @param weights weights
    * @return Hessian matrix approximation
    */
  override def hessian(weights: BDV[Double]): BDM[Double] = {
    val bcW = xydata.context.broadcast(weights)

    val (hessian: BDM[Double]) = xydata.treeAggregate(new BDM[Double](dim, dim, Array.ofDim[Double](dim * dim)))(
      seqOp = (comb, item) => (comb, item) match {
        case ((oldHessian), Instance(_, _, features)) =>
          val grad = fitmodel.grad(bcW.value, features.asBreeze.toDenseVector)
          val subHessian: BDM[Double] = grad * grad.t
          oldHessian + subHessian
      },
      combOp = (comb1, comb2) => (comb1, comb2) match {
        case (hessian1, hessian2) => hessian1 + hessian2
      }
    )

    posDef(hessian)
  }
}


Сборка проекта


Для упрощения разработки и тестирования из оригинального проекта Spark ML мы позаимствовали pom.xml с небольшой модификацией. Мы фиксируем версию Spark до одной из вышедших в релиз, в нашем случае 2.0.1. Следует обратить внимание на наследование нашего POM-файла от org.apache.spark:spark-parent_2.11:2.0.1, что позволяет нам не верстать заново конфигурацию плагинов Maven.

Для прогона тестов, требующих SparkContext, в тестовые зависимости мы вносим org.apache.spark:spark-mllib_2.11:2.0.1:test-jar: трейты org.apache.spark.mllib.util.MLlibTestSparkContext, org.apache.spark.ml.util.TempDirectory внедряем в соответствующие тестовые классы. Также полезными для тестирования могут оказаться расширения классов Suite из пакета org.apache.spark, помогающие работать с контекстом, например, SparkFunSuite.

На правах заключения


Есть несколько моментов, которые в данной статье не охвачены, но их изучение представляется чрезвычайно интересным:

  • использование взвешенной обучающей выборки;
  • применение ограничений на область оптимизации как мягких (регуляризация), так и жестких (граничные условия);
  • оценка статистических показателей модели (доверительные интервалы коэффициентов, значимость и др.).

На данный момент по обозначенным выше аспектам у меня недостаточно информации, но я буду благодарен всем поделившимся источниками.

Полный код можно посмотреть на Github.

Полное и всестороннее тестирование этого решения ещё предстоит провести, поэтому прошу относиться к материалу как к концепту и теме для обсуждения улучшений.

Для замечаний и предложений предпочтительнее использовать личные сообщения, для обсуждений лучше подходят комментарии.

Спасибо всем за внимание.

Использованные материалы


  1. ieeexplore.ieee.org/document/5451114
  2. www.nodalpoint.com/nonlinear-regression-using-spark-part-1-nonlinear-models
  3. www.nodalpoint.com/non-linear-regression-using-spark-part2-sum-of-squares
  4. spark.apache.org/docs/latest/ml-guide.html
  5. github.com/scalanlp/breeze
  6. jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-mllib/spark-mllib-pipelines-persistence.html
Tags:
Hubs:
+16
Comments 2
Comments Comments 2

Articles