普通的 socket 通信
我们用 Python 来实现一个简单的 TCP 服务器,它实现 echo 功能。
服务端代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| #!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socketserver
class MyTcpHandler(socketserver.BaseRequestHandler):
def handle(self) -> None:
self.data = self.request.recv(1024).strip()
self.request.sendall(self.data)
HOST, PORT = "localhost", 9999
with socketserver.TCPServer((HOST, PORT), MyTcpHandler) as server:
server.serve_forever()
|
客户端代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
| #!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socket
HOST, PORT = "localhost", 9999
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect((HOST, PORT))
sock.sendall(bytes("Hello, I am Client", "utf-8"))
received = str(sock.recv(1024), "utf-8")
print(received)
|
增加 TLS 支持
单向认证
证书我们直接通过 openssl 生成
使用 openssl 生成证书
测试不需要真的让 CA 签发证书,所以我们用 openssl 生成自签名的根证书
生成 CA 私钥
1
| openssl genrsa -des3 -out ca.key 2048
|
-des3
使用 des3 算法,强度为 2048, 输出到 ca.key ,生成过程中输入密码 123456
生成自签名的 CA 证书
1
| openssl req -new -x509 -days 36500 -key ca.key -out ca.crt
|
openssl 会要求填写下面的信息
1
2
3
4
5
6
7
8
9
10
11
12
| Country Name (2 letter code) [AU]:CN // 国家码
State or Province Name (full name) [Some-State]:GuangDong // 省份
Locality Name (eg, city) []:GuangZhou // 城市
Organization Name (eg, company) [Internet Widgits Pty Ltd]: Personal // 组织机构或公司名
Organizational Unit Name (eg, section) []: Personal // 机构部门
Common Name (e.g. server FQDN or YOUR name) []:域名 // *.abc.com
Email Address []:[email protected] // 邮件地址
Please enter the following 'extra' attributes
to be sent with your certificate request
A challenge password []:123456 // 证书密码
An optional company name []:Personal // 公司名
|
这里要注意一点:上面的步骤好像全程没有公钥参与,实际上公钥就在证书中。下面的命令查看证书中的公钥:
1
| openssl x509 -in .\ca.crt -pubkey -noout
|
生成服务端的私钥
1
| openssl genrsa -out server.key 2048
|
输出到 server.key ,不用密码保护
生成服务端的证书签名请求 CSR 文件
1
| openssl req -new -key server.key -out server.csr
|
同样要填相关信息
使用上面生成的 CA 给服务端签证书
1
| openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 36500
|
x509: 证书格式
-days 36500: 过期时间
-in: 指定请求文件
服务端支持 tls
python 直接用 ssl 库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| import socketserver
import ssl
class MyTcpHandler(socketserver.BaseRequestHandler):
def handle(self) -> None:
self.data = self.request.recv(1024).strip()
self.request.sendall(self.data)
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain(certfile="server.crt", keyfile="server.key")
class MyTCPServer(socketserver.TCPServer):
def get_request(self):
(sock, addr) = super().get_request()
return (context.wrap_socket(sock=sock, server_side=True), addr)
HOST, PORT = "127.0.0.1", 9999
with MyTCPServer((HOST, PORT), MyTcpHandler) as server:
server.serve_forever()
|
可以看到,服务端 SSL 就需要刚刚生成的 server.crt
证书,=server.key= 私钥
客户端访问 SSL 服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| #!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socket
import ssl
HOST, PORT = "127.0.0.1", 9999
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.maximum_version = ssl.TLSVersion.TLSv1_2
context.load_default_certs()
context.load_verify_locations("ca.crt")
with socket.create_connection((HOST, PORT)) as sock:
with context.wrap_socket(sock) as ssock:
ssock.sendall(bytes("Hello, I am Client", "utf-8"))
received = str(ssock.recv(1024), "utf-8")
print(received)
|
我们这里不检测 hostname 。因为我们的证书是自签名的,所以要将验证位置为 “ca.crt” 才能验证成功
双向认证
双向认证服务器也需要认证客户端的身份,同样我们先生成客户端的私钥和证书。
服务端代码,区别只在需要验证客户端证书:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| #!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socketserver
import ssl
class MyTcpHandler(socketserver.BaseRequestHandler):
def handle(self) -> None:
self.data = self.request.recv(1024).strip()
self.request.sendall(self.data)
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.verify_mode = ssl.VerifyMode.CERT_REQUIRED # 需要验证客户端证书
context.load_cert_chain(certfile="server.crt", keyfile="server.key")
context.load_verify_locations("ca.crt")
class MyTCPServer(socketserver.TCPServer):
def get_request(self):
(sock, addr) = super().get_request()
return (context.wrap_socket(sock=sock, server_side=True), addr)
HOST, PORT = "127.0.0.1", 9999
with MyTCPServer((HOST, PORT), MyTcpHandler) as server:
server.serve_forever()
|
客户端代码,客户端需要设置证书与密钥:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| #!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socket
import ssl
HOST, PORT = "127.0.0.1", 9999
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.maximum_version = ssl.TLSVersion.TLSv1_2
context.load_cert_chain(certfile="client.crt", keyfile="client.key", password="123456")
context.load_verify_locations("ca.crt")
with socket.create_connection((HOST, PORT)) as sock:
with context.wrap_socket(sock) as ssock:
ssock.sendall(bytes("Hello, I am Client", "utf-8"))
received = str(ssock.recv(1024), "utf-8")
print(received)
|