Summary

Limitations of Thread

  • cannot return a value -> unable to pass results between tasks running on different threads, except by using shared variables
  • no execution order -> no way to specify that one task is reliant on another
  • creates a new thread instance for every task -> overhead, cannot reuse the same thread
java
// in a method
class A {
  int x = 1; // the only way to share info between threads
  
  void bo() {
	// int x = 1; // error, local variable would not be effectively final
    new Thread(() -> {x = 2;}).start();
    new Thread(() -> {x = 3;}).start(); // no guarentee which task will finish first
    System.out.println(x); // x is non deterministic, no guarantee that the tasks will finish by the time this is called
  }
}

A a = new A();
a.bo();

// in jshell
jshell> int x = 1; // because non-enclised variables behave like both fields and local variables in jshell
x ==> 1

jshell> new Thread(() -> { x = 2;}).start()

jshell> x
x ==> 2

CompletableFuture

  • overcomes the limitations of Thread
java
int findIthPrime(int i) { // time expensive function
  return Stream
          .iterate(2, x -> x + 1)
          .filter(x -> isPrime(x))
          .limit(i)
          .reduce((x, y) -> y)
          .orElse(0);
}

CompletableFuture<Integer> ith = CompletableFuture.supplyAsync(() -> findIthPrime(i));
CompletableFuture<Integer> jth = CompletableFuture.supplyAsync(() -> findIthPrime(j));

CompletableFuture<Integer> diff = ith.thenCombine(jth, (x, y) -> x - y); // blocks main until ith and jth complete, not very concurrent

diff.join(); // final step, only when we finally need the value

Async and non-async stagers

  • non-async -> next stage is run on the same thread as it’s called in
  • async -> may allow the next stage to be run on a different thread, more concurrency
  • in both cases, they still have to wait for the current instance/stage to complete
java
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> {
		System.out.println(Thread.currentThread().getName() + " ");
		return 1;
	})
	// non-async
	.thenApply(x -> { // called from the main thread
		System.out.print(Thread.currentThread().getName() + " ");
		return x + 1;
	});
	// ForkJoinPool.commonPool-worker-82 main ********************
	// main will be blocked until the pipeline is complete

	// async
	.thenApplyAsync(x -> {
		System.out.println(Thread.currentThread().getName());
		return x + 1;
	});
	// *ForkJoinPool.commonPool-worker-82 ******ForkJoinPool.commonPool-worker-82 *************
	// main thread is free to run other tasks

for (int i = 0; i < 20; i++) {
	System.out.print("*");
}
a.join();

Exception handling in CompletableFuture

  • handling in a different thread
java
CompletableFuture.<Integer>supplyAsync(() -> null)
            .thenApply(x -> x + 1) // NullPointerException in the thread that runs this lambda
            // deferred handling
            .join(); // CompletionException that has to be handle externally
			
			// handling in the thread
			.handle((t, e) -> (e == null) ? t : 0) // returns 0 if exception is encountered
			.join();

Concept

Thread

  • java.lang.Thread
  • encapsulates a function to run in a separate thread
  • parallel streams uses threads from the ForkJoinPool
  • program only exits once all created threads have terminated
java
Thread::new(Runnable)

Thread t = new Thread(() -> { // anonymous class of runnable, overrides the run method
  for (int i = 2; i < 100; i += 1) {
    System.out.print("*");
  }
}).start(); // starts the execution of runnable, and returns immediately

Thread.currentThread().getName(); // returns the name of the current thread, the main thread is called "main"

Thread.sleep(1000); // make the currrent sleep/block for 1000 miliseconds

t.isAlive(); // check if a thread is still running

CompletableFuture

java
// factory methds, same functionalitu as of
CompletableFuture.completedFuture(T value) // like the factory method in Lazy, accepts the precomputed value
CompletableFuture.runAsync(Runnable runnable) // completes once the lambda finishes
CompletableFuture.supplyAsync(Supplier<T> supplier) // like the factory method for Lazy

// blockers
c.get() // waits for all tasks in the pipeline to complete and return the value
// throws InterruptedException if the task is interrupted and ExecutionException if there are exceptions during execution
c.join() // same as get, but doesn't throw checked exceptions
CompletableFuture.allOf(CompletableFuture<?>... cfs) // completes when all complete
CompletableFuture.anyOf(CompletableFuture<?>... cfs) // completes when any one completes

// stagers, have async counterparts
c.thenApply(Function<T, U> fn) // map
c.thenCompose(Function<T, CompletableFuture<U>> fn) // flatMap
c.thenCombine(CompletableFuture<U> other, BiFunction<T, U, V> fn) // combine
c.thenRun(Runnable action) // runs after completion
c.runAfterBoth(CompletableFuture<?> other, Runnable action) // runs after both complete
c.runAfterEither(CompletableFuture<?> other, Runnable action) // runs afther either completes

// exception handlers, handle and whenComplete have async counterparts
c.handle(BiFunction<T,​ Throwable,​ U> fn) // process the return value or the exception value and choose what to return in place
c.exceptionally(Function<Throwable, T> action) // if task finishes exceptionally, return a value based on the exception
c.whenComplete(BiConsumer<T, Throwable> action) // run something based on the result or exception if any

maximize concurrency by calling get or join as late as possible

Application

Checking threads

java
Thread findPrime = new Thread(() -> {
  System.out.println(
      Stream.iterate(2, i -> i + 1)
          .filter(i -> isPrime(i))
          .limit(1_000_000L)
          .reduce((x, y) -> y)
          .orElse(null));
});

findPrime.start();

while (findPrime.isAlive()) {
  try {
    Thread.sleep(1000);
    System.out.print("."); // prints every second that findPrime is running, loading bar
  } catch (InterruptedException e) {
    System.out.print("interrupted");
  }
}

now our main thread is stuck checking other threads…