TCP服务-非阻塞式(NIO)

 2024-08-01    0 条评论    201 浏览

nio

普通TCP服务-NIO

1. 创建ServerSocketChannel

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 设置为非阻塞模式

ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(port)); // 绑定端口

2. 创建Selector

Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 注册接收连接事件

3. 处理连接和事件循环

while (true) {
    selector.select(); // 阻塞直到有事件就绪
    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        
        if (key.isAcceptable()) {
            // 有新的连接
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel clientChannel = server.accept();
            clientChannel.configureBlocking(false); // 设置为非阻塞
            
            // 注册读事件
            clientChannel.register(selector, SelectionKey.OP_READ);
            
            // 可以在这里做一些初始化工作,如记录连接信息等
        } else if (key.isReadable()) {
            // 有数据可读
            SocketChannel clientChannel = (SocketChannel) key.channel();
            
            // 读取数据
            // 这里可以自行实现数据的读取和处理逻辑
            
            // 如果需要保持长连接,可以继续在这里监听读取事件并处理数据
        }
        iterator.remove();
    }
}

4. 数据读取和处理

在上面的示例中,当 key.isReadable() 触发时,表示有数据可读,你可以在这个分支内部实现具体的数据读取逻辑和业务处理逻辑。保持长连接的关键在于,不要在每次读写完成后关闭连接,而是保持 SocketChannel 对象处于活动状态,继续监听 OP_READ 事件。

5. 异常处理和连接关闭

在实际开发中,还需要考虑异常处理和连接的关闭管理。例如,在捕获到 IOException 或其他网络异常时,需要适当处理,可以选择关闭相关的 SocketChannel 或进行重连等操作。

完整代码

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.nio.channels.Selector;

public class NIOExample {

    public static void main(String[] args) {
        try {
            // 创建一个 Selector
            Selector selector = Selector.open();

            // 创建一个 ServerSocketChannel,并绑定到指定端口
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(8080));
			// 非阻塞
            serverSocketChannel.configureBlocking(false);

            // 将 ServerSocketChannel 注册到 Selector 上,并指定对 OP_ACCEPT 事件感兴趣
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                // Selector 开始监听事件,阻塞直到有事件发生
                selector.select();

                // 获取发生的事件集合
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

                // 遍历事件并处理
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();

                    if (key.isAcceptable()) {
                        // 有连接可以接受
                        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                        SocketChannel socketChannel = serverChannel.accept();
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        System.out.println("接受到新连接:" + socketChannel.getRemoteAddress());
                    } else if (key.isReadable()) {
                        // 有数据可读
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        int bytesRead = socketChannel.read(buffer);
                        if (bytesRead > 0) {
                            buffer.flip();
                            byte[] bytes = new byte[buffer.remaining()];
                            buffer.get(bytes);
                            String message = new String(bytes);
                            System.out.println("接收到消息:" + message);

                            // TODO: 处理接收到的消息,例如回复客户端
                        }
                    }

                    // 处理完毕后移除事件,避免重复处理
                    keyIterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

这段代码演示了如何使用 NIO 实现一个简单的非阻塞服务器。关键点包括:

  1. 创建 Selector 并将 ServerSocketChannel 注册到 Selector 上,指定对 OP_ACCEPT 事件感兴趣。
  2. 在循环中调用 selector.select() 阻塞等待事件,一旦有事件发生,会返回,并且可以通过 selectedKeys() 获取发生的事件集合。
  3. 遍历事件集合,根据事件类型进行处理。例如,如果是 OP_ACCEPT 事件,表示有新的连接可以接受;如果是 OP_READ 事件,表示有数据可读取。
  4. 对于可读事件,可以通过 SocketChannel 读取数据到 ByteBuffer 中,然后处理接收到的数据。

这种方式相比传统的阻塞 I/O,在高并发情况下能够更高效地处理多个连接,避免了为每个连接创建一个线程的开销。在实际应用中,还可以根据具体需求进一步优化和扩展。

TCP长链接样例

在TCP中,长连接指的是客户端与服务器之间的持久连接,而不是简单的连接一次就断开。

1. 服务器端

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

public class TCPServer {

    public static void main(String[] args) {
        try {
            // 创建一个 Selector
            Selector selector = Selector.open();

            // 创建 ServerSocketChannel,并设置为非阻塞模式
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);

            // 绑定端口
            serverSocketChannel.socket().bind(new InetSocketAddress(8080));

            // 注册 ServerSocketChannel 到 Selector,并指定关注事件为 OP_ACCEPT
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("Server started...");

            while (true) {
                // 阻塞等待就绪的 Channel,这里的超时时间可以根据实际需求设置
                selector.select();

                // 获取就绪的 SelectionKey 集合
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

                // 遍历 SelectionKey 集合处理就绪的事件
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();

                    if (key.isAcceptable()) {
                        // 如果是 OP_ACCEPT 事件,创建新的连接,并将新创建的 SocketChannel 注册到 Selector
                        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                        SocketChannel socketChannel = serverChannel.accept();
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        System.out.println("Accepted new connection from client: " + socketChannel.getRemoteAddress());

                    } else if (key.isReadable()) {
                        // 如果是 OP_READ 事件,读取数据
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        int bytesRead = socketChannel.read(buffer);

                        if (bytesRead == -1) {
                            // 如果客户端断开连接
                            socketChannel.close();
                            System.out.println("Client disconnected: " + socketChannel.getRemoteAddress());
                        } else if (bytesRead > 0) {
                            // 处理读取到的数据,这里简单打印出来
                            buffer.flip();
                            byte[] bytes = new byte[buffer.remaining()];
                            buffer.get(bytes);
                            String message = new String(bytes);
                            System.out.println("Received message from client: " + message);

                            // 可以根据业务逻辑进行响应操作,如回复消息
                            // ByteBuffer responseBuffer = ByteBuffer.wrap("Response message".getBytes());
                            // socketChannel.write(responseBuffer);
                        }
                    }

                    // 处理完毕后,不移除 SelectionKey,保持长连接
                    // keyIterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

2. 客户端

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class TCPClient {

    public static void main(String[] args) {
        try {
            // 创建一个 SocketChannel,并设置为非阻塞模式
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);

            // 连接到服务器
            socketChannel.connect(new InetSocketAddress("localhost", 8080));

            // 等待连接完成
            while (!socketChannel.finishConnect()) {
                // 可以做一些其他的事情
            }

            System.out.println("Connected to server.");

            // 向服务器发送数据
            String message = "Hello, Server!";
            ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
            socketChannel.write(buffer);

            // 可以根据需要,接收来自服务器的响应
            ByteBuffer responseBuffer = ByteBuffer.allocate(1024);
            int bytesRead = socketChannel.read(responseBuffer);
            if (bytesRead > 0) {
                responseBuffer.flip();
                byte[] bytes = new byte[responseBuffer.remaining()];
                responseBuffer.get(bytes);
                String response = new String(bytes);
                System.out.println("Received response from server: " + response);
            }

            // 关闭连接
            socketChannel.close();

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

修改说明

在服务器端的修改中,去掉了处理完毕后移除 SelectionKey 的代码 keyIterator.remove()。这样做可以保持 SelectionKey 在集合中,确保长连接的持久性。这样,服务器将能够持续监听并处理客户端的消息,而不是仅处理一次就断开连接。

客户端的示例保持不变,仅仅是连接到服务器,发送一条消息,并接收服务器的响应。

这样修改后,服务器和客户端之间的连接就是长连接了,可以持续通信而不是一次性的连接。