JSONDecodeError("Expecting ':' delimiter: line 1 column 8069 (char 8068)",) #85113

Closed
nchauhan5 opened this issue Mar 18, 2022 · 1 comment
Labels: >bug, needs:triage (Requires assignment of a team area label)

Comments

nchauhan5 commented Mar 18, 2022

Elasticsearch Version

7.17.1

Installed Plugins

kibana

Java Version

openjdk version "17.0.2" 2022-01-18

OS Version

Linux 5.14.0-1027-oem Ubuntu 20.04

Problem Description

I am still seeing this issue in my benchmarking test with Elasticsearch 7.17.1, Python 2.7, the elasticsearch Python library 7.17.1, and bulk updates for 5 million records issued from 8 processes via a multiprocessing pool's apply_async. One of the processes always returns a JSONDecodeError when ApplyResult.get() is printed. Note the difference between the two results received from the Elasticsearch client:

Correct result -
{"index":{"_index":"test_index","_type":"_doc","_id":"F1pTnX8B4XBe97ExF-mU","_version":1,"result":"created","_shards":{"total":1,"successful":1,"failed":0},"_seq_no":61538,"_primary_term":1,"status":201}},

Incorrect Result causing the JSONDecodeError -
{"index":{"_index":"test_index","_type""_primary_term":1,"status":201}},

Note how the ":" delimiter is missing after "_type"; that is what is causing the issue.
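
As an illustration (not the reporter's code), feeding a fragment like the truncated one above to the standard json parser reproduces the same class of error; under Python 3 it is json.JSONDecodeError, while Python 2.7's json module raises a plain ValueError with the same "Expecting ':' delimiter" message:

import json

# Illustrative only: a response body whose bytes between "_type" and "_primary_term" went missing.
bad = '{"index":{"_index":"test_index","_type""_primary_term":1,"status":201}}'
json.loads(bad)  # raises: Expecting ':' delimiter (JSONDecodeError on Python 3, ValueError on Python 2.7)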

Steps to Reproduce

Create a Python multiprocessing app with a pool size of 8 and 5 million Elasticsearch documents, each holding a single value from xrange(5000000), targeting the "test_index" index.

def create_docs():
    cache = []
    for i in xrange(0, 5000000):
        cache.append({
            "_index": "test_index",
            "x": i
        })
    return cache

Divide the records into 8 chunks, one chunk for each of the 8 processes.
slices = numpy.array_split(numpy.array(input_list), 8)

Call the bulk insert function (bulk_insert_apply) for each of the processes.
pool.apply_async(bulk_insert_apply, args=(x, batch))

Note that the number of records created in Elasticsearch is less than 5 million, and on further debugging you will see that one of the processes returns this result (via ApplyResult.get()):
JSONDecodeError("Expecting ':' delimiter: line 1 column 8069 (char 8068)",)

Code (if relevant)

import multiprocessing
import time
import timeit
from collections import OrderedDict
from multiprocessing import Pool

import numpy
from elasticsearch import Elasticsearch, helpers
from elasticsearch.exceptions import ElasticsearchException, SerializationError

# es_host, es_port, logger and reset_index_settings_back_to_default are referenced here but defined elsewhere in the original script.
elastic_client = Elasticsearch(host=es_host, port=es_port, maxsize=25)
cpu_count = multiprocessing.cpu_count()

def bulk_insert_apply(i, actions):
    print("Job ID: {}".format(i))
    res = helpers.bulk(elastic_client, actions)
    print("Batch: {} Test bulk indexing finished with result: {}".format(i, res))
    # elastic_client.transport.close()

def test_streaming_bulk_with_pool():
    index_configs = OrderedDict()
    index_configs["settings"] = OrderedDict()
    index_configs["settings"]["index"] = OrderedDict()
    index_configs["settings"]["index"]["refresh_interval"] = -1
    index_configs["settings"]["index"]["number_of_replicas"] = 0

    start = timeit.default_timer()
    logger.info("Starting elasticsearch Test index bulk create job .....")
    try:
        exists = elastic_client.indices.exists(index="test_index")
        # Since this is a baseline index update job, it first deletes the Test index if it exists and then creates it again from scratch.
        if exists:
            logger.info("Deleting existing Test index and documents within to create again from scratch .....")
            elastic_client.indices.delete(index="test_index", ignore=[400, 404])

        logger.info("Creating Test index .......")
        # Set the index setting index.refresh_interval to -1, which stops indexes from refreshing while they are being created.
        elastic_client.indices.create(index='test_index', body=index_configs, ignore=400)

        input_list = create_docs()

        slices = numpy.array_split(numpy.array(input_list), 1000)
        print(len(slices))

        result = []
        for y in range(125):     # 1000/cpu_count (8 in my case)
            pool = Pool(8)
            for x in range(8):
                batch = slices[y*8+x]
                length = len(batch)
                print("Batch {} length is: {}".format(x, length))   # x is the job id for bulk update within a mini chunk
                print batch[0], batch[length-1]
                result.append(pool.apply_async(bulk_insert_apply, args=(x, batch)))
                time.sleep(0.5)
                # pool.map_async(bulk_insert_map, batch)
            pool.close()
            pool.join()

        passed = 0
        failed = 0
        for i, p in enumerate(result):
            try:
                p.get()
                passed += 1
            except Exception as e:
                failed += 1
                logger.exception(e)
        print [passed, failed]
        end = timeit.default_timer()
        logger.info("Baseline Test bulk indexing finished in time {}".format("{:.2f} secs".format(end - start)))
    except SerializationError:
        logger.error("Serialization Error occurred while creating Test index documents", exc_info=True)
    except ElasticsearchException:
        logger.error("ElasticSearchException occurred while creating Test index documents", exc_info=True)
    except KeyboardInterrupt:
        logger.info("Ctrl+C pressed. ElasticSearch baseline update job shutting down.......")
    except Exception:
        logger.error("Exception occurred while creating Test index documents", exc_info=True)
    finally:
        # Set the index setting index.refresh_interval back to the default by using value None, as the ES documentation says.
        logger.info("Resetting index config settings to the default values")
        index_configs = reset_index_settings_back_to_default(index_configs)
        elastic_client.indices.put_settings(index="test_index", body=index_configs)

        # As per the documentation, force merge should be called on the index after the replicas are set back to the default value.
        res = elastic_client.indices.forcemerge(index="test_index", params={"request_timeout": 600})
        logger.info("Force merged index with result: {}".format(res))

        # closing the connection to the elasticsearch cluster
        logger.info("Closing elasticsearch client connection......")
        elastic_client.transport.close()

if __name__ == "__main__":
    test_streaming_bulk_with_pool()

nchauhan5 added the >bug and needs:triage labels on Mar 18, 2022
@DJRickyB (Contributor) commented:

> Note how the ":" delimiter is missing after "_type"; that is what is causing the issue.

Well, it's not just the delimiter that is missing but a whole substring. Two things that concern me here are the use of Python 2.7 and the use of a single Elasticsearch client instance in a multiprocessing context. Per our engineers:

The Python client isn't fork-safe, only thread-safe. So a multiprocessing.Pool that shares a single client instance isn't appropriate.

I do not think this will work; I believe your issue is with mutation safety, not with Elasticsearch itself. To disprove this, you would need a packet capture (for example with Wireshark) demonstrating that ES sends back malformed JSON in the response.

The absolute best way to do what you are attempting is with Rally. We do not support other methods of benchmarking Elasticsearch at this time.
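
For what it's worth, a minimal sketch of the per-process pattern the fork-safety note implies (illustrative only; es_host/es_port stand in for the reporter's connection settings): each worker builds its own client in a Pool initializer instead of inheriting the parent's forked connection pool.

from multiprocessing import Pool
from elasticsearch import Elasticsearch, helpers

worker_client = None

def init_worker():
    # Runs once inside each child process, after the fork, so no connection state is shared with the parent.
    global worker_client
    worker_client = Elasticsearch(host=es_host, port=es_port, maxsize=25)

def bulk_insert_apply(i, actions):
    print("Job ID: {}".format(i))
    return helpers.bulk(worker_client, actions)

pool = Pool(8, initializer=init_worker)

Whether that makes the benchmark meaningful is a separate question; as noted above, Rally is the supported way to benchmark Elasticsearch.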
