当前位置: 首页 > news >正文

潍坊网站制作公司网站程序 不能创建文件夹

潍坊网站制作公司,网站程序 不能创建文件夹,沈阳工程建设信息网,wordpress给帖子打分学习笔记 Flink作为数据处理框架#xff0c;最终还是要把计算处理的结果写入外部存储#xff0c;为外部应用提供支持。 文章目录 **连接到外部系统****输出到文件**输出到 Kafka输出到 mysql自定义 sink 连接到外部系统 Flink的DataStream API专门提供了向外部写入数据的方… 学习笔记 Flink作为数据处理框架最终还是要把计算处理的结果写入外部存储为外部应用提供支持。 文章目录 **连接到外部系统****输出到文件**输出到 Kafka输出到 mysql自定义 sink 连接到外部系统 Flink的DataStream API专门提供了向外部写入数据的方法addSink。与addSource类似addSink方法对应着一个“Sink”算子主要就是用来实现与外部系统连接、并将数据提交写入的Flink程序中所有对外的输出操作一般都是利用Sink算子完成的。 Flink1.12以前Sink算子的创建是通过调用DataStream的.addSink()方法实现的。 stream.addSink(new SinkFunction(…)); addSink方法同样需要传入一个参数实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke()用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。 Flink1.12开始同样重构了Sink架构 stream.sinkTo(…) 当然Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示列出了Flink官方目前支持的第三方系统连接器 https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/connectors/datastream/overview/ 我们可以看到像Kafka之类流式系统Flink提供了完美对接source/sink两端都能连接可读可写而对于Elasticsearch、JDBC等数据存储系统则只提供了输出写入的sink连接器。 除Flink官方之外Apache Bahir框架也实现了一些其他第三方系统与Flink的连接器。 除此以外就需要用户自定义实现sink连接器了。 输出到文件 Flink专门提供了一个流式文件系统的连接器FileSink为批处理和流处理提供了一个统一的Sink它可以将分区文件写入Flink支持的文件系统。 FileSink支持行编码Row-encoded和批量编码Bulk-encoded格式。这两种不同的方式都有各自的构建器builder可以直接调用FileSink的静态方法 行编码 FileSink.forRowFormatbasePathrowEncoder。批量编码 FileSink.forBulkFormatbasePathbulkWriterFactory。 public class SinkFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 每个目录中都有 并行度个数的 文件在写入env.setParallelism(2);// 必须开启checkpoint否则一直都是 .inprogressenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataGeneratorSourceString dataGeneratorSource new DataGeneratorSource(new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return Number: value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(1000),Types.STRING);DataStreamSourceString dataGen env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), data-generator);// 输出到文件系统FileSinkString fieSink FileSink// 输出行式存储的文件指定路径、指定编码.StringforRowFormat(new Path(f:/tmp), new SimpleStringEncoder(UTF-8))// 输出文件的一些配置 文件名的前缀、后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix(atguigu-).withPartSuffix(.log).build())// 按照目录分桶如下就是每个小时一个目录.withBucketAssigner(new DateTimeBucketAssigner(yyyy-MM-dd HH, ZoneId.systemDefault()))// 文件滚动策略: 1分钟 或 1m.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1)).withMaxPartSize(new MemorySize(1024*1024)).build()).build();dataGen.sinkTo(fieSink);env.execute();} } 输出到 Kafka 1添加Kafka 连接器依赖 由于我们已经测试过从Kafka数据源读取数据连接器相关依赖已经引入这里就不重复介绍了。 2启动Kafka集群 3编写输出到Kafka的示例代码 public class SinkKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 如果是精准一次必须开启checkpoint后续章节介绍env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperatorString sensorDS env.socketTextStream(hadoop102, 7777);/*** Kafka Sink:* TODO 注意如果要使用 精准一次 写入Kafka需要满足以下条件缺一不可* 1、开启checkpoint后续介绍* 2、设置事务前缀* 3、设置事务超时时间 checkpoint间隔 事务超时时间 max的15分钟*/KafkaSinkString kafkaSink KafkaSink.Stringbuilder()// 指定 kafka 的地址和端口.setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092)// 指定序列化器指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.Stringbuilder().setTopic(ws).setValueSerializationSchema(new SimpleStringSchema()).build())// 写到kafka的一致性级别 精准一次、至少一次.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次必须设置 事务的前缀.setTransactionalIdPrefix(atguigu-)// 如果是精准一次必须设置 事务超时时间: 大于checkpoint间隔小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000).build();sensorDS.sinkTo(kafkaSink);env.execute();} }自定义序列化器实现带key的record: public class SinkKafkaWithKey {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);env.setRestartStrategy(RestartStrategies.noRestart());SingleOutputStreamOperatorString sensorDS env.socketTextStream(hadoop102, 7777);/*** 如果要指定写入kafka的key可以自定义序列化器* 1、实现 一个接口重写 序列化 方法* 2、指定key转成 字节数组* 3、指定value转成 字节数组* 4、返回一个 ProducerRecord对象把key、value放进去*/KafkaSinkString kafkaSink KafkaSink.Stringbuilder().setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092).setRecordSerializer(new KafkaRecordSerializationSchemaString() {NullableOverridepublic ProducerRecordbyte[], byte[] serialize(String element, KafkaSinkContext context, Long timestamp) {String[] datas element.split(,);byte[] key datas[0].getBytes(StandardCharsets.UTF_8);byte[] value element.getBytes(StandardCharsets.UTF_8);return new ProducerRecord(ws, key, value);}}).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix(atguigu-).setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 ).build();sensorDS.sinkTo(kafkaSink);env.execute();} }输出到 mysql 写入数据的MySQL的测试步骤如下。 1添加依赖 添加MySQL驱动 dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.27/version /dependency官方还未提供flink-connector-jdbc的1.17.0的正式依赖暂时从apache snapshot仓库下载pom文件中指定仓库路径 repositoriesrepositoryidapache-snapshots/idnameapache snapshots/name urlhttps://repository.apache.org/content/repositories/snapshots//url/repository /repositories添加依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc/artifactIdversion1.17-SNAPSHOT/version /dependency如果不生效还需要修改本地maven的配置文件mirrorOf中添加如下标红内容 mirroridaliyunmaven/idmirrorOf*,!apache-snapshots/mirrorOfname阿里云公共仓库/nameurlhttps://maven.aliyun.com/repository/public/url /mirror2启动MySQL在test库下建表ws mysql CREATE TABLE ws ( id varchar(100) NOT NULL, ts bigint(20) DEFAULT NULL, vc int(11) DEFAULT NULL, PRIMARY KEY (id) ) ENGINEInnoDB DEFAULT CHARSETutf83编写输出到MySQL的示例代码 public class SinkMySQL { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperatorWaterSensor sensorDS env .socketTextStream(hadoop102, 7777) .map(new WaterSensorMapFunction());/*** TODO 写入mysql* 1、只能用老的sink写法 addsink* 2、JDBCSink的4个参数:* 第一个参数 执行的sql一般就是 insert into* 第二个参数 预编译sql 对占位符填充值* 第三个参数 执行选项 ---》 攒批、重试* 第四个参数 连接选项 ---》 url、用户名、密码*/ SinkFunctionWaterSensor jdbcSink JdbcSink.sink(insert into ws values(?,?,?),new JdbcStatementBuilderWaterSensor() {Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {//每收到一条WaterSensor如何去填充占位符preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());}},JdbcExecutionOptions.builder().withMaxRetries(3) // 重试次数.withBatchSize(100) // 批次的大小条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(jdbc:mysql://hadoop102:3306/test?serverTimezoneAsia/ShanghaiuseUnicodetruecharacterEncodingUTF-8).withUsername(root).withPassword(000000).withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build() );sensorDS.addSink(jdbcSink);env.execute(); } }4运行代码用客户端连接MySQL查看是否成功写入数据。 自定义 sink 如果我们想将数据存储到我们自己的存储设备中而Flink并没有提供可以直接使用的连接器就只能自定义Sink进行输出了。与Source类似Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类只要实现它通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。 stream.addSink(new MySinkFunction()); 在实现SinkFunction的时候需要重写的一个关键方法invoke()在这个方法中我们就可以实现将流里的数据发送出去的逻辑。 这种方式比较通用对于任何外部存储系统都有效不过自定义Sink想要实现状态一致性并不容易所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现而且在不断地扩充因此自定义的场景并不常见。
http://www.hkea.cn/news/14290817/

相关文章:

  • 张家界市网站建设设计提供电商网站建设
  • 网站建设后预期推广方式怎样修改wordpress密码
  • iis7 添加php网站开发一个商城网站多少钱
  • 网站的角色设置如何做湖北seo服务
  • 如何免费做网站的教程四川建筑人才招聘网
  • 网站建设平台网站设计论坛搭建教程
  • 吉林省建设监理协会网站全面的聊城网站建设
  • 江苏南京建设局官方网站域名解析到别的网站
  • 30岁做网站运营今天发生的重大新闻
  • 河北省城乡建设培训网官方网站抖音流量推广神器软件
  • 网站建设空间多大网站后台不显示验证码
  • 南阳做个网站多少钱上海网站建设开发电话
  • 北京网站建设推seo优化技巧有哪些
  • 哈尔滨智能建站模板郑州做网站建设哪家好
  • 用jsp做婚纱网站的流程海会主机做的网站都能干什么的
  • 有做门窗找活的网站吗网站建设万户
  • 网站建设百度认证图片企业做什么需要有网站
  • 网站商品台管理系统做网站为什么能挣钱
  • 国外免费空间网站申请网站版面布局结构图
  • 大连网站关键词推广html5网站搭建
  • 商城网站建设信息免费站推广网站2022
  • 头条网站收录提交入口互联网保险中介平台
  • 四川省建设厅申报网站南宁模板建站
  • 成都网站seo诊断石材企业网站源码
  • 百度云注册域名可以做网站明码玉环哪里有做网站
  • 汽车o2o网站建设国外代理ip地址 免费
  • 南宁企业网站建设制作如何熟悉网站项目的逻辑
  • dede搭建网站教程supercell账号注册网站
  • 在新西兰做兼职的网站网络营销专业好不好
  • dedecms 门户网站查看wordpress密码破解