359 lines
22 KiB
Markdown
359 lines
22 KiB
Markdown
---
|
||
title: 快速上手Akka
|
||
tags:
|
||
- JVM
|
||
- Java
|
||
- Actor
|
||
- 高并发
|
||
- 分布式
|
||
- Akka
|
||
- Spring
|
||
categories:
|
||
- - JVM
|
||
- Java
|
||
- - JVM
|
||
- Spring
|
||
keywords: 'Java,Akka,Actor,高并发,分布式,并发编程,模式,Spring'
|
||
date: 2021-05-20 12:08:22
|
||
---
|
||
|
||
Akka是一个构建分布式高并发,带有容错能力的事件驱动的工具包。Akka有Java和Scala两种实现,是Actor模式的常用实现。在Akka中,Actors是基础执行单元,并且也拥有Actor模式中所涉及的一个Actor要具有的所有元素。<!-- more -->
|
||
|
||
关于Actor模式可以参考本站文章:{% post_link actor-pattern %}
|
||
|
||
Akka框架有两个实现,目前在网络中出现最多的是Akka Classic。Akka Classic一般会使用`AbstractActor`或者`UntypedAbstractActor`来定义Actor,这带来的坏处是Actor不能区分所发来消息的类型,也就是Actor所接受的消息不是指定类型的,Actor可以接受任意类型的消息,甚至是`Object`类型。在这种自由类型作为消息的实现模式下,一旦系统规模变大,那么系统中的Actor就会变得难以控制。并且Akka会将Actor无法处理的消息自动转给`unhandled`方法,以保证所有的消息都能得到处理,这也就吞噬了一些潜在异常和逻辑错误。
|
||
|
||
为了引入一种通讯协议机制,Akka推出了改进以后的Akka Typed实现。明确指定Actor所能够处理的消息类型,为Akka带来了强制通讯协议的支持。所以在使用Akka Typed实现进行系统设计时,就需要采用“协议优先”的设计方法,先从Actor能够处理的消息类型出发开始设计。
|
||
|
||
虽然Actor模式能够支持数量众多的并发系统设计,但是如果功能粒度切分过细,也同样会引入不必要的复杂度。
|
||
|
||
## 实现自己的Actor
|
||
|
||
Actor是Akka框架中的核心元素,也是需要定义的数量最为庞大的内容。在Akka Classic实现中,通常都是使用静态内部类来定义Actor所使用的消息类,但是在Akka Typed实现中,消息类的定义,还是推荐独立出来。
|
||
|
||
### 定义消息协议
|
||
|
||
根据“协议优先”的设计原则,要开始定义一个Actor模式的系统,需要首先从Actor之间的交互开始定义。Actor之间用来进行交互的消息主要其实主要有两种:发送和接收,也就是很多示例中常见的Request和Response。
|
||
|
||
在请求消息的设计上,一般除了会包括消息需要承载的内容以外,通常还会包括消息的发出方。在Akka中,对于其他Actor的引用一般都不直接使用对方Actor的明确类型,而是使用Actor所能够接受的消息类型利用`ActorRef<T>`引用类型来引用。Akka框架的这种设计可以充分的使Actor之间解耦。
|
||
|
||
!!! warning ""
|
||
在Akka框架中,除了要明确创建子Actor,一般很少会直接使用Actor的类名,都是使用Actor能够接受的消息类名。
|
||
|
||
为了能够达到通用设计,在设计Actor的消息协议的时候,一般会采用定义空接口作为通用的消息类型,各个Actor中所使用的消息类型均从空接口中派生。这样的设计可以使Actor与消息之间的耦合大大降低。例如Request和Response可以这么设计。
|
||
|
||
```java
|
||
public interface Command {}
|
||
|
||
public class Request<T> implements Command {
|
||
private T payload;
|
||
private ActorRef<Command> sender;
|
||
|
||
// getter和setter先省略了
|
||
}
|
||
|
||
public class Response<T> implements Command {
|
||
private T payload;
|
||
private ActorRef<Command> responder;
|
||
|
||
// getter和setter同样先省略
|
||
}
|
||
```
|
||
|
||
在网络能够见到还有一种将Request和Response类定义在接口中的写法,这样也是可以的,而且比较节省文件数量(Java会在乎这点儿文件数量么?)。转换成这样的写法如下例所示。
|
||
|
||
```java
|
||
public interface Command {
|
||
class Request<T> implements Command {
|
||
private T payload;
|
||
private ActorRef<Command> sender;
|
||
|
||
// getter和setter都已经省略
|
||
}
|
||
|
||
class Response<T> implements Command {
|
||
private T payload;
|
||
private ActorRef<Command> responder;
|
||
|
||
// getter和setter都已经省略
|
||
}
|
||
}
|
||
```
|
||
|
||
!!! note ""
|
||
像上例中这样在接口中定义的类,默认是静态的,实际上就是一个定义在接口里的静态内部类。这样定义的目的主要是将功能用途相近的内容集中放置。
|
||
|
||
这样定义出来的Request和Response在使用的时候就需要使用`Command.Request<T>`和`Command.Response<T>`的形式了。
|
||
|
||
在以上两个示例中,可以发现一个很相似的位置就是Request和Response都使用`ActorRef<Command>`引用了消息的发送者。这也是Actor模式在实际使用中的一些习惯,通过`ActorRef<Command>`的定义,Actor可以从消息中获得消息的发送来源。如果Actor的逻辑是需要回复的,那么就可以直接利用消息中附带的发送者引用,直接将所需要发送的数据返回发送者。而在消息体中所有的具名类型,都明确的指示了需要在Actor之间传递什么样的消息。虽然不再像Akka Classic那么自由,但是Actor之间交互所使用的协议变得更加安全了,消息的处理也变得更加明确了。
|
||
|
||
### Actor Context
|
||
|
||
Actor Context顾名思义,是一个Actor运行的上下文环境。这个上下文环境通常被用来做以下这些事情。
|
||
|
||
- 创建子Actor并进行监管。
|
||
- 监视其他的Actor收到的终止事件。
|
||
- 记录和输出日志。
|
||
- 创建消息适配器。
|
||
- 利用Ask与其他的Actor进行交互。
|
||
- 动态获取Actor自身的引用。
|
||
|
||
要在Actor中使用Actor Context,可以将其作为Actor构造方法的参数来将其传入。例如常用的Actor的基类`AbstractBehavior`就需要使用`ActorContext`来进行实例化。
|
||
|
||
### 常用的Actor种类
|
||
|
||
与Akka Classic中所使用的`AbstractActor`和`AbstractUntypedActor`不同,Akka Typed使用`Behavior<T>`作为Actor的代表。在Akka Typed中,一个`Behavior`就代表了一个处理信息的行为,而在一个`Behavior`中还可以通过返回另一个`Behavior`来表明如何处理下一个消息。
|
||
|
||
在一般的情况下,要定义一个Actor,大多都是从`AbstractBehavior<T>`类继承来的。除了可以自定义Behavior以外,Akka Typed还通过`Behaviors`类提供了若干拥有固定行为的Actor可供使用。
|
||
|
||
- `Behaviors.empty()`,会将所有发送来的消息都处理为unhandled。
|
||
- `Behaviors.ignore()`,会忽略所有发送来的消息。
|
||
- `Behaviors.stopped()`,使当前的Actor终止。
|
||
- `Behaviors.unhandled()`,建议系统使用之前的Behavior,包括未处理的行为。
|
||
- `Behaviors.same()`,建议系统使用之前的Behavior。
|
||
|
||
!!! note ""
|
||
从`ExtensibleBehavior<T>`类继承实现Actor是另一种更加原始的选择,但是这样会丢掉`AbstractBehavor<T>`提供的更加便利的操作方法。
|
||
|
||
### Actor的基本构成
|
||
|
||
一个Actor完成处理一个消息是通过返回一个新的Behavior来实现的,而Actor里面要实现这一行为最核心的方法就是`ExtensibleBehavior<T>`中提供的`receive()`方法和`reeiveSingal()`方法。这两个方法都需要返回一个`Behavior<T>`实例。
|
||
|
||
如果Actor是通过继承`AbstractBehavior<T>`来实现的,那么一般会选择实现`createReceive()`方法,这个方法会要求返回一个`Receive<T>`实例,Akka会根据返回的`Receive<T>`实例来创建对应的`receive()`方法和`recceiveSignal()`方法。
|
||
|
||
在`createReceive()`方法中,一般会使用`AbstractBehavior<T>`类中提供的`newReceiveBuilder()`方法来获取一个`ReceiveBuilder`类的实例,然后再通过对这个实例进行配置来使其构建出我们所需要的`Receive`实例。例如可以参考以下实例。
|
||
|
||
```java
|
||
public class GreetingAction extends AbstractBehavior<GreetingMessage> {
|
||
|
||
public GreetingAction(TypedActorContext<GreetingMessage> ctx) {
|
||
super(ctx);
|
||
}
|
||
|
||
@Override
|
||
public Receive<GreetingMessage> createReceive() {
|
||
return newReceiveBuilder()
|
||
.onMessage(MorningGreetingMessage.class, message -> {
|
||
// Actor处理消息的过程
|
||
// 发送消息给其他的Actor
|
||
message.getSender().tell(new MorningGreetingMessage());
|
||
// 返回下一个Behavior,this表示当前行为
|
||
return this;
|
||
})
|
||
.build();
|
||
}
|
||
}
|
||
```
|
||
|
||
从上面这个例子可以看出来,定义一个Actor的行为,最主要的就是定义`ReceiveBuilder`里的`onMessage()`方法,其实除了`onMessage()`方法以外,`ReceiveBuilder`里还有`onSignal()`方法可供使用。只不过`onSignal()`是用来响应Akka系统信号的。
|
||
|
||
`ReceiveBuilder`中常用的`onMessage()`方法和`onSignal()`方法的签名有以下几个,可以参考在定义Actor时使用。
|
||
|
||
- `ReceiveBuilder<T> onAnyMessage(Function<T, Behavior<T>> handler)`,处理所有传入的信息。
|
||
- `<M extends T> ReceiveBuilder<T> onMessage(Class<M> type, Function<M, Behavior<T>> handler)`,当传入的信息是`T`的子类型`M`的时候要进行的处理。
|
||
- `<M extends T> ReceiveBuilder<T> onMessage(Class<M> type, Predicate<M> test, Function<M, Behavior<T>> handler)`,当传入的信息是`T`的子类型`M`,并同时满足筛选条件的时候要执行的处理。
|
||
- `ReceiveBuilder<T> onMessageEquals(T msg, Creator<Behavior<T>> handler)`,当传入信息满足实例相等条件时要执行的Behavior创建过程。
|
||
- `<M extends Signal> ReceiveBuilder<T> onSignal(Class<M> type, Function<M, Behavior<T>> handler)`,处理传入的信号是`Signal`的子类型`M`的时候要进行的处理。
|
||
- `<M extends Signal> ReceiveBuilder<T> onSignal(Class<M> type, Predicate<M> test, Function<M, Behavior<T>> handler)`,当传入的信号是`Signal`的子类型`M`,并同时满足筛选条件的时候要进行的处理。
|
||
- `ReceiveBuilder<T> onSignalEquals(Signal signal, Creator<Behavior<T>> handler)`,当传入信号满足实例相等条件时要执行的Behavior创建过程。
|
||
|
||
### 生命周期
|
||
|
||
Actor的生命周期主要包括Actor的创建和停止。在Akka框架中,Actor是有状态的,而且持有相当数量的资源,所以Actor必须被显式创建和停止。
|
||
|
||
!!! caution ""
|
||
一个Actor在不被引用以后并不会自动停止,所有被显式创建的Actor必须被显式销毁。
|
||
|
||
!!! caution ""
|
||
如果停止了一个父Actor,那么其下所有被其创建的子Actor都会被停止。同样的,如果停止了Actor System,那么所有的Actor也都会被停止。
|
||
|
||
要创建一个Actor,可以使用Actor Context提供的`spawn()`方法。`spawn()`方法的方法签名如下`ActorRef<U> spawn(Behavior<U> behavior, String name)`,在`spawn()`方法中,`name`参数只是给创建的Actor起一个名字。
|
||
|
||
在Actor System中有一个比较特殊的Actor,被称为守护Actor(Guardian Actor)。这个守护Actor除了需要负责其本身的业务逻辑以外,还需要负责创建所有的子系统Actor以及监视他们的生命周期。要创建这个守护Actor不是使用Actor Context提供的`spawn()`方法,而是采用Actor System提供的`create()`方法,这个`create()`方法的方法签名与Actor Context提供的`spawn()`方法的签名一致。
|
||
|
||
在程序设计中,一般并不推荐直接通过`new`来实例化一个类,所以在Akka里,最常见的创建Actor本身的方法是使用`Behaviors.setup()`方法。其使用可以参考以下示例。
|
||
|
||
```java
|
||
public class GreetingBot extends AbstractBehavior<GreetingMessage> {
|
||
public static Behavior<GreetingMessage> create() {
|
||
return Behaviors.setup(GreetingBot::new);
|
||
}
|
||
|
||
private GreetingBot(ActorContext<GreetingMessage> ctx) {
|
||
super(ctx);
|
||
}
|
||
}
|
||
```
|
||
|
||
要停止一个Actor并不难,如果一个Actor返回了`Behaviors.stopped()`就会使它自己停止。然后这个Actor就会收到`PostStop`信号,可以允许Actor做一些停止工作以后的清理工作。
|
||
|
||
如果要监视另一个Actor结束运行的情况,需要监听`Terminated`信号。要监听指定Actor的`Terminated`信号,需要使用Actor Context提供的`watch()`方法来监听指定Actor的活动,如果想计划使用自定义的消息,那么就需要使用`watchWith()`方法了。如果被监视的Actor产生了停止事件,那么监听Actor将会收到`Terminated`信号,可以允许监听Actor做出一些对应的动作。除了可以监听到`Terminated`信号以外,Actor还可以收到其他的信号。
|
||
|
||
### 消息的传递
|
||
|
||
Actor之间的消息传递一般都是通过`tell()`方法来完成的,这个方法的使用在前面的示例中已经出现过。`tell()`方法是`ActorRef<T>`提供的,所以如果想要给Actor发送消息,首先就必须先获得这个Actor的引用。
|
||
|
||
除了可以使用`ActorRef<T>`提供的`tell()`方法以外,还可以借助`ActorContext<T>`提供的`ask()`方法来向其他的Actor传递消息。以下是`ask()`方法的签名信息。
|
||
|
||
```java
|
||
<Req, Res> void ask(
|
||
Class<Res> resClass,
|
||
RecipientRef<Req> target,
|
||
Duration timeout,
|
||
Function<ActorRef<Res>, Req> createRequest,
|
||
Function2<Res, Throwable, T> applyToResponse
|
||
)
|
||
```
|
||
|
||
## Actor System
|
||
|
||
初看Actor模式的介绍,感觉Actor模式在实现的时候应该所有的Actor都是散在的,或者是都存在于一个Actor池中,相互之间仅通过消息产生关联。但是在Akka框架中,所有的Actor实际上是以一个等级分层结构组织在一起的。所有的Actor根据它们的调用链自然形成了一个具有等级的分层结构。
|
||
|
||
Actor System是用于存放这个等级分层结构的容器,自然其中的Actor也就按照分层结构被组织在了一起。可以说Actor System是启动整个Actor处理链条的开始。
|
||
|
||
常用的Actor System实现是位于`akka.actor.typed`包中的`ActorSystem<T>`类。其实阅读过这个类的源码或者API以后就会发现,这个`ActorSystem<T>`类其实也是一个Actor,只不过这个Actor比较特殊,是用来为其他Actor提供运行环境、资源分配和基础设施的。在这个Actor System中,每个Actor都是有其父Actor的,而且从根Actor开始,有三个Actor是比较特殊的。
|
||
|
||
最根部的Actor一般被称为根节点,通常用`/`表示,它有两个子Actor,一是`/user`,一是`/system`。其中`/system`主要用于Akka系统的内部管理,并不参与用户定义的Actor的运行。用户定义的所有Actor都隶属于`/user`,也都由`/user`负责监管,但是根据Akka的建议,用户自己定义的Actor最好还是由用户自己来进行监管。
|
||
|
||
在Akka的Actor模式中,Actor的监管机制一般都是采用级联方式处理Actor的失败问题,也就是说父Actor需要决定如何处理子Actor。所以对Actor的分组就影响了监管策略的建立。在默认情况下,Akka Typed采用的策略是停止子Actor,Akka Classic采用的策略是重启子Actor,但是监管窜略如果由我们来制定,那么就可以按照实际需求来定了。
|
||
|
||
### 子Actor的监管
|
||
|
||
一个Actor在创建的时候,可以利用`Behaviors.supervise()`方法进行封装,设定其监管策略。例如以下设定代码都是十分常见的。
|
||
|
||
```java
|
||
Behaviors.supervise(behavior).onFailure(SupervisorStrategy.restart());
|
||
|
||
Behaviors.supervise(behavior).onFailure(IllegalStateException.class, SupervisorStrategy.resume());
|
||
```
|
||
|
||
按照Akka中Actor的组织和监管形式,父级Actor在重新启动的时候,它所创建的所有子Actor也会随之停止和重启。如果想保持子Actor的运行状态,可以参考以下示例代码。
|
||
|
||
```java
|
||
static Behavior<Command> setupParent() {
|
||
return Behaviors.setup(ctx -> {
|
||
final ActorRef<Command> child = ctx.spawn(createChild(), "child1");
|
||
|
||
return Behaviors.<Command>supervise(
|
||
Behaviors.receiveMessage(msg -> {
|
||
// 对子Actor进行操作。
|
||
return Behaviors.same();
|
||
})
|
||
)
|
||
.onFailure(SupervisorStrategy.restart().withStopChildren(false));
|
||
});
|
||
}
|
||
```
|
||
|
||
!!! caution ""
|
||
如果让`supervise()`包裹`Behaviors.setup()`,那么父级Actor创建的所有子Actor都会随着父级Actor的重启,全部重新创建一遍。所以如果需要保持Actor的运行状态,需要使用`Behaviors.setup()`包裹`supervise()`。
|
||
|
||
### 获取Actor引用
|
||
|
||
要获取一个Actor的引用,除了在创建这个Actor的时候保存它的引用以外,还可以使用Actor System提供的Receptionist功能。
|
||
|
||
Receptionist采用注册制,Actor必须主动在Receptionist上注册才能让其他的Actor通过Receptionist获取它的引用。在Receptionist上注册Actor实际上就是将一个Service Key与Actor建立关联,在Akka中,一个Service Key可以对应多个Actor,这样在向Receptionist请求Actor时,Receptionist会回复一个列表,其中包括所有符合条件的Actor。
|
||
|
||
要获取一个Receptionist的实例,一般可以通过Actor Context来获取,即`context.getSystem().receptionist()`来获取,或者还可以直接通过`Receptionist`类提供的静态方法来进行操作。
|
||
|
||
Actor注册用的Service Key是需要通过`ServiceKey.create()`方法来创建的,这个方法的签名为`static <T> ServiceKey<T> create(Class<T> clazz, String id)`。从`create()`方法的签名可以看出来,创建一个Service Key需要提供一个类和一个字符串。
|
||
|
||
以下是在Receptionist中注册一个Actor的常见过程。
|
||
|
||
```java
|
||
public static Behavior<Command> create() {
|
||
ServiceKey<Command> key = ServiceKey.create(Command.class, "testCommand");
|
||
return Behaviors.setup(context -> {
|
||
context
|
||
.getSystem()
|
||
.receptionist()
|
||
.tell(Receptionist.register(key, context.getSelf()));
|
||
return new ExecuteActor(context);
|
||
})
|
||
}
|
||
```
|
||
|
||
在Receptionist中,所有已经注册的Actor组成的注册表是动态的,Actor会根据其状态不同,而从注册表中消失。如果需要监控Receptionist中注册表的变化,可以订阅Receptionist上某个Service Key的变化事件,并将其发送给指定的Actor。订阅事件的编码结构基本上与注册Actor的编码结构相似,只是注册中的`Receptionist.register()`方法需要换成`Receptionist.subscribe()`方法。`Receptionist.subscribe()`方法的签名为`static <T> Receptionist.Command subscribe(ServiceKey<T> key, ActorRef<Receptionist.Listing> subscriber)`。从`Receptionist.subscribe()`方法的签名可以看出,订阅Receptionist变化的Actor需要能够处理`Receptionist.Listing`消息。
|
||
|
||
在Receptionist中寻找Actor的`Receptionist.find()`方法的签名基本上与`Receptionist.subscribe()`相同。
|
||
|
||
## 最小入口程序
|
||
|
||
!!! caution ""
|
||
Actor System是一个会管理运行线程的重量级数据结构,所以原则上一个JVM进程中只创建一个Actor System的实例。
|
||
|
||
要构建一个最小的入口程序并不难,只需要使用已经定义好的守卫Actor构建Actor System即可,但是不要忘了Actor System也是一个Actor,需要手动关闭和拆解。
|
||
|
||
```java
|
||
public static main(String[] args) {
|
||
final ActorSystem<String> system = ActorSystem.create(Greeting.create(), "Greeting");
|
||
system.tell("start");
|
||
|
||
// 这里可以使用Akka Typed提供的Coordinated Shutdown来关闭Actor System并进行一些额外的处理操作
|
||
// 或者还可以使用ActorSystem中的terminated()方法直接进行关闭。
|
||
system.terminated();
|
||
}
|
||
```
|
||
|
||
## 与Spring集成
|
||
|
||
虽然Akka是一个体积不小的框架结构,但是要把它与Spring结合使用,并不是很难。这其中的关键是要明确Akka中的哪些内容是可以有Spring来管理的。
|
||
|
||
Spring中最底层的功能是IoC,这需要Spring能够掌握系统中绝代多数的组件实例才可以。但是在Akka中,Actor都是有Actor System控制的,并且都是呈登记分层结构组织的,而且Actor的实例基本上都是在父级Actor中创建的。明白了这些内容,就基本上明白了将Akka与Spring结合时所需要解决的难点。
|
||
|
||
首先必须要实现的是,把Actor System实例托管给Spring。这一步非常简单,只需要在Spring中实例化一个单例的Bean即可。其次,对于Actor的生成,可以利用Spring的`prototype`级别的Bean来生成。例如以下示例。
|
||
|
||
```java
|
||
@Named("greeting_bot")
|
||
@scope("prototype")
|
||
public class GreetingBot extends AbsractBehavior<Greeting> {
|
||
// 省略Actor中具体的实现
|
||
}
|
||
```
|
||
|
||
使用`@Named`注解主要是因为Akka在初始化Actor的时候必须使用Actor System或者Actor Context提供的工厂方法,不能直接使用构造函数。
|
||
|
||
另一种生成Actor的方法是实现Akka中提供的`IndirectActorProducer`接口。通过这个接口可以定制一些Actor的生成方式。例如可以构建这样一个通过构造函数生成Actor的Producer。
|
||
|
||
```java
|
||
public class SpringActorProducer implements IndirectActorProducer {
|
||
private final ApplicationContext applicationContext;
|
||
private final String actorBeanName;
|
||
private final Object[] args;
|
||
|
||
public SpringActorProducer(ApplicationContext applicationContext, String actorBeanName, Object... args) {
|
||
this.applicationContext = applicationContext;
|
||
this.actorBeanName = actorBeanName;
|
||
this.args = args;
|
||
}
|
||
|
||
public Behavior produce() {
|
||
return (Behavior)applicationContext.getBean(actorBeanName, args);
|
||
}
|
||
|
||
public Class<? extends Behavior> actorClass() {
|
||
return (Class<? extends Behavior>)applicationContext.getType(actorBeanName);
|
||
}
|
||
}
|
||
```
|
||
|
||
之后可利用其构建一个用于生成Actor的代理。
|
||
|
||
```java
|
||
@Component("spring_ext")
|
||
public class SpringExtension implements Extension, ApplicationContextAware {
|
||
private ApplicationContext applicationContext;
|
||
|
||
public Props props(String actorBeanName, Object... args) {
|
||
return Props.create(SpringActorProducer.class, applicationContext, actorBeanName, args);
|
||
}
|
||
|
||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||
this.applicationContext = applicationContext;
|
||
}
|
||
}
|
||
```
|
||
|
||
!!! caution "请勿直接使用的警告"
|
||
以上Akka与Spring结合的代码转摘自Akka Classic并做了简单的对应到Akka Typed的处理,尚未在实际代码中做验证。如需在项目中使用,可先进行测试验证。
|