1. Re-using the threads to process the jobs.
import java.util.Hashtable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ThreadModel1 {
private static final int NUM_OF_THREADS = 50;
private static BlockingQueue<String> bq = null;
private String getNextJobId() {
System.out.println("remaining job ids to process..." + bq.size());
return bq.poll();
}
public void processExtraction() {
try {
bq = new LinkedBlockingQueue<String>();
for (int i = 0; i < 1000; i++) {
bq.offer("E-" + i);
}
System.out.println("No of job ids in queue for processing... " + bq.size());
Hashtable<String, ProcessThread> threadMap = new Hashtable<String, ProcessThread>();
for (int i = 0; i < NUM_OF_THREADS; i++) {
ProcessThread det = new ProcessThread("Thread_" + i);
threadMap.put("Thread_" + i, det);
det.start();
}
for (;;) {
for (int i = 0; i < NUM_OF_THREADS; i++) {
ProcessThread thread = threadMap.get("Thread_" + i);
if (thread != null && !thread.isAlive()) {
System.out.println("removing the thread : " + thread.threadName
+ ", size of the thread map : " + threadMap.size());
threadMap.remove(thread.getThreadName());
System.out.println("size of the running threads : " + threadMap.size());
}
if (threadMap.isEmpty()) {
System.out
.println("no threads are running to process, exiting the main thread.....");
return;
}
}
try {
System.out.println("Waiting for the child threds to complete.....");
Thread.sleep(60000);
} catch (InterruptedException ie) {
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
return;
}
}
class ProcessThread extends Thread {
private String threadName = null;
ProcessThread(String threadName) {
this.threadName = threadName;
}
public String getThreadName() {
return threadName;
}
public void setThreadName(String threadName) {
this.threadName = threadName;
}
public void run() {
try {
for (;;) {
String jobId = getNextJobId();
System.out.println("Processing job id : " + jobId);
if (jobId == null) {
System.out.println("No job ids exists for processing, exiting the thread : "
+ threadName);
return;
}
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
return;
}
}
}
public static void main(String[] args) {
new ThreadModel1().processExtraction();
}
}
2. Creating the new threads to process jobs.
import java.util.Hashtable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ThreadModel2 {
private static final int NUM_OF_THREADS = 50;
private static BlockingQueue<String> bq = null;
private String getNextJobId() {
System.out.println("remaining job ids to process..." + bq.size());
return bq.poll();
}
public void processExtraction() {
try {
bq = new LinkedBlockingQueue<String>();
for (int i = 0; i < 1000; i++) {
bq.offer("E-" + i);
}
System.out.println("No of job ids in queue for processing... " + bq.size());
Hashtable<String, ProcessThread> threadMap = new Hashtable<String, ProcessThread>();
String id = null;
for (;;) {
for (int i = 0; i < NUM_OF_THREADS; i++) {
ProcessThread thread = threadMap.get("Thread_" + i);
if (thread == null && (id = getNextJobId()) != null) {
System.out
.println("thread doesn't exists, creating the new thread to process new id.");
ProcessThread det = new ProcessThread("Thread_" + i, id);
threadMap.put("Thread_" + i, det);
det.start();
} else if (thread != null && !thread.isAlive() && (id = getNextJobId()) != null) {
System.out
.println("thread is completed, replacing it with new thread to process new id.");
ProcessThread det = new ProcessThread("Thread_" + i, id);
threadMap.put("Thread_" + i, det);
det.start();
} else {
System.out
.println("processing of all jobs are completed, removing the thread : "
+ thread.getName());
threadMap.remove("Thread_" + i);
}
if (threadMap.isEmpty()) {
System.out
.println("no threads are running to process, exiting the main thread.....");
return;
}
}
try {
System.out.println("Waiting for the child threds to complete.....");
Thread.sleep(10000);
} catch (InterruptedException ie) {
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
return;
}
}
class ProcessThread extends Thread {
private String threadName = null;
private String idToProcess = null;
ProcessThread(String threadName, String id) {
this.threadName = threadName;
this.idToProcess = id;
}
public String getThreadName() {
return threadName;
}
public void setThreadName(String threadName) {
this.threadName = threadName;
}
public String getIdToProcess() {
return idToProcess;
}
public void setIdToProcess(String idToProcess) {
this.idToProcess = idToProcess;
}
public void run() {
System.out.println("Processing Id : " + idToProcess);
// can place the required code here..
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
}
public static void main(String[] args) {
new ThreadModel2().processExtraction();
}
}
3. Re-using the threads to process the jobs with traditional Semaphores.
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
public class ThreadModel3 {
private static final int NUM_OF_THREADS = 50;
private static List<String> list = null;
private static int listIndex = 0;
private Semaphore lock = new Semaphore();
private String getNextJobId() {
System.out.println("remaining job ids to process..." + (list.size() - listIndex));
String jobId = null;
try {
if (listIndex < list.size()) {
lock.acquire();
jobId = list.get(listIndex);
++listIndex;
lock.release();
}
} catch (InterruptedException ie) {
}
return jobId;
}
public void processExtraction() {
try {
// TODO
// replaced with DB call...
list = new ArrayList<String>();
for (int i = 0; i < 1000; i++) {
list.add("E-" + i);
}
System.out.println("No of Id's in queue for processing... " + list.size());
Hashtable<String, ProcessThread> threadMap = new Hashtable<String, ProcessThread>();
for (int i = 0; i < NUM_OF_THREADS; i++) {
ProcessThread det = new ProcessThread("Thread_" + i);
threadMap.put("Thread_" + i, det);
det.start();
}
for (;;) {
for (int i = 0; i < NUM_OF_THREADS; i++) {
ProcessThread thread = threadMap.get("Thread_" + i);
if (thread != null && !thread.isAlive()) {
System.out.println("removing the thread : " + thread.threadName
+ ", size of the thread job : " + threadMap.size());
threadMap.remove(thread.getThreadName());
System.out.println("size of the running threads : " + threadMap.size());
}
if (threadMap.isEmpty()) {
System.out
.println("no threads are running to process, exiting the main thread.....");
return;
}
}
try {
System.out.println("Waiting for the child threds to complete.....");
Thread.sleep(60000);
} catch (InterruptedException ie) {
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
return;
}
}
class ProcessThread extends Thread {
private String threadName = null;
ProcessThread(String threadName) {
this.threadName = threadName;
}
public String getThreadName() {
return threadName;
}
public void setThreadName(String threadName) {
this.threadName = threadName;
}
public void run() {
try {
for (;;) {
String jobId = getNextJobId();
System.out.println("Processing job Id : " + jobId);
if (jobId == null) {
System.out.println("No job ids exists for processing, exiting the thread : "
+ threadName);
return;
}
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
return;
}
}
}
class Semaphore {
private long count = 1;
public synchronized void acquire() throws InterruptedException {
if (count == 0)
wait();
--count;
}
public synchronized void release() throws InterruptedException {
++count;
notify();
}
}
public static void main(String[] args) {
new ThreadModel3().processExtraction();
}
}