有一個項目要用到JMS,故而稍微學習瞭一點。
消息服務是指,兩個或者多個客戶機能夠通過發送和接受消息(以對等的方式)通信。消息是通過消息服務器有一個客戶機發送到另一個客戶機的"一塊"數據,可以是文本的,也可是數值的,如果客戶機是Java應用程序,數據還可以包括對象。其中客戶機不需要同時運行。
使用消息服務的原因:
1.松散耦合但是高內聚。使用消息服務的客戶機不需要實現通用接口,不需要瞭解對方。消息服務提供瞭標準接口。
2.不直接通信。客戶機不直接對話,而通過中間媒介,消息服務扮演 緩沖區,並提供 安全控制。
3.保證消息傳遞。 JMS的提供者保持消息持久,直到客戶機接受為止。
4.異步通信。
5.一對多、多對多和多對一通信。
JMS(Java 消息服務)是一組標準的API,能夠用於訪問多種消息服務器。使用JMS,可以使用一樣的API訪問IBM的MQSeries、JBossMQ等消息服務。
JMS API中有很多核心概念是映射到底層的消息服務器。其中:
1.受控對象(Administered Object)。它們是由管理者創建的供JMS客戶機使用的對象。如 連接工廠(Connection Factory),它們用於與底層的消息服務器和目標(隊列和主題)進行連接(註:不太理解,可能錯瞭。)。管理通過JNDI對他們進行管理。JMS提供瞭JMS客戶機和實際JMS提供者之間的緩沖區。JBoss就是這些對象的管理者。
2.JMS提供者。實現瞭JMS接口的消息服務器。(如 JBossMQ消息服務器)。
3.JMS客戶機。消息的生產者和消費者。由於是對等的通信機制,所以沒有客戶機和服務器的概念。JMS既可以是消息創建者又是消息接收者。
4.消息(Message)。JMS客戶機之間傳送的一條條消息。
傳統的消息服務一般支持點對點通信和發佈/訂閱通信兩種通信模式的一種。JMS API 兩種都支持。
Point-To-Point
點對點通信模式,有一個中心隊列作為發佈的目標(受控對象)。一個或多個消息生產者可以發送消息到這個隊列。然後被消息的消費者選取。
Pub/Sub
發佈/訂閱通信模式是基於主題(Topic)概念的。主題是消息的發佈目標(受控對象)。它和隊列的不同就在於,可以有多個發送消息和接收消息的客戶機,每個主題可以有多個發佈者和多個訂閱者。
JMS API
JMS API是在javax.jms包中定義的。要使用JMS API 需要創建一個提供連接對象的連接工廠。連接對象提供與消息服務器的鏈接。鏈接被用來創建會話,會話被用來創建消息,消息通過消息的生產者發送到目標(隊列或主題),然後消息傳遞到消息消費者。
JMS Parent
PTP Domain
Pub/Sub Domain
ConnectionFactory
QueueConnectionFactory
TopicConnectionFactory
Connection
QueueConnection
TopicConnection
Destination
Queue
Topic
Session
QueueSession
TopicSession
MessageProducer
QueueSender
TopicPublisher
MessageConsumer
QueueReceiver
TopicSubscriber
以下是對這些接口的簡單描述:
1.連接工廠(Connection Factory)
是客戶機用來創建與JMD提供者的鏈接的對象。它是受控對象,可以通過JNDI查找。JMS API定義瞭兩種類型的連接工廠。 QueueConnectionFactory 和 TopicConnectionFactory
2.連接(Connection)
連接對象是和JMS提供者通信的媒介。這個通信的具體實現依賴於JMS提供者。除瞭通用的借口,還有基於隊列(QueueConnection)和基於主題(TopicConnection)專用接口。
3.會話(Session)
用於創建消息的生產者、消費者和消息。它們都是單線程,能參加事務。有QueueSession和TopicSession。
4.消息(Message)
消息是消息服務器在客戶端之間發送的一條條信息。有五種接口,不同的類型消息。1. StreamMessage — Java原始值的數據流 2. MapMessage–一組名/值對 3.TextMessage–一個字符串對象 4. ObjectMessage–一個序列化的 Java對象 5.BytesMessage–一個未解釋字節的數據流。
消息由以下幾部分組成:
消息頭(header):JMS消息頭包含瞭許多字段,它們是消息發送後由JMS提供者或消息發送者產生,用來表示消息、設置優先權和失效時間等等,並且為消息確定路由。
屬性(property):由消息發送者產生,用來添加刪除消息頭以外的附加信息。
消息體(body):由消息發送者產生。
5.目標(Destination)
目標是受控對象。在JMS中表示一個隊列或者一個主題。
6.消息生產者(MessageProducer)
是用於將消息發送到目標的對象,由會話對象創建,有QueueSender、TopicPublisher.
7.消息消費者(MessageConsumer)
是由會話對象創建,用於從目標獲取消息,有QueueReceiver、TopicSubscriber
一個JMS應用是幾個JMS 客戶端交換消息,開發JMS客戶端應用由以下幾步構成:
1) 用JNDI 得到ConnectionFactory對象;
2) 用ConnectionFactory創建Connection 對象;
3) 用Connection對象創建一個或多個JMS Session;
4) 用JNDI 得到目標隊列或主題對象,即Destination對象;
5) 用Session 和Destination 創建MessageProducer和MessageConsumer;
6) 通知Connection 開始傳遞消息。
消息生產者程序如下
[java]
package org.jms.test;
import java.io.*;
mport javax.jms.*;
import javax.naming.*;
public class Sender {
public static void main(String[] args) {
new Sender().send();
}
public void send() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
try {
//Prompt for JNDI names
System.out.println("Enter ConnectionFactory name:");
String factoryName = reader.readLine();
System.out.println("Enter Destination name:");
String destinationName = reader.readLine();
//Look up administered objects
InitialContext initContext = new InitialContext();
ConnectionFactory factory =
(ConnectionFactory) initContext.lookup(factoryName);
Destination destination = (Destination) initContext.lookup(destinationName);
initContext.close();
//Create JMS objects
Connection connection = factory.createConnection();
Session session =
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer sender = session.createProducer(queue);
//Send messages
String messageText = null;
while (true) {
System.out.println("Enter message to send or 'quit':");
messageText = reader.readLine();
if ("quit".equals(messageText))
break;
TextMessage message = session.createTextMessage(messageText);
sender.send(message);
}
//Exit
System.out.println("Exiting…");
reader.close();
connection.close();
System.out.println("Goodbye!");
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}
消息消費者程序如下
[java]
package compute;
import java.io.*;
import javax.jms.*;
import javax.naming.*;
public class Receiver implements MessageListener {
private boolean stop = false;
public static void main(String[] args) {
new Receiver().receive();
}
public void receive() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
try {
//Prompt for JNDI names
System.out.println("Enter ConnectionFactory name:");
String factoryName = reader.readLine();
System.out.println("Enter Destination name:");
String destinationName = reader.readLine();
reader.close();
//Look up administered objects
InitialContext initContext = new InitialContext();
ConnectionFactory factory =
(ConnectionFactory) initContext.lookup(factoryName);
Destination destination = (Destination) initContext.lookup(destinationName);
initContext.close();
//Create JMS objects
Connection connection = factory.createConnection();
Session session =
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer receiver = session.createConsumer(queue);
receiver.setMessageListener(this);
connection.start();
//Wait for stop
while (!stop) {
Thread.sleep(1000);
}
//Exit
System.out.println("Exiting…");
connection.close();
System.out.println("Goodbye!");
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
public void onMessage(Message message) {
try {
String msgText = ((TextMessage) message).getText();
System.out.println(msgText);
if ("stop".equals(msgText))
stop = true;
} catch (JMSException e) {
e.printStackTrace();
stop = true;
}
}
}
作者:luqin1988