WebFlux로 구성되어 있는 프로젝트에서 Mono, Flux stream에서 map이나 flatMap을 사용해서 특정 데이터를 매핑하는 과정에서 특정한 경우에 대해서 null을 리턴하고 다음 파이프라인에서 filter로 Objects의 nonNull을 사용해서 컨텐츠를 필터링 하려고 했다.
작성하려고 한 코드의 일부 예시를 만들어서 작성해봤다.
만약 resultData를 통해 전달받은 데이터가 예상 대로라면 map과정에서 500보다 큰 5189값만 정상 반환하고 나머지는 null을 반환한 후 filter를 통해 정상적으로 하나의 데이터만 남을 것이라고 예상했다.
@Test
@DisplayName("map 과정에서 반환된 null이 정상적으로 필터링 되었는지 확인하는 테스트")
void mono_null_filter_test() {
StepVerifier.create(resultData()
.map(d -> {
if (Long.parseLong(d) > 500L) {
return d;
}
return null;
})
.filter(Objects::nonNull)
)
.assertNext(d -> {
assertThat(d).isNotNull();
assertThat(d).isEqualTo("5189");
}).verifyComplete();
}
// Flux String stream 값을 반환하는 메소드 resultData
Flux<String> resultData() {
List<String> data = new ArrayList<>();
data.add("123");
data.add("456");
data.add("5189");
return Flux.fromIterable(
data
);
}
하지만 실제 테스트 결과 NullPointerException가 발생하고 테스트가 죽어버렸다.
expectation "assertNext" failed (expected: onNext(); actual: onError(java.lang.NullPointerException: The mapper returned a null value.))
java.lang.AssertionError: expectation "assertNext" failed (expected: onNext(); actual: onError(java.lang.NullPointerException: The mapper returned a null value.))
at reactor.test.MessageFormatter.assertionError(MessageFormatter.java:115)
at reactor.test.MessageFormatter.failPrefix(MessageFormatter.java:104)
at reactor.test.MessageFormatter.fail(MessageFormatter.java:73)
at reactor.test.MessageFormatter.failOptional(MessageFormatter.java:88)
at reactor.test.DefaultStepVerifierBuilder.lambda$consumeNextWith$1(DefaultStepVerifierBuilder.java:268)
at reactor.test.DefaultStepVerifierBuilder$SignalEvent.test(DefaultStepVerifierBuilder.java:2213)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSignal(DefaultStepVerifierBuilder.java:1485)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onExpectation(DefaultStepVerifierBuilder.java:1433)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onError(DefaultStepVerifierBuilder.java:1092)
at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:151)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onError(FluxMap.java:252)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:205)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76)
at reactor.core.publisher.FluxIterable$IterableSubscription.fastPath(FluxIterable.java:335)
at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:222)
at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:102)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.request(FluxMap.java:281)
at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:179)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSubscribe(DefaultStepVerifierBuilder.java:1124)
at reactor.core.publisher.FluxFilter$FilterSubscriber.onSubscribe(FluxFilter.java:79)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onSubscribe(FluxMap.java:185)
at reactor.core.publisher.MonoNext$NextSubscriber.onSubscribe(MonoNext.java:64)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:161)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
at reactor.core.publisher.Mono.subscribe(Mono.java:4213)
at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.toVerifierAndSubscribe(DefaultStepVerifierBuilder.java:868)
at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:824)
at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:816)
at reactor.test.DefaultStepVerifierBuilder.verifyComplete(DefaultStepVerifierBuilder.java:683)
at com.example.demo.DemoApplicationTests.mono_null_filter_test(DemoApplicationTests.java:31)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:686)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:212)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:208)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:71)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:248)
at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$5(DefaultLauncher.java:211)
at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:226)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:199)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:132)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75)
at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
at com.sun.proxy.$Proxy2.stop(Unknown Source)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.stop(TestWorker.java:133)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:414)
at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
at java.base/java.lang.Thread.run(Thread.java:834)
Suppressed: java.lang.NullPointerException: The mapper returned a null value.
at java.base/java.util.Objects.requireNonNull(Objects.java:246)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:199)
... 106 more
예상과 다른 결과에 Reactor 코드에 Mono에서 사용되는 Map 코드를 보았더니 다음과 같이 되어있었다.
데이터가 subscribe return 되는 과정에서 mapper(Function)에 apply 되었을 때 반환되는 값이 null이게 되면 NPE를 내놓게 되어있었다.
그래서 MonoMap에서 mapper에 result값이 null인 경우에 subscribe 하는 과정에서 NPE가 되는 것이었다.
이것은 map이 아닌 flatMap이어도 동일하다.
resultData()
.flatMap(d -> {
if (Long.parseLong(d) > 500L) {
return Mono.just(d);
}
return null;
})
이를 해결 하는 방법은 2가지 있다.
1. flatMap 사용
flatmap의 경우 inner Mono를 unwrap 하는 과정에서 비어있는 Mono를 필터링 하기 때문에 다음과 같이 해결할 수 있다.
@Test
void mono_null_filter_test() {
StepVerifier.create(resultData()
.flatMap(d -> {
if (Long.parseLong(d) > 500L) {
return Mono.just(d);
}
return Mono.empty();
})
)
.assertNext(d -> {
assertThat(d).isNotNull();
assertThat(d).isEqualTo("5189");
}).verifyComplete();
}
2. Optional 사용
null 값을 사용할 수 없기 때문에 Map에서 Null을 반환할 객체를 Null 대신 Optional로 감싸서 반환하고 filter에서 실제 값이 있는 데이터만 filter해서 보여줄 수 있는 방법이 있다.
@Test
void mono_null_filter_test() {
StepVerifier.create(resultData()
.map(d -> {
if (Long.parseLong(d) > 500L) {
return Optional.of(d);
}
return Optional.empty();
})
.filter(Optional::isPresent)
.map(Optional::get)
)
.assertNext(d -> {
assertThat(d).isNotNull();
assertThat(d).isEqualTo("5189");
}).verifyComplete();
}
출처 :
tech.kakao.com/2018/05/29/reactor-programming/
stackoverflow.com/questions/58215810/returning-a-null-in-a-map-versus-flatmap-in-reactor