Be explicit with tuples for %s formatting
Fix #1633
jeffwidman committed Nov 13, 2018
1 parent f0ef99f commit 91facfd
Showing 27 changed files with 46 additions and 46 deletions.
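Why the change matters: with old-style % formatting, the right-hand operand is treated as the whole tuple of format arguments whenever it happens to be a tuple, so passing a tuple-like value (e.g. a TopicPartition namedtuple) directly can raise TypeError or silently format only part of it. Wrapping the value in a one-element tuple, % (value,), always formats the value itself. A minimal sketch of the pitfall the commit guards against (the namedtuple below is an illustrative stand-in, not the library's own class):

from collections import namedtuple

# Illustrative stand-in for a tuple-like value such as kafka-python's TopicPartition.
TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
tp = TopicPartition('my-topic', 0)

# Implicit form: the namedtuple is unpacked as two format arguments for one %s,
# so this raises "TypeError: not all arguments converted during string formatting".
try:
    print("Assigned partition %s for non-subscribed topic." % tp)
except TypeError as exc:
    print("implicit form failed:", exc)

# Explicit form: a one-element tuple always formats the value itself.
print("Assigned partition %s for non-subscribed topic." % (tp,))

# Non-tuple values such as a set of config names behave the same either way,
# so wrapping them too is just defensive consistency.
extra_configs = {'bootstrap_servers_typo'}
print("Unrecognized configs: %s" % (extra_configs,))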
2 changes: 1 addition & 1 deletion kafka/admin/kafka.py
@@ -166,7 +166,7 @@ def __init__(self, **configs):
log.debug("Starting Kafka administration interface")
extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
if extra_configs:
-raise KafkaConfigurationError("Unrecognized configs: %s" % extra_configs)
+raise KafkaConfigurationError("Unrecognized configs: %s" % (extra_configs,))

self.config = copy.copy(self.DEFAULT_CONFIG)
self.config.update(configs)
2 changes: 1 addition & 1 deletion kafka/client.py
@@ -174,7 +174,7 @@ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):

return decoder_fn(future.value)

-raise KafkaUnavailableError('All servers failed to process request: %s' % hosts)
+raise KafkaUnavailableError('All servers failed to process request: %s' % (hosts,))

def _payloads_by_broker(self, payloads):
payloads_by_broker = collections.defaultdict(list)
2 changes: 1 addition & 1 deletion kafka/client_async.py
@@ -355,7 +355,7 @@ def _maybe_connect(self, node_id):
conn = self._conns.get(node_id)

if conn is None:
-assert broker, 'Broker id %s not in current metadata' % node_id
+assert broker, 'Broker id %s not in current metadata' % (node_id,)

log.debug("Initiating connection to node %s at %s:%s",
node_id, broker.host, broker.port)
14 changes: 7 additions & 7 deletions kafka/consumer/fetcher.py
@@ -298,7 +298,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
remaining_ms = timeout_ms - elapsed_ms

raise Errors.KafkaTimeoutError(
"Failed to get offsets by timestamps in %s ms" % timeout_ms)
"Failed to get offsets by timestamps in %s ms" % (timeout_ms,))

def fetched_records(self, max_records=None):
"""Returns previously fetched records and updates consumed offsets.
@@ -911,7 +911,7 @@ def record(self, partition, num_bytes, num_records):
class FetchManagerMetrics(object):
def __init__(self, metrics, prefix):
self.metrics = metrics
-self.group_name = '%s-fetch-manager-metrics' % prefix
+self.group_name = '%s-fetch-manager-metrics' % (prefix,)

self.bytes_fetched = metrics.sensor('bytes-fetched')
self.bytes_fetched.add(metrics.metric_name('fetch-size-avg', self.group_name,
@@ -955,15 +955,15 @@ def record_topic_fetch_metrics(self, topic, num_bytes, num_records):
bytes_fetched = self.metrics.sensor(name)
bytes_fetched.add(self.metrics.metric_name('fetch-size-avg',
self.group_name,
-'The average number of bytes fetched per request for topic %s' % topic,
+'The average number of bytes fetched per request for topic %s' % (topic,),
metric_tags), Avg())
bytes_fetched.add(self.metrics.metric_name('fetch-size-max',
self.group_name,
-'The maximum number of bytes fetched per request for topic %s' % topic,
+'The maximum number of bytes fetched per request for topic %s' % (topic,),
metric_tags), Max())
bytes_fetched.add(self.metrics.metric_name('bytes-consumed-rate',
self.group_name,
-'The average number of bytes consumed per second for topic %s' % topic,
+'The average number of bytes consumed per second for topic %s' % (topic,),
metric_tags), Rate())
bytes_fetched.record(num_bytes)

@@ -976,10 +976,10 @@ def record_topic_fetch_metrics(self, topic, num_bytes, num_records):
records_fetched = self.metrics.sensor(name)
records_fetched.add(self.metrics.metric_name('records-per-request-avg',
self.group_name,
-'The average number of records in each request for topic %s' % topic,
+'The average number of records in each request for topic %s' % (topic,),
metric_tags), Avg())
records_fetched.add(self.metrics.metric_name('records-consumed-rate',
self.group_name,
-'The average number of records consumed per second for topic %s' % topic,
+'The average number of records consumed per second for topic %s' % (topic,),
metric_tags), Rate())
records_fetched.record(num_records)
2 changes: 1 addition & 1 deletion kafka/consumer/group.py
@@ -309,7 +309,7 @@ def __init__(self, *topics, **configs):
# Only check for extra config keys in top-level class
extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
if extra_configs:
-raise KafkaConfigurationError("Unrecognized configs: %s" % extra_configs)
+raise KafkaConfigurationError("Unrecognized configs: %s" % (extra_configs,))

self.config = copy.copy(self.DEFAULT_CONFIG)
self.config.update(configs)
2 changes: 1 addition & 1 deletion kafka/consumer/simple.py
@@ -247,7 +247,7 @@ def seek(self, offset, whence=None, partition=None):
self.offsets[resp.partition] = \
resp.offsets[0] + deltas[resp.partition]
else:
-raise ValueError('Unexpected value for `whence`, %d' % whence)
+raise ValueError('Unexpected value for `whence`, %d' % (whence,))

# Reset queue and fetch offsets since they are invalid
self.fetch_offsets = self.offsets.copy()
2 changes: 1 addition & 1 deletion kafka/consumer/subscription_state.py
@@ -247,7 +247,7 @@ def assign_from_subscribed(self, assignments):

for tp in assignments:
if tp.topic not in self.subscription:
raise ValueError("Assigned partition %s for non-subscribed topic." % str(tp))
raise ValueError("Assigned partition %s for non-subscribed topic." % (tp,))

# after rebalancing, we always reinitialize the assignment state
self.assignment.clear()
6 changes: 3 additions & 3 deletions kafka/coordinator/consumer.py
@@ -216,7 +216,7 @@ def _on_join_complete(self, generation, member_id, protocol,
self._assignment_snapshot = None

assignor = self._lookup_assignor(protocol)
-assert assignor, 'Coordinator selected invalid assignment protocol: %s' % protocol
+assert assignor, 'Coordinator selected invalid assignment protocol: %s' % (protocol,)

assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes)

@@ -297,7 +297,7 @@ def time_to_next_poll(self):

def _perform_assignment(self, leader_id, assignment_strategy, members):
assignor = self._lookup_assignor(assignment_strategy)
-assert assignor, 'Invalid assignment protocol: %s' % assignment_strategy
+assert assignor, 'Invalid assignment protocol: %s' % (assignment_strategy,)
member_metadata = {}
all_subscribed_topics = set()
for member_id, metadata_bytes in members:
@@ -804,7 +804,7 @@ def _maybe_auto_commit_offsets_async(self):
class ConsumerCoordinatorMetrics(object):
def __init__(self, metrics, metric_group_prefix, subscription):
self.metrics = metrics
-self.metric_group_name = '%s-coordinator-metrics' % metric_group_prefix
+self.metric_group_name = '%s-coordinator-metrics' % (metric_group_prefix,)

self.commit_latency = metrics.sensor('commit-latency')
self.commit_latency.add(metrics.metric_name(
2 changes: 1 addition & 1 deletion kafka/metrics/metrics.py
@@ -225,7 +225,7 @@ def register_metric(self, metric):
with self._lock:
if metric.metric_name in self.metrics:
raise ValueError('A metric named "%s" already exists, cannot'
-' register another one.' % metric.metric_name)
+' register another one.' % (metric.metric_name,))
self.metrics[metric.metric_name] = metric
for reporter in self._reporters:
reporter.metric_change(metric)
2 changes: 1 addition & 1 deletion kafka/metrics/stats/percentiles.py
@@ -27,7 +27,7 @@ def __init__(self, size_in_bytes, bucketing, max_val, min_val=0.0,
' to be 0.0.')
self.bin_scheme = Histogram.LinearBinScheme(self._buckets, max_val)
else:
-ValueError('Unknown bucket type: %s' % bucketing)
+ValueError('Unknown bucket type: %s' % (bucketing,))

def stats(self):
measurables = []
2 changes: 1 addition & 1 deletion kafka/metrics/stats/rate.py
@@ -101,7 +101,7 @@ def convert(self, time_ms):
elif self._unit == TimeUnit.DAYS:
return time_ms / (24.0 * 60.0 * 60.0 * 1000.0)
else:
-raise ValueError('Unknown unit: %s' % self._unit)
+raise ValueError('Unknown unit: %s' % (self._unit,))


class SampledTotal(AbstractSampledStat):
2 changes: 1 addition & 1 deletion kafka/metrics/stats/sampled_stat.py
@@ -73,7 +73,7 @@ def purge_obsolete_samples(self, config, now):
sample.reset(now)

def _advance(self, config, time_ms):
-self._current = (self._current + 1) % config.samples
+self._current = (self._current + 1) % (config.samples,)
if self._current >= len(self._samples):
sample = self.new_sample(time_ms)
self._samples.append(sample)
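A hedged note on the sampled_stat.py hunk above: in _advance the % is integer modulo, not string interpolation, so the rewritten line applies the tuple wrapping to an arithmetic operand. Assuming config.samples is a plain integer, int % tuple is rejected by Python. A quick illustrative sketch:

samples = 4

# Integer modulo, as the original line computes:
print((3 + 1) % samples)        # 0

# Modulo by a one-element tuple raises:
# TypeError: unsupported operand type(s) for %: 'int' and 'tuple'
try:
    print((3 + 1) % (samples,))
except TypeError as exc:
    print("int % tuple fails:", exc)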
2 changes: 1 addition & 1 deletion kafka/metrics/stats/sensor.py
@@ -35,7 +35,7 @@ def _check_forest(self, sensors):
"""Validate that this sensor doesn't end up referencing itself."""
if self in sensors:
raise ValueError('Circular dependency in sensors: %s is its own'
-'parent.' % self.name)
+'parent.' % (self.name,))
sensors.add(self)
for parent in self._parents:
parent._check_forest(sensors)
4 changes: 2 additions & 2 deletions kafka/producer/base.py
@@ -316,7 +316,7 @@ def __init__(self, client,
if codec is None:
codec = CODEC_NONE
elif codec not in ALL_CODECS:
-raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
+raise UnsupportedCodecError("Codec 0x%02x unsupported" % (codec,))

self.codec = codec
self.codec_compresslevel = codec_compresslevel
@@ -419,7 +419,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
raise AsyncProducerQueueFull(
msg[idx:],
'Producer async queue overfilled. '
-'Current queue size %d.' % self.queue.qsize())
+'Current queue size %d.' % (self.queue.qsize(),))
resp = []
else:
messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel)
2 changes: 1 addition & 1 deletion kafka/producer/future.py
@@ -59,7 +59,7 @@ def _produce_success(self, offset_and_timestamp):
def get(self, timeout=None):
if not self.is_done and not self._produce_future.wait(timeout):
raise Errors.KafkaTimeoutError(
"Timeout after waiting for %s secs." % timeout)
"Timeout after waiting for %s secs." % (timeout,))
assert self.is_done
if self.failed():
raise self.exception # pylint: disable-msg=raising-bad-type
10 changes: 5 additions & 5 deletions kafka/producer/kafka.py
@@ -340,11 +340,11 @@ def __init__(self, **configs):
self.config[key] = configs.pop(key)

# Only check for extra config keys in top-level class
-assert not configs, 'Unrecognized configs: %s' % configs
+assert not configs, 'Unrecognized configs: %s' % (configs,)

if self.config['client_id'] is None:
self.config['client_id'] = 'kafka-python-producer-%s' % \
-PRODUCER_CLIENT_ID_SEQUENCE.increment()
+(PRODUCER_CLIENT_ID_SEQUENCE.increment(),)

if self.config['acks'] == 'all':
self.config['acks'] = -1
@@ -633,12 +633,12 @@ def _ensure_valid_record_size(self, size):
raise Errors.MessageSizeTooLargeError(
"The message is %d bytes when serialized which is larger than"
" the maximum request size you have configured with the"
" max_request_size configuration" % size)
" max_request_size configuration" % (size,))
if size > self.config['buffer_memory']:
raise Errors.MessageSizeTooLargeError(
"The message is %d bytes when serialized which is larger than"
" the total memory buffer you have configured with the"
" buffer_memory configuration." % size)
" buffer_memory configuration." % (size,))

def _wait_on_metadata(self, topic, max_wait):
"""
@@ -679,7 +679,7 @@ def _wait_on_metadata(self, topic, max_wait):
elapsed = time.time() - begin
if not metadata_event.is_set():
raise Errors.KafkaTimeoutError(
"Failed to update metadata after %.1f secs." % max_wait)
"Failed to update metadata after %.1f secs." % (max_wait,))
elif topic in self._metadata.unauthorized_topics:
raise Errors.TopicAuthorizationFailedError(topic)
else:
2 changes: 1 addition & 1 deletion kafka/producer/keyed.py
@@ -46,4 +46,4 @@ def send(self, topic, key, msg):
return self.send_messages(topic, key, msg)

def __repr__(self):
-return '<KeyedProducer batch=%s>' % self.async_send
+return '<KeyedProducer batch=%s>' % (self.async_send,)
6 changes: 3 additions & 3 deletions kafka/producer/record_accumulator.py
@@ -102,11 +102,11 @@ def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full)

error = None
if not self.in_retry() and is_full and timeout < since_append:
error = "%d seconds have passed since last append" % since_append
error = "%d seconds have passed since last append" % (since_append,)
elif not self.in_retry() and timeout < since_ready:
error = "%d seconds have passed since batch creation plus linger time" % since_ready
error = "%d seconds have passed since batch creation plus linger time" % (since_ready,)
elif self.in_retry() and timeout < since_backoff:
error = "%d seconds have passed since last attempt plus backoff time" % since_backoff
error = "%d seconds have passed since last attempt plus backoff time" % (since_backoff,)

if error:
self.records.close()
2 changes: 1 addition & 1 deletion kafka/producer/simple.py
@@ -51,4 +51,4 @@ def send_messages(self, topic, *msg):
)

def __repr__(self):
-return '<SimpleProducer batch=%s>' % self.async_send
+return '<SimpleProducer batch=%s>' % (self.async_send,)
2 changes: 1 addition & 1 deletion kafka/protocol/legacy.py
@@ -471,4 +471,4 @@ def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None)
elif codec == CODEC_SNAPPY:
return [create_snappy_message(messages, key)]
else:
-raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
+raise UnsupportedCodecError("Codec 0x%02x unsupported" % (codec,))
4 changes: 2 additions & 2 deletions kafka/protocol/message.py
@@ -77,7 +77,7 @@ def _encode_self(self, recalc_crc=True):
elif version == 0:
fields = (self.crc, self.magic, self.attributes, self.key, self.value)
else:
-raise ValueError('Unrecognized message version: %s' % version)
+raise ValueError('Unrecognized message version: %s' % (version,))
message = Message.SCHEMAS[version].encode(fields)
if not recalc_crc:
return message
@@ -143,7 +143,7 @@ def __hash__(self):

class PartialMessage(bytes):
def __repr__(self):
-return 'PartialMessage(%s)' % self
+return 'PartialMessage(%s)' % (self,)


class MessageSet(AbstractType):
2 changes: 1 addition & 1 deletion kafka/protocol/parser.py
@@ -136,7 +136,7 @@ def _process_response(self, read_buffer):
raise Errors.CorrelationIdError(
'No in-flight-request found for server response'
' with correlation ID %d'
-% recv_correlation_id)
+% (recv_correlation_id,))

(correlation_id, request) = self.in_flight_requests.popleft()

2 changes: 1 addition & 1 deletion kafka/record/legacy_records.py
@@ -254,7 +254,7 @@ def __iter__(self):
# There should only ever be a single layer of compression
assert not attrs & self.CODEC_MASK, (
'MessageSet at offset %d appears double-compressed. This '
-'should not happen -- check your producers!' % offset)
+'should not happen -- check your producers!' % (offset,))

# When magic value is greater than 0, the timestamp
# of a compressed message depends on the
8 changes: 4 additions & 4 deletions test/fixtures.py
@@ -102,15 +102,15 @@ def kafka_run_class_args(cls, *args):
def kafka_run_class_env(self):
env = os.environ.copy()
env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % \
-self.test_resource("log4j.properties")
+(self.test_resource("log4j.properties"),)
return env

@classmethod
def render_template(cls, source_file, target_file, binding):
log.info('Rendering %s from template %s', target_file.strpath, source_file)
with open(source_file, "r") as handle:
template = handle.read()
-assert len(template) > 0, 'Empty template %s' % source_file
+assert len(template) > 0, 'Empty template %s' % (source_file,)
with open(target_file.strpath, "w") as handle:
handle.write(template.format(**binding))
handle.flush()
@@ -257,7 +257,7 @@ def __init__(self, host, port, broker_id, zookeeper, zk_chroot,
# TODO: checking for port connection would be better than scanning logs
# until then, we need the pattern to work across all supported broker versions
# The logging format changed slightly in 1.0.0
-self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % broker_id
+self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % (broker_id,)

self.zookeeper = zookeeper
self.zk_chroot = zk_chroot
@@ -291,7 +291,7 @@ def _create_zk_chroot(self):
"%s:%d" % (self.zookeeper.host,
self.zookeeper.port),
"create",
"/%s" % self.zk_chroot,
"/%s" % (self.zk_chroot,),
"kafka-python")
env = self.kafka_run_class_env()
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
2 changes: 1 addition & 1 deletion test/test_metrics.py
@@ -469,7 +469,7 @@ def test_reporter(metrics):

for key in list(expected.keys()):
metrics = expected.pop(key)
-expected['foo.%s' % key] = metrics
+expected['foo.%s' % (key,)] = metrics
assert expected == foo_reporter.snapshot()


2 changes: 1 addition & 1 deletion test/test_producer.py
@@ -65,7 +65,7 @@ def test_end_to_end(kafka_broker, compression):
except StopIteration:
break

-assert msgs == set(['msg %d' % i for i in range(messages)])
+assert msgs == set(['msg %d' % (i,) for i in range(messages)])
consumer.close()


2 changes: 1 addition & 1 deletion test/testutil.py
@@ -32,7 +32,7 @@ def construct_lambda(s):
op_str = s[0:2] # >= <=
v_str = s[2:]
else:
-raise ValueError('Unrecognized kafka version / operator: %s' % s)
+raise ValueError('Unrecognized kafka version / operator: %s' % (s,))

op_map = {
'=': operator.eq,
