@@ -17,7 +17,7 @@
|
||||
package org.springframework.cloud.schema.registry.client;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@@ -40,6 +40,7 @@ import org.springframework.web.client.RestTemplate;
|
||||
* @author Vinicius Carvalho
|
||||
* @author Marius Bogoevici
|
||||
* @author Jon Archer
|
||||
* @author Tengzhou Dong
|
||||
*/
|
||||
public class ConfluentSchemaRegistryClient implements SchemaRegistryClient {
|
||||
|
||||
@@ -72,28 +73,30 @@ public class ConfluentSchemaRegistryClient implements SchemaRegistryClient {
|
||||
|
||||
@Override
|
||||
public SchemaRegistrationResponse register(String subject, String format,
|
||||
String schema) {
|
||||
String schema) {
|
||||
Assert.isTrue("avro".equals(format), "Only Avro is supported");
|
||||
String path = String.format("/subjects/%s/versions", subject);
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.put("Accept", ACCEPT_HEADERS);
|
||||
headers.add("Content-Type", "application/json");
|
||||
Integer version = null;
|
||||
Integer id = null;
|
||||
String payload = null;
|
||||
Map<String, String> maps = new HashMap<>();
|
||||
maps.put("subject", subject);
|
||||
maps.put("format", format);
|
||||
maps.put("definition", schema);
|
||||
try {
|
||||
payload = this.mapper
|
||||
.writeValueAsString(Collections.singletonMap("schema", schema));
|
||||
payload = this.mapper.writeValueAsString(maps);
|
||||
}
|
||||
catch (JsonProcessingException e) {
|
||||
throw new RuntimeException("Could not parse schema, invalid JSON format", e);
|
||||
}
|
||||
try {
|
||||
HttpEntity<String> request = new HttpEntity<>(payload, headers);
|
||||
ResponseEntity<Map> response = this.template.exchange(this.endpoint + path,
|
||||
ResponseEntity<Map> response = this.template.exchange(this.endpoint,
|
||||
HttpMethod.POST, request, Map.class);
|
||||
id = (Integer) response.getBody().get("id");
|
||||
version = getSubjectVersion(subject, payload);
|
||||
version = (Integer) ((Map) response.getBody()).get("version");
|
||||
}
|
||||
catch (HttpStatusCodeException httpException) {
|
||||
throw new RuntimeException(String.format(
|
||||
@@ -107,39 +110,10 @@ public class ConfluentSchemaRegistryClient implements SchemaRegistryClient {
|
||||
return schemaRegistrationResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
* Confluent register API returns the id, but we need the version of a given schema
|
||||
* subject. After a successful registration we can inquire the server to get the
|
||||
* version of a schema
|
||||
* @param subject the schema subject
|
||||
* @param payload payload to send
|
||||
* @return the version of the returned schema
|
||||
*/
|
||||
private Integer getSubjectVersion(String subject, String payload) {
|
||||
String path = String.format("/subjects/%s", subject);
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.put("Accept", ACCEPT_HEADERS);
|
||||
headers.add("Content-Type", "application/json");
|
||||
Integer version = null;
|
||||
try {
|
||||
|
||||
HttpEntity<String> request = new HttpEntity<>(payload, headers);
|
||||
ResponseEntity<Map> response = this.template.exchange(this.endpoint + path,
|
||||
HttpMethod.POST, request, Map.class);
|
||||
version = (Integer) response.getBody().get("version");
|
||||
}
|
||||
catch (HttpStatusCodeException httpException) {
|
||||
throw new RuntimeException(String.format(
|
||||
"Failed to register subject %s, server replied with status %d",
|
||||
subject, httpException.getStatusCode().value()), httpException);
|
||||
}
|
||||
return version;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String fetch(SchemaReference schemaReference) {
|
||||
String path = String.format("/subjects/%s/versions/%d",
|
||||
schemaReference.getSubject(), schemaReference.getVersion());
|
||||
String path = String.format("/%s/%s/v%d",
|
||||
schemaReference.getSubject(), schemaReference.getFormat(), schemaReference.getVersion());
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.put("Accept", ACCEPT_HEADERS);
|
||||
headers.add("Content-Type", "application/vnd.schemaregistry.v1+json");
|
||||
@@ -162,7 +136,7 @@ public class ConfluentSchemaRegistryClient implements SchemaRegistryClient {
|
||||
|
||||
@Override
|
||||
public String fetch(int id) {
|
||||
String path = String.format("/schemas/ids/%d", id);
|
||||
String path = String.format("/schemas/%d", id);
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.put("Accept", ACCEPT_HEADERS);
|
||||
headers.add("Content-Type", "application/vnd.schemaregistry.v1+json");
|
||||
|
||||
@@ -40,6 +40,7 @@ import static org.springframework.test.web.client.response.MockRestResponseCreat
|
||||
|
||||
/**
|
||||
* @author Vinicius Carvalho
|
||||
* @author TengZhou Dong
|
||||
*/
|
||||
public class ConfluentSchemaRegistryClientTests {
|
||||
|
||||
@@ -57,18 +58,11 @@ public class ConfluentSchemaRegistryClientTests {
|
||||
@Test
|
||||
public void registerSchema() throws Exception {
|
||||
this.mockRestServiceServer
|
||||
.expect(requestTo("http://localhost:8081/subjects/user/versions"))
|
||||
.expect(requestTo("http://localhost:8081"))
|
||||
.andExpect(method(HttpMethod.POST))
|
||||
.andExpect(header("Content-Type", "application/json"))
|
||||
.andExpect(header("Accept", "application/vnd.schemaregistry.v1+json"))
|
||||
.andRespond(withSuccess("{\"id\":101}", MediaType.APPLICATION_JSON));
|
||||
|
||||
this.mockRestServiceServer
|
||||
.expect(requestTo("http://localhost:8081/subjects/user"))
|
||||
.andExpect(method(HttpMethod.POST))
|
||||
.andExpect(header("Content-Type", "application/json"))
|
||||
.andExpect(header("Accept", "application/vnd.schemaregistry.v1+json"))
|
||||
.andRespond(withSuccess("{\"version\":1}", MediaType.APPLICATION_JSON));
|
||||
.andRespond(withSuccess("{\"id\":101,\"version\":1}", MediaType.APPLICATION_JSON));
|
||||
|
||||
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient(
|
||||
this.restTemplate);
|
||||
@@ -81,7 +75,7 @@ public class ConfluentSchemaRegistryClientTests {
|
||||
@Test(expected = RuntimeException.class)
|
||||
public void registerWithInvalidJson() {
|
||||
this.mockRestServiceServer
|
||||
.expect(requestTo("http://localhost:8081/subjects/user/versions"))
|
||||
.expect(requestTo("http://localhost:8081"))
|
||||
.andExpect(method(HttpMethod.POST))
|
||||
.andExpect(header("Content-Type", "application/json"))
|
||||
.andExpect(header("Accept", "application/vnd.schemaregistry.v1+json"))
|
||||
@@ -94,7 +88,7 @@ public class ConfluentSchemaRegistryClientTests {
|
||||
@Test
|
||||
public void registerIncompatibleSchema() {
|
||||
this.mockRestServiceServer
|
||||
.expect(requestTo("http://localhost:8081/subjects/user/versions"))
|
||||
.expect(requestTo("http://localhost:8081"))
|
||||
.andExpect(method(HttpMethod.POST))
|
||||
.andExpect(header("Content-Type", "application/json"))
|
||||
.andExpect(header("Accept", "application/vnd.schemaregistry.v1+json"))
|
||||
@@ -113,39 +107,10 @@ public class ConfluentSchemaRegistryClientTests {
|
||||
this.mockRestServiceServer.verify();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void responseErrorFetch() {
|
||||
this.mockRestServiceServer
|
||||
.expect(requestTo("http://localhost:8081/subjects/user/versions"))
|
||||
.andExpect(method(HttpMethod.POST))
|
||||
.andExpect(header("Content-Type", "application/json"))
|
||||
.andExpect(header("Accept", "application/vnd.schemaregistry.v1+json"))
|
||||
.andRespond(withSuccess("{\"id\":101}", MediaType.APPLICATION_JSON));
|
||||
|
||||
this.mockRestServiceServer
|
||||
.expect(requestTo("http://localhost:8081/subjects/user"))
|
||||
.andExpect(method(HttpMethod.POST))
|
||||
.andExpect(header("Content-Type", "application/json"))
|
||||
.andExpect(header("Accept", "application/vnd.schemaregistry.v1+json"))
|
||||
.andRespond(withBadRequest());
|
||||
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient(
|
||||
this.restTemplate);
|
||||
Exception expected = null;
|
||||
try {
|
||||
SchemaRegistrationResponse response = client.register("user", "avro", "{}");
|
||||
}
|
||||
catch (Exception e) {
|
||||
expected = e;
|
||||
}
|
||||
assertThat(expected instanceof RuntimeException).isTrue();
|
||||
assertThat(expected.getCause() instanceof HttpStatusCodeException).isTrue();
|
||||
this.mockRestServiceServer.verify();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void findByReference() {
|
||||
this.mockRestServiceServer
|
||||
.expect(requestTo("http://localhost:8081/subjects/user/versions/1"))
|
||||
.expect(requestTo("http://localhost:8081/user/avro/v1"))
|
||||
.andExpect(method(HttpMethod.GET))
|
||||
.andExpect(
|
||||
header("Content-Type", "application/vnd.schemaregistry.v1+json"))
|
||||
@@ -162,7 +127,7 @@ public class ConfluentSchemaRegistryClientTests {
|
||||
@Test(expected = SchemaNotFoundException.class)
|
||||
public void schemaNotFound() {
|
||||
this.mockRestServiceServer
|
||||
.expect(requestTo("http://localhost:8081/subjects/user/versions/1"))
|
||||
.expect(requestTo("http://localhost:8081/user/avro/v1"))
|
||||
.andExpect(method(HttpMethod.GET))
|
||||
.andExpect(
|
||||
header("Content-Type", "application/vnd.schemaregistry.v1+json"))
|
||||
@@ -174,4 +139,26 @@ public class ConfluentSchemaRegistryClientTests {
|
||||
String schema = client.fetch(reference);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void responseErrorFetch() {
|
||||
this.mockRestServiceServer
|
||||
.expect(requestTo("http://localhost:8081"))
|
||||
.andExpect(method(HttpMethod.POST))
|
||||
.andExpect(header("Content-Type", "application/json"))
|
||||
.andExpect(header("Accept", "application/vnd.schemaregistry.v1+json"))
|
||||
.andRespond(withBadRequest());
|
||||
|
||||
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient(
|
||||
this.restTemplate);
|
||||
Exception expected = null;
|
||||
try {
|
||||
SchemaRegistrationResponse response = client.register("user", "avro", "{}");
|
||||
}
|
||||
catch (Exception e) {
|
||||
expected = e;
|
||||
}
|
||||
assertThat(expected != null).isTrue();
|
||||
assertThat(expected.getCause() instanceof HttpStatusCodeException).isTrue();
|
||||
this.mockRestServiceServer.verify();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user