Add support for schema deletion
- Add REST endpoints for schema deletion - by subject, format, version - by subject - by id - Add tests Resolves #716 Update doc
This commit is contained in:
committed by
Marius Bogoevici
parent
46fa71d3a5
commit
5d1cef2715
@@ -1280,6 +1280,7 @@ Spring Cloud Stream provides a schema registry server implementation.
|
||||
In order to use it, you can simply add the `spring-cloud-stream-schema-server` artifact to your project and use the `@EnableSchemaRegistryServer` annotation, adding the schema registry server REST controller to your application.
|
||||
This annotation is intended to be used with Spring Boot web applications, and the listening port of the server is controlled by the `server.port` setting.
|
||||
The `spring.cloud.stream.schema.server.path` setting can be used to control the root path of the schema server (especially when it is embedded in other applications).
|
||||
The `spring.cloud.stream.schema.server.alllowSchemaDeletion` boolean setting enables the deletion of schema. By default this is disabled.
|
||||
|
||||
The schema registry server uses a relational database to store the schemas.
|
||||
By default, it uses an embedded database.
|
||||
@@ -1344,6 +1345,18 @@ Response is a schema object in JSON format, with the following fields:
|
||||
* `version` the schema version;
|
||||
* `definition` the schema definition.
|
||||
|
||||
====== `DELETE /{subject}/{format}/{version}`
|
||||
|
||||
Delete an existing schema by its subject, format and version.
|
||||
|
||||
====== `DELETE /schemas/{id}`
|
||||
|
||||
Delete an existing schema by its id.
|
||||
|
||||
====== `DELETE /{subject}`
|
||||
|
||||
Delete existing schemas by their subject.
|
||||
|
||||
[NOTE]
|
||||
====
|
||||
This note applies to users of Spring Cloud Stream 1.1.0.RELEASE only.
|
||||
|
||||
@@ -44,8 +44,8 @@ import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
|
||||
public class SchemaServerConfiguration {
|
||||
|
||||
@Bean
|
||||
public ServerController serverController(SchemaRepository repository) {
|
||||
return new ServerController(repository, schemaValidators());
|
||||
public ServerController serverController(SchemaRepository repository, SchemaServerProperties schemeServerProperties) {
|
||||
return new ServerController(repository, schemaValidators(), schemeServerProperties);
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
||||
@@ -21,6 +21,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
/**
|
||||
* @author Vinicius Carvalho
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
@ConfigurationProperties("spring.cloud.stream.schema.server")
|
||||
public class SchemaServerProperties {
|
||||
@@ -32,6 +33,11 @@ public class SchemaServerProperties {
|
||||
*/
|
||||
private String path;
|
||||
|
||||
/**
|
||||
* Boolean flag to enable/disable schema deletion.
|
||||
*/
|
||||
private boolean alllowSchemaDeletion;
|
||||
|
||||
public String getPath() {
|
||||
return this.path;
|
||||
}
|
||||
@@ -39,4 +45,12 @@ public class SchemaServerProperties {
|
||||
public void setPath(String path) {
|
||||
this.path = path;
|
||||
}
|
||||
|
||||
public boolean isAlllowSchemaDeletion() {
|
||||
return alllowSchemaDeletion;
|
||||
}
|
||||
|
||||
public void setAlllowSchemaDeletion(boolean alllowSchemaDeletion) {
|
||||
this.alllowSchemaDeletion = alllowSchemaDeletion;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,9 +20,11 @@ package org.springframework.cloud.stream.schema.server.controllers;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.cloud.stream.schema.server.config.SchemaServerProperties;
|
||||
import org.springframework.cloud.stream.schema.server.model.Schema;
|
||||
import org.springframework.cloud.stream.schema.server.repository.SchemaRepository;
|
||||
import org.springframework.cloud.stream.schema.server.support.InvalidSchemaException;
|
||||
import org.springframework.cloud.stream.schema.server.support.SchemaDeletionNotAllowedException;
|
||||
import org.springframework.cloud.stream.schema.server.support.SchemaNotFoundException;
|
||||
import org.springframework.cloud.stream.schema.server.support.SchemaValidator;
|
||||
import org.springframework.cloud.stream.schema.server.support.UnsupportedFormatException;
|
||||
@@ -42,6 +44,7 @@ import org.springframework.web.util.UriComponentsBuilder;
|
||||
|
||||
/**
|
||||
* @author Vinicius Carvalho
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping(path = "${spring.cloud.stream.schema.server.path:}")
|
||||
@@ -51,12 +54,16 @@ public class ServerController {
|
||||
|
||||
private final Map<String, SchemaValidator> validators;
|
||||
|
||||
private final SchemaServerProperties schemaServerProperties;
|
||||
|
||||
public ServerController(SchemaRepository repository,
|
||||
Map<String, SchemaValidator> validators) {
|
||||
Map<String, SchemaValidator> validators,
|
||||
SchemaServerProperties schemaServerProperties) {
|
||||
Assert.notNull(repository, "cannot be null");
|
||||
Assert.notEmpty(validators, "cannot be empty");
|
||||
this.repository = repository;
|
||||
this.validators = validators;
|
||||
this.schemaServerProperties = schemaServerProperties;
|
||||
}
|
||||
|
||||
@RequestMapping(method = RequestMethod.POST, path = "/", consumes = "application/json", produces = "application/json")
|
||||
@@ -126,6 +133,49 @@ public class ServerController {
|
||||
return new ResponseEntity<>(schema, HttpStatus.OK);
|
||||
}
|
||||
|
||||
@RequestMapping(value = "/{subject}/{format}/v{version}", method = RequestMethod.DELETE)
|
||||
public void delete(@PathVariable("subject") String subject,
|
||||
@PathVariable("format") String format,
|
||||
@PathVariable("version") Integer version) {
|
||||
if (this.schemaServerProperties.isAlllowSchemaDeletion()) {
|
||||
Schema schema = this.repository.findOneBySubjectAndFormatAndVersion(subject, format,
|
||||
version);
|
||||
deleteSchema(schema);
|
||||
}
|
||||
else {
|
||||
throw new SchemaDeletionNotAllowedException();
|
||||
}
|
||||
}
|
||||
|
||||
@RequestMapping(value = "/schemas/{id}", method = RequestMethod.DELETE)
|
||||
public void delete(@PathVariable("id") Integer id) {
|
||||
if (this.schemaServerProperties.isAlllowSchemaDeletion()) {
|
||||
Schema schema = this.repository.findOne(id);
|
||||
deleteSchema(schema);
|
||||
}
|
||||
else {
|
||||
throw new SchemaDeletionNotAllowedException();
|
||||
}
|
||||
}
|
||||
|
||||
@RequestMapping(value = "/{subject}", method = RequestMethod.DELETE)
|
||||
public void delete(@PathVariable("subject") String subject) {
|
||||
if (this.schemaServerProperties.isAlllowSchemaDeletion()) {
|
||||
for (Schema schema : this.repository.findAll()) {
|
||||
if (schema.getSubject().equals(subject)) {
|
||||
deleteSchema(schema);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteSchema(Schema schema) {
|
||||
if (schema == null) {
|
||||
throw new SchemaNotFoundException("Could not find Schema");
|
||||
}
|
||||
this.repository.delete(schema);
|
||||
}
|
||||
|
||||
@ExceptionHandler(UnsupportedFormatException.class)
|
||||
@ResponseStatus(value = HttpStatus.BAD_REQUEST, reason = "Format not supported")
|
||||
public void unsupportedFormat(UnsupportedFormatException ex) {
|
||||
|
||||
@@ -0,0 +1,30 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.schema.server.support;
|
||||
|
||||
/**
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
public class SchemaDeletionNotAllowedException extends RuntimeException {
|
||||
public SchemaDeletionNotAllowedException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public SchemaDeletionNotAllowedException() {
|
||||
super("Schema Deletion Not Allowed");
|
||||
}
|
||||
}
|
||||
@@ -26,15 +26,25 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.context.TestConfiguration;
|
||||
import org.springframework.boot.test.web.client.TestRestTemplate;
|
||||
import org.springframework.cloud.stream.schema.server.config.SchemaServerProperties;
|
||||
import org.springframework.cloud.stream.schema.server.model.Schema;
|
||||
import org.springframework.cloud.stream.schema.server.support.SchemaDeletionNotAllowedException;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
import org.springframework.test.web.servlet.MockMvc;
|
||||
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
|
||||
import org.springframework.web.context.WebApplicationContext;
|
||||
|
||||
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete;
|
||||
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
|
||||
|
||||
/**
|
||||
* @author Vinicius Carvalho
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
|
||||
@@ -56,6 +66,12 @@ public class SchemaRegistryServerAvroTests {
|
||||
@Autowired
|
||||
private TestRestTemplate client;
|
||||
|
||||
@Autowired
|
||||
private SchemaServerProperties schemaServerProperties;
|
||||
|
||||
@Autowired
|
||||
private WebApplicationContext wac;
|
||||
|
||||
@Test
|
||||
public void testUnsupportedFormat() throws Exception {
|
||||
Schema schema = new Schema();
|
||||
@@ -149,6 +165,89 @@ public class SchemaRegistryServerAvroTests {
|
||||
Assert.assertEquals(HttpStatus.NOT_FOUND, response.getStatusCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSchemaDeletionBySubjectFormatVersion() throws Exception {
|
||||
Schema schema = new Schema();
|
||||
schema.setFormat("avro");
|
||||
schema.setSubject("test");
|
||||
schema.setDefinition(USER_SCHEMA_V1);
|
||||
ResponseEntity<Schema> response1 = client.postForEntity("http://localhost:8990/",
|
||||
schema, Schema.class);
|
||||
Assert.assertTrue(response1.getStatusCode().is2xxSuccessful());
|
||||
schemaServerProperties.setAlllowSchemaDeletion(true);
|
||||
client.delete("http://localhost:8990/test/avro/v1");
|
||||
ResponseEntity<Schema> response2 = client
|
||||
.getForEntity("http://localhost:8990/test/avro/v1", Schema.class);
|
||||
Assert.assertEquals(HttpStatus.NOT_FOUND, response2.getStatusCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSchemaDeletionById() throws Exception {
|
||||
Schema schema = new Schema();
|
||||
schema.setFormat("avro");
|
||||
schema.setSubject("test");
|
||||
schema.setDefinition(USER_SCHEMA_V1);
|
||||
ResponseEntity<Schema> response1 = client.postForEntity("http://localhost:8990/",
|
||||
schema, Schema.class);
|
||||
Assert.assertTrue(response1.getStatusCode().is2xxSuccessful());
|
||||
ResponseEntity<Schema> response2 = client
|
||||
.getForEntity("http://localhost:8990/test/avro/v1", Schema.class);
|
||||
Assert.assertEquals(HttpStatus.OK, response2.getStatusCode());
|
||||
schemaServerProperties.setAlllowSchemaDeletion(true);
|
||||
client.delete("http://localhost:8990/schemas/1");
|
||||
ResponseEntity<Schema> response3 = client
|
||||
.getForEntity("http://localhost:8990/test/avro/1", Schema.class);
|
||||
Assert.assertEquals(HttpStatus.NOT_FOUND, response3.getStatusCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSchemaDeletionBySubject() throws Exception {
|
||||
Schema schema1 = new Schema();
|
||||
schema1.setFormat("avro");
|
||||
schema1.setSubject("test");
|
||||
schema1.setDefinition(USER_SCHEMA_V1);
|
||||
ResponseEntity<Schema> response1 = client.postForEntity("http://localhost:8990/",
|
||||
schema1, Schema.class);
|
||||
Assert.assertTrue(response1.getStatusCode().is2xxSuccessful());
|
||||
Assert.assertEquals(HttpStatus.OK, client.getForEntity("http://localhost:8990/test/avro/v1", Schema.class).getStatusCode());
|
||||
client.getForEntity("http://localhost:8990/test/avro/1", Schema.class);
|
||||
Schema schema2 = new Schema();
|
||||
schema2.setFormat("avro");
|
||||
schema2.setSubject("test");
|
||||
schema2.setDefinition(USER_SCHEMA_V2);
|
||||
ResponseEntity<Schema> response2 = client.postForEntity("http://localhost:8990/",
|
||||
schema2, Schema.class);
|
||||
Assert.assertTrue(response2.getStatusCode().is2xxSuccessful());
|
||||
Assert.assertEquals(HttpStatus.OK, client.getForEntity("http://localhost:8990/test/avro/v2", Schema.class).getStatusCode());
|
||||
schemaServerProperties.setAlllowSchemaDeletion(true);
|
||||
client.delete("http://localhost:8990/test");
|
||||
ResponseEntity<Schema> response4 = client
|
||||
.getForEntity("http://localhost:8990/test/avro/v1", Schema.class);
|
||||
Assert.assertEquals(HttpStatus.NOT_FOUND, response4.getStatusCode());
|
||||
ResponseEntity<Schema> response5 = client
|
||||
.getForEntity("http://localhost:8990/test/avro/v2", Schema.class);
|
||||
Assert.assertEquals(HttpStatus.NOT_FOUND, response5.getStatusCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSchemaDeletionNotAllowed() throws Exception {
|
||||
Schema schema = new Schema();
|
||||
schema.setFormat("avro");
|
||||
schema.setSubject("test");
|
||||
schema.setDefinition(USER_SCHEMA_V1);
|
||||
ResponseEntity<Schema> response1 = client.postForEntity("http://localhost:8990/",
|
||||
schema, Schema.class);
|
||||
Assert.assertTrue(response1.getStatusCode().is2xxSuccessful());
|
||||
MockMvc mockMvc = MockMvcBuilders.webAppContextSetup(wac).defaultRequest(
|
||||
get("/").accept(MediaType.APPLICATION_JSON)).build();
|
||||
try {
|
||||
mockMvc.perform(delete("http://localhost:8990/test/avro/v1"));
|
||||
}
|
||||
catch (Exception e) {
|
||||
Assert.assertTrue(e.getCause() instanceof SchemaDeletionNotAllowedException);
|
||||
}
|
||||
}
|
||||
|
||||
@TestConfiguration
|
||||
static class Config {
|
||||
@Bean
|
||||
|
||||
Reference in New Issue
Block a user