Below are configuration examples for the different Kafka authentication methods. Building on the single-table design described earlier, they show how to populate the configuration table for each method.
1. SSL Authentication Configuration Example
```sql
INSERT INTO kafka_configs (
    cluster_name, description, bootstrap_servers, auth_type, security_protocol,
    truststore_data, truststore_password, keystore_data, keystore_password,
    additional_properties
) VALUES (
    'kafka-ssl-cluster',
    'Kafka cluster with SSL authentication',
    'kafka1:9093,kafka2:9093',
    'SSL',
    'SSL',
    -- certificate binary data
    decode('MIIDHzCCAgegAwIBAgIJAK...', 'base64'),
    crypt('truststore_password', gen_salt('bf')),
    decode('MIIDHzCCAgegAwIBAgIJAK...', 'base64'),
    crypt('keystore_password', gen_salt('bf')),
    '{
        "ssl.endpoint.identification.algorithm": "https",
        "ssl.protocol": "TLSv1.3",
        "ssl.keystore.type": "PKCS12",
        "ssl.truststore.type": "JKS"
    }'
);
```
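Since Kafka clients expect truststore/keystore *files* rather than raw bytes, the application has to materialize the binary columns before connecting. The following is a minimal Python sketch of that step; the `row` dict, the placeholder certificate bytes, and the `materialize_ssl_files` helper are all hypothetical illustrations, not part of the schema above.

```python
import base64
import tempfile

# Hypothetical row fetched from kafka_configs; the certificate bytes are
# simulated with a short placeholder instead of a real keystore.
row = {
    "bootstrap_servers": "kafka1:9093,kafka2:9093",
    "truststore_data": base64.b64decode("TUlJREh6Q0NBZ2VnQXdJQkFnSUpBSw=="),
    "keystore_data": base64.b64decode("TUlJREh6Q0NBZ2VnQXdJQkFnSUpBSw=="),
}

def materialize_ssl_files(row):
    """Write the binary certificate columns to temp files and return a
    client-side property dict (keys follow Kafka's ssl.* naming)."""
    props = {
        "bootstrap.servers": row["bootstrap_servers"],
        "security.protocol": "SSL",
    }
    for column, prop in (("truststore_data", "ssl.truststore.location"),
                         ("keystore_data", "ssl.keystore.location")):
        f = tempfile.NamedTemporaryFile(suffix=".jks", delete=False)
        f.write(row[column])  # raw bytes from the bytea column
        f.close()
        props[prop] = f.name
    return props

props = materialize_ssl_files(row)
```

In production the temp files should be created with restrictive permissions and deleted once the client has started.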
2. SASL/PLAIN Authentication Configuration Example
```sql
INSERT INTO kafka_configs (
    cluster_name, description, bootstrap_servers, auth_type, security_protocol,
    sasl_mechanism, sasl_username, sasl_password_hash, sasl_salt,
    additional_properties
) VALUES (
    'kafka-sasl-plain-cluster',
    'Kafka cluster with SASL/PLAIN authentication',
    'kafka1:9094,kafka2:9094',
    'SASL_PLAIN',
    'SASL_SSL',
    'PLAIN',
    'admin',
    -- password hash: SHA-256 with salt
    encode(digest('password' || 'random_salt', 'sha256'), 'hex'),
    'random_salt',
    '{
        "ssl.endpoint.identification.algorithm": "https",
        "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"password\";"
    }'
);
```
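The `sasl_password_hash` expression above can be reproduced on the application side so that credentials can be verified without SQL round-trips. This is a sketch of the equivalent computation; the function names are illustrative.

```python
import hashlib

def hash_sasl_password(password: str, salt: str) -> str:
    """Equivalent of PostgreSQL's
    encode(digest(password || salt, 'sha256'), 'hex')."""
    return hashlib.sha256((password + salt).encode("utf-8")).hexdigest()

def verify_sasl_password(candidate: str, salt: str, stored_hash: str) -> bool:
    """Recompute the salted hash for a candidate and compare to the column."""
    return hash_sasl_password(candidate, salt) == stored_hash

stored = hash_sasl_password("password", "random_salt")
```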
3. SASL/SCRAM Authentication Configuration Example
```sql
INSERT INTO kafka_configs (
    cluster_name, description, bootstrap_servers, auth_type, security_protocol,
    sasl_mechanism, sasl_username, sasl_password_hash, sasl_salt,
    sasl_iterations, additional_properties
) VALUES (
    'kafka-sasl-scram-cluster',
    'Kafka cluster with SASL/SCRAM authentication',
    'kafka1:9094,kafka2:9094',
    'SASL_SCRAM',
    'SASL_SSL',
    'SCRAM-SHA-512',
    'admin',
    -- SCRAM hash (must be generated with the SCRAM algorithm)
    'nThb8a8vT7QJZQJZQJZQJZQJZQJZQJZQJZQJZQ==',
    'random_salt_123456',
    4096,
    '{
        "ssl.endpoint.identification.algorithm": "https",
        "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"password\";"
    }'
);
```
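Unlike the plain SHA-256 case, the SCRAM hash cannot be produced with `digest()`: SCRAM (RFC 5802) derives a SaltedPassword via PBKDF2 using the stored salt and iteration count, then computes a StoredKey from it. A sketch of that derivation for SCRAM-SHA-512, matching the salt and the 4096 iterations in the row above (the helper name is illustrative):

```python
import hashlib
import hmac

def scram_stored_key(password: str, salt: bytes, iterations: int) -> bytes:
    """Derive the SCRAM-SHA-512 StoredKey (RFC 5802): PBKDF2 -> ClientKey
    -> H(ClientKey). This is what a server persists instead of the password."""
    salted = hashlib.pbkdf2_hmac("sha512", password.encode("utf-8"),
                                 salt, iterations)
    client_key = hmac.new(salted, b"Client Key", hashlib.sha512).digest()
    return hashlib.sha512(client_key).digest()

key = scram_stored_key("password", b"random_salt_123456", 4096)
```

The resulting bytes would typically be Base64-encoded before being written into `sasl_password_hash`.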
4. SASL/Kerberos (GSSAPI) Authentication Configuration Example
```sql
INSERT INTO kafka_configs (
    cluster_name, description, bootstrap_servers, auth_type, security_protocol,
    sasl_mechanism, kerberos_config, service_name, additional_properties
) VALUES (
    'kafka-kerberos-cluster',
    'Kafka cluster with Kerberos authentication',
    'kafka1:9093,kafka2:9093',
    'SASL_KERBEROS',
    'SASL_SSL',
    'GSSAPI',
    -- krb5.conf content
    '[libdefaults]
default_realm = EXAMPLE.COM
dns_lookup_realm = false
dns_lookup_kdc = false

[realms]
EXAMPLE.COM = {
    kdc = kerberos.example.com:88
    admin_server = kerberos.example.com:749
}

[domain_realm]
.example.com = EXAMPLE.COM
example.com = EXAMPLE.COM',
    'kafka',
    '{
        "ssl.endpoint.identification.algorithm": "https",
        "sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/security/keytabs/kafka_client.keytab\" principal=\"kafka-client@EXAMPLE.COM\";",
        "sasl.kerberos.service.name": "kafka",
        "kerberos.principal.to.local.rules": "DEFAULT"
    }'
);
```
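Because Kerberos client libraries read `krb5.conf` from disk, the `kerberos_config` text stored in the table must be written out to a file before connecting. A minimal sketch, assuming the standard `KRB5_CONFIG` environment variable is how the client library locates the file (the helper name is illustrative, and the config text is abridged from the row above):

```python
import os
import tempfile

# Abridged kerberos_config column value.
krb5_conf = """[libdefaults]
default_realm = EXAMPLE.COM

[realms]
EXAMPLE.COM = {
    kdc = kerberos.example.com:88
}
"""

def export_krb5_conf(text: str) -> str:
    """Write the stored krb5.conf text to a temp file and expose it via the
    KRB5_CONFIG environment variable honoured by Kerberos client libraries."""
    f = tempfile.NamedTemporaryFile(mode="w", suffix=".conf", delete=False)
    f.write(text)
    f.close()
    os.environ["KRB5_CONFIG"] = f.name
    return f.name

path = export_krb5_conf(krb5_conf)
```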
5. OAUTHBEARER Authentication Configuration Example
```sql
INSERT INTO kafka_configs (
    cluster_name, description, bootstrap_servers, auth_type, security_protocol,
    sasl_mechanism, additional_properties
) VALUES (
    'kafka-oauth-cluster',
    'Kafka cluster with OAuth authentication',
    'kafka1:9096,kafka2:9096',
    'SASL_OAUTHBEARER',
    'SASL_SSL',
    'OAUTHBEARER',
    '{
        "ssl.endpoint.identification.algorithm": "https",
        "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username=\"client-id\" password=\"client-secret\" metadataServerUrls=\"https://auth-server:8080/realms/kafka/protocol/openid-connect/token\";",
        "sasl.login.callback.handler.class": "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
    }'
);
```
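All five examples store cluster-specific options as a JSON string in `additional_properties`, so at connection time the application merges that JSON with the row's scalar columns into one client property map. A sketch of that merge, with explicit columns winning over duplicated JSON keys (the helper name and precedence rule are design assumptions, not part of the schema):

```python
import json

# Base connection settings taken from the row's scalar columns.
base = {
    "bootstrap.servers": "kafka1:9096,kafka2:9096",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "OAUTHBEARER",
}
# additional_properties is stored as a JSON string in the table.
extra_json = '{"ssl.endpoint.identification.algorithm": "https"}'

def build_client_props(base: dict, extra_json: str) -> dict:
    """Merge the additional_properties JSON into the base config;
    explicit columns take precedence over duplicated JSON keys."""
    props = json.loads(extra_json)
    props.update(base)
    return props

props = build_client_props(base, extra_json)
```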
Usage Notes
Sensitive information security:
- Password fields (such as truststore_password) are protected with PostgreSQL's crypt() function using BCrypt.
- SASL passwords use salted SHA-256 hashes; in production, a stronger algorithm such as Argon2 is recommended.
Binary data storage:
- Certificate data is converted from Base64 strings to binary with the decode() function before storage.
- For large files, store them on the file system and keep only the path and metadata in the database.
Configuration validation:
- Validate configuration completeness before inserting (for example, when SSL authentication is selected, the SSL-related fields must not be empty).
- For complex configurations (such as Kerberos), validate the format at the application layer.
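Such a completeness check can be driven by a per-auth-type table of required columns. A minimal sketch; the field lists below are assumptions derived from the example INSERTs above, not an authoritative schema constraint:

```python
# Required non-empty columns per auth_type (assumed from the examples above).
REQUIRED_FIELDS = {
    "SSL": ["truststore_data", "keystore_data"],
    "SASL_PLAIN": ["sasl_username", "sasl_password_hash", "sasl_salt"],
    "SASL_SCRAM": ["sasl_username", "sasl_password_hash", "sasl_salt",
                   "sasl_iterations"],
    "SASL_KERBEROS": ["kerberos_config", "service_name"],
    "SASL_OAUTHBEARER": [],
}

def missing_fields(config: dict) -> list:
    """Return required columns that are empty or absent for the row's auth_type."""
    required = REQUIRED_FIELDS.get(config.get("auth_type"), [])
    return [f for f in required if not config.get(f)]

errors = missing_fields({"auth_type": "SSL", "truststore_data": b"cert-bytes"})
```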
Certificate management:
- Check certificate expiry periodically from the application (the expires_at field of the certificates table can be used for this).
- When a certificate is renewed, update both the database record and the file storage.
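The periodic expiry check reduces to comparing each row's expires_at against the current time with a warning window. A minimal sketch, assuming expires_at is stored as a timezone-aware timestamp and a 30-day warning window (both assumptions):

```python
from datetime import datetime, timedelta, timezone

def expiring_soon(expires_at: datetime, warn_days: int = 30) -> bool:
    """True if the certificate expires within warn_days of now; mirrors a
    periodic check against the certificates table's expires_at column."""
    return expires_at - datetime.now(timezone.utc) <= timedelta(days=warn_days)

# A certificate with 10 days of validity left falls inside the window.
soon = expiring_soon(datetime.now(timezone.utc) + timedelta(days=10))
```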
These examples show how to populate the configuration table for each authentication method; in practice, adjust the parameters to your specific environment.