Skip to content

Commit

Permalink
fix: concurrency issue causing file overwrite due to identical filena…
Browse files Browse the repository at this point in the history
…mes (#572)

Co-authored-by: imbajin <[email protected]>
  • Loading branch information
zyxxoo and imbajin authored Feb 24, 2024
1 parent 60cca5e commit b62c745
Showing 1 changed file with 66 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
package org.apache.hugegraph.loader.direct.loader;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsShell;
Expand Down Expand Up @@ -59,8 +62,69 @@ public class HBaseDirectLoader extends DirectLoader<ImmutableBytesWritable, KeyV
private SinkToHBase sinkToHBase;
private LoadDistributeMetrics loadDistributeMetrics;

// Random value in [0, 0x01000000), assigned once by the static initializer;
// fileID() writes its low three bytes into every generated ID.
private static final int RANDOM_VALUE1;
// Random value in [0, 0x00008000), assigned once by the static initializer;
// fileID() writes its two bytes into every generated ID.
private static final short RANDOM_VALUE2;
// Counter seeded with a random int by the static initializer and advanced
// atomically by fileID() to supply the trailing bytes of each ID.
private static final AtomicInteger NEXT_COUNTER;

// Logger for this class.
public static final Logger LOG = Log.logger(HBaseDirectLoader.class);

// Initializes the per-class random components used by fileID().
// A single SecureRandom instance supplies all three values; creating a
// second instance only to seed the counter is unnecessary work.
static {
    try {
        SecureRandom secureRandom = new SecureRandom();
        // 3-byte random component: value in [0, 0x01000000).
        RANDOM_VALUE1 = secureRandom.nextInt(0x01000000);
        // 2-byte random component: value in [0, 0x00008000).
        RANDOM_VALUE2 = (short) secureRandom.nextInt(0x00008000);
        // The counter starts at a random offset.
        NEXT_COUNTER = new AtomicInteger(secureRandom.nextInt());
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

/**
 * Extracts the most significant byte (bits 24-31) of an int.
 *
 * @param x the source value
 * @return bits 24-31 of {@code x} as a byte
 */
private static byte int3(final int x) {
    final int topBits = x >>> 24;
    return (byte) (topBits & 0xFF);
}

/**
 * Extracts bits 16-23 of an int.
 *
 * @param x the source value
 * @return bits 16-23 of {@code x} as a byte
 */
private static byte int2(final int x) {
    final int shifted = x >>> 16;
    return (byte) (shifted & 0xFF);
}

/**
 * Extracts bits 8-15 of an int.
 *
 * @param x the source value
 * @return bits 8-15 of {@code x} as a byte
 */
private static byte int1(final int x) {
    final int shifted = x >>> 8;
    return (byte) (shifted & 0xFF);
}

/**
 * Extracts the least significant byte (bits 0-7) of an int.
 *
 * @param x the source value
 * @return bits 0-7 of {@code x} as a byte
 */
private static byte int0(final int x) {
    final int lowBits = x & 0xFF;
    return (byte) lowBits;
}

/**
 * Extracts the high byte (bits 8-15) of a short.
 *
 * @param x the source value
 * @return bits 8-15 of {@code x} as a byte
 */
private static byte short1(final short x) {
    final int shifted = x >>> 8;
    return (byte) (shifted & 0xFF);
}

/**
 * Extracts the low byte (bits 0-7) of a short.
 *
 * @param x the source value
 * @return bits 0-7 of {@code x} as a byte
 */
private static byte short0(final short x) {
    final int lowBits = x & 0xFF;
    return (byte) lowBits;
}

/**
 * Builds a 12-byte identifier and returns it encoded by {@code Bytes.toHex}.
 *
 * <p>Byte layout, in order:
 * <ul>
 *   <li>4 bytes: current time in seconds (truncated to an int)</li>
 *   <li>3 bytes: low three bytes of {@code RANDOM_VALUE1}</li>
 *   <li>2 bytes: {@code RANDOM_VALUE2}</li>
 *   <li>3 bytes: low three bytes of one value taken from {@code NEXT_COUNTER}</li>
 * </ul>
 *
 * @return the identifier as produced by {@code Bytes.toHex} for the 12 bytes
 */
public static String fileID() {
    final int seconds = (int) (System.currentTimeMillis() / 1000);
    // Take the counter exactly once: all three counter bytes must come from
    // the same atomically-incremented value. Calling incrementAndGet() per
    // byte would mix bytes of three different values (and, under concurrent
    // callers, of unrelated ones), defeating the counter's uniqueness role.
    final int counter = NEXT_COUNTER.incrementAndGet();

    ByteBuffer byteBuffer = ByteBuffer.allocate(12);

    byteBuffer.put(int3(seconds));
    byteBuffer.put(int2(seconds));
    byteBuffer.put(int1(seconds));
    byteBuffer.put(int0(seconds));

    byteBuffer.put(int2(RANDOM_VALUE1));
    byteBuffer.put(int1(RANDOM_VALUE1));
    byteBuffer.put(int0(RANDOM_VALUE1));
    byteBuffer.put(short1(RANDOM_VALUE2));
    byteBuffer.put(short0(RANDOM_VALUE2));

    byteBuffer.put(int2(counter));
    byteBuffer.put(int1(counter));
    byteBuffer.put(int0(counter));

    return Bytes.toHex(byteBuffer.array());
}

public HBaseDirectLoader(LoadOptions loadOptions,
InputStruct struct,
LoadDistributeMetrics loadDistributeMetrics) {
Expand Down Expand Up @@ -144,8 +208,8 @@ String generateFiles(JavaPairRDD<ImmutableBytesWritable, KeyValue> buildAndSerRd

public String getHFilePath(Configuration conf) throws IOException {
FileSystem fs = FileSystem.get(conf);
long timeStr = System.currentTimeMillis();
String pathStr = fs.getWorkingDirectory().toString() + "/hfile-gen" + "/" + timeStr + "/";
String fileID = fileID();
String pathStr = fs.getWorkingDirectory().toString() + "/hfile-gen" + "/" + fileID + "/";
Path hfileGenPath = new Path(pathStr);
if (fs.exists(hfileGenPath)) {
LOG.info("\n Delete the path where the hfile is generated,path {} ", pathStr);
Expand Down

0 comments on commit b62c745

Please sign in to comment.