blog/source/_posts/spring-boot-kafka-rpc.md
2021-04-08 09:45:39 +08:00

---
title: Implementing RPC Calls with Spring Boot and Kafka
date: 2021-04-05 11:30:14
tags: [JVM, Spring Boot, Java, Kafka, RPC, Distributed Communication]
categories:
- [JVM, Java]
- [JVM, Spring]
---
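Kafka is currently one of the most popular distributed message queues. But how can Kafka be combined with Spring for Apache Kafka to implement basic RPC functionality on top of a message queue?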
<!--more-->
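## Service Architecture
Spring for Apache Kafka provides the following concepts for building Kafka producers and consumers.
* `ProducerFactory<K, V>`: a factory for building producer instances.
* `KafkaTemplate<K, V>`: the class that performs message sending.
* `ReplyingKafkaTemplate<K, V, R>`: a template that both sends messages and receives the replies.
* `ConsumerFactory<K, V>`: a factory for building consumers.
* `KafkaMessageListenerContainer<K, V>`: a container class that holds a consumer.
* `KafkaListenerContainerFactory<C>`: a factory for building the containers that hold consumers.
* `NewTopic`: a Topic created automatically when the application starts; creation is skipped if the Topic already exists.
* `Message<?>`: a message that carries an object.
### Producer-Side Architecture
The producer-side architecture is very simple: inject a `KafkaTemplate<K, V>` bean through the constructor of the producing class. When transactions are not used, `DefaultKafkaProducerFactory`, the default implementation of `ProducerFactory`, creates a singleton producer.
Creating a `ProducerFactory` requires a configuration map of type `Map<String, Object>`, plus a key serializer and a value serializer. The names of the configuration entries are defined in the `ProducerConfig` class. A `RecordMessageConverter` instance can be injected into the `KafkaTemplate<K, V>` instance so that complex objects can be carried in messages.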
```plantuml
@startuml
skinparam {
componentStyle uml2
monochrome false
shadowing false
}
hide fields
class KafkaProperties {
+ buildProducerProperties()
}
class KafkaTemplate {
+ setMessageConverter(RecordMessageConverter)
+ send(String, K, V)
+ send(String, V)
+ send(ProducerRecord)
+ send(Message)
}
class KeySerializer {
+ serialize(String, Headers, T)
+ serialize(String, T)
}
class ValueSerializer {
+ serialize(String, Headers, T)
+ serialize(String, T)
}
interface ProducerFactory {
+ createProducer()
}
interface MessageConverter {
+ commonHeaders()
}
interface RecordMessageConverter {
+ toMessage(ConsumerRecord, Acknowledgment, Consumer, Type)
+ fromMessage(Message, String)
}
ProducerFactory --* KafkaTemplate
KeySerializer --* ProducerFactory
ValueSerializer --* ProducerFactory
KafkaProperties --* ProducerFactory
MessageConverter <|-- RecordMessageConverter
RecordMessageConverter --* KafkaTemplate
@enduml
```
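The relationships in the diagram above can be sketched as a small configuration class. This is only a minimal sketch: the class name `ProducerSketchConfig`, the bean names, and the broker address `localhost:9092` are assumptions made for illustration, and `StringJsonMessageConverter` is just one of the `RecordMessageConverter` implementations that could be plugged in.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;

// Hypothetical configuration class, for illustration only.
@Configuration
public class ProducerSketchConfig {

    @Bean
    public ProducerFactory<String, String> sketchProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Assumed broker address; replace with the real one.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Configuration map plus a key serializer and a value serializer.
        return new DefaultKafkaProducerFactory<>(props, new StringSerializer(), new StringSerializer());
    }

    @Bean
    public KafkaTemplate<String, String> sketchTemplate(ProducerFactory<String, String> factory) {
        KafkaTemplate<String, String> template = new KafkaTemplate<>(factory);
        // The converter is applied when sending with send(Message<?>).
        template.setMessageConverter(new StringJsonMessageConverter());
        return template;
    }
}
```

The converter only comes into play for `send(Message<?>)`; the other `send` overloads hand the key and value straight to the serializers.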
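### Consumer-Side Architecture
The consumer-side architecture is slightly more complex. Because the consumer has to listen for messages delivered by Kafka, a listener must be placed in a container, which hosts it and dispatches messages to it. Commonly used listener interfaces include `MessageListener<K, V>` and `AcknowledgingMessageListener<K, V>`; alternatively, the `@KafkaListener` annotation can mark a handler method or handler class. Two container types are in common use, `KafkaMessageListenerContainer` and `ConcurrentMessageListenerContainer`, for single-threaded and multi-threaded listening respectively.
As on the producer side, consumer instances are created through a factory. Consumer factories generally implement the `ConsumerFactory<K, V>` interface, and the usual choice is `DefaultKafkaConsumerFactory<K, V>`. Building a listener container requires both a consumer factory instance and a set of container properties.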
```plantuml
@startuml
skinparam {
componentStyle uml2
monochrome false
shadowing false
}
hide fields
class ContainerProperties {
+ ContainerProperties(String...)
+ setMessageListener(MessageListener)
}
interface MessageListener {
+ onMessage(ConsumerRecord)
}
interface ConsumerFactory {
+ createConsumer()
}
class KafkaProperties {
+ buildConsumerProperties()
}
class KeyDeserializer {
+ deserialize(String, Headers, byte[])
+ deserialize(String, byte[])
}
class ValueDeserializer {
+ deserialize(String, Headers, byte[])
+ deserialize(String, byte[])
}
class KafkaMessageListenerContainer {
# doStart()
}
interface KafkaListenerContainerFactory {
+ createContainer()
}
abstract class AbstractKafkaListenerContainerFactory {
+ setConsumerFactory(ConsumerFactory)
+ setMessageConverter(MessageConverter)
}
interface MessageConverter {
+ commonHeaders()
}
interface RecordMessageConverter {
+ toMessage(ConsumerRecord, Acknowledgment, Consumer, Type)
+ fromMessage(Message, String)
}
MessageListener --* ContainerProperties
KafkaProperties --* ConsumerFactory
KeyDeserializer --* ConsumerFactory
ValueDeserializer --* ConsumerFactory
KafkaListenerContainerFactory <|.right. AbstractKafkaListenerContainerFactory
ConsumerFactory --* AbstractKafkaListenerContainerFactory
ContainerProperties --* AbstractKafkaListenerContainerFactory
AbstractKafkaListenerContainerFactory - KafkaMessageListenerContainer : creates >
MessageConverter <|-- RecordMessageConverter
RecordMessageConverter --* AbstractKafkaListenerContainerFactory
@enduml
```
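To make the diagram above more concrete, here is a minimal sketch that wires a consumer factory, container properties, and a listener together by hand, without `@KafkaListener`. The class name `ContainerSketch`, the group id `sketch-group`, the topic `some-topic`, and the broker address are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

// Hypothetical helper class, for illustration only.
public class ContainerSketch {

    public static KafkaMessageListenerContainer<String, String> build() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sketch-group");            // assumed group id
        ConsumerFactory<String, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());

        // The container properties carry the topics and the listener.
        ContainerProperties containerProps = new ContainerProperties("some-topic");
        containerProps.setMessageListener(
                (MessageListener<String, String>) record -> System.out.println(record.value()));

        // Single-threaded container; the caller is expected to invoke start() and stop().
        return new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
    }
}
```

When `@KafkaListener` is used instead, the container factory builds an equivalent container for each annotated method.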
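### RPC Architecture
When Kafka is used for RPC calls, the consumer on the callee side is set up much as for any other purpose. It only needs a `KafkaTemplate<K, V>` instance for sending messages added to the `AbstractKafkaListenerContainerFactory<C, K, V>`, a `@SendTo` annotation added to the listener marked with `@KafkaListener`, and a listener that returns the object to be sent back. The producer side, however, becomes considerably more complex: besides a dedicated `ReplyingKafkaTemplate<K, V, R>`, it also needs consumer settings for the reply messages.
In short, with RPC calls both the caller and the callee are full-featured Kafka clients that combine a producer and a consumer.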
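The idea behind request/reply over a message queue can be illustrated without Kafka at all. The sketch below is a plain-Java stand-in, not Spring's implementation: two in-memory queues play the role of the request and reply topics, and every name in it (`CorrelationSketch`, `Envelope`, `serveOne`, `receiveOne`) is hypothetical. It shows why the caller must also be a consumer: it has to read the reply topic and match each reply to a pending request by a correlation id.

```java
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class CorrelationSketch {

    /** Hypothetical stand-in for a record: a payload plus a correlation id header. */
    static final class Envelope {
        final String correlationId;
        final String payload;

        Envelope(String correlationId, String payload) {
            this.correlationId = correlationId;
            this.payload = payload;
        }
    }

    // In-memory queues standing in for the request and reply topics.
    private final Queue<Envelope> requests = new ConcurrentLinkedQueue<>();
    private final Queue<Envelope> replies = new ConcurrentLinkedQueue<>();
    // Calls still waiting for a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Caller, producer half: publish a request and remember a future for its reply. */
    public CompletableFuture<String> sendAndReceive(String payload) {
        String id = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(id, future);
        requests.add(new Envelope(id, payload));
        return future;
    }

    /** Callee: consume one request and publish a reply that echoes the correlation id. */
    public void serveOne() {
        Envelope request = requests.poll();
        if (request != null) {
            replies.add(new Envelope(request.correlationId, "Hello, " + request.payload));
        }
    }

    /** Caller, consumer half: consume one reply and complete the matching future. */
    public void receiveOne() {
        Envelope reply = replies.poll();
        if (reply != null) {
            CompletableFuture<String> future = pending.remove(reply.correlationId);
            if (future != null) {
                future.complete(reply.payload);
            }
        }
    }

    /** Runs one full round trip on the calling thread; returns null if no reply arrived. */
    public static String roundTrip(String payload) {
        CorrelationSketch rpc = new CorrelationSketch();
        CompletableFuture<String> future = rpc.sendAndReceive(payload);
        rpc.serveOne();
        rpc.receiveOne();
        return future.getNow(null);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("greeting"));
    }
}
```

`ReplyingKafkaTemplate` follows the same pattern with real topics: it attaches correlation and reply-topic headers to the outgoing record and completes the returned future when a record with the matching correlation id arrives on the reply container.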
#### Caller Architecture
```plantuml
@startuml
skinparam {
componentStyle uml2
monochrome false
shadowing false
}
hide fields
interface ProducerFactory {
+ createProducer()
}
interface RecordMessageConverter {
+ toMessage(ConsumerRecord, Acknowledgment, Consumer, Type)
+ fromMessage(Message, String)
}
class ReplyingKafkaTemplate {
+ sendAndReceive(ProducerRecord)
+ sendAndReceive(ProducerRecord, Duration)
}
class KafkaMessageListenerContainer {
# doStart()
}
interface KafkaListenerContainerFactory {
+ createContainer()
}
abstract class AbstractKafkaListenerContainerFactory {
+ setConsumerFactory(ConsumerFactory)
+ setMessageConverter(MessageConverter)
}
interface ConsumerFactory {
+ createConsumer()
}
interface MessageListener {
+ onMessage(ConsumerRecord)
}
class ContainerProperties {
+ ContainerProperties(String...)
+ setMessageListener(MessageListener)
}
MessageListener --* ContainerProperties
RecordMessageConverter --* AbstractKafkaListenerContainerFactory
ProducerFactory --* ReplyingKafkaTemplate
ContainerProperties --* AbstractKafkaListenerContainerFactory
KafkaListenerContainerFactory <|.. AbstractKafkaListenerContainerFactory
KafkaMessageListenerContainer - AbstractKafkaListenerContainerFactory : creates <
ConsumerFactory --* AbstractKafkaListenerContainerFactory
KafkaMessageListenerContainer --* ReplyingKafkaTemplate
@enduml
```
#### Callee Architecture
```plantuml
@startuml
skinparam {
componentStyle uml2
monochrome false
shadowing false
}
hide fields
class ContainerProperties {
+ ContainerProperties(String...)
+ setMessageListener(MessageListener)
}
interface MessageListener {
+ onMessage(ConsumerRecord)
}
interface ConsumerFactory {
+ createConsumer()
}
class KafkaMessageListenerContainer {
# doStart()
}
interface KafkaListenerContainerFactory {
+ createContainer()
}
abstract class AbstractKafkaListenerContainerFactory {
+ setConsumerFactory(ConsumerFactory)
+ setMessageConverter(MessageConverter)
+ setReplyTemplate(KafkaTemplate)
}
interface RecordMessageConverter {
+ toMessage(ConsumerRecord, Acknowledgment, Consumer, Type)
+ fromMessage(Message, String)
}
class KafkaTemplate {
+ setMessageConverter(RecordMessageConverter)
+ send(String, K, V)
+ send(String, V)
+ send(ProducerRecord)
+ send(Message)
}
MessageListener --* ContainerProperties
KafkaListenerContainerFactory <|.right. AbstractKafkaListenerContainerFactory
ConsumerFactory --* AbstractKafkaListenerContainerFactory
ContainerProperties --* AbstractKafkaListenerContainerFactory
AbstractKafkaListenerContainerFactory - KafkaMessageListenerContainer : creates >
RecordMessageConverter --* AbstractKafkaListenerContainerFactory
RecordMessageConverter --* KafkaTemplate
KafkaTemplate --* AbstractKafkaListenerContainerFactory
@enduml
```
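## Obtaining the Configuration
Configuration managed by Spring Boot can be obtained through dependency injection; it does not have to be set manually in code as the reference manual describes. To get the Kafka configuration, simply declare and inject a property of type `KafkaProperties`.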
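A minimal sketch of that injection is shown below. The class name `KafkaSettingsReader` and its methods are hypothetical; `buildProducerProperties()` and `buildConsumerProperties()` are the no-argument forms used throughout this post, and newer Spring Boot releases may prefer other overloads, so check the Javadoc for the version in use.

```java
import java.util.Map;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.stereotype.Component;

// Hypothetical component, for illustration only.
@Component
public class KafkaSettingsReader {

    private final KafkaProperties properties;

    // Spring Boot binds spring.kafka.* into KafkaProperties and injects it here.
    public KafkaSettingsReader(KafkaProperties properties) {
        this.properties = properties;
    }

    /** Producer settings merged from spring.kafka.* and spring.kafka.producer.*. */
    public Map<String, Object> producerSettings() {
        return this.properties.buildProducerProperties();
    }

    /** Consumer settings merged from spring.kafka.* and spring.kafka.consumer.*. */
    public Map<String, Object> consumerSettings() {
        return this.properties.buildConsumerProperties();
    }
}
```

The returned maps can be passed directly to `DefaultKafkaProducerFactory` and `DefaultKafkaConsumerFactory`.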
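## One-Way Sending of Strings
One-way sending requires a `KafkaTemplate<K, V>` instance on the sending side. Note that Spring Boot already provides a built-in `KafkaTemplate<String, String>` bean, so string messages can be sent directly.
The following is an example of the sender.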
```java
@Component
public class MessageProducer {
    private final KafkaTemplate<String, String> template;

    @Autowired
    public MessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.template = kafkaTemplate;
    }

    public void sendMessage(String message) {
        this.template.send("some-topic", message);
    }
}
```
The following is an example of the consumer.
```java
@Component
@Slf4j
public class MessageConsumer {
    // The attribute is "topics" (plural); "topic" does not exist on @KafkaListener.
    @KafkaListener(id = "client_grp", topics = "some-topic")
    public void consumeMessage(String message) {
        log.info(message);
    }
}
```
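## Two-Way Sending of Strings
As with one-way sending of strings, Spring for Apache Kafka already provides many preconfigured beans for sending and receiving strings. Note, however, that the `ReplyingKafkaTemplate<K, V, R>` on the RPC caller has to be configured by hand.
The following is an example of the caller.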
```java
// Application main class
@SpringBootApplication
public class RpcRequestApplication {
    private final KafkaProperties kProperties;

    @Autowired
    public RpcRequestApplication(
        KafkaProperties properties
    ) {
        this.kProperties = properties;
    }

    public static void main(String[] args) {
        SpringApplication.run(RpcRequestApplication.class, args).close();
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyTemplate(
        ProducerFactory<String, String> factory,
        ConcurrentMessageListenerContainer<String, String> repliesContainer
    ) {
        return new ReplyingKafkaTemplate<>(factory, repliesContainer);
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = this.kProperties.buildProducerProperties();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    // The caller only consumes replies, so no reply template is set on this factory.
    // Injecting a KafkaTemplate here would resolve to the ReplyingKafkaTemplate above,
    // which itself depends on the container built by this factory, forming a cycle.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
            this.kProperties.buildConsumerProperties(),
            StringDeserializer::new,
            StringDeserializer::new
        );
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> messageContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> factory
    ) {
        ConcurrentMessageListenerContainer<String, String> container =
            factory.createContainer("RPC-Response");
        container.getContainerProperties().setGroupId("replies");
        // The ReplyingKafkaTemplate starts the container itself.
        container.setAutoStartup(false);
        return container;
    }

    @Bean
    public NewTopic rpcRequestTopic() {
        return TopicBuilder.name("RPC-Request")
            .partitions(1)
            .replicas(3)
            .build();
    }

    @Bean
    public NewTopic rpcReplyTopic() {
        return TopicBuilder.name("RPC-Response")
            .partitions(1)
            .replicas(3)
            .build();
    }
}
```
```java
// Worker class
@Component
@Slf4j
public class RpcRequester implements CommandLineRunner {
    // ReplyingKafkaTemplate takes three type parameters: key, request value, reply value.
    private final ReplyingKafkaTemplate<String, String, String> template;

    @Autowired
    public RpcRequester(
        ReplyingKafkaTemplate<String, String, String> template
    ) {
        this.template = template;
    }

    @Override
    public void run(String... args) throws Exception {
        try {
            RequestReplyFuture<String, String, String> reply = this.template.sendAndReceive(
                new ProducerRecord<>("RPC-Request", "greeting")
            );
            // Blocks until the reply record arrives.
            String result = reply.get().value();
            log.info("Hello from " + result);
        } catch (InterruptedException | ExecutionException e) {
            log.error(e.getMessage());
        }
    }
}
```
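The requester above waits with a plain `get()`. A variant with explicit time limits might look like the sketch below, which uses the `sendAndReceive(ProducerRecord, Duration)` overload listed in the caller diagram. The helper class `TimedRpcCall` and the 10 and 15 second values are assumptions chosen for illustration, not recommendations.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;

// Hypothetical helper class, for illustration only.
public class TimedRpcCall {

    public static String call(ReplyingKafkaTemplate<String, String, String> template, String payload)
            throws InterruptedException, ExecutionException, TimeoutException {
        ProducerRecord<String, String> request = new ProducerRecord<>("RPC-Request", payload);
        // Reply timeout enforced by the template for this request (assumed value).
        RequestReplyFuture<String, String, String> future =
                template.sendAndReceive(request, Duration.ofSeconds(10));
        // Upper bound on how long the calling thread waits (assumed value).
        ConsumerRecord<String, String> reply = future.get(15, TimeUnit.SECONDS);
        return reply.value();
    }
}
```

If the template's reply timeout expires first, the failure surfaces as an `ExecutionException` from `get`; if the caller's own limit expires first, `get` throws `TimeoutException`.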
The following is an example of the responder.
```java
@Component
@Slf4j
public class RpcReplier {
    // The attribute is "topics" (plural); "topic" does not exist on @KafkaListener.
    @KafkaListener(id = "rpc-server", topics = "RPC-Request")
    @SendTo
    public String replyGreeting(String message) {
        log.info("Requester send: " + message);
        return "Replier";
    }
}
```
On the responder, the only differences from one-way sending are the added `@SendTo` annotation and the return type on the listener method.
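With a bare `@SendTo`, the reply destination is taken from the headers that the caller's `ReplyingKafkaTemplate` attaches to the request. The sketch below makes that explicit by reading the headers and building the reply `Message` by hand. The class name `ExplicitRpcReplier` and the listener id are hypothetical, and for simple cases the shorter form shown earlier should be enough.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

// Hypothetical listener, for illustration only.
@Component
public class ExplicitRpcReplier {

    @KafkaListener(id = "rpc-server-explicit", topics = "RPC-Request")
    @SendTo
    public Message<String> reply(
            String message,
            @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
            @Header(KafkaHeaders.CORRELATION_ID) byte[] correlationId) {
        return MessageBuilder.withPayload("Replier")
                // Send the reply to the topic requested by the caller.
                .setHeader(KafkaHeaders.TOPIC, replyTo)
                // Echo the correlation id so the caller can match the reply to its request.
                .setHeader(KafkaHeaders.CORRELATION_ID, correlationId)
                .build();
    }
}
```

Returning a `Message<?>` is mainly useful when extra headers have to travel back with the reply.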
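## One-Way Sending of Custom Objects
One-way sending of custom objects requires configuring the whole chain yourself: the producer side needs a `ProducerFactory<K, V>` and a `KafkaTemplate<K, V>`, while the consumer side needs a `ConsumerFactory<K, V>` together with a listener container factory and a container.
The following is the example code for the producer side.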
```yaml
# Producer-side configuration file
spring:
  kafka:
    bootstrap-servers: 192.168.1.1:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
```
```java
// Custom payload class
@Data
@Builder
public class Cargo {
    @NonNull private final String action;
    private Object payload;

    // The @JsonProperty annotations let Jackson rebuild the object through this constructor.
    public Cargo(
        @JsonProperty("action") String action,
        @JsonProperty("payload") @Nullable Object payload
    ) {
        this.action = action;
        this.payload = payload;
    }
}
```
```java
// Main class file
@SpringBootApplication
public class SenderApplication {
    private final KafkaProperties kProperties;

    @Autowired
    public SenderApplication(KafkaProperties props) {
        this.kProperties = props;
    }

    public static void main(String[] args) {
        SpringApplication.run(SenderApplication.class, args);
    }

    @Bean
    public ProducerFactory<String, Cargo> producerFactory() {
        Map<String, Object> producerProps = this.kProperties.buildProducerProperties();
        // The next two statements do the same job as the producer settings in the
        // configuration file above. Use one or the other; hard-coding the values
        // here is generally not recommended.
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    @Bean
    public KafkaTemplate<String, Cargo> sendTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```
```java
// Worker class
@Component
@Slf4j
public class Requester implements CommandLineRunner {
    private final KafkaTemplate<String, Cargo> sendTemplate;

    @Autowired
    public Requester(KafkaTemplate<String, Cargo> template) {
        this.sendTemplate = template;
    }

    @Override
    public void run(String... args) throws Exception {
        Cargo load = Cargo.builder().action("test").build();
        ProducerRecord<String, Cargo> request = new ProducerRecord<>("RPC-Request", load);
        this.sendTemplate.send(request);
        log.info("Custom package sent.");
    }
}
```
The following is the example code for the consumer side.
```yaml
# Consumer-side configuration file
spring:
  kafka:
    bootstrap-servers: 192.168.1.1:9092
    consumer:
      group-id: response-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json:
          trusted.packages: '*'
```
```java
// Main class file
@SpringBootApplication
public class ReceiverApplication {
    private final KafkaProperties kProperties;

    @Autowired
    public ReceiverApplication(KafkaProperties props) {
        this.kProperties = props;
    }

    public static void main(String[] args) {
        SpringApplication.run(ReceiverApplication.class, args);
    }

    @Bean
    public ConsumerFactory<String, Cargo> consumerFactory() {
        Map<String, Object> consumerProps = this.kProperties.buildConsumerProperties();
        // The next three statements do the same job as the consumer settings in the
        // configuration file above. Use one or the other; hard-coding the values
        // here is generally not recommended.
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(consumerProps);
    }

    // A small pitfall: if the bean method that creates the container factory is not
    // named kafkaListenerContainerFactory, the bean name must be set to
    // kafkaListenerContainerFactory explicitly. Otherwise startup reports that no bean
    // of type ConsumerFactory<Object, Object> can be found, although what is actually
    // missing is the listener container factory bean.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Cargo> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Cargo> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, Cargo> cargoListenerContainer() {
        ConcurrentMessageListenerContainer<String, Cargo> container =
            kafkaListenerContainerFactory().createContainer("RPC-Request");
        container.getContainerProperties().setGroupId("replies");
        container.setAutoStartup(false);
        return container;
    }
}
```
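The naming pitfall mentioned in the comments of the main class can be handled by setting the bean name explicitly. The sketch below keeps a descriptive method name and still registers the factory under the name `kafkaListenerContainerFactory`. The class and method names are hypothetical, `String` values are used only to keep the sketch self-contained, and a matching `ConsumerFactory<String, String>` bean is assumed to exist elsewhere in the application.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

// Hypothetical configuration class, for illustration only.
@Configuration
public class ListenerFactorySketchConfig {

    // The method name is free to choose; the bean name is what @KafkaListener looks up by default.
    @Bean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> stringListenerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}
```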
```java
// Worker class
@Component
@Slf4j
public class Receiver {
    @KafkaListener(id = "rpc-server", topics = "RPC-Request")
    public void receive(Cargo cargo) {
        log.info("Received: {}", cargo.getAction());
    }
}
```
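## Two-Way Sending of Custom Objects
Two-way sending of custom objects is essentially the same as two-way sending of strings: the producer and the consumer are combined to form the RPC caller and callee. In the examples below, the caller and the callee both use the following configuration file.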
```yaml
spring:
  kafka:
    bootstrap-servers: 192.168.1.1:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      # The caller and the callee may use different group-ids, and preferably should
      group-id: response-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json:
          trusted.packages: '*'
```
The following is an example of the caller.
```java
// Main class file
@SpringBootApplication
public class SenderApplication {
    private final KafkaProperties kProperties;

    @Autowired
    public SenderApplication(KafkaProperties props) {
        this.kProperties = props;
    }

    public static void main(String[] args) {
        SpringApplication.run(SenderApplication.class, args);
    }

    @Bean
    public ProducerFactory<String, Cargo> producerFactory() {
        Map<String, Object> producerProps = this.kProperties.buildProducerProperties();
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    @Bean
    public ConsumerFactory<String, Cargo> consumerFactory() {
        Map<String, Object> consumerProps = this.kProperties.buildConsumerProperties();
        // A consumer factory is required here, not a producer factory.
        return new DefaultKafkaConsumerFactory<>(consumerProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Cargo> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Cargo> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    // Container that listens on the reply topic for the ReplyingKafkaTemplate.
    @Bean
    public ConcurrentMessageListenerContainer<String, Cargo> cargoListenerContainer() {
        ConcurrentMessageListenerContainer<String, Cargo> container =
            kafkaListenerContainerFactory().createContainer("RPC-Response");
        container.getContainerProperties().setGroupId("requests");
        container.setAutoStartup(false);
        return container;
    }

    @Bean
    public ReplyingKafkaTemplate<String, Cargo, Cargo> replyingTemplate(
        ProducerFactory<String, Cargo> factory,
        ConcurrentMessageListenerContainer<String, Cargo> container
    ) {
        return new ReplyingKafkaTemplate<>(factory, container);
    }
}
```
```java
// Worker class
@Component
@Slf4j
public class Requester implements CommandLineRunner {
    private final ReplyingKafkaTemplate<String, Cargo, Cargo> replyTemplate;

    @Autowired
    public Requester(ReplyingKafkaTemplate<String, Cargo, Cargo> template) {
        this.replyTemplate = template;
    }

    @Override
    public void run(String... args) throws Exception {
        try {
            Cargo load = Cargo.builder().action("request").build();
            ProducerRecord<String, Cargo> request = new ProducerRecord<>("RPC-Request", load);
            RequestReplyFuture<String, Cargo, Cargo> requestFuture = this.replyTemplate.sendAndReceive(request);
            // Blocks until the reply record arrives.
            Cargo response = requestFuture.get().value();
            log.info("Received: {}", response.getAction());
        } catch (InterruptedException | ExecutionException e) {
            log.error(e.getMessage());
        }
    }
}
```
The following is an example of the callee.
```java
// Main class file
@SpringBootApplication
public class ReceiverApplication {
    private final KafkaProperties kProperties;

    @Autowired
    public ReceiverApplication(KafkaProperties props) {
        this.kProperties = props;
    }

    public static void main(String[] args) {
        SpringApplication.run(ReceiverApplication.class, args);
    }

    @Bean
    public ProducerFactory<String, Cargo> producerFactory() {
        Map<String, Object> producerProps = this.kProperties.buildProducerProperties();
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    @Bean
    public ConsumerFactory<String, Cargo> consumerFactory() {
        Map<String, Object> consumerProps = this.kProperties.buildConsumerProperties();
        // A consumer factory is required here, not a producer factory.
        return new DefaultKafkaConsumerFactory<>(consumerProps);
    }

    @Bean
    public KafkaTemplate<String, Cargo> sendTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Cargo> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Cargo> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // The reply template sends the value returned by the @SendTo listener.
        factory.setReplyTemplate(sendTemplate());
        return factory;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, Cargo> cargoListenerContainer() {
        ConcurrentMessageListenerContainer<String, Cargo> container =
            kafkaListenerContainerFactory().createContainer("RPC-Request");
        container.getContainerProperties().setGroupId("replies");
        container.setAutoStartup(false);
        return container;
    }
}
```
```java
// Worker class
@Component
@Slf4j
public class Receiver {
    @KafkaListener(id = "rpc-server", topics = "RPC-Request")
    @SendTo
    public Cargo receive(Cargo cargo) {
        log.info("Received: {}", cargo.getAction());
        return Cargo.builder().action("response").build();
    }
}
```