Implementation of Factorization Machines on Spark using parallel stochastic gradient descent (python and scala)
Factorization Machines (FMs) are a general predictor introduced by Rendle in 2010 that can capture all single and pairwise interactions in a dataset. They can be applied to any real-valued feature vector and also perform well on highly sparse data. An extension of FMs, Field-aware Factorization Machines, proved to be a successful method for predicting advertisement clicks in the Display Advertising Challenge on Kaggle.
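For reference, the FM model equation from Rendle's paper combines a global bias, linear (single-feature) terms, and factorized pairwise interactions:

\hat{y}(x) = w_0 + \sum_{i=1}^{n} w_i x_i + \sum_{i=1}^{n} \sum_{j=i+1}^{n} \langle v_i, v_j \rangle \, x_i x_j

where each feature i gets a k-dimensional latent vector v_i, so the pairwise weights are factorized rather than estimated independently.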
I built a custom Spark implementation to use in Python and Scala. To make optimal use of parallel computing in Spark, I implemented parallel stochastic gradient descent (SGD) to train the FMs. This is an alternative to the mini-batch SGD that is currently available in MLlib for training logistic regression models.
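To give a rough idea of the approach, here is a minimal sketch of one pass of parallel SGD on an RDD (this is an illustration of the technique, not the actual code in this repository; data, gradient and alpha are placeholder names): each partition runs plain SGD on its own share of the data, and the per-partition weights are averaged by the driver.

import numpy as np

def parallel_sgd_epoch(data, w, gradient, alpha=0.01):
    # One pass of parallel SGD: each partition updates its own copy of
    # the weights with plain SGD, then the driver averages the results.
    w_bc = data.context.broadcast(w)

    def run_partition(points):
        w_local = w_bc.value.copy()
        for label, features in points:
            w_local -= alpha * gradient(w_local, label, features)
        yield w_local

    partial_weights = data.mapPartitions(run_partition).collect()
    return np.mean(partial_weights, axis=0)

# data: RDD of (label, features) pairs; gradient: a user-supplied function
# returning the per-example gradient. Both names are illustrative.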
This implementation shows impressive results in terms of speed and effectiveness.
I worked on this project during my summer internship at ING Netherlands in 2015. ING has strong teams of data scientists, and I thank them for their help during this project. I was also able to use a powerful cluster to test my code and train my models.
Here's a short tutorial on how to use them in pyspark. (Note: the procedure is much the same in Scala, see below.) You may prefer to try it directly using the IPython notebook tutorial FMonSparkdemoa9a.ipynb. You will need to download the a9a dataset first.
Load the file fmparallelsgd.py. You can do this by adding the following lines to your code:
sc.addPyFile("spark-FM-parallelSGD/fm/fmparallelsgd.py")
import fmparallelsgd as fm
or by passing the file directly when starting pyspark:
pyspark --py-files spark-FM-parallelSGD/fm/fmparallelsgd.py
Preprocess your data so that:
a) it is divided into train and test sets;
b) it is an RDD of LabeledPoints, where
- labels are -1 or 1 (if your data has 0/1 labels, transform them with fm.transform_data(data)), and
- features are either a SparseVector or a DenseVector from mllib.linalg.
(See the sketch after this list.)
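For example, with the a9a dataset in LibSVM format (the file path and split ratio below are just an illustration):

from pyspark.mllib.util import MLUtils

# Load the a9a dataset (LibSVM format) as an RDD of LabeledPoints
data = MLUtils.loadLibSVMFile(sc, "a9a")

# a9a already uses -1/+1 labels; with 0/1 labels you would first call
# data = fm.transform_data(data)

# Split into train and test sets
train, test = data.randomSplit([0.8, 0.2], seed=42)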
Call the function fm.trainFMparallelsgd(sc, train, params...), where params... stands for the training parameters you can specify.
This returns a weight matrix w. If you want to store it for future use, call fm.saveModel(w, "path/to/store/model").
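For example (extra hyperparameters are omitted here on the assumption that they have defaults; check fmparallelsgd.py for the exact keyword arguments):

# Train the FM with parallel SGD; additional hyperparameters
# (iterations, learning rate, number of factors, ...) go in place of params...
w = fm.trainFMparallelsgd(sc, train)

# Persist the learned weight matrix for later use
fm.saveModel(w, "path/to/store/model")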
To evaluate the performance of the model on the test set, call fm.evaluate(test, w). This returns the area under the precision-recall curve, the area under the ROC curve, the average log loss, the MSE, and the accuracy.
To calculate the probabilities the model assigns to a test set, call fm.predictFM(data, w). This returns an RDD of probability scores.
To load a model that you saved earlier, use the function fm.loadModel("path/to/store/model").
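Putting these last steps together (test, w, and the model path are the names used above):

# Evaluate the trained model on the held-out test set
print(fm.evaluate(test, w))

# Predicted probabilities for the test set (an RDD of scores)
scores = fm.predictFM(test, w)
print(scores.take(5))

# Reload a previously saved model and score with it
w_loaded = fm.loadModel("path/to/store/model")
print(fm.predictFM(test, w_loaded).take(5))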
Load the file fmparallelsgd.scala. You can do this by adding the following line to your code:
:load spark-FM-parallelSGD/fm/fmparallelsgd.scala
or by running the file directly when starting the spark-shell:
spark-shell -i spark-FM-parallelSGD/fm/fmparallelsgd.scala
Preprocess your data so that:
a) it is divided into train and test sets;
b) it is an RDD of LabeledPoints, where
- labels are -1 or 1, and
- features are a Vector from mllib.linalg.
Call the function fm.trainFMparallelsgd(train, params...), where params... stands for the training parameters you can specify.
This returns a weight matrix w.
To evaluate the performance of the model on the test set, call fm.evaluate(test, w). This returns the average log loss.
To calculate the probabilities the model assigns to a test set, call fm.predictFM(data, w). This returns an RDD of probability scores.