2025-02-10

有一個項目要用到JMS,故而稍微學習瞭一點。
    消息服務是指,兩個或者多個客戶機能夠通過發送和接受消息(以對等的方式)通信。消息是通過消息服務器有一個客戶機發送到另一個客戶機的"一塊"數據,可以是文本的,也可是數值的,如果客戶機是Java應用程序,數據還可以包括對象。其中客戶機不需要同時運行。
    使用消息服務的原因:
       1.松散耦合但是高內聚。使用消息服務的客戶機不需要實現通用接口,不需要瞭解對方。消息服務提供瞭標準接口。
       2.不直接通信。客戶機不直接對話,而通過中間媒介,消息服務扮演 緩沖區,並提供 安全控制。
       3.保證消息傳遞。 JMS的提供者保持消息持久,直到客戶機接受為止。
       4.異步通信。
       5.一對多、多對多和多對一通信。
    JMS(Java 消息服務)是一組標準的API,能夠用於訪問多種消息服務器。使用JMS,可以使用一樣的API訪問IBM的MQSeries、JBossMQ等消息服務。
     JMS API中有很多核心概念是映射到底層的消息服務器。其中:
     1.受控對象(Administered Object)。它們是由管理者創建的供JMS客戶機使用的對象。如 連接工廠(Connection Factory),它們用於與底層的消息服務器和目標(隊列和主題)進行連接(註:不太理解,可能錯瞭。)。管理通過JNDI對他們進行管理。JMS提供瞭JMS客戶機和實際JMS提供者之間的緩沖區。JBoss就是這些對象的管理者。
     2.JMS提供者。實現瞭JMS接口的消息服務器。(如 JBossMQ消息服務器)。
     3.JMS客戶機。消息的生產者和消費者。由於是對等的通信機制,所以沒有客戶機和服務器的概念。JMS既可以是消息創建者又是消息接收者。
     4.消息(Message)。JMS客戶機之間傳送的一條條消息。
     傳統的消息服務一般支持點對點通信和發佈/訂閱通信兩種通信模式的一種。JMS API 兩種都支持。
 
      Point-To-Point
     點對點通信模式,有一個中心隊列作為發佈的目標(受控對象)。一個或多個消息生產者可以發送消息到這個隊列。然後被消息的消費者選取。
 
      Pub/Sub
      發佈/訂閱通信模式是基於主題(Topic)概念的。主題是消息的發佈目標(受控對象)。它和隊列的不同就在於,可以有多個發送消息和接收消息的客戶機,每個主題可以有多個發佈者和多個訂閱者。
 
      JMS API
      JMS API是在javax.jms包中定義的。要使用JMS API 需要創建一個提供連接對象的連接工廠。連接對象提供與消息服務器的鏈接。鏈接被用來創建會話,會話被用來創建消息,消息通過消息的生產者發送到目標(隊列或主題),然後消息傳遞到消息消費者。
JMS Parent
PTP Domain
Pub/Sub Domain
ConnectionFactory
QueueConnectionFactory
TopicConnectionFactory
Connection
QueueConnection
TopicConnection
Destination
Queue
Topic
Session
QueueSession
TopicSession
MessageProducer
QueueSender
TopicPublisher
MessageConsumer
QueueReceiver
TopicSubscriber
以下是對這些接口的簡單描述:
      1.連接工廠(Connection Factory)
         是客戶機用來創建與JMD提供者的鏈接的對象。它是受控對象,可以通過JNDI查找。JMS API定義瞭兩種類型的連接工廠。 QueueConnectionFactory 和 TopicConnectionFactory
      2.連接(Connection)
         連接對象是和JMS提供者通信的媒介。這個通信的具體實現依賴於JMS提供者。除瞭通用的借口,還有基於隊列(QueueConnection)和基於主題(TopicConnection)專用接口。
      3.會話(Session)
         用於創建消息的生產者、消費者和消息。它們都是單線程,能參加事務。有QueueSession和TopicSession。
      4.消息(Message)
         消息是消息服務器在客戶端之間發送的一條條信息。有五種接口,不同的類型消息。1. StreamMessage — Java原始值的數據流  2. MapMessage–一組名/值對  3.TextMessage–一個字符串對象 4. ObjectMessage–一個序列化的 Java對象 5.BytesMessage–一個未解釋字節的數據流。
         消息由以下幾部分組成:
              消息頭(header):JMS消息頭包含瞭許多字段,它們是消息發送後由JMS提供者或消息發送者產生,用來表示消息、設置優先權和失效時間等等,並且為消息確定路由。
              屬性(property):由消息發送者產生,用來添加刪除消息頭以外的附加信息。
              消息體(body):由消息發送者產生。
       5.目標(Destination)
          目標是受控對象。在JMS中表示一個隊列或者一個主題。
       6.消息生產者(MessageProducer)
          是用於將消息發送到目標的對象,由會話對象創建,有QueueSender、TopicPublisher.
       7.消息消費者(MessageConsumer)
          是由會話對象創建,用於從目標獲取消息,有QueueReceiver、TopicSubscriber
 
    一個JMS應用是幾個JMS 客戶端交換消息,開發JMS客戶端應用由以下幾步構成:
      1) 用JNDI 得到ConnectionFactory對象;
      2) 用ConnectionFactory創建Connection 對象;
      3) 用Connection對象創建一個或多個JMS Session;
      4) 用JNDI 得到目標隊列或主題對象,即Destination對象;
      5) 用Session 和Destination 創建MessageProducer和MessageConsumer;
      6) 通知Connection 開始傳遞消息。
 消息生產者程序如下
[java]
package org.jms.test; 
import java.io.*; 
mport javax.jms.*; 
import javax.naming.*; 
public class Sender { 
    public static void main(String[] args) { 
        new Sender().send(); 
    } 
    public void send() { 
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); 
        try { 
            //Prompt for JNDI names 
            System.out.println("Enter ConnectionFactory name:"); 
            String factoryName = reader.readLine(); 
            System.out.println("Enter Destination name:"); 
            String destinationName = reader.readLine(); 
            //Look up administered objects 
            InitialContext initContext = new InitialContext(); 
            ConnectionFactory factory = 
                (ConnectionFactory) initContext.lookup(factoryName); 
            Destination destination = (Destination) initContext.lookup(destinationName); 
            initContext.close(); 
            //Create JMS objects 
            Connection connection = factory.createConnection(); 
            Session session = 
                connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
            MessageProducer sender = session.createProducer(queue); 
            //Send messages 
            String messageText = null; 
            while (true) { 
                System.out.println("Enter message to send or 'quit':"); 
                messageText = reader.readLine(); 
                if ("quit".equals(messageText)) 
                    break; 
                TextMessage message = session.createTextMessage(messageText); 
                sender.send(message); 
            } 
            //Exit 
            System.out.println("Exiting…"); 
            reader.close(); 
            connection.close(); 
            System.out.println("Goodbye!"); 
        } catch (Exception e) { 
            e.printStackTrace(); 
            System.exit(1); 
        } 
    } 

 
消息消費者程序如下
[java]
package compute; 
import java.io.*; 
import javax.jms.*; 
import javax.naming.*; 
public class Receiver implements MessageListener { 
    private boolean stop = false; 
    public static void main(String[] args) { 
        new Receiver().receive(); 
    }  
    public void receive() { 
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); 
        try { 
            //Prompt for JNDI names 
            System.out.println("Enter ConnectionFactory name:"); 
            String factoryName = reader.readLine(); 
            System.out.println("Enter Destination name:"); 
            String destinationName = reader.readLine(); 
            reader.close(); 
            //Look up administered objects 
            InitialContext initContext = new InitialContext(); 
            ConnectionFactory factory = 
                (ConnectionFactory) initContext.lookup(factoryName); 
            Destination destination = (Destination) initContext.lookup(destinationName); 
            initContext.close(); 
            //Create JMS objects 
            Connection connection = factory.createConnection(); 
            Session session = 
                connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
            MessageConsumer receiver = session.createConsumer(queue); 
            receiver.setMessageListener(this); 
            connection.start(); 
            //Wait for stop 
            while (!stop) { 
                Thread.sleep(1000); 
            } 
            //Exit 
            System.out.println("Exiting…"); 
            connection.close(); 
            System.out.println("Goodbye!"); 
        } catch (Exception e) { 
            e.printStackTrace(); 
            System.exit(1); 
        } 
    } 
    public void onMessage(Message message) { 
        try { 
            String msgText = ((TextMessage) message).getText(); 
            System.out.println(msgText); 
            if ("stop".equals(msgText)) 
                stop = true; 
        } catch (JMSException e) { 
            e.printStackTrace(); 
            stop = true; 
        } 
    } 

作者:luqin1988

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *