在相亲网站做红娘,全球跨境电商平台排名,章丘网站优化,不通过第三方平台做微网站ElasticSeach#xff08;存储日志信息#xff09; Logstash#xff08;搬运工#xff09; Kibana 连接ElasticSeach图形化界面查询日志
ELK采集日志的原理#xff1a;
在每个服务器上安装LogstashLogstash需要配置固定读取某个日志文件Logstash将日志文件格式化为json的…ElasticSeach存储日志信息 Logstash搬运工 Kibana 连接ElasticSeach图形化界面查询日志
ELK采集日志的原理
在每个服务器上安装LogstashLogstash需要配置固定读取某个日志文件Logstash将日志文件格式化为json的格式输出到es中开发者使用Kibana连接到ElasticSeach 查询存储日志内容
为什么将日志存储在ElasticSeach 其底层使用到倒排索引 搜索效率高
为什么需要使用elkkafka 如果单纯的使用elk的话服务器节点扩容时需要在每个服务器上安装 Logstash 步骤十分冗余。 Logstash读取本地日志文件可能会对本地的磁盘io性能会有一定影响。
elkkafka采集日志的原理
springboot项目基于aop的方式拦截系统中日志将该日志投递到 kafka 中该过程一定要采用异步的形式Logstash 订阅 kafka 的主题获取日志消息内容在将日志消息内容输出到es中存放开发者使用Kibana连接到ElasticSeach 查询存储日志内容
logstash
Logstash是一个开源数据收集引擎具有实时管道功能。 Logstash可以动态地将来自不同数据源的数据统一起来并将数据标准化到你所选择的目的地
进入 logstash 目录执行命令安装输入输出插件
bin/logstash-plugin install logstash-input-kafka
bin/logstash-plugin install logstash-output-elasticsearch添加配置文件logstash/config/kafka.conf
# 输入
input {kafka {bootstrap_servers 192.168.10.110:9091topics 主题名称}
}
# 过滤排除一些不需要写入的日志
filter {#Only matched data are send to output.
}
# 输出
output {elasticsearch {action index #The operation on EShosts 192.168.10.110:9200 #ElasticSearch host, can be array.index 索引名称 #The index to write data to.}
}启动logstash./logstash -f …/config/kafka.conf
Aop拦截日志
Aspect
Component
public class AopLogAspect {Value(${server.port})private String serverPort;Autowiredprivate KafkaTemplateString, Object kafkaTemplate;Pointcut(execution(* com.example.service.*.*(..)))private void serviceAspect() {}Autowiredprivate LogContainer logContainer;// 异常通知AfterThrowing(pointcut serviceAspect(), throwing e)public void serviceAspect(JoinPoint point, Exception e) {ServletRequestAttributes requestAttributes (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();HttpServletRequest request requestAttributes.getRequest();JSONObject jsonObject new JSONObject();SimpleDateFormat df new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);// 设置日期格式jsonObject.put(request_time, df.format(new Date()));jsonObject.put(request_url, request.getRequestURL().toString());jsonObject.put(request_method, request.getMethod());jsonObject.put(signature, point.getSignature());jsonObject.put(request_args, Arrays.toString(point.getArgs()));jsonObject.put(error, e.toString());// IP地址信息jsonObject.put(ip_addres, getIpAddr(request) : serverPort);JSONObject requestJsonObject new JSONObject();requestJsonObject.put(request, jsonObject);// 将日志信息投递到kafka中String log requestJsonObject.toJSONString();logContainer.put(log);}
}
使用队列线程实现异步
Component
public class LogContainer {private static BlockingDequeString logDeque new LinkedBlockingDeque();Autowiredprivate KafkaTemplateString, Object kafkaTemplate;public LogContainer() {new LogThreadKafka().start();}// 存入日志public void put(String log) {logDeque.offer(log);}// 只需要创建一次线程class LogThreadKafka extends Thread {Overridepublic void run() {while (true) {String log logDeque.poll();if (!StringUtils.isEmpty(log)) {// 将消息投递kafka中kafkaTemplate.send(xxx-log, log);}}}}
}