在android裡面用的smack包其實叫做asmack,該包提供瞭兩種不同的連接方式:socket和httpclient。該並且提供瞭很多操作xmpp協議的API,也方便各種不同自定義協議的擴展。我們不需要自己重新去定義一套接收機制來擴展新的協議,隻需繼承然後在類裡處理自己的協議就可以瞭。而本文今天主要說兩點,一點就是消息是如何接收的,另一點就是消息是如何通知事件的。
總的思路
1.使用socket連接服務器
2.將XmlPullParser的數據源關聯到socket的InputStream
3.啟動線程不斷循環處理消息
4.將接收到的消息解析xml處理封裝好成一個Packet包
5.將包廣播給所有註冊事件監聽的類
逐步擊破
(聲明在看下面的文章時,最好先理解一下smack的使用,這樣才能達到深入的理解)
(謹記:上圖隻顯示本文章解釋所要用到的類和方法,減縮瞭一些跟本文主題無關的代碼,隻留一條貫穿著從建立連接到接收消息的線。)
解析這塊東西打算從最初的調用開始作為入口,抽絲剝繭,逐步揭開。
1.
PacketListener packetListener = new PacketListener() {
@Override
public void processPacket(Packet packet) {
System.out
.println("Activity—-processPacket" + packet.toXML());
}
};
PacketFilter packetFilter = new PacketFilter() {
@Override
public boolean accept(Packet packet) {
System.out.println("Activity—-accept" + packet.toXML());
return true;
}
};
解釋:創建包的監聽以及包的過濾,當有消息到時就會廣播到所有註冊的監聽,當然前提是要通過packetFilter的過濾。
2.
connection = new XMPPConnection();
XMPPConnection在這構造函數裡面主要配置ip地址和端口(super(new ConnectionConfiguration("169.254.141.109", 9991));)
3.
connection.addPacketListener(packetListener, packetFilter);
connection.connect();
註冊監聽,開始初始化連接。
4.
public void connect() {
// Stablishes the connection, readers and writers
connectUsingConfiguration(config);
}5.
private void connectUsingConfiguration(ConnectionConfiguration config) {
String host = config.getHost();
int port = config.getPort();
try {
this.socket = new Socket(host, port);
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
initConnection();
}通過之前設置的ip和端口,建立socket對象
6.
protected void initDebugger() {
Class<?> debuggerClass = null;
try {
debuggerClass = Class.forName("com.simualteSmack.ConsoleDebugger");
Constructor<?> constructor = debuggerClass.getConstructor(
Connection.class, Writer.class, Reader.class);
debugger = (SmackDebugger) constructor.newInstance(this, writer,
reader);
reader = debugger.getReader();
} catch (ClassNotFoundException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
} catch (Exception e) {
throw new IllegalArgumentException(
"Can't initialize the configured debugger!", e);
}
}private void initReaderAndWriter() {
try {
reader = new BufferedReader(new InputStreamReader(socket
.getInputStream(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
initDebugger();
}private void initConnection() {
// Set the reader and writer instance variables
initReaderAndWriter();
packetReader = new PacketReader(this);
addPacketListener(debugger.getReaderListener(), null);
// Start the packet reader. The startup() method will block until we
// get an opening stream packet back from server.
packetReader.startup();
}從三個方法可以看出,建立reader和writer的對象關聯到socket的InputStream,實例化ConsoleDebugger,該類主要是打印出接收到的消息,給reader設置瞭一個消息的監聽。接著建立PacketReader對象,並啟動。PacketReader主要負責消息的處理和通知
7.
public class PacketReader {
private ExecutorService listenerExecutor;
private boolean done;
private XMPPConnection connection;
private XmlPullParser parser;
private Thread readerThread;
protected PacketReader(final XMPPConnection connection) {
this.connection = connection;
this.init();
}
/**
* Initializes the reader in order to be used. The reader is initialized
* during the first connection and when reconnecting due to an abruptly
* disconnection.
*/
protected void init() {
done = false;
readerThread = new Thread() {
public void run() {
parsePackets(this);
}
};
readerThread.setName("Smack Packet Reader ");
readerThread.setDaemon(true);
// create an executor to deliver incoming packets to listeners.
// we will use a single thread with an unbounded queue.
listenerExecutor = Executors
.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r,
"smack listener processor");
thread.setDaemon(true);
return thread;
}
});
resetParser();
}
/**
* Starts the packet reader thread and returns once a connection to the
* server has been established. A connection will be attempted for a maximum
* of five seconds. An XMPPException will be thrown if the connection fails.
*
*/
public void startup() {
readerThread.start();
}
/**
* Shuts the packet reader down.
*/
public void shutdown() {
done = true;
// Shut down the listener executor.
listenerExecutor.shutdown();
}
private void resetParser() {
try {
parser = XmlPullParserFactory.newInstance().newPullParser();
parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true);
parser.setInput(connection.reader);
} catch (XmlPullParserException xppe) {
xppe.printStackTrace();
}
}
/**
* Parse top-level packets in order to process them further.
*
* @param thread
* the thread that is being used by the reader to parse incoming
* packets.
*/
private void parsePackets(Thread thread) {
try {
int eventType = parser.getEventType();
do {
if (eventType == XmlPullParser.START_TAG) {
if (parser.getName().equals("message")) {
processPacket(PacketParserUtils.parseMessage(parser));
}
System.out.println("START_TAG");
} else if (eventType == XmlPullParser.END_TAG) {
System.out.println("END_TAG");
}
eventType = parser.next();
} while (!done && eventType != XmlPullParser.END_DOCUMENT
&& thread == readerThread);
} catch (Exception e) {
e.printStackTrace();
if (!done) {
}
}
}
private void processPacket(Packet packet) {
if (packet == null) {
return;
}
// Loop through all collectors and notify the appropriate ones.
for (PacketCollector collector : connection.getPacketCollectors()) {
collector.processPacket(packet);
}
// Deliver the incoming packet to listeners.
listenerExecutor.submit(new ListenerNotification(packet));
}
/**
* A runnable to notify all listeners of a packet.
*/
private class ListenerNotification implements Runnable {
private Packet packet;
public ListenerNotification(Packet packet) {
this.packet = packet;
}
public void run() {
for (ListenerWrapper listenerWrapper : connection.recvListeners
.values()) {
listenerWrapper.notifyListener(packet);
}
}
}
}創建該類時就初始化線程和ExecutorService ,接著調用resetParser() 方法為parser設置輸入源(這裡是重點,parser的數據都是通過這裡獲取),調用startup啟動線程,循環監聽parser,如果接收到消息根據消息協議的不同將調用PacketParserUtils類裡的不同方法,這裡調用parseMessage()該方法主要處理message的消息,在該方法裡分析message消息並返回packet包。返回的包將調用processPacket方法,先通知所有註冊瞭PacketCollector的監聽,接著消息(listenerExecutor.submit(new ListenerNotification(packet)); )傳遞給所有註冊瞭PacketListener的監聽。這樣在activity開始之前註冊的那個監聽事件就會觸發,從而完成瞭整個流程。
7以上.
剩下的就是一些輔助包,很簡單。比如PacketCollector 這個類,它的用處主要用來處理一些需要在發送後需要等待一個答復這樣的請求。
protected synchronized void processPacket(Packet packet) {
System.out.println("PacketCollector—processPacket");
if (packet == null) {
return;
}
if (packetFilter == null || packetFilter.accept(packet)) {
while (!resultQueue.offer(packet)) {
resultQueue.poll();
}
}
}public Packet nextResult(long timeout) {
long endTime = System.currentTimeMillis() + timeout;
System.out.println("nextResult");
do {
try {
return resultQueue.poll(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) { /* ignore */
}
} while (System.currentTimeMillis() < endTime);
return null;
}
該方法就是將獲取到的包,先過濾然後放到隊列裡,最後通過nextResult來獲取包,這樣就完成一個請求收一個答復。
這樣整個流程就完成瞭,最後總結一下,如圖(就這麼簡單^0^):