ps做网站 大小,甘肃省建设厅招标办网站,移动网站开发内容,杭州房产网 官方文章目录 源码总流程图说明GateWayAutoConfigurationDispatcherHandlergetHandler()handleRequestWith()RouteToRequestUrlFilterReactiveLoadBalancerClientFilterNettyRoutingFilter 补充知识适配器模式 详细流程图 源码总流程图
在线总流程图 说明
Gateway的版本使用的是… 文章目录 源码总流程图说明GateWayAutoConfigurationDispatcherHandlergetHandler()handleRequestWith()RouteToRequestUrlFilterReactiveLoadBalancerClientFilterNettyRoutingFilter 补充知识适配器模式 详细流程图 源码总流程图
在线总流程图 说明
Gateway的版本使用的是4.0.0
Gateway的实现是基于WebFlux 、Reactor 、Netty Gateway微服务的yml配置如下
Gateway的访问端口为8888id为order_route的路由 uri为lb://mall-order为mall-order这个微服务定义了一个path路径的predicate断言定义了三个filter
server:port: 8888
spring:application:name: mall-gateway#配置nacos注册中心地址cloud:nacos:discovery:server-addr: nacos.mall.com:8848username: nacospassword: nacosgateway:#设置路由路由id、路由到微服务的uri、断言routes:- id: user_route #路由ID全局唯一uri: lb://mall-user #lb 整合负载均衡器ribbon,loadbalancerpredicates:- Path/user/** # 断言路径相匹配的进行路由- id: order_route #路由ID全局唯一# 测试 http://localhost:8888/order/findOrderByUserId/1uri: lb://mall-order #lb 整合负载均衡器loadbalancerpredicates:- Path/order/** # 断言路径相匹配的进行路由#配置过滤器工厂filters:- AddRequestHeaderX-Request-color, red #添加请求头- AddRequestParametercolor, blue # 添加请求参数- CheckAuthhushang,男 #自定义过滤器工厂Gateway的工作原理就是下面这一张图 在开始看Gateway的源码之前我们回忆一下SpringMVC的实现原理
DispatchServlet#doDispatch作为Springmvc的入口HandlerMapper 路由匹配 — 找到Handler通过handler 找的适配的 HandlerAdapterHandlerAdapter#handle方法执行 而在Gateway的源码中
DispatcherHandler#handle作为入口HandlerMapping 路由匹配 -- 断言predicate匹配 RoutePredicateHandlerMapping#getHandlerInternal找到路由Route对象返回FilteringWebHandlerHandlerAdapter 适配器 SimpleHandlerAdapter#handle 处理WebHandler进入到org.springframework.cloud.gateway.handler.FilteringWebHandler#handle — filterChain处理 Flux和Mono的概念
Reactor学习文档
Flux Mono GateWayAutoConfiguration 在GatewayAutoConfiguration自动配置类中它配置了很多bean对象常见的就比如
// 保存我们配置文件中关于网关路由相关的所有配置
// GatewayProperties保存了ListRouteDefinition
// 而RouteDefinition就是每一个路由对象保存了id、uri、断言集合ListPredicateDefinition、Filter集合ListFilterDefinition
Bean
public GatewayProperties gatewayProperties() {return new GatewayProperties();
}// Path路径匹配的断言工厂断言相关的bean都是以RoutePredicateFactory结尾
Bean
ConditionalOnEnabledPredicate
public PathRoutePredicateFactory pathRoutePredicateFactory() {return new PathRoutePredicateFactory();
}// 添加请求头的Filter一般都是以GatewayFilterFactory
Bean
ConditionalOnEnabledFilter
public AddRequestHeaderGatewayFilterFactory addRequestHeaderGatewayFilterFactory() {return new AddRequestHeaderGatewayFilterFactory();
}// 全局过滤器把我们访问网关的url转换为路由中配置的uri
// http://localhost:8888/order/findOrderByUserId/1 --- lb://mall-order/order/findOrderByUserId/1
Bean
ConditionalOnEnabledGlobalFilter
public RouteToRequestUrlFilter routeToRequestUrlFilter() {return new RouteToRequestUrlFilter();
}GateWayAutoConfiguration配置的主要bean
类名说明GatewayPropertiesgateway属性配置类PropertiesRouteDefinitionLocator操作GatewayProperties对象返回FluxRouteDefinitionRouteDefinitionRouteLocator将RouteDefinition转换为RouteRoutePredicateHandlerMappingGateway的HandlerMapping匹配请求对应的Route返回FilteringWebHandlerXXXRoutePredicateFactory路由断言工厂的beanXXXGatewayFilterFactory局部FilterGlobalFilter实现类全局Filter DispatcherHandler
DispatcherHandler#handle作为我们查看Gateway源码的入口
请求request和响应response实例会被封装为ServerWebExchange核心方法就是return语句
Override
public MonoVoid handle(ServerWebExchange exchange) {if (this.handlerMappings null) {return createNotFoundError();}if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {return handlePreFlight(exchange);}return Flux.fromIterable(this.handlerMappings) // 遍历所有的HandlerMapper.concatMap(mapping - mapping.getHandler(exchange)) // 调用每一个HandlerMapper能否找到Handler.next() // 继续遍历下一个HandlerMapper.switchIfEmpty(createNotFoundError()) // 如果HandlerMapper遍历完后都没有Handler那么要抛异常了.onErrorResume(ex - handleDispatchError(exchange, ex)).flatMap(handler - handleRequestWith(exchange, handler)); // 如果找到Handler那就去通过HandlerAdapter去调用Handler
}fromIterable()方法的作用就是就是遍历Gateway所有的HandlerMapper我们这里肯定最终是使用的RoutePredicateHandlerMapping这个路由断言的 我们接下来继续往下遍历各个HandlerMapper并调用mapping.getHandler(exchange)方法这里最终会调用至RoutePredicateHandlerMapping类的getHandlerInternal()方法中经过断言匹配后返回一个FilteringWebHandler对象。该方法接下来会详细介绍。 中间这几行其实主要就是如果我当前往Gateway的请求通过路由断言没有匹配上那么就会抛异常
.next()
.switchIfEmpty(createNotFoundError())
.onErrorResume(ex - handleDispatchError(exchange, ex))经过路由断言匹配得到一个WebHandler对象之后会执行handleRequestWith(exchange, handler)方法在该方法中会找一个与WebHandler匹配的HandlerAdapter来适配WebHandler对象最终去调用WebHandler的 getHandler()
通过DispatcherHandler#handle方法中的.concatMap(mapping - mapping.getHandler(exchange)) 这一行代码
进入到了AbstractHandlerMapping#getHandler 遍历我们yml配置文件中所有定义的路由 根据我们路由定义的断言Predicate规则去调用对应的断言工厂 将匹配成功的路由保存至exchange对象中 exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); exchange.getAttributes().put(GATEWAY_PREDICATE_MATCHED_PATH_ROUTE_ID_ATTR, routeId); 断言匹配成功就返回一个WebHandler接口的实现类FilteringWebHandler对象
public MonoObject getHandler(ServerWebExchange exchange) {// 从这里我们就可以发现通过getHandlerInternal(exchange)方法就能找Handler之后的.map()方法中就直接return handler;return getHandlerInternal(exchange).map(handler - {if (logger.isDebugEnabled()) {logger.debug(exchange.getLogPrefix() Mapped to handler);}ServerHttpRequest request exchange.getRequest();// 正常情况下这个if都不会进入if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) {CorsConfiguration config (this.corsConfigurationSource ! null ?this.corsConfigurationSource.getCorsConfiguration(exchange) : null);CorsConfiguration handlerConfig getCorsConfiguration(handler, exchange);config (config ! null ? config.combine(handlerConfig) : handlerConfig);if (config ! null) {config.validateAllowCredentials();}if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {return NO_OP_HANDLER;}}// 直接返回handlerreturn handler;});
}我们这里就直接进入到RoutePredicateHandlerMapping类中 RoutePredicateHandlerMapping#getHandlerInternal的详细代码如下
protected Mono? getHandlerInternal(ServerWebExchange exchange) {if (this.managementPortType DIFFERENT this.managementPort ! null exchange.getRequest().getURI().getPort() this.managementPort) {return Mono.empty();}exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());return Mono.deferContextual(contextView - {exchange.getAttributes().put(GATEWAY_REACTOR_CONTEXT_ATTR, contextView);// 核心方法是lookupRoute(exchange)这里会去进行路由的校验根据我们配置文件中定义的路由断言规则进行校验return lookupRoute(exchange).flatMap((FunctionRoute, Mono?) r - {// 下面几行代码就是操作exchange的属性// 上方的lookupRoute()方法中会添加GATEWAY_PREDICATE_ROUTE_ATTR这里就进行移除exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);// 把当前路由对象添加进exchange对象中之后的流程还会用到我们的路由对象exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);// 路由匹配成功就直接返回WebHandler对象return Mono.just(webHandler);}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() - {// 当前请求没有任何一个路由匹配上的处理流程exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);if (logger.isTraceEnabled()) {logger.trace(No RouteDefinition found for [ getExchangeDesc(exchange) ]);}})));});
}再进入到RoutePredicateHandlerMapping#lookupRoute方法中
protected MonoRoute lookupRoute(ServerWebExchange exchange) {// 获取到我们yml配置文件中所有定义的路由并进行遍历return this.routeLocator.getRoutes().concatMap(route - Mono.just(route).filterWhen(r - {// 添加GATEWAY_PREDICATE_ROUTE_ATTRexchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());// 调用当前路由对象中的断言的apply()方法apply()方法中又是一些异步的处理流程// 这里就会根据我们配置文件中为路由配置的各个断言去调用各个断言对象return r.getPredicate().apply(exchange);}).doOnError(e - logger.error(Error applying predicate for route: route.getId(), e)).onErrorResume(e - Mono.empty())).next()// TODO: error handling.map(route - {if (logger.isDebugEnabled()) {logger.debug(Route matched: route.getId());}validateRoute(route, exchange);return route;});
}我当前gateway的配置文件中定义了两个路由
spring:cloud:gateway:#设置路由路由id、路由到微服务的uri、断言routes:- id: user_route #路由ID全局唯一uri: lb://mall-user #lb 整合负载均衡器ribbon,loadbalancerpredicates:- Path/user/** # 断言路径相匹配的进行路由- id: order_route #路由ID全局唯一# 测试 http://localhost:8888/order/findOrderByUserId/1uri: lb://mall-order #lb 整合负载均衡器loadbalancerpredicates:- Path/order/** # 断言路径相匹配的进行路由所以上面的方法会执行两次 因为return r.getPredicate().apply(exchange);又是一些异步调用但我们从方法名就能看出来这里根据我们yml配置文件中路由定义的断言去调用对应的断言对象。这就是各个断言工厂具体的实现了接下来就以Path路径匹配的断言工厂来举例
class DefaultAsyncPredicateT implements AsyncPredicateT {private final PredicateT delegate;Overridepublic PublisherBoolean apply(T t) { // 调用各个Predicate对象的test()方法return Mono.just(delegate.test(t));}//...
}所以我们现在就直接去看path路径匹配的断言类PathRoutePredicateFactory
Override
public PredicateServerWebExchange apply(Config config) {final ArrayListPathPattern pathPatterns new ArrayList();synchronized (this.pathPatternParser) {pathPatternParser.setMatchOptionalTrailingSeparator(config.isMatchTrailingSlash());config.getPatterns().forEach(pattern - {PathPattern pathPattern this.pathPatternParser.parse(pattern);pathPatterns.add(pathPattern);});}return new GatewayPredicate() {// 会进入到test()方法中Overridepublic boolean test(ServerWebExchange exchange) {// 当前请求的uri路径 /order/findOrderByUserId/1PathContainer path parsePath(exchange.getRequest().getURI().getRawPath());PathPattern match null;for (int i 0; i pathPatterns.size(); i) {// yml配置文件中的配置项 /order/**PathPattern pathPattern pathPatterns.get(i);// 如果path匹配成功那么match对象就不为nullif (pathPattern.matches(path)) {match pathPattern;break;}}// 如果path匹配成功那么match对象就不为null 。匹配成功的处理逻辑if (match ! null) {traceMatch(Pattern, match.getPatternString(), path, true);PathMatchInfo pathMatchInfo match.matchAndExtract(path);putUriTemplateVariables(exchange, pathMatchInfo.getUriVariables());// match.getPatternString() 为 /order/**exchange.getAttributes().put(GATEWAY_PREDICATE_MATCHED_PATH_ATTR, match.getPatternString());String routeId (String) exchange.getAttributes().get(GATEWAY_PREDICATE_ROUTE_ATTR);// 保存当前路由idif (routeId ! null) {exchange.getAttributes().put(GATEWAY_PREDICATE_MATCHED_PATH_ROUTE_ID_ATTR, routeId);}return true;}// path匹配不成返回falseelse {traceMatch(Pattern, config.getPatterns(), path, false);return false;}}Overridepublic Object getConfig() {return config;}Overridepublic String toString() {return String.format(Paths: %s, match trailing slash: %b, config.getPatterns(),config.isMatchTrailingSlash());}};
}handleRequestWith()
// DispatcherHandler#handleRequestWith
private MonoVoid handleRequestWith(ServerWebExchange exchange, Object handler) {if (ObjectUtils.nullSafeEquals(exchange.getResponse().getStatusCode(), HttpStatus.FORBIDDEN)) {return Mono.empty(); // CORS rejection}if (this.handlerAdapters ! null) {// 遍历所有的HandlerAdapterfor (HandlerAdapter adapter : this.handlerAdapters) {// 找能处理WebHandler类型的HandlerAdapter 最终找到SimpleHandlerAdapterif (adapter.supports(handler)) {return adapter.handle(exchange, handler).flatMap(result - handleResult(exchange, result));}}}return Mono.error(new IllegalStateException(No HandlerAdapter: handler));
}如下图所示找到SimpleHandlerAdapter这个 SimpleHandlerAdapter的详细代码如下所示
public class SimpleHandlerAdapter implements HandlerAdapter {Overridepublic boolean supports(Object handler) {return WebHandler.class.isAssignableFrom(handler.getClass());}Overridepublic MonoHandlerResult handle(ServerWebExchange exchange, Object handler) {// 强转WebHandler webHandler (WebHandler) handler;// getHandler()方法返回了一个FilteringWebHandler对象这里就调用它的handle()方法MonoVoid mono webHandler.handle(exchange);// 返回一个空对象return mono.then(Mono.empty());}}FilteringWebHandler#handle方法
Override
public MonoVoid handle(ServerWebExchange exchange) {// getHandler()方法中存入了Route路由对象这里取出来该对象保存着我们yml配置文件中的配置Route route exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);// 取出我们yml配置文件中 该路由的局部Filter我这里有三个一个添加请求头、一个添加请求参数、一个自定义的ListGatewayFilter gatewayFilters route.getFilters();// 全局FilterListGatewayFilter combined new ArrayList(this.globalFilters);// 将局部Filter和全局Filter存入一个集合中combined.addAll(gatewayFilters);// 排序AnnotationAwareOrderComparator.sort(combined);if (logger.isDebugEnabled()) {logger.debug(Sorted gatewayFilterFactories: combined);}// 调用各自的filter()方法return new DefaultGatewayFilterChain(combined).filter(exchange);
}[GatewayFilterAdapter{RemoveCachedBodyFilter5cfed0ba}, order -2147483648]
[GatewayFilterAdapter{AdaptCachedBodyGlobalFilter22a6d75c}, order -2147482648]
[GatewayFilterAdapter{NettyWriteResponseFilter691567ea}, order -1]
[GatewayFilterAdapter{ForwardPathFilter28be7fec}, order 0]
// 三个局部Filter
[[AddRequestHeader X-Request-color red], order 1]
[[AddRequestParameter color blue], order 2]
[com.tuling.mall.gateway.filter.CheckAuthGatewayFilterFactory$$Lambda$980/0x0000014b3c64939054f513cb, order 3]
// 路由转换 把http://localhost:8888/order/findOrderByUserId/1 --- lb://mall-order/order/findOrderByUserId/1
[GatewayFilterAdapter{RouteToRequestUrlFilter5c8d58ed}, order 10000]
// 根据lb://前缀过滤处理使用serviceId选择一个服务实例从而实现负载均衡
[GatewayFilterAdapter{ReactiveLoadBalancerClientFilter437ed416}, order 10150]
[GatewayFilterAdapter{LoadBalancerServiceInstanceCookieFilter11f23038}, order 10151]
[GatewayFilterAdapter{WebsocketRoutingFilter26f0141}, order 2147483646]
// 发送netty 请求
[GatewayFilterAdapter{NettyRoutingFilterde77146}, order 2147483647]
[GatewayFilterAdapter{ForwardRoutingFilter6a567f7b}, order 2147483647]接下来就挑几个重要的全局GlobalFilter来分析
RouteToRequestUrlFilter
路由转换 把http://localhost:8888/order/findOrderByUserId/1?colorblue — lb://mall-order/order/findOrderByUserId/1?colorblue
Override
public MonoVoid filter(ServerWebExchange exchange, GatewayFilterChain chain) {// 从exchange中取路由Route对象Route route exchange.getAttribute(GATEWAY_ROUTE_ATTR);if (route null) {return chain.filter(exchange);}log.trace(RouteToRequestUrlFilter start);// 取当前请求uri http://localhost:8888/order/findOrderByUserId/1?colorblueURI uri exchange.getRequest().getURI();boolean encoded containsEncodedParts(uri);// 路由对象中保存的uri也就是我们在yml文件中配置的值 lb://mall-orderURI routeUri route.getUri();if (hasAnotherScheme(routeUri)) {exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme());routeUri URI.create(routeUri.getSchemeSpecificPart());}// 如果我们在yml文件中配置的uri即不是lb开头并且host还为null那么就抛异常if (lb.equalsIgnoreCase(routeUri.getScheme()) routeUri.getHost() null) {throw new IllegalStateException(Invalid host: routeUri.toString());}// 转换结果为 lb://mall-order/order/findOrderByUserId/1?colorblueURI mergedUrl UriComponentsBuilder.fromUri(uri).scheme(routeUri.getScheme()).host(routeUri.getHost()).port(routeUri.getPort()).build(encoded).toUri();// 存入exchange中之后的LoadBalancer全局Filter中会用到exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);return chain.filter(exchange);
}ReactiveLoadBalancerClientFilter
解析lb://服务名去服务注册中心获取服务实例instance并通过负载均衡算法选择一个具体的instance
public MonoVoid filter(ServerWebExchange exchange, GatewayFilterChain chain) {// 此时的url是这个样子 lb://mall-order/order/findOrderByUserId/1?colorblueURI url exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);String schemePrefix exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);if (url null || (!lb.equals(url.getScheme()) !lb.equals(schemePrefix))) {return chain.filter(exchange);}addOriginalRequestUrl(exchange, url);if (log.isTraceEnabled()) {log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() url before: url);}// 再获取一遍lb://mall-order/order/findOrderByUserId/1?colorblueURI requestUri exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);// 就是服务名 mall-orderString serviceId requestUri.getHost();SetLoadBalancerLifecycle supportedLifecycleProcessors LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),RequestDataContext.class, ResponseData.class, ServiceInstance.class);// 请求信息 封装成一个对象DefaultRequestRequestDataContext lbRequest new DefaultRequest(new RequestDataContext(new RequestData(exchange.getRequest()), getHint(serviceId)));// 调用choose()方法return choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response - {// 服务注册中心的响应response获取server实例对象ServiceInstance retrievedInstance response.getServer();// 值为http://localhost:8888/order/findOrderByUserId/1?colorblueURI uri exchange.getRequest().getURI();String overrideScheme retrievedInstance.isSecure() ? https : http;if (schemePrefix ! null) {overrideScheme url.getScheme();}// 服务实例DelegatingServiceInstance serviceInstance new DelegatingServiceInstance(retrievedInstance,overrideScheme);// 最终通过上面的服务实例修改之后的请求url为http://192.168.236.173:8020/order/findOrderByUserId/1?colorblueURI requestUrl reconstructURI(serviceInstance, uri);if (log.isTraceEnabled()) {log.trace(LoadBalancerClientFilter url chosen: requestUrl);}// 存入exchange对象中之后netty发送请求会用到该urlexchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);exchange.getAttributes().put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response);supportedLifecycleProcessors.forEach(lifecycle - lifecycle.onStartRequest(lbRequest, response));}).then(chain.filter(exchange)).doOnError(throwable - supportedLifecycleProcessors.forEach(lifecycle - lifecycle.onComplete(new CompletionContextResponseData, ServiceInstance, RequestDataContext(CompletionContext.Status.FAILED, throwable, lbRequest,exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR))))).doOnSuccess(aVoid - supportedLifecycleProcessors.forEach(lifecycle - lifecycle.onComplete(new CompletionContextResponseData, ServiceInstance, RequestDataContext(CompletionContext.Status.SUCCESS, lbRequest,exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR),new ResponseData(exchange.getResponse(), new RequestData(exchange.getRequest()))))));
}// 进入到choose()方法中
private MonoResponseServiceInstance choose(RequestRequestDataContext lbRequest, String serviceId,SetLoadBalancerLifecycle supportedLifecycleProcessors) {ReactorLoadBalancerServiceInstance loadBalancer this.clientFactory.getInstance(serviceId,ReactorServiceInstanceLoadBalancer.class);if (loadBalancer null) {throw new NotFoundException(No loadbalancer available for serviceId);}supportedLifecycleProcessors.forEach(lifecycle - lifecycle.onStart(lbRequest));// 调用ReactorLoadBalancer对象的choose()方法return loadBalancer.choose(lbRequest);
}NettyRoutingFilter
发送netty请求
public MonoVoid filter(ServerWebExchange exchange, GatewayFilterChain chain) {// 经过负载均衡之后的请求url具体的下游服务请求地址// http://192.168.236.173:8020/order/findOrderByUserId/1?colorblueURI requestUrl exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);// scheme为httpString scheme requestUrl.getScheme();if (isAlreadyRouted(exchange) || (!http.equalsIgnoreCase(scheme) !https.equalsIgnoreCase(scheme))) {return chain.filter(exchange);}setAlreadyRouted(exchange);ServerHttpRequest request exchange.getRequest();// GET 请求final HttpMethod method HttpMethod.valueOf(request.getMethod().name());// url为 http://192.168.236.173:8020/order/findOrderByUserId/1?colorbluefinal String url requestUrl.toASCIIString();// 请求头HttpHeaders filtered filterRequest(getHeadersFilters(), exchange);final DefaultHttpHeaders httpHeaders new DefaultHttpHeaders();filtered.forEach(httpHeaders::set);boolean preserveHost exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);// 路由对象Route route exchange.getAttribute(GATEWAY_ROUTE_ATTR);FluxHttpClientResponse responseFlux getHttpClient(route, exchange).headers(headers - {headers.add(httpHeaders);// Will either be set below, or later by Nettyheaders.remove(HttpHeaders.HOST);if (preserveHost) {String host request.getHeaders().getFirst(HttpHeaders.HOST);headers.add(HttpHeaders.HOST, host);}// 发送请求}).request(method).uri(url).send((req, nettyOutbound) - {if (log.isTraceEnabled()) {nettyOutbound.withConnection(connection - log.trace(...);}return nettyOutbound.send(request.getBody().map(this::getByteBuf));}).responseConnection((res, connection) - {// Defer committing the response until all route filters have run// Put client response as ServerWebExchange attribute and write// response later NettyWriteResponseFilterexchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);ServerHttpResponse response exchange.getResponse();// put headers and status so filters can modify the responseHttpHeaders headers new HttpHeaders();res.responseHeaders().forEach(entry - headers.add(entry.getKey(), entry.getValue()));String contentTypeValue headers.getFirst(HttpHeaders.CONTENT_TYPE);if (StringUtils.hasLength(contentTypeValue)) {exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);}setResponseStatus(res, response);// make sure headers filters run after setting status so it is// available in responseHttpHeaders filteredResponseHeaders HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange,Type.RESPONSE);if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING) filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {// It is not valid to have both the transfer-encoding header and// the content-length header.// Remove the transfer-encoding header in the response if the// content-length header is present.response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);}exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());response.getHeaders().addAll(filteredResponseHeaders);return Mono.just(res);});Duration responseTimeout getResponseTimeout(route);if (responseTimeout ! null) {responseFlux responseFlux.timeout(responseTimeout,Mono.error(new TimeoutException(Response took longer than timeout: responseTimeout))).onErrorMap(TimeoutException.class,th - new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));}return responseFlux.then(chain.filter(exchange));
}补充知识
适配器模式
在FilteringWebHandler#handle方法中先获取路由的局部Filter在创建一个集合存放全局Filter在把局部Filter和全局Filter放在一起。这里就有一个问题
局部Filter的类型是GatewayFilter而全局Filter的类型是GlobalFilter它们是怎么通过下面这种方式存放在一个集合中的嘞
// 局部Filter
ListGatewayFilter gatewayFilters route.getFilters();
// 全局Filter
ListGatewayFilter combined new ArrayList(this.globalFilters);
// 将局部Filter和全局Filter存入一个集合中
combined.addAll(gatewayFilters);这里就用到了适配器模式具体实现步骤是
创建一个GatewayFilterAdapter类实现 GatewayFilter接口GatewayFilterAdapter类中定义一个GlobalFilter属性构造方法中传GlobalFilter类型的对象赋值给该属性实现GatewayFilter接口的filter()方法在filter()方法中调用全局Filter的filter()方法
private static class GatewayFilterAdapter implements GatewayFilter {private final GlobalFilter delegate;GatewayFilterAdapter(GlobalFilter delegate) {this.delegate delegate;}Overridepublic MonoVoid filter(ServerWebExchange exchange, GatewayFilterChain chain) {return this.delegate.filter(exchange, chain);}
}遍历ListGlobalFilter globalFilters全局Filter分别存入GatewayFilterAdapter对象中
ListGatewayFilterAdapter ---- ListGatewayFilter globalFilters集合
Override
public MonoVoid handle(ServerWebExchange exchange) {Route route exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);// 局部FilterListGatewayFilter gatewayFilters route.getFilters();// 全局Filter这样就把全局GlobalFilter变为了GatewayFilter类型了ListGatewayFilter combined new ArrayList(this.globalFilters);// 存入一个集合中combined.addAll(gatewayFilters);AnnotationAwareOrderComparator.sort(combined);// 调用各自的filter()方法return new DefaultGatewayFilterChain(combined).filter(exchange);
}详细流程图