I. Spark Streaming Framework Introduction
Spark Streaming is an extension of the Spark Core API that supports scalable, high-throughput, fault-tolerant processing of live data streams. Data can be ingested from many sources, such as Kafka, Flume, Kinesis, or TCP sockets, and can be processed with complex algorithms expressed through high-level functions such as map, reduce, join, and window. Finally, the processed data can be pushed out to file systems, databases, and so on. In fact, you can even apply Spark's machine learning and graph processing algorithms to data streams.
II. Framework Integration
1. Create a Maven Project
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.es</groupId>
    <artifactId>es-sparkstreaming</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.8.0</version>
        </dependency>
        <!-- Elasticsearch client -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.8.0</version>
        </dependency>
        <!-- Elasticsearch depends on Log4j 2.x -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <!--
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.11.1</version>
        </dependency>
        -->
        <!-- JUnit unit tests -->
        <!--
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        -->
    </dependencies>
</project>
2. Implementation
package com.atguigu.es

import org.apache.http.HttpHost
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.action.index.{IndexRequest, IndexResponse}
import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentType

object SparkStreamingESTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ESTest")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // Read lines from a TCP socket (served by NetCat in the test section below)
    val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    ds.foreachRDD(rdd => {
      rdd.foreach(data => {
        val client = new RestHighLevelClient(
          RestClient.builder(new HttpHost("localhost", 9200, "http"))
        )
        // Input format: "<id> <value>", e.g. "1001 jianzi"
        val ss = data.split(" ")
        val request = new IndexRequest()
        request.index("product").id(ss(0))
        val json =
          s"""
             | { "data" : "${ss(1)}" }
             |""".stripMargin
        request.source(json, XContentType.JSON)
        val response: IndexResponse = client.index(request, RequestOptions.DEFAULT)
        println(response.getResult)
        client.close()
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
3. Screenshots
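The per-record logic in the job above is easy to check in isolation. The sketch below (a hypothetical RecordParser helper, not part of the original job) reproduces the split-and-wrap step with no Spark or Elasticsearch dependencies: the first whitespace-separated field becomes the document id, the second becomes the "data" value of the JSON source.

```scala
// Hypothetical helper mirroring the per-record logic of SparkStreamingESTest:
// split an input line on spaces, use field 0 as the document id and wrap
// field 1 in the JSON source that is sent to Elasticsearch.
object RecordParser {
  def toIndexArgs(line: String): (String, String) = {
    val fields = line.split(" ")
    val json = s"""{ "data" : "${fields(1)}" }"""
    (fields(0), json)
  }

  def main(args: Array[String]): Unit = {
    // Sample record in the same "<id> <value>" shape used in the test section.
    println(toIndexArgs("1001 jianzi"))
  }
}
```

Testing this step separately is useful because a malformed input line (fewer than two fields) would otherwise only surface as an executor-side exception inside foreachRDD.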
III. Installing NetCat
1. Download: netcat 1.11 for Win32/Win64
2. Extract the archive: right-click the zip file and choose "Extract to current folder".
3. Configure the environment variable: right-click This PC -> Properties -> Advanced system settings -> Environment Variables.
IV. Testing
Press Win+R and open a new cmd window so the updated PATH takes effect.
4.1 Start the listener: nc -l -p 9999
4.2 Start the Spark Streaming job.
4.3 In the cmd window, type: 1001 jianzi
4.4 Check the result in Postman:
GET http://127.0.0.1:9200/product/_doc/1001
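If NetCat is not available on the machine, the listener from step 4.1 can be approximated in plain Scala. The sketch below (a hypothetical MiniNetcat object, using only the JDK's ServerSocket) accepts a single connection and sends it one record per line, which is all that socketTextStream needs for a quick test.

```scala
import java.io.PrintWriter
import java.net.ServerSocket

// Hypothetical stand-in for `nc -l -p 9999`: accept one connection and
// write each record on its own line to the connected client.
object MiniNetcat {
  def serve(port: Int, lines: Seq[String]): Unit = {
    val server = new ServerSocket(port)
    try {
      val socket = server.accept()  // blocks until the Spark job connects
      val out = new PrintWriter(socket.getOutputStream, true)  // auto-flush
      lines.foreach(line => out.println(line))
      out.close()
      socket.close()
    } finally {
      server.close()
    }
  }

  def main(args: Array[String]): Unit = {
    serve(9999, Seq("1001 jianzi"))
  }
}
```

Unlike nc, this sends its lines immediately after the connection is accepted and then exits, so it suits a one-shot smoke test rather than interactive typing.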