Skip to content

Commit

Permalink
Resolve "Atomic write in GOR"
Browse files Browse the repository at this point in the history
  • Loading branch information
gmagnu committed Dec 17, 2024
1 parent b49301d commit f40b4fc
Show file tree
Hide file tree
Showing 37 changed files with 218 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public AzureBlobSource(String bucket, String key, String subset) {
this.bucket = bucket;
this.key = key;
this.sourceReference = new SourceReferenceBuilder(AzureBlobHelper.makeUrl(bucket, key))
.chrSubset(subset).build();
.build();
}

@Override
Expand Down Expand Up @@ -176,7 +176,7 @@ public StreamSourceMetadata getSourceMetadata() {
long lastModified = props.getLastModified().toEpochSecond();
var tag = props.getETag();

return new StreamSourceMetadata(this, getName(), lastModified, length, tag, false);
return new StreamSourceMetadata(this, getName(), lastModified, length, tag);
} catch (Throwable t) {
handleExceptions(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ public GoogleCloudStorageBlobSource(SourceReference sourceReference, String json
* @param bucket Google Cloud Storage bucket name
* @param key Object key (path)
*/
public GoogleCloudStorageBlobSource(String bucket, String key, String subset) {
public GoogleCloudStorageBlobSource(String bucket, String key) {
this.bucket = bucket;
this.key = key;
this.sourceReference = new SourceReferenceBuilder(GoogleCloudStorageBlobHelper.makeUrl(bucket, key)).chrSubset(subset).build();
this.sourceReference = new SourceReferenceBuilder(GoogleCloudStorageBlobHelper.makeUrl(bucket, key)).build();
init();
}

Expand Down Expand Up @@ -154,7 +154,7 @@ public String getName() {
public StreamSourceMetadata getSourceMetadata() {
long length = cb.getSize();
long lastModified = cb.getCreateTimeOffsetDateTime().toInstant().toEpochMilli();
return new StreamSourceMetadata(this, getName(), lastModified, length, cb.getEtag(), false);
return new StreamSourceMetadata(this, getName(), lastModified, length, cb.getEtag());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,7 @@ private StreamSourceMetadata createMetaData(String bucket, String key) {
getName(),
getResponse.getLastModified().toInstant().toEpochMilli(),
getResponse.getContentLength(),
null,
false);
null);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new GorSystemException(e);
Expand Down
2 changes: 1 addition & 1 deletion drivers/src/main/java/org/gorpipe/s3/driver/S3Source.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private S3SourceMetadata loadMetadata(String bucket, String key) {
private S3SourceMetadata createMetaData(String bucket, String key) {
try {
var objectMetaResponse = client.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build());
return new S3SourceMetadata(this, objectMetaResponse, sourceReference.getLinkLastModified(), sourceReference.getChrSubset());
return new S3SourceMetadata(this, objectMetaResponse, sourceReference.getLinkLastModified());
} catch (SdkClientException e) {
throw new GorResourceException("Failed to load metadata for " + bucket + "/" + key, getPath().toString(), e).retry();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ public class S3SourceMetadata extends StreamSourceMetadata {

private final HeadObjectResponse omd;

public S3SourceMetadata(S3Source source, HeadObjectResponse md, Long linkLastModified, String subset) {
super(source, source.getName(), md.lastModified().toEpochMilli(), linkLastModified, md.contentLength(), null, false);
public S3SourceMetadata(S3Source source, HeadObjectResponse md, Long linkLastModified) {
super(source, source.getName(), md.lastModified().toEpochMilli(), linkLastModified, md.contentLength(), null);
this.omd = md;
}

public S3SourceMetadata(S3Source source, HeadObjectResponse md, String subset) {
super(source, source.getName(), md.lastModified().toEpochMilli(), md.contentLength(), null, false);
public S3SourceMetadata(S3Source source, HeadObjectResponse md) {
super(source, source.getName(), md.lastModified().toEpochMilli(), md.contentLength(), null);
this.omd = md;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ private SourceReference createFallbackSourceReference(SourceReference sourceRefe
String fallbackUrl = getFallbackUrl(sourceReference.getUrl());
if (fallbackUrl != null) {
SourceReference updatedSourceReference = new SourceReference(fallbackUrl, sourceReference.securityContext, sourceReference.commonRoot,
sourceReference.getLookup(), sourceReference.chrSubset, sourceReference.getLinkSubPath(),
sourceReference.getLookup(), sourceReference.getLinkSubPath(),
sourceReference.isWriteSource());
return updatedSourceReference;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ protected String getDataName(String name) {

@Override
protected StreamSource createSource(String name) throws IOException {
var sr = new SourceReference(name, securityContext(), null, null, null, null, false);
var sr = new SourceReference(name, securityContext(), null, null, null, false);
return provider.resolveDataSource(sr);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected String securityContext() {

@Override
protected StreamSource createSource(String name) throws IOException {
var sr = new SourceReference(name, securityContext(), null, null, null, null, false);
var sr = new SourceReference(name, securityContext(), null, null, null, false);
return provider.resolveDataSource(sr);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ protected String securityContext() {
}

protected StreamSource createSource(String name) throws IOException {
var sr = new SourceReference(name, securityContext(), null, null, null, null, false);
var sr = new SourceReference(name, securityContext(), null, null, null, false);
return provider.resolveDataSource(sr);
}

Expand Down
4 changes: 2 additions & 2 deletions gorscripts/src/main/java/org/gorpipe/gor/cli/GorBench.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void run() {
long startTime = System.currentTimeMillis();
GenomicIterator iter = null;
try {
iter = gorDriver.createIterator(new SourceReferenceBuilder(file).chrSubset(subset).build());
iter = gorDriver.createIterator(new SourceReferenceBuilder(file).build());
if (seeks == 0) {
for (int j = 0; j < readBases - 1; j++) {
if (!iter.hasNext()) {
Expand Down Expand Up @@ -142,7 +142,7 @@ public void run() {
}
if (newSource) {
iter.close();
iter = gorDriver.createIterator(new SourceReferenceBuilder(file).chrSubset(subset).build());
iter = gorDriver.createIterator(new SourceReferenceBuilder(file).build());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public class IndexableSourceReference extends SourceReference {
private final String indexSource;
private final String referenceSource;

public IndexableSourceReference(String url, String indexSource, String referenceSource, String securityContext, String commonRoot, ChromoLookup lookup, String chrSubset) {
super(url, securityContext, commonRoot, lookup, chrSubset, null, false);
public IndexableSourceReference(String url, String indexSource, String referenceSource, String securityContext, String commonRoot, ChromoLookup lookup) {
super(url, securityContext, commonRoot, lookup, null, false);

this.indexSource = indexSource;
this.referenceSource = referenceSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,15 @@ public class SourceMetadata {
private final Long lastModified;
private final Long linkLastModified;
private final String uniqueId;
private final boolean isSubset;
private final DataSource source;

/**
* @param canonicalName Canonical name of this source
* @param lastModified Last modified - see getLastModified()
* @param linkLastModified Link Last modified
* @param uniqueId See uniqueId. If this is null, it will be generated from canonicalName and lastModified
* @param isSubset true if this source only has access to a subset of the original source, else false.
*/
public SourceMetadata(DataSource source, String canonicalName, Long lastModified, Long linkLastModified, String uniqueId, boolean isSubset) {
public SourceMetadata(DataSource source, String canonicalName, Long lastModified, Long linkLastModified, String uniqueId) {
this.source = source;
this.canonicalName = canonicalName;
this.lastModified = lastModified;
Expand All @@ -61,17 +59,15 @@ public SourceMetadata(DataSource source, String canonicalName, Long lastModified
} else {
this.uniqueId = uniqueId;
}
this.isSubset = isSubset;
}

/**
* @param canonicalName Canonical name of this source
* @param lastModified Last modified - see getLastModified()
* @param uniqueId See uniqueId. If this is null, it will be generated from canonicalName and lastModified
* @param isSubset true if this source only has access to a subset of the original source, else false.
*/
public SourceMetadata(DataSource source, String canonicalName, Long lastModified, String uniqueId, boolean isSubset) {
this(source, canonicalName, lastModified, lastModified, uniqueId, isSubset);
public SourceMetadata(DataSource source, String canonicalName, Long lastModified, String uniqueId) {
this(source, canonicalName, lastModified, lastModified, uniqueId);
}

/**
Expand Down Expand Up @@ -109,13 +105,6 @@ public String getUniqueId() {
return uniqueId;
}

/**
* Return whether or not this source only has access to a subset of the original source.
*/
public boolean isSubset() {
return isSubset;
}

public DataSource getSource() {
return source;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public class SourceReference {
public final boolean writeSource;
@JsonIgnore
ChromoLookup lookup;
public final String chrSubset;
private final String linkSubPath;
private boolean isCreatedFromLink = false;
private Long linkLastModified = null;
Expand All @@ -61,7 +60,7 @@ public class SourceReference {
// - should the context hash map be stored as a part of this class or should it enter the chain at some other point?

public SourceReference(String url, String securityContext, String commonRoot, ChromoLookup lookup,
String chrSubset, String linkSubPath, boolean writeSource, boolean isFallback) {
String linkSubPath, boolean writeSource, boolean isFallback) {
this.url = url;
// Pick up default security context here - it's not propagated from GorOptions if this is a sub query.
if (securityContext == null) {
Expand All @@ -71,22 +70,21 @@ public SourceReference(String url, String securityContext, String commonRoot, Ch
}
this.commonRoot = commonRoot;
this.lookup = lookup != null ? lookup : new DefaultChromoLookup();
this.chrSubset = chrSubset;
this.linkSubPath = linkSubPath;
this.writeSource = writeSource;
this.isFallback = isFallback;
}

public SourceReference(String url, String securityContext, String commonRoot, ChromoLookup lookup,
String chrSubset, String linkSubPath, boolean writeSource) {
this(url, securityContext, commonRoot, lookup, chrSubset, linkSubPath, writeSource, true);
String linkSubPath, boolean writeSource) {
this(url, securityContext, commonRoot, lookup, linkSubPath, writeSource, true);
}

/**
* @param url url for the source.
*/
public SourceReference(String url) {
this(url, null, null, null, null, null, false);
this(url, null, null, null, null, false);
}

/**
Expand All @@ -111,7 +109,7 @@ public SourceReference(String url, SourceReference parentSourceReference, String
*/
public SourceReference(String url, SourceReference parentSourceReference, String linkSubPath, String securityContext) {
this(url, securityContext, parentSourceReference.getCommonRoot(),
parentSourceReference.getLookup(), parentSourceReference.getChrSubset(), linkSubPath,
parentSourceReference.getLookup(), linkSubPath,
parentSourceReference.isWriteSource());
if (this.parentSourceReference == null) {
this.parentSourceReference = parentSourceReference;
Expand All @@ -120,8 +118,8 @@ public SourceReference(String url, SourceReference parentSourceReference, String

@JsonCreator
public SourceReference(@JsonProperty("url") String url, @JsonProperty("securityContext") String securityContext,
@JsonProperty("commonRoot") String commonRoot, @JsonProperty("chrSubset") String chrSubset) {
this(url, securityContext, commonRoot, null, chrSubset, null, false);
@JsonProperty("commonRoot") String commonRoot) {
this(url, securityContext, commonRoot, null, null, false);
}

public String getUrl() {
Expand Down Expand Up @@ -152,10 +150,6 @@ public void setLookup(ChromoLookup lookup) {
this.lookup = lookup;
}

public String getChrSubset() {
return chrSubset;
}

public int[] getColumns() {
return null;
}
Expand Down Expand Up @@ -211,13 +205,12 @@ public boolean equals(Object o) {
return Objects.equals(url, that.url)
&& Objects.equals(securityContext, that.securityContext)
&& Objects.equals(commonRoot, that.commonRoot)
&& Objects.equals(lookup, that.lookup)
&& Objects.equals(chrSubset, that.chrSubset);
&& Objects.equals(lookup, that.lookup);
}

@Override
public int hashCode() {
return Objects.hash(url, securityContext, commonRoot, lookup, chrSubset);
return Objects.hash(url, securityContext, commonRoot, lookup);
}

@Override
Expand All @@ -227,7 +220,6 @@ public String toString() {
", securityContext='" + securityContext + '\'' +
", commonRoot='" + commonRoot + '\'' +
", lookup=" + lookup +
", chrSubset='" + chrSubset + '\'' +
'}';
}

Expand Down Expand Up @@ -257,7 +249,6 @@ public static class Builder {
private String securityContext;
private String commonRoot;
private ChromoLookup lookup;
private String chrSubset;
private int[] columns;
private String linkSubPath;

Expand All @@ -271,12 +262,11 @@ public Builder(String url, SourceReference parentSourceReference) {
this.securityContext = parentSourceReference.securityContext;
this.commonRoot = parentSourceReference.commonRoot;
this.lookup = parentSourceReference.lookup;
this.chrSubset = parentSourceReference.chrSubset;
this.linkSubPath = parentSourceReference.linkSubPath;
}

public SourceReference build() {
return new SourceReference(url, securityContext, commonRoot, lookup, chrSubset, linkSubPath, false);
return new SourceReference(url, securityContext, commonRoot, lookup, linkSubPath, false);
}

public Builder securityContext(String securityContext) {
Expand All @@ -293,10 +283,5 @@ public Builder lookup(ChromoLookup lookup) {
this.lookup = lookup;
return this;
}

public Builder chrSubset(String chrSubset) {
this.chrSubset = chrSubset;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class SourceReferenceBuilder {
private String securityContext;
private String commonRoot;
private ChromoLookup lookup;
private String chrSubset;
private boolean writeSource = false;
private boolean isFallBack = true;

Expand All @@ -46,13 +45,12 @@ public SourceReferenceBuilder(String url, SourceReference parentSourceReference)
this.securityContext = parentSourceReference.securityContext;
this.commonRoot = parentSourceReference.commonRoot;
this.lookup = parentSourceReference.lookup;
this.chrSubset = parentSourceReference.chrSubset;
this.writeSource = parentSourceReference.writeSource;
this.isFallBack = parentSourceReference.isFallback();
}

public SourceReference build() {
return new SourceReference(url, securityContext, commonRoot, lookup, chrSubset, null, writeSource, isFallBack);
return new SourceReference(url, securityContext, commonRoot, lookup, null, writeSource, isFallBack);
}

public SourceReferenceBuilder securityContext(String securityContext) {
Expand All @@ -75,11 +73,6 @@ public SourceReferenceBuilder writeSource(boolean writeSource) {
return this;
}

public SourceReferenceBuilder chrSubset(String chrSubset) {
this.chrSubset = chrSubset;
return this;
}

public SourceReferenceBuilder isFallback(boolean isFallBack) {
this.isFallBack = isFallBack;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public SourceMetadata getSourceMetadata() {
timestamp = dbsource.queryDefaultTableChange(tableName);
}
}
return new SourceMetadata(this, sourceReference.getUrl(), timestamp, null, false);
return new SourceMetadata(this, sourceReference.getUrl(), timestamp, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public boolean supportsFiltering() {

@Override
public SourceMetadata getSourceMetadata() {
return new SourceMetadata(this, getName(), 0L, Util.md5(getName()), false);
return new SourceMetadata(this, getName(), 0L, Util.md5(getName()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public SourceMetadata getSourceMetadata() {
timestamp = dbsource.queryDefaultTableChange(tables[0]);
}
}
return new SourceMetadata(this, sourceReference.getUrl(), timestamp, null, false);
return new SourceMetadata(this, sourceReference.getUrl(), timestamp, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,6 @@ public GenomicIterator createIterator(DataSource source) throws IOException {

} else {
if (file.supportsIndex() && file.getIndexSource() == null) {
if (source.getSourceMetadata().isSubset()) {
throw new UnsupportedOperationException("Cannot handle indexed file on top of subset source");
}

StreamSource idxSource = findIndexFileFromFileDriver(file, sourceRef);

if (idxSource != null) {
Expand Down
Loading

0 comments on commit f40b4fc

Please sign in to comment.