自己怎么搭建个人博客网站,做网站用什么框架好,动易网站只能进首页,网站可以自己做吗Flink CDC 3.0 Starrocks建表失败会导致任务卡主#xff01;
现象
StarRocks建表失败#xff0c;然后任务自动重启#xff0c;重启完毕后数据回放#xff0c;jobMaster打印下面日志后#xff0c;整个任务会卡主
There are already processing requests. Wait for proce…Flink CDC 3.0 Starrocks建表失败会导致任务卡主
现象
StarRocks建表失败然后任务自动重启重启完毕后数据回放jobMaster打印下面日志后整个任务会卡主
There are already processing requests. Wait for processing原因分析
前提概要可以先阅读CDC表变更处理流程然后再读下面会更加清晰
涉及类包括SchemaRegistry、SchemaOperator和StarRocksMetadataApplier类
SchemaRegistry-handleEventFromOperator方法执行建表失败后会导致任务重启但是jobMaster不会重启因此SchemaRegistry.requestHandler.pendingSchemaChanges无法删除导致任务卡主
public void flushSuccess(TableId tableId, int sinkSubtask) {flushedSinkWriters.add(sinkSubtask);if (flushedSinkWriters.equals(activeSinkWriters)) {LOG.info(All sink subtask have flushed for table {}. Start to apply schema change.,tableId.toString());PendingSchemaChange waitFlushSuccess pendingSchemaChanges.get(0);//执行表结构变更操作applySchemaChange(tableId, waitFlushSuccess.getChangeRequest().getSchemaChangeEvent());waitFlushSuccess.getResponseFuture().complete(wrap(new ReleaseUpstreamResponse()));if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) {//异常会跳过删除pendingSchamestartNextSchemaChangeRequest();}}
}
//删除pendingSchemaChanges中已经完成的pendingSchame
private void startNextSchemaChangeRequest() {this.pendingSchemaChanges.remove(0);this.flushedSinkWriters.clear();...
}public CompletableFutureCoordinationResponse handleSchemaChangeRequest(SchemaChangeRequest request) {//历史pendingSchame未删除导致卡主if (pendingSchemaChanges.isEmpty()) {LOG.info(Received schema change event request from table {}. Start to buffer requests for others.,request.getTableId().toString());if (request.getSchemaChangeEvent() instanceof CreateTableEvent schemaManager.schemaExists(request.getTableId())) {return CompletableFuture.completedFuture(wrap(new SchemaChangeResponse(false)));}CompletableFutureCoordinationResponse response CompletableFuture.completedFuture(wrap(new SchemaChangeResponse(true)));schemaManager.applySchemaChange(request.getSchemaChangeEvent());pendingSchemaChanges.add(new PendingSchemaChange(request, response));pendingSchemaChanges.get(0).startToWaitForReleaseRequest();return response;} else {LOG.info(There are already processing requests. Wait for processing.);CompletableFutureCoordinationResponse response new CompletableFuture();pendingSchemaChanges.add(new PendingSchemaChange(request, response));return response;}
}解决办法
让建表执行成功catch住异常将schame删除后再异常重启未验证