Добавил:
Upload Опубликованный материал нарушает ваши авторские права? Сообщите нам.
Вуз: Предмет: Файл:

Professional Java.JDK.5.Edition (Wrox)

.pdf
Скачиваний:
31
Добавлен:
29.02.2016
Размер:
12.07 Mб
Скачать

Chapter 12

Life-cycle methods are the same as the Message Processor, as shown following:

public void start(){

//method bodies omitted

}

public void stop() {

//method bodies omitted

}

The setRouter method uses reflection to take the string parameter and create an instance of a Routeable object:

public void setRouter(String className) { try {

Class clazz= Class.forName(className); Constructor ct= clazz.getConstructor(null); routeable= (Routeable)ct.newInstance(null);

}catch (Exception e) { e.printStackTrace();

}

}

}

The onMessage method passes the message body to the routable interface and the interface returns the name of the queue to look up in the JNDI context:

public void onMessage(Message message) { TextMessage textMessage= (TextMessage)message; try {

String text= textMessage.getText(); String name= routeable.route(text);

Destination queue = JndiHelper.getDestination(name); producer.send(queue, message );

}catch (JMSException e) { e.printStackTrace();

}

}

}

So far, you have built the components to process messages and route messages between queues based on the message content. The next component takes distributed processing a step further by dividing large processing tasks into smaller pieces that can be executed in parallel.

Creating a Component to Divide Large

Tasks for Parallel Processing

Often, large tasks need to be divided up into small tasks that can be processed in parallel. This next component allows large messages to be broken up into smaller messages such that each submessage can be processed separately; however, a common scenario is that the workflow cannot continue until all submessages are processed. This component uses JMS correlating messages that were created from the same initial request.

566

Distributed Processing with JMS and JMX

This component uses two MBeans: one to split a message into submessages, and another to join the submessages together after they have been processed individually. Think of these two components as bookends of a smaller subprocess.

The two MBeans follow the same pattern as the previous components in that they contain a behavior interface, an MBean interface, and an implementing class. The first class discussed is the behavior interface for splitting a JMS message.

Splitable

The splitable interface takes the message text as an argument and returns a list of strings for creating submessages:

package wrox.processing; import java.util.List;

public interface Splitable {

public List getSubMessage( String text);

}

MessageSplitter

The message splitter is similar in design to the other components already built, except that this component creates several messages from a single input message:

package wrox.processing.jmx;

import java.lang.reflect.Constructor; import java.util.Iterator;

import java.util.List;

// jms import statements omitted. import wrox.processing.Splitable;

import wrox.processing.util.JndiHelper;

public class MessageSplitter implements MessageListener, MessageSplitterMBean { private boolean running;

private Splitable splitable; private String splitterName;

The JMX exposed properties are as follows:

public String getDestination() { return destinationName;

}

public void setDestination(String name) { destinationName= name;

}

public String getSource() { return sourceName;

}

567

Chapter 12

public void setSource(String name) {

sourceName= name;

}

Again, the onMessage method is where the algorithm for the component is implemented:

public void onMessage(Message m) { try {

TextMessage textMessage= (TextMessage)m;

Get the Unique message ID assigned by the JMS server when the message was created:

String correlationId= m.getJMSMessageID();

String text= textMessage.getText();

The onMessage method takes an input text message and splits it into submessages:

List messages= splitable.getSubMessage(text);

int count= messages.size();

The getJMSCorrelationID is a header parameter used to show that several message are related to one another. In this example, you will split one message into several submessages. Using the messageID of the source message as the correlationID of all the submessage, you will be able to identify which message produced a given submessage. This creates a relationship between all the new submessages that can be looked up in the MessageAggregator component. You have also set a property count on the message header. This will tell the MessageAggregator how many messages exist with this correlationId:

for (Iterator iter= messages.iterator(); iter.hasNext();) { TextMessage subMessage= session.createTextMessage(); subMessage.setJMSCorrelationID(correlationId); subMessage.setStringProperty(“count”, count);

String subText= (String)iter.next();

subMessage.setText(subText);

producer.send(subMessage);

}

} catch (JMSException e) { e.printStackTrace();

}

}

Given the example message described in the following code, the OrderSplitter would transform the input message into submessages by applying an XML transformation that extracts each of the items in order, as in the following example message:

568

Distributed Processing with JMS and JMX

<order>

<id>400</id>

<customer>

<name>Heather</name>

</customer>

<items>

<item>

<id>4034</id>

<description>VW Jetta,Blue</description>

<quantity>1</quantity>

</item>

<item>

<id>4500</id> <description>...</description> <quantity>2</quantity>

</item>

</items>

</order>

An example result of splitting the above message is shown by the following:

<!-- Message 1 of 2 -> <item>

<order-reference>400</order-reference> <id>4034</id>

<quantity>1</quantity>

</item>

<!-- Message 2 of 2 -> <item>

<order-reference>400</order-reference> <id>450</id>

<quantity>1</quantity>

</item>

That concludes the portion of this component that is responsible for splitting the initial message into submessages. The next sections look at the process of taking submessages and correlating them back together. Figure 12-9 shows an overview of the classes and interfaces that make up the design of the AggregatorMBean.

It’s important to understand from the design that there is no guarantee that submessages will be processed in any particular order. For that reason, the aggregator acts as a stateful message filter. It collects and stores submessages until all the submessages for a particular correlationId have been processed.

It’s possible to see one of the benefits of the loose coupling design strategy. For example, the specific subprocessing step is not tied to the split or aggregate component. So therefore, numerous subprocessing steps can be configured without modifying the application code of this component.

569

Chapter 12

Figure 12-9

Aggregateable

The aggregateable interface defines the aggregator life cycle. This defines the aggregateable strategy. The strategy can be changed by providing a different implementation of the isComplete() and getResultMessage():

package wrox.processing;

public interface Aggregateable {

public void setCorrelationId(String correlationId);

public void addMessage(String messageId,int count, String message); public boolean isComplete();

public String getResultMessage( );

}

MessageAggregator

This class is responsible for implementing the logic of receiving the submessages and tracking them via the correlationId:

package wrox.processing.jmx;

import java.lang.reflect.Constructor; import java.util.HashMap;

import java.util.Map;

//omitted jms imports

import wrox.processing.Aggregateable; import wrox.processing.util.JndiHelper;

public class MessageAggregator implements MessageListener, MessageAggregatorMBean {

570

Distributed Processing with JMS and JMX

The Map defines the collection of aggregators, one for each correlationId as shown here:

private Map aggregators= new HashMap();

// jmx properties omitted.

public void onMessage(Message m) { TextMessage textMessage= (TextMessage)m; try {

String correlationId= textMessage.getJMSCorrelationID();

if (correlationId != null) {

String messageId= textMessage.getJMSMessageID(); int count= textMessage.getIntProperty(“count”);

The basic steps of the algorithm are to look up an aggregator in the aggregator map. If it doesn’t exist, create an aggregator for that correlationId:

String text= textMessage.getText();

Aggregateable aggregateable= (Aggregateable)aggregators.get(correlationId); if (aggregateable == null) {

aggregateable= createAggregateable(); aggregateable.setCorrelationId(correlationId); aggregators.put(correlationId, aggregateable);

}

Next, add the message text, count property, and message ID to the aggregateable interface. Check to see if the aggregator isComplete(), meaning the last message has been received. If it has, get the resulting message from the aggregator and send it to the next destination queue:

aggregateable.addMessage(messageId, count, text); if (aggregateable.isComplete()) {

String result= aggregateable.getResultMessage();

TextMessage resultMessage= session.createTextMessage(result); producer.send(destination, resultMessage ); aggregators.remove(correlationId);

}

}

}catch (JMSException e) { e.printStackTrace();

}

}

The next section of code shows the method of using reflection to create an instance of the agreeable interface. You need to have an aggregateable class for each correlationID. This method is a variation on the previous setProcessor, setRouter method. It will be called by the onMessage when a new correlation Id is discovered:

protected Aggregateable createAggregateable() { try {

Class clazz= Class.forName(aggregatorClassName);

Constructor ct= clazz.getConstructor(null); return (Aggregateable)ct.newInstance(null);

} catch (Exception e) {

571

Chapter 12

throw new RuntimeException(“couldn’t create aggregateable object.”, e);

}

}

// JMX properties getSource, getDestination omitted

public void start() { //omitted

}

public void stop() { //omitted

}

}

The only thing left to implement is the concrete aggregator used in this example. The next section shows the OrderAggregator class. This handles the logic for your order processing example.

OrderAggregator

This section describes the orderAggregator class. This is an example aggregator used in this chapter’s business process. It receives submessages from the queue it is registered with and saves each with a correlationId until all the messages for that correlationId have been processed. Once all message have been received, it sends the dummy message stating that the order has been processed:

package wrox.processing.order; import wrox.processing.Aggregateable;

public class OrderAggregator implements Aggregateable {

int received= 0; boolean done= false;

String correlationId;

List savedMessages = new ArrayList(); public OrderAggregator( ) {

}

public void setCorrelationId(String correlationId ){ this.correlationId = correlationId;

}

572

Distributed Processing with JMS and JMX

The specific behavior of the OrderAggregator is to save each submessage and check the number of messages received against the number specified in the count:

public void addMessage(String messageId, int count, String message) {

savedMessages.add(message); if (count == received) {

done= true;

}

}

public String getResultMessage() {

// transform savedmessageList into an xml result message //TODO transform resulting xml.

return “<order><id>300</id><status>complete</status></order>”;

}

public boolean isComplete() { return done;

}

}

This concludes the components required to implement the example business process. In review, this section covered the development of three messaging components used in realizing an order processing system. Each component was designed as a standard MBean. Now that these MBeans have been developed, they can be deployed across several servers. The next section goes into the deployment process in great detail.

Deploying the Application

Deploy the application using one of the two methods described hereafter. The “Basic Deployment” section walks through all the pieces of the application step-by-step. Once you understand how each piece plays together, the next section, “Advanced Deployment,” shows how to configure the application to deploy dynamically using a built-in service that reads a text descriptor file that describes the MBean to be deployed.

Basic Deployment

1.Start the JMS server. For the chapter examples, use JORAM JMS Server. JORAM implements the JMS 1.1 specification and it is available free to the open source community. The server starts up using the bat file common (see Figure 12-10):

C:\joram-4.0.0\samples\bin\windows\single_server.bat

573

Chapter 12

Figure 12-10

2.Create the administered objects using the JMS admin tool (this is vendor specific). Once the server is started, connect to it by running the JORAM administration console (see Figure 12-11):

C:\joram-4.0.0\samples\bin\windows\admin.bat

574

Distributed Processing with JMS and JMX

Figure 12-11

The admin console allows you to create the applications ConnectionFactory and Destinations objects. They are bound to the JNDI context, to be accessed through the lookup(<name>) method of the javax.naming.Context object from your application code.

Note: In this chapter, all the queues and topics will reside on the same server, but it is possible to connect to and manage multiple JMS servers through a single console.

3.Start the JMX Agent containing the MBeanServer. The agent is just a standalone Java application. The classpath environment variable must include the classes from the example as well as the jmxri.jar from Sun’s JMX reference implementation:

java wrox.processing.jmx.Agent

575

Соседние файлы в предмете [НЕСОРТИРОВАННОЕ]