CDC Debezium Supplier, Source Boot Starter

- Add supplier, source and boot auto-config.
 - Add work around for KafkaNull handling.
 - Update CDC to 1.2.1.Final. Replace the deprecated properties.

remove comments

Add missing metadata configuration. Improve README docs.

CDC: add all metadata for doc auto-generation

Remove custom KafkaNullConverter.

  - The fixed KafkaNullConverter implementation comes with kafka binder 3.0.8-BUILD-SNAPSHOT or newer.

Add repositories to maven config in app generation project.

Remove the unnecessary function version in maven plugin configuration for app generation.
This commit is contained in:
Christian Tzolov
2020-07-09 12:45:58 +02:00
committed by Soby Chacko
parent 4acf224790
commit 8ab33a195d
56 changed files with 4317 additions and 4 deletions

View File

@@ -73,7 +73,7 @@ The following are the four major components of this repository.
|link:functions/supplier/websocket-supplier/README.adoc[Websocket]
|
|link:functions/consumer/twitter-consumer/README.adoc[Twitter]
|
||link:functions/supplier/cdc-debezium-supplier/README.adoc[CDC Debezium]
|
|link:functions/consumer/websocket-consumer/README.adoc[Websocket]
|
@@ -140,7 +140,7 @@ The following are the four major components of this repository.
|link:applications/source/websocket-source/README.adoc[Websocket]
|
|link:applications/sink/twitter-update-sink/README.adoc[Twitter Update]
|
||link:applications/source/cdc-debezium-source/README.adoc[CDC Debezium]
|
|link:applications/sink/wavefront-sink/README.adoc[Wavefront]
|===

View File

@@ -0,0 +1,289 @@
//tag::ref-doc[]
= CDC Source
https://en.wikipedia.org/wiki/Change_data_capture[Change Data Capture] (CDC) `source` that captures and streams change events from various databases.
Currently, it supports `MySQL`, `PostgreSQL`, `MongoDB`, `Oracle` and `SQL Server` databases.
Build upon https://debezium.io/docs/embedded/[Debezium Embedded Connector], the `CDC Source` allows capturing and streaming database changes over different message binders such Apache Kafka, RabbitMQ and all Spring Cloud Stream supporter brokers.
It supports all Debezium configuration properties. Just add the `cdc.config.` prefix to the existing Debezium properties. For example to set the Debezium's `connector.class` property use the `cdc.config.connector.class` source property instead.
We provide convenient shortcuts for the most frequently used Debezium properties. For example instead of the long `cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector` Debezium property you can use our `cdc.connector=mysql` shortcut. The table below lists all available shortcuts along with the Debezium properties they represent.
The Debezium properties (e.g. `cdc.config.XXX`) always have precedence over the shortcuts!
The CDC Source introduces a new default `BackingOffsetStore` configuration, based on the [MetadataStore](https://github.com/spring-cloud/stream-applications/tree/master/functions/common/metadata-store-common) service. Later provides various microservices friendly ways for storing the offset metadata.
== Options
//tag::configuration-properties[]
$$cdc.config$$:: $$Spring pass-trough wrapper for debezium configuration properties. All properties with a 'cdc.config.' prefix are native Debezium properties. The prefix is removed, converting them into Debezium io.debezium.config.Configuration.$$ *($$Map<String, String>$$, default: `$$<none>$$`)*
$$cdc.connector$$:: $$Shortcut for the cdc.config.connector.class property. Either of those can be used as long as they do not contradict with each other.$$ *($$ConnectorType$$, default: `$$<none>$$`, possible values: `mysql`,`postgres`,`mongodb`,`oracle`,`sqlserver`)*
$$cdc.flattering.add-fields$$:: $$Comma separated list of metadata fields to add to the flattened message. The fields will be prefixed with "__" or "__[<]struct]__", depending on the specification of the struct.$$ *($$String$$, default: `$$<none>$$`)*
$$cdc.flattering.add-headers$$:: $$Comma separated list specify a list of metadata fields to add to the header of the flattened message. The fields will be prefixed with "__" or "__[struct]__".$$ *($$String$$, default: `$$<none>$$`)*
$$cdc.flattering.delete-handling-mode$$:: $$Options for handling deleted records: (1) none - pass the records through, (2) drop - remove the records and (3) rewrite - add a '__deleted' field to the records.$$ *($$DeleteHandlingMode$$, default: `$$<none>$$`, possible values: `drop`,`rewrite`,`none`)*
$$cdc.flattering.drop-tombstones$$:: $$By default Debezium generates tombstone records to enable Kafka compaction on deleted records. The dropTombstones can suppress the tombstone records.$$ *($$Boolean$$, default: `$$true$$`)*
$$cdc.flattering.enabled$$:: $$Enable flattering the source record events (https://debezium.io/docs/configuration/event-flattening).$$ *($$Boolean$$, default: `$$true$$`)*
$$cdc.name$$:: $$Unique name for this sourceConnector instance.$$ *($$String$$, default: `$$<none>$$`)*
$$cdc.offset.commit-timeout$$:: $$Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt.$$ *($$Duration$$, default: `$$5000ms$$`)*
$$cdc.offset.flush-interval$$:: $$Interval at which to try committing offsets. The default is 1 minute.$$ *($$Duration$$, default: `$$60000ms$$`)*
$$cdc.offset.policy$$:: $$Offset storage commit policy.$$ *($$OffsetPolicy$$, default: `$$<none>$$`)*
$$cdc.offset.storage$$:: $$Kafka connector tracks the number processed records and regularly stores the count (as "offsets") in a preconfigured metadata storage. On restart the connector resumes the reading from the last recorded source offset.$$ *($$OffsetStorageType$$, default: `$$<none>$$`, possible values: `memory`,`file`,`kafka`,`metadata`)*
$$cdc.schema$$:: $$Include the schema's as part of the outbound message.$$ *($$Boolean$$, default: `$$false$$`)*
$$cdc.stream.header.convert-connect-headers$$:: $$When true the {@link org.apache.kafka.connect.header.Header} are converted into message headers with the {@link org.apache.kafka.connect.header.Header#key()} as name and {@link org.apache.kafka.connect.header.Header#value()}.$$ *($$Boolean$$, default: `$$true$$`)*
$$cdc.stream.header.offset$$:: $$Serializes the source record's offset metadata into the outbound message header under cdc.offset.$$ *($$Boolean$$, default: `$$false$$`)*
//end::configuration-properties[]
==== Debezium property Shortcut mapping
The table below lists all available shortcuts along with the Debezium properties they represent.
.Table Shortcut Properties Mapping
|===
| Shortcut | Original | Description
|cdc.connector
|cdc.config.connector.class
|`mysql` : MySqlConnector, `postgres` : PostgresConnector, `mongodb` : MongodbSourceConnector, `oracle` : OracleConnector, `sqlserver` : SqlServerConnector
|cdc.name
|cdc.config.name
|
|cdc.offset.flush-interval
|cdc.config.offset.flush.interval.ms
|
|cdc.offset.commit-timeout
|cdc.config.offset.flush.timeout.ms
|
|cdc.offset.policy
|cdc.config.offset.commit.policy
|`periodic` : PeriodicCommitOffsetPolicy, `always` : AlwaysCommitOffsetPolicy
|cdc.offset.storage
|cdc.config.offset.storage
|`metadata` : MetadataStoreOffsetBackingStore, `file` : FileOffsetBackingStore, `kafka` : KafkaOffsetBackingStore, `memory` : MemoryOffsetBackingStore
|cdc.flattering.drop-tombstones
|cdc.config.drop.tombstones
|
|cdc.flattering.delete-handling-mode
|cdc.config.delete.handling.mode
|`none` : none, `drop` : drop, `rewrite` : rewrite
|===
== Database Support
The `CDC Source` uses the Debezium utilities, and currently supports CDC for five datastores: `MySQL`, `PostgreSQL`, `MongoDB`, `Oracle` and `SQL Server` databases.
== Examples and Testing
The [CdcSourceIntegrationTest](), [CdcDeleteHandlingIntegrationTest]() and [CdcFlatteringIntegrationTest]() integration tests use test databases fixtures, running on the local machine.
We use pre-build debezium docker database images.
The Maven builds create the test databases fixtures with the help of the `docker-maven-plugin`.
To run and debug the tests from your IDE you need to deploy the required database images from the command line.
Instructions below explains how to run pre-configured test databases form Docker images.
==== MySQL
Start the `debezium/example-mysql` in a docker:
[source, bash]
----
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.0
----
[TIP]
====
(optional) Use `mysql` client to connected to the database and to create a `debezium` user with required credentials:
[source, bash]
----
docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
----
====
Use following properties to connect the CDC Source to the MySQL DB:
[source]
----
cdc.connector=mysql # <1>
cdc.name=my-sql-connector # <2>
cdc.config.database.server.id=85744 # <2>
cdc.config.database.server.name=my-app-connector # <2>
cdc.config.database.user=debezium # <3>
cdc.config.database.password=dbz # <3>
cdc.config.database.hostname=localhost # <3>
cdc.config.database.port=3306 # <3>
cdc.schema=true # <4>
cdc.flattering.enabled=true # <5>
----
<1> Configures the CDC Source to use https://debezium.io/docs/connectors/mysql/[MySqlConnector]. (equivalent to setting `cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector`).
<2> Metadata used to identify and dispatch the incoming events.
<3> Connection to the MySQL server running on `localhost:3306` as `debezium` user.
<4> Includes the https://debezium.io/docs/connectors/mysql/#change-events-value[Change Event Value] schema in the `SourceRecord` events.
<5> Enables the https://debezium.io/docs/configuration/event-flattening/[CDC Event Flattering].
You can run also the `CdcSourceIntegrationTests#CdcMysqlTests` using this mysql configuration.
==== PostgreSQL
Start a pre-configured postgres server from the `debezium/example-postgres:1.0` Docker image:
[source, bash]
----
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.0
----
You can connect to this server like this:
[source, bash]
----
psql -U postgres -h localhost -p 5432
----
Use following properties to connect the CDC Source to the PostgreSQL:
[source]
----
cdc.connector=postgres # <1>
cdc.offset.storage=memory #<2>
cdc.name=my-sql-connector # <3>
cdc.config.database.server.id=85744 # <3>
cdc.config.database.server.name=my-app-connector # <3>
cdc.config.database.user=postgres # <4>
cdc.config.database.password=postgres # <4>
cdc.config.database..dbname=postgres # <4>
cdc.config.database.hostname=localhost # <4>
cdc.config.database.port=5432 # <4>
cdc.schema=true # <5>
cdc.flattering.enabled=true # <6>
----
<1> Configures `CDC Source` to use https://debezium.io/docs/connectors/postgresql/[PostgresConnector]. Equivalent for setting `cdc.config.connector.class=io.debezium.connector.postgresql.PostgresConnector`.
<2> Configures the Debezium engine to use `memory` (e.g. `cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore) backing offset store.
<3> Metadata used to identify and dispatch the incoming events.
<4> Connection to the PostgreSQL server running on `localhost:5432` as `postgres` user.
<5> Includes the https://debezium.io/docs/connectors/mysql/#change-events-value[Change Event Value] schema in the `SourceRecord` events.
<6> Enables the https://debezium.io/docs/configuration/event-flattening/[CDC Event Flattering].
You can run also the `CdcSourceIntegrationTests#CdcPostgresTests` using this mysql configuration.
==== MongoDB
Start a pre-configured mongodb from the `debezium/example-mongodb:0.10` Docker image:
[source, bash]
----
docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz debezium/example-mongodb:0.10
----
Initialize the inventory collections
[source, bash]
----
docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'
----
In the `mongodb` terminal output, search for a log entry like `host: "3f95a8a6516e:27017"` :
[source, bash]
----
2019-01-10T13:46:10.004+0000 I COMMAND [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms
----
Add `127.0.0.1 3f95a8a6516e` entry to your `/etc/hosts`
Use following properties to connect the CDC Source to the MongoDB:
[source]
----
cdc.connector=mongodb # <1>
cdc.offset.storage=memory #<2>
cdc.config.mongodb.hosts=rs0/localhost:27017 # <3>
cdc.config.mongodb.name=dbserver1 # <3>
cdc.config.mongodb.user=debezium # <3>
cdc.config.mongodb.password=dbz # <3>
cdc.config.database.whitelist=inventory # <3>
cdc.config.tasks.max=1 # <4>
cdc.schema=true # <5>
cdc.flattering.enabled=true # <6>
----
<1> Configures `CDC Source` to use https://debezium.io/docs/connectors/mongodb/[MongoDB Connector]. This maps into `cdc.config.connector.class=io.debezium.connector.mongodb.MongodbSourceConnector`.
<2> Configures the Debezium engine to use `memory` (e.g. `cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore) backing offset store.
<3> Connection to the MongoDB running on `localhost:27017` as `debezium` user.
<4> https://debezium.io/docs/connectors/mongodb/#tasks
<5> Includes the https://debezium.io/docs/connectors/mysql/#change-events-value[Change Event Value] schema in the `SourceRecord` events.
<6> Enables the https://debezium.io/docs/configuration/event-flattening/[CDC Event Flattering].
You can run also the `CdcSourceIntegrationTests#CdcPostgresTests` using this mysql configuration.
==== SQL Server
Start a `sqlserver` from the `debezium/example-postgres:1.0` Docker image:
[source, bash]
----
docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2
----
Populate with sample data form debezium's sqlserver tutorial:
[source, bash]
----
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
----
Use following properties to connect the CDC Source to the SQLServer:
[source]
----
cdc.connector=sqlserver # <1>
cdc.offset.storage=memory #<2>
cdc.name=my-sql-connector # <3>
cdc.config.database.server.id=85744 # <3>
cdc.config.database.server.name=my-app-connector # <3>
cdc.config.database.user=sa # <4>
cdc.config.database.password=Password! # <4>
cdc.config.database..dbname=testDB # <4>
cdc.config.database.hostname=localhost # <4>
cdc.config.database.port=1433 # <4>
----
<1> Configures `CDC Source` to use https://debezium.io/docs/connectors/sqlserver/[SqlServerConnector]. Equivalent for setting `cdc.config.connector.class=io.debezium.connector.sqlserver.SqlServerConnector`.
<2> Configures the Debezium engine to use `memory` (e.g. `cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore) backing offset store.
<3> Metadata used to identify and dispatch the incoming events.
<4> Connection to the SQL Server running on `localhost:1433` as `sa` user.
You can run also the `CdcSourceIntegrationTests#CdcSqlServerTests` using this mysql configuration.
==== Oracle
Start Oracle reachable from localhost and set up with the configuration, users and grants described in the https://github.com/debezium/oracle-vagrant-box[Debezium Vagrant set-up]
Populate with sample data form Debezium's Oracle tutorial:
[source, bash]
----
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
----
//end::ref-doc[]
== Run standalone
```
java -jar cdc-debezium-source.jar --cdc.connector=mysql --cdc.name=my-sql-connector --cdc.config.database.server.id=85744 --cdc.config.database.server.name=my-app-connector --cdc.config.database.user=debezium --cdc.config.database.password=dbz --cdc.config.database.hostname=localhost --cdc.config.database.port=3306 --cdc.schema=true --cdc.flattering.enabled=true
```

View File

@@ -0,0 +1,298 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>cdc-debezium-source</artifactId>
<version>3.0.0-SNAPSHOT</version>
<name>cdc-debezium-source</name>
<description>CDC Debezium source apps</description>
<packaging>jar</packaging>
<parent>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>stream-applications-core</artifactId>
<version>3.0.0-SNAPSHOT</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>cdc-debezium-supplier</artifactId>
<version>${java-functions.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>function-test-support</artifactId>
<version>${java-functions.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.javacrumbs.json-unit</groupId>
<artifactId>json-unit</artifactId>
<version>1.25.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<!-- 3.0.8.BUILD-SNAPSHOT or newer is required because of an issue with the KafkaNullConverter -->
<version>3.0.8.BUILD-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-app-starter-doc-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.springframework.cloud.stream.app.plugin</groupId>
<artifactId>spring-cloud-stream-app-maven-plugin</artifactId>
<configuration>
<generatedApp>
<name>cdc-debezium</name>
<type>source</type>
<version>${project.version}</version>
<configClass>org.springframework.cloud.fn.supplier.cdc.CdcSupplierConfiguration.class</configClass>
<functionDefinition>cdcSupplier</functionDefinition>
</generatedApp>
<dependencies>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>cdc-debebzium-supplier</artifactId>
</dependency>
</dependencies>
</configuration>
</plugin>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<version>0.33.0</version>
<configuration>
<images>
<!-- <image>-->
<!-- <alias>test-mongodb2</alias>-->
<!-- <name>%a/example-mongodb:${project.version}</name>-->
<!-- <build>-->
<!-- <dockerFileDir>${project.basedir}/src/test/docker/mongodb</dockerFileDir>-->
<!-- <filter>@</filter>-->
<!-- </build>-->
<!-- <run>-->
<!-- <hostname>localhost</hostname>-->
<!-- <env>-->
<!-- <MONGODB_USER>debezium</MONGODB_USER>-->
<!-- <MONGODB_PASSWORD>dbz</MONGODB_PASSWORD>-->
<!-- </env>-->
<!-- <ports>-->
<!-- <port>27017:27017</port>-->
<!-- </ports>-->
<!-- <wait>-->
<!-- <log>port: 3306</log>-->
<!-- <time>300000</time>-->
<!-- </wait>-->
<!-- </run>-->
<!-- </image>-->
<!--<image>-->
<!--<alias>test-mongodb</alias>-->
<!--<name>debezium/example-mongodb:1.0</name>-->
<!--<run>-->
<!--<hostname>localhost</hostname>-->
<!--<env>-->
<!--<MONGODB_USER>debezium</MONGODB_USER>-->
<!--<MONGODB_PASSWORD>dbz</MONGODB_PASSWORD>-->
<!--</env>-->
<!--<ports>-->
<!--<port>27017:27017</port>-->
<!--</ports>-->
<!--<wait>-->
<!--<log>port: 3306</log>-->
<!--<time>300000</time>-->
<!--</wait>-->
<!--</run>-->
<!--</image>-->
<image>
<alias>mysql</alias>
<name>debezium/example-mysql:1.0</name>
<run>
<env>
<MYSQL_ROOT_PASSWORD>debezium</MYSQL_ROOT_PASSWORD>
<MYSQL_USER>mysqluser</MYSQL_USER>
<MYSQL_PASSWORD>mysqlpw</MYSQL_PASSWORD>
</env>
<ports>
<port>3306:3306</port>
</ports>
<wait>
<log>port: 3306</log>
<time>30000</time>
</wait>
</run>
</image>
<image>
<alias>postgres</alias>
<name>debezium/example-postgres:1.0</name>
<run>
<env>
<POSTGRES_USER>postgres</POSTGRES_USER>
<POSTGRES_PASSWORD>postgres</POSTGRES_PASSWORD>
</env>
<ports>
<port>5432:5432</port>
</ports>
<wait>
<log>PostgreSQL init process complete</log>
<time>30000</time>
</wait>
</run>
</image>
<image>
<external>
<type>properties</type>
<prefix>docker</prefix>
<mode>override</mode>
</external>
<name>%a/sqlserver-example:${project.version}</name>
<build>
<dockerFileDir>${project.basedir}/src/test/docker/sqlserver</dockerFileDir>
<filter>@</filter>
</build>
<run>
<env>
<ACCEPT_EULA>Y</ACCEPT_EULA>
<MSSQL_PID>Standard</MSSQL_PID>
<SA_PASSWORD>Password!</SA_PASSWORD>
<MSSQL_AGENT_ENABLED>true</MSSQL_AGENT_ENABLED>
</env>
<ports>
<port>1433:1433</port>
</ports>
<wait>
<log>1 rows affected</log>
<time>40000</time>
</wait>
</run>
</image>
</images>
</configuration>
<executions>
<execution>
<id>start</id>
<!--<phase>pre-integration-test</phase>-->
<phase>generate-test-resources</phase>
<goals>
<goal>build</goal>
<goal>start</goal>
</goals>
</execution>
<execution>
<id>stop</id>
<phase>post-integration-test</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
<execution>
<id>stop-pre</id>
<phase>clean</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<snapshots>
<enabled>true</enabled>
</snapshots>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
</repository>
<repository>
<snapshots>
<enabled>false</enabled>
</snapshots>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
</repository>
<repository>
<snapshots>
<enabled>false</enabled>
</snapshots>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/release</url>
</repository>
<repository>
<snapshots>
<enabled>false</enabled>
</snapshots>
<id>spring-libs-release</id>
<name>Spring Libs Release</name>
<url>https://repo.spring.io/libs-release</url>
</repository>
<repository>
<snapshots>
<enabled>false</enabled>
</snapshots>
<id>spring-milestone-release</id>
<name>Spring Milestone Release</name>
<url>https://repo.spring.io/libs-milestone</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/libs-release</url>
</pluginRepository>
<pluginRepository>
<snapshots>
<enabled>true</enabled>
</snapshots>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
</pluginRepository>
<pluginRepository>
<snapshots>
<enabled>false</enabled>
</snapshots>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
</pluginRepository>
</pluginRepositories>
</project>

View File

@@ -0,0 +1,5 @@
configuration-properties.classes=org.springframework.cloud.fn.supplier.cdc.CdcSupplierProperties, \
org.springframework.cloud.fn.supplier.cdc.CdcSupplierProperties$Header, \
org.springframework.cloud.fn.common.cdc.CdcCommonProperties, \
org.springframework.cloud.fn.common.cdc.CdcCommonProperties$Flattering, \
org.springframework.cloud.fn.common.cdc.CdcCommonProperties$Offset

View File

@@ -0,0 +1,2 @@
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration
spring.cloud.stream.kafka.default.producer.messageKeyExpression=headers['cdc_key']

View File

@@ -0,0 +1,16 @@
FROM debezium/example-mongodb:0.10
## Bundle data source
COPY entrypoint.sh /usr/local/bin/
# Grant permissions for the import-data script to be executable
RUN chmod +x /usr/local/bin/entrypoint.sh
RUN chmod +x /usr/local/bin/init-inventory.sh
CMD /usr/local/bin/entrypoint.sh
#CMD ["mongod", "--replSet", "rs0", "--auth"]
#CMD ["/bin/bash", "./entrypoint.sh"]
#CMD ["mongod", "--replSet", "rs0", "--auth", "&", "sleep", "10s", "&", "/usr/local/bin/init-inventory.sh"]
#CMD mongod --replSet rs0 --auth

View File

@@ -0,0 +1,2 @@
#!/usr/bin/env bash
mongod --replSet rs0 --auth & sleep 10s

View File

@@ -0,0 +1,5 @@
#wait for the SQL Server to come up
sleep 10s
#populate the db
/usr/local/bin/init-inventory.sh

View File

@@ -0,0 +1,13 @@
FROM microsoft/mssql-server-linux:2017-CU9-GDR2
## Create sql directory
RUN mkdir -p /usr/src/data
WORKDIR /usr/src/data
## Bundle data source
COPY . /usr/src/data
# Grant permissions for the import-data script to be executable
RUN chmod +x /usr/src/data/import-data.sh
CMD /bin/bash ./entrypoint.sh

View File

@@ -0,0 +1,6 @@
# Start SQL Server, start the script to create the DB and import the data.
# Use the tail to keep the container running
/opt/mssql/bin/sqlservr & \
/usr/src/data/import-data.sh & \
echo "Data Loaded" & \
tail -f /dev/null

View File

@@ -0,0 +1,5 @@
#wait for the SQL Server to come up
sleep 10s
#run the setup script to create the DB and the schema in the DB
/opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P $SA_PASSWORD -d master -i inventory.sql

View File

@@ -0,0 +1,84 @@
-- Create the test database
CREATE DATABASE testDB;
GO
USE testDB;
EXEC sys.sp_cdc_enable_db;
-- Create and populate our products using a single insert with many rows
CREATE TABLE products (
id INTEGER IDENTITY(101,1) NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT
);
INSERT INTO products(name,description,weight)
VALUES ('scooter','Small 2-wheel scooter',3.14);
INSERT INTO products(name,description,weight)
VALUES ('car battery','12V car battery',8.1);
INSERT INTO products(name,description,weight)
VALUES ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8);
INSERT INTO products(name,description,weight)
VALUES ('hammer','12oz carpenter''s hammer',0.75);
INSERT INTO products(name,description,weight)
VALUES ('hammer','14oz carpenter''s hammer',0.875);
INSERT INTO products(name,description,weight)
VALUES ('hammer','16oz carpenter''s hammer',1.0);
INSERT INTO products(name,description,weight)
VALUES ('rocks','box of assorted rocks',5.3);
INSERT INTO products(name,description,weight)
VALUES ('jacket','water resistent black wind breaker',0.1);
INSERT INTO products(name,description,weight)
VALUES ('spare tire','24 inch spare tire',22.2);
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'products', @role_name = NULL, @supports_net_changes = 0;
-- Create and populate the products on hand using multiple inserts
CREATE TABLE products_on_hand (
product_id INTEGER NOT NULL PRIMARY KEY,
quantity INTEGER NOT NULL,
FOREIGN KEY (product_id) REFERENCES products(id)
);
INSERT INTO products_on_hand VALUES (101,3);
INSERT INTO products_on_hand VALUES (102,8);
INSERT INTO products_on_hand VALUES (103,18);
INSERT INTO products_on_hand VALUES (104,4);
INSERT INTO products_on_hand VALUES (105,5);
INSERT INTO products_on_hand VALUES (106,0);
INSERT INTO products_on_hand VALUES (107,44);
INSERT INTO products_on_hand VALUES (108,2);
INSERT INTO products_on_hand VALUES (109,5);
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'products_on_hand', @role_name = NULL, @supports_net_changes = 0;
-- Create some customers ...
CREATE TABLE customers (
id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE
);
INSERT INTO customers(first_name,last_name,email)
VALUES ('Sally','Thomas','sally.thomas@acme.com');
INSERT INTO customers(first_name,last_name,email)
VALUES ('George','Bailey','gbailey@foobar.com');
INSERT INTO customers(first_name,last_name,email)
VALUES ('Edward','Walker','ed@walker.com');
INSERT INTO customers(first_name,last_name,email)
VALUES ('Anne','Kretchmar','annek@noanswer.org');
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0;
-- Create some very simple orders
CREATE TABLE orders (
id INTEGER IDENTITY(10001,1) NOT NULL PRIMARY KEY,
order_date DATE NOT NULL,
purchaser INTEGER NOT NULL,
quantity INTEGER NOT NULL,
product_id INTEGER NOT NULL,
FOREIGN KEY (purchaser) REFERENCES customers(id),
FOREIGN KEY (product_id) REFERENCES products(id)
);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
VALUES ('16-JAN-2016', 1001, 1, 102);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
VALUES ('17-JAN-2016', 1002, 2, 105);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
VALUES ('19-FEB-2016', 1002, 2, 106);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
VALUES ('21-FEB-2016', 1003, 1, 107);
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'orders', @role_name = NULL, @supports_net_changes = 0;
GO

View File

@@ -0,0 +1,162 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.source.cdc;
import java.time.Duration;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.FilteredClassLoader;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.boot.test.context.runner.ContextConsumer;
import org.springframework.cloud.fn.common.cdc.CdcCommonProperties;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;
import org.springframework.test.jdbc.JdbcTestUtils;
import org.springframework.util.ClassUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.cloud.fn.supplier.cdc.CdcSupplierConfiguration.ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL;
import static org.springframework.cloud.stream.app.source.cdc.CdcTestUtils.receiveAll;
/**
* @author Christian Tzolov
*/
public class CdcDeleteHandlingIntegrationTest {
private final JdbcTemplate jdbcTemplate = CdcTestUtils.jdbcTemplate(
"com.mysql.cj.jdbc.Driver",
"jdbc:mysql://localhost:3306/inventory",
"root", "debezium");
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withUserConfiguration(
TestChannelBinderConfiguration.getCompleteConfiguration(TestCdcSourceApplication.class))
.withPropertyValues(
"spring.cloud.stream.function.definition=cdcSupplier",
"cdc.name=my-sql-connector",
"cdc.schema=false",
"cdc.flattering.enabled=true",
"cdc.stream.header.offset=true",
"cdc.connector=mysql",
"cdc.config.database.user=debezium",
"cdc.config.database.password=dbz",
"cdc.config.database.hostname=localhost",
"cdc.config.database.port=3306",
"cdc.config.database.server.id=85744",
"cdc.config.database.server.name=my-app-connector",
"cdc.config.database.history=io.debezium.relational.history.MemoryDatabaseHistory");
@Test
public void handleRecordDeletionTest() {
contextRunner.withPropertyValues("cdc.flattering.deleteHandlingMode=none", "cdc.flattering.dropTombstones=true")
.run(consumer);
contextRunner.withPropertyValues("cdc.flattering.deleteHandlingMode=none", "cdc.flattering.dropTombstones=true")
.withClassLoader(new FilteredClassLoader(KafkaNull.class)) // Remove Kafka from the classpath
.run(consumer);
contextRunner.withPropertyValues("cdc.flattering.deleteHandlingMode=none", "cdc.flattering.dropTombstones=false")
.run(consumer);
contextRunner.withPropertyValues("cdc.flattering.deleteHandlingMode=none", "cdc.flattering.dropTombstones=false")
.withClassLoader(new FilteredClassLoader(KafkaNull.class)) // Remove Kafka from the classpath
.run(consumer);
contextRunner.withPropertyValues("cdc.flattering.deleteHandlingMode=drop", "cdc.flattering.dropTombstones=true")
.run(consumer);
contextRunner.withPropertyValues("cdc.flattering.deleteHandlingMode=drop", "cdc.flattering.dropTombstones=true")
.withClassLoader(new FilteredClassLoader(KafkaNull.class)) // Remove Kafka from the classpath
.run(consumer);
contextRunner.withPropertyValues("cdc.flattering.deleteHandlingMode=drop", "cdc.flattering.dropTombstones=false")
.run(consumer);
contextRunner.withPropertyValues("cdc.flattering.deleteHandlingMode=drop", "cdc.flattering.dropTombstones=false")
.withClassLoader(new FilteredClassLoader(KafkaNull.class)) // Remove Kafka from the classpath
.run(consumer);
contextRunner.withPropertyValues("cdc.flattering.deleteHandlingMode=rewrite", "cdc.flattering.dropTombstones=true")
.run(consumer);
contextRunner.withPropertyValues("cdc.flattering.deleteHandlingMode=rewrite", "cdc.flattering.dropTombstones=true")
.withClassLoader(new FilteredClassLoader(KafkaNull.class)) // Remove Kafka from the classpath
.run(consumer);
contextRunner.withPropertyValues("cdc.flattering.deleteHandlingMode=rewrite", "cdc.flattering.dropTombstones=false")
.run(consumer);
contextRunner.withPropertyValues("cdc.flattering.deleteHandlingMode=rewrite", "cdc.flattering.dropTombstones=false")
.withClassLoader(new FilteredClassLoader(KafkaNull.class)) // Remove Kafka from the classpath
.run(consumer);
}
final ContextConsumer<? super ApplicationContext> consumer = context -> {
OutputDestination outputDestination = context.getBean(OutputDestination.class);
CdcCommonProperties props = context.getBean(CdcCommonProperties.class);
boolean isKafkaPresent = ClassUtils.isPresent(ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL, context.getClassLoader());
CdcCommonProperties.DeleteHandlingMode deleteHandlingMode = props.getFlattering().getDeleteHandlingMode();
boolean isDropTombstones = props.getFlattering().isDropTombstones();
jdbcTemplate.update("insert into `customers`(`first_name`,`last_name`,`email`) VALUES('Test666', 'Test666', 'Test666@spring.org')");
String newRecordId = jdbcTemplate.query("select * from `customers` where `first_name` = ?",
(rs, rowNum) -> rs.getString("id"), "Test666").iterator().next();
List<Message<?>> messages = receiveAll(outputDestination);
assertThat(messages).hasSize(53);
JdbcTestUtils.deleteFromTableWhere(jdbcTemplate, "customers", "first_name = ?", "Test666");
Message<?> received;
if (deleteHandlingMode == CdcCommonProperties.DeleteHandlingMode.drop) {
// Do nothing
}
else if (deleteHandlingMode == CdcCommonProperties.DeleteHandlingMode.none) {
received = outputDestination.receive(Duration.ofSeconds(10).toMillis());
assertThat(received).isNotNull();
assertThat(received.getPayload()).isEqualTo("null".getBytes());
}
else if (deleteHandlingMode == CdcCommonProperties.DeleteHandlingMode.rewrite) {
received = outputDestination.receive(Duration.ofSeconds(10).toMillis());
assertThat(received).isNotNull();
assertThat(toString(received.getPayload()).contains("\"__deleted\":\"true\""));
}
if (!isDropTombstones && isKafkaPresent) {
received = outputDestination.receive(Duration.ofSeconds(10).toMillis());
assertThat(received).isNotNull();
//Tombstones event should have KafkaNull payload
assertThat(received.getPayload().getClass().getCanonicalName())
.isEqualTo(ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL);
String key = (String) received.getHeaders().get("cdc_key");
//Tombstones event should carry the deleted record id in the cdc_key header
assertThat(key).isEqualTo("{\"id\":" + newRecordId + "}");
}
received = outputDestination.receive(Duration.ofSeconds(10).toMillis());
assertThat(received).isNull();
};
private String toString(Object object) {
return new String((byte[]) object);
}
}

View File

@@ -0,0 +1,244 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.source.cdc;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.FilteredClassLoader;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.boot.test.context.runner.ContextConsumer;
import org.springframework.cloud.fn.common.cdc.CdcCommonProperties;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;
import org.springframework.test.jdbc.JdbcTestUtils;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;
import static net.javacrumbs.jsonunit.JsonAssert.assertJsonEquals;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.cloud.fn.supplier.cdc.CdcSupplierConfiguration.ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL;
import static org.springframework.cloud.stream.app.source.cdc.CdcTestUtils.receiveAll;
import static org.springframework.cloud.stream.app.source.cdc.CdcTestUtils.resourceToString;
/**
* @author Christian Tzolov
*/
public class CdcFlatteringIntegrationTest {
private final JdbcTemplate jdbcTemplate = CdcTestUtils.jdbcTemplate(
"com.mysql.cj.jdbc.Driver",
"jdbc:mysql://localhost:3306/inventory",
"root", "debezium");
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withUserConfiguration(
TestChannelBinderConfiguration.getCompleteConfiguration(TestCdcSourceApplication.class))
.withPropertyValues(
"spring.cloud.stream.function.definition=cdcSupplier",
"cdc.name=my-sql-connector",
"cdc.schema=false",
"cdc.stream.header.offset=false",
"cdc.connector=mysql",
"cdc.config.database.user=debezium",
"cdc.config.database.password=dbz",
"cdc.config.database.hostname=localhost",
"cdc.config.database.port=3306",
"cdc.config.database.server.id=85744",
"cdc.config.database.server.name=my-app-connector",
"cdc.config.database.history=io.debezium.relational.history.MemoryDatabaseHistory"
);
@Test
public void noFlatteredResponseNoKafka() {
contextRunner.withPropertyValues("cdc.flattering.enabled=false")
.withClassLoader(new FilteredClassLoader(KafkaNull.class)) // Remove Kafka from the classpath
.run(noFlatteringTest);
}
@Test
public void noFlatteredResponseWithKafka() {
contextRunner.withPropertyValues("cdc.flattering.enabled=false")
.run(noFlatteringTest);
}
final ContextConsumer<? super ApplicationContext> noFlatteringTest = context -> {
OutputDestination outputDestination = context.getBean(OutputDestination.class);
boolean isKafkaPresent = ClassUtils.isPresent(ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL, context.getClassLoader());
List<Message<?>> messages = receiveAll(outputDestination);
assertThat(messages).hasSize(52);
assertJsonEquals(resourceToString(
"classpath:/json/mysql_ddl_drop_inventory_address_table.json"),
toString(messages.get(1).getPayload()));
assertThat(messages.get(1).getHeaders().get("cdc_topic")).isEqualTo("my-app-connector");
assertJsonEquals("{\"databaseName\":\"inventory\"}", messages.get(1).getHeaders().get("cdc_key"));
assertJsonEquals(resourceToString("classpath:/json/mysql_insert_inventory_products_106.json"),
toString(messages.get(39).getPayload()));
assertThat(messages.get(39).getHeaders().get("cdc_topic")).isEqualTo("my-app-connector.inventory.products");
assertJsonEquals("{\"id\":106}", messages.get(39).getHeaders().get("cdc_key"));
jdbcTemplate.update(
"insert into `customers`(`first_name`,`last_name`,`email`) VALUES('Test666', 'Test666', 'Test666@spring.org')");
String newRecordId = jdbcTemplate.query("select * from `customers` where `first_name` = ?",
(rs, rowNum) -> rs.getString("id"), "Test666").iterator().next();
jdbcTemplate.update("UPDATE `customers` SET `last_name`='Test999' WHERE first_name = 'Test666'");
JdbcTestUtils.deleteFromTableWhere(jdbcTemplate, "customers", "first_name = ?", "Test666");
messages = receiveAll(outputDestination);
assertThat(messages).hasSize(isKafkaPresent ? 4 : 3);
assertJsonEquals(resourceToString("classpath:/json/mysql_update_inventory_customers.json"),
toString(messages.get(1).getPayload()));
assertThat(messages.get(1).getHeaders().get("cdc_topic")).isEqualTo("my-app-connector.inventory.customers");
assertJsonEquals("{\"id\":" + newRecordId + "}", messages.get(1).getHeaders().get("cdc_key"));
assertJsonEquals(resourceToString("classpath:/json/mysql_delete_inventory_customers.json"),
toString(messages.get(2).getPayload()));
assertThat(messages.get(1).getHeaders().get("cdc_topic")).isEqualTo("my-app-connector.inventory.customers");
assertJsonEquals("{\"id\":" + newRecordId + "}", messages.get(1).getHeaders().get("cdc_key"));
if (isKafkaPresent) {
assertThat(messages.get(3).getPayload().getClass().getCanonicalName())
.isEqualTo(ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL,
"Tombstones event should have KafkaNull payload");
assertThat(messages.get(3).getHeaders().get("cdc_topic"))
.isEqualTo("my-app-connector.inventory.customers");
assertJsonEquals("{\"id\":" + newRecordId + "}", messages.get(3).getHeaders().get("cdc_key"));
}
};
@Test
public void flatteredResponseNoKafka() {
contextRunner.withPropertyValues(
"cdc.flattering.enabled=true",
"cdc.flattering.deleteHandlingMode=none",
"cdc.flattering.dropTombstones=false",
"cdc.flattering.addHeaders=op",
"cdc.flattering.addFields=name,db")
.withClassLoader(new FilteredClassLoader(KafkaNull.class)) // Remove Kafka from the classpath
.run(flatteringTest);
}
@Test
public void flatteredResponseWithKafka() {
contextRunner.withPropertyValues(
"cdc.flattering.enabled=true",
"cdc.flattering.deleteHandlingMode=none",
"cdc.flattering.dropTombstones=false",
"cdc.flattering.addHeaders=op",
"cdc.flattering.addFields=name,db")
.run(flatteringTest);
}
@Test
public void flatteredResponseWithKafkaDropTombstone() {
contextRunner.withPropertyValues(
"cdc.flattering.enabled=true",
"cdc.flattering.deleteHandlingMode=none",
"cdc.flattering.dropTombstones=true",
"cdc.flattering.addHeaders=op",
"cdc.flattering.addFields=name,db")
.run(flatteringTest);
}
final ContextConsumer<? super ApplicationContext> flatteringTest = context -> {
OutputDestination outputDestination = context.getBean(OutputDestination.class);
boolean isKafkaPresent = ClassUtils.isPresent(ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL, context.getClassLoader());
List<Message<?>> messages = receiveAll(outputDestination);
assertThat(messages).hasSize(52);
CdcCommonProperties.Flattering flatteringProps = context.getBean(CdcCommonProperties.class).getFlattering();
assertJsonEquals(resourceToString(
"classpath:/json/mysql_ddl_drop_inventory_address_table.json"),
toString(messages.get(1).getPayload()));
assertThat(messages.get(1).getHeaders().get("cdc_topic")).isEqualTo("my-app-connector");
assertJsonEquals("{\"databaseName\":\"inventory\"}",
messages.get(1).getHeaders().get("cdc_key"));
if (flatteringProps.isEnabled()) {
assertJsonEquals(resourceToString("classpath:/json/mysql_flattered_insert_inventory_products_106.json"),
toString(messages.get(39).getPayload()));
}
else {
assertJsonEquals(resourceToString("classpath:/json/mysql_insert_inventory_products_106.json"),
toString(messages.get(39).getPayload()));
}
assertThat(messages.get(39).getHeaders().get("cdc_topic")).isEqualTo("my-app-connector.inventory.products");
assertJsonEquals("{\"id\":106}", messages.get(39).getHeaders().get("cdc_key"));
if (flatteringProps.isEnabled() && flatteringProps.getAddHeaders().contains("op")) {
assertThat(messages.get(39).getHeaders().get("__op")).isEqualTo("c");
}
jdbcTemplate.update(
"insert into `customers`(`first_name`,`last_name`,`email`) VALUES('Test666', 'Test666', 'Test666@spring.org')");
String newRecordId = jdbcTemplate.query("select * from `customers` where `first_name` = ?",
(rs, rowNum) -> rs.getString("id"), "Test666").iterator().next();
jdbcTemplate.update("UPDATE `customers` SET `last_name`='Test999' WHERE first_name = 'Test666'");
JdbcTestUtils.deleteFromTableWhere(jdbcTemplate, "customers", "first_name = ?", "Test666");
messages = receiveAll(outputDestination);
assertThat(messages).hasSize((!flatteringProps.isDropTombstones() && isKafkaPresent) ? 4 : 3);
assertJsonEquals(resourceToString("classpath:/json/mysql_flattered_update_inventory_customers.json"),
toString(messages.get(1).getPayload()));
assertThat(messages.get(1).getHeaders().get("cdc_topic")).isEqualTo("my-app-connector.inventory.customers");
assertJsonEquals("{\"id\":" + newRecordId + "}", messages.get(1).getHeaders().get("cdc_key"));
if (!StringUtils.isEmpty(flatteringProps.getAddHeaders()) && flatteringProps.getAddHeaders().contains("op")) {
assertThat(messages.get(1).getHeaders().get("__op")).isEqualTo("u");
}
if (flatteringProps.getDeleteHandlingMode() == CdcCommonProperties.DeleteHandlingMode.none) {
assertThat(toString(messages.get(2).getPayload())).isEqualTo("null");
assertThat(messages.get(1).getHeaders().get("cdc_topic")).isEqualTo("my-app-connector.inventory.customers");
assertJsonEquals("{\"id\":" + newRecordId + "}", messages.get(1).getHeaders().get("cdc_key"));
if (!StringUtils.isEmpty(flatteringProps.getAddHeaders()) && flatteringProps.getAddHeaders().contains("op")) {
assertThat(messages.get(2).getHeaders().get("__op")).isEqualTo("d");
}
}
if (!flatteringProps.isDropTombstones() && isKafkaPresent) {
assertThat(messages.get(3).getPayload().getClass().getCanonicalName())
.isEqualTo(ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL,
"Tombstones event should have KafkaNull payload");
assertThat(messages.get(3).getHeaders().get("cdc_topic"))
.isEqualTo("my-app-connector.inventory.customers");
assertJsonEquals("{\"id\":" + newRecordId + "}", messages.get(3).getHeaders().get("cdc_key"));
}
};
private String toString(Object object) {
return new String((byte[]) object);
}
}

View File

@@ -0,0 +1,123 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.source.cdc;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.cloud.stream.app.source.cdc.CdcTestUtils.receiveAll;
/**
* @author Christian Tzolov
*/
public class CdcSourceDatabasesIntegrationTest<b> {
private final SpringApplicationBuilder applicationBuilder = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(TestCdcSourceApplication.class))
.web(WebApplicationType.NONE)
.properties("spring.cloud.stream.function.definition=cdcSupplier",
"cdc.name=my-sql-connector",
"cdc.flattering.dropTombstones=false",
"cdc.schema=false",
"cdc.flattering.enabled=true",
"cdc.stream.header.offset=true",
"cdc.config.database.server.id=85744",
"cdc.config.database.server.name=my-app-connector",
"cdc.config.database.history=io.debezium.relational.history.MemoryDatabaseHistory");
@Test
public void mysql() {
try (ConfigurableApplicationContext context = applicationBuilder
.run("--cdc.connector=mysql",
"--cdc.config.database.user=debezium",
"--cdc.config.database.password=dbz",
"--cdc.config.database.hostname=localhost",
"--cdc.config.database.port=3306")) {
OutputDestination outputDestination = context.getBean(OutputDestination.class);
// Using local region here
List<Message<?>> messages = receiveAll(outputDestination);
assertThat(messages).isNotNull();
assertThat(messages).hasSize(52);
}
}
@Test
public void sqlServer() {
try (ConfigurableApplicationContext context = applicationBuilder
.run("--cdc.connector=sqlserver",
//"--cdc.config.database.user=Standard",
"--cdc.config.database.user=sa",
"--cdc.config.database.password=Password!",
"--cdc.config.database.dbname=testDB",
"--cdc.config.database.hostname=localhost",
"--cdc.config.database.port=1433"
)) {
OutputDestination outputDestination = context.getBean(OutputDestination.class);
// Using local region here
List<Message<?>> messages = receiveAll(outputDestination);
assertThat(messages).isNotNull();
assertThat(messages).hasSize(30);
}
}
@Test
public void postgres() {
try (ConfigurableApplicationContext context = applicationBuilder
.run("--cdc.connector=postgres",
"--cdc.config.database.user=postgres",
"--cdc.config.database.password=postgres",
"--cdc.config.database.dbname=postgres",
"--cdc.config.database.hostname=localhost",
"--cdc.config.database.port=5432")) {
OutputDestination outputDestination = context.getBean(OutputDestination.class);
// Using local region here
List<Message<?>> messages = receiveAll(outputDestination);
assertThat(messages).isNotNull();
assertThat(messages).hasSize(5786);
}
}
//@Test
public void mongodb() {
try (ConfigurableApplicationContext context = applicationBuilder
.run("--cdc.connector=mongodb",
"--cdc.config.tasks.max=1",
"--cdc.config.mongodb.hosts=rs0/localhost:27017",
"--cdc.config.mongodb.name=dbserver1",
"--cdc.config.mongodb.user=debezium",
"--cdc.config.mongodb.password=dbz",
"--cdc.config.database.whitelist=inventory")) {
OutputDestination outputDestination = context.getBean(OutputDestination.class);
// Using local region here
List<Message<?>> messages = receiveAll(outputDestination);
assertThat(messages).isNotNull();
assertThat(messages).hasSize(666);
}
}
}

View File

@@ -0,0 +1,68 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.source.cdc;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.messaging.Message;
import org.springframework.util.StreamUtils;
/**
* @author Christian Tzolov
*/
public final class CdcTestUtils {
private CdcTestUtils() {
}
public static JdbcTemplate jdbcTemplate(String jdbcDriver, String jdbcUrl, String user, String password) {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName(jdbcDriver);
dataSource.setUrl(jdbcUrl);
dataSource.setUsername(user);
dataSource.setPassword(password);
return new JdbcTemplate(dataSource);
}
public static List<Message<?>> receiveAll(OutputDestination outputDestination) {
List<Message<?>> list = new ArrayList<>();
Message<?> received;
do {
received = outputDestination.receive(Duration.ofSeconds(20).toMillis());
if (received != null) {
list.add(received);
}
} while (received != null);
return list;
}
public static String resourceToString(String resourcePath) throws IOException {
return StreamUtils.copyToString(
new DefaultResourceLoader().getResource(resourcePath).getInputStream(), StandardCharsets.UTF_8);
}
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.source.cdc;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
import org.springframework.cloud.fn.supplier.cdc.CdcSupplierConfiguration;
import org.springframework.cloud.stream.binder.kafka.KafkaNullConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.messaging.converter.MessageConverter;
/**
* @author Christian Tzolov
*/
@SpringBootConfiguration
@EnableAutoConfiguration(exclude = MongoAutoConfiguration.class)
@Import(CdcSupplierConfiguration.class)
public class TestCdcSourceApplication {
/**
* This is only required for testing. In production the KafkaNullConverter bean is auto-configured.
*/
@Bean
@ConditionalOnMissingBean(KafkaNullConverter.class)
MessageConverter kafkaNullConverter() {
return new KafkaNullConverter();
}
}

View File

@@ -0,0 +1,20 @@
{
"source": {
"version": "${json-unit.ignore}",
"connector": "mysql",
"name": "my-app-connector",
"ts_ms": 0,
"snapshot": "true",
"db": "inventory",
"table": "addresses",
"server_id": 0,
"gtid": null,
"file": "${json-unit.ignore}",
"pos": "${json-unit.ignore}",
"row": 0,
"thread": null,
"query": null
},
"databaseName": "inventory",
"ddl": "DROP TABLE IF EXISTS `inventory`.`addresses`"
}

View File

@@ -0,0 +1,28 @@
{
"before": {
"id": "${json-unit.ignore}",
"first_name": "Test666",
"last_name": "Test999",
"email": "Test666@spring.org"
},
"after": null,
"source": {
"version": "${json-unit.ignore}",
"connector": "mysql",
"name": "my-app-connector",
"ts_ms": "${json-unit.ignore}",
"snapshot": "false",
"db": "inventory",
"table": "customers",
"server_id": "${json-unit.ignore}",
"gtid": null,
"file": "${json-unit.ignore}",
"pos": "${json-unit.ignore}",
"row": 0,
"thread": "${json-unit.ignore}",
"query": null
},
"op": "d",
"ts_ms": "${json-unit.ignore}",
"transaction": null
}

View File

@@ -0,0 +1,8 @@
{
"__db" : "inventory",
"__name" : "my-app-connector",
"id": 106,
"name": "hammer",
"description": "16oz carpenter's hammer",
"weight": 1.0
}

View File

@@ -0,0 +1,8 @@
{
"id": "${json-unit.ignore}",
"first_name": "Test666",
"last_name": "Test999",
"email": "Test666@spring.org",
"__name": "my-app-connector",
"__db": "inventory"
}

View File

@@ -0,0 +1,28 @@
{
"before": null,
"after": {
"id": 106,
"name": "hammer",
"description": "16oz carpenter's hammer",
"weight": 1.0
},
"source": {
"version": "${json-unit.ignore}",
"connector": "mysql",
"name": "my-app-connector",
"ts_ms": 0,
"snapshot": "true",
"db": "inventory",
"table": "products",
"server_id": 0,
"gtid": null,
"file": "${json-unit.ignore}",
"pos": "${json-unit.ignore}",
"row": 0,
"thread": null,
"query": null
},
"op": "c",
"ts_ms": "${json-unit.ignore}",
"transaction": null
}

View File

@@ -0,0 +1,33 @@
{
"before": {
"id": "${json-unit.ignore}",
"first_name": "Test666",
"last_name": "Test666",
"email": "Test666@spring.org"
},
"after": {
"id": "${json-unit.ignore}",
"first_name": "Test666",
"last_name": "Test999",
"email": "Test666@spring.org"
},
"source": {
"version": "${json-unit.ignore}",
"connector": "mysql",
"name": "my-app-connector",
"ts_ms": "${json-unit.ignore}",
"snapshot": "false",
"db": "inventory",
"table": "customers",
"server_id": "${json-unit.ignore}",
"gtid": null,
"file": "${json-unit.ignore}",
"pos": "${json-unit.ignore}",
"row": 0,
"thread": "${json-unit.ignore}",
"query": null
},
"op": "u",
"ts_ms": "${json-unit.ignore}",
"transaction": null
}

View File

@@ -28,5 +28,6 @@
<module>twitter-search-source</module>
<module>twitter-message-source</module>
<module>websocket-source</module>
<module>cdc-debezium-source</module>
</modules>
</project>

View File

@@ -20,6 +20,7 @@
<properties>
<apps.version>3.0.0-SNAPSHOT</apps.version>
<cdc-debezium-source.version>${apps.version}</cdc-debezium-source.version>
<file-source.version>${apps.version}</file-source.version>
<ftp-source.version>${apps.version}</ftp-source.version>
<geode-source.version>${apps.version}</geode-source.version>

View File

@@ -43,6 +43,8 @@
pom.properties['repo-spring-io']=
"${jdbc-source.version}".contains('SNAPSHOT') ? 'repo.spring.io/snapshot' :
"${jdbc-source.version}".contains('-') ? 'repo.spring.io/milestone' : 'repo.spring.io/release'
pom.properties['cdc-debezium-source-docker.tag']=
"${cdc-debezium-source.version}".contains('SNAPSHOT') ? 'latest' : "${cdc-debezium-source.version}"
pom.properties['file-source-docker.tag']=
"${file-source.version}".contains('SNAPSHOT') ? 'latest' : "${file-source.version}"
pom.properties['ftp-source-docker.tag']=

View File

@@ -1,3 +1,4 @@
source.cdc-debezium=docker:springcloudstream/cdc-debezium-source-kafka:@cdc-debezium-source-docker.tag@
source.file=docker:springcloudstream/file-source-kafka:@file-source-docker.tag@
source.ftp=docker:springcloudstream/ftp-source-kafka:@ftp-source-docker.tag@
source.geode=docker:springcloudstream/geode-source-kafka:@geode-source-docker.tag@

View File

@@ -1,3 +1,5 @@
source.cdc-debezium=https://@repo-spring-io@/org/springframework/cloud/stream/app/cdc-debezium-source-kafka/@cdc-debezium-source.version@/cdc-debezium-source-kafka-@cdc-debezium-source.version@.jar
source.cdc-debezium.metadata=https://@repo-spring-io@/org/springframework/cloud/stream/app/cdc-debezium-source-kafka/@cdc-debezium-source.version@/cdc-debezium-source-kafka-@cdc-debezium-source.version@-metadata.jar
source.file=https://@repo-spring-io@/org/springframework/cloud/stream/app/file-source-kafka/@file-source.version@/file-source-kafka-@file-source.version@.jar
source.file.metadata=https://@repo-spring-io@/org/springframework/cloud/stream/app/file-source-kafka/@file-source.version@/file-source-kafka-@file-source.version@-metadata.jar
source.ftp=https://@repo-spring-io@/org/springframework/cloud/stream/app/ftp-source-kafka/@ftp-source.version@/ftp-source-kafka-@ftp-source.version@.jar

View File

@@ -1,3 +1,5 @@
source.cdc-debezium=maven://org.springframework.cloud.stream.app:cdc-debezium-source-kafka:@cdc-debezium-source.version@
source.cdc-debezium.metadata=maven://org.springframework.cloud.stream.app:cdc-debezium-source-kafka:jar:metadata:@cdc-debezium-source.version@
source.file=maven://org.springframework.cloud.stream.app:file-source-kafka:@file-source.version@
source.file.metadata=maven://org.springframework.cloud.stream.app:file-source-kafka:jar:metadata:@file-source.version@
source.ftp=maven://org.springframework.cloud.stream.app:ftp-source-kafka:@ftp-source.version@

View File

@@ -1,3 +1,4 @@
source.cdc-debezium=docker:springcloudstream/cdc-debezium-source-rabbit:@cdc-debezium-source-docker.tag@
source.file=docker:springcloudstream/file-source-rabbit:@file-source-docker.tag@
source.ftp=docker:springcloudstream/ftp-source-rabbit:@ftp-source-docker.tag@
source.geode=docker:springcloudstream/geode-source-rabbit:@geode-source-docker.tag@

View File

@@ -1,3 +1,5 @@
source.cdc-debezium=https://@repo-spring-io@/org/springframework/cloud/stream/app/cdc-debezium-source-rabbit/@cdc-debezium-source.version@/cdc-debezium-source-rabbit-@cdc-debezium-source.version@.jar
source.cdc-debezium.metadata=https://@repo-spring-io@/org/springframework/cloud/stream/app/cdc-debezium-source-rabbit/@cdc-debezium-source.version@/cdc-debezium-source-rabbit-@cdc-debezium-source.version@-metadata.jar
source.file=https://@repo-spring-io@/org/springframework/cloud/stream/app/file-source-rabbit/@file-source.version@/file-source-rabbit-@file-source.version@.jar
source.file.metadata=https://@repo-spring-io@/org/springframework/cloud/stream/app/file-source-rabbit/@file-source.version@/file-source-rabbit-@file-source.version@-metadata.jar
source.ftp=https://@repo-spring-io@/org/springframework/cloud/stream/app/ftp-source-rabbit/@ftp-source.version@/ftp-source-rabbit-@ftp-source.version@.jar

View File

@@ -1,3 +1,5 @@
source.cdc-debezium=maven://org.springframework.cloud.stream.app:cdc-debezium-source-rabbit:@cdc-debezium-source.version@
source.cdc-debezium.metadata=maven://org.springframework.cloud.stream.app:cdc-debezium-source-rabbit:jar:metadata:@cdc-debezium-source.version@
source.file=maven://org.springframework.cloud.stream.app:file-source-rabbit:@file-source.version@
source.file.metadata=maven://org.springframework.cloud.stream.app:file-source-rabbit:jar:metadata:@file-source.version@
source.ftp=maven://org.springframework.cloud.stream.app:ftp-source-rabbit:@ftp-source.version@

View File

@@ -26,6 +26,11 @@
Modules with @ConfigurationProperties.
-->
<!-- common javadocs -->
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>cdc-debezium-common</artifactId>
<version>${java-functions.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>file-common</artifactId>
@@ -62,6 +67,11 @@
<version>${java-functions.version}</version>
</dependency>
<!-- supplier javadocs -->
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>cdc-debezium-supplier</artifactId>
<version>${java-functions.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>file-supplier</artifactId>

View File

@@ -3,6 +3,9 @@
:leveloffset: +2
[[spring-cloud-stream-modules-cdc-debezium-source]]
include::{stream-apps-root}/{branch}/applications/source/cdc-debezium-source/README.adoc[tags=ref-doc]
[[spring-cloud-stream-modules-file-source]]
include::{stream-apps-root}/{branch}/applications/source/file-source/README.adoc[tags=ref-doc]

View File

@@ -0,0 +1,124 @@
= Debezium Spring Boot Starter
Spring Boot Starter for easy integration of https://debezium.io[Debezium] in Spring Boot applications.
To use it you just add the `cdc-debezium-boot-starter` dependency to your application POM and implement your own `Consumer<SourceRecord>` handler to process the incoming database change events. Follow the link below for further instructions.
https://en.wikipedia.org/wiki/Change_data_capture[Change Data Capture] (CDC) Spring Boot Starter that allows capturing change events from various databases including `MySQL`, `PostgreSQL`, `MongoDB`, `Oracle` and `SQL Server`.
== Quick Start
https://start.spring.io/[Start new Spring Boot project] and add the following starter dependency to activate the CDC:
[source, xml]
----
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>cdc-debezium-boot-starter</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
----
NOTE: for MySQL you need to adjust the mysql client version to `mysql:mysql-connector-java:8.0.13`.
The CDC starter setups an embedded https://debezium.io/documentation/reference/0.10/connectors/index.html[debezium connector] to capture the database changes.
Then implement a `Consumer<SourceRecord>` bean handler to receive and process the CDC notifications:
[source, java]
----
@Bean
public Consumer<SourceRecord> mySourceRecordConsumer( # <1>
Function<SourceRecord, byte[]> keySerializer, # <2>
Function<SourceRecord, byte[]> valueSerializer) { # <3>
return sourceRecord -> {
System.out.println(sourceRecord.topic() # <4>
+ " : " + new String(keySerializer.apply(sourceRecord)) # <5>
+ " : " + new String(valueSerializer.apply(sourceRecord))); # <6>
};
}
----
<1> CDC event listener that is called on every data change.
<2> Optional keySerializer converter that allows to serialize the keys of the SourceRecord into a compact binary format.
<3> Optional valueSerializer converter (by default JsonConverter) that allows to serialize the SourceRecord into a compact binary format.
<4> The `topic()` provides a logical identifier of the changed data (e.g. cdc-name.DB-name.table-name)
<5> Convert kay to text message using the keySerializer
<6> Convert payload to JSON message using the valueSerializer.
NOTE: you may have to add `@AutoConfigureBefore(CdcAutoConfiguration.class)` to your Boot class to ensure that your record consumer overrides the default handlers.
Above handler will produce output messages like this:
[source, bash]
----
my-app-connector.inventory.addresses : {"id":10,"customer_id":1001,"street":"3183 Moore Avenue","city":"Euless","state":"Texas","zip":"76036","type":"SHIPPING"}
my-app-connector.inventory.customers : {"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"}
my-app-connector.inventory.orders : {"order_number":10001,"order_date":16816,"purchaser":1001,"quantity":1,"product_id":102}
my-app-connector.inventory.products : {"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453}
my-app-connector.inventory.products_on_hand : {"product_id":101,"quantity":3}
----
All Debezium connector properties are supported. Just prefix those properties with `cdc.config` prefix.
Following snipped shows how to configure the https://debezium.io/docs/connectors/mysql/[MySqlConnector] to connect to a local MySQL database run via the `debezium/example-mysql` docker image:
[source, bash]
----
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.0
----
[TIP]
====
(optional) Use `mysql` client to connected to the database and to create a `debezium` user with required credentials:
[source, bash]
----
docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
----
====
Configure `cdc-debezium-boot-starter` for consuming cdc events from the MySQL:
[source]
----
cdc.name=my-sql-connector # <1>
cdc.connector=mysql # <2>
cdc.config.database.server.id=85744 # <3>
cdc.config.database.server.name=my-app-connector # <3>
cdc.config.database.user=debezium # <3>
cdc.config.database.password=dbz # <3>
cdc.config.database.hostname=localhost # <3>
cdc.config.database.port=3306 # <3>
cdc.schema=false # <4>
cdc.flattering.enabled=true # <5>
----
<1> Metadata used to identify and dispatch the events received by this cdc consumer instance.
<2> Configures the CDC Source to use https://debezium.io/docs/connectors/mysql/[MySqlConnector]. (equivalent to setting `cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector`).
<3> Connector specific configurations. MySQL server logical name, connect location and access credentials.
<4> Do not serialize the record's schema in the output messages.
<5> Enables the https://debezium.io/docs/configuration/event-flattening/[CDC Event Flattering] feature.
The full list of properties:
==== Options
//tag::configuration-properties[]
$$cdc.config$$:: $$Spring pass-trough wrapper for debezium configuration properties. All properties with a 'cdc.config.' prefix are native Debezium properties. The prefix is removed, converting them into Debezium io.debezium.config.Configuration.$$ *($$Map<String, String>$$, default: `$$<none>$$`)*
$$cdc.connector$$:: $$Shortcut for the cdc.config.connector.class property. Either of those can be used as long as they do not contradict with each other.$$ *($$ConnectorType$$, default: `$$<none>$$`, possible values: `mysql`,`postgres`,`mongodb`,`oracle`,`sqlserver`)*
$$cdc.flattering.add-fields$$:: $$Comma separated list of metadata fields to add to the flattened message. The fields will be prefixed with "__" or "__[<]struct]__", depending on the specification of the struct.$$ *($$String$$, default: `$$<none>$$`)*
$$cdc.flattering.add-headers$$:: $$Comma separated list specify a list of metadata fields to add to the header of the flattened message. The fields will be prefixed with "__" or "__[struct]__".$$ *($$String$$, default: `$$<none>$$`)*
$$cdc.flattering.delete-handling-mode$$:: $$Options for handling deleted records: (1) none - pass the records through, (2) drop - remove the records and (3) rewrite - add a '__deleted' field to the records.$$ *($$DeleteHandlingMode$$, default: `$$<none>$$`, possible values: `drop`,`rewrite`,`none`)*
$$cdc.flattering.drop-tombstones$$:: $$By default Debezium generates tombstone records to enable Kafka compaction on deleted records. The dropTombstones can suppress the tombstone records.$$ *($$Boolean$$, default: `$$true$$`)*
$$cdc.flattering.enabled$$:: $$Enable flattering the source record events (https://debezium.io/docs/configuration/event-flattening).$$ *($$Boolean$$, default: `$$true$$`)*
$$cdc.name$$:: $$Unique name for this sourceConnector instance.$$ *($$String$$, default: `$$<none>$$`)*
$$cdc.offset.commit-timeout$$:: $$Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt.$$ *($$Duration$$, default: `$$5000ms$$`)*
$$cdc.offset.flush-interval$$:: $$Interval at which to try committing offsets. The default is 1 minute.$$ *($$Duration$$, default: `$$60000ms$$`)*
$$cdc.offset.policy$$:: $$Offset storage commit policy.$$ *($$OffsetPolicy$$, default: `$$<none>$$`)*
$$cdc.offset.storage$$:: $$Kafka connector tracks the number processed records and regularly stores the count (as "offsets") in a preconfigured metadata storage. On restart the connector resumes the reading from the last recorded source offset.$$ *($$OffsetStorageType$$, default: `$$<none>$$`, possible values: `memory`,`file`,`kafka`,`metadata`)*
$$cdc.schema$$:: $$Include the schema's as part of the outbound message.$$ *($$Boolean$$, default: `$$false$$`)*
//end::configuration-properties[]

View File

@@ -0,0 +1,175 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>cdc-debezium-boot-starter</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>cdc-debezium-boot-starter</name>
<description>Change Data Capture (CDC) Debezium Boot Starter</description>
<parent>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>spring-functions-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../../spring-functions-parent</relativePath>
</parent>
<properties>
<version.debezium>1.2.1.Final</version.debezium>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>cdc-debezium-common</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<exclusions>
<exclusion>
<artifactId>mysql-connector-java</artifactId>
<groupId>mysql</groupId>
</exclusion>
</exclusions>
<version>${version.debezium}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>io.debezium</groupId>-->
<!-- <artifactId>debezium-connector-mongodb</artifactId>-->
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <artifactId>slf4j-log4j12</artifactId>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- <version>${version.debezium}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-oracle</artifactId>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-sqlserver</artifactId>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.13</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<version>0.33.0</version>
<configuration>
<images>
<image>
<alias>mysql</alias>
<name>debezium/example-mysql:1.0</name>
<run>
<env>
<MYSQL_ROOT_PASSWORD>debezium</MYSQL_ROOT_PASSWORD>
<MYSQL_USER>mysqluser</MYSQL_USER>
<MYSQL_PASSWORD>mysqlpw</MYSQL_PASSWORD>
</env>
<ports>
<port>3306:3306</port>
</ports>
<wait>
<log>port: 3306</log>
<time>30000</time>
</wait>
</run>
</image>
</images>
</configuration>
<executions>
<execution>
<id>start</id>
<!--<phase>pre-integration-test</phase>-->
<phase>generate-test-resources</phase>
<goals>
<goal>build</goal>
<goal>start</goal>
</goals>
</execution>
<execution>
<id>stop</id>
<phase>post-integration-test</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
<execution>
<id>stop-pre</id>
<phase>clean</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,71 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.fn.common.cdc;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
/**
* @author Christian Tzolov
*/
@Configuration
@Import(CdcCommonConfiguration.class)
public class CdcAutoConfiguration {
private static final Log logger = LogFactory.getLog(CdcAutoConfiguration.class);
@Bean
@ConditionalOnMissingBean
public Consumer<SourceRecord> defaultSourceRecordConsumer() {
return sourceRecord -> logger.info("[CDC Event]: " + ((sourceRecord == null) ? "null" : sourceRecord.toString()));
}
@Bean
public EmbeddedEngineExecutorService embeddedEngine(EmbeddedEngine.Builder embeddedEngineBuilder,
Consumer<SourceRecord> sourceRecordConsumer, Function<SourceRecord, SourceRecord> recordFlattering) {
EmbeddedEngine embeddedEngine = embeddedEngineBuilder
.notifying(sourceRecord -> sourceRecordConsumer.accept(recordFlattering.apply(sourceRecord)))
.build();
return new EmbeddedEngineExecutorService(embeddedEngine) {
@PostConstruct
@Override
public void start() {
super.start();
}
@PreDestroy
@Override
public void close() {
super.close();
}
};
}
}

View File

@@ -0,0 +1 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.springframework.cloud.fn.common.cdc.CdcAutoConfiguration

View File

@@ -0,0 +1,80 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.fn.common.cdc;
import org.junit.Test;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.test.jdbc.JdbcTestUtils;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Christian Tzolov
*/
public class CdcBootStarterIntegrationTest {
private final JdbcTemplate jdbcTemplate = jdbcTemplate(
"com.mysql.cj.jdbc.Driver",
"jdbc:mysql://localhost:3306/inventory",
"root",
"debezium");
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withUserConfiguration(TestCdcApplication.class)
.withPropertyValues(
"cdc.name=my-sql-connector",
"cdc.schema=false",
"cdc.flattering.enabled=true",
"cdc.stream.header.offset=true",
"cdc.connector=mysql",
"cdc.config.database.user=debezium",
"cdc.config.database.password=dbz",
"cdc.config.database.hostname=localhost",
"cdc.config.database.port=3306",
"cdc.config.database.server.id=85744",
"cdc.config.database.server.name=my-app-connector",
"cdc.config.database.history=io.debezium.relational.history.MemoryDatabaseHistory");
@Test
public void consumerTest() {
contextRunner
.withPropertyValues(
"cdc.flattering.deleteHandlingMode=drop",
"cdc.flattering.dropTombstones=true")
.run(context -> {
TestCdcApplication.TestSourceRecordConsumer testConsumer =
context.getBean(TestCdcApplication.TestSourceRecordConsumer.class);
jdbcTemplate.update("insert into `customers`(`first_name`,`last_name`,`email`) VALUES('Test666', 'Test666', 'Test666@spring.org')");
JdbcTestUtils.deleteFromTableWhere(jdbcTemplate, "customers", "first_name = ?", "Test666");
Thread.sleep(10000);
assertThat(testConsumer.recordList).hasSize(53);
});
}
public static JdbcTemplate jdbcTemplate(String jdbcDriver, String jdbcUrl, String user, String password) {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName(jdbcDriver);
dataSource.setUrl(jdbcUrl);
dataSource.setUsername(user);
dataSource.setPassword(password);
return new JdbcTemplate(dataSource);
}
}

View File

@@ -0,0 +1,79 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.fn.common.cdc;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.metadata.SimpleMetadataStore;
/**
* @author Christian Tzolov
*/
@SpringBootConfiguration
@EnableAutoConfiguration
public class TestCdcApplication {
@Bean
public Consumer<SourceRecord> mySourceRecordConsumer(Function<SourceRecord, byte[]> valueSerializer,
Function<SourceRecord, byte[]> keySerializer) {
return new TestSourceRecordConsumer(valueSerializer, keySerializer);
}
@Bean
public SimpleMetadataStore simpleMetadataStore() {
return new SimpleMetadataStore();
}
public static class TestSourceRecordConsumer implements Consumer<SourceRecord> {
private final Function<SourceRecord, byte[]> valueSerializer;
private final Function<SourceRecord, byte[]> keySerializer;
public Map<Object, Object> keyValue = new HashMap<>();
public List<SourceRecord> recordList = new ArrayList<>();
public TestSourceRecordConsumer(Function<SourceRecord, byte[]> valueSerializer,
Function<SourceRecord, byte[]> keySerializer) {
this.valueSerializer = valueSerializer;
this.keySerializer = keySerializer;
}
@Override
public void accept(SourceRecord sourceRecord) {
if (sourceRecord != null) { // ignore null records
recordList.add(sourceRecord);
Object payload = valueSerializer.apply(sourceRecord);
Object key = keySerializer.apply(sourceRecord);
keyValue.put(key, payload);
System.out.println("[CDC Event]: " + sourceRecord.toString());
}
}
}
}

View File

@@ -0,0 +1,117 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>cdc-debezium-common</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>cdc-debezium-common</name>
<description>Change Data Capture (CDC) Debezium Common</description>
<parent>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>spring-functions-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../../spring-functions-parent</relativePath>
</parent>
<properties>
<version.debezium>1.2.1.Final</version.debezium>
<!-- Note: postgresql version MUST match the version used by the Debezium postgres connector -->
<postgresql.version>42.2.5</postgresql.version>
<spring-jdbc.version>5.2.1.RELEASE</spring-jdbc.version>
<!-- Debezium requires a particular mysql-connector version -->
<mysql-connector-java>5.1.46</mysql-connector-java>
</properties>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<exclusions>
<exclusion>
<artifactId>mysql-connector-java</artifactId>
<groupId>mysql</groupId>
</exclusion>
</exclusions>
<optional>true</optional>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<optional>true</optional>
<version>${mysql-connector-java}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<optional>true</optional>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
<optional>true</optional>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-oracle</artifactId>
<optional>true</optional>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-sqlserver</artifactId>
<optional>true</optional>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>metadata-store-common</artifactId>
<version>${project.version}</version>
</dependency>
<!-- -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ip</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>config-common</artifactId>
<version>${spring-cloud-fn.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,131 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.fn.common.cdc;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import io.debezium.transforms.ExtractNewRecordState;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.util.StringUtils;
/**
* @author Christian Tzolov
*/
@Configuration
@EnableConfigurationProperties(CdcCommonProperties.class)
@Import(CdcOffsetBackingStoreConfiguration.class)
public class CdcCommonConfiguration {
private static final Log logger = LogFactory.getLog(CdcCommonConfiguration.class);
@Bean
public io.debezium.config.Configuration configuration(CdcCommonProperties properties) {
Map<String, String> configMap = properties.getConfig();
return io.debezium.config.Configuration.from(configMap);
}
@Bean
public Function<SourceRecord, SourceRecord> recordFlattering(CdcCommonProperties properties,
ExtractNewRecordState extractNewRecordState) {
return sourceRecord -> properties.getFlattering().isEnabled() ?
(SourceRecord) extractNewRecordState.apply(sourceRecord) : sourceRecord;
}
@Bean
public ExtractNewRecordState extractNewRecordState(CdcCommonProperties properties) {
ExtractNewRecordState extractNewRecordState = new ExtractNewRecordState();
Map<String, Object> config = extractNewRecordState.config().defaultValues();
config.put("drop.tombstones", properties.getFlattering().isDropTombstones());
config.put("delete.handling.mode", properties.getFlattering().getDeleteHandlingMode().name());
if (!StringUtils.isEmpty(properties.getFlattering().getAddHeaders())) {
config.put("add.headers", properties.getFlattering().getAddHeaders());
}
if (!StringUtils.isEmpty(properties.getFlattering().getAddFields())) {
config.put("add.fields", properties.getFlattering().getAddFields());
}
extractNewRecordState.configure(config);
return extractNewRecordState;
}
@Bean
@ConditionalOnMissingBean
public JsonConverter jsonConverter(CdcCommonProperties properties) {
JsonConverter jsonConverter = new JsonConverter();
jsonConverter.configure(Collections.singletonMap("schemas.enable", properties.isSchema()), false);
return jsonConverter;
}
@Bean
public Function<SourceRecord, byte[]> valueSerializer(Converter valueConverter) {
return sourceRecord -> valueConverter.fromConnectData(
sourceRecord.topic(), sourceRecord.valueSchema(), sourceRecord.value());
}
@Bean
public Function<SourceRecord, byte[]> keySerializer(Converter valueConverter) {
return sourceRecord -> valueConverter.fromConnectData(
sourceRecord.topic(), sourceRecord.keySchema(), sourceRecord.key());
}
@Bean
public EmbeddedEngine.Builder embeddedEngineBuilder(CdcCommonProperties properties,
OffsetBackingStore offsetBackingStore) {
if (!properties.getConfig().containsKey("connector.class")) {
properties.getConfig().put("connector.class", properties.getConnector().connectorClass);
}
if (!properties.getConfig().containsKey("name")) {
properties.getConfig().put("name", properties.getName());
}
if (!properties.getConfig().containsKey("offset.flush.interval.ms")) {
properties.getConfig().put("offset.flush.interval.ms", properties.getOffset().getFlushInterval().toMillis() + "");
}
if (!properties.getConfig().containsKey("offset.flush.timeout.ms")) {
properties.getConfig().put("offset.flush.timeout.ms", properties.getOffset().getCommitTimeout().toMillis() + "");
}
if (!properties.getConfig().containsKey("offset.commit.policy")) {
properties.getConfig().put("offset.commit.policy", properties.getOffset().getPolicy().policyClass);
}
if (!properties.getConfig().containsKey("offset.storage")) {
properties.getConfig().put("offset.storage", properties.getOffset().getStorage().offsetStorageClass);
}
return EmbeddedEngine.create()
.using(io.debezium.config.Configuration.from(properties.getConfig()))
.offsetBackingStore(offsetBackingStore);
}
}

View File

@@ -0,0 +1,312 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.fn.common.cdc;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import javax.validation.constraints.AssertTrue;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
/**
*
* @author Christian Tzolov
*/
@ConfigurationProperties("cdc")
@Validated
public class CdcCommonProperties {
/**
* Unique name for this sourceConnector instance.
*/
@NotEmpty
private String name;
/**
* Shortcut for the cdc.config.connector.class property. Either of those can be used as long as they do not
* contradict with each other.
*/
@NotNull
private ConnectorType connector = null;
private final Offset offset = new Offset();
/**
* Include the schema's as part of the outbound message.
*/
private boolean schema = false;
/**
* Event Flattering (https://debezium.io/docs/configuration/event-flattening).
*/
private final Flattering flattering = new Flattering();
/**
* Spring pass-trough wrapper for debezium configuration properties.
* All properties with a 'cdc.config.' prefix are native Debezium properties.
* The prefix is removed, converting them into Debezium io.debezium.config.Configuration.
*/
private Map<String, String> config = defaultConfig();
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Offset getOffset() {
return offset;
}
public Flattering getFlattering() {
return flattering;
}
public Map<String, String> getConfig() {
return config;
}
public boolean isSchema() {
return schema;
}
public void setSchema(boolean schema) {
this.schema = schema;
}
private Map<String, String> defaultConfig() {
Map<String, String> defaultConfig = new HashMap<>();
defaultConfig.put("database.history", "io.debezium.relational.history.MemoryDatabaseHistory");
//defaultConfig.put("offset.flush.interval.ms", "60000");
return defaultConfig;
}
public ConnectorType getConnector() {
return connector;
}
public void setConnector(ConnectorType connector) {
this.connector = connector;
}
@AssertTrue
public boolean connectorIsSet() {
return this.getConnector() != null || this.getConfig().containsKey("connector.class");
}
public static class Offset {
/**
* Interval at which to try committing offsets. The default is 1 minute.
*/
private Duration flushInterval = Duration.ofMillis(60000);
/**
* Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to
* offset storage before cancelling the process and restoring the offset data to be committed in a future attempt.
*/
private Duration commitTimeout = Duration.ofMillis(5000);
/**
* Offset storage commit policy.
*/
private OffsetPolicy policy = OffsetPolicy.periodic;
/**
* Kafka connector tracks the number processed records and regularly stores the count (as "offsets") in a
* preconfigured metadata storage. On restart the connector resumes the reading from the last recorded source offset.
*/
private OffsetStorageType storage = OffsetStorageType.metadata;
public enum OffsetPolicy {
/** periodic. */
periodic("io.debezium.embedded.spi.OffsetCommitPolicy$PeriodicCommitOffsetPolicy"),
/** always. */
always("OffsetCommitPolicy.AlwaysCommitOffsetPolicy.class.getName()");
/** policy. */
public final String policyClass;
OffsetPolicy(String policyClassName) {
this.policyClass = policyClassName;
}
}
public Duration getFlushInterval() {
return flushInterval;
}
public void setFlushInterval(Duration flushInterval) {
this.flushInterval = flushInterval;
}
public Duration getCommitTimeout() {
return commitTimeout;
}
public void setCommitTimeout(Duration commitTimeout) {
this.commitTimeout = commitTimeout;
}
public OffsetPolicy getPolicy() {
return policy;
}
public void setPolicy(OffsetPolicy policy) {
this.policy = policy;
}
public OffsetStorageType getStorage() {
return storage;
}
public void setStorage(OffsetStorageType storage) {
this.storage = storage;
}
}
public enum OffsetStorageType {
/** memory offset storage type. */
memory("org.apache.kafka.connect.storage.MemoryOffsetBackingStore"),
/** File offset storage type. */
file("org.apache.kafka.connect.storage.FileOffsetBackingStore"),
/** Kafka offset storage type. */
kafka("org.apache.kafka.connect.storage.KafkaOffsetBackingStore"),
/** Metadata offset storage type. */
metadata("org.springframework.cloud.stream.app.cdc.common.core.store.MetadataStoreOffsetBackingStore");
/** Class name of the Offset Storage. */
public final String offsetStorageClass;
OffsetStorageType(String type) {
this.offsetStorageClass = type;
}
}
public enum ConnectorType {
/** MySql connector type. */
mysql("io.debezium.connector.mysql.MySqlConnector"),
/** Postgres connector type. */
postgres("io.debezium.connector.postgresql.PostgresConnector"),
/** Mongodb connector type. */
mongodb("io.debezium.connector.mongodb.MongoDbConnector"),
/** Oracle connector type. */
oracle("io.debezium.connector.oracle.OracleConnector"),
/** SqlServer connector type. */
sqlserver("io.debezium.connector.sqlserver.SqlServerConnector");
/** Connector class name. */
public final String connectorClass;
ConnectorType(String type) {
this.connectorClass = type;
}
}
public enum DeleteHandlingMode {
/** records are removed. */
drop,
/** add a __deleted column with true/false values based on record operation. */
rewrite,
/** pass delete events. */
none
}
/**
* https://debezium.io/documentation/reference/0.10/configuration/event-flattening.html .
* https://debezium.io/documentation/reference/0.10/configuration/event-flattening.html#configuration_options
*/
public static class Flattering {
/**
* Enable flattering the source record events (https://debezium.io/docs/configuration/event-flattening).
*/
private boolean enabled = true;
/**
* By default Debezium generates tombstone records to enable Kafka compaction on deleted records.
* The dropTombstones can suppress the tombstone records.
*/
private boolean dropTombstones = true;
/**
* Options for handling deleted records: (1) none - pass the records through, (2) drop - remove the records and
* (3) rewrite - add a '__deleted' field to the records.
*/
private DeleteHandlingMode deleteHandlingMode = DeleteHandlingMode.drop;
/**
* Comma separated list of metadata fields to add to the flattened message.
* The fields will be prefixed with "__" or "__[<]struct]__", depending on the specification of the struct.
*/
private String addFields = null;
/**
* Comma separated list specify a list of metadata fields to add to the header of the flattened message.
* The fields will be prefixed with "__" or "__[struct]__".
*/
private String addHeaders = null;
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public boolean isDropTombstones() {
return dropTombstones;
}
public void setDropTombstones(boolean dropTombstones) {
this.dropTombstones = dropTombstones;
}
public DeleteHandlingMode getDeleteHandlingMode() {
return deleteHandlingMode;
}
public void setDeleteHandlingMode(DeleteHandlingMode deleteHandlingMode) {
this.deleteHandlingMode = deleteHandlingMode;
}
public String getAddFields() {
return addFields;
}
public void setAddFields(String addFields) {
this.addFields = addFields;
}
public String getAddHeaders() {
return addHeaders;
}
public void setAddHeaders(String addHeaders) {
this.addHeaders = addHeaders;
}
}
}

View File

@@ -0,0 +1,68 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.fn.common.cdc;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.fn.common.cdc.store.MetadataStoreOffsetBackingStore;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.metadata.MetadataStore;
/**
* @author Christian Tzolov
*/
@Configuration
public class CdcOffsetBackingStoreConfiguration {
@Bean
@ConditionalOnMissingBean
@ConditionalOnExpression("'${cdc.config.offset.storage}'.equalsIgnoreCase('org.springframework.cloud.stream.app.cdc.common.core.store.MetadataStoreOffsetBackingStore') " +
"or '${cdc.offset.storage:metadata}'.equals('metadata')")
public OffsetBackingStore metadataStoreOffsetBackingStore(MetadataStore metadataStore) {
return new MetadataStoreOffsetBackingStore(metadataStore);
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnExpression("'${cdc.config.offset.storage}'.equalsIgnoreCase('org.apache.kafka.connect.storage.FileOffsetBackingStore') " +
"or '${cdc.offset.storage:metadata}'.equalsIgnoreCase('file')")
public OffsetBackingStore fileOffsetBackingStore() {
return new FileOffsetBackingStore();
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnExpression("'${cdc.config.offset.storage}'.equalsIgnoreCase('org.apache.kafka.connect.storage.KafkaOffsetBackingStore') " +
"or '${cdc.offset.storage:metadata}'.equalsIgnoreCase('kafka')")
public OffsetBackingStore kafkaOffsetBackingStore() {
return new KafkaOffsetBackingStore();
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnExpression("'${cdc.config.offset.storage}'.equalsIgnoreCase('org.apache.kafka.connect.storage.MemoryOffsetBackingStore') " +
"or '${cdc.offset.storage:metadata}'.equalsIgnoreCase('memory')")
public OffsetBackingStore memoryOffsetBackingStore() {
return new MemoryOffsetBackingStore();
}
}

View File

@@ -0,0 +1,52 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.fn.common.cdc;
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @author Christian Tzolov
*/
public class EmbeddedEngineExecutorService implements Closeable {
private static final Log logger = LogFactory.getLog(EmbeddedEngineExecutorService.class);
private final EmbeddedEngine engine;
private final ExecutorService executor;
public EmbeddedEngineExecutorService(EmbeddedEngine engine) {
this.engine = engine;
this.executor = Executors.newSingleThreadExecutor();
}
public void start() {
logger.info("Start Embedded Engine");
this.executor.execute(this.engine);
}
@Override
public void close() {
logger.info("Stop Embedded Engine");
this.engine.stop();
this.executor.shutdown();
}
}

View File

@@ -0,0 +1,90 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.fn.common.cdc.store;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.springframework.integration.metadata.MetadataStore;
/**
* @author Christian Tzolov
*/
public class MetadataStoreOffsetBackingStore extends MemoryOffsetBackingStore {
private MetadataStore metadataStore;
private String offsetStoreName;
public MetadataStoreOffsetBackingStore(MetadataStore metadataStore) {
this.metadataStore = metadataStore;
}
@Override
public void configure(WorkerConfig config) {
super.configure(config);
this.offsetStoreName = config.getString(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG);
}
@Override
public synchronized void start() {
super.start();
//log.info("Starting FileOffsetBackingStore with file {}", file);
load();
}
@Override
public synchronized void stop() {
super.stop();
// Nothing to do since this doesn't maintain any outstanding connections/data
// log.info("Stopped FileOffsetBackingStore");
}
private void load() {
if (this.metadataStore.get("keys") != null) {
String[] keys = this.metadataStore.get("keys").split(",");
for (String keySting : keys) {
String valueString = this.metadataStore.get(keySting);
ByteBuffer key = ByteBuffer.wrap(keySting.getBytes());
ByteBuffer value = ByteBuffer.wrap(valueString.getBytes());
this.data.put(key, value);
}
}
}
@Override
protected void save() {
List<String> keys = new ArrayList<>();
for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : this.data.entrySet()) {
byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null;
byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null;
if (key != null && value != null) {
keys.add(new String(key, StandardCharsets.UTF_8));
this.metadataStore.put(
new String(key, StandardCharsets.UTF_8),
new String(value, StandardCharsets.UTF_8));
}
}
this.metadataStore.put("keys", String.join(",", keys));
}
}

View File

@@ -0,0 +1,2 @@
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration
spring.cloud.stream.kafka.default.producer.messageKeyExpression=headers['cdc_key']

View File

@@ -15,6 +15,11 @@
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>cdc-debezium-supplier</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>file-supplier</artifactId>
@@ -244,4 +249,4 @@
</dependencies>
</dependencyManagement>
</project>
</project>

View File

@@ -51,6 +51,8 @@
<module>common/tcp-common</module>
<module>common/twitter-common</module>
<module>common/tensorflow-common</module>
<module>common/cdc-debezium-common</module>
<module>common/cdc-debezium-boot-starter</module>
<module>consumer/cassandra-consumer</module>
<module>consumer/analytics-consumer</module>
@@ -98,6 +100,7 @@
<module>supplier/websocket-supplier</module>
<module>supplier/s3-supplier</module>
<module>supplier/twitter-supplier</module>
<module>supplier/cdc-debezium-supplier</module>
<module>spring-functions-parent</module>
<module>function-dependencies</module>

View File

@@ -38,4 +38,4 @@
</dependencies>
</dependencyManagement>
</project>
</project>

View File

@@ -0,0 +1,45 @@
# CDC Debezium Supplier
This module provides a CDC Debezium supplier that can be reused and composed in other applications.
The `Supplier` uses the `Debezium` library.
`cdcSupplier` is implemented as a `java.util.function.Supplier`.
This supplier gives you a reactive stream from CDC sources. The supplier has a signature of `Supplier<Flux<Message<?>>>`.
Users have to subscribe to this `Flux` and then receive the data.
https://en.wikipedia.org/wiki/Change_data_capture[Change Data Capture] (CDC) `source` that captures and streams change events from various databases.
Currently, it supports `MySQL`, `PostgreSQL`, `MongoDB`, `Oracle` and `SQL Server` databases.
Build upon https://debezium.io/docs/embedded/[Debezium Embedded Connector], the `CDC Source` allows capturing and streaming database changes over different message binders such Apache Kafka, RabbitMQ and all Spring Cloud Stream supporter brokers.
## Beans for injection
You can import the `CdcSupplierConfiguration` in the application and then inject the following bean.
`cdcSupplier`
You need to inject this as `Supplier<Flux<Message<?>>>`.
You can use `cdcSupplier` as a qualifier when injecting.
Once injected, you can use the `get` method of the `Supplier` to invoke it and then subscribe to the returned `Flux`.
## Configuration Options
All configuration properties are prefixed with `cdc`.
Use the `cdc.config.` prefix to inject native, Debezium properties. Convenient shortcuts for the most frequently used
Debezium properties are available.
Uses `BackingOffsetStore` configuration, based on the [MetadataStore](https://github.com/spring-cloud/stream-applications/tree/master/functions/common/metadata-store-common) service.
Later provides various microservices friendly ways for storing the offset metadata.
For more information on the various options available, please see:
* link:../../common/cdc-debezium-common/src/main/java/org/springframework/cloud/fn/common/cdc/CdcCommonProperties.java[CdcCommonProperties].
* link:src/main/java/org/springframework/cloud/fn/supplier/cdc-debezium-supplier/CdcSupplierProperties.java[CdcSupplierProperties].
## Examples
See this link:../../../applications/source/cdc-debezium-source/src/test/java/org/springframework/cloud/stream/app/source/cdc[test suite] for the various ways, this supplier is used.
## Other usage
See this link:../../../applications/source/cdc-debezium-source/README.adoc[README] where this supplier is used to create a Spring Cloud Stream CDC Source.
Also check link:../../common/cdc-debezium-boot-starter/README.adoc[README] to see how the core CDC can be used a Spring Boot Starter.

View File

@@ -0,0 +1,124 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>cdc-debezium-supplier</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>cdc-debezium-supplier</name>
<description>CDC Debezium Suppliers</description>
<parent>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>spring-functions-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../../spring-functions-parent</relativePath>
</parent>
<properties>
<version.debezium>1.2.1.Final</version.debezium>
<mysql.version>8.0.13</mysql.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>cdc-debezium-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<exclusions>
<exclusion>
<artifactId>mysql-connector-java</artifactId>
<groupId>mysql</groupId>
</exclusion>
</exclusions>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-oracle</artifactId>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-sqlserver</artifactId>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
<version>${version.debezium}</version>
</dependency>
<!-- -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,171 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.fn.supplier.cdc;
import java.lang.reflect.Field;
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.fn.common.cdc.CdcCommonConfiguration;
import org.springframework.cloud.fn.common.cdc.EmbeddedEngine;
import org.springframework.cloud.fn.common.cdc.EmbeddedEngineExecutorService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.ClassUtils;
import org.springframework.util.MimeTypeUtils;
/**
* CDC source that uses the Debezium Connectors to monitor and record all of the row-level changes in the databases.
* https://debezium.io/docs/connectors
*
* @author Christian Tzolov
*/
@EnableConfigurationProperties(CdcSupplierProperties.class)
@Import(CdcCommonConfiguration.class)
public class CdcSupplierConfiguration implements BeanClassLoaderAware {
private static final Log logger = LogFactory.getLog(CdcSupplierConfiguration.class);
/**
* ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL.
*/
public static final String ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL = "org.springframework.kafka.support.KafkaNull";
private Object kafkaNull = null;
@Override
public void setBeanClassLoader(ClassLoader classLoader) {
try {
Class<?> clazz = ClassUtils.forName(ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL, classLoader);
Field field = clazz.getDeclaredField("INSTANCE");
this.kafkaNull = field.get(null);
}
catch (ClassNotFoundException | NoSuchFieldException | IllegalAccessException e) {
}
}
@Bean
public Supplier<Flux<Message<?>>> cdcSupplier(EmbeddedEngineExecutorService embeddedEngineExecutorService) {
return () -> emitterProcessor
.doOnSubscribe(subscription -> {
embeddedEngineExecutorService.start();
})
.doAfterTerminate(() -> {
logger.info("Proactive shutdown");
embeddedEngineExecutorService.close();
})
.doOnError(throwable -> logger.error(throwable.getMessage(), throwable));
}
private EmitterProcessor<Message<?>> emitterProcessor = EmitterProcessor.create(1, false);
@Bean
public EmbeddedEngineExecutorService embeddedEngineExecutorService(
EmbeddedEngine.Builder embeddedEngineBuilder,
Function<SourceRecord, byte[]> valueSerializer, Function<SourceRecord, byte[]> keySerializer,
Function<SourceRecord, SourceRecord> recordFlattering,
ObjectMapper mapper, CdcSupplierProperties cdcStreamingEngineProperties) {
FluxSink<Message<?>> sink = emitterProcessor.sink();
Consumer<SourceRecord> messageConsumer = sourceRecord -> {
// When cdc.flattering.deleteHandlingMode=none and cdc.flattering.dropTombstones=false
// then on deletion event an additional sourceRecord is sent with value Null.
// Here we filter out such condition.
if (sourceRecord == null) {
logger.debug("Ignore disabled tombstone events");
return;
}
Object cdcJsonPayload = valueSerializer.apply(sourceRecord);
// When the tombstone event is enabled, Debezium serializes the payload to null (e.g. empty payload)
// while the metadata information is carried through the headers (cdc_key).
// Note: Event for none flattered responses, when the cdc.config.tombstones.on.delete=true (default),
// tombstones are generate by Debezium and handled by the code below.
if (cdcJsonPayload == null) {
cdcJsonPayload = this.kafkaNull;
}
// If payload is still null ignore the message.
if (cdcJsonPayload == null) {
logger.info("dropped null payload message");
return;
}
byte[] key = keySerializer.apply(sourceRecord);
if (key == null) {
logger.warn("Null serialised key for sourceRecord: " + sourceRecord);
key = new byte[0];
}
MessageBuilder<?> messageBuilder = MessageBuilder
.withPayload(cdcJsonPayload)
.setHeader("cdc_key", new String(key))
.setHeader("cdc_topic", sourceRecord.topic())
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE);
if (cdcStreamingEngineProperties.getHeader().isConvertConnectHeaders()) {
// Convert the Connect Headers into Message Headers.
if (sourceRecord.headers() != null && !sourceRecord.headers().isEmpty()) {
Iterator<Header> itr = sourceRecord.headers().iterator();
while (itr.hasNext()) {
Header header = itr.next();
messageBuilder.setHeader(header.key(), header.value());
}
}
}
if (cdcStreamingEngineProperties.getHeader().isOffset()) {
try {
messageBuilder.setHeader("cdc_offset",
mapper.writeValueAsString(sourceRecord.sourceOffset()));
}
catch (JsonProcessingException e) {
logger.warn("Failed to record cdc_offset header", e);
}
}
sink.next(messageBuilder.build());
};
EmbeddedEngine engine = embeddedEngineBuilder
.notifying(record -> messageConsumer.accept(recordFlattering.apply(record)))
.build();
return new EmbeddedEngineExecutorService(engine);
}
}

View File

@@ -0,0 +1,68 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.fn.supplier.cdc;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
/**
*
* @author Christian Tzolov
*/
@ConfigurationProperties("cdc.stream")
@Validated
public class CdcSupplierProperties {
/**
* Control what information to be serialized in the outbound message headers.
*/
private Header header = new Header();
public Header getHeader() {
return header;
}
public static class Header {
/**
* Serializes the source record's offset metadata into the outbound message header under cdc.offset.
*/
private boolean offset = false;
/**
* When true the {@link org.apache.kafka.connect.header.Header} are converted into message headers with the
* {@link org.apache.kafka.connect.header.Header#key()} as name and {@link org.apache.kafka.connect.header.Header#value()}.
*/
private boolean convertConnectHeaders = true;
public boolean isOffset() {
return offset;
}
public void setOffset(boolean offset) {
this.offset = offset;
}
public boolean isConvertConnectHeaders() {
return convertConnectHeaders;
}
public void setConvertConnectHeaders(boolean convertConnectHeaders) {
this.convertConnectHeaders = convertConnectHeaders;
}
}
}

View File

@@ -0,0 +1,2 @@
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration
spring.cloud.stream.kafka.default.producer.messageKeyExpression=headers['cdc_key']