Skip to content

Commit

Permalink
Prevent a UTF-8 decoder error from breaking Celery
Browse files Browse the repository at this point in the history
Prior to this commit, a non-UTF8 encoded header string would cause an un
caught UnicodeDecodeError, which would cause an unrecoverable Celery
error. Now we log non-UTF8 header strings (names and values), and
continue processing.

Closes https://github.com/celery/celery/issues/2873
  • Loading branch information
Dave Smith committed Jan 23, 2016
1 parent 10ab885 commit 3bc856f
Showing 1 changed file with 27 additions and 4 deletions.
31 changes: 27 additions & 4 deletions amqp/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from __future__ import absolute_import

import calendar
import logging
import sys

from datetime import datetime
Expand All @@ -40,6 +41,7 @@ def byte(n):
else:
byte = chr

AMQP_LOGGER = logging.getLogger('amqp')

ILLEGAL_TABLE_TYPE_WITH_KEY = """\
Table type {0!r} for key {1!r} not handled by amqp. [value: {2!r}]
Expand All @@ -49,6 +51,11 @@ def byte(n):
Table type {0!r} not handled by amqp. [value: {1!r}]
"""

class DecodeError(Exception):
def __init__(self, raw_str, original_error):
super(DecodeError, self).__init__()
self.raw_str = raw_str
self.original_error = original_error

class AMQPReader(object):
"""Read higher-level AMQP types from a bytestream."""
Expand Down Expand Up @@ -117,7 +124,11 @@ def read_shortstr(self):
"""
self.bitcount = self.bits = 0
slen = unpack('B', self.input.read(1))[0]
return self.input.read(slen).decode('utf-8')
raw = self.input.read(slen)
try:
return raw.decode('utf-8')
except UnicodeDecodeError as err:
raise DecodeError(raw, err)

def read_longstr(self):
"""Read a string that's up to 2**32 bytes.
Expand All @@ -128,7 +139,11 @@ def read_longstr(self):
"""
self.bitcount = self.bits = 0
slen = unpack('>I', self.input.read(4))[0]
return self.input.read(slen).decode('utf-8')
raw = self.input.read(slen)
try:
return raw.decode('utf-8')
except UnicodeDecodeError as err:
raise DecodeError(raw, err)

def read_table(self):
"""Read an AMQP table, and return as a Python dictionary."""
Expand All @@ -137,8 +152,16 @@ def read_table(self):
table_data = AMQPReader(self.input.read(tlen))
result = {}
while table_data.input.tell() < tlen:
name = table_data.read_shortstr()
val = table_data.read_item()
try:
name = table_data.read_shortstr()
except DecodeError as err:
AMQP_LOGGER.warn('Failed to UTF-8 decode header name. Raw value: "{}". Error: {}'.format(repr(err.raw_str), err.original_error))
continue
try:
val = table_data.read_item()
except DecodeError as err:
AMQP_LOGGER.warn('Failed to UTF-8 decode header value for "{}". Raw value: "{}". Error: {}'.format(name, repr(err.raw_str), err.original_error))
continue
result[name] = val
return result

Expand Down

0 comments on commit 3bc856f

Please sign in to comment.