Files
spring-cloud-static/spring-cloud-gcp/1.2.3.RELEASE/reference/html/spring-integration.html
2020-05-29 15:34:15 +00:00

669 lines
33 KiB
HTML

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<!--[if IE]><meta http-equiv="X-UA-Compatible" content="IE=edge"><![endif]-->
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="generator" content="Asciidoctor 1.5.8">
<title>Spring Integration</title>
<link rel="stylesheet" href="css/spring.css">
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css">
<style>
.hidden {
display: none;
}
.switch {
border-width: 1px 1px 0 1px;
border-style: solid;
border-color: #7a2518;
display: inline-block;
}
.switch--item {
padding: 10px;
background-color: #ffffff;
color: #7a2518;
display: inline-block;
cursor: pointer;
}
.switch--item:not(:first-child) {
border-width: 0 0 0 1px;
border-style: solid;
border-color: #7a2518;
}
.switch--item.selected {
background-color: #7a2519;
color: #ffffff;
}
</style>
<script src="https://cdnjs.cloudflare.com/ajax/libs/zepto/1.2.0/zepto.min.js"></script>
<script type="text/javascript">
function addBlockSwitches() {
$('.primary').each(function() {
primary = $(this);
createSwitchItem(primary, createBlockSwitch(primary)).item.addClass("selected");
primary.children('.title').remove();
});
$('.secondary').each(function(idx, node) {
secondary = $(node);
primary = findPrimary(secondary);
switchItem = createSwitchItem(secondary, primary.children('.switch'));
switchItem.content.addClass('hidden');
findPrimary(secondary).append(switchItem.content);
secondary.remove();
});
}
function createBlockSwitch(primary) {
blockSwitch = $('<div class="switch"></div>');
primary.prepend(blockSwitch);
return blockSwitch;
}
function findPrimary(secondary) {
candidate = secondary.prev();
while (!candidate.is('.primary')) {
candidate = candidate.prev();
}
return candidate;
}
function createSwitchItem(block, blockSwitch) {
blockName = block.children('.title').text();
content = block.children('.content').first().append(block.next('.colist'));
item = $('<div class="switch--item">' + blockName + '</div>');
item.on('click', '', content, function(e) {
$(this).addClass('selected');
$(this).siblings().removeClass('selected');
e.data.siblings('.content').addClass('hidden');
e.data.removeClass('hidden');
});
blockSwitch.append(item);
return {'item': item, 'content': content};
}
$(addBlockSwitches);
</script>
</head>
<body class="book toc2 toc-left">
<div id="header">
<div id="toc" class="toc2">
<div id="toctitle">Table of Contents</div>
<ul class="sectlevel1">
<li><a href="#_spring_integration">Spring Integration</a>
<ul class="sectlevel2">
<li><a href="#_channel_adapters_for_cloud_pubsub">Channel Adapters for Cloud Pub/Sub</a></li>
<li><a href="#_channel_adapters_for_google_cloud_storage">Channel Adapters for Google Cloud Storage</a></li>
</ul>
</li>
</ul>
</div>
</div>
<div id="content">
<div class="sect1">
<h2 id="_spring_integration"><a class="link" href="#_spring_integration">Spring Integration</a></h2>
<div class="sectionbody">
<div class="paragraph">
<p>Spring Cloud GCP provides Spring Integration adapters that allow your applications to use Enterprise Integration Patterns backed up by Google Cloud Platform services.</p>
</div>
<div class="sect2">
<h3 id="_channel_adapters_for_cloud_pubsub"><a class="link" href="#_channel_adapters_for_cloud_pubsub">Channel Adapters for Cloud Pub/Sub</a></h3>
<div class="paragraph">
<p>The channel adapters for Google Cloud Pub/Sub connect your Spring <a href="https://docs.spring.io/spring-integration/reference/html/messaging-channels-section.html#channel"><code>MessageChannels</code></a> to Google Cloud Pub/Sub topics and subscriptions.
This enables messaging between different processes, applications or micro-services backed up by Google Cloud Pub/Sub.</p>
</div>
<div class="paragraph">
<p>The Spring Integration Channel Adapters for Google Cloud Pub/Sub are included in the <code>spring-cloud-gcp-pubsub</code> module and can be autoconfigured by using the <code>spring-cloud-gcp-starter-pubsub</code> module in combination with a Spring Integration dependency.</p>
</div>
<div class="paragraph">
<p>Maven coordinates, using <a href="getting-started.html#_bill_of_materials">Spring Cloud GCP BOM</a>:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-xml hljs" data-lang="xml">&lt;dependency&gt;
&lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
&lt;artifactId&gt;spring-cloud-gcp-starter-pubsub&lt;/artifactId&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.springframework.integration&lt;/groupId&gt;
&lt;artifactId&gt;spring-integration-core&lt;/artifactId&gt;
&lt;/dependency&gt;</code></pre>
</div>
</div>
<div class="paragraph">
<p>Gradle coordinates:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>dependencies {
compile group: 'org.springframework.cloud', name: 'spring-cloud-gcp-starter-pubsub'
compile group: 'org.springframework.integration', name: 'spring-integration-core'
}</code></pre>
</div>
</div>
<div class="sect3">
<h4 id="_inbound_channel_adapter_using_pubsub_streaming_pull"><a class="link" href="#_inbound_channel_adapter_using_pubsub_streaming_pull">Inbound channel adapter (using Pub/Sub Streaming Pull)</a></h4>
<div class="paragraph">
<p><code>PubSubInboundChannelAdapter</code> is the inbound channel adapter for GCP Pub/Sub that listens to a GCP Pub/Sub subscription for new messages.
It converts new messages to an internal Spring <a href="https://docs.spring.io/spring-integration/reference/html/messaging-construction-chapter.html#message"><code>Message</code></a> and then sends it to the bound output channel.</p>
</div>
<div class="paragraph">
<p>Google Pub/Sub treats message payloads as byte arrays.
So, by default, the inbound channel adapter will construct the Spring <code>Message</code> with <code>byte[]</code> as the payload.
However, you can change the desired payload type by setting the <code>payloadType</code> property of the <code>PubSubInboundChannelAdapter</code>.
The <code>PubSubInboundChannelAdapter</code> delegates the conversion to the desired payload type to the <code>PubSubMessageConverter</code> configured in the <code>PubSubTemplate</code>.</p>
</div>
<div class="paragraph">
<p>To use the inbound channel adapter, a <code>PubSubInboundChannelAdapter</code> must be provided and configured on the user application side.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@Bean
public MessageChannel pubsubInputChannel() {
return new PublishSubscribeChannel();
}
@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(
@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
PubSubSubscriberOperations subscriberOperations) {
PubSubInboundChannelAdapter adapter =
new PubSubInboundChannelAdapter(subscriberOperations, "subscriptionName");
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.MANUAL);
return adapter;
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>In the example, we first specify the <code>MessageChannel</code> where the adapter is going to write incoming messages to.
The <code>MessageChannel</code> implementation isn&#8217;t important here.
Depending on your use case, you might want to use a <code>MessageChannel</code> other than <code>PublishSubscribeChannel</code>.</p>
</div>
<div class="paragraph">
<p>Then, we declare a <code>PubSubInboundChannelAdapter</code> bean.
It requires the channel we just created and a <code>SubscriberFactory</code>, which creates <code>Subscriber</code> objects from the Google Cloud Java Client for Pub/Sub.
The Spring Boot starter for GCP Pub/Sub provides a configured <code>PubSubSubscriberOperations</code> object.</p>
</div>
<div class="sect4">
<h5 id="_acknowledging_messages_and_handling_failures"><a class="link" href="#_acknowledging_messages_and_handling_failures">Acknowledging messages and handling failures</a></h5>
<div class="paragraph">
<p>When working with Cloud Pub/Sub, it is important to understand the concept of <code>ackDeadline</code>&#8201;&#8212;&#8201;the amount of time Cloud Pub/Sub will wait until attempting redelivery of an outstanding message.
Each subscription has a default <code>ackDeadline</code> applied to all messages sent to it.
Additionally, the Cloud Pub/Sub client library can extend each streamed message&#8217;s <code>ackDeadline</code> until the message processing completes, fails or until the maximum extension period elapses.</p>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<i class="fa icon-note" title="Note"></i>
</td>
<td class="content">
In the Pub/Sub client library, default maximum extension period is an hour. However, Spring Cloud GCP disables this auto-extension behavior.
Use the <code>spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period</code> property to re-enable it.
</td>
</tr>
</table>
</div>
<div class="paragraph">
<p>Acknowledging (acking) a message removes it from Pub/Sub&#8217;s known outstanding messages. Nacking a message resets its acknowledgement deadline to 0, forcing immediate redelivery.
This could be useful in a load balanced architecture, where one of the subscribers is having issues but others are available to process messages.</p>
</div>
<div class="paragraph">
<p>The <code>PubSubInboundChannelAdapter</code> supports three acknowledgement modes: the default <code>AckMode.AUTO</code> (automatic acking on processing success and nacking on exception), as well as two modes for additional manual control: AckMode.AUTO_ACK (automatic acking on success but no action on exception) and AckMode.MANUAL (no automatic actions at all; both acking and nacking have to be done manually).</p>
</div>
<table class="tableblock frame-all grid-all stretch">
<caption class="title">Table 1. Acknowledgement mode behavior</caption>
<colgroup>
<col style="width: 25%;">
<col style="width: 25%;">
<col style="width: 25%;">
<col style="width: 25%;">
</colgroup>
<thead>
<tr>
<th class="tableblock halign-left valign-top"></th>
<th class="tableblock halign-left valign-top">AUTO</th>
<th class="tableblock halign-left valign-top">AUTO_ACK</th>
<th class="tableblock halign-left valign-top">MANUAL</th>
</tr>
</thead>
<tbody>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock">Message processing completes successfully</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">ack, no redelivery</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">ack, no redelivery</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">&lt;no action&gt;*</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock">Message processing fails, but error handler completes successfully**</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">ack, no redelivery</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">ack, no redelivery</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">&lt;no action&gt;*</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock">Message processing fails; no error handler present</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">nack, immediate redelivery</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">&lt;no action&gt;*</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">&lt;no action&gt;*</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock">Message processing fails, and error handler throws an exception</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">nack, immediate redelivery</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">&lt;no action&gt;*</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">&lt;no action&gt;*</p></td>
</tr>
</tbody>
</table>
<div class="paragraph">
<p>* &lt;no action&gt; means that the message will be neither acked nor nacked.
Cloud Pub/Sub will attempt redelivery according to subscription <code>ackDeadline</code> setting and the <code>max-ack-extension-period</code> client library setting.</p>
</div>
<div class="paragraph">
<p>** For the adapter, "success" means the Spring Integration flow processed without raising an exception, so successful message processing and the successful completion of an error handler both result in the same behavior (message will be acknowledged).
To trigger default error behavior (nacking in <code>AUTO</code> mode; neither acking nor nacking in <code>AUTO_ACK</code> mode), propagate the error back to the adapter by throwing an exception from the <a href="#_error_handling">Error Handling flow</a>.</p>
</div>
<div class="sect5">
<h6 id="_manual_ackingnacking"><a class="link" href="#_manual_ackingnacking">Manual acking/nacking</a></h6>
<div class="paragraph">
<p>The adapter attaches a <code>BasicAcknowledgeablePubsubMessage</code> object to the <code>Message</code> headers.
Users can extract the <code>BasicAcknowledgeablePubsubMessage</code> using the <code>GcpPubSubHeaders.ORIGINAL_MESSAGE</code> key and use it to ack (or nack) a message.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@Bean
@ServiceActivator(inputChannel = "pubsubInputChannel")
public MessageHandler messageReceiver() {
return message -&gt; {
LOGGER.info("Message arrived! Payload: " + new String((byte[]) message.getPayload()));
BasicAcknowledgeablePubsubMessage originalMessage =
message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
originalMessage.ack();
};
}</code></pre>
</div>
</div>
</div>
<div class="sect5">
<h6 id="_error_handling"><a class="link" href="#_error_handling">Error Handling</a></h6>
<div class="paragraph">
<p>If you want to have more control over message processing in case of an error, you need to associate the <code>PubSubInboundChannelAdapter</code> with a Spring Integration error channel and specify the behavior to be invoked with <code>@ServiceActivator</code>.</p>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<i class="fa icon-note" title="Note"></i>
</td>
<td class="content">
In order to activate the default behavior (nacking in <code>AUTO</code> mode; neither acking nor nacking in <code>AUTO_ACK</code> mode), your error handler has to throw an exception.
Otherwise, the adapter will assume that processing completed successfully and will ack the message.
</td>
</tr>
</table>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@Bean
public MessageChannel pubsubInputChannel() {
return new PublishSubscribeChannel();
}
@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(
@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
SubscriberFactory subscriberFactory) {
PubSubInboundChannelAdapter adapter =
new PubSubInboundChannelAdapter(subscriberFactory, "subscriptionName");
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.AUTO_ACK);
adapter.setErrorChannelName("pubsubErrors");
return adapter;
}
@ServiceActivator(inputChannel = "pubsubErrors")
public void pubsubErrorHandler(Message&lt;MessagingException&gt; message) {
LOGGER.warn("This message will be automatically acked because error handler completes successfully");
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>If you would prefer to manually ack or nack the message, you can do it by retrieving the header of the exception payload:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@ServiceActivator(inputChannel = "pubsubErrors")
public void pubsubErrorHandler(Message&lt;MessagingException&gt; exceptionMessage) {
BasicAcknowledgeablePubsubMessage originalMessage =
(BasicAcknowledgeablePubsubMessage)exceptionMessage.getPayload().getFailedMessage()
.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE);
originalMessage.nack();
}</code></pre>
</div>
</div>
</div>
</div>
</div>
<div class="sect3">
<h4 id="_pollable_message_source_using_pubsub_synchronous_pull"><a class="link" href="#_pollable_message_source_using_pubsub_synchronous_pull">Pollable Message Source (using Pub/Sub Synchronous Pull)</a></h4>
<div class="paragraph">
<p>While <code>PubSubInboundChannelAdapter</code>, through the underlying Asynchronous Pull Pub/Sub mechanism, provides the best performance for high-volume applications that receive a steady flow of messages, it can create load balancing anomalies due to message caching.
This behavior is most obvious when publishing a large batch of small messages that take a long time to process individually.
It manifests as one subscriber taking up most messages, even if multiple subscribers are available to take on the work.
For a more detailed explanation of this scenario, see <a href="https://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages">GCP Pub/Sub documentation</a>.</p>
</div>
<div class="paragraph">
<p>In such a scenario, a <code>PubSubMessageSource</code> can help spread the load between different subscribers more evenly.</p>
</div>
<div class="paragraph">
<p>As with the Inbound Channel Adapter, the message source has a configurable acknowledgement mode, payload type, and header mapping.</p>
</div>
<div class="paragraph">
<p>The default behavior is to return from the synchronous pull operation immediately if no messages are present.
This can be overridden by using <code>setBlockOnPull()</code> method to wait for at least one message to arrive.</p>
</div>
<div class="paragraph">
<p>By default, <code>PubSubMessageSource</code> pulls from the subscription one message at a time.
To pull a batch of messages on each request, use the <code>setMaxFetchSize()</code> method to set the batch size.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@Bean
@InboundChannelAdapter(channel = "pubsubInputChannel", poller = @Poller(fixedDelay = "100"))
public MessageSource&lt;Object&gt; pubsubAdapter(PubSubTemplate pubSubTemplate) {
PubSubMessageSource messageSource = new PubSubMessageSource(pubSubTemplate, "exampleSubscription");
messageSource.setAckMode(AckMode.MANUAL);
messageSource.setPayloadType(String.class);
messageSource.setBlockOnPull(true);
messageSource.setMaxFetchSize(100);
return messageSource;
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>The <code>@InboundChannelAdapter</code> annotation above ensures that the configured <code>MessageSource</code> is polled for messages, which are then available for manipulation with any Spring Integration mechanism on the <code>pubsubInputChannel</code> message channel.
For example, messages can be retrieved in a method annotated with <code>@ServiceActivator</code>, as seen below.</p>
</div>
<div class="paragraph">
<p>For additional flexibility, <code>PubSubMessageSource</code> attaches an <code>AcknowledgeablePubSubMessage</code> object to the <code>GcpPubSubHeaders.ORIGINAL_MESSAGE</code> message header.
The object can be used for manually (n)acking the message.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@ServiceActivator(inputChannel = "pubsubInputChannel")
public void messageReceiver(String payload,
@Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) AcknowledgeablePubsubMessage message)
throws InterruptedException {
LOGGER.info("Message arrived by Synchronous Pull! Payload: " + payload);
message.ack();
}</code></pre>
</div>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<i class="fa icon-note" title="Note"></i>
</td>
<td class="content">
<code>AcknowledgeablePubSubMessage</code> objects acquired by synchronous pull are aware of their own acknowledgement IDs.
Streaming pull does not expose this information due to limitations of the underlying API, and returns <code>BasicAcknowledgeablePubsubMessage</code> objects that allow acking/nacking individual messages, but not extracting acknowledgement IDs for future processing.
</td>
</tr>
</table>
</div>
</div>
<div class="sect3">
<h4 id="_outbound_channel_adapter"><a class="link" href="#_outbound_channel_adapter">Outbound channel adapter</a></h4>
<div class="paragraph">
<p><code>PubSubMessageHandler</code> is the outbound channel adapter for GCP Pub/Sub that listens for new messages on a Spring <code>MessageChannel</code>.
It uses <code>PubSubTemplate</code> to post them to a GCP Pub/Sub topic.</p>
</div>
<div class="paragraph">
<p>To construct a Pub/Sub representation of the message, the outbound channel adapter needs to convert the Spring <code>Message</code> payload to a byte array representation expected by Pub/Sub.
It delegates this conversion to the <code>PubSubTemplate</code>.
To customize the conversion, you can specify a <code>PubSubMessageConverter</code> in the <code>PubSubTemplate</code> that should convert the <code>Object</code> payload and headers of the Spring <code>Message</code> to a <code>PubsubMessage</code>.</p>
</div>
<div class="paragraph">
<p>To use the outbound channel adapter, a <code>PubSubMessageHandler</code> bean must be provided and configured on the user application side.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@Bean
@ServiceActivator(inputChannel = "pubsubOutputChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
return new PubSubMessageHandler(pubsubTemplate, "topicName");
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>The provided <code>PubSubTemplate</code> contains all the necessary configuration to publish messages to a GCP Pub/Sub topic.</p>
</div>
<div class="paragraph">
<p><code>PubSubMessageHandler</code> publishes messages asynchronously by default.
A publish timeout can be configured for synchronous publishing.
If none is provided, the adapter waits indefinitely for a response.</p>
</div>
<div class="paragraph">
<p>It is possible to set user-defined callbacks for the <code>publish()</code> call in <code>PubSubMessageHandler</code> through the <code>setPublishFutureCallback()</code> method.
These are useful to process the message ID, in case of success, or the error if any was thrown.</p>
</div>
<div class="paragraph">
<p>To override the default destination you can use the <code>GcpPubSubHeaders.DESTINATION</code> header.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@Autowired
private MessageChannel pubsubOutputChannel;
public void handleMessage(Message&lt;?&gt; msg) throws MessagingException {
final Message&lt;?&gt; message = MessageBuilder
.withPayload(msg.getPayload())
.setHeader(GcpPubSubHeaders.TOPIC, "customTopic").build();
pubsubOutputChannel.send(message);
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>It is also possible to set an SpEL expression for the topic with the <code>setTopicExpression()</code> or <code>setTopicExpressionString()</code> methods.</p>
</div>
</div>
<div class="sect3">
<h4 id="_header_mapping"><a class="link" href="#_header_mapping">Header mapping</a></h4>
<div class="paragraph">
<p>These channel adapters contain header mappers that allow you to map, or filter out, headers from Spring to Google Cloud Pub/Sub messages, and vice-versa.
By default, the inbound channel adapter maps every header on the Google Cloud Pub/Sub messages to the Spring messages produced by the adapter.
The outbound channel adapter maps every header from Spring messages into Google Cloud Pub/Sub ones, except the ones added by Spring, like headers with key <code>"id"</code>, <code>"timestamp"</code> and <code>"gcp_pubsub_acknowledgement"</code>.
In the process, the outbound mapper also converts the value of the headers into string.</p>
</div>
<div class="paragraph">
<p>Each adapter declares a <code>setHeaderMapper()</code> method to let you further customize which headers you want to map from Spring to Google Cloud Pub/Sub, and vice-versa.</p>
</div>
<div class="paragraph">
<p>For example, to filter out headers <code>"foo"</code>, <code>"bar"</code> and all headers starting with the prefix "prefix_", you can use <code>setHeaderMapper()</code> along with the <code>PubSubHeaderMapper</code> implementation provided by this module.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">PubSubMessageHandler adapter = ...
...
PubSubHeaderMapper headerMapper = new PubSubHeaderMapper();
headerMapper.setOutboundHeaderPatterns("!foo", "!bar", "!prefix_*", "*");
adapter.setHeaderMapper(headerMapper);</code></pre>
</div>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<i class="fa icon-note" title="Note"></i>
</td>
<td class="content">
The order in which the patterns are declared in <code>PubSubHeaderMapper.setOutboundHeaderPatterns()</code> and <code>PubSubHeaderMapper.setInboundHeaderPatterns()</code> matters.
The first patterns have precedence over the following ones.
</td>
</tr>
</table>
</div>
<div class="paragraph">
<p>In the previous example, the <code>"*"</code> pattern means every header is mapped.
However, because it comes last in the list, <a href="https://docs.spring.io/spring-integration/api/org/springframework/integration/util/PatternMatchUtils.html#smartMatch-java.lang.String-java.lang.String&#8230;&#8203;-">the previous patterns take precedence</a>.</p>
</div>
</div>
<div class="sect3">
<h4 id="_samples"><a class="link" href="#_samples">Samples</a></h4>
<div class="paragraph">
<p>Available examples:</p>
</div>
<div class="ulist">
<ul>
<li>
<p><a href="https://github.com/spring-cloud/spring-cloud-gcp/tree/master/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample">Sending/Receiving Messages with Channel Adapters</a></p>
</li>
<li>
<p><a href="https://github.com/spring-cloud/spring-cloud-gcp/tree/master/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-json-sample">Pub/Sub Channel Adapters with JSON payloads</a></p>
</li>
<li>
<p><a href="https://codelabs.developers.google.com/codelabs/cloud-spring-cloud-gcp-pubsub-integration/index.html">Spring Integration and Pub/Sub Codelab</a></p>
</li>
</ul>
</div>
</div>
</div>
<div class="sect2">
<h3 id="_channel_adapters_for_google_cloud_storage"><a class="link" href="#_channel_adapters_for_google_cloud_storage">Channel Adapters for Google Cloud Storage</a></h3>
<div class="paragraph">
<p>The channel adapters for Google Cloud Storage allow you to read and write files to Google Cloud Storage through <code>MessageChannels</code>.</p>
</div>
<div class="paragraph">
<p>Spring Cloud GCP provides two inbound adapters, <code>GcsInboundFileSynchronizingMessageSource</code> and <code>GcsStreamingMessageSource</code>, and one outbound adapter, <code>GcsMessageHandler</code>.</p>
</div>
<div class="paragraph">
<p>The Spring Integration Channel Adapters for Google Cloud Storage are included in the <code>spring-cloud-gcp-storage</code> module.</p>
</div>
<div class="paragraph">
<p>To use the Storage portion of Spring Integration for Spring Cloud GCP, you must also provide the <code>spring-integration-file</code> dependency, since it isn&#8217;t pulled transitively.</p>
</div>
<div class="paragraph">
<p>Maven coordinates, using <a href="getting-started.html#_bill_of_materials">Spring Cloud GCP BOM</a>:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-xml hljs" data-lang="xml">&lt;dependency&gt;
&lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
&lt;artifactId&gt;spring-cloud-gcp-storage&lt;/artifactId&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.springframework.integration&lt;/groupId&gt;
&lt;artifactId&gt;spring-integration-file&lt;/artifactId&gt;
&lt;/dependency&gt;</code></pre>
</div>
</div>
<div class="paragraph">
<p>Gradle coordinates:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>dependencies {
compile group: 'org.springframework.cloud', name: 'spring-cloud-gcp-starter-storage'
compile group: 'org.springframework.integration', name: 'spring-integration-file'
}</code></pre>
</div>
</div>
<div class="sect3">
<h4 id="_inbound_channel_adapter"><a class="link" href="#_inbound_channel_adapter">Inbound channel adapter</a></h4>
<div class="paragraph">
<p>The Google Cloud Storage inbound channel adapter polls a Google Cloud Storage bucket for new files and sends each of them in a <code>Message</code> payload to the <code>MessageChannel</code> specified in the <code>@InboundChannelAdapter</code> annotation.
The files are temporarily stored in a folder in the local file system.</p>
</div>
<div class="paragraph">
<p>Here is an example of how to configure a Google Cloud Storage inbound channel adapter.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@Bean
@InboundChannelAdapter(channel = "new-file-channel", poller = @Poller(fixedDelay = "5000"))
public MessageSource&lt;File&gt; synchronizerAdapter(Storage gcs) {
GcsInboundFileSynchronizer synchronizer = new GcsInboundFileSynchronizer(gcs);
synchronizer.setRemoteDirectory("your-gcs-bucket");
GcsInboundFileSynchronizingMessageSource synchAdapter =
new GcsInboundFileSynchronizingMessageSource(synchronizer);
synchAdapter.setLocalDirectory(new File("local-directory"));
return synchAdapter;
}</code></pre>
</div>
</div>
</div>
<div class="sect3">
<h4 id="_inbound_streaming_channel_adapter"><a class="link" href="#_inbound_streaming_channel_adapter">Inbound streaming channel adapter</a></h4>
<div class="paragraph">
<p>The inbound streaming channel adapter is similar to the normal inbound channel adapter, except it does not require files to be stored in the file system.</p>
</div>
<div class="paragraph">
<p>Here is an example of how to configure a Google Cloud Storage inbound streaming channel adapter.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@Bean
@InboundChannelAdapter(channel = "streaming-channel", poller = @Poller(fixedDelay = "5000"))
public MessageSource&lt;InputStream&gt; streamingAdapter(Storage gcs) {
GcsStreamingMessageSource adapter =
new GcsStreamingMessageSource(new GcsRemoteFileTemplate(new GcsSessionFactory(gcs)));
adapter.setRemoteDirectory("your-gcs-bucket");
return adapter;
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>If you would like to process the files in your bucket in a specific order, you may pass in a <code>Comparator&lt;BlobInfo&gt;</code> to the constructor <code>GcsStreamingMessageSource</code> to sort the files being processed.</p>
</div>
</div>
<div class="sect3">
<h4 id="_outbound_channel_adapter_2"><a class="link" href="#_outbound_channel_adapter_2">Outbound channel adapter</a></h4>
<div class="paragraph">
<p>The outbound channel adapter allows files to be written to Google Cloud Storage.
When it receives a <code>Message</code> containing a payload of type <code>File</code>, it writes that file to the Google Cloud Storage bucket specified in the adapter.</p>
</div>
<div class="paragraph">
<p>Here is an example of how to configure a Google Cloud Storage outbound channel adapter.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@Bean
@ServiceActivator(inputChannel = "writeFiles")
public MessageHandler outboundChannelAdapter(Storage gcs) {
GcsMessageHandler outboundChannelAdapter = new GcsMessageHandler(new GcsSessionFactory(gcs));
outboundChannelAdapter.setRemoteDirectoryExpression(new ValueExpression&lt;&gt;("your-gcs-bucket"));
return outboundChannelAdapter;
}</code></pre>
</div>
</div>
</div>
<div class="sect3">
<h4 id="_sample"><a class="link" href="#_sample">Sample</a></h4>
<div class="paragraph">
<p>See the <a href="https://github.com/spring-cloud/spring-cloud-gcp/tree/master/spring-cloud-gcp-samples/spring-cloud-gcp-integration-storage-sample">Spring Integration with Google Cloud Storage Sample Code</a>.</p>
</div>
</div>
</div>
</div>
</div>
</div>
<script type="text/javascript" src="js/tocbot/tocbot.min.js"></script>
<script type="text/javascript" src="js/toc.js"></script>
<link rel="stylesheet" href="js/highlight/styles/atom-one-dark-reasonable.min.css">
<script src="js/highlight/highlight.min.js"></script>
<script>hljs.initHighlighting()</script>
</body>
</html>