diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 11e5d492ff2f1..ab704dcb036fb 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -210,6 +210,22 @@
+
+ com.github.spotbugs
+ spotbugs-maven-plugin
+
+ ${basedir}/src/main/resources/findbugsExclude.xml
+
+
+
+ spotbugs
+ verify
+
+ check
+
+
+
+
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
index a1c7614795fda..762f8f28dc86d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
@@ -24,8 +24,6 @@
import com.google.common.annotations.VisibleForTesting;
public class BackoffBuilder {
- private long backoffIntervalNanos;
- private long maxBackoffIntervalNanos;
private long initial;
private TimeUnit unitInitial;
private long max;
@@ -40,8 +38,6 @@ public class BackoffBuilder {
this.max = 0;
this.mandatoryStop = 0;
this.clock = Clock.systemDefaultZone();
- this.backoffIntervalNanos = 0;
- this.maxBackoffIntervalNanos = 0;
}
public BackoffBuilder setInitialTime(long initial, TimeUnit unitInitial) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
index e3dd97b5e8b2d..57aaa06ab436f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
@@ -24,6 +24,8 @@
/**
*/
public class BatchMessageIdImpl extends MessageIdImpl {
+
+ private static final long serialVersionUID = 1L;
private final static int NO_BATCH = -1;
private final int batchIndex;
private final int batchSize;
@@ -92,7 +94,7 @@ public int compareTo(MessageId o) {
@Override
public int hashCode() {
- return (int) (31 * (ledgerId + 31 * entryId) + (31 * partitionIndex) + batchIndex);
+ return (int) (31 * (ledgerId + 31 * entryId) + (31 * (long) partitionIndex) + batchIndex);
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 0993f0df05759..88ea82e854b14 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -480,7 +480,7 @@ protected CompletableFuture doAcknowledgeWithTxn(List messageId
.thenCompose(ignored -> doAcknowledge(messageIdList, ackType, properties, txn));
txn.registerAckOp(ackFuture);
} else {
- ackFuture = doAcknowledge(messageIdList, ackType, properties, txn);
+ ackFuture = doAcknowledge(messageIdList, ackType, properties, null);
}
return ackFuture;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 2db06734e3009..36d688df7f2d2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -434,7 +434,7 @@ public ConsumerBuilder batchReceivePolicy(BatchReceivePolicy batchReceivePoli
@Override
public String toString() {
- return conf != null ? conf.toString() : null;
+ return conf != null ? conf.toString() : "";
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 724dc44745cb7..59f296db6ada2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -602,7 +602,7 @@ protected CompletableFuture doReconsumeLater(Message> message, AckType a
}
if (propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
- reconsumetimes = Integer.valueOf(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES));
+ reconsumetimes = Integer.parseInt(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES));
reconsumetimes = reconsumetimes + 1;
} else {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
index c9b2c49cc53c4..f87c463f0f506 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
@@ -46,7 +46,7 @@ public class ConsumerStatsRecorderImpl implements ConsumerStatsRecorder {
private static final long serialVersionUID = 1L;
private TimerTask stat;
private Timeout statTimeout;
- private Consumer> consumer;
+ private final Consumer> consumer;
private PulsarClientImpl pulsarClient;
private long oldTime;
private long statsIntervalSeconds;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderBuilder.java
index 4d98a011b991c..d76869bb0abcd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderBuilder.java
@@ -18,9 +18,7 @@
*/
package org.apache.pulsar.client.impl;
-import java.util.HashMap;
import java.util.Map;
-
import org.apache.pulsar.client.impl.conf.DefaultCryptoKeyReaderConfigurationData;
public class DefaultCryptoKeyReaderBuilder implements Cloneable {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index c806c7feaac34..3d561190305dd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -479,16 +479,14 @@ protected CompletableFuture doReconsumeLater(Message> message, AckType a
}
if (ackType == AckType.Cumulative) {
- Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName());
+ Consumer> individualConsumer = consumers.get(topicMessageId.getTopicPartitionName());
if (individualConsumer != null) {
- MessageId innerId = topicMessageId.getInnerMessageId();
return individualConsumer.reconsumeLaterCumulativeAsync(message, delayTime, unit);
} else {
return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
}
} else {
ConsumerImpl consumer = consumers.get(topicMessageId.getTopicPartitionName());
- MessageId innerId = topicMessageId.getInnerMessageId();
return consumer.doReconsumeLater(message, ackType, properties, delayTime, unit)
.thenRun(() ->unAckedMessageTracker.remove(topicMessageId));
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index a3b9e4a19675f..48f9f2e347818 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -342,6 +342,6 @@ private void setMessageRoutingMode() throws PulsarClientException {
@Override
public String toString() {
- return conf != null ? conf.toString() : null;
+ return conf != null ? conf.toString() : "";
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index b0107d626ad4f..05ca1a5b6c877 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1129,8 +1129,7 @@ protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg op) {
}
return true;
} else {
- log.warn("[{}] Failed while casting {} into ByteBufPair", producerName,
- (op.cmd == null ? null : op.cmd.getClass().getName()));
+ log.warn("[{}] Failed while casting empty ByteBufPair, ", producerName);
return false;
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index 923b6634ddbb0..e7075da8d131d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -39,10 +39,10 @@
public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
private static final long serialVersionUID = 1L;
- private TimerTask stat;
- private Timeout statTimeout;
- private ProducerImpl> producer;
- private PulsarClientImpl pulsarClient;
+ private transient TimerTask stat;
+ private transient Timeout statTimeout;
+ private transient ProducerImpl> producer;
+ private transient PulsarClientImpl pulsarClient;
private long oldTime;
private long statsIntervalSeconds;
private final LongAdder numMsgsSent;
@@ -55,7 +55,7 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
private final LongAdder totalAcksReceived;
private static final DecimalFormat DEC = new DecimalFormat("0.000");
private static final DecimalFormat THROUGHPUT_FORMAT = new DecimalFormat("0.00");
- private final DoublesSketch ds;
+ private transient final DoublesSketch ds;
private volatile double sendMsgsRate;
private volatile double sendBytesRate;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index 933d69c601cce..59ccde382b5a1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -47,11 +47,11 @@ public class TypedMessageBuilderImpl implements TypedMessageBuilder {
private static final ByteBuffer EMPTY_CONTENT = ByteBuffer.allocate(0);
- private final ProducerBase> producer;
- private final MessageMetadata msgMetadata = new MessageMetadata();
- private final Schema schema;
- private ByteBuffer content;
- private final TransactionImpl txn;
+ private transient final ProducerBase> producer;
+ private transient final MessageMetadata msgMetadata = new MessageMetadata();
+ private transient final Schema schema;
+ private transient ByteBuffer content;
+ private transient final TransactionImpl txn;
public TypedMessageBuilderImpl(ProducerBase> producer, Schema schema) {
this(producer, schema, null);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataKeyStoreTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataKeyStoreTls.java
index 6d78004a0678f..e79d084f3f7fa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataKeyStoreTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataKeyStoreTls.java
@@ -24,7 +24,9 @@
@Slf4j
public class AuthenticationDataKeyStoreTls implements AuthenticationDataProvider {
- private final KeyStoreParams keyStoreParams;
+
+ private static final long serialVersionUID = 1L;
+ private transient final KeyStoreParams keyStoreParams;
public AuthenticationDataKeyStoreTls(KeyStoreParams keyStoreParams) throws Exception {
this.keyStoreParams = keyStoreParams;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
index 3e9e903145669..b7987eb88e2d1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
@@ -33,12 +33,13 @@
import org.slf4j.LoggerFactory;
public class AuthenticationDataTls implements AuthenticationDataProvider {
+ private static final long serialVersionUID = 1L;
protected X509Certificate[] tlsCertificates;
protected PrivateKey tlsPrivateKey;
- private FileModifiedTimeUpdater certFile, keyFile;
+ private transient FileModifiedTimeUpdater certFile, keyFile;
// key and cert using stream
- private InputStream certStream, keyStream;
- private Supplier certStreamProvider, keyStreamProvider, trustStoreStreamProvider;
+ private transient InputStream certStream, keyStream;
+ private transient Supplier certStreamProvider, keyStreamProvider, trustStoreStreamProvider;
public AuthenticationDataTls(String certFilePath, String keyFilePath) throws KeyManagementException {
if (certFilePath == null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationKeyStoreTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationKeyStoreTls.java
index e8c7764f027fc..c8c139678e998 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationKeyStoreTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationKeyStoreTls.java
@@ -47,7 +47,7 @@ public class AuthenticationKeyStoreTls implements Authentication, EncodedAuthent
public final static String KEYSTORE_PW = "keyStorePassword";
private final static String DEFAULT_KEYSTORE_TYPE = "JKS";
- private KeyStoreParams keyStoreParams;
+ private transient KeyStoreParams keyStoreParams;
public AuthenticationKeyStoreTls() {
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
index e5f8361dd1dff..db3d94e6e15a1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
@@ -44,7 +44,7 @@ public class AuthenticationTls implements Authentication, EncodedAuthenticationP
private String certFilePath;
private String keyFilePath;
- private Supplier certStreamProvider, keyStreamProvider, trustStoreStreamProvider;
+ private transient Supplier certStreamProvider, keyStreamProvider, trustStoreStreamProvider;
public AuthenticationTls() {
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java
index 1f381c1662798..77940480e7a17 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java
@@ -42,7 +42,7 @@
public class AuthenticationToken implements Authentication, EncodedAuthenticationParameterSupport {
private static final long serialVersionUID = 1L;
- private transient Supplier tokenSupplier;
+ private transient Supplier tokenSupplier = null;
public AuthenticationToken() {
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 35f15e8fa16c8..4a3091bbad455 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -49,7 +49,7 @@ public class ClientConfigurationData implements Serializable, Cloneable {
private transient ServiceUrlProvider serviceUrlProvider;
@JsonIgnore
- private Authentication authentication = AuthenticationDisabled.INSTANCE;
+ private Authentication authentication;
private String authPluginClassName;
@Secret
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index b01cba4c2100a..da15e4ced26b9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -96,7 +96,7 @@ public class ConsumerConfigurationData implements Serializable, Cloneable {
private CryptoKeyReader cryptoKeyReader = null;
@JsonIgnore
- private MessageCrypto messageCrypto = null;
+ private transient MessageCrypto messageCrypto = null;
private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
@@ -110,7 +110,7 @@ public class ConsumerConfigurationData implements Serializable, Cloneable {
private RegexSubscriptionMode regexSubscriptionMode = RegexSubscriptionMode.PersistentOnly;
- private DeadLetterPolicy deadLetterPolicy;
+ private transient DeadLetterPolicy deadLetterPolicy;
private boolean retryEnable = false;
@@ -125,7 +125,7 @@ public class ConsumerConfigurationData implements Serializable, Cloneable {
private boolean resetIncludeHead = false;
- private KeySharedPolicy keySharedPolicy;
+ private transient KeySharedPolicy keySharedPolicy;
private boolean batchIndexAckEnabled = false;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index eb011ad3a5f8d..27690ee6c380e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -84,7 +84,7 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
private CryptoKeyReader cryptoKeyReader;
@JsonIgnore
- private MessageCrypto messageCrypto = null;
+ private transient MessageCrypto messageCrypto = null;
@JsonIgnore
private Set encryptionKeys = new TreeSet<>();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
index c9f0adf5fc8a7..276160276b9a5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -33,6 +33,8 @@
@Data
public class ReaderConfigurationData implements Serializable, Cloneable {
+ private static final long serialVersionUID = 1L;
+
private String topicName;
@JsonIgnore
@@ -55,7 +57,7 @@ public class ReaderConfigurationData implements Serializable, Cloneable {
private boolean readCompacted = false;
private boolean resetIncludeHead = false;
- private List keyHashRanges;
+ private transient List keyHashRanges;
@SuppressWarnings("unchecked")
public ReaderConfigurationData clone() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java
index 7a806b5891588..d5b451f4ac08c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java
@@ -29,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -76,7 +77,8 @@ private static void serializeFileDescriptor(Descriptors.FileDescriptor fileDescr
if (unResolvedFileDescriptNames.length == 0) {
fileDescriptorCache.put(fileDescriptor.getFullName(), fileDescriptor.toProto());
} else {
- throw new SchemaSerializationException(fileDescriptor.getFullName() + " can't resolve dependency '" + unResolvedFileDescriptNames + "'.");
+ throw new SchemaSerializationException(fileDescriptor.getFullName() + " can't resolve dependency '" +
+ Arrays.toString(unResolvedFileDescriptNames) + "'.");
}
}
@@ -96,7 +98,7 @@ public static Descriptors.Descriptor deserialize(byte[] schemaDataBytes) {
//extract root fileDescriptor
Descriptors.FileDescriptor fileDescriptor = fileDescriptorCache.get(schemaData.getRootFileDescriptorName());
//trim package
- String[] paths = StringUtils.removeFirst(schemaData.getRootMessageTypeName(), fileDescriptor.getPackage()).replaceFirst(".", "").split("\\.");
+ String[] paths = StringUtils.removeFirst(schemaData.getRootMessageTypeName(), fileDescriptor.getPackage()).replaceFirst("\\.", "").split("\\.");
//extract root message
descriptor = fileDescriptor.findMessageTypeByName(paths[0]);
//extract nested message
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
index 22c63a94a5f18..b78b31efed858 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
@@ -47,7 +47,7 @@ public class GenericAvroReader implements SchemaReader {
private final List fields;
private final Schema schema;
private final byte[] schemaVersion;
- private int offset;
+ private final int offset;
public GenericAvroReader(Schema schema) {
this(null, schema, null);
@@ -66,7 +66,7 @@ public GenericAvroReader(Schema writerSchema, Schema readerSchema, byte[] schema
this.reader = new GenericDatumReader<>(writerSchema, readerSchema);
}
this.byteArrayOutputStream = new ByteArrayOutputStream();
- this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);
+ this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, null);
if (schema.getObjectProp(GenericAvroSchema.OFFSET_PROP) != null) {
this.offset = Integer.parseInt(schema.getObjectProp(GenericAvroSchema.OFFSET_PROP).toString());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroWriter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroWriter.java
index 28ad1163f272b..c5346888a42f0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroWriter.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroWriter.java
@@ -37,7 +37,7 @@ public class GenericAvroWriter implements SchemaWriter {
public GenericAvroWriter(Schema schema) {
this.writer = new GenericDatumWriter<>(schema);
this.byteArrayOutputStream = new ByteArrayOutputStream();
- this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);
+ this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, null);
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReader.java
index eccc48247cdd4..78e8393036388 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReader.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReader.java
@@ -36,9 +36,9 @@
public class GenericProtobufNativeReader implements SchemaReader {
- private Descriptors.Descriptor descriptor;
- private byte[] schemaVersion;
- private List fields;
+ private final Descriptors.Descriptor descriptor;
+ private final byte[] schemaVersion;
+ private final List fields;
public GenericProtobufNativeReader(Descriptors.Descriptor descriptor) {
this(descriptor, null);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeRecord.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeRecord.java
index 51b4cb063da79..5c9d57d1cfc78 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeRecord.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeRecord.java
@@ -24,10 +24,10 @@
import java.util.List;
-public class GenericProtobufNativeRecord extends VersionedGenericRecord {
+public class GenericProtobufNativeRecord extends VersionedGenericRecord {
- private DynamicMessage record;
- private Descriptors.Descriptor msgDesc;
+ private final DynamicMessage record;
+ private final Descriptors.Descriptor msgDesc;
protected GenericProtobufNativeRecord(byte[] schemaVersion, Descriptors.Descriptor msgDesc, List fields, DynamicMessage record) {
super(schemaVersion, fields);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java
index a260ebf1ee295..5a348af6dbdb5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java
@@ -40,7 +40,7 @@ public AvroWriter(Schema schema) {
public AvroWriter(Schema schema, boolean jsr310ConversionEnabled) {
this.byteArrayOutputStream = new ByteArrayOutputStream();
- this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, this.encoder);
+ this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, null);
ReflectData reflectData = new ReflectData();
AvroSchema.addLogicalTypeConversions(reflectData, jsr310ConversionEnabled);
this.writer = new ReflectDatumWriter<>(schema, reflectData);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 2269f5e40000b..eddd5dc310e24 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -83,7 +83,6 @@ public class TransactionImpl implements Transaction {
// register the topics that will be modified by this transaction
public synchronized CompletableFuture registerProducedTopic(String topic) {
- CompletableFuture completableFuture = new CompletableFuture<>();
// we need to issue the request to TC to register the produced topic
return registerPartitionMap.compute(topic, (key, future) -> {
if (future != null) {
@@ -102,7 +101,6 @@ public synchronized void registerSendOp(CompletableFuture sendFuture)
// register the topics that will be modified by this transaction
public synchronized CompletableFuture registerAckedTopic(String topic, String subscription) {
- CompletableFuture completableFuture = new CompletableFuture<>();
// we need to issue the request to TC to register the acked topic
return registerSubscriptionMap.compute(topic, (key, future) -> {
if (future != null) {
diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml b/pulsar-client/src/main/resources/findbugsExclude.xml
new file mode 100644
index 0000000000000..f887c9908b135
--- /dev/null
+++ b/pulsar-client/src/main/resources/findbugsExclude.xml
@@ -0,0 +1,97 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+