嘉峪关建设局网站,品牌推广语,广州版单一窗口,有域名后如何建网站整体架构
参考 七米老师的日志收集项目 主要用go实现logagent的部分#xff0c;logagent的作用主要是实时监控日志追加的变化#xff0c;并将变化发送到kafka中。 之前我们已经实现了 用go连接kafka并向其中发送数据#xff0c;也实现了使用tail库监控日志追加操作。 我们…整体架构
参考 七米老师的日志收集项目 主要用go实现logagent的部分logagent的作用主要是实时监控日志追加的变化并将变化发送到kafka中。 之前我们已经实现了 用go连接kafka并向其中发送数据也实现了使用tail库监控日志追加操作。 我们把这两部分结合起来实现监控日志追加并发送到kafka。
使用github.com/go-ini/ini配置参数
// 读取配置参数cfg, err:ini.Load(config/config.ini)if err!nil {logrus.Error(( load config error))return}[kafka]
address 127.0.0.1:9092
chan_size 1000[collect]
logfile_path D:/learn/go/log-collector-lmh/log_agent/config_version/log_file/xx.log配置参数主要包括kafka的启动端口存储的数据大小限制日志文件的路径。
初始化kafka
kafka.go
package kafkaimport (github.com/Shopify/saramagithub.com/sirupsen/logrus
)var (Client sarama.SyncProducerMsgChan chan *sarama.ProducerMessage //占用的字节数少传递的指针
)func InitKafka(kafkaAddr string, chanSize int64) (err error){config:sarama.NewConfig()// 生产者配置config.Producer.RequiredAckssarama.WaitForAllconfig.Producer.Partitionersarama.NewRandomPartitionerconfig.Producer.Return.Successestrue// 连接kafkaClient,errsarama.NewSyncProducer([]string{kafkaAddr}, config)if err!nil {logrus.Error(producer closed, err)return}// 从管道中读取日志并发送到kafkaMsgChan make(chan *sarama.ProducerMessage, chanSize)go sendMsg()return
}func sendMsg(){for {select {case msg : - MsgChan:pid, offset, err : Client.SendMessage(msg)if err ! nil {logrus.Warning(send msg failed, err:, err)return}logrus.Infof(send msg to kafka success. pid:%v offset:%v, pid, offset)}}
}这里实现了连接kafka并使用协程不断地读取MsgChan读取到数据后向kafka发送这里MsgChan通道的数据由tail监控到的日志变化写入。 main.go中调用
// 初始化kafkakafkaAddr:cfg.Section(kafka).Key(address).String()chanSize:cfg.Section(kafka).Key(chan_size).MustInt64(0)errkafka.InitKafka(kafkaAddr, chanSize)if err!nil {logrus.Error(kafka init failed)}logrus.Info(Kafka init success)初始化tailf并将日志数据写入ChanMsg
tailF.go
package tailF
import (github.com/hpcloud/tailfmt
)
var (TailObj *tail.Tail
)
func InitTail(filename string) (err error) {config : tail.Config{ReOpen: true,Follow: true,Location: tail.SeekInfo{Offset: 0, Whence: 2},MustExist: false,Poll: true,}// 打开文件开始读取数据TailObj, err tail.TailFile(filename, config)if err ! nil {fmt.Printf(create tail %s failed, err:%v\n, filename, err)return}return
}main.go中对应
// 初始化tailffileName:cfg.Section(collect).Key(logfile_path).String()errtailF.InitTail(fileName)if err!nil {logrus.Error( tailf init failed)}logrus.Info(Init tail success)// 把读取的日志发往kafkaerrrun()if err!nil {logrus.Error( run error%s, err)return}logrus.Info(run success)main.go中实现的run函数读取tailF的数据并写入ChanMsg
func run () (err error){for {line,ok:-tailF.TailObj.Linesif !ok {logrus.Warn(tail file %s close reopen\n, tailF.TailObj.Filename)// 读取出错等一秒time.Sleep(time.Second)continue}// 使用通道将传输日志改为异步// 读取的日志封装为ProducerMessagemsg:sarama.ProducerMessage{}msg.Topicweb_logmsg.Valuesarama.StringEncoder(line.Text)// 放到channel中kafka.MsgChan-msg}
}完整main.go
package mainimport (config_version/kafkaconfig_version/tailFtimegithub.com/Shopify/saramagithub.com/go-ini/inigithub.com/sirupsen/logrus
)func main() {// 读取配置参数cfg, err:ini.Load(config/config.ini)if err!nil {logrus.Error(( load config error))return}// 初始化kafkakafkaAddr:cfg.Section(kafka).Key(address).String()chanSize:cfg.Section(kafka).Key(chan_size).MustInt64(0)errkafka.InitKafka(kafkaAddr, chanSize)if err!nil {logrus.Error(kafka init failed)}logrus.Info(Kafka init success)// 初始化tailffileName:cfg.Section(collect).Key(logfile_path).String()errtailF.InitTail(fileName)if err!nil {logrus.Error( tailf init failed)}logrus.Info(Init tail success)// 把读取的日志发往kafkaerrrun()if err!nil {logrus.Error( run error%s, err)return}logrus.Info(run success)}func run () (err error){for {line,ok:-tailF.TailObj.Linesif !ok {logrus.Warn(tail file %s close reopen\n, tailF.TailObj.Filename)// 读取出错等一秒time.Sleep(time.Second)continue}// 使用通道将传输日志改为异步// 读取的日志封装为ProducerMessagemsg:sarama.ProducerMessage{}msg.Topicweb_logmsg.Valuesarama.StringEncoder(line.Text)// 放到channel中kafka.MsgChan-msg}
}至此 我们实现了简化版的日志收集系统的logagent功能目前日志的路径还需要手动写入配置文件中修改的话还需重启项目之后可以使用ETCD实现日志路径的自动配置。