做网站 传视频 用什么笔记本好电商网页
代码基于“主从Reactor多线程”这一线程模型编写,加上客户端总共只有5个类。服务启动后,会创建一个主Reactor线程,负责接受新连接;4个从Reactor线程,负责I/O读写;另外还有一个包含200个线程的线程池,负责具体的业务处理。
使用方法:先运行Server类,再运行Client类,然后在Client端控制台输入信息并回车,就会接收到来自Server端的响应信息(可以认为是实现了ECHO协议)。当在Client端输入exit并回车后,连接将会断开,Client端进程将会终止,而Server端会继续运行。
代码如下:
package cn.cjc.netty.pkg3.msm;/*** 主从Reactor多线程** @author chenjc* @since 2019-01-03*/
/**
 * Entry point of the demo server: creates the main reactor and runs it.
 */
public class Server {

    /**
     * Creates an {@code Acceptor} and invokes its {@code startup()} method.
     *
     * @param args command-line arguments; not used
     * @throws Exception if constructing or starting the acceptor fails
     */
    public static void main(String[] args) throws Exception {
        Acceptor mainReactor = new Acceptor();
        mainReactor.startup();
    }
}
package cn.cjc.netty.pkg3.msm;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;/*** @author chenjc* @since 2015/8/18*/
/**
 * Console client for the echo server.
 *
 * <p>Connects to {@code localhost:8888}, sends every line typed on standard
 * input to the server and prints the single line the server sends back.
 * Typing {@code exit} (or reaching end of input) closes the connection and
 * ends the program.
 */
public class Client {

    private static final String HOST = "localhost";
    private static final int PORT = 8888;
    private static final String EXIT_COMMAND = "exit";

    /**
     * Runs the interactive request/response loop.
     *
     * @param args command-line arguments; not used
     * @throws IOException if connecting, reading the console or reading the
     *                     server response fails
     */
    public static void main(String[] args) throws IOException {
        // Standard input is deliberately not closed here; only the socket is owned by this method.
        BufferedReader console = new BufferedReader(new InputStreamReader(System.in));

        // try-with-resources guarantees the socket (and with it both socket streams)
        // is closed on every exit path: "exit", console EOF, server hang-up or exception.
        try (Socket socket = new Socket(HOST, PORT)) {
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            PrintWriter writer = new PrintWriter(socket.getOutputStream(), true);

            String msg;
            while ((msg = console.readLine()) != null) {
                if (EXIT_COMMAND.equals(msg)) {
                    break;
                }

                writer.println(msg);
                // PrintWriter never throws on write failure; it only records an error flag.
                if (writer.checkError()) {
                    System.err.println("Failed to send message; connection is broken.");
                    break;
                }

                String response = reader.readLine();
                if (response == null) {
                    // End of stream: the server closed the connection, nothing more to read.
                    System.err.println("Connection closed by server.");
                    break;
                }
                System.out.println(response);
            }
        }
    }
}
package cn.cjc.netty.pkg3.msm;import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Random;
import java.util.Set;/*** 主Reactor** @author chenjc* @since 2015/8/18*/
public class Acceptor {private Selector selector;private Thread[] threads = new Thread[4];private IOReactor[] dispatchers = new IOReactor[4];public Acceptor() throws Exception {ServerSocketChannel ssc = ServerSocketChannel.open();ssc.socket().bind(new InetSocketAddress(8888));ssc.configureBlocking(false);selector = Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);}public void startup() throws Exception {for (int i = 0; i < 4; i++) {dispatchers[i] = new IOReactor();}for (int i = 0; i < 4; i++) {threads[i] = new Thread(dispatchers[i]);threads[i].start();}while (true) {try {// 此处阻塞selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();for (SelectionKey selectionKey : selectionKeys) {handle(selectionKey);}selectionKeys.clear();} catch (Exception e) {e.printStackTrace();}}}private void handle(SelectionKey selectionKey) throws Exception {if (selectionKey.isAcceptable()) {System.out.println("accept");ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();SocketChannel socketChannel = ssc.accept();System.out.println("开始连接:" + socketChannel);// 随机挑选一个I/O线程int i = new Random().nextInt(4);dispatchers[i].addChannel(socketChannel);}}
}
package cn.cjc.netty.pkg3.msm;import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 从Reactor** @author chenjc* @since 2019-01-03*/
public class IOReactor implements Runnable {private Selector selector;private Queue<SocketChannel> newChannels = new ConcurrentLinkedQueue<>();private static Charset charset = Charset.defaultCharset();private static ExecutorService workerPool = Executors.newFixedThreadPool(200);public IOReactor() throws Exception {selector = Selector.open();}@Overridepublic void run() {while (true) {try {int readyCount = selector.select(1000);if (readyCount > 0) {// 处理读写事件processEvents(selector.selectedKeys());}// 处理新的连接processNewChannels();} catch (Exception e) {e.printStackTrace();}}}private void processNewChannels() throws Exception {SocketChannel channel;while ((channel = newChannels.poll()) != null) {channel.configureBlocking(false);channel.register(selector, SelectionKey.OP_READ);}}private void processEvents(Set<SelectionKey> selectionKeys) throws Exception {for (SelectionKey selectionKey : selectionKeys) {handle(selectionKey);}selectionKeys.clear();}private void handle(SelectionKey selectionKey) throws Exception {if (selectionKey.isReadable()) {System.out.println("read");handleRead(selectionKey);} else if (selectionKey.isWritable()) {System.out.println("write");handleWrite(selectionKey);}}private void handleWrite(SelectionKey selectionKey) throws Exception {Object attachment = selectionKey.attachment();selectionKey.attach(null);String resp = (String) attachment;ByteBuffer buf = charset.encode(resp);SocketChannel socketChannel = (SocketChannel) selectionKey.channel();socketChannel.write(buf);int ops = selectionKey.interestOps();// 取消对写事件的监听selectionKey.interestOps(ops ^ SelectionKey.OP_WRITE);}private void handleRead(SelectionKey selectionKey) throws Exception {SocketChannel socketChannel = (SocketChannel) selectionKey.channel();ByteBuffer buffer = ByteBuffer.allocate(1024);int len = socketChannel.read(buffer);if (len > 0) {buffer.flip();String req = charset.decode(buffer).toString();// 交给工作线程处理workerPool.execute(new Worker(req, selectionKey));} else if (len < 0) 
{System.out.println("关闭连接:" + socketChannel);selectionKey.cancel();socketChannel.close();}}public void addChannel(SocketChannel socketChannel) {newChannels.add(socketChannel);selector.wakeup();}
}
package cn.cjc.netty.pkg3.msm;import java.nio.channels.SelectionKey;/*** 业务处理** @author chenjc* @since 2019-01-05*/
/**
 * Business task: builds the echo response for one request, attaches it to the
 * connection's selection key and asks the key's selector to report write
 * readiness.
 */
public class Worker implements Runnable {

    private final String req;
    private final SelectionKey selectionKey;

    /**
     * @param req          request text to echo
     * @param selectionKey key of the connection the response belongs to
     */
    public Worker(String req, SelectionKey selectionKey) {
        this.req = req;
        this.selectionKey = selectionKey;
    }

    /**
     * Attaches {@code "echo:" + req} to the key and enables write interest.
     * Does nothing if the key is no longer valid.
     */
    @Override
    public void run() {
        if (!selectionKey.isValid()) {
            // Connection was closed before this task ran; there is nobody to answer.
            return;
        }
        String resp = "echo:" + req;
        selectionKey.attach(resp);
        fireWrite();
    }

    /** Adds write interest to the key and wakes its selector so the change takes effect promptly. */
    private void fireWrite() {
        try {
            int ops = selectionKey.interestOps();
            // Start listening for write events.
            selectionKey.interestOps(ops | SelectionKey.OP_WRITE);
            // Wake the selector.
            selectionKey.selector().wakeup();
        } catch (java.nio.channels.CancelledKeyException ignored) {
            // The key was cancelled between the validity check and here (client disconnected);
            // the response is dropped instead of failing the pool thread's task.
        }
    }
}