
Incremental Item-Based Collaborative Filtering with Spark

The collaborative-filtering algorithm is implemented directly on Spark RDDs.

Controller

1.      Data statistics

user count: =========> 8239237

itemCode count: =====> 7421567

 

spark result distinct count ======> 5826484

 

2.      Run subtasks


(executed in reverse order)

 

3.      Spark cluster information


Spark is initialized with a fixed (hard) resource allocation; resources are then allocated dynamically during computation.

Collaborative filtering is data-intensive: it needs large memory, while its CPU demands are moderate.

 

4.      Parameter configuration

sparkConf.set("spark.executor.memory", "7G");
sparkConf.set("spark.executor.cores", "1");
sparkConf.set("spark.executor.heartbeatInterval", "20s");
sparkConf.set("spark.kryoserializer.buffer.max", "256m");
sparkConf.set("spark.speculation", "true");
sparkConf.set("spark.worker.timeout", "500");
sparkConf.set("spark.core.connection.ack.wait.timeout", "600");
sparkConf.set("spark.cores.max", "4");
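The cluster notes above mention that resources are allocated dynamically during computation; the original job's settings for this are not shown, so the following is only a hedged sketch of the standard Spark properties that enable dynamic allocation (the keys are real Spark configuration, but the values here are illustrative):

```java
// Illustrative only: standard Spark dynamic-allocation settings (values are examples).
sparkConf.set("spark.dynamicAllocation.enabled", "true");
sparkConf.set("spark.dynamicAllocation.minExecutors", "1");
sparkConf.set("spark.dynamicAllocation.maxExecutors", "4");
// Dynamic allocation requires the external shuffle service on classic Spark deployments.
sparkConf.set("spark.shuffle.service.enabled", "true");
```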

 

5.      输出文件命名规则


recDataClean

Input:

raw file


visitorId  viewItemCode  ref

 

Output:

recDataClean file


visitor ----[itemCode,ref;itemCode,ref…]
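As a minimal sketch of what this cleaning step does (plain Java rather than Spark, with hypothetical input records), raw "visitorId viewItemCode ref" lines are grouped by visitor into the bracketed format above:

```java
import java.util.*;
import java.util.stream.*;

public class RecDataCleanSketch {
    // Groups raw "visitorId viewItemCode ref" records by visitor into
    // "visitor ----[itemCode,ref;itemCode,ref…]" values, mirroring the
    // recDataClean output format. The input lines here are made up.
    public static Map<String, String> clean(List<String> rawLines) {
        return rawLines.stream()
                .map(line -> line.trim().split("\\s+"))
                .collect(Collectors.groupingBy(
                        parts -> parts[0], // visitorId
                        Collectors.mapping(parts -> parts[1] + "," + parts[2],
                                Collectors.joining(";", "[", "]"))));
    }

    public static void main(String[] args) {
        List<String> raw = Arrays.asList(
                "v1 item100 0.8",
                "v1 item200 0.5",
                "v2 item100 0.9");
        clean(raw).forEach((v, items) -> System.out.println(v + " ----" + items));
    }
}
```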

recNorms

Incrementally computes the itemCode norms.

1. Spark DAG


Input:

cleaned data for the current computation cycle: recDataClean

itemCode norms from the previous cycle: recNorms


itemCode  sum of squared ref values  incremental flag

Output:

saved to a new file, format unchanged
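A minimal plain-Java sketch of the incremental update, under stated assumptions: the flag values 1 = full and 2 = incremental follow the convention described later for recMatrix, and an item's stored sum of squares simply grows by the squares of its newly observed refs, with the item flagged incremental only when it actually changed. The method and data shapes are illustrative, not the post's actual code.

```java
import java.util.*;

public class RecNormsSketch {
    static final int TOTAL = 1, INCREMENT = 2; // assumed flag values

    // oldNorms: itemCode -> previous sum of squared refs
    // newRefs:  itemCode -> refs observed in the current cycle
    // Returns itemCode -> {updated sum of squares, flag}.
    public static Map<Long, double[]> update(Map<Long, Double> oldNorms,
                                             Map<Long, List<Double>> newRefs) {
        Map<Long, double[]> out = new HashMap<>();
        // Carry over unchanged norms, flagged as full.
        oldNorms.forEach((item, s) -> out.put(item, new double[]{s, TOTAL}));
        // Add the squares of new refs; touched items become incremental.
        newRefs.forEach((item, refs) -> {
            double delta = refs.stream().mapToDouble(r -> r * r).sum();
            double[] cur = out.getOrDefault(item, new double[]{0.0, TOTAL});
            out.put(item, new double[]{cur[0] + delta, INCREMENT});
        });
        return out;
    }
}
```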

 

recMatrix

1. Incrementally computes the co-occurrence matrix

 

Core Spark code

JavaRDD<String> map = filter.union(reduceByKey)
        .reduceByKey(new Function2<Tuple2<Float, Integer>, Tuple2<Float, Integer>, Tuple2<Float, Integer>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<Float, Integer> call(Tuple2<Float, Integer> v1, Tuple2<Float, Integer> v2)
                    throws Exception {
                // Sum the scores; if either side is incremental, the merged pair stays incremental.
                return new Tuple2<Float, Integer>(v1._1() + v2._1(),
                        (RecStatus.increment.getStatus() == v1._2()
                                || RecStatus.increment.getStatus() == v2._2())
                                ? RecStatus.increment.getStatus()
                                : RecStatus.total.getStatus());
            }
        }, 30).map(new Function<Tuple2<Tuple2<Long, Long>, Tuple2<Float, Integer>>, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String call(Tuple2<Tuple2<Long, Long>, Tuple2<Float, Integer>> v1) throws Exception {
                // Emit "master:slave,score,flag".
                return v1._1()._1() + ":" + v1._1()._2() + "," + v1._2()._1() + "," + v1._2()._2();
            }
        });

 

2. Spark execution DAG


Input:

cleaned data for the current cycle: recDataClean

co-occurrence matrix from the previous cycle: recMatrix


MasterItemCode  slaveItemCode  ref  increment (1 = full, 2 = incremental)

 

Output:

Saved to a new file. Whenever a master-slave pair is newly added or its score has changed, it is flagged 2; otherwise it is flagged 1.
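The snippets in this post reference a RecStatus enum whose definition is not shown; the following is a hypothetical reconstruction, consistent with the flag convention above (1 = full, 2 = incremental):

```java
// Hypothetical reconstruction of the RecStatus enum used by the snippets;
// the numeric values match the "1 = full, 2 = incremental" convention above.
public enum RecStatus {
    total(1),      // record belongs to the full (unchanged) data
    increment(2);  // record is new or its score changed this cycle

    private final int status;

    RecStatus(int status) {
        this.status = status;
    }

    public int getStatus() {
        return status;
    }
}
```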

 

3. Time spent in core steps

 

4. Questions

When reading the recDataClean data and applying mapToPair, should reduceByKey be applied immediately?

After the union, the reduceByKey cannot be omitted.
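To illustrate why (a plain-Java analogy, not Spark, with made-up records): union merely concatenates the two datasets, so the same key can appear in both halves; only the subsequent reduceByKey collapses the duplicates into one merged record.

```java
import java.util.*;
import java.util.stream.*;

public class UnionReduceSketch {
    // "union" in Spark merely concatenates the two datasets: duplicate keys survive.
    public static List<Map.Entry<String, Float>> union(List<Map.Entry<String, Float>> a,
                                                       List<Map.Entry<String, Float>> b) {
        List<Map.Entry<String, Float>> out = new ArrayList<>(a);
        out.addAll(b);
        return out;
    }

    // "reduceByKey" then collapses records that share a key by merging their values.
    public static Map<String, Float> reduceByKey(List<Map.Entry<String, Float>> records) {
        return records.stream().collect(
                Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Float::sum));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Float>> previous = List.of(Map.entry("a:b", 1.0f));
        List<Map.Entry<String, Float>> current = List.of(Map.entry("a:b", 0.5f));
        List<Map.Entry<String, Float>> merged = union(previous, current);
        System.out.println("after union: " + merged.size() + " records"); // key a:b appears twice
        System.out.println("after reduceByKey: " + reduceByKey(merged).get("a:b"));
    }
}
```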

 

 

recSimilarity

Here the full set of norms is broadcast to the cluster, because in a <master, slave> pair it can happen that only one of the two norms has changed.

 

1. Core Spark code

new PairFunction<Tuple2<Long, List<Tuple2<Long, Float>>>, Long, List<Tuple2<Long, Double>>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<Long, List<Tuple2<Long, Double>>> call(
                Tuple2<Long, List<Tuple2<Long, Float>>> t) throws Exception {
            List<Tuple2<Long, Double>> list = new ArrayList<Tuple2<Long, Double>>();
            for (Tuple2<Long, Float> tuple2 : t._2()) {
                // similarity = 1 / (1 + Euclidean distance), where
                // distance^2 = |master|^2 + |slave|^2 - 2 * dot(master, slave)
                list.add(new Tuple2<Long, Double>(tuple2._1(),
                        1 / (1 + Math.sqrt(broadcast.getValue().get(t._1())
                                + broadcast.getValue().get(tuple2._1())
                                - 2 * tuple2._2()))));
            }
            return new Tuple2<Long, List<Tuple2<Long, Double>>>(t._1(), list);
        }
}

 

 

Input:

full norms for the current cycle: recNorms

incremental recMatrix data for the current cycle: recMatrix

 

Output:

incremental similarity matrix, saved to a new file
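A small worked example of the similarity formula in the snippet above (the values are made up): with |master|² = 9, |slave|² = 4 and co-occurrence score (dot product) 5, the squared Euclidean distance is 9 + 4 - 2·5 = 3, giving similarity 1 / (1 + √3) ≈ 0.366; identical vectors have distance 0 and similarity 1.

```java
public class SimilaritySketch {
    // similarity = 1 / (1 + sqrt(|a|^2 + |b|^2 - 2 * dot(a, b))),
    // i.e. 1 / (1 + Euclidean distance between the two item vectors).
    public static double similarity(double normSqA, double normSqB, double dot) {
        return 1.0 / (1.0 + Math.sqrt(normSqA + normSqB - 2.0 * dot));
    }

    public static void main(String[] args) {
        System.out.println(similarity(9.0, 4.0, 5.0)); // made-up values, ≈ 0.366
        System.out.println(similarity(4.0, 4.0, 4.0)); // identical vectors -> 1.0
    }
}
```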


 

2. Time spent in core steps

 

union

1. Core Spark code

filter2
        .union(filter1)
        .reduceByKey(
                new Function2<Tuple2<Map<Long, Float>, Integer>, Tuple2<Map<Long, Float>, Integer>, Tuple2<Map<Long, Float>, Integer>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<Map<Long, Float>, Integer> call(Tuple2<Map<Long, Float>, Integer> v1,
                            Tuple2<Map<Long, Float>, Integer> v2) throws Exception {
                        // Merge the two maps; entries from the incremental side
                        // overwrite those of the full side, and the result is
                        // flagged as full (total).
                        if (RecStatus.total.getStatus() == v1._2()) {
                            v1._1().putAll(v2._1());
                            return new Tuple2<Map<Long, Float>, Integer>(v1._1(), RecStatus.total.getStatus());
                        } else {
                            v2._1().putAll(v1._1());
                            return new Tuple2<Map<Long, Float>, Integer>(v2._1(), RecStatus.total.getStatus());
                        }
                    }
                }, 30).map(new Function<Tuple2<Long, Tuple2<Map<Long, Float>, Integer>>, String>() {
            private static final long serialVersionUID = 1L;

            // … (the rest of this map function is truncated in the original)
        });

Because merging List collections would be involved here, a Map is used instead, so entries that share a key can simply overwrite one another.
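A minimal illustration of that merge rule (plain Java, made-up scores): when the full side and the incremental side share a slave item, putAll lets the incremental score win, and slave items present on only one side are kept.

```java
import java.util.*;

public class MapMergeSketch {
    // Same rule as the reduceByKey above: incremental entries overwrite full ones.
    public static Map<Long, Float> merge(Map<Long, Float> total, Map<Long, Float> increment) {
        Map<Long, Float> out = new HashMap<>(total);
        out.putAll(increment);
        return out;
    }

    public static void main(String[] args) {
        // Full-cycle similarities for one master item: slaveItemCode -> score.
        Map<Long, Float> total = Map.of(10L, 0.3f, 11L, 0.4f);
        // Incremental similarities recomputed this cycle: 11 changed, 12 is new.
        Map<Long, Float> increment = Map.of(11L, 0.7f, 12L, 0.2f);
        System.out.println(merge(total, increment));
    }
}
```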

 

2. Spark DAG

 

Input:

incremental similarities for the current cycle: recSimilarity

full similarity matrix: recUnion

 

Output:

the aggregated similarity matrix, saved to file

 

3. Time spent in core steps

 
