项目初始化建立
This commit is contained in:
775
source/_posts/spring-boot-kafka-rpc.md
Normal file
775
source/_posts/spring-boot-kafka-rpc.md
Normal file
@@ -0,0 +1,775 @@
|
||||
---
|
||||
title: 使用Spring Boot搭配Kafka实现RPC调用
|
||||
date: 2021-04-05 11:30:14
|
||||
tags: [JVM, Spring Boot, Java, Kafka, RPC, 分布式通信]
|
||||
categories:
|
||||
- [JVM, Java]
|
||||
- [JVM, Spring]
|
||||
---
|
||||
Kafka是目前十分流行的分布式消息队列,但是如何利用Kafka搭配Spring for Apache Kafka实现一个基于消息队列的RPC基础功能呢?
|
||||
<!--more-->
|
||||
## 服务架构
|
||||
|
||||
Spring for Apache Kafka 中提供了以下几个概念来构建 Kafka 中的生产者和消费者。
|
||||
|
||||
* `ProducerFactory<K, V>`,用于构建一个生产者实例的工厂类。
|
||||
* `KafkaTemplate<K, V>`,执行发送消息的功能类。
|
||||
* `ReplyingKafkaTemplate<K, V, R>`,具备发送消息和回收消息的功能类。
|
||||
* `ConsumerFactory<K, V>`,用于构建一个消费者的工厂类。
|
||||
* `KafkaMessageListenerContainer<K, V>`,用于持有消费者的容器类。
|
||||
* `KafkaListenerContainerFactory<K, V>`,用于构建只有消费者的容器的工厂类。
|
||||
* `NewTopic`,程序运行时自动构建的 Topic,如果 Topic 已经存在则跳过构建。
|
||||
* `Message<?>`,用于承载对象的消息。
|
||||
|
||||
### 生产方架构
|
||||
|
||||
生产方的架构十分简单,只需要在生产方的类构造函数中注入 `KafkaTemplate<K, V>` Bean 即可。当不使用事务时,`ProducerFactory` 的默认实现 `DefaultKafkaProducerFactory` 会创建一个单例的生产者。
|
||||
|
||||
要创建一个 `ProducerFactory` 需要一个类型为 `Map<String, Object>` 的配置集,以及一个键序列化器和一个值序列化器。配置集中的各个配置项名称在 `ProducerConfig` 类中定义。`KafkaTemplate<K, V>` 实例中可以注入一个 `RecordMessageConverter` 实例,用来对复杂的对象进行承载传输。
|
||||
|
||||
```plantuml
|
||||
@startuml
|
||||
skinparam {
|
||||
componentStyle uml2
|
||||
monochrome false
|
||||
shadowing false
|
||||
}
|
||||
hide fields
|
||||
|
||||
class KafkaProperties {
|
||||
+ buildProducerProperties()
|
||||
}
|
||||
class KafkaTemplate {
|
||||
+ setMessageConverter(RecordMessageConverter)
|
||||
+ send(String, K, V)
|
||||
+ send(String, V)
|
||||
+ send(ProducerRecord)
|
||||
+ send(Message)
|
||||
}
|
||||
class KeySerializer {
|
||||
+ serialize(String, Header, T)
|
||||
+ serialize(String, T
|
||||
}
|
||||
class ValueSerializer {
|
||||
+ serialize(String, Header, T)
|
||||
+ serialize(String, T)
|
||||
}
|
||||
interface ProducerFactory {
|
||||
+ createProducer()
|
||||
}
|
||||
interface MessageConverter {
|
||||
+ commonHeaders()
|
||||
}
|
||||
interface RecordMessageConverter {
|
||||
+ toMessage(ConsumeRecord, Acknowledgement, Consumer, Type)
|
||||
+ fromMessage(Message, String)
|
||||
}
|
||||
|
||||
ProducerFactory --* KafkaTemplate
|
||||
KeySerializer --* ProducerFactory
|
||||
ValueSerializer --* ProducerFactory
|
||||
KafkaProperties --* ProducerFactory
|
||||
MessageConverter <|-- RecordMessageConverter
|
||||
RecordMessageConverter --* KafkaTemplate
|
||||
@enduml
|
||||
```
|
||||
|
||||
### 消费方架构
|
||||
|
||||
消费方的架构要略复杂,由于消费方需要对 Kafka 传递来的消息进行监听,所以需要将监听器(Listener)置入容器中,由容器负载并进行处理。常用的监听器接口主要有 `MessageListener<K, V>` 和 `AcknowledgingMessageListener<K, V>` 等,或者使用 `@KafkaListener` 注解标记处理方法或者处理类。容器根据功能需要,常用的则有两种 `KafkaMessageListenerContainer` 和 `ConcurrentMessageListenerContainer`,分别用于单线程监听和多线程监听。
|
||||
|
||||
与生产方相同,消费方也需要使用工厂类来创建消费方实例。消费方工厂类一般都实现了接口 `ConsumerFactory<K ,V>`,常用的是 `DefaultKafkaConsumerFactory<K ,V>`。监听容器的构建需要同时提供消费方工厂类实例和容器配置集。
|
||||
|
||||
```plantuml
|
||||
@startuml
|
||||
skinparam {
|
||||
componentStyle uml2
|
||||
monochrome false
|
||||
shadowing false
|
||||
}
|
||||
hide fields
|
||||
|
||||
class ContainerProperties {
|
||||
+ ContainerProperties(String...)
|
||||
+ setMessageListener(MessageListener)
|
||||
}
|
||||
interface MessageListener {
|
||||
+ onMessage(ConsumerRecord)
|
||||
}
|
||||
interface ConsumerFactory {
|
||||
+ createConsumer()
|
||||
}
|
||||
class KafkaProperties {
|
||||
+ buildConsumerProperties()
|
||||
}
|
||||
class KeyDeserializer {
|
||||
+ deserialize(String, Header, T)
|
||||
+ deserialize(String, T
|
||||
}
|
||||
class ValueDeserializer {
|
||||
+ deserialize(String, Header, T)
|
||||
+ deserialize(String, byte[])
|
||||
}
|
||||
class KafkaMessageListenerContainer {
|
||||
# doStart()
|
||||
}
|
||||
interface KafkaListenerContainerFactory {
|
||||
+ createContainer()
|
||||
}
|
||||
abstract class AbstractKafkaListenerContainerFactory {
|
||||
+ setConsumerFactory(ConsumerFactory)
|
||||
+ setMessageConverter(MessageConverter)
|
||||
}
|
||||
interface MessageConverter {
|
||||
+ commonHeaders()
|
||||
}
|
||||
interface RecordMessageConverter {
|
||||
+ toMessage(ConsumeRecord, Acknowledgement, Consumer, Type)
|
||||
+ fromMessage(Message, String)
|
||||
}
|
||||
|
||||
MessageListener --* ContainerProperties
|
||||
KafkaProperties --* ConsumerFactory
|
||||
KeyDeserializer --* ConsumerFactory
|
||||
ValueDeserializer --* ConsumerFactory
|
||||
KafkaListenerContainerFactory <|.right. AbstractKafkaListenerContainerFactory
|
||||
ConsumerFactory --* AbstractKafkaListenerContainerFactory
|
||||
ContainerProperties --* AbstractKafkaListenerContainerFactory
|
||||
AbstractKafkaListenerContainerFactory - KafkaMessageListenerContainer : 生成 >
|
||||
MessageConverter <|-- RecordMessageConverter
|
||||
RecordMessageConverter --* AbstractKafkaListenerContainerFactory
|
||||
@enduml
|
||||
```
|
||||
|
||||
### RPC 架构
|
||||
|
||||
在使用 Kafka 执行 RPC 调用时,被调用的消费方的建立与其他用途中没有太多区别,只是需要在 `AbstractKafkaListenerContainerFactory<C, K, V>` 中加入一个用于发送消息的 `KafkaTemplate<K, V>` 实例即可,并在使用 `@KafkaListener` 注解的监听器上增加 `@SendTo` 注解,并使监听器返回要发回的对象即可。但是生产方的配置就相应的要复杂许多了,除了要配置专用的 `ReplyingKafkaTemplate<K, V, R>` 以外,还需要配置针对返回消息的消费方设置。
|
||||
|
||||
总起来说,在使用 RPC 调用时,无论调用方还是被调用方,都是一个集成了生产方和消费方的全功能 Kafka 客户端。
|
||||
|
||||
#### 调用方架构
|
||||
|
||||
```plantuml
|
||||
@startuml
|
||||
skinparam {
|
||||
componentStyle uml2
|
||||
monochrome false
|
||||
shadowing false
|
||||
}
|
||||
hide fields
|
||||
|
||||
interface ProducerFactory {
|
||||
+ createProducer()
|
||||
}
|
||||
interface RecordMessageConverter {
|
||||
+ toMessage(ConsumeRecord, Acknowledgement, Consumer, Type)
|
||||
+ fromMessage(Message, String)
|
||||
}
|
||||
class ReplyingKafkaTemplate {
|
||||
+ sendAndReceive(ProducerRecord)
|
||||
+ sendAndReceive(ProducerRecord, Duration)
|
||||
}
|
||||
class KafkaMessageListenerContainer {
|
||||
# doStart()
|
||||
}
|
||||
interface KafkaListenerContainerFactory {
|
||||
+ createContainer()
|
||||
}
|
||||
abstract class AbstractKafkaListenerContainerFactory {
|
||||
+ setConsumerFactory(ConsumerFactory)
|
||||
+ setMessageConverter(MessageConverter)
|
||||
}
|
||||
interface ConsumerFactory {
|
||||
+ createConsumer()
|
||||
}
|
||||
interface MessageListener {
|
||||
+ onMessage(ConsumerRecord)
|
||||
}
|
||||
class ContainerProperties {
|
||||
+ ContainerProperties(String...)
|
||||
+ setMessageListener(MessageListener)
|
||||
}
|
||||
|
||||
MessageListener --* ContainerProperties
|
||||
RecordMessageConverter --* AbstractKafkaListenerContainerFactory
|
||||
ProducerFactory --* ReplyingKafkaTemplate
|
||||
ContainerProperties --* AbstractKafkaListenerContainerFactory
|
||||
KafkaListenerContainerFactory <|.. AbstractKafkaListenerContainerFactory
|
||||
KafkaMessageListenerContainer - AbstractKafkaListenerContainerFactory : 生成 <
|
||||
ConsumerFactory --* AbstractKafkaListenerContainerFactory
|
||||
KafkaMessageListenerContainer --* ReplyingKafkaTemplate
|
||||
@enduml
|
||||
```
|
||||
|
||||
#### 被调用方架构
|
||||
|
||||
```plantuml
|
||||
@startuml
|
||||
skinparam {
|
||||
componentStyle uml2
|
||||
monochrome false
|
||||
shadowing false
|
||||
}
|
||||
hide fields
|
||||
|
||||
class ContainerProperties {
|
||||
+ ContainerProperties(String...)
|
||||
+ setMessageListener(MessageListener)
|
||||
}
|
||||
interface MessageListener {
|
||||
+ onMessage(ConsumerRecord)
|
||||
}
|
||||
interface ConsumerFactory {
|
||||
+ createConsumer()
|
||||
}
|
||||
class KafkaMessageListenerContainer {
|
||||
# doStart()
|
||||
}
|
||||
interface KafkaListenerContainerFactory {
|
||||
+ createContainer()
|
||||
}
|
||||
abstract class AbstractKafkaListenerContainerFactory {
|
||||
+ setConsumerFactory(ConsumerFactory)
|
||||
+ setMessageConverter(MessageConverter)
|
||||
+ setReplyTemplate(KafkaTemplate)
|
||||
}
|
||||
interface RecordMessageConverter {
|
||||
+ toMessage(ConsumeRecord, Acknowledgement, Consumer, Type)
|
||||
+ fromMessage(Message, String)
|
||||
}
|
||||
class KafkaTemplate {
|
||||
+ setMessageConverter(RecordMessageConverter)
|
||||
+ send(String, K, V)
|
||||
+ send(String, V)
|
||||
+ send(ProducerRecord)
|
||||
+ send(Message)
|
||||
}
|
||||
|
||||
MessageListener --* ContainerProperties
|
||||
KafkaListenerContainerFactory <|.right. AbstractKafkaListenerContainerFactory
|
||||
ConsumerFactory --* AbstractKafkaListenerContainerFactory
|
||||
ContainerProperties --* AbstractKafkaListenerContainerFactory
|
||||
AbstractKafkaListenerContainerFactory - KafkaMessageListenerContainer : 生成 >
|
||||
RecordMessageConverter --* AbstractKafkaListenerContainerFactory
|
||||
RecordMessageConverter --* KafkaTemplate
|
||||
KafkaTemplate --* AbstractKafkaListenerContainerFactory
|
||||
@enduml
|
||||
```
|
||||
|
||||
## 配置项内容的获取
|
||||
|
||||
Spring Boot 所接管的配置项内容可以通过依赖注入获取,而不必像说明手册中描述的一样需要在程序中手动置入。要获取 Kafka 的配置,只需要声明并注入一个 `KafkaProperties` 类型的属性即可。
|
||||
|
||||
## 单向发送字符串
|
||||
|
||||
单向发送功能需要在发送方创建 `KafkaTemplate<K, V>` 的实例,需要注意的是, Spring Boot 已经内置提供了 `KafkaTemplate<String, String>` 的 Bean,对于字符串信息可以直接发送。
|
||||
|
||||
以下是发送方的示例。
|
||||
|
||||
```java
|
||||
@Component
|
||||
public class MessageProducer {
|
||||
private final KafkaTemplate<String, String> template;
|
||||
|
||||
@Autowired
|
||||
public MessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
|
||||
this.template = kafkaTemplate;
|
||||
}
|
||||
|
||||
public void sendMessage(String message) {
|
||||
this.template.send("some-topic", message);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
以下是消费方的示例。
|
||||
|
||||
```java
|
||||
@Component
|
||||
@Slf4j
|
||||
public class MessageConsumer {
|
||||
@KafkaListener(id = "client_grp", topic = "some-topic")
|
||||
public void consumeMessage(String message) {
|
||||
log.info(message);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## 双向发送字符串
|
||||
|
||||
与单向发送字符串功能相似,针对字符串的发送和接收,Spring for Kafka 已经提供了许多已经配置好的现成 Bean 可供使用,但是需要注意的是,RPC 调用方的 `ReplyingKafkaTemplate<K, V, R>` 是需要手工配置的。
|
||||
|
||||
以下是调用方的示例。
|
||||
|
||||
```java
|
||||
// 应用主类
|
||||
@SpringBootApplication
|
||||
public class RpcRequestApplication {
|
||||
private final KafkaProperties kProperties;
|
||||
|
||||
@Autowired
|
||||
public RpcRequestApplication(
|
||||
KafkaProperties properties
|
||||
) {
|
||||
this.kProperties = properties;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(RpcRequestApplication.class, args).close();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ReplyingKafkaTemplate<String, String, String> replyTemplate(
|
||||
ProducerFactory<String, String> factory,
|
||||
ConcurrentMessageListenerContainer<String, String> repliesContainer
|
||||
) {
|
||||
return new ReplyingKafkaTemplate<>(factory, repliesContainer);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ProducerFactory<String, String> producerFactory() {
|
||||
Map<String, Object> props = this.kProperties.buildProducerProperties();
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
return new DefaultKafkaProducerFactory<>(props);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory(
|
||||
KafkaTemplate<String, String> kafkaTemplate
|
||||
) {
|
||||
ConcurrentKafkaListenerContainerFactory<String, String> factory =
|
||||
new ConcurrentKafkaListenerContainerFactory<>();
|
||||
factory.setConsumerFactory(consumerFactory());
|
||||
factory.setReplyTemplate(kafkaTemplate);
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConsumerFactory<String, String> consumerFactory() {
|
||||
return new DefaultKafkaConsumerFactory<>(
|
||||
this.kProperties.buildConsumerProperties(),
|
||||
StringDeserializer::new,
|
||||
StringDeserializer::new
|
||||
);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConcurrentMessageListenerContainer<String, String> messageContainer(
|
||||
ConcurrentKafkaListenerContainerFactory<String, String> factory
|
||||
) {
|
||||
ConcurrentMessageListenerContainer<String, String> container =
|
||||
factory.createContainer("RPC-Response");
|
||||
container.getContainerProperties().setGroupId("replies");
|
||||
container.setAutoStartup(false);
|
||||
return container;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public NewTopic rpcRequestTopic() {
|
||||
return TopicBuilder.name("RPC-Request")
|
||||
.partitions(1)
|
||||
.replicas(3)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public NewTopic rpcReplyTopic() {
|
||||
return TopicBuilder.name("RPC-Response")
|
||||
.partitions(1)
|
||||
.replicas(3)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
```java
|
||||
// 功能类
|
||||
@Component
|
||||
@Slf4j
|
||||
public class RpcRequester implements CommandLineRunner {
|
||||
private final ReplyingKafkaTemplate<String, String> template;
|
||||
|
||||
@Autowired
|
||||
public RpcRequester(
|
||||
ReplyingKafkaTemplate<String, String> template
|
||||
) {
|
||||
this.template = template;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(String... args) throws Exception {
|
||||
try {
|
||||
RequestReplyFuture<String, String, String> reply = this.template.sendAndReceive(
|
||||
new ProducerRecord<>("RPC-Request", "greeting")
|
||||
);
|
||||
String result = reply.get().value();
|
||||
log.info("Hello from " + result);
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
log.error(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
以下是响应方的示例。
|
||||
|
||||
```java
|
||||
@Component
|
||||
@Slf4j
|
||||
public class RpcReplier {
|
||||
@KafkaListener(id="rpc-server", topic="RPC-Request")
|
||||
@SendTo
|
||||
public String replyGreeting(String message) {
|
||||
log.info("Requester send: " + message);
|
||||
return "Replier";
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
在响应方中,与单向发送唯一的不同是添加了 `@SendTo` 注解并在监听器上增加了返回值类型。
|
||||
|
||||
## 单向发送自定义对象
|
||||
|
||||
单向发送自定义对象需要自行配置完整的业务链条,其中生产方需要配置 `ProducerFactory<K, V>`、`KafkaTemplate<K, V>`,而消费方则需要配置 `ConsumerFactory<K, V>`,以及监听器容器工厂和容器。
|
||||
|
||||
以下给出生产方的示例代码。
|
||||
|
||||
```yaml
|
||||
# 生产方配置文件
|
||||
spring:
|
||||
kafka:
|
||||
bootstrap-servers: 192.168.1.1:9092
|
||||
producer:
|
||||
key-serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
|
||||
```
|
||||
|
||||
```java
|
||||
// 自定义载荷类
|
||||
@Data
|
||||
@Builder
|
||||
public class Cargo {
|
||||
@NonNull private final String action;
|
||||
private Object payload;
|
||||
|
||||
public Cargo(
|
||||
@JsonProperty("action") String action,
|
||||
@JsonProperty("payload") @Nullable Object payload
|
||||
) {
|
||||
this.action = action;
|
||||
this.payload = payload;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
```java
|
||||
// 主类文件
|
||||
@SpringBootApplication
|
||||
public class SenderApplication {
|
||||
|
||||
private final KafkaProperties kProperties;
|
||||
|
||||
@Autowired
|
||||
public SenderApplication(KafkaProperties props) {
|
||||
this.kProperties = props;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(SenderApplication.class, args);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ProducerFactory<String, Cargo> producerFactory() {
|
||||
Map<String, Object> producerProps = this.kProperties.buildProducerProperties();
|
||||
// 以下两条语句与上面配置文件中的 producer 的配置功能相同
|
||||
// 择一使用即可,一般不建议在此进行硬编码
|
||||
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
|
||||
return new DefaultKafkaProducerFactory<>(producerProps);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KafkaTemplate<String, Cargo> sendTemplate() {
|
||||
return new KafkaTemplate<>(producerFactory());
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
```java
|
||||
// 功能类
|
||||
@Component
|
||||
@Slf4j
|
||||
public class Requester implements CommandLineRunner {
|
||||
|
||||
private final KafkaTemplate<String, Cargo> sendTemplate;
|
||||
|
||||
@Autowired
|
||||
public Requester(KafkaTemplate<String, Cargo> template) {
|
||||
this.sendTemplate = template;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(String... args) throws Exception {
|
||||
Cargo load = Cargo.builder().action("test").build();
|
||||
ProducerRecord<String, Cargo> request = new ProducerRecord<>("RPC-Request", load);
|
||||
this.sendTemplate.send(request);
|
||||
log.info("Custom package sent.");
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
以下是消费方示例代码。
|
||||
|
||||
```yaml
|
||||
# 消费方配置文件
|
||||
spring:
|
||||
kafka:
|
||||
bootstrap-servers: 192.168.1.1:9092
|
||||
consumer:
|
||||
group-id: response-group
|
||||
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
|
||||
properties:
|
||||
spring.json:
|
||||
trusted.packages: '*'
|
||||
```
|
||||
|
||||
```java
|
||||
// 主类文件
|
||||
@SpringBootApplication
|
||||
public class ReceiverApplication {
|
||||
|
||||
private final KafkaProperties kProperties;
|
||||
|
||||
@Autowired
|
||||
public ReceiverApplication(KafkaProperties props) {
|
||||
this.kProperties = props;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(SenderApplication.class, args);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConsumerFactory<String, Cargo> consumerFactory() {
|
||||
Map<String, Object> consumerProps = this.kProperties.buildConsumerProperties();
|
||||
// 以下三条语句与上面配置文件中的 consumer 的配置功能相同
|
||||
// 择一使用即可,一般不建议在此进行硬编码
|
||||
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
|
||||
consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
|
||||
return new DefaultKafkaProducerFactory<>(consumerProps);
|
||||
}
|
||||
|
||||
// 这里有一个小坑,如果生成容器工厂的 Bean 方法名不是 kafkaListenerContainerFactory,
|
||||
// 就必须将 Bean 的名称设置为 kafkaListenerContainerFactory,
|
||||
// 否则将提示无法找到类型为 ConsumerFactory<Object, Object> 的 Bean,
|
||||
// 但实际上是没有找到监听器容器工厂 Bean。
|
||||
@Bean
|
||||
public ConcurrentKafkaListenerContainerFactory<String, Cargo> kafkaListenerContainerFactory() {
|
||||
ConcurrentKafkaListenerContainerFactory<String, Cargo> factory =
|
||||
new ConcurrentKafkaListenerContainerFactory<>();
|
||||
factory.setConsumerFactory(consumerFactory());
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConcurrentMessageListenerContainer<String, Cargo> cargoListenerContainer() {
|
||||
ConcurrentMessageListenerContainer<String, Cargo> container =
|
||||
kafkaListenerContainerFactory().createContainer("RPC-Request");
|
||||
container.getContainerProperties().setGroupId("replies");
|
||||
container.setAutoStartup(false);
|
||||
return container;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
```java
|
||||
// 功能类
|
||||
@Component
|
||||
@Slf4j
|
||||
public class Receiver {
|
||||
|
||||
@KafkaListener(id = "rpc-server", topics = "RPC-Request")
|
||||
public void receive(Cargo cargo) {
|
||||
log.info("Received: {}", cargo.getAction());
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## 双向发送自定义对象
|
||||
|
||||
双向发送自定义对象实际上与双向发送字符串一样,需要将生产方和消费方结合起来,形成 RPC 的调用方和被调用方。在以下示例中,调用方和被调用方都采用如下的配置文件。
|
||||
|
||||
```yaml
|
||||
spring:
|
||||
kafka:
|
||||
bootstrap-servers: 192.168.1.1:9092
|
||||
producer:
|
||||
key-serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
|
||||
consumer:
|
||||
# 针对调用方和被调用方,group-id 可以不相同,也尽量不要相同
|
||||
group-id: response-group
|
||||
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
|
||||
properties:
|
||||
spring.json:
|
||||
trusted.packages: '*'
|
||||
```
|
||||
|
||||
以下是调用方的示例。
|
||||
|
||||
```java
|
||||
// 主类文件
|
||||
@SpringBootApplication
|
||||
public class SenderApplication {
|
||||
|
||||
private final KafkaProperties kProperties;
|
||||
|
||||
@Autowired
|
||||
public SenderApplication(KafkaProperties props) {
|
||||
this.kProperties = props;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(SenderApplication.class, args);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ProducerFactory<String, Cargo> producerFactory() {
|
||||
Map<String, Object> producerProps = this.kProperties.buildProducerProperties();
|
||||
return new DefaultKafkaProducerFactory<>(producerProps);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConsumerFactory<String, Cargo> consumerFactory() {
|
||||
Map<String, Object> consumerProps = this.kProperties.buildConsumerProperties();
|
||||
return new DefaultKafkaProducerFactory<>(consumerProps);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConcurrentKafkaListenerContainerFactory<String, Cargo> kafkaListenerContainerFactory() {
|
||||
ConcurrentKafkaListenerContainerFactory<String, Cargo> factory =
|
||||
new ConcurrentKafkaListenerContainerFactory<>();
|
||||
factory.setConsumerFactory(consumerFactory());
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConcurrentMessageListenerContainer<String, Cargo> cargoListenerContainer() {
|
||||
ConcurrentMessageListenerContainer<String, Cargo> container =
|
||||
kafkaListenerContainerFactory().createContainer("RPC-Response");
|
||||
container.getContainerProperties().setGroupId("requests");
|
||||
container.setAutoStartup(false);
|
||||
return container;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ReplyingKafkaTemplate<String, Cargo, Cargo> replyingTemplate(
|
||||
ProducerFactory<String, Cargo> factory,
|
||||
ConcurrentMessageListenerContainer<String, Cargo> container
|
||||
) {
|
||||
return new ReplyingKafkaTemplate<>(factory, container);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
```java
|
||||
// 功能类
|
||||
@Component
|
||||
@Slf4j
|
||||
public class Requester implements CommandLineRunner {
|
||||
|
||||
private final ReplyingKafkaTemplate<String, Cargo, Cargo> replyTemplate;
|
||||
|
||||
@Autowired
|
||||
public Requester(ReplyingKafkaTemplate<String, Cargo, Cargo> template) {
|
||||
this.replyTemplate = template;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(String... args) throws Exception {
|
||||
try {
|
||||
Cargo load = Cargo.builder().action("request").build();
|
||||
ProducerRecord<String, Cargo> request = new ProducerRecord<>("RPC-Request", load);
|
||||
RequestReplyFuture<String, Cargo, Cargo> requestFuture = this.replyTemplate.sendAndReceive(request);
|
||||
Cargo response = requestFuture.get().value();
|
||||
log.info("Received: {}", response.getAction());
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
log.error(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
以下是被调用方的示例。
|
||||
|
||||
```java
|
||||
// 主类文件
|
||||
@SpringBootApplication
|
||||
public class ReceiverApplication {
|
||||
|
||||
private final KafkaProperties kProperties;
|
||||
|
||||
@Autowired
|
||||
public ReceiverApplication(KafkaProperties props) {
|
||||
this.kProperties = props;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(SenderApplication.class, args);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ProducerFactory<String, Cargo> producerFactory() {
|
||||
Map<String, Object> producerProps = this.kProperties.buildProducerProperties();
|
||||
return new DefaultKafkaProducerFactory<>(producerProps);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConsumerFactory<String, Cargo> consumerFactory() {
|
||||
Map<String, Object> consumerProps = this.kProperties.buildConsumerProperties();
|
||||
return new DefaultKafkaProducerFactory<>(consumerProps);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KafkaTemplate<String, Cargo> sendTemplate() {
|
||||
return new KafkaTemplate<>(producerFactory());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConcurrentKafkaListenerContainerFactory<String, Cargo> kafkaListenerContainerFactory() {
|
||||
ConcurrentKafkaListenerContainerFactory<String, Cargo> factory =
|
||||
new ConcurrentKafkaListenerContainerFactory<>();
|
||||
factory.setConsumerFactory(consumerFactory());
|
||||
factory.setReplyTemplate(sendTemplate());
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConcurrentMessageListenerContainer<String, Cargo> cargoListenerContainer() {
|
||||
ConcurrentMessageListenerContainer<String, Cargo> container =
|
||||
kafkaListenerContainerFactory().createContainer("RPC-Request");
|
||||
container.getContainerProperties().setGroupId("replies");
|
||||
container.setAutoStartup(false);
|
||||
return container;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
```java
|
||||
// 功能类
|
||||
@Component
|
||||
@Slf4j
|
||||
public class Receiver {
|
||||
|
||||
@KafkaListener(id = "rpc-server", topics = "RPC-Request")
|
||||
@SendTo
|
||||
public Cargo receive(Cargo cargo) {
|
||||
log.info("Received: {}", cargo.getAction());
|
||||
return Cargo.builder().action("response").build();
|
||||
}
|
||||
}
|
||||
```
|
||||
Reference in New Issue
Block a user