You're reading from  Java Coding Problems - Second Edition

Product type: Book
Published in: Mar 2024
Publisher: Packt
ISBN-13: 9781837633944
Edition: 2nd Edition
Author: Anghel Leonard

Anghel Leonard is a Chief Technology Strategist and independent consultant with 20+ years of experience in the Java ecosystem. In his daily work, he focuses on architecting and developing Java distributed applications that empower robust architectures, clean code, and high performance. He is also passionate about coaching, mentoring, and technical leadership. He is the author of several books, videos, and dozens of articles related to Java technologies.

Concurrency ‒ Virtual Threads and Structured Concurrency: Diving Deeper

This chapter includes 18 problems meant to dive deep into how virtual threads and structured concurrency work and how they should be used in your applications.

If you don’t have a background in concurrency in Java then I strongly recommend postponing this chapter until you read some good introductory coverage on this topic. For instance, you could try out Chapter 10 and Chapter 11 from Java Coding Problems, First Edition.

We start this chapter by explaining how virtual threads work internally. This will help you better understand the subsequent problems about extending and assembling StructuredTaskScope, hooking ThreadLocal and virtual threads, avoiding pinning, solving producer-consumer problems, implementing an HTTP web server, and so on.

By the end of this chapter, you’ll have comprehensive and crystal-clear knowledge about working with virtual threads and structured...

Problems

Use the following problems to test your advanced programming prowess in virtual threads and structured concurrency in Java. I strongly encourage you to give each problem a try before you turn to the solutions and download the example programs:

  1. Tackling continuations: Provide a detailed explanation of what continuations are and how they work in the context of virtual threads.
  2. Tracing virtual thread states and transitions: Build a meaningful diagram of virtual thread states and transitions and explain it.
  3. Extending StructuredTaskScope: Explain and demonstrate the steps for extending the StructuredTaskScope. Explain why we cannot extend ShutdownOnSuccess and ShutdownOnFailure.
  4. Assembling StructuredTaskScope: Write a Java application that assembles (nests) multiple StructuredTaskScope instances.
  5. Assembling StructuredTaskScope with timeout: Modify the application developed in Problem 228 to add a timeout/deadline to the forked tasks.
  6. Hooking...

225. Tackling continuations

The concept that sits behind virtual threads is known as delimited continuations or simply continuations. This concept is used internally by the JVM in the following piece of code:

List<Thread> vtThreads = IntStream.range(0, 5)
  .mapToObj(i -> Thread.ofVirtual().unstarted(() -> {
    if (i == 0) {
      logger.info(Thread.currentThread().toString());
    }
    try { Thread.sleep(1000); }
    catch (InterruptedException ex) {}
    if (i == 0) {
      logger.info(Thread.currentThread().toString());
    }
  })).toList();
vtThreads.forEach(Thread::start);
vtThreads.forEach(thread -> {
  try { thread.join(); } catch (InterruptedException ex) {}
});

In this code, we create and start five virtual threads but we only log information about one thread (thread #22 – of course, the id value may vary among executions). So, the output will be as follows:

VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1 
VirtualThread[#22...

226. Tracing virtual thread states and transitions

As you know, a thread can be in one of the following states: NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, or TERMINATED. These states are elements of the State enum and are exposed via the Thread.currentThread().getState() call. These states are valid for platform threads and for virtual threads as well and we can use them in our applications. (If you’re unfamiliar with this, you can find more details about it in Java Coding Problems, First Edition, Chapter 10, Problem 199.)
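To make the public states concrete, here is a minimal sketch that reads getState() on a virtual thread before it is started and after it has finished. The class name VirtualThreadStateDemo and the method observeStates() are hypothetical names used only for this illustration, and the sketch assumes JDK 21 or later.

```java
final class VirtualThreadStateDemo {

    /** Returns the public states observed before start() and after join(). */
    static Thread.State[] observeStates() {
        // An unstarted virtual thread with an empty task.
        Thread vt = Thread.ofVirtual().unstarted(() -> { /* no work */ });

        Thread.State before = vt.getState(); // not started yet
        vt.start();
        try {
            vt.join(); // wait until the virtual thread has finished
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        Thread.State after = vt.getState(); // finished

        return new Thread.State[] { before, after };
    }

    public static void main(String[] args) {
        Thread.State[] states = observeStates();
        System.out.println(states[0] + " -> " + states[1]); // NEW -> TERMINATED
    }
}
```

The same calls work unchanged for a platform thread created via Thread.ofPlatform(), which is the point of the paragraph above: the public State enum is shared by both kinds of threads.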

However, internally speaking, a virtual thread works on a state transition model, as shown in the following figure:

Figure 11.4: Virtual thread state transitions

These states are declared in the VirtualThread class as private static final int. So, they are not public. However, they are essential for understanding the lifecycle of a virtual thread, so let’s briefly attempt to trace a virtual thread’s states during its lifetime...

227. Extending StructuredTaskScope

We cannot extend StructuredTaskScope.ShutdownOnSuccess (Chapter 10, Problem 221) or StructuredTaskScope.ShutdownOnFailure (Chapter 10, Problem 222) since these are final classes. However, we can extend StructuredTaskScope itself and provide custom behavior via its handleComplete() method.

Let’s assume that we want to travel from our current location to a certain destination in our town:

String loc = "124 NW Bobcat L, St. Robert"; // from user
String dest = "129 West 81st Street";       // from user

On our phone, we have an application that can query a ridesharing service and the public transport service. The ridesharing service can simultaneously query multiple ridesharing servers to find the cheapest offer. On the other hand, the public transport service can simultaneously query the public transport servers to find the offer that leaves the earliest, no matter whether it is by bus, train, tram, or subway. In a diagram, we can represent...

228. Assembling StructuredTaskScope

In the previous problem (Problem 227), we developed an application containing a ridesharing service and a public transport service. In both services, we used StructuredTaskScope to concurrently query the proper servers. However, only the servers were called concurrently, while the two services were executed sequentially – first, we run the ridesharing service (which concurrently queries three servers), and after we have a result from this service, we run the public transport service (which concurrently queries four servers).

Going further, we want to assemble these two services into a third service capable of running them concurrently as in the following diagram:

Figure 11.6: Running the ridesharing and public transport services concurrently

We start by assembling the RidesharingOffer and PublicTransportOffer into a record named TravelOffer:

public record TravelOffer(RidesharingOffer ridesharingOffer, 
  PublicTransportOffer...

229. Assembling StructuredTaskScope instances with timeout

Let’s continue our journey from Problem 228 by assuming that the ridesharing service should be implemented with a timeout/deadline. In other words, if any of the ridesharing servers doesn’t answer within 10 milliseconds, then we abort the request and report the thrown TimeoutException via a meaningful message to the user.

This means that instead of scope.join(), which waits indefinitely, we should use joinUntil(Instant deadline), which waits only for the given deadline before throwing a TimeoutException. So, the fetchRidesharingOffers() method should be modified as follows:

public static RidesharingOffer fetchRidesharingOffers(
    String loc, String dest) 
      throws InterruptedException, TimeoutException {
  try (StructuredTaskScope<RidesharingOffer> scope 
    = new StructuredTaskScope<RidesharingOffer>()) {
    ...
    scope.joinUntil(Instant.now().plusMillis(10));
    ...
  }
}

By simply simulating a delay...

230. Hooking ThreadLocal and virtual threads

In a nutshell, ThreadLocal was introduced in JDK 1.2 (in 1998) as a solution to provide dedicated memory for each thread in order to share information with untrusted code (maybe some of your code has been written externally as third-party components) or between different components (that may run in multiple threads) of your application. Basically, if you are in such a scenario, then you don’t want to (or you cannot) share information via method arguments. If you need a more in-depth introduction to the ThreadLocal API, then consider Java Coding Problems, First Edition, Chapter 11, Problem 220.

A thread-local variable is of type ThreadLocal and relies on set() to set a value and on get() to get a value. In Java Coding Problems, First Edition, it was said that: “If thread A stores the x value and thread B stores the y value in the same instance of ThreadLocal, then later on, thread A retrieves the x value and thread B retrieves...
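As a minimal sketch of the set()/get() behavior described above, the following code lets two virtual threads store different values in the same ThreadLocal instance and read them back. The class name ThreadLocalIsolationDemo, the CONTEXT variable, and the helper storeAndRead() are hypothetical names introduced only for this illustration (JDK 21 or later is assumed).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ThreadLocalIsolationDemo {

    // One ThreadLocal instance shared by all threads (hypothetical name).
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    /** Maps a thread label to the value that thread read from CONTEXT. */
    static Map<String, String> run() {
        Map<String, String> seen = new ConcurrentHashMap<>();

        Thread a = Thread.ofVirtual().start(() -> storeAndRead("A", "x", seen));
        Thread b = Thread.ofVirtual().start(() -> storeAndRead("B", "y", seen));
        try {
            a.join();
            b.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }

        // The calling thread never called set(), so get() returns null here.
        seen.put("caller", String.valueOf(CONTEXT.get()));
        return seen;
    }

    private static void storeAndRead(String label, String value,
                                     Map<String, String> seen) {
        CONTEXT.set(value);
        try {
            Thread.sleep(50); // give the other thread time to store its value
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        seen.put(label, CONTEXT.get()); // each thread reads its own value
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Thread A reads back x and thread B reads back y, while the thread that called run() sees no value at all, because each thread works on its own copy.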

231. Hooking ScopedValue and virtual threads

The ScopedValue API was added to handle the shortcomings of ThreadLocal. But what are the shortcomings of ThreadLocal?

Thread-local variables’ shortcomings

First of all, it is hard to say and track who’s mutating a thread-local variable. This is a shortcoming of the API design. Basically, a ThreadLocal variable is globally available (at the application level or at a lower level), so it is hard to say from where it is mutated. Imagine that it is your responsibility to read, understand, and debug an application that uses several thread-local variables. How will you manage to follow the code logic and how will you know, at any given time, what values are stored by these thread-local variables? It would be a nightmare to track these variables from class to class and to notice when they are mutated.

Second, thread-local variables may live forever or longer than they should. How is this possible? Thread-local variables...

232. Using ScopedValue and executor services

In Problem 230, we wrote an application that combines ThreadLocal and executor services (we have used newVirtualThreadPerTaskExecutor() and newFixedThreadPool()).

In this problem, we re-write the code from Problem 230 in order to use ScopedValue. First, we have the following Runnable:

Runnable task = () -> { 
  logger.info(() -> Thread.currentThread().toString() 
    + " | before sleep | " + (SCOPED_VALUE.isBound() 
    ? SCOPED_VALUE.get() : "Not bound"));
  try {
    Thread.sleep(Duration.ofSeconds(new Random().nextInt(5)));
  } catch (InterruptedException ex) {} 
  logger.info(() -> Thread.currentThread().toString() 
    + " | after sleep | " + (SCOPED_VALUE.isBound() 
    ? SCOPED_VALUE.get() : "Not bound"));
};

This code is straightforward. We retrieve the value mapped to SCOPED_VALUE, we sleep for a random number of seconds (between 0 and 4, since nextInt(5) excludes the upper bound), and we retrieve the value...

233. Chaining and rebinding scoped values

In this problem, you’ll see how to chain and rebind scoped values. These are very handy operations that you’ll love to use.

Chaining scoped values

Let’s assume that we have three ScopedValue instances, as follows:

private static final ScopedValue<String> SCOPED_VALUE_1 
 = ScopedValue.newInstance();
private static final ScopedValue<String> SCOPED_VALUE_2 
 = ScopedValue.newInstance();
private static final ScopedValue<String> SCOPED_VALUE_3 
 = ScopedValue.newInstance();

We also have a Runnable that uses all three ScopedValue instances:

Runnable task = () -> {
  logger.info(Thread.currentThread().toString());
  logger.info(() -> SCOPED_VALUE_1.isBound() 
    ? SCOPED_VALUE_1.get() : "Not bound");
  logger.info(() -> SCOPED_VALUE_2.isBound() 
    ? SCOPED_VALUE_2.get() : "Not bound");
  logger.info(() -> SCOPED_VALUE_3.isBound() 
    ? SCOPED_VALUE_3...

234. Using ScopedValue and StructuredTaskScope

In this problem, we will revisit the application developed in Problems 227 and 228, and we will enrich it with a few ScopedValue variables for implementing new features. I’ll assume that you are already familiar with that application.

The ScopedValue instances that we plan to add are listed here (these are added in the main class because we want them to be accessible at the application level):

public static final ScopedValue<String> USER 
  = ScopedValue.newInstance();
public static final ScopedValue<String> LOC 
  = ScopedValue.newInstance();
public static final ScopedValue<String> DEST 
  = ScopedValue.newInstance();
public static final ScopedValue<Double> CAR_ONE_DISCOUNT 
  = ScopedValue.newInstance();
public static final ScopedValue<Boolean>
  PUBLIC_TRANSPORT_TICKET = ScopedValue.newInstance();

First, let’s focus on the fetchTravelOffers() method, which is the point from where we...

235. Using Semaphore instead of Executor

Let’s say that we have the following task (Runnable):

Runnable task = () -> {
  try {
    Thread.sleep(5000);
  } catch (InterruptedException ex) { /* handle exception */ }
  logger.info(Thread.currentThread().toString());
};

And we plan to execute this task 15 times by 3 threads:

private static final int NUMBER_OF_TASKS = 15;
private static final int NUMBER_OF_THREADS = 3;

We can easily solve this problem via Executors.newFixedThreadPool() and platform threads:

// using a fixed pool of reusable platform threads
try (ExecutorService executor = 
  Executors.newFixedThreadPool(NUMBER_OF_THREADS)) {
  for (int i = 0; i < NUMBER_OF_TASKS; i++) {
    executor.submit(task);
  }
}

A snippet of the possible output:

Thread[#24,pool-1-thread-3,5,main] 
Thread[#22,pool-1-thread-1,5,main] 
Thread[#23,pool-1-thread-2,5,main] 
Thread[#22,pool-1-thread-1,5,main] 
Thread[#24,pool-1-thread-3,5,main] 
Thread[#23,pool-1-thread-2,5,main...
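As the title of this problem suggests, a Semaphore can play the limiting role of the fixed thread pool. The following is a minimal sketch under our own assumptions, not necessarily the book's solution: we start one virtual thread per task and let a Semaphore with NUMBER_OF_THREADS permits decide how many of them run the task body at the same time. The class name SemaphoreLimitDemo, the method runAll(), the counters, and the shortened sleep are hypothetical and exist only so we can observe the limit (JDK 21 or later is assumed).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class SemaphoreLimitDemo {

    private static final int NUMBER_OF_TASKS = 15;
    private static final int NUMBER_OF_THREADS = 3;

    /** Returns { completed tasks, peak number of tasks running at once }. */
    static int[] runAll() {
        Semaphore permits = new Semaphore(NUMBER_OF_THREADS);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < NUMBER_OF_TASKS; i++) {
            threads.add(Thread.ofVirtual().start(() -> {
                try {
                    permits.acquire(); // blocks while 3 tasks are running
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(100); // shortened stand-in for the 5s task
                    completed.incrementAndGet();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    permits.release(); // let the next waiting task in
                }
            }));
        }

        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
        return new int[] { completed.get(), peak.get() };
    }

    public static void main(String[] args) {
        int[] result = runAll();
        System.out.println("completed=" + result[0] + ", peak=" + result[1]);
    }
}
```

Note the difference from the fixed pool: here we create 15 virtual threads instead of reusing 3 threads, and only the number of tasks executing concurrently is capped at 3.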

236. Avoiding pinning via locking

Remember from Chapter 10, Problem 213, that a virtual thread is pinned (not unmounted from its carrier thread) when the execution goes through a synchronized block of code. For instance, the following Runnable will cause virtual threads to be pinned:

Runnable task1 = () -> {
  synchronized (Main.class) {
    try {
      Thread.sleep(1000);
    } catch (InterruptedException ex) { /* handle it */ }
    logger.info(() -> "Task-1 | " 
      + Thread.currentThread().toString());
  }
};

The synchronized block contains a blocking operation (sleep()), but the virtual thread that hits this point of execution is not unmounted. It is pinned on its carrier thread. Let’s try to capture this behavior via the following executor:

private static final int NUMBER_OF_TASKS = 25;
try (ExecutorService executor 
     = Executors.newVirtualThreadPerTaskExecutor()) {
  for (int i = 0; i < NUMBER_OF_TASKS; i++) {
    executor.submit...
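The title of this problem points to locks as the way out. As a minimal sketch (an assumption on our side, not necessarily the book's exact code), we can guard the same kind of critical section with a java.util.concurrent.locks.ReentrantLock instead of synchronized, so that a virtual thread blocked inside it can be unmounted from its carrier. The names LockInsteadOfSynchronizedDemo, runAll(), task2, and the counter are hypothetical, and the sleep is shortened to keep the demo quick.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

final class LockInsteadOfSynchronizedDemo {

    private static final int NUMBER_OF_TASKS = 25;
    private static final ReentrantLock LOCK = new ReentrantLock();

    /** Runs all tasks and returns how many of them passed the critical section. */
    static int runAll() {
        int[] counter = { 0 }; // only modified while holding LOCK

        Runnable task2 = () -> {
            LOCK.lock(); // replaces synchronized (Main.class)
            try {
                Thread.sleep(10); // blocking call inside the critical section
                counter[0]++;
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            } finally {
                LOCK.unlock(); // always release, even on failure
            }
        };

        try (ExecutorService executor
                 = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < NUMBER_OF_TASKS; i++) {
                executor.submit(task2);
            }
        } // close() waits for all submitted tasks to finish

        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println("Tasks completed: " + runAll());
    }
}
```

The lock still serializes the tasks, so the counter is updated safely; what changes is that waiting for the lock or sleeping while holding it no longer has to keep a carrier thread occupied.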

237. Solving the producer-consumer problem via virtual threads

Let’s assume that we want to write a program simulating an assembly line (or a conveyor) for checking and packing bulbs using two workers. By checking, we mean that the worker tests if the bulb lights up or not. By packing, we mean that the worker takes the verified bulb and puts it in a box.

Next, let’s assume a fixed number of producers (3), and a fixed number of consumers (2); let’s represent it via the following diagram:

Figure 11.8: The producer-consumer problem with a fixed number of workers

We can implement this scenario via the well-known Executors.newFixedThreadPool(PRODUCERS), Executors.newFixedThreadPool(CONSUMERS), and ConcurrentLinkedQueue as the temporary storage for checked bulbs, as you can see at https://github.com/PacktPublishing/Java-Coding-Problems/tree/master/Chapter10/P203_ThreadPoolFixed_ConcurrentLinkedQueue.

Let’s consider this code as legacy and...
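To give a feeling for where this is heading, here is a minimal sketch of the same assembly line on top of virtual threads. It rests on several assumptions of ours: a LinkedBlockingQueue as the temporary storage, a fixed number of bulbs per producer, and a STOP marker that tells the consumers to finish, so that the demo terminates (a real assembly line would run until stopped). The names BulbLineDemo, BULBS_PER_PRODUCER, STOP, run(), and joinAll() are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class BulbLineDemo {

    private static final int PRODUCERS = 3;
    private static final int CONSUMERS = 2;
    private static final int BULBS_PER_PRODUCER = 4; // hypothetical bound
    private static final String STOP = "STOP";       // hypothetical end marker

    /** Runs the line once and returns the number of packed bulbs. */
    static int run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger packed = new AtomicInteger();

        List<Thread> producers = new ArrayList<>();
        for (int p = 0; p < PRODUCERS; p++) {
            final int id = p;
            producers.add(Thread.ofVirtual().start(() -> {
                for (int b = 0; b < BULBS_PER_PRODUCER; b++) {
                    queue.add("bulb-" + id + "-" + b); // a checked bulb
                }
            }));
        }

        List<Thread> consumers = new ArrayList<>();
        for (int c = 0; c < CONSUMERS; c++) {
            consumers.add(Thread.ofVirtual().start(() -> {
                try {
                    // take() blocks until a bulb (or STOP) is available
                    while (!STOP.equals(queue.take())) {
                        packed.incrementAndGet(); // pack the bulb
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }));
        }

        joinAll(producers);
        for (int c = 0; c < CONSUMERS; c++) {
            queue.add(STOP); // one end marker per consumer
        }
        joinAll(consumers);
        return packed.get();
    }

    private static void joinAll(List<Thread> threads) {
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("Packed bulbs: " + run());
    }
}
```

Because the STOP markers are enqueued only after all producers have finished, every bulb sits ahead of them in the queue and gets packed before the consumers exit.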

238. Solving the producer-consumer problem via virtual threads (fixed via Semaphore)

In the previous problem, we implemented the producer-consumer problem via a fixed number of producers (three virtual threads) and consumers (two virtual threads). Moreover, since our application works as an assembly line, we can say that the number of tasks is boundless. Practically, the producers and consumers work without breaks until the assembly line is stopped. This means the virtual threads assigned by the executor as producers and consumers remain exactly the same between a start-stop lifecycle of the assembly line.

Next, let’s assume that we want to use Semaphore objects instead of newVirtualThreadPerTaskExecutor() to obtain the exact same behavior.

Based on Problem 235, we can implement the fixed number of producers as follows:

private final static Semaphore producerService
    = new Semaphore(PRODUCERS);
...
for (int i = 0; i < PRODUCERS; i++) {
  Thread.ofVirtual(...

239. Solving the producer-consumer problem via virtual threads (increase/decrease consumers)

Let’s continue our producer-consumer problem with another scenario that starts with three producers and two consumers:

private static final int PRODUCERS = 3;
private static final int CONSUMERS = 2;

Let’s assume that each producer checks a bulb in no more than one second. However, a consumer (packer) needs a maximum of 10 seconds to pack a bulb. The producer and consumer times can be shaped as follows:

private static final int MAX_PROD_TIME_MS = 1 * 1000;
private static final int MAX_CONS_TIME_MS = 10 * 1000;

Obviously, in these conditions, the consumers cannot keep up with the incoming flow. The queue (here, LinkedBlockingQueue) used for storing bulbs until they are packed will grow continuously. The producers will push into this queue much faster than the consumers can poll.

Since we have only two consumers, we have to increase their number to be able to...

240. Implementing an HTTP web server on top of virtual threads

Implementing a simple HTTP web server in Java is quite easy since we already have an API ready to guide and serve our goals. We start from the HttpServer class (this class is present in the com.sun.net.httpserver package), which allows us to achieve our goal straightforwardly in a few steps.

Before jumping into the code, let’s quickly mention that our web server will allow us to choose between platform and virtual threads and between non-locking or locking (for instance, to simulate access to a database). We will make these choices via two boolean parameters of our startWebServer(boolean virtual, boolean withLock) method, named virtual and withLock, respectively. So, we will have four possible configurations.

First, we create an HttpServer via the create() method. At this point, we also set up the port of our web server:

private static final int MAX_NR_OF_THREADS = 200;
private static final int WEBSERVER_PORT...
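The steps above (create the server, attach a handler, choose an executor, start) can be sketched as follows. This is a minimal illustration under our own assumptions, not the book's startWebServer() method: it covers only the virtual flag (no locking), binds to an ephemeral port on 127.0.0.1 instead of a fixed port, and uses MAX_NR_OF_THREADS as the size of the platform thread pool. The names VirtualThreadWebServerDemo and roundTrip() are hypothetical; the handler simply reports whether it ran on a virtual thread.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class VirtualThreadWebServerDemo {

    private static final int MAX_NR_OF_THREADS = 200;

    /** Starts a server, sends one GET request to it, and returns the body. */
    static String roundTrip(boolean virtual) {
        ExecutorService executor = virtual
            ? Executors.newVirtualThreadPerTaskExecutor()
            : Executors.newFixedThreadPool(MAX_NR_OF_THREADS);
        HttpServer server = null;
        try {
            // Port 0 asks the OS for any free port (an assumption of this demo).
            server = HttpServer.create(
                new InetSocketAddress("127.0.0.1", 0), 0);
            server.createContext("/", exchange -> {
                byte[] body = ("virtual="
                    + Thread.currentThread().isVirtual())
                    .getBytes(StandardCharsets.UTF_8);
                exchange.sendResponseHeaders(200, body.length);
                try (OutputStream out = exchange.getResponseBody()) {
                    out.write(body);
                }
            });
            server.setExecutor(executor); // requests are handled on this executor
            server.start();

            URI uri = URI.create("http://127.0.0.1:"
                + server.getAddress().getPort() + "/");
            HttpResponse<String> response = HttpClient.newHttpClient().send(
                HttpRequest.newBuilder(uri).build(),
                HttpResponse.BodyHandlers.ofString());
            return response.body();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            if (server != null) {
                server.stop(0);
            }
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(true));
        System.out.println(roundTrip(false));
    }
}
```

Switching between platform and virtual threads comes down to the executor handed to setExecutor(); the handler code stays the same.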

241. Hooking CompletableFuture and virtual threads

CompletableFuture is one of the main asynchronous programming APIs in Java (if you need deep coverage of this topic, then you could consider checking out Java Coding Problems, First Edition, Chapter 11).

In order to use CompletableFuture with virtual threads, we just have to use the proper executor for virtual threads:

private static final ExecutorService executor 
  = Executors.newVirtualThreadPerTaskExecutor();

Next, we use this executor to fetch three application testers in asynchronous mode via CompletableFuture:

public static CompletableFuture<String> fetchTester1() {
  return CompletableFuture.supplyAsync(() -> {
  String tester1 = null;
  try {
    logger.info(Thread.currentThread().toString());
    tester1 = fetchTester(1);
  } catch (IOException | InterruptedException ex) 
    { /* handle exceptions */ }
  return tester1;
  }, executor);
}
public static CompletableFuture<String> fetchTester2...
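To see the whole flow in one place, here is a minimal, self-contained sketch that runs three suppliers on a virtual-thread executor and combines their results. The fetchTester() method below is a hypothetical stand-in that sleeps briefly and returns a label (the real one would call a remote service), and the names CompletableFutureOnVirtualThreadsDemo and fetchAll() are likewise our own.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CompletableFutureOnVirtualThreadsDemo {

    /** Fetches three testers asynchronously and joins the results. */
    static String fetchAll() {
        try (ExecutorService executor
                 = Executors.newVirtualThreadPerTaskExecutor()) {
            CompletableFuture<String> t1
                = CompletableFuture.supplyAsync(() -> fetchTester(1), executor);
            CompletableFuture<String> t2
                = CompletableFuture.supplyAsync(() -> fetchTester(2), executor);
            CompletableFuture<String> t3
                = CompletableFuture.supplyAsync(() -> fetchTester(3), executor);

            // Combine the three results once all of them are available.
            return t1.thenCombine(t2, (a, b) -> a + "," + b)
                     .thenCombine(t3, (ab, c) -> ab + "," + c)
                     .join();
        }
    }

    // Hypothetical stand-in for a remote call; reports the kind of thread
    // that executed it.
    private static String fetchTester(int id) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return "tester-" + id + ":" + Thread.currentThread().isVirtual();
    }

    public static void main(String[] args) {
        System.out.println(fetchAll());
    }
}
```

Each supplier passed to supplyAsync() with this executor runs on its own virtual thread, which is why every label ends in true.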

242. Signaling virtual threads via wait() and notify()

wait(), notify(), and notifyAll() are three methods defined in the Object class that allow multiple threads to communicate with each other and coordinate their access to resources.

The wait() method must be called only by the thread that owns the object’s monitor; it forces this thread to wait indefinitely until another thread calls notify() or notifyAll() on the same object. In other words, the wait() method must be called in a synchronized context (instance method, block, or static method).

Here is a virtual thread calling the wait() method:

Object object = new Object();
Thread wThread = Thread.ofVirtual().unstarted(() -> {
  synchronized (object) {
    try {
      logger.info("Before calling wait()");
      logger.info(() -> Thread.currentThread() + " | " 
        + Thread.currentThread().getState());
      object.wait();
      logger.info("After calling notify()"...
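For a complete round trip, here is a minimal sketch in which a virtual thread waits on a monitor and the calling thread wakes it up via notify(). It adds one thing that the snippet above doesn't show: a guard flag checked in a loop, which protects against a notification that arrives before the wait and against spurious wakeups. The names WaitNotifyDemo, signalOnce(), ready, and woke are hypothetical.

```java
final class WaitNotifyDemo {

    /** Returns true if the waiting virtual thread resumed after the signal. */
    static boolean signalOnce() {
        final Object monitor = new Object();
        final boolean[] ready = { false }; // guarded by monitor
        final boolean[] woke = { false };  // guarded by monitor

        Thread waiter = Thread.ofVirtual().start(() -> {
            synchronized (monitor) {
                try {
                    // Re-check the condition after every wakeup.
                    while (!ready[0]) {
                        monitor.wait(); // releases the monitor while waiting
                    }
                    woke[0] = true;
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        synchronized (monitor) {
            ready[0] = true;  // publish the condition first...
            monitor.notify(); // ...then wake up a waiting thread, if any
        }

        try {
            waiter.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        synchronized (monitor) {
            return woke[0];
        }
    }

    public static void main(String[] args) {
        System.out.println("Waiter resumed: " + signalOnce());
    }
}
```

If the calling thread happens to set the flag before the virtual thread enters the synchronized block, the waiter skips wait() entirely, so the sketch cannot hang on a missed notification.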

Summary

This chapter covered 18 advanced problems about virtual threads and structured concurrency. You can see this chapter as a masterclass designed to help you speed up your learning and get ready for production with strong confidence in your knowledge. With that covered, you have now finished the chapter and the book.

Join our community on Discord

Join our community’s Discord space for discussions with the author and other readers:

https://discord.gg/8mgytp5DGQ
