フジーコの日記

自分が試してみたプログラミング関連のブログです

Scalaの行列計算ライブラリBreezeによるMatrix Factorizationの実装

Matrix FactorizationをScalaの行列計算ライブラリであるBreezeで実装してみました。

Matrix Factorizationとは、推薦アルゴリズムの手法の一つである協調フィルタリングで有名なモデルです。 ユーザーの推薦対象物(以降アイテムと呼ぶ)への評価のデータを元にユーザーとアイテムの潜在ベクトルを学習するモデルです。 Matrix Factorizationの説明は以下のリンクに詳しく書かれています。 qiita.com

本記事ではこのMatrix FactorizationをScalaで実装して、Pythonによる実装と処理速度を比較してみました。

Python機械学習においてよく使われる言語です。 Pythonは行列計算を行うライブラリが多く有り、そのライブラリの中身はcで書かれているため、動的型付け言語でありながら、ライブラリを上手く使うことによって、簡単に行列計算を高速に行うことができます。

一方でpythonは動的型付け言語のため、以下のようなデメリットが生じます。
1. 型エラーなどの事前条件エラーを防げない。
2. 変数や関数の引数、返り値の型がわからない。

また、PythonにはGIL(グローバルインタプリタロック)の仕組みがあるため、容易にマルチスレッドで並列に処理を行うことができません。

一方で、Scalaは静的型付け言語であるため、上で述べたPythonのデメリットに対して、以下の点で解決できます。
1. コンパイラによる事前条件エラーのチェック
2. 型によるコードの表現
3. 標準で搭載されている並列コレクションによる容易な並列処理の実現

前置きはこれくらいにして、早速比較をしていきます。
本記事ではMatrix Factorizationはユーザーベクトル、アイテムベクトルによるシンプルな回帰モデルであり、目的関数には正規化を用いたものを実装します。

回帰モデル   { \displaystyle
\begin{equation}
\hat{r}_{u,i} = \vec{p_u}^T\vec{q_i}
\end{equation}
}

目的関数 { \displaystyle
\begin{equation}
 {min\_{P, Q} \sum_{(u,v) \in R} (r\_{u,i}-\vec{p_u}^T\vec{q_i})^2 + \frac{\lambda}{2} (\parallel \vec{p_u} \parallel _{F}^2 + \parallel \vec{q_i} \parallel _{F}^2) }
\end{equation}
}

まずはPythonによる実装ですが、以下のリンクを参考に実装を行いました。

上のリンクの実装に少し工夫を加えて、ループ処理を行う学習部分を以下のようにCythonで実装しました。 全コードは以下のgithubを参考にしてください。

github.com

cdef class FastMF(object):
  
   ...

   """
   CythonによるMatrix Factorizationの学習
   主にforによるループ処理の高速化を目的とする
   """
   def learning(self):

        cdef:
            double err = 0.0
            int step
            long user_index
            long item_index
            double start
            int k
            double all_error = 0.0
            np.ndarray[DOUBLE_t, ndim=1, mode = 'c'] user_matrix
            double rating

        for step in xrange(self.steps):
            for user_index, user_matrix in enumerate(self.R):
                for item_index, rating in enumerate(user_matrix):
                    if not rating:
                        continue
                    pre_u = np.transpose(self.P)[user_index]
                    err = self._get_rating_error(user_index, item_index)
                    np.transpose(self.P)[user_index] += self.gamma * (2 * err * np.transpose(self.Q)[item_index] - self.beta * np.transpose(self.P)[user_index]) # ユーザーベクトルの更新
                    np.transpose(self.Q)[item_index] += self.gamma * (2 * err * pre_u - self.beta * np.transpose(self.Q)[item_index])  # アイテムベクトルの更新
           
            all_error = self._get_error()
            print all_error
            if all_error < self.threshold:
                self.nR = np.dot(np.transpose(self.P), self.Q) # 得られた評価値行列
                return
        
        self.nR = np.dot(np.transpose(self.P), self.Q)

次にScalaによる実装を行いました。
この実装では、BreezeというScalaの行列計算ライブラリを用いました。
BreezeはPythonのNumpyのように配列の初期化、行列同士の足し算、内積などの行列計算を容易に行えるライブラリです。

全コードは以下のgithubを参考にしてください。 github.com

Breezeを使うことによって、行列計算も簡単にできました。さらに、Scalaの標準で用意されているコレクションメソッドであるparを用いることによって簡単に並列に学習を行うようにできました。 また、Scalaのcase classを用いることによって、Pythonよりも型による表現力が上がりました(あくまで個人的な意見ですが、、)

/**
  * Matrix Factorizationモデル 
  */
class MatrixFactorization(useIdMap: UserIdMap, itemIdMap: ItemIdMap, K: Int) {

  // ユーザー重み行列
  val userW: MFW = ...
  // アイテム重み行列
  val itemW: MFW = ...
  ...

   /**
    * 学習
    */
  def fitIterator(mfd: MFD, epochs: Int = 30, eta: Double = 0.005, lambda: Double = 0.02, threshold: Double = 0.1): Unit = {
    // parによる並列で処理を行う
    (1 to epochs).par.foreach { i =>
      fit(mfd.iterator, eta, lambda)
      val allError = getAllError(mfd, lambda)
      println(allError)
    }
  }

  @annotation.tailrec
  private def fit(mfdIter: MFDIter, eta: Double, lambda: Double): Unit = {
    if(mfdIter.hasNext) {
      val (userId, itemId, rate) = mfdIter.next
      if(rate != 0) {
        val error = rate - predict(userId, itemId)
        for(k <- 0 until K) {
          val prevU = userW.value(userId, k)
          userW.value(userId, k) += eta * (2 * error * itemW.value(k, itemId) - lambda * userW.value(userId, k))
          itemW.value(k, itemId) += eta * (2 * error * prevU - lambda * itemW.value(k, itemId))
        }
      }
    fit(mfdIter, eta, lambda)
    }
  }
}

// 特徴ベクトルのケースクラス
case class MFD(value: DenseMatrix[Double]) extends CFD[Double] {
  def iterator = MFDIter(value.iterator)
}

// DenseMatrix[Double].iteratorをラップしたケースクラス
case class MFDIter(value: Iterator[((Int, Int), Double)]) extends CFDIterator[Double] {
  def next = {
    val data = value.next
    val userId= data._1._1
    val itemId = data._1._2
    val rate = data._2
    (userId, itemId, rate)
  }
}

次に、Pythonで実装されたプログラムとScalaで実装されたプログラムを実行して、評価を行ってみます。

今回はwebで公開されているMovieLens 100k Datasetのデータを使います。このデータは、各ユーザーが各映画に対して1~5の5段階で評価値を付けたデータです。このうちの80000個の評価データを含むu1.baseを学習に用い、20000個の評価データを含むu1.testをテスト用のデータに用います。評価指標はRMSE(2乗平均平方根誤差)を使いました。 以下にエポック数を100にして行った場合の、学習時間とrmseを示します。実行環境は、CPU: 2.6 GHz Intel Core i5のメモリ: 8GBのMacBookで実行しました。

Pythonによる実装よりも、Scalaによる実装の方が、学習時間が7倍近く早く、精度も遜色ない結果になりました。

学習時間(s) RMSE
Python 216.453 1.177
Scala 31.996 1.045

以上のように、Scalaを用いてMatrix Factorizationを実装した場合、Pythonに比べても行列計算の実装が難しくなく、簡単に並列処理による高速化ができました