本文隶属于分类

互联网

推荐文章

广告推荐

技术交流学习或者有任何问题欢迎加群 : 154514123 爱上编程

实战录》导语

前方高能!请注意本期攻城狮幽默细胞爆表,坐地铁的拉好把手,喝水的就建议暂时先别喝了:)本期分享人为云端卫士大数据工程师韩宝君,将带来Kafka-0.10 Consumer源码解析。本文3346字,大约需要花费8-10分钟时间阅读。

Kafka在0.9版本之后,对Consumer进行了重新设计,本人也在网上看了一些Consumer源码解析博客,发现讲的都不是很详细,看过之后自己尝试去看源码的时候还是很费劲,本编将对kafka Consumer模块进行详细解析,一行一行代码的和大家讲解,内容很多,可能会分好几篇和大家分享。

看源码其实也是一件很耗时的事情,要想从代码里看懂那些大牛们的意图,肯定要费一番功夫,大家撑住劲!毕竟坚持、不懈才是硬道理嘛!

来,冲上一杯咖啡,撒一点香菜,犯困的在和点辣椒油。我们开始:

1、从Apache网站下载源码包或github上直接Checkout。2、搭建源码阅读环境。

以上两步估计吃过奶的人都会(AD钙奶,吃了身体棒),真要是没吃过,就先别看源码了,先回家吃奶吧!

启动

源码包中examples目录下有一个消费者的demo,0.10版本和之前的稍微有些差别,但改动不大,使用起来更方便,如下:

我们直接进入到第39行代码:new一个KafkaConsumer消费者实例,进入下面的构造方法中:

点击this:

在该构造方法中,首先会new一个ConsumerConfig的实例,并把key和value反序列化类的信息添加到properties中。

新建ConsumerConfig实例

在新建ConsumerConfig实例时:

1、首先会执行static静态代码块,在静态代码块中会新建一个ConfigDef实例,通过调用ConfigDef的define方法把一些Consumer端的一些配置信息包括权限配置信息等放入ConfigDef实例的configKeys中存储2、再次会调用其父类AbstractConfig的构造方法。

1.1.1 static静态代码块

这个静态代码块的代码很长,就贴一点意思意思:意思意思是啥意思?【西北风跑得快,我想和你谈恋爱】的意思。

点击define方法,一路狂点,最终会到这个方法:

说明:

1、如果configKeys中已经包含了配置名,报错(该配置被定义两次)。2、group:配置属于哪个组,在该静态代码块中,49个配置的group都没有定义,最终ConfigDef实例中[update]groups集合为empty。3、defaultValue:默认值,49个配置基本都定义了默认值,如果Type是String或List类型,一般默认值为””,如果Typ[update]e是Boolean类型,一般都会有初始值true或false,如果Type是int或Long类型,都会有默认值。4、new ConfigKey实例:ConfigKey的构造方法其实比较简单,唯一有点迷糊的有可能就是Validator,它是一个接口,有2个实现类Range、ValidString,大家发现没,在define方法中会调用atLeas[add]t和in两个方法,分别生成Range和ValidString对象的,大家可以自己点击进去看下,比较简单,就不贴代码了。

1.1.2执行父类构造方法

说明:Map<?, ?> originalsfor其实就是我们刚开始定义的properties

1、循环里是检查properties里的key类型,如果不是String,报错,key必须[add]为String类型。

2、最重要的是第55行代码,调用ConfigDef的parse方法将Map<String, ConfigKey>configKeys 解析成Map<String, Object> values。

说明:

2.1、第407行调用undefinedDependentConfigs;方法,该方法是校验每个配置(configKeys里的49个配置)所依赖的其他配置是否定义,在define方法中,那49个配置都没有声明依赖其他的配置,所以此处该方法的返回值[update]是empty。2.2、下面的逻辑就是遍历configKeys,把properties里用户的配置覆盖原始的配置。2.2.1、如果我们自定义的properties中包含key.name,会将key.name在properties中对应的值解析成相应的类型,最后放入到map中,ma[add]p中的value就是根据我们在define方法中定义的配置类型调用parseType解析而来的值,有的是List,有的是Boolean等。2.2.2、Validator是一个接口,它有两个实现类,Range和ValidString,大家可以自己看一下这两个类的ensureValid方法,实现都比较简单。3、used:实例化一个线程安全的HashSet,该集合中存储哪些key被使用过[add]。4、logAll:打印配置。

添加反序列化类信息

调用ConsumerConfig的静态方法addDeserializerToConfig把key和value反序列化类信息添加到properties中。

至此,ConsumerConfig配置这块的代码逻辑已经结束,是不是很简单,下面就是决定你是否能更深入的关键部分了。

执行KafkaConsumer构造方法

接下来会进入KafkaConsumer重载的构造方法:

从596-607行,很简单,直接从config中取值,看不懂的回家吃奶补补。

1.3.1 Metrics

说明:

1、实例化一个LinkedHashMap,并把clientId存入map中2、new一个MetricConfig实例,MetricConfig构造方法中会初始化一些成员变量

this.quota = null;this.samples = 2; //实例个数this.eventWindow = Long.MAX_VALUE; //事件窗口//时间窗口this.timeWindowMs = TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS);this.tags = new LinkedHashMap<>;

接下来调用:

samples方法:修改this.samples的值,返回MetricConfig当前对象timeWindow:修改时间窗口(this.timeWindowMs)的值,返回MetricConfig当前对象tags:就是把那个有clientId的map复制给this.tags

3、因为我们并没有定义metric.reporters,该配置的默认值也是””,所以此时reporters是empty,默认加一个JmxReporter(这个类其实也可以不了解,在kafka调优的时候会把JMX监控关掉),其实我们也可以自定义一个reporter implements MetricsReporter{},这和Storm的差不多,我曾经写过把Storm的metrics统计信息发送到Ganglia用图形化显示出来,kafka也是一样,我们也可以在自定义的reporter中把kafka有关的metrics 信息存到数据库或写到文件或发送到Ganglia

4、实例化Metrics,这里面其实还是有一些逻辑的,代码就不贴了,都是图片也不好,看着会很乱。

4.1、找到Metrics类的第129行,对reporter进行初始化,我们以JmxReporter为例。其实这时候它什么都没干,浪一圈就回来了,因为此时参数metrics为empty,但addAttribute、reregister这两个方法还是讲一下。

4.1.1、addAttribute:

  • 调用KafkaMetric的metricName方法,返回KafkaMetric实例的成员变量MetricName。

  • 调用getMBeanN[add]ame方法构造一个组件名称,这个方法很普通,大家都没问题。

  • 判断mbeans中是否包含刚才的组件名称,如果不包含,创建一个 KafkaMbean实例put到mbeans中,把KafkaMetric实例存到KafkaMbean实例实例的map中。

4.1.2、reregister:先取消注册,在注册,用到的是jdk自带的api,感兴趣的可以自行百度

4.2、boolean enableExpiration:metrics实例是否可以垃圾收集超时的sensors,如果可以就启动一个单线程的定时器任务去[update]移除超时的sensors,包括他的子sensors和KafkaMetric,如果不可以,定时器赋值为null。

4.3、

  • 首先构造MetricName实例(点击metricName方法,一路看下去,代码很简单,关键一点是MetricName实例中的tags中存的就是Metrics实例config中tags中的值)

  • 构造一个匿名对象,该对象实现了Measurable接口并重写了measure方法

  • 接下来就是最重要的addMetric方法,一路点下去你会走到代码的第344行,KafkaMetric的构造方法比较简单,都是直接赋值。registerMetric方法就是把KafkaMetric放到metrics中存储,遍历reporters,以JmxReporter为例,调用JmxReporter的met[add]ricChange方法,在metricChange方法中,直接调用了addAttribute和reregister两个方法,请参考上面4.1.1、4.1.2

1.3.2 Metadata

说明:

1、retry.backoff.ms:在发送一个失败的请求给一个topic后,试图重试发送请求之前等待的时间量。2、Metadata的构造方法中基本上都是直接赋值,除了cluster之外。3、点击Cluster.empty;方法:最终会走到Cluster类的第52行(Cluster类的构造器),这里的逻辑其实是能看懂的,就是对map的操作,还是简单讲一下吧。

3.1、根据参数节点结合nodes构造Node的id到Node的映射(nodesById)3.2、根据参数分区信息partitions构造TopicPartition到PartitionInfo的映射(partitionsByTopicPartition)3.3、接下来创建partsForTopic和partsForNode两个map实例,遍历参数nodes和partitions,将对应的值(你看代码就知道)存入两个map中3.4、剩下的逻辑差不多,就是对map的操作,逻辑不难。提一下这个方法,有可能有的同学还不清楚,Collections.unmodifiableList(copy);意思是说:把copy这个list变成不可修改的集合。

4、metadata.update方法

4.1、Cluster.bootstrap(addresses):

遍历addresses,构造Node实例放入list中接下来就是1.3.2中第3节的逻辑,此时nodes是有值的

4.2、update:更新cluster的元数据信息

4.2.1、该方法中对成员变量进行赋值操作

this.needUpdate = false;this.lastRefreshMs = now;this.lastSuccessfulRefreshMs = now;this.version += 1;

4.2.2、遍历listeners,此时该集合为empty,不会做任何操作,后续在ConsumerCoordinator类中,会增加metadata的listener,如下:

4.2.3、needMetadataForAllTopics:该metadata实例是否只保存该metadata中topics集合中所有的topic的元数据信息。

  • 如果不是,直接赋值,保留cluster中所有topic的元数据信息。

  • 如果是,调用getClusterForCurrentTopics,该方法会移除不在metadata的topics中的cluster中存储的未经授权的topic,遍历topics获得所有topic对应的PartitionInfo,重新new一个Cluster对象复制给this.cluster。

构造Cluster的逻辑请参考1.3.2的1、2、3

1.3.3 ConsumerNetworkClient

说明:

1、创建一个ChannelBuilder实例,最后很简单的new了一个PlaintextChannelBuilder实例,调用channelBuilder的configure方法,在configure方法中最终创建了的是 DefaultPrincipalBuilder实例。2、创建一个Selector实例,在Selector的构造方法中,或新建一个java nio的选择器实例和SelectorMetrics实例(请参考1.3.5中4.1、4.2、4.3)。3、创建一个NetworkClient实例,在NetworkClient的构造方法中,会创建DefaultMetadataUpdater、InFlightRequests、ClusterConnectionStates 3个对象,都比较简单。4、创建一个ConsumerNetworkClient实例。

ConsumerNetworkClient的构造方法实现比较简单,但这个类很重要,向服务器发 送响应请求和获取服务器的响应处理逻辑基本上都在这里。

1.3.4 OffsetResetStrategy

说明:同学们看到这个是不是心情猛一得劲,这个是最简单的,估计大家都喜欢,我也喜欢,但这个类很重要,consumer订阅topic和fetch数据都会用到它。

1、调用valueOf方法把一个字符串变成枚举类型

2、SubscriptionState的构造方法也很简单,都是直接赋值或创建对象,不用多说

1.3.5 ConsumerCoordinator

说明:

为了解决之前版本的High Level Consumer存在Herd Effect和Split Brain的问题,新的Consumer使用了中心协调器(Coordinator),在所有的Broker中选举出一个Broker作为 Coordinator,由它在Zookeeper上设置Watch,从而判断是否有Partition或者Consumer的增减,然后生成Rebalance命令,并检查是否这些Rebalance 在所有相关的Consumer 中被执行成功,如果不成功则重试,若成功则认为此次Rebalance成功,这个过程跟Replication Controller非常类似。

1、调用ConsumerConfig的getConfiguredInstance方法通过反射机制创PartitionAssignor实例,PartitionAssignor为接口,该接口是定义用于为消费者分配分区的算法,此处是创建了它的子类RangeAssignor的实例,另外还有RoundRobinAssignor、MockPartitionAssignor等,后面会详细讲解。2、由于没有给interceptor.classes赋值对应的class的信息,通过调用ConsumerConfig的getConfiguredInstance方法时返回为empty,所以interceptors为null。3、创建DefaultOffsetCommitCallback实例对象,提交offset完成之后会调用该对象的onComplete方法4、执行ConsumerCoordinator父类AbstractCoordinator的构造方法。再该构造方法中会 创建Heartbeat、HeartbeatTask、GroupCoordinatorMetrics 3个实例,前2个比较简单。重点讲一下第3个GroupCoordinatorMetrics。

4.1、调用metrics的sensor方法返回一个Sensor的实例,点击sensor方法,一路点下去最终会走到这里,该方法是Metrics累的方法,蓝色这行是从sensors这个map中根据名称找出对应的Sensor,如果为null,new一个实例并放入sensors中,如果parents不为null,遍历parents,找出每个parent的子Sensor的集合,并把刚才所创建的Sensor的实例放入集合中。

4.2、调用metrics的metricName方法返回一个MetricName的实例,这个就比较简单了,构造方法中也是直接赋值。

4.3、调用Sensor的add方法把刚才创建的MetricName实例和SampledStat类型的实例作为参数,在这里说一下SampledStat,就是metrics的一些统计,它的子 类有Max、Min、Count、Avg等,和Storm的CountMetric、ReducedMetric等很类似,kafka要比Storm的稍微麻烦一点。最终会走到这个方法:

首先构造一个KafkaMetric的实例,KafkaMetric的构造方法很简单,就是给成员变量直接赋值其次注册metric,调用Metrics的registerMetric方法把KafkaMetric注册到map中存储,实现统计。剩下两行自己看看。

4.4、该构造方法中剩下的代码和上面的几乎一模一样,请参照4.1、4.2、4.3,包括最后的metrics.addMetric方法和4.3逻辑一样

5、执行ConsumerCoordinator的构造方法

5.1、创建MetadataSnapshot实例,不用说大家都能看懂5.2、给元数据增加listener:addMetadataListener,具体Listener中的onMetadataUpdate方法什么时候会被调用,后面会详细解析,其实这里的逻辑是很简单的,只是你现在还不清楚是什么意思5.3、创建ConsumerCoordinatorMetrics实例,请参考4.1、4.2、4.3

1.3.6 实例化key/value反序列化实例

说明:1、如果keyDeserializer==null,调用ConsumerConfig的getConfiguredInstance方法通过反射机制创建keyDeserializer对象(IntegerDeserializer),调用IntegerDeserializer的configure方法,其实什么都没有做2、如果keyDeserializer!=null,调用ConsumerConfig的ignore方法,把参数key放入used Set集合中3、如果valueDeserializer==null,调用ConsumerConfig的getConfiguredInstance方法通过反射机制创建valueDeserializer对象(StringDeserializer),调用StringDeserializer的configure方法,获取编码,默认为utf-84、如果keyDeserializer!=null,调用ConsumerConfig的ignore方法,把参数key放入used Set集合中

1.3.7 Fetcher

说明:

1、构造Fetcher实例2、在Fetcher的构造方法中会创建FetchManagerMetrics实例3、FetchManagerMetrics构造方法中的逻辑请参考1.3.5中4.1、4.2、4.3

1.3.8 其他

说明:1、logUnused方法:打印我们自己定义的properties中没有使用到的配置2、registerAppInfo方法:

  • 实例化一个ObjectName实例

  • 实例化一个AppInfo实例

  • 注册组件,用到的是jdk自带的api,感兴趣的可以自行百度

总结

本编只是大致讲解了下创建Consumer的流程,我相信至少流程上大家应该很清楚了,这样就可以了,一些细节暂时不明白没关系,后面还会更详细更全面的解析,包括Consumer的整体架构、负载均衡等。

由于时间有限,这边文章写的也很仓促,一些流程图、架构图也没有画,若有不对的地方欢迎指正,如果有时间,下一编会讲Consumer订阅topic,尽量讲的更详细一些。让大家看着舒服、愉悦、看着就想要!如果不在这里写,也会在我的博客中发布(http://blog.csdn.net/u012749737),谢谢大家!

技术交流学习或者有任何问题欢迎加群 : 154514123 爱上编程

广告推荐

讨论区