Skip to content

Commit

Permalink
roll back change that I think was causing messages to be leaked
Browse files Browse the repository at this point in the history
  • Loading branch information
Fake Name committed Oct 10, 2017
1 parent f5e9553 commit fce8827
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 34 deletions.
5 changes: 1 addition & 4 deletions amqp_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,17 +240,14 @@ def get_iterator(self):
# self.log.info("Item fetched from queue.")
# return ret

def put_response(self, out_msg, routing_key_override=None):
def put_response(self, out_msg):
if self.config['master']:
out_queue = self.config['task_exchange']
out_key = self.config['task_queue_name'].split(".")[0]
else:
out_queue = self.config['response_exchange']
out_key = self.config['response_queue_name'].split(".")[0]

if routing_key_override:
out_key = routing_key_override

msg_prop = {}
if self.config['durable']:
# Is this supposed to be a hyphen or a underscore?
Expand Down
45 changes: 21 additions & 24 deletions client.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ def __init__(self, settings, seen_lock, serialize_lock):
self.settings = settings

# Require clientID in settings
assert 'clientid' in settings
assert "RABBIT_LOGIN" in settings
assert "RABBIT_PASWD" in settings
assert "RABBIT_SRVER" in settings
assert 'clientid' in settings
assert "RABBIT_LOGIN" in settings
assert "RABBIT_PASWD" in settings
assert "RABBIT_SRVER" in settings
assert "RPC_RABBIT_VHOST" in settings

if not self.settings:
Expand All @@ -64,12 +64,12 @@ def findCert(self):

curDir = os.path.split(curFile)[0]
caCert = os.path.abspath(os.path.join(curDir, './deps/cacert.pem'))
cert = os.path.abspath(os.path.join(curDir, './deps/cert.pem'))
keyf = os.path.abspath(os.path.join(curDir, './deps/key.pem'))
cert = os.path.abspath(os.path.join(curDir, './deps/cert.pem'))
keyf = os.path.abspath(os.path.join(curDir, './deps/key.pem'))

assert os.path.exists(caCert), "No certificates found on path '%s'" % caCert
assert os.path.exists(cert), "No certificates found on path '%s'" % cert
assert os.path.exists(keyf), "No certificates found on path '%s'" % keyf
assert os.path.exists(cert), "No certificates found on path '%s'" % cert
assert os.path.exists(keyf), "No certificates found on path '%s'" % keyf


return {"cert_reqs" : ssl.CERT_REQUIRED,
Expand All @@ -80,11 +80,10 @@ def findCert(self):



def process(self, body, context_responder): # pylint: disable=unused-argument
def process(self, body, context_responder):
raise ValueError("This must be subclassed!")


def partial_response(self, context, connector, response_routing_key):
def partial_response(self, context, connector):
# Hurray for closure abuse.
def partial_capture(logs, content):
assert isinstance(content, dict), '`partial response` must be passed a dict!'
Expand All @@ -110,11 +109,11 @@ def partial_capture(logs, content):
if 'extradat' in context:
response['extradat'] = context['extradat']

self.put_message_chunked(response, connector, routing_key_override=response_routing_key)
self.put_message_chunked(response, connector)

return partial_capture

def _process_internal(self, body, connector, response_routing_key):
def __process(self, body, connector):

delay = None

Expand All @@ -123,7 +122,7 @@ def _process_internal(self, body, connector, response_routing_key):
delay = int(body['postDelay'])

self.log.info("Received request. Processing.")
ret = self.process(body, self.partial_response(body, connector, response_routing_key))
ret = self.process(body, self.partial_response(body, connector))

assert isinstance(ret, dict), '`process()` call in child-class must return a dict!'

Expand Down Expand Up @@ -181,7 +180,7 @@ def _process_internal(self, body, connector, response_routing_key):

return ret, delay

def _process_binary_message(self, body_r, connector):
def _process(self, body_r, connector):
# body = json.loads(body)
body = msgpack.unpackb(body_r, use_list=True, encoding='utf-8')

Expand All @@ -205,11 +204,9 @@ def _process_binary_message(self, body_r, connector):
self.log.info("New unique message ID: %s. Fetching.", mid)
INSTANCE_SEEN_MESSAGE_IDS.add(mid)

response_routing_key = body.get('response_routing_key', "none")

try:
ret, delay = self._process_internal(body, connector, response_routing_key)
return ret, delay, response_routing_key
ret, delay = self.__process(body, connector)
return ret, delay
finally:
if have_serialize_lock:
self.log.info("Releasing serialization lock.")
Expand All @@ -236,7 +233,7 @@ def successDelay(self, sleeptime):
self.log.info( "Breaking due to exit flag being set")
break

def put_message_chunked(self, message, connector, routing_key_override=None):
def put_message_chunked(self, message, connector):

message_bytes = msgpack.packb(message, use_bin_type=True)
if len(message_bytes) < CHUNK_SIZE_BYTES:
Expand All @@ -248,7 +245,7 @@ def put_message_chunked(self, message, connector, routing_key_override=None):
bmessage = msgpack.packb(message, use_bin_type=True)

self.log.info("Response message size: %0.3fK. Sending", len(bmessage)/1024.0)
connector.put_response(bmessage, routing_key_override=routing_key_override)
connector.put_response(bmessage)
else:
chunked_id = "chunk-merge-key-"+uuid.uuid4().hex
chunkl = list(enumerate(chunk_input(message_bytes, CHUNK_SIZE_BYTES)))
Expand All @@ -262,7 +259,7 @@ def put_message_chunked(self, message, connector, routing_key_override=None):
}
bmessage = msgpack.packb(message, use_bin_type=True)
self.log.info("Response chunk message size: %0.3fK. Sending", len(bmessage)/1024.0)
connector.put_response(bmessage, routing_key_override=routing_key_override)
connector.put_response(bmessage)



Expand All @@ -278,9 +275,9 @@ def process_messages(self, connector_instance, loops):
self.log.info("Processing message. (%s of %s before connection reset)", msg_count, loops)

try:
response, postDelay, routing_key_override = self._process_binary_message(message.body, connector_instance)
response, postDelay = self._process(message.body, connector_instance)

self.put_message_chunked(response, connector_instance, routing_key_override=routing_key_override)
self.put_message_chunked(response, connector_instance)
# connector_instance.put_response(response)

# Ack /after/ we've done the task.
Expand Down
21 changes: 15 additions & 6 deletions vendored/update-chromium.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,37 @@ set -e
rm -rf /tmp/cr-build
mkdir -p /tmp/cr-build
cd /tmp/cr-build
fetch --no-history chromium
fetch --nohooks --no-history chromium
gclient sync
cd src
git pull origin master
build/install-build-deps.sh --no-prompt
gclient runhooks
mkdir -p out/Headless
mkdir -p out/Default

echo 'import("//build/args/headless.gn")' > out/Headless/args.gn
echo 'is_debug = false' >> out/Headless/args.gn
echo 'symbol_level = 0' >> out/Headless/args.gn
<<<<<<< HEAD
echo 'is_component_build = false' >> out/Headless/args.gn
# echo 'remove_webcore_debug_symbols = true' >> out/Headless/args.gn
# echo 'enable_nacl = false' >> out/Headless/args.gn
=======
echo 'remove_webcore_debug_symbols = true' >> out/Headless/args.gn
echo 'enable_nacl = false' >> out/Headless/args.gn
>>>>>>> parent of 6ef8b84... Add the facility for a custom response routing key.

# echo 'is_debug = false' > out/Default/args.gn
# echo 'symbol_level = 0' >> out/Default/args.gn
# echo 'remove_webcore_debug_symbols = true' >> out/Default/args.gn
# echo 'enable_nacl = false' >> out/Default/args.gn
echo 'is_debug = false' > out/Default/args.gn
echo 'symbol_level = 0' >> out/Default/args.gn
echo 'remove_webcore_debug_symbols = true' >> out/Default/args.gn
echo 'enable_nacl = false' >> out/Default/args.gn
gn gen out/Headless
<<<<<<< HEAD
# gn gen out/Default
ninja -C out/Headless headless_shell
=======
gn gen out/Default
ninja -C out/Headless headless
>>>>>>> parent of 6ef8b84... Add the facility for a custom response routing key.
# ninja -C out/Default

0 comments on commit fce8827

Please sign in to comment.