Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow submitting topologies as inactive/deactivated #448

Merged
merged 2 commits into from
Aug 28, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions streamparse/cli/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from six import itervalues

from ..dsl.component import JavaComponentSpec
from ..thrift import ShellComponent
from ..thrift import ShellComponent, SubmitOptions, TopologyInitialStatus

from ..util import (activate_env, get_config, get_env_config,
get_nimbus_client, get_topology_definition,
Expand Down Expand Up @@ -75,7 +75,7 @@ def _kill_existing_topology(topology_name, force, wait, nimbus_client):


def _submit_topology(topology_name, topology_class, remote_jar_path, config,
env_config, nimbus_client, options=None):
env_config, nimbus_client, options=None, active=True):
if options.get('pystorm.log.path'):
print("Routing Python logging to {}.".format(options['pystorm.log.path']))
sys.stdout.flush()
Expand All @@ -90,10 +90,14 @@ def _submit_topology(topology_name, topology_class, remote_jar_path, config,

print("Submitting {} topology to nimbus...".format(topology_name), end='')
sys.stdout.flush()
nimbus_client.submitTopology(name=topology_name,
uploadedJarLocation=remote_jar_path,
jsonConf=json.dumps(options),
topology=topology_class.thrift_topology)
initial_status = (TopologyInitialStatus.ACTIVE if active
else TopologyInitialStatus.INACTIVE)
submit_options = SubmitOptions(initial_status=initial_status)
nimbus_client.submitTopologyWithOpts(name=topology_name,
uploadedJarLocation=remote_jar_path,
jsonConf=json.dumps(options),
topology=topology_class.thrift_topology,
options=submit_options)
print('done')


Expand Down Expand Up @@ -145,7 +149,7 @@ def submit_topology(name=None, env_name=None, options=None, force=False,
wait=None, simple_jar=True, override_name=None,
requirements_paths=None, local_jar_path=None,
remote_jar_path=None, timeout=None, config_file=None,
overwrite_virtualenv=False, user='root'):
overwrite_virtualenv=False, user='root', active=True):
"""Submit a topology to a remote Storm cluster."""
config = get_config(config_file=config_file)
name, topology_file = get_topology_definition(name, config_file=config_file)
Expand Down Expand Up @@ -236,7 +240,8 @@ def submit_topology(name=None, env_name=None, options=None, force=False,
remote_jar_path = _upload_jar(nimbus_client, local_jar_path)
_kill_existing_topology(override_name, force, wait, nimbus_client)
_submit_topology(override_name, topology_class, remote_jar_path, config,
env_config, nimbus_client, options=options)
env_config, nimbus_client, options=options,
active=active)
_post_submit_hooks(override_name, env_name, env_config, options)


Expand All @@ -255,6 +260,13 @@ def subparser_hook(subparsers):
help='Force a topology to submit by killing any '
'currently running topologies with the same '
'name.')
subparser.add_argument('-i', '--inactive',
help='Submit topology as inactive instead of active.'
' This is useful if you are migrating the '
'topology to a new environment and already '
'have it running actively in an older one.',
action='store_false',
dest='active')
subparser.add_argument('-j', '--local_jar_path',
help='Path to a prebuilt JAR to upload to Nimbus. '
'This is useful when you have multiple '
Expand Down Expand Up @@ -296,4 +308,5 @@ def main(args):
timeout=args.timeout,
config_file=args.config,
overwrite_virtualenv=args.overwrite_virtualenv,
user=args.user)
user=args.user,
active=args.active)