2021年3月13日星期六

Spring integration handle exception in publisherSubscribeChannel

I am quite new to spring integration. I am evaluating the Spring Integration for our project. I am run into one issue on how to handle the Exception.

I am using the publishSubscribeChannel for handling the message. I am not sure if this is a correct approach or not. When a exception is thrown inside publishSubscribeChannel, I would like it to route to a different channel so I can reply to different HTTP status code.

How do I route the exception inside the publishSubscribeChannel to the errorChannel.

I have the following code. I have tried to use routeException in a different area of the code but no luck. Can someone please help me with how to solve this?

@Configuration  @EnableIntegration  public class IntegrationConfiguration {        // curl http://localhost:8080/tasks --data '{"username":"xyz","password":"xyz"}' -H 'Content-type: application/json'      @Bean      MessageChannel directChannel() {          return MessageChannels.direct().get();      }        @Bean      public IntegrationFlow httpGateway() {          return IntegrationFlows.from(              Http.inboundGateway("/tasks")                  .requestMapping(m -> m.methods(HttpMethod.POST))                  .requestPayloadType(String.class)                  .requestChannel(directChannel())              .get()          )              .transform(t -> {                  return "transofrm " + t;              })              .channel("queueChannel")              .routeByException(r -> {                  r.channelMapping(RuntimeException.class, "errorChannel");                  r.defaultOutputToParentFlow();              })              .get();      }        @Bean      public IntegrationFlow handleMessage() {          return IntegrationFlows.from("queueChannel")              .wireTap(flow -> flow.handle(System.out::println))              .routeByException(r -> {                  r.channelMapping(RuntimeException.class, "errorChannel");                  r.defaultOutputToParentFlow();              })              .publishSubscribeChannel(publisher -> {                  publisher.errorHandler(var1 -> {                      var1.printStackTrace();                  })                      .subscribe(flow -> flow                          .handle(m -> {                              if (m.getPayload().toString().contains("user")) {                                  throw new RuntimeException("user found");                              }                              System.out.println("subscribed " + m.getPayload());                          })                      );                  }              )                .transform(t -> "")              .wireTap(flow -> flow.handle(m -> {                  System.out.println(m.getHeaders().get("status"));              }))              .enrichHeaders( c -> c.header(HttpHeaders.STATUS_CODE, HttpStatus.OK))              .get();      }        @Bean      IntegrationFlow exceptionOrErrorFlow() {          return IntegrationFlows.from("errorChannel")              .wireTap(f -> f.handle(m -> System.out.println("failed badly")))              .enrichHeaders(c -> c.header(HttpHeaders.STATUS_CODE, HttpStatus.BAD_REQUEST))              .get();      }  }  
https://stackoverflow.com/questions/66620437/spring-integration-handle-exception-in-publishersubscribechannel March 14, 2021 at 10:08AM

没有评论:

发表评论