Internal site search downloader #13

Merged: 3 commits, Jul 5, 2019
8 changes: 3 additions & 5 deletions dags/ga_benchmark.py
@@ -14,15 +14,12 @@
'start_date': yesterday,
}


with models.DAG(
'ga_benchmark',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
project_id = models.Variable.get('GCP_PROJECT','dta-ga-bigquery')
project_id = models.Variable.get('GCP_PROJECT', 'dta-ga-bigquery')

view_id = '69211100'
timestamp = '20190425'
temp_table = 'benchmark_%s_%s' % (view_id, timestamp)
query = """
CREATE TABLE `{{params.project_id}}.tmp.{{ params.temp_table }}`
@@ -58,6 +55,7 @@
export_benchmark_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
task_id='export_benchmark_to_gcs',
source_project_dataset_table="%s.tmp.%s" % (project_id, temp_table),
destination_cloud_storage_uris=["gs://us-central1-maxious-airflow-64b78389-bucket/data/%s.csv" % (temp_table,)],
destination_cloud_storage_uris=["gs://%s/data/%s.csv" % (
models.Variable.get('AIRFLOW_BUCKET', 'us-east1-dta-airflow-b3415db4-bucket'), temp_table)],
export_format='CSV')
query_benchmark >> export_benchmark_to_gcs
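
The recurring change in this PR is replacing the hardcoded Composer bucket with an Airflow Variable lookup that falls back to a default. A minimal sketch of the pattern, using only values that appear in the diff above:

```python
# Minimal sketch of the bucket-parameterisation pattern used above: read the
# bucket name from the AIRFLOW_BUCKET Variable, falling back to the default
# Composer bucket, then build the GCS destination for the export task.
from airflow import models

bucket = models.Variable.get('AIRFLOW_BUCKET', 'us-east1-dta-airflow-b3415db4-bucket')
destination_uri = 'gs://%s/data/%s.csv' % (bucket, 'benchmark_69211100_20190425')
```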
50 changes: 27 additions & 23 deletions dags/ga_daily_reporter.py
@@ -3,7 +3,6 @@
import glob
import os


from airflow import models
from airflow.operators import python_operator
from airflow.contrib.operators import slack_webhook_operator
@@ -17,10 +16,10 @@
'start_date': datetime.datetime(2019, 4, 26),
# http://airflow.apache.org/_api/airflow/contrib/operators/dataflow_operator/index.html
'dataflow_default_options': {
'project': models.Variable.get('GCP_PROJECT','dta-ga-bigquery'),
'project': models.Variable.get('GCP_PROJECT', 'dta-ga-bigquery'),
'region': 'us-central1',
'zone': 'us-central1-b',
'tempLocation': 'gs://staging.%s.appspot.com/' % models.Variable.get('GCP_PROJECT','dta-ga-bigquery')
'tempLocation': 'gs://staging.%s.appspot.com/' % models.Variable.get('GCP_PROJECT', 'dta-ga-bigquery')
}
}

@@ -32,49 +31,49 @@
def combine_tally():
from tablib import Dataset
data = Dataset()
for f in glob.glob(DATA_DIR+'tally_69211100_20190425.csv-*'):
for f in glob.glob(DATA_DIR + 'tally_69211100_20190425.csv-*'):
d = Dataset().load(open(f, 'rt').read())
for row in d:
data.append(row)

with open(DATA_DIR+'tally_69211100_20190425.csv', 'wt') as f:
with open(DATA_DIR + 'tally_69211100_20190425.csv', 'wt') as f:
f.write('path,hits\n')
f.write(data.csv)


def generate_plotly_chart():
from tablib import Dataset

df = Dataset().load(open(DATA_DIR+'tally_69211100_20190425.csv', 'r').read()).df.sort_values(by=['hits'])
df = Dataset().load(open(DATA_DIR + 'tally_69211100_20190425.csv', 'r').read()).df.sort_values(by=['hits'])
df = df[df['hits'] > 30]

import plotly
import plotly.graph_objs as go
plotly.offline.plot({
"data": [go.Bar(x=df.path, y=df.hits)]}, filename=DATA_DIR+"temp-plot.html", auto_open=False)
"data": [go.Bar(x=df.path, y=df.hits)]}, filename=DATA_DIR + "temp-plot.html", auto_open=False)


def generate_graph():

import igraph
g = igraph.Graph()
g.add_vertices(3)
g.add_edges([(0,1), (1,2)])
g.add_edges([(0, 1), (1, 2)])
print(g)
g.write_graphml(DATA_DIR+"social_network.graphml")
g.write_graphml(DATA_DIR + "social_network.graphml")


def find_number_one():
from tablib import Dataset

df = Dataset().load(open(DATA_DIR+'tally_69211100_20190425.csv', 'r').read()).df.sort_values(by=['hits'])
df = Dataset().load(open(DATA_DIR + 'tally_69211100_20190425.csv', 'r').read()).df.sort_values(by=['hits'])

return df.values[-1][0], df.values[-1][1]


def tell_slack(context):
o = slack_webhook_operator.SlackWebhookOperator(task_id="tell_slack", http_conn_id='slack_default',
message="Number one page today is %s (%s hits)" % (find_number_one()))
message="Number one page today is %s (%s hits)" % (
find_number_one()))
return o.execute(context)


@@ -92,7 +91,9 @@ def tell_slack(context):
# https://stackoverflow.com/questions/52054427/how-to-integrate-apache-airflow-with-slack
tell_slack = slack_webhook_operator.SlackWebhookOperator(task_id="tell_slack", http_conn_id='slack_default',
message="A new report is out: "
"https://storage.cloud.google.com/us-central1-maxious-airflow-64b78389-bucket/data/tally_69211100_20190425.csv")
"https://%s/data/tally_69211100_20190425.csv" % (
models.Variable.get('AIRFLOW_BUCKET',
'us-east1-dta-airflow-b3415db4-bucket')))

generate_graph = python_operator.PythonOperator(
task_id='generate_graph',
@@ -105,16 +106,19 @@ def tell_slack(context):
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

kubetest = KubernetesPodOperator(
task_id='pod-ex-minimum',
name='pod-ex-minimum',
namespace='default',
image='gcr.io/%s/galileo' % models.Variable.get('GCP_PROJECT','dta-ga-bigquery'),
image_pull_policy="Always",
cmds=['bash', '-c'],
arguments=['gsutil cp gs://%s/data/tally_69211100_20190425.csv . && ' % models.Variable.get('AIRFLOW_BUCKET', 'us-east1-dta-airflow-b3415db4-bucket') +
'gsutil cp gs://%s/dags/r_scripts/csvggplot.R . && ' % models.Variable.get('AIRFLOW_BUCKET', 'us-east1-dta-airflow-b3415db4-bucket') +
'R -f csvggplot.R && '
'gsutil cp tally_69211100_20190425.png gs://%s/data/' % models.Variable.get('AIRFLOW_BUCKET', 'us-east1-dta-airflow-b3415db4-bucket') ],)
task_id='pod-ex-minimum',
name='pod-ex-minimum',
namespace='default',
image='gcr.io/%s/galileo' % models.Variable.get('GCP_PROJECT', 'dta-ga-bigquery'),
image_pull_policy="Always",
cmds=['bash', '-c'],
arguments=['gsutil cp gs://%s/data/tally_69211100_20190425.csv . && ' % models.Variable.get('AIRFLOW_BUCKET',
'us-east1-dta-airflow-b3415db4-bucket') +
'gsutil cp gs://%s/dags/r_scripts/csvggplot.R . && ' % models.Variable.get('AIRFLOW_BUCKET',
'us-east1-dta-airflow-b3415db4-bucket') +
'R -f csvggplot.R && '
'gsutil cp tally_69211100_20190425.png gs://%s/data/' % models.Variable.get('AIRFLOW_BUCKET',
'us-east1-dta-airflow-b3415db4-bucket')], )

benchmark_tally >> combine_tally
combine_tally >> generate_plotly_chart
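
For reference, the wrapped `arguments` expression above boils down to a single bash command run inside the pod; a sketch of the equivalent assembly (not part of the change itself):

```python
# Sketch of what the pod runs: pull the tally CSV and the R script from the
# Airflow bucket, render the chart with R, then push the PNG back to the bucket.
from airflow import models

bucket = models.Variable.get('AIRFLOW_BUCKET', 'us-east1-dta-airflow-b3415db4-bucket')
pod_command = (
    'gsutil cp gs://%s/data/tally_69211100_20190425.csv . && ' % bucket +
    'gsutil cp gs://%s/dags/r_scripts/csvggplot.R . && ' % bucket +
    'R -f csvggplot.R && '
    'gsutil cp tally_69211100_20190425.png gs://%s/data/' % bucket
)
# equivalent to: arguments=[pod_command]
```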
4 changes: 2 additions & 2 deletions dags/ga_daily_reporter_test.py
@@ -10,12 +10,12 @@ def test_dag_import():
if __package__ is None:
import sys
from os import path
sys.path.append( path.dirname( path.dirname( path.abspath(__file__) ) ) )
sys.path.append(path.dirname(path.dirname(path.abspath(__file__))))
import ga_daily_reporter as module
else:
from . import ga_daily_reporter as module
unit_testing.assert_has_valid_dag(module)


if __name__ == '__main__':
test_dag_import()
test_dag_import()
11 changes: 7 additions & 4 deletions dags/ga_quarterly_reporter.py
@@ -23,7 +23,7 @@

def send_report():
datestamp = datetime.datetime.now().strftime('%d%b%Y')
report_file = DATA_DIR+'GA360-%s.csv' % datestamp
report_file = DATA_DIR + 'GA360-%s.csv' % datestamp

table = Dataset().load(open(report_file, 'rt').read()).export('df').to_html()

@@ -47,10 +47,13 @@ def send_report():
image='gcr.io/%s/galileo' % models.Variable.get('GCP_PROJECT', 'dta-ga-bigquery'),
cmds=['bash', '-c'],
image_pull_policy="Always",
arguments=['gsutil cp gs://%s/data/credentials.json . && '% models.Variable.get('AIRFLOW_BUCKET','us-east1-dta-airflow-b3415db4-bucket') +
'gsutil cp gs://%s/dags/r_scripts/extractaccinfo.R . && ' % models.Variable.get('AIRFLOW_BUCKET','us-east1-dta-airflow-b3415db4-bucket') +
arguments=['gsutil cp gs://%s/data/credentials.json . && ' % models.Variable.get('AIRFLOW_BUCKET',
'us-east1-dta-airflow-b3415db4-bucket') +
'gsutil cp gs://%s/dags/r_scripts/extractaccinfo.R . && ' % models.Variable.get('AIRFLOW_BUCKET',
'us-east1-dta-airflow-b3415db4-bucket') +
'R -f extractaccinfo.R && '
'gsutil cp GA360*.csv gs://%s/data/' % models.Variable.get('AIRFLOW_BUCKET','us-east1-dta-airflow-b3415db4-bucket') ])
'gsutil cp GA360*.csv gs://%s/data/' % models.Variable.get('AIRFLOW_BUCKET',
'us-east1-dta-airflow-b3415db4-bucket')])

email_summary = PythonOperator(
task_id='email_summary',
5 changes: 5 additions & 0 deletions dags/galileo/__init__.py
@@ -0,0 +1,5 @@
import re


def domain_slug(domain):
return re.sub(r"http(s)|:|\/|www.?|\.", "", domain)
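
The new `domain_slug` helper strips the scheme, `www.` prefix, dots and slashes so a property URL can be embedded in a filename. An illustrative call (the URL is made up):

```python
# Illustrative only: the regex removes "https", ":", "/", "www." and "." so the
# property URL becomes a filename-safe slug.
from galileo import domain_slug  # import path assumed; defined in dags/galileo/__init__.py

print(domain_slug('https://www.example.gov.au/'))  # -> 'examplegovau'
```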
47 changes: 46 additions & 1 deletion dags/galileo/ga.py
@@ -1,4 +1,5 @@
import tablib

try:
from . import galileo
except ImportError:
@@ -39,5 +40,49 @@ def generate_accounts_views_index():
f.write(data.csv)


def get_events(name, view_id, category, action):
print('fetching', name, 'for', view_id)
service = galileo.get_service(api_name='analyticsreporting', api_version='v4',
scopes=['https://www.googleapis.com/auth/analytics.readonly'])
response = service.reports().batchGet(
body={
'reportRequests': [
{
'viewId': view_id,
'dateRanges': [{'startDate': '30daysAgo', 'endDate': 'today'}],
'metrics': [{'expression': 'ga:totalEvents'}],
'dimensions': [{'name': 'ga:eventLabel'}],
'orderBys': [{"fieldName": 'ga:totalEvents', "sortOrder": "DESCENDING"}],
'filtersExpression': 'ga:totalEvents>10;ga:eventCategory==' + category + ';ga:eventAction==' + action,
'pageSize': 100000
}]
}
).execute()
result = []
for row in response.get('reports', [])[0].get('data', {}).get('rows', []):
if row:
# print(row['dimensions'][0], row['metrics'][0]['values'][0])
result.append({"query": row['dimensions'][0], name: int(row['metrics'][0]['values'][0])})

return result


if __name__ == '__main__':
generate_accounts_views_index()
# generate_accounts_views_index()

searches = get_events('impressions', '114274207', "ElasticSearch-Results", "Successful Search")
search_clicks = get_events('clicks', '114274207', "ElasticSearch-Results Clicks", "Page Result Click")
from collections import defaultdict

d = defaultdict(dict)
for l in (searches, search_clicks):
for elem in l:
d[elem['query'].lower()].update(elem)
data = tablib.Dataset(headers=['query', 'impressions', 'clicks'])
for l in d.values():
data.append((l['query'], l.get('impressions'), l.get('clicks')))
import datetime

with open(galileo.DATA_DIR + 'internalsearch_114274207_' + datetime.datetime.now().strftime('%Y%m%d') + '.csv',
'wt') as f:
f.write(data.csv)
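
The `__main__` block joins the impressions and clicks reports on the lowercased query string before writing the CSV; a minimal sketch of that merge with made-up rows:

```python
# Minimal sketch of the merge used above, with made-up rows: both reports are
# keyed by the lowercased query so impressions and clicks end up in one record.
from collections import defaultdict

searches = [{'query': 'Passport renewal', 'impressions': 120}]
search_clicks = [{'query': 'passport renewal', 'clicks': 45}]

merged = defaultdict(dict)
for report in (searches, search_clicks):
    for row in report:
        merged[row['query'].lower()].update(row)

# merged['passport renewal'] == {'query': 'passport renewal', 'impressions': 120, 'clicks': 45}
```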
8 changes: 6 additions & 2 deletions dags/galileo/galileo.py
@@ -9,8 +9,12 @@


def get_service(api_name, api_version, scopes):
credentials = ServiceAccountCredentials.from_json_keyfile_name(
DATA_DIR + '/test-credentials.json', scopes=scopes)
if scopes == ['https://www.googleapis.com/auth/webmasters.readonly']:
credentials = ServiceAccountCredentials.from_json_keyfile_name(
DATA_DIR + '/test-credentials.json', scopes=scopes)
else:
credentials = ServiceAccountCredentials.from_json_keyfile_name(
DATA_DIR + '/credentials.json', scopes=scopes)

# Build the service object.
service = build(api_name, api_version, credentials=credentials)
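
With this change the credentials file is selected by scope: the Search Console readonly scope loads the test key, anything else the production key. A hedged usage sketch (the `webmasters` API version is assumed, since the call in `searchconsole.py` is truncated in this diff):

```python
# Hedged usage sketch of the scope-based branch above.
search_console = get_service(
    api_name='webmasters', api_version='v3',  # version assumed for illustration
    scopes=['https://www.googleapis.com/auth/webmasters.readonly'])  # -> test-credentials.json

reporting = get_service(
    api_name='analyticsreporting', api_version='v4',  # as used in ga.py
    scopes=['https://www.googleapis.com/auth/analytics.readonly'])   # -> credentials.json
```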
13 changes: 7 additions & 6 deletions dags/galileo/searchconsole.py
@@ -1,15 +1,16 @@
import tablib
import datetime
import os

try:
from . import galileo
except ImportError:
import galileo


def generate_search_query_report(property_uri,
days=10,
end_date=datetime.date.today()):
def generate_web_search_query_report(property_uri,
days=10,
end_date=datetime.date.today()):
# Authenticate and construct service.
service = galileo.get_service(
api_name='webmasters',
@@ -63,9 +64,9 @@ def generate_search_query_report(property_uri,
page_start += 25000
else:
print("done ", property_uri)
if not os.path.isdir(galileo.DATA_DIR+'/searchqueries'):
os.mkdir(galileo.DATA_DIR+'/searchqueries')
with open(galileo.DATA_DIR + '/searchqueries/{}-searchqueries-{}-{}.csv'.format(
if not os.path.isdir(galileo.DATA_DIR + '/searchqueries'):
os.mkdir(galileo.DATA_DIR + '/searchqueries')
with open(galileo.DATA_DIR + '/searchqueries/{}_websearch_{}_{}.csv'.format(
galileo.domain_slug(property_uri),
data_start_date.replace('-', ''),
data_end_date.replace('-', ''),
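
Together with the rename to `generate_web_search_query_report`, the output filename changes from `{slug}-searchqueries-{start}-{end}.csv` to `{slug}_websearch_{start}_{end}.csv`. An illustrative expansion (property URL and dates are made up):

```python
# Illustrative only: what the new filename pattern expands to. Dashes are
# stripped from the ISO dates before formatting, as in the code above.
slug = galileo.domain_slug('https://www.example.gov.au/')  # 'examplegovau'
path = galileo.DATA_DIR + '/searchqueries/{}_websearch_{}_{}.csv'.format(
    slug, '2019-06-25'.replace('-', ''), '2019-07-05'.replace('-', ''))
# -> <DATA_DIR>/searchqueries/examplegovau_websearch_20190625_20190705.csv
```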
2 changes: 1 addition & 1 deletion dags/galileo/unit_testing.py
@@ -31,4 +31,4 @@ def assert_has_valid_dag(module):

if no_dag_found:
raise AssertionError('module does not contain a valid DAG')
# [END composer_dag_unit_testing]
# [END composer_dag_unit_testing]
2 changes: 1 addition & 1 deletion dags/observatory.py
@@ -46,4 +46,4 @@
'cf v3-push observatory-blue &&'
'cf map-route observatory-blue apps.y.cld.gov.au -n observatory &&'
'cf unmap-route observatory-blue apps.y.cld.gov.au -n observatory-green'
.format(GCS_BUCKET=GCS_BUCKET, HTPASSWD=htpasswd)])
.format(GCS_BUCKET=GCS_BUCKET, HTPASSWD=htpasswd)])
7 changes: 4 additions & 3 deletions dags/pipelines/benchmark_tally.py
@@ -9,17 +9,18 @@
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--input',
dest='input',
default='gs://us-central1-maxious-airflow-64b78389-bucket/data/benchmark_69211100_20190425.csv',
#'../../data/benchmark_69211100_20190425.csv',
# '../../data/benchmark_69211100_20190425.csv',
help='Input file to process.')
parser.add_argument('--output',
dest='output',
default='gs://us-central1-maxious-airflow-64b78389-bucket/data/tally_69211100_20190425.csv',
#'../../data/tally_69211100_20190425.csv',
# '../../data/tally_69211100_20190425.csv',
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)

@@ -57,4 +58,4 @@ def format_result(path_count):

if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
run()
2 changes: 1 addition & 1 deletion dags/r_scripts/csvggplot.R
@@ -4,7 +4,7 @@ library(tidyverse)
d <- read.csv("tally_69211100_20190425.csv")
d <- filter(d, hits > 1)
d$path = str_wrap(d$path, width = 50)
p <- ggplot(d, aes(y=path, x=hits)) +geom_point()
p <- ggplot(d, aes(y = path, x = hits)) + geom_point()
png("tally_69211100_20190425.png")
print(p)
dev.off()
6 changes: 3 additions & 3 deletions dags/r_scripts/extractaccinfo.R
@@ -7,9 +7,9 @@ options(googleAuthR.scopes.selected = "https://www.googleapis.com/auth/analytics
gar_auth_service("credentials.json")

gg <- ga_account_list() %>%
select(accountId, internalWebPropertyId,websiteUrl, webPropertyId, level, type, viewId, viewName) %>%
filter(level=="PREMIUM" & type == "WEB") %>%
select(accountId, internalWebPropertyId, websiteUrl, webPropertyId, level, type, viewId, viewName) %>%
filter(level == "PREMIUM" & type == "WEB") %>%
distinct(webPropertyId, .keep_all = TRUE)

name <- paste0("GA360-", format(Sys.time(), '%d%b%Y'), ".csv")
write.csv(gg,name)
write.csv(gg, name)