博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于storm的在线关联规则
阅读量:6078 次
发布时间:2019-06-20

本文共 3218 字,大约阅读时间需要 10 分钟。

    基于storm的在线视频推荐算法。  算法相对简单,能够觉得是关联规则仅仅挖掘频繁二项集。以下给出与storm的结合实如今线实时算法 , 。首先给出数据流图(不同颜色的线条代表不同的数据流。在storm里面bolt也是能够声明数据流的。

    

    关联规则挖掘数据项的时候,有事务的概念。这里的事务的定义为:给定时间窗体内用户看过的视频集。

所以。我们须要这样一个bolt,依据实时日志收集每一个用户看过的视频集----user_videos aggregate bolt。 我们如何挖掘频繁二项集呢?事实上就是视频对共同出现的次数。当视频a和b被共同观看的次数(用户看了视频a又看了视频b)大于某个阈值的时候。{a , b}就是一个频繁二项集。

所以我们定时的输出a:b这种视频对。然后对其计数就可以。

这个任务是由video_pair counter bolt完毕的。这样频繁项挖掘基本完了,假设对于推荐可能须要再走一步:对于看了a的人推荐b 的可信度有多高?假设为a推荐了b。那么对于b的曝光来说提升度是多少呢(能够这样理解。b本身非常热门,你再把b推荐出来对于b本身曝光量没有多大作用,这也叫打压热门)? 所以我们须要一个计数器,里面有每一个视频被观看的次数---video_counter_bolt。这样,我们就有了youtube算法公式所须要的全部值。

     storm本身是流式的,我们这里须要用到统计用户看过的视频集,所以得有一个池子。不停的收集用户看过的视频。定时的放水(定时放水的任务就有timed_notifier_spout完毕)。所以总体的流程例如以下描写叙述:

1、rt-log spout按user分组,将数据流推给uva-bolt.

2、tn-spout 会定期向下游推送时间窗体关闭的通知

3、uva-bolt里面维护一个map , 里面是用户到其观看过的视频集的映射。它第接收到一条日志就会更新这个map 。 同一时候向计数器vc-bolt发送一条播放数据.当收到tn-spout的通知时,便会将map里面的数据构建成视频对,分组后推送给相关的vp-bolt.

4、vp-bolt 也会维护一个map , 用以视频对的计数。

当收到tn-spout的通知时向vc-bolt发送这些统计信息,并清空这个map.

3、vc-bolt内容也维护一个map , 里面是视频到其他被观看次数的映射 。

它每接收到一条日志都会分析日志的类型, 假设是计数类型的就会更新这个map .假设收到vp-bolt的数据。便会计算两两视频的相似度(youtube的公式)。

               整个topology结构代码:

      

		TopologyBuilder builder = new TopologyBuilder();	        SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(conf.getString("zk.server")),
conf.getString("topic"),conf.getString("zk.path"),conf.getString("myid"));	        spoutConfig.scheme = new NginxLogScheme();                builder.setSpout("nt-spout" , new NotifierSpout(900) , 1);	        builder.setSpout("log-spout", new KafkaSpout(spoutConfig), 3);	        builder.setBolt("uv-bolt", new UserVideoAggregationBolt(), conf.getInt("blot.threads"))                    .fieldsGrouping("log-spout" , new Fields("cookie")).allGrouping("nt-spout" , "nt");	        builder.setBolt("vp-bolt", new VideoPairBolt(), 3).fieldsGrouping("uv-bolt" , "vp" , new Fields("vidPair"))                    .allGrouping("nt-spout" , "nt");	        builder.setBolt("vc-bolt", new VideoCountBolt(), 3).allGrouping("uv-bolt" , "vc")	        	.fieldsGrouping("vp-bolt" , "vc" , new Fields("vidPair"))                .allGrouping("nt-spout" , "nt").addConfiguration("mysql.host", conf.getString("mysql.host"))	        	.addConfiguration("mysql.usr",conf.getString("mysql.usr"))	            .addConfiguration("mysql.pass",conf.getString("mysql.pass"))	            .addConfiguration("mysql.port",conf.getInt("mysql.port"))	            .addConfiguration("mysql.schema",conf.getString("mysql.schema"));                builder.setBolt("rec-redis-bolt" , new RedisRecBolt() , 1).allGrouping("nt-spout" , "nt")                    .addConfiguration("mysql.host", conf.getString("mysql.host"))                    .addConfiguration("mysql.usr",conf.getString("mysql.usr"))                    .addConfiguration("mysql.pass",conf.getString("mysql.pass"))                    .addConfiguration("mysql.port",conf.getInt("mysql.port"))                    .addConfiguration("mysql.schema",conf.getString("mysql.schema"));

注意事项:

1、bolt的outputcollector对于并发可能报错。须要一个定制的线程安全的outputcollector 。

2、这样的实现方式属于试验性,不知其是否科学

3、storm会自己主动重新启动bolt , 理由是worker heartbeat timeout , 引起这个的问题可能是worker gc的问题。由于我这里有非常多的内存缓存,所以会出现频繁full gc

                              以至于超时。这样的频繁的full gc非常可能是因为定期向下游放水时短时间内生成大量对象造成的。

4、以上代码仅限结构參考,没有整理。

我们用到了kafka.

                

    

你可能感兴趣的文章
屏幕适配那点事
查看>>
nyoj-----幸运三角形
查看>>
C166 Interfacing C to Assembler
查看>>
wcf服务编程(第3版)文摘
查看>>
T4批量生成多文件
查看>>
论述Android通过HttpURLConnection与HttpClient联网代理网关设置
查看>>
数据存储之ContentProvide
查看>>
九度 1455:珍惜现在,感恩生活(多重背包)
查看>>
同步机制
查看>>
玩了一下SDN:MININET+FLOODLIGHT,感觉这确实是一个趋势啊
查看>>
C语言printf()输出格式大全
查看>>
可执行文件(ELF)格式之讲解
查看>>
JAVA中获取当前系统时间 - Matrix54 - 博客园
查看>>
C#foreach的用法
查看>>
axure变量的使用
查看>>
PHP创建XML文件讲解
查看>>
CentOS 6.3下搭建Web服务器
查看>>
linux学习历程
查看>>
UIImagePickerController拍照与摄像(转)
查看>>
Android中三种onClick事件的实现与对比
查看>>