SpringCloud GateWay通过过滤器GatewayFilter修改请求或响应内容

Spring Cloud Gateway在有些场景中需要获取request body内容进行参数校验或参数修改。我们可以在GatewayFilter中读取并修改请求体,下面就基于ServerWebExchange来实现:

ServerWebExchange命名为服务网络交换器,存放着重要的请求-响应属性、请求实例和响应实例等等,有点像Context的角色,其中有两个重要的接口方法:

 // Get the ServerHttpRequest object
 ServerHttpRequest getRequest();
 // Get the ServerHttpResponse object
 ServerHttpResponse getResponse();

创建一个GatewayFilter,并实现Ordered接口,返回一个小于-1的order值。这是因为NettyWriteResponseFilter的order值为-1,而我们需要覆盖返回响应体的逻辑,所以自定义的GatewayFilter必须比NettyWriteResponseFilter优先执行。

public class RequestGatewayFilter implements GatewayFilter, Ordered {
@Override
 public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
 ServerHttpRequest request = exchange.getRequest();
 HttpHeaders headers = request.getHeaders();
 // 处理参数
 MediaType contentType = headers.getContentType();
 if (exchange.getRequest().getMethod().equals(HttpMethod.POST)) {
 //Content-type为“application/json”
 if (MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) {
 return readBody(exchange, chain);
 }
 //Content-type为“application/x-www-form-urlencoded”
 else if(MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(contentType)){
 GatewayContext gatewayContext = new GatewayContext();
 gatewayContext.setRequestHeaders(headers);
 gatewayContext.getAllRequestData().addAll(request.getQueryParams());
 exchange.getAttributes().put(GatewayContext.CACHE_GATEWAY_CONTEXT,gatewayContext);
 return readFormData(exchange, chain, gatewayContext);
 }
//Content-type为“multipart/form-data”
else if (MediaType.MULTIPART_FORM_DATA.isCompatibleWith(contentType)) {
 GatewayContext gatewayContext = new GatewayContext();
 gatewayContext.setRequestHeaders(headers);
 gatewayContext.getAllRequestData().addAll(request.getQueryParams());
 exchange.getAttributes().put(GatewayContext.CACHE_GATEWAY_CONTEXT,gatewayContext);
 return readMultipartData(exchange, chain, gatewayContext);
 }
 } else {
 return readGetData(exchange, chain);
 }
 return chain.filter(exchange);
 }
 @Override
 public int getOrder() {
 return -2;
 }
}

处理content-type为application/json的方法:

/**
 * Reads a JSON request body, validates it, and continues the chain with the body
 * re-attached to the request.
 *
 * <p>The body stream is consumed here in order to validate it, so the bytes are cached
 * and the request is decorated to replay them to later filters. The response is wrapped
 * through {@code buildResponse}.
 *
 * @param exchange the current server exchange
 * @param chain    the remaining filter chain
 * @return the result of {@code errorInfo} when validation returns a non-zero code,
 *         otherwise the result of continuing the chain with the mutated exchange
 */
private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain) {
    // Join all body chunks into a single buffer so the payload can be read as a whole.
    // NOTE(review): for a request without any body the joined Mono is presumably empty,
    // so this flatMap (and the chain) would not run — confirm that is acceptable.
    return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
        byte[] bytes = new byte[dataBuffer.readableByteCount()];
        dataBuffer.read(bytes);
        DataBufferUtils.release(dataBuffer);

        // Validate the request parameters. Decoding with a Charset constant cannot fail
        // with a checked exception, so the validation result can never be left unset.
        String bodyString = new String(bytes, StandardCharsets.UTF_8);
        Map bodyMap = JSONObject.parseObject(bodyString, Map.class);
        Result checkResult = validParam(exchange, bodyMap);
        if (checkResult.getCode() != 0) {
            return errorInfo(exchange, checkResult.getCode(), checkResult.getMessage());
        }

        // Every subscription wraps the cached bytes again, so the body can be replayed.
        Flux<DataBuffer> cachedFlux = Flux.defer(() -> {
            DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
            DataBufferUtils.retain(buffer);
            return Mono.just(buffer);
        });

        // Repackage the request so later readers receive the cached body.
        ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
            @Override
            public Flux<DataBuffer> getBody() {
                return cachedFlux;
            }
        };

        ServerHttpResponse originalResponse = exchange.getResponse();
        originalResponse.getHeaders().setContentType(MediaType.APPLICATION_JSON);
        DataBufferFactory bufferFactory = originalResponse.bufferFactory();
        ServerHttpResponseDecorator response =
                buildResponse(originalResponse, bufferFactory, (Map) checkResult.getResult());

        // Mutate the exchange with the replayable request and the wrapped response.
        ServerWebExchange mutatedExchange =
                exchange.mutate().request(mutatedRequest).response(response).build();

        // Read the body as a string with the default message readers (debug logging only),
        // then continue the chain.
        return ServerRequest.create(mutatedExchange, MESSAGE_READERS).bodyToMono(String.class)
                .doOnNext(objectValue -> {
                    log.debug("[GatewayContext]Read JsonBody:{}", objectValue);
                }).then(chain.filter(mutatedExchange));
    });
}

处理content-type为application/x-www-form-urlencoded的方法:

/**
 * Reads an {@code application/x-www-form-urlencoded} body, validates it, and continues
 * the chain with a re-serialized body and a matching {@code Content-Length}.
 *
 * <p>The parsed form data holds decoded names and values, so the body that is forwarded
 * is URL-encoded again; otherwise values containing characters such as {@code &},
 * {@code =}, {@code +} or {@code %} would be corrupted for the downstream service.
 *
 * @param exchange       the current server exchange
 * @param chain          the remaining filter chain
 * @param gatewayContext the context cached on the exchange by the caller (not read here)
 * @return the result of {@code errorInfo} when validation returns a non-zero code,
 *         otherwise the result of continuing the chain
 */
private Mono<Void> readFormData(ServerWebExchange exchange, GatewayFilterChain chain, GatewayContext gatewayContext){
    HttpHeaders headers = exchange.getRequest().getHeaders();
    return exchange.getFormData().switchIfEmpty(
            Mono.defer(() -> Mono.just(new LinkedMultiValueMap<>()))
    ).flatMap(formDataMap -> {
        // Fall back to UTF-8 when the content type or its charset parameter is absent.
        MediaType contentType = headers.getContentType();
        Charset declaredCharset = contentType == null ? null : contentType.getCharset();
        Charset charset = declaredCharset == null ? StandardCharsets.UTF_8 : declaredCharset;
        String charsetName = charset.name();

        MultiValueMap<String, String> paramMap = exchange.getRequest().getQueryParams();
        Map map = convertMap(paramMap);
        // Neither form data nor query parameters: nothing to validate or rewrite.
        if ((null == formDataMap || formDataMap.isEmpty()) && (null == map || map.isEmpty())) {
            return chain.filter(exchange);
        }

        // Two serializations are built side by side:
        //  - plainBody: unencoded "k=v&k=v", the same text the validation helper received
        //    before this change (its decoding rules are not visible from here);
        //  - encodedBody: URL-encoded, which is what gets forwarded downstream.
        StringBuilder plainBody = new StringBuilder();
        StringBuilder encodedBody = new StringBuilder();
        for (Map.Entry<String, List<String>> entry : formDataMap.entrySet()) {
            String entryKey = entry.getKey();
            // Iterating the values directly also covers an empty value list, which
            // previously raised an exception that was swallowed and truncated the body.
            for (String value : entry.getValue()) {
                plainBody.append(entryKey).append("=").append(value).append("&");
                encodedBody.append(encodeFormComponent(entryKey, charsetName))
                        .append("=")
                        .append(encodeFormComponent(value, charsetName))
                        .append("&");
            }
        }

        // Drop the trailing '&' separator.
        String plainBodyString = plainBody.length() > 0
                ? plainBody.substring(0, plainBody.length() - 1) : "";
        String formDataBodyString = encodedBody.length() > 0
                ? encodedBody.substring(0, encodedBody.length() - 1) : "";

        // Recompute Content-Length from the rewritten body so it cannot mismatch.
        byte[] bodyBytes = formDataBodyString.getBytes(charset);
        int contentLength = bodyBytes.length;
        HttpHeaders httpHeaders = new HttpHeaders();
        httpHeaders.putAll(exchange.getRequest().getHeaders());
        httpHeaders.remove(HttpHeaders.CONTENT_LENGTH);
        httpHeaders.setContentLength(contentLength);

        // Validate request params / form data.
        Map<String, Object> bodyMap = StringUtils.isEmpty(plainBodyString)
                ? new HashMap<String, Object>() : decodeFormBody(plainBodyString);
        Result checkResult = validParam(exchange, bodyMap);
        if (checkResult.getCode() != 0) {
            return errorInfo(exchange, checkResult.getCode(), checkResult.getMessage());
        }

        // Use a BodyInserter to write the rewritten form body into a cached message.
        BodyInserter<String, ReactiveHttpOutputMessage> bodyInserter = BodyInserters.fromObject(formDataBodyString);
        CachedBodyOutputMessage cachedBodyOutputMessage = new CachedBodyOutputMessage(exchange, httpHeaders);
        log.debug("[GatewayContext]Rewrite Form Data :{}", formDataBodyString);
        return bodyInserter.insert(cachedBodyOutputMessage, new BodyInserterContext())
                .then(Mono.defer(() -> {
                    // Expose the rewritten headers and cached body to later filters.
                    ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(
                            exchange.getRequest()) {
                        @Override
                        public HttpHeaders getHeaders() {
                            return httpHeaders;
                        }
                        @Override
                        public Flux<DataBuffer> getBody() {
                            return cachedBodyOutputMessage.getBody();
                        }
                    };
                    ServerHttpResponse originalResponse = exchange.getResponse();
                    originalResponse.getHeaders().setContentType(MediaType.APPLICATION_JSON);
                    DataBufferFactory bufferFactory = originalResponse.bufferFactory();
                    ServerHttpResponseDecorator response =
                            buildResponse(originalResponse, bufferFactory, (Map) checkResult.getResult());
                    return chain.filter(exchange.mutate().request(decorator).response(response).build());
                }));
    });
}

/**
 * URL-encodes one form name or value for an {@code application/x-www-form-urlencoded} body.
 *
 * @param component   the decoded name or value; {@code null} is encoded as an empty string
 * @param charsetName the name of the charset used for percent-encoding
 * @return the encoded text
 * @throws IllegalStateException if the charset name is not supported by the runtime
 */
private String encodeFormComponent(String component, String charsetName) {
    if (component == null) {
        return "";
    }
    try {
        return java.net.URLEncoder.encode(component, charsetName);
    } catch (java.io.UnsupportedEncodingException e) {
        throw new IllegalStateException("Unsupported charset: " + charsetName, e);
    }
}

有时需要对返回的数据做统一处理,这可以通过封装ServerHttpResponseDecorator来实现。ServerHttpResponseDecorator是ServerHttpResponse的装饰器,这里主要覆盖向响应体写入数据缓冲区的部分。

 /**
  * Wraps the response so that a {@code 200 OK} body written as a {@code Flux} is
  * collected into one string, offered for post-processing, and written back with a
  * recomputed {@code Content-Length}. Any other response is written through unchanged.
  *
  * @param originalResponse the response to decorate
  * @param bufferFactory    factory used to wrap the rewritten body bytes
  * @param result           validation result data supplied by the caller (not read here)
  * @return the decorating response
  */
 private ServerHttpResponseDecorator buildResponse(ServerHttpResponse originalResponse, DataBufferFactory bufferFactory, Map result) {
     return new ServerHttpResponseDecorator(originalResponse) {
         @Override
         public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
             // Constant-first comparison: the status code may not have been set yet,
             // and calling equals() on null would throw a NullPointerException.
             if (!HttpStatus.OK.equals(getStatusCode()) || !(body instanceof Flux)) {
                 log.debug("[GatewayContext]Response not rewritten, status:{}", getStatusCode());
                 return super.writeWith(body);
             }
             Flux<? extends DataBuffer> fluxBody = Flux.from(body);
             return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
                 // Merge every chunk into one buffer and copy its bytes out.
                 DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
                 DataBuffer join = dataBufferFactory.join(dataBuffers);
                 byte[] content = new byte[join.readableByteCount()];
                 join.read(content);
                 DataBufferUtils.release(join);

                 // Convert the stream content to a string.
                 String responseData = new String(content, Charsets.UTF_8);
                 log.debug("[GatewayContext]Response body:{}", responseData);

                 // TODO: process the returned data here. Parse it (for example with
                 // JSON.parseObject(responseData)) only once it is actually needed, so
                 // that a non-JSON 200 response does not fail the write.

                 byte[] uppedContent = responseData.getBytes(Charsets.UTF_8);
                 // Keep Content-Length consistent with the bytes actually written.
                 originalResponse.getHeaders().setContentLength(uppedContent.length);
                 return bufferFactory.wrap(uppedContent);
             }));
         }

         @Override
         public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
             // Flatten the nested publishers in order and reuse the writeWith logic.
             return writeWith(Flux.from(body).flatMapSequential(p -> p));
         }
     };
 }
作者:三心两意弹棉花 原文地址:https://segmentfault.com/a/1190000043534246

%s 个评论

要回复文章请先登录注册