777 lines
25 KiB
Markdown
777 lines
25 KiB
Markdown
---
|
||
title: 使用Spring Boot搭配Kafka实现RPC调用
|
||
date: 2021-04-05 11:30:14
|
||
tags: [JVM, Spring Boot, Java, Kafka, RPC, 分布式通信]
|
||
categories:
|
||
- [JVM, Java]
|
||
- [JVM, Spring]
|
||
keywords: JVM,Spring,Spring Boot,Kafka,RPC,分布式,分布式通信
|
||
---
|
||
Kafka是目前十分流行的分布式消息队列,但是如何利用Kafka搭配Spring for Apache Kafka实现一个基于消息队列的RPC基础功能呢?
|
||
<!--more-->
|
||
## 服务架构
|
||
|
||
Spring for Apache Kafka 中提供了以下几个概念来构建 Kafka 中的生产者和消费者。
|
||
|
||
* `ProducerFactory<K, V>`,用于构建一个生产者实例的工厂类。
|
||
* `KafkaTemplate<K, V>`,执行发送消息的功能类。
|
||
* `ReplyingKafkaTemplate<K, V, R>`,具备发送消息和回收消息的功能类。
|
||
* `ConsumerFactory<K, V>`,用于构建一个消费者的工厂类。
|
||
* `KafkaMessageListenerContainer<K, V>`,用于持有消费者的容器类。
|
||
* `KafkaListenerContainerFactory<K, V>`,用于构建只有消费者的容器的工厂类。
|
||
* `NewTopic`,程序运行时自动构建的 Topic,如果 Topic 已经存在则跳过构建。
|
||
* `Message<?>`,用于承载对象的消息。
|
||
|
||
### 生产方架构
|
||
|
||
生产方的架构十分简单,只需要在生产方的类构造函数中注入 `KafkaTemplate<K, V>` Bean 即可。当不使用事务时,`ProducerFactory` 的默认实现 `DefaultKafkaProducerFactory` 会创建一个单例的生产者。
|
||
|
||
要创建一个 `ProducerFactory` 需要一个类型为 `Map<String, Object>` 的配置集,以及一个键序列化器和一个值序列化器。配置集中的各个配置项名称在 `ProducerConfig` 类中定义。`KafkaTemplate<K, V>` 实例中可以注入一个 `RecordMessageConverter` 实例,用来对复杂的对象进行承载传输。
|
||
|
||
```plantuml
|
||
@startuml
|
||
skinparam {
|
||
componentStyle uml2
|
||
monochrome false
|
||
shadowing false
|
||
}
|
||
hide fields
|
||
|
||
class KafkaProperties {
|
||
+ buildProducerProperties()
|
||
}
|
||
class KafkaTemplate {
|
||
+ setMessageConverter(RecordMessageConverter)
|
||
+ send(String, K, V)
|
||
+ send(String, V)
|
||
+ send(ProducerRecord)
|
||
+ send(Message)
|
||
}
|
||
class KeySerializer {
|
||
+ serialize(String, Header, T)
|
||
+ serialize(String, T
|
||
}
|
||
class ValueSerializer {
|
||
+ serialize(String, Header, T)
|
||
+ serialize(String, T)
|
||
}
|
||
interface ProducerFactory {
|
||
+ createProducer()
|
||
}
|
||
interface MessageConverter {
|
||
+ commonHeaders()
|
||
}
|
||
interface RecordMessageConverter {
|
||
+ toMessage(ConsumeRecord, Acknowledgement, Consumer, Type)
|
||
+ fromMessage(Message, String)
|
||
}
|
||
|
||
ProducerFactory --* KafkaTemplate
|
||
KeySerializer --* ProducerFactory
|
||
ValueSerializer --* ProducerFactory
|
||
KafkaProperties --* ProducerFactory
|
||
MessageConverter <|-- RecordMessageConverter
|
||
RecordMessageConverter --* KafkaTemplate
|
||
@enduml
|
||
```
|
||
|
||
### 消费方架构
|
||
|
||
消费方的架构要略复杂,由于消费方需要对 Kafka 传递来的消息进行监听,所以需要将监听器(Listener)置入容器中,由容器负载并进行处理。常用的监听器接口主要有 `MessageListener<K, V>` 和 `AcknowledgingMessageListener<K, V>` 等,或者使用 `@KafkaListener` 注解标记处理方法或者处理类。容器根据功能需要,常用的则有两种 `KafkaMessageListenerContainer` 和 `ConcurrentMessageListenerContainer`,分别用于单线程监听和多线程监听。
|
||
|
||
与生产方相同,消费方也需要使用工厂类来创建消费方实例。消费方工厂类一般都实现了接口 `ConsumerFactory<K ,V>`,常用的是 `DefaultKafkaConsumerFactory<K ,V>`。监听容器的构建需要同时提供消费方工厂类实例和容器配置集。
|
||
|
||
```plantuml
|
||
@startuml
|
||
skinparam {
|
||
componentStyle uml2
|
||
monochrome false
|
||
shadowing false
|
||
}
|
||
hide fields
|
||
|
||
class ContainerProperties {
|
||
+ ContainerProperties(String...)
|
||
+ setMessageListener(MessageListener)
|
||
}
|
||
interface MessageListener {
|
||
+ onMessage(ConsumerRecord)
|
||
}
|
||
interface ConsumerFactory {
|
||
+ createConsumer()
|
||
}
|
||
class KafkaProperties {
|
||
+ buildConsumerProperties()
|
||
}
|
||
class KeyDeserializer {
|
||
+ deserialize(String, Header, T)
|
||
+ deserialize(String, T
|
||
}
|
||
class ValueDeserializer {
|
||
+ deserialize(String, Header, T)
|
||
+ deserialize(String, byte[])
|
||
}
|
||
class KafkaMessageListenerContainer {
|
||
# doStart()
|
||
}
|
||
interface KafkaListenerContainerFactory {
|
||
+ createContainer()
|
||
}
|
||
abstract class AbstractKafkaListenerContainerFactory {
|
||
+ setConsumerFactory(ConsumerFactory)
|
||
+ setMessageConverter(MessageConverter)
|
||
}
|
||
interface MessageConverter {
|
||
+ commonHeaders()
|
||
}
|
||
interface RecordMessageConverter {
|
||
+ toMessage(ConsumeRecord, Acknowledgement, Consumer, Type)
|
||
+ fromMessage(Message, String)
|
||
}
|
||
|
||
MessageListener --* ContainerProperties
|
||
KafkaProperties --* ConsumerFactory
|
||
KeyDeserializer --* ConsumerFactory
|
||
ValueDeserializer --* ConsumerFactory
|
||
KafkaListenerContainerFactory <|.right. AbstractKafkaListenerContainerFactory
|
||
ConsumerFactory --* AbstractKafkaListenerContainerFactory
|
||
ContainerProperties --* AbstractKafkaListenerContainerFactory
|
||
AbstractKafkaListenerContainerFactory - KafkaMessageListenerContainer : 生成 >
|
||
MessageConverter <|-- RecordMessageConverter
|
||
RecordMessageConverter --* AbstractKafkaListenerContainerFactory
|
||
@enduml
|
||
```
|
||
|
||
### RPC 架构
|
||
|
||
在使用 Kafka 执行 RPC 调用时,被调用的消费方的建立与其他用途中没有太多区别,只是需要在 `AbstractKafkaListenerContainerFactory<C, K, V>` 中加入一个用于发送消息的 `KafkaTemplate<K, V>` 实例即可,并在使用 `@KafkaListener` 注解的监听器上增加 `@SendTo` 注解,并使监听器返回要发回的对象即可。但是生产方的配置就相应的要复杂许多了,除了要配置专用的 `ReplyingKafkaTemplate<K, V, R>` 以外,还需要配置针对返回消息的消费方设置。
|
||
|
||
总起来说,在使用 RPC 调用时,无论调用方还是被调用方,都是一个集成了生产方和消费方的全功能 Kafka 客户端。
|
||
|
||
#### 调用方架构
|
||
|
||
```plantuml
|
||
@startuml
|
||
skinparam {
|
||
componentStyle uml2
|
||
monochrome false
|
||
shadowing false
|
||
}
|
||
hide fields
|
||
|
||
interface ProducerFactory {
|
||
+ createProducer()
|
||
}
|
||
interface RecordMessageConverter {
|
||
+ toMessage(ConsumeRecord, Acknowledgement, Consumer, Type)
|
||
+ fromMessage(Message, String)
|
||
}
|
||
class ReplyingKafkaTemplate {
|
||
+ sendAndReceive(ProducerRecord)
|
||
+ sendAndReceive(ProducerRecord, Duration)
|
||
}
|
||
class KafkaMessageListenerContainer {
|
||
# doStart()
|
||
}
|
||
interface KafkaListenerContainerFactory {
|
||
+ createContainer()
|
||
}
|
||
abstract class AbstractKafkaListenerContainerFactory {
|
||
+ setConsumerFactory(ConsumerFactory)
|
||
+ setMessageConverter(MessageConverter)
|
||
}
|
||
interface ConsumerFactory {
|
||
+ createConsumer()
|
||
}
|
||
interface MessageListener {
|
||
+ onMessage(ConsumerRecord)
|
||
}
|
||
class ContainerProperties {
|
||
+ ContainerProperties(String...)
|
||
+ setMessageListener(MessageListener)
|
||
}
|
||
|
||
MessageListener --* ContainerProperties
|
||
RecordMessageConverter --* AbstractKafkaListenerContainerFactory
|
||
ProducerFactory --* ReplyingKafkaTemplate
|
||
ContainerProperties --* AbstractKafkaListenerContainerFactory
|
||
KafkaListenerContainerFactory <|.. AbstractKafkaListenerContainerFactory
|
||
KafkaMessageListenerContainer - AbstractKafkaListenerContainerFactory : 生成 <
|
||
ConsumerFactory --* AbstractKafkaListenerContainerFactory
|
||
KafkaMessageListenerContainer --* ReplyingKafkaTemplate
|
||
@enduml
|
||
```
|
||
|
||
#### 被调用方架构
|
||
|
||
```plantuml
|
||
@startuml
|
||
skinparam {
|
||
componentStyle uml2
|
||
monochrome false
|
||
shadowing false
|
||
}
|
||
hide fields
|
||
|
||
class ContainerProperties {
|
||
+ ContainerProperties(String...)
|
||
+ setMessageListener(MessageListener)
|
||
}
|
||
interface MessageListener {
|
||
+ onMessage(ConsumerRecord)
|
||
}
|
||
interface ConsumerFactory {
|
||
+ createConsumer()
|
||
}
|
||
class KafkaMessageListenerContainer {
|
||
# doStart()
|
||
}
|
||
interface KafkaListenerContainerFactory {
|
||
+ createContainer()
|
||
}
|
||
abstract class AbstractKafkaListenerContainerFactory {
|
||
+ setConsumerFactory(ConsumerFactory)
|
||
+ setMessageConverter(MessageConverter)
|
||
+ setReplyTemplate(KafkaTemplate)
|
||
}
|
||
interface RecordMessageConverter {
|
||
+ toMessage(ConsumeRecord, Acknowledgement, Consumer, Type)
|
||
+ fromMessage(Message, String)
|
||
}
|
||
class KafkaTemplate {
|
||
+ setMessageConverter(RecordMessageConverter)
|
||
+ send(String, K, V)
|
||
+ send(String, V)
|
||
+ send(ProducerRecord)
|
||
+ send(Message)
|
||
}
|
||
|
||
MessageListener --* ContainerProperties
|
||
KafkaListenerContainerFactory <|.right. AbstractKafkaListenerContainerFactory
|
||
ConsumerFactory --* AbstractKafkaListenerContainerFactory
|
||
ContainerProperties --* AbstractKafkaListenerContainerFactory
|
||
AbstractKafkaListenerContainerFactory - KafkaMessageListenerContainer : 生成 >
|
||
RecordMessageConverter --* AbstractKafkaListenerContainerFactory
|
||
RecordMessageConverter --* KafkaTemplate
|
||
KafkaTemplate --* AbstractKafkaListenerContainerFactory
|
||
@enduml
|
||
```
|
||
|
||
## 配置项内容的获取
|
||
|
||
Spring Boot 所接管的配置项内容可以通过依赖注入获取,而不必像说明手册中描述的一样需要在程序中手动置入。要获取 Kafka 的配置,只需要声明并注入一个 `KafkaProperties` 类型的属性即可。
|
||
|
||
## 单向发送字符串
|
||
|
||
单向发送功能需要在发送方创建 `KafkaTemplate<K, V>` 的实例,需要注意的是, Spring Boot 已经内置提供了 `KafkaTemplate<String, String>` 的 Bean,对于字符串信息可以直接发送。
|
||
|
||
以下是发送方的示例。
|
||
|
||
```java
|
||
@Component
|
||
public class MessageProducer {
|
||
private final KafkaTemplate<String, String> template;
|
||
|
||
@Autowired
|
||
public MessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
|
||
this.template = kafkaTemplate;
|
||
}
|
||
|
||
public void sendMessage(String message) {
|
||
this.template.send("some-topic", message);
|
||
}
|
||
}
|
||
```
|
||
|
||
以下是消费方的示例。
|
||
|
||
```java
|
||
@Component
|
||
@Slf4j
|
||
public class MessageConsumer {
|
||
@KafkaListener(id = "client_grp", topic = "some-topic")
|
||
public void consumeMessage(String message) {
|
||
log.info(message);
|
||
}
|
||
}
|
||
```
|
||
|
||
## 双向发送字符串
|
||
|
||
与单向发送字符串功能相似,针对字符串的发送和接收,Spring for Kafka 已经提供了许多已经配置好的现成 Bean 可供使用,但是需要注意的是,RPC 调用方的 `ReplyingKafkaTemplate<K, V, R>` 是需要手工配置的。
|
||
|
||
以下是调用方的示例。
|
||
|
||
```java
|
||
// 应用主类
|
||
@SpringBootApplication
|
||
public class RpcRequestApplication {
|
||
private final KafkaProperties kProperties;
|
||
|
||
@Autowired
|
||
public RpcRequestApplication(
|
||
KafkaProperties properties
|
||
) {
|
||
this.kProperties = properties;
|
||
}
|
||
|
||
public static void main(String[] args) {
|
||
SpringApplication.run(RpcRequestApplication.class, args).close();
|
||
}
|
||
|
||
@Bean
|
||
public ReplyingKafkaTemplate<String, String, String> replyTemplate(
|
||
ProducerFactory<String, String> factory,
|
||
ConcurrentMessageListenerContainer<String, String> repliesContainer
|
||
) {
|
||
return new ReplyingKafkaTemplate<>(factory, repliesContainer);
|
||
}
|
||
|
||
@Bean
|
||
public ProducerFactory<String, String> producerFactory() {
|
||
Map<String, Object> props = this.kProperties.buildProducerProperties();
|
||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||
return new DefaultKafkaProducerFactory<>(props);
|
||
}
|
||
|
||
@Bean
|
||
public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory(
|
||
KafkaTemplate<String, String> kafkaTemplate
|
||
) {
|
||
ConcurrentKafkaListenerContainerFactory<String, String> factory =
|
||
new ConcurrentKafkaListenerContainerFactory<>();
|
||
factory.setConsumerFactory(consumerFactory());
|
||
factory.setReplyTemplate(kafkaTemplate);
|
||
return factory;
|
||
}
|
||
|
||
@Bean
|
||
public ConsumerFactory<String, String> consumerFactory() {
|
||
return new DefaultKafkaConsumerFactory<>(
|
||
this.kProperties.buildConsumerProperties(),
|
||
StringDeserializer::new,
|
||
StringDeserializer::new
|
||
);
|
||
}
|
||
|
||
@Bean
|
||
public ConcurrentMessageListenerContainer<String, String> messageContainer(
|
||
ConcurrentKafkaListenerContainerFactory<String, String> factory
|
||
) {
|
||
ConcurrentMessageListenerContainer<String, String> container =
|
||
factory.createContainer("RPC-Response");
|
||
container.getContainerProperties().setGroupId("replies");
|
||
container.setAutoStartup(false);
|
||
return container;
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic rpcRequestTopic() {
|
||
return TopicBuilder.name("RPC-Request")
|
||
.partitions(1)
|
||
.replicas(3)
|
||
.build();
|
||
}
|
||
|
||
@Bean
|
||
public NewTopic rpcReplyTopic() {
|
||
return TopicBuilder.name("RPC-Response")
|
||
.partitions(1)
|
||
.replicas(3)
|
||
.build();
|
||
}
|
||
}
|
||
```
|
||
|
||
```java
|
||
// 功能类
|
||
@Component
|
||
@Slf4j
|
||
public class RpcRequester implements CommandLineRunner {
|
||
private final ReplyingKafkaTemplate<String, String> template;
|
||
|
||
@Autowired
|
||
public RpcRequester(
|
||
ReplyingKafkaTemplate<String, String> template
|
||
) {
|
||
this.template = template;
|
||
}
|
||
|
||
@Override
|
||
public void run(String... args) throws Exception {
|
||
try {
|
||
RequestReplyFuture<String, String, String> reply = this.template.sendAndReceive(
|
||
new ProducerRecord<>("RPC-Request", "greeting")
|
||
);
|
||
String result = reply.get().value();
|
||
log.info("Hello from " + result);
|
||
} catch (InterruptedException | ExecutionException e) {
|
||
log.error(e.getMessage());
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
以下是响应方的示例。
|
||
|
||
```java
|
||
@Component
|
||
@Slf4j
|
||
public class RpcReplier {
|
||
@KafkaListener(id="rpc-server", topic="RPC-Request")
|
||
@SendTo
|
||
public String replyGreeting(String message) {
|
||
log.info("Requester send: " + message);
|
||
return "Replier";
|
||
}
|
||
}
|
||
```
|
||
|
||
在响应方中,与单向发送唯一的不同是添加了 `@SendTo` 注解并在监听器上增加了返回值类型。
|
||
|
||
## 单向发送自定义对象
|
||
|
||
单向发送自定义对象需要自行配置完整的业务链条,其中生产方需要配置 `ProducerFactory<K, V>`、`KafkaTemplate<K, V>`,而消费方则需要配置 `ConsumerFactory<K, V>`,以及监听器容器工厂和容器。
|
||
|
||
以下给出生产方的示例代码。
|
||
|
||
```yaml
|
||
# 生产方配置文件
|
||
spring:
|
||
kafka:
|
||
bootstrap-servers: 192.168.1.1:9092
|
||
producer:
|
||
key-serializer: org.apache.kafka.common.serialization.StringSerializer
|
||
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
|
||
```
|
||
|
||
```java
|
||
// 自定义载荷类
|
||
@Data
|
||
@Builder
|
||
public class Cargo {
|
||
@NonNull private final String action;
|
||
private Object payload;
|
||
|
||
public Cargo(
|
||
@JsonProperty("action") String action,
|
||
@JsonProperty("payload") @Nullable Object payload
|
||
) {
|
||
this.action = action;
|
||
this.payload = payload;
|
||
}
|
||
}
|
||
```
|
||
|
||
```java
|
||
// 主类文件
|
||
@SpringBootApplication
|
||
public class SenderApplication {
|
||
|
||
private final KafkaProperties kProperties;
|
||
|
||
@Autowired
|
||
public SenderApplication(KafkaProperties props) {
|
||
this.kProperties = props;
|
||
}
|
||
|
||
public static void main(String[] args) {
|
||
SpringApplication.run(SenderApplication.class, args);
|
||
}
|
||
|
||
@Bean
|
||
public ProducerFactory<String, Cargo> producerFactory() {
|
||
Map<String, Object> producerProps = this.kProperties.buildProducerProperties();
|
||
// 以下两条语句与上面配置文件中的 producer 的配置功能相同
|
||
// 择一使用即可,一般不建议在此进行硬编码
|
||
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
|
||
return new DefaultKafkaProducerFactory<>(producerProps);
|
||
}
|
||
|
||
@Bean
|
||
public KafkaTemplate<String, Cargo> sendTemplate() {
|
||
return new KafkaTemplate<>(producerFactory());
|
||
}
|
||
}
|
||
```
|
||
|
||
```java
|
||
// 功能类
|
||
@Component
|
||
@Slf4j
|
||
public class Requester implements CommandLineRunner {
|
||
|
||
private final KafkaTemplate<String, Cargo> sendTemplate;
|
||
|
||
@Autowired
|
||
public Requester(KafkaTemplate<String, Cargo> template) {
|
||
this.sendTemplate = template;
|
||
}
|
||
|
||
@Override
|
||
public void run(String... args) throws Exception {
|
||
Cargo load = Cargo.builder().action("test").build();
|
||
ProducerRecord<String, Cargo> request = new ProducerRecord<>("RPC-Request", load);
|
||
this.sendTemplate.send(request);
|
||
log.info("Custom package sent.");
|
||
}
|
||
}
|
||
```
|
||
|
||
以下是消费方示例代码。
|
||
|
||
```yaml
|
||
# 消费方配置文件
|
||
spring:
|
||
kafka:
|
||
bootstrap-servers: 192.168.1.1:9092
|
||
consumer:
|
||
group-id: response-group
|
||
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
|
||
properties:
|
||
spring.json:
|
||
trusted.packages: '*'
|
||
```
|
||
|
||
```java
|
||
// 主类文件
|
||
@SpringBootApplication
|
||
public class ReceiverApplication {
|
||
|
||
private final KafkaProperties kProperties;
|
||
|
||
@Autowired
|
||
public ReceiverApplication(KafkaProperties props) {
|
||
this.kProperties = props;
|
||
}
|
||
|
||
public static void main(String[] args) {
|
||
SpringApplication.run(SenderApplication.class, args);
|
||
}
|
||
|
||
@Bean
|
||
public ConsumerFactory<String, Cargo> consumerFactory() {
|
||
Map<String, Object> consumerProps = this.kProperties.buildConsumerProperties();
|
||
// 以下三条语句与上面配置文件中的 consumer 的配置功能相同
|
||
// 择一使用即可,一般不建议在此进行硬编码
|
||
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
|
||
consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
|
||
return new DefaultKafkaProducerFactory<>(consumerProps);
|
||
}
|
||
|
||
// 这里有一个小坑,如果生成容器工厂的 Bean 方法名不是 kafkaListenerContainerFactory,
|
||
// 就必须将 Bean 的名称设置为 kafkaListenerContainerFactory,
|
||
// 否则将提示无法找到类型为 ConsumerFactory<Object, Object> 的 Bean,
|
||
// 但实际上是没有找到监听器容器工厂 Bean。
|
||
@Bean
|
||
public ConcurrentKafkaListenerContainerFactory<String, Cargo> kafkaListenerContainerFactory() {
|
||
ConcurrentKafkaListenerContainerFactory<String, Cargo> factory =
|
||
new ConcurrentKafkaListenerContainerFactory<>();
|
||
factory.setConsumerFactory(consumerFactory());
|
||
return factory;
|
||
}
|
||
|
||
@Bean
|
||
public ConcurrentMessageListenerContainer<String, Cargo> cargoListenerContainer() {
|
||
ConcurrentMessageListenerContainer<String, Cargo> container =
|
||
kafkaListenerContainerFactory().createContainer("RPC-Request");
|
||
container.getContainerProperties().setGroupId("replies");
|
||
container.setAutoStartup(false);
|
||
return container;
|
||
}
|
||
}
|
||
```
|
||
|
||
```java
|
||
// 功能类
|
||
@Component
|
||
@Slf4j
|
||
public class Receiver {
|
||
|
||
@KafkaListener(id = "rpc-server", topics = "RPC-Request")
|
||
public void receive(Cargo cargo) {
|
||
log.info("Received: {}", cargo.getAction());
|
||
}
|
||
}
|
||
```
|
||
|
||
## 双向发送自定义对象
|
||
|
||
双向发送自定义对象实际上与双向发送字符串一样,需要将生产方和消费方结合起来,形成 RPC 的调用方和被调用方。在以下示例中,调用方和被调用方都采用如下的配置文件。
|
||
|
||
```yaml
|
||
spring:
|
||
kafka:
|
||
bootstrap-servers: 192.168.1.1:9092
|
||
producer:
|
||
key-serializer: org.apache.kafka.common.serialization.StringSerializer
|
||
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
|
||
consumer:
|
||
# 针对调用方和被调用方,group-id 可以不相同,也尽量不要相同
|
||
group-id: response-group
|
||
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
|
||
properties:
|
||
spring.json:
|
||
trusted.packages: '*'
|
||
```
|
||
|
||
以下是调用方的示例。
|
||
|
||
```java
|
||
// 主类文件
|
||
@SpringBootApplication
|
||
public class SenderApplication {
|
||
|
||
private final KafkaProperties kProperties;
|
||
|
||
@Autowired
|
||
public SenderApplication(KafkaProperties props) {
|
||
this.kProperties = props;
|
||
}
|
||
|
||
public static void main(String[] args) {
|
||
SpringApplication.run(SenderApplication.class, args);
|
||
}
|
||
|
||
@Bean
|
||
public ProducerFactory<String, Cargo> producerFactory() {
|
||
Map<String, Object> producerProps = this.kProperties.buildProducerProperties();
|
||
return new DefaultKafkaProducerFactory<>(producerProps);
|
||
}
|
||
|
||
@Bean
|
||
public ConsumerFactory<String, Cargo> consumerFactory() {
|
||
Map<String, Object> consumerProps = this.kProperties.buildConsumerProperties();
|
||
return new DefaultKafkaProducerFactory<>(consumerProps);
|
||
}
|
||
|
||
@Bean
|
||
public ConcurrentKafkaListenerContainerFactory<String, Cargo> kafkaListenerContainerFactory() {
|
||
ConcurrentKafkaListenerContainerFactory<String, Cargo> factory =
|
||
new ConcurrentKafkaListenerContainerFactory<>();
|
||
factory.setConsumerFactory(consumerFactory());
|
||
return factory;
|
||
}
|
||
|
||
@Bean
|
||
public ConcurrentMessageListenerContainer<String, Cargo> cargoListenerContainer() {
|
||
ConcurrentMessageListenerContainer<String, Cargo> container =
|
||
kafkaListenerContainerFactory().createContainer("RPC-Response");
|
||
container.getContainerProperties().setGroupId("requests");
|
||
container.setAutoStartup(false);
|
||
return container;
|
||
}
|
||
|
||
@Bean
|
||
public ReplyingKafkaTemplate<String, Cargo, Cargo> replyingTemplate(
|
||
ProducerFactory<String, Cargo> factory,
|
||
ConcurrentMessageListenerContainer<String, Cargo> container
|
||
) {
|
||
return new ReplyingKafkaTemplate<>(factory, container);
|
||
}
|
||
}
|
||
```
|
||
|
||
```java
|
||
// 功能类
|
||
@Component
|
||
@Slf4j
|
||
public class Requester implements CommandLineRunner {
|
||
|
||
private final ReplyingKafkaTemplate<String, Cargo, Cargo> replyTemplate;
|
||
|
||
@Autowired
|
||
public Requester(ReplyingKafkaTemplate<String, Cargo, Cargo> template) {
|
||
this.replyTemplate = template;
|
||
}
|
||
|
||
@Override
|
||
public void run(String... args) throws Exception {
|
||
try {
|
||
Cargo load = Cargo.builder().action("request").build();
|
||
ProducerRecord<String, Cargo> request = new ProducerRecord<>("RPC-Request", load);
|
||
RequestReplyFuture<String, Cargo, Cargo> requestFuture = this.replyTemplate.sendAndReceive(request);
|
||
Cargo response = requestFuture.get().value();
|
||
log.info("Received: {}", response.getAction());
|
||
} catch (InterruptedException | ExecutionException e) {
|
||
log.error(e.getMessage());
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
以下是被调用方的示例。
|
||
|
||
```java
|
||
// 主类文件
|
||
@SpringBootApplication
|
||
public class ReceiverApplication {
|
||
|
||
private final KafkaProperties kProperties;
|
||
|
||
@Autowired
|
||
public ReceiverApplication(KafkaProperties props) {
|
||
this.kProperties = props;
|
||
}
|
||
|
||
public static void main(String[] args) {
|
||
SpringApplication.run(SenderApplication.class, args);
|
||
}
|
||
|
||
@Bean
|
||
public ProducerFactory<String, Cargo> producerFactory() {
|
||
Map<String, Object> producerProps = this.kProperties.buildProducerProperties();
|
||
return new DefaultKafkaProducerFactory<>(producerProps);
|
||
}
|
||
|
||
@Bean
|
||
public ConsumerFactory<String, Cargo> consumerFactory() {
|
||
Map<String, Object> consumerProps = this.kProperties.buildConsumerProperties();
|
||
return new DefaultKafkaProducerFactory<>(consumerProps);
|
||
}
|
||
|
||
@Bean
|
||
public KafkaTemplate<String, Cargo> sendTemplate() {
|
||
return new KafkaTemplate<>(producerFactory());
|
||
}
|
||
|
||
@Bean
|
||
public ConcurrentKafkaListenerContainerFactory<String, Cargo> kafkaListenerContainerFactory() {
|
||
ConcurrentKafkaListenerContainerFactory<String, Cargo> factory =
|
||
new ConcurrentKafkaListenerContainerFactory<>();
|
||
factory.setConsumerFactory(consumerFactory());
|
||
factory.setReplyTemplate(sendTemplate());
|
||
return factory;
|
||
}
|
||
|
||
@Bean
|
||
public ConcurrentMessageListenerContainer<String, Cargo> cargoListenerContainer() {
|
||
ConcurrentMessageListenerContainer<String, Cargo> container =
|
||
kafkaListenerContainerFactory().createContainer("RPC-Request");
|
||
container.getContainerProperties().setGroupId("replies");
|
||
container.setAutoStartup(false);
|
||
return container;
|
||
}
|
||
}
|
||
```
|
||
|
||
```java
|
||
// 功能类
|
||
@Component
|
||
@Slf4j
|
||
public class Receiver {
|
||
|
||
@KafkaListener(id = "rpc-server", topics = "RPC-Request")
|
||
@SendTo
|
||
public Cargo receive(Cargo cargo) {
|
||
log.info("Received: {}", cargo.getAction());
|
||
return Cargo.builder().action("response").build();
|
||
}
|
||
}
|
||
```
|