Files
2019-11-10 00:50:27 +00:00

400 lines
15 KiB
HTML

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<!--[if IE]><meta http-equiv="X-UA-Compatible" content="IE=edge"><![endif]-->
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="generator" content="Asciidoctor 1.5.8">
<title>Spring Cloud Stream</title>
<link rel="stylesheet" href="css/spring.css">
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css">
<style>
.hidden {
display: none;
}
.switch {
border-width: 1px 1px 0 1px;
border-style: solid;
border-color: #7a2518;
display: inline-block;
}
.switch--item {
padding: 10px;
background-color: #ffffff;
color: #7a2518;
display: inline-block;
cursor: pointer;
}
.switch--item:not(:first-child) {
border-width: 0 0 0 1px;
border-style: solid;
border-color: #7a2518;
}
.switch--item.selected {
background-color: #7a2519;
color: #ffffff;
}
</style>
<script src="https://cdnjs.cloudflare.com/ajax/libs/zepto/1.2.0/zepto.min.js"></script>
<script type="text/javascript">
function addBlockSwitches() {
$('.primary').each(function() {
primary = $(this);
createSwitchItem(primary, createBlockSwitch(primary)).item.addClass("selected");
primary.children('.title').remove();
});
$('.secondary').each(function(idx, node) {
secondary = $(node);
primary = findPrimary(secondary);
switchItem = createSwitchItem(secondary, primary.children('.switch'));
switchItem.content.addClass('hidden');
findPrimary(secondary).append(switchItem.content);
secondary.remove();
});
}
function createBlockSwitch(primary) {
blockSwitch = $('<div class="switch"></div>');
primary.prepend(blockSwitch);
return blockSwitch;
}
function findPrimary(secondary) {
candidate = secondary.prev();
while (!candidate.is('.primary')) {
candidate = candidate.prev();
}
return candidate;
}
function createSwitchItem(block, blockSwitch) {
blockName = block.children('.title').text();
content = block.children('.content').first().append(block.next('.colist'));
item = $('<div class="switch--item">' + blockName + '</div>');
item.on('click', '', content, function(e) {
$(this).addClass('selected');
$(this).siblings().removeClass('selected');
e.data.siblings('.content').addClass('hidden');
e.data.removeClass('hidden');
});
blockSwitch.append(item);
return {'item': item, 'content': content};
}
$(addBlockSwitches);
</script>
</head>
<body class="book toc2 toc-left">
<div id="header">
<div id="toc" class="toc2">
<div id="toctitle">Table of Contents</div>
<ul class="sectlevel1">
<li><a href="#_spring_cloud_stream">Spring Cloud Stream</a>
<ul class="sectlevel2">
<li><a href="#_overview">Overview</a></li>
<li><a href="#_configuration">Configuration</a></li>
<li><a href="#_binding_with_functions">Binding with Functions</a></li>
<li><a href="#_binding_with_annotations">Binding with Annotations</a></li>
<li><a href="#_streaming_vs_polled_input">Streaming vs. Polled Input</a></li>
<li><a href="#_sample">Sample</a></li>
</ul>
</li>
</ul>
</div>
</div>
<div id="content">
<div class="sect1">
<h2 id="_spring_cloud_stream"><a class="link" href="#_spring_cloud_stream">Spring Cloud Stream</a></h2>
<div class="sectionbody">
<div class="paragraph">
<p>Spring Cloud GCP provides a <a href="https://cloud.spring.io/spring-cloud-stream/">Spring Cloud Stream</a> binder to Google Cloud Pub/Sub.</p>
</div>
<div class="paragraph">
<p>The provided binder relies on the <a href="https://github.com/spring-cloud/spring-cloud-gcp/tree/master/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/integration">Spring Integration Channel Adapters for Google Cloud Pub/Sub</a>.</p>
</div>
<div class="paragraph">
<p>Maven coordinates, using <a href="getting-started.html#_bill_of_materials">Spring Cloud GCP BOM</a>:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-xml hljs" data-lang="xml">&lt;dependency&gt;
&lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
&lt;artifactId&gt;spring-cloud-gcp-pubsub-stream-binder&lt;/artifactId&gt;
&lt;/dependency&gt;</code></pre>
</div>
</div>
<div class="paragraph">
<p>Gradle coordinates:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>dependencies {
compile group: 'org.springframework.cloud', name: 'spring-cloud-gcp-pubsub-stream-binder'
}</code></pre>
</div>
</div>
<div class="sect2">
<h3 id="_overview"><a class="link" href="#_overview">Overview</a></h3>
<div class="paragraph">
<p>This binder binds producers to Google Cloud Pub/Sub topics and consumers to subscriptions.</p>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<i class="fa icon-note" title="Note"></i>
</td>
<td class="content">
Partitioning is currently not supported by this binder.
</td>
</tr>
</table>
</div>
</div>
<div class="sect2">
<h3 id="_configuration"><a class="link" href="#_configuration">Configuration</a></h3>
<div class="paragraph">
<p>You can configure the Spring Cloud Stream Binder for Google Cloud Pub/Sub to automatically generate the underlying resources, like the Google Cloud Pub/Sub topics and subscriptions for producers and consumers.
For that, you can use the <code>spring.cloud.stream.gcp.pubsub.bindings.&lt;channelName&gt;.&lt;consumer|producer&gt;.auto-create-resources</code> property, which is turned ON by default.</p>
</div>
<div class="paragraph">
<p>Starting with version 1.1, these and other binder properties can be configured globally for all the bindings, e.g. <code>spring.cloud.stream.gcp.pubsub.default.consumer.auto-create-resources</code>.</p>
</div>
<div class="paragraph">
<p>If you are using Pub/Sub auto-configuration from the Spring Cloud GCP Pub/Sub Starter, you should refer to the <a href="#pubsub-configuration">configuration</a> section for other Pub/Sub parameters.</p>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<i class="fa icon-note" title="Note"></i>
</td>
<td class="content">
To use this binder with a <a href="https://cloud.google.com/pubsub/docs/emulator">running emulator</a>, configure its host and port via <code>spring.cloud.gcp.pubsub.emulator-host</code>.
</td>
</tr>
</table>
</div>
<div class="sect3">
<h4 id="_producer_destination_configuration"><a class="link" href="#_producer_destination_configuration">Producer Destination Configuration</a></h4>
<div class="paragraph">
<p>If automatic resource creation is turned ON and the topic corresponding to the destination name does not exist, it will be created.</p>
</div>
<div class="paragraph">
<p>For example, for the following configuration, a topic called <code>myEvents</code> would be created.</p>
</div>
<div class="listingblock">
<div class="title">application.properties</div>
<div class="content">
<pre class="highlightjs highlight"><code>spring.cloud.stream.bindings.events.destination=myEvents
spring.cloud.stream.gcp.pubsub.bindings.events.producer.auto-create-resources=true</code></pre>
</div>
</div>
</div>
<div class="sect3">
<h4 id="_consumer_destination_configuration"><a class="link" href="#_consumer_destination_configuration">Consumer Destination Configuration</a></h4>
<div class="paragraph">
<p>If automatic resource creation is turned ON and the subscription and/or the topic do not exist for a consumer, a subscription and potentially a topic will be created.
The topic name will be the same as the destination name, and the subscription name will be the destination name followed by the consumer group name.</p>
</div>
<div class="paragraph">
<p>Regardless of the <code>auto-create-resources</code> setting, if the consumer group is not specified, an anonymous one will be created with the name <code>anonymous.&lt;destinationName&gt;.&lt;randomUUID&gt;</code>.
Then when the binder shuts down, all Pub/Sub subscriptions created for anonymous consumer groups will be automatically cleaned up.</p>
</div>
<div class="paragraph">
<p>For example, for the following configuration, a topic named <code>myEvents</code> and a subscription called <code>myEvents.consumerGroup1</code> would be created.
If the consumer group is not specified, a subscription called <code>anonymous.myEvents.a6d83782-c5a3-4861-ac38-e6e2af15a7be</code> would be created and later cleaned up.</p>
</div>
<div class="admonitionblock important">
<table>
<tr>
<td class="icon">
<i class="fa icon-important" title="Important"></i>
</td>
<td class="content">
If you are manually creating Pub/Sub subscriptions for consumers, make sure that they follow the naming convention of <code>&lt;destinationName&gt;.&lt;consumerGroup&gt;</code>.
</td>
</tr>
</table>
</div>
<div class="listingblock">
<div class="title">application.properties</div>
<div class="content">
<pre class="highlightjs highlight"><code>spring.cloud.stream.bindings.events.destination=myEvents
spring.cloud.stream.gcp.pubsub.bindings.events.consumer.auto-create-resources=true
# specify consumer group, and avoid anonymous consumer group generation
spring.cloud.stream.bindings.events.group=consumerGroup1</code></pre>
</div>
</div>
</div>
</div>
<div class="sect2">
<h3 id="_binding_with_functions"><a class="link" href="#_binding_with_functions">Binding with Functions</a></h3>
<div class="paragraph">
<p>Since version 3.0, Spring Cloud Stream supports a functional programming model natively.
This means that the only requirement for turning your application into a sink is presence of a <code>java.util.function.Consumer</code> bean in the application context.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public Consumer&lt;UserMessage&gt; logUserMessage() {
return userMessage -&gt; {
// process message
}
};</code></pre>
</div>
</div>
<div class="paragraph">
<p>A source application is one where a <code>Supplier</code> bean is present.
It can return an object, in which case Spring Cloud Stream will invoke the supplier repeatedly.
Alternatively, the function can return a reactive stream, which will be used as is.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
Supplier&lt;Flux&lt;UserMessage&gt;&gt; generateUserMessages() {
return () -&gt; /* flux creation logic */;
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>A processor application works similarly to a source application, except it is triggered by presence of a <code>Function</code> bean.</p>
</div>
<div class="admonitionblock warning">
<table>
<tr>
<td class="icon">
<i class="fa icon-warning" title="Warning"></i>
</td>
<td class="content">
Even though Spring Cloud Stream is able to autodiscover functional beans, when using the <code>spring-cloud-gcp-pubsub-stream-binder</code> dependency, configuring the <code>spring.cloud.function.definition</code> property to indicate the function bean is <strong>required</strong>.
If this property is omitted, you will see a warning <code>Found more then one function beans in BeanFactory: [<em>your-bean-name</em>, pubSubReactiveScheduler]</code>, and the function returned by <em>your-bean-name</em> will never be bound to a Cloud Pub/Sub topic or subscription.
</td>
</tr>
</table>
</div>
</div>
<div class="sect2">
<h3 id="_binding_with_annotations"><a class="link" href="#_binding_with_annotations">Binding with Annotations</a></h3>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<i class="fa icon-note" title="Note"></i>
</td>
<td class="content">
As of version 3.0, annotation binding is considered legacy.
</td>
</tr>
</table>
</div>
<div class="paragraph">
<p>To set up a sink application in this style, you would associate a class with a binding interface, such as the built-in <code>Sink</code> interface.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@EnableBinding(Sink.class)
public class SinkExample {
@StreamListener(Sink.INPUT)
public void handleMessage(UserMessage userMessage) {
// process message
}
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>To set up a source application, you would similarly associate a class with a built-in <code>Source</code> interface, and inject an instance of it provided by Spring Cloud Stream.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@EnableBinding(Source.class)
public class SourceExample {
@Autowired
private Source source;
public void sendMessage() {
this.source.output().send(new GenericMessage&lt;&gt;(/* your object here */));
}
}</code></pre>
</div>
</div>
</div>
<div class="sect2">
<h3 id="_streaming_vs_polled_input"><a class="link" href="#_streaming_vs_polled_input">Streaming vs. Polled Input</a></h3>
<div class="paragraph">
<p>Many Spring Cloud Stream applications will use the built-in <code>Sink</code> binding, which triggers the <em>streaming</em> input binder creation.
Messages can then be consumed with an input handler marked by <code>@StreamListener(Sink.INPUT)</code> annotation, at whatever rate Pub/Sub sends them.</p>
</div>
<div class="paragraph">
<p>For more control over the rate of message arrival, a polled input binder can be set up by defining a custom binding interface with an <code>@Input</code>-annotated method returning <code>PollableMessageSource</code>.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">public interface PollableSink {
@Input("input")
PollableMessageSource input();
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>The <code>PollableMessageSource</code> can then be injected and queried, as needed.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@EnableBinding(PollableSink.class)
public class SinkExample {
@Autowired
PollableMessageSource destIn;
@Bean
public ApplicationRunner singlePollRunner() {
return args -&gt; {
// This will poll only once.
// Add a loop or a scheduler to get more messages.
destIn.poll((message) -&gt; System.out.println("Message retrieved: " + message));
};
}
}</code></pre>
</div>
</div>
</div>
<div class="sect2">
<h3 id="_sample"><a class="link" href="#_sample">Sample</a></h3>
<div class="paragraph">
<p>Sample applications are available:</p>
</div>
<div class="ulist">
<ul>
<li>
<p>For <a href="https://github.com/spring-cloud/spring-cloud-gcp/tree/master/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-binder-sample">streaming input, annotation-based</a>.</p>
</li>
<li>
<p>For <a href="https://github.com/spring-cloud/spring-cloud-gcp/tree/master/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-binder-functional-sample">streaming input, functional style</a>.</p>
</li>
<li>
<p>For <a href="https://github.com/spring-cloud/spring-cloud-gcp/tree/master/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-polling-binder-sample">polled input</a>.</p>
</li>
</ul>
</div>
</div>
</div>
</div>
</div>
<script type="text/javascript" src="js/tocbot/tocbot.min.js"></script>
<script type="text/javascript" src="js/toc.js"></script>
<link rel="stylesheet" href="js/highlight/styles/atom-one-dark-reasonable.min.css">
<script src="js/highlight/highlight.min.js"></script>
<script>hljs.initHighlighting()</script>
</body>
</html>