I will be using ThreadPoolTaskExecutor from Spring, the Callable interface and Future objects from the java.util.concurrent package, and the task:scheduled tag from Spring.
My Requirement:
I need a scheduled processor that fetches a large amount of data from the database, runs heavy processing on it, and then updates the database with the process start and completion times. The data can be processed in parallel.
Solution:
For scheduling a task in Spring:
I wrote a simple POJO as follows:
import org.springframework.stereotype.Service;

@Service("myScheduler")
public class MyScheduler {
    public void startProcess() {
        // The scheduled work goes here.
    }
}
Then, in my Spring configuration file (for starters, this is the applicationContext file):
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p" xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:util="http://www.springframework.org/schema/util" xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx.xsd
           http://www.springframework.org/schema/util
           http://www.springframework.org/schema/util/spring-util-3.0.xsd
           http://www.springframework.org/schema/task
           http://www.springframework.org/schema/task/spring-task-3.0.xsd">

    <util:properties id="myProperties" location="file:/C:/cron.properties" />

    <task:scheduled-tasks>
        <task:scheduled ref="myScheduler" method="startProcess"
                        cron="#{myProperties['scheduler.cron']}" />
    </task:scheduled-tasks>

</beans>
Here I am reading the schedule as a cron expression from a properties file, cron.properties, which is located on the file system on the C:\ drive.
So cron.properties will have an entry such as:
scheduler.cron=0 * * * * *
That's it for scheduling. The startProcess method runs according to your cron expression.
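Spring's cron expressions here have six space-separated fields: second, minute, hour, day of month, month, and day of week. As a rough illustration of how the entry above reads, the sketch below simply labels each field. CronFields and its label method are hypothetical helper names made up for this example; they are not part of Spring, and the code does no scheduling.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class CronFields {
    // Field order of a six-field Spring cron expression.
    private static final String[] NAMES = {
        "second", "minute", "hour", "dayOfMonth", "month", "dayOfWeek"
    };

    /** Splits a six-field expression and labels each field (hypothetical helper). */
    static Map<String, String> label(String expression) {
        String[] parts = expression.trim().split("\\s+");
        if (parts.length != NAMES.length) {
            throw new IllegalArgumentException("expected 6 fields but got " + parts.length);
        }
        Map<String, String> labeled = new LinkedHashMap<String, String>();
        for (int i = 0; i < NAMES.length; i++) {
            labeled.put(NAMES[i], parts[i]);
        }
        return labeled;
    }

    public static void main(String[] args) {
        // The entry from cron.properties: second is 0, every other field is a wildcard.
        System.out.println(label("0 * * * * *"));
    }
}
```

Read this way, 0 * * * * * fires at second 0 of every minute, so startProcess runs once a minute.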
Now for Concurrency:
Now I have a huge amount of data (from either the database or file reads) in startProcess(), and I need to process it in parallel. I am also required to record the process completion time, which means the main thread has to wait for all worker threads to finish.
First, I wrote a Worker class that implements the Callable interface.
import java.util.concurrent.Callable;

public class Worker implements Callable<Boolean> {
    /**
     * Callable interface implementation.
     */
    @Override
    public Boolean call() throws Exception {
        // Process your data here ...
        boolean isCompleted = true;
        return isCompleted;
    }
}
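Then, in my Spring application context, I declared the executor as:
<task:executor id="worker" pool-size="10" queue-capacity="15" rejection-policy="CALLER_RUNS" />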
This tells Spring to create a ThreadPoolTaskExecutor with a pool size of 10 and a queue capacity of 15. The first 10 requests are each handed to one of the 10 pool threads, and their processing starts right away. Requests 11 to 25 wait in the queue, and as soon as any of the 10 threads is free, one of the queued requests is assigned to it. Once the queue is full, because we specified CALLER_RUNS, the next request runs in the main (submitting) thread itself.
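To see the CALLER_RUNS behaviour without a Spring context, here is a minimal sketch that uses java.util.concurrent.ThreadPoolExecutor directly, the JDK class that ThreadPoolTaskExecutor wraps. The class name CallerRunsDemo and the latch that keeps pool threads busy are assumptions made for this demonstration only; a real worker would simply do its processing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CallerRunsDemo {
    /** Submits taskCount tasks and returns how many ran on the submitting thread. */
    static int countCallerRuns(int taskCount) {
        final Thread caller = Thread.currentThread();
        final CountDownLatch release = new CountDownLatch(1);
        // Same shape as the pool described above: 10 threads, a queue of 15, CALLER_RUNS.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                10, 10, 0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(15),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>();
            for (int i = 0; i < taskCount; i++) {
                futures.add(pool.submit(new Callable<Boolean>() {
                    @Override
                    public Boolean call() throws Exception {
                        boolean ranInCaller = Thread.currentThread() == caller;
                        if (!ranInCaller) {
                            // Keep pool threads busy so the queue really fills up.
                            release.await();
                        }
                        return ranInCaller;
                    }
                }));
            }
            release.countDown(); // everything is submitted; let the pool drain
            int callerRuns = 0;
            for (Future<Boolean> future : futures) {
                if (future.get()) {
                    callerRuns++;
                }
            }
            return callerRuns;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("tasks run by the caller: " + countCallerRuns(30));
    }
}
```

With 30 submissions, 10 tasks occupy the pool threads, 15 sit in the queue, and the remaining 5 are executed by the submitting thread. With 25 or fewer, none are. Because a caller-run task blocks the submitting thread until it finishes, this policy also acts as a simple form of back-pressure.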
And in MyScheduler (whose startProcess method acts as the main thread):
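import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

@Service("myScheduler")
public class MyScheduler {
    @Autowired
    @Qualifier("worker")
    private ThreadPoolTaskExecutor worker;

    public void startProcess() {
        Date startTime = new Date();
        Set<Future<Boolean>> futureSet = new HashSet<Future<Boolean>>();
        for (int i = 0; i < 25; i++) {
            Worker del = new Worker();
            // Set any data to be sent to this worker thread here.
            futureSet.add(worker.submit(del));
        }
        for (Future<Boolean> temp : futureSet) {
            try {
                // get() waits until the worker has completed its task.
                boolean isCompleted = temp.get();
                System.out.println("Complete Status :" + isCompleted);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (ExecutionException e) {
                System.out.println("Worker failed: " + e.getCause());
            }
        }
        Date completedTime = new Date();
        // Update the database with startTime and completedTime here.
    }
}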
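The comment "Set any data to be sent to this worker thread" and the wait-for-all loop can also be tried outside Spring. The following is a sketch under stated assumptions, not the setup used above: it uses a plain fixed thread pool from java.util.concurrent, a hypothetical ChunkWorker that receives its slice of the data through the constructor, and invokeAll, which returns only after every worker has finished. Summing integers stands in for the real processing.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ChunkedSum {
    /** Hypothetical worker: gets its slice of the data through the constructor. */
    static final class ChunkWorker implements Callable<Long> {
        private final List<Integer> chunk;

        ChunkWorker(List<Integer> chunk) {
            this.chunk = chunk;
        }

        @Override
        public Long call() {
            long sum = 0;
            for (int value : chunk) {
                sum += value;
            }
            return sum;
        }
    }

    /** Splits data into chunks, processes them in parallel, and waits for all results. */
    static long sumInParallel(List<Integer> data, int chunkSize, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<ChunkWorker> workers = new ArrayList<ChunkWorker>();
            for (int from = 0; from < data.size(); from += chunkSize) {
                int to = Math.min(from + chunkSize, data.size());
                workers.add(new ChunkWorker(data.subList(from, to)));
            }
            long total = 0;
            // invokeAll blocks until every worker has completed.
            for (Future<Long> future : pool.invokeAll(workers)) {
                total += future.get();
            }
            return total;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<Integer>();
        for (int i = 1; i <= 100; i++) {
            data.add(i);
        }
        Date startTime = new Date();
        long total = sumInParallel(data, 7, 4);
        Date completedTime = new Date();
        System.out.println("total=" + total + " started=" + startTime + " completed=" + completedTime);
    }
}
```

Passing the data through the constructor keeps each worker independent of shared mutable state, and completedTime is only taken after all results are in, which matches the requirement to record when the whole run finished.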