Skip to content

Commit

Permalink
Create-Workload Improvements: Separate operations and test procedures…
Browse files Browse the repository at this point in the history
… from workload.json (#446)

Signed-off-by: Ian Hoang <[email protected]>
Co-authored-by: Ian Hoang <[email protected]>
  • Loading branch information
IanHoang and Ian Hoang authored Jan 25, 2024
1 parent d81aaf8 commit 5538546
Show file tree
Hide file tree
Showing 8 changed files with 247 additions and 75 deletions.
35 changes: 5 additions & 30 deletions osbenchmark/resources/base-workload.json.j2
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,10 @@
]
}{% endfor %}
],
"schedule": [
{
"operation": "delete-index"
},{% raw %}
{
"operation": {
"operation-type": "create-index",
"settings": {{index_settings | default({}) | tojson}}
}
},{% endraw %}
{
"operation": {
"operation-type": "cluster-health",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},{% raw %}
"request-params": {
"wait_for_status": "{{cluster_health | default('green')}}",
"wait_for_no_relocating_shards": "true"
},
"retry-until-success": true
}
},
{
"operation": {
"operation-type": "bulk",
"bulk-size": {{bulk_size | default(5000)}},
"ingest-percentage": {{ingest_percentage | default(100)}}
},
"clients": {{bulk_indexing_clients | default(8)}}
},{% endraw -%}
{% block queries %}{% endblock %}
"operations": [
{% raw %}{{ benchmark.collect(parts="operations/*.json") }}{% endraw %}
],
"test_procedures": [
{% raw %}{{ benchmark.collect(parts="test_procedures/*.json") }}{% endraw %}
]
}
27 changes: 27 additions & 0 deletions osbenchmark/resources/custom-operations.json.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{#
  Operation definitions for a generated workload that uses custom queries.
  Renders a comma-separated list of JSON objects (no enclosing brackets):
  a bulk "index-append" operation, a "wait-until-merges-finish" check and one
  operation per entry of custom_queries.

  Expressions inside raw blocks are written out verbatim so they remain
  template parameters in the generated file instead of being frozen to their
  defaults while the workload is generated.
  NOTE(review): assumes the generated operations file is rendered again when
  the workload is loaded - confirm against the workload loader.
#}
{
  "name": "index-append",
  "operation-type": "bulk",
  "bulk-size": {% raw %}{{bulk_size | default(5000)}}{% endraw %},
  "ingest-percentage": {% raw %}{{ingest_percentage | default(100)}}{% endraw %}
},
{
  "name": "wait-until-merges-finish",
  "operation-type": "index-stats",
  "index": "_all",
  "condition": {
    "path": "_all.total.merges.current",
    "expected-value": 0
  },
  "retry-until-success": true,
  "include-in-reporting": false
},
{%- block queries -%}
{% for query in custom_queries %}
{
  "name": "{{query.name}}",
  "operation-type": "{{query['operation-type']}}",
  "index": {{ indices | map(attribute='name') | list | join(',') | tojson }},
  {#- Serialize the body as real JSON; swapping quote characters in the
      rendered value breaks on booleans, nulls and apostrophes in strings. #}
  "body": {{ query.body | tojson }}
}{% if not loop.last %},{% endif -%}
{% endfor %}
{%- endblock %}
14 changes: 0 additions & 14 deletions osbenchmark/resources/custom-query-workload.json.j2

This file was deleted.

64 changes: 64 additions & 0 deletions osbenchmark/resources/custom-test-procedures.json.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{#
  Test procedure for a generated workload that uses custom queries.
  The schedule deletes and recreates the indices, waits for cluster health,
  bulk-indexes, refreshes, force-merges, waits for merges to finish and then
  runs one search task per entry of custom_queries.

  Expressions inside raw blocks are written out verbatim so they remain
  template parameters in the generated file. Only "indices" and
  "custom_queries" are resolved while the workload is generated.
  NOTE(review): assumes the generated file is rendered again when the
  workload is loaded - confirm against the workload loader.
#}
{
  "name": "custom-test-procedures",
  "description": "Customized test procedure with custom operations generated by create-workload feature in OpenSearch Benchmark.",
  "default": true,
  "schedule": [
    {
      "operation": "delete-index"
    },
    {
      "operation": {
        "operation-type": "create-index",
        "settings": {% raw %}{{ index_settings | default({}) | tojson }}{% endraw %}
      }
    },
    {
      "operation": {
        "operation-type": "cluster-health",
        "index": {{ indices | map(attribute='name') | list | join(',') | tojson }},
        "request-params": {
          "wait_for_status": "{% raw %}{{ cluster_health | default('green') }}{% endraw %}",
          "wait_for_no_relocating_shards": "true"
        },
        "retry-until-success": true
      }
    },
    {
      "operation": "index-append",
      "clients": {% raw %}{{ bulk_indexing_clients | default(8) }}{% endraw %},
      "ignore-response-error-level": "{% raw %}{{ error_level | default('non-fatal') }}{% endraw %}"
    },
    {
      "name": "refresh-after-index",
      "operation": "refresh"
    },
    {
      "operation": {
        "operation-type": "force-merge",
        {#- Keep the optional max-num-segments clause as a deferred template
            expression; evaluated at generation time it could never be set. #}
        "request-timeout": 7200{% raw %}{%- if force_merge_max_num_segments is defined %},
        "max-num-segments": {{ force_merge_max_num_segments | tojson }}
        {%- endif %}{% endraw %}
      }
    },
    {
      "name": "refresh-after-force-merge",
      "operation": "refresh"
    },
    {
      "operation": "wait-until-merges-finish"
    },
    {%- block queries -%}
    {% for query in custom_queries %}
    {
      "operation":"{{query.name}}",
      "warmup-iterations": {% raw %}{{ warmup_iterations | default(50) }}{% endraw %},
      "iterations": {% raw %}{{ iterations | default(100) }}{% endraw %},
      "target-throughput": {% raw %}{{ target_throughput | default(3) }}{% endraw %},
      "clients": {% raw %}{{ search_clients | default(1) }}{% endraw %}
    }{% if not loop.last %},{% endif -%}
    {% endfor %}
    {%- endblock %}
  ]
}
28 changes: 28 additions & 0 deletions osbenchmark/resources/default-operations.json.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{#
  Operation definitions for a generated workload without custom queries.
  Renders a comma-separated list of JSON objects (no enclosing brackets):
  a bulk "index-append" operation, a "wait-until-merges-finish" check and a
  "match-all" search over the extracted indices.

  Expressions inside raw blocks are written out verbatim so they remain
  template parameters in the generated file instead of being frozen to their
  defaults while the workload is generated.
  NOTE(review): assumes the generated operations file is rendered again when
  the workload is loaded - confirm against the workload loader.
#}
{
  "name": "index-append",
  "operation-type": "bulk",
  "bulk-size": {% raw %}{{bulk_size | default(5000)}}{% endraw %},
  "ingest-percentage": {% raw %}{{ingest_percentage | default(100)}}{% endraw %}
},
{
  "name": "wait-until-merges-finish",
  "operation-type": "index-stats",
  "index": "_all",
  "condition": {
    "path": "_all.total.merges.current",
    "expected-value": 0
  },
  "retry-until-success": true,
  "include-in-reporting": false
},
{
  "name": "match-all",
  "operation-type": "search",
  "index": {{ indices | map(attribute='name') | list | join(',') | tojson }},
  "body": {
    "size": {% raw %}{{match_all_size | default(10)}}{% endraw %},
    "query": {
      "match_all": {}
    }
  }
}
16 changes: 0 additions & 16 deletions osbenchmark/resources/default-query-workload.json.j2

This file was deleted.

59 changes: 59 additions & 0 deletions osbenchmark/resources/default-test-procedures.json.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{#
  Default test procedure for a generated workload without custom queries.
  The schedule deletes and recreates the indices, waits for cluster health,
  bulk-indexes, refreshes, force-merges, waits for merges to finish and then
  runs the "match-all" search.

  Expressions inside raw blocks are written out verbatim so they remain
  template parameters in the generated file. Only "indices" is resolved
  while the workload is generated.
  NOTE(review): assumes the generated file is rendered again when the
  workload is loaded - confirm against the workload loader.
#}
{
  "name": "default-test-procedure",
  "description": "Customized test procedure with custom operations generated by create-workload feature in OpenSearch Benchmark. Workload deletes existing indexes, creates indexes, ingests documents, and runs default match-all query.",
  "default": true,
  "schedule": [
    {
      "operation": "delete-index"
    },
    {
      "operation": {
        "operation-type": "create-index",
        "settings": {% raw %}{{ index_settings | default({}) | tojson }}{% endraw %}
      }
    },
    {
      "operation": {
        "operation-type": "cluster-health",
        "index": {{ indices | map(attribute='name') | list | join(',') | tojson }},
        "request-params": {
          "wait_for_status": "{% raw %}{{ cluster_health | default('green') }}{% endraw %}",
          "wait_for_no_relocating_shards": "true"
        },
        "retry-until-success": true
      }
    },
    {
      "operation": "index-append",
      "clients": {% raw %}{{ bulk_indexing_clients | default(8) }}{% endraw %},
      "ignore-response-error-level": "{% raw %}{{ error_level | default('non-fatal') }}{% endraw %}"
    },
    {
      "name": "refresh-after-index",
      "operation": "refresh"
    },
    {
      "operation": {
        "operation-type": "force-merge",
        {#- Keep the optional max-num-segments clause as a deferred template
            expression; evaluated at generation time it could never be set. #}
        "request-timeout": 7200{% raw %}{%- if force_merge_max_num_segments is defined %},
        "max-num-segments": {{ force_merge_max_num_segments | tojson }}
        {%- endif %}{% endraw %}
      }
    },
    {
      "name": "refresh-after-force-merge",
      "operation": "refresh"
    },
    {
      "operation": "wait-until-merges-finish"
    },
    {
      "operation": "match-all",
      "warmup-iterations": {% raw %}{{ warmup_iterations | default(50) }}{% endraw %},
      "iterations": {% raw %}{{ iterations | default(100) }}{% endraw %},
      "target-throughput": {% raw %}{{ target_throughput | default(3) }}{% endraw %},
      "clients": {% raw %}{{ search_clients | default(1) }}{% endraw %}
    }
  ]
}
79 changes: 64 additions & 15 deletions osbenchmark/workload_generator/workload_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import logging
import os
import shutil
import json

from opensearchpy import OpenSearchException
Expand All @@ -34,14 +35,6 @@
from osbenchmark.workload_generator import corpus, index
from osbenchmark.utils import io, opts, console


def process_template(templates_path, template_filename, template_vars, output_path):
    """Render a Jinja2 template and write the result to ``output_path``.

    :param templates_path: Directory the template is loaded from.
    :param template_filename: File name of the template inside ``templates_path``.
    :param template_vars: Mapping of variables handed to the template.
    :param output_path: Destination file; created or overwritten.
    """
    env = Environment(loader=FileSystemLoader(templates_path), autoescape=select_autoescape(['html', 'xml']))

    # Render before touching the destination so a template error cannot leave
    # an empty, truncated file behind.
    rendered = env.get_template(template_filename).render(template_vars)

    # Write UTF-8 explicitly instead of relying on the platform's default
    # encoding, which may not be able to represent every rendered character.
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(rendered)

def validate_indices_docs_map(indices, indices_docs_map, docs_were_requested):
if not docs_were_requested:
return
Expand Down Expand Up @@ -108,35 +101,84 @@ def process_custom_queries(custom_queries):

return data

def write_template(template_vars, templates_path, output_path, template_file):
    """Render the named template and write the result to ``output_path``.

    :param template_vars: Mapping of variables handed to the template.
    :param templates_path: Directory the template is loaded from.
    :param output_path: Destination file; created or overwritten.
    :param template_file: Template name as accepted by ``get_template``.
    """
    # Render before touching the destination so a template error cannot leave
    # an empty, truncated file behind.
    rendered = get_template(template_file, templates_path).render(template_vars)

    # Write UTF-8 explicitly instead of relying on the platform's default
    # encoding, which may not be able to represent every rendered character.
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(rendered)

def get_template(template_file, templates_path):
    """Load the Jinja2 template ``<template_file>.json.j2`` from ``templates_path``.

    :param template_file: Template name without the ``.json.j2`` suffix.
    :param templates_path: Directory searched for the template file.
    :return: The loaded Jinja2 template object.
    """
    jinja_env = Environment(
        loader=FileSystemLoader(templates_path),
        autoescape=select_autoescape(['html', 'xml']),
    )
    return jinja_env.get_template(template_file + ".json.j2")

def render_templates(workload_path,
                     operations_path,
                     test_procedures_path,
                     templates_path,
                     template_vars,
                     custom_queries):
    """Write the workload, operations and test-procedure files from their templates.

    The base workload template is always rendered. The operations and
    test-procedure templates are the "custom-*" pair when ``custom_queries``
    is truthy and the "default-*" pair otherwise.
    """
    if custom_queries:
        operations_template, procedures_template = "custom-operations", "custom-test-procedures"
    else:
        operations_template, procedures_template = "default-operations", "default-test-procedures"

    # Same order as before: workload first, then operations, then test procedures.
    targets = (
        (workload_path, "base-workload"),
        (operations_path, operations_template),
        (test_procedures_path, procedures_template),
    )
    for destination, template_name in targets:
        write_template(template_vars, templates_path, destination, template_name)

def create_workload(cfg):
logger = logging.getLogger(__name__)

# All inputs provided by user
workload_name = cfg.opts("workload", "workload.name")
indices = cfg.opts("generator", "indices")
root_path = cfg.opts("generator", "output.path")
target_hosts = cfg.opts("client", "hosts")
client_options = cfg.opts("client", "options")
number_of_docs = cfg.opts("generator", "number_of_docs")
unprocessed_custom_queries = cfg.opts("workload", "custom_queries")
templates_path = os.path.join(cfg.opts("node", "benchmark.root"), "resources")

# Process custom queries
custom_queries = process_custom_queries(unprocessed_custom_queries)

logger.info("Creating workload [%s] matching indices [%s]", workload_name, indices)
logger.info("Number of Docs: %s", number_of_docs)

# Initialize client factory
client = OsClientFactory(hosts=target_hosts.all_hosts[opts.TargetHosts.DEFAULT],
client_options=client_options.all_client_options[opts.TargetHosts.DEFAULT]).create()

info = client.info()
console.info(f"Connected to OpenSearch cluster [{info['name']}] version [{info['version']['number']}].\n", logger=logger)

# Establish output paths directory
output_path = os.path.abspath(os.path.join(io.normalize_path(root_path), workload_name))

operations_path = os.path.join(output_path, "operations")
test_procedures_path = os.path.join(output_path, "test_procedures")

if os.path.exists(output_path):
try:
logger.info("Workload already exists. Removing existing workload [%s] in path [%s]", workload_name, output_path)
shutil.rmtree(output_path)
except OSError:
logger.error("Had issues removing existing workload [%s] in path [%s]", workload_name, output_path)

io.ensure_dir(output_path)
io.ensure_dir(operations_path)
io.ensure_dir(test_procedures_path)

# Extract Indices and Corpora
logger.info("Extracting indices and corpora")
indices, corpora = extract_mappings_and_corpora(client, output_path, indices, number_of_docs)
logger.info("Finished extracting indices and corpora")

if len(indices) == 0:
raise RuntimeError("Failed to extract any indices for workload!")

# Collect all items into dictionary
template_vars = {
"workload_name": workload_name,
"indices": indices,
Expand All @@ -147,12 +189,19 @@ def create_workload(cfg):
logger.info("Template Vars: %s", template_vars)

workload_path = os.path.join(output_path, "workload.json")
templates_path = os.path.join(cfg.opts("node", "benchmark.root"), "resources")

if custom_queries:
process_template(templates_path, "custom-query-workload.json.j2", template_vars, workload_path)
else:
process_template(templates_path, "default-query-workload.json.j2", template_vars, workload_path)
operations_path = os.path.join(operations_path, "default.json")
test_procedures_path = os.path.join(test_procedures_path, "default.json")

# Render all templates
logger.info("Rendering templates")
render_templates(
workload_path,
operations_path,
test_procedures_path,
templates_path,
template_vars,
custom_queries
)

console.println("")
console.info(f"Workload {workload_name} has been created. Run it with: {PROGRAM_NAME} --workload-path={output_path}")

0 comments on commit 5538546

Please sign in to comment.