网站内链怎么布局,廊坊电子商务网站建设,电子工程网络,哈尔滨网站设计快速建站官方文档#xff1a;http://kafka.apache.org/documentation.html
(虽然kafka中集成了zookeeper,但还是建议使用独立的zk集群) Kafka3台集群搭建环境#xff1a;
操作系统: centos7
防火墙#xff1a;全关
3台zookeeper集群内的机器#xff0c;1台logstash
软件版本: …
官方文档http://kafka.apache.org/documentation.html
(虽然kafka中集成了zookeeper,但还是建议使用独立的zk集群) Kafka3台集群搭建环境
操作系统: centos7
防火墙全关
3台zookeeper集群内的机器1台logstash
软件版本: zookeeper-3.4.12.tar.gz
软件版本kafka_2.12-2.1.0.tgz 安装软件
(3台zookeeper集群的机器)
# tar xf kafka_2.12-2.1.0.tgz -C /usr/local/
# ln -s /usr/local/kafka_2.12-2.1.0/ /usr/local/kafka 创建数据目录(3台)
# mkdir /data/kafka-logs 修改第一台配置文件
(注意不同颜色标记的部分)
# egrep -v ^$|^# /usr/local/kafka/config/server.properties
broker.id1 #当前机器在集群中的唯一标识和zookeeper的myid性质一样
listenersPLAINTEXT://192.168.148.141:9092 #监听套接字
num.network.threads3 #这个是borker进行网络处理的线程数
num.io.threads8 #这个是borker进行I/O处理的线程数
socket.send.buffer.bytes102400 #发送缓冲区buffer大小数据不是一下子就发送的先回存储到缓冲区了到达一定的大小后在发送能提高性能
socket.receive.buffer.bytes102400 #kafka接收缓冲区大小当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数这个值不能超过java的堆栈大小
log.dirs/data/kafka-logs #消息存放的目录这个目录可以配置为“”逗号分割的表达式上面的num.io.threads要大于这个目录的个数
#如果配置多个目录新创建的topic把消息持久化在分区数最少那一个目录中
num.partitions1 #默认的分区数一个topic默认1个分区数
num.recovery.threads.per.data.dir1 #在启动时恢复日志和关闭时刷新日志时每个数据目录的线程的数量默认1
offsets.topic.replication.factor2
transaction.state.log.replication.factor1
transaction.state.log.min.isr1
log.retention.hours168 #默认消息的最大持久化时间168小时7天
message.max.byte5242880 #消息保存的最大值5M
default.replication.factor2 #kafka保存消息的副本数
replica.fetch.max.bytes5242880 #取消息的最大字节数
log.segment.bytes1073741824 #这个参数是因为kafka的消息是以追加的形式落地到文件当超过这个值的时候kafka会新起一个文件
log.retention.check.interval.ms300000 #每隔300000毫秒去检查上面配置的log失效时间,到目录查看是否有过期的消息如果有删除
zookeeper.connect192.168.148.141:2181,192.168.148.142:2181,192.168.148.143:2181 zookeeper.connection.timeout.ms6000
group.initial.rebalance.delay.ms0 修改另外两台配置文件
#scp /usr/local/kafka/config/server.properties kafka-2:/usr/local/kafka/config/
broker.id2
listenersPLAINTEXT://192.168.148.142:9092 # scp /usr/local/kafka/config/server.properties kafka-3:/usr/local/kafka/config/
broker.id3
listenersPLAINTEXT://192.168.148.143:9092 启动kafka(3台)
[roothost1 ~]# /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties 查看启动情况(3台)
[roothost1 ~]# jps
10754 QuorumPeerMain
11911 Kafka
12287 Jps 创建topic来验证
[roothost1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.148.143:2181 --replication-factor 2 --partitions 1 --topic cien
出现Created topic cien验证成功运行 在一台服务器上创建一个发布者
[roothost2 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.148.141:9092 --topic cien hello kafka ni hao ya 在另一台服务器上创建一个订阅者
[roothost3 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.148.142:9092 --topic cien --from-beginning
...
hello kafka
ni hao ya 如果都能接收到,说明kafka部署成功! [roothost3 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.10.23:2181 --list #查看所有topic
[roothost3 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.10.23:2181 --topic qianfeng #查看指定topic的详细信息
Topic:qianfeng PartitionCount:1 ReplicationFactor:2 Configs:
Topic: qianfeng Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
[roothost3 ~]# /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.10.23:2181 --topic qianfeng #删除topic
Topic qianfeng is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true. 配置elfk集群订阅和zookeeper和kafka 配置第一台logstash生产消息输出到kafka yum -y install wget
wget https://d6.injdk.cn/oraclejdk/8/jdk-8u341-linux-x64.rpm
yum localinstall jdk-8u341-linux-x64.rpm -y
java -version 1.安装logstash
tar xf logstash-6.4.1.tar.gz -C /usr/local
ln -s /usr/local/logstash-6.4.1 /usr/local/logstash 2.修改配置文件
cd /usr/local/logstash/config/
vim logstash.yml
http.host: 0.0.0.0 3.编写配置文件
不要过滤, logstash会将message内容写入到队列中
# cd /usr/local/logstash/config/
# vim logstash-kafka.conf input {file {type sys-logpath /var/log/messagesstart_position beginning}
}
output {kafka {bootstrap_servers 192.168.148.141:9092,192.168.148.142:9092,192.168.148.143:9092 #输出到kafka集群topic_id sys-log-messages #主题名称compression_type snappy #压缩类型codec json}
}启动logstash
# /usr/local/logstash/bin/logstash -f logstash-kafka.conf 在kafka上查看主题,发现已经有了sys-log-messages,说明写入成功了
[roothost2 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.148.141:2181 --list
__consumer_offsets
qianfeng
sys-log-messages
[roothost2 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.148.141:2181 --topic sys-log-messages
Topic:sys-log-messages PartitionCount:1 ReplicationFactor:2 Configs:
Topic: sys-log-messages Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2 配置第二台logstash订阅kafka日志输出到es集群
# cat kafka-es.conf input {kafka {bootstrap_servers 192.168.148.141:9092,192.168.148.142:9092,192.168.148.143:9092 topics sys-log-messages #kafka主题名称codec jsonauto_offset_reset earliest}
}output {elasticsearch {hosts [192.168.148.131:9200,192.168.148.132:9200]index kafka-%{type}-%{YYYY.MM.dd}}
}