akka
78b5ed849846
Re-enable javadsl Flow 'via' test #18863
Arnout Engelen
2 days ago

4 5 6 7 8 9 10 11 12 13 14 15 4 5 6 7 8 9 10 11 12 13
package akka.stream.javadsl; import akka.Done; import akka.NotUsed; import akka.actor.ActorRef;
import akka.dispatch.Foreach; import akka.dispatch.Futures;
import akka.japi.JavaPartialFunction; import akka.japi.Pair; import akka.japi.function.*; import akka.stream.*; import akka.stream.impl.ConstantFun;
...
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 15 16 17 18 19 20 21 22 23 24 25 26
import akka.stream.stage.*; import akka.testkit.AkkaSpec; import akka.stream.testkit.TestPublisher; import akka.testkit.javadsl.TestKit; import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test; import org.reactivestreams.Publisher;
import scala.concurrent.Await; import scala.concurrent.Future;
import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import akka.testkit.AkkaJUnitActorSystemResource; import java.util.*;
...
186 187 188 189 190 191 192 193 194 195 196 197 181 182 183 184 185 186 187 188 189 190 191 192
probe.expectNoMsg(duration); future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); }
@Ignore("StatefulStage to be converted to GraphStage when Java Api is available (#18817)") @Test public void mustBeAbleToUseTransform() {
@Test public void mustBeAbleToUseVia() {
final TestKit probe = new TestKit(system); final Iterable<Integer> input = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7); // duplicate each element, stop after 4 elements, and emit sum to the end final Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class).via(new GraphStage<FlowShape<Integer, Integer>>() {

10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
import akka.japi.Pair; import akka.japi.function.*; import akka.japi.pf.PFBuilder; import akka.stream.*; import akka.stream.impl.ConstantFun;
import akka.stream.stage.*;
import akka.testkit.AkkaSpec; import akka.stream.testkit.TestPublisher; import akka.testkit.javadsl.TestKit; import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import scala.util.Try; import akka.testkit.AkkaJUnitActorSystemResource;
...
82 83 84 85 86 87 88 89 90 91 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
probe.expectMsgEquals("b"); probe.expectMsgEquals("c"); probe.expectMsgEquals("Done"); }
@Test public void mustBeAbleToUseVia() { final TestKit probe = new TestKit(system); final Iterable<Integer> input = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7); // duplicate each element, stop after 4 elements, and emit sum to the end Source.from(input).via(new GraphStage<FlowShape<Integer, Integer>>() { public final Inlet<Integer> in = Inlet.create("in"); public final Outlet<Integer> out = Outlet.create("out"); @Override public GraphStageLogic createLogic(Attributes inheritedAttributes) throws Exception { return new GraphStageLogic(shape()) { int sum = 0; int count = 0; { setHandler(in, new AbstractInHandler() { @Override public void onPush() { final Integer element = grab(in); sum += element; count += 1; if (count == 4) { emitMultiple(out, Arrays.asList(element, element, sum).iterator(), () -> completeStage()); } else { emitMultiple(out, Arrays.asList(element, element).iterator()); } } }); setHandler(out, new AbstractOutHandler() { @Override public void onPull() throws Exception { pull(in); } }); } }; } @Override public FlowShape<Integer, Integer> shape() { return FlowShape.of(in, out); } }).runForeach((Procedure<Integer>) elem -> probe.getRef().tell(elem, ActorRef.noSender()), materializer); probe.expectMsgEquals(0); probe.expectMsgEquals(0); probe.expectMsgEquals(1); probe.expectMsgEquals(1); probe.expectMsgEquals(2); probe.expectMsgEquals(2); probe.expectMsgEquals(3); probe.expectMsgEquals(3); probe.expectMsgEquals(6); }
@SuppressWarnings("unchecked") @Test public void mustBeAbleToUseGroupBy() throws Exception { final Iterable<String> input = Arrays.asList("Aaa", "Abb", "Bcc", "Cdd", "Cee"); final Source<List<String>, NotUsed> source = Source
About FluentSend Feedback