Thursday, September 15, 2011

Spring Integration JMS Priority Messages

In this post I'm going to explain how to process a priority JMS message using Spring Integration.
Use case: create an enterprise email system capable of sending mass emails and single email, when the system receives a priority message it needs to pass through even there is already messages are being processed.
Environment Setup:
1. Spring 3.0
2. Spring Integegration 2.0.1 or above
3. Weblogic 10.3.3
4. JDeveloper 11.1.1.3
Download JDeveloper from http://www.oracle.com/technetwork/developer-tools/jdev/downloads/index.html
and install on local machine. This installs weblogic application server as integrated server.

Create an Email.java domain class contains recipient information and priority

import java.io.Serializable;

public class Email implements Serializable{
    private String toEmailAddress;
    private String fromEmailAddress;
    private String subject;
    private String content;
    private MessagePriority priority;

    public void setToEmailAddress(String toEmailAddress) {
        this.toEmailAddress = toEmailAddress;
    }

    public String getToEmailAddress() {
        return toEmailAddress;
    }

    public void setFromEmailAddress(String fromEmailAddress) {
        this.fromEmailAddress = fromEmailAddress;
    }

    public String getFromEmailAddress() {
        return fromEmailAddress;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }

    public String getSubject() {
        return subject;
    }

    public MessagePriority getPriority() {
        return priority;
    }

    public void setPriority(MessagePriority priority) {
        this.priority = priority;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getContent() {
        return content;
    }
}</span>

Enumeration class for priority
public enum MessagePriority {
   HIGH,
   MEDIUM,
   LOW
}

Create spring context file where defining direct channels and  routing to jms adapters based on the message priority.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:int-mail="http://www.springframework.org/schema/integration/mail"
       xsi:schemaLocation="http://www.springframework.org/schema/beans             http://www.springframework.org/schema/beans/spring-beans-3.0.xsd             http://www.springframework.org/schema/integration             http://www.springframework.org/schema/integration/spring-integration-2.0.xsd             http://www.springframework.org/schema/integration/jms             http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd             http://www.springframework.org/schema/integration/mail             http://www.springframework.org/schema/integration/mail/spring-integration-mail-2.0.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/jms
       http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
        http://www.springframework.org/schema/jee
            http://www.springframework.org/schema/jee/spring-jee-2.0.xsd">

  <context:annotation-config/>
  <context:component-scan base-package="com.si.priority"/>
  <int:gateway id="siGateway" default-request-channel="gatewayChannel"
               service-interface="com.si.priority.SIGateway"/>
  <int:channel id="gatewayChannel"/>
  <int:splitter input-channel="gatewayChannel" output-channel="setPriorityChannel"/>
 
<!-- Router to determine next channel based on the message priority -->
  <int:channel id="setPriorityChannel" />
  <int:router id="priorityRouterId" input-channel="setPriorityChannel" default-output-channel="lowPriorityChannel" ref="priorityRouter"/>

<!-- Set the low priority value -->
  <int:channel id="lowPriorityChannel" />
  <int-jms:outbound-channel-adapter id="lowPriorityAdapter" channel="lowPriorityChannel" destination="EmailQueue" explicit-qos-enabled="true"  priority="0"/>

<!-- Set the medium priority value -->
  <int:channel id="mediumPriorityChannel" />
  <int-jms:outbound-channel-adapter id="mediumPriorityAdapter" channel="mediumPriorityChannel" destination="EmailQueue" explicit-qos-enabled="true"  priority="1"/>

<!-- Set the high priority value -->
  <int:channel id="highPriorityChannel" />
  <int-jms:outbound-channel-adapter id="highPriorityAdapter" channel="highPriorityChannel" destination="EmailQueue" explicit-qos-enabled="true"  priority="9"/>
<!-- Priority value range should be 0-9, setting more than that throws uncategorized exception -->
  <int:channel id="singleMessage" />

<!--A default JMS message listener -->
 <int-jms:message-driven-channel-adapter id="singleMessageAdapter"
                                          destination="EmailQueue"
                                          channel="singleMessage" />

<!--Convert message into MailMessage using a transformer -->
  <int:transformer id="transformer" input-channel="singleMessage" output-channel="emailChannel" ref="emailTransformer"/>
  <int:channel id="emailChannel" />
 <int-mail:outbound-channel-adapter channel="emailChannel" mail-sender="mailSender" />


 <bean id="mailSender" class="org.springframework.mail.javamail.JavaMailSenderImpl">
    <property name="host" value="example.com"/><!-- the host name should be the valid one -->
  </bean>

  <jee:jndi-lookup id="connectionFactory" jndi-name="jms/connectionFactory">
<jee:environment>
                          java.naming.factory.initial=weblogic.jndi.WLInitialContextFactory
                           java.naming.provider.url=t3://localhost:8001
</jee:environment>
  </jee:jndi-lookup>
   <jee:jndi-lookup id="EmailQueue" jndi-name="jms/emailQueue">
   <jee:environment>
                          java.naming.factory.initial=weblogic.jndi.WLInitialContextFactory
                           java.naming.provider.url=t3://localhost:8001
</jee:environment>
   </jee:jndi-lookup>

</beans>


In the context file a gateway has defined with id = siGateway this is the place where process initiates. The gateway contains a business method submitRequest(List<Email>)
 import java.util.List;

public interface SIGateway {
    public void submitRequest(List<Email> emailList);
}

Gateway receives message and sends to splitter, splitter splits the list into single Email message then forwards to Router priorityRouterId.  @Router annotated method nextChannel returns next channel name based message priority..


import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;

@MessageEndpoint
public class PriorityRouter {
  private static final String HIGH_PRIORITY_CHANNEL = "highPriorityChannel";
  private static final String MEDIUM_PRIORITY_CHANNEL = "mediumPriorityChannel";
  private static final String LOW_PRIORITY_CHANNEL = "lowPriorityChannel";

    @Router
    public String nextChannel(Email email){
      if (email.getPriority() == MessagePriority.HIGH) {
          return HIGH_PRIORITY_CHANNEL;
      }
      if (email.getPriority() == MessagePriority.MEDIUM) {
          return MEDIUM_PRIORITY_CHANNEL;
      }
      return LOW_PRIORITY_CHANNEL;
    }
}

Setting priority value to the JMS header using   <int-jms:outbound-channel-adapter>  in order to set the priority value we need to add property explicit-qos-enabled="true" otherwise it ignores priority property.
we need to add either destination and connectionfactory or jmsTemplate property to the jms:outbound-channel adapter. here I added a destination name EmailQueue is a distributed queue created in weblogic JMS module.
Create a  destination key in weblogic with sortKey is JMSPriority and direction Descending.
Set this destination key to distributed queue EmailQueue.
The priority value range must be set  0-9.
 Defined   default message listener <int-jms:message-driven-channel-adapter> on queue EmailQueue receives a spring integration message and forwards to EmailTransformer .
I'm using  EmailTransformer to transform spring integration message into MailMessage because <int-mail:outbound-channel-adapte> receiving message type should be mailmessage otherwise throws  MessageHandlingException :Unable to create MailMessage from payload type...
 EmailTransformer.java looks like below sets recipient information to MailMessage.


 import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Transformer;
import org.springframework.mail.MailMessage;
import org.springframework.mail.SimpleMailMessage;

@MessageEndpoint
public class EmailTransformer {
    @Transformer
    public MailMessage tranform(Email email){
      MailMessage msg = new SimpleMailMessage();
      msg.setTo(email.getToEmailAddress());
      msg.setFrom(email.getFromEmailAddress());
      msg.setSubject(email.getSubject());
      msg.setText(email.getContent());
      return msg;
    }
}

Finally this is the client sending mass email list and high priority email.


import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;


public class SendEmailTest {

   protected static SIGateway gateway;
    public SendEmailTest() {
        super();
    }

    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("resources/applicationContext.xml");
        gateway = (SIGateway)context.getBean("siGateway");

        List<Email> massEmailList = new ArrayList<Email>();


        for (int i = 1; i <= 2; i++) {
            Email email = new Email();
            email.setFromEmailAddress("no-reply@test.com");
            email.setSubject("Low priority email: " + i);
            email.setToEmailAddress("testxxx@gmail.com");
            email.setContent("JMS low priority message.");
            email.setPriority(MessagePriority.LOW);
            massEmailList.add(email);
        }
        gateway.submitRequest(massEmailList);
        System.out.println("Request submit to process mass email.");
        Email singleEmail = new Email();
        singleEmail.setFromEmailAddress("no-reply@test.com");
        singleEmail.setSubject("High priority email");
        singleEmail.setToEmailAddress("testxxx@gmail.com");
        singleEmail.setContent("JMS high priority message.");
        singleEmail.setPriority(MessagePriority.HIGH);
        List<Email> emailList = Arrays.asList(singleEmail);
        gateway.submitRequest(emailList);
        System.out.println("Request submit to process priority email.");

    }
}






1 comment:

  1. Hi,

    Thanks for your posting and clear explanation on every details required to integrate with weblogic JMS and Spring Integration.

    Regards
    Muthuvel

    ReplyDelete