JAVA TCP通信
在UDP通信一文里面,我们介绍过基本的通信原理,但是TCP通信不像UDP通信那样直接发就行了,TCP通信需要各种流嵌套。同时TCP里面的socket和UDP的socket还有些许不一样,在TCP通信里面,有两种用到的socket,一个是专门监听的socket一个是发消息的socket,同时在服务节点,还会维护一个socket队列,每次监听socket监听到连接请求后就建立一个socket并将其加入socket队列,然后每次发送消息的时候就从socket队列中取出一个socket,然后通过这个socket发送消息,同时,一般来说,发送消息的socket和监听的socket使用的是同一个端口。我们可以看出,因为接收和发送是两个阶段,我们在这儿多线程就可以派上用场了,直接在新的线程里面将接收到的socket作为参数传递给子线程,然后子线程再进行回传消息。而这里的TCP通信服务器端的多线程要不就直接多线程,或者用线程池。
首先是最标准版本的客户端
import java.io.*;
import java.net.*;
public class EchoClient {
public static void main(String[] args) throws Exception {
String userInput = null;
String echoMessage = null;
BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in));
Socket socket = new Socket("127.0.0.1", 8189);
System.out.println("Connected to Server");
InputStream inStream = socket.getInputStream();
OutputStream outStream = socket.getOutputStream();
BufferedReader in = new BufferedReader(new InputStreamReader(inStream));
PrintWriter out = new PrintWriter(outStream);
while ((userInput = stdIn.readLine()) != null) {
out.println(userInput);
out.flush();
echoMessage = in.readLine();
System.out.println("Echo from server: " + echoMessage);
}
socket.close();
}
}
最标准版本的服务端
import java.io.*;
import java.net.*;
public class EchoServer {
public static void main(String[] args) throws Exception {
Socket clientSocket = null;
ServerSocket listenSocket = new ServerSocket(8189);
System.out.println("Server listening at 8189");
clientSocket = listenSocket.accept();
System.out.println("Accepted connection from client");
InputStream inStream = clientSocket.getInputStream();
OutputStream outStream = clientSocket.getOutputStream();
BufferedReader in = new BufferedReader(new InputStreamReader(inStream));
PrintWriter out = new PrintWriter(outStream);
String line = null;
while((line=in.readLine())!=null) {
System.out.println("Message from client:" + line);
out.println(line);
out.flush();
}
clientSocket.close();
listenSocket.close();
}
}
我们可以看出,和UDP客户端自己构建Datagrampacket不同的是,接收和发送都是从最底层的byte构建来的。
首先我们分析下代码中出现的几个Stream
- InputStream:字节流,该流里面的数据以字节为单位,直接提取数据,数据源可以是socket,也可以是硬盘文件。
- InputStreamReader:字符流,读取的对象为字节流。
- BufferedReader:带缓冲的字符流,支持以行为单位进行读写。目的是方便字符处理。
- PrintWriter:字符流,用法和InputStreamReader类似,不过是向外输出。
- OutputStream:字节流,以字节的方式向外输出,可以是socket也可以是文件。
在读取的代码中,我们可以看到,直接从键盘读取的代码为最底层的字节流为System.in,然后一层层向上包装,最后通过一个while循环反复读取到一个string中。
从socket中读取的代码为:InputStream inStream = socket.getInputStream();
BufferedReader in = new BufferedReader(new InputStreamReader(inStream));此时的字节流为socket,通过同样的方式将数据读取到一个string中,最后将这个string输出。
同样,对于输出端,构建流的代码为OutputStream outStream = clientSocket.getOutputStream();
PrintWriter out = new PrintWriter(outStream);由于不存在反复输出的情况,都是由数据就输出,此时未构造带缓冲的输出流。直接通过输出将数据输出到socket中。
从这儿我们可以看出,java基本上对于数据的处理喜欢将数据最底层抽象化,统一将数据作为字节流处理,然后根据需要再对字节流进行相应的处理。
接下来我们看看多线程版本的服务端
import java.io.*;
import java.net.*;
public class MultiThreadEchoServer {
public static void main(String[] args) throws Exception {
ServerSocket listenSocket = new ServerSocket(8189);
Socket socket = null;
int count = 0;
System.out.println("Server listening at 8189");
while (true) {
socket = listenSocket.accept();
count++;
System.out.println("the total number of clients is" + count);
ServerThread serverthread = new ServerThread(socket);
serverthread.run();
}
}
}
class ServerThread extends Thread {
Socket socket = null;
public ServerThread(Socket socket) {
this.socket = socket;
}
public void run() {
InputStream is = null;
InputStreamReader isr = null;
BufferedReader br = null;
OutputStream os = null;
PrintWriter pw = null;
try {
is = socket.getInputStream();
isr = new InputStreamReader(is);
br = new BufferedReader(isr);
os = socket.getOutputStream();
pw = new PrintWriter(os);
String info = null;
while ((info = br.readLine()) != null) {
System.out.println("Message from client: " + info);
pw.println(info);
pw.flush();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
/*
* if (pw != null) pw.close(); if (os != null) os.close(); if (br != null)
* br.close(); if (isr != null) isr.close(); if (is != null) is.close();
*/
if (socket != null)
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
其他的都和普通版本的服务端一样,就是注意下,java多线程启动的时候跑的是class里面的run函数,启动的方式为调用start方法。再就是构造函数的时候将接收达到的socket实例传递给构造器。并且这个ServerThread继承了Thread类。
最后是线程池版本的服务端
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
public class ThreadPoolEchoServer {
public static void main(String[] args) throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));
ServerSocket listenSocket = new ServerSocket(8189);
Socket socket = null;
int count = 0;
System.out.println("Server listening at 8189");
while (true) {
socket = listenSocket.accept();
count++;
System.out.println("The total number of clients is " + count);
ServerThread serverthread = new ServerThread(socket);
executor.execute(serverthread);
}
}
}
class ServerThread implements Runnable {
private Socket socket = null;
public ServerThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
InputStream is = null;
InputStreamReader isr = null;
BufferedReader br = null;
OutputStream os = null;
PrintWriter pw = null;
try {
is = socket.getInputStream();
isr = new InputStreamReader(is);
br = new BufferedReader(isr);
os = socket.getOutputStream();
pw = new PrintWriter(os);
String info = null;
while ((info = br.readLine()) != null) {
System.out.println("Message from client: " + info);
pw.println(info);
pw.flush();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (socket != null)
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
线程池版本的其他都和多线程版本的一样,就是开启多线程的方式,线程池版本的ServerThread继承了一个Runnable接口类。
ThreadPoolExecutor 参数解析
ThreadPoolExecutor 主要有以下几个参数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数说明
(1)corePoolSize 核心线程数量 即使没有任务执行,核心线程也会一直存活 线程数小于核心线程时,即使有空闲线程,线程沲也会创建新线程执行任务 设置 allowCoreThreadTimeout=true 时,核心线程会超时关闭 。
(2)maximumPoolSize 最大线程数 当所有核心线程都在执行任务,且任务队列已满时,线程沲会创建新线程执行任务。 当线程数=maxPoolSize, 且 任 务 队 列 已 满 , 此 时 添 加 任 务 时 会 触 发 RejectedExecutionHandler 进行处理。
(3)keepAliveTime TimeUnit 线程空闲时间
如果线程数>corePoolSize,且有线程空闲时间达到 keepAliveTime 时,线程会销毁,直 到线程数量=corePoolSize 如果设置 allowCoreThreadTimeout=true 时,核心线程执行完任务也会销毁直到数量=0 。
(4) workQueue 任务队列。
我个人的理解是任务队列就好像排队打饭,队列就是排队的过程,最大线程数就是可以多少人同时打饭,就好像有几个窗口本来就存在,但是只是一般时候没有开放,核心线程数就是一般开放的打饭窗口数。
我们上面的代码只使用了前四个参数,进一步说明下,当我们任务超过核心线程数量的时候我们会继续添加线程,最后直到最大线程数量。我们自己的代码里面申请了5个核心线程,10个最大线程,还有任务队列的长度为5。