
Commit

Convert subjob start SafeThread to standard Thread (#384)
Sending a subjob to a slave is done asynchronously. Even though it's
bad if it fails, it shouldn't bring down the entire master service.

This is a quick fix because we've run into this issue multiple
times. I'll follow up with a change to clean this up and improve the
behavior in the error case.
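
For illustration, here is a minimal, self-contained sketch of the idea behind the change. It assumes, as the commit message implies, that an unhandled exception escaping a SafeThread target can take down the whole master service, whereas a plain threading.Thread whose target catches and logs its own exceptions keeps the failure local. The names flaky_post_to_slave and start_subjob_contained are illustrative placeholders, not ClusterRunner APIs.

import logging
import threading

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('master')


def flaky_post_to_slave():
    # Stand-in for the POST that sends the subjob to the slave; here it always fails.
    raise ConnectionError('slave did not respond')


def start_subjob_contained():
    # New approach: catch everything, log it with a traceback, and let the service keep running.
    try:
        flaky_post_to_slave()
    except Exception:  # deliberately broad, mirroring the commit's error handling
        logger.exception('Error occurred trying to start subjob')


if __name__ == '__main__':
    worker = threading.Thread(target=start_subjob_contained)
    worker.start()
    worker.join()
    print('master service is still running')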
josephharrington authored Jul 11, 2017
1 parent 6d22c40 commit d0a26aa
Showing 1 changed file with 15 additions and 9 deletions.
24 changes: 15 additions & 9 deletions app/master/slave.py
@@ -1,10 +1,11 @@
+from threading import Thread
+
 import requests
 
 from app.master.build import Build
 from app.util import analytics, log
 from app.util.counter import Counter
 from app.util.network import Network
-from app.util.safe_thread import SafeThread
 from app.util.secret import Secret
 from app.util.session_id import SessionId
 from app.util.url_builder import UrlBuilder
@@ -116,20 +117,25 @@ def start_subjob(self, subjob):
             raise SlaveMarkedForShutdownError('Tried to start a subjob on a slave in shutdown mode. ({}, id: {})'
                                               .format(self.url, self.id))
 
-        # todo: This should not be a SafeThread. https://github.com/box/ClusterRunner/issues/337
-        SafeThread(target=self._async_start_subjob, args=(subjob,)).start()
+        # todo: This class should not be responsible for starting threads. That should be done at a higher level.
+        Thread(target=self._async_start_subjob, args=(subjob,)).start()
 
     def _async_start_subjob(self, subjob):
         """
         :type subjob: Subjob
         """
-        execution_url = self._slave_api.url('build', subjob.build_id(), 'subjob', subjob.subjob_id())
-        post_data = {'atomic_commands': subjob.atomic_commands()}
-        response = self._network.post_with_digest(execution_url, post_data, Secret.get(), error_on_failure=True)
-
-        subjob_executor_id = response.json().get('executor_id')
-        analytics.record_event(analytics.MASTER_TRIGGERED_SUBJOB, executor_id=subjob_executor_id,
-                               build_id=subjob.build_id(), subjob_id=subjob.subjob_id(), slave_id=self.id)
+        try:
+            execution_url = self._slave_api.url('build', subjob.build_id(), 'subjob', subjob.subjob_id())
+            post_data = {'atomic_commands': subjob.atomic_commands()}
+            response = self._network.post_with_digest(execution_url, post_data, Secret.get(), error_on_failure=True)
+
+            subjob_executor_id = response.json().get('executor_id')
+            analytics.record_event(analytics.MASTER_TRIGGERED_SUBJOB, executor_id=subjob_executor_id,
+                                   build_id=subjob.build_id(), subjob_id=subjob.subjob_id(), slave_id=self.id)
+        except Exception:  # pylint: disable=broad-except
+            # todo: This could result in subjobs being lost if the call to send the subjob fails.
+            self._logger.exception('Error occurred trying to start subjob on {}', self)
 
     def num_executors_in_use(self):
         return self._num_executors_in_use.value()
