[230531] Stop all threads when one thread throws an exception


1 motivation

In my case the code was legacy code that used a CountDownLatch to wait for all tasks to finish. So I called CountDownLatch.await() with a timeout, in a loop, to wait until either every task had finished or one task had thrown an exception.

The problem was that when one task threw an exception, the other tasks kept running. Running by itself is not a problem, but the resources those tasks hold are leaked. So I wanted to stop all tasks as soon as one task throws an exception.

2 how to catch exception

2.1 Runnable

When an exception escapes the run() method of a Runnable, the thread handles it as follows:

  1. If an uncaughtExceptionHandler is set on the thread, it is called.
  2. If no uncaughtExceptionHandler is set, ThreadGroup.uncaughtException() is called. By default, ThreadGroup.uncaughtException() prints the stack trace to System.err.

When I use Executors.newFixedThreadPool without passing a ThreadFactory, the ThreadFactory implementation is Executors$DefaultThreadFactory. That factory does not set an uncaughtExceptionHandler, so an uncaught exception falls through to ThreadGroup.uncaughtException().

So I need to set an uncaughtExceptionHandler myself to catch the exception.
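The handler behavior above can be checked with a small sketch. `handlerMessages` is a hypothetical helper, not part of the original code: it runs one failing Runnable on a single-thread pool whose factory installs a handler, and returns what the handler saw. It also illustrates one caveat worth keeping in mind: the handler is expected to fire for tasks started with execute(), but not for tasks passed to submit(), because submit() stores the exception in the returned Future.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit

// Hypothetical helper for illustration: returns the exception messages
// that reached the uncaught exception handler of the pool's threads.
fun handlerMessages(useSubmit: Boolean): List<String> {
    val seen = CopyOnWriteArrayList<String>()
    val threads = CopyOnWriteArrayList<Thread>()
    val factory = ThreadFactory { r ->
        Thread(r).apply {
            // Record the message of any exception that escapes a task.
            setUncaughtExceptionHandler { _, e -> seen.add(e.message ?: "no message") }
            threads.add(this)
        }
    }
    val executor = Executors.newFixedThreadPool(1, factory)
    val task = Runnable { throw IllegalStateException("boom") }

    if (useSubmit) executor.submit(task) else executor.execute(task)

    executor.shutdown()
    executor.awaitTermination(5, TimeUnit.SECONDS)
    // The handler runs while the worker thread is dying, so join the
    // threads to be sure it has finished before reading the result.
    threads.forEach { it.join() }
    return seen.toList()
}
```

Calling `handlerMessages(useSubmit = false)` should return the recorded message, while `handlerMessages(useSubmit = true)` should return an empty list.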

2.2 Callable

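When a Callable submitted to an executor throws an exception:

  1. If Future.get() is called, it throws an ExecutionException that wraps the original exception as its cause.
  2. If Future.get() is never called, the exception is silently ignored.

Future.get() is a blocking method, so it is not a good way to notice a failure promptly. Note that for a task passed to submit(), the exception is stored in the Future and never reaches the uncaughtExceptionHandler. The handler only helps for tasks started with execute(); otherwise, catch the exception inside the task and record it in a shared flag.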
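A minimal sketch of the Callable case, assuming a single-thread executor. `failureCauseOf` is a hypothetical helper introduced only for this illustration: it blocks on Future.get() and returns the message of the wrapped cause, or null if the task succeeded.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executors

// Hypothetical helper for illustration: runs the task and reports the
// message of the exception it threw, or null when it completed normally.
fun failureCauseOf(task: Callable<Int>): String? {
    val executor = Executors.newSingleThreadExecutor()
    try {
        val future = executor.submit(task)
        return try {
            future.get() // blocks until the task has finished
            null
        } catch (e: ExecutionException) {
            // The original exception is available as the cause.
            e.cause?.message
        }
    } finally {
        executor.shutdown()
    }
}
```

The blocking call is the drawback: the caller learns about the failure only when it reaches get() for that particular Future.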

3 ThreadFactory

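import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

class CustomThreadFactory(val hasFailure: AtomicBoolean) : ThreadFactory {
    override fun newThread(r: Runnable?): Thread {
        return Thread(r).apply {
            // this is important: record any exception that escapes a task.
            // The handler only fires for tasks started with execute();
            // submit() captures the exception in the returned Future instead.
            setUncaughtExceptionHandler { _, _ ->
                hasFailure.set(true)
            }
        }
    }
}

fun main() {
    val atomicBoolean = AtomicBoolean(false)
    val executor = Executors.newFixedThreadPool(10, CustomThreadFactory(atomicBoolean))

    val latch = CountDownLatch(10)

    val tasks = List(10) { i ->
        Runnable {
            try {
                println("Task $i started")

                if (i == 5) throw RuntimeException("HIHI")

                TimeUnit.SECONDS.sleep(10)

                println("Task $i completed")
            } catch (e: InterruptedException) {
                // shutdownNow() interrupted this task: restore the flag and stop
                println("Task $i was interrupted")
                Thread.currentThread().interrupt()
            } catch (e: Exception) {
                println("Task $i threw exception $e")
                atomicBoolean.set(true)
            } finally {
                latch.countDown()
            }
        }
    }

    // execute() lets uncaught exceptions reach the handler set by the factory
    tasks.forEach { executor.execute(it) }

    // Poll the latch so a failure is noticed within about 500 ms
    while (!latch.await(500, TimeUnit.MILLISECONDS)) {
        println("Waiting for tasks to finish...")
        if (atomicBoolean.get()) {
            println("hasFailure is true!!!! let's shutdownNow")
            println("------------------")
            executor.shutdownNow()
            break
        }
    }

    executor.shutdown()
    // Give interrupted tasks a moment to unwind before reporting
    executor.awaitTermination(5, TimeUnit.SECONDS)
    println("All tasks finished or cancelled")
}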

This is a simple example of stopping all tasks when one task throws an exception.
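The same idea can be packaged as a reusable function. The sketch below is one possible shape, not the original code: `runFailFast` is a hypothetical name, the pool is sized to the number of tasks so every task starts immediately, and the 100 ms polling interval is an arbitrary choice. It returns true when every task completed and false when a failure caused the rest to be cancelled.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper for illustration: runs all tasks in parallel and
// interrupts the remaining ones as soon as one of them throws.
fun runFailFast(tasks: List<() -> Unit>): Boolean {
    val failed = AtomicBoolean(false)
    val latch = CountDownLatch(tasks.size)
    val executor = Executors.newFixedThreadPool(tasks.size.coerceAtLeast(1))

    tasks.forEach { task ->
        executor.execute {
            try {
                task()
            } catch (e: InterruptedException) {
                // Cancelled by shutdownNow(): not a failure of its own.
                Thread.currentThread().interrupt()
            } catch (e: Exception) {
                failed.set(true)
            } finally {
                latch.countDown()
            }
        }
    }

    try {
        // Poll the latch so that a failure is noticed quickly.
        while (!latch.await(100, TimeUnit.MILLISECONDS)) {
            if (failed.get()) break
        }
    } finally {
        // Interrupt whatever is still running and wait for it to unwind.
        executor.shutdownNow()
        executor.awaitTermination(5, TimeUnit.SECONDS)
    }
    return !failed.get()
}
```

Cancellation here relies on interruption, so it only stops tasks that block in interruptible calls such as sleep() or that check Thread.interrupted() themselves.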

Date: 2023-05-31 Wed 00:00

Author: 남영환

Created: 2024-05-02 Thu 03:16
