Goolge-Guava Concurrent中的Service,一般翻译作服务,个人感觉这个太宽泛了,我在学习中就直接把它当做一种特殊的线程托管对象来理解的。

一个服务正常生命周期有:

  • Service.State.NEW
  • Service.State.STARTING
  • Service.State.RUNNING
  • Service.State.STOPPING
  • Service.State.TERMINATED

服务一旦被停止就无法再重新启动了。如果服务在starting、running、stopping状态出现问题、会进入Service.State.FAILED.状态。调用startAsync()方法可以异步开启一个服务,同时返回this对象形成方法调用链。注意:只有在当前服务的状态是NEW时才能调用startAsync()方法,因此最好在应用中有一个统一的地方初始化相关服务。停止一个服务也是类似的、使用异步方法stopAsync()。但是不像startAsync(),多次调用这个方法是安全的。这是为了方便处理关闭服务时候的锁竞争问题。

Service也提供了一些方法用于等待服务状态转换的完成:通过addListener()方法异步添加监听器。此方法允许你添加一个Service.Listener、它会在每次服务状态转换的时候被调用。注意:最好在服务启动之前添加Listener(这时的状态是NEW)、否则之前已发生的状态转换事件是无法在新添加的Listener上被重新触发的。

同步使用awaitRunning()。这个方法不能被打断、不强制捕获异常、一旦服务启动就会返回。如果服务没有成功启动,会抛出IllegalStateException异常。同样的, awaitTerminated() 方法会等待服务达到终止状态(TERMINATED 或者 FAILED)。两个方法都有重载方法允许传入超时时间。

Service 接口本身实现起来会比较复杂、且容易碰到一些捉摸不透的问题。因此我们不推荐直接实现这个接口。而是请继承Guava包里已经封装好的基础抽象类。每个基础类支持一种特定的线程模型。

AbstractIdleService

可以理解为一个空实现,在服务处于running状态时,不会做执行任何动作,只是简单的实现startUp() 和 shutDown() 这两个方法即可。比如我们需要通过服务去管理一个连接池,在startUp()时初始化连接池,shutdown时关闭连接池

AbstractExecutionThreadService

AbstractExecutionThreadService在单个线程中执行startup, running, and shutdown,必须实现run()方法,同时在方法中要能响应停止服务的请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class ServiceTest {
private static ServiceManager serviceManager;
public static void main(String[] args) throws Exception{
List<Service> services = new ArrayList<>();
for(int i=0;i<2;i++){
services.add(new TaskService("service"+i));
}
serviceManager = new ServiceManager(services);
serviceManager.addListener(new ServiceManager.Listener() {
@Override
public void healthy() {
Console.log("healthy");
}
public void stopped(){
Console.log("stopped");
}
});
serviceManager.startAsync().awaitHealthy();
Thread.sleep(2000);
serviceManager.stopAsync();
}
public static class TaskService extends AbstractExecutionThreadService{
private String name;
private boolean isRunning = false;
public TaskService(String name){
this.name = name;
}
@Override
protected void run() throws Exception {
while(isRunning){
try{
Console.log("{}运行中,结果:{}",name,new Random().nextInt(100));
Thread.sleep(1000);
}catch (Exception e){
//处理异常,这里如果抛出异常,会使服务状态变为failed同时导致任务终止
}
}
}
@Override
protected void startUp(){
this.isRunning = true;
Console.log("{}启动",this.name);
}
@Override
protected void triggerShutdown(){
this.isRunning = false;//修改状态值
}
@Override
protected void shutDown(){
Console.log("{}关闭",this.name);
}
}
}

打印结果

1
2
3
4
5
6
7
8
9
10
service0启动
service1启动
healthy
service0运行中,结果:51
service1运行中,结果:88
service1运行中,结果:83
service0运行中,结果:49
service0关闭
service1关闭
stopped

triggerShutdown() 方法会在执行方法stopAsync调用,startUp方法会在执行startAsync方法时调用,这个类的实现都是委托给AbstractService这个方法实现的。

AbstractScheduledService

AbstractScheduledService类用于在运行时处理一些周期性的任务。子类可以实现 runOneIteration()方法定义一个周期执行的任务,以及相应的startUp()和shutDown()方法。为了能够描述执行周期,需要实现scheduler()方法。

通常情况下,可以使用AbstractScheduledService.Scheduler类提供的两种调度器:newFixedRateSchedule(initialDelay, delay, TimeUnit)newFixedDelaySchedule(initialDelay, delay, TimeUnit),类似于JDK并发包中ScheduledExecutorService类提供的两种调度方式。如要自定义schedules则可以使用 CustomScheduler类来辅助实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class AbstractScheduledServiceTest extends AbstractScheduledService{
@Override
protected void startUp() throws Exception {
Console.log("start");
}
@Override
protected void shutDown() throws Exception {
Console.log("shutDown");
}
@Override
protected void runOneIteration() throws Exception {
try {
Console.log("do work");
}catch (Exception e){
e.printStackTrace();
}
}
@Override
protected Scheduler scheduler() {
return Scheduler.newFixedRateSchedule(2,2, TimeUnit.SECONDS);
}
public static void main(String[] args) throws Exception{
AbstractScheduledServiceTest service = new AbstractScheduledServiceTest();
service.startAsync().awaitRunning();
Thread.sleep(10000);
service.stopAsync().awaitTerminated();
}
}

打印结果如下:

1
2
3
4
5
6
7
start
do work
do work
do work
do work
do work
shutDown

AbstractScheduledServic默认使用java concurrent包的ScheduledExecutorService来执行的

AbstractService

如需要自定义的线程管理、可以通过扩展AbstractService类来实现。一般情况下、使用上面的几个实现类就已经满足需求了,但如果在服务执行过程中有一些特定的线程处理需求、则建议继承AbstractService类。
继承AbstractService方法必须实现两个方法.

  • doStart(): 首次调用startAsync()时会同时调用doStart(),doStart()内部需要处理所有的初始化工作、如果启动成功则调用notifyStarted()方法;启动失败则调用notifyFailed()
  • doStop(): 首次调用stopAsync()会同时调用doStop(),doStop()要做的事情就是停止服务,如果停止成功则调用 notifyStopped()方法;停止失败则调用 notifyFailed()方法。

doStart和doStop方法的实现需要考虑下性能,尽可能的低延迟。如果初始化的开销较大,如读文件,打开网络连接,或者其他任何可能引起阻塞的操作,建议移到另外一个单独的线程去处理。

ServiceManager

除了对Service接口提供基础的实现类,Guava还提供了ServiceManager类使得涉及到多个Service集合的操作更加容易。通过实例化ServiceManager类来创建一个Service集合,你可以通过以下方法来管理它们:

  • startAsync() : 将启动所有被管理的服务。如果当前服务的状态都是NEW的话、那么你只能调用该方法一次、这跟 Service#startAsync()是一样的。
  • stopAsync() :将停止所有被管理的服务。
  • addListener :会添加一个ServiceManager.Listener,在服务状态转换中会调用该Listener
  • awaitHealthy() :会等待所有的服务达到Running状态
  • awaitStopped():会等待所有服务达到终止状态

检测类的方法有:

  • isHealthy() :如果所有的服务处于Running状态、会返回True
  • servicesByState():以状态为索引返回当前所有服务的快照
  • startupTimes() :返回一个Map对象,记录被管理的服务启动的耗时、以毫秒为单位,同时Map默认按启动时间排序。

建议整个服务的生命周期都能通过ServiceManager来管理,不过即使状态转换是通过其他机制触发的、也不影响ServiceManager方法的正确执行。例如:当一个服务不是通过startAsync()、而是其他机制启动时,listeners仍然可以被正常调用、awaitHealthy()也能够正常工作。ServiceManager唯一强制的要求是当其被创建时所有的服务必须处于New状态。