深圳营销型网站建设公司,免费海报制作网站,如何搭建一个个人网站,学编程的培训机构一、说明
在大数据处理和分析中 Apache Kafka 已经成为了一个核心组件。然而在生产环境中部署 Kafka 时#xff0c;安全性是一个必须要考虑的重要因素。SASL#xff08;简单认证与安全层#xff09;和 SCRAM#xff08;基于密码的认证机制的盐化挑战响应认证机制#xff…
一、说明
在大数据处理和分析中 Apache Kafka 已经成为了一个核心组件。然而在生产环境中部署 Kafka 时安全性是一个必须要考虑的重要因素。SASL简单认证与安全层和 SCRAM基于密码的认证机制的盐化挑战响应认证机制提供了一种方法来增强 Kafka 集群的安全性。
本文将从零开始部署 ZooKeeper 和 Kafka 并通过配置 SASL/SCRAM 和 ACL访问控制列表来增强 Kafka 的安全性。 二、Kafka 的安全机制
kafka 社区在 0.9.0.0 版本正式添加了安全特性可以满足各种安全性的要求包括
Kafka 与 ZooKeeper 之间的安全通信Kafka 集群 ZooKeeper 之间的安全通信客户端与服务端之间的安全通信消息级别的权限控制可以控制客户端生产者或消费者的读写操作权限。 认证方式引入版本适用场景SSL0.9.0SSL做信道加密比较多SSL认证不如SASL所以一般都会使用SSL来做通信加密。SASL/GSSAPI0.9.9主要是给 Kerberos 使用的。如果你的公司已经做了 Kerberos 认证比如使用 Active Directory那么使用 GSSAPI 是最方便的了。因为你不需要额外地搭建 Kerberos只要让你们的 Kerberos 管理员给每个 Broker 和要访问 Kafka 集群的操作系统用户申请 principal 就好了。SASL/PLAIN0.10.2简单的用户名密码认证通常与SSL结合使用对于小公司来说没必要搭建公司级别的Kerberos使用它就比较合适。SASL/SCRAM0.10.2PLAIN的加强版本支持动态的用户增减。Deleation Token1.1Delegation Token 是在 1.1 版本引入的它是一种轻量级的认证机制主要目的是补充现有的 SASL 或 SSL 认证。如果要使用 Delegation Token你需要先配置好 SASL 认证然后再利用 Kafka 提供的 API 去获取对应的 Delegation Token。这样 Broker 和客户端在做认证的时候可以直接使用这个 token不用每次都去 KDC 获取对应的 ticketKerberos 认证或传输 Keystore 文件SSL 认证。SASL/OAUTHBEARER2.0OAuth 2框架的集成。 三、环境和软件准备
从 Apache Kafka 官网 下载对应版本的 Kafka 并解压到你选择的目录。 确保已经安装 Java 才能运行 Kafka可以通过运行 java -version 来检查 Java 环境。 四、部署 Zookeeper
使用 Kafka 内置的 Zookeeper
4.1. 启用 SASL 认证
进入 config 目录修改 zookeeper.properties 配置文件增加以下内容
authProvider.1org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew3600000
requireClientAuthSchemesasl
zookeeper.sasl.clienttrue4.2. 配置 JAAS
在 config 目录下创建 zk_jaas.conf 文件内容如下
Server {org.apache.zookeeper.server.auth.DigestLoginModule requiredusernameadminpasswordadminuser_adminadminuser_zkclientzkclient;
};其作用是创建了一个 Server 节点其中
org.apache.zookeeper.server.auth.DigestLoginModule required 是认证逻辑的处理类username、password 是zookeeper之间通讯的用户名和密码user_adminadmin 的结构是 user_[username][password] 定义 kafka-brokerzookeeper客户端连接到 zookeeper 时用的用户名和密码。 注意Server 内部最后一行的 ; 和 } 后的 ; 不能缺少 4.3. 修改启动文件
进入 bin 目录修改 zookeeper-server-start.sh 文件
在 export KAFKA_HEAP_OPTS 配置项的参数后添加 JAAS 的配置
export KAFKA_HEAP_OPTS-Xmx512M -Xms512M -Djava.security.auth.login.config../config/zk_jaas.conf4.4. 启动 Zookeeper
执行命令./zookeeper-server-start.sh -daemon ../config/zookeeper.properties -daemon 参数配置后台运行 4.5. 测试
可以从官网 Apache ZooKeeper 下载对应版本的 ZooKeeper 并解压
添加 JAAS 配置在 confi 目录下创建 zk_client_jaas.conf 文件
Client{org.apache.zookeeper.server.auth.DigestLoginModule requiredusernamezkclientpasswordzkclient;
};修改 bin 目录下的 zkCli.sh 文件在启动命令中增加 JAAS 的配置
$JAVA -Dzookeeper.log.dir${ZOO_LOG_DIR} -Dzookeeper.root.logger${ZOO_LOG4J_PROP} -Dzookeeper.log.file${ZOO_LOG_FILE} \-cp $CLASSPATH $CLIENT_JVMFLAGS $JVMFLAGS \-Djava.security.auth.login.config../conf/zk_client_jaas.conf \org.apache.zookeeper.ZooKeeperMain $执行 zkCli.sh 连接本机已经启动好的 ZooKeeper
进入 Kafka 的 log 目录查看内置 zk 的日志 zookeeper.out 显示以下内容
INFO adding SASL authorization for authorizationID: zkclient (org.apache.zookeeper.server.ZooKeeperServer)代表 ZooKeeper 的 SASL 认证已经配置成功。 五、部署 Kafka
5.1. 配置 Kafka Broker
进入 config 目录修改 server.properties 配置文件增加以下内容
listenersSASL_PLAINTEXT://:9092
advertised.listenersSASL_PLAINTEXT://localhost:9092
security.inter.broker.protocolSASL_PLAINTEXT
sasl.mechanism.inter.broker.protocolSCRAM-SHA-256
sasl.enabled.mechanismsSCRAM-SHA-256
authorizer.class.namekafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.foundfalse
super.usersUser:adminauthorizer.class.name 开启 ACL 授权机制并指定实现类allow.everyone.if.no.acl.found 如果没有找到ACL访问控制列表配置是否允许任何操作这里设置为 false 指除了超级管理员其他用户必须配置 ACL 才能访问资源super.users 超级管理员无需配置 ACL 拥有所有权限的用户。 5.2. 配置 JAAS
在 config 目录下创建 kafka_server_jaas.conf 文件内容如下
KafkaServer {org.apache.kafka.common.security.scram.ScramLoginModule requiredusernameadminpasswordadmin;
};Client{org.apache.kafka.common.security.plain.PlainLoginModule requiredusernamezkclientpasswordzkclient;
};KafkaServer 中的 usernamepassword 用于 Kafka 集群 Broker 节点之间通信用的账号密码KafkaServer 中的 user_testtest 用于 Kafka 客户端producerconsumer连接broker时用该配置下user_[username][password]结构配置的账号密码登录Client 用于 broker 和 zookeeper 之间的认证对应 zk_jaas.conf 中的 【user_zkclient“zkclient”】 配置user_adminadmin 的结构是 user_[username][password] 定义 kafka-brokerzookeeper客户端连接到 zookeeper 时用的用户名和密码。 5.3. 修改启动文件
进入 bin 目录修改 kafka-server-start.sh 文件
在 export KAFKA_HEAP_OPTS 配置项的参数后添加 JAAS 的配置
export KAFKA_HEAP_OPTS-Xmx1G -Xms1G -Djava.security.auth.login.config../config/kafka_server_jaas.conf5.4. 创建 SCRAM 用户
在启动 Kafka 之前需要先创建好用户在 bin 目录下执行以下内容 分别创建 admin(超级管理员) 和 test(客户端用户) ./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config SCRAM-SHA-256[passwordadmin] --entity-type users --entity-name admin./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config SCRAM-SHA-256[passwordtest] --entity-type users --entity-name testSASL/SCRAM 认证的用户信息是动态创建存储在 ZooKeeper 中 由于上面的配置 kafka_server_jaas.conf 中 Broker 之间的通信是通过 admin 用户的如果该用户不存在会 启动报错。 5.5. 启动 Kafka
执行命令./kafka-server-start.sh -daemon ../config/server.properties -daemon 参数配置后台运行 六、验证 SASL/SCRAM 鉴权
6.1. 客户端认证配置
6.1.1. 管理员配置
进入 config 目录创建 cmd.properties 内容如下
security.protocolSASL_PLAINTEXT
sasl.mechanismSCRAM-SHA-256
sasl.jaas.configorg.apache.kafka.common.security.scram.ScramLoginModule required usernameadmin passwordadmin;配置认证的类型以及登录逻辑的处理类和用户使用超级管理员 admin 注意 最后的 ; 是必须加上的。 6.1.2. 生产者配置
修改 config 目录下的 producer.properties 增加以下内容
security.protocolSASL_PLAINTEXT
sasl.mechanismSCRAM-SHA-256
sasl.jaas.configorg.apache.kafka.common.security.scram.ScramLoginModule required usernameadmin passwordadmin;生产者也使用超级管理员 admin 来发送消息。 6.1.3. 消费者配置
修改 config 目录下的 consumer.properties 增加以下内容
security.protocolSASL_PLAINTEXT
sasl.mechanismSCRAM-SHA-256
sasl.jaas.configorg.apache.kafka.common.security.scram.ScramLoginModule required usernametest passwordtest;消费者使用 test 用户来接收消息。 6.2. 创建topic
在 bin 目录下执行以下命令
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --partitions 1 --replication-factor 1 --command-config ../config/cmd.propertiesbootstrap-server 配置 Kafka 服务端的地址topic 指定topic名称command-config 指定命令的认证配置这里使用上面创建的 管理员配置
创建成功后可以通过以下命令查看存在的 topic 清单
./kafka-topics.sh --bootstrap-server localhost:9092 --list --command-config ../config/cmd.properties6.3. 创建消费者
6.3.1. 执行 kafka-console-consumer
在 bin 目录下执行以下命令
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config ../config/consumer.properties执行命令后会发现以下 报错 信息
ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-256
Processed a total of 0 messagesAuthentication failed 认证失败由于消费者的认证使用的是 test 用户而该用户还未配置任何 ACL 权限。 6.3.2. 配置用户 ACL 权限
Kafka 的 ACL (Access Control Lists) 允许你定义哪些用户可以访问哪些主题并且可以执行哪些操作如读、写、创建、删除等。
执行以下命令
./kafka-acls.sh --authorizer-properties zookeeper.connectlocalhost:2181 --add --allow-principal User:test --operation Read --topic test --group test-consumer-group为 test 用户在资源 topic[test] 下分配只读权限 执行成功可以通过以下命令查看资源所分配的所有 ACL 清单
./kafka-acls.sh --bootstrap-server localhost:9092 --topic test --list --command-config ../config/cmd.properties重新创建消费者
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config ../config/consumer.properties执行成功后该 shell 窗口会一直阻塞等待消息。 6.4. 创建生产者
新开一个 shell 窗口 在 bin 目录下执行以下命令
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config ../config/producer.properties由于生产者的认证使用的是 admin 为 超级管理员 所以无需配置 ACL 权限。 执行成功后会出现 符号输入内容之后切换到 消费者 窗口就可以看到了。