Distributed Processing with JMS and JMX
The Map defines the collection of aggregators, one for each correlationId as shown here:
private Map aggregators= new HashMap();
// jmx properties omitted.
public void onMessage(Message m) { TextMessage textMessage= (TextMessage)m; try {
String correlationId= textMessage.getJMSCorrelationID();
if (correlationId != null) {
String messageId= textMessage.getJMSMessageID(); int count= textMessage.getIntProperty(“count”);
The basic steps of the algorithm are to look up an aggregator in the aggregator map. If it doesn’t exist, create an aggregator for that correlationId:
String text= textMessage.getText();
Aggregateable aggregateable= (Aggregateable)aggregators.get(correlationId); if (aggregateable == null) {
aggregateable= createAggregateable(); aggregateable.setCorrelationId(correlationId); aggregators.put(correlationId, aggregateable);
}
Next, add the message text, count property, and message ID to the aggregateable interface. Check to see if the aggregator isComplete(), meaning the last message has been received. If it has, get the resulting message from the aggregator and send it to the next destination queue:
aggregateable.addMessage(messageId, count, text); if (aggregateable.isComplete()) {
String result= aggregateable.getResultMessage();
TextMessage resultMessage= session.createTextMessage(result); producer.send(destination, resultMessage ); aggregators.remove(correlationId);
}
}
}catch (JMSException e) { e.printStackTrace();
}
}
The next section of code shows the method of using reflection to create an instance of the agreeable interface. You need to have an aggregateable class for each correlationID. This method is a variation on the previous setProcessor, setRouter method. It will be called by the onMessage when a new correlation Id is discovered:
protected Aggregateable createAggregateable() { try {
Class clazz= Class.forName(aggregatorClassName);
Constructor ct= clazz.getConstructor(null); return (Aggregateable)ct.newInstance(null);
} catch (Exception e) {