当前位置: 首页 > news >正文

创建网站收费在东莞建公司网站

创建网站收费,在东莞建公司网站,江苏泰州海陵区建设局网站,怎么做微信小说网站吗目录 StreamExecutionEnvironment Watermark watermark策略简介 使用 Watermark 策略 内置水印生成器 处理空闲数据源 算子处理 Watermark 的方式 创建DataStream的方式 通过list对象创建 ​​​​​​使用DataStream connectors创建 使用Table SQL connectors…目录 StreamExecutionEnvironment Watermark watermark策略简介 使用 Watermark 策略 内置水印生成器 处理空闲数据源 算子处理 Watermark 的方式 创建DataStream的方式 通过list对象创建 ​​​​​​使用DataStream connectors创建 使用Table SQL connectors创建 StreamExecutionEnvironment 编写一个 Flink Python DataStream API 程序首先需要声明一个执行环境StreamExecutionEnvironment这是流式程序执行的上下文。 你将通过它来设置作业的属性例如默认并发度、重启策略等、创建源、并最终触发作业的执行。 env StreamExecutionEnvironment.get_execution_environment() env.set_runtime_mode(RuntimeExecutionMode.BATCH) env.set_parallelism(1) 创建了 StreamExecutionEnvironment 之后你可以使用它来声明数据源。数据源从外部系统如 Apache Kafka、Rabbit MQ 或 Apache Pulsar拉取数据到 Flink 作业里。 为了简单起见本教程读取文件作为数据源。 ds env.from_source(sourceFileSource.for_record_stream_format(StreamFormat.text_line_format(),input_path).process_static_file_set().build(),watermark_strategyWatermarkStrategy.for_monotonous_timestamps(),source_namefile_source ) Watermark 大部分情况下流到operator的数据都是按照事件产生的时间顺序来的但是也不排除由于网络、分布式等原因导致乱序的产生所谓乱序就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。 为了解决乱序数据flink引入watermark。引入watermark机制则会等待晚到的数据一段时间等待时间到则触发计算如果数据延迟很大通常也会被丢弃或者另外处理。 为了使用事件时间语义Flink 应用程序需要知道事件时间戳对应的字段意味着数据流中的每个元素都需要拥有可分配的事件时间戳。其通常通过使用 TimestampAssigner API 从元素中的某个字段去访问/提取时间戳。 watermark策略简介 时间戳的分配与 watermark 的生成是齐头并进的其可以告诉 Flink 应用程序事件时间的进度。其可以通过指定 WatermarkGenerator 来配置 watermark 的生成方式。 使用 Flink API 时需要设置一个同时包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。WatermarkStrategy 工具类中也提供了许多常用的 watermark 策略并且用户也可以在某些必要场景下构建自己的 watermark 策略。 使用 Watermark 策略 WatermarkStrategy 可以在 Flink 应用程序中的两处使用第一种是直接在数据源上使用第二种是直接在非数据源的操作之后使用。 第一种方式相比会更好因为数据源可以利用 watermark 生成逻辑中有关分片/分区shards/partitions/splits的信息。使用这种方式数据源通常可以更精准地跟踪 watermark整体 watermark 生成将更精确。直接在源上指定 WatermarkStrategy 意味着你必须使用特定数据源接口。 仅当无法直接在数据源上设置策略时才应该使用第二种方式在任意转换操作之后设置 WatermarkStrategy 内置水印生成器 水印策略定义了如何在流源中生成水印。WatermarkStrategy是生成水印的WatermarkGenerator和分配记录内部时间戳的TimestampAssigner的生成器/工厂。 BoundedOutOfOrdernessDuration为创建WatermarkStrategy常见的内置策略。 for_bound_out_of_ordernness(max_out_of_ordernesspyflink.common.time.Duration)为记录无序的情况创建水印策略但可以设置事件无序程度的上限。 无序绑定B意味着一旦遇到时间戳为T的事件就不会再出现早于T-B的事件。 for_bound_out_of_ordernness(5) for_mononous_timestamps()为时间戳单调递增的情况创建水印策略。 水印是定期生成的并严格遵循数据中的最新时间戳。该策略引入的延迟主要是生成水印的周期间隔。 WatermarkStrategy.for_monotonous_timestamps() with_timestamp_assigner(timestamp_assigner:pyflink.common.watermark_strategy.TimestampAssigner) 创建一个新的WatermarkStrategy该策略通过实现TimestampAssigner接口使用给定的TimestampAssigner。 参数: timestamp_assigner 给定的TimestampAssigner。 Return: 包装TimestampAssigner的WaterMarkStrategy。 watermark_strategy WatermarkStrategy.for_monotonous_timestamps() with_timestamp_assigner(MyTimestampAssigner()) 处理空闲数据源 如果数据源中的某一个分区/分片在一段时间内未发送事件数据则意味着 WatermarkGenerator 也不会获得任何新数据去生成 watermark。我们称这类数据源为空闲输入或空闲源。在这种情况下当某些其他分区仍然发送事件数据的时候就会出现问题。由于下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值则其 watermark 将不会发生变化。 为了解决这个问题你可以使用 WatermarkStrategy 来检测空闲输入并将其标记为空闲状态。WatermarkStrategy 为此提供了一个工具接口withIdleness(Duration.ofMinutes(1)) with_idleness(idle_timeout:pyfrink.common.time.Duration) 创建一个新的丰富的WatermarkStrategy它也在创建的WatermarkGenerator中执行空闲检测。 参数idle_timeout–空闲超时。 Return配置了空闲检测的新水印策略。 算子处理 Watermark 的方式 一般情况下在将 watermark 转发到下游之前需要算子对其进行触发的事件完全进行处理。例如WindowOperator 将首先计算该 watermark 触发的所有窗口数据当且仅当由此 watermark 触发计算进而生成的所有数据被转发到下游之后其才会被发送到下游。换句话说由于此 watermark 的出现而产生的所有数据元素都将在此 watermark 之前发出。 相同的规则也适用于 TwoInputstreamOperator。但是在这种情况下算子当前的 watermark 会取其两个输入的最小值。 创建DataStream的方式 通过list对象创建 from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironmentenv StreamExecutionEnvironment.get_execution_environment() ds env.from_collection(collection[(1, aaa|bb), (2, bb|a), (3, aaa|a)],type_infoTypes.ROW([Types.INT(), Types.STRING()])) ​​​​​​使用DataStream connectors创建 使用add_source函数此函数仅支持FlinkKafkaConsumer仅在streaming执行模式下使用 from pyflink.common.serialization import JsonRowDeserializationSchema from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.connectors import FlinkKafkaConsumerenv StreamExecutionEnvironment.get_execution_environment() # the sql connector for kafka is used here as its a fat jar and could avoid dependency issues env.add_jars(file:///path/to/flink-sql-connector-kafka.jar) deserialization_schema JsonRowDeserializationSchema.builder() \.type_info(type_infoTypes.ROW([Types.INT(), Types.STRING()])).build()kafka_consumer FlinkKafkaConsumer(topicstest_source_topic,deserialization_schemadeserialization_schema,properties{bootstrap.servers: localhost:9092, group.id: test_group})ds env.add_source(kafka_consumer) 使用from_source函数此函数仅支持NumberSequenceSource和FileSource自定义数据源仅在streaming执行模式下使用 from pyflink.common.typeinfo import Types from pyflink.common.watermark_strategy import WatermarkStrategy from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.connectors import NumberSequenceSourceenv StreamExecutionEnvironment.get_execution_environment() seq_num_source NumberSequenceSource(1, 1000) ds env.from_source(sourceseq_num_source,watermark_strategyWatermarkStrategy.for_monotonous_timestamps(),source_nameseq_num_source,type_infoTypes.LONG()) ​​​​​​​使用Table SQL connectors创建 首先用Table SQL connectors创建表再转换为DataStream. from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironmentenv StreamExecutionEnvironment.get_execution_environment() t_env StreamTableEnvironment.create(stream_execution_environmentenv)t_env.execute_sql(CREATE TABLE my_source (a INT,b VARCHAR) WITH (connector datagen,number-of-rows 10))ds t_env.to_append_stream(t_env.from_path(my_source),Types.ROW([Types.INT(), Types.STRING()]))
http://www.hkea.cn/news/14273253/

相关文章:

  • 做网站的动态图片wordpress主题屏蔽更新
  • 河源手机网站制作做网站前需要做什么准备
  • 全国建筑网站微信引流推广 方法
  • 晋江企业网站制作建站是什么专业
  • 小鱼在线网站建设学校网站建设哪家好
  • 游戏秒玩网站免费推广网址
  • js写的网站怎么做seo短链接制作
  • 《美食天下》网站的建设甘肃网站建设项目
  • 全国有哪些做服装的网站给你一个网站怎么做的
  • 网站gif图标星大建设集团招聘网站
  • 遵义网站建公司wordpress漫画网站
  • 龙华网站建设营销推广微信小程序开发的优势
  • 货代网站建设阿里巴巴国际站外贸流程
  • 百度站长平台链接提交网站开发用户功能分析
  • 海口分类信息网站wordpress支持中文用户名
  • c语言做的网站高端网站建设一般多少钱
  • 宁波网站制作报价在淘宝上做的网站要转出
  • 甘肃网站建设推广服务宁波正规seo推广公司
  • 汉阳网站建设工信部icp备案官网
  • 上海市区网站设计制作公司手机搭建网站教程
  • 快速网站建设费用莱芜大集
  • 太原自助模板建站怎么建设一个电影资源网站解析
  • dw做网站设计晋城龙采网站建设
  • 酷站网部队网站建设报告
  • 重庆网站开发企业什么亲子网站可以做一下广告词
  • 坡头手机网站建设公司h5做商城网站
  • 厦门it做网站最强wordpress中页面伪静态页面
  • 建站园wordpress添加表
  • 做域名后就得做网站吗海外网站服务器租用
  • php网站分类目录源码frontpage网站模板下载