Reorganization of grpc code

This commit is contained in:
Oleg Zhurakousky
2021-10-18 17:55:33 +02:00
parent 1466c58207
commit cd8d556624
26 changed files with 55 additions and 73 deletions

View File

@@ -0,0 +1,61 @@
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.grpc;
import org.springframework.messaging.Message;
import com.google.protobuf.GeneratedMessageV3;
/**
*
* @author Oleg Zhurakousky
*
* @param <T> instance of {@link GeneratedMessageV3}
*/
public abstract class AbstractGrpcMessageConverter<T extends GeneratedMessageV3> implements GrpcMessageConverter<T> {
@Override
public Message<byte[]> toSpringMessage(T grpcMessage) {
if (this.supports(grpcMessage)) {
return this.doToSpringMessage(grpcMessage);
}
return null;
}
@Override
public T fromSpringMessage(Message<byte[]> springMessage, Class<T> grpcClass) {
if (this.supports(grpcClass)) {
return this.doFromSpringMessage(springMessage);
}
return null;
}
protected abstract Message<byte[]> doToSpringMessage(T grpcMessage);
protected abstract T doFromSpringMessage(Message<byte[]> springMessage);
protected boolean supports(T grpcMessage) {
// String fieldName = grpcMessage.getAllFields().keySet().iterator().next().getFullName();
// fieldName = fieldName.substring(0, fieldName.lastIndexOf("."));
// System.out.println(grpcMessage.getClass().getName());
// return fieldName.contains(grpcMessage.getClass().getSimpleName());
return this.supports(grpcMessage.getClass());
}
protected abstract boolean supports(Class<? extends GeneratedMessageV3> grpcClass);
}

View File

@@ -0,0 +1,74 @@
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.grpc;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.function.context.FunctionProperties;
/**
*
* @author Oleg Zhurakousky
* @since 3.2
*
*/
@ConfigurationProperties(prefix = FunctionProperties.PREFIX + ".grpc")
public class FunctionGrpcProperties {
private final static String GRPC_PREFIX = FunctionProperties.PREFIX + ".grpc";
/**
* The name of function definition property.
*/
public final static String SERVICE_CLASS_NAME = GRPC_PREFIX + ".service-class-name";
/**
* Default gRPC port.
*/
public final static int GRPC_PORT = 6048;
/**
* gRPC port server will bind to. Default 6048;
*/
private int port = GRPC_PORT;
/**
* The fully qualified name of the service you wish to enable/expose.
* Setting this property ensures that only a single service is enabled/exposed,
* regardless of how many services are available on the classpath.
*/
private String serviceClassName;
/**
* Grpc Server port.
*/
public int getPort() {
return this.port;
}
public void setPort(int port) {
this.port = port;
}
public String getServiceClassName() {
return serviceClassName;
}
public void setServiceClassName(String serviceClassName) {
this.serviceClassName = serviceClassName;
}
}

View File

@@ -0,0 +1,73 @@
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.grpc;
import java.util.List;
import io.grpc.BindableService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
*
* @author Oleg Zhurakousky
* @since 3.2
*/
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(FunctionGrpcProperties.class)
@ConditionalOnProperty(name = "spring.cloud.function.grpc.server", havingValue = "true", matchIfMissing = true)
class GrpcAutoConfiguration {
@Bean
public GrpcServer grpcServer(FunctionGrpcProperties grpcProperties, BindableService[] grpcMessagingServices) {
Assert.notEmpty(grpcMessagingServices, "'grpcMessagingServices' must not be null or empty");
if (StringUtils.hasText(grpcProperties.getServiceClassName())) {
for (BindableService bindableService : grpcMessagingServices) {
if (bindableService.getClass().getName().equals(grpcProperties.getServiceClassName())) {
return new GrpcServer(grpcProperties, new BindableService[] {bindableService});
}
}
}
return new GrpcServer(grpcProperties, grpcMessagingServices);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Bean
public BindableService grpcSpringMessageHandler(MessageHandlingHelper helper) {
return new GrpcServerMessageHandler(helper);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public MessageHandlingHelper grpcMessageHandlingHelper(List<GrpcMessageConverter<?>> grpcConverters,
FunctionProperties funcProperties, FunctionCatalog functionCatalog) {
return new MessageHandlingHelper(grpcConverters, functionCatalog, funcProperties);
}
@Bean
public GrpcSpringMessageConverter grpcSpringMessageConverter() {
return new GrpcSpringMessageConverter();
}
}

View File

@@ -0,0 +1,34 @@
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.grpc;
import com.google.protobuf.GeneratedMessageV3;
import org.springframework.messaging.Message;
/**
*
* @author Oleg Zhurakousky
*
* @param <T> instance of {@link GeneratedMessageV3}
*/
public interface GrpcMessageConverter<T extends GeneratedMessageV3> {
Message<byte[]> toSpringMessage(T grpcMessage);
T fromSpringMessage(Message<byte[]> springMessage, Class<T> grpcClass);
}

View File

@@ -0,0 +1,93 @@
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.grpc;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.ClassUtils;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.protobuf.services.ProtoReflectionService;
/**
*
* @author Oleg Zhurakousky
* @author Dave Syer
*
* @since 3.2
*
*/
class GrpcServer implements SmartLifecycle {
private Log logger = LogFactory.getLog(GrpcServer.class);
private final FunctionGrpcProperties grpcProperties;
private final BindableService[] grpcMessageServices;
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private Server server;
GrpcServer(FunctionGrpcProperties grpcProperties, BindableService[] grpcMessageServices) {
this.grpcProperties = grpcProperties;
this.grpcMessageServices = grpcMessageServices;
}
@Override
public void start() {
this.executor.execute(() -> {
try {
ServerBuilder<?> serverBuilder = ServerBuilder.forPort(this.grpcProperties.getPort());
for (int i = 0; i < this.grpcMessageServices.length; i++) {
BindableService bindableService = this.grpcMessageServices[i];
serverBuilder.addService(bindableService);
}
if (ClassUtils.isPresent("io.grpc.protobuf.services.ProtoReflectionService", null)) {
serverBuilder.addService(ProtoReflectionService.newInstance());
}
this.server = serverBuilder.build();
logger.info("Starting gRPC server");
this.server.start();
logger.info("gRPC server is listening on port " + this.grpcProperties.getPort());
}
catch (Exception e) {
stop();
throw new IllegalStateException(e);
}
});
}
@Override
public void stop() {
logger.info("Shutting down gRPC server");
this.server.shutdown();
this.executor.shutdown();
}
@Override
public boolean isRunning() {
return this.server != null && !this.server.isShutdown();
}
}

View File

@@ -0,0 +1,111 @@
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.grpc;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
//
import io.grpc.Status;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
//
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.grpc.MessagingServiceGrpc.MessagingServiceImplBase;
import org.springframework.context.SmartLifecycle;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import com.google.protobuf.GeneratedMessageV3;
//
//import com.google.protobuf.GeneratedMessage;
/**
*
* @author Oleg Zhurakousky
* @since 3.2
*
*/
@SuppressWarnings("rawtypes")
class GrpcServerMessageHandler extends MessagingServiceImplBase {
private Log logger = LogFactory.getLog(GrpcServerMessageHandler.class);
private final MessageHandlingHelper helper;
private boolean running;
GrpcServerMessageHandler(MessageHandlingHelper<GeneratedMessageV3> helper) {
this.helper = helper;
}
@Override
@SuppressWarnings("unchecked")
public void requestReply(GrpcSpringMessage request, StreamObserver<GrpcSpringMessage> responseObserver) {
this.helper.requestReply(request, responseObserver);
}
@Override
@SuppressWarnings("unchecked")
public void serverStream(GrpcSpringMessage request, StreamObserver<GrpcSpringMessage> responseObserver) {
this.helper.serverStream(request, responseObserver);
}
@Override
@SuppressWarnings("unchecked")
public StreamObserver<GrpcSpringMessage> clientStream(StreamObserver<GrpcSpringMessage> responseObserver) {
return this.helper.clientStream(responseObserver, GrpcSpringMessage.class);
}
@SuppressWarnings("unchecked")
@Override
public StreamObserver<GrpcSpringMessage> biStream(StreamObserver<GrpcSpringMessage> responseObserver) {
return this.helper.biStream(responseObserver, GrpcSpringMessage.class);
}
}

View File

@@ -0,0 +1,59 @@
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.grpc;
import java.util.HashMap;
import java.util.Map;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import com.google.protobuf.ByteString;
import com.google.protobuf.GeneratedMessageV3;
/**
*
* @author Oleg Zhurakousky
*
*/
public class GrpcSpringMessageConverter extends AbstractGrpcMessageConverter<GrpcSpringMessage> {
@Override
protected Message<byte[]> doToSpringMessage(GrpcSpringMessage grpcMessage) {
return MessageBuilder.withPayload(grpcMessage.getPayload().toByteArray())
.copyHeaders(grpcMessage.getHeadersMap())
.build();
}
@Override
protected GrpcSpringMessage doFromSpringMessage(Message<byte[]> springMessage) {
Map<String, String> stringHeaders = new HashMap<>();
springMessage.getHeaders().forEach((k, v) -> {
stringHeaders.put(k, v.toString());
});
return GrpcSpringMessage.newBuilder()
.setPayload(ByteString.copyFrom(springMessage.getPayload()))
.putAllHeaders(stringHeaders)
.build();
}
@Override
protected boolean supports(Class<? extends GeneratedMessageV3> grpcClass) {
return grpcClass.isAssignableFrom(GrpcSpringMessage.class);
}
}

View File

@@ -0,0 +1,285 @@
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.grpc;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
/**
*
* @author Oleg Zhurakousky
* @since 3.2
*
*/
final class GrpcUtils {
private static Log logger = LogFactory.getLog(GrpcUtils.class);
private GrpcUtils() {
}
public static GrpcSpringMessage toGrpcSpringMessage(byte[] payload, Map<String, String> headers) {
return GrpcSpringMessage.newBuilder()
.setPayload(ByteString.copyFrom(payload))
.putAllHeaders(headers)
.build();
}
public static GrpcSpringMessage toGrpcSpringMessage(Message<byte[]> message) {
Map<String, String> stringHeaders = new HashMap<>();
message.getHeaders().forEach((k, v) -> {
stringHeaders.put(k, v.toString());
});
return toGrpcSpringMessage(message.getPayload(), stringHeaders);
}
public static Message<byte[]> fromGrpcSpringMessage(GrpcSpringMessage message) {
return MessageBuilder.withPayload(message.getPayload().toByteArray())
.copyHeaders(message.getHeadersMap())
.build();
}
public static Message<byte[]> requestReply(Message<byte[]> inputMessage) {
return requestReply("localhost", FunctionGrpcProperties.GRPC_PORT, inputMessage);
}
public static Message<byte[]> requestReply(String host, int port, Message<byte[]> inputMessage) {
ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext().build();
MessagingServiceGrpc.MessagingServiceBlockingStub stub = MessagingServiceGrpc
.newBlockingStub(channel);
GrpcSpringMessage response = stub.requestReply(toGrpcSpringMessage(inputMessage));
channel.shutdown();
return fromGrpcSpringMessage(response);
}
/**
* Utility method to support bi-directional streaming interaction. Will connect to gRPC server using default host/port,
* otherwise use {@link #biStreaming(String, int, Flux)} method.
*
* Keep in mind that there is no implied relationship between input stream and output stream.
* They are completely independent where one may end before the other.
*
* @param inputStream {@code FluxMessage<byte[]>>} representing input stream.
* @return {@code Flux<Message<byte[]>>} representing output stream
*/
public static Flux<Message<byte[]>> biStreaming(Flux<Message<byte[]>> inputStream) {
return biStreaming("localhost", FunctionGrpcProperties.GRPC_PORT, inputStream);
}
/**
* Utility method to support bi-directional streaming interaction.
* Keep in mind that there is no implied relationship between input stream and output stream.
* They are completely independent where one may end before the other.
*
* @param host gRPC server host name
* @param port gRPC server port
* @param inputStream {@code FluxMessage<byte[]>>} representing input stream
* @return {@code Flux<Message<byte[]>>} representing output stream
*/
public static Flux<Message<byte[]>> biStreaming(String host, int port, Flux<Message<byte[]>> inputStream) {
ManagedChannel channel = ManagedChannelBuilder
.forAddress(host, port)
.usePlaintext().build();
MessagingServiceGrpc.MessagingServiceStub stub = MessagingServiceGrpc
.newStub(channel);
Many<Message<byte[]>> sink = Sinks.many().unicast().onBackpressureBuffer();
ClientResponseObserver<GrpcSpringMessage, GrpcSpringMessage> clientResponseObserver = clientResponseObserver(inputStream, sink);
stub.biStream(clientResponseObserver);
return sink.asFlux().doOnComplete(() -> {
logger.debug("Shutting down channel");
channel.shutdown();
});
}
public static Flux<Message<byte[]>> serverStream(String host, int port, Message<byte[]> inputMessage) {
ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext().build();
MessagingServiceGrpc.MessagingServiceBlockingStub stub = MessagingServiceGrpc
.newBlockingStub(channel);
Iterator<GrpcSpringMessage> serverStream = stub.serverStream(toGrpcSpringMessage(inputMessage));
Many<Message<byte[]>> sink = Sinks.many().unicast().onBackpressureBuffer();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(() -> {
while (serverStream.hasNext()) {
GrpcSpringMessage grpcMessage = serverStream.next();
sink.tryEmitNext(GrpcUtils.fromGrpcSpringMessage(grpcMessage));
}
sink.tryEmitComplete();
});
return sink.asFlux()
.doOnComplete(() -> {
channel.shutdown();
executor.shutdownNow();
});
}
/**
* Utility method to support client-side streaming interaction. Will connect to gRPC server using default host/port,
* otherwise use {@link #clientStream(String, int, Flux)} method.
*
* @param inputStream {@code FluxMessage<byte[]>>} representing input stream.
* @return {@code Message<byte[]>} representing output
*/
public static Message<byte[]> clientStream(Flux<Message<byte[]>> inputStream) {
return clientStream("localhost", FunctionGrpcProperties.GRPC_PORT, inputStream);
}
/**
* Utility method to support client-side streaming interaction.
*
* @param host gRPC server host name
* @param port gRPC server port
* @param inputStream {@code FluxMessage<byte[]>>} representing input stream
* @return {@code Message<byte[]>} representing output
*/
public static Message<byte[]> clientStream(String host, int port, Flux<Message<byte[]>> inputStream) {
ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext().build();
LinkedBlockingQueue<Message<byte[]>> resultRef = new LinkedBlockingQueue<>(1);
StreamObserver<GrpcSpringMessage> responseObserver = new StreamObserver<GrpcSpringMessage>() {
@Override
public void onNext(GrpcSpringMessage result) {
if (logger.isDebugEnabled()) {
logger.debug("Client received reply: " + result);
}
resultRef.offer(GrpcUtils.fromGrpcSpringMessage(result));
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
logger.info("Client completed");
}
};
MessagingServiceGrpc.MessagingServiceStub asyncStub = MessagingServiceGrpc.newStub(channel);
StreamObserver<GrpcSpringMessage> requestObserver = asyncStub.clientStream(responseObserver);
inputStream.doOnNext(message -> {
if (logger.isDebugEnabled()) {
logger.debug("Client sending: " + message);
}
try {
requestObserver.onNext(GrpcUtils.toGrpcSpringMessage(message));
}
catch (Exception e) {
requestObserver.onError(e);
}
}).doOnComplete(() -> {
requestObserver.onCompleted();
}).subscribe();
try {
return resultRef.poll(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IllegalStateException(ie);
}
}
private static ClientResponseObserver<GrpcSpringMessage, GrpcSpringMessage> clientResponseObserver(Flux<Message<byte[]>> inputStream, Many<Message<byte[]>> sink) {
return new ClientResponseObserver<GrpcSpringMessage, GrpcSpringMessage>() {
ClientCallStreamObserver<GrpcSpringMessage> requestStreamObserver;
@Override
public void beforeStart(ClientCallStreamObserver<GrpcSpringMessage> requestStreamObserver) {
this.requestStreamObserver = requestStreamObserver;
requestStreamObserver.disableAutoInboundFlowControl();
requestStreamObserver.setOnReadyHandler(new Runnable() {
@Override
public void run() {
inputStream
.doOnNext(request -> {
if (logger.isDebugEnabled()) {
logger.debug("Streaming message to function: " + request);
}
requestStreamObserver.onNext(GrpcUtils.toGrpcSpringMessage(request));
})
.doOnComplete(() -> {
requestStreamObserver.onCompleted();
})
.subscribe();
}
});
}
@Override
public void onNext(GrpcSpringMessage message) {
if (logger.isDebugEnabled()) {
logger.debug("Streaming message from function: " + message);
}
sink.tryEmitNext(fromGrpcSpringMessage(message));
requestStreamObserver.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
logger.info("Client stream is complete");
sink.tryEmitComplete(); // TODO revisit as this would complete the server stream simply because the client is done.
// Perhaps we need to expose some boolean value when this is desirable
}
};
}
}

View File

@@ -0,0 +1,352 @@
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.grpc;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.protobuf.GeneratedMessageV3;
import io.grpc.Status;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.context.SmartLifecycle;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
/**
*
* @author Oleg Zhurakousky
*
*/
public class MessageHandlingHelper<T extends GeneratedMessageV3> implements SmartLifecycle {
private Log logger = LogFactory.getLog(MessageHandlingHelper.class);
private final List<GrpcMessageConverter<?>> grpcConverters;
private final FunctionProperties funcProperties;
private final FunctionCatalog functionCatalog;
private final ExecutorService executor;
private boolean running;
public MessageHandlingHelper(List<GrpcMessageConverter<?>> grpcConverters,
FunctionCatalog functionCatalog, FunctionProperties funcProperties) {
this.grpcConverters = grpcConverters;
this.funcProperties = funcProperties;
this.functionCatalog = functionCatalog;
this.executor = Executors.newCachedThreadPool();
}
@SuppressWarnings("unchecked")
public void requestReply(T request, StreamObserver<T> responseObserver) {
Message<byte[]> message = this.toSpringMessage(request);
FunctionInvocationWrapper function = this.resolveFunction(message.getHeaders());
Message<byte[]> replyMessage = (Message<byte[]>) function.apply(message);
GeneratedMessageV3 reply = this.toGrpcMessage(replyMessage, (Class<T>) request.getClass());
responseObserver.onNext((T) reply);
responseObserver.onCompleted();
}
@SuppressWarnings("unchecked")
public void serverStream(T request, StreamObserver<T> responseObserver) {
Message<byte[]> message = this.toSpringMessage(request);
FunctionInvocationWrapper function = this.resolveFunction(message.getHeaders());
Publisher<Message<byte[]>> replyStream = (Publisher<Message<byte[]>>) function.apply(message);
Flux.from(replyStream).doOnNext(replyMessage -> {
responseObserver.onNext(this.toGrpcMessage(replyMessage, (Class<T>) request.getClass()));
})
.doOnComplete(() -> responseObserver.onCompleted())
.subscribe();
}
@SuppressWarnings("unchecked")
public StreamObserver<T> clientStream(StreamObserver<T> responseObserver, Class<T> grpcMessageType) {
ServerCallStreamObserver<T> serverCallStreamObserver = (ServerCallStreamObserver<T>) responseObserver;
serverCallStreamObserver.disableAutoInboundFlowControl();
FunctionInvocationWrapper function = this.resolveFunction(null);
AtomicBoolean wasReady = new AtomicBoolean(false);
serverCallStreamObserver.setOnReadyHandler(() -> {
if (serverCallStreamObserver.isReady() && !wasReady.get()) {
wasReady.set(true);
logger.info("gRPC Server receiving stream is ready.");
serverCallStreamObserver.request(1);
}
});
if (!function.isInputTypePublisher()) {
throw new UnsupportedOperationException("The client streaming is "
+ "not supported for functions that accept non-Publisher: "
+ function);
}
else if (function.isOutputTypePublisher()) {
throw new UnsupportedOperationException("The client streaming is "
+ "not supported for functions that return Publisher: "
+ function);
}
else {
Many<Message<byte[]>> inputStream = Sinks.many().unicast().onBackpressureBuffer();
Flux<Message<byte[]>> inputStreamFlux = inputStream.asFlux();
LinkedBlockingQueue<Message<byte[]>> resultRef = new LinkedBlockingQueue<>(1);
this.executor.execute(() -> {
Message<byte[]> replyMessage = (Message<byte[]>) function.apply(inputStreamFlux);
if (logger.isDebugEnabled()) {
logger.debug("Function invocation reply: " + replyMessage);
}
resultRef.offer(replyMessage);
});
return new StreamObserver<T>() {
@Override
public void onNext(T inputMessage) {
if (logger.isDebugEnabled()) {
logger.debug("gRPC Server receiving: " + inputMessage);
}
inputStream.tryEmitNext(toSpringMessage(inputMessage));
serverCallStreamObserver.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
responseObserver.onCompleted();
}
@Override
public void onCompleted() {
logger.info("gRPC Server has finished receiving data.");
inputStream.tryEmitComplete();
try {
responseObserver.onNext(toGrpcMessage(resultRef.poll(Integer.MAX_VALUE, TimeUnit.MILLISECONDS), grpcMessageType));
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
responseObserver.onCompleted();
}
};
}
}
public StreamObserver<T> biStream(StreamObserver<T> responseObserver, Class<T> grpcMessageType) {
ServerCallStreamObserver<T> serverCallStreamObserver = (ServerCallStreamObserver<T>) responseObserver;
serverCallStreamObserver.disableAutoInboundFlowControl();
FunctionInvocationWrapper function = this.resolveFunction(null);
AtomicBoolean wasReady = new AtomicBoolean(false);
serverCallStreamObserver.setOnReadyHandler(() -> {
if (serverCallStreamObserver.isReady() && !wasReady.get()) {
wasReady.set(true);
logger.info("gRPC Server receiving stream is ready.");
serverCallStreamObserver.request(1);
}
});
if (function.isInputTypePublisher()) {
if (function.isOutputTypePublisher()) {
return this.biStreamReactive(responseObserver, serverCallStreamObserver, grpcMessageType);
}
throw new UnsupportedOperationException("The bi-directional streaming is "
+ "not supported for functions that accept Publisher but return non-Publisher: "
+ function);
}
else {
if (!function.isOutputTypePublisher()) {
return this.biStreamImperative(responseObserver, serverCallStreamObserver, wasReady);
}
throw new UnsupportedOperationException("The bidirection streaming is "
+ "not supported for functions that accept non-Publisher but return Publisher: "
+ function);
}
}
@SuppressWarnings("unchecked")
private StreamObserver<T> biStreamReactive(StreamObserver<T> responseObserver,
ServerCallStreamObserver<T> serverCallStreamObserver, Class<T> grpcMessageType) {
Many<Message<byte[]>> inputStream = Sinks.many().unicast().onBackpressureBuffer();
Flux<Message<byte[]>> inputStreamFlux = inputStream.asFlux();
FunctionInvocationWrapper function = this.resolveFunction(null);
Publisher<Message<byte[]>> outputPublisher = (Publisher<Message<byte[]>>) function.apply(inputStreamFlux);
Flux.from(outputPublisher).subscribe(functionResult -> {
T outputMessage = toGrpcMessage(functionResult, grpcMessageType);
if (logger.isDebugEnabled()) {
logger.debug("gRPC Server replying: " + outputMessage);
}
responseObserver.onNext(outputMessage);
});
return new StreamObserver<T>() {
@Override
public void onNext(T inputMessage) {
if (logger.isDebugEnabled()) {
logger.debug("gRPC Server receiving: " + inputMessage);
}
//GRPC_MESSAGE_TYPE = (Class<T>) inputMessage.getClass();
inputStream.tryEmitNext(toSpringMessage(inputMessage));
serverCallStreamObserver.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
responseObserver.onCompleted();
}
@Override
public void onCompleted() {
logger.info("gRPC Server has finished receiving data.");
inputStream.tryEmitComplete();
responseObserver.onCompleted();
}
};
}
private StreamObserver<T> biStreamImperative(StreamObserver<T> responseObserver,
ServerCallStreamObserver<T> serverCallStreamObserver,
AtomicBoolean wasReady) {
return new StreamObserver<T>() {
@SuppressWarnings("unchecked")
@Override
public void onNext(T request) {
try {
Message<byte[]> message = toSpringMessage(request);
FunctionInvocationWrapper function = resolveFunction(
message.getHeaders());
Message<byte[]> replyMessage = (Message<byte[]>) function
.apply(message);
T reply = toGrpcMessage(replyMessage, (Class<T>) request.getClass());
responseObserver.onNext(reply);
// Check the provided ServerCallStreamObserver to see if it is still
// ready to accept more messages.
if (serverCallStreamObserver.isReady()) {
serverCallStreamObserver.request(1);
}
else {
wasReady.set(false);
}
}
catch (Throwable throwable) {
throwable.printStackTrace();
responseObserver.onError(
Status.UNKNOWN.withDescription("Error handling request")
.withCause(throwable).asException());
}
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
responseObserver.onCompleted();
}
@Override
public void onCompleted() {
logger.info("gRPC Server has finished receiving data.");
responseObserver.onCompleted();
}
};
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private T toGrpcMessage(Message<byte[]> request, Class<T> grpcClass) {
for (GrpcMessageConverter converter : this.grpcConverters) {
GeneratedMessageV3 grpcMessage = converter.fromSpringMessage(request, grpcClass);
if (grpcMessage != null) {
return (T) grpcMessage;
}
}
throw new IllegalStateException("Failed to convert Grpc Message to Spring Message: " + request);
}
@Override
public void start() {
this.running = true;
}
@Override
public void stop() {
this.executor.shutdown();
try {
Assert.isTrue(this.executor.awaitTermination(5000, TimeUnit.MILLISECONDS), "gRPC Server executor timed out while stopping, "
+ "since there are currently executing tasks");
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
this.running = false;
}
@Override
public boolean isRunning() {
return this.running;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private Message<byte[]> toSpringMessage(GeneratedMessageV3 request) {
for (GrpcMessageConverter converter : this.grpcConverters) {
Message<byte[]> springMessage = converter.toSpringMessage(request);
if (springMessage != null) {
return springMessage;
}
}
throw new IllegalStateException("Failed to convert Grpc Message to Spring Message: " + request);
}
private FunctionInvocationWrapper resolveFunction(Map<String, Object> headers) {
String functionDefinition = funcProperties.getDefinition();
if (!CollectionUtils.isEmpty(headers) && headers.containsKey(FunctionProperties.FUNCTION_DEFINITION)) {
functionDefinition = (String) headers.get(FunctionProperties.FUNCTION_DEFINITION);
}
FunctionInvocationWrapper function = this.functionCatalog.lookup(functionDefinition, "application/json");
Assert.notNull(function, "Failed to lookup function " + funcProperties.getDefinition());
return function;
}
}

View File

@@ -0,0 +1,18 @@
syntax = "proto3";
option java_multiple_files = true;
package org.springframework.cloud.function.grpc;
message GrpcSpringMessage {
bytes payload = 1;
map<string, string> headers = 2;
}
service MessagingService {
rpc biStream(stream GrpcSpringMessage) returns (stream GrpcSpringMessage);
rpc clientStream(stream GrpcSpringMessage) returns (GrpcSpringMessage);
rpc serverStream(GrpcSpringMessage) returns (stream GrpcSpringMessage);
rpc requestReply(GrpcSpringMessage) returns (GrpcSpringMessage);
}

View File

@@ -0,0 +1,2 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.function.grpc.GrpcAutoConfiguration

View File

@@ -0,0 +1,284 @@
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.grpc;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Function;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
/**
*
* @author Oleg Zhurakousky
*
*/
public class GrpcInteractionTests {
@Test
public void testRequestReply() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleConfiguration.class).web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.function.definition=uppercase",
"--spring.cloud.function.grpc.port=" + FunctionGrpcProperties.GRPC_PORT)) {
Message<byte[]> message = MessageBuilder.withPayload("\"hello gRPC\"".getBytes())
.setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build();
Message<byte[]> reply = GrpcUtils.requestReply(message);
assertThat(reply.getPayload()).isEqualTo("\"HELLO GRPC\"".getBytes());
}
}
@Test
public void testRequstReplyFunctionDefinitionInMessage() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleConfiguration.class).web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.function.grpc.port=" + FunctionGrpcProperties.GRPC_PORT)) {
Message<byte[]> message = MessageBuilder.withPayload("\"hello gRPC\"".getBytes())
.setHeader("foo", "bar")
.setHeader("spring.cloud.function.definition", "reverse")
.build();
Message<byte[]> reply = GrpcUtils.requestReply(message);
assertThat(reply.getPayload()).isEqualTo("\"CPRg olleh\"".getBytes());
}
}
@Test
public void testBidirectionalStreamWithImperativeFunction() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleConfiguration.class).web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.function.definition=uppercase",
"--spring.cloud.function.grpc.port="
+ FunctionGrpcProperties.GRPC_PORT)) {
List<Message<byte[]>> messages = new ArrayList<>();
messages.add(MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar")
.build());
messages.add(MessageBuilder.withPayload("\"Julien\"".getBytes()).setHeader("foo", "bar")
.build());
messages.add(MessageBuilder.withPayload("\"Bubbles\"".getBytes()).setHeader("foo", "bar")
.build());
Flux<Message<byte[]>> clientResponseObserver =
GrpcUtils.biStreaming("localhost", FunctionGrpcProperties.GRPC_PORT, Flux.fromIterable(messages));
List<Message<byte[]>> results = clientResponseObserver.collectList().block(Duration.ofSeconds(5));
assertThat(results.size()).isEqualTo(3);
assertThat(results.get(0).getPayload()).isEqualTo("\"RICKY\"".getBytes());
assertThat(results.get(1).getPayload()).isEqualTo("\"JULIEN\"".getBytes());
assertThat(results.get(2).getPayload()).isEqualTo("\"BUBBLES\"".getBytes());
}
}
@Test
public void testBidirectionalStreamWithReactiveFunction() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleConfiguration.class).web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.function.definition=uppercaseReactive",
"--spring.cloud.function.grpc.port="
+ FunctionGrpcProperties.GRPC_PORT)) {
List<Message<byte[]>> messages = new ArrayList<>();
messages.add(MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build());
messages.add(MessageBuilder.withPayload("\"Julien\"".getBytes()).setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build());
messages.add(MessageBuilder.withPayload("\"Bubbles\"".getBytes()).setHeader("foo", "bar")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build());
Flux<Message<byte[]>> resultStream =
GrpcUtils.biStreaming("localhost", FunctionGrpcProperties.GRPC_PORT, Flux.fromIterable(messages));
List<Message<byte[]>> results = resultStream.collectList().block(Duration.ofSeconds(5));
assertThat(results.size()).isEqualTo(3);
assertThat(results.get(0).getPayload()).isEqualTo("\"RICKY\"".getBytes());
assertThat(results.get(1).getPayload()).isEqualTo("\"JULIEN\"".getBytes());
assertThat(results.get(2).getPayload()).isEqualTo("\"BUBBLES\"".getBytes());
}
}
@Test
public void testClientStreaming() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleConfiguration.class).web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.function.definition=streamInStringOut",
"--spring.cloud.function.grpc.port="
+ FunctionGrpcProperties.GRPC_PORT)) {
List<Message<byte[]>> messages = new ArrayList<>();
messages.add(MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar")
.build());
messages.add(MessageBuilder.withPayload("\"Julien\"".getBytes()).setHeader("foo", "bar")
.build());
messages.add(MessageBuilder.withPayload("\"Bubbles\"".getBytes()).setHeader("foo", "bar")
.build());
Message<byte[]> reply =
GrpcUtils.clientStream("localhost", FunctionGrpcProperties.GRPC_PORT, Flux.fromIterable(messages));
assertThat(reply.getPayload()).isEqualTo("[Ricky, Julien, Bubbles]".getBytes());
}
}
@Test
public void testServerStreaming() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleConfiguration.class).web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.function.definition=stringInStreamOut",
"--spring.cloud.function.grpc.port="
+ FunctionGrpcProperties.GRPC_PORT)) {
Message<byte[]> message = MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar").build();
Flux<Message<byte[]>> reply =
GrpcUtils.serverStream("localhost", FunctionGrpcProperties.GRPC_PORT, message);
List<Message<byte[]>> results = reply.collectList().block(Duration.ofSeconds(5));
assertThat(results.size()).isEqualTo(2);
assertThat(results.get(0).getPayload()).isEqualTo("\"Ricky\"".getBytes());
assertThat(results.get(1).getPayload()).isEqualTo("\"RICKY\"".getBytes());
}
}
@Test
public void testBiStreamStreamInStringOutFailure() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleConfiguration.class).web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.function.definition=streamInStringOut",
"--spring.cloud.function.grpc.port="
+ FunctionGrpcProperties.GRPC_PORT)) {
List<Message<byte[]>> messages = new ArrayList<>();
messages.add(MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar")
.build());
messages.add(MessageBuilder.withPayload("\"Julien\"".getBytes()).setHeader("foo", "bar")
.build());
messages.add(MessageBuilder.withPayload("\"Bubbles\"".getBytes()).setHeader("foo", "bar")
.build());
Flux<Message<byte[]>> clientResponseObserver =
GrpcUtils.biStreaming("localhost", FunctionGrpcProperties.GRPC_PORT, Flux.fromIterable(messages));
try {
clientResponseObserver.collectList().block(Duration.ofSeconds(1));
fail();
}
catch (Exception e) {
// TODO: handle exception
}
}
}
@Test
public void testBiStreamStringInStreamOutFailure() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleConfiguration.class).web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.function.definition=stringInStreamOut",
"--spring.cloud.function.grpc.port="
+ FunctionGrpcProperties.GRPC_PORT)) {
List<Message<byte[]>> messages = new ArrayList<>();
messages.add(MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar")
.build());
messages.add(MessageBuilder.withPayload("\"Julien\"".getBytes()).setHeader("foo", "bar")
.build());
messages.add(MessageBuilder.withPayload("\"Bubbles\"".getBytes()).setHeader("foo", "bar")
.build());
Flux<Message<byte[]>> clientResponseObserver =
GrpcUtils.biStreaming("localhost", FunctionGrpcProperties.GRPC_PORT, Flux.fromIterable(messages));
try {
clientResponseObserver.collectList().block(Duration.ofSeconds(1));
fail();
}
catch (Exception e) {
// TODO: handle exception
}
}
}
@EnableAutoConfiguration
public static class SampleConfiguration {
@Bean
public Function<String, String> uppercase() {
return v -> v.toUpperCase();
}
@Bean
public Function<String, String> reverse() {
return v -> new StringBuilder(v).reverse().toString();
}
@Bean
public Function<Flux<String>, Flux<String>> uppercaseReactive() {
return flux -> flux.map(v -> v.toUpperCase());
}
@Bean
public Function<Flux<String>, String> streamInStringOut() {
return flux -> flux.doOnNext(v -> {
try {
Thread.sleep(new Random().nextInt(2000)); // artificial delay
}
catch (Exception e) {
// ignore
}
}).collectList().block().toString();
}
@Bean
public Function<String, Flux<String>> stringInStreamOut() {
return value -> Flux.just(value, value.toUpperCase());
}
}
}

View File

@@ -0,0 +1 @@
logging.level.org.springframework.cloud.function.grpc=DEBUG