Thursday, January 27, 2011

Spring – Oracle Connection Pool Configuration

Spring – Oracle Connection Pool Configuration

This is a basic data source (for testing only).
<!-- Basic data source, intended for testing only. Replace the URL, user id
     and password placeholders with real values. -->
<bean id="datasource1"
      class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
    <property name="url" value="ORACLE URL"/>
    <property name="username" value="user id"/>
    <property name="password" value="user password"/>
</bean>

This is a DBCP data source (preferred for testing).

<!--
  DBCP-backed pooled data source (preferred for testing).
  driverClassName must name the JDBC driver class (the Oracle driver here,
  the same one used by datasource1), not the pool implementation class.
  destroy-method="close" lets the container close the pool on shutdown.
  Replace the URL, user id and password placeholders with real values.
-->
<bean id="datasource2"
      class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName">
        <value>oracle.jdbc.driver.OracleDriver</value>
    </property>
    <property name="url">
        <value>ORACLE URL</value>
    </property>
    <property name="username">
        <value>user id</value>
    </property>
    <property name="password">
        <value>user password</value>
    </property>
    <!-- Number of connections created when the pool is started. -->
    <property name="initialSize" value="5"/>
    <!-- Maximum number of connections that may be active at the same time. -->
    <property name="maxActive" value="20"/>
</bean>

This is the Oracle connection pool (production quality).

<!-- Oracle data source configured as a named connection pool (cache).
     Replace the URL, user id and password placeholders with real values. -->
<bean id="connectionPool1" class="oracle.jdbc.pool.OracleDataSource" destroy-method="close"> 
 <!-- Must be true for the data source to pool connections at all. -->
 <property name="connectionCachingEnabled" value="true" />
<!-- Name of the pool; give each data source its own name if the pools must stay separate. -->
<property name="connectionCacheName" value="pool1" /> 
 <property name="URL"> 
          <value>ORACLE URL</value> 
 </property> 
 <property name="user"> 
          <value>user id</value> 
</property> 
<property name="password"> 
          <value>user password</value> 
</property> 
<!-- Pool limits and timeouts, written as one Name:value pair per line. -->
<property name="connectionCacheProperties"> 
       <value> 
         MinLimit:1 
         MaxLimit:5 
         InitialLimit:1 
         ConnectionWaitTimeout:120 
         InactivityTimeout:180 
         ValidateConnection:true 
       </value> 
</property> 
</bean>

**Oracle Connection Pool is better than DBCP**
  • To create a pool you must set connectionCachingEnabled=true; otherwise the data source serves plain physical (non-pooled) connections.
  • Always set connectionCacheName for each data source. If you want separate data sources to be separate connection pools, those names have to be unique. This is because the Oracle implementation uses a static singleton to manage "named" pools, and the connectionCacheName property references one of those "named" pools. So if two data sources have the same connectionCacheName, they actually use the same pool instance. If connectionCacheName is left blank, Oracle generates a pseudo-random name and creates a new pool. The problem comes when you reload the application: with a blank connectionCacheName, Oracle creates a new pool every time the application is reloaded, thus leaking resources.
  • connectionCacheProperties has to be specified exactly the way it is in the example (one Name:value pair per line).
  • Unlike DBCP, Oracle pool can check if connection is alive without running a SQL through it.
  • Unlike DBCP, Oracle pool sticks to connection pool limits and timeouts.
  • You can take advantage of Oracle proprietary failover mechanisms

Wednesday, January 26, 2011

Threads - Multithreading Models

1.     Re-using the threads to process the jobs.

import java.util.Hashtable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Thread model 1: a fixed set of worker threads is started once and every
 * worker keeps pulling job ids from a shared queue until the queue is empty.
 * The calling thread periodically checks the workers and returns once all of
 * them have terminated.
 */
public class ThreadModel1 {

    private static final int NUM_OF_THREADS = 50;
    private static BlockingQueue<String> bq = null;

    /**
     * Logs how many job ids are still queued and then takes the next one.
     *
     * @return the next job id, or {@code null} when the queue is empty
     */
    private String getNextJobId() {
        int remaining = bq.size();
        System.out.println("remaining job ids to process..." + remaining);
        return bq.poll();
    }

    /**
     * Fills the queue with 1000 job ids, starts the workers and waits until
     * all of them have finished. Any unexpected exception is printed and
     * terminates the JVM with exit status 1.
     */
    public void processExtraction() {
        try {
            bq = new LinkedBlockingQueue<String>();
            int jobNumber = 0;
            while (jobNumber < 1000) {
                bq.offer("E-" + jobNumber);
                jobNumber++;
            }
            System.out.println("No of job ids in queue for processing... " + bq.size());

            // Start every worker and remember it under its logical name.
            Hashtable<String, ProcessThread> workers = new Hashtable<String, ProcessThread>();
            for (int slot = 0; slot < NUM_OF_THREADS; slot++) {
                String key = "Thread_" + slot;
                ProcessThread worker = new ProcessThread(key);
                workers.put(key, worker);
                worker.start();
            }

            // Supervisor loop: drop terminated workers, leave when none remain.
            while (true) {
                for (int slot = 0; slot < NUM_OF_THREADS; slot++) {
                    ProcessThread worker = workers.get("Thread_" + slot);
                    boolean finished = worker != null && !worker.isAlive();
                    if (finished) {
                        System.out.println("removing the thread : " + worker.threadName
                                + ", size of the thread map : " + workers.size());
                        workers.remove(worker.getThreadName());
                        System.out.println("size of the running threads : " + workers.size());
                    }

                    if (workers.isEmpty()) {
                        System.out.println("no threads are running to process, exiting the main thread.....");
                        return;
                    }
                }

                System.out.println("Waiting for the child threds to complete.....");
                try {
                    Thread.sleep(60000);
                } catch (InterruptedException ie) {
                    // Interruption is ignored; the next pass simply re-checks the workers.
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /** Worker that processes job ids one after another until none are left. */
    class ProcessThread extends Thread {

        private String threadName = null;

        ProcessThread(String threadName) {
            this.threadName = threadName;
        }

        public String getThreadName() {
            return threadName;
        }

        public void setThreadName(String threadName) {
            this.threadName = threadName;
        }

        /**
         * Takes job ids from the shared queue until it is empty, pausing one
         * second per job as a stand-in for real work.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    String jobId = getNextJobId();
                    System.out.println("Processing  job id : " + jobId);

                    if (jobId == null) {
                        System.out.println("No job ids exists for processing, exiting the thread : "
                                + threadName);
                        return;
                    }

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ie) {
                        // Interruption is ignored; the worker carries on with the next job.
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    }

    public static void main(String[] args) {
        ThreadModel1 model = new ThreadModel1();
        model.processExtraction();
    }
}

2.     Creating the new threads to process jobs.

import java.util.Hashtable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Thread model 2: the supervising thread owns a fixed number of slots and
 * starts a fresh, single-job worker thread in a slot whenever that slot is
 * free (never used, or its previous worker has terminated) and a job id is
 * still queued.
 */
public class ThreadModel2 {

    private static final int NUM_OF_THREADS = 50;

    /** Number of job ids queued by the no-argument {@link #processExtraction()}. */
    private static final int DEFAULT_JOB_COUNT = 1000;

    /** Pause between supervisor passes used by the no-argument {@link #processExtraction()}. */
    private static final long DEFAULT_POLL_MILLIS = 10000L;

    private static BlockingQueue<String> bq = null;

    /**
     * Logs how many job ids are still queued and then takes the next one.
     *
     * @return the next job id, or {@code null} when the queue is empty
     */
    private String getNextJobId() {
        System.out.println("remaining job ids to process..." + bq.size());
        return bq.poll();
    }

    /**
     * Processes 1000 generated job ids, checking the worker slots every
     * 10 seconds.
     */
    public void processExtraction() {
        processExtraction(DEFAULT_JOB_COUNT, DEFAULT_POLL_MILLIS);
    }

    /**
     * Queues {@code jobCount} generated job ids ("E-0", "E-1", ...) and
     * processes each one in its own worker thread, running at most
     * {@code NUM_OF_THREADS} workers at a time. Returns once the queue is
     * empty and every worker has terminated, or early if the calling thread
     * is interrupted while waiting (its interrupt flag is then left set).
     * Any unexpected exception is printed and terminates the JVM with exit
     * status 1.
     *
     * @param jobCount   number of job ids to queue; zero or less queues nothing
     * @param pollMillis pause in milliseconds between two passes over the slots
     * @throws IllegalArgumentException if {@code pollMillis} is negative
     */
    public void processExtraction(int jobCount, long pollMillis) {
        if (pollMillis < 0) {
            throw new IllegalArgumentException("pollMillis must not be negative: " + pollMillis);
        }
        try {

            bq = new LinkedBlockingQueue<String>();
            for (int i = 0; i < jobCount; i++) {
                bq.offer("E-" + i);
            }
            System.out.println("No of job ids in queue for processing... " + bq.size());

            Hashtable<String, ProcessThread> threadMap = new Hashtable<String, ProcessThread>();

            for (;;) {
                for (int i = 0; i < NUM_OF_THREADS; i++) {
                    String slot = "Thread_" + i;
                    ProcessThread thread = threadMap.get(slot);

                    // A worker that is still running keeps its slot; removing it here
                    // would let the supervisor over-allocate threads or return early.
                    if (thread != null && thread.isAlive()) {
                        continue;
                    }

                    // The slot is free: either never used or its worker has terminated.
                    String id = getNextJobId();
                    if (id != null) {
                        if (thread == null) {
                            System.out
                                    .println("thread doesn't exists, creating the new thread to process new id.");
                        } else {
                            System.out
                                    .println("thread is completed, replacing it with new thread to process new id.");
                        }
                        ProcessThread det = new ProcessThread(slot, id);
                        threadMap.put(slot, det);
                        det.start();
                    } else if (thread != null) {
                        // No work left for a finished worker. The logical name is logged
                        // because Thread.getName() would show an unrelated JVM-assigned name.
                        System.out
                                .println("processing of all jobs are completed, removing the thread : "
                                        + thread.getThreadName());
                        threadMap.remove(slot);
                    }
                    // A never-used slot with an empty queue needs no action at all.
                }

                if (threadMap.isEmpty()) {
                    System.out
                            .println("no threads are running to process, exiting the main thread.....");
                    return;
                }

                try {
                    System.out.println("Waiting for the child threds to complete.....");
                    Thread.sleep(pollMillis);
                } catch (InterruptedException ie) {
                    // Treat interruption as a request to stop supervising.
                    Thread.currentThread().interrupt();
                    return;
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /** Worker that processes exactly one job id and then terminates. */
    class ProcessThread extends Thread {

        private String threadName = null;
        private String idToProcess = null;

        ProcessThread(String threadName, String id) {
            this.threadName = threadName;
            this.idToProcess = id;
        }

        public String getThreadName() {
            return threadName;
        }

        public void setThreadName(String threadName) {
            this.threadName = threadName;
        }

        public String getIdToProcess() {
            return idToProcess;
        }

        public void setIdToProcess(String idToProcess) {
            this.idToProcess = idToProcess;
        }

        /** Logs the job id and pauses one second as a stand-in for real work. */
        @Override
        public void run() {

            System.out.println("Processing Id : " + idToProcess);

            // can place the required code here..
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ie) {
                // Keep the interrupt visible to whoever inspects this thread.
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        new ThreadModel2().processExtraction();
    }
}

3.     Re-using the threads to process the jobs with traditional Semaphores.

import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;

/**
 * Thread model 3: a fixed set of worker threads is started once and every
 * worker keeps taking job ids from a shared list. Access to the list cursor
 * is serialized with a hand-written binary semaphore built on
 * {@code wait}/{@code notify}.
 */
public class ThreadModel3 {

    private static final int NUM_OF_THREADS = 50;

    private static List<String> list = null;
    private static int listIndex = 0;
    private Semaphore lock = new Semaphore();

    /**
     * Takes the next job id from the shared list.
     * <p>
     * The bounds check and the cursor update both happen while the lock is
     * held, so two workers can never pass the check together and read past
     * the end of the list. The lock is released in a {@code finally} block
     * so a failure inside the critical section cannot leave it held.
     *
     * @return the next job id, or {@code null} when no job is left or the
     *         calling thread was interrupted while waiting for the lock (the
     *         interrupt flag is then left set)
     */
    private String getNextJobId() {
        String jobId = null;
        try {
            lock.acquire();
            try {
                System.out.println("remaining job ids to process..." + (list.size() - listIndex));
                if (listIndex < list.size()) {
                    jobId = list.get(listIndex);
                    ++listIndex;
                }
            } finally {
                lock.release();
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return jobId;
    }

    /**
     * Loads 1000 generated job ids, starts the workers and waits until all
     * of them have terminated. Returns early if the calling thread is
     * interrupted while waiting (its interrupt flag is then left set). Any
     * unexpected exception is printed and terminates the JVM with exit
     * status 1.
     */
    public void processExtraction() {
        try {

            // TODO
            // replaced with DB call...
            list = new ArrayList<String>();
            for (int i = 0; i < 1000; i++) {
                list.add("E-" + i);
            }
            // The cursor is static, so it must be rewound for every run;
            // otherwise a second run would find nothing to process.
            listIndex = 0;

            System.out.println("No of Id's in queue for processing... " + list.size());

            Hashtable<String, ProcessThread> threadMap = new Hashtable<String, ProcessThread>();
            for (int i = 0; i < NUM_OF_THREADS; i++) {
                ProcessThread det = new ProcessThread("Thread_" + i);
                threadMap.put("Thread_" + i, det);
                det.start();
            }

            for (;;) {
                for (int i = 0; i < NUM_OF_THREADS; i++) {
                    ProcessThread thread = threadMap.get("Thread_" + i);
                    if (thread != null && !thread.isAlive()) {
                        System.out.println("removing the thread : " + thread.threadName
                                + ", size of the thread job : " + threadMap.size());
                        threadMap.remove(thread.getThreadName());
                        System.out.println("size of the running threads : " + threadMap.size());
                    }

                    if (threadMap.isEmpty()) {
                        System.out
                                .println("no threads are running to process, exiting the main thread.....");
                        return;
                    }
                }

                try {
                    System.out.println("Waiting for the child threds to complete.....");
                    Thread.sleep(60000);
                } catch (InterruptedException ie) {
                    // Treat interruption as a request to stop supervising.
                    Thread.currentThread().interrupt();
                    return;
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /** Worker that processes job ids one after another until none are left. */
    class ProcessThread extends Thread {

        private String threadName = null;

        ProcessThread(String threadName) {
            this.threadName = threadName;
        }

        public String getThreadName() {
            return threadName;
        }

        public void setThreadName(String threadName) {
            this.threadName = threadName;
        }

        /**
         * Takes job ids until none are left, pausing one second per job as a
         * stand-in for real work. The worker stops when it is interrupted.
         */
        @Override
        public void run() {

            try {
                for (;;) {
                    String jobId = getNextJobId();
                    System.out.println("Processing job Id : " + jobId);

                    if (jobId == null) {
                        System.out.println("No job ids exists for processing, exiting the thread : "
                                + threadName);
                        return;
                    }

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ie) {
                        // Stop instead of spinning through the remaining jobs
                        // with the interrupt flag set.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    }

    /** Minimal counting semaphore with one initial permit, built on the object monitor. */
    class Semaphore {

        private long count = 1;

        /**
         * Blocks until a permit is available and then takes it.
         *
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        public synchronized void acquire() throws InterruptedException {
            // Re-check after every wakeup: wait() may return without a matching
            // release(), and another thread may already have taken the permit.
            while (count == 0) {
                wait();
            }

            --count;
        }

        /**
         * Returns a permit and wakes one waiting thread, if any.
         *
         * @throws InterruptedException never thrown by this implementation;
         *         declared so existing callers keep compiling
         */
        public synchronized void release() throws InterruptedException {
            ++count;
            notify();
        }
    }

    public static void main(String[] args) {
        new ThreadModel3().processExtraction();
    }
}