附近网站建设公司,云搜索引擎入口,装饰网站设计模板下载,图文广告公司取名实训笔记8.25 8.25笔记一、Flume数据采集技术1.1 Flume实现数据采集主要借助Flume的组成架构1.2 Flume采集数据的时候#xff0c;核心是编写Flume的采集脚本xxx.conf1.2.1 脚本文件主要由五部分组成 二、Flume案例实操2.1 采集一个网络端口的数据到控制台2.1.1 分析案例的组件… 实训笔记8.25 8.25笔记一、Flume数据采集技术1.1 Flume实现数据采集主要借助Flume的组成架构1.2 Flume采集数据的时候核心是编写Flume的采集脚本xxx.conf1.2.1 脚本文件主要由五部分组成 二、Flume案例实操2.1 采集一个网络端口的数据到控制台2.1.1 分析案例的组件类型2.2.2 编写脚本文件portToConsole.conf2.2.3 根据脚本文件启动Flume采集程序2.2.4 测试 2.2 采集一个文件的数据到控制台2.2.1 案例需求2.2.2 案例分析2.2.3 编写脚本文件2.2.4 启动2.2.5 测试 2.3 采集一个文件夹下的新文件数据到控制台2.3.1 案例需求2.3.2 案例分析2.3.3 编写配置文件2.3.4 运行2.3.5 测试 2.4 采集一个网络端口的数据到HDFS中2.4.1 案例需求2.4.2 案例分析2.4.3 编写脚本文件 2.5 多数据源和多目的地案例2.5.1 案例需求2.5.2 案例分析2.5.3 编写脚本文件 2.6 多Flume进程组合的案例2.6.1 案例需求2.6.2 案例分析2.6.3 编写脚本文件2.6.4 启动脚本程序 三、Hadoop、Hive、SQOOP、Flume、ZookeeperHA高可用、Azkaban 8.25笔记
一、Flume数据采集技术
将海量的数据通过某种技术采集到大数据环境中进行存储和管理为后期的大数据处理分析做准备 常见的数据网站/软件的运行日志、记录的日志软件的结构化数据、爬虫数据、传感器采集数据…
Flume是apache开源的顶尖项目专门是采集和聚合海量的日志数据。但是随着Flume技术的发展支持很多种其他类型数据源的数据采集。
1.1 Flume实现数据采集主要借助Flume的组成架构
Agent、Source、Channel、Sink、Event、Flume采集脚本xxx.conf
一个agent进程中可以有多个Source、channel、sink, 其中一个source只能连接一个数据源一个sink只能连接一个目的地。 而且在一个Flume的agent进程中一个source采集的数据可以发送给多个channel但是一个sink只能拉取一个channel的数据。
1.2 Flume采集数据的时候核心是编写Flume的采集脚本xxx.conf
Flume支持多种数据源、管道、目的地我们采集数据的时候并不是所有的数据源和目的地都要使用而是使用我们需要的源头和目的地。但是Flume不知道你需要什么数据源、需要什么目的地。 通过脚本文件指定我们采集的数据源、目的地、管道
1.2.1 脚本文件主要由五部分组成 起别名 我们可以根据采集脚本启动一个Flume进程Agent一个Flume支持启动多个Agent,Flume要求每一个Agent必须有自己的一个别名Flume启动的多个Agent的别名不能重复。 同时Flume一个Agent进程中可以有多个source、多个channel、多个sink如何区分多个组件 我们还需要多Agent进程中的source、channel、sink起别名的 Agent、source、channel、sink起别名 配置Source组件 我们一个Flume进程中可能存在1个或者多个数据源每一个source组件需要连接一个数据源但是数据源到底是谁如何连接我们需要配置。 配置channel组件 一个Agent中可能存在一个或者多个channelchannel也有很多种类型的因此我们需要配置我们channel的类型以及channel的容量 配置Sink组件一个Agent可以同时将数据下沉到多个目的地一个sink只能连接一个目的地目的地到底是谁如何连接需要配置sink 组装source、channel、sink核心 一个source的数据可以发送给多个channel一个sink只能读取一个channel的数据。因此我们需要根据业务逻辑配置source、channel、sink的连接关系。
二、Flume案例实操
2.1 采集一个网络端口的数据到控制台
2.1.1 分析案例的组件类型
source网络端口 netcatchannel基于内存的管道即可memorysink控制台–Flume的日志输出logger
2.2.2 编写脚本文件portToConsole.conf
# 1、配置agent、source、channel、sink的别名
demo.sourcess1
demo.channelsc1
demo.sinksk1# 2、配置source组件连接的数据源--不同数据源的配置项都不一样 监听netcat type bind port
demo.sources.s1.typenetcat
demo.sources.s1.bindlocalhost
demo.sources.s1.port44444# 3、配置channel组件的类型--不同类型的管道配置项也不一样 基于内存memory的管道
demo.channels.c1.typememory
demo.channels.c1.capacity1000
demo.channels.c1.transactionCapacity200# 4、配置sink组件连接的目的地--不同类型的sink配置项不一样 基于logger的下沉地
demo.sinks.k1.typelogger# 5、配置source channel sink之间的连接 source连接channel sink也要连接channel
# 一个source的数据可以发送给多个channel 一个sink只能拉取一个channel的数据
demo.sources.s1.channelsc1
demo.sinks1.k1.channelc1
2.2.3 根据脚本文件启动Flume采集程序
flume-ng agent -n agent的别名必须和文件中别名保持一致 -f xxx.conf的路径 -Dflume.root.loggerINFO,console
2.2.4 测试
我们只需要给本地的44444端口发送数据看看Flume的控制台能否把数据输出即可
需要新建一个和Linux的连接窗口然后使用 telnet localhost 44444 命令连接本地的44444端口发送数据
telnet软件linux默认没有安装需要使用yum安装一下 yum install -y telnet
必须先启动flume采集程序再telnet连接网络端口发送数据
2.2 采集一个文件的数据到控制台
2.2.1 案例需求
现在有一个文件文件源源不断的记录用户的访问日志信息我们现在想通过Flume去监听这个文件一旦当这个文件有新的用户数据产生把数据采集到flume的控制台上
2.2.2 案例分析
sourceexec(将一个linux命令的输出导出数据源、自己写监听命令) 、taildir
channelmemory
sinklogger
2.2.3 编写脚本文件
#1、起别名
demo01.sourcess1
demo01.channelsc1
demo01.sinksk1#2、定义数据源 Spooling Directory Source
demo01.sources.s1.typespooldir
demo01.sources.s1.spoolDir/root/demo#3、定义管道
demo01.channels.c1.typememory
demo01.channels.c1.capacity1000
demo01.channels.c1.transactionCapacity200#4、配置sink目的地 logger
demo01.sinks.k1.typelogger#5、关联组件
demo01.sources.s1.channelsc1
demo01.sinks.k1.channelc12.2.4 启动
2.2.5 测试
2.3 采集一个文件夹下的新文件数据到控制台
2.3.1 案例需求
有一个文件夹文件夹下记录着网站产生的很多日志数据而且日志文件不止一个就像把文件夹下所有的文件数据采集到控制台同时如果这个文件夹下有新的数据文件产生也会把新文件的数据全部采集到控制台上
2.3.2 案例分析
source:Spooling Directory Source
channel:memory
sink:logger
2.3.3 编写配置文件
#1、起别名
demo01.sourcess1
demo01.channelsc1
demo01.sinksk1#2、定义数据源 exec linux命令 监听一个文件 tail -f|-F 文件路径
demo01.sources.s1.typeexec
demo01.sources.s1.commandtail -F /root/a.log#3、定义管道
demo01.channels.c1.typememory
demo01.channels.c1.capacity1000
demo01.channels.c1.transactionCapacity200#4、配置sink目的地 logger
demo01.sinks.k1.typelogger#5、关联组件
demo01.sources.s1.channelsc1
demo01.sinks.k1.channelc12.3.4 运行
2.3.5 测试 2.1~2.3 单source、sink、channel souece数据源不一样 sink目的地都是一样的 2.4 采集一个网络端口的数据到HDFS中
2.4.1 案例需求
监控一个网络端口产生的数据一旦当端口产生新的数据就把数据采集到HDFS上以文件的形式进行存放
2.4.2 案例分析
source:网络端口netcat
channel:基于内存的管道 memory
sink:HDFS
2.4.3 编写脚本文件
启动采集进程必须先启动HDFS
# 1、配置agent、source、channel、sink的别名
demo.sourcess1
demo.channelsc1
demo.sinksk1# 2、配置source组件连接的数据源--不同数据源的配置项都不一样 监听netcat type bind port
demo.sources.s1.typenetcat
demo.sources.s1.bindlocalhost
demo.sources.s1.port44444# 3、配置channel组件的类型--不同类型的管道配置项也不一样 基于内存memory的管道
demo.channels.c1.typememory
demo.channels.c1.capacity1000
demo.channels.c1.transactionCapacity200# 4、配置sink组件连接的目的地--基于HDFS的
demo.sinks.k1.typehdfs
# 配置采集到HDFS上的目录 数据在目录下以文件的形式进行存放
demo.sinks.k1.hdfs.pathhdfs://single:9000/flume
# 目录下生成的文件的前缀 如果没有配置 默认就是FlumeData
demo.sinks.k1.hdfs.filePrefixcollect
# 指定生成的文件的后缀 默认是没有后缀 生成的文件的格式collect.时间戳.txt
demo.sinks.k1.hdfs.fileSuffix.txt
# 目录采集的数据并不是记录到一个文件中文件是会滚动生成新的文件的
# 滚动的规则有三种1、基于时间滚动 2、基于文件的容量滚动 3、基于文件的记录的event数量进行滚动
# 时间 30s 容量1024b event 10
# 时间滚动规则 单位是s 如果指设置为0 那么就代表不基于时间生成新的文件
demo.sinks.k1.hdfs.rollInterval60
# 文件容量的滚动规则 单位b 如果设置为0 代表不基于容量滚动生成新的文件
demo.sinks.k1.hdfs.rollSize100
# event数量滚动规则 一般都设置为0 代表不基于event数量滚动生成新的文件
demo.sinks.k1.hdfs.rollCount0
# 文件在HDFS上的默认存储格式是SequenceFile文件格式
demo.sinks.k1.hdfs.fileTypeDataStream
# 设置event的头部使用本地时间戳作为header
demo.sinks.k1.hdfs.useLocalTimeStamptrue# 5、配置source channel sink之间的连接 source连接channel sink也要连接channel
# 一个source的数据可以发送给多个channel 一个sink只能拉取一个channel的数据
demo.sources.s1.channelsc1
demo.sinks.k1.channelc1 【注意】flume的依赖的guava和hadoop的guava有冲突需要将flume的lib目录下的guava依赖删除同时将hadoop的share/common/lib/guava依赖复制到flume的lib目录下 2.5 多数据源和多目的地案例
2.5.1 案例需求
现在有三个数据源
网络端口文件文件夹
想把这三个数据源的数据全部采集到HDFS的指定目录下同时还要求把文件数据源的数据在控制台上同步进行展示
2.5.2 案例分析
sourcenetcat exec spooldir
channel两个基于内存的
sink1、hdfs 2、logger 2.5.3 编写脚本文件
# 1、起别名 三个数据源 两个管道 两个sink
more.sourcess1 s2 s3
more.channelsc1 c2
more.sinksk1 k2# 2、定义数据源 三个
# 定义s1数据源 s1连接的网络端口
more.sources.s1.typenetcat
more.sources.s1.bindlocalhost
more.sources.s1.port44444# 定义s2的数据源 s2连接的是一个文件 /root/more.log文件
more.sources.s2.typeexec
more.sources.s2.commandtail -F /root/more.log# 定义s3的数据源 s3监控的是一个文件夹 /root/more
more.sources.s3.typespooldir
more.sources.s3.spoolDir/root/more# 3、定义channel 两个 基于内存的
# 定义c1管道 c2管道需要接受三个数据源的数据
more.channels.c1.typememory
more.channels.c1.capacity20000
more.channels.c1.transactionCapacity5000# 定义c2管道 c2管道只需要接受一个数据源 s2的数据
more.channels.c2.typememory
more.channels.c2.capacity5000
more.channels.c2.transactionCapacity500# 4、定义sink 两个 HDFS logger
# 定义k1这个sink 基于hdfs
more.sinks.k1.typehdfs
# HDFS支持生成动态目录--基于时间的 /more/2023-08-25
more.sinks.k1.hdfs.pathhdfs://single:9000/more/%Y-%m-%d
# 如果设置了动态目录那么必须指定动态目录的滚动规则-多长时间生成一个新的目录
more.sinks.k1.hdfs.roundtrue
more.sinks.k1.hdfs.roundValue24
more.sinks.k1.hdfs.roundUnithourmore.sinks.k1.hdfs.filePrefixcollect
more.sinks.k1.hdfs.fileSuffix.txt
more.sinks.k1.hdfs.rollInterval0
more.sinks.k1.hdfs.rollSize134217728
more.sinks.k1.hdfs.rollCount0
more.sinks.k1.hdfs.fileTypeDataStream
more.sinks.k1.hdfs.useLocalTimeStamptrue# 定义k2 logger
more.sinks.k2.typelogger# 5、组合agent的组件
more.sources.s1.channelsc1
more.sources.s2.channelsc1 c2
more.sources.s3.channelsc1
more.sinks.k1.channelc1
more.sinks.k2.channelc22.6 多Flume进程组合的案例
2.6.1 案例需求
三个Flume进程其中第一个Flume采集端口的数据第二个Flume采集文件的数据要求第一个Flume进程和第二个Flume进程将采集到的数据发送给第三个Flume进程第三个Flume进程将接受到的数据采集到控制台上。 2.6.2 案例分析
first agent source :netcatchannel:memorysink:avro second agent source:execchannel:memorysink:avro third agent source:avrochannel:memorysink:logger
2.6.3 编写脚本文件
第一个脚本监听端口到avro的
first.sourcess1
first.channelsc1
first.sinksk1first.sources.s1.typenetcat
first.sources.s1.bindlocalhost
first.sources.s1.port44444first.channels.c1.typememory
first.channels.c1.capacity1000
first.channels.c1.transactionCapacity500first.sinks.k1.typeavro
first.sinks.k1.hostnamelocalhost
first.sinks.k1.port60000first.sources.s1.channelsc1
first.sinks.k1.channelc1第二脚本文件监听文件数据到avro的
second.sourcess1
second.channelsc1
second.sinksk1second.sources.s1.typeexec
second.sources.s1.commandtail -F /root/second.txtsecond.channels.c1.typememory
second.channels.c1.capacity1000
second.channels.c1.transactionCapacity500second.sinks.k1.typeavro
second.sinks.k1.hostnamelocalhost
second.sinks.k1.port60000second.sources.s1.channelsc1
second.sinks.k1.channelc1第三个脚本文件监听avro汇总的数据到logger的
third.sourcess1
third.channelsc1
third.sinksk1# avro类型当作source 需要 bind和port参数 如果当作sink使用 需要hostname port
third.sources.s1.typeavro
third.sources.s1.bindlocalhost
third.sources.s1.port60000third.channels.c1.typememory
third.channels.c1.capacity1000
third.channels.c1.transactionCapacity500third.sinks.k1.typeloggerthird.sources.s1.channelsc1
third.sinks.k1.channelc12.6.4 启动脚本程序
先启动第三个脚本再启动第一个和第二脚本
三、Hadoop、Hive、SQOOP、Flume、ZookeeperHA高可用、Azkaban