--- title: 使用Spring Boot搭配Kafka实现RPC调用 date: 2021-04-05 11:30:14 tags: [JVM, Spring Boot, Java, Kafka, RPC, 分布式通信] categories: - [JVM, Java] - [JVM, Spring] --- Kafka是目前十分流行的分布式消息队列,但是如何利用Kafka搭配Spring for Apache Kafka实现一个基于消息队列的RPC基础功能呢? ## 服务架构 Spring for Apache Kafka 中提供了以下几个概念来构建 Kafka 中的生产者和消费者。 * `ProducerFactory`,用于构建一个生产者实例的工厂类。 * `KafkaTemplate`,执行发送消息的功能类。 * `ReplyingKafkaTemplate`,具备发送消息和回收消息的功能类。 * `ConsumerFactory`,用于构建一个消费者的工厂类。 * `KafkaMessageListenerContainer`,用于持有消费者的容器类。 * `KafkaListenerContainerFactory`,用于构建只有消费者的容器的工厂类。 * `NewTopic`,程序运行时自动构建的 Topic,如果 Topic 已经存在则跳过构建。 * `Message`,用于承载对象的消息。 ### 生产方架构 生产方的架构十分简单,只需要在生产方的类构造函数中注入 `KafkaTemplate` Bean 即可。当不使用事务时,`ProducerFactory` 的默认实现 `DefaultKafkaProducerFactory` 会创建一个单例的生产者。 要创建一个 `ProducerFactory` 需要一个类型为 `Map` 的配置集,以及一个键序列化器和一个值序列化器。配置集中的各个配置项名称在 `ProducerConfig` 类中定义。`KafkaTemplate` 实例中可以注入一个 `RecordMessageConverter` 实例,用来对复杂的对象进行承载传输。 ```plantuml @startuml skinparam { componentStyle uml2 monochrome false shadowing false } hide fields class KafkaProperties { + buildProducerProperties() } class KafkaTemplate { + setMessageConverter(RecordMessageConverter) + send(String, K, V) + send(String, V) + send(ProducerRecord) + send(Message) } class KeySerializer { + serialize(String, Header, T) + serialize(String, T } class ValueSerializer { + serialize(String, Header, T) + serialize(String, T) } interface ProducerFactory { + createProducer() } interface MessageConverter { + commonHeaders() } interface RecordMessageConverter { + toMessage(ConsumeRecord, Acknowledgement, Consumer, Type) + fromMessage(Message, String) } ProducerFactory --* KafkaTemplate KeySerializer --* ProducerFactory ValueSerializer --* ProducerFactory KafkaProperties --* ProducerFactory MessageConverter <|-- RecordMessageConverter RecordMessageConverter --* KafkaTemplate @enduml ``` ### 消费方架构 消费方的架构要略复杂,由于消费方需要对 Kafka 传递来的消息进行监听,所以需要将监听器(Listener)置入容器中,由容器负载并进行处理。常用的监听器接口主要有 `MessageListener` 和 `AcknowledgingMessageListener` 等,或者使用 `@KafkaListener` 注解标记处理方法或者处理类。容器根据功能需要,常用的则有两种 `KafkaMessageListenerContainer` 和 `ConcurrentMessageListenerContainer`,分别用于单线程监听和多线程监听。 与生产方相同,消费方也需要使用工厂类来创建消费方实例。消费方工厂类一般都实现了接口 `ConsumerFactory`,常用的是 `DefaultKafkaConsumerFactory`。监听容器的构建需要同时提供消费方工厂类实例和容器配置集。 ```plantuml @startuml skinparam { componentStyle uml2 monochrome false shadowing false } hide fields class ContainerProperties { + ContainerProperties(String...) + setMessageListener(MessageListener) } interface MessageListener { + onMessage(ConsumerRecord) } interface ConsumerFactory { + createConsumer() } class KafkaProperties { + buildConsumerProperties() } class KeyDeserializer { + deserialize(String, Header, T) + deserialize(String, T } class ValueDeserializer { + deserialize(String, Header, T) + deserialize(String, byte[]) } class KafkaMessageListenerContainer { # doStart() } interface KafkaListenerContainerFactory { + createContainer() } abstract class AbstractKafkaListenerContainerFactory { + setConsumerFactory(ConsumerFactory) + setMessageConverter(MessageConverter) } interface MessageConverter { + commonHeaders() } interface RecordMessageConverter { + toMessage(ConsumeRecord, Acknowledgement, Consumer, Type) + fromMessage(Message, String) } MessageListener --* ContainerProperties KafkaProperties --* ConsumerFactory KeyDeserializer --* ConsumerFactory ValueDeserializer --* ConsumerFactory KafkaListenerContainerFactory <|.right. AbstractKafkaListenerContainerFactory ConsumerFactory --* AbstractKafkaListenerContainerFactory ContainerProperties --* AbstractKafkaListenerContainerFactory AbstractKafkaListenerContainerFactory - KafkaMessageListenerContainer : 生成 > MessageConverter <|-- RecordMessageConverter RecordMessageConverter --* AbstractKafkaListenerContainerFactory @enduml ``` ### RPC 架构 在使用 Kafka 执行 RPC 调用时,被调用的消费方的建立与其他用途中没有太多区别,只是需要在 `AbstractKafkaListenerContainerFactory` 中加入一个用于发送消息的 `KafkaTemplate` 实例即可,并在使用 `@KafkaListener` 注解的监听器上增加 `@SendTo` 注解,并使监听器返回要发回的对象即可。但是生产方的配置就相应的要复杂许多了,除了要配置专用的 `ReplyingKafkaTemplate` 以外,还需要配置针对返回消息的消费方设置。 总起来说,在使用 RPC 调用时,无论调用方还是被调用方,都是一个集成了生产方和消费方的全功能 Kafka 客户端。 #### 调用方架构 ```plantuml @startuml skinparam { componentStyle uml2 monochrome false shadowing false } hide fields interface ProducerFactory { + createProducer() } interface RecordMessageConverter { + toMessage(ConsumeRecord, Acknowledgement, Consumer, Type) + fromMessage(Message, String) } class ReplyingKafkaTemplate { + sendAndReceive(ProducerRecord) + sendAndReceive(ProducerRecord, Duration) } class KafkaMessageListenerContainer { # doStart() } interface KafkaListenerContainerFactory { + createContainer() } abstract class AbstractKafkaListenerContainerFactory { + setConsumerFactory(ConsumerFactory) + setMessageConverter(MessageConverter) } interface ConsumerFactory { + createConsumer() } interface MessageListener { + onMessage(ConsumerRecord) } class ContainerProperties { + ContainerProperties(String...) + setMessageListener(MessageListener) } MessageListener --* ContainerProperties RecordMessageConverter --* AbstractKafkaListenerContainerFactory ProducerFactory --* ReplyingKafkaTemplate ContainerProperties --* AbstractKafkaListenerContainerFactory KafkaListenerContainerFactory <|.. AbstractKafkaListenerContainerFactory KafkaMessageListenerContainer - AbstractKafkaListenerContainerFactory : 生成 < ConsumerFactory --* AbstractKafkaListenerContainerFactory KafkaMessageListenerContainer --* ReplyingKafkaTemplate @enduml ``` #### 被调用方架构 ```plantuml @startuml skinparam { componentStyle uml2 monochrome false shadowing false } hide fields class ContainerProperties { + ContainerProperties(String...) + setMessageListener(MessageListener) } interface MessageListener { + onMessage(ConsumerRecord) } interface ConsumerFactory { + createConsumer() } class KafkaMessageListenerContainer { # doStart() } interface KafkaListenerContainerFactory { + createContainer() } abstract class AbstractKafkaListenerContainerFactory { + setConsumerFactory(ConsumerFactory) + setMessageConverter(MessageConverter) + setReplyTemplate(KafkaTemplate) } interface RecordMessageConverter { + toMessage(ConsumeRecord, Acknowledgement, Consumer, Type) + fromMessage(Message, String) } class KafkaTemplate { + setMessageConverter(RecordMessageConverter) + send(String, K, V) + send(String, V) + send(ProducerRecord) + send(Message) } MessageListener --* ContainerProperties KafkaListenerContainerFactory <|.right. AbstractKafkaListenerContainerFactory ConsumerFactory --* AbstractKafkaListenerContainerFactory ContainerProperties --* AbstractKafkaListenerContainerFactory AbstractKafkaListenerContainerFactory - KafkaMessageListenerContainer : 生成 > RecordMessageConverter --* AbstractKafkaListenerContainerFactory RecordMessageConverter --* KafkaTemplate KafkaTemplate --* AbstractKafkaListenerContainerFactory @enduml ``` ## 配置项内容的获取 Spring Boot 所接管的配置项内容可以通过依赖注入获取,而不必像说明手册中描述的一样需要在程序中手动置入。要获取 Kafka 的配置,只需要声明并注入一个 `KafkaProperties` 类型的属性即可。 ## 单向发送字符串 单向发送功能需要在发送方创建 `KafkaTemplate` 的实例,需要注意的是, Spring Boot 已经内置提供了 `KafkaTemplate` 的 Bean,对于字符串信息可以直接发送。 以下是发送方的示例。 ```java @Component public class MessageProducer { private final KafkaTemplate template; @Autowired public MessageProducer(KafkaTemplate kafkaTemplate) { this.template = kafkaTemplate; } public void sendMessage(String message) { this.template.send("some-topic", message); } } ``` 以下是消费方的示例。 ```java @Component @Slf4j public class MessageConsumer { @KafkaListener(id = "client_grp", topic = "some-topic") public void consumeMessage(String message) { log.info(message); } } ``` ## 双向发送字符串 与单向发送字符串功能相似,针对字符串的发送和接收,Spring for Kafka 已经提供了许多已经配置好的现成 Bean 可供使用,但是需要注意的是,RPC 调用方的 `ReplyingKafkaTemplate` 是需要手工配置的。 以下是调用方的示例。 ```java // 应用主类 @SpringBootApplication public class RpcRequestApplication { private final KafkaProperties kProperties; @Autowired public RpcRequestApplication( KafkaProperties properties ) { this.kProperties = properties; } public static void main(String[] args) { SpringApplication.run(RpcRequestApplication.class, args).close(); } @Bean public ReplyingKafkaTemplate replyTemplate( ProducerFactory factory, ConcurrentMessageListenerContainer repliesContainer ) { return new ReplyingKafkaTemplate<>(factory, repliesContainer); } @Bean public ProducerFactory producerFactory() { Map props = this.kProperties.buildProducerProperties(); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory containerFactory( KafkaTemplate kafkaTemplate ) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setReplyTemplate(kafkaTemplate); return factory; } @Bean public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>( this.kProperties.buildConsumerProperties(), StringDeserializer::new, StringDeserializer::new ); } @Bean public ConcurrentMessageListenerContainer messageContainer( ConcurrentKafkaListenerContainerFactory factory ) { ConcurrentMessageListenerContainer container = factory.createContainer("RPC-Response"); container.getContainerProperties().setGroupId("replies"); container.setAutoStartup(false); return container; } @Bean public NewTopic rpcRequestTopic() { return TopicBuilder.name("RPC-Request") .partitions(1) .replicas(3) .build(); } @Bean public NewTopic rpcReplyTopic() { return TopicBuilder.name("RPC-Response") .partitions(1) .replicas(3) .build(); } } ``` ```java // 功能类 @Component @Slf4j public class RpcRequester implements CommandLineRunner { private final ReplyingKafkaTemplate template; @Autowired public RpcRequester( ReplyingKafkaTemplate template ) { this.template = template; } @Override public void run(String... args) throws Exception { try { RequestReplyFuture reply = this.template.sendAndReceive( new ProducerRecord<>("RPC-Request", "greeting") ); String result = reply.get().value(); log.info("Hello from " + result); } catch (InterruptedException | ExecutionException e) { log.error(e.getMessage()); } } } ``` 以下是响应方的示例。 ```java @Component @Slf4j public class RpcReplier { @KafkaListener(id="rpc-server", topic="RPC-Request") @SendTo public String replyGreeting(String message) { log.info("Requester send: " + message); return "Replier"; } } ``` 在响应方中,与单向发送唯一的不同是添加了 `@SendTo` 注解并在监听器上增加了返回值类型。 ## 单向发送自定义对象 单向发送自定义对象需要自行配置完整的业务链条,其中生产方需要配置 `ProducerFactory`、`KafkaTemplate`,而消费方则需要配置 `ConsumerFactory`,以及监听器容器工厂和容器。 以下给出生产方的示例代码。 ```yaml # 生产方配置文件 spring: kafka: bootstrap-servers: 192.168.1.1:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer ``` ```java // 自定义载荷类 @Data @Builder public class Cargo { @NonNull private final String action; private Object payload; public Cargo( @JsonProperty("action") String action, @JsonProperty("payload") @Nullable Object payload ) { this.action = action; this.payload = payload; } } ``` ```java // 主类文件 @SpringBootApplication public class SenderApplication { private final KafkaProperties kProperties; @Autowired public SenderApplication(KafkaProperties props) { this.kProperties = props; } public static void main(String[] args) { SpringApplication.run(SenderApplication.class, args); } @Bean public ProducerFactory producerFactory() { Map producerProps = this.kProperties.buildProducerProperties(); // 以下两条语句与上面配置文件中的 producer 的配置功能相同 // 择一使用即可,一般不建议在此进行硬编码 producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<>(producerProps); } @Bean public KafkaTemplate sendTemplate() { return new KafkaTemplate<>(producerFactory()); } } ``` ```java // 功能类 @Component @Slf4j public class Requester implements CommandLineRunner { private final KafkaTemplate sendTemplate; @Autowired public Requester(KafkaTemplate template) { this.sendTemplate = template; } @Override public void run(String... args) throws Exception { Cargo load = Cargo.builder().action("test").build(); ProducerRecord request = new ProducerRecord<>("RPC-Request", load); this.sendTemplate.send(request); log.info("Custom package sent."); } } ``` 以下是消费方示例代码。 ```yaml # 消费方配置文件 spring: kafka: bootstrap-servers: 192.168.1.1:9092 consumer: group-id: response-group key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring.json: trusted.packages: '*' ``` ```java // 主类文件 @SpringBootApplication public class ReceiverApplication { private final KafkaProperties kProperties; @Autowired public ReceiverApplication(KafkaProperties props) { this.kProperties = props; } public static void main(String[] args) { SpringApplication.run(SenderApplication.class, args); } @Bean public ConsumerFactory consumerFactory() { Map consumerProps = this.kProperties.buildConsumerProperties(); // 以下三条语句与上面配置文件中的 consumer 的配置功能相同 // 择一使用即可,一般不建议在此进行硬编码 consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class); consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); return new DefaultKafkaProducerFactory<>(consumerProps); } // 这里有一个小坑,如果生成容器工厂的 Bean 方法名不是 kafkaListenerContainerFactory, // 就必须将 Bean 的名称设置为 kafkaListenerContainerFactory, // 否则将提示无法找到类型为 ConsumerFactory 的 Bean, // 但实际上是没有找到监听器容器工厂 Bean。 @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } @Bean public ConcurrentMessageListenerContainer cargoListenerContainer() { ConcurrentMessageListenerContainer container = kafkaListenerContainerFactory().createContainer("RPC-Request"); container.getContainerProperties().setGroupId("replies"); container.setAutoStartup(false); return container; } } ``` ```java // 功能类 @Component @Slf4j public class Receiver { @KafkaListener(id = "rpc-server", topics = "RPC-Request") public void receive(Cargo cargo) { log.info("Received: {}", cargo.getAction()); } } ``` ## 双向发送自定义对象 双向发送自定义对象实际上与双向发送字符串一样,需要将生产方和消费方结合起来,形成 RPC 的调用方和被调用方。在以下示例中,调用方和被调用方都采用如下的配置文件。 ```yaml spring: kafka: bootstrap-servers: 192.168.1.1:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer consumer: # 针对调用方和被调用方,group-id 可以不相同,也尽量不要相同 group-id: response-group key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring.json: trusted.packages: '*' ``` 以下是调用方的示例。 ```java // 主类文件 @SpringBootApplication public class SenderApplication { private final KafkaProperties kProperties; @Autowired public SenderApplication(KafkaProperties props) { this.kProperties = props; } public static void main(String[] args) { SpringApplication.run(SenderApplication.class, args); } @Bean public ProducerFactory producerFactory() { Map producerProps = this.kProperties.buildProducerProperties(); return new DefaultKafkaProducerFactory<>(producerProps); } @Bean public ConsumerFactory consumerFactory() { Map consumerProps = this.kProperties.buildConsumerProperties(); return new DefaultKafkaProducerFactory<>(consumerProps); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } @Bean public ConcurrentMessageListenerContainer cargoListenerContainer() { ConcurrentMessageListenerContainer container = kafkaListenerContainerFactory().createContainer("RPC-Response"); container.getContainerProperties().setGroupId("requests"); container.setAutoStartup(false); return container; } @Bean public ReplyingKafkaTemplate replyingTemplate( ProducerFactory factory, ConcurrentMessageListenerContainer container ) { return new ReplyingKafkaTemplate<>(factory, container); } } ``` ```java // 功能类 @Component @Slf4j public class Requester implements CommandLineRunner { private final ReplyingKafkaTemplate replyTemplate; @Autowired public Requester(ReplyingKafkaTemplate template) { this.replyTemplate = template; } @Override public void run(String... args) throws Exception { try { Cargo load = Cargo.builder().action("request").build(); ProducerRecord request = new ProducerRecord<>("RPC-Request", load); RequestReplyFuture requestFuture = this.replyTemplate.sendAndReceive(request); Cargo response = requestFuture.get().value(); log.info("Received: {}", response.getAction()); } catch (InterruptedException | ExecutionException e) { log.error(e.getMessage()); } } } ``` 以下是被调用方的示例。 ```java // 主类文件 @SpringBootApplication public class ReceiverApplication { private final KafkaProperties kProperties; @Autowired public ReceiverApplication(KafkaProperties props) { this.kProperties = props; } public static void main(String[] args) { SpringApplication.run(SenderApplication.class, args); } @Bean public ProducerFactory producerFactory() { Map producerProps = this.kProperties.buildProducerProperties(); return new DefaultKafkaProducerFactory<>(producerProps); } @Bean public ConsumerFactory consumerFactory() { Map consumerProps = this.kProperties.buildConsumerProperties(); return new DefaultKafkaProducerFactory<>(consumerProps); } @Bean public KafkaTemplate sendTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setReplyTemplate(sendTemplate()); return factory; } @Bean public ConcurrentMessageListenerContainer cargoListenerContainer() { ConcurrentMessageListenerContainer container = kafkaListenerContainerFactory().createContainer("RPC-Request"); container.getContainerProperties().setGroupId("replies"); container.setAutoStartup(false); return container; } } ``` ```java // 功能类 @Component @Slf4j public class Receiver { @KafkaListener(id = "rpc-server", topics = "RPC-Request") @SendTo public Cargo receive(Cargo cargo) { log.info("Received: {}", cargo.getAction()); return Cargo.builder().action("response").build(); } } ```