diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/package-info.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/package-info.java index 521362d517..30b9f7fbff 100644 --- a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/package-info.java +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/config/xml/package-info.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,4 +17,5 @@ /** * Provides classes for Cassandra parsers and namespace handlers. */ +@org.jspecify.annotations.NullMarked package org.springframework.integration.cassandra.config.xml; diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/dsl/package-info.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/dsl/package-info.java index 788490080d..4bd41357f0 100644 --- a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/dsl/package-info.java +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/dsl/package-info.java @@ -1,6 +1,5 @@ /** * Provides Apache Cassandra Components support for the Java DSL. */ -@org.springframework.lang.NonNullApi -@org.springframework.lang.NonNullFields +@org.jspecify.annotations.NullMarked package org.springframework.integration.cassandra.dsl; diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/CassandraMessageHandler.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/CassandraMessageHandler.java index 76ac5d6a66..0b5764177d 100644 --- a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/CassandraMessageHandler.java +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/CassandraMessageHandler.java @@ -27,6 +27,7 @@ import com.datastax.oss.driver.api.core.cql.BatchType; import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.cql.Statement; import com.datastax.oss.driver.api.querybuilder.QueryBuilder; +import org.jspecify.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -50,7 +51,6 @@ import org.springframework.integration.expression.ExpressionUtils; import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor; import org.springframework.integration.handler.MessageProcessor; -import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.util.Assert; @@ -85,8 +85,10 @@ public class CassandraMessageHandler extends AbstractReplyProducingMessageHandle */ private WriteOptions writeOptions; + @SuppressWarnings("NullAway.Init") private ReactiveSessionMessageCallback sessionMessageCallback; + @SuppressWarnings("NullAway.Init") private EvaluationContext evaluationContext; public CassandraMessageHandler(ReactiveCassandraOperations cassandraOperations) { @@ -164,10 +166,13 @@ public class CassandraMessageHandler extends AbstractReplyProducingMessageHandle public void setStatementProcessor(MessageProcessor> statementProcessor) { Assert.notNull(statementProcessor, "'statementProcessor' must not be null."); this.sessionMessageCallback = - (session, requestMessage) -> - session.execute( - QueryOptionsUtil.addQueryOptions(statementProcessor.processMessage(requestMessage), - this.writeOptions)); + (session, requestMessage) -> { + Statement statement = statementProcessor.processMessage(requestMessage); + Assert.notNull(statement, "Statement must not be null"); + return session.execute( + QueryOptionsUtil.addQueryOptions(statement, + this.writeOptions)); + }; this.mode = Type.STATEMENT; } @@ -191,6 +196,7 @@ public class CassandraMessageHandler extends AbstractReplyProducingMessageHandle } @Override + @Nullable protected Object handleRequestMessage(Message requestMessage) { Object payload = requestMessage.getPayload(); @@ -220,7 +226,8 @@ public class CassandraMessageHandler extends AbstractReplyProducingMessageHandle @SuppressWarnings("unchecked") private Mono handleInsert(Object payload) { - if (this.ingestQuery != null) { + final String localIngestQuery = this.ingestQuery; + if (localIngestQuery != null) { Assert.isInstanceOf(List.class, payload, "to perform 'ingest' the 'payload' must be of 'List>' type."); List list = (List) payload; @@ -232,7 +239,7 @@ public class CassandraMessageHandler extends AbstractReplyProducingMessageHandle return this.cassandraOperations.getReactiveCqlOperations() .execute((ReactiveSessionCallback) session -> session.prepare( - QueryOptionsUtil.addQueryOptions(SimpleStatement.newInstance(this.ingestQuery), + QueryOptionsUtil.addQueryOptions(SimpleStatement.newInstance(localIngestQuery), this.writeOptions)) .flatMapMany(s -> Flux.fromIterable(rows) diff --git a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/package-info.java b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/package-info.java index 32acd1b881..4470882581 100644 --- a/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/package-info.java +++ b/spring-integration-cassandra/src/main/java/org/springframework/integration/cassandra/outbound/package-info.java @@ -1,5 +1,5 @@ /** * Provides classes supporting Cassandra outbound endpoints. */ -@org.springframework.lang.NonNullApi +@org.jspecify.annotations.NullMarked package org.springframework.integration.cassandra.outbound;