In this post I'm going to explain how to process a priority JMS message using Spring Integration.
Use case: create an enterprise email system capable of sending both mass emails and single emails. When the system receives a priority message, that message needs to pass through even if other messages are already being processed.
Environment Setup:
1. Spring 3.0
2. Spring Integration 2.0.1 or above
3. Weblogic 10.3.3
4. JDeveloper 11.1.1.3
Download JDeveloper from http://www.oracle.com/technetwork/developer-tools/jdev/downloads/index.html
and install it on your local machine. This also installs the WebLogic application server as the integrated server.
Create an Email.java domain class that contains the recipient information and the priority.
import java.io.Serializable;
public class Email implements Serializable{
private String toEmailAddress;
private String fromEmailAddress;
private String subject;
private String content;
private MessagePriority priority;
public void setToEmailAddress(String toEmailAddress) {
this.toEmailAddress = toEmailAddress;
}
public String getToEmailAddress() {
return toEmailAddress;
}
public void setFromEmailAddress(String fromEmailAddress) {
this.fromEmailAddress = fromEmailAddress;
}
public String getFromEmailAddress() {
return fromEmailAddress;
}
public void setSubject(String subject) {
this.subject = subject;
}
public String getSubject() {
return subject;
}
public MessagePriority getPriority() {
return priority;
}
public void setPriority(MessagePriority priority) {
this.priority = priority;
}
public void setContent(String content) {
this.content = content;
}
public String getContent() {
return content;
}
}</span>
Enumeration class for priority
/**
 * Priority levels that can be assigned to an email, declared in the order
 * {@code HIGH}, {@code MEDIUM}, {@code LOW}.
 */
public enum MessagePriority {
    HIGH, MEDIUM, LOW;
}
Create a Spring context file that defines the direct channels and routes messages to the JMS adapters based on the message priority.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xmlns:int-mail="http://www.springframework.org/schema/integration/mail"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd http://www.springframework.org/schema/integration/mail http://www.springframework.org/schema/integration/mail/spring-integration-mail-2.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
http://www.springframework.org/schema/jee
http://www.springframework.org/schema/jee/spring-jee-2.0.xsd">
<!-- Overall flow: gateway, splitter, priority router, JMS outbound adapters (EmailQueue),
     JMS message-driven adapter, transformer, mail outbound adapter. -->
<!-- Enable annotation processing and scan com.si.priority for annotated components -->
<context:annotation-config/>
<context:component-scan base-package="com.si.priority"/>
<!-- Entry point: calls made through the SIGateway interface are sent to gatewayChannel -->
<int:gateway id="siGateway" default-request-channel="gatewayChannel"
service-interface="com.si.priority.SIGateway"/>
<int:channel id="gatewayChannel"/>
<!-- Splitter: takes messages from gatewayChannel and publishes its output to setPriorityChannel -->
<int:splitter input-channel="gatewayChannel" output-channel="setPriorityChannel"/>
<!-- Router to determine next channel based on the message priority;
     delegates to the priorityRouter bean and falls back to lowPriorityChannel by default -->
<int:channel id="setPriorityChannel" />
<int:router id="priorityRouterId" input-channel="setPriorityChannel" default-output-channel="lowPriorityChannel" ref="priorityRouter"/>
<!-- Low priority: messages on lowPriorityChannel are sent to EmailQueue with JMS priority 0 -->
<int:channel id="lowPriorityChannel" />
<int-jms:outbound-channel-adapter id="lowPriorityAdapter" channel="lowPriorityChannel" destination="EmailQueue" explicit-qos-enabled="true" priority="0"/>
<!-- Medium priority: messages on mediumPriorityChannel are sent to EmailQueue with JMS priority 1 -->
<int:channel id="mediumPriorityChannel" />
<int-jms:outbound-channel-adapter id="mediumPriorityAdapter" channel="mediumPriorityChannel" destination="EmailQueue" explicit-qos-enabled="true" priority="1"/>
<!-- High priority: messages on highPriorityChannel are sent to EmailQueue with JMS priority 9 -->
<int:channel id="highPriorityChannel" />
<int-jms:outbound-channel-adapter id="highPriorityAdapter" channel="highPriorityChannel" destination="EmailQueue" explicit-qos-enabled="true" priority="9"/>
<!-- Priority value range should be 0-9, setting more than that throws uncategorized exception -->
<int:channel id="singleMessage" />
<!-- A default JMS message listener: consumes from EmailQueue and publishes to the singleMessage channel -->
<int-jms:message-driven-channel-adapter id="singleMessageAdapter"
destination="EmailQueue"
channel="singleMessage" />
<!-- Convert message into MailMessage using a transformer (the emailTransformer bean) -->
<int:transformer id="transformer" input-channel="singleMessage" output-channel="emailChannel" ref="emailTransformer"/>
<int:channel id="emailChannel" />
<!-- Mail outbound adapter: sends whatever arrives on emailChannel through the mailSender bean -->
<int-mail:outbound-channel-adapter channel="emailChannel" mail-sender="mailSender" />
<bean id="mailSender" class="org.springframework.mail.javamail.JavaMailSenderImpl">
<property name="host" value="example.com"/><!-- the host name should be the valid one -->
</bean>
<!-- JMS connection factory obtained from JNDI (jms/connectionFactory) using the WebLogic initial context.
     NOTE(review): no adapter above references this bean through an explicit connection-factory attribute;
     presumably the JMS adapters pick it up by its default bean name. Confirm for the Spring Integration version in use. -->
<jee:jndi-lookup id="connectionFactory" jndi-name="jms/connectionFactory">
<jee:environment>
java.naming.factory.initial=weblogic.jndi.WLInitialContextFactory
java.naming.provider.url=t3://localhost:8001
</jee:environment>
</jee:jndi-lookup>
<!-- Queue obtained from JNDI (jms/emailQueue); this is the bean the adapters reference as destination="EmailQueue" -->
<jee:jndi-lookup id="EmailQueue" jndi-name="jms/emailQueue">
<jee:environment>
java.naming.factory.initial=weblogic.jndi.WLInitialContextFactory
java.naming.provider.url=t3://localhost:8001
</jee:environment>
</jee:jndi-lookup>
</beans>
In the context file, a gateway is defined with id = siGateway; this is where the process starts. The gateway contains a business method submitRequest(List<Email>).
import java.util.List;
/**
 * Gateway contract through which callers hand a list of emails to the
 * messaging flow.
 */
public interface SIGateway {

    /**
     * Submits the given emails for processing.
     *
     * @param emailList the emails to submit
     */
    void submitRequest(List<Email> emailList);
}
The gateway receives the message and sends it to the splitter. The splitter splits the list into single Email messages and forwards them to the router priorityRouterId. The @Router-annotated method nextChannel returns the name of the next channel based on the message priority.
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;
/**
 * Message endpoint that picks the output channel for an {@link Email}
 * according to its {@link MessagePriority}.
 */
@MessageEndpoint
public class PriorityRouter {

    private static final String HIGH_PRIORITY_CHANNEL = "highPriorityChannel";
    private static final String MEDIUM_PRIORITY_CHANNEL = "mediumPriorityChannel";
    private static final String LOW_PRIORITY_CHANNEL = "lowPriorityChannel";

    /**
     * Returns the name of the channel the given email should be routed to.
     *
     * @param email the email being routed
     * @return {@code "highPriorityChannel"} for {@code HIGH},
     *         {@code "mediumPriorityChannel"} for {@code MEDIUM}, and
     *         {@code "lowPriorityChannel"} for anything else (including an
     *         unset priority)
     */
    @Router
    public String nextChannel(Email email) {
        final MessagePriority priority = email.getPriority();
        if (MessagePriority.HIGH == priority) {
            return HIGH_PRIORITY_CHANNEL;
        } else if (MessagePriority.MEDIUM == priority) {
            return MEDIUM_PRIORITY_CHANNEL;
        } else {
            return LOW_PRIORITY_CHANNEL;
        }
    }
}
The priority value is set on the JMS header using <int-jms:outbound-channel-adapter>. In order to set the priority value we need to add the property explicit-qos-enabled="true"; otherwise the priority property is ignored.
We need to add either a destination and a connection factory, or a jmsTemplate property, to the <int-jms:outbound-channel-adapter>. Here I added the destination name EmailQueue, which is a distributed queue created in a WebLogic JMS module.
In WebLogic, create a destination key whose sort key is JMSPriority and whose direction is Descending.
Assign this destination key to the distributed queue EmailQueue.
The priority value must be in the range 0-9.
A default message listener, <int-jms:message-driven-channel-adapter>, is defined on the queue EmailQueue; it receives a Spring Integration message and forwards it to EmailTransformer.
I'm using EmailTransformer to transform the Spring Integration message into a MailMessage, because the message received by <int-mail:outbound-channel-adapter> must be of type MailMessage; otherwise it throws MessageHandlingException: Unable to create MailMessage from payload type...
EmailTransformer.java, shown below, sets the recipient information on the MailMessage.
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Transformer;
import org.springframework.mail.MailMessage;
import org.springframework.mail.SimpleMailMessage;
/**
 * Message endpoint that converts an {@link Email} into a Spring
 * {@link MailMessage}.
 */
@MessageEndpoint
public class EmailTransformer {

    /**
     * Builds a {@link SimpleMailMessage} carrying the sender, recipient,
     * subject and body text of the given email.
     *
     * @param email the email to convert
     * @return the populated mail message
     */
    @Transformer
    public MailMessage tranform(Email email) {
        final SimpleMailMessage mail = new SimpleMailMessage();
        mail.setFrom(email.getFromEmailAddress());
        mail.setTo(email.getToEmailAddress());
        mail.setSubject(email.getSubject());
        mail.setText(email.getContent());
        return mail;
    }
}
Finally, this is the client that sends the mass email list and the high-priority email.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Command-line client that loads the Spring context, submits a list of two
 * low-priority emails through the gateway, and then submits a single
 * high-priority email.
 */
public class SendEmailTest {

    protected static SIGateway gateway;

    public SendEmailTest() {
        super();
    }

    /**
     * Loads {@code resources/applicationContext.xml}, looks up the
     * {@code siGateway} bean and submits the two requests.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ApplicationContext context =
                new ClassPathXmlApplicationContext("resources/applicationContext.xml");
        gateway = (SIGateway) context.getBean("siGateway");

        // Mass mailing: two low-priority emails submitted as one request.
        List<Email> bulk = new ArrayList<Email>();
        for (int n = 1; n <= 2; n++) {
            bulk.add(newEmail("Low priority email: " + n,
                    "JMS low priority message.", MessagePriority.LOW));
        }
        gateway.submitRequest(bulk);
        System.out.println("Request submit to process mass email.");

        // Single high-priority email submitted after the mass request.
        Email urgent = newEmail("High priority email",
                "JMS high priority message.", MessagePriority.HIGH);
        List<Email> urgentOnly = Arrays.asList(urgent);
        gateway.submitRequest(urgentOnly);
        System.out.println("Request submit to process priority email.");
    }

    /** Creates an email with the fixed test sender/recipient and the given subject, body and priority. */
    private static Email newEmail(String subject, String content, MessagePriority priority) {
        Email email = new Email();
        email.setFromEmailAddress("no-reply@test.com");
        email.setSubject(subject);
        email.setToEmailAddress("testxxx@gmail.com");
        email.setContent(content);
        email.setPriority(priority);
        return email;
    }
}
Use case: create an enterprise email system capable of sending both mass emails and single emails. When the system receives a priority message, that message needs to pass through even if other messages are already being processed.
Environment Setup:
1. Spring 3.0
2. Spring Integration 2.0.1 or above
3. Weblogic 10.3.3
4. JDeveloper 11.1.1.3
Download JDeveloper from http://www.oracle.com/technetwork/developer-tools/jdev/downloads/index.html
and install it on your local machine. This also installs the WebLogic application server as the integrated server.
Create an Email.java domain class that contains the recipient information and the priority.
import java.io.Serializable;
public class Email implements Serializable{
private String toEmailAddress;
private String fromEmailAddress;
private String subject;
private String content;
private MessagePriority priority;
public void setToEmailAddress(String toEmailAddress) {
this.toEmailAddress = toEmailAddress;
}
public String getToEmailAddress() {
return toEmailAddress;
}
public void setFromEmailAddress(String fromEmailAddress) {
this.fromEmailAddress = fromEmailAddress;
}
public String getFromEmailAddress() {
return fromEmailAddress;
}
public void setSubject(String subject) {
this.subject = subject;
}
public String getSubject() {
return subject;
}
public MessagePriority getPriority() {
return priority;
}
public void setPriority(MessagePriority priority) {
this.priority = priority;
}
public void setContent(String content) {
this.content = content;
}
public String getContent() {
return content;
}
}</span>
Enumeration class for priority
/**
 * Priority levels that can be assigned to an email, declared in the order
 * {@code HIGH}, {@code MEDIUM}, {@code LOW}.
 */
public enum MessagePriority {
    HIGH, MEDIUM, LOW;
}
Create a Spring context file that defines the direct channels and routes messages to the JMS adapters based on the message priority.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xmlns:int-mail="http://www.springframework.org/schema/integration/mail"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd http://www.springframework.org/schema/integration/mail http://www.springframework.org/schema/integration/mail/spring-integration-mail-2.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
http://www.springframework.org/schema/jee
http://www.springframework.org/schema/jee/spring-jee-2.0.xsd">
<!-- Overall flow: gateway, splitter, priority router, JMS outbound adapters (EmailQueue),
     JMS message-driven adapter, transformer, mail outbound adapter. -->
<!-- Enable annotation processing and scan com.si.priority for annotated components -->
<context:annotation-config/>
<context:component-scan base-package="com.si.priority"/>
<!-- Entry point: calls made through the SIGateway interface are sent to gatewayChannel -->
<int:gateway id="siGateway" default-request-channel="gatewayChannel"
service-interface="com.si.priority.SIGateway"/>
<int:channel id="gatewayChannel"/>
<!-- Splitter: takes messages from gatewayChannel and publishes its output to setPriorityChannel -->
<int:splitter input-channel="gatewayChannel" output-channel="setPriorityChannel"/>
<!-- Router to determine next channel based on the message priority;
     delegates to the priorityRouter bean and falls back to lowPriorityChannel by default -->
<int:channel id="setPriorityChannel" />
<int:router id="priorityRouterId" input-channel="setPriorityChannel" default-output-channel="lowPriorityChannel" ref="priorityRouter"/>
<!-- Low priority: messages on lowPriorityChannel are sent to EmailQueue with JMS priority 0 -->
<int:channel id="lowPriorityChannel" />
<int-jms:outbound-channel-adapter id="lowPriorityAdapter" channel="lowPriorityChannel" destination="EmailQueue" explicit-qos-enabled="true" priority="0"/>
<!-- Medium priority: messages on mediumPriorityChannel are sent to EmailQueue with JMS priority 1 -->
<int:channel id="mediumPriorityChannel" />
<int-jms:outbound-channel-adapter id="mediumPriorityAdapter" channel="mediumPriorityChannel" destination="EmailQueue" explicit-qos-enabled="true" priority="1"/>
<!-- High priority: messages on highPriorityChannel are sent to EmailQueue with JMS priority 9 -->
<int:channel id="highPriorityChannel" />
<int-jms:outbound-channel-adapter id="highPriorityAdapter" channel="highPriorityChannel" destination="EmailQueue" explicit-qos-enabled="true" priority="9"/>
<!-- Priority value range should be 0-9, setting more than that throws uncategorized exception -->
<int:channel id="singleMessage" />
<!-- A default JMS message listener: consumes from EmailQueue and publishes to the singleMessage channel -->
<int-jms:message-driven-channel-adapter id="singleMessageAdapter"
destination="EmailQueue"
channel="singleMessage" />
<!-- Convert message into MailMessage using a transformer (the emailTransformer bean) -->
<int:transformer id="transformer" input-channel="singleMessage" output-channel="emailChannel" ref="emailTransformer"/>
<int:channel id="emailChannel" />
<!-- Mail outbound adapter: sends whatever arrives on emailChannel through the mailSender bean -->
<int-mail:outbound-channel-adapter channel="emailChannel" mail-sender="mailSender" />
<bean id="mailSender" class="org.springframework.mail.javamail.JavaMailSenderImpl">
<property name="host" value="example.com"/><!-- the host name should be the valid one -->
</bean>
<!-- JMS connection factory obtained from JNDI (jms/connectionFactory) using the WebLogic initial context.
     NOTE(review): no adapter above references this bean through an explicit connection-factory attribute;
     presumably the JMS adapters pick it up by its default bean name. Confirm for the Spring Integration version in use. -->
<jee:jndi-lookup id="connectionFactory" jndi-name="jms/connectionFactory">
<jee:environment>
java.naming.factory.initial=weblogic.jndi.WLInitialContextFactory
java.naming.provider.url=t3://localhost:8001
</jee:environment>
</jee:jndi-lookup>
<!-- Queue obtained from JNDI (jms/emailQueue); this is the bean the adapters reference as destination="EmailQueue" -->
<jee:jndi-lookup id="EmailQueue" jndi-name="jms/emailQueue">
<jee:environment>
java.naming.factory.initial=weblogic.jndi.WLInitialContextFactory
java.naming.provider.url=t3://localhost:8001
</jee:environment>
</jee:jndi-lookup>
</beans>
In the context file, a gateway is defined with id = siGateway; this is where the process starts. The gateway contains a business method submitRequest(List<Email>).
import java.util.List;
/**
 * Gateway contract through which callers hand a list of emails to the
 * messaging flow.
 */
public interface SIGateway {

    /**
     * Submits the given emails for processing.
     *
     * @param emailList the emails to submit
     */
    void submitRequest(List<Email> emailList);
}
The gateway receives the message and sends it to the splitter. The splitter splits the list into single Email messages and forwards them to the router priorityRouterId. The @Router-annotated method nextChannel returns the name of the next channel based on the message priority.
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;
/**
 * Message endpoint that picks the output channel for an {@link Email}
 * according to its {@link MessagePriority}.
 */
@MessageEndpoint
public class PriorityRouter {

    private static final String HIGH_PRIORITY_CHANNEL = "highPriorityChannel";
    private static final String MEDIUM_PRIORITY_CHANNEL = "mediumPriorityChannel";
    private static final String LOW_PRIORITY_CHANNEL = "lowPriorityChannel";

    /**
     * Returns the name of the channel the given email should be routed to.
     *
     * @param email the email being routed
     * @return {@code "highPriorityChannel"} for {@code HIGH},
     *         {@code "mediumPriorityChannel"} for {@code MEDIUM}, and
     *         {@code "lowPriorityChannel"} for anything else (including an
     *         unset priority)
     */
    @Router
    public String nextChannel(Email email) {
        final MessagePriority priority = email.getPriority();
        if (MessagePriority.HIGH == priority) {
            return HIGH_PRIORITY_CHANNEL;
        } else if (MessagePriority.MEDIUM == priority) {
            return MEDIUM_PRIORITY_CHANNEL;
        } else {
            return LOW_PRIORITY_CHANNEL;
        }
    }
}
The priority value is set on the JMS header using <int-jms:outbound-channel-adapter>. In order to set the priority value we need to add the property explicit-qos-enabled="true"; otherwise the priority property is ignored.
We need to add either a destination and a connection factory, or a jmsTemplate property, to the <int-jms:outbound-channel-adapter>. Here I added the destination name EmailQueue, which is a distributed queue created in a WebLogic JMS module.
In WebLogic, create a destination key whose sort key is JMSPriority and whose direction is Descending.
Assign this destination key to the distributed queue EmailQueue.
The priority value must be in the range 0-9.
A default message listener, <int-jms:message-driven-channel-adapter>, is defined on the queue EmailQueue; it receives a Spring Integration message and forwards it to EmailTransformer.
I'm using EmailTransformer to transform the Spring Integration message into a MailMessage, because the message received by <int-mail:outbound-channel-adapter> must be of type MailMessage; otherwise it throws MessageHandlingException: Unable to create MailMessage from payload type...
EmailTransformer.java, shown below, sets the recipient information on the MailMessage.
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Transformer;
import org.springframework.mail.MailMessage;
import org.springframework.mail.SimpleMailMessage;
/**
 * Message endpoint that converts an {@link Email} into a Spring
 * {@link MailMessage}.
 */
@MessageEndpoint
public class EmailTransformer {

    /**
     * Builds a {@link SimpleMailMessage} carrying the sender, recipient,
     * subject and body text of the given email.
     *
     * @param email the email to convert
     * @return the populated mail message
     */
    @Transformer
    public MailMessage tranform(Email email) {
        final SimpleMailMessage mail = new SimpleMailMessage();
        mail.setFrom(email.getFromEmailAddress());
        mail.setTo(email.getToEmailAddress());
        mail.setSubject(email.getSubject());
        mail.setText(email.getContent());
        return mail;
    }
}
Finally, this is the client that sends the mass email list and the high-priority email.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Command-line client that loads the Spring context, submits a list of two
 * low-priority emails through the gateway, and then submits a single
 * high-priority email.
 */
public class SendEmailTest {

    protected static SIGateway gateway;

    public SendEmailTest() {
        super();
    }

    /**
     * Loads {@code resources/applicationContext.xml}, looks up the
     * {@code siGateway} bean and submits the two requests.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ApplicationContext context =
                new ClassPathXmlApplicationContext("resources/applicationContext.xml");
        gateway = (SIGateway) context.getBean("siGateway");

        // Mass mailing: two low-priority emails submitted as one request.
        List<Email> bulk = new ArrayList<Email>();
        for (int n = 1; n <= 2; n++) {
            bulk.add(newEmail("Low priority email: " + n,
                    "JMS low priority message.", MessagePriority.LOW));
        }
        gateway.submitRequest(bulk);
        System.out.println("Request submit to process mass email.");

        // Single high-priority email submitted after the mass request.
        Email urgent = newEmail("High priority email",
                "JMS high priority message.", MessagePriority.HIGH);
        List<Email> urgentOnly = Arrays.asList(urgent);
        gateway.submitRequest(urgentOnly);
        System.out.println("Request submit to process priority email.");
    }

    /** Creates an email with the fixed test sender/recipient and the given subject, body and priority. */
    private static Email newEmail(String subject, String content, MessagePriority priority) {
        Email email = new Email();
        email.setFromEmailAddress("no-reply@test.com");
        email.setSubject(subject);
        email.setToEmailAddress("testxxx@gmail.com");
        email.setContent(content);
        email.setPriority(priority);
        return email;
    }
}
Hi,
Thanks for your post and the clear explanation of every detail required to integrate WebLogic JMS with Spring Integration.
Regards
Muthuvel