Skip to content

Commit

Permalink
Merge pull request #15012 from s1monw/issues/14866
Browse files Browse the repository at this point in the history
Prevent writing to closed channel if translog is already closed
  • Loading branch information
s1monw committed Nov 26, 2015
2 parents 59be738 + 3dfa146 commit e363e0c
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public BufferingTranslogWriter(ShardId shardId, long generation, ChannelReferenc
@Override
public Translog.Location add(BytesReference data) throws IOException {
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
operationCounter++;
final long offset = totalOffset;
if (data.length() >= buffer.length) {
Expand Down Expand Up @@ -106,19 +107,25 @@ public void sync() throws IOException {
return;
}
synchronized (this) {
try (ReleasableLock lock = writeLock.acquire()) {
flush();
lastSyncedOffset = totalOffset;
channelReference.incRef();
try {
try (ReleasableLock lock = writeLock.acquire()) {
flush();
lastSyncedOffset = totalOffset;
}
// we can do this outside of the write lock but we have to protect from
// concurrent syncs
checkpoint(lastSyncedOffset, operationCounter, channelReference);
} finally {
channelReference.decRef();
}
// we can do this outside of the write lock but we have to protect from
// concurrent syncs
checkpoint(lastSyncedOffset, operationCounter, channelReference);
}
}


public void updateBufferSize(int bufferSize) {
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
if (this.buffer.length != bufferSize) {
flush();
this.buffer = new byte[bufferSize];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,13 +407,16 @@ public Location add(Operation operation) throws TranslogException {
out.seek(end);
final ReleasablePagedBytesReference bytes = out.bytes();
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
Location location = current.add(bytes);
if (config.isSyncOnEachOperation()) {
current.sync();
}
assert current.assertBytesAtLocation(location, bytes);
return location;
}
} catch (AlreadyClosedException ex) {
throw ex;
} catch (Throwable e) {
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
} finally {
Expand Down Expand Up @@ -1285,8 +1288,8 @@ static Translog.Operation newOperationFromType(Translog.Operation.Type type) thr

@Override
public void prepareCommit() throws IOException {
ensureOpen();
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
if (currentCommittingTranslog != null) {
throw new IllegalStateException("already committing a translog with generation: " + currentCommittingTranslog.getGeneration());
}
Expand Down Expand Up @@ -1318,9 +1321,9 @@ public void prepareCommit() throws IOException {

@Override
public void commit() throws IOException {
ensureOpen();
ImmutableTranslogReader toClose = null;
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
if (currentCommittingTranslog == null) {
prepareCommit();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ public static Type fromString(String type) {
* add the given bytes to the translog and return the location they were written at
*/
public Translog.Location add(BytesReference data) throws IOException {
ensureOpen();
final long position;
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
position = writtenOffset;
data.writeTo(channel);
writtenOffset = writtenOffset + data.length();
Expand Down Expand Up @@ -200,9 +200,9 @@ public TranslogReader newReaderFromWriter() {
* returns a new immutable reader which only exposes the current written operation *
*/
public ImmutableTranslogReader immutableReader() throws TranslogException {
ensureOpen();
if (channelReference.tryIncRef()) {
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
flush();
ImmutableTranslogReader reader = new ImmutableTranslogReader(this.generation, channelReference, firstOperationOffset, writtenOffset, operationCounter);
channelReference.incRef(); // for new reader
Expand Down
119 changes: 89 additions & 30 deletions core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -460,36 +460,7 @@ public void testConcurrentWritesWithVaryingSize() throws Throwable {
final CountDownLatch downLatch = new CountDownLatch(1);
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
try {
downLatch.await();
for (int opCount = 0; opCount < opsPerThread; opCount++) {
Translog.Operation op;
switch (randomFrom(Translog.Operation.Type.values())) {
case CREATE:
case INDEX:
op = new Translog.Index("test", threadId + "_" + opCount,
randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
break;
case DELETE:
op = new Translog.Delete(new Term("_uid", threadId + "_" + opCount),
1 + randomInt(100000),
randomFrom(VersionType.values()));
break;
default:
throw new ElasticsearchException("not supported op type");
}

Translog.Location loc = translog.add(op);
writtenOperations.add(new LocationOperation(op, loc));
}
} catch (Throwable t) {
threadExceptions[threadId] = t;
}
}
});
threads[i] = new TranslogThread(translog, downLatch, opsPerThread, threadId, writtenOperations, threadExceptions);
threads[i].setDaemon(true);
threads[i].start();
}
Expand Down Expand Up @@ -1220,4 +1191,92 @@ public void testOpenForeignTranslog() throws IOException {
assertNull(snapshot.next());
}
}

public void testFailOnClosedWrite() throws IOException {
translog.add(new Translog.Index("test", "1", Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
translog.close();
try {
translog.add(new Translog.Index("test", "1", Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
fail("closed");
} catch (AlreadyClosedException ex) {
// all is welll
}
}

public void testCloseConcurrently() throws Throwable {
final int opsPerThread = randomIntBetween(10, 200);
int threadCount = 2 + randomInt(5);

logger.info("testing with [{}] threads, each doing [{}] ops", threadCount, opsPerThread);
final BlockingQueue<LocationOperation> writtenOperations = new ArrayBlockingQueue<>(threadCount * opsPerThread);

Thread[] threads = new Thread[threadCount];
final Throwable[] threadExceptions = new Throwable[threadCount];
final CountDownLatch downLatch = new CountDownLatch(1);
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
threads[i] = new TranslogThread(translog, downLatch, opsPerThread, threadId, writtenOperations, threadExceptions);
threads[i].setDaemon(true);
threads[i].start();
}

downLatch.countDown();
translog.close();

for (int i = 0; i < threadCount; i++) {
if (threadExceptions[i] != null) {
if ((threadExceptions[i] instanceof AlreadyClosedException) == false) {
throw threadExceptions[i];
}
}
threads[i].join(60 * 1000);
}
}

private static class TranslogThread extends Thread {
private final CountDownLatch downLatch;
private final int opsPerThread;
private final int threadId;
private final BlockingQueue<LocationOperation> writtenOperations;
private final Throwable[] threadExceptions;
private final Translog translog;

public TranslogThread(Translog translog, CountDownLatch downLatch, int opsPerThread, int threadId, BlockingQueue<LocationOperation> writtenOperations, Throwable[] threadExceptions) {
this.translog = translog;
this.downLatch = downLatch;
this.opsPerThread = opsPerThread;
this.threadId = threadId;
this.writtenOperations = writtenOperations;
this.threadExceptions = threadExceptions;
}

@Override
public void run() {
try {
downLatch.await();
for (int opCount = 0; opCount < opsPerThread; opCount++) {
Translog.Operation op;
switch (randomFrom(Translog.Operation.Type.values())) {
case CREATE:
case INDEX:
op = new Translog.Index("test", threadId + "_" + opCount,
randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
break;
case DELETE:
op = new Translog.Delete(new Term("_uid", threadId + "_" + opCount),
1 + randomInt(100000),
randomFrom(VersionType.values()));
break;
default:
throw new ElasticsearchException("not supported op type");
}

Translog.Location loc = translog.add(op);
writtenOperations.add(new LocationOperation(op, loc));
}
} catch (Throwable t) {
threadExceptions[threadId] = t;
}
}
}
}

0 comments on commit e363e0c

Please sign in to comment.