
Task Execution — Java Concurrency

Ideally, tasks are independent activities: work that doesn’t depend on the state, result, or side effects of other tasks. Independence facilitates concurrency, because independent tasks can be executed in parallel when enough processing resources are available.



A server application should have the following properties:

  • High throughput (supporting as many users as possible reduces the per-user provisioning cost)

  • Good responsiveness (users want their responses as quickly as possible)

The application should degrade gracefully under heavy load rather than fail outright. Choosing good task boundaries together with a sensible execution policy helps achieve these goals.


Most server applications offer a natural choice of task boundary: the individual client request.

  • Web servers, mail servers, file servers, containers, and database servers all accept requests via network connections from remote clients.

  • Using individual requests as task boundaries usually offers both task independence and appropriate task sizing.

  • Processing one request doesn’t affect the processing of other requests.

  • Handling a single request requires only a very small fraction of the server’s total capacity.


Executing tasks sequentially


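import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

class SingleThreadApp {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            // accept() blocks until a client connects
            Socket connection = socket.accept();
            // the next connection is not accepted until this call returns
            handleRequest(connection);
        }
    }

    // Placeholder for the application's request-processing logic
    private static void handleRequest(Socket connection) { }
}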

  • If handleRequest takes a long time, all other requests are blocked until it returns.

  • While handleRequest waits on an I/O operation, the CPU sits idle.

  • This approach rarely provides either good throughput or good responsiveness.


Explicitly creating a thread for each task


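import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

class ThreadPerTaskApp {
    public static void main(String[] args) throws IOException {
        ServerSocket socketServer = new ServerSocket(80);
        while (true) {
            final Socket connection = socketServer.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            // one new thread per request; the main thread goes straight back to accept()
            new Thread(task).start();
        }
    }

    // Placeholder for the application's request-processing logic; must be thread-safe
    private static void handleRequest(Socket connection) { }
}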

  • Each request is handled in its own thread. Processing is offloaded from the main thread (which is responsible for accepting connections), so the server can accept the next connection sooner. This improves responsiveness.

  • Requests are processed in parallel, which improves throughput when there are multiple processors or when tasks block on I/O.

  • The task code must be thread-safe, because multiple threads may execute it concurrently.

  • Under light to moderate load, this approach is an improvement over sequential execution.
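
To make the difference between the two approaches concrete, here is a minimal sketch rather than a real server. It assumes a hypothetical handleRequest that simulates 100 ms of blocking I/O with Thread.sleep; the class and method names (SequentialVsThreadPerTask, runSequentially, runThreadPerTask) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class SequentialVsThreadPerTask {

    // Hypothetical stand-in for handleRequest: pretends to block on I/O for 100 ms.
    static void handleRequest(int requestId) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Handles the requests one after another on the calling thread; returns elapsed ms.
    static long runSequentially(int requests) {
        long start = System.nanoTime();
        for (int i = 0; i < requests; i++) {
            handleRequest(i);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Starts one thread per request, then waits for all of them; returns elapsed ms.
    static long runThreadPerTask(int requests) {
        long start = System.nanoTime();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < requests; i++) {
            final int requestId = i;
            Thread thread = new Thread(() -> handleRequest(requestId));
            thread.start();
            threads.add(thread);
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("sequential:      " + runSequentially(5) + " ms");
        System.out.println("thread per task: " + runThreadPerTask(5) + " ms");
    }
}
```

With five simulated requests, the sequential run should take roughly five times the simulated delay, while the thread-per-task run should take about as long as a single request. Exact numbers vary by machine.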


Disadvantages of unbounded thread creation:


Thread lifecycle overhead

  • Thread creation and teardown are not free. Both require processing by the OS and the JVM. Creating a thread for each request adds latency to request processing and consumes a significant amount of resources.


Resource consumption

  • Active threads consume system resources, especially memory. When there are more runnable threads than available processors, threads sit idle. A large number of idle threads ties up memory and puts pressure on the garbage collector, and many threads competing for the CPUs imposes further performance costs.

  • If you already have enough threads to keep all the CPUs busy, creating more threads won’t help and may even hurt.


Stability

  • There is a limit on how many threads can be created.

  • This limit varies by platform and depends on factors such as JVM invocation parameters and limits imposed by the underlying operating system.

  • When this limit is reached, the most likely result is an OutOfMemoryError.

  • Recovering from such an error is risky. It is better to plan resource usage so that the application never reaches the limit.


EXECUTOR FRAMEWORK

  • Tasks are logical units of work.

  • Threads are the mechanism by which tasks can run asynchronously.

  • A bounded queue can prevent an overloaded application from running out of memory.

  • The primary abstraction for task execution in the Java class library is Executor:

public interface Executor {     
    void execute(Runnable command);
}
  • It is a simple interface, yet it forms the basis of a powerful framework for asynchronous task execution.

  • It decouples task submission from task execution.

  • An Executor is based on the producer-consumer pattern:

  • Activities that submit tasks are the producers.

  • Threads that execute tasks are the consumers.
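
The decoupling described above can be sketched with two tiny Executor implementations. This is an illustrative sketch, not part of the JDK: the names ExecutorPolicies, DirectExecutor, ThreadPerTaskExecutor, and threadThatRan are assumptions made for this example. The submitting code stays the same, and only the Executor decides which thread runs the task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

final class ExecutorPolicies {

    // Runs every task synchronously in the thread that submitted it.
    static final class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            command.run();
        }
    }

    // Starts a new thread for every submitted task.
    static final class ThreadPerTaskExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            new Thread(command).start();
        }
    }

    // Submits one task and reports which thread ended up running it.
    // The submitting code is identical no matter which Executor is passed in.
    static Thread threadThatRan(Executor executor) {
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            ranOn.set(Thread.currentThread());
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        Thread submitter = Thread.currentThread();
        System.out.println(threadThatRan(new DirectExecutor()) == submitter);        // prints true
        System.out.println(threadThatRan(new ThreadPerTaskExecutor()) == submitter); // prints false
    }
}
```

Swapping one implementation for the other changes the execution behavior without touching the code that submits tasks.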


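import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class ExecutorApp {
    public static final int NTHREADS = 100;
    private static final Executor executor = Executors.newFixedThreadPool(NTHREADS);

    public static void main(String[] args) throws IOException {
        ServerSocket socketServer = new ServerSocket(80);
        while (true) {
            final Socket connection = socketServer.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            // hand the task to the pool instead of starting a thread directly
            executor.execute(task);
        }
    }

    // Placeholder for the application's request-processing logic; must be thread-safe
    private static void handleRequest(Socket connection) { }
}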


EXECUTION POLICY

  • An execution policy specifies the “what, where, when, and how” of task execution.


Questions to ask

  • In what thread will tasks be executed?

  • In what order should tasks be executed (FIFO, LIFO, priority order)?

  • How many tasks may execute concurrently?

  • How many tasks may be queued pending execution?

  • If a task has to be rejected because the system is overloaded, which task should be selected as the victim, and how should the application be notified?

  • What actions should be taken before and after executing a task?
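
One way to see how these questions map onto code is a ThreadPoolExecutor configured by hand. The sketch below is only an illustration under assumed values: the class name BoundedPolicyExecutor, the helper submitAndCount, the pool size of 2, and the queue capacity of 2 are all made up for this example. It answers the questions with a fixed number of pool threads, a bounded FIFO queue, the CallerRunsPolicy rejection handler, and the beforeExecute/afterExecute hooks.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedPolicyExecutor extends ThreadPoolExecutor {

    // Count tasks that pool threads started and finished (the hooks only see those).
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();

    BoundedPolicyExecutor(int threads, int queueCapacity) {
        super(threads, threads,                            // how many tasks may run concurrently
              0L, TimeUnit.MILLISECONDS,                   // idle timeout (unused: core size equals max size)
              new ArrayBlockingQueue<>(queueCapacity),     // how many tasks may wait, in FIFO order
              new ThreadPoolExecutor.CallerRunsPolicy());  // on overload, the submitting thread runs the task
    }

    @Override
    protected void beforeExecute(Thread worker, Runnable task) {
        super.beforeExecute(worker, task);
        started.incrementAndGet();                         // action taken before each pooled task
    }

    @Override
    protected void afterExecute(Runnable task, Throwable failure) {
        super.afterExecute(task, failure);
        finished.incrementAndGet();                        // action taken after each pooled task
    }

    // Submits the given number of trivial tasks and returns how many completed.
    static int submitAndCount(int tasks) {
        BoundedPolicyExecutor pool = new BoundedPolicyExecutor(2, 2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> completed.incrementAndGet());
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + submitAndCount(50));
    }
}
```

With CallerRunsPolicy no task is dropped: when both the pool threads and the queue are full, the submitter runs the task itself, which also slows down further submissions. Tasks run that way bypass the two hooks, so the hook counters can be lower than the number of submitted tasks.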


THREAD POOLS

  • A thread pool manages a homogeneous pool of worker threads.

  • A thread pool is tightly bound to a work queue that holds the tasks waiting to be executed.

  • Reusing an existing thread instead of creating a new one avoids the cost of thread creation and teardown.


You can create a thread pool by calling one of the static factory methods in Executors:

  • newFixedThreadPool: creates threads as tasks are submitted, up to the specified maximum pool size.

  • newCachedThreadPool: a flexible thread pool with no bound on the maximum number of threads. When demand exceeds the number of available threads, it creates new threads and keeps idle ones in the cache so that they can be reused.

  • newSingleThreadExecutor: creates a single worker thread and replaces it if it dies unexpectedly.

  • newScheduledThreadPool: a fixed-size thread pool that supports delayed and periodic task execution.
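
The factory methods above can be tried side by side. The following is a small sketch in which the class PoolFactories and its helpers sumOn, runDelayed, and await are hypothetical names chosen for this example; the Executors factory methods themselves are the standard JDK ones.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class PoolFactories {

    // Waits for a result, converting checked exceptions into unchecked ones.
    private static <T> T await(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // Submits the numbers 0..9 as tasks, sums their results, then shuts the pool down.
    static int sumOn(ExecutorService pool) {
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                final int value = i;
                results.add(pool.submit(() -> value));
            }
            int sum = 0;
            for (Future<Integer> result : results) {
                sum += await(result);
            }
            return sum;
        } finally {
            pool.shutdown();
        }
    }

    // Runs a task once after the given delay and returns its result.
    static String runDelayed(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            return await(scheduler.schedule(() -> "ran after delay", delayMillis, TimeUnit.MILLISECONDS));
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOn(Executors.newFixedThreadPool(4)));      // prints 45
        System.out.println(sumOn(Executors.newCachedThreadPool()));      // prints 45
        System.out.println(sumOn(Executors.newSingleThreadExecutor()));  // prints 45
        System.out.println(runDelayed(50));                              // prints ran after delay
    }
}
```

The result is the same for every pool. What differs is how many threads do the work and when they are created.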


EXECUTOR LIFECYCLE

  • An Executor processes tasks asynchronously, so at any given time the state of previously submitted tasks is not immediately obvious.

  • Some may have completed, some may be running, and others may be queued awaiting execution.


Shutdowns

  • Graceful shutdown: finish what you have started, but don’t accept any new work.

  • Abrupt shutdown: stop immediately, like turning off the power to the machine.

  • ExecutorService extends Executor with methods for lifecycle management:

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    // … additional convenience methods for task submission
}
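
To illustrate what an abrupt shutdown does with pending work, here is a hedged sketch. The class AbruptShutdownDemo and the method neverStarted are hypothetical names, and the 10-second sleep merely simulates a long-running task. It relies on the documented behavior of shutdownNow: running tasks are interrupted, and the tasks that were still queued are returned to the caller.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AbruptShutdownDemo {

    // Submits long-running tasks to a single-threaded executor, calls shutdownNow()
    // once the first task has started, and returns how many tasks never started.
    static int neverStarted(int submitted) {
        if (submitted < 1) {
            throw new IllegalArgumentException("submit at least one task");
        }
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch firstStarted = new CountDownLatch(1);
        for (int i = 0; i < submitted; i++) {
            pool.execute(() -> {
                firstStarted.countDown();
                try {
                    Thread.sleep(10_000);               // simulated long-running work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // shutdownNow() interrupts the running task
                }
            });
        }
        try {
            firstStarted.await();                       // make sure one task is actually running
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<Runnable> queued = pool.shutdownNow();     // tasks that were submitted but never started
        return queued.size();
    }

    public static void main(String[] args) {
        System.out.println(neverStarted(5)); // prints 4
    }
}
```

With a single worker thread, only the first task gets to run. The other four are still in the queue when shutdownNow is called, so they are handed back instead of being executed.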

An ExecutorService has three states:

  • Running

  • Shutting down

  • Terminated


States description

  • An ExecutorService starts in the running state.

  • The shutdown method initiates a graceful shutdown: no new tasks are accepted, but previously submitted tasks are allowed to complete.

  • The shutdownNow method initiates an abrupt shutdown: it attempts to cancel running tasks and does not start tasks that are still queued.

  • Tasks submitted after shutdown are handled by the rejected execution handler, which might silently discard the task or cause execute to throw the unchecked RejectedExecutionException.

  • Once all tasks have completed, the ExecutorService moves to the terminated state. You can wait for it to reach the terminated state with awaitTermination, or poll for it with isTerminated.

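import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class ExecutorLifeCycle {
    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    public void start() throws IOException {
        ServerSocket serverSocket = new ServerSocket(80);
        // keep accepting connections until the executor has been shut down
        while (!executor.isShutdown()) {
            try {
                final Socket connection = serverSocket.accept();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        handleRequest(connection);
                    }
                });
            } catch (RejectedExecutionException e) {
                // a rejection is expected if shutdown happened between accept() and execute()
                if (!executor.isShutdown()) {
                    log("Task submission rejected.", e);
                }
            }
        }
    }

    public void stop() {
        executor.shutdown();
    }

    public void handleRequest(Socket connection) {
        Request request = readRequest(connection);
        if (isShutdownRequest(request)) {
            stop();
        } else {
            dispatchRequest(request);
        }
    }

    // Placeholders for application-specific logic that the original does not show
    interface Request { }
    private Request readRequest(Socket connection) { return null; }
    private boolean isShutdownRequest(Request request) { return false; }
    private void dispatchRequest(Request request) { }
    private void log(String message, Throwable cause) { System.err.println(message + " " + cause); }
}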
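
The state transitions described in this section can also be observed without any sockets. The following is a minimal sketch under stated assumptions: the class LifecycleWalkthrough and the method observe are hypothetical, and a CountDownLatch keeps one task in flight so that the shutting-down state can be inspected before the pool terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

final class LifecycleWalkthrough {

    // Walks one executor through running -> shutting down -> terminated
    // and records what the lifecycle methods report at each step.
    static List<String> observe() {
        List<String> seen = new ArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch release = new CountDownLatch(1);
        try {
            seen.add("running: isShutdown=" + pool.isShutdown());

            // This task stays in flight until the latch is released.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            pool.shutdown(); // graceful: the task above is still allowed to finish
            seen.add("shutting down: isShutdown=" + pool.isShutdown()
                    + ", isTerminated=" + pool.isTerminated());

            try {
                pool.execute(() -> { });
                seen.add("new task accepted");
            } catch (RejectedExecutionException e) {
                seen.add("new task rejected");
            }

            release.countDown(); // let the in-flight task complete
            pool.awaitTermination(5, TimeUnit.SECONDS);
            seen.add("terminated: isTerminated=" + pool.isTerminated());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow(); // no effect if the pool has already terminated
        }
        return seen;
    }

    public static void main(String[] args) {
        for (String line : observe()) {
            System.out.println(line);
        }
    }
}
```

The pool created by newFixedThreadPool uses the default rejection handler, which throws RejectedExecutionException, so the task submitted after shutdown is reported as rejected.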


Source: Medium


The Tech Platform
