当前位置: 首页 > news >正文

html做简单网站实例制作网页然后把文件上传

html做简单网站实例,制作网页然后把文件上传,平面设计的网站有哪些,帝国cms怎么做音乐网站数据采集 日志数据#xff08;文件#xff09;到Kafka 自己写个程序模拟一些用户的行为数据#xff0c;这些数据存在一个文件夹中。 接着使用flume监控采集这些文件#xff0c;然后发送给kafka中待消费。 1、flume采集配置文件 监控文件将数据发给kafka的flume配置文件… 数据采集 日志数据文件到Kafka 自己写个程序模拟一些用户的行为数据这些数据存在一个文件夹中。 接着使用flume监控采集这些文件然后发送给kafka中待消费。 1、flume采集配置文件 监控文件将数据发给kafka的flume配置文件 #定义组件 a1.sources r1 a1.channels c1#配置source a1.sources.r1.type TAILDIR a1.sources.r1.filegroups f1 a1.sources.r1.filegroups.f1 /opt/module/applog/log/app.* a1.sources.r1.positionFile /opt/module/flume/taildir_position.json a1.sources.r1.interceptors i1 a1.sources.r1.interceptors.i1.type com.atguigu.gmall.flume.interceptor.ETLInterceptor$Builder#配置channel a1.channels.c1.type org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers 192.168.10.100:9092 a1.channels.c1.kafka.topic topic_log a1.channels.c1.parseAsFlumeEvent false#组装 a1.sources.r1.channels c1a1.sources.r1.channels c1 这边设置parseAsFlumeEvent false后数据就不会以flume的事件event的形式传递就没有head了只有body数据head虽然对这个离线案例有用但是如果要弄实时数仓flink也会到kafka中取数据这时head对于实时的就没用了。所以这边设置成false也能减少数据传输的大小。 2、拦截器过滤数据 在source和channel之间设置拦截器做一个轻度的清洗。 编写Flume拦截器 1创建Maven工程flume-interceptor 2创建包com.atguigu.gmall.flume.interceptor 3在pom.xml文件中添加如下配置 dependenciesdependencygroupIdorg.apache.flume/groupIdartifactIdflume-ng-core/artifactIdversion1.9.0/versionscopeprovided/scope/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.62/version/dependency/dependenciesbuildpluginspluginartifactIdmaven-compiler-plugin/artifactIdversion2.3.2/versionconfigurationsource1.8/sourcetarget1.8/target/configuration/pluginpluginartifactIdmaven-assembly-plugin/artifactIdconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/plugin/plugins/build 4在com.atguigu.gmall.flume.utils包下创建JSONUtil类 package com.atguigu.gmall.flume.utils;import com.alibaba.fastjson.JSONObject;import com.alibaba.fastjson.JSONException;public class JSONUtil {/** 通过异常判断是否是json字符串* 是返回true  不是返回false* */public static boolean isJSONValidate(String log){try {JSONObject.parseObject(log);return true;}catch (JSONException e){return false;}}} 5在com.atguigu.gmall.flume.interceptor包下创建ETLInterceptor类 package com.atguigu.gmall.flume.interceptor;import com.atguigu.gmall.flume.utils.JSONUtil;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;import java.util.Iterator;import java.util.List;public class ETLInterceptor implements Interceptor {Overridepublic void initialize() {}Overridepublic Event intercept(Event event) {//1、获取body当中的数据并转成字符串byte[] body event.getBody();String log new String(body, StandardCharsets.UTF_8);//2、判断字符串是否是一个合法的json是返回当前event不是返回nullif (JSONUtil.isJSONValidate(log)) {return event;} else {return null;}}Overridepublic ListEvent intercept(ListEvent list) {IteratorEvent iterator list.iterator();while (iterator.hasNext()){Event next iterator.next();if(intercept(next)null){iterator.remove();}}return list;}// a1.sources.r1.interceptors.i1.type 的值是这个的全类名public static class Builder implements Interceptor.Builder{Overridepublic Interceptor build() {return new ETLInterceptor();}Overridepublic void configure(Context context) {}}Overridepublic void close() {}} 6打包 7需要先将打好的包放入到flume的lib目录下/opt/module/flume/lib文件夹下面。 3、启动flume采集验证 使用上面的配置文件启动flume监控 bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.loggerinfo,console 接着创建一个Kafka消费者消费topic_log主题 bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.100:9092 --topic topic_log 然后往文件中追加数据看能不能消费到。 看到完整的json被消费了不完整的json被拦截器过滤了 日志数据文件同步给Hadoop的hdfs 现在数据已经在Kafka了下一步就是要将数据发给Hadoop存储并且要按天进行分区。 按照规划该Flume需将Kafka中topic_log的数据发往HDFS。并且对每天产生的用户行为日志进行区分将不同天的数据发往HDFS不同天的路径。 1、创建flume消费者 创建flume消费者从Kafka中消费数据发给hdfs。 目前的数据位于kafka中原本可以直接用下面的这种flume架构但由于flume的上游将数据存到kafka的时候只存了body这边将数据发给hdfs中需要按照时间落盘所以需要拦截器加上head给每条数据在head中添加时间信息但是拦截器需要有flume source才能生效。所以这种架构就不行。需要使用带有source的架构。 带有source的架构模式  拦截器 // 必须在在header中添加名为timestamp字段的时间戳Overridepublic Event intercept(Event event) {MapString, String headers event.getHeaders();byte[] body event.getBody();String log new String(body, StandardCharsets.UTF_8);String ts JSONObject.parseObject(log).getString(ts);headers.put(timestamp,ts);return event;} flume配置文件 #定义组件 a1.sourcesr1 a1.channelsc1 a1.sinksk1#配置source1 a1.sources.r1.type org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize 2000 a1.sources.r1.batchDurationMillis 2000 a1.sources.r1.kafka.bootstrap.servers 192.168.10.100:9092 a1.sources.r1.kafka.topicstopic_log a1.sources.r1.kafka.consumer.group.id topic_log a1.sources.r1.interceptors i1 a1.sources.r1.interceptors.i1.type com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder#配置channel a1.channels.c1.type file a1.channels.c1.checkpointDir /opt/module/flume/checkpoint/behavior1 a1.channels.c1.dataDirs /opt/module/flume/data/behavior1 a1.channels.c1.maxFileSize 2146435071 a1.channels.c1.capacity 1000000 a1.channels.c1.keep-alive 6#配置sink a1.sinks.k1.type hdfs a1.sinks.k1.hdfs.path /origin_data/gmall/log/topic_log/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix log a1.sinks.k1.hdfs.round falsea1.sinks.k1.hdfs.rollInterval 10 a1.sinks.k1.hdfs.rollSize 134217728 a1.sinks.k1.hdfs.rollCount 0#控制输出文件类型 a1.sinks.k1.hdfs.fileType CompressedStream a1.sinks.k1.hdfs.codeC gzip#组装 a1.sources.r1.channels c1 a1.sinks.k1.channel c1 1、数据位于kafka中source用kafka source kafka source其实就是一个kafka消费者定义消费者组id防止使用默认的组id导致消费不到数据如果有两个消费者都消费toppic_id主题同一个消费者组id一样的只有一个消费者能消费到。 a1.sources.r1.batchSize 2000一次批量写入channel通道的最大消息数。 a1.sources.r1.batchDurationMillis 2000若没达到一批次的消息数量达到这个时间了也将消息都发给channel通道。这时间设置成产生2000条大概花费的时间。 2、Channel用file channel我猜测是由于要发送给hdfs又因为hdfs是文件系统 通过配置dataDirs指向多个路径每个路径对应不同的硬盘flume就可以将来自Source的数据写到不同的目录硬盘但是这边是单机就设置了一个可以增大Flume吞吐量。 a1.channels.c1.maxFileSize 2146435071file channel数据存储在文件中                单个日志文件的最大大小(以字节计)。 a1.channels.c1.capacity 1000000file channel的最大容量 1000000条 a1.channels.c1.keep-alive 6 回滚后source要重新到文件或者kafka中取这2000条数据  3、数据发给HDFS所以sink用hdfs sink a1.sinks.k1.hdfs.path /origin_data/gmall/log/topic_log/%Y-%m-%d path中包含时间的转移序列用于将不同时间的数据放到不同的路径。 基于以上hdfs.rollInterval10hdfs当达到10秒后滚动形成文件 hdfs.rollSize134217728hdfs数据当达到128M形成文件 hdfs.rollCount 0event事件条数达到多少条形成文件 几个参数综合作用效果如下 1文件在达到128M时会滚动生成新文件 2文件创建超3600秒时会滚动生成新文件 还没达到形成新文件的时候是以.tmp结尾存在的这个时候是没用的。 2、启动flume消费者  进入flume的家目录下执行 bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.loggerinfo,console 效果 效果分析文件一有新的日志数据写入就会被flume采集到kafka的topic_log主题中就会被flume消费者发到hdfs中的路径文件中。这样会有几个问题 一有数据就发给hdfs中形成一个文件就会产生大量的小文件上面每个文件就几百B大小。 元数据层面每个小文件都有一份元数据其中包括文件路径文件名所有者所属组权限创建时间等这些信息都保存在Namenode内存中。所以小文件过多会占用Namenode服务器大量内存影响Namenode性能和使用寿命 计算层面默认情况下MR会对每个小文件启用一个Map任务计算非常影响计算性能。同时也影响磁盘寻址时间。 数据漂移问题 加入拦截器解决数据漂移、修改参数解决小文件问题后 可以看到现在起码不是几十B了因为现在时间10秒就形成新文件到时候可以根据128M生成的时间设置。  现在这条数据链路已经打通了。  业务数据MySQL到HDFS 在离线数仓中业务数据是很重要的一个来源为后续的计算提供数据来源离线数仓一般一天采集同步一次业务数据到离线数仓中供后续使用存储、计算、处理、分析。 1、数据同步方案 同步的策略有增量同步效率好、逻辑复杂和全量同步数据量大变化少时效率低、逻辑简单。增量同步就是只将有变更的数据同步过来而全量同步是每次都将全表同步过来覆盖原有的数据。一般而言一个数据库中大表变化多全量、大表变化少增量、小表都用全量。 同步策略 优点 缺点 全量同步 逻辑简单 在某些情况下效率较低。例如某张表数据量较大但是每天数据的变化比例很低若对其采用每日全量同步则会重复同步和存储大量相同的数据。 增量同步 效率高无需同步和存储重复数据 逻辑复杂需要将每日的新增及变化数据同原来的数据进行整合才能使用 全量同步通常使用DataX、Sqoop等基于查询的离线同步工具。而增量同步既可以使用DataX、Sqoop等工具也可使用Maxwell、Canal等工具下面对增量同步不同方案进行简要对比。 增量同步方案 DataX/Sqoop Maxwell/Canal 对数据库的要求 原理是基于查询故若想通过select查询获取新增及变化数据就要求数据表中存在create_time、update_time字段然后根据这些字段获取变更数据。 要求数据库记录变更操作例如MySQL需开启binlog。 数据的中间状态 由于是离线批量同步故若一条数据在一天中变化多次该方案只能获取最后一个状态中间状态无法获取。 由于是实时获取所有的数据变更操作所以可以获取变更数据的所有中间状态。 2、各个表同步策略 一般而言一个数据库中大表变化多全量、大表变化少增量、小表都用全量。 2.1、部署DataX全量同步数据 使用DataX全量同步数据给HDFS。 1、正常步骤需要为每个全量同步的表各自创建一个DataX任务的json文件每个表都由公主和王子来写json文件实在是有点麻烦直接搞个脚本自动生成如果报错把注释去掉 # ecodingutf-8 import json import getopt import os import sys import MySQLdb#MySQL相关配置需根据实际情况作出修改 mysql_host hadoop102 mysql_port 3306 mysql_user root mysql_passwd 000000#HDFS NameNode相关配置需根据实际情况作出修改 hdfs_nn_host hadoop102 hdfs_nn_port 8020#生成DataX配置文件的目标路径可根据实际情况作出修改 output_path /opt/module/datax/job/importdef get_connection():return MySQLdb.connect(hostmysql_host, portint(mysql_port), usermysql_user, passwdmysql_passwd)def get_mysql_meta(database, table):connection get_connection()cursor connection.cursor()sql SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA%s AND TABLE_NAME%s ORDER BY ORDINAL_POSITIONcursor.execute(sql, [database, table])fetchall cursor.fetchall()cursor.close()connection.close()return fetchalldef get_mysql_columns(database, table):return map(lambda x: x[0], get_mysql_meta(database, table))def get_hive_columns(database, table):def type_mapping(mysql_type):mappings {bigint: bigint,int: bigint,smallint: bigint,tinyint: bigint,decimal: string,double: double,float: float,binary: string,char: string,varchar: string,datetime: string,time: string,timestamp: string,date: string,text: string}return mappings[mysql_type]meta get_mysql_meta(database, table)return map(lambda x: {name: x[0], type: type_mapping(x[1].lower())}, meta)def generate_json(source_database, source_table):job {job: {setting: {speed: {channel: 3},errorLimit: {record: 0,percentage: 0.02}},content: [{reader: {name: mysqlreader,parameter: {username: mysql_user,password: mysql_passwd,column: get_mysql_columns(source_database, source_table),splitPk: ,connection: [{table: [source_table],jdbcUrl: [jdbc:mysql:// mysql_host : mysql_port / source_database]}]}},writer: {name: hdfswriter,parameter: {defaultFS: hdfs:// hdfs_nn_host : hdfs_nn_port,fileType: text,path: ${targetdir},fileName: source_table,column: get_hive_columns(source_database, source_table),writeMode: append,fieldDelimiter: \t,compress: gzip}}}]}}if not os.path.exists(output_path):os.makedirs(output_path)with open(os.path.join(output_path, ..join([source_database, source_table, json])), w) as f:json.dump(job, f)def main(args):source_database source_table options, arguments getopt.getopt(args, -d:-t:, [sourcedb, sourcetbl])for opt_name, opt_value in options:if opt_name in (-d, --sourcedb):source_database opt_valueif opt_name in (-t, --sourcetbl):source_table opt_valuegenerate_json(source_database, source_table)if __name__ __main__:main(sys.argv[1:]) 注由于目标路径包含一层日期用于对不同天的数据加以区分故path参数并未写死需在提交任务时通过参数动态传入参数名称为targetdir 2、使用方式 安装Python Mysql驱动由于需要使用Python访问Mysql数据库故需安装驱动命令如下 sudo yum install -y MySQL-python脚本使用说明 python gen_import_config.py -d database -t table 通过-d传入数据库名-t传入表名执行上述命令即可生成该表的DataX同步配置文件。 3、每个表的json文件都要这样执行也比较直接再弄个脚本为每个表生成 #!/bin/bashpython ~/bin/gen_import_config.py -d gmall -t activity_info python ~/bin/gen_import_config.py -d gmall -t activity_rule python ~/bin/gen_import_config.py -d gmall -t base_category1 python ~/bin/gen_import_config.py -d gmall -t base_category2 python ~/bin/gen_import_config.py -d gmall -t base_category3 python ~/bin/gen_import_config.py -d gmall -t base_dic python ~/bin/gen_import_config.py -d gmall -t base_province python ~/bin/gen_import_config.py -d gmall -t base_region python ~/bin/gen_import_config.py -d gmall -t base_trademark python ~/bin/gen_import_config.py -d gmall -t cart_info python ~/bin/gen_import_config.py -d gmall -t coupon_info python ~/bin/gen_import_config.py -d gmall -t sku_attr_value python ~/bin/gen_import_config.py -d gmall -t sku_info python ~/bin/gen_import_config.py -d gmall -t sku_sale_attr_value python ~/bin/gen_import_config.py -d gmall -t spu_info 4、测试生产的配置文件是否可用 由于DataX同步任务要求目标路径提前存在故需手动创建路径当前activity_info表的目标路径应为/origin_data/gmall/db/activity_info_full/2020-06-14。命令不行可以手动创建。 hadoop fs -mkdir /origin_data/gmall/db/activity_info_full/2020-06-14 执行DataX同步命令 $ python /opt/module/datax/bin/datax.py -p-Dtargetdir/origin_data/gmall/db/activity_info_full/2020-06-14 /opt/module/datax/job/import/gmall.activity_info.json python /opt/module/datax/bin/datax.py -p-Dtargetdir/origin_data/gmall/db/activity_info_full/2020-06-14 /opt/module/datax/job/import/gmall.activity_info.json 5、观察结果 观察HFDS目标路径是否出现数据。 6、全量同步脚本 #!/bin/bashDATAX_HOME/opt/module/datax# 如果传入日期则do_date等于传入的日期否则等于前一天日期 if [ -n $2 ] ;thendo_date$2 elsedo_datedate -d -1 day %F fi#处理目标路径此处的处理逻辑是如果目标路径不存在则创建若存在则清空目的是保证同步任务可重复执行 handle_targetdir() {hadoop fs -test -e $1if [[ $? -eq 1 ]]; thenecho 路径$1不存在正在创建......hadoop fs -mkdir -p $1elseecho 路径$1已经存在fs_count$(hadoop fs -count $1)content_size$(echo $fs_count | awk {print $3})if [[ $content_size -eq 0 ]]; thenecho 路径$1为空elseecho 路径$1不为空正在清空......hadoop fs -rm -r -f $1/*fifi }#数据同步 import_data() {datax_config$1target_dir$2# 先在HDFS中创建目录handle_targetdir $target_dirpython $DATAX_HOME/bin/datax.py -p-Dtargetdir$target_dir $datax_config }case $1 in activity_info)import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date;; activity_rule)import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date;; base_category1)import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date;; base_category2)import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date;; base_category3)import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date;; base_dic)import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date;; base_province)import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date;; base_region)import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date;; base_trademark)import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date;; cart_info)import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date;; coupon_info)import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date;; sku_attr_value)import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date;; sku_info)import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date;; sku_sale_attr_value)import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date;; spu_info)import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date;; all)import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_dateimport_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_dateimport_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_dateimport_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_dateimport_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_dateimport_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_dateimport_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_dateimport_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date;; esac 使用方式 ./mysql_to_hdfs_full.sh all 2023-10-25 all表示全量同步脚本设置的所有表第二个参数是创建的文件夹时间如果不传默认取前一天的时间比如今天是2023年10月25日则创建2023-10-24文件夹存放数据生产环境中都是凌晨1点多开始全量同步前面一天的数据。所以生产中第二个参数不传。 2.2、Maxwell增量同步数据 使用Maxwell增量同步业务数据到kafka再由Flume采集到HDFS 1、创建一个Maxwell增量同步MySQL中需要增量同步的业务表发送给kafka的topic_db主题 如果MySQL的端口不是3306Maxwell的配置文件记得加上 mxw.sh start 2、由于有些表是全量同步所以需要在MySQL的配置文件中将全量同步的表去掉bin_log 3、创建flume消费topic_db主题发送给hdfs flume配置文件 #定义组件 a1.sourcesr1 a1.channelsc1 a1.sinksk1#配置source1 a1.sources.r1.type org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize 2000 a1.sources.r1.batchDurationMillis 2000 a1.sources.r1.kafka.bootstrap.servers 192.168.10.100:9092 a1.sources.r1.kafka.topicstopic_db a1.sources.r1.kafka.consumer.group.id topic_db a1.sources.r1.interceptors i1 a2.sources.r1.interceptors.i1.type com.atguigu.gmall.flume.interceptor.TableNameTimestampInterceptor$Builder#配置channel a1.channels.c1.type file a1.channels.c1.checkpointDir /opt/module/flume/checkpoint/behavior2 a1.channels.c1.dataDirs /opt/module/flume/data/behavior2 a1.channels.c1.maxFileSize 2146435071 a1.channels.c1.capacity 1000000 a1.channels.c1.keep-alive 6#配置sink a1.sinks.k1.type hdfs a1.sinks.k1.hdfs.path /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix db a1.sinks.k1.hdfs.round falsea1.sinks.k1.hdfs.rollInterval 10 a1.sinks.k1.hdfs.rollSize 134217728 a1.sinks.k1.hdfs.rollCount 0#控制输出文件类型 a1.sinks.k1.hdfs.fileType CompressedStream a1.sinks.k1.hdfs.codeC gzip#组装 a1.sources.r1.channels c1 a1.sinks.k1.channel c1 a1.sinks.k1.hdfs.path /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d 由于在hdfs中落盘的需要按照以上的格式而%{tableName} 这种hdfs会到event的头部中找tableName来解析%Y-%m-%d会找timestamp的值解析。以下是一条event格式但是如果不设置的话会按默认的时间。 timestamp^R^M1698288936059^R­^B{database:gmall,table:comment_info,type:insert,ts:1698288933,xid:912,commit:true,data:{id:1716761825009860616,user_id:13,nick_name:null,head_img:null,sku_id:12,spu_id:25,order_id:null,appraise:null,comment_txt:æµ8bè¯951219,create_time:null,operate_time:null}}^Qib8e)^^^^^^?^^^^_^W^M^D^^^^Q©^?äi8b^A^^^Yr^?äi8b^A^^^E^M^A^^^^^?^^^$^W^M^B^^^^Qª^?äi8b^A^ 以下是Maxwell增量同步MySQL的一条数据其中ts字段需要个性化定制Maxwell才能生成ts是当时监控到这条数据变更的时间可以将这个时间设置给event的头部timestamp。 {database: gmall,table: comment_info,type: insert,ts: 1698288933,xid: 912,commit: true,data: {id: 1716761825009860616,user_id: 13,nick_name: null,head_img: null,sku_id: 12,spu_id: 25,order_id: null,appraise: null,comment_txt: æµ8bè¯951219,create_time: null,operate_time: null} } 拦截器 // flume采集的每条数据 eventOverridepublic Event intercept(Event event) {MapString, String headers event.getHeaders();byte[] body event.getBody();String db new String(body, StandardCharsets.UTF_8);// 获取Maxwell输出的时间戳 单位是秒Long ts JSONObject.parseObject(db).getLong(ts);String table JSONObject.parseObject(db).getString(table);// flume的hdfs sink解析需要 毫秒headers.put(timestamp, String.valueOf(ts * 1000));headers.put(tableName,table);return event;} 4、增量表首日全量 通常情况下增量表需要在首日首次进行一次全量同步后续每日再进行增量同步首日全量同步可以使用Maxwell的bootstrap功能方便起见下面编写一个增量表首日全量同步脚本。 #!/bin/bash# 该脚本的作用是初始化所有的增量表只需执行一次MAXWELL_HOME/opt/module/maxwellimport_data() {$MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties }case $1 in cart_info)import_data cart_info;; comment_info)import_data comment_info;; coupon_use)import_data coupon_use;; favor_info)import_data favor_info;; order_detail)import_data order_detail;; order_detail_activity)import_data order_detail_activity;; order_detail_coupon)import_data order_detail_coupon;; order_info)import_data order_info;; order_refund_info)import_data order_refund_info;; order_status_log)import_data order_status_log;; payment_info)import_data payment_info;; refund_payment)import_data refund_payment;; user_info)import_data user_info;; all)import_data cart_infoimport_data comment_infoimport_data coupon_useimport_data favor_infoimport_data order_detailimport_data order_detail_activityimport_data order_detail_couponimport_data order_infoimport_data order_refund_infoimport_data order_status_logimport_data payment_infoimport_data refund_paymentimport_data user_info;; esac 离线数仓环境准备 现在日志数据和业务数据都采集过来了是位于hdfs的文件中需要把数据加入到我们数据仓库中第一步先加入到hive中。 1、hive安装 大数据-hive-CSDN博客
http://www.hkea.cn/news/14308398/

相关文章:

  • 灌南县建设局网站内外外贸购物网站建设
  • 如何建一个免费试用网站怎样建设网站
  • 不同网站对商家做o2o的政策如何做网站搜索栏
  • 中国建设银行云南官网站纪念币赣州卫生人才考试网
  • 云南网站建设500平台电商运营
  • 做百度网站哪家公司好如何用小米路由器做网站
  • 确山专业网站建设做优化很好的网站
  • seo网站源码网站被泛解析
  • 网站建设pdf 下载用asp做网站视频
  • 网站开发 图片储存wordpress 页面404
  • joomla 网站图标普通人做电商要多少钱
  • 江山市建设局网站服务器在国外怎样做网站镜像
  • 双辽建设局网站杭州建站模板系统
  • 物流网站的建设网站2级目录怎么做
  • 手机网站建设cz35临漳网站建设
  • 生活做爰网站成都营销型网站建设公司
  • 网站开发的上市公司有哪些站长之家html
  • 扬中网站推广托管工商注册核名查询官网
  • 在哪个网站找水利工地做小程序自助建站
  • 微信手机客户端网站建设做网站用微软雅黑侵权吗
  • 制作钓鱼网站的费用wap视频网站建设难吗
  • 招聘网站设计师要求自己制作免费网站
  • 网站内链少改怎么做wordpress写表格
  • 手机网站字体大小自适应网站开发建设推荐用书
  • 安徽建设监理协会网站做的网站为什么图片看不了怎么回事
  • 毕业设计做课程网站好中铁建设集团有限公司登录
  • 网站刷排名工具wordpress多站点使用期限插件
  • 2017年做那家网站好做网站用什么ui美观
  • 北京网站设计合理刻找人代做网站注意事项
  • 怎样让网站排名优化工没网站怎么做cpa