DoubleCloud (https://www.double.cloud): creating a Kafka cluster.
1. Choose a language.
2. Run the curl command (the URL is shown in the console) to create a topic; a programmatic alternative is sketched below.
3. Generate a token for the chosen language.
4. Copy the configuration from step 3 into a local file named client.properties.
5. Copy the client code.
The Python and Java client code below has been reworked; the Java version was rewritten in Kotlin.
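Step 2 creates the topic through the curl command that the DoubleCloud console generates; that command is not reproduced here. A topic can also be created programmatically with the Kafka Admin API. A minimal sketch using confluent-kafka's AdminClient, where the broker address, credentials, and replication factor are placeholder assumptions:

from confluent_kafka.admin import AdminClient, NewTopic

# Placeholder connection settings; substitute the values from the
# DoubleCloud console. These are assumptions, not real endpoints.
admin = AdminClient({
    "bootstrap.servers": "<broker-host>:<port>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<username>",
    "sasl.password": "<password>",
})

# create_topics() is asynchronous: it returns one future per requested topic.
futures = admin.create_topics([NewTopic("test", num_partitions=1, replication_factor=3)])
for name, future in futures.items():
    future.result()  # raises an exception if topic creation failed
    print(f"Created topic {name}")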
Configuration file (Python, librdkafka):
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=
sasl.password=
group.id=
auto.offset.reset=earliest
# Best practice for higher availability in librdkafka clients prior to 1.7
session.timeout.ms=45000
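This single file feeds the producer, the consumer, and the admin client, but group.id, auto.offset.reset, and session.timeout.ms are consumer-side settings, and librdkafka may log warnings when a Producer receives them. A small hypothetical helper (the function name and the exact key set are assumptions) that strips them for producer use:

CONSUMER_ONLY_KEYS = {"group.id", "auto.offset.reset", "session.timeout.ms"}

def producer_config(config: dict) -> dict:
    # Drop consumer-only keys so the Producer does not warn about them
    return {k: v for k, v in config.items() if k not in CONSUMER_ONLY_KEYS}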
import asyncio
import threading

from confluent_kafka import Producer, Consumer


class KafkaClient:
    def __init__(self, config_file):
        self.config = self.read_config(config_file)

    def read_config(self, config_file):
        # Parse client.properties into a dict, skipping blank and comment lines
        config = {}
        with open(config_file) as fh:
            for line in fh:
                line = line.strip()
                if len(line) != 0 and line[0] != "#":
                    parameter, value = line.split("=", 1)
                    config[parameter] = value.strip()
        return config

    def produce(self, topic, key, value):
        # Creates a new producer instance
        producer = Producer(self.config)
        # Produces a sample message
        producer.produce(topic, key=key, value=value)
        print(f"Produced message to topic {topic}: key = {key:12} value = {value:12}")
        # Send any outstanding or buffered messages to the Kafka broker
        producer.flush()

    def consume_async(self, topic, callback=None, group_id="python-group-1",
                      auto_offset_reset="earliest"):
        # Sets the consumer group ID and offset
        self.config["group.id"] = group_id
        self.config["auto.offset.reset"] = auto_offset_reset
        consumer = Consumer(self.config)
        consumer.subscribe([topic])
        # Run the async callback on a fresh event loop owned by this thread
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        if callback is not None:
            loop.run_until_complete(callback(consumer))

    def consume(self, topic, callback=None):
        # Consume on a background thread so the caller is not blocked
        thread = threading.Thread(target=self.consume_async, args=(topic, callback))
        thread.start()
        return thread


async def consume_async(consumer):
    try:
        # Poll until the first message arrives, then print it
        while True:
            msg = consumer.poll(1.0)
            if msg is not None:
                break
        if not msg.error():
            key = msg.key().decode("utf-8")
            value = msg.value().decode("utf-8")
            print(f"Consumed message: key = {key:12} value = {value:12}")
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()


config_file_path = ".\\client.properties"
topic = "test"
key = "key"
value = "value"

kafka_client = KafkaClient(config_file_path)
kafka_client.produce(topic, key, value)
thread = kafka_client.consume(topic, consume_async)
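The consume() path above starts a thread whose only job is to run a fresh asyncio event loop for a single coroutine. If async interop is not actually needed, a plain blocking loop is simpler and behaves the same. A minimal sketch, assuming the same config dict produced by read_config():

from confluent_kafka import Consumer

def consume_blocking(config, topic, group_id="python-group-1"):
    # Same consumption logic as consume_async, without threads or event loops
    config = dict(config, **{"group.id": group_id, "auto.offset.reset": "earliest"})
    consumer = Consumer(config)
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue
            key = msg.key().decode("utf-8")
            value = msg.value().decode("utf-8")
            print(f"Consumed message: key = {key:12} value = {value:12}")
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()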
Configuration file (Java/Kotlin client):
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='GHFXZDIOMQW3IPKA' password='TimUk7hj/EwTiB031lA95LeKfXN3t2DdnwizhKx37wFxZKMLGEqTOnneTKrlQQ';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# Best practice for higher availability in Apache Kafka clients prior to 3.0
session.timeout.ms=45000
topic=
group.id=
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Best practice for Kafka producer to prevent data loss
acks=all
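Unlike the librdkafka file above, the Java client does not take sasl.username and sasl.password keys; the credentials are embedded in the sasl.jaas.config string. A hypothetical helper illustrating how the two forms relate (the function itself is an assumption; the PlainLoginModule syntax is standard Apache Kafka):

def jaas_config(username: str, password: str) -> str:
    # Build the sasl.jaas.config value the Java client expects from the
    # username/password pair that the librdkafka config keeps separate.
    return ("org.apache.kafka.common.security.plain.PlainLoginModule required "
            f"username='{username}' password='{password}';")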
Java (Kotlin):

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.newFixedThreadPoolContext
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.io.Closeable
import java.io.FileInputStream
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Paths
import java.time.Duration
import java.util.*

class KafkaClient<T, V> : Closeable {
    private var producer: KafkaProducer<T, V>? = null
    private var fileConfig: Properties? = null

    // Name of the property in client.properties that holds the topic name
    val TOPIC = "topic"

    private val DURATION = 100L
    private val POOLSIZE = 10
    private val DISPATCHER = newFixedThreadPoolContext(POOLSIZE, "CoroutinePool")
    private val SCOPE = CoroutineScope(DISPATCHER)

    constructor(configPath: String? = null, config: Properties? = null) {
        if (config == null && configPath == null) throw Exception("don't have any config")
        val config1 = Properties()
        if (configPath != null) {
            fileConfig = readConfig(configPath)
            fileConfig?.let { config1.putAll(it) }
        }
        if (config != null) {
            config1.putAll(config)
        }
        producer = KafkaProducer(config1)
    }

    fun produce(key: T, value: V, topic: String? = null) {
        // Fall back to the topic named in the config file when none is given
        producer?.send(ProducerRecord(topic ?: (fileConfig?.getProperty(TOPIC) as String), key, value))
    }

    fun consume(func: suspend (ConsumerRecords<T, V>) -> Unit) {
        val consumer: KafkaConsumer<T, V> = KafkaConsumer(fileConfig)
        consumer.subscribe(Arrays.asList(fileConfig?.getProperty(TOPIC)))
        // Poll on a coroutine so the caller returns immediately
        SCOPE.launch {
            while (true) {
                val records: ConsumerRecords<T, V> = consumer.poll(Duration.ofMillis(DURATION))
                func(records)
                delay(DURATION)
            }
        }
    }

    @Throws(IOException::class)
    fun readConfig(configFile: String): Properties {
        if (!Files.exists(Paths.get(configFile))) {
            throw IOException("$configFile not found.")
        }
        val config = Properties()
        FileInputStream(configFile).use { inputStream -> config.load(inputStream) }
        return config
    }

    override fun close() {
        producer?.close()
    }
}

fun main() {
    val cli = KafkaClient<String, String>("D:\\src\\main\\java\\com\\tr\\robot\\io\\kafka\\client.properties")
    cli.consume {
        println("test beg")
        for (record in it) {
            println(String.format("Consumed message from topic %s: key = %s value = %s",
                record.topic(), record.key(), record.value()))
        }
        println("test end")
    }
    // Give some time for the consumer to start
    Thread.sleep(2000)
    cli.produce("key1", "test")
    // Give some time for the consumer to consume the message
    Thread.sleep(5000)
}
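Running this client requires kafka-clients and kotlinx-coroutines on the classpath; note that newer kotlinx-coroutines releases mark newFixedThreadPoolContext as a delicate API and may require an opt-in annotation depending on the version. A Gradle (Kotlin DSL) sketch, where the version numbers are assumptions rather than tested pins:

dependencies {
    implementation("org.apache.kafka:kafka-clients:3.6.1")                 // version is an assumption
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0")  // version is an assumption
}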