Wednesday, January 26, 2011

Threads - Multithreading Models

1.     Re-using a fixed set of threads to process the jobs.

import java.util.Hashtable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Threading model 1: a fixed set of worker threads is started once and
 * re-used. Every worker keeps polling one shared queue for the next job id
 * and exits as soon as the queue has nothing left.
 */
public class ThreadModel1 {

    private static final int NUM_OF_THREADS = 50;

    /** Number of job ids queued by the no-argument {@link #processExtraction()}. */
    private static final int DEFAULT_JOB_COUNT = 1000;

    /** Shared job queue; replaced at the start of every processExtraction call. */
    private static BlockingQueue<String> bq = null;

    /**
     * Takes the next job id off the shared queue without blocking.
     *
     * @return the next job id, or {@code null} when the queue is empty
     */
    private String getNextJobId() {
        // size() and poll() are two separate calls, so the logged number is only approximate.
        System.out.println("remaining job ids to process..." + bq.size());
        return bq.poll();
    }

    /**
     * Queues the default number of job ids and processes them.
     *
     * @see #processExtraction(int)
     */
    public void processExtraction() {
        processExtraction(DEFAULT_JOB_COUNT);
    }

    /**
     * Queues {@code jobCount} job ids ("E-0", "E-1", ...), starts the worker
     * threads and returns once every worker has finished.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt
     * status is restored and the method returns early; workers that are still
     * running are left to drain the queue. Any unexpected exception is printed
     * and terminates the JVM with exit code 1.
     *
     * @param jobCount number of job ids to enqueue; must not be negative
     * @throws IllegalArgumentException if {@code jobCount} is negative
     */
    public void processExtraction(int jobCount) {
        if (jobCount < 0) {
            throw new IllegalArgumentException("jobCount must not be negative: " + jobCount);
        }
        try {
            bq = new LinkedBlockingQueue<String>();
            for (int i = 0; i < jobCount; i++) {
                bq.offer("E-" + i);
            }
            System.out.println("No of job ids in queue for processing... " + bq.size());

            Hashtable<String, ProcessThread> threadMap = new Hashtable<String, ProcessThread>();
            for (int i = 0; i < NUM_OF_THREADS; i++) {
                ProcessThread det = new ProcessThread("Thread_" + i);
                threadMap.put("Thread_" + i, det);
                det.start();
            }

            System.out.println("Waiting for the child threds to complete.....");
            for (int i = 0; i < NUM_OF_THREADS; i++) {
                String key = "Thread_" + i;
                ProcessThread thread = threadMap.get(key);
                try {
                    // join() wakes up as soon as the worker ends, so no fixed-interval polling is needed.
                    thread.join();
                } catch (InterruptedException ie) {
                    // Keep the interrupt visible to the caller instead of swallowing it.
                    Thread.currentThread().interrupt();
                    System.out.println("interrupted while waiting for the child threads, exiting the main thread.....");
                    return;
                }
                System.out.println("removing the thread : " + thread.getThreadName()
                        + ", size of the thread map : " + threadMap.size());
                threadMap.remove(key);
                System.out.println("size of the running threads : " + threadMap.size());
            }

            System.out.println("no threads are running to process, exiting the main thread.....");
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /** Worker that repeatedly pulls job ids from the shared queue until none is left. */
    class ProcessThread extends Thread {

        private String threadName = null;

        ProcessThread(String threadName) {
            this.threadName = threadName;
        }

        public String getThreadName() {
            return threadName;
        }

        public void setThreadName(String threadName) {
            this.threadName = threadName;
        }

        /**
         * Processes job ids one after another. Ends when the queue is empty or
         * when the thread is interrupted (the interrupt status is restored).
         */
        @Override
        public void run() {
            try {
                for (;;) {
                    String jobId = getNextJobId();
                    // Check for the end of the queue first so that "null" is never logged as a job.
                    if (jobId == null) {
                        System.out.println("No job ids exists for processing, exiting the thread : "
                                + threadName);
                        return;
                    }
                    System.out.println("Processing  job id : " + jobId);

                    // Placeholder for the real work.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        System.out.println("Interrupted while processing, exiting the thread : "
                                + threadName);
                        return;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    }

    public static void main(String[] args) {
        new ThreadModel1().processExtraction();
    }
}

2.     Creating a new thread for each job.

import java.util.Hashtable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Threading model 2: the main thread acts as a dispatcher. It owns a fixed
 * number of slots and, for every slot whose thread is missing or finished,
 * starts a brand-new thread that processes exactly one job id.
 */
public class ThreadModel2 {

    private static final int NUM_OF_THREADS = 50;

    /** Number of job ids queued by the no-argument {@link #processExtraction()}. */
    private static final int DEFAULT_JOB_COUNT = 1000;

    /** Shared job queue; replaced at the start of every processExtraction call. */
    private static BlockingQueue<String> bq = null;

    /**
     * Takes the next job id off the shared queue without blocking.
     *
     * @return the next job id, or {@code null} when the queue is empty
     */
    private String getNextJobId() {
        System.out.println("remaining job ids to process..." + bq.size());
        return bq.poll();
    }

    /**
     * Queues the default number of job ids and processes them.
     *
     * @see #processExtraction(int)
     */
    public void processExtraction() {
        processExtraction(DEFAULT_JOB_COUNT);
    }

    /**
     * Queues {@code jobCount} job ids ("E-0", "E-1", ...) and dispatches them,
     * one new thread per job, using at most NUM_OF_THREADS slots at a time.
     * The slots are re-examined every 10 seconds; the method returns when the
     * queue is empty and no slot is tracked any more.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt
     * status is restored and the method returns early, leaving undispatched
     * job ids in the queue. Any unexpected exception is printed and
     * terminates the JVM with exit code 1.
     *
     * @param jobCount number of job ids to enqueue; must not be negative
     * @throws IllegalArgumentException if {@code jobCount} is negative
     */
    public void processExtraction(int jobCount) {
        if (jobCount < 0) {
            throw new IllegalArgumentException("jobCount must not be negative: " + jobCount);
        }
        try {
            bq = new LinkedBlockingQueue<String>();
            for (int i = 0; i < jobCount; i++) {
                bq.offer("E-" + i);
            }
            System.out.println("No of job ids in queue for processing... " + bq.size());

            Hashtable<String, ProcessThread> threadMap = new Hashtable<String, ProcessThread>();

            for (;;) {
                for (int i = 0; i < NUM_OF_THREADS; i++) {
                    String slot = "Thread_" + i;
                    ProcessThread thread = threadMap.get(slot);

                    // A slot whose thread is still working must be left alone;
                    // dropping it would let the slot be refilled while the old
                    // thread is still running.
                    if (thread != null && thread.isAlive()) {
                        continue;
                    }

                    String id = getNextJobId();
                    if (id != null) {
                        if (thread == null) {
                            System.out
                                    .println("thread doesn't exists, creating the new thread to process new id.");
                        } else {
                            System.out
                                    .println("thread is completed, replacing it with new thread to process new id.");
                        }
                        ProcessThread det = new ProcessThread(slot, id);
                        threadMap.put(slot, det);
                        det.start();
                    } else if (thread != null) {
                        // Queue is drained and this slot's thread has finished.
                        System.out
                                .println("processing of all jobs are completed, removing the thread : "
                                        + thread.getThreadName());
                        threadMap.remove(slot);
                    }
                    // else: empty slot and no job left -> nothing to do for this slot.
                }

                if (threadMap.isEmpty()) {
                    System.out
                            .println("no threads are running to process, exiting the main thread.....");
                    return;
                }

                try {
                    System.out.println("Waiting for the child threds to complete.....");
                    Thread.sleep(10000);
                } catch (InterruptedException ie) {
                    // Keep the interrupt visible to the caller instead of swallowing it.
                    Thread.currentThread().interrupt();
                    System.out.println("interrupted while waiting for the child threads, exiting the main thread.....");
                    return;
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /** One-shot worker that processes a single job id and then ends. */
    class ProcessThread extends Thread {

        private String threadName = null;
        private String idToProcess = null;

        ProcessThread(String threadName, String id) {
            this.threadName = threadName;
            this.idToProcess = id;
        }

        public String getThreadName() {
            return threadName;
        }

        public void setThreadName(String threadName) {
            this.threadName = threadName;
        }

        public String getIdToProcess() {
            return idToProcess;
        }

        public void setIdToProcess(String idToProcess) {
            this.idToProcess = idToProcess;
        }

        /** Processes the single job id this thread was created for. */
        @Override
        public void run() {

            System.out.println("Processing Id : " + idToProcess);

            // can place the required code here..
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ie) {
                // Restore the interrupt status so it is not silently lost.
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        new ThreadModel2().processExtraction();
    }
}

3.     Re-using the threads to process the jobs with traditional Semaphores.

import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;

/**
 * Threading model 3: a fixed set of worker threads is re-used, and access to
 * the shared job list is serialised with a small hand-written semaphore
 * built on {@code wait()}/{@code notify()}.
 */
public class ThreadModel3 {

    private static final int NUM_OF_THREADS = 50;

    /** Number of job ids loaded by the no-argument {@link #processExtraction()}. */
    private static final int DEFAULT_JOB_COUNT = 1000;

    // The job list and its cursor are per-instance, like the lock that guards
    // them, so that one lock always protects exactly one list.
    private List<String> list = null;
    private int listIndex = 0;
    private final Semaphore lock = new Semaphore();

    /**
     * Hands out the next unprocessed job id. The bounds check, the read and
     * the cursor increment all happen while the lock is held, so two workers
     * can never receive the same id or read past the end of the list.
     *
     * @return the next job id, or {@code null} when every id has been handed
     *         out or the calling thread was interrupted while waiting for the
     *         lock (its interrupt status is restored in that case)
     */
    private String getNextJobId() {
        try {
            lock.acquire();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return null;
        }

        String jobId = null;
        int remaining;
        try {
            remaining = list.size() - listIndex;
            if (remaining > 0) {
                jobId = list.get(listIndex);
                ++listIndex;
            }
        } finally {
            // Always give the permit back, even if the list access throws.
            releaseLock();
        }

        System.out.println("remaining job ids to process..." + remaining);
        return jobId;
    }

    /** Releases the lock, translating its declared checked exception into a restored interrupt. */
    private void releaseLock() {
        try {
            lock.release();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Loads the default number of job ids and processes them.
     *
     * @see #processExtraction(int)
     */
    public void processExtraction() {
        processExtraction(DEFAULT_JOB_COUNT);
    }

    /**
     * Loads {@code jobCount} job ids ("E-0", "E-1", ...), starts the worker
     * threads and returns once every worker has finished.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt
     * status is restored and the method returns early; workers that are still
     * running are left to finish the list. Any unexpected exception is
     * printed and terminates the JVM with exit code 1.
     *
     * @param jobCount number of job ids to load; must not be negative
     * @throws IllegalArgumentException if {@code jobCount} is negative
     */
    public void processExtraction(int jobCount) {
        if (jobCount < 0) {
            throw new IllegalArgumentException("jobCount must not be negative: " + jobCount);
        }
        try {

            // TODO
            // replaced with DB call...
            List<String> jobs = new ArrayList<String>();
            for (int i = 0; i < jobCount; i++) {
                jobs.add("E-" + i);
            }
            // Publish the list and rewind the cursor before any worker starts,
            // so a second run on the same instance begins at the first id.
            list = jobs;
            listIndex = 0;

            System.out.println("No of Id's in queue for processing... " + list.size());

            Hashtable<String, ProcessThread> threadMap = new Hashtable<String, ProcessThread>();
            for (int i = 0; i < NUM_OF_THREADS; i++) {
                ProcessThread det = new ProcessThread("Thread_" + i);
                threadMap.put("Thread_" + i, det);
                det.start();
            }

            System.out.println("Waiting for the child threds to complete.....");
            for (int i = 0; i < NUM_OF_THREADS; i++) {
                String key = "Thread_" + i;
                ProcessThread thread = threadMap.get(key);
                try {
                    // join() wakes up as soon as the worker ends, so no fixed-interval polling is needed.
                    thread.join();
                } catch (InterruptedException ie) {
                    // Keep the interrupt visible to the caller instead of swallowing it.
                    Thread.currentThread().interrupt();
                    System.out.println("interrupted while waiting for the child threads, exiting the main thread.....");
                    return;
                }
                System.out.println("removing the thread : " + thread.getThreadName()
                        + ", size of the thread job : " + threadMap.size());
                threadMap.remove(key);
                System.out.println("size of the running threads : " + threadMap.size());
            }

            System.out.println("no threads are running to process, exiting the main thread.....");
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /** Worker that repeatedly asks for the next job id until none is left. */
    class ProcessThread extends Thread {

        private String threadName = null;

        ProcessThread(String threadName) {
            this.threadName = threadName;
        }

        public String getThreadName() {
            return threadName;
        }

        public void setThreadName(String threadName) {
            this.threadName = threadName;
        }

        /**
         * Processes job ids one after another. Ends when no id is left or when
         * the thread is interrupted (the interrupt status is restored).
         */
        @Override
        public void run() {
            try {
                for (;;) {
                    String jobId = getNextJobId();
                    // Check for the end of the list first so that "null" is never logged as a job.
                    if (jobId == null) {
                        System.out.println("No job ids exists for processing, exiting the thread : "
                                + threadName);
                        return;
                    }
                    System.out.println("Processing job Id : " + jobId);

                    // Placeholder for the real work.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        System.out.println("Interrupted while processing, exiting the thread : "
                                + threadName);
                        return;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    }

    /**
     * Minimal counting semaphore built on the object's monitor. It starts
     * with one permit, so it behaves as a mutual-exclusion lock when every
     * acquire is paired with a release.
     */
    class Semaphore {

        private long count = 1;

        /**
         * Takes a permit, blocking while none is available.
         *
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        public synchronized void acquire() throws InterruptedException {
            // Re-check in a loop: wait() may return spuriously, and another
            // thread may take the permit between notify() and this thread
            // re-entering the monitor.
            while (count <= 0) {
                wait();
            }

            --count;
        }

        /**
         * Returns a permit and wakes one waiting thread, if any.
         *
         * @throws InterruptedException never thrown by this implementation;
         *         the clause is kept so existing callers keep compiling
         */
        public synchronized void release() throws InterruptedException {
            ++count;
            notify();
        }
    }

    public static void main(String[] args) {
        new ThreadModel3().processExtraction();
    }
}

No comments:

Post a Comment