未分类

JAVA TCP通信

在UDP通信一文里面,我们介绍过基本的通信原理,但是TCP通信不像UDP通信那样直接发就行了,TCP通信需要各种流嵌套。同时TCP里面的socket和UDP的socket还有些许不一样,在TCP通信里面,有两种用到的socket,一个是专门监听的socket一个是发消息的socket,同时在服务节点,还会维护一个socket队列,每次监听socket监听到连接请求后就建立一个socket并将其加入socket队列,然后每次发送消息的时候就从socket队列中取出一个socket,然后通过这个socket发送消息,同时,一般来说,发送消息的socket和监听的socket使用的是同一个端口。我们可以看出,因为接收和发送是两个阶段,我们在这儿多线程就可以派上用场了,直接在新的线程里面将接收到的socket作为参数传递给子线程,然后子线程再进行回传消息。而这里的TCP通信服务器端的多线程要不就直接多线程,或者用线程池。

 

首先是最标准版本的客户端


import java.io.*;
import java.net.*;

public class EchoClient {

    public static void main(String[] args) throws Exception {

        String userInput = null;
        String echoMessage = null;

        BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in));

        Socket socket = new Socket("127.0.0.1", 8189);
        System.out.println("Connected to Server");

        InputStream inStream = socket.getInputStream();
        OutputStream outStream = socket.getOutputStream();
        BufferedReader in = new BufferedReader(new InputStreamReader(inStream));
        PrintWriter out = new PrintWriter(outStream);

        while ((userInput = stdIn.readLine()) != null) {
            out.println(userInput);
            out.flush();
            echoMessage = in.readLine();
            System.out.println("Echo from server: " + echoMessage);
        }

        socket.close();

    }
}

最标准版本的服务端


import java.io.*;
import java.net.*;

public class EchoServer {
     public static void main(String[] args) throws Exception {
        Socket clientSocket = null;
        ServerSocket listenSocket = new ServerSocket(8189);
               
        System.out.println("Server listening at 8189");
        clientSocket = listenSocket.accept();
        System.out.println("Accepted connection from client");
       
        InputStream inStream = clientSocket.getInputStream();
        OutputStream outStream = clientSocket.getOutputStream();
        BufferedReader in = new BufferedReader(new InputStreamReader(inStream));
        PrintWriter out = new PrintWriter(outStream);
       
        String line = null;
        while((line=in.readLine())!=null) {
            System.out.println("Message from client:" + line);
            out.println(line);
            out.flush();
        }
        clientSocket.close();
        listenSocket.close();
     }
}

我们可以看出,和UDP客户端自己构建Datagrampacket不同的是,接收和发送都是从最底层的byte构建来的。

首先我们分析下代码中出现的几个Stream

  • InputStream:字节流,该流里面的数据以字节为单位,直接提取数据,数据源可以是socket,也可以是硬盘文件。
  • InputStreamReader:字符流,读取的对象为字节流。
  • BufferedReader:带缓冲的字符流,支持以行为单位进行读写。目的是方便字符处理。
  • PrintWriter:字符流,用法和InputStreamReader类似,不过是向外输出。
  • OutputStream:字节流,以字节的方式向外输出,可以是socket也可以是文件。
     
     
    在读取的代码中,我们可以看到,直接从键盘读取的代码为

    
    
    
    

    最底层的字节流为System.in,然后一层层向上包装,最后通过一个while循环反复读取到一个string中。
     
    从socket中读取的代码为:

    
    
    InputStream inStream = socket.getInputStream();
    BufferedReader in = new BufferedReader(new InputStreamReader(inStream));

    此时的字节流为socket,通过同样的方式将数据读取到一个string中,最后将这个string输出。
     
    同样,对于输出端,构建流的代码为

    
    
    OutputStream outStream = clientSocket.getOutputStream();
    PrintWriter out = new PrintWriter(outStream);

    由于不存在反复输出的情况,都是由数据就输出,此时未构造带缓冲的输出流。直接通过输出将数据输出到socket中。

 

从这儿我们可以看出,java基本上对于数据的处理喜欢将数据最底层抽象化,统一将数据作为字节流处理,然后根据需要再对字节流进行相应的处理。

 

接下来我们看看多线程版本的服务端


import java.io.*;
import java.net.*;

public class MultiThreadEchoServer {
    public static void main(String[] args) throws Exception {
        ServerSocket listenSocket = new ServerSocket(8189);
        Socket socket = null;

        int count = 0;
        System.out.println("Server listening at 8189");

        while (true) {
            socket = listenSocket.accept();
            count++;
            System.out.println("the total number of clients is" + count);
            ServerThread serverthread = new ServerThread(socket);
            serverthread.run();
        }
    }
}

class ServerThread extends Thread {
    Socket socket = null;

    public ServerThread(Socket socket) {
        this.socket = socket;
    }

    public void run() {
        InputStream is = null;
        InputStreamReader isr = null;
        BufferedReader br = null;
        OutputStream os = null;
        PrintWriter pw = null;
        try {
            is = socket.getInputStream();
            isr = new InputStreamReader(is);
            br = new BufferedReader(isr);
            os = socket.getOutputStream();
            pw = new PrintWriter(os);
            String info = null;
            while ((info = br.readLine()) != null) {
                System.out.println("Message from client: " + info);
                pw.println(info);
                pw.flush();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                /*
                 * if (pw != null) pw.close(); if (os != null) os.close(); if (br != null)
                 * br.close(); if (isr != null) isr.close(); if (is != null) is.close();
                 */

                if (socket != null)
                    socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }
}

其他的都和普通版本的服务端一样,就是注意下,java多线程启动的时候跑的是class里面的run函数,启动的方式为调用start方法。再就是构造函数的时候将接收达到的socket实例传递给构造器。并且这个ServerThread继承了Thread类。

 

最后是线程池版本的服务端


import java.io.*;
import java.net.*;
import java.util.concurrent.*;

public class ThreadPoolEchoServer {
    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));
        ServerSocket listenSocket = new ServerSocket(8189);
        Socket socket = null;

        int count = 0;
        System.out.println("Server listening at 8189");

        while (true) {
            socket = listenSocket.accept();
            count++;
            System.out.println("The total number of clients is " + count);
            ServerThread serverthread = new ServerThread(socket);
            executor.execute(serverthread);
        }
    }
}

class ServerThread implements Runnable {
    private Socket socket = null;

    public ServerThread(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        InputStream is = null;
        InputStreamReader isr = null;
        BufferedReader br = null;
        OutputStream os = null;
        PrintWriter pw = null;
        try {
            is = socket.getInputStream();
            isr = new InputStreamReader(is);
            br = new BufferedReader(isr);
            os = socket.getOutputStream();
            pw = new PrintWriter(os);
            String info = null;
            while ((info = br.readLine()) != null) {
                System.out.println("Message from client: " + info);
                pw.println(info);
                pw.flush();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (socket != null)
                    socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

线程池版本的其他都和多线程版本的一样,就是开启多线程的方式,线程池版本的ServerThread继承了一个Runnable接口类。

ThreadPoolExecutor 参数解析

ThreadPoolExecutor 主要有以下几个参数:

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler)

参数说明

(1)corePoolSize 核心线程数量 即使没有任务执行,核心线程也会一直存活 线程数小于核心线程时,即使有空闲线程,线程沲也会创建新线程执行任务 设置 allowCoreThreadTimeout=true 时,核心线程会超时关闭 。

(2)maximumPoolSize 最大线程数 当所有核心线程都在执行任务,且任务队列已满时,线程沲会创建新线程执行任务。 当线程数=maxPoolSize, 且 任 务 队 列 已 满 , 此 时 添 加 任 务 时 会 触 发 RejectedExecutionHandler 进行处理。

(3)keepAliveTime TimeUnit 线程空闲时间
如果线程数>corePoolSize,且有线程空闲时间达到 keepAliveTime 时,线程会销毁,直 到线程数量=corePoolSize 如果设置 allowCoreThreadTimeout=true 时,核心线程执行完任务也会销毁直到数量=0 。

(4) workQueue 任务队列。

我个人的理解是任务队列就好像排队打饭,队列就是排队的过程,最大线程数就是可以多少人同时打饭,就好像有几个窗口本来就存在,但是只是一般时候没有开放,核心线程数就是一般开放的打饭窗口数。

我们上面的代码只使用了前四个参数,进一步说明下,当我们任务超过核心线程数量的时候我们会继续添加线程,最后直到最大线程数量。我们自己的代码里面申请了5个核心线程,10个最大线程,还有任务队列的长度为5。

Leave a Reply

邮箱地址不会被公开。 必填项已用*标注