JAVA NIO多線程服務器1.2版 – JAVA編程語言程序開發技術文章

Reactor 模式的 JAVA NIO 多線程服務器

public class MiniServer extends Thread
{
    private static final Log log = LogFactory.getLog(MiniServer.class);
   
    private final Selector s;
    private final ServerSocketChannel ssc;
    private ExecutorService executor;
   
    public MiniServer(int portnumber,ExecutorService executor) throws IOException
    {
        this.executor=executor;
        s = Selector.open();
        ssc = ServerSocketChannel.open();
        ssc.socket().bind(new InetSocketAddress(portnumber));
        ssc.configureBlocking(false);
        ssc.register(s,SelectionKey.OP_ACCEPT);
    }
   
    public void run()
    {
        try
        {
            while(s.isOpen())
            {
                int nKeys=s.select();
                if(nKeys>0)
                {
                    Iterator<SelectionKey> it = s.selectedKeys().iterator();
                    while (it.hasNext())
                    {
                        SelectionKey key = it.next();
                        it.remove();
                        if (!key.isValid() || !key.channel().isOpen())
                            continue;
                        if(key.isAcceptable())
                        {
                            SocketChannel sc = ssc.accept();
                            if (sc != null)
                            {
                                sc.configureBlocking(false);
                                sc.register(s, SelectionKey.OP_READ, new Reader(executor));
                            }
                        }
                        else if(key.isReadable()||key.isWritable())
                        {
                            Reactor reactor = (Reactor) key.attachment();
                            reactor.execute(key);
                        }
                    }
                }
            }
        }
        catch(IOException e)
        {
            log.info(e);
        }
    }
}


/**
 * Handler attached to a {@link SelectionKey}.
 *
 * <p>{@code MiniServer} casts the attachment of a selected key to this type and
 * invokes it when the key is readable or writable.
 */
public interface Reactor
{
    /**
     * Handles a readiness event for the given key.
     *
     * @param key the selected key whose channel is ready
     */
    void execute(SelectionKey key);
}


public class Reader implements Reactor
{
    private static final Log log = LogFactory.getLog(Reader.class);
   
    private byte[] bytes=new byte[0];
    private ExecutorService executor;
   
    public Reader(ExecutorService executor)
    {
        this.executor=executor;
    }
   
    @Override
    public void execute(SelectionKey key)
    {
        SocketChannel sc = (SocketChannel) key.channel();
        try
        {
            ByteBuffer buffer=ByteBuffer.allocate(1024);
            int len=-1;
            while(sc.isConnected() && (len=sc.read(buffer))>0)
            {
                buffer.flip();
                  byte [] content = new byte[buffer.limit()];
                buffer.get(content);
                bytes=NutUtil.ArrayCoalition(bytes,content);
                buffer.clear();
            }
            if(len==0)
            {
                key.interestOps(SelectionKey.OP_READ);
                key.selector().wakeup();
            }
            else if(len==-1)
            {
                Callable<byte[]> call=new ProcessCallable(bytes);
                Future<byte[]> task=executor.submit(call);
                ByteBuffer output=ByteBuffer.wrap(task.get());
                sc.register(key.selector(), SelectionKey.OP_WRITE, new Writer(output));
            }
        }
        catch(Exception e)
        {
            log.info(e);
        }
    }
}


public class Writer implements Reactor
{
    private static final Log log = LogFactory.getLog(Writer.class);
   
    private ByteBuffer output;
   
    public Writer(ByteBuffer output)
    {
        this.output=output;
    }
   
    public void execute(SelectionKey key)
    {
        SocketChannel sc = (SocketChannel) key.channel();
        try
        {
            while(sc.isConnected() && output.hasRemaining())
            {
                int len=sc.write(output);
                if(len<0)
                {
                    throw new EOFException();
                }
                if(len==0)
                {
                    key.interestOps(SelectionKey.OP_WRITE);
                    key.selector().wakeup();
                    break;
                }
            }
            if(!output.hasRemaining())
            {
                output.clear();
                key.cancel();
                sc.close();
            }
        }
        catch(IOException e)
        {
            log.info(e);
        }
    }
}

發佈留言