普通的 socket 通信

我们用 Python 来实现一个简单的 TCP 服务器,它实现 echo 功能。

服务端代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import socketserver

class MyTcpHandler(socketserver.BaseRequestHandler):

    def handle(self) -> None:
        self.data = self.request.recv(1024).strip()
        self.request.sendall(self.data)


HOST, PORT = "localhost", 9999

with socketserver.TCPServer((HOST, PORT), MyTcpHandler) as server:
    server.serve_forever()

客户端代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import socket

HOST, PORT = "localhost", 9999


with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect((HOST, PORT))
    sock.sendall(bytes("Hello, I am Client", "utf-8"))
    received = str(sock.recv(1024), "utf-8")
    print(received)

增加 TLS 支持

SSL/TLS相关概念及流程 里面比较详细的介绍了 SSL 概念。

单向认证

证书我们直接通过 openssl 生成

使用 openssl 生成证书

测试不需要真的让CA签发证书,所以我们用 openssl 生成自签名的根证书

  1. 生成 CA 私钥

    1
    
    openssl genrsa -des3 -out ca.key 2048
    

    -des3 使用 des3 算法,强度为2048, 输出到 ca.key ,生成过程中输入密码 123456

  2. 生成自签名的 CA 证书

    1
    
    openssl req -new -x509 -days 36500 -key ca.key -out ca.crt
    

    openssl 会要求填写下面的信息

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    
    Country Name (2 letter code) [AU]:CN // 国家码
    State or Province Name (full name) [Some-State]:GuangDong // 省份
    Locality Name (eg, city) []:GuangZhou // 城市
    Organization Name (eg, company) [Internet Widgits Pty Ltd]: Personal // 组织机构或公司名
    Organizational Unit Name (eg, section) []: Personal // 机构部门
    Common Name (e.g. server FQDN or YOUR name) []:域名 // *.abc.com
    Email Address []:[email protected]  // 邮件地址
    
    Please enter the following 'extra' attributes
    to be sent with your certificate request
    A challenge password []:123456 // 证书密码
    An optional company name []:Personal // 公司名
    

    这里要注意一点:上面的步骤好像全程没有公钥参与,实际上公钥就在证书中。下面的命令查看证书中的公钥:

    1
    
    openssl x509 -in .\ca.crt -pubkey -noout
    
  3. 生成服务端的私钥

    1
    
    openssl genrsa -out server.key 2048
    

    输出到 server.key ,不用密码保护

  4. 生成服务端的证书签名请求CSR文件

    1
    
    openssl req -new -key server.key -out server.csr
    

    同样要填相关信息

  5. 使用上面生成的 CA 给服务端签证书

    1
    
    openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 36500
    

    x509: 证书格式 -days 36500: 过期时间 -in: 指定请求文件

服务端支持 tls

python 直接用 ssl 库

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import socketserver
import ssl

class MyTcpHandler(socketserver.BaseRequestHandler):

    def handle(self) -> None:
        self.data = self.request.recv(1024).strip()
        self.request.sendall(self.data)

context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain(certfile="server.crt", keyfile="server.key")

class MyTCPServer(socketserver.TCPServer):
    def get_request(self):
        (sock, addr) = super().get_request()
        return (context.wrap_socket(sock=sock, server_side=True), addr)

HOST, PORT = "127.0.0.1", 9999

with MyTCPServer((HOST, PORT), MyTcpHandler) as server:
    server.serve_forever()

可以看到,服务端 SSL 就需要刚刚生成的 server.crt 证书,server.key 私钥

客户端访问 SSL 服务端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import socket
import ssl

HOST, PORT = "127.0.0.1", 9999

context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.maximum_version = ssl.TLSVersion.TLSv1_2
context.load_default_certs()
context.load_verify_locations("ca.crt")

with socket.create_connection((HOST, PORT)) as sock:
    with context.wrap_socket(sock) as ssock:
        ssock.sendall(bytes("Hello, I am Client", "utf-8"))
        received = str(ssock.recv(1024), "utf-8")
        print(received)

我们这里不检测 hostname 。因为我们的证书是自签名的,所以要将验证位置为 “ca.crt” 才能验证成功

双向认证

双向认证服务器也需要认证客户端的身份,同样我们先生成客户端的私钥和证书。

服务端代码,区别只在需要验证客户端证书:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import socketserver
import ssl

class MyTcpHandler(socketserver.BaseRequestHandler):

    def handle(self) -> None:
        self.data = self.request.recv(1024).strip()
        self.request.sendall(self.data)

context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.verify_mode = ssl.VerifyMode.CERT_REQUIRED # 需要验证客户端证书

context.load_cert_chain(certfile="server.crt", keyfile="server.key")
context.load_verify_locations("ca.crt")

class MyTCPServer(socketserver.TCPServer):
    def get_request(self):
        (sock, addr) = super().get_request()
        return (context.wrap_socket(sock=sock, server_side=True), addr)

HOST, PORT = "127.0.0.1", 9999

with MyTCPServer((HOST, PORT), MyTcpHandler) as server:
    server.serve_forever()

客户端代码,客户端需要设置证书与密钥:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import socket
import ssl

HOST, PORT = "127.0.0.1", 9999

context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.maximum_version = ssl.TLSVersion.TLSv1_2
context.load_cert_chain(certfile="client.crt", keyfile="client.key", password="123456")
context.load_verify_locations("ca.crt")

with socket.create_connection((HOST, PORT)) as sock:
    with context.wrap_socket(sock) as ssock:
        ssock.sendall(bytes("Hello, I am Client", "utf-8"))
        received = str(ssock.recv(1024), "utf-8")
        print(received)

问题解决办法

如果过程中遇到问题,可以用 Wireshark 抓包查看是哪一步有问题,相关的理论支撑:SSL/TLS相关概念及流程