In the Java ecosystem, Apache Kafka ships native client support in kafka-clients.jar. With the raw client, developers create a KafkaConsumer instance by hand and subscribe it to the desired topics to consume messages. A typical implementation:
```java
public void pollMessages() {
    Consumer<String, String> consumer = new KafkaConsumer<>(getConsumerConfig());
    consumer.subscribe(Collections.singleton(topic), new RebalanceListener());
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    consumer.commitSync();
}
```
Spring Kafka builds a deep abstraction on top of the native client and simplifies development considerably through declarative annotations. For example, a single @KafkaListener annotation is all it takes to set up a message listener:
```java
@KafkaListener(id = "orderService", topics = "order.topic")
public void handleOrderEvent(ConsumerRecord<String, String> record) {
}
```
Behind this concise syntax, Spring Kafka actually assembles a complete consumer management machinery. Which raises the question: how does Spring Kafka create these consumers?
Source version used in this article: spring-kafka v2.6.6
1. Consuming Messages with Spring Kafka
First, let's walk through a complete integration example. Wiring Spring Kafka into a project typically takes the following steps.
Step 1: Add the dependency
Declare the spring-kafka dependency in the project:
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.6</version>
</dependency>
```
Step 2: Configure the consumer
Add the @EnableKafka annotation to a configuration class and define a ConcurrentKafkaListenerContainerFactory bean. This is the most common setup:
```java
@Configuration
@EnableKafka
public class Config {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configs);
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);
        return factory;
    }
}
```
Step 3: Implement the message listener
Annotate a business method with @KafkaListener to receive messages:
```java
@Service
public class OrderMessageListener {

    @KafkaListener(id = "orderService", topics = "order.topic")
    public void handleOrderEvent(ConsumerRecord<String, String> record) {
    }
}
```
That completes the basic Spring Kafka integration. Next we analyze the consumer creation process behind the @KafkaListener annotation and show how Spring builds the underlying KafkaConsumer instances.
2. The Consumer Initialization Process
Building on the example above, we take the @EnableKafka annotation as our entry point. Its source:
```java
@Import(KafkaListenerConfigurationSelector.class)
public @interface EnableKafka {
}

@Order
public class KafkaListenerConfigurationSelector implements DeferredImportSelector {

    @Override
    public String[] selectImports(AnnotationMetadata importingClassMetadata) {
        return new String[] { KafkaBootstrapConfiguration.class.getName() };
    }
}
```
The core job of this annotation is to register two key beans with the Spring container via KafkaBootstrapConfiguration:
```java
public class KafkaBootstrapConfiguration implements ImportBeanDefinitionRegistrar {

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata,
            BeanDefinitionRegistry registry) {

        registry.registerBeanDefinition(
                KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
                new RootBeanDefinition(KafkaListenerAnnotationBeanPostProcessor.class));

        registry.registerBeanDefinition(
                KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                new RootBeanDefinition(KafkaListenerEndpointRegistry.class));
    }
}
```
The annotation processor (KafkaListenerAnnotationBeanPostProcessor) scans for @KafkaListener and its derived annotations and turns each listener method into an executable endpoint descriptor (KafkaListenerEndpointDescriptor).
The container registry (KafkaListenerEndpointRegistry) serves as the central repository for all message listener containers and manages their lifecycle (starting and stopping them).
Reading notes:
```
Entry point: @EnableKafka
-> KafkaListenerConfigurationSelector
-> KafkaBootstrapConfiguration
   [registers bean: KafkaListenerAnnotationBeanPostProcessor]
   [registers bean: KafkaListenerEndpointRegistry]
```
Next, let's examine these two beans in detail.
2.1 The Consumer Registration Flow
1) Annotation scanning
First, the KafkaListenerAnnotationBeanPostProcessor bean. It performs annotation scanning through Spring's bean post-processor hook (postProcessAfterInitialization):
```java
public class KafkaListenerAnnotationBeanPostProcessor<K, V>
        implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {

    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName)
            throws BeansException {
        Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
                    Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                    return (!listenerMethods.isEmpty() ? listenerMethods : null);
                });
        for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
            Method method = entry.getKey();
            for (KafkaListener listener : entry.getValue()) {
                processKafkaListener(listener, method, bean, beanName);
            }
        }
        return bean;
    }
}
```
Two things in this code deserve attention: first, MetadataLookup enables support for derived annotations; second, each @KafkaListener method found is handed off for processing.
What is MetadataLookup good for? As an example, suppose we define a new annotation @EventHandler and meta-annotate it with @KafkaListener:
```java
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@KafkaListener
public @interface EventHandler {

    @AliasFor(annotation = KafkaListener.class, attribute = "topics")
    String value();
}
```
This design lets a business annotation such as @EventHandler transparently inherit the full functionality of @KafkaListener:
```java
@Service
public class OrderMessageListener {

    @EventHandler("order.topic")
    public void handleOrderEvent(ConsumerRecord<String, String> record) {
    }
}
```
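At its core, the derived-annotation lookup comes down to inspecting the annotations on an annotation. The following is a minimal plain-JDK sketch of that idea; all names are illustrative stand-ins, and unlike Spring's MergedAnnotations machinery it only searches one meta-level deep:

```java
import java.lang.annotation.*;
import java.lang.reflect.Method;

public class MetaAnnotationDemo {

    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
    @interface Listener {}             // stand-in for @KafkaListener

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @Listener                          // meta-annotated, like @EventHandler
    @interface EventHandler {}

    static class Service {
        @EventHandler
        void handle() {}
    }

    // True if the method carries the annotation directly,
    // or through an annotation that is itself meta-annotated with it.
    static boolean hasListener(Method m) {
        if (m.isAnnotationPresent(Listener.class)) {
            return true;
        }
        for (Annotation a : m.getAnnotations()) {
            if (a.annotationType().isAnnotationPresent(Listener.class)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        Method m = Service.class.getDeclaredMethod("handle");
        System.out.println(hasListener(m)); // true: found via @EventHandler
    }
}
```

Spring's real lookup additionally merges attribute values (honoring @AliasFor) and recurses through arbitrarily deep meta-annotation chains.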
2) Endpoint registration
Continuing to trace the @KafkaListener handling, we arrive at the processListener() method of KafkaListenerAnnotationBeanPostProcessor:
```java
public class KafkaListenerAnnotationBeanPostProcessor<K, V>
        implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {

    private final KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();

    protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
            Object bean, Object adminTarget, String beanName) {
        endpoint.setBean(bean);
        endpoint.setId(getEndpointId(kafkaListener));
        endpoint.setTopics(resolveTopics(kafkaListener));
        this.registrar.registerEndpoint(endpoint, factory);
    }
}
```
Each method annotated with @KafkaListener is wrapped in a MethodKafkaListenerEndpoint and handed to the KafkaListenerEndpointRegistrar, which maintains an internal list of endpoint descriptors:
```java
public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {

    private final List<KafkaListenerEndpointDescriptor> endpointDescriptors = new ArrayList<>();

    public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
        KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
        this.endpointDescriptors.add(descriptor);
    }
}
```
So every @KafkaListener ends up registered in a List.
Reading notes:
```
-> KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization()
   -> processKafkaListener()
   -> processListener()
   -> KafkaListenerEndpointRegistrar#registerEndpoint()
   -> endpointDescriptors [stored in a List]
```
At this point, the BeanPostProcessor's postProcessAfterInitialization method has finished: every @KafkaListener has been registered and stored in endpointDescriptors.
3) Container instantiation
Once all beans have been initialized, afterSingletonsInstantiated triggers the final registration:
```java
public class KafkaListenerAnnotationBeanPostProcessor<K, V>
        implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {

    private final KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();

    @Override
    public void afterSingletonsInstantiated() {
        this.registrar.afterPropertiesSet();
    }
}
```
The registration logic inside KafkaListenerEndpointRegistrar:
```java
public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {

    private KafkaListenerEndpointRegistry endpointRegistry;

    @Override
    public void afterPropertiesSet() {
        registerAllEndpoints();
    }

    protected void registerAllEndpoints() {
        for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
            this.endpointRegistry.registerListenerContainer(
                    descriptor.endpoint, resolveContainerFactory(descriptor));
        }
    }
}
```
As we can see, the registrar ultimately delegates to the registry, which stores the containers in a ConcurrentHashMap:
```java
public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifecycle,
        ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {

    private final Map<String, MessageListenerContainer> listenerContainers = new ConcurrentHashMap<>();

    public void registerListenerContainer(KafkaListenerEndpoint endpoint,
            KafkaListenerContainerFactory<?> factory) {
        registerListenerContainer(endpoint, factory, false);
    }

    public void registerListenerContainer(KafkaListenerEndpoint endpoint,
            KafkaListenerContainerFactory<?> factory, boolean startImmediately) {
        String id = endpoint.getId();
        MessageListenerContainer container = createListenerContainer(endpoint, factory);
        this.listenerContainers.put(id, container);
    }
}
```
Since our example configures a ConcurrentKafkaListenerContainerFactory to build the listener containers, the instances added to the registry (KafkaListenerEndpointRegistry) here are ConcurrentMessageListenerContainer objects.
Reading notes:
```
KafkaListenerAnnotationBeanPostProcessor#afterSingletonsInstantiated()
-> KafkaListenerEndpointRegistrar#afterPropertiesSet()
   -> registerAllEndpoints()
   -> KafkaListenerEndpointRegistry#registerListenerContainer()
   -> listenerContainers [stored in a Map]
```
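The two-phase registration traced above (first collect descriptors into a list, then flush them into a container map once all singletons exist) can be sketched in plain Java. All names here are illustrative stand-ins, not Spring classes:

```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class TwoPhaseRegistryDemo {

    record Endpoint(String id, String topic) {}    // stands in for KafkaListenerEndpoint
    record Container(Endpoint endpoint) {}         // stands in for MessageListenerContainer

    // Phase 1: the registrar only collects descriptors.
    static class Registrar {
        final List<Endpoint> descriptors = new ArrayList<>();

        void registerEndpoint(Endpoint e) {
            descriptors.add(e);
        }

        // Phase 2: once all singletons exist, flush everything into the registry.
        void afterPropertiesSet(Registry registry) {
            for (Endpoint e : descriptors) {
                registry.registerListenerContainer(e);
            }
        }
    }

    // The registry keeps one container per listener id in a concurrent map.
    static class Registry {
        final Map<String, Container> containers = new ConcurrentHashMap<>();

        void registerListenerContainer(Endpoint e) {
            containers.put(e.id(), new Container(e));
        }
    }

    public static void main(String[] args) {
        Registrar registrar = new Registrar();
        registrar.registerEndpoint(new Endpoint("orderService", "order.topic"));
        registrar.registerEndpoint(new Endpoint("auditService", "audit.topic"));

        Registry registry = new Registry();
        registrar.afterPropertiesSet(registry);            // deferred flush
        System.out.println(registry.containers.size());    // 2
    }
}
```

Deferring the flush until all beans are initialized is what lets Spring resolve each descriptor's container factory against the fully populated application context.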
2.2 The Consumer Startup Mechanism
1) The concurrent listener container
Now for the second bean: KafkaListenerEndpointRegistry. It implements Spring's SmartLifecycle interface, so its start() method is invoked when the application starts:
```java
public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifecycle,
        ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {

    @Override
    public void start() {
        for (MessageListenerContainer listenerContainer : getListenerContainers()) {
            startIfNecessary(listenerContainer);
        }
    }

    private void startIfNecessary(MessageListenerContainer listenerContainer) {
        if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
            listenerContainer.start();
        }
    }
}
```
The container (MessageListenerContainer) instances maintained by the registry (KafkaListenerEndpointRegistry) come in two flavors:
- ConcurrentMessageListenerContainer: a multi-threaded container
- KafkaMessageListenerContainer: a single-threaded container
ConcurrentMessageListenerContainer achieves concurrency by creating multiple single-threaded containers internally:
```java
public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {

    @Override
    protected void doStart() {
        for (int i = 0; i < this.concurrency; i++) {
            KafkaMessageListenerContainer<K, V> container =
                    constructContainer(containerProperties, topicPartitions, i);
            container.start();
        }
    }
}
```
So ConcurrentMessageListenerContainer achieves multi-threaded consumption by delegating to multiple KafkaMessageListenerContainer instances.
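This delegation pattern — a parent container that simply fans out to N single-threaded children — can be sketched with plain JDK threads (illustrative names only, not Spring's code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ConcurrentContainerDemo {

    // Stand-in for KafkaMessageListenerContainer: one thread, one "consumer".
    static class SingleThreadContainer {
        final Thread thread;

        SingleThreadContainer(int index, CountDownLatch started) {
            this.thread = new Thread(() -> {
                started.countDown();   // signal that this worker is running
                // ... a real container would create its Consumer and poll here
            }, "listener-" + index);
        }

        void start() {
            thread.start();
        }
    }

    // Stand-in for ConcurrentMessageListenerContainer: delegates to N children.
    static class ConcurrentContainer {
        final int concurrency;
        final List<SingleThreadContainer> children = new ArrayList<>();

        ConcurrentContainer(int concurrency) {
            this.concurrency = concurrency;
        }

        void doStart(CountDownLatch started) {
            for (int i = 0; i < concurrency; i++) {
                SingleThreadContainer child = new SingleThreadContainer(i, started);
                children.add(child);
                child.start();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch started = new CountDownLatch(3);
        new ConcurrentContainer(3).doStart(started);
        started.await();               // all three workers have come up
        System.out.println("workers started: 3");
    }
}
```

In the real containers, each child owns its own KafkaConsumer, and setting concurrency higher than the topic's partition count simply leaves the extra children idle.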
2) Creating the underlying consumer
Finally, inside ListenerConsumer, an inner class of KafkaMessageListenerContainer, we find the Consumer interface from kafka-clients.jar. Its creation is delegated to a ConsumerFactory; ConsumerFactory is an interface with a single implementation, DefaultKafkaConsumerFactory.
```java
public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {

    private volatile ListenerConsumer listenerConsumer;

    @Override
    protected void doStart() {
        this.listenerConsumer = new ListenerConsumer(listener, listenerType);
    }

    private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {

        private final Consumer<K, V> consumer;

        protected final ConsumerFactory<K, V> consumerFactory;

        ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
            Properties consumerProperties = propertiesFromProperties();
            this.consumer = this.consumerFactory.createConsumer(
                    this.consumerGroupId,
                    this.containerProperties.getClientId(),
                    KafkaMessageListenerContainer.this.clientIdSuffix,
                    consumerProperties);
            subscribeOrAssignTopics(this.consumer);
        }
    }
}
```
The consumer creation code:
```java
public class DefaultKafkaConsumerFactory<K, V> extends KafkaResourceFactory
        implements ConsumerFactory<K, V>, BeanNameAware {

    protected Consumer<K, V> createKafkaConsumer(Map<String, Object> configProps) {
        return createRawConsumer(configProps);
    }

    protected Consumer<K, V> createRawConsumer(Map<String, Object> configProps) {
        return new KafkaConsumer<>(configProps,
                this.keyDeserializerSupplier.get(),
                this.valueDeserializerSupplier.get());
    }
}
```
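Note the supplier-based design: the factory holds Supplier instances for the deserializers and calls get() each time it builds a consumer, so every consumer receives fresh deserializer instances. A plain-Java sketch of this factory pattern, with illustrative stand-in names rather than the real Kafka types:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class ConsumerFactoryDemo {

    // Stand-in for org.apache.kafka.clients.consumer.Consumer
    record FakeConsumer(Map<String, Object> config, Object keyDeser, Object valueDeser) {}

    // Stand-in for DefaultKafkaConsumerFactory: holds the base config plus
    // deserializer suppliers, and stamps out consumers on demand.
    static class Factory {
        final Map<String, Object> baseConfig;
        final Supplier<Object> keyDeserializerSupplier;
        final Supplier<Object> valueDeserializerSupplier;

        Factory(Map<String, Object> baseConfig,
                Supplier<Object> keyDeser, Supplier<Object> valueDeser) {
            this.baseConfig = baseConfig;
            this.keyDeserializerSupplier = keyDeser;
            this.valueDeserializerSupplier = valueDeser;
        }

        FakeConsumer createConsumer(String groupId) {
            Map<String, Object> config = new HashMap<>(baseConfig);
            config.put("group.id", groupId);           // per-consumer override
            return new FakeConsumer(config,
                    keyDeserializerSupplier.get(),     // fresh instance per consumer
                    valueDeserializerSupplier.get());
        }
    }

    public static void main(String[] args) {
        Factory factory = new Factory(
                Map.of("bootstrap.servers", "localhost:9092"),
                Object::new, Object::new);
        FakeConsumer a = factory.createConsumer("orderService");
        FakeConsumer b = factory.createConsumer("orderService");
        // Each call yields a distinct consumer with its own deserializers.
        System.out.println(a.keyDeser() != b.keyDeser()); // true
    }
}
```

Fresh instances matter because deserializers can hold state; sharing one across the consumers created for the concurrent containers would not be thread-safe.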
Reading notes:
```
KafkaListenerEndpointRegistry#start()
-> AbstractMessageListenerContainer#start()
-> ConcurrentMessageListenerContainer#doStart() (concurrency must not exceed the partition count)
   -> KafkaMessageListenerContainer#start() -> doStart()
   -> DefaultKafkaConsumerFactory#createRawConsumer()
```
3. Summary
Stitching together the reading notes from the sections above yields the complete code path:
```
Entry point: @EnableKafka
-> KafkaListenerConfigurationSelector
-> KafkaBootstrapConfiguration
   [registers bean: KafkaListenerAnnotationBeanPostProcessor]
   [registers bean: KafkaListenerEndpointRegistry]

-> KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization()
   -> processKafkaListener()
   -> processListener()
   -> KafkaListenerEndpointRegistrar#registerEndpoint()
   -> endpointDescriptors [stored in a List]
(===== at this point the endpoint descriptors exist =====)

(===== iterate over endpointDescriptors =====)
KafkaListenerAnnotationBeanPostProcessor#afterSingletonsInstantiated()
-> KafkaListenerEndpointRegistrar#afterPropertiesSet()
   -> registerAllEndpoints()
   -> KafkaListenerEndpointRegistry#registerListenerContainer()
   -> listenerContainers [stored in a Map]

(===== start listening =====)
KafkaListenerEndpointRegistry#start()
-> AbstractMessageListenerContainer#start()
-> ConcurrentMessageListenerContainer#doStart() (concurrency must not exceed the partition count)
   -> KafkaMessageListenerContainer#start() -> doStart()
   -> DefaultKafkaConsumerFactory#createRawConsumer()
```
