引言
《任务取消》由于篇幅较多,拆分了两篇来介绍各种实现取消和中断的机制,以及如何编写任务和服务,使它们能对取消请求做出响应。
1. 任务限时运行
我们知道许多任务可能永远也无法处理完成(例如,枚举所有的素数),而某些任务,可能很快被处理掉,也可能很长时间才能处理完。这个时候如果能够对任务处理加个时间限制,比如指定 “最多花1分钟搜索答案” 或者 “枚举出1秒钟内能找到的素数”,那将会是非常有用的。
我们来思考一下,本篇开头提到的素数生成器 PrimeGenerator,如果它在指定时限内抛出一个未检查的异常,会怎么样呢?
可以肯定的是这个异常会被忽略掉,因为素数生成器在另一个独立的线程中运行,而这个线程并不会显式地处理异常。
大多数时候,我们还是希望能够知道在任务执行过程中是否会抛出异常的。
下面我们来看一下如下示例【在外部线程中安排中断,不推荐使用,仅用于理解】:
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class TaskUtils { private static final ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(10);
public static void timeRun(Runnable r, long timeout, TimeUnit unit) { final Thread taskThread = Thread.currentThread(); cancelExec.schedule(new Runnable() { public void run() { taskThread.interrupt(); } }, timeout, unit); r.run(); } }
|
上述示例给出了在指定时间内运行一个任意的 Runnable 的场景。timeRun 在调用线程中运行任务,并安排了一个取消任务,用于在运行指定的时间间隔后中断 timeRun 所在线程。从任务中抛出未检查异常的问题,也会被 timeRun 的调用者捕获。
下面我们来看一下如下测试场景【演示下 1s 后结束素数生成器的任务】:
1 2 3 4 5 6 7 8 9 10
| public class TimeRunTest { @Test public void timeRun() { LOGGER.debug("timeRun start"); BlockingQueue<BigInteger> primes = new LinkedBlockingQueue<>(100); PrimeProducer producer = new PrimeProducer(primes); TaskUtils.timeRun(producer, 1, SECONDS); LOGGER.debug("timeRun end"); } }
|
读者可以自行调试下,虽然 timeRun 能实现限时执行的功能,但它是通过外部线程安排中断实现。
在前面的 《任务取消(上)》中我们了解到,每个线程都有自己的中断策略,在中断线程之前,应该了解它的中断策略,否则就不应该中断该线程。
由于 timeRun 可以从任意一个线程中调用,因此它无法知道这个调用线程的中断策略。
如果任务在超时之前完成,会怎么样呢?
下面我们再来看一下如下测试场景【任务在超时之前完成】:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class TimeRunTest { @Test public void timeRun1() { LOGGER.debug("timeRun start"); TaskUtils.timeRun(new Runnable() { @Override public void run() { LOGGER.debug("task"); } }, 400, TimeUnit.MILLISECONDS); try { LOGGER.debug("sleep start"); SECONDS.sleep(1); LOGGER.debug("sleep end"); } catch (InterruptedException e) { LOGGER.debug("InterruptedException"); } LOGGER.debug("timeRun end"); } }
|
读者可以自行调式下,运行如下:
上述示例中,任务在超时之前完成,而中断 timeRun 所在线程的取消任务将在 timeRun 返回到调用者之后启动。其中 SECONDS.sleep(1);
响应了中断,并抛出了 InterruptedException 异常,示例代码捕获该异常后打印了 InterruptedException。
虽然我们的任务在超时之前已经运行完了,但是取消任务在指定时间后还是对 timeRun 所在线程发出了中断请求。我们不知道在这种情况下 timeRun 返回之后调用者将运行什么代码【SECONDS.sleep(1);
这段只是为了演示】,但结果一定是不好的。【当然这里可以使用 schedule 返回的 ScheduledFuture 来取消这个取消任务以避免这种风险,这种做法虽然可行,但却非常复杂。】
如果任务不响应中断,会怎么样呢?
下面我们再来看一下如下测试场景【任务不响应中断请求】:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class TimeRunTest { @Test public void timeRun2() { LOGGER.debug("timeRun start"); TaskUtils.timeRun(new PrimeGenerator(), 400, TimeUnit.MILLISECONDS); try { LOGGER.debug("sleep start"); SECONDS.sleep(1); LOGGER.debug("sleep end"); } catch (InterruptedException e) { LOGGER.debug("InterruptedException"); } LOGGER.debug("timeRun end"); } }
|
上述示例中,素数生成器任务采用了自定义的取消策略,并没有响应中断,结果就是 timeRun 一直等待素数生成器任务结束,而它却永远不会结束。
如果任务不响应中断,那么 timeRun 会在任务结束时才返回,此时可能已经超过了指定的时限(或者还没有超过时限)。如果某个限时运行的服务没有在指定的时间内返回,那么将对调用者带来负面的影响。
下面我们来看一下如下示例【在专门的线程中中断任务】:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class TaskUtils { private static final ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(10);
public static void timeRunNew(Runnable r, long timeout, TimeUnit unit) throws InterruptedException { class RethrowableTask implements Runnable { private volatile Throwable t;
public void run() { try { r.run(); } catch (Throwable t) { this.t = t; } }
void rethrow() { if (null != t) throw launderThrowable(t); } }
RethrowableTask task = new RethrowableTask(); final Thread taskThread = new Thread(task); taskThread.start();
LOGGER.debug("join start"); taskThread.join(unit.toMillis(timeout)); LOGGER.debug("join end"); task.rethrow(); } }
|
上述示例中,执行任务的线程拥有自己的执行策略,即使任务不响应中断,限时运行的方法仍能返回到它的调用者。
在启动任务线程之后,timeRun 将执行一个限时的 join 方法。在 join 返回后,它将检查任务中是否有异常抛出,如果有的话,则会在调用 timeRun 的线程中再次抛出该异常。由于 Throwable 将在两个线程之间共享,因此该变量被声明为 volatile 类型,从而确保安全地将其从任务线程发布到 timeRun 线程。
虽然上述示例代码解决了前面的问题,但是由于它依赖一个限时的 join,因此存在着 join 的不足: 无法知道执行控制是因为线程正常退出而返回,还是因为 join 超时而返回
。
这是 Thread API 的一个缺陷,因为无论 join 是否成功地完成,在 Java 内存模型中都会有内存可见性结果,但 join 本身不会返回某个状态来表明它是否成功。
2. 通过 Future 来实现取消
在前面的《同步工具类》博文中,咱们已经初步了解 Future,它可以管理任务的生命周期、处理异常以及实现取消。
而在另一篇《任务执行演示》博文中,我们知道 ExecutorService.submit
将返回一个 Future 来描述任务。Future 拥有一个 cancel 方法,该方法带有一个 boolean 类型的参数 mayInterruptIfRunning,一个 boolean 类型的返回值。如果 mayInterruptIfRunning 为 true 并且任务当前正在某个线程中运行,那么这个线程能被中断。如果 mayInterruptIfRunning 为 false,则允许完成正在进行的任务,同时还未启动的任务也不再运行,这种方式适用于那些不处理中断的任务中。如果任务无法取消,则 cancel 方法返回 false,通常是因为任务已经正常完成;否则返回 true。
前文中我们一直强调,除非知道线程的中断策略,否则就不要中断线程。
那么使用 Future ,在什么情况下调用 cancel 可以将 mayInterruptIfRunning 参数指定为 true ?
执行任务的线程是由标准的 Executor 创建的,其实现了一种中断策略使得任务可以通过中断被取消。
当尝试取消某个任务时,不宜直接中断线程池,因为你并不知道当中断请求到达时正在运行什么任务–只能通过任务的 Future 来实现取消。
下面我们来看一下如下的示例【通过 Future 来取消任务】:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class TaskUtils {
private static final ExecutorService taskExec = Executors.newCachedThreadPool();
public static void timeRunByFuture(Runnable r, long timeout, TimeUnit unit) throws InterruptedException { Future<?> task = taskExec.submit(r); try { task.get(timeout, unit); } catch (ExecutionException e) { throw launderThrowable(e.getCause()); } catch (TimeoutException e) { } finally { task.cancel(true); } } }
|
上述示例应该很好理解,读者可以尝试跑下面的自测类来验证下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
|
@Test public void timeRunByFuture() { LOGGER.debug("timeRun start"); try { BlockingQueue<BigInteger> primes = new LinkedBlockingQueue<>(100); PrimeProducer producer = new PrimeProducer(primes); TaskUtils.timeRunByFuture(producer, 1, SECONDS); } catch (InterruptedException e) { LOGGER.debug("InterruptedException"); } LOGGER.debug("timeRun end"); }
@Test public void timeRunByFuture1() { LOGGER.debug("timeRun start"); try { TaskUtils.timeRunByFuture(new PrimeGenerator(), 500, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOGGER.debug("InterruptedException"); } LOGGER.debug("timeRun end"); }
@Test public void timeRunByFuture2() { LOGGER.debug("timeRun start"); try { TaskUtils.timeRunByFuture(new Runnable() { @Override public void run() { LOGGER.debug("task"); } }, 400, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOGGER.debug("InterruptedException"); } LOGGER.debug("timeRun end"); }
|
3. 处理不可中断的阻塞
我们知道,为了方便开发人员构建出能响应取消请求的任务,在 Java 类库中的大多数可阻塞的方法都是通过提前返回或者抛出 InterruptedException 来响应中断请求的。
对于那些由于执行不可中断操作而被阻塞的线程,在知晓线程阻塞原因的前提下,我们也是可以使用类似中断的手段来停止这些线程。
java.io 包中的同步 Socket I/O。在服务器应用程序中,最常见的阻塞 I/O 形式 就是对套接字进行读取和写入。虽然 InputStream 和 OutputStream 中的 read 和 write 等方法都不会响应中断,但通过关闭底层的套接字,可以使得由于执行 read 或 write 等方法而被阻塞的线程抛出一个 SocketException。
java.io 包中的同步 I/O。当中断一个正在 InterruptibleChannel【可中断通道】上等待的线程时,将抛出 ClosedByInterruptedException 并关闭链路(这还会使得其他在这条链路上阻塞的线程同样抛出 ClosedByInterruptedException)。当关闭一个 InterruptibleChannel 时,将导致所有在链路操作上阻塞的线程抛出 AsynchronousCloseException。大多数标准的 Channel 都实现了 InterruptibleChannel。
Selector 的异步 I/O。如果一个线程在调用 Selector.select
方法(在 java.nio.channels
中)时阻塞了,那么调用 close 或 wakeup 方法会使线程抛出 ClosedSelectorException 并提前返回。
获取某个锁。如果一个线程由于等待某个内置锁而阻塞,那么将无法响应中断,因为线程认为它肯定会获得锁,所以将不会理会中断请求。不过,在 Lock 类中提供了 lockInterruptibly 方法,它允许在等待一个锁的同时仍能响应中断。
下面我们来看一下如下示例【通过改写 interrput 方法将非标准的取消操作封装在 Thread 中】:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| public class ReaderThread extends Thread {
private static final FleaLogger LOGGER = FleaLoggerProxy.getProxyInstance(ReaderThread.class);
private final Socket socket;
private final InputStream in;
public ReaderThread(Socket socket) throws IOException { this.socket = socket; this.in = socket.getInputStream(); }
@Override public void interrupt() { LOGGER.debug("interrupt"); try { socket.close(); LOGGER.debug("socket close"); } catch (IOException e) { } finally { super.interrupt(); } }
@Override public void run() { try { InputStreamReader inputStreamReader = new InputStreamReader(in); BufferedReader bufferedReader = new BufferedReader(inputStreamReader); String data; while ((data = bufferedReader.readLine()) != null) { processData(data); } } catch (IOException e) { } }
private void processData(String data) { LOGGER.debug("0 < All Primes < {}", data); BigInteger prime = BigInteger.ONE; while (!Thread.currentThread().isInterrupted() && prime.compareTo(BigInteger.valueOf(Long.valueOf(data))) < 0) { LOGGER.debug("prime = {}", prime); prime = prime.nextProbablePrime(); } } }
|
上述 ReaderThread 管理了一个套接字连接,它采用同步方式从该套接字中读取数据,并将接收到的数据传递给 processData。同时由于 ReaderThread 改写了 interrupt 方法,使其既能处理标准的中断,也能关闭底层的套接字。
感兴趣的读者,可以自行测试如下【先启动 SocketServer ,再运行 SocketClient 】:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
|
public class SocketServer {
private static final FleaLogger LOGGER = FleaLoggerProxy.getProxyInstance(SocketServer.class);
private static final ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(10);
public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(8888);
while (true) { LOGGER.debug("start serverSocket.accept()"); Socket socket = serverSocket.accept(); LOGGER.debug("end serverSocket.accept()");
ReaderThread readerThread = new ReaderThread(socket); readerThread.start(); cancelExec.schedule(new Runnable() { public void run() { readerThread.interrupt(); } }, 2, SECONDS);
} } }
public class SocketClient {
public static void main(String[] args) throws IOException { Socket socket = new Socket("localhost", 8888);
OutputStream os = socket.getOutputStream(); PrintWriter pw = new PrintWriter(os); pw.write("1000000\n" + "10000"); pw.flush(); socket.shutdownOutput();
pw.close(); os.close(); socket.close(); } }
|
Socket 服务端启动后,执行 Socket 客户端,笔者 Socket 服务端运行结果如下【以实际运行为准】:
4. 采用 newTaskFor 来封装非标准的取消
我们可以通过 Java 6 在 ThreadPoolExecutor 中新增的 newTaskFor 方法来进一步优化 ReaderThread 中封装非标准取消的技术。
当把一个 Callable 提交给 ExecutorService 时,submit 方法会返回一个 Future,我们可以使用这个 Future 来取消任务。
newTaskFor 是一个工厂方法,它将创建 Future 来代表任务。 newTaskFor 还能返回一个 RunnableFuture 接口,该接口扩展了 Future 和 Runnable(并由 FutureTask 实现)。
通过定制表示任务的 Future 可以改变 Future.cancel
的行为。定制的取消代码可以实现,例如:
- 日志记录
- 收集取消操作的统计信息
- 取消一些不响应中断的操作
下面我们来看一下如下示例【通过 newTaskFor 将非标准的取消操作封装到一个任务中】:
我们首先定义了一个 CancellableTask 接口,该接口扩展了 Callable,其中增加了一个 取消方法 和一个 newTask 工厂方法来构造 RunnableFuture。
1 2 3 4 5
| public interface CancellableTask<T> extends Callable<T> { void cancel();
RunnableFuture<T> newTask(); }
|
然后我们定义抽象类 SocketUsingTask ,它实现了 CancellableTask,并通过 Future.cancel
来关闭套接字和调用 super.cancel
。如果 SocketUsingTask 通过其自己的 Future 来取消,那么底层的套接字将被关闭并且线程将被中断。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public abstract class SocketUsingTask<T> implements CancellableTask<T> {
private static final FleaLogger LOGGER = FleaLoggerProxy.getProxyInstance(SocketUsingTask.class);
@GuardedBy("this") private Socket socket;
public SocketUsingTask(Socket socket) { this.socket = socket; }
protected synchronized Socket getSocket() { return socket; }
public synchronized void cancel() { LOGGER.debug("start custom cancel"); try { if (socket != null) { socket.close(); LOGGER.debug("socket close"); } } catch (IOException e) { } LOGGER.debug("end custom cancel"); }
public RunnableFuture<T> newTask() { return new FutureTask<T>(this) { @Override public boolean cancel(boolean mayInterruptIfRunning) { LOGGER.debug("start cancel"); SocketUsingTask.this.cancel(); boolean result = super.cancel(mayInterruptIfRunning); LOGGER.debug("end cancel"); LOGGER.debug("cancel result = {}", result); return result; } }; } }
|
紧接着,我们定义 CancellingExecutor ,它扩展了 ThreadPoolExecutor,并通过改写 newTaskFor 使得 CancellableTask 可以创建自己的 Future。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @ThreadSafe public class CancellingExecutor extends ThreadPoolExecutor {
public CancellingExecutor() { super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); }
@Override protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { if (callable instanceof CancellableTask) return ((CancellableTask<T>) callable).newTask(); else return super.newTaskFor(callable); } }
|
最后,我们定义了任务类 PrimeSumTask ,它继承了上面的抽象类 SocketUsingTask,call 方法用于计算指定范围内的 素数总和 ,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public class PrimeSumTask extends SocketUsingTask<BigInteger> {
private static final FleaLogger LOGGER = FleaLoggerProxy.getProxyInstance(PrimeSumTask.class);
public PrimeSumTask(Socket socket) { super(socket); }
@Override public BigInteger call() { BigInteger result = null; try { InputStream in = getSocket().getInputStream(); InputStreamReader inputStreamReader = new InputStreamReader(in); BufferedReader bufferedReader = new BufferedReader(inputStreamReader); String data; while ((data = bufferedReader.readLine()) != null) { result = processData(data); } } catch (IOException e) { } return result; }
private BigInteger processData(String data) { LOGGER.debug("0 < All Primes < {}", data); BigInteger prime = BigInteger.ONE; BigInteger sum = BigInteger.ZERO; while (!Thread.currentThread().isInterrupted() && prime.compareTo(BigInteger.valueOf(Long.valueOf(data))) < 0) { sum = sum.add(prime); prime = prime.nextProbablePrime(); } return sum; } }
|
感兴趣的读者,可以自行测试如下【先启动 SocketServer ,再运行 SocketClient 】:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| public class SocketServer {
private static final FleaLogger LOGGER = FleaLoggerProxy.getProxyInstance(SocketServer.class);
private static CancellingExecutor executor = new CancellingExecutor();
public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(8888);
while (true) { LOGGER.debug("start serverSocket.accept()"); Socket socket = serverSocket.accept(); LOGGER.debug("end serverSocket.accept()");
PrimeSumTask primeSumTask = new PrimeSumTask(socket); Future<BigInteger> future = executor.submit(primeSumTask);
try { BigInteger result = future.get(2, TimeUnit.SECONDS); LOGGER.debug("result = {}", result); } catch (ExecutionException e) { throw launderThrowable(e.getCause()); } catch (TimeoutException e) { LOGGER.error("TimeoutException"); } catch (InterruptedException e) { } finally { LOGGER.debug( "task is done : {}", future.isDone()); LOGGER.debug( "future cancel start"); future.cancel(true); LOGGER.debug( "future cancel end"); LOGGER.debug( "task is cancelled : {}", future.isCancelled()); }
} } }
public class SocketClient {
public static void main(String[] args) throws IOException { Socket socket = new Socket("localhost", 8888);
OutputStream os = socket.getOutputStream(); PrintWriter pw = new PrintWriter(os); pw.write("1000000"); pw.flush(); socket.shutdownOutput();
pw.close(); os.close(); socket.close(); } }
|
Socket 服务端启动后,执行 Socket 客户端,笔者 Socket 服务端运行结果如下:
上面场景是任务超时运行,接下来我们调整 future.get
的超时时间为 5s, 如下所示:
1
| BigInteger result = future.get(5, TimeUnit.SECONDS);
|
再重新执行 Socket 客户端,此时运行结果如下:
5. 总结
《任务取消》的内容已告一段落,下篇开始介绍各种任务和服务的关闭机制,以及如何编写任务和服务,使它们能够优雅地处理关闭。