I am quite new to spring integration. I am evaluating the Spring Integration for our project. I am run into one issue on how to handle the Exception.
I am using the publishSubscribeChannel for handling the message. I am not sure if this is a correct approach or not. When a exception is thrown inside publishSubscribeChannel, I would like it to route to a different channel so I can reply to different HTTP status code.
How do I route the exception inside the publishSubscribeChannel to the errorChannel.
I have the following code. I have tried to use routeException
in a different area of the code but no luck. Can someone please help me with how to solve this?
@Configuration @EnableIntegration public class IntegrationConfiguration { // curl http://localhost:8080/tasks --data '{"username":"xyz","password":"xyz"}' -H 'Content-type: application/json' @Bean MessageChannel directChannel() { return MessageChannels.direct().get(); } @Bean public IntegrationFlow httpGateway() { return IntegrationFlows.from( Http.inboundGateway("/tasks") .requestMapping(m -> m.methods(HttpMethod.POST)) .requestPayloadType(String.class) .requestChannel(directChannel()) .get() ) .transform(t -> { return "transofrm " + t; }) .channel("queueChannel") .routeByException(r -> { r.channelMapping(RuntimeException.class, "errorChannel"); r.defaultOutputToParentFlow(); }) .get(); } @Bean public IntegrationFlow handleMessage() { return IntegrationFlows.from("queueChannel") .wireTap(flow -> flow.handle(System.out::println)) .routeByException(r -> { r.channelMapping(RuntimeException.class, "errorChannel"); r.defaultOutputToParentFlow(); }) .publishSubscribeChannel(publisher -> { publisher.errorHandler(var1 -> { var1.printStackTrace(); }) .subscribe(flow -> flow .handle(m -> { if (m.getPayload().toString().contains("user")) { throw new RuntimeException("user found"); } System.out.println("subscribed " + m.getPayload()); }) ); } ) .transform(t -> "") .wireTap(flow -> flow.handle(m -> { System.out.println(m.getHeaders().get("status")); })) .enrichHeaders( c -> c.header(HttpHeaders.STATUS_CODE, HttpStatus.OK)) .get(); } @Bean IntegrationFlow exceptionOrErrorFlow() { return IntegrationFlows.from("errorChannel") .wireTap(f -> f.handle(m -> System.out.println("failed badly"))) .enrichHeaders(c -> c.header(HttpHeaders.STATUS_CODE, HttpStatus.BAD_REQUEST)) .get(); } }
https://stackoverflow.com/questions/66620437/spring-integration-handle-exception-in-publishersubscribechannel March 14, 2021 at 10:08AM
没有评论:
发表评论