Multithreading in Java

by Mauricio Avellar

In this article, we will learn what multithreading is and how to multithread in Java.

1. Overview

Multithreading allows us to execute multiple tasks at the same time. This prevents a long-running task from making our application hang and giving the impression that it has frozen or crashed.

1.1 What’s a Thread?

To understand Thread, we need to talk about processes. For every application, every executable you have running on your computer, a process is created, and memory is allocated. The same is true for Java applications.

A thread is often called a lightweight process because it uses, or shares, the resources of the process it belongs to. Starting a thread does not create a new process or a new heap; the thread shares the process’s memory and only receives its own stack.

1.2 Threads in Java

The Thread class in Java resides in the java.lang package, which is imported automatically by the Java compiler, so you won’t need to import anything to use threads in your code.

A thread always executes its code sequentially, and every Java application has at least one thread. In a single-threaded application, tasks are executed one by one, and control is not passed to the next task until the current one is completed. In other words, our code is executed in the exact order in which we define it (except for static blocks, which run when the class is initialized).

The single thread every Java program has is the main thread, which runs the public static void main(String[] args) method, and everything we declare there is executed in sequence.

To show that every Java application runs in the main thread, let’s add a throw new RuntimeException() to our main method and run it. The output will be:

Exception in thread "main" java.lang.RuntimeException

As you can see, an exception in thread “main” was thrown.

1.3 Thread States in Java

In Java, a thread is always in one of these six states: NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING or TERMINATED. The diagram below shows how a thread moves between them. Note that running is not a separate state in Java: a RUNNABLE thread is either running or ready to run, and the scheduler is responsible for moving threads between the two.

You can call the getState() method on any thread to check which state it is currently in, or call Thread.currentThread().getState() to obtain the state of the current thread.
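As a minimal sketch of getState() in use, the following records the state of a thread before it is started and after it has finished. The class name ThreadStateDemo and its observeStates() method are hypothetical, chosen only for illustration.

```java
class ThreadStateDemo {

  // Returns the state of a thread before start() and after it has finished.
  static Thread.State[] observeStates() {
    Thread worker = new Thread(() -> { /* nothing to do */ });
    Thread.State before = worker.getState();   // created but not started yet
    worker.start();
    try {
      worker.join();                           // wait until the worker finishes
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();      // restore the interrupt flag
    }
    Thread.State after = worker.getState();    // run() has returned
    return new Thread.State[] { before, after };
  }

  public static void main(String[] args) {
    Thread.State[] states = observeStates();
    System.out.println(states[0] + " -> " + states[1]);
    // The thread executing this line is, by definition, runnable
    System.out.println(Thread.currentThread().getState());
  }
}
```

A thread that has been created but not started reports NEW, and one whose run() method has returned reports TERMINATED.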

Figure 1. Java Thread States and Life Cycle

2. Multithread

With multithreading, multiple pieces of code are executed by different threads concurrently, and control is shared among them.

We may have a scenario where a task takes too long to execute. We can run this code in a separate thread, so that control keeps moving back and forth between threads. This prevents one task from holding on to control and causing our application to hang indefinitely.

To multithread in Java, we can either extend Thread or implement Runnable, which Thread itself implements. There are some differences between these two approaches, but first, let’s understand each one individually.

Depicts two different approaches to creating a thread: Extends Thread versus Implements Runnable
Figure 2. Extends Thread v. Implements Runnable

2.1 Extending Thread

One way to create threads in Java is by extending the Thread class and overriding the run() method. The run method is where we will write all our code.

We must add all the code that we want to run concurrently (in a separate thread) inside the run method. Any other code will be run by the main thread.

One main disadvantage of extending Thread is that you cannot extend any other class since Java doesn’t allow multiple inheritance. It is also less flexible because we are tightly coupling classes.

Keep in mind that you won’t be able to reuse a thread object. If you call the start() method more than once, you will get an IllegalThreadStateException. Instead, you will have to create a new one.
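The restriction on restarting can be checked with a short sketch. The class name RestartDemo and the method secondStartFails() are hypothetical names used only for this illustration.

```java
class RestartDemo {

  // Starts the same Thread object twice and reports whether the second call failed.
  static boolean secondStartFails() {
    Thread worker = new Thread(() -> System.out.println("Running once"));
    worker.start();
    try {
      worker.start();               // a Thread object can be started only once
      return false;
    } catch (IllegalThreadStateException e) {
      return true;                  // to run the task again, create a new Thread
    }
  }

  public static void main(String[] args) {
    System.out.println("Second start() failed: " + secondStartFails());
  }
}
```

The exception is thrown whether or not the first run has already finished, so the only way to run the same code again is to create a new thread object.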

2.2 Creating a Thread

Now we will create a class, extend Thread, add a constructor and override the run method.

We will use the Thread.sleep() method to delay the execution of the program a bit.

public class PrintService extends Thread {
  
  private final String file;
  
  public PrintService(String file) {
    this.file = file;
  }
  
  @Override
  public void run() {
    System.out.println("Print Service started!");
    for (int i = 0; i < 5; i++) {
      System.out.println(file);
      try {
        Thread.sleep(500);
      } catch (InterruptedException ignored) {}
    }
    System.out.println("Print Service finished!");
  }
}

We added a final field, so we can pass the “file” to be printed by our thread. Then we will use a loop and the sleep method to simulate a task that takes time to complete. Finally, we will print the start and finish of its execution.

Bear in mind we must surround Thread.sleep() method with a try catch block since it throws a checked InterruptedException.

2.3 Implementing Runnable

Another way to create threads is by implementing the Runnable interface and its run() method. In the same way, we must add all the code that will execute inside the method run.

One advantage of implementing Runnable is that our class can extend any other class in case it is needed, and we can still implement as many interfaces as necessary. It is more flexible since we are using composition (loosely coupling classes).

One minor disadvantage is that by implementing Runnable you will need an extra step for creating thread objects, that is, you will have to instantiate an object of type Thread passing a Runnable (the class that implements Runnable) to it, which can be done in one line.

Let’s create another class called EmailService that implements the Runnable interface. The code will be the same as in the PrintService class.

public class EmailService implements Runnable {
  
  private final String service;
  
  public EmailService(String service) {
    this.service = service;
  }
  
  @Override
  public void run() {
    System.out.println("Email service started!");
    for (int i = 0; i < 5; i++) {
      System.out.println(service);
      try {
        Thread.sleep(500);
      } catch (InterruptedException ignored) { }
    }
    System.out.println("Email service finished!");
  }
}

2.4 Anonymous Class

It’s possible to use an anonymous class to create threads as well. The downside is that, unless you assign the thread to a variable, you must start it immediately.

new Thread() {
  @Override
  public void run() {
    System.out.println("Anonymous Thread!");
  }
}.start();

Or a more elegant solution using a lambda expression:

new Thread(() -> System.out.println("Anonymous Thread")).start();

3. Multithreading Application

Multithreading in Java is the process of executing two or more threads concurrently. Threads can execute different pieces of code or compete for the same piece, and the latter may lead to some nasty behavior in your application.

So, let’s create two EmailService objects (two threads) in the main method. Note that one common mistake is to call the run() method instead of the start() method. If we call the run method, no new thread is started and our code runs in the main thread.

Let’s add a print statement at the end of our main method to inform us that the main thread has reached its end.

public class Main {

  public static void main(String[] args) throws InterruptedException {
    
    Thread emailService1 = new Thread(new EmailService("Receiving Email..."));
    emailService1.run();
    Thread emailService2 = new Thread(new EmailService("Sending Email..."));
    emailService2.run();

    System.out.println("The main thread has finished!");
  }
}

Here, we call the run method just to demonstrate that this is not multithreading. Check the output:

Email service started!
Receiving Email...
Receiving Email...
Receiving Email...
Receiving Email...
Receiving Email...
Email service finished!
Email service started!
Sending Email...
Sending Email...
Sending Email...
Sending Email...
Sending Email...
Email service finished!
The main thread has finished!

In the output above, our code ran sequentially as it would normally do if there were no threads. In other words, calling the method run defeats the purpose of multithreading.
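One way to see the difference directly is to record which thread actually executes the task. The sketch below is only an illustration; the class name RunVersusStart, the method executingThread() and the thread name "worker" are assumptions made for this example.

```java
class RunVersusStart {

  // Returns the name of the thread that executed the task.
  static String executingThread(boolean useStart) {
    String[] name = new String[1];
    Thread worker = new Thread(
        () -> name[0] = Thread.currentThread().getName(), "worker");
    if (useStart) {
      worker.start();                       // the task runs in the new thread
      try {
        worker.join();                      // wait until the name is recorded
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    } else {
      worker.run();                         // plain method call in the caller
    }
    return name[0];
  }

  public static void main(String[] args) {
    System.out.println("run():   " + executingThread(false));
    System.out.println("start(): " + executingThread(true));
  }
}
```

With run(), the recorded name is that of the calling thread; with start(), it is the name given to the new thread.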

Now, let’s comment out both run methods, call the start method, and run the code again.

Thread emailService1 = new Thread(new EmailService("Receiving Email..."));
//emailService1.run();
emailService1.start();
Thread emailService2 = new Thread(new EmailService("Sending Email..."));
//emailService2.run();
emailService2.start();

Note that the main thread finished earlier than the other two. Also note that now both sending and receiving are overlapping, which is desirable in this situation.

The main thread has finished!
Email service started!
Sending Email...
Email service started!
Receiving Email...
Receiving Email...
Sending Email...
Receiving Email...
Sending Email...
Receiving Email...
Sending Email...
Receiving Email...
Sending Email...
Email service finished!
Email service finished!

3.1 Synchronized Code

Synchronized blocks or methods guarantee that one, and only one, thread at a time will be running code guarded by the same lock.

Whenever developers synchronize the critical pieces of code in a method or class, we say that this method or class is thread-safe, e.g., the Vector class. In the same way, whenever we deal with code that is NOT synchronized, we need to provide synchronization ourselves to ensure correctness. One example of a class that does not provide thread safety is ArrayList. Here is what the documentation states:

This class is roughly equivalent to Vector, except that it is unsynchronized.

ArrayList Java Documentation – Oracle
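As a rough sketch of what providing the synchronization ourselves can look like, the example below lets two threads add to the same ArrayList while guarding every add with a synchronized block. The class name SynchronizedListDemo and the method fillFromTwoThreads() are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class SynchronizedListDemo {

  // Two threads each add perThread elements; every add is guarded by the same lock.
  static int fillFromTwoThreads(int perThread) {
    List<Integer> list = new ArrayList<>();
    Runnable task = () -> {
      for (int i = 0; i < perThread; i++) {
        synchronized (list) {               // only one thread at a time may add
          list.add(i);
        }
      }
    };
    Thread t1 = new Thread(task);
    Thread t2 = new Thread(task);
    t1.start();
    t2.start();
    try {
      t1.join();
      t2.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return list.size();
  }

  public static void main(String[] args) {
    System.out.println("Elements added: " + fillFromTwoThreads(1000));
  }
}
```

Without the synchronized block, concurrent calls to add() on a plain ArrayList could lose elements or throw an exception, because the list does not protect its internal state.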

There will be situations where our code may require synchronization, so just one thread will be able to execute that code at a time. To exemplify that, let’s rerun our code, but this time we will create two PrintService threads.

PrintService printService1 = new PrintService("Printing Java doc...");
printService1.start();
PrintService printService2 = new PrintService("Printing Code Learn Hub article...");
printService2.start();

And here is the output:

The main thread has finished!
Print Service started!
Printing Code Learn Hub article...
Print Service started!
Printing Java doc...
Printing Code Learn Hub article...
Printing Java doc...
Printing Code Learn Hub article...
Printing Java doc...
Printing Code Learn Hub article...
Printing Java doc...
Printing Code Learn Hub article...
Printing Java doc...
Print Service finished!
Print Service finished!

Here, again, the main thread finished first. However, when it comes to printing documents, we do NOT want them to overlap. That’s undesirable behavior. What we want is for other threads competing for the PrintService to wait until the current one is finished and only then run.

3.2 The Join method

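The join() method makes the calling thread wait until the thread it is called on has finished. It can be called with no parameters, in which case it waits indefinitely, or with one or two parameters (milliseconds and, optionally, nanoseconds) that specify the maximum time to wait. Older JDK versions implement it as a synchronized method that calls Object.wait() internally.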
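To illustrate the timed variant, here is a small sketch that waits a limited time for a worker thread and then checks whether it is still alive. The class name JoinTimeoutDemo and the method stillAliveAfterJoin() are assumptions made for this example.

```java
class JoinTimeoutDemo {

  // Waits at most waitMillis for a worker that sleeps for workMillis,
  // then reports whether the worker is still running.
  static boolean stillAliveAfterJoin(long workMillis, long waitMillis) {
    Thread worker = new Thread(() -> {
      try {
        Thread.sleep(workMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    worker.setDaemon(true);          // a slow worker must not keep the JVM alive
    worker.start();
    try {
      worker.join(waitMillis);       // returns when the worker ends or time is up
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return worker.isAlive();
  }

  public static void main(String[] args) {
    System.out.println("Slow worker still alive: " + stillAliveAfterJoin(2000, 100));
    System.out.println("Fast worker still alive: " + stillAliveAfterJoin(10, 2000));
  }
}
```

Because a timed join() returns in both cases, checking isAlive() afterwards is how the caller can tell whether the thread finished or the wait simply timed out.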

Let’s adjust our code so that the main thread waits for each print job to finish before starting the next one. Keep in mind that the join() method throws a checked InterruptedException, so we either surround it with a try-catch block or declare the exception on the main method, and that is what we did.

public static void main(String[] args) throws InterruptedException {
  PrintService printService1 = new PrintService("Printing Java doc...");
  printService1.start();
  printService1.join(); // wait for the first job before starting the second
  PrintService printService2 = new PrintService("Printing Code Learn Hub article...");
  printService2.start();
  printService2.join();

  System.out.println("The main thread has finished!");
}

And here is the output:

Print Service started!
Printing Java doc...
Printing Java doc...
Printing Java doc...
Printing Java doc...
Printing Java doc...
Print Service finished!
Print Service started!
Printing Code Learn Hub article...
Printing Code Learn Hub article...
Printing Code Learn Hub article...
Printing Code Learn Hub article...
Printing Code Learn Hub article...
Print Service finished!
The main thread has finished!

As you can see, that is exactly what we want for a printing service. However, it looks just like a single-threaded application. Now let’s run the email service and print service all together and see what we get.

To make the code cleaner, I’ll extract the code into two different methods.

public static void main(String[] args) throws InterruptedException {
  emailService();
  printService();

  System.out.println("The main thread has finished!");
}

private static void emailService() {
  Thread emailService1 = new Thread(new EmailService("Receiving Email..."));
  //emailService1.run();
  emailService1.start();
  Thread emailService2 = new Thread(new EmailService("Sending Email..."));
  //emailService2.run();
  emailService2.start();
}

private static void printService() throws InterruptedException {
  PrintService printService1 = new PrintService("Printing Java doc...");
  printService1.start();
  printService1.join();
  PrintService printService2 = new PrintService("Printing Code Learn Hub article...");
  printService2.start();
  printService2.join();
}

And here is the output:

Email service started!
Receiving Email...
Email service started!
Sending Email...
Print Service started!
Printing Java doc...
Receiving Email...
Printing Java doc...
Sending Email...
Receiving Email...
Printing Java doc...
Sending Email...
Receiving Email...
Printing Java doc...
Sending Email...
Receiving Email...
Printing Java doc...
Sending Email...
Print Service finished!
Email service finished!
Print Service started!
Printing Code Learn Hub article...
Email service finished!
Printing Code Learn Hub article...
Printing Code Learn Hub article...
Printing Code Learn Hub article...
Printing Code Learn Hub article...
Print Service finished!
The main thread has finished!

As you can see, our email service and print service, which are two unrelated services, overlapped. However, our printing service only started printing the second file once the first was finished.

The main disadvantage of the join() method is that we have to call it for every single thread we create. Imagine having ten threads that need to run one after another… That would mean calling the join method ten times.

Another disadvantage is that you must invoke join() before starting any other thread that will compete for the same code; otherwise the threads will still overlap.
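When many jobs need to run strictly one after another in this way, the repeated start-then-join pattern can at least be written as a loop. The following is only a sketch; the class name SequentialJoinDemo and the method runOneAfterAnother() are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class SequentialJoinDemo {

  // Runs each job in its own thread, but joins before starting the next one.
  static List<Integer> runOneAfterAnother(int jobs) {
    List<Integer> order = new ArrayList<>();
    for (int i = 0; i < jobs; i++) {
      final int id = i;
      Thread t = new Thread(() -> order.add(id));
      t.start();
      try {
        t.join();                    // the next thread starts only after this one ends
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    return order;
  }

  public static void main(String[] args) {
    System.out.println(runOneAfterAnother(5));
  }
}
```

The jobs always complete in the order in which they were started, which is exactly why this approach gives up the benefits of running them concurrently.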

3.3 Locker

Synchronized blocks require an object to lock on, a locker if you will. We will add a static locker to the synchronized block, and it must be static because two different objects will be competing for the lock.

Bear in mind that we must synchronize just the necessary part of the code. Synchronizing huge chunks of code may cause performance problems in the application.

private static final Object locker = new Object();

Here we have a private static final Object variable that will work as our locker in our PrintService class.

@Override
public void run() {
  synchronized (locker) {
    System.out.println("Print Service started!");
    for (int i = 1; i <= 5; i++) {
      System.out.println(file);
      try {
        Thread.sleep(500);
      } catch (InterruptedException ignored) {}
    }
    System.out.println("Print Service finished!");
  }
}

Now that we have added the synchronized block inside our run method, let’s comment out the join() calls for both print services and run the code again.

printService1.start();
//printService1.join();
printService2.start();
//printService2.join();

The output came out as expected.

Email service started!
Receiving Email...
Email service started!
Sending Email...
The main thread has finished!
Print Service started!
Printing Java doc...
Sending Email...
Receiving Email...
Printing Java doc...
Sending Email...
Printing Java doc...
Receiving Email...
Sending Email...
Printing Java doc...
Receiving Email...
Sending Email...
Printing Java doc...
Receiving Email...
Email service finished!
Print Service finished!
Print Service started!
Printing Code Learn Hub article...
Email service finished!
Printing Code Learn Hub article...
Printing Code Learn Hub article...
Printing Code Learn Hub article...
Printing Code Learn Hub article...
Print Service finished!

Since version 1.5, Java has included classes that make locking easier and help prevent deadlocks. We will be talking about them later in this article.
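As a brief preview of one such class, here is a minimal sketch that protects a counter with java.util.concurrent.locks.ReentrantLock instead of a synchronized block. The class name LockedCounter and the method countWithTwoThreads() are hypothetical and are not part of the article’s code.

```java
import java.util.concurrent.locks.ReentrantLock;

class LockedCounter {
  private final ReentrantLock lock = new ReentrantLock();
  private int count = 0;

  void increment() {
    lock.lock();                     // blocks until the lock is available
    try {
      count++;
    } finally {
      lock.unlock();                 // always release, even if the body throws
    }
  }

  // Two threads each call increment() perThread times.
  static int countWithTwoThreads(int perThread) {
    LockedCounter counter = new LockedCounter();
    Runnable task = () -> {
      for (int i = 0; i < perThread; i++) {
        counter.increment();
      }
    };
    Thread t1 = new Thread(task);
    Thread t2 = new Thread(task);
    t1.start();
    t2.start();
    try {
      t1.join();
      t2.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return counter.count;            // safe to read: both threads were joined
  }

  public static void main(String[] args) {
    System.out.println(countWithTwoThreads(1000));
  }
}
```

Unlike a synchronized block, an explicit lock is not released automatically, which is why the unlock() call sits in a finally block.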

4. Local vs Instance Variables in Threads

One important thing we need to be aware of is where variables live. Local variables are stored on the thread’s stack, a piece of memory reserved for that specific thread and therefore inaccessible to other threads. Instance variables are stored on the heap (as with every object in Java), which is memory shared by the whole application. This means that when different threads work with the same object, they all share and modify the same instance variables, and that may cause unwanted results in our application. Let us demonstrate what we mean by that:

We will create another class named Counter with a method that contains a for loop. In the print statement, we will call Thread.currentThread() to get the thread that is currently running and then getName() to get its name. Every thread has a name; when we do not provide one, a default name is assigned.

public class Counter {
  
  public void increment() {
    for (int i = 1; i <= 10; i++) {
      System.out.println(Thread.currentThread().getName() + ": " + i);
    }
  }
}

We will create a CounterThread class, add a constructor that takes a Counter as parameter and finally, we will call the method increment() on the counter object inside the run() method.

public class CounterThread extends Thread {
  private Counter counter;
  
  public CounterThread(Counter counter) {
    this.counter = counter;
  }
  
  @Override
  public void run() {
    counter.increment();
  }
}

For the main method, we will create two counter threads and only one counter object and pass this object to both threads.

Counter counter = new Counter();
new CounterThread(counter).start();
new CounterThread(counter).start();

We would expect the output to be two threads counting from 1 to 10 concurrently and that’s what we get because each thread here had its own copy of the local variables.

Thread-0: 1
Thread-0: 2
Thread-1: 1
Thread-0: 3
Thread-0: 4
Thread-0: 5
Thread-1: 2
Thread-0: 6
Thread-1: 3
Thread-0: 7
Thread-1: 4
Thread-0: 8
Thread-1: 5
Thread-1: 6
Thread-0: 9
Thread-1: 7
Thread-0: 10
Thread-1: 8
Thread-1: 9
Thread-1: 10

4.1 Thread Interference or Race Condition

It happens when two or more threads have access to the same data and try to modify it at the same time without any coordination. A thread can be suspended between reading a value and acting on it, and by the time it resumes, the value may have been modified by other threads, causing inconsistency.

Let’s modify our application to use an instance variable instead of a local for loop variable.

public class Counter {
  private int i = 1;
  
  public void increment() {
    for (; i <= 10; i++) {
      System.out.println(Thread.currentThread().getName() + ": " + i);
    }
  }
}

Note that our loop header now has only two expressions, since we are initializing the int i variable at class level, and that’s the only change. Time to rerun our code.

Thread-0: 1
Thread-1: 1
Thread-1: 2
Thread-1: 4
Thread-1: 5
Thread-1: 6
Thread-1: 7
Thread-1: 8
Thread-1: 9
Thread-0: 3
Thread-1: 10

We might expect each thread to print every number, as before. However, only the number one was printed twice and, oddly enough, number three was printed almost last. That’s because the variable now lives in shared heap memory, so any change to its value is seen by all the threads competing for it.

Here, Thread-0 read the value 3 and was about to print it, but it was suspended, and by the time it resumed the counter had almost reached 10. Note that number nine had already been printed.
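The same kind of interference can be shown with a plain counter. In the sketch below, which uses the hypothetical class name RaceDemo, two threads increment one unsynchronized field and one synchronized field the same number of times.

```java
class RaceDemo {
  private int unsafeCount = 0;
  private int safeCount = 0;

  void unsafeIncrement() {
    unsafeCount++;                   // read, add, write: threads can interleave
  }

  synchronized void safeIncrement() {
    safeCount++;                     // only one thread at a time gets in here
  }

  // Returns { unsafeCount, safeCount } after two threads ran perThread iterations each.
  static int[] run(int perThread) {
    RaceDemo demo = new RaceDemo();
    Runnable task = () -> {
      for (int i = 0; i < perThread; i++) {
        demo.unsafeIncrement();
        demo.safeIncrement();
      }
    };
    Thread t1 = new Thread(task);
    Thread t2 = new Thread(task);
    t1.start();
    t2.start();
    try {
      t1.join();
      t2.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return new int[] { demo.unsafeCount, demo.safeCount };
  }

  public static void main(String[] args) {
    int[] result = run(10000);
    System.out.println("Unsynchronized: " + result[0]);
    System.out.println("Synchronized:   " + result[1]);
  }
}
```

The synchronized total always equals the number of increments performed, while the unsynchronized total may come out lower on some runs because updates from one thread can overwrite those of the other.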

More on this topic in Chapter 6.

5. Producer & Consumer

Now we will talk about some methods that are specially used in multithreaded applications. They help us have some control over threads that require communication among themselves, especially when they are all working on the same shared object. So, we can ask an object to wait while others perform some tasks and vice versa.

The wait(), notify() and notifyAll() methods belong to the Object class and therefore they work with any objects. Remember: every thread is an object. Let’s write some code to make some sense out of these:

public class Array {
  
  private int[] values = new int[10];

  public int add(int index, int value) {
    values[index] = value;
    return value;
  }
  
  public int get(int index) {
    return values[index];
  }
}

Now let’s create two more threads, a producer and a consumer one. We will pass our simple Array class as a parameter for those threads. Note that both methods return an integer, so we can print them out to the console and see what’s going on.

5.1 Producer Thread Class

This thread will be responsible for adding data to the Array class. We added some random thread sleep to simulate the unpredictability of a real-world application.

import java.util.Random;

public class Producer implements Runnable {
  
  private final Array array;
  
  public Producer(Array array) {
    this.array = array;
  }
  
  @Override
  public void run() {
    Random random = new Random();
    for (int i = 0; i < 10; i++) {
      System.out.println("Added: " + array.add(i, i + 1));
      try {
        // Sleep for a random time of up to five seconds
        Thread.sleep(random.nextInt(5000));
      } catch (InterruptedException ignored) { }
    }
  }
}

5.2 Consumer Thread Class

This thread will use, or consume, the data that is being added by the Producer.

import java.util.Random;

public class Consumer implements Runnable {
  
  private final Array array;
  
  public Consumer(Array array) {
    this.array = array;
  }
  
  @Override
  public void run() {
    Random random = new Random();
    for (int i = 0; i < 10; i++) {
      System.out.println("Gotten: " + array.get(i));
      try {
        // Sleep for a random time of up to five seconds
        Thread.sleep(random.nextInt(5000));
      } catch (InterruptedException ignored) { }
    }
  }
}

We want the producer to add a number to our array and the consumer to use it as soon as it has been added. Only after that value has been consumed should our application add (produce) another one. So, let’s run the code and see what we get.

Added: 1
Gotten: 1
Gotten: 0
Added: 2
Gotten: 0
Added: 3
Gotten: 0
Added: 4
Gotten: 0
Added: 5
Gotten: 0
Added: 6
Gotten: 0
Added: 7
Gotten: 0
Added: 8
Gotten: 0
Added: 9
Gotten: 0
Added: 10

As you can see, since we have no control over the order of execution here, we got some weird results. We added one and got one, but then the consumer tried to get the next value before it had been added, so it got zero. Since the consumer keeps getting values before they are added, we keep getting the default value of zero instead of what we want.

5.3 Guarded Block

To fix this, all we need to do is modify the Array class so that the consumer waits (is suspended) until some data has been added (produced), and the producer waits until that data has been used (consumed).

Guarded blocks are well documented in the Java documentation and here are some pointers:

  • The wait() method can throw InterruptedException.
  • The wait() method should always be invoked from within a loop.
  • Always test for the condition your application is waiting for.
  • Never assume that a thread was woken up because the condition it was waiting for has become true.
  • To invoke wait() you must own the intrinsic lock of the object; otherwise an IllegalMonitorStateException is thrown.
  • For that reason, wait(), notify() and notifyAll() must always be invoked inside a synchronized method or block.
  • Once wait() is invoked, the thread releases the lock and is suspended.
  • We should prefer notifyAll(), unless our application has a very large number of waiting threads.
  • The notify() method wakes up only one thread, but it doesn’t take any parameters, so there is no way to choose which one will be awoken by it.
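The rule about owning the intrinsic lock can be verified with a short sketch. The class name MonitorOwnershipDemo and the method waitSucceeds() are hypothetical; a timed wait(10) is used here only so that the example returns quickly.

```java
class MonitorOwnershipDemo {

  // Calls wait() on an object with or without holding its intrinsic lock.
  static boolean waitSucceeds(boolean holdLock) {
    Object monitor = new Object();
    try {
      if (holdLock) {
        synchronized (monitor) {
          monitor.wait(10);          // owns the lock: waits up to 10 ms, then returns
        }
      } else {
        monitor.wait(10);            // does not own the lock: throws immediately
      }
      return true;
    } catch (IllegalMonitorStateException e) {
      return false;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("Inside synchronized:  " + waitSucceeds(true));
    System.out.println("Outside synchronized: " + waitSucceeds(false));
  }
}
```

The same exception is thrown by notify() and notifyAll() when they are called by a thread that does not hold the object’s lock.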

Here is the Array class. We added a boolean variable to control when and which thread will be suspended. We also synchronized the methods. Finally, we will notify other threads when we are done producing or consuming data.

public class Array {
  
  private int[] values = new int[10];
  private boolean adding = true;
  
  public synchronized int add(int index, int value) {
    while (!adding) {
      try {
        this.wait();
      } catch (InterruptedException ignored) { }
    }
    values[index] = value;
    adding = false;
    notifyAll();
    return value;
  }
  
  public synchronized int get(int index) {
    while (adding) {
      try {
        this.wait();
      } catch (InterruptedException ignored) { }
    }
    adding = true;
    notifyAll();
    return values[index];
  }
}

Note that even though each thread may sleep for up to five seconds, the producer and the consumer now stay in step without any issues. Check the output:

Added: 1
Gotten: 1
Added: 2
Gotten: 2
Added: 3
Gotten: 3
Added: 4
Gotten: 4
Added: 5
Gotten: 5
Added: 6
Gotten: 6
Added: 7
Gotten: 7
Added: 8
Gotten: 8
Added: 9
Gotten: 9
Added: 10
Gotten: 10

5.4 The Volatile Keyword in Java

The volatile modifier is desirable when threads read and write shared values, and here is why: for performance reasons, a thread is allowed to keep working with its own cached copy of a variable, for example in a CPU register or cache, instead of re-reading it from main memory every time. If another thread changes the variable in the meantime, the first thread may keep using the stale value it read earlier.

One way to fix that is to use the volatile keyword, which guarantees a happens-before relationship: a write to a volatile variable happens before every subsequent read of that same variable. What that means is that as soon as we modify the variable, the change is visible to all threads, because reads and writes go to main memory rather than to a thread’s cached copy. Check the code below.

private static volatile int countdown = 6;
private static void volatileProducerConsumer() {
    
  new Thread(() -> {
    int local_countdown = countdown;
    while (local_countdown > 0) {
      System.out.println("Decrementing to " + (local_countdown - 1));
      countdown = --local_countdown;
      try {
        Thread.sleep(1000);
      } catch (InterruptedException ignored) { }
    }
  }).start();
  
  new Thread(() -> {
    int local_countdown = countdown;
    while (local_countdown > 0) {
      if (local_countdown != countdown) {
        System.out.println("Value changed: " + countdown);
        local_countdown = countdown;
      }
    }
  }).start();
}

When we run it, we get the expected output. Keep in mind the initial value was six.

Decrementing to 5
Value changed: 5
Decrementing to 4
Value changed: 4
Decrementing to 3
Value changed: 3
Decrementing to 2
Value changed: 2
Decrementing to 1
Value changed: 1
Decrementing to 0
Value changed: 0

If we ran it without the volatile keyword, the result could be:

Decrementing to 5
Decrementing to 4
Decrementing to 3
Decrementing to 2
Decrementing to 1
Decrementing to 0

It happens because, without volatile, the second thread is not guaranteed to see the change: it may keep reading a stale cached value instead of the one in main memory. In that case you will have to force-stop your application, or it will hang indefinitely.
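A common use of volatile is a stop flag that one thread sets and another one polls. The sketch below illustrates the idea; the class name StopFlagDemo, the field running and the method stopsWithin() are assumptions made for this example.

```java
class StopFlagDemo {

  // volatile guarantees that the worker sees the update made by another thread
  private static volatile boolean running = true;

  // Starts a busy worker, clears the flag, and reports whether the worker stopped.
  static boolean stopsWithin(long timeoutMillis) {
    running = true;
    Thread worker = new Thread(() -> {
      while (running) {
        // keep spinning until another thread clears the flag
      }
    });
    worker.setDaemon(true);          // never keep the JVM alive because of the demo
    worker.start();
    try {
      Thread.sleep(50);              // let the worker spin for a moment
      running = false;               // this write becomes visible to the worker
      worker.join(timeoutMillis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return !worker.isAlive();
  }

  public static void main(String[] args) {
    System.out.println("Worker stopped: " + stopsWithin(2000));
  }
}
```

If the field were not volatile, the worker would be allowed to keep using a stale value of the flag and could spin forever.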

Let’s consider some main characteristics of both the volatile and synchronized keywords.

Volatile Keyword

  • It is a field modifier.
  • It can be used with primitives and objects.
  • It is high performance.
  • It does not use locks, nor does it block threads.

Synchronized Keyword

  • It modifies code blocks and methods.
  • It can only lock on objects, including this, and not on primitives.
  • It performs worse than volatile because of locking.
  • It uses locks and blocks threads whenever needed.

5.5 Deadlocks

Deadlocks occur when two or more threads are blocked waiting for each other to release a lock, and neither ever does. They hang permanently. We will try to recreate a deadlock situation.

First, let’s add another class, Person, with a final field name, a constructor, a getter, and two methods, one public and one private.

public class Person {
  private final String name;
  
  public Person(String name) {
    this.name = name;
  }
  
  public String getName() {
    return name;
  }
  
  public synchronized void wave(Person person) {
    System.out.println(name + " waved at " + person.getName());
    person.waveBack(this);
  }
  
  private synchronized void waveBack(Person person) {
    System.out.println(name + " waved back at " + person.getName());
  }
}

Now, in the main method, let’s create two objects of that class and two threads as well. We will use lambda expressions to make the code shorter, and the main thread will sleep for 10 seconds. If our application is still stuck after that, we will terminate it with System.exit(130). Check the code below:

final Person john = new Person("John");
final Person georgios = new Person("Georgios");
new Thread(() -> {john.wave(georgios);}).start();
new Thread(() -> {georgios.wave(john);}).start();
  
try {
  Thread.sleep(10000);
  System.exit(130);
} catch (InterruptedException ignored) { }

Here is the output:

John waved at Georgios
Georgios waved at John

Process finished with exit code 130

After 10 seconds had passed, our fellows hadn’t waved back at one another, so we forced the termination of our application because both threads were blocked. We can easily demonstrate that the two threads were blocked by showing their state with the getState() method. Let’s adjust our code so it shows the threads’ states.

5.6 Locking on this

final Person john = new Person("John");
final Person georgios = new Person("Georgios");
Thread t1 = new Thread(() -> {john.wave(georgios);});
Thread t2 = new Thread(() -> {georgios.wave(john);});
t1.start();
t2.start();
  
try {
  Thread.sleep(10000);
  System.out.println(t1.getState());
  System.out.println(t2.getState());
  System.exit(130);
} catch (InterruptedException ignored) { }

Notice that now we need a reference to each thread, so we defined two variables, t1 and t2. Check the output:

Georgios waved at John
John waved at Georgios
BLOCKED
BLOCKED

Process finished with exit code 130

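Let us make a minor change to our Person class to fix this problem. In our wave method, instead of synchronizing the whole method, we will synchronize only the code that needs this object’s lock, passing this as our locker, and call waveBack after that lock has been released.

public void wave(Person person) {
  synchronized (this) {
    System.out.println(name + " waved at " + person.getName());
  }
  // The lock on this is released before we call into the other object
  person.waveBack(this);
}

Note that now we are synchronizing only the necessary code, locking on the object represented by the word this, which may be either john or georgios. Because a thread no longer holds its own lock while it waits for the other person’s lock, the circular wait that caused the deadlock cannot occur. Simply wrapping the waveBack call itself in synchronized (this) would not be enough, since each thread would still hold one lock while requesting the other.

The one thing we can never assume is who is going to wave first. Sometimes it will be john and other times it will be georgios. Nevertheless, while one thread holds an object’s lock, no other thread can enter code synchronized on that same object until the lock is released.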

And here is the output:

John waved at Georgios
Georgios waved back at John
Georgios waved at John
John waved back at Georgios
TERMINATED
TERMINATED

Process finished with exit code 0

As you can see, now we got the result we wanted with no deadlocks, our threads are TERMINATED, and we no longer need the System.exit() method to terminate our application.
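Another widely used way to avoid deadlocks, which is not the one applied to the Person class, is to make every thread acquire its locks in the same order. The following is a minimal sketch of that idea; the class name OrderedLockDemo and the lock names LOCK_A and LOCK_B are hypothetical.

```java
class OrderedLockDemo {
  private static final Object LOCK_A = new Object();
  private static final Object LOCK_B = new Object();

  // Both threads take LOCK_A first and LOCK_B second, so neither can end up
  // holding one lock while waiting for the other in the opposite order.
  static int runBothThreads() {
    int[] completed = new int[1];
    Runnable task = () -> {
      synchronized (LOCK_A) {
        synchronized (LOCK_B) {
          completed[0]++;            // protected by both locks
        }
      }
    };
    Thread t1 = new Thread(task);
    Thread t2 = new Thread(task);
    t1.start();
    t2.start();
    try {
      t1.join();
      t2.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return completed[0];
  }

  public static void main(String[] args) {
    System.out.println("Threads completed: " + runBothThreads());
  }
}
```

A deadlock needs a cycle in which each thread holds a lock that another one wants; a fixed acquisition order makes such a cycle impossible.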

6. The Java Util Concurrent Package

The java.util.concurrent package contains two subpackages that help us deal with concurrency problems. The atomic package, which we will cover in this section, and the locks package were introduced to help us keep our application free of inconsistencies.

In a multithreaded environment, variables and objects may be changed by different threads simultaneously, which is highly inconsistency-prone. Ensuring the consistency of those objects and shared values is key, and that’s when those two packages come in handy.

6.1 What Is Atomicity

An operation is atomic when it either completes entirely or does not happen at all. In other words, the operation has to be fully concluded with no suspension or interference partway through. But why do we need that?

In Java only a few operations are atomic. For instance:

  • Reading and writing references. E.g. myReference1 = myReference2;
  • Reading and writing primitive variables, except long and double (they may require two operations and a thread could be suspended between them).
  • Reading and writing volatile variables including long and double.

6.2 The Atomic Package and Classes

The java.util.concurrent.atomic package provides us with atomic classes that are thread-safe and lock-free.

In Chapter 4 we created a method called increment to show the problems we may run into with thread interference. That is a case where using volatile wouldn’t help, because the problem is not visibility, as it could have been with our Producer and Consumer classes, but the fact that incrementing a variable is not atomic.

Here are some of the most common classes in the atomic package.

  • AtomicInteger
  • AtomicLong
  • AtomicBoolean
  • AtomicReference
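To give a feel for how these classes can be thread-safe without locks, here is a minimal sketch built on compareAndSet, which writes a new value only if the current value is still the one we read. The MaxTracker class and its methods are hypothetical names made up for this illustration; they are not part of the tutorial's application.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper for illustration; not part of the article's code.
final class MaxTracker {
  private final AtomicLong max = new AtomicLong(Long.MIN_VALUE);

  // Stores sample if it is larger than the current maximum.
  void record(long sample) {
    while (true) {
      long current = max.get();
      if (sample <= current) {
        return; // a larger (or equal) value is already stored
      }
      if (max.compareAndSet(current, sample)) {
        return; // max was still 'current', so our write went through
      }
      // Another thread changed max between get() and compareAndSet(): retry.
    }
  }

  long get() {
    return max.get();
  }

  // Two threads record overlapping ranges; the largest sample must win.
  static long demo() {
    MaxTracker tracker = new MaxTracker();
    Thread up = new Thread(() -> { for (long i = 1; i <= 1_000; i++) tracker.record(i); });
    Thread down = new Thread(() -> { for (long i = 2_000; i >= 1; i--) tracker.record(i); });
    up.start();
    down.start();
    try {
      up.join();
      down.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return tracker.get(); // 2000
  }
}
```

No thread ever blocks here: a thread that loses the race simply reads the new value and tries again. For this particular case, AtomicLong.accumulateAndGet(sample, Math::max) would do the same in a single call.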

Back to the counter problem: we had two threads competing for the counter. The problem was that, while incrementing the variable i, a thread could be suspended after reading the value but before writing it back to memory.

6.2.1 The AtomicInteger Class

First, let’s fix our code. This is what our Counter class will look like:

import java.util.concurrent.atomic.AtomicInteger;

public class Counter {
  AtomicInteger intValue = new AtomicInteger(1);
  
  public void increment() {
    while (intValue.get() <= 10) {
      System.out.println(Thread.currentThread().getName() + ": " + intValue.getAndIncrement());
    }
  }
}

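So, we replaced the primitive int with an AtomicInteger, and instead of a for loop we are now using a while loop. Since AtomicInteger is a class, we need to invoke the method get() to read the integer value. Finally, instead of invoking i++, which involves three operations (read, add, write) and therefore isn’t atomic, we use the method getAndIncrement(), which returns the current value and then increments it, all in one atomic step. Note: should you need the incremented value instead, just use the incrementAndGet() method. One caveat: get() and getAndIncrement() are each atomic, but the check in the while condition and the increment in the loop body are still two separate steps, so with unlucky timing both threads can pass the check at 10 and one of them will print 11.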

Thread-0: 1
Thread-0: 3
Thread-0: 4
Thread-1: 2
Thread-0: 5
Thread-1: 6
Thread-0: 7
Thread-0: 9
Thread-0: 10
Thread-1: 8

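This time no number was printed twice: because the increment is atomic, a thread can no longer be suspended halfway through it. The lines may still appear out of order, since printing is not part of the atomic operation.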

Keep in mind that there are many more classes and methods to be explored which are outside of our scope here.
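The Counter above reads the value with get() and increments it with getAndIncrement() in two separate calls. A minimal sketch of a variant that claims a value and checks the bound on that same value could look like this. BoundedCounter, its claimed list, and the demo method are hypothetical names added for illustration only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variant of the article's Counter; names are illustrative.
final class BoundedCounter {
  private final AtomicInteger next = new AtomicInteger(1);
  private final List<Integer> claimed = new CopyOnWriteArrayList<>();

  void increment() {
    int value;
    // Claim a value first, then test the value we actually got.
    while ((value = next.getAndIncrement()) <= 10) {
      claimed.add(value);
      System.out.println(Thread.currentThread().getName() + ": " + value);
    }
  }

  // Runs two threads and returns every printed value in ascending order.
  static List<Integer> demo() {
    BoundedCounter counter = new BoundedCounter();
    Thread first = new Thread(counter::increment);
    Thread second = new Thread(counter::increment);
    first.start();
    second.start();
    try {
      first.join();
      second.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    List<Integer> sorted = new ArrayList<>(counter.claimed);
    Collections.sort(sorted);
    return sorted; // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  }
}
```

Each thread may push the internal counter past 10 once before it leaves the loop, but that extra value is never printed, so the output always contains the numbers 1 to 10 exactly once, in whatever order the threads happen to print them.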

6.3 The Locks Package and Classes

Now it is time to learn a little about the locks package and the Lock interface, which some of the classes in this package implement. Before Java 5 (1.5), we had no option other than synchronization via the synchronized keyword discussed earlier.

6.3.1 The Reentrant Lock Class

As the name suggests, this lock is re-entrant: if a thread already owns the lock and reaches code that needs that same lock, it can just continue executing without blocking. The lock keeps count of how many times its owner has acquired it, and the owner must release it the same number of times. Bear in mind that not all implementations of the Lock interface are re-entrant.
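A small sketch can make the re-entrant behaviour visible. It uses ReentrantLock.getHoldCount(), which reports how many times the current thread holds the lock. The ReentrancyDemo class and its outer, inner and demo methods are hypothetical names used only for this illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class for illustration; not part of the article's application.
final class ReentrancyDemo {
  private final ReentrantLock lock = new ReentrantLock();
  private final StringBuilder log = new StringBuilder();

  void outer() {
    lock.lock();
    try {
      log.append(lock.getHoldCount()); // 1: first acquisition
      inner();                         // the same thread asks for the lock again
      log.append(lock.getHoldCount()); // 1: inner() released its own hold
    } finally {
      lock.unlock();
    }
  }

  void inner() {
    lock.lock(); // does not block: this thread already owns the lock
    try {
      log.append(lock.getHoldCount()); // 2: held twice by the same thread
    } finally {
      lock.unlock();
    }
  }

  // Returns the hold counts observed along the way.
  static String demo() {
    ReentrancyDemo reentrancy = new ReentrancyDemo();
    reentrancy.outer();
    reentrancy.log.append(reentrancy.lock.getHoldCount()); // 0: fully released
    return reentrancy.log.toString(); // "1210"
  }
}
```

The lock only becomes available to other threads when the count drops back to zero, which is why every lock() needs a matching unlock().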

Again, we will create another producer-consumer application to demonstrate how to use the ReentrantLock. Here is the Producer.

import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class MessageWriter implements Runnable {
  
  private final List<String> messages;
  private final ReentrantLock msgLock;
  
  public MessageWriter(List<String> messages, ReentrantLock msgLock) {
    this.messages = messages;
    this.msgLock = msgLock;
  }
  
  String[] incomingMsg = {
    "Something important is happening here.",
    "The Reentrant Lock works like a charm.",
    "But one has to be careful when dealing with those locks.",
    "One slip and we may find ourselves in a deadlock limbo.",
    "Try finally is your friend and it won't disappoint you.",
    ""
  };
  
  @Override
  public void run() {
    for (var msg : incomingMsg) {
      System.out.println("Sending message...");
      msgLock.lock();
      messages.add(msg);
      msgLock.unlock();
    }
  }
}

And here is our Consumer:

import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class MessageReader implements Runnable {
  
  private final List<String> messages;
  private final ReentrantLock msgLock;
  
  public MessageReader(List<String> messages, ReentrantLock msgLock) {
    this.messages = messages;
    this.msgLock = msgLock;
  }
  
  @Override
  public void run() {
    while(true) {
      msgLock.lock();
      if (messages.isEmpty())
        continue;
      if (messages.get(0).equals(""))
        break;
      System.out.println("Receiving message:");
      System.out.println(messages.remove(0));
      msgLock.unlock();
    }
  }
}

Finally, here is the code for the main method:

List<String> messages = new ArrayList<>();
ReentrantLock msgLock = new ReentrantLock();
MessageWriter msgWriter = new MessageWriter(messages, msgLock);
MessageReader msgReader1 = new MessageReader(messages, msgLock);
MessageReader msgReader2 = new MessageReader(messages, msgLock);

new Thread(msgWriter).start();
new Thread(msgReader1).start();
new Thread(msgReader2).start();

Let’s run this code and see what happens.

The main thread has finished!
Sending message...
Exception in thread "Thread-1" java.lang.Error: Maximum lock count exceeded
	at java.base/java.util.concurrent.locks.ReentrantLock$NonfairSync.initialTryLock(ReentrantLock.java:231)
	at java.base/java.util.concurrent.locks.ReentrantLock$Sync.lock(ReentrantLock.java:152)
	at java.base/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:322)
	at com.codelearnhub.application.MessageReader.run(MessageReader.java:19)
	at java.base/java.lang.Thread.run(Thread.java:833)

Obviously, something went wrong here. We reached Maximum lock count exceeded, but why?

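The problem lies in our consumer. While the messages list is empty, it hits continue without ever reaching unlock(), so it acquires the lock again on every iteration. The lock keeps track of how many times its owner has acquired it: if a thread has acquired the lock ten times, it will have to release it ten times as well, and that never happens in our application. The count keeps growing until it exceeds the maximum of 2,147,483,647 holds, at which point lock() throws the Error shown above. The break branch has the same flaw: it leaves the loop while still holding the lock.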

6.3.2 Using Locks with a Try Finally Block

By simply adding a try finally block, we can prevent our thread from repeatedly acquiring the lock without releasing it, since the finally block always gets executed no matter how the try block is left, be it through continue, break, or an exception. The bottom line is you should always put the unlock() call in a finally block.
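As a minimal sketch of why this matters, the snippet below throws an exception while holding the lock and then checks whether the lock is still held. FinallyUnlockDemo and its two methods are hypothetical names introduced for this illustration only.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class for illustration; not part of the article's application.
final class FinallyUnlockDemo {

  // A critical section that always fails.
  static void failingCriticalSection(ReentrantLock lock) {
    lock.lock(); // acquire before try: if lock() fails, there is nothing to release
    try {
      throw new IllegalStateException("simulated failure inside the critical section");
    } finally {
      lock.unlock(); // runs on normal exit, break, continue and exceptions alike
    }
  }

  // Reports whether the lock is still held after the failure.
  static boolean lockedAfterFailure() {
    ReentrantLock lock = new ReentrantLock();
    try {
      failingCriticalSection(lock);
    } catch (IllegalStateException expected) {
      // The exception still reaches the caller; only the lock was cleaned up.
    }
    return lock.isLocked(); // false
  }
}
```

Without the finally block, the exception would leave the lock held forever and every other thread calling lock() would wait indefinitely.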

So, here is the fixed producer.

@Override
public void run() {
  for (var msg : incomingMsg) {
    System.out.println("Sending message...");
    msgLock.lock();
    try {
      messages.add(msg);
    } finally {
      msgLock.unlock();
    }
  }
}

Our consumer:

@Override
public void run() {
  while(true) {
    msgLock.lock();
    try {
      if (messages.isEmpty())
        continue;
      if (messages.get(0).equals(""))
        break;
      System.out.println("Receiving message:");
      System.out.println(messages.remove(0));
    } finally {
      msgLock.unlock();
    }
  }
}

And now the output is just what we expect it to be:

Sending message...
Sending message...
Receiving message:
Something important is happening here.
Receiving message:
The Reentrant Lock works like a charm.
Sending message...
Sending message...
Sending message...
Receiving message:
But one has to be careful when dealing with those locks.
Receiving message:
One slip and we may find ourselves in a deadlock limbo.
Sending message...
Receiving message:
Try finally is your friend and it won't disappoint you.

Now our code works perfectly, and it does not need any alterations. However, there may be situations where we need to check whether the lock is available before executing some code.

6.3.3 The Try Lock Method

If that’s the case, the tryLock() method is here for you. Let’s change our application to accommodate this method. For this specific application it isn’t necessary; we will do it just to show you how it could be helpful in a real-world scenario.

We won’t touch our producer, just our consumer. Otherwise, we may get into a deadlock situation again. Here is our run method for the consumer class. This time we will show which thread is consuming.

@Override
public void run() {
  while(true) {
    if (msgLock.tryLock()) {
      try {
        if (messages.isEmpty())
          continue;
        if (messages.get(0).equals(""))
          break;
        System.out.println(Thread.currentThread().getName() + " is receiving a message:");
        System.out.println(messages.remove(0));
      } finally {
        msgLock.unlock();
      }
    }
  }
}

The tryLock() method returns a boolean. If the lock is available, the thread acquires it, tryLock() returns true, and the guarded code is executed. If it is not, tryLock() returns false immediately and the thread skips the conditional. Check out the output:

Sending message...
Sending message...
Sending message...
Sending message...
Sending message...
Sending message...
Thread-1 is receiving a message:
Something important is happening here.
Thread-2 is receiving a message:
The Reentrant Lock works like a charm.
Thread-2 is receiving a message:
But one has to be careful when dealing with those locks.
Thread-2 is receiving a message:
One slip and we may find ourselves in a deadlock limbo.
Thread-2 is receiving a message:
Try finally is your friend and it won't disappoint you.
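The consumer above keeps spinning while the lock is taken. The Lock interface also has a timed variant, tryLock(long time, TimeUnit unit), which waits for a limited time before giving up. Here is a minimal sketch of both outcomes; TryLockDemo and its methods are hypothetical names for this illustration, and the 50 millisecond wait is an arbitrary choice.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class for illustration; not part of the article's application.
final class TryLockDemo {

  // Tries to take the lock from a second thread and reports whether that worked.
  static boolean otherThreadCanLock(ReentrantLock lock, long waitMillis) {
    AtomicBoolean acquired = new AtomicBoolean(false);
    Thread other = new Thread(() -> {
      try {
        // Timed variant: gives up after waitMillis instead of spinning.
        if (lock.tryLock(waitMillis, TimeUnit.MILLISECONDS)) {
          try {
            acquired.set(true);
          } finally {
            lock.unlock();
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    other.start();
    try {
      other.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return acquired.get();
  }

  // Returns { result while we hold the lock, result after we released it }.
  static boolean[] demo() {
    ReentrantLock lock = new ReentrantLock();
    boolean whileHeld;
    lock.lock();
    try {
      whileHeld = otherThreadCanLock(lock, 50); // false: this thread owns the lock
    } finally {
      lock.unlock();
    }
    boolean afterRelease = otherThreadCanLock(lock, 50); // true: the lock is free
    return new boolean[] { whileHeld, afterRelease };
  }
}
```

Note that unlock() is only called in the branch where tryLock() returned true; unlocking a lock the thread does not hold throws an IllegalMonitorStateException.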

6.3.4 The Lock Condition API

The Java Condition API provides us with an alternative to the wait(), notify() and notifyAll() methods. It enables threads to suspend their execution until a given condition is true, and to signal other threads that they may resume their execution.

A Condition must be bound to a lock and can only be instantiated through the lock’s factory method newCondition().

If you have been following this tutorial, you will recall our array class which had two synchronized methods, add and get. We will rewrite this class using the Condition interface. We will rewrite the Producer and Consumer classes as well.

Here is the ArrayCondition class. Note that we use two Condition objects just to make the code clearer. It would have worked just as well with only one.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ArrayCondition {
  
  private int[] values = new int[10];
  private boolean adding = true;
  private final Lock lock = new ReentrantLock();
  private final Condition get = lock.newCondition();
  private final Condition add = lock.newCondition();
  
  public int add(int index, int value) {
    lock.lock(); // acquire before try, so finally only unlocks a lock we hold
    try {
      while (!adding) {
        try {
          add.await();
        } catch (InterruptedException ignored) { }
      }
      values[index] = value;
      adding = false;
      get.signal();
      return value;
    } finally {
      lock.unlock();
    }
  }
  
  public int get(int index) {
    lock.lock();
    try {
      while (adding) {
        try {
          get.await();
        } catch (InterruptedException ignored) {
        }
      }
      adding = true;
      add.signal();
      return values[index];
    } finally {
      lock.unlock();
    }
  }
}

The ProducerCondition class.

import java.util.Random;

public class ProducerCondition implements Runnable {
  
  private ArrayCondition arrayCondition;
  public ProducerCondition(ArrayCondition arrayCondition) {
    this.arrayCondition = arrayCondition;
  }
  
  @Override
  public void run() {
    Random random = new Random();
    for (int i = 0; i < 10; i++) {
      System.out.println("Added: " + arrayCondition.add(i, i+1));
      try {
        Thread.sleep(random.nextInt(5000));
      } catch (InterruptedException ignored) { }
    }
  }
}

The ConsumerCondition class:

import java.util.Random;

public class ConsumerCondition implements Runnable {
  
  private ArrayCondition arrayCondition;
  public ConsumerCondition(ArrayCondition arrayCondition) {
    this.arrayCondition = arrayCondition;
  }
  
  @Override
  public void run() {
    Random random = new Random();
    for (int i = 0; i < 10; i++) {
      System.out.println("Gotten: " + arrayCondition.get(i));
      try { Thread.sleep(random.nextInt(5000));
      } catch (InterruptedException ignored) { }
    }
  }
}

The main method will be about the same:

ArrayCondition arrayCondition = new ArrayCondition();
new Thread(new ProducerCondition(arrayCondition)).start();
new Thread(new ConsumerCondition(arrayCondition)).start();

The output:

Added: 1
Gotten: 1
Added: 2
Gotten: 2
Added: 3
Gotten: 3
Added: 4
Gotten: 4
Added: 5
Gotten: 5
Added: 6
Gotten: 6
Added: 7
Gotten: 7
Added: 8
Gotten: 8
Added: 9
Gotten: 9
Added: 10
Gotten: 10

As you can see, we achieved the exact same result as before, but what are the advantages of using this API?

  • Less error prone.
  • More readable code.
  • It is clearer which object is invoking the await() and signal() methods.
  • It offers two very handy methods: awaitUninterruptibly() and awaitUntil(Date deadline).
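Besides the two methods named in the list, the Condition interface also has awaitNanos(long nanosTimeout), which returns an estimate of the remaining waiting time. A minimal sketch of a bounded wait built on it could look like this. ReadyFlag and its methods are hypothetical names for this illustration and are unrelated to the ArrayCondition class.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class for illustration; not part of the article's application.
final class ReadyFlag {
  private final Lock lock = new ReentrantLock();
  private final Condition becameReady = lock.newCondition();
  private boolean ready = false;

  void markReady() {
    lock.lock();
    try {
      ready = true;
      becameReady.signalAll(); // wake up every thread waiting for the flag
    } finally {
      lock.unlock();
    }
  }

  // Waits up to timeoutMillis for markReady(); returns false on timeout.
  boolean awaitReady(long timeoutMillis) throws InterruptedException {
    long nanosLeft = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    lock.lock();
    try {
      while (!ready) { // loop guards against spurious wakeups
        if (nanosLeft <= 0L) {
          return false; // time is up and the flag was never set
        }
        nanosLeft = becameReady.awaitNanos(nanosLeft);
      }
      return true;
    } finally {
      lock.unlock();
    }
  }

  // Returns { result with nobody signalling, result after markReady() }.
  static boolean[] demo() {
    try {
      ReadyFlag flag = new ReadyFlag();
      boolean beforeSignal = flag.awaitReady(50); // false: nobody signals
      new Thread(flag::markReady).start();
      boolean afterSignal = flag.awaitReady(5_000); // true: flag gets set
      return new boolean[] { beforeSignal, afterSignal };
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```

As with wait(), the condition is always re-checked in a loop, and await can only be called by the thread that currently holds the lock the Condition belongs to.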

6.3.5 Lock Fairness

In our previous example with tryLock(), we had two consumers and the second one ran most of the time. This hints at an issue known as thread starvation. Our application only has a few threads, but imagine a real-life application where there could be hundreds of them. What happens is that some threads may acquire the lock once and never have the opportunity to acquire it again, or may never acquire it at all.

To mitigate this issue, the ReentrantLock class offers an overloaded constructor that takes a boolean fair parameter. When it is true, the lock favors the longest-waiting thread.

Let’s test it out.

public class FairCounter implements Runnable {
  
  private static final Object lock = new Object();
  final private String name;

  public FairCounter(String name) {
    this.name = name;
  }
  
  @Override
  public void run() {
    for (int i = 0; i < 10; i++) {
      synchronized (lock) {
        System.out.println(name + ": " + i);
      }
    }
  }
}

That’s our thread: a simple for loop that counts from 0 to 9 and prints each number inside a block synchronized on a lock object shared by all instances.

Here is the code for the main method.

new Thread(new FairCounter("Thread-1")).start();
new Thread(new FairCounter("Thread-2")).start();
new Thread(new FairCounter("Thread-3")).start();

Output:

Thread-1: 0
Thread-1: 1
Thread-1: 2
Thread-1: 3
Thread-1: 4
Thread-1: 5
Thread-1: 6
Thread-1: 7
Thread-1: 8
Thread-1: 9
Thread-3: 0
Thread-3: 1
Thread-3: 2
Thread-3: 3
Thread-3: 4
Thread-3: 5
Thread-3: 6
Thread-3: 7
Thread-3: 8
Thread-3: 9
Thread-2: 0
Thread-2: 1
Thread-2: 2
Thread-2: 3
Thread-2: 4
Thread-2: 5
Thread-2: 6
Thread-2: 7
Thread-2: 8
Thread-2: 9

Note that Thread-2 was executed last. In a real-world application, where there may be dozens of threads competing for a lock and new threads are created on demand by thread factories, some may never get the lock, which means they may never get to do their work.

Now try the same example with lock fairness.

import java.util.concurrent.locks.ReentrantLock;

public class FairCounter implements Runnable {
  
  private static final ReentrantLock lock = new ReentrantLock(true);
  final private String name;
  public FairCounter(String name) {
    this.name = name;
  }
  
  @Override
  public void run() {
    for (int i = 0; i < 10; i++) {
      lock.lock();
      try {
        System.out.println(name + ": " + i);
      } finally{
        lock.unlock();
      }
    }
  }
}

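Note that now all threads will run fairly without starvation. Bear in mind that in a real-world application, threads may be doing different, yet important tasks, and if some of them starve and never run, the functionality of our application may be compromised. Fairness is not free, though: a fair lock typically gives lower overall throughput than the default non-fair one, so use it only where it is needed.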

Check the output:

Thread-1: 0
Thread-2: 0
Thread-3: 0
Thread-2: 1
Thread-1: 1
Thread-3: 1
Thread-2: 2
Thread-1: 2
Thread-3: 2
Thread-2: 3
Thread-1: 3
Thread-3: 3
Thread-2: 4
Thread-1: 4
Thread-3: 4
Thread-2: 5
Thread-1: 5
Thread-3: 5
Thread-2: 6
Thread-1: 6
Thread-3: 6
Thread-2: 7
Thread-1: 7
Thread-3: 7
Thread-2: 8
Thread-1: 8
Thread-3: 8
Thread-2: 9
Thread-1: 9
Thread-3: 9

6.4 The Executor Service Interface

The ExecutorService interface in the java.util.concurrent package helps us manage threads and eliminates the need to create and start them manually. Conveniently, Java provides implementations of this interface, one of which we are going to be using here. You can also create your own implementation should you need it.

It uses a thread pool to reduce the overhead of creating and scheduling threads, which can be very costly in terms of performance and memory consumption in applications with a substantial number of threads.

With ExecutorService, we do not have to worry about thread management, and we can zero in on what is important, the code that will be executed when our threads are running. Everything will be taken care of by the ExecutorService.

ExecutorService is an interface, so we will need to call one of the Executors class factory methods to instantiate a ThreadPoolExecutor which will use a LinkedBlockingQueue<Runnable> internally.

Keep in mind we won’t go over every single method in this class. We are going to create another version of the previous producer-consumer application and compare the two approaches.

For that, we only need to change the main method.

List<String> messages = new ArrayList<>();
ReentrantLock msgLock = new ReentrantLock();
  
MessageWriter msgWriter = new MessageWriter(messages, msgLock);
MessageReader msgReader1 = new MessageReader(messages, msgLock);
MessageReader msgReader2 = new MessageReader(messages, msgLock);
    
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(msgWriter);
executorService.execute(msgReader1);
executorService.execute(msgReader2);
    
executorService.shutdown();

6.5 The Fixed Thread Pool Factory Methods

So, up to the line where we instantiate our ThreadPoolExecutor using the Executors.newFixedThreadPool factory method, everything is the same. The method takes one argument, the maximum number of threads that will be running simultaneously, which means that any additional task will be queued and will run once one of the pool’s threads becomes free.

Whenever we use an ExecutorService, we must shut it down manually; otherwise its worker threads stay alive waiting for new tasks and keep our application from exiting. After shutdown() is called, all previously submitted tasks are still executed, even the ones that are queued. However, the pool won’t accept any new task, and submitting one will result in a RejectedExecutionException.
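The shutdown behaviour described above can be sketched in a few lines. ShutdownDemo and its demo method are hypothetical names for this illustration; the five tasks and the five-second wait are arbitrary choices. The sketch also uses awaitTermination, which blocks until the pool has finished its work or the timeout elapses.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class for illustration; not part of the article's application.
final class ShutdownDemo {

  // Returns "<tasks completed>/<late task rejected?>/<pool terminated?>".
  static String demo() {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    AtomicInteger completed = new AtomicInteger();
    for (int i = 0; i < 5; i++) {
      pool.execute(completed::incrementAndGet); // three of these wait in the queue
    }

    pool.shutdown(); // no new tasks from here on; submitted ones still run

    boolean rejected = false;
    try {
      pool.execute(completed::incrementAndGet);
    } catch (RejectedExecutionException e) {
      rejected = true; // the pool refuses work after shutdown()
    }

    boolean terminated;
    try {
      terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      terminated = false;
    }
    return completed.get() + "/" + rejected + "/" + terminated; // "5/true/true"
  }
}
```

All five tasks submitted before shutdown() complete, the sixth is rejected, and the pool reports that it has terminated.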

A second option would be invoking the shutdownNow() method. The main difference is that it attempts to stop the tasks that are currently running, typically by interrupting their threads, and it removes the queued tasks and returns them as a list without ever starting them. Bear in mind there’s no guarantee that the running tasks will actually stop, and a task that ignores interruption might never terminate. It’s better to stick with the regular shutdown() whenever possible.
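Here is a minimal sketch of shutdownNow() in action, assuming a single-thread pool with one running task and two queued ones. ShutdownNowDemo and its demo method are hypothetical names for this illustration, and the latches exist only to make the timing predictable.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical class for illustration; not part of the article's application.
final class ShutdownNowDemo {

  // Returns "<tasks never started>/<running task interrupted?>/<pool terminated?>".
  static String demo() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch never = new CountDownLatch(1); // nobody counts this down
    AtomicBoolean interrupted = new AtomicBoolean(false);

    pool.execute(() -> {
      started.countDown();
      try {
        never.await(); // blocks until the thread is interrupted
      } catch (InterruptedException e) {
        interrupted.set(true); // the task cooperates and ends here
      }
    });
    pool.execute(() -> { }); // queued behind the blocked task
    pool.execute(() -> { }); // queued as well

    try {
      started.await(); // make sure the first task is really running
      List<Runnable> neverStarted = pool.shutdownNow();
      boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
      return neverStarted.size() + "/" + interrupted.get() + "/" + terminated; // "2/true/true"
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```

The running task only stops because it reacts to the interrupt. A task that swallowed the InterruptedException and kept waiting would keep the pool alive.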

Note that using ExecutorService for an application like this is overkill. We have done it for demonstration purposes.

Here is the output. The only difference is the default name given to each thread.

Sending message...
Sending message...
Sending message...
Sending message...
Sending message...
Sending message...
pool-1-thread-1 is receiving a message:
Something important is happening here.
pool-1-thread-2 is receiving a message:
The Reentrant Lock works like a charm.
pool-1-thread-2 is receiving a message:
But one has to be careful when dealing with those locks.
pool-1-thread-1 is receiving a message:
One slip and we may find ourselves in a deadlock limbo.
pool-1-thread-1 is receiving a message:
Try finally is your friend and it won't disappoint you.

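We can see that, even though we limited the pool to two threads, all three tasks did their work: the producer sent its messages, thread-1 received three of them and thread-2 received two. The third task simply waited in the queue until one of the two threads became free.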

6.5.1 Implementing the Callable Interface

Another way to use ExecutorService is through the submit(Callable<T> task) method. Unlike execute(), it takes a Callable object and returns an object of type Future.

6.5.2 The Future Interface

So far, we have been executing a lot of tasks, but none of them returned any value. It’s because the Runnable interface contract requires a run method that returns void whereas the Callable interface contract requires the call method that can return any object type since it uses generics. Remember, we do not call this method directly!

A Future object represents the pending result of a task. It works much like a promise that will be fulfilled asynchronously sometime in the future, provided no exceptions occur in the meantime; we just don’t know when. The submit(Callable<T> task) method returns a Future object.

Let’s write some code to demonstrate how to use this and what to watch for. So, let’s say you have a task that requires a lot of time to complete, and you need this task to return values. One thing to keep in mind is that the call method is declared to throw Exception, which is a checked exception.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class NumberGenerator implements Callable<List<Integer>> {
  
  private final List<Integer> counter;
  
  public NumberGenerator() {
    this.counter = new ArrayList<>();
  }
  
  @Override
  public List<Integer> call() throws Exception {
    System.out.println("Adding started...");
    for (int i = 1; i <= 100; i++) {
        counter.add(i);
        Thread.sleep(500);
    }
    System.out.println("Adding finished...");
    return counter;
  }
}

Here is our class. It implements Callable<List<Integer>> so that it can return a list of integers. In the call method we are just adding the numbers from 1 to 100 to the list and sleeping half a second after each one, so the whole task takes about 50 seconds.

6.5.3 The Get Method

The get() method is a blocking method: if the result is not ready yet, it blocks the current thread until the task either completes, in which case the value is returned, or fails, in which case get() throws an exception.

Let’s write the code for the main method:

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<List<Integer>> future = executorService.submit(new NumberGenerator());
// Pretend we are doing some other tasks meanwhile...
try {
  Thread.sleep(3000);
} catch (InterruptedException ignored) { }
List<Integer> numbers;
try {
  System.out.println("About to invoke the get() method!");
  numbers = future.get(); // Blocks if not ready!
  System.out.println("I will have to wait until future is done. :(");
} catch (ExecutionException | InterruptedException e) {
  e.printStackTrace(); // May throw exceptions.
}
executorService.shutdown(); // Don't forget to shut me down!

In the first line we create an object of type ExecutorService, but this time we use the factory method for a single thread. In the second line we obtain our Future object by invoking the submit method on the ExecutorService object, passing our NumberGenerator object as the argument.

Since we are simulating an application doing some other work while the task is executing, we used Thread.sleep(3000) to pause the main thread for a while. Finally, we print to the console so you can see the order in which things happen and how Future works.

Here is the output:

Adding started...
About to invoke the get() method!
Adding finished...
I will have to wait until future is done. :(

As you can see, our call method had already started by the time we invoked the get method. After invoking the get method, our application was blocked for a while because our task hadn’t yet been completed. Finally, our application was unblocked as soon as our NumberGenerator task was completed. Just remember that by invoking get(), the calling thread will be blocked if the task is not finished.

Note that Future objects do not contain a value until the task completes. They are holders of a potential future value, and that’s why calling the get() method before the value has been set blocks the calling thread.
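The get() method can also fail: when the task itself throws, get() wraps that exception in an ExecutionException, and the original is available through getCause(). A minimal sketch, with FailingTaskDemo and its demo method as hypothetical names for this illustration:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class for illustration; not part of the article's application.
final class FailingTaskDemo {

  static String demo() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      // This task always fails with a NumberFormatException.
      Callable<Integer> task = () -> Integer.parseInt("not a number");
      Future<Integer> future = pool.submit(task);
      try {
        return "value " + future.get();
      } catch (ExecutionException e) {
        // The task's own exception is preserved as the cause.
        return "failed with " + e.getCause().getClass().getSimpleName();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return "interrupted";
      }
    } finally {
      pool.shutdown();
    }
  }
}
```

Calling FailingTaskDemo.demo() returns "failed with NumberFormatException". The exception does not surface in the thread that runs the task; it is only reported when someone calls get().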

6.5.4 The Overloaded Get Method

There are some useful methods to help us deal with a future that is not ready. First, get(long timeout, TimeUnit unit) is an overloaded method that takes two parameters, so we can specify how long we are willing to wait for the task to be completed. If the result isn’t available in time, it throws a TimeoutException. Let’s code that:

try {
  System.out.println("About to invoke the get() method!");
  numbers = future.get(30, TimeUnit.SECONDS); // Blocks for up to 30 seconds
  numbers.forEach(System.out::println);
  System.out.println("I will have to wait until future is done. :(");
} catch (TimeoutException e) {
  System.out.println("I couldn't wait after all!");
} catch (ExecutionException | InterruptedException e) {
   e.printStackTrace(); // May throw exceptions.
}

Now if we don’t get the computed result in 30 seconds, it’ll throw a TimeoutException. This time, we will try to print out the numbers to console using the forEach method.

Adding started...
About to invoke the get() method!
I couldn't wait after all!
Adding finished...

As you can see, because of the timeout we never got to print the result to the console. However, the code inside the call method still ran to completion: check the last line, where we got “Adding finished…” after the exception was thrown. This means the task kept using the application’s resources even though nobody was waiting for its result. Future provides a method for cancelling a task should you need to do so.

6.5.5 The Cancel Method

The boolean cancel(boolean mayInterruptIfRunning) method of the Future interface attempts to cancel the task. If the task hasn’t started yet, it will never run. If it has already started, the mayInterruptIfRunning argument determines whether the thread executing it is interrupted in an attempt to stop it, but again, there’s no guarantee it will succeed. Let’s change our first catch block and try to cancel our task.

catch (TimeoutException e) {
  System.out.println("I couldn't wait after all!");
  future.cancel(true);
}

As you will see, we were able to cancel the task, since “Adding finished…” wasn’t printed out this time. The interrupt made Thread.sleep() throw an InterruptedException, which ended the call method early.

Adding started...
About to invoke the get() method!
I couldn't wait after all!
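To see what a cancelled Future looks like from the outside, here is a minimal sketch. CancelDemo and its demo method are hypothetical names for this illustration, the 60-second sleep merely stands in for long-running work, and the latches are there only to make the timing predictable.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical class for illustration; not part of the article's application.
final class CancelDemo {

  // Returns "cancel()/isCancelled()/isDone()/task saw interrupt/outcome of get()".
  static String demo() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch sawInterrupt = new CountDownLatch(1);
    try {
      Future<String> future = pool.submit(() -> {
        started.countDown();
        try {
          Thread.sleep(60_000); // stands in for long-running work
          return "finished";
        } catch (InterruptedException e) {
          sawInterrupt.countDown(); // cancel(true) interrupted the sleep
          throw e;
        }
      });

      started.await(); // make sure the task is really running
      boolean cancelled = future.cancel(true);
      boolean taskStopped = sawInterrupt.await(5, TimeUnit.SECONDS);

      String outcome;
      try {
        outcome = future.get();
      } catch (CancellationException e) {
        outcome = "CancellationException"; // get() on a cancelled Future
      } catch (ExecutionException e) {
        outcome = "ExecutionException";
      }
      return cancelled + "/" + future.isCancelled() + "/" + future.isDone()
          + "/" + taskStopped + "/" + outcome;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }
}
```

The method returns "true/true/true/true/CancellationException": a cancelled Future counts as done, and calling get() on it throws an unchecked CancellationException rather than returning a value.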

A better solution would be to invoke get() in a separate thread, so we wouldn’t have to worry about our application getting blocked. Let’s change our code one last time. We will create a Task class with a constructor that takes a Future object.

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class Task implements Runnable {
  
  Future<List<Integer>> future;
  
  public Task(Future<List<Integer>> future) {
    this.future = future;
  }
  
  @Override
  public void run() {
    try {
      future.get().forEach(System.out::println);
    } catch (InterruptedException | ExecutionException e) {
      System.out.println("Something went wrong!");
    }
  }
}

For the sake of space, we reduced the count in NumberGenerator from 100 to 20. Note that instead of a single thread, we now have a pool that can run up to two threads concurrently.

Our main method will look like this:

ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<List<Integer>> future = executorService.submit(new NumberGenerator());
  
try {
  Thread.sleep(1000);
} catch (InterruptedException ignored) {}
  
System.out.println("About to invoke the get() method!");
executorService.execute(new Task(future));
executorService.shutdown();
System.out.println("I won't have to wait until future is done ANYMORE! ;)");

Let’s think about what we have done here: we submit a Callable, which gives us back a Future, and we hand that Future to a Task that prints its result to the console. We are not invoking Future.get() from our main thread, so as not to block it; instead, we are invoking it from a separate Runnable that runs on another thread of the pool.

Here is the output:

Adding started...
About to invoke the get() method!
I won't have to wait until future is done ANYMORE! ;)
Adding finished...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

Two other methods of the Future interface that may be useful are isDone() and isCancelled(), both of which return a boolean.

6.6 The Cached Thread Pool Factory Method

newCachedThreadPool is a bit different from newFixedThreadPool. First, you can’t specify the number of threads. Secondly, there’s no LinkedBlockingQueue; instead we have a SynchronousQueue.

Here, whenever we submit a task, the queue hands it over to an idle thread if there is one. If there is none, the pool creates a new thread, adds it to the pool and assigns it to this task. The pool reuses previously created threads when they are available, but it also terminates and removes any thread that has been idle for 60 seconds.

As you can see, we can’t control the number of threads when using a cached thread pool. Our pool will automatically expand or shrink as needed. It is likely to improve the performance of applications that run a lot of short-lived tasks.

Let’s switch our factory method for this one in our previous application. We just need to change one line of code.

ExecutorService executorService = Executors.newCachedThreadPool();

The output will be the same.
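The characteristics described above can also be written down as an explicit ThreadPoolExecutor configuration. The sketch below builds a pool by hand with those settings; it is meant to make the description concrete and is not taken from the article's application. CachedPoolDemo and its methods are hypothetical names.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical class for illustration; not part of the article's application.
final class CachedPoolDemo {

  // A hand-built pool with the characteristics of a cached thread pool.
  static ThreadPoolExecutor handBuilt() {
    return new ThreadPoolExecutor(
        0,                          // no core threads: the pool may shrink to zero
        Integer.MAX_VALUE,          // effectively no upper bound on threads
        60L, TimeUnit.SECONDS,      // idle threads are removed after 60 seconds
        new SynchronousQueue<>());  // hand-off queue that stores no tasks
  }

  // Summarises a pool as "core/max/keep-alive/queue type".
  static String describe(ThreadPoolExecutor pool) {
    return pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize() + "/"
        + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s/"
        + pool.getQueue().getClass().getSimpleName();
  }

  static String demo() {
    ThreadPoolExecutor pool = handBuilt();
    try {
      return describe(pool); // "0/2147483647/60s/SynchronousQueue"
    } finally {
      pool.shutdown();
    }
  }
}
```

Because the maximum is effectively unbounded, a burst of long-running tasks can create a very large number of threads, so this kind of pool suits many short tasks better than a few slow ones.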

6.7 The Scheduled Thread Pool Factory Method

With newScheduledThreadPool we must provide a pool size. Internally, the pool uses a special kind of queue, the DelayedWorkQueue. Unlike the previous ones, this factory method returns a ScheduledExecutorService object, which provides three different methods: schedule, scheduleAtFixedRate, and scheduleWithFixedDelay. These methods return a ScheduledFuture, which extends Future.

This pool is used when you want your tasks to execute after a certain delay or to execute periodically. Tasks are not executed in the order they were submitted; instead, they are executed based on their delay.

6.7.1 The Schedule Method

To schedule a task, you will have to provide a Runnable or Callable, the delay (the time to wait before the task starts executing), and the time unit. Time to code:

import java.util.concurrent.atomic.AtomicInteger;

public class Job implements Runnable {
  
  private final String name;
  private final int cap;
  
  public Job(String name, int cap) {
    this.name = name;
    this.cap = cap;
  }
  
  @Override
  public void run() {
    Main.getInstant(name);
    AtomicInteger atomicSum = new AtomicInteger(0);
    for (int i = 1; i <= cap; i++) {
      atomicSum.accumulateAndGet(i, this::sum);
    }
    System.out.println(name + ": " + atomicSum.get());
  }
  
  // accumulateAndGet passes the current total first and the new value second.
  private int sum(int total, int value) {
    return total + value;
  }
}

Here is our Job class, which implements Runnable. It has the mandatory run method plus a sum method that we pass as a parameter to AtomicInteger.accumulateAndGet, which takes an integer and a function that takes two integers and returns an integer. Our Job class has a constructor that takes a name String parameter, so we can see which job prints first, and an integer cap to limit the sequence.

Here is the code for the main method:

ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
getInstant("Main thread");
service.schedule(new Job("Thread-1", 5), 15, TimeUnit.SECONDS);
service.schedule(new Job("Thread-2", 10), 10, TimeUnit.SECONDS);
service.schedule(new Job("Thread-3", 15), 5, TimeUnit.SECONDS);
try {
  Thread.sleep(20000);
} catch (InterruptedException ignored) {}
service.shutdown();

We created a getInstant method to print the time to the console, so we can see what time our pool was instantiated and what time each job outputs its value.

public static void getInstant(String name) {
  DateTimeFormatter formatter = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM);
  LocalTime time = LocalTime.now();
  System.out.println(name + " at " + time.format(formatter));
}

Finally, the output:

Main thread at 19:36:24
Thread-3 at 19:36:29
Thread-3: 120
Thread-2 at 19:36:34
Thread-2: 55
Thread-1 at 19:36:39
Thread-1: 15

The first line shows the time before scheduling anything, followed by the time each job started executing. Notice that the first job to print was the last one added, because it had the shortest delay. Below each time, the job prints the sum of its sequence.
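The schedule method also accepts a Callable, in which case the returned ScheduledFuture carries the result. A minimal sketch, with ScheduledCallableDemo and its demo method as hypothetical names and a 200 millisecond delay chosen arbitrarily to keep the example quick:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical class for illustration; not part of the article's application.
final class ScheduledCallableDemo {

  // Returns "<result>/<did at least 200 ms pass before the result arrived?>".
  static String demo() {
    ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
    try {
      Callable<Integer> sumToFive = () -> 1 + 2 + 3 + 4 + 5;
      long start = System.nanoTime();
      ScheduledFuture<Integer> future =
          service.schedule(sumToFive, 200, TimeUnit.MILLISECONDS);
      int result = future.get(); // blocks until the delayed task has run
      long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
      return result + "/" + (elapsedMillis >= 200); // "15/true"
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      service.shutdown();
    }
  }
}
```

The delay is a lower bound: the task never starts early, but it may start later if all the pool's threads are busy.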

6.7.2 The Schedule with Fixed Delay Method

This method runs the task for the first time after the specified initial delay. After that, it waits for each run to complete and only then starts counting the delay before the next run. Our main method will look like this:

ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
getInstant("Main thread");
service.scheduleWithFixedDelay(new Job("Thread-1", 10), 5, 3, TimeUnit.SECONDS);
service.scheduleWithFixedDelay(new Job("Thread-2", 5), 10, 10, TimeUnit.SECONDS);
try {
  Thread.sleep(20000);
} catch (InterruptedException ignored) { }
service.shutdown();

Note that we used Thread.sleep(20000) so there is enough time for the tasks to run more than once. The first task waits five seconds before its first run and after that runs again three seconds after each run completes, until we shut the pool down or the task is cancelled or throws an exception.

Let’s also add some sleeping time to the run method of our Job class, so each run takes about three seconds. The output follows the code.

@Override
public void run() {
  Main.getInstant(name);
  AtomicInteger atomicSum = new AtomicInteger(0);
  for (int i = 1; i <= cap; i++) {
    atomicSum.accumulateAndGet(i, this::sum);
  }
  System.out.println(name + ": " + atomicSum.get());
  try {
    Thread.sleep(3000);
  } catch (InterruptedException ignored) { }
}

Here is the output:

Main thread at 15:07:23
Thread-1 at 15:07:28
Thread-1: 55
Thread-2 at 15:07:33
Thread-2: 15
Thread-1 at 15:07:34
Thread-1: 55
Thread-1 at 15:07:40
Thread-1: 55

As you can see, the first task ran three times before we shut the pool down: the first run came after the five-second initial delay, and the second and third runs each came after the three seconds the job takes to complete plus the three-second delay. The second task, however, ran only once, because we shut the pool down before its ten-second delay had elapsed. If shutdown() is never called, the tasks keep running indefinitely at the defined delay.

Bear in mind that the next run is scheduled only when the current one finishes. In other words, the delay starts counting right after the previous run completes.
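The rule that the delay starts counting after the previous run finishes can be measured directly. The sketch below records when each run starts, so the gap between two starts should be roughly the work time plus the delay. The class FixedDelayDemo, its methods and the millisecond values are hypothetical and were picked only to keep the demo short.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's source.
class FixedDelayDemo {

  // Sleeps without forcing callers to handle InterruptedException.
  private static void pause(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // keep the interrupt visible
    }
  }

  // Runs a task lasting workMillis with a fixed delay of delayMillis for about
  // observeMillis and returns the start time of every run in milliseconds.
  static List<Long> startTimes(long workMillis, long delayMillis, long observeMillis) {
    ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
    List<Long> starts = new CopyOnWriteArrayList<>();
    service.scheduleWithFixedDelay(() -> {
      starts.add(System.nanoTime() / 1_000_000);
      pause(workMillis); // simulated work
    }, 0, delayMillis, TimeUnit.MILLISECONDS);
    pause(observeMillis);
    service.shutdownNow(); // cancels the periodic task and interrupts a run in progress
    return starts;
  }

  public static void main(String[] args) {
    List<Long> starts = startTimes(100, 150, 900);
    for (int i = 1; i < starts.size(); i++) {
      // Each gap should be roughly work + delay, about 250 ms with these values
      System.out.println("gap: " + (starts.get(i) - starts.get(i - 1)) + " ms");
    }
  }
}
```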

6.7.3 The Schedule at Fixed Rate Method

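The main difference between this method and the previous one is that runs are scheduled at a fixed rate measured from the start of the first run, not from the end of the previous one. If a run takes longer than the period, the next run starts late, as soon as the previous one completes; runs of the same task never overlap. Let’s write some code.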

ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
getInstant("Main thread");
service.scheduleAtFixedRate(new Job("Thread-1", 8), 9, 9, TimeUnit.SECONDS);
service.scheduleAtFixedRate(new Job("Thread-2", 4), 11, 2, TimeUnit.SECONDS);
try {
  Thread.sleep(20000);
} catch (InterruptedException ignored) { }
service.shutdown();

Here we have two tasks running. Before shutting down, we sleep for twenty seconds, but remember that the exact sleep duration is not guaranteed. Note that for the second task we specified a two-second rate. However, each run still sleeps for about three seconds because of the Thread.sleep(3000) we added to the run method.

Main thread at 15:33:23
Thread-1 at 15:33:32
Thread-1: 36
Thread-2 at 15:33:34
Thread-2: 10
Thread-2 at 15:33:37
Thread-2: 10
Thread-2 at 15:33:40
Thread-2: 10
Thread-1 at 15:33:41
Thread-1: 36

As you can see, the second task ran every three seconds, not every two, because each run takes about three seconds to complete.
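The same effect can be reproduced on a smaller scale. In the sketch below the task takes longer than its period, so each run should start only after the previous one has ended, even though the pool has two threads. The class FixedRateDemo, its methods and the millisecond values are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's source.
class FixedRateDemo {

  private static long nowMillis() {
    return System.nanoTime() / 1_000_000;
  }

  private static void pause(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // keep the interrupt visible
    }
  }

  // Returns a {start, end} pair in milliseconds for every run of the task.
  static List<long[]> observeRuns(long workMillis, long periodMillis, long observeMillis) {
    ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
    List<long[]> runs = new CopyOnWriteArrayList<>();
    service.scheduleAtFixedRate(() -> {
      long start = nowMillis();
      pause(workMillis); // simulated work that outlasts the period
      runs.add(new long[] {start, nowMillis()});
    }, 0, periodMillis, TimeUnit.MILLISECONDS);
    pause(observeMillis);
    service.shutdownNow();
    try {
      service.awaitTermination(1, TimeUnit.SECONDS); // let an interrupted run record itself
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return runs;
  }

  public static void main(String[] args) {
    List<long[]> runs = observeRuns(250, 100, 1000);
    for (int i = 1; i < runs.size(); i++) {
      // With 250 ms of work and a 100 ms period, gaps should be about 250 ms, not 100 ms
      System.out.println("gap: " + (runs.get(i)[0] - runs.get(i - 1)[0]) + " ms");
    }
  }
}
```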

6.8 The Semaphore Class

Now, let’s say we have an application that can run ten threads at a time, and we have a task or service that takes a long, long time to complete. We wouldn’t want our ten threads to be running this time-consuming task because we probably need them for other tasks as well.

A Semaphore works by granting permits. We pass the number of available permits to the constructor. Any thread that tries to acquire a permit when none is available is blocked. As soon as a task finishes and releases its permit, a blocked thread can acquire it and run.

Similar to the ReentrantLock class, you can also provide a second boolean parameter, fairness, which gives the permit to the longest-waiting thread. Let’s write some example code.

import java.util.concurrent.Semaphore;

public class LongTask implements Runnable {
  
  Semaphore semaphore;
  
  public LongTask(Semaphore semaphore) {
    this.semaphore = semaphore;
  }
  
  @Override
  public void run() {
    // Acquire before the try block so that release() only runs once a permit is held
    semaphore.acquireUninterruptibly();
    try {
      Main.getInstant(Thread.currentThread().getName());
      System.out.println("Very high time-consuming task processing...");
      Thread.sleep(5000);
    } catch (InterruptedException ignored) {
    } finally {
      semaphore.release();
    }
  }
}

This is just a simple Runnable that prints the current thread name and a message to the console, then sleeps for five seconds. The acquire and acquireUninterruptibly methods also accept an integer, should you want to acquire more than one permit. However, you must release the same number of permits, or the extra ones will never be returned to the semaphore.
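A short sketch of acquiring and releasing several permits at once, using a fair semaphore. The class PermitCountDemo and the method acquireAndRelease are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

// Hypothetical demo class, not part of the article's source.
class PermitCountDemo {

  // Acquires `count` permits, notes how many remain, then releases the same count.
  // Returns {permits available while held, permits available after release}.
  static int[] acquireAndRelease(int total, int count) {
    Semaphore semaphore = new Semaphore(total, true); // true requests fair (FIFO) ordering
    semaphore.acquireUninterruptibly(count);
    int whileHeld = semaphore.availablePermits();
    semaphore.release(count); // release exactly as many permits as were acquired
    int afterRelease = semaphore.availablePermits();
    return new int[] {whileHeld, afterRelease};
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(acquireAndRelease(3, 2))); // [1, 3]
  }
}
```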

Here is the main method:

Semaphore semaphore = new Semaphore(2);
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
  executorService.execute(new LongTask(semaphore));
}
executorService.shutdown();

We instantiated a Semaphore with two permits and an ExecutorService with ten threads, and we submitted ten tasks to our pool. Now let’s check the result.

pool-1-thread-2 at 22:18:13
Very high time-consuming task processing...
pool-1-thread-1 at 22:18:13
Very high time-consuming task processing...
pool-1-thread-6 at 22:18:19
Very high time-consuming task processing...
pool-1-thread-3 at 22:18:19
Very high time-consuming task processing...
pool-1-thread-4 at 22:18:24
Very high time-consuming task processing...
pool-1-thread-7 at 22:18:24
Very high time-consuming task processing...
pool-1-thread-5 at 22:18:29
Very high time-consuming task processing...
pool-1-thread-10 at 22:18:29
Very high time-consuming task processing...
pool-1-thread-8 at 22:18:34
Very high time-consuming task processing...
pool-1-thread-9 at 22:18:34
Very high time-consuming task processing...

As you can see, although all ten threads were started, only two were allowed to execute the task at a time.
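Instead of reading timestamps, we can also count how many tasks are inside the guarded section at once. The sketch below tracks the peak number of concurrent tasks, which should never exceed the number of permits. The class ConcurrencyLimitDemo, the method maxConcurrent and the 50 ms sleep are hypothetical choices for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the article's source.
class ConcurrencyLimitDemo {

  // Runs `tasks` tasks on as many threads and returns the highest number
  // of tasks that held a permit at the same time.
  static int maxConcurrent(int permits, int tasks) {
    Semaphore semaphore = new Semaphore(permits);
    AtomicInteger running = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(tasks);
    for (int i = 0; i < tasks; i++) {
      pool.execute(() -> {
        semaphore.acquireUninterruptibly();
        try {
          int now = running.incrementAndGet();
          peak.accumulateAndGet(now, Math::max); // remember the highest count seen
          Thread.sleep(50); // simulated long task
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          running.decrementAndGet();
          semaphore.release();
        }
      });
    }
    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return peak.get();
  }

  public static void main(String[] args) {
    // With two permits the peak should not go above 2
    System.out.println("peak concurrency: " + maxConcurrent(2, 10));
  }
}
```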

Bear in mind we won’t cover every single class in this package. There are several more, depending on what exactly you want to achieve, such as ForkJoinPool, Phaser, CountDownLatch and CyclicBarrier.

7. The Thread Local Class

The ThreadLocal API is a way of achieving thread safety in Java. A ThreadLocal variable can only be read and written by the thread that set it, which lets us define an object that is global within one thread but invisible to all others.

Let’s say we are working with a huge amount of data that requires date formatting. One solution would be to instantiate a SimpleDateFormat for each object in our collection, but that’s a lot of objects and memory just for formatting. We could also create a single static SimpleDateFormat and use it for all of them, but SimpleDateFormat is not thread-safe, and we would end up with data consistency issues.

That’s when the ThreadLocal class comes in handy. Let’s write some code.

import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Date;

public class SimpleDateFormatterThread implements Runnable {
  
  List<Date> dates;
  public SimpleDateFormatterThread(List<Date> dates) {
    this.dates = dates;
  }
  //private static SimpleDateFormat sdf = new SimpleDateFormat("MM-dd-yyyy");
  private static ThreadLocal<SimpleDateFormat> dateFormatterThread =
    ThreadLocal.withInitial(() -> new SimpleDateFormat("MM-dd-yyyy"));
  
  @Override
  public void run() {
    for (Date date : dates) {
      System.out.println(
        Thread.currentThread().getName() + ": " + dateFormatterThread.get().format(date));
      //System.out.println(
        //Thread.currentThread().getName() + ": " + sdf.format(date));
    }
  }
}

Note that we also left a commented-out static SimpleDateFormat, should you want to try without ThreadLocal and compare the results. Because SimpleDateFormat keeps mutable internal state, threads sharing one instance may print wrong or duplicated dates.

Our main method will look like this:

List<Date> dates = new ArrayList<>(100);
for (int i = 0; i < 100; i++) {
  dates.add(Date.from(Instant.now().plus(i, ChronoUnit.DAYS)));
}
for (int i = 0; i < 5; i++) {
  new Thread(new SimpleDateFormatterThread(dates)).start();
}

We add one hundred different dates to our list, then create five threads that each print all of them, formatted, to the console. We won’t show the results here because they are too long. Feel free to try it both ways.
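Since the full output is long, a compact way to see what ThreadLocal does is to count how many distinct formatter instances the threads end up with. In the sketch below each thread should get its own SimpleDateFormat, and repeated calls to get() on the same thread should return the same one. The class ThreadLocalIdentityDemo and the method distinctFormatters are hypothetical.

```java
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical demo class, not part of the article's source.
class ThreadLocalIdentityDemo {

  private static final ThreadLocal<SimpleDateFormat> FORMATTER =
      ThreadLocal.withInitial(() -> new SimpleDateFormat("MM-dd-yyyy"));

  // Starts `threads` threads and returns how many distinct formatter instances they saw.
  static int distinctFormatters(int threads) {
    // Identity-based set: SimpleDateFormat.equals compares patterns, not instances
    Set<SimpleDateFormat> seen = Collections.synchronizedSet(
        Collections.newSetFromMap(new IdentityHashMap<SimpleDateFormat, Boolean>()));
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> {
        seen.add(FORMATTER.get());
        seen.add(FORMATTER.get()); // same thread, same instance, so no new entry
      });
      workers[i].start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return seen.size();
  }

  public static void main(String[] args) {
    System.out.println("distinct formatters: " + distinctFormatters(5)); // 5, one per thread
  }
}
```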

8. Conclusion

By now you should have a good understanding of how multithreading works in Java. You should also be familiar with the most common classes used to make the most of it and be able to identify the most common issues that arise when dealing with multiple threads. You can find the source code on our GitHub page.
