Merge branch 'master' into feature/query-refactoring
javanna committed Jun 11, 2015
2 parents e061aa8 + 0febe5a commit 74db0de
Showing 28 changed files with 219 additions and 246 deletions.
@@ -25,9 +25,16 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.zip.CRC32;
import java.util.zip.CheckedOutputStream;

/**
* Similar class to the {@link String} class except that it internally stores
@@ -37,23 +44,79 @@
*/
public final class CompressedXContent {

private static int crc32(BytesReference data) {
OutputStream dummy = new OutputStream() {
@Override
public void write(int b) throws IOException {
// no-op
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
// no-op
}
};
CRC32 crc32 = new CRC32();
try {
data.writeTo(new CheckedOutputStream(dummy, crc32));
} catch (IOException bogus) {
// cannot happen
throw new Error(bogus);
}
return (int) crc32.getValue();
}

private final byte[] bytes;
-private int hashCode;
+private final int crc32;

// Used for serialization
private CompressedXContent(byte[] compressed, int crc32) {
this.bytes = compressed;
this.crc32 = crc32;
assertConsistent();
}

/**
* Create a {@link CompressedXContent} out of a {@link ToXContent} instance.
*/
public CompressedXContent(ToXContent xcontent, XContentType type, ToXContent.Params params) throws IOException {
BytesStreamOutput bStream = new BytesStreamOutput();
OutputStream compressedStream = CompressorFactory.defaultCompressor().streamOutput(bStream);
CRC32 crc32 = new CRC32();
OutputStream checkedStream = new CheckedOutputStream(compressedStream, crc32);
try (XContentBuilder builder = XContentFactory.contentBuilder(type, checkedStream)) {
builder.startObject();
xcontent.toXContent(builder, params);
builder.endObject();
}
this.bytes = bStream.bytes().toBytes();
this.crc32 = (int) crc32.getValue();
assertConsistent();
}

/**
* Create a {@link CompressedXContent} out of a serialized {@link ToXContent}
* that may already be compressed.
*/
public CompressedXContent(BytesReference data) throws IOException {
Compressor compressor = CompressorFactory.compressor(data);
if (compressor != null) {
// already compressed...
this.bytes = data.toBytes();
this.crc32 = crc32(new BytesArray(uncompressed()));
} else {
BytesStreamOutput out = new BytesStreamOutput();
-try (StreamOutput compressedOutput = CompressorFactory.defaultCompressor().streamOutput(out)) {
+try (OutputStream compressedOutput = CompressorFactory.defaultCompressor().streamOutput(out)) {
data.writeTo(compressedOutput);
}
this.bytes = out.bytes().toBytes();
-assert CompressorFactory.compressor(new BytesArray(bytes)) != null;
+this.crc32 = crc32(data);
}
assertConsistent();
}

private void assertConsistent() {
assert CompressorFactory.compressor(new BytesArray(bytes)) != null;
assert this.crc32 == crc32(new BytesArray(uncompressed()));
}

public CompressedXContent(byte[] data) throws IOException {
@@ -88,12 +151,14 @@ public String string() throws IOException {
}

public static CompressedXContent readCompressedString(StreamInput in) throws IOException {
-byte[] bytes = new byte[in.readVInt()];
-in.readBytes(bytes, 0, bytes.length);
-return new CompressedXContent(bytes);
+int crc32 = in.readInt();
+byte[] compressed = new byte[in.readVInt()];
+in.readBytes(compressed, 0, compressed.length);
+return new CompressedXContent(compressed, crc32);
}

public void writeTo(StreamOutput out) throws IOException {
out.writeInt(crc32);
out.writeVInt(bytes.length);
out.writeBytes(bytes);
}
@@ -109,19 +174,16 @@ public boolean equals(Object o) {
return true;
}

if (crc32 != that.crc32) {
return false;
}

return Arrays.equals(uncompressed(), that.uncompressed());
}

@Override
public int hashCode() {
-if (hashCode == 0) {
-int h = Arrays.hashCode(uncompressed());
-if (h == 0) {
-h = 1;
-}
-hashCode = h;
-}
-return hashCode;
+return crc32;
}

@Override
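
Aside: the new ToXContent constructor above stacks streams so that serialization, checksumming, and compression happen in a single pass — the XContentBuilder writes into a CheckedOutputStream wrapping the compressor, so the CRC32 covers the uncompressed serialized bytes while the buffer only ever receives compressed ones. A self-contained sketch of that stacking using JDK classes, with GZIPOutputStream standing in for the Elasticsearch compressor (an assumption for illustration only; names are hypothetical):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.io.OutputStreamWriter;
    import java.io.Writer;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.CRC32;
    import java.util.zip.CheckedOutputStream;
    import java.util.zip.GZIPOutputStream;

    class CompressAndChecksum {
        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            CRC32 crc32 = new CRC32();
            // Stack: writer -> checksummer -> compressor -> buffer.
            // The CRC sees the uncompressed bytes; the buffer gets compressed ones.
            try (OutputStream compressed = new GZIPOutputStream(buffer);
                 OutputStream checked = new CheckedOutputStream(compressed, crc32);
                 Writer writer = new OutputStreamWriter(checked, StandardCharsets.UTF_8)) {
                writer.write("{\"field\":\"value\"}");
            }
            System.out.println("crc32=" + (int) crc32.getValue()
                    + " compressedBytes=" + buffer.size());
        }
    }
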
15 changes: 0 additions & 15 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
@@ -36,37 +36,23 @@
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache;
import org.elasticsearch.index.cache.filter.ShardFilterCache;
import org.elasticsearch.index.cache.query.ShardQueryCache;
import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.fielddata.ShardFieldData;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.index.get.ShardGetService;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerModule;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.search.slowlog.ShardSlowLogSearchService;
import org.elasticsearch.index.search.stats.ShardSearchService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.snapshots.IndexShardSnapshotAndRestoreService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreModule;
import org.elasticsearch.index.suggest.stats.ShardSuggestService;
import org.elasticsearch.index.termvectors.ShardTermVectorsService;
import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
@@ -313,7 +299,6 @@ public synchronized IndexShard createShard(int sShardId, boolean primary) {
new StoreCloseListener(shardId, canDeleteShardContent, shardFilterCache), path));
modules.add(new DeletionPolicyModule(indexSettings));
modules.add(new MergePolicyModule(indexSettings));
-modules.add(new MergeSchedulerModule(indexSettings));
try {
shardInjector = modules.createChildInjector(injector);
} catch (CreationException e) {
@@ -36,15 +36,12 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringAndBytesText;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.mapper.Mapping.SourceTransform;
import org.elasticsearch.index.mapper.internal.AllFieldMapper;
@@ -481,13 +478,7 @@ public MergeResult merge(Mapping mapping, boolean simulate) {

private void refreshSource() throws ElasticsearchGenerationException {
try {
-BytesStreamOutput bStream = new BytesStreamOutput();
-try (XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, CompressorFactory.defaultCompressor().streamOutput(bStream))) {
-builder.startObject();
-toXContent(builder, ToXContent.EMPTY_PARAMS);
-builder.endObject();
-}
-mappingSource = new CompressedXContent(bStream.bytes());
+mappingSource = new CompressedXContent(this, XContentType.JSON, ToXContent.EMPTY_PARAMS);
} catch (Exception e) {
throw new ElasticsearchGenerationException("failed to serialize source for type [" + type + "]", e);
}

This file was deleted.

@@ -77,6 +77,10 @@ public Query parse(QueryParseContext parseContext) throws IOException, QueryParsingException {

public static Query newFilter(QueryParseContext parseContext, String fieldPattern, String queryName) {
final FieldNamesFieldMapper.FieldNamesFieldType fieldNamesFieldType = (FieldNamesFieldMapper.FieldNamesFieldType)parseContext.mapperService().fullName(FieldNamesFieldMapper.NAME);
if (fieldNamesFieldType == null) {
// can only happen when no types exist, so no docs exist either
return Queries.newMatchNoDocsQuery();
}

MapperService.SmartNameObjectMapper smartNameObjectMapper = parseContext.smartObjectMapper(fieldPattern);
if (smartNameObjectMapper != null && smartNameObjectMapper.hasMapper()) {
@@ -90,6 +90,11 @@ public static Query newFilter(QueryParseContext parseContext, String fieldPattern
}

final FieldNamesFieldMapper.FieldNamesFieldType fieldNamesFieldType = (FieldNamesFieldMapper.FieldNamesFieldType)parseContext.mapperService().fullName(FieldNamesFieldMapper.NAME);
if (fieldNamesFieldType == null) {
// can only happen when no types exist, so no docs exist either
return Queries.newMatchNoDocsQuery();
}

MapperService.SmartNameObjectMapper smartNameObjectMapper = parseContext.smartObjectMapper(fieldPattern);
if (smartNameObjectMapper != null && smartNameObjectMapper.hasMapper()) {
// automatic make the object mapper pattern
@@ -34,6 +34,8 @@
import org.elasticsearch.index.get.ShardGetService;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
import org.elasticsearch.index.search.slowlog.ShardSlowLogSearchService;
@@ -90,7 +92,7 @@ protected void configure() {
}

bind(EngineFactory.class).to(settings.getAsClass(ENGINE_FACTORY, DEFAULT_ENGINE_FACTORY_CLASS, ENGINE_PREFIX, ENGINE_SUFFIX));

bind(MergeSchedulerProvider.class).to(ConcurrentMergeSchedulerProvider.class).asEagerSingleton();
bind(ShardIndexWarmerService.class).asEagerSingleton();
bind(ShardIndexingService.class).asEagerSingleton();
bind(ShardSlowLogIndexingService.class).asEagerSingleton();
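
Aside: the new binding follows the standard Guice idiom of wiring an interface to a concrete implementation as an eager singleton, so the instance is constructed when the injector is built and wiring errors surface immediately rather than on first use. A minimal self-contained sketch of that idiom (all names hypothetical):

    import com.google.inject.AbstractModule;
    import com.google.inject.Guice;
    import com.google.inject.Injector;

    interface Scheduler {}

    class ConcurrentScheduler implements Scheduler {}

    class SchedulerModule extends AbstractModule {
        @Override
        protected void configure() {
            // Eager singletons are instantiated at injector creation time.
            bind(Scheduler.class).to(ConcurrentScheduler.class).asEagerSingleton();
        }
    }

    class Demo {
        public static void main(String[] args) {
            Injector injector = Guice.createInjector(new SchedulerModule());
            Scheduler scheduler = injector.getInstance(Scheduler.class);
            System.out.println(scheduler.getClass().getSimpleName()); // ConcurrentScheduler
        }
    }
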
@@ -306,7 +306,8 @@ public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel) throws Exception {
try {
recoveryStatus.indexShard().performBatchRecovery(request.operations());
} catch (TranslogRecoveryPerformer.BatchOperationException exception) {
-if (ExceptionsHelper.unwrapCause(exception) instanceof MapperException == false) {
+MapperException mapperException = (MapperException) ExceptionsHelper.unwrap(exception, MapperException.class);
+if (mapperException == null) {
throw exception;
}
// in very rare cases a translog replay from primary is processed before a mapping update on this node
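
Aside: the fix works because unwrap(exception, MapperException.class) searches the entire cause chain for a MapperException, whereas the old unwrapCause check only peeled off the outermost wrapper, so a MapperException buried several causes deep was rethrown. A minimal sketch of such a cause-chain walk (hypothetical helper, not necessarily ExceptionsHelper's exact code):

    final class Causes {
        // Returns the first throwable in t's cause chain that is an instance of
        // the requested type, or null; the counter guards against cause cycles.
        static <T extends Throwable> T find(Throwable t, Class<T> type) {
            int depth = 0;
            while (t != null && depth++ < 10) {
                if (type.isInstance(t)) {
                    return type.cast(t);
                }
                t = t.getCause();
            }
            return null;
        }
    }
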
@@ -18,7 +18,6 @@
*/
package org.elasticsearch.repositories;

-import com.google.common.collect.ImmutableList;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.common.component.LifecycleComponent;
@@ -59,11 +58,11 @@ public interface Repository extends LifecycleComponent<Repository> {
* <p/>
* The returned meta data contains global metadata as well as metadata for all indices listed in the indices parameter.
*
* @param snapshotId snapshot ID
+* @param snapshot snapshot
* @param indices list of indices
* @return information about snapshot
*/
-MetaData readSnapshotMetaData(SnapshotId snapshotId, List<String> indices) throws IOException;
+MetaData readSnapshotMetaData(SnapshotId snapshotId, Snapshot snapshot, List<String> indices) throws IOException;

/**
* Returns the list of snapshots currently stored in the repository
(diff for the remaining changed files not shown)
