diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 47a61c44a00cf..4567edf34aa4d 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -63,7 +63,7 @@ Install development requirements:
 
 Tests can then be run with:
 
-    ./rununittests.sh
+    ./run_unit_tests.sh
 
 Lint the project with:
diff --git a/TODO.md b/TODO.md
index 9576d4a9274b0..9e042485b5503 100644
--- a/TODO.md
+++ b/TODO.md
@@ -25,7 +25,7 @@ TODO
 * Add queued_dttm to task_instance table
 
 #### Wishlist
-* Support for cron like synthax (0 * * * ) using croniter library
+* Support for cron like syntax (0 * * * ) using croniter library
 * Pause flag at the task level
 * Task callbacks as tasks?
 * Increase unit test coverage
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index bed2386a35208..9cdc5fb1ed402 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -301,7 +301,7 @@ def initdb(args):
 def resetdb(args):
     print("DB: " + conf.get('core', 'SQL_ALCHEMY_CONN'))
     if raw_input(
-            "This will drop exisiting tables if they exist. "
+            "This will drop existing tables if they exist. "
             "Proceed? (y/n)").upper() == "Y":
         logging.basicConfig(level=logging.DEBUG,
                             format=settings.SIMPLE_LOG_FORMAT)
@@ -365,9 +365,9 @@ def get_parser():
         "-t", "--task_regex",
        help="The regex to filter specific task_ids to clear (optional)")
     parser_clear.add_argument(
-        "-s", "--start_date", help="Overide start_date YYYY-MM-DD")
+        "-s", "--start_date", help="Override start_date YYYY-MM-DD")
     parser_clear.add_argument(
-        "-e", "--end_date", help="Overide end_date YYYY-MM-DD")
+        "-e", "--end_date", help="Override end_date YYYY-MM-DD")
     ht = "Include upstream tasks"
     parser_clear.add_argument(
         "-u", "--upstream", help=ht, action="store_true")
@@ -485,7 +485,7 @@ def get_parser():
     parser_list_dags = subparsers.add_parser('list_dags', help=ht)
     parser_list_dags.set_defaults(func=list_dags)
 
-    ht = "List the tasks whithin a DAG"
+    ht = "List the tasks within a DAG"
     parser_list_tasks = subparsers.add_parser('list_tasks', help=ht)
     parser_list_tasks.add_argument(
         "-t", "--tree", help="Tree view", action="store_true")
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 3e306304ca3ba..7771cbd4d9c17 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -63,7 +63,7 @@
 
 [webserver]
 # The base url of your website as airflow cannot guess what domain or
-# cname you are using. This is use in autamated emails that
+# cname you are using. This is used in automated emails that
 # airflow sends to point links to the right web server
 base_url = http://localhost:8080
 
@@ -104,7 +104,7 @@
 # visible from the main web server to connect into the workers.
 worker_log_server_port = 8793
 
-# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentaly
+# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
 # a sqlalchemy database. Refer to the Celery documentation for more
 # information.
 broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
@@ -214,7 +214,7 @@ def mkdir_p(path):
 
 '''
 Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
-"~/airflow" and "~/airflow/airflow.cfg" repectively as defaults.
+"~/airflow" and "~/airflow/airflow.cfg" respectively as defaults.
 '''
 
 if 'AIRFLOW_HOME' not in os.environ:
@@ -238,14 +238,14 @@ def mkdir_p(path):
     it is missing. The right way to change your configuration is to alter
     your configuration file, not this code.
     '''
-    logging.info("Createing new config file in: " + AIRFLOW_CONFIG)
+    logging.info("Creating new config file in: " + AIRFLOW_CONFIG)
     f = open(AIRFLOW_CONFIG, 'w')
     f.write(DEFAULT_CONFIG.format(**locals()))
     f.close()
 
 TEST_CONFIG_FILE = AIRFLOW_HOME + '/unittests.cfg'
 if not os.path.isfile(TEST_CONFIG_FILE):
-    logging.info("Createing new config file in: " + TEST_CONFIG_FILE)
+    logging.info("Creating new config file in: " + TEST_CONFIG_FILE)
     f = open(TEST_CONFIG_FILE, 'w')
     f.write(TEST_CONFIG.format(**locals()))
     f.close()
diff --git a/airflow/default_login.py b/airflow/default_login.py
index b1cd105562be7..82f7d5c53ace8 100644
--- a/airflow/default_login.py
+++ b/airflow/default_login.py
@@ -1,5 +1,5 @@
 '''
-Override this file to handle your authenticatin / login.
+Override this file to handle your authentication / login.
 
 Copy and alter this file and put in your PYTHONPATH as airflow_login.py,
 the new module will override this one.
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index cce1d0e4efec1..c7c7b8bdf7992 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -15,10 +15,12 @@
 
 dag = DAG(dag_id='example_python_operator', default_args=args)
 
+
 def my_sleeping_function(random_base):
-    '''This is a function that will run whithin the DAG execution'''
+    '''This is a function that will run within the DAG execution'''
     time.sleep(random_base)
 
+
 def print_context(ds, **kwargs):
     pprint(kwargs)
     print(ds)
diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py
index aea4a9b1e420c..f1ffef6e030c1 100644
--- a/airflow/example_dags/tutorial.py
+++ b/airflow/example_dags/tutorial.py
@@ -43,7 +43,7 @@
     task_id='templated',
     depends_on_past=False,
     bash_command=templated_command,
-    params={'my_param': 'Paramater I passed in'},
+    params={'my_param': 'Parameter I passed in'},
     dag=dag)
 
 t2.set_upstream(t1)
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index e69c7b654c494..1c85b7eb3f8a9 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -15,7 +15,7 @@ def __init__(self, parallelism=PARALLELISM):
 
         :param parallelism: how many jobs should run at one time. Set to
             ``0`` for infinity
-        :type paralllelism: int
+        :type parallelism: int
         """
         self.parallelism = parallelism
         self.queued_tasks = {}
diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py
index 56b85653a1c54..98ebcfe2cc49a 100644
--- a/airflow/hooks/S3_hook.py
+++ b/airflow/hooks/S3_hook.py
@@ -282,7 +282,7 @@ def load_file(self, filename,
         :type key: str
         :param bucket_name: Name of the bucket in which to store the file
         :type bucket_name: str
-        :param replace: A flag to decide whther or not to overwrite the key
+        :param replace: A flag to decide whether or not to overwrite the key
             if it already exists
         :type replace: bool
         """
@@ -296,4 +296,3 @@ def load_file(self, filename,
         key_size = key_obj.set_contents_from_filename(filename, replace=replace)
         logging.info("The key {key} now contains"
                      " {key_size} bytes".format(**locals()))
-
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 6af005573e08e..133a373c0fe80 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -216,7 +216,7 @@ def get_partitions(
         '''
         Returns a list of all partitions in a table. Works only
        for tables with less than 32767 (java short max val).
-        For subpartitionned table, the number might easily exceed this.
+        For subpartitioned table, the number might easily exceed this.
 
         >>> hh = HiveMetastoreHook()
         >>> t = 'static_babynames_partitioned'
@@ -229,7 +229,7 @@ def get_partitions(
         self.metastore._oprot.trans.open()
         table = self.metastore.get_table(dbname=schema, tbl_name=table_name)
         if len(table.partitionKeys) == 0:
-            raise Exception("The table isn't partitionned")
+            raise Exception("The table isn't partitioned")
         else:
             if filter:
                 parts = self.metastore.get_partitions_by_filter(
@@ -246,7 +246,7 @@ def get_partitions(
     def max_partition(self, schema, table_name, field=None, filter=None):
         '''
         Returns the maximum value for all partitions in a table. Works only
-        for tables that have a single partition key. For subpartitionned
+        for tables that have a single partition key. For subpartitioned
         table, we recommend using signal tables.
 
         >>> hh = HiveMetastoreHook()
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 20ec6efb7feeb..6596b707e3def 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -108,7 +108,7 @@ def heartbeat_callback(self):
 
     def heartbeat(self):
         '''
-        Heartbeats update the job's entry in the the database with a timestamp
+        Heartbeats update the job's entry in the database with a timestamp
         for the latest_heartbeat and allows for the job to be killed
         externally. This allows at the system level to monitor what is
         actually active.
@@ -172,12 +172,12 @@ def run(self):
         statsd.incr(self.__class__.__name__.lower()+'_end', 1, 1)
 
     def _execute(self):
-        raise NotImplemented("This method needs to be overriden")
+        raise NotImplemented("This method needs to be overridden")
 
 
 class SchedulerJob(BaseJob):
     """
-    This SchedulerJob runs indefinetly and constantly schedules the jobs
+    This SchedulerJob runs indefinitely and constantly schedules the jobs
     that are ready to run. It figures out the latest runs for each task
     and see if the dependencies for the next schedules are met.
     If so it triggers the task instance. It does this for each task
@@ -216,8 +216,8 @@ def __init__(
 
     def process_dag(self, dag, executor):
         """
-        This moethod schedules a single DAG by looking at the latest
-        run for eachtask and attempting to schedule the following run.
+        This method schedules a single DAG by looking at the latest
+        run for each task and attempting to schedule the following run.
 
         As multiple schedulers may be running for redundancy, this
         function takes a lock on the DAG and timestamps the last run
@@ -404,7 +404,7 @@ def signal_handler(signum, frame):
             except Exception as e:
                 logging.exception(e)
             logging.debug(
-                "Done qeuing tasks, calling the executor's heartbeat")
+                "Done queuing tasks, calling the executor's heartbeat")
             try:
                 # We really just want the scheduler to never ever stop.
                 executor.heartbeat()
@@ -467,7 +467,7 @@ def _execute(self):
         executor = self.executor
         executor.start()
 
-        # Build a list of all intances to run
+        # Build a list of all instances to run
         tasks_to_run = {}
         failed = []
         succeeded = []
diff --git a/airflow/macros/hive.py b/airflow/macros/hive.py
index 4556a10c16666..aa7ae025be418 100644
--- a/airflow/macros/hive.py
+++ b/airflow/macros/hive.py
@@ -63,11 +63,11 @@ def closest_ds_partition(
         metastore_conn_id='metastore_default'):
     '''
     This function finds the date in a list closest to the target date.
-    An optional paramter can be given to get the closest before or after.
+    An optional parameter can be given to get the closest before or after.
 
     :param table: A hive table name
     :type table: str
-    :param ds: A datestamp ``%Y-%m-%d`` i.e. ``yyyy-mm-dd``
+    :param ds: A datestamp ``%Y-%m-%d`` e.g. ``yyyy-mm-dd``
     :type ds: datetime.date list
     :param before: closest before (True), after (False) or either side of ds
     :type before: bool or None
diff --git a/airflow/models.py b/airflow/models.py
index 9fc8be4845a21..c798e153342e1 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -129,7 +129,7 @@ def get_dag(self, dag_id):
     def process_file(self, filepath, only_if_updated=True, safe_mode=True):
         """
         Given a path to a python module, this method imports the module and
-        look for dag objects whithin it.
+        look for dag objects within it.
         """
         try:
             # This failed before in what may have been a git sync
@@ -1349,7 +1349,7 @@ class DAG(object):
     :type default_args: dict
     :param params: a dictionary of DAG level parameters that are made
         accessible in templates, namespaced under `params`. These
-        params can be overriden at the task level.
+        params can be overridden at the task level.
     :type params: dict
     """
 
@@ -1430,8 +1430,8 @@ def resolve_template_files(self):
     def crawl_for_tasks(objects):
         """
         Typically called at the end of a script by passing globals() as a
-        parameter. This allows to not explicitely add every single task to the
-        dag explicitely.
+        parameter. This allows you to avoid explicitly adding every single
+        task to the dag.
         """
         raise NotImplemented("")
 
@@ -1504,7 +1504,7 @@ def clear(
         TI = TaskInstance
         tis = session.query(TI)
         if include_subdags:
-            # Creafting the right filter for dag_id and task_ids combo
+            # Crafting the right filter for dag_id and task_ids combo
            conditions = []
            for dag in self.subdags + [self]:
                conditions.append(
diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py
index 4a19fa6db918f..c25ad8c7febcc 100644
--- a/airflow/operators/hive_operator.py
+++ b/airflow/operators/hive_operator.py
@@ -18,7 +18,7 @@ class HiveOperator(BaseOperator):
         ${var} gets translated into jina-type templating {{ var }}
     :type hiveconf_jinja_translate: boolean
     :param script_begin_tag: If defined, the operator will get rid of the
-        part of the script before the first occurence of `script_begin_tag`
+        part of the script before the first occurrence of `script_begin_tag`
     :type script_begin_tag: str
     """
 
diff --git a/airflow/operators/mysql_operator.py b/airflow/operators/mysql_operator.py
index 6e7a73dd41428..0b757ebc1689f 100644
--- a/airflow/operators/mysql_operator.py
+++ b/airflow/operators/mysql_operator.py
@@ -12,7 +12,7 @@ class MySqlOperator(BaseOperator):
     :param mysql_conn_id: reference to a specific mysql database
     :type mysql_conn_id: string
     :param sql: the sql code to be executed
-    :type sql: string or string pointing to a template file. Fil must have
+    :type sql: string or string pointing to a template file. File must have
         a '.sql' extensions.
     """
 
diff --git a/airflow/operators/mysql_to_hive.py b/airflow/operators/mysql_to_hive.py
index a0d7fbf71e667..69442e18d25cb 100644
--- a/airflow/operators/mysql_to_hive.py
+++ b/airflow/operators/mysql_to_hive.py
@@ -15,7 +15,7 @@ class MySqlToHiveTransfer(BaseOperator):
     MySQL, stores the file locally before loading it into a Hive table.
     If the ``create`` or ``recreate`` arguments are set to ``True``,
     a ``CREATE TABLE`` and ``DROP TABLE`` statements are generated.
-    Hive data types are inferred from the cursors's metadata.
+    Hive data types are inferred from the cursor's metadata.
     Note that the table generated in Hive uses ``STORED AS textfile``
     which isn't the most efficient serialization format. If a
@@ -41,7 +41,7 @@ class MySqlToHiveTransfer(BaseOperator):
     :type delimiter: str
     :param mysql_conn_id: source mysql connection
     :type mysql_conn_id: str
-    :param hive_conn_id: desctination hive connection
+    :param hive_conn_id: destination hive connection
     :type hive_conn_id: str
     """
 
diff --git a/airflow/operators/postgres_operator.py b/airflow/operators/postgres_operator.py
index d4edc2b5be67e..7f229074d5a85 100644
--- a/airflow/operators/postgres_operator.py
+++ b/airflow/operators/postgres_operator.py
@@ -12,7 +12,7 @@ class PostgresOperator(BaseOperator):
     :param postgres_conn_id: reference to a specific postgres database
     :type postgres_conn_id: string
     :param sql: the sql code to be executed
-    :type sql: string or string pointing to a template file. Fil must have
+    :type sql: string or string pointing to a template file. File must have
         a '.sql' extensions.
     """
 
diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py
index 87022d19cce83..67d81e83bb884 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -10,7 +10,7 @@ class PythonOperator(BaseOperator):
 
     :param python_callable: A reference to an object that is callable
     :type python_callable: python callable
-    :param op_kwargs: a dictionnary of keyword arguments that will get unpacked
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
         in your function
     :type op_kwargs: dict
     :param op_args: a list of positional arguments that will get unpacked when
diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py
index da641bec33c8f..7793fedc0d302 100644
--- a/airflow/operators/s3_to_hive_operator.py
+++ b/airflow/operators/s3_to_hive_operator.py
@@ -12,7 +12,7 @@ class S3ToHiveTransfer(BaseOperator):
     stores the file locally before loading it into a Hive table.
     If the ``create`` or ``recreate`` arguments are set to ``True``,
     a ``CREATE TABLE`` and ``DROP TABLE`` statements are generated.
-    Hive data types are inferred from the cursors's metadata from.
+    Hive data types are inferred from the cursor's metadata.
     Note that the table generated in Hive uses ``STORED AS textfile``
     which isn't the most efficient serialization format. If a
@@ -50,7 +50,7 @@ class S3ToHiveTransfer(BaseOperator):
     :type delimiter: str
     :param s3_conn_id: source s3 connection
     :type s3_conn_id: str
-    :param hive_conn_id: desctination hive connection
+    :param hive_conn_id: destination hive connection
     :type hive_conn_id: str
     """
 
@@ -98,7 +98,8 @@ def execute(self, context):
             s3_key_object = self.s3.get_wildcard_key(self.s3_key)
         else:
             if not self.s3.check_for_key(self.s3_key):
-                raise Exception("The key {0} does not exists".format(self.s3_key))
+                raise Exception(
+                    "The key {0} does not exist".format(self.s3_key))
             s3_key_object = self.s3.get_key(self.s3_key)
         with NamedTemporaryFile("w") as f:
             logging.info("Dumping S3 key {0} contents to local"
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index b3d7fb024964d..b7076cdf0417b 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -58,7 +58,7 @@ class SqlSensor(BaseSensorOperator):
     Runs a sql statement until a criteria is met. It will keep trying until
     sql returns no row, or if the first cell in (0, '0', '').
 
-    :param conn_id: The connection to run the sensor agains
+    :param conn_id: The connection to run the sensor against
     :type conn_id: string
     :param sql: The sql to run. To pass, it needs to return at least one cell
         that contains a non-zero / empty string value.
@@ -142,7 +142,7 @@ class HivePartitionSensor(BaseSensorOperator):
         notation (my_database.my_table)
     :type table: string
     :param partition: The partition clause to wait for. This is passed as
-        is to the Metastor Thrift client "get_partitions_by_filter" method,
+        is to the Metastore Thrift client "get_partitions_by_filter" method,
         and apparently supports SQL like notation as in `ds='2015-01-01'
         AND type='value'` and > < sings as in "ds>=2015-01-01"
     :type partition: string
@@ -218,8 +218,8 @@ class S3KeySensor(BaseSensorOperator):
     :type bucket_key: str
     :param bucket_name: Name of the S3 bucket
     :type bucket_name: str
-    :param wildcard_match: whether the bucket_key should be interpreted as a Unix
-        wildcard pattern
+    :param wildcard_match: whether the bucket_key should be interpreted as a
+        Unix wildcard pattern
     :type wildcard_match: bool
     """
     template_fields = ('bucket_key', 'bucket_name')
diff --git a/airflow/operators/sqlite_operator.py b/airflow/operators/sqlite_operator.py
index 5abe539184b75..55b9ab430dffa 100644
--- a/airflow/operators/sqlite_operator.py
+++ b/airflow/operators/sqlite_operator.py
@@ -12,7 +12,7 @@ class SqliteOperator(BaseOperator):
     :param sqlite_conn_id: reference to a specific sqlite database
     :type sqlite_conn_id: string
     :param sql: the sql code to be executed
-    :type sql: string or string pointing to a template file. Fil must have
+    :type sql: string or string pointing to a template file. File must have
         a '.sql' extensions.
     """
 
diff --git a/airflow/settings.py b/airflow/settings.py
index 0f9ce3c39f94e..7c234cca0340f 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -24,7 +24,7 @@
 
 engine_args = {}
 if 'sqlite' not in SQL_ALCHEMY_CONN:
-    # Engine args not supported by sqllite
+    # Engine args not supported by sqlite
     engine_args['pool_size'] = 50
     engine_args['pool_recycle'] = 3600
 
diff --git a/airflow/utils.py b/airflow/utils.py
index 38bec60b1227b..201ee8849469b 100644
--- a/airflow/utils.py
+++ b/airflow/utils.py
@@ -356,7 +356,7 @@ def send_MIME_email(e_from, e_to, mime_msg):
 
 def import_module_attrs(parent_module_globals, module_attrs_dict):
     '''
-    Attemps to import a set of modules and specified attributes in the
+    Attempts to import a set of modules and specified attributes in the
     form of a dictionary. The attributes are copied in the parent module's
     namespace. The function returns a list of attributes names that can be
     affected to __all__.
diff --git a/airflow/www/app.py b/airflow/www/app.py
index bf9cda6064eea..72a921c092ec0 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -74,7 +74,7 @@ def decorated_function(*args, **kwargs):
         ):
             return f(*args, **kwargs)
         else:
-            flash("This page requires superuser priviledges", "error")
+            flash("This page requires superuser privileges", "error")
             return redirect(url_for('admin.index'))
     return decorated_function
 
@@ -91,7 +91,7 @@ def decorated_function(*args, **kwargs):
         ):
             return f(*args, **kwargs)
         else:
-            flash("This page requires data profiling priviledges", "error")
+            flash("This page requires data profiling privileges", "error")
             return redirect(url_for('admin.index'))
     return decorated_function
 
@@ -1747,6 +1747,7 @@ def fqueued_slots(v, c, m, p):
         '&flt2_state_equals=queued')
     return Markup("<a href='{0}'>{1}</a>".format(url, m.queued_slots()))
 
+
 class PoolModelView(SuperUserMixin, ModelView):
     column_list = ('pool', 'slots', 'used_slots', 'queued_slots')
     column_formatters = dict(
diff --git a/airflow/www/templates/airflow/confirm.html b/airflow/www/templates/airflow/confirm.html
index b4ebac4ec5986..68fd6d1690590 100644
--- a/airflow/www/templates/airflow/confirm.html
+++ b/airflow/www/templates/airflow/confirm.html
@@ -10,7 +10,7 @@