Implementing auto creation of streams

This pull request adds ability for producers and consumers to create a stream
 if it isn't already existing.
I've used the property `spring.cloud.stream.kinesis.binder.autoCreateStreams`
to trigger the process.
I've intentionally kept the scope to auto-creation only and not included shard
manipulation in this PR.
Included are passing unit tests and an integration test that works if a local
Kinesalite instance is running.
I also have a example project at https://github.com/JacobASeverson/stream-example-kinesis
that I've used to verify the functionality within AWS.

* Added ability to auto-create streams to the provisioner

* Added unit testing for the provisioner new stream handler

* Added an integration test for binding

Moving tests to appropriate project

Removing , default to creation, and remove stream handler layer

Cleanup for integration test

Setting shard size appropriately for producer and consumer

Forgot to clean up pom experiment

Removed the `KinesisStream` abstraction and added level enabled checks for logging

Adding static credentials

Adding a configurable consumer property for startup timeouts

Adding documentation for , merging upstream, and new Checkstyle errors

Finishing binder test for auto creation of stream

Fix imports for tests

Getting rid of spring-cloud-stream-binder-test dep

* Polish dependencies
* Polishing code style
* Add Eclipse code formatter configs
This commit is contained in:
Jacob Severson
2017-08-08 10:26:44 -04:00
committed by Artem Bilan
parent 698e469a58
commit 9226309556
11 changed files with 676 additions and 25 deletions

397
eclipse-code-formatter.xml Normal file
View File

@@ -0,0 +1,397 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<profiles version="12">
<profile kind="CodeFormatterProfile" name="Spring Boot Java Conventions" version="12">
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_ellipsis" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_declarations" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_annotation_declaration" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_allocation_expression"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_at_in_annotation_type_declaration"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.comment.new_lines_at_block_boundaries" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_parameters"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_package" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_enum_constant"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.blank_lines_after_imports" value="1"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_while" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags" value="do not insert"/>
<setting
id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_annotation_type_member_declaration"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_throws"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.comment.format_javadoc_comments" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.indentation.size" value="4"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_postfix_operator" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_increments" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_arguments" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_inits" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_anonymous_type_declaration"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_for" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.disabling_tag" value="@formatter:off"/>
<setting id="org.eclipse.jdt.core.formatter.continuation_indentation" value="2"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_enum_constants" value="49"/>
<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_imports" value="1"/>
<setting id="org.eclipse.jdt.core.formatter.blank_lines_after_package" value="1"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_binary_operator" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_local_declarations"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant" value="16"/>
<setting
id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_parameterized_type_reference"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.comment.indent_root_tags" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.wrap_before_or_operator_multicatch" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.enabling_tag" value="@formatter:on"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_brace_in_block" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration" value="16"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_parameter"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.keep_then_statement_on_same_line" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_field" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_explicitconstructorcall_arguments"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_block" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_prefix_operator" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.blank_lines_between_type_declarations" value="1"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_brace_in_array_initializer"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_for" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_catch" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_arguments"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_method" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_switch" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_anonymous_type_declaration"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_parenthesized_expression"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column" value="false"/>
<setting id="org.eclipse.jdt.core.compiler.problem.enumIdentifier" value="error"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_and_in_type_parameter" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_inits" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.indent_statements_compare_to_block" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.brace_position_for_anonymous_type_declaration" value="end_of_line"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_question_in_wildcard" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_invocation_arguments"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_switch" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.comment.line_length" value="90"/>
<setting id="org.eclipse.jdt.core.formatter.use_on_off_tags" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_brackets_in_array_allocation_expression"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_constant" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_invocation"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_assignment_operator" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_type_declaration"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_for" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.comment.preserve_white_space_between_code_and_line_comments"
value="false"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_local_variable" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.brace_position_for_method_declaration" value="end_of_line"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_invocation"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_union_type_in_multicatch" value="16"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_for" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body" value="0"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_arguments"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_binary_expression" value="16"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_parameterized_type_reference"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_array_initializer"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_field_declarations"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_annotation" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call" value="16"/>
<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_annotation_declaration_header"
value="true"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_superinterfaces" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_default" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_question_in_conditional" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.brace_position_for_block" value="end_of_line"/>
<setting id="org.eclipse.jdt.core.formatter.brace_position_for_constructor_declaration" value="end_of_line"/>
<setting id="org.eclipse.jdt.core.formatter.brace_position_for_lambda_body" value="end_of_line"/>
<setting id="org.eclipse.jdt.core.formatter.compact_else_if" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_parameters"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_catch" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_invocation"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration" value="16"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_invocation_arguments"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation" value="16"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_constructor_declaration" value="16"/>
<setting id="org.eclipse.jdt.core.compiler.problem.assertIdentifier" value="error"/>
<setting id="org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_catch_in_try_statement" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_try" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_at_end_of_file_if_missing" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_array_initializer" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_binary_operator" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_unary_operator" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer" value="16"/>
<setting id="org.eclipse.jdt.core.formatter.format_line_comment_starting_on_first_column" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.number_of_empty_lines_to_preserve" value="1"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_case" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_ellipsis" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_try_resources"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_assert" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_if" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_arguments" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_and_in_type_parameter" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_type_declaration" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_parenthesized_expression"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.comment.format_line_comments" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_labeled_statement" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.align_type_members_on_columns" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_assignment" value="0"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_method_body" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_type_header" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_declaration"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_enum_constant"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_type_declaration" value="16"/>
<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_first_class_body_declaration" value="0"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_conditional_expression" value="80"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_closing_brace_in_array_initializer"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_parameters"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.format_guardian_clause_on_one_line" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_if" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_type" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_block" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.brace_position_for_enum_declaration" value="end_of_line"/>
<setting id="org.eclipse.jdt.core.formatter.brace_position_for_block_in_case" value="end_of_line"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_constructor_declaration"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.comment.format_header" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression" value="16"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_invocation"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_while" value="insert"/>
<setting id="org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode" value="enabled"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_switch" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_method_declaration" value="0"/>
<setting id="org.eclipse.jdt.core.formatter.join_wrapped_lines" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_constructor_declaration"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_cases" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_allocation_expression"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_synchronized"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.comment.new_lines_at_javadoc_boundaries" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.brace_position_for_annotation_type_declaration"
value="end_of_line"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_for" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_resources_in_try" value="80"/>
<setting id="org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation" value="16"/>
<setting id="org.eclipse.jdt.core.formatter.never_indent_block_comments_on_first_column" value="false"/>
<setting id="org.eclipse.jdt.core.compiler.source" value="1.8"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_synchronized"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_throws"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.tabulation.size" value="4"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_constant" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_allocation_expression" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_reference"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_conditional" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.comment.format_source_code" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_array_initializer"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_try" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_try_resources" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_field" value="1"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer" value="2"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_question_in_wildcard" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_method" value="1"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_superclass_in_type_declaration" value="16"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_enum_declaration" value="16"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_labeled_statement"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.compiler.codegen.targetPlatform" value="1.8"/>
<setting id="org.eclipse.jdt.core.formatter.brace_position_for_switch" value="end_of_line"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_superinterfaces"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_parameters"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_type_annotation" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_brace_in_array_initializer"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_parenthesized_expression"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.comment.format_html" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation_type_declaration"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_parameters"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_compact_if" value="16"/>
<setting id="org.eclipse.jdt.core.formatter.indent_empty_lines" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_parameterized_type_reference"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_unary_operator" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_enum_constant"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_annotation" value="0"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_declarations"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_else_in_if_statement" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_assignment_operator" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_constructor_declaration"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_new_chunk" value="1"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_label" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_declaration_header"
value="true"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_allocation_expression"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_constructor_declaration"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_conditional" value="insert"/>
<setting
id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_parameterized_type_reference"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_parameters"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_arguments"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_cast" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_assert" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_member_type" value="1"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_while_in_do_statement" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_type_reference"
value="do not insert"/>
<setting
id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_parameterized_type_reference"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression"
value="16"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_opening_brace_in_array_initializer"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_declaration" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.indent_breaks_compare_to_cases" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_declaration"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_if" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_semicolon" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_postfix_operator" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_try" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_arguments"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_cast" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.comment.format_block_comments" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_lambda_arrow" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_declaration"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.keep_imple_if_on_one_line" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_declaration"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration" value="16"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_between_brackets_in_array_type_reference"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_parameters"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_for" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_throws"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_allocation_expression"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.indent_statements_compare_to_body" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_multiple_fields" value="16"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_constant_arguments"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_prefix_operator" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.brace_position_for_array_initializer" value="end_of_line"/>
<setting id="org.eclipse.jdt.core.formatter.wrap_before_binary_operator" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_method_declaration"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_parameters" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_catch" value="do not insert"/>
<setting id="org.eclipse.jdt.core.compiler.compliance" value="1.8"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_reference"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_annotation" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_constant_arguments"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_braces_in_array_initializer"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_case" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_local_declarations"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_annotation_type_declaration"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_reference"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_declaration"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.wrap_outer_expressions_when_nested" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_paren_in_cast" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.brace_position_for_enum_constant" value="end_of_line"/>
<setting id="org.eclipse.jdt.core.formatter.brace_position_for_type_declaration" value="end_of_line"/>
<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_package" value="0"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_for" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_synchronized" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_increments" value="do not insert"/>
<setting
id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation_type_member_declaration"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_while" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_enum_constant"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_explicitconstructorcall_arguments"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_annotation"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_parameters"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_constant_header"
value="true"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_lambda_arrow" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_constructor_declaration"
value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_throws"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.join_lines_in_comments" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_parameters"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_question_in_conditional" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.comment.indent_parameter_description" value="true"/>
<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_finally_in_try_statement" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.tabulation.char" value="tab"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_field_declarations"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.blank_lines_between_import_groups" value="1"/>
<setting id="org.eclipse.jdt.core.formatter.lineSplit" value="120"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_annotation"
value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_switch" value="insert"/>
</profile>
</profiles>

7
eclipse.importorder Normal file
View File

@@ -0,0 +1,7 @@
#Organize Import Order
#Wed Apr 26 12:53:22 EDT 2017
4=\#
3=org.springframework
2=
1=javax
0=java

View File

@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -26,6 +26,12 @@
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -19,8 +19,29 @@ package org.springframework.cloud.stream.binder.kinesis.properties;
/**
*
* @author Peter Oates
* @author Jacob Severson
*
*/
public class KinesisConsumerProperties {
private int startTimeout = 60000;
private int describeStreamRetries = 50;
public int getStartTimeout() {
return this.startTimeout;
}
public void setStartTimeout(int startTimeout) {
this.startTimeout = startTimeout;
}
public int getDescribeStreamRetries() {
return this.describeStreamRetries;
}
public void setDescribeStreamRetries(int describeStreamRetries) {
this.describeStreamRetries = describeStreamRetries;
}
}

View File

@@ -17,6 +17,11 @@
package org.springframework.cloud.stream.binder.kinesis.provisioning;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
@@ -34,12 +39,15 @@ import org.springframework.util.Assert;
*
* @author Peter Oates
* @author Artem Bilan
* @author Jacob Severson
*
*/
public class KinesisStreamProvisioner
implements
ProvisioningProvider<ExtendedConsumerProperties<KinesisConsumerProperties>, ExtendedProducerProperties<KinesisProducerProperties>> {
private final Log logger = LogFactory.getLog(getClass());
private final AmazonKinesis amazonKinesis;
private final KinesisBinderConfigurationProperties configurationProperties;
@@ -56,18 +64,51 @@ public class KinesisStreamProvisioner
public ProducerDestination provisionProducerDestination(String name,
ExtendedProducerProperties<KinesisProducerProperties> properties) throws ProvisioningException {
KinesisProducerDestination producer = new KinesisProducerDestination(name);
if (logger.isInfoEnabled()) {
logger.info("Using Kinesis stream for outbound: " + name);
}
return producer;
return new KinesisProducerDestination(name, createOrUpdate(name, properties.getPartitionCount()));
}
@Override
public ConsumerDestination provisionConsumerDestination(String name, String group,
ExtendedConsumerProperties<KinesisConsumerProperties> properties) throws ProvisioningException {
KinesisConsumerDestination consumer = new KinesisConsumerDestination(name);
if (logger.isInfoEnabled()) {
logger.info("Using Kinesis stream for inbound: " + name);
}
return consumer;
int shardCount = properties.getInstanceCount() * properties.getConcurrency();
return new KinesisConsumerDestination(name, createOrUpdate(name, shardCount));
}
private Integer createOrUpdate(String name, Integer shards) {
try {
DescribeStreamResult streamResult = amazonKinesis.describeStream(name);
if (logger.isInfoEnabled()) {
logger.info("Stream found, using existing stream");
}
return streamResult.getStreamDescription().getShards().size();
}
catch (ResourceNotFoundException e) {
if (logger.isInfoEnabled()) {
logger.info("Stream not found");
}
}
if (logger.isInfoEnabled()) {
logger.info("Attempting to create stream");
}
amazonKinesis.createStream(name, shards);
return shards;
}
private static final class KinesisProducerDestination implements ProducerDestination {
@@ -76,10 +117,6 @@ public class KinesisStreamProvisioner
private final int shards;
KinesisProducerDestination(String streamName) {
this(streamName, 0);
}
KinesisProducerDestination(String streamName, Integer shards) {
this.streamName = streamName;
this.shards = shards;
@@ -113,10 +150,6 @@ public class KinesisStreamProvisioner
private final String dlqName;
KinesisConsumerDestination(String streamName) {
this(streamName, 0, null);
}
KinesisConsumerDestination(String streamName, int shards) {
this(streamName, shards, null);
}
@@ -140,7 +173,6 @@ public class KinesisStreamProvisioner
", dlqName='" + dlqName + '\'' +
'}';
}
}
}

View File

@@ -0,0 +1,149 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kinesis.provisioning;
import java.util.Collections;
import java.util.List;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.CreateStreamResult;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.StreamDescription;
import org.junit.Test;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisConsumerProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* @author Jacob Severson
*/
public class KinesisStreamProvisionerTests {
@Test
public void testProvisionProducerSuccessfulWithExistingStream() {
AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class);
KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties();
KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties);
ExtendedProducerProperties<KinesisProducerProperties> extendedProducerProperties = new ExtendedProducerProperties<>(
new KinesisProducerProperties());
String name = "test-stream";
DescribeStreamResult describeStreamResult = describeStreamResultWithShards(
Collections.singletonList(new Shard()));
when(amazonKinesisMock.describeStream(name)).thenReturn(describeStreamResult);
ProducerDestination destination = provisioner.provisionProducerDestination(name, extendedProducerProperties);
verify(amazonKinesisMock, times(1)).describeStream(name);
assertThat(destination.getName(), is(name));
}
@Test
public void testProvisionConsumerSuccessfulWithExistingStream() {
AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class);
KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties();
KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties);
ExtendedConsumerProperties<KinesisConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<>(
new KinesisConsumerProperties());
String name = "test-stream";
String group = "test-group";
DescribeStreamResult describeStreamResult = describeStreamResultWithShards(
Collections.singletonList(new Shard()));
when(amazonKinesisMock.describeStream(name)).thenReturn(describeStreamResult);
ConsumerDestination destination = provisioner.provisionConsumerDestination(name, group,
extendedConsumerProperties);
verify(amazonKinesisMock, times(1)).describeStream(name);
assertThat(destination.getName(), is(name));
}
@Test
public void testProvisionProducerSuccessfulWithNewStream() {
AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class);
KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties();
KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties);
ExtendedProducerProperties<KinesisProducerProperties> extendedProducerProperties = new ExtendedProducerProperties<>(
new KinesisProducerProperties());
String name = "test-stream";
Integer shards = 1;
when(amazonKinesisMock.describeStream(name)).thenThrow(new ResourceNotFoundException("I got nothing"));
when(amazonKinesisMock.createStream(name, shards)).thenReturn(new CreateStreamResult());
ProducerDestination destination = provisioner.provisionProducerDestination(name, extendedProducerProperties);
verify(amazonKinesisMock, times(1)).describeStream(name);
verify(amazonKinesisMock, times(1)).createStream(name, shards);
assertThat(destination.getName(), is(name));
}
@Test
public void testProvisionConsumerSuccessfulWithNewStream() {
AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class);
KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties();
KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties);
int instanceCount = 1;
int concurrency = 1;
ExtendedConsumerProperties<KinesisConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<>(
new KinesisConsumerProperties());
extendedConsumerProperties.setInstanceCount(instanceCount);
extendedConsumerProperties.setConcurrency(concurrency);
String name = "test-stream";
String group = "test-group";
when(amazonKinesisMock.describeStream(name)).thenThrow(new ResourceNotFoundException("I got nothing"));
when(amazonKinesisMock.createStream(name, instanceCount * concurrency)).thenReturn(new CreateStreamResult());
ConsumerDestination destination = provisioner.provisionConsumerDestination(name, group,
extendedConsumerProperties);
verify(amazonKinesisMock, times(1)).describeStream(name);
verify(amazonKinesisMock, times(1)).createStream(name, instanceCount * concurrency);
assertThat(destination.getName(), is(name));
}
private static DescribeStreamResult describeStreamResultWithShards(List<Shard> shards) {
return new DescribeStreamResult()
.withStreamDescription(
new StreamDescription()
.withShards(shards));
}
}

View File

@@ -48,6 +48,15 @@ For general binding configuration options and properties, please refer to the ht
The following properties are available for Kinesis consumers only and must be prefixed with `spring.cloud.stream.kinesis.bindings.<channelName>.consumer.`.
startTimeout::
The amount of time to wait for the consumer to start, in milliseconds.
+
Default: `60000`.
describeStreamRetries::
The amount of times the consumer will retry a `DescribeStream` operation waiting for the stream to be in `ACTIVE` state.
+
Default: `50`.
=== Kinesis Producer Properties

View File

@@ -108,8 +108,8 @@ public class KinesisMessageChannelBinder extends
// need to move these properties to the appropriate properties class
adapter.setCheckpointMode(CheckpointMode.record);
adapter.setListenerMode(ListenerMode.record);
adapter.setStartTimeout(10000);
adapter.setDescribeStreamRetries(1);
adapter.setStartTimeout(properties.getExtension().getStartTimeout());
adapter.setDescribeStreamRetries(properties.getExtension().getDescribeStreamRetries());
adapter.setConcurrency(10);
// Deffer byte[] conversion to the ReceivingHandler

View File

@@ -16,8 +16,13 @@
package org.springframework.cloud.stream.binder.kinesis;
import org.junit.ClassRule;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.PartitionCapableBinderTests;
@@ -25,9 +30,14 @@ import org.springframework.cloud.stream.binder.Spy;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisConsumerProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisProducerProperties;
import org.springframework.integration.channel.DirectChannel;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
/**
* @author Artem Bilan
* @author Jacob Severson
*
*/
public class KinesisBinderTests
@@ -38,6 +48,26 @@ public class KinesisBinderTests
@ClassRule
public static LocalKinesisResource localKinesisResource = new LocalKinesisResource();
@Test
@SuppressWarnings("unchecked")
public void testAutoCreateStreamForNonExistingStream() throws Exception {
Binder binder = getBinder();
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KinesisConsumerProperties> consumerProperties = createConsumerProperties();
String testStreamName = "nonexisting" + System.currentTimeMillis();
Binding<?> binding = binder.bindConsumer(testStreamName, "test", output, consumerProperties);
binding.unbind();
DescribeStreamResult streamResult = localKinesisResource.getResource().describeStream(testStreamName);
String createdStreamName = streamResult.getStreamDescription().getStreamName();
int createdShards = streamResult.getStreamDescription().getShards().size();
String createdStreamStatus = streamResult.getStreamDescription().getStreamStatus();
assertThat(createdStreamName, is(testStreamName));
assertThat(createdShards, is(consumerProperties.getInstanceCount() * consumerProperties.getConcurrency()));
assertThat(createdStreamStatus, is("ACTIVE"));
}
@Override
protected boolean usesExplicitRouting() {
return false;
@@ -51,16 +81,15 @@ public class KinesisBinderTests
@Override
protected KinesisTestBinder getBinder() throws Exception {
if (this.testBinder == null) {
this.testBinder =
new KinesisTestBinder(localKinesisResource.getResource(),
new KinesisBinderConfigurationProperties());
this.testBinder = new KinesisTestBinder(localKinesisResource.getResource(),
new KinesisBinderConfigurationProperties());
}
return this.testBinder;
}
@Override
protected ExtendedConsumerProperties<KinesisConsumerProperties> createConsumerProperties() {
final ExtendedConsumerProperties<KinesisConsumerProperties> kafkaConsumerProperties =
ExtendedConsumerProperties<KinesisConsumerProperties> kafkaConsumerProperties =
new ExtendedConsumerProperties<>(new KinesisConsumerProperties());
// set the default values that would normally be propagated by Spring Cloud Stream
kafkaConsumerProperties.setInstanceCount(1);

View File

@@ -37,6 +37,7 @@ public class KinesisTestBinder
public KinesisTestBinder(AmazonKinesisAsync amazonKinesis,
KinesisBinderConfigurationProperties kinesisBinderConfigurationProperties) {
KinesisStreamProvisioner provisioningProvider =
new KinesisStreamProvisioner(amazonKinesis, kinesisBinderConfigurationProperties);

View File

@@ -18,7 +18,8 @@ package org.springframework.cloud.stream.binder.kinesis;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.SDKGlobalConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
@@ -26,10 +27,9 @@ import com.amazonaws.services.kinesis.AmazonKinesisAsyncClientBuilder;
import org.springframework.cloud.stream.test.junit.AbstractExternalResourceTestSupport;
import static org.mockito.Mockito.mock;
/**
* @author Artem Bilan
* @author Jacob Severson
*
*/
public class LocalKinesisResource extends AbstractExternalResourceTestSupport<AmazonKinesisAsync> {
@@ -53,7 +53,6 @@ public class LocalKinesisResource extends AbstractExternalResourceTestSupport<Am
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
this.resource = AmazonKinesisAsyncClientBuilder.standard()
.withCredentials(mock(AWSCredentialsProvider.class))
.withClientConfiguration(
new ClientConfiguration()
.withMaxErrorRetry(0)
@@ -61,6 +60,7 @@ public class LocalKinesisResource extends AbstractExternalResourceTestSupport<Am
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration("http://localhost:" + this.port,
Regions.DEFAULT_REGION.getName()))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("", "")))
.build();
// Check connection