当前位置: 首页 > news >正文

phpcms获取网站访问量不良广告

phpcms获取网站访问量,不良广告,网页编辑与网站编辑,使用他人注册商标做网站水善利万物而不争#xff0c;处众人之所恶#xff0c;故几于道#x1f4a6; 文章目录 1. Kafka_Sink 2. Kafka_Sink - 自定义序列化器 3. Redis_Sink_String 4. Redis_Sink_list 5. Redis_Sink_set 6. Redis_Sink_hash 7. 有界流数据写入到ES 8. 无界流数据写入到ES 9. 自定… 水善利万物而不争处众人之所恶故几于道 文章目录 1. Kafka_Sink 2. Kafka_Sink - 自定义序列化器 3. Redis_Sink_String 4. Redis_Sink_list 5. Redis_Sink_set 6. Redis_Sink_hash 7. 有界流数据写入到ES 8. 无界流数据写入到ES 9. 自定义sink - mysql_Sink 10. Jdbc_Sink 官方文档 - Flink1.13 1. Kafka_Sink addSink(new FlinkKafkaProducer String(kafka_address,topic,序列化器) 要先添加依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka_2.12/artifactIdversion1.13.6/version /dependencypublic static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayListWaterSensor waterSensors new ArrayList();waterSensors.add(new WaterSensor(sensor_1, 1607527992000L, 20));waterSensors.add(new WaterSensor(sensor_1, 1607527994000L, 50));waterSensors.add(new WaterSensor(sensor_1, 1607527996000L, 50));waterSensors.add(new WaterSensor(sensor_2, 1607527993000L, 10));waterSensors.add(new WaterSensor(sensor_2, 1607527995000L, 30));DataStreamSourceWaterSensor stream env.fromCollection(waterSensors);stream.keyBy(WaterSensor::getId).sum(vc).map(JSON::toJSONString).addSink(new FlinkKafkaProducerString(hadoop101:9092, // kafaka地址flink_sink_kafka, //要写入的Kafkatopicnew SimpleStringSchema() // 序列化器));try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 2. Kafka_Sink - 自定义序列化器 自定义序列化器new FlinkKafkaProducer()的时候选择四个参数的构造方法然后使用new KafkaSerializationSchema序列化器。然后重写serialize方法 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayListWaterSensor waterSensors new ArrayList();waterSensors.add(new WaterSensor(sensor_1, 1607527992000L, 20));waterSensors.add(new WaterSensor(sensor_1, 1607527994000L, 50));waterSensors.add(new WaterSensor(sensor_1, 1607527996000L, 50));waterSensors.add(new WaterSensor(sensor_2, 1607527993000L, 10));waterSensors.add(new WaterSensor(sensor_2, 1607527995000L, 30));DataStreamSourceWaterSensor stream env.fromCollection(waterSensors);Properties sinkConfig new Properties();sinkConfig.setProperty(bootstrap.servers,hadoop101:9092);stream.keyBy(WaterSensor::getId).sum(vc).addSink(new FlinkKafkaProducerWaterSensor(defaultTopic, // 默认发往的topic ,一般用不上new KafkaSerializationSchemaWaterSensor() { // 自定义的序列化器Overridepublic ProducerRecordbyte[], byte[] serialize(WaterSensor waterSensor,Nullable Long aLong) {String s JSON.toJSONString(waterSensor);return new ProducerRecord(flink_sink_kafka,s.getBytes(StandardCharsets.UTF_8));}},sinkConfig, // Kafka的配置FlinkKafkaProducer.Semantic.AT_LEAST_ONCE // 一致性语义现在只能传入至少一次));try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 3. Redis_Sink_String addSink(new RedisSink(config, new RedisMapper WaterSensor() {} 写到String结构里面 添加依赖 dependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.83/version /dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-redis_2.11/artifactIdversion1.1.5/version /dependencypublic static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayListWaterSensor waterSensors new ArrayList();waterSensors.add(new WaterSensor(sensor_1, 1607527992000L, 20));waterSensors.add(new WaterSensor(sensor_1, 1607527994000L, 50));waterSensors.add(new WaterSensor(sensor_1, 1607527996000L, 50));waterSensors.add(new WaterSensor(sensor_2, 1607527993000L, 10));waterSensors.add(new WaterSensor(sensor_2, 1607527995000L, 30));DataStreamSourceWaterSensor stream env.fromCollection(waterSensors);SingleOutputStreamOperatorWaterSensor result stream.keyBy(WaterSensor::getId).sum(vc);/* 往redis里面写字符串string 命令提示符用set 假设写的key是id,value是整个json格式的字符串 key value sensor_1 json格式字符串*/// new一个单机版的配置FlinkJedisPoolConfig config new FlinkJedisPoolConfig.Builder().setHost(hadoop101).setPort(6379).setMaxTotal(100) //最大连接数量.setMaxIdle(10) // 连接池里面的最大空闲.setMinIdle(2) // 连接池里面的最小空闲.setTimeout(10*1000) // 超时时间.build();// 写出到redis中result.addSink(new RedisSink(config, new RedisMapperWaterSensor() {// 返回命令描述符往不同的数据结构写数据用的方法不一样Overridepublic RedisCommandDescription getCommandDescription() {// 写入到字符串用setreturn new RedisCommandDescription(RedisCommand.SET);}Overridepublic String getKeyFromData(WaterSensor waterSensor) {return waterSensor.getId();}Overridepublic String getValueFromData(WaterSensor waterSensor) {return JSON.toJSONString(waterSensor);}}));try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 4. Redis_Sink_list addSink(new RedisSink(config, new RedisMapper WaterSensor() {} 写到 list 结构里面 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayListWaterSensor waterSensors new ArrayList();waterSensors.add(new WaterSensor(sensor_1, 1607527992000L, 20));waterSensors.add(new WaterSensor(sensor_1, 1607527994000L, 50));waterSensors.add(new WaterSensor(sensor_1, 1607527996000L, 50));waterSensors.add(new WaterSensor(sensor_2, 1607527993000L, 10));waterSensors.add(new WaterSensor(sensor_2, 1607527995000L, 30));DataStreamSourceWaterSensor stream env.fromCollection(waterSensors);SingleOutputStreamOperatorWaterSensor result stream.keyBy(WaterSensor::getId).sum(vc);// key是idvalue是处理后的json格式字符串FlinkJedisPoolConfig config new FlinkJedisPoolConfig.Builder().setHost(hadoop101).setPort(6379).setMaxTotal(100) //最大连接数量.setMaxIdle(10) // 连接池里面的最大空闲.setMinIdle(2) // 连接池里面的最小空闲.setTimeout(10*1000) // 超时时间.build();result.addSink(new RedisSink(config, new RedisMapperWaterSensor() {Overridepublic RedisCommandDescription getCommandDescription() {// 写入listreturn new RedisCommandDescription(RedisCommand.RPUSH);}Overridepublic String getKeyFromData(WaterSensor waterSensor) {return waterSensor.getId();}Overridepublic String getValueFromData(WaterSensor waterSensor) {return JSON.toJSONString(waterSensor);}}));try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 5. Redis_Sink_set addSink(new RedisSink(config, new RedisMapper WaterSensor() {} 写到 set 结构里面 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayListWaterSensor waterSensors new ArrayList();waterSensors.add(new WaterSensor(sensor_1, 1607527992000L, 20));waterSensors.add(new WaterSensor(sensor_1, 1607527994000L, 50));waterSensors.add(new WaterSensor(sensor_1, 1607527996000L, 50));waterSensors.add(new WaterSensor(sensor_2, 1607527993000L, 10));waterSensors.add(new WaterSensor(sensor_2, 1607527995000L, 30));DataStreamSourceWaterSensor stream env.fromCollection(waterSensors);SingleOutputStreamOperatorWaterSensor result stream.keyBy(WaterSensor::getId).sum(vc);FlinkJedisPoolConfig config new FlinkJedisPoolConfig.Builder().setHost(hadoop101).setPort(6379).setMaxTotal(100).setMaxIdle(10).setMinIdle(2).setTimeout(10*1000).build();result.addSink(new RedisSink(config, new RedisMapperWaterSensor() {Overridepublic RedisCommandDescription getCommandDescription() {// 数据写入set集合return new RedisCommandDescription(RedisCommand.SADD);}Overridepublic String getKeyFromData(WaterSensor waterSensor) {return waterSensor.getId();}Overridepublic String getValueFromData(WaterSensor waterSensor) {return JSON.toJSONString(waterSensor);}}));try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 6. Redis_Sink_hash addSink(new RedisSink(config, new RedisMapper WaterSensor() {} 写到 hash结构里面 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayListWaterSensor waterSensors new ArrayList();waterSensors.add(new WaterSensor(sensor_1, 1607527992000L, 20));waterSensors.add(new WaterSensor(sensor_1, 1607527994000L, 50));waterSensors.add(new WaterSensor(sensor_1, 1607527996000L, 50));waterSensors.add(new WaterSensor(sensor_2, 1607527993000L, 10));waterSensors.add(new WaterSensor(sensor_2, 1607527995000L, 30));DataStreamSourceWaterSensor stream env.fromCollection(waterSensors);SingleOutputStreamOperatorWaterSensor result stream.keyBy(WaterSensor::getId).sum(vc);FlinkJedisPoolConfig config new FlinkJedisPoolConfig.Builder().setHost(hadoop101).setPort(6379).setMaxTotal(100).setMaxIdle(10).setMinIdle(2).setTimeout(10*1000).build();result.addSink(new RedisSink(config, new RedisMapperWaterSensor() {Overridepublic RedisCommandDescription getCommandDescription() {// 数据写入hashreturn new RedisCommandDescription(RedisCommand.HSET,a);}Overridepublic String getKeyFromData(WaterSensor waterSensor) {return waterSensor.getId();}Overridepublic String getValueFromData(WaterSensor waterSensor) {return JSON.toJSONString(waterSensor);}}));try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 7. 有界流数据写入到ES中 new ElasticsearchSink.Builder() public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayListWaterSensor waterSensors new ArrayList();waterSensors.add(new WaterSensor(sensor_1, 1607527992000L, 20));waterSensors.add(new WaterSensor(sensor_1, 1607527994000L, 50));waterSensors.add(new WaterSensor(sensor_1, 1607527996000L, 50));waterSensors.add(new WaterSensor(sensor_2, 1607527993000L, 10));waterSensors.add(new WaterSensor(sensor_2, 1607527995000L, 30));DataStreamSourceWaterSensor stream env.fromCollection(waterSensors);SingleOutputStreamOperatorWaterSensor result stream.keyBy(WaterSensor::getId).sum(vc);ListHttpHost hosts Arrays.asList(new HttpHost(hadoop101, 9200),new HttpHost(hadoop102, 9200),new HttpHost(hadoop103, 9200));ElasticsearchSink.BuilderWaterSensor builder new ElasticsearchSink.BuilderWaterSensor(hosts,new ElasticsearchSinkFunctionWaterSensor() {Overridepublic void process(WaterSensor element, // 需要写出的元素RuntimeContext runtimeContext, // 运行时上下文 不是context上下文对象RequestIndexer requestIndexer) { // 把要写出的数据封装到RequestIndexer里面String msg JSON.toJSONString(element);IndexRequest ir Requests.indexRequest(sensor).type(_doc) // 定义type的时候, 不能下划线开头. _doc是唯一的特殊情况.id(element.getId()) // 定义每条数据的id. 如果不指定id, 会随机分配一个id. id重复的时候会更新数据.source(msg, XContentType.JSON);requestIndexer.add(ir); // 把ir存入到indexer, 就会自动的写入到es中}});result.addSink(builder.build());try {env.execute();} catch (Exception e) {e.printStackTrace();} }8. 无界流数据写入到ES   和有界差不多 只不过把数据源换成socket然后因为无界流它高效不是你来一条就刷出去所以设置刷新时间、大小、条数才能看到结果。 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);SingleOutputStreamOperatorWaterSensor result env.socketTextStream(hadoop101,9999).map(line-{String[] data line.split(,);return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).sum(vc);ListHttpHost hosts Arrays.asList(new HttpHost(hadoop101, 9200),new HttpHost(hadoop102, 9200),new HttpHost(hadoop103, 9200));ElasticsearchSink.BuilderWaterSensor builder new ElasticsearchSink.BuilderWaterSensor(hosts,new ElasticsearchSinkFunctionWaterSensor() {Overridepublic void process(WaterSensor element, // 需要写出的元素RuntimeContext runtimeContext, // 运行时上下文 不是context上下文对象RequestIndexer requestIndexer) { // 把要写出的数据封装到RequestIndexer里面String msg JSON.toJSONString(element);IndexRequest ir Requests.indexRequest(sensor).type(_doc) // 定义type的时候, 不能下划线开头. _doc是唯一的特殊情况.id(element.getId()) // 定义每条数据的id. 如果不指定id, 会随机分配一个id. id重复的时候会更新数据.source(msg, XContentType.JSON);requestIndexer.add(ir); // 把ir存入到indexer, 就会自动的写入到es中}});// 自动刷新时间builder.setBulkFlushInterval(2000); // 默认不会根据时间自动刷新builder.setBulkFlushMaxSizeMb(1024); // 当批次中的数据大于等于这个值刷新builder.setBulkFlushMaxActions(2); // 每来多少条数据刷新一次// 这三个是或的关系只要有一个满足就会刷新result.addSink(builder.build());try {env.execute();} catch (Exception e) {e.printStackTrace();} }9. 自定义sink - mysql_Sink 需要写一个类实现RichSinkFunction然后实现invoke方法。这里因为是写MySQL所以需要建立连接那就用Rich版本。 记得导入MySQL依赖 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port, 1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayListWaterSensor waterSensors new ArrayList();waterSensors.add(new WaterSensor(sensor_1, 1607527992000L, 20));waterSensors.add(new WaterSensor(sensor_1, 1607527994000L, 50));waterSensors.add(new WaterSensor(sensor_1, 1607527996000L, 50));waterSensors.add(new WaterSensor(sensor_2, 1607527993000L, 10));waterSensors.add(new WaterSensor(sensor_2, 1607527995000L, 30));DataStreamSourceWaterSensor stream env.fromCollection(waterSensors);SingleOutputStreamOperatorWaterSensor result stream.keyBy(WaterSensor::getId).sum(vc);result.addSink(new MySqlSink());try {env.execute();} catch (Exception e) {e.printStackTrace();}}public static class MySqlSink extends RichSinkFunctionWaterSensor {private Connection connection;Overridepublic void open(Configuration parameters) throws Exception {Class.forName(com.mysql.cj.jdbc.Driver);connection DriverManager.getConnection(jdbc:mysql://hadoop101:3306/test?useSSLfalse, root, 123456);}Overridepublic void close() throws Exception {if (connection!null){connection.close();}}// 调用每来一条元素这个方法执行一次Overridepublic void invoke(WaterSensor value, Context context) throws Exception {// jdbc的方式想MySQL写数据 // String sql insert into sensor(id,ts,vc)values(?,?,?);//如果主键不重复就新增主键重复就更新 // String sql insert into sensor(id,ts,vc)values(?,?,?) duplicate key update vc?;String sql replace into sensor(id,ts,vc)values(?,?,?);// 1. 得到预处理语句PreparedStatement ps connection.prepareStatement(sql);// 2. 给sql中的占位符进行赋值ps.setString(1,value.getId());ps.setLong(2,value.getTs());ps.setInt(3,value.getVc()); // ps.setInt(4,value.getVc());// 3. 执行ps.execute();// 4. 提交 // connection.commit(); MySQL默认自动提交所以这个地方不用调用// 5. 关闭预处理ps.close();} }运行结果 10. Jdbc_Sink addSink(JdbcSink.sink(sql,JdbcStatementBuilder,执行参数,连接参数) 对于jdbc数据库我们其实没必要自定义因为官方给我们了一个JDBC Sink - 官方JDBC Sink 传送门 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc_2.11/artifactIdversion1.13.6/version /dependencypublic static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ArrayListWaterSensor waterSensors new ArrayList();waterSensors.add(new WaterSensor(sensor_1, 1607527992000L, 20));waterSensors.add(new WaterSensor(sensor_1, 1607527994000L, 50));waterSensors.add(new WaterSensor(sensor_1, 1607527996000L, 50));waterSensors.add(new WaterSensor(sensor_2, 1607527993000L, 10));waterSensors.add(new WaterSensor(sensor_2, 1607527995000L, 30));DataStreamSourceWaterSensor stream env.fromCollection(waterSensors);SingleOutputStreamOperatorWaterSensor result stream.keyBy(WaterSensor::getId).sum(vc);result.addSink(JdbcSink.sink(replace into sensor(id,ts,vc)values(?,?,?),new JdbcStatementBuilderWaterSensor() {Overridepublic void accept(PreparedStatement ps,WaterSensor waterSensor) throws SQLException {// 只做一件事给占位符赋值ps.setString(1,waterSensor.getId());ps.setLong(2,waterSensor.getTs());ps.setInt(3,waterSensor.getVc());}},new JdbcExecutionOptions.Builder() //设置执行参数.withBatchSize(1024) // 刷新大小上限.withBatchIntervalMs(2000) //刷新间隔.withMaxRetries(3) // 重试次数.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(com.mysql.cj.jdbc.Driver).withUrl(jdbc:mysql://hadoop101:3306/test?useSSLfalse).withUsername(root).withPassword(123456).build()));try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果
http://www.hkea.cn/news/14460195/

相关文章:

  • 太平洋手机官方网站注册完域名之后怎么找到网站
  • 网站项目合同淄博网站制作公司定制
  • 签合网站是哪个好中山网页建站模板
  • 语文建设投稿网站wordpress怎么装模板
  • 迅睿cms建站帝国做的电影网站
  • 淘客cms网站建设网站用单页面框架做
  • 盘锦建网站网络推广员怎么做
  • 营销型网站建设找哪家有哪些看设计做品的网站
  • 建网站公司专业制作网站基本步骤
  • 最新网站建设视频永康市建设银行网站查询
  • 网站框架策划租用网站服务器
  • 金融企业网站建设公司公司手机网站制作
  • 网站开发的可行性报告创业项目
  • 公司网站建设公司排名宝安区网站建设培训
  • 网站游戏入口h5素材做多的网站
  • 织梦m网站伪静态各大免费推广网站
  • html5 网站公司网站建设计划
  • 网站备案审核要多久商城小程序介绍
  • 网站建设公司在哪里找资源龙南黄页全部电话
  • 国外做的比较的ppt网站长沙网站设计服务商
  • 把asp.net写的网站别人怎么访问关于网站建设 策划文案
  • 百度上公司做网站sem培训学校
  • 上海省住房与城乡建设厅网站安徽柱石建设有限公司网站
  • 网站平台建设十大公司郑州经济技术开发区政务服务中心
  • 网站开发的路径是什么聊城市建设局网站
  • 做网站销售说辞wordpress 分类目录 丢失
  • 网站建设预算表格网站的后续优化方案
  • 可信网站logo哈尔滨百度网站快速优化
  • 洛阳网站制作鄞州区住房和城乡建设局网站
  • 怎么给汽车网站做推广15秒创意广告短片