@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2021-2021 the original author or authors.
|
||||
* Copyright 2021-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,28 +16,34 @@
|
||||
|
||||
package org.springframework.cloud.function.grpc;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
import io.grpc.BindableService;
|
||||
import io.grpc.Server;
|
||||
import io.grpc.ServerBuilder;
|
||||
import io.grpc.protobuf.services.ProtoReflectionService;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.springframework.context.EnvironmentAware;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.core.env.ConfigurableEnvironment;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.core.env.MapPropertySource;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Dave Syer
|
||||
* @author Chris Bono
|
||||
*
|
||||
* @since 3.2
|
||||
*
|
||||
*/
|
||||
class GrpcServer implements SmartLifecycle {
|
||||
class GrpcServer implements SmartLifecycle, EnvironmentAware {
|
||||
|
||||
private Log logger = LogFactory.getLog(GrpcServer.class);
|
||||
|
||||
@@ -49,6 +55,8 @@ class GrpcServer implements SmartLifecycle {
|
||||
|
||||
private Server server;
|
||||
|
||||
private Environment environment;
|
||||
|
||||
GrpcServer(FunctionGrpcProperties grpcProperties, BindableService[] grpcMessageServices) {
|
||||
this.grpcProperties = grpcProperties;
|
||||
this.grpcMessageServices = grpcMessageServices;
|
||||
@@ -70,7 +78,12 @@ class GrpcServer implements SmartLifecycle {
|
||||
|
||||
logger.info("Starting gRPC server");
|
||||
this.server.start();
|
||||
logger.info("gRPC server is listening on port " + this.grpcProperties.getPort());
|
||||
logger.info("gRPC server is listening on port " + this.server.getPort());
|
||||
|
||||
if (environment instanceof ConfigurableEnvironment) {
|
||||
((ConfigurableEnvironment) this.environment).getPropertySources().addFirst(
|
||||
new MapPropertySource("grpcServerProps", Collections.singletonMap("local.grpc.server.port", server.getPort())));
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
stop();
|
||||
@@ -90,4 +103,9 @@ class GrpcServer implements SmartLifecycle {
|
||||
public boolean isRunning() {
|
||||
return this.server != null && !this.server.isShutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setEnvironment(Environment environment) {
|
||||
this.environment = environment;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2021-2021 the original author or authors.
|
||||
* Copyright 2021-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -46,7 +46,7 @@ import static org.junit.jupiter.api.Assertions.fail;
|
||||
/**
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
*
|
||||
* @author Chris Bono
|
||||
*/
|
||||
@Disabled
|
||||
public class GrpcInteractionTests {
|
||||
@@ -62,13 +62,14 @@ public class GrpcInteractionTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRequestReply() {
|
||||
int port = SocketUtils.findAvailableTcpPort();
|
||||
public void testRequestReply() throws Exception {
|
||||
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
|
||||
SampleConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.function.definition=uppercase",
|
||||
"--spring.cloud.function.grpc.port=" + port)) {
|
||||
"--spring.cloud.function.grpc.port=0")) {
|
||||
|
||||
int port = patientlyGetPort(context);
|
||||
|
||||
Message<byte[]> message = MessageBuilder.withPayload("\"hello gRPC\"".getBytes())
|
||||
.setHeader("foo", "bar")
|
||||
@@ -82,13 +83,14 @@ public class GrpcInteractionTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRequestReplyWithMonoReturn() {
|
||||
int port = SocketUtils.findAvailableTcpPort();
|
||||
public void testRequestReplyWithMonoReturn() throws Exception {
|
||||
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
|
||||
SampleConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.function.definition=uppercaseMonoReturn",
|
||||
"--spring.cloud.function.grpc.port=" + port)) {
|
||||
"--spring.cloud.function.definition=uppercaseMonoReturn",
|
||||
"--spring.cloud.function.grpc.port=0")) {
|
||||
|
||||
int port = patientlyGetPort(context);
|
||||
|
||||
Message<byte[]> message = MessageBuilder.withPayload("\"hello gRPC\"".getBytes())
|
||||
.setHeader("foo", "bar")
|
||||
@@ -102,14 +104,15 @@ public class GrpcInteractionTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRequestReplyWithFluxReturn() {
|
||||
int port = SocketUtils.findAvailableTcpPort();
|
||||
public void testRequestReplyWithFluxReturn() throws Exception {
|
||||
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
|
||||
SampleConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.function.definition=uppercaseFluxReturn",
|
||||
"--spring.cloud.function.grpc.port=" + port)) {
|
||||
"--spring.cloud.function.grpc.port=0")) {
|
||||
|
||||
int port = patientlyGetPort(context);
|
||||
|
||||
Message<byte[]> message = MessageBuilder.withPayload("\"hello gRPC\"".getBytes())
|
||||
.setHeader("foo", "bar")
|
||||
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
|
||||
@@ -125,13 +128,14 @@ public class GrpcInteractionTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRequstReplyFunctionDefinitionInMessage() {
|
||||
int port = SocketUtils.findAvailableTcpPort();
|
||||
public void testRequstReplyFunctionDefinitionInMessage() throws Exception {
|
||||
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
|
||||
SampleConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.function.grpc.port=" + port)) {
|
||||
"--spring.cloud.function.grpc.port=0")) {
|
||||
|
||||
int port = patientlyGetPort(context);
|
||||
|
||||
Message<byte[]> message = MessageBuilder.withPayload("\"hello gRPC\"".getBytes())
|
||||
.setHeader("foo", "bar")
|
||||
.setHeader("spring.cloud.function.definition", "reverse")
|
||||
@@ -144,14 +148,15 @@ public class GrpcInteractionTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBidirectionalStreamWithImperativeFunction() {
|
||||
int port = SocketUtils.findAvailableTcpPort();
|
||||
public void testBidirectionalStreamWithImperativeFunction() throws Exception {
|
||||
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
|
||||
SampleConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.function.definition=uppercase",
|
||||
"--spring.cloud.function.grpc.port=" + port)) {
|
||||
"--spring.cloud.function.grpc.port=0")) {
|
||||
|
||||
int port = patientlyGetPort(context);
|
||||
|
||||
List<Message<byte[]>> messages = new ArrayList<>();
|
||||
messages.add(MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar")
|
||||
.build());
|
||||
@@ -172,15 +177,15 @@ public class GrpcInteractionTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBidirectionalStreamWithReactiveFunction() {
|
||||
int port = SocketUtils.findAvailableTcpPort();
|
||||
public void testBidirectionalStreamWithReactiveFunction() throws Exception {
|
||||
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
|
||||
SampleConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.function.definition=uppercaseReactive",
|
||||
"--spring.cloud.function.grpc.port="
|
||||
+ port)) {
|
||||
"--spring.cloud.function.grpc.port=0")) {
|
||||
|
||||
int port = patientlyGetPort(context);
|
||||
|
||||
List<Message<byte[]>> messages = new ArrayList<>();
|
||||
messages.add(MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar")
|
||||
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
|
||||
@@ -204,14 +209,14 @@ public class GrpcInteractionTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientStreaming() {
|
||||
int port = SocketUtils.findAvailableTcpPort();
|
||||
public void testClientStreaming() throws Exception {
|
||||
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
|
||||
SampleConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.function.definition=streamInStringOut",
|
||||
"--spring.cloud.function.grpc.port="
|
||||
+ port)) {
|
||||
"--spring.cloud.function.grpc.port=0")) {
|
||||
|
||||
int port = patientlyGetPort(context);
|
||||
|
||||
List<Message<byte[]>> messages = new ArrayList<>();
|
||||
messages.add(MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar")
|
||||
@@ -229,14 +234,14 @@ public class GrpcInteractionTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerStreaming() {
|
||||
int port = SocketUtils.findAvailableTcpPort();
|
||||
public void testServerStreaming() throws Exception {
|
||||
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
|
||||
SampleConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.function.definition=stringInStreamOut",
|
||||
"--spring.cloud.function.grpc.port="
|
||||
+ port)) {
|
||||
"--spring.cloud.function.grpc.port=0")) {
|
||||
|
||||
int port = patientlyGetPort(context);
|
||||
|
||||
Message<byte[]> message = MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar").build();
|
||||
|
||||
@@ -251,14 +256,14 @@ public class GrpcInteractionTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBiStreamStreamInStringOutFailure() {
|
||||
int port = SocketUtils.findAvailableTcpPort();
|
||||
public void testBiStreamStreamInStringOutFailure() throws Exception {
|
||||
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
|
||||
SampleConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.function.definition=streamInStringOut",
|
||||
"--spring.cloud.function.grpc.port="
|
||||
+ port)) {
|
||||
"--spring.cloud.function.grpc.port=0")) {
|
||||
|
||||
int port = patientlyGetPort(context);
|
||||
|
||||
List<Message<byte[]>> messages = new ArrayList<>();
|
||||
messages.add(MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar")
|
||||
@@ -276,14 +281,14 @@ public class GrpcInteractionTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBiStreamStringInStreamOutFailure() {
|
||||
int port = SocketUtils.findAvailableTcpPort();
|
||||
public void testBiStreamStringInStreamOutFailure() throws Exception {
|
||||
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
|
||||
SampleConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.function.definition=stringInStreamOut",
|
||||
"--spring.cloud.function.grpc.port="
|
||||
+ port)) {
|
||||
"--spring.cloud.function.grpc.port=0")) {
|
||||
|
||||
int port = patientlyGetPort(context);
|
||||
|
||||
List<Message<byte[]>> messages = new ArrayList<>();
|
||||
messages.add(MessageBuilder.withPayload("\"Ricky\"".getBytes()).setHeader("foo", "bar")
|
||||
@@ -300,6 +305,17 @@ public class GrpcInteractionTests {
|
||||
}
|
||||
}
|
||||
|
||||
private int patientlyGetPort(ConfigurableApplicationContext context) throws InterruptedException {
|
||||
Thread.sleep(500);
|
||||
String port = context.getEnvironment().getProperty("local.grpc.server.port");
|
||||
if (port == null) {
|
||||
Thread.sleep(500);
|
||||
port = context.getEnvironment().getProperty("local.grpc.server.port");
|
||||
assertThat(port).as("Unable to get 'local.grpc.server.port' - server may not have started up").isNotNull();
|
||||
}
|
||||
return Integer.valueOf(port);
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
public static class SampleConfiguration {
|
||||
|
||||
|
||||
Reference in New Issue
Block a user