Asynchronous Messaging with WebLogic Server, Server Sessions, and MDBs

Asynchronous message processing is a requirement of many applications that need to send and receive messages in real time or near real time. The JMS 1.0.2 Specification defines the MessageListener interface, which allows an application to receive messages asynchronously from a JMS Destination when a message is sent to that destination. An implementation of the MessageListener interface is typically a client application to a JMS provider that runs in a remote JVM.

The JMS Specification also defines an optional set of APIs, JMS Application Server Facilities, that may be implemented by an application server vendor to allow for asynchronous, concurrent processing of a subscription. The EJB 2.0 Specification provides the Message Driven Bean API (MDB) that allows a developer to create and deploy a component into an EJB container, and have the container invoke the MDB when a message is sent to a JMS Destination that the MDB is deployed to service. Both of these APIs provide a way to consume messages from a JMS Destination within the JVM and framework of an application server rather than as an external JMS client to a JMS provider. Consumers deployed in this fashion can take advantage of services provided by the application server, such as transaction and thread management.

WebLogic Application Server provides an implementation of the MDB API described in the EJB 2.0 Specification and the optional JMS Application Server Facilities described in Chapter 8 of the JMS 1.0.2 Specification. I'll discuss these APIs, and support for them within WebLogic Server, with a store and forward example that uses MDB and ServerSessionPool. The example code was developed and tested using WebLogic Server 6.0 Service Pack 2. A general discussion of JMS isn't provided here, but an excellent three-part article, authored by Dave Chappell, on the JMS API can be found in the March, April, and May 2001 issues of Java Developer's Journal (Vol. 6, issues 3-5).

JMS ServerSessions
Chapter 8 of the JMS 1.0.2 Specification defines three additional optional interfaces, implemented by an application server vendor, that provide a special facility for concurrent message processing. It further delineates the roles of JMS provider, Application Server, and Application regarding the implementation and use of these interfaces. The interfaces found in the javax.jms package that make up this part of the API are ServerSession, ServerSessionPool, and ConnectionConsumer. The spec also defines how an application server will use the javax.jms.Session interface to implement these special facilities.

A Session object provides setMessageListener() and getMessageListener() methods that allow a MessageListener to be associated with a Session used by a ConnectionConsumer. The ConnectionConsumer itself can be created with a message selector. The Session also provides a run() method that allows an application server to start a Session once messages are available for consumption on a Destination.

A ServerSession is implemented by the application server to provide the runtime support for a JMS Session. It maps a thread to a Session and executes the Session's run() method through its start() method. In addition to the start() method, the ServerSession defines a getSession() method that returns the JMS Session associated with the ServerSession to the caller.

The application server also provides a javax.jms.ServerSessionPool object that's used by the ConnectionConsumer to get a Session for message processing. The ConnectionConsumer uses the only ServerSessionPool method, getServerSession(), to acquire a ServerSession for consuming messages that have been produced at the Destination.

Finally, javax.jms.TopicConnection and javax.jms.QueueConnection provide a method, createConnectionConsumer(), that creates a ConnectionConsumer. The application server uses the ConnectionConsumer to load messages into a Session and to start the Session indirectly by calling the ServerSession's start() method. Construction of the ConnectionConsumer associates it with a ServerSessionPool and a Destination.

WebLogic provides an additional class that isn't part of the specification, weblogic.jms.ServerSessionPoolFactory. A ServerSessionPoolFactory follows the factory pattern: it creates a ServerSessionPool, associates it with a Connection and a MessageListener, sets the Sessions' acknowledgment mode, and defines the transaction semantics of the Sessions. Implementation of a ServerSessionPool is subject to interpretation and will have vendor-specific features, so its life cycle is well suited to the factory pattern should the implementation change or should multiple implementations be made available. The WebLogic Server ServerSessionPool factory is looked up through JNDI using the name "weblogic.jms.ServerSessionPoolFactory:<name>" where <name> is the name of the JMS Server where the ServerSessionPool will be used.
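
As a small illustration of that naming rule, the lookup name can be assembled from the JMS Server name as sketched below. The prefix is the one quoted above; the helper class, its method, and the server name in the usage note are invented for this sketch and aren't part of WebLogic Server.

```java
// Builds the JNDI name described above. The prefix comes from the article;
// the class and method names are hypothetical helpers, not a WebLogic API.
class SessionPoolFactoryNames {
    static final String PREFIX = "weblogic.jms.ServerSessionPoolFactory:";

    // jmsServerName is the name given to the JMS Server in the server configuration.
    static String jndiNameFor(String jmsServerName) {
        if (jmsServerName == null || jmsServerName.isEmpty()) {
            throw new IllegalArgumentException("JMS Server name is required");
        }
        return PREFIX + jmsServerName;
    }
}
```

For a JMS Server called wldjJMSServer (an assumed name), jndiNameFor("wldjJMSServer") yields the string that would be handed to a JNDI Context's lookup() method.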

MDBs
Message Driven Beans are defined by the EJB 2.0 Specification and exist for the exclusive purpose of consuming messages from a JMS Destination. WebLogic Server 6.0 provides full support for the MDB portion of the EJB 2.0 API.

If you are a seasoned EJB developer, MDBs are surprisingly simple, and even if you are new to EJB development you can create and deploy MDBs to consume messages from JMS Destinations with little difficulty. This simplicity of development and deployment is, in my opinion, the strong suit of MDBs.

Unlike session beans and entity beans, an MDB doesn't require the development of an extension to the EJBHome interface or the EJBObject interface. The reasons are that an MDB's home interface isn't located by a client application through a JNDI lookup, its life cycle isn't controlled by a client application through the factory methods on a home interface, and no methods need to be exposed to a client application through a remote interface. All the bean developer needs to do is implement the MessageDrivenBean interface with its two methods, setMessageDrivenContext() and ejbRemove(), and implement the MessageListener interface by providing an onMessage() implementation.
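
A minimal sketch of the shape of such a bean follows. The bean name AlarmLoggerBean is hypothetical. So that the sketch compiles on its own, the javax.ejb and javax.jms types are replaced by small stand-in interfaces declared at the top; they are simplified (for instance, the real TextMessage.getText() declares JMSException), and a real bean would import the real interfaces from the J2EE libraries instead.

```java
// Stand-ins for javax.jms and javax.ejb types, declared only so this sketch
// compiles without the J2EE libraries. A real bean imports the real ones.
interface Message {}
interface TextMessage extends Message { String getText(); }
interface MessageListener { void onMessage(Message message); }
interface MessageDrivenContext {}
interface MessageDrivenBean {
    void setMessageDrivenContext(MessageDrivenContext ctx);
    void ejbRemove();
}

// Hypothetical bean: no home or remote interface, only the two callback
// interfaces plus the no-argument ejbCreate() that the container invokes.
class AlarmLoggerBean implements MessageDrivenBean, MessageListener {
    private MessageDrivenContext context;
    private int handled; // number of text messages seen by this instance

    public void setMessageDrivenContext(MessageDrivenContext ctx) { this.context = ctx; }

    public void ejbCreate() { handled = 0; }

    public void ejbRemove() { context = null; }

    // Called by the container for each message delivered to the bean.
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            handled++;
            System.out.println("Alarm: " + ((TextMessage) message).getText());
        }
    }

    int handledCount() { return handled; }
}
```

Note that nothing in the bean looks up a destination or creates a consumer; that wiring is left to the container and the deployment descriptors.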

The spec adds some new elements to the ejb-jar.xml deployment descriptor for deploying MDBs. Here's the element definition from the ejb-jar_2.0.dtd:

<!ELEMENT message-driven (description?, display-name?, small-icon?, large-icon?, ejb-name?, ejb-class, transaction-type, message-selector?, acknowledge-mode?, message-driven-destination?, env-entry*, ejb-ref*, security-identity?, resource-ref*, resource-env-ref*)>

The <message-driven> element is an optional element contained in the <enterprise-beans> element. It uses many of the elements previously defined for describing an EJB, such as <ejb-name> and <transaction-type>. The <transaction-type> element describes the transaction semantics of the bean. The allowable values are "Bean" and "Container", analogous to session beans. A value of "Container" tells the EJB container to call the MDB's onMessage() method in the context of a transaction. This is recommended. When using container-managed transactions, a <container-transaction> section in the <assembly-descriptor> is required to tell the container which transaction attribute the MDB needs when it's invoked. The allowable values for <trans-attribute> are "Required" and "NotSupported". The new elements defined are:

  • <message-driven-destination>
  • <message-selector>
  • <acknowledge-mode>

The <message-driven-destination> element is a stanza used to describe the characteristics of the destination the MDB is deployed to consume. It must contain <destination-type> and may optionally contain <subscription-durability>. The allowable values for <destination-type> are "javax.jms.Queue" and "javax.jms.Topic". The allowable values for <subscription-durability> are "Durable" and "NonDurable", with "NonDurable" being the default.

A <message-selector> may be specified using message selector syntax described in the JMS Specification. Message selection with MDBs is discouraged, as it can be an expensive operation when there are many messages in a Queue or Topic. Message selectors on Queues are not recommended.

Message Driven Beans do not use the JMS message acknowledgment API. The container provides message acknowledgment for the MDB based on the value of <acknowledge-mode> in the deployment descriptor. The default mode is AUTO_ACKNOWLEDGE.

The weblogic-ejb-jar.dtd also defines a new element, <message-driven-descriptor>, that's used to specify WebLogic Server-specific properties of the bean at deployment time. It's defined as:

<!ELEMENT message-driven-descriptor (
pool?,
destination-jndi-name?,
initial-context-factory?,
provider-url?,
connection-factory-jndi-name?
)
>

The <pool> element is used to tell the container how to pool instances of the MDB. The <destination-jndi-name> is the name of the Destination that the MDB will consume messages on. The <initial-context-factory> is the name of the JNDI InitialContext factory that will be used to get an InitialContext for looking up a ConnectionFactory. The <provider-url> is the URL of the InitialContext provider, and the <connection-factory-jndi-name> is the name of the ConnectionFactory that will be looked up.

Store and Forward Example
I created a small store and forward example to demonstrate the use of these two server-side techniques of asynchronous messaging. I developed the example by taking sample code provided with WebLogic Server and changing it slightly, and by using the WebLogic Server Admin Console to create the necessary JMS elements (see Figure 1).

The example uses a Queue and two Topics to simulate a store and forward architecture used to route alarm messages. The Queue is consumed by an MDB that examines the header of a TextMessage for information that the MDB uses to forward the message to the appropriate Topic. The header contains a criticality level for the alarm message, which is used for routing. The two Topics represent forward destinations for the two levels of alarm criticality. One Topic receives "Critical" alarms and the other Topic receives "Normal" alarms (see Figure 2).

Alarm Routing MDB
The "Alarm" Queue, wldjAlarmQueue, is a persistent Queue that's consumed by the AlarmReaderBean class, which implements MessageDrivenBean and MessageListener. The AlarmReaderBean is deployed with container-managed transactions and a <trans-attribute> of "Required". The ejb-jar.xml and weblogic-ejb-jar.xml deployment descriptors are shown in Listings 1 and 2.

The onMessage() method of this consumer is responsible for examining a user property named "level" in the message header and forwarding the message to the wldjCriticalAlarmTopic or the wldjNormalAlarmTopic (see Listing 3).
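
The routing decision itself can be sketched in isolation from the JMS calls. The helper below is hypothetical and is not the code in Listing 3; the Topic names are the ones used in this article, while the exact property values ("Critical" and anything else) and the choice to send unknown or missing levels to the normal Topic are assumptions.

```java
// Hypothetical helper isolating the routing decision made in onMessage().
// The topic names come from the article; the property values and the
// fallback for unknown levels are assumptions.
class AlarmRouter {
    static final String CRITICAL_TOPIC = "wldjCriticalAlarmTopic";
    static final String NORMAL_TOPIC = "wldjNormalAlarmTopic";

    // level is the value of the "level" user property; it may be null
    // when the sender did not set the property.
    static String topicFor(String level) {
        if ("Critical".equalsIgnoreCase(level)) {
            return CRITICAL_TOPIC;
        }
        return NORMAL_TOPIC;
    }
}
```

In a real bean the argument would come from the message's getStringProperty("level") call, and the returned name would identify the Topic to publish to.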

Since the onMessage() method has to route the message to the appropriate Topic, it must have access to a TopicConnection to do so. To avoid looking up a TopicConnection every time a message is processed, the ejbCreate() method of the AlarmReaderBean performs a JNDI lookup on the wldjTopicConnectionFactory, creates a TopicConnection, and starts the TopicConnection. The ejbCreate() method is called only when the WebLogic Server EJB container creates the bean instance (see Listing 4).

When the onMessage() method has decided where to route the message, it creates a nontransacted AUTO_ACKNOWLEDGE TopicSession using the cached TopicConnection. It uses the TopicSession to create a TopicPublisher and then publishes the message to the selected Topic. The TopicPublisher and TopicSession are closed at the end of the method.

Consumer ServerSessions
Once the alarm messages have been routed to the wldjCriticalAlarmTopic and the wldjNormalAlarmTopic, they are processed by a MessageListener that has been associated with the Sessions in the ServerSessionPool. When a message arrives, the ConnectionConsumer assigned to the Destination uses the ServerSessionPool to acquire a ServerSession and a Session, loads the Session with one or more messages, and starts the ServerSession. Let's examine the ServerSessionPool and ConnectionConsumer for the wldjCriticalAlarmTopic.

A ServerSessionPool and ConnectionConsumer can be created programmatically or by using the WebLogic Server Admin Console. Programmatically creating the ServerSessionPool and ConnectionConsumer can be done from a WebLogic Server client application or from a WebLogic Server Startup class.

Example code for creating and initializing a ServerSessionPool and a ConnectionConsumer in a Startup class is provided with the WebLogic Server distribution in WL_HOME/samples/examples/jms/sessionpool and WL_HOME/samples/examples/jms/startup, where WL_HOME is the directory where the WebLogic Server installation is located. A Startup class is a class that implements weblogic.common.T3StartupDef, is deployed in a WebLogic Server instance, and is invoked by WebLogic Server at the end of the startup sequence to perform any task that logically needs to be performed at server startup, such as ServerSessionPool initialization. See Listing 5 for the example modified for our application.

First, a Connection to the JMS Server must be acquired through a ConnectionFactory. I used a Topic, a TopicConnection, and a TopicConnectionFactory for this example. A default implementation of TopicConnectionFactory is provided by WebLogic Server and can be found through a JNDI lookup. I created my own TopicConnectionFactory, wldjTopicConnectionFactory, with the same characteristics as the default.

Next, a JNDI lookup is performed on the Topic that the ConnectionConsumer will service. A JNDI lookup is performed to locate the ServerSessionPoolFactory, and from the factory a ServerSessionPool is acquired using the getServerSessionPool() method. It's important to note that the ServerSessionPool is created by passing a Connection, the maximum number of Sessions in the pool, a class that implements the MessageListener interface, a Boolean that specifies if the Sessions are transacted or nontransacted, and an acknowledgment mode to the getServerSessionPool() method. Finally, the ConnectionConsumer is created using the createConnectionConsumer() method on the Connection object, which is passed the Topic, a message selector String, the ServerSessionPool, and the maximum number of messages that can be loaded into a Session. The message selector is specified when the ConnectionConsumer is created, as a String containing the message selector syntax defined in the JMS Specification. My example uses "TRUE" as the message selector, which selects all messages. More on this later.

The Startup class will execute at the end of WebLogic Server initialization and prepare a ConnectionConsumer that's ready to load a JMS Session with messages from the Topic and start the Session so that the messages can be consumed by the specified MessageListener. When a message arrives at the Destination, the ConnectionConsumer gets a ServerSession, uses the ServerSession to get the JMS Session, loads the JMS Session with one or more messages, and calls the start() method on the ServerSession. The ServerSession calls the run() method on the JMS Session, which then calls the onMessage() method of the associated MessageListener, passing it the Message that needs to be consumed. The role of the ConnectionConsumer is diagrammed in the JMS Specification and reproduced in Figure 3.
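
The hand-off described above can be modeled in a few lines of plain Java. This is only a simulation of the roles, not the JMS API and not WebLogic's implementation: every type below is a simplified stand-in invented for the sketch, messages are plain Strings, the pool holds a single ServerSession, and start() waits for the thread to finish so that the outcome is deterministic (a real ServerSession would return immediately).

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Simplified stand-ins that mirror the roles of the javax.jms interfaces.
interface Listener { void onMessage(String message); }

class SimSession implements Runnable {
    private final Queue<String> loaded = new ArrayDeque<>();
    private Listener listener;

    void setMessageListener(Listener l) { listener = l; }
    void load(String message) { loaded.add(message); }

    // Delivers every loaded message to the listener, in the role of Session.run().
    public void run() {
        String m;
        while ((m = loaded.poll()) != null) {
            listener.onMessage(m);
        }
    }
}

class SimServerSession {
    private final SimSession session = new SimSession();

    SimServerSession(Listener l) { session.setMessageListener(l); }
    SimSession getSession() { return session; }

    // Maps a thread to the session and runs it; joins only to keep the sketch deterministic.
    void start() {
        Thread t = new Thread(session);
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}

class SimServerSessionPool {
    private final SimServerSession only;
    SimServerSessionPool(Listener l) { only = new SimServerSession(l); }
    SimServerSession getServerSession() { return only; }
}

class SimConnectionConsumer {
    private final SimServerSessionPool pool;
    SimConnectionConsumer(SimServerSessionPool pool) { this.pool = pool; }

    // Called when messages arrive at the destination: get a ServerSession,
    // load its Session, then start the ServerSession.
    void deliver(List<String> arrived) {
        SimServerSession serverSession = pool.getServerSession();
        for (String m : arrived) {
            serverSession.getSession().load(m);
        }
        serverSession.start();
    }
}
```

The point of the model is the direction of the calls: the ConnectionConsumer never invokes the listener directly, and the listener only runs on the thread that the ServerSession supplies.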

I find the programmatic technique of creating a ServerSessionPool and ConnectionConsumer much more complicated than using the WebLogic Server Admin Console. They can be created in the Admin Console after the JMS Server has been created. Use the SessionPools tab under JMS->Servers->yourServerName->SessionPools. Select "Create a new JMS Session Pool". That will take you to the frame that allows you to specify the characteristics of the ServerSessionPool (see Figure 4).

The wldjCriticalTopicSessionPool specifies the wldjTopicConnectionFactory as the ConnectionFactory to use, the fully qualified classname of the MessageListener, wldj.jms.AlarmListener, the Acknowledgment Mode as Auto, the maximum number of Sessions, and the transaction semantics of the Sessions. All that must be provided programmatically is the MessageListener, which has to be provided if you create the ServerSessionPool programmatically as well.

Next, create the ConnectionConsumer by navigating to the Consumers tab under the newly created ServerSessionPool (see Figure 5). The ConnectionConsumer is configured with the maximum number of messages that it should load into a Session, a message selector, and the Destination for which it is consuming messages.

When messages arrive at the wldjCriticalAlarmTopic, the ConnectionConsumer gets a JMS Session from a ServerSession in the ServerSessionPool, loads the Session with a message, and starts the ServerSession. The ServerSession then calls the run() method on the Session, which activates the onMessage() method of the associated MessageListener and passes it a Message.

QueueSend Client
This application passes a TextMessage from a "store" Queue, wldjAlarmQueue, to the "forward" Topics, wldjCriticalAlarmTopic and wldjNormalAlarmTopic. To get the TextMessage to the wldjAlarmQueue, a JMS client that uses a QueueSender was created. The client sets the JMS message priority, sets a String header property named "level", and sets a String into the TextMessage (see Listing 6).

Transactions
Transaction semantics in the JMS Specification are associated with the Session object. A Session can be created as transacted or nontransacted. Transacted Sessions have their transactions committed or rolled back with the commit() and rollback() methods. Nontransacted Sessions are created with an acknowledgment mode specified.

When using server-side asynchronous message consumers, MDBs or ServerSessionPools, using nontransacted Sessions with an acknowledge mode of AUTO_ACKNOWLEDGE allows the JMS server to manage the transactions for the message consumer by calling its onMessage() method in the context of a transaction, and then committing the transaction when the onMessage() method returns. If you use transacted Sessions, your MessageConsumer is responsible for committing or rolling back the transaction.

JTA User Transactions aren't supported with MDB. A JTA User Transaction started in the onMessage() method of a MessageConsumer called by a Session in a ServerSessionPool will be outside of the context of the transaction started when the onMessage() method is called.

Summary
An asynchronous message-processing application can take advantage of the JMS and EJB APIs to perform message processing using just server-side code that runs in the framework of WebLogic Server. This releases client code from having to manage threads, concurrency, and transactions. The ease of creating this store and forward example is a testament to the robust implementation of the JMS and Message Driven Bean APIs by WebLogic Server.

ServerSessionPools and ConnectionConsumers are presented here as an alternative to using MDBs, but are more involved in their implementation. MDB is a simpler choice with the restriction that only one MDB can be deployed per Destination. Multiple ServerSessionPools and ConnectionConsumers can be created to service one Destination, and each ConnectionConsumer can be deployed with a message selector to filter messages based on Message header properties. Message selectors are expensive and not recommended on Queues, but may be useful to filter messages on a Topic that may have more than one interested subscriber or that may be consumed by more than one ConnectionConsumer. Message filtering on a Queue by an MDB should be performed programmatically in the MessageListener's onMessage() method, as was shown by the AlarmReaderBean.
