当前位置: 首页 > news >正文

做百度网站费用多少有经验的佛山网站建设

做百度网站费用多少,有经验的佛山网站建设,推广普通话的重要性,常州抖音seo kafka尚硅谷视频： 10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili 1. producer初始化：加载默认配置，以及配置的参数，开启网络线程 2. 拦截器拦截 3. 序列化器进行消息key, value序列化 4. 进行分区 5. kafka broker集群 获取… kafka尚硅谷视频： 10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili 1. producer初始化：加载默认配置，以及配置的参数，开启网络线程 2. 拦截器拦截 3. 序列化器进行消息key, value序列化 4. 进行分区 5. 从kafka broker集群获取metaData 6. 消息缓存到RecordAccumulator收集器，分配到该分区的Deque(RecordBatch) 7. batch.size满了，或者linger.ms到达指定时间，唤醒sender线程，实例化NetworkClient，将RecordBatch转换为ClientRequest发送消息体 8. 与分区所在的broker建立网络连接，发送到对应broker 1. send()方法参数：producerRecord对象 关于分区： a. 指定分区，则发送到该分区 b. 不指定分区、k值没有传入，使用黏性分区（sticky partition）。第一次调用时随机生成一个整数，后面每次调用在这个整数上自增，将这个值与 topic 可用的 partition 总数取余得到 partition 值，也就是常说的 round-robin 算法（注：这种“自增取余”的描述对应的是 Kafka 2.4 之前的轮询策略；2.4 及之后默认的黏性分区会随机选定一个分区并持续使用，直到该批次已满或 linger.ms 到期后才切换到其他分区） c. 不指定分区、传入k值，k值先进行hash获取hashCodeValue，再与topic下的分区数进行求模取余进行分区。 如：k 的 hash = 5，topic目前的分区数 = 2，则分区为 1（5 % 2）；k 的 hash = 6，topic目前的分区数 = 2，则分区为 0（6 % 2） 2. KafkaProducer 异步、同步发送api：异步发送 producer.send(producerRecord对象)；同步发送则在send()方法后面调用.get() kafka 的send方法核心逻辑： public Future<RecordMetadata> send(ProducerRecord<K, V> record) {return this.send(record, (Callback)null);}public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {/* 拦截器集合。多个拦截对象循环遍历 */ ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);return this.doSend(interceptedRecord, callback);}private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {TopicPartition tp = null;/* 获取集群信息metadata */ try {this.throwIfProducerClosed();long nowMs = this.time.milliseconds();ClusterAndWaitTime clusterAndWaitTime;try {clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), nowMs, this.maxBlockTimeMs);} catch (KafkaException var22) {if (this.metadata.isClosed()) {throw new KafkaException("Producer closed while send in progress", var22);}throw var22;}nowMs += clusterAndWaitTime.waitedOnMetadataMs;long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);Cluster cluster = clusterAndWaitTime.cluster;/* 序列化器 key序列化 */ byte[] serializedKey;try 
{serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException var21) {throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var21);}/* 序列化器 value序列化 */ byte[] serializedValue;try {serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException var20) {throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var20);}/* 分区 */ int partition = this.partition(record, serializedKey, serializedValue, cluster);tp = new TopicPartition(record.topic(), partition);this.setReadOnly(record.headers());Header[] headers = record.headers().toArray();int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);this.ensureValidRecordSize(serializedSize);long timestamp = record.timestamp() == null ? 
nowMs : record.timestamp();if (this.log.isTraceEnabled()) {this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});}Callback interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);/* RecordAccumulator.append() 添加数据，转为 ProducerBatch */ RecordAccumulator.RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);if (result.abortForNewBatch) {int prevPartition = partition;this.partitioner.onNewBatch(record.topic(), cluster, partition);partition = this.partition(record, serializedKey, serializedValue, cluster);tp = new TopicPartition(record.topic(), partition);if (this.log.isTraceEnabled()) {this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});}interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);}if (this.transactionManager != 
null) {this.transactionManager.maybeAddPartition(tp);}/* 判断批次是否满了，满了则唤醒sender；sender实现了Runnable接口 */ if (result.batchIsFull || result.newBatchCreated) {this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);this.sender.wakeup();}return result.future;} catch (ApiException var23) {this.log.debug("Exception occurred during message send:", var23);if (tp == null) {tp = ProducerInterceptors.extractTopicPartition(record);}Callback interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);interceptCallback.onCompletion((RecordMetadata)null, var23);this.errors.record();this.interceptors.onSendError(record, tp, var23);return new FutureFailure(var23);} catch (InterruptedException var24) {this.errors.record();this.interceptors.onSendError(record, tp, var24);throw new InterruptException(var24);} catch (KafkaException var25) {this.errors.record();this.interceptors.onSendError(record, tp, var25);throw var25;} catch (Exception var26) {this.interceptors.onSendError(record, tp, var26);throw var26;}} Sender类 run()方法： public void run() {this.log.debug("Starting Kafka producer I/O thread.");while(this.running) {try {this.runOnce();} catch (Exception var5) {this.log.error("Uncaught error in kafka producer I/O thread: ", var5);}}this.log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");while(!this.forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0 || this.hasPendingTransactionalRequests())) {try {this.runOnce();} catch (Exception var4) {this.log.error("Uncaught error in kafka producer I/O thread: ", var4);}}while(!this.forceClose && this.transactionManager != 
null && this.transactionManager.hasOngoingTransaction()) {if (!this.transactionManager.isCompleting()) {this.log.info("Aborting incomplete transaction due to shutdown");this.transactionManager.beginAbort();}try {this.runOnce();} catch (Exception var3) {this.log.error("Uncaught error in kafka producer I/O thread: ", var3);}}if (this.forceClose) {if (this.transactionManager != null) {this.log.debug("Aborting incomplete transactional requests due to forced shutdown");this.transactionManager.close();}this.log.debug("Aborting incomplete batches due to forced shutdown");this.accumulator.abortIncompleteBatches();}try {this.client.close();} catch (Exception var2) {this.log.error("Failed to close network client", var2);}this.log.debug("Shutdown of Kafka producer I/O thread has completed.");}void runOnce() {if (this.transactionManager != null) {try {this.transactionManager.maybeResolveSequences();if (this.transactionManager.hasFatalError()) {RuntimeException lastError = this.transactionManager.lastError();if (lastError != null) {this.maybeAbortBatches(lastError);}this.client.poll(this.retryBackoffMs, this.time.milliseconds());return;}this.transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();if (this.maybeSendAndPollTransactionalRequest()) {return;}} catch (AuthenticationException var5) {this.log.trace("Authentication exception while processing transactional request", var5);this.transactionManager.authenticationFailed(var5);}}long currentTimeMs = this.time.milliseconds();/* 发送数据 */ long pollTimeout = this.sendProducerData(currentTimeMs);this.client.poll(pollTimeout, currentTimeMs);} sendProducerData()：最终转换为ClientRequest对象： ClientRequest clientRequest = this.client.newClientRequest(nodeId, requestBuilder, now, acks != 
0, this.requestTimeoutMs, callback);this.client.send(clientRequest, now); private long sendProducerData(long now) {Cluster cluster = this.metadata.fetch();RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);Iterator iter;if (!result.unknownLeaderTopics.isEmpty()) {iter = result.unknownLeaderTopics.iterator();while(iter.hasNext()) {String topic = (String)iter.next();this.metadata.add(topic, now);}this.log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics);this.metadata.requestUpdate();}iter = result.readyNodes.iterator();long notReadyTimeout = Long.MAX_VALUE;while(iter.hasNext()) {Node node = (Node)iter.next();if (!this.client.ready(node, now)) {iter.remove();notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));}}Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);this.addToInflightBatches(batches);List expiredBatches;Iterator var11;ProducerBatch expiredBatch;if (this.guaranteeMessageOrder) {Iterator var9 = batches.values().iterator();while(var9.hasNext()) {expiredBatches = (List)var9.next();var11 = expiredBatches.iterator();while(var11.hasNext()) {expiredBatch = (ProducerBatch)var11.next();this.accumulator.mutePartition(expiredBatch.topicPartition);}}}this.accumulator.resetNextBatchExpiryTime();List<ProducerBatch> expiredInflightBatches = this.getExpiredInflightBatches(now);expiredBatches = this.accumulator.expiredBatches(now);expiredBatches.addAll(expiredInflightBatches);if (!expiredBatches.isEmpty()) {this.log.trace("Expired {} batches in accumulator", expiredBatches.size());}var11 = expiredBatches.iterator();while(var11.hasNext()) {expiredBatch = (ProducerBatch)var11.next();String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";this.failBatch(expiredBatch, (RuntimeException)(new 
TimeoutException(errorMessage)), false);if (this.transactionManager != null && expiredBatch.inRetry()) {this.transactionManager.markSequenceUnresolved(expiredBatch);}}this.sensors.updateProduceRequestMetrics(batches);long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);pollTimeout = Math.max(pollTimeout, 0L);if (!result.readyNodes.isEmpty()) {this.log.trace("Nodes with data ready to send: {}", result.readyNodes);pollTimeout = 0L;}this.sendProduceRequests(batches, now);return pollTimeout;}private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {Iterator var4 = collated.entrySet().iterator();while(var4.hasNext()) {Map.Entry<Integer, List<ProducerBatch>> entry = (Map.Entry)var4.next();this.sendProduceRequest(now, (Integer)entry.getKey(), this.acks, this.requestTimeoutMs, (List)entry.getValue());}}private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {if (!batches.isEmpty()) {Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap(batches.size());byte minUsedMagic = this.apiVersions.maxUsableProduceMagic();Iterator var9 = batches.iterator();while(var9.hasNext()) {ProducerBatch batch = (ProducerBatch)var9.next();if (batch.magic() < minUsedMagic) {minUsedMagic = batch.magic();}}ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();Iterator var16 = batches.iterator();while(var16.hasNext()) {ProducerBatch batch = (ProducerBatch)var16.next();TopicPartition tp = batch.topicPartition;MemoryRecords records = batch.records();if (!records.hasMatchingMagic(minUsedMagic)) {records = (MemoryRecords)batch.records().downConvert(minUsedMagic, 0L, this.time).records();}ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());if (tpData == null) {tpData = (new ProduceRequestData.TopicProduceData()).setName(tp.topic());tpd.add(tpData);}tpData.partitionData().add((new 
ProduceRequestData.PartitionProduceData()).setIndex(tp.partition()).setRecords(records));recordsByPartition.put(tp, batch);}String transactionalId = null;if (this.transactionManager != null && this.transactionManager.isTransactional()) {transactionalId = this.transactionManager.transactionalId();}ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic, (new ProduceRequestData()).setAcks(acks).setTimeoutMs(timeout).setTransactionalId(transactionalId).setTopicData(tpd));RequestCompletionHandler callback = (response) -> {this.handleProduceResponse(response, recordsByPartition, this.time.milliseconds());};String nodeId = Integer.toString(destination);ClientRequest clientRequest = this.client.newClientRequest(nodeId, requestBuilder, now, acks != 0, this.requestTimeoutMs, callback);/* this.client 为KafkaClient接口，实现类为NetworkClient对象 */ this.client.send(clientRequest, now);this.log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);}} NetworkClient send()方法： public void send(ClientRequest request, long now) {this.doSend(request, false, now);}private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {this.ensureActive();String nodeId = clientRequest.destination();if (!isInternalRequest && !this.canSendRequest(nodeId, now)) {throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");} else {AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();try {NodeApiVersions versionInfo = this.apiVersions.get(nodeId);short version;if (versionInfo == null) {version = builder.latestAllowedVersion();if (this.discoverBrokerVersions && this.log.isTraceEnabled()) {this.log.trace("No version information found when sending {} with correlation id {} to node {}. 
Assuming version {}.", new Object[]{clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version});}} else {version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(), builder.latestAllowedVersion());}this.doSend(clientRequest, isInternalRequest, now, builder.build(version));} catch (UnsupportedVersionException var9) {this.log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", new Object[]{builder, clientRequest.correlationId(), clientRequest.destination(), var9});ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()), clientRequest.callback(), clientRequest.destination(), now, now, false, var9, (AuthenticationException)null, (AbstractResponse)null);if (!isInternalRequest) {this.abortedSends.add(clientResponse);} else if (clientRequest.apiKey() == ApiKeys.METADATA) {this.metadataUpdater.handleFailedRequest(now, Optional.of(var9));}}}}private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {String destination = clientRequest.destination();RequestHeader header = clientRequest.makeHeader(request.version());if (this.log.isDebugEnabled()) {this.log.debug("Sending {} request with header {} and timeout {} to node {}: {}", new Object[]{clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request});}Send send = request.toSend(header);/* clientRequest 转换为 InFlightRequest 对象 */ InFlightRequest inFlightRequest = new InFlightRequest(clientRequest, header, isInternalRequest, request, send, now);this.inFlightRequests.add(inFlightRequest);/* nio channel……通过selector发送消息信息；this.selector 为 Selectable 接口，底层经由 KafkaChannel 完成发送 */ this.selector.send(new NetworkSend(clientRequest.destination(), send));} 总结：直接阅读源码，很快就能想明白kafka 生产者发送逻辑（kafka-clients.jar）。 核心：本文第一张图片
http://www.hkea.cn/news/14350810/

相关文章:

  • 网站运营推广怎么做网站制作难点
  • 四大门户网站是哪些水贝做网站公司
  • 企业营销网站有哪些太白 网站建设
  • 沧州网站建设推广wordpress nginx 302
  • 宝山北京网站建设海底捞口碑营销案例
  • 做网站建设价格dede 网站地图 模块
  • 昆明有哪些帮忙做网站的公司移动互联网的应用论文
  • 长沙电子商务公司网站制作贵州中小型营销型网站建设公司
  • qq刷赞网站怎么做公司如何建站
  • 公司网站界面设计建一个区域网站需要多少资金
  • 上海尚海整装官方网站百度搜索量怎么查
  • 黄埔做网站的公网站建设模板免费下载
  • 鲜花店的网站建设网站建设分前端和后台吗
  • 企业网站哪家公司好网站怎么做电子合同
  • 做sgs认证的公司网站番禺高端网站制作
  • 有什么平台可以做网站网站搜不出来怎么办
  • 网站的功能需求30秒短视频制作报价明细
  • 安阳实力网站建设首选优化企业网站标题
  • 怎么查看网站是asp还是php长沙网站建设长沙
  • 移动互联网 网站建设重庆永川网站建设
  • 网站建设对于企业的必要性兰州市网站建设
  • 湘潭做网站 搜搜磐石网络临沂网站制作页面
  • 辽阳专业建设网站公司wordpress 锁定地址
  • 计算机网站维护建设wordpress干什么用的
  • 哈尔滨 门户网站门户网站制作流程博客
  • 中国土木工程网优化方案2022版语文
  • 广州公司建设网站网络工程师都考什么
  • 10大营销理论西安网站建设 乐云seo
  • 别墅装修公司排名前十强学seo的培训学校
  • 关于网站建设的讲话做的网站如何被百度搜到