<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<!--[if IE]><meta http-equiv="X-UA-Compatible" content="IE=edge"><![endif]-->
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="generator" content="Asciidoctor 1.5.7.1">
<title>Spring Integration</title>
<link rel="stylesheet" href="css/spring.css">
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css">
<style>
.hidden {
display: none;
}
.switch {
border-width: 1px 1px 0 1px;
border-style: solid;
border-color: #7a2518;
display: inline-block;
}
.switch--item {
padding: 10px;
background-color: #ffffff;
color: #7a2518;
display: inline-block;
cursor: pointer;
}
.switch--item:not(:first-child) {
border-width: 0 0 0 1px;
border-style: solid;
border-color: #7a2518;
}
.switch--item.selected {
background-color: #7a2519;
color: #ffffff;
}
</style>
<script src="https://cdnjs.cloudflare.com/ajax/libs/zepto/1.2.0/zepto.min.js"></script>
<script type="text/javascript">
function addBlockSwitches() {
$('.primary').each(function() {
primary = $(this);
createSwitchItem(primary, createBlockSwitch(primary)).item.addClass("selected");
primary.children('.title').remove();
});
$('.secondary').each(function(idx, node) {
secondary = $(node);
primary = findPrimary(secondary);
switchItem = createSwitchItem(secondary, primary.children('.switch'));
switchItem.content.addClass('hidden');
findPrimary(secondary).append(switchItem.content);
secondary.remove();
});
}
function createBlockSwitch(primary) {
blockSwitch = $('<div class="switch"></div>');
primary.prepend(blockSwitch);
return blockSwitch;
}
function findPrimary(secondary) {
candidate = secondary.prev();
while (!candidate.is('.primary')) {
candidate = candidate.prev();
}
return candidate;
}
function createSwitchItem(block, blockSwitch) {
blockName = block.children('.title').text();
content = block.children('.content').first().append(block.next('.colist'));
item = $('<div class="switch--item">' + blockName + '</div>');
item.on('click', '', content, function(e) {
$(this).addClass('selected');
$(this).siblings().removeClass('selected');
e.data.siblings('.content').addClass('hidden');
e.data.removeClass('hidden');
});
blockSwitch.append(item);
return {'item': item, 'content': content};
}
$(addBlockSwitches);
</script>
</head>
<body class="book toc2 toc-left">
<div id="header">
<div id="toc" class="toc2">
<div id="toctitle">Table of Contents</div>
<ul class="sectlevel1">
<li><a href="#_spring_integration">Spring Integration</a>
<ul class="sectlevel2">
<li><a href="#_channel_adapters_for_cloud_pubsub">Channel Adapters for Cloud Pub/Sub</a></li>
<li><a href="#_sample">Sample</a></li>
<li><a href="#_channel_adapters_for_google_cloud_storage">Channel Adapters for Google Cloud Storage</a></li>
<li><a href="#_sample_2">Sample</a></li>
</ul>
</li>
</ul>
</div>
</div>
<div id="content">
<div class="sect1">
<h2 id="_spring_integration"><a class="link" href="#_spring_integration">Spring Integration</a></h2>
<div class="sectionbody">
<div class="paragraph">
<p>Spring Cloud GCP provides Spring Integration adapters that allow your applications to use Enterprise Integration Patterns backed by Google Cloud Platform services.</p>
</div>
<div class="sect2">
<h3 id="_channel_adapters_for_cloud_pubsub"><a class="link" href="#_channel_adapters_for_cloud_pubsub">Channel Adapters for Cloud Pub/Sub</a></h3>
<div class="paragraph">
<p>The channel adapters for Google Cloud Pub/Sub connect your Spring <a href="https://docs.spring.io/spring-integration/reference/html/messaging-channels-section.html#channel"><code>MessageChannels</code></a> to Google Cloud Pub/Sub topics and subscriptions.
This enables messaging between different processes, applications, or microservices backed by Google Cloud Pub/Sub.</p>
</div>
<div class="paragraph">
<p>The Spring Integration Channel Adapters for Google Cloud Pub/Sub are included in the <code>spring-cloud-gcp-pubsub</code> module.</p>
</div>
<div class="paragraph">
<p>Maven coordinates, using <a href="getting-started.html#_bill_of_materials">Spring Cloud GCP BOM</a>:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-xml hljs" data-lang="xml">&lt;dependency&gt;
&lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
&lt;artifactId&gt;spring-cloud-gcp-pubsub&lt;/artifactId&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.springframework.integration&lt;/groupId&gt;
&lt;artifactId&gt;spring-integration-core&lt;/artifactId&gt;
&lt;/dependency&gt;</code></pre>
</div>
</div>
<div class="paragraph">
<p>Gradle coordinates:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>dependencies {
compile group: 'org.springframework.cloud', name: 'spring-cloud-gcp-pubsub'
compile group: 'org.springframework.integration', name: 'spring-integration-core'
}</code></pre>
</div>
</div>
<div class="sect3">
<h4 id="_inbound_channel_adapter_using_pubsub_streaming_pull"><a class="link" href="#_inbound_channel_adapter_using_pubsub_streaming_pull">Inbound channel adapter (using Pub/Sub Streaming Pull)</a></h4>
<div class="paragraph">
<p><code>PubSubInboundChannelAdapter</code> is the inbound channel adapter for GCP Pub/Sub that listens to a GCP Pub/Sub subscription for new messages.
It converts new messages to internal Spring <a href="https://docs.spring.io/spring-integration/reference/html/messaging-construction-chapter.html#message"><code>Message</code></a> objects and then sends them to the bound output channel.</p>
</div>
<div class="paragraph">
<p>Google Pub/Sub treats message payloads as byte arrays.
So, by default, the inbound channel adapter will construct the Spring <code>Message</code> with <code>byte[]</code> as the payload.
However, you can change the desired payload type by setting the <code>payloadType</code> property of the <code>PubSubInboundChannelAdapter</code>.
The <code>PubSubInboundChannelAdapter</code> delegates the conversion to the desired payload type to the <code>PubSubMessageConverter</code> configured in the <code>PubSubTemplate</code>.</p>
</div>
<div class="paragraph">
<p>To use the inbound channel adapter, a <code>PubSubInboundChannelAdapter</code> must be provided and configured on the user application side.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@Bean
public MessageChannel pubsubInputChannel() {
return new PublishSubscribeChannel();
}
@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(
@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
SubscriberFactory subscriberFactory) {
PubSubInboundChannelAdapter adapter =
new PubSubInboundChannelAdapter(subscriberFactory, "subscriptionName");
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.MANUAL);
return adapter;
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>In the example, we first specify the <code>MessageChannel</code> to which the adapter writes incoming messages.
The <code>MessageChannel</code> implementation isn&#8217;t important here.
Depending on your use case, you might want to use a <code>MessageChannel</code> other than <code>PublishSubscribeChannel</code>.</p>
</div>
<div class="paragraph">
<p>Then, we declare a <code>PubSubInboundChannelAdapter</code> bean.
It requires the channel we just created and a <code>SubscriberFactory</code>, which creates <code>Subscriber</code> objects from the Google Cloud Java Client for Pub/Sub.
The Spring Boot starter for GCP Pub/Sub provides a configured <code>SubscriberFactory</code>.</p>
</div>
<div class="paragraph">
<p>The <code>PubSubInboundChannelAdapter</code> supports three acknowledgement modes, with <code>AckMode.AUTO</code> being the default value:</p>
</div>
<div class="paragraph">
<p>Automatic acking (<code>AckMode.AUTO</code>)</p>
</div>
<div class="paragraph">
<p>A message is acked with GCP Pub/Sub if the adapter sent it to the channel and no exceptions were thrown.
If a <code>RuntimeException</code> is thrown while the message is processed, then the message is nacked.</p>
</div>
<div class="paragraph">
<p>Automatic acking OK (<code>AckMode.AUTO_ACK</code>)</p>
</div>
<div class="paragraph">
<p>A message is acked with GCP Pub/Sub if the adapter sent it to the channel and no exceptions were thrown.
If a <code>RuntimeException</code> is thrown while the message is processed, then the message is neither acked nor nacked.</p>
</div>
<div class="paragraph">
<p>This is useful when using the subscription&#8217;s ack deadline timeout as a retry delivery backoff mechanism.</p>
</div>
<div class="paragraph">
<p>Manually acking (<code>AckMode.MANUAL</code>)</p>
</div>
<div class="paragraph">
<p>The adapter attaches a <code>BasicAcknowledgeablePubsubMessage</code> object to the <code>Message</code> headers.
Users can extract the <code>BasicAcknowledgeablePubsubMessage</code> using the <code>GcpPubSubHeaders.ORIGINAL_MESSAGE</code> key and use it to (n)ack a message.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@Bean
@ServiceActivator(inputChannel = "pubsubInputChannel")
public MessageHandler messageReceiver() {
return message -&gt; {
LOGGER.info("Message arrived! Payload: " + new String((byte[]) message.getPayload()));
BasicAcknowledgeablePubsubMessage originalMessage =
message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
originalMessage.ack();
};
}</code></pre>
</div>
</div>
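<div class="paragraph">
<p>The same header can also be used to nack a message when processing fails.
The following is a minimal sketch, not a definitive implementation: it assumes the adapter runs in <code>AckMode.MANUAL</code>, that this handler is used instead of the one above, and that the class name <code>ManualAckConfiguration</code> and the <code>process()</code> method are hypothetical stand-ins for application code.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">import java.nio.charset.StandardCharsets;

import org.springframework.cloud.gcp.pubsub.support.BasicAcknowledgeablePubsubMessage;
import org.springframework.cloud.gcp.pubsub.support.GcpPubSubHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHandler;

@Configuration
public class ManualAckConfiguration {

    @Bean
    @ServiceActivator(inputChannel = "pubsubInputChannel")
    public MessageHandler ackingReceiver() {
        return message -&gt; {
            BasicAcknowledgeablePubsubMessage originalMessage =
                message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
            try {
                process((byte[]) message.getPayload());
                // Processing succeeded: acknowledge so the message is not redelivered.
                originalMessage.ack();
            }
            catch (RuntimeException ex) {
                // Processing failed: nack so Pub/Sub can redeliver the message.
                originalMessage.nack();
            }
        };
    }

    // Hypothetical application logic; replace with real processing.
    private void process(byte[] payload) {
        System.out.println("Processing: " + new String(payload, StandardCharsets.UTF_8));
    }
}</code></pre>
</div>
</div>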
</div>
<div class="sect3">
<h4 id="_pollable_message_source_using_pubsub_synchronous_pull"><a class="link" href="#_pollable_message_source_using_pubsub_synchronous_pull">Pollable Message Source (using Pub/Sub Synchronous Pull)</a></h4>
<div class="paragraph">
<p>While <code>PubSubInboundChannelAdapter</code>, through the underlying Asynchronous Pull Pub/Sub mechanism, provides the best performance for high-volume applications that receive a steady flow of messages, it can create load balancing anomalies due to message caching.
This behavior is most obvious when publishing a large batch of small messages that take a long time to process individually.
It manifests as one subscriber taking up most messages, even if multiple subscribers are available to take on the work.
For a more detailed explanation of this scenario, see <a href="https://cloud.google.com/pubsub/docs/pull#dealing-with-large-backlogs-of-small-messages">GCP Pub/Sub documentation</a>.</p>
</div>
<div class="paragraph">
<p>In such a scenario, a <code>PubSubMessageSource</code> can help spread the load between different subscribers more evenly.</p>
</div>
<div class="paragraph">
<p>As with the Inbound Channel Adapter, the message source has a configurable acknowledgement mode, payload type, and header mapping.</p>
</div>
<div class="paragraph">
<p>The default behavior is to return from the synchronous pull operation immediately if no messages are present.
This can be overridden by using the <code>setBlockOnPull()</code> method to wait for at least one message to arrive.</p>
</div>
<div class="paragraph">
<p>By default, <code>PubSubMessageSource</code> pulls from the subscription one message at a time.
To pull a batch of messages on each request, use the <code>setMaxFetchSize()</code> method to set the batch size.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@Bean
@InboundChannelAdapter(channel = "pubsubInputChannel", poller = @Poller(fixedDelay = "100"))
public MessageSource&lt;Object&gt; pubsubAdapter(PubSubTemplate pubSubTemplate) {
PubSubMessageSource messageSource = new PubSubMessageSource(pubSubTemplate, "exampleSubscription");
messageSource.setAckMode(AckMode.MANUAL);
messageSource.setPayloadType(String.class);
messageSource.setBlockOnPull(true);
messageSource.setMaxFetchSize(100);
return messageSource;
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>The <code>@InboundChannelAdapter</code> annotation above ensures that the configured <code>MessageSource</code> is polled for messages, which are then available for manipulation with any Spring Integration mechanism on the <code>pubsubInputChannel</code> message channel.
For example, messages can be retrieved in a method annotated with <code>@ServiceActivator</code>, as seen below.</p>
</div>
<div class="paragraph">
<p>For additional flexibility, <code>PubSubMessageSource</code> attaches an <code>AcknowledgeablePubsubMessage</code> object to the <code>GcpPubSubHeaders.ORIGINAL_MESSAGE</code> message header.
The object can be used for manually (n)acking the message.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@ServiceActivator(inputChannel = "pubsubInputChannel")
public void messageReceiver(String payload,
@Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) AcknowledgeablePubsubMessage message)
throws InterruptedException {
LOGGER.info("Message arrived by Synchronous Pull! Payload: " + payload);
message.ack();
}</code></pre>
</div>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<i class="fa icon-note" title="Note"></i>
</td>
<td class="content">
<code>AcknowledgeablePubsubMessage</code> objects acquired by synchronous pull are aware of their own acknowledgement IDs.
Streaming pull does not expose this information due to limitations of the underlying API, and returns <code>BasicAcknowledgeablePubsubMessage</code> objects that allow acking/nacking individual messages, but not extracting acknowledgement IDs for future processing.
</td>
</tr>
</table>
</div>
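<div class="paragraph">
<p>As an illustration of the difference described in the note, the sketch below reads the acknowledgement ID of a message obtained by synchronous pull before acking it.
It assumes that <code>AcknowledgeablePubsubMessage</code> exposes the ID through a <code>getAckId()</code> accessor and that the payload type is <code>String</code>; the class name <code>AckIdLoggingReceiver</code> is hypothetical.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">import org.springframework.cloud.gcp.pubsub.support.AcknowledgeablePubsubMessage;
import org.springframework.cloud.gcp.pubsub.support.GcpPubSubHeaders;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class AckIdLoggingReceiver {

    @ServiceActivator(inputChannel = "pubsubInputChannel")
    public void receive(String payload,
            @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) AcknowledgeablePubsubMessage message) {
        // Assumption: getAckId() returns the acknowledgement ID of a synchronously pulled message.
        String ackId = message.getAckId();
        System.out.println("Received '" + payload + "' with acknowledgement ID " + ackId);
        message.ack();
    }
}</code></pre>
</div>
</div>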
</div>
<div class="sect3">
<h4 id="_outbound_channel_adapter"><a class="link" href="#_outbound_channel_adapter">Outbound channel adapter</a></h4>
<div class="paragraph">
<p><code>PubSubMessageHandler</code> is the outbound channel adapter for GCP Pub/Sub that listens for new messages on a Spring <code>MessageChannel</code>.
It uses <code>PubSubTemplate</code> to post them to a GCP Pub/Sub topic.</p>
</div>
<div class="paragraph">
<p>To construct a Pub/Sub representation of the message, the outbound channel adapter needs to convert the Spring <code>Message</code> payload to a byte array representation expected by Pub/Sub.
It delegates this conversion to the <code>PubSubTemplate</code>.
To customize the conversion, you can specify a <code>PubSubMessageConverter</code> in the <code>PubSubTemplate</code> that should convert the <code>Object</code> payload and headers of the Spring <code>Message</code> to a <code>PubsubMessage</code>.</p>
</div>
<div class="paragraph">
<p>To use the outbound channel adapter, a <code>PubSubMessageHandler</code> bean must be provided and configured on the user application side.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@Bean
@ServiceActivator(inputChannel = "pubsubOutputChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
return new PubSubMessageHandler(pubsubTemplate, "topicName");
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>The provided <code>PubSubTemplate</code> contains all the necessary configuration to publish messages to a GCP Pub/Sub topic.</p>
</div>
<div class="paragraph">
<p><code>PubSubMessageHandler</code> publishes messages asynchronously by default.
A publish timeout can be configured for synchronous publishing.
If none is provided, the adapter waits indefinitely for a response.</p>
</div>
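<div class="paragraph">
<p>A minimal sketch of synchronous publishing with a timeout follows.
It assumes that <code>PubSubMessageHandler</code> provides <code>setSync(boolean)</code> and <code>setPublishTimeout(long)</code> setters, with the timeout expressed in milliseconds; check both against the version you use.
The class name <code>SyncPublishConfiguration</code> and the 5000 ms value are illustrative only.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHandler;

@Configuration
public class SyncPublishConfiguration {

    @Bean
    @ServiceActivator(inputChannel = "pubsubOutputChannel")
    public MessageHandler syncMessageSender(PubSubTemplate pubsubTemplate) {
        PubSubMessageHandler handler = new PubSubMessageHandler(pubsubTemplate, "topicName");
        // Assumption: setSync(true) makes the handler wait for the publish result.
        handler.setSync(true);
        // Assumption: the timeout is given in milliseconds.
        handler.setPublishTimeout(5000);
        return handler;
    }
}</code></pre>
</div>
</div>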
<div class="paragraph">
<p>It is possible to set user-defined callbacks for the <code>publish()</code> call in <code>PubSubMessageHandler</code> through the <code>setPublishFutureCallback()</code> method.
These callbacks are useful for processing the message ID on success, or the error if one was thrown.</p>
</div>
<div class="paragraph">
<p>To override the default destination, you can use the <code>GcpPubSubHeaders.TOPIC</code> header.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@Autowired
private MessageChannel pubsubOutputChannel;
public void handleMessage(Message&lt;?&gt; msg) throws MessagingException {
final Message&lt;?&gt; message = MessageBuilder
.withPayload(msg.getPayload())
.setHeader(GcpPubSubHeaders.TOPIC, "customTopic").build();
pubsubOutputChannel.send(message);
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>It is also possible to set a SpEL expression for the topic with the <code>setTopicExpression()</code> or <code>setTopicExpressionString()</code> methods.</p>
</div>
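<div class="paragraph">
<p>The following sketch shows one way <code>setTopicExpressionString()</code> might be used.
It assumes the expression is evaluated against the outgoing Spring <code>Message</code>, so that <code>headers</code> refers to the message headers; the <code>targetTopic</code> header name and the class name <code>TopicExpressionConfiguration</code> are hypothetical.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHandler;

@Configuration
public class TopicExpressionConfiguration {

    @Bean
    @ServiceActivator(inputChannel = "pubsubOutputChannel")
    public MessageHandler routingMessageSender(PubSubTemplate pubsubTemplate) {
        PubSubMessageHandler handler = new PubSubMessageHandler(pubsubTemplate, "topicName");
        // Use the hypothetical "targetTopic" header when present;
        // the Elvis operator falls back to "topicName" otherwise.
        handler.setTopicExpressionString("headers['targetTopic'] ?: 'topicName'");
        return handler;
    }
}</code></pre>
</div>
</div>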
</div>
<div class="sect3">
<h4 id="_header_mapping"><a class="link" href="#_header_mapping">Header mapping</a></h4>
<div class="paragraph">
<p>These channel adapters contain header mappers that allow you to map, or filter out, headers from Spring to Google Cloud Pub/Sub messages, and vice-versa.
By default, the inbound channel adapter maps every header on the Google Cloud Pub/Sub messages to the Spring messages produced by the adapter.
The outbound channel adapter maps every header from Spring messages into Google Cloud Pub/Sub ones, except the ones added by Spring, like headers with key <code>"id"</code>, <code>"timestamp"</code> and <code>"gcp_pubsub_acknowledgement"</code>.
In the process, the outbound mapper also converts the header values to strings.</p>
</div>
<div class="paragraph">
<p>Each adapter declares a <code>setHeaderMapper()</code> method to let you further customize which headers you want to map from Spring to Google Cloud Pub/Sub, and vice-versa.</p>
</div>
<div class="paragraph">
<p>For example, to filter out headers <code>"foo"</code>, <code>"bar"</code> and all headers starting with the prefix <code>"prefix_"</code>, you can use <code>setHeaderMapper()</code> along with the <code>PubSubHeaderMapper</code> implementation provided by this module.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">PubSubMessageHandler adapter = ...
...
PubSubHeaderMapper headerMapper = new PubSubHeaderMapper();
headerMapper.setOutboundHeaderPatterns("!foo", "!bar", "!prefix_*", "*");
adapter.setHeaderMapper(headerMapper);</code></pre>
</div>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<i class="fa icon-note" title="Note"></i>
</td>
<td class="content">
The order in which the patterns are declared in <code>PubSubHeaderMapper.setOutboundHeaderPatterns()</code> and <code>PubSubHeaderMapper.setInboundHeaderPatterns()</code> matters.
The first patterns have precedence over the following ones.
</td>
</tr>
</table>
</div>
<div class="paragraph">
<p>In the previous example, the <code>"*"</code> pattern means every header is mapped.
However, because it comes last in the list, <a href="https://docs.spring.io/spring-integration/api/org/springframework/integration/util/PatternMatchUtils.html#smartMatch-java.lang.String-java.lang.String...-">the previous patterns take precedence</a>.</p>
</div>
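<div class="paragraph">
<p>The same ordering rule applies in the inbound direction.
As a minimal sketch, the helper below drops every Pub/Sub header whose name starts with a hypothetical <code>internal_</code> prefix and maps all others; the class and method names are illustrative, and the adapter is assumed to be configured elsewhere.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">import org.springframework.cloud.gcp.pubsub.integration.PubSubHeaderMapper;
import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;

public final class InboundHeaderMapping {

    private InboundHeaderMapping() {
    }

    public static void dropInternalHeaders(PubSubInboundChannelAdapter adapter) {
        PubSubHeaderMapper headerMapper = new PubSubHeaderMapper();
        // The negated pattern is listed first so that it wins over the catch-all "*".
        headerMapper.setInboundHeaderPatterns("!internal_*", "*");
        adapter.setHeaderMapper(headerMapper);
    }
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>Reversing the two patterns would let <code>"*"</code> match first, so the <code>internal_</code> headers would be mapped as well.</p>
</div>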
</div>
</div>
<div class="sect2">
<h3 id="_sample"><a class="link" href="#_sample">Sample</a></h3>
<div class="paragraph">
<p>Available examples:</p>
</div>
<div class="ulist">
<ul>
<li>
<p><a href="https://github.com/spring-cloud/spring-cloud-gcp/tree/master/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample">sender and receiver sample application</a></p>
</li>
<li>
<p><a href="https://github.com/spring-cloud/spring-cloud-gcp/tree/master/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-json-sample">JSON payloads sample application</a></p>
</li>
<li>
<p><a href="https://codelabs.developers.google.com/codelabs/cloud-spring-cloud-gcp-pubsub-integration/index.html">codelab</a></p>
</li>
</ul>
</div>
</div>
<div class="sect2">
<h3 id="_channel_adapters_for_google_cloud_storage"><a class="link" href="#_channel_adapters_for_google_cloud_storage">Channel Adapters for Google Cloud Storage</a></h3>
<div class="paragraph">
<p>The channel adapters for Google Cloud Storage allow you to read and write files to Google Cloud Storage through <code>MessageChannels</code>.</p>
</div>
<div class="paragraph">
<p>Spring Cloud GCP provides two inbound adapters, <code>GcsInboundFileSynchronizingMessageSource</code> and <code>GcsStreamingMessageSource</code>, and one outbound adapter, <code>GcsMessageHandler</code>.</p>
</div>
<div class="paragraph">
<p>The Spring Integration Channel Adapters for Google Cloud Storage are included in the <code>spring-cloud-gcp-storage</code> module.</p>
</div>
<div class="paragraph">
<p>To use the Storage portion of Spring Integration for Spring Cloud GCP, you must also provide the <code>spring-integration-file</code> dependency, since it isn&#8217;t pulled transitively.</p>
</div>
<div class="paragraph">
<p>Maven coordinates, using <a href="getting-started.html#_bill_of_materials">Spring Cloud GCP BOM</a>:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-xml hljs" data-lang="xml">&lt;dependency&gt;
&lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
&lt;artifactId&gt;spring-cloud-gcp-storage&lt;/artifactId&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.springframework.integration&lt;/groupId&gt;
&lt;artifactId&gt;spring-integration-file&lt;/artifactId&gt;
&lt;/dependency&gt;</code></pre>
</div>
</div>
<div class="paragraph">
<p>Gradle coordinates:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>dependencies {
compile group: 'org.springframework.cloud', name: 'spring-cloud-gcp-storage'
compile group: 'org.springframework.integration', name: 'spring-integration-file'
}</code></pre>
</div>
</div>
<div class="sect3">
<h4 id="_inbound_channel_adapter"><a class="link" href="#_inbound_channel_adapter">Inbound channel adapter</a></h4>
<div class="paragraph">
<p>The Google Cloud Storage inbound channel adapter polls a Google Cloud Storage bucket for new files and sends each of them in a <code>Message</code> payload to the <code>MessageChannel</code> specified in the <code>@InboundChannelAdapter</code> annotation.
The files are temporarily stored in a folder in the local file system.</p>
</div>
<div class="paragraph">
<p>Here is an example of how to configure a Google Cloud Storage inbound channel adapter.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@Bean
@InboundChannelAdapter(channel = "new-file-channel", poller = @Poller(fixedDelay = "5000"))
public MessageSource&lt;File&gt; synchronizerAdapter(Storage gcs) {
GcsInboundFileSynchronizer synchronizer = new GcsInboundFileSynchronizer(gcs);
synchronizer.setRemoteDirectory("your-gcs-bucket");
GcsInboundFileSynchronizingMessageSource synchAdapter =
new GcsInboundFileSynchronizingMessageSource(synchronizer);
synchAdapter.setLocalDirectory(new File("local-directory"));
return synchAdapter;
}</code></pre>
</div>
</div>
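<div class="paragraph">
<p>Messages produced by this adapter can then be consumed like any other Spring Integration message.
The sketch below, which uses only standard Spring Integration annotations, logs each synchronized file; the class name <code>NewFileReceiverConfiguration</code> is hypothetical.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">import java.io.File;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHandler;

@Configuration
public class NewFileReceiverConfiguration {

    @Bean
    @ServiceActivator(inputChannel = "new-file-channel")
    public MessageHandler newFileReceiver() {
        return message -&gt; {
            // The payload is the local copy of the object found in the bucket.
            File file = (File) message.getPayload();
            System.out.println("Synchronized " + file.getAbsolutePath() + " (" + file.length() + " bytes)");
        };
    }
}</code></pre>
</div>
</div>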
</div>
<div class="sect3">
<h4 id="_inbound_streaming_channel_adapter"><a class="link" href="#_inbound_streaming_channel_adapter">Inbound streaming channel adapter</a></h4>
<div class="paragraph">
<p>The inbound streaming channel adapter is similar to the normal inbound channel adapter, except it does not require files to be stored in the file system.</p>
</div>
<div class="paragraph">
<p>Here is an example of how to configure a Google Cloud Storage inbound streaming channel adapter.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@Bean
@InboundChannelAdapter(channel = "streaming-channel", poller = @Poller(fixedDelay = "5000"))
public MessageSource&lt;InputStream&gt; streamingAdapter(Storage gcs) {
GcsStreamingMessageSource adapter =
new GcsStreamingMessageSource(new GcsRemoteFileTemplate(new GcsSessionFactory(gcs)));
adapter.setRemoteDirectory("your-gcs-bucket");
return adapter;
}</code></pre>
</div>
</div>
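<div class="paragraph">
<p>With the streaming adapter, the payload is an <code>InputStream</code> that the consumer reads and closes.
A minimal sketch of such a consumer follows, assuming the stored objects are UTF-8 text; the class name <code>StreamingReceiverConfiguration</code> is hypothetical.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHandler;

@Configuration
public class StreamingReceiverConfiguration {

    @Bean
    @ServiceActivator(inputChannel = "streaming-channel")
    public MessageHandler streamingReceiver() {
        return message -&gt; {
            // try-with-resources closes the underlying stream once it has been read.
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader((InputStream) message.getPayload(), StandardCharsets.UTF_8))) {
                long lineCount = reader.lines().count();
                System.out.println("Streamed an object with " + lineCount + " lines");
            }
            catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
        };
    }
}</code></pre>
</div>
</div>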
</div>
<div class="sect3">
<h4 id="_outbound_channel_adapter_2"><a class="link" href="#_outbound_channel_adapter_2">Outbound channel adapter</a></h4>
<div class="paragraph">
<p>The outbound channel adapter allows files to be written to Google Cloud Storage.
When it receives a <code>Message</code> containing a payload of type <code>File</code>, it writes that file to the Google Cloud Storage bucket specified in the adapter.</p>
</div>
<div class="paragraph">
<p>Here is an example of how to configure a Google Cloud Storage outbound channel adapter.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@Bean
@ServiceActivator(inputChannel = "writeFiles")
public MessageHandler outboundChannelAdapter(Storage gcs) {
GcsMessageHandler outboundChannelAdapter = new GcsMessageHandler(new GcsSessionFactory(gcs));
outboundChannelAdapter.setRemoteDirectoryExpression(new ValueExpression&lt;&gt;("your-gcs-bucket"));
return outboundChannelAdapter;
}</code></pre>
</div>
</div>
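<div class="paragraph">
<p>Application code still needs a way to send <code>File</code> messages to the <code>writeFiles</code> channel.
One option is a Spring Integration messaging gateway, sketched below.
The interface name <code>GcsUploadGateway</code> is hypothetical, and the sketch assumes gateway scanning is enabled, for example with <code>@IntegrationComponentScan</code>.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">import java.io.File;

import org.springframework.integration.annotation.MessagingGateway;

// Spring Integration generates the implementation of this interface at runtime.
@MessagingGateway(defaultRequestChannel = "writeFiles")
public interface GcsUploadGateway {

    // Wraps the file in a Message and sends it to the "writeFiles" channel.
    void upload(File file);
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>A component can then inject <code>GcsUploadGateway</code> and call <code>upload(new File("report.txt"))</code> without depending on messaging APIs directly.</p>
</div>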
</div>
</div>
<div class="sect2">
<h3 id="_sample_2"><a class="link" href="#_sample_2">Sample</a></h3>
<div class="paragraph">
<p>A <a href="https://github.com/spring-cloud/spring-cloud-gcp/tree/master/spring-cloud-gcp-samples/spring-cloud-gcp-integration-storage-sample">sample application</a> is available.</p>
</div>
</div>
</div>
</div>
</div>
<script type="text/javascript" src="js/tocbot/tocbot.min.js"></script>
<script type="text/javascript" src="js/toc.js"></script>
<link rel="stylesheet" href="js/highlight/styles/atom-one-dark-reasonable.min.css">
<script src="js/highlight/highlight.min.js"></script>
<script>hljs.initHighlighting()</script>
</body>
</html>