Skip to content

Commit

Permalink
[Pulsar client] Enable spotbugs in module pulsar-client. (#9630)
Browse files Browse the repository at this point in the history
  • Loading branch information
congbobo184 authored Feb 22, 2021
1 parent 5216ee1 commit 2a1828c
Show file tree
Hide file tree
Showing 30 changed files with 165 additions and 54 deletions.
16 changes: 16 additions & 0 deletions pulsar-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,22 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-maven-plugin</artifactId>
<configuration>
<excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile>
</configuration>
<executions>
<execution>
<id>spotbugs</id>
<phase>verify</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import com.google.common.annotations.VisibleForTesting;

public class BackoffBuilder {
private long backoffIntervalNanos;
private long maxBackoffIntervalNanos;
private long initial;
private TimeUnit unitInitial;
private long max;
Expand All @@ -40,8 +38,6 @@ public class BackoffBuilder {
this.max = 0;
this.mandatoryStop = 0;
this.clock = Clock.systemDefaultZone();
this.backoffIntervalNanos = 0;
this.maxBackoffIntervalNanos = 0;
}

public BackoffBuilder setInitialTime(long initial, TimeUnit unitInitial) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
/**
*/
public class BatchMessageIdImpl extends MessageIdImpl {

private static final long serialVersionUID = 1L;
private final static int NO_BATCH = -1;
private final int batchIndex;
private final int batchSize;
Expand Down Expand Up @@ -92,7 +94,7 @@ public int compareTo(MessageId o) {

@Override
public int hashCode() {
return (int) (31 * (ledgerId + 31 * entryId) + (31 * partitionIndex) + batchIndex);
return (int) (31 * (ledgerId + 31 * entryId) + (31 * (long) partitionIndex) + batchIndex);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ protected CompletableFuture<Void> doAcknowledgeWithTxn(List<MessageId> messageId
.thenCompose(ignored -> doAcknowledge(messageIdList, ackType, properties, txn));
txn.registerAckOp(ackFuture);
} else {
ackFuture = doAcknowledge(messageIdList, ackType, properties, txn);
ackFuture = doAcknowledge(messageIdList, ackType, properties, null);
}
return ackFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ public ConsumerBuilder<T> batchReceivePolicy(BatchReceivePolicy batchReceivePoli

@Override
public String toString() {
return conf != null ? conf.toString() : null;
return conf != null ? conf.toString() : "";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
}

if (propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
reconsumetimes = Integer.valueOf(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES));
reconsumetimes = Integer.parseInt(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES));
reconsumetimes = reconsumetimes + 1;

} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class ConsumerStatsRecorderImpl implements ConsumerStatsRecorder {
private static final long serialVersionUID = 1L;
private TimerTask stat;
private Timeout statTimeout;
private Consumer<?> consumer;
private final Consumer<?> consumer;
private PulsarClientImpl pulsarClient;
private long oldTime;
private long statsIntervalSeconds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import java.util.HashMap;
import java.util.Map;

import org.apache.pulsar.client.impl.conf.DefaultCryptoKeyReaderConfigurationData;

public class DefaultCryptoKeyReaderBuilder implements Cloneable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,16 +479,14 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
}

if (ackType == AckType.Cumulative) {
Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName());
Consumer<?> individualConsumer = consumers.get(topicMessageId.getTopicPartitionName());
if (individualConsumer != null) {
MessageId innerId = topicMessageId.getInnerMessageId();
return individualConsumer.reconsumeLaterCumulativeAsync(message, delayTime, unit);
} else {
return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
}
} else {
ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicPartitionName());
MessageId innerId = topicMessageId.getInnerMessageId();
return consumer.doReconsumeLater(message, ackType, properties, delayTime, unit)
.thenRun(() ->unAckedMessageTracker.remove(topicMessageId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,6 @@ private void setMessageRoutingMode() throws PulsarClientException {

@Override
public String toString() {
return conf != null ? conf.toString() : null;
return conf != null ? conf.toString() : "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1129,8 +1129,7 @@ protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg op) {
}
return true;
} else {
log.warn("[{}] Failed while casting {} into ByteBufPair", producerName,
(op.cmd == null ? null : op.cmd.getClass().getName()));
log.warn("[{}] Failed while casting empty ByteBufPair, ", producerName);
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {

private static final long serialVersionUID = 1L;
private TimerTask stat;
private Timeout statTimeout;
private ProducerImpl<?> producer;
private PulsarClientImpl pulsarClient;
private transient TimerTask stat;
private transient Timeout statTimeout;
private transient ProducerImpl<?> producer;
private transient PulsarClientImpl pulsarClient;
private long oldTime;
private long statsIntervalSeconds;
private final LongAdder numMsgsSent;
Expand All @@ -55,7 +55,7 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
private final LongAdder totalAcksReceived;
private static final DecimalFormat DEC = new DecimalFormat("0.000");
private static final DecimalFormat THROUGHPUT_FORMAT = new DecimalFormat("0.00");
private final DoublesSketch ds;
private transient final DoublesSketch ds;

private volatile double sendMsgsRate;
private volatile double sendBytesRate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {

private static final ByteBuffer EMPTY_CONTENT = ByteBuffer.allocate(0);

private final ProducerBase<?> producer;
private final MessageMetadata msgMetadata = new MessageMetadata();
private final Schema<T> schema;
private ByteBuffer content;
private final TransactionImpl txn;
private transient final ProducerBase<?> producer;
private transient final MessageMetadata msgMetadata = new MessageMetadata();
private transient final Schema<T> schema;
private transient ByteBuffer content;
private transient final TransactionImpl txn;

public TypedMessageBuilderImpl(ProducerBase<?> producer, Schema<T> schema) {
this(producer, schema, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@

@Slf4j
public class AuthenticationDataKeyStoreTls implements AuthenticationDataProvider {
private final KeyStoreParams keyStoreParams;

private static final long serialVersionUID = 1L;
private transient final KeyStoreParams keyStoreParams;

public AuthenticationDataKeyStoreTls(KeyStoreParams keyStoreParams) throws Exception {
this.keyStoreParams = keyStoreParams;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@
import org.slf4j.LoggerFactory;

public class AuthenticationDataTls implements AuthenticationDataProvider {
private static final long serialVersionUID = 1L;
protected X509Certificate[] tlsCertificates;
protected PrivateKey tlsPrivateKey;
private FileModifiedTimeUpdater certFile, keyFile;
private transient FileModifiedTimeUpdater certFile, keyFile;
// key and cert using stream
private InputStream certStream, keyStream;
private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider, trustStoreStreamProvider;
private transient InputStream certStream, keyStream;
private transient Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider, trustStoreStreamProvider;

public AuthenticationDataTls(String certFilePath, String keyFilePath) throws KeyManagementException {
if (certFilePath == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class AuthenticationKeyStoreTls implements Authentication, EncodedAuthent
public final static String KEYSTORE_PW = "keyStorePassword";
private final static String DEFAULT_KEYSTORE_TYPE = "JKS";

private KeyStoreParams keyStoreParams;
private transient KeyStoreParams keyStoreParams;

public AuthenticationKeyStoreTls() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class AuthenticationTls implements Authentication, EncodedAuthenticationP

private String certFilePath;
private String keyFilePath;
private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider, trustStoreStreamProvider;
private transient Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider, trustStoreStreamProvider;

public AuthenticationTls() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
public class AuthenticationToken implements Authentication, EncodedAuthenticationParameterSupport {

private static final long serialVersionUID = 1L;
private transient Supplier<String> tokenSupplier;
private transient Supplier<String> tokenSupplier = null;

public AuthenticationToken() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class ClientConfigurationData implements Serializable, Cloneable {
private transient ServiceUrlProvider serviceUrlProvider;

@JsonIgnore
private Authentication authentication = AuthenticationDisabled.INSTANCE;
private Authentication authentication;
private String authPluginClassName;

@Secret
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
private CryptoKeyReader cryptoKeyReader = null;

@JsonIgnore
private MessageCrypto messageCrypto = null;
private transient MessageCrypto messageCrypto = null;

private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;

Expand All @@ -110,7 +110,7 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {

private RegexSubscriptionMode regexSubscriptionMode = RegexSubscriptionMode.PersistentOnly;

private DeadLetterPolicy deadLetterPolicy;
private transient DeadLetterPolicy deadLetterPolicy;

private boolean retryEnable = false;

Expand All @@ -125,7 +125,7 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {

private boolean resetIncludeHead = false;

private KeySharedPolicy keySharedPolicy;
private transient KeySharedPolicy keySharedPolicy;

private boolean batchIndexAckEnabled = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
private CryptoKeyReader cryptoKeyReader;

@JsonIgnore
private MessageCrypto messageCrypto = null;
private transient MessageCrypto messageCrypto = null;

@JsonIgnore
private Set<String> encryptionKeys = new TreeSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
@Data
public class ReaderConfigurationData<T> implements Serializable, Cloneable {

private static final long serialVersionUID = 1L;

private String topicName;

@JsonIgnore
Expand All @@ -55,7 +57,7 @@ public class ReaderConfigurationData<T> implements Serializable, Cloneable {
private boolean readCompacted = false;
private boolean resetIncludeHead = false;

private List<Range> keyHashRanges;
private transient List<Range> keyHashRanges;

@SuppressWarnings("unchecked")
public ReaderConfigurationData<T> clone() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -76,7 +77,8 @@ private static void serializeFileDescriptor(Descriptors.FileDescriptor fileDescr
if (unResolvedFileDescriptNames.length == 0) {
fileDescriptorCache.put(fileDescriptor.getFullName(), fileDescriptor.toProto());
} else {
throw new SchemaSerializationException(fileDescriptor.getFullName() + " can't resolve dependency '" + unResolvedFileDescriptNames + "'.");
throw new SchemaSerializationException(fileDescriptor.getFullName() + " can't resolve dependency '" +
Arrays.toString(unResolvedFileDescriptNames) + "'.");
}
}

Expand All @@ -96,7 +98,7 @@ public static Descriptors.Descriptor deserialize(byte[] schemaDataBytes) {
//extract root fileDescriptor
Descriptors.FileDescriptor fileDescriptor = fileDescriptorCache.get(schemaData.getRootFileDescriptorName());
//trim package
String[] paths = StringUtils.removeFirst(schemaData.getRootMessageTypeName(), fileDescriptor.getPackage()).replaceFirst(".", "").split("\\.");
String[] paths = StringUtils.removeFirst(schemaData.getRootMessageTypeName(), fileDescriptor.getPackage()).replaceFirst("\\.", "").split("\\.");
//extract root message
descriptor = fileDescriptor.findMessageTypeByName(paths[0]);
//extract nested message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class GenericAvroReader implements SchemaReader<GenericRecord> {
private final List<Field> fields;
private final Schema schema;
private final byte[] schemaVersion;
private int offset;
private final int offset;

public GenericAvroReader(Schema schema) {
this(null, schema, null);
Expand All @@ -66,7 +66,7 @@ public GenericAvroReader(Schema writerSchema, Schema readerSchema, byte[] schema
this.reader = new GenericDatumReader<>(writerSchema, readerSchema);
}
this.byteArrayOutputStream = new ByteArrayOutputStream();
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, null);

if (schema.getObjectProp(GenericAvroSchema.OFFSET_PROP) != null) {
this.offset = Integer.parseInt(schema.getObjectProp(GenericAvroSchema.OFFSET_PROP).toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class GenericAvroWriter implements SchemaWriter<GenericRecord> {
public GenericAvroWriter(Schema schema) {
this.writer = new GenericDatumWriter<>(schema);
this.byteArrayOutputStream = new ByteArrayOutputStream();
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@

public class GenericProtobufNativeReader implements SchemaReader<GenericRecord> {

private Descriptors.Descriptor descriptor;
private byte[] schemaVersion;
private List<Field> fields;
private final Descriptors.Descriptor descriptor;
private final byte[] schemaVersion;
private final List<Field> fields;

public GenericProtobufNativeReader(Descriptors.Descriptor descriptor) {
this(descriptor, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@

import java.util.List;

public class GenericProtobufNativeRecord<T extends DynamicMessage> extends VersionedGenericRecord {
public class GenericProtobufNativeRecord extends VersionedGenericRecord {

private DynamicMessage record;
private Descriptors.Descriptor msgDesc;
private final DynamicMessage record;
private final Descriptors.Descriptor msgDesc;

protected GenericProtobufNativeRecord(byte[] schemaVersion, Descriptors.Descriptor msgDesc, List<Field> fields, DynamicMessage record) {
super(schemaVersion, fields);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public AvroWriter(Schema schema) {

public AvroWriter(Schema schema, boolean jsr310ConversionEnabled) {
this.byteArrayOutputStream = new ByteArrayOutputStream();
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, this.encoder);
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, null);
ReflectData reflectData = new ReflectData();
AvroSchema.addLogicalTypeConversions(reflectData, jsr310ConversionEnabled);
this.writer = new ReflectDatumWriter<>(schema, reflectData);
Expand Down
Loading

0 comments on commit 2a1828c

Please sign in to comment.