Contents
1. Overview
2. Server plan
3. docker-compose files
   kafka{i}.yaml
   kafka-ui.yaml
4. Configuring cluster monitoring in kafka-ui
5. Parameter reference
6. Test scripts
   Producer (asynchronous): AsyncKafkaProducer1.py
   Consumer (asynchronous): AsyncKafkaConsumer1.py
7. References
1. Overview
Set up the Kafka cluster needed for a local development environment: three nodes spread across three virtual machines, each running as a Docker container, all able to reach one another.
2. Server plan
| Host | Ports | Notes |
| --- | --- | --- |
| host001.dev.sb | 9092, 9093, 9081 | kafka0 node; kafka-ui served on 9081 |
| host002.dev.sb | 9092, 9093 | kafka1 node |
| host003.dev.sb | 9092, 9093 | kafka2 node |
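Before bringing the stack up, it helps to confirm that each host name resolves and its Kafka port accepts connections. A minimal stdlib sketch (host names and ports are taken from the plan above; adjust them to your environment):

```python
import socket


def is_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, timeouts, and connection refusals.
        return False


if __name__ == "__main__":
    # Hosts and broker port from the server plan above.
    for host in ("host001.dev.sb", "host002.dev.sb", "host003.dev.sb"):
        state = "open" if is_reachable(host, 9092) else "closed"
        print(f"{host}:9092 {state}")
```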
3. docker-compose files
kafka{i}.yaml
- where {i} is 0, 1, or 2: one file per node
- the SASL usernames and passwords are configured directly in the file
```yaml
services:
  kafka:
    image: bitnami/kafka:3.6.2
    container_name: kafka{i}
    hostname: kafka{i}
    restart: always
    ports:
      - 9092:9092
      - 9093:9093
    environment:
      # KRaft
      - KAFKA_CFG_NODE_ID={i}
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka0:9093,1@kafka1:9093,2@kafka2:9093
      - KAFKA_KRAFT_CLUSTER_ID=sbcluster01-mnopqrstuv
      # Listeners
      - KAFKA_CFG_LISTENERS=INTERNAL://:9094,CLIENT://:9095,CONTROLLER://:9093,EXTERNAL://:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SASL_PLAINTEXT,CLIENT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka{i}:9094,CLIENT://:9095,EXTERNAL://kafka{i}:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_NUM_PARTITIONS=3
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
      # Log
      - KAFKA_CFG_LOG_RETENTION_HOURS=72
      # SASL
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
      - KAFKA_CONTROLLER_USER=kfkuser
      - KAFKA_CONTROLLER_PASSWORD=youknow
      - KAFKA_INTER_BROKER_USER=kfkuser
      - KAFKA_INTER_BROKER_PASSWORD=youknow
      - KAFKA_CLIENT_USERS=kfkuser
      - KAFKA_CLIENT_PASSWORDS=youknow
      # Others
      - TZ=Asia/Shanghai
    volumes:
      - /data0/Server/Db/kafka{i}:/bitnami/kafka
    extra_hosts:
      - kafka0:172.16.20.60
      - kafka1:172.16.20.61
      - kafka2:172.16.20.62
```

kafka-ui.yaml
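The {i} placeholder has to be expanded once per node. A small helper sketch that renders one compose file per node (the TEMPLATE string below is abbreviated for illustration; in practice, read the full kafka{i}.yaml template above from disk):

```python
# Render one compose file per node by substituting the {i} placeholder.
# str.replace() is used instead of str.format() on purpose, so that other
# braces in a full compose file are left untouched.
TEMPLATE = """\
services:
  kafka:
    container_name: kafka{i}
    hostname: kafka{i}
    environment:
      - KAFKA_CFG_NODE_ID={i}
"""


def render_node(template: str, i: int) -> str:
    """Replace every {i} placeholder with the node index."""
    return template.replace("{i}", str(i))


if __name__ == "__main__":
    for i in range(3):
        with open(f"kafka{i}.yaml", "w") as f:
            f.write(render_node(TEMPLATE, i))
```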
```yaml
services:
  kafka-ui:
    image: provectuslabs/kafka-ui:master
    container_name: kafka-ui
    restart: always
    ports:
      - 9081:8080
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - DYNAMIC_CONFIG_ENABLED=true
      - AUTH_TYPE=LOGIN_FORM
      - SPRING_SECURITY_USER_NAME=admin
      - SPRING_SECURITY_USER_PASSWORD=youknow
    extra_hosts:
      - kafka0:172.16.20.60
      - kafka1:172.16.20.61
      - kafka2:172.16.20.62
```
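Instead of adding the cluster through the web wizard (section 4), kafka-ui can be pre-wired to the brokers via environment variables. A sketch of the extra `environment` entries, following provectus kafka-ui's `KAFKA_CLUSTERS_0_*` naming convention; treat the exact JAAS line as an assumption to verify against your kafka-ui version:

```yaml
# Additional kafka-ui environment entries to pre-configure the cluster
# connection instead of using the web wizard:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka0:9092,kafka1:9092,kafka2:9092
- KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=SASL_PLAINTEXT
- KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM=PLAIN
- KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="kfkuser" password="youknow";
```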
4. Configuring cluster monitoring in kafka-ui

Open kafka-ui at port 9081 on host001.dev.sb, log in as admin with the password from kafka-ui.yaml, and add the three brokers as a cluster through the configuration wizard.

5. Parameter reference
| Parameter | Description |
| --- | --- |
| KAFKA_CFG_PROCESS_ROLES | Roles this node plays (broker, controller), e.g. `KAFKA_CFG_PROCESS_ROLES=controller,broker` |
| KAFKA_KRAFT_CLUSTER_ID | Cluster ID; must be identical on all nodes of the same cluster |
| KAFKA_CFG_CONTROLLER_QUORUM_VOTERS | Controller quorum voter list |
| KAFKA_CFG_CONTROLLER_LISTENER_NAMES | Name of the controller listener |
| KAFKA_CFG_NUM_PARTITIONS | Default number of partitions |
| KAFKA_CFG_LISTENERS | Addresses and ports the listeners bind to |
| KAFKA_CFG_ADVERTISED_LISTENERS | Listener addresses and ports advertised to clients |
| KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP | Protocol per listener; SASL_PLAINTEXT here means authentication only, the transport itself is not encrypted |
| KAFKA_CLIENT_USERS | SASL client username |
| KAFKA_CLIENT_PASSWORDS | SASL client password |
| KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR | Replication factor of the internal `__consumer_offsets` topic, which stores consumer offsets |
| KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR | Replication factor of the internal `__transaction_state` topic, which stores the transaction log |
| KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR | Minimum number of In-Sync Replicas (ISR, the set of replicas kept in sync with the leader) for `__transaction_state` |
| KAFKA_CFG_LOG_DIRS | Log directories |
| KAFKA_CFG_LOG_RETENTION_HOURS | Maximum retention time; older data is handled according to `log.cleanup.policy` (default 168 hours, i.e. one week) |
6. Test scripts
Producer (asynchronous): AsyncKafkaProducer1.py
```python
from confluent_kafka import Producer
import json


def delivery_report(err, msg):
    """Called once for each message produced to indicate delivery result.
    Triggered by poll() or flush()."""
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")


def create_async_producer(config):
    """Creates an instance of an asynchronous Kafka producer."""
    return Producer(config)


def produce_messages(producer, topic, messages):
    """Asynchronously produces messages to a Kafka topic."""
    for message in messages:
        # Trigger any available delivery report callbacks from previous produce() calls
        producer.poll(0)
        # Asynchronously produce a message; the delivery report callback
        # will be triggered from poll() above, or flush() below, when the
        # message has been successfully delivered or failed permanently.
        producer.produce(topic, json.dumps(message).encode("utf-8"), callback=delivery_report)
    # Wait for any outstanding messages to be delivered and delivery report
    # callbacks to be triggered.
    producer.flush()


if __name__ == "__main__":
    # Kafka configuration: replace with your servers' configuration
    conf = {
        "bootstrap.servers": "host001.dev.sb:9092,host002.dev.sb:9092,host003.dev.sb:9092",
        "client.id": "PythonProducer",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "kfkuser",
        "sasl.password": "youknow",
    }

    # Create an asynchronous Kafka producer
    async_producer = create_async_producer(conf)

    # Messages to send to Kafka
    messages_to_send = [{"key": "value1a"}, {"key": "value2a"}, {"key": "value3a"}]

    # Produce messages
    produce_messages(async_producer, "zx001.msg.user", messages_to_send)
```

Consumer (asynchronous): AsyncKafkaConsumer1.py
```python
from confluent_kafka import Consumer, KafkaError, KafkaException
import asyncio
import json
import logging
from datetime import datetime

# Log format: %(...)s placeholders are logging parameters
log_format = "%(message)s"
logging.basicConfig(filename="logs/kafka_messages1.log", format=log_format, level=logging.INFO)


async def consume_loop(consumer, topics):
    try:
        # Subscribe to the topics
        consumer.subscribe(topics)
        while True:
            # Poll for a message
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    print("%% %s [%d] reached end at offset %d\n"
                          % (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                # Normal message
                raw_message = msg.value()
                # print(f"Raw message: {raw_message}")
                str_msg = raw_message.decode("utf-8")
                parsed_message = json.loads(str_msg)
                parsed_message["time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                print(f"Received message: {type(parsed_message)} : {parsed_message}")
                json_data = json.dumps(parsed_message, ensure_ascii=False)
                logging.info(json_data)
            await asyncio.sleep(0.01)  # Sleep briefly to yield control
    finally:
        # Close the consumer
        consumer.close()


async def consume():
    # Consumer configuration
    conf = {
        "bootstrap.servers": "host001.dev.sb:9092,host002.dev.sb:9092,host003.dev.sb:9092",
        "group.id": "MsgGroup2",
        "auto.offset.reset": "earliest",
        "client.id": "PythonConsumer",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "kfkuser",
        "sasl.password": "youknow",
    }
    # Create the consumer
    consumer = Consumer(conf)
    await consume_loop(consumer, ["zx001.msg.user"])


if __name__ == "__main__":
    asyncio.run(consume())
```

7. References
- Apache Kafka® Quick Start - Local Install With Docker
- kafka-ui-docs/configuration/configuration-wizard.md at main · provectus/kafka-ui-docs · GitHub
- https://juejin.cn/post/7187301063832109112