Javascript required
Skip to content Skip to sidebar Skip to footer

How to Get Pool Clear Again Naturally

Java - Thread Pools

[Last Updated: Jul 14, 2017]

A thread pool is the collection of pre-instantiated standby threads which are ready to execute application level tasks.


Why thread pools are important?

Thread pools improve operation by running multiple tasks simultaneously, and at the same fourth dimension they prevent the time and memory overhead incur during thread creation.

For example a web server instantiates thread pool at the get-go upwardly and then that it won't be spending fourth dimension creating threads when customer requests come in.

As compared to creating thread per task, thread pools avoid running out of resource (processors, cores, memory etc) by avoiding unlimited thread cosmos at a time. After creating certain number of threads, they typically put the extra tasks in a waiting queue till a thread is available for a new task.

Let'due south empathize the full general concepts of thread pool before we go to the thread pool implementation in Java Executor framework.


Number of threads (N)

This is the number of threads (Due north) instances currently in the pool. This number gradually increases as more chore are submitted to the pool.

Threads are usually created on demand equally response to the new tasks submission to the puddle. Total number of the threads is equal to the currently active threads plus the idle threads. Idle threads are the one which are done executing the assigned task and are not currently active. A task will be eventually assigned again to an idle thread or the idle thread might exist removed from the pool if it remains inactive for a long time.

Note that a configuration parameter tin be set to pre-initialized specified number of threads beforehand instead of creating them on demand.



Core puddle size (C)

This configuration parameter defines how many threads can at nigh be created per pool for the incoming tasks earlier the new tasks go to the queue.

This number can be fixed if the application knows beforehand that how many task volition be submitted at a fourth dimension. It basically a way to avoid creating as well many threads and to avoid the application failure due to a lot of hardware resources consumption. The application can also adjust a runtime strategy to dynamically alter this limit, depending on how many processors/core are currently bachelor and how much retentiveness is bachelor for the awarding.


Queuing

At some point, if new task submission exceeds core pool size (C) then a queue is used for the extra tasks.

If full tasks submitted to the pool are T, and C of them are currently beingness executed by the threads then remaining tasks (T-C, assuming T>C) have to expect in the queue for the currently running tasks to cease and take their places 1 past ane.


Bounded vs Unbounded queues

Queues tin exist bounded (having a predefined chapters) or unbounded (without a predefined capacity).

What happens if number of queued tasks exceeds beyond the capacity of a bounded queue?
At that place might be ii logical approaches there: either extra tasks should be rejected at this point or new threads should exist allowed to exist created again. Now if new threads are allowed once again then unlimited job submission may once again bring about the resource crunch and eventually application might fail. Well at least Java provides a solution to that problem (We will see Coffee implementation shortly in following sections), the solution is to provide another limiting parameter, known as pool maximum size (G) for the new threads cosmos. Once this limit is reached, more task submission will be rejected for thread assignment.


Maximum pool size (G)

This is the maximum number of threads instances created for the submitted tasks. This can dynamically be changed by the application.


No effect of G when using unbounded queues

An unbounded queue will grow infinitely without reaching the point to consider maximum pool size (G). That means the value of the M doesn't take any issue in that case. A job will never be rejected in the unbounded queue.


Trade off between M and divisional queues

Using high capacity bounded queues and small-scale number of cadre threads (C) minimizes CPU usage, OS resources, and context-switching overhead, but will yield low throughput.

Apply of low chapters bounded queues by and large requires larger pool sizes, which increases CPUs usage. In this instance if threads employ shared data then probably we have to utilize locks which also decreases throughput.


Idle threads

Later it'south creation and running one or more tasks, a thread can become idle if it's not currently engaged in running a chore. Depending on the application requirements, if a thread is idle for a long time, it's ameliorate to remove it from the pool to repossess resources. A thread puddle implementation should provide configuration parameters to specify whether idle threads should be removed from the pool or not and also what should exist the timeout for the removal.



Coffee Thread Puddle implementations



ThreadPoolExecutor class

This class provides a very versatile implementation to create broad range of thread pools with the help of supplied parameters at constructions fourth dimension. The most of those parameters tin can exist changed dynamically every bit well.

Let's examine the dissimilar constructors of this form:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit of measurement, BlockingQueue<Runnable> workQueue)

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit of measurement, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

corePoolSize :

Aforementioned as C as described higher up.

For N <= C (corePoolSize) , the idle threads are not assigned the new incoming job, instead new threads are created.

For N > C (corePoolSize) and if in that location are some idle threads then new task is assigned there, otherwise they are put into the queue.

getPoolSize() gives current value of Due north.

getCorePoolSize() gives the value of C (corePoolSize).

This value may likewise exist changed dynamically by using setCorePoolSize(int)


maximumPoolSize:

Same as Grand described above.

This value can be changed dynamically by using setMaximumPoolSize(int corePoolSize)


keepAliveTime:

When N > C (corePoolSize) then idle threads are monitored for how long they have been idle. On crossing keepAliveTime, idle threads are terminated (removed from the puddle).

The method allowCoreThreadTimeOut(boolean) can be used to apply this timeout policy to core threads as well.

This parameter provides a means of reducing resource consumption when the pool is non being actively used. If the pool becomes more agile again after, new threads volition be constructed.

Value of Long.MAX_VALUE TimeUnit.NANOSECONDS effectively disables idle threads from always terminating prior to shut down.

In that location'southward a setter for this holding too: setKeepAliveTime(long time, TimeUnit unit)


unit of measurement:

the time unit for the keepAliveTime parameter.


workQueue

The queue instance which has to be BlockingQueue.

Every bit mentioned above queues tin be bounded (east.g. ArrayBlockingQueue) or information technology tin can exist unbounded (e.g. LinkedBlockingQueue).

There's a third option, i.eastward. use direct mitt-offs queue e.g. SynchronousQueue which immediately attempts to assign a task to an available thread. If no thread is bachelor because maximum pool size (M) has been reached, and then task will be rejected. There's no much meaning with corePoolSize (C) with this option east.g. if corePoolSize = 2 and maximumPoolSize = v then queue Size will ever return 0 considering the job is never held in the queue. getCorePoolSize() will ever return ii but it doesn't have whatever effect. Later reaching five (the maximum limit) the new task will exist reject.

Nosotros are costless to utilize whatsoever implementation of the blocking queue, whatever is suitable for our need. Please explore all possible queue implementations here


handler

An instance of RejectedExecutionHandler to use when the tasks are submitted beyond the puddle maximum size. This handler takes care of what action needs to exist taken for new tasks submissions at this bespeak.

RejectedExecutionHandler interface:

            package java.util.concurrent;     public interface RejectedExecutionHandler {       void rejectedExecution(Runnable r, ThreadPoolExecutor executor);    }

Besides providing our own instance of this interface, ThreadPoolExecutor provides iv out of the box nested implementation to exist used:

  • ThreadPoolExecutor.AbortPolicy: This is the default rejection policy. It throws RejectedExecutionException:
                    public static class AbortPolicy implements RejectedExecutionHandler {            public void rejectedExecution(Runnable r, ThreadPoolExecutor eastward) {             throw new RejectedExecutionException("Job " + r.toString() +                                                  " rejected from " +                                                  e.toString());           }         }
  • ThreadPoolExecutor.CallerRunsPolicy: Instead of rejecting the task, information technology is attempted to run in the caller thread:
                    public static class CallerRunsPolicy implements RejectedExecutionHandler {          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {             if (!e.isShutdown()) {                 r.run();             }         }     }              

    Using this volition apparently slow downwardly the rate that new tasks are submitted.


  • ThreadPoolExecutor.DiscardPolicy: The task is quietly rejected:
                    public static class DiscardPolicy implements RejectedExecutionHandler {            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {            }         }

  • ThreadPoolExecutor.DiscardOldestPolicy: Information technology removes the oldest task in the queue and executes the new i right away instead of rejecting it:
                    public static course DiscardOldestPolicy implements RejectedExecutionHandler {             public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {              if (!eastward.isShutdown()) {                 e.getQueue().poll();                 e.execute(r);            }           }         }

threadFactory

This parameter provides a factory of coffee.lang.Thread creation for the pool. This interface has only 1 method equally shown:

            package java.util.concurrent;   public interface ThreadFactory {     Thread newThread(Runnable var1);  }          

If not used, Executors.defaultThreadFactory() is used which creates all threads having NORM_PRIORITY, are non-daemon and belong to the aforementioned ThreadGroup.

Other useful methods


int prestartAllCoreThreads()

Calling this method overrides the default policy of on-demand cosmos of thread instances. The threads get-go in idle mode and await for the chore consignment. This method returns number of threads which actually started.

Calling boolean prestartCoreThread() volition first only i thread. This method returns false if all core threads take already been started

Example

Allow's meet how different queues work with ThreadPoolExecutor:

public class ThreadPoolExecutorExample {      public static void main (Cord[] args) {         createAndRunPoolForQueue(new ArrayBlockingQueue<Runnable>(iii), "Divisional");         createAndRunPoolForQueue(new LinkedBlockingDeque<>(), "Unbounded");         createAndRunPoolForQueue(new SynchronousQueue<Runnable>(), "Direct hand-off");     }      individual static void createAndRunPoolForQueue (BlockingQueue<Runnable> queue,                                                                       String msg) {         System.out.println("---- " + msg + " queue instance = " +                                                   queue.getClass()+ " -------------");          ThreadPoolExecutor e = new ThreadPoolExecutor(ii, v, Long.MAX_VALUE,                                  TimeUnit.NANOSECONDS, queue);          for (int i = 0; i < 10; i++) {             try {                 e.execute(new Task());             } catch (RejectedExecutionException ex) {                 System.out.println("Task rejected = " + (i + ane));             }             printStatus(i + 1, e);         }          e.shutdownNow();          System.out.println("--------------------\n");     }      individual static void printStatus (int taskSubmitted, ThreadPoolExecutor e) {         StringBuilder southward = new StringBuilder();         s.append("poolSize = ")          .append(e.getPoolSize())          .suspend(", corePoolSize = ")          .append(e.getCorePoolSize())          .append(", queueSize = ")          .append(e.getQueue()                   .size())          .append(", queueRemainingCapacity = ")          .append(eastward.getQueue()                   .remainingCapacity())          .append(", maximumPoolSize = ")          .append(e.getMaximumPoolSize())          .suspend(", totalTasksSubmitted = ")          .suspend(taskSubmitted);          System.out.println(s.toString());     }      individual static class Task implements Runnable {          @Override         public void run () {             while (true) {                 try {                     Thread.sleep(meg);                 } catch (InterruptedException east) {                     break;                 }             }         }     } }          

Output

---- Bounded queue example = class java.util.concurrent.ArrayBlockingQueue ------------- poolSize = ane, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 3, maximumPoolSize = five, totalTasksSubmitted = 1 poolSize = 2, corePoolSize = ii, queueSize = 0, queueRemainingCapacity = 3, maximumPoolSize = v, totalTasksSubmitted = two poolSize = two, corePoolSize = 2, queueSize = ane, queueRemainingCapacity = ii, maximumPoolSize = v, totalTasksSubmitted = 3 poolSize = 2, corePoolSize = ii, queueSize = 2, queueCapacity = 1, maximumPoolSize = 5, totalTasksSubmitted = four poolSize = 2, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 5 poolSize = 3, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 6 poolSize = 4, corePoolSize = 2, queueSize = three, queueRemainingCapacity = 0, maximumPoolSize = v, totalTasksSubmitted = 7 poolSize = 5, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = five, totalTasksSubmitted = 8 Task rejected = 9 poolSize = 5, corePoolSize = ii, queueSize = three, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 9 Task rejected = ten poolSize = five, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = ten --------------------  ---- Unbounded queue instance = class java.util.concurrent.LinkedBlockingDeque ------------- poolSize = 1, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 2147483647, maximumPoolSize = 5, totalTasksSubmitted = i poolSize = 2, corePoolSize = ii, queueSize = 0, queueRemainingCapacity = 2147483647, maximumPoolSize = v, totalTasksSubmitted = 2 poolSize = 2, corePoolSize = ii, queueSize = 1, queueRemainingCapacity = 2147483646, maximumPoolSize = 5, totalTasksSubmitted = 3 poolSize = two, corePoolSize = 2, queueSize = two, queueRemainingCapacity = 2147483645, maximumPoolSize = 5, totalTasksSubmitted = 4 poolSize = 2, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 2147483644, maximumPoolSize = 5, totalTasksSubmitted = 5 poolSize = 2, corePoolSize = 2, queueSize = four, queueRemainingCapacity = 2147483643, maximumPoolSize = 5, totalTasksSubmitted = six poolSize = 2, corePoolSize = ii, queueSize = 5, queueRemainingCapacity = 2147483642, maximumPoolSize = 5, totalTasksSubmitted = 7 poolSize = ii, corePoolSize = 2, queueSize = 6, queueRemainingCapacity = 2147483641, maximumPoolSize = 5, totalTasksSubmitted = eight poolSize = 2, corePoolSize = ii, queueSize = 7, queueRemainingCapacity = 2147483640, maximumPoolSize = 5, totalTasksSubmitted = 9 poolSize = 2, corePoolSize = 2, queueSize = eight, queueRemainingCapacity = 2147483639, maximumPoolSize = v, totalTasksSubmitted = ten --------------------  ---- Straight manus-off queue instance = class java.util.concurrent.SynchronousQueue ------------- poolSize = 1, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = five, totalTasksSubmitted = one poolSize = ii, corePoolSize = ii, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 2 poolSize = 3, corePoolSize = ii, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 3 poolSize = 4, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 4 poolSize = v, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = v, totalTasksSubmitted = 5 Task rejected = 6 poolSize = v, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = v, totalTasksSubmitted = 6 Job rejected = vii poolSize = v, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = five, totalTasksSubmitted = 7 Chore rejected = 8 poolSize = 5, corePoolSize = two, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 8 Task rejected = ix poolSize = v, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 9 Task rejected = 10 poolSize = 5, corePoolSize = two, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = five, totalTasksSubmitted = 10 --------------------   Procedure finished with get out code 0          

Executors factory methods for creating thread pools

Executors method Equivalent instantiation of
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,           TimeUnit unit, BlockingQueue<Runnable> workQueue)
newCachedThreadPool()
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,                                             new SynchronousQueue<Runnable>());
Directly hand-off queue.
The main advantage of using this thread pool is to reuse idle threads. Tasks are never blocked equally this puddle has a hand-off queue. The thread creation straight expect to go to maximum pool size with is max integer value (virtually unlimited) in this case. After timeout of idle threads (which is 60 secs), they are removed from the puddle, so effectively reducing puddle size and releasing hardware resource. lx secs of time out looks reasonable as new tasks accept good opportunity to reuse idle (or as the method name implies 'buried') threads.
newFixedThreadPool(int nThreads)
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,                                             new LinkedBlockingQueue<Runnable>());
Unbounded queue with both core and maximum pool sizes are equal to the provided nThread. The total threads instances will never go beyond nThread size. The queue can grow endlessly if a lot of tasks get in at the same time.
newSingleThreadExecutor()
new ThreadPoolExecutor(1, i, 0L, TimeUnit.MILLISECONDS,                                              new LinkedBlockingQueue<Runnable>())
Creates a puddle with unbounded queue and with core and maximum puddle sizes equal to 1. That ways only i thread will be agile at a fourth dimension. This pool executes tasks one at a time (sequentially). It is equivalent to creating a pool using Executors#newFixedThreadPool(1)

Note each above method of Executors class has corresponding overloaded variant with ThreadFactory parameter.


What's next?

In next tutorials we will explore Scheduled and work stealing pools

Example Projection

Dependencies and Technologies Used:

  • JDK 1.8
  • Maven iii.0.four

  • thread-pools-example
    • src
      • main
        • java
          • com
            • logicbig
              • case
                  • ThreadPoolExecutorExample.coffee

        hillierscia1952.blogspot.com

        Source: https://www.logicbig.com/tutorials/core-java-tutorial/java-multi-threading/thread-pools.html