spark数据挖掘 - 基于 Audioscrobbler 数据集音乐推荐实战

原创
2015/10/30 10:10
阅读数 5K

基于 Audioscrobbler 数据集音乐推荐实战

1. 数据集

这个例子将使用 Audioscrobbler 公开的数据集。Audioscrobbler是http://www.last.fm/zh/第一个音乐推荐系统.
http://www.last.fm/zh/ 是第一个网络流媒体音频网站,成立与2002年。
Audioscrobbler 为 “scrobbling” 提供了一个开发的 API,主要记录听众听取了哪些作家的歌曲。 这个网站利用这些信息建立了一个强大的额音乐推荐系统。这个系统达到了数百万用户,因为第三方的App和网站可以提供收听数据给推荐引擎。 在那个时期,研究推荐系统大部分局限在学习类似评级的数据集。也就是说,推荐的人往往使用需要输入像 “某某某 评分3.5分” 这样的工具。 然而,Audioscrobbler 数据集有趣地方在于仅仅记录播放的历史:“某某某 播放了 什么”。一个播放记录带来的信息量远远小于一个评分数据带来的信息量,但是评分数据总量肯定没有播放历史记录的数据多,当大量播放历史记录放在一起的时候,比评分数据将更有价值。 由这个网站公布的一个2005年的数据集合可以在http://www-etud.iro.umontreal.ca/~bergstrj/audioscrobbler_data.html上面下载。这个数据集合没有解压之前大小是 135MB,解压之后是 500MB,解压之后将会看到主要的数据集是 user_artist_data.txt 文件,里面大约包含 141000 唯一的用户和 1.6 百万唯一的artist艺术家,大约 24.2 百万用户播放记录。 当然每个 artist 都是用 id 记录的,id 与 名字的对照关系是在 artist_data.txt 中。注意同一个 artist 可能对应很多不同名字即有不同的 id。 所以这里还有一个文件 artist_alias.txt 标识每个 artist 的别名。里面用一个唯一的id 标识所有同一个artist id 列表。

2. 推荐算法

我们需要选择一种适合于这种隐士反馈数据的算法。这个数据集全部是用户与作家的歌曲之间的交互行为数据。它没有包含用户和作家本身除了名字之外的其他属性信息。 这个就是典型的协同过滤算法。举个例子:比如决定两个用户具有相同的品味的原因是他们有相同的年龄,这个不是协同过滤。决定两个用户都喜欢同一首歌曲的原因是他们之间有很多共同喜欢的歌曲,这才是协同过滤。
这个数据集很大,因为他包含1千万多条用户播放记录。但是从另一个方面来说,它又很小数据量不够,因为它很稀疏。平均起来,每个用户才放过171个艺术家的歌曲,而总共的艺术家有 1.6 百万个。有些用户甚至只是听歌一个作曲家的歌曲,我们需要一种算法,能够对这种用户也给出合理的推荐。毕竟,每个人刚开始在系统中开始产生记录的那一刻都只听过一个作家。这个也说明算法对新用户准确度低,这种情况当用户交互行为变多的时候会慢慢变好。 当然,我们需要我们的算法有能力扩展,处理大数据,并且很快。
接下来我们例子里面展示的算法是广泛分类算法模型中的一个叫做隐因素模型。模型尝试通过观察不到的潜在的原因去解释这些观察到的大量的用户产品交互行为。 更具体的来说,这个例子将使用一个矩阵分解模型。下面介绍交叉最小方差模型。

交叉最小方差

算法名字 交叉最小方差, Alternating Least Squares, ALS
算法描述 Spark上的交替性最小二乘ALS本质是一种协同过滤的算法
算法原理 1. 首先将用户推荐对象交互历史转换为矩阵,行表示用户,列表示推荐对象,矩阵对应 i,j 表示用户 i 在对象 j 上有没有行为 <br /> 2. 协同过滤就是要像填数独一样,填满1得到的矩阵,采用的方法是矩阵分解算法原理图 算法原理图 <br /> 3. 原始矩阵 A 是一个很大的稀疏矩阵,然后利用 ALS 分解成近似两个矩阵 B 和 C 的乘,另外两个矩阵就比较密集,而且 B 矩阵的列可以解释为一个事物的几个方面。<br /> 4. 用户 k 对对象 h 的喜好程度就可以通过矩阵 B 的 k 行乘 矩阵 C 的 h 列得到
使用场景 当用户和推荐的对象本身属性数据没有,只存在用户和推荐对象历史交互数据的时候,当提炼出用户推荐对象的关系矩阵可以发现是一个大型的稀疏矩阵
算法优缺点 优点: 1. 此算法可伸缩 2. 速度很快 3. 适合大数据 4.新异兴趣发现、不需要领域知识 5. 随着时间推移性能提高 6. 推荐个性化、自动化程度高 7. 能处理复杂的非结构化对象 <br /> 缺点: 1. 稀疏问题 2. 可扩展性问题 3. 新用户问题 4. 质量取决于历史数据集 5. 系统开始时推荐质量差
参考资料 1. 算法原理 Large-scale Parallel Collaborative Filtering for the Netflix Prize <br /> 2. MLlib实现 MLlib - Collaborative Filtering

3. 数据准备

首先将样例数据上传到HDFS,如果想要在本地测试这些功能的话,需要内存数量至少 6g, 当然可以通过减少数据量来达到通用的测试。 下面给出完整的代码,注释已经说明每段代码的含义:

package clebeg.spark.action

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Spark 数据挖掘实战 案例音乐推荐
 * 注意:ALS 限制每一个用户产品对必须有一个 ID,而且这个 ID 必须小于 Integer.MAX_VALUE
 * Created by clebeg.xie on 2015/10/29.
 */
object MusicRecommend {
  val rootDir = "F:\\clebeg\\spark\\datas\\profiledata_06-May-2005\\";
  //本地测试
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkInAction").setMaster("local[1]")
    val sc = new SparkContext(conf)
    val rawUserArtistData = sc.textFile(rootDir + "user_artist_data.txt")
    //检查数据集是否超过最大值,对于计算密集型算法,原始数据集最好多分块
    //println(rawUserArtistData.first())
    //println(rawUserArtistData.map(_.split(' ')(0).toDouble).stats())
    //println(rawUserArtistData.map(_.split(' ')(1).toDouble).stats())
    //艺术家ID和名字对应
    val artistById = artistByIdFunc(sc)
    //艺术家名字重复
    val aliasArtist = artistsAlias(sc)

    aslModelTest(sc, aliasArtist, rawUserArtistData, artistById)
    //查看一下 2093760 这个用户真正听的歌曲
    val existingProducts = rawUserArtistData.map(_.split(' ')).filter {
      case Array(userId, _, _) => userId.toInt == 2093760
    }.map{
      case Array(_, artistId, _) => {
        aliasArtist.getOrElse(artistId.toInt, artistId.toInt)
      }
    }.collect().toSet

    artistById.filter {
      line => line match {
        case Some((id, name)) => existingProducts.contains(id)
        case None => false
      }

    }.collect().foreach(println)

  }

  /**
   * 获取艺术家名字和ID的对应关系
   * 有些艺术家名字和ID没有按 \t 分割,错误处理就是放弃这些数据
   * @param sc
   * @return
   */
  def artistByIdFunc(sc: SparkContext): RDD[Option[(Int, String)]] = {
    val rawArtistData = sc.textFile(rootDir + "artist_data.txt")
    val artistByID = rawArtistData.map {
      line =>
        //span 碰到第一个不满足条件的开始划分, 少量的行转换不成功, 数据质量问题
        val (id, name) = line.span(_ != '\t')
        if (name.isEmpty) {
          None
        } else {
          try {
            //特别注意Some None缺失值处理的方式,Scala 中非常给力的一种方法
            Some((id.toInt, name.trim))
          } catch {
            case e: NumberFormatException =>
              None
          }
        }
    }
    artistByID
  }

  /**
   * 通过文件 artist_alias.txt 得到所有艺术家的别名
   * 文件不大,每一行按照 \t 分割包含一个拼错的名字ID 还有一个正确的名字ID
   * 一些行没有第一个拼错的名字ID,直接跳过
   * @param sc Spark上下文
   * @return
   */
  def artistsAlias(sc: SparkContext) = {
    val rawArtistAlias = sc.textFile(rootDir + "artist_alias.txt")
    val artistAlias = rawArtistAlias.flatMap { line =>
      val tokens = line.split('\t')
      if (tokens(0).isEmpty) {
        None
      } else {
        Some((tokens(0).toInt, tokens(1).toInt))
      }
    }.collectAsMap()
    artistAlias
  }

  def aslModelTest(sc: SparkContext,
                   aliasArtist: scala.collection.Map[Int, Int],
                   rawUserArtistData: RDD[String],
                   artistById: RDD[Option[(Int, String)]] ) = {
    //将对应关系广播出去,因为这个数据量不大,Spark广播变量类似于 hive 的 mapjoin
    val bArtistAlias = sc.broadcast(aliasArtist)
    //转换重复的艺术家的ID为同一个ID,然后将
    val trainData = rawUserArtistData.map{
      line =>
        val Array(userId, artistId, count) = line.split(' ').map(_.toInt)
        val finalArtistID = bArtistAlias.value.getOrElse(artistId, artistId)
        Rating(userId, finalArtistID, count)
    }.cache()
    //模型训练
    val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)

    //模型建立之后,为某个用户给出一个具体的推荐列表
    val recommendations = model.recommendProducts(2093760, 5) //为ID为2093760的用户推荐5个产品
    recommendations.foreach(println)
    val recommendedProductIDs = recommendations.map(_.product).toSet
    //输出推荐的艺术家的名字
    artistById.filter {
      line => line match {
        case Some((id, name)) => recommendedProductIDs.contains(id)
        case None => false
      }
    }.collect().foreach(println)
  }
}

代码中模型训练好之后,预测了用户 2093760 的推荐结果,我测试结果如下,由于里面代码使用了随机生成初始矩阵,每个人的结果都有可能不一样。

Some((2814,50 Cent))
Some((829,Nas))
Some((1003249,Ludacris))
Some((1001819,2Pac))
Some((1300642,The Game))

代码中也给出了该用户以前听过的艺术家的名字如下:

Some((1180,David Gray))
Some((378,Blackalicious))
Some((813,Jurassic 5))
Some((1255340,The Saw Doctors))
Some((942,Xzibit))

模型评价

auc评价方法

  def areaUnderCurve(
      positiveData: RDD[Rating],
      bAllItemIDs: Broadcast[Array[Int]],
      predictFunction: (RDD[(Int,Int)] => RDD[Rating])) = {
    // What this actually computes is AUC, per user. The result is actually something
    // that might be called "mean AUC".

    // Take held-out data as the "positive", and map to tuples
    val positiveUserProducts = positiveData.map(r => (r.user, r.product))
    // Make predictions for each of them, including a numeric score, and gather by user
    val positivePredictions = predictFunction(positiveUserProducts).groupBy(_.user)

    // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
    // small AUC problems, and it would be inefficient, when a direct computation is available.

    // Create a set of "negative" products for each user. These are randomly chosen
    // from among all of the other items, excluding those that are "positive" for the user.
    val negativeUserProducts = positiveUserProducts.groupByKey().mapPartitions {
      // mapPartitions operates on many (user,positive-items) pairs at once
      userIDAndPosItemIDs => {
        // Init an RNG and the item IDs set once for partition
        val random = new Random()
        val allItemIDs = bAllItemIDs.value
        userIDAndPosItemIDs.map { case (userID, posItemIDs) =>
          val posItemIDSet = posItemIDs.toSet
          val negative = new ArrayBuffer[Int]()
          var i = 0
          // Keep about as many negative examples per user as positive.
          // Duplicates are OK
          while (i < allItemIDs.size && negative.size < posItemIDSet.size) {
            val itemID = allItemIDs(random.nextInt(allItemIDs.size))
            if (!posItemIDSet.contains(itemID)) {
              negative += itemID
            }
            i += 1
          }
          // Result is a collection of (user,negative-item) tuples
          negative.map(itemID => (userID, itemID))
        }
      }
    }.flatMap(t => t)
    // flatMap breaks the collections above down into one big set of tuples

    // Make predictions on the rest:
    val negativePredictions = predictFunction(negativeUserProducts).groupBy(_.user)

    // Join positive and negative by user
    positivePredictions.join(negativePredictions).values.map {
      case (positiveRatings, negativeRatings) =>
        // AUC may be viewed as the probability that a random positive item scores
        // higher than a random negative one. Here the proportion of all positive-negative
        // pairs that are correctly ranked is computed. The result is equal to the AUC metric.
        var correct = 0L
        var total = 0L
        // For each pairing,
        for (positive <- positiveRatings;
             negative <- negativeRatings) {
          // Count the correctly-ranked pairs
          if (positive.rating > negative.rating) {
            correct += 1
          }
          total += 1
        }
        // Return AUC: fraction of pairs ranked correctly
        correct.toDouble / total
    }.mean() // Return mean AUC over users
  }

参数选择

可以通过调整参数看 auc 的结果来反复选择

参考资料

Advanced Analytics with Spark
二分类器评价

展开阅读全文
打赏
0
51 收藏
分享
加载中
clebeg博主

引用来自“笔阁”的评论

厉害。

引用来自“clebeg”的评论

过奖

引用来自“笔阁”的评论

有空来看看我和小伙伴的spark内容,http://www.hubwiz.com/course/5449c691e564e50960f1b7a9/,有兴趣的话分享分享你的经验啊。
非常好,多多交流
2015/11/02 18:45
回复
举报

引用来自“笔阁”的评论

厉害。

引用来自“clebeg”的评论

过奖
有空来看看我和小伙伴的spark内容,http://www.hubwiz.com/course/5449c691e564e50960f1b7a9/,有兴趣的话分享分享你的经验啊。
2015/11/02 18:16
回复
举报
clebeg博主

引用来自“JackFace”的评论

强悍!
过奖,欢迎多多交流
2015/11/02 15:41
回复
举报
强悍!
2015/11/02 10:44
回复
举报
强悍!
2015/11/02 10:44
回复
举报
clebeg博主

引用来自“笔阁”的评论

厉害。
过奖
2015/10/30 13:24
回复
举报
厉害。
2015/10/30 13:08
回复
举报
更多评论
打赏
7 评论
51 收藏
0
分享
返回顶部
顶部