Misc spelling updates #5

Merged: 4 commits, Jun 7, 2015
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -63,7 +63,7 @@ Install development requirements:

Tests can then be run with:

./rununittests.sh
./run_unit_tests.sh

Lint the project with:

2 changes: 1 addition & 1 deletion TODO.md
@@ -25,7 +25,7 @@ TODO
* Add queued_dttm to task_instance table

#### Wishlist
* Support for cron like synthax (0 * * * ) using croniter library
* Support for cron like syntax (0 * * * ) using croniter library
* Pause flag at the task level
* Task callbacks as tasks?
* Increase unit test coverage
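As an aside on the croniter wishlist item above, here is a minimal sketch (not part of this PR) of how croniter expands a cron expression into upcoming run times; the schedule string and start date are purely illustrative.

```python
from datetime import datetime
from croniter import croniter

# Expand a cron-like schedule ("top of every hour") into the next few
# run times after an arbitrary start date.
start = datetime(2015, 6, 7, 0, 0)
itr = croniter('0 * * * *', start)
for _ in range(3):
    print(itr.get_next(datetime))  # 2015-06-07 01:00, 02:00, 03:00
```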
8 changes: 4 additions & 4 deletions airflow/bin/cli.py
@@ -301,7 +301,7 @@ def initdb(args):
def resetdb(args):
print("DB: " + conf.get('core', 'SQL_ALCHEMY_CONN'))
if raw_input(
"This will drop exisiting tables if they exist. "
"This will drop existing tables if they exist. "
"Proceed? (y/n)").upper() == "Y":
logging.basicConfig(level=logging.DEBUG,
format=settings.SIMPLE_LOG_FORMAT)
@@ -365,9 +365,9 @@ def get_parser():
"-t", "--task_regex",
help="The regex to filter specific task_ids to clear (optional)")
parser_clear.add_argument(
"-s", "--start_date", help="Overide start_date YYYY-MM-DD")
"-s", "--start_date", help="Override start_date YYYY-MM-DD")
parser_clear.add_argument(
"-e", "--end_date", help="Overide end_date YYYY-MM-DD")
"-e", "--end_date", help="Override end_date YYYY-MM-DD")
ht = "Include upstream tasks"
parser_clear.add_argument(
"-u", "--upstream", help=ht, action="store_true")
@@ -485,7 +485,7 @@ def get_parser():
parser_list_dags = subparsers.add_parser('list_dags', help=ht)
parser_list_dags.set_defaults(func=list_dags)
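To make the get_parser hunks in this file easier to follow, here is a standalone sketch of the argparse sub-command pattern they use; the handler body and help text are stand-ins, not Airflow's actual CLI code.

```python
import argparse

def list_dags(args):
    # Stand-in handler; the real one lives in airflow/bin/cli.py.
    print("would list all known DAGs")

parser = argparse.ArgumentParser(prog='airflow')
subparsers = parser.add_subparsers()

ht = "List all the DAGs"  # illustrative help text
parser_list_dags = subparsers.add_parser('list_dags', help=ht)
parser_list_dags.set_defaults(func=list_dags)

args = parser.parse_args(['list_dags'])
args.func(args)
```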

ht = "List the tasks whithin a DAG"
ht = "List the tasks within a DAG"
parser_list_tasks = subparsers.add_parser('list_tasks', help=ht)
parser_list_tasks.add_argument(
"-t", "--tree", help="Tree view", action="store_true")
10 changes: 5 additions & 5 deletions airflow/configuration.py
@@ -63,7 +63,7 @@

[webserver]
# The base url of your website as airflow cannot guess what domain or
# cname you are using. This is use in autamated emails that
# cname you are using. This is use in automated emails that
# airflow sends to point links to the right web server
base_url = http://localhost:8080

@@ -104,7 +104,7 @@
# visible from the main web server to connect into the workers.
worker_log_server_port = 8793

# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentaly
# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
@@ -214,7 +214,7 @@ def mkdir_p(path):

'''
Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
"~/airflow" and "~/airflow/airflow.cfg" repectively as defaults.
"~/airflow" and "~/airflow/airflow.cfg" respectively as defaults.
'''

if 'AIRFLOW_HOME' not in os.environ:
@@ -238,14 +238,14 @@ def mkdir_p(path):
it is missing. The right way to change your configuration is to alter your
configuration file, not this code.
'''
logging.info("Createing new config file in: " + AIRFLOW_CONFIG)
logging.info("Creating new config file in: " + AIRFLOW_CONFIG)
f = open(AIRFLOW_CONFIG, 'w')
f.write(DEFAULT_CONFIG.format(**locals()))
f.close()

TEST_CONFIG_FILE = AIRFLOW_HOME + '/unittests.cfg'
if not os.path.isfile(TEST_CONFIG_FILE):
logging.info("Createing new config file in: " + TEST_CONFIG_FILE)
logging.info("Creating new config file in: " + TEST_CONFIG_FILE)
f = open(TEST_CONFIG_FILE, 'w')
f.write(TEST_CONFIG.format(**locals()))
f.close()
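The hunk above writes a default config file when none exists. Here is a minimal sketch of that pattern with a context manager; the template and paths are stand-ins, since the real DEFAULT_CONFIG and AIRFLOW_HOME handling live in airflow/configuration.py.

```python
import os

DEFAULT_CONFIG = """\
[core]
airflow_home = {AIRFLOW_HOME}
"""

AIRFLOW_HOME = os.path.expanduser('~/airflow')
AIRFLOW_CONFIG = AIRFLOW_HOME + '/airflow.cfg'

if not os.path.isdir(AIRFLOW_HOME):
    os.makedirs(AIRFLOW_HOME)  # what mkdir_p provides in the module above

if not os.path.isfile(AIRFLOW_CONFIG):
    # Same idea as the open/write/close sequence above; the context manager
    # closes the handle even if the write raises.
    with open(AIRFLOW_CONFIG, 'w') as f:
        f.write(DEFAULT_CONFIG.format(**locals()))
```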
2 changes: 1 addition & 1 deletion airflow/default_login.py
@@ -1,5 +1,5 @@
'''
Override this file to handle your authenticatin / login.
Override this file to handle your authenticating / login.

Copy and alter this file and put in your PYTHONPATH as airflow_login.py,
the new module will override this one.
4 changes: 3 additions & 1 deletion airflow/example_dags/example_python_operator.py
@@ -15,10 +15,12 @@

dag = DAG(dag_id='example_python_operator', default_args=args)


def my_sleeping_function(random_base):
'''This is a function that will run whithin the DAG execution'''
'''This is a function that will run within the DAG execution'''
time.sleep(random_base)


def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
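For readers unfamiliar with this example, here is a hedged sketch of how callables like these are typically attached to the `dag` defined above. It reuses the definitions in this hunk, the argument names follow the PythonOperator docstring quoted later in this diff, and the import path plus the `provide_context` flag are assumptions about this Airflow vintage.

```python
from airflow.operators import PythonOperator  # import path assumed for this era

run_this = PythonOperator(
    task_id='print_the_context',
    python_callable=print_context,
    provide_context=True,  # assumption: needed so ds/**kwargs are passed in
    dag=dag)

sleep_task = PythonOperator(
    task_id='sleep_for_1',
    python_callable=my_sleeping_function,
    op_kwargs={'random_base': 1},
    dag=dag)

sleep_task.set_upstream(run_this)
```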
2 changes: 1 addition & 1 deletion airflow/example_dags/tutorial.py
@@ -43,7 +43,7 @@
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Paramater I passed in'},
params={'my_param': 'Parameter I passed in'},
dag=dag)

t2.set_upstream(t1)
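The `params` dict above is what the templated command reaches through Jinja. A standalone illustration of the rendering, with an illustrative date; Airflow performs this rendering internally, so this only shows the Jinja mechanics.

```python
from jinja2 import Template

templated_command = 'echo "date={{ ds }} my_param={{ params.my_param }}"'
print(Template(templated_command).render(
    ds='2015-06-07', params={'my_param': 'Parameter I passed in'}))
# -> echo "date=2015-06-07 my_param=Parameter I passed in"
```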
2 changes: 1 addition & 1 deletion airflow/executors/base_executor.py
@@ -15,7 +15,7 @@ def __init__(self, parallelism=PARALLELISM):

:param parallelism: how many jobs should run at one time. Set to
``0`` for infinity
:type paralllelism: int
:type parallelism: int
"""
self.parallelism = parallelism
self.queued_tasks = {}
3 changes: 1 addition & 2 deletions airflow/hooks/S3_hook.py
@@ -282,7 +282,7 @@ def load_file(self, filename,
:type key: str
:param bucket_name: Name of the bucket in which to store the file
:type bucket_name: str
:param replace: A flag to decide whther or not to overwrite the key
:param replace: A flag to decide whether or not to overwrite the key
if it already exists
:type replace: bool
"""
@@ -296,4 +296,3 @@ def load_file(self, filename,
key_size = key_obj.set_contents_from_filename(filename, replace=replace)
logging.info("The key {key} now contains"
" {key_size} bytes".format(**locals()))

6 changes: 3 additions & 3 deletions airflow/hooks/hive_hooks.py
@@ -216,7 +216,7 @@ def get_partitions(
'''
Returns a list of all partitions in a table. Works only
for tables with less than 32767 (java short max val).
For subpartitionned table, the number might easily exceed this.
For subpartitioned table, the number might easily exceed this.

>>> hh = HiveMetastoreHook()
>>> t = 'static_babynames_partitioned'
@@ -229,7 +229,7 @@ def get_partitions(
self.metastore._oprot.trans.open()
table = self.metastore.get_table(dbname=schema, tbl_name=table_name)
if len(table.partitionKeys) == 0:
raise Exception("The table isn't partitionned")
raise Exception("The table isn't partitioned")
else:
if filter:
parts = self.metastore.get_partitions_by_filter(
@@ -246,7 +246,7 @@ def get_partitions(
def max_partition(self, schema, table_name, field=None, filter=None):
'''
Returns the maximum value for all partitions in a table. Works only
for tables that have a single partition key. For subpartitionned
for tables that have a single partition key. For subpartitioned
table, we recommend using signal tables.

>>> hh = HiveMetastoreHook()
14 changes: 7 additions & 7 deletions airflow/jobs.py
@@ -108,7 +108,7 @@ def heartbeat_callback(self):

def heartbeat(self):
'''
Heartbeats update the job's entry in the the database with a timestamp
Heartbeats update the job's entry in the database with a timestamp
for the latest_heartbeat and allows for the job to be killed
externally. This allows at the system level to monitor what is
actually active.
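A generic illustration of the heartbeat idea described in that docstring, not Airflow's actual BaseJob code: stamp latest_heartbeat periodically so an external monitor can see the job is alive, and honor an externally set kill flag.

```python
import time
from datetime import datetime

class ToyJob(object):
    heartrate = 5  # seconds between beats

    def __init__(self):
        self.latest_heartbeat = datetime.now()
        self.state = 'RUNNING'  # something external may flip this to 'SHUTDOWN'

    def heartbeat(self):
        if self.state == 'SHUTDOWN':
            raise SystemExit("killed externally")
        time.sleep(self.heartrate)
        self.latest_heartbeat = datetime.now()  # would be persisted to the jobs table
```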
@@ -172,12 +172,12 @@ def run(self):
statsd.incr(self.__class__.__name__.lower()+'_end', 1, 1)

def _execute(self):
raise NotImplemented("This method needs to be overriden")
raise NotImplemented("This method needs to be overridden")


class SchedulerJob(BaseJob):
"""
This SchedulerJob runs indefinetly and constantly schedules the jobs
This SchedulerJob runs indefinitely and constantly schedules the jobs
that are ready to run. It figures out the latest runs for each
task and see if the dependencies for the next schedules are met.
If so it triggers the task instance. It does this for each task
@@ -216,8 +216,8 @@ def __init__(

def process_dag(self, dag, executor):
"""
This moethod schedules a single DAG by looking at the latest
run for eachtask and attempting to schedule the following run.
This method schedules a single DAG by looking at the latest
run for each task and attempting to schedule the following run.

As multiple schedulers may be running for redundancy, this
function takes a lock on the DAG and timestamps the last run
@@ -404,7 +404,7 @@ def signal_handler(signum, frame):
except Exception as e:
logging.exception(e)
logging.debug(
"Done qeuing tasks, calling the executor's heartbeat")
"Done queuing tasks, calling the executor's heartbeat")
try:
# We really just want the scheduler to never ever stop.
executor.heartbeat()
@@ -467,7 +467,7 @@ def _execute(self):
executor = self.executor
executor.start()

# Build a list of all intances to run
# Build a list of all instances to run
tasks_to_run = {}
failed = []
succeeded = []
4 changes: 2 additions & 2 deletions airflow/macros/hive.py
@@ -63,11 +63,11 @@ def closest_ds_partition(
metastore_conn_id='metastore_default'):
'''
This function finds the date in a list closest to the target date.
An optional paramter can be given to get the closest before or after.
An optional parameter can be given to get the closest before or after.

:param table: A hive table name
:type table: str
:param ds: A datestamp ``%Y-%m-%d`` i.e. ``yyyy-mm-dd``
:param ds: A datestamp ``%Y-%m-%d`` e.g. ``yyyy-mm-dd``
:type ds: datetime.date list
:param before: closest before (True), after (False) or either side of ds
:type before: bool or None
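A toy version of the "closest date" selection this docstring describes; the real closest_ds_partition also pulls the candidate partitions from the Hive metastore, which is omitted here.

```python
from datetime import date

def closest_date(target, candidates, before=True):
    if before is True:
        candidates = [d for d in candidates if d <= target]
    elif before is False:
        candidates = [d for d in candidates if d >= target]
    return min(candidates, key=lambda d: abs(d - target))

print(closest_date(date(2015, 6, 7),
                   [date(2015, 6, 1), date(2015, 6, 5), date(2015, 6, 30)]))
# -> 2015-06-05
```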
10 changes: 5 additions & 5 deletions airflow/models.py
@@ -129,7 +129,7 @@ def get_dag(self, dag_id):
def process_file(self, filepath, only_if_updated=True, safe_mode=True):
"""
Given a path to a python module, this method imports the module and
look for dag objects whithin it.
look for dag objects within it.
"""
try:
# This failed before in what may have been a git sync
@@ -1349,7 +1349,7 @@ class DAG(object):
:type default_args: dict
:param params: a dictionary of DAG level parameters that are made
accessible in templates, namespaced under `params`. These
params can be overriden at the task level.
params can be overridden at the task level.
:type params: dict
"""

@@ -1430,8 +1430,8 @@ def resolve_template_files(self):
def crawl_for_tasks(objects):
"""
Typically called at the end of a script by passing globals() as a
parameter. This allows to not explicitely add every single task to the
dag explicitely.
parameter. This allows to not explicitly add every single task to the
dag explicitly.
"""
raise NotImplemented("")

@@ -1504,7 +1504,7 @@ def clear(
TI = TaskInstance
tis = session.query(TI)
if include_subdags:
# Creafting the right filter for dag_id and task_ids combo
# Crafting the right filter for dag_id and task_ids combo
conditions = []
for dag in self.subdags + [self]:
conditions.append(
2 changes: 1 addition & 1 deletion airflow/operators/hive_operator.py
@@ -18,7 +18,7 @@ class HiveOperator(BaseOperator):
${var} gets translated into jina-type templating {{ var }}
:type hiveconf_jinja_translate: boolean
:param script_begin_tag: If defined, the operator will get rid of the
part of the script before the first occurence of `script_begin_tag`
part of the script before the first occurrence of `script_begin_tag`
:type script_begin_tag: str
"""

2 changes: 1 addition & 1 deletion airflow/operators/mysql_operator.py
@@ -12,7 +12,7 @@ class MySqlOperator(BaseOperator):
:param mysql_conn_id: reference to a specific mysql database
:type mysql_conn_id: string
:param sql: the sql code to be executed
:type sql: string or string pointing to a template file. Fil must have
:type sql: string or string pointing to a template file. File must have
a '.sql' extensions.
"""

4 changes: 2 additions & 2 deletions airflow/operators/mysql_to_hive.py
@@ -15,7 +15,7 @@ class MySqlToHiveTransfer(BaseOperator):
MySQL, stores the file locally before loading it into a Hive table.
If the ``create`` or ``recreate`` arguments are set to ``True``,
a ``CREATE TABLE`` and ``DROP TABLE`` statements are generated.
Hive data types are inferred from the cursors's metadata.
Hive data types are inferred from the cursor's metadata.

Note that the table generated in Hive uses ``STORED AS textfile``
which isn't the most efficient serialization format. If a
@@ -41,7 +41,7 @@ class MySqlToHiveTransfer(BaseOperator):
:type delimiter: str
:param mysql_conn_id: source mysql connection
:type mysql_conn_id: str
:param hive_conn_id: desctination hive connection
:param hive_conn_id: destination hive connection
:type hive_conn_id: str
"""

2 changes: 1 addition & 1 deletion airflow/operators/postgres_operator.py
@@ -12,7 +12,7 @@ class PostgresOperator(BaseOperator):
:param postgres_conn_id: reference to a specific postgres database
:type postgres_conn_id: string
:param sql: the sql code to be executed
:type sql: string or string pointing to a template file. Fil must have
:type sql: string or string pointing to a template file. File must have
a '.sql' extensions.
"""

2 changes: 1 addition & 1 deletion airflow/operators/python_operator.py
@@ -10,7 +10,7 @@ class PythonOperator(BaseOperator):

:param python_callable: A reference to an object that is callable
:type python_callable: python callable
:param op_kwargs: a dictionnary of keyword arguments that will get unpacked
:param op_kwargs: a dictionary of keyword arguments that will get unpacked
in your function
:type op_kwargs: dict
:param op_args: a list of positional arguments that will get unpacked when
7 changes: 4 additions & 3 deletions airflow/operators/s3_to_hive_operator.py
@@ -12,7 +12,7 @@ class S3ToHiveTransfer(BaseOperator):
stores the file locally before loading it into a Hive table.
If the ``create`` or ``recreate`` arguments are set to ``True``,
a ``CREATE TABLE`` and ``DROP TABLE`` statements are generated.
Hive data types are inferred from the cursors's metadata from.
Hive data types are inferred from the cursor's metadata from.

Note that the table generated in Hive uses ``STORED AS textfile``
which isn't the most efficient serialization format. If a
@@ -50,7 +50,7 @@ class S3ToHiveTransfer(BaseOperator):
:type delimiter: str
:param s3_conn_id: source s3 connection
:type s3_conn_id: str
:param hive_conn_id: desctination hive connection
:param hive_conn_id: destination hive connection
:type hive_conn_id: str
"""

@@ -98,7 +98,8 @@ def execute(self, context):
s3_key_object = self.s3.get_wildcard_key(self.s3_key)
else:
if not self.s3.check_for_key(self.s3_key):
raise Exception("The key {0} does not exists".format(self.s3_key))
raise Exception(
"The key {0} does not exists".format(self.s3_key))
s3_key_object = self.s3.get_key(self.s3_key)
with NamedTemporaryFile("w") as f:
logging.info("Dumping S3 key {0} contents to local"
8 changes: 4 additions & 4 deletions airflow/operators/sensors.py
@@ -58,7 +58,7 @@ class SqlSensor(BaseSensorOperator):
Runs a sql statement until a criteria is met. It will keep trying until
sql returns no row, or if the first cell in (0, '0', '').

:param conn_id: The connection to run the sensor agains
:param conn_id: The connection to run the sensor against
:type conn_id: string
:param sql: The sql to run. To pass, it needs to return at least one cell
that contains a non-zero / empty string value.
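A generic sketch of the "keep trying until a criteria is met" behavior the sensors share; the real operators get connection handling and configurable poke intervals/timeouts from BaseSensorOperator, so treat the names and defaults here as illustrative.

```python
import time

def run_sensor(poke, poke_interval=60, timeout=3600):
    """Call poke() until it returns True, sleeping between attempts."""
    waited = 0
    while not poke():
        if waited >= timeout:
            raise RuntimeError("sensor timed out")
        time.sleep(poke_interval)
        waited += poke_interval
    return True
```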
@@ -142,7 +142,7 @@ class HivePartitionSensor(BaseSensorOperator):
notation (my_database.my_table)
:type table: string
:param partition: The partition clause to wait for. This is passed as
is to the Metastor Thrift client "get_partitions_by_filter" method,
is to the Metastore Thrift client "get_partitions_by_filter" method,
and apparently supports SQL like notation as in `ds='2015-01-01'
AND type='value'` and > < sings as in "ds>=2015-01-01"
:type partition: string
@@ -218,8 +218,8 @@ class S3KeySensor(BaseSensorOperator):
:type bucket_key: str
:param bucket_name: Name of the S3 bucket
:type bucket_name: str
:param wildcard_match: whether the bucket_key should be interpreted as a Unix
wildcard pattern
:param wildcard_match: whether the bucket_key should be interpreted as a
Unix wildcard pattern
:type wildcard_match: bool
"""
template_fields = ('bucket_key', 'bucket_name')
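The wildcard_match flag above refers to Unix-style wildcard patterns. A self-contained illustration of that kind of matching; the sensor applies it to keys listed from S3, which is omitted here, and the key names are made up.

```python
import fnmatch

keys = ['logs/2015-06-07/part-0000', 'logs/2015-06-07/part-0001', 'data/file.csv']
print(fnmatch.filter(keys, 'logs/2015-06-07/*'))  # -> the two matching log keys
```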
2 changes: 1 addition & 1 deletion airflow/operators/sqlite_operator.py
@@ -12,7 +12,7 @@ class SqliteOperator(BaseOperator):
:param sqlite_conn_id: reference to a specific sqlite database
:type sqlite_conn_id: string
:param sql: the sql code to be executed
:type sql: string or string pointing to a template file. Fil must have
:type sql: string or string pointing to a template file. File must have
a '.sql' extensions.
"""
