Skip to content

Commit

Permalink
[7.4.0] Introduce --http_max_parallel_downloads. (#23352)
Browse files Browse the repository at this point in the history
This option sets the global limit for how many parallel HTTP connections
bazel will initiate to download files.

Working towards #19315.

Note: The first four commits of the PR are a refactoring from #23080.
The last commit is a bigger refactoring, moving `HttpDownloader` around
further. Commit-by-commit review is encouraged. The hope is to get this
PR in before 7.3.0rc1 is cut on Monday.

Closes #23106.

PiperOrigin-RevId: 657111369
Change-Id: I9a848758517cd78cd5a0e1b380bf39625045b77a

Commit
01d477d

Co-authored-by: Cornelius Riemenschneider <[email protected]>
  • Loading branch information
iancha1992 and criemen authored Aug 20, 2024
1 parent 694b280 commit cab54be
Show file tree
Hide file tree
Showing 22 changed files with 203 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,7 @@
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.RepositoryOverride;
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.WorkerForRepoFetching;
import com.google.devtools.build.lib.bazel.repository.cache.RepositoryCache;
import com.google.devtools.build.lib.bazel.repository.downloader.DelegatingDownloader;
import com.google.devtools.build.lib.bazel.repository.downloader.DownloadManager;
import com.google.devtools.build.lib.bazel.repository.downloader.HttpDownloader;
import com.google.devtools.build.lib.bazel.repository.downloader.UrlRewriter;
import com.google.devtools.build.lib.bazel.repository.downloader.UrlRewriterParseException;
import com.google.devtools.build.lib.bazel.repository.starlark.StarlarkRepositoryFunction;
Expand Down Expand Up @@ -150,11 +148,6 @@ public class BazelRepositoryModule extends BlazeModule {
private final AtomicBoolean isFetch = new AtomicBoolean(false);
private final StarlarkRepositoryFunction starlarkRepositoryFunction;
private final RepositoryCache repositoryCache = new RepositoryCache();
private final HttpDownloader httpDownloader = new HttpDownloader();
private final DelegatingDownloader delegatingDownloader =
new DelegatingDownloader(httpDownloader);
private final DownloadManager downloadManager =
new DownloadManager(repositoryCache, delegatingDownloader);
private final MutableSupplier<Map<String, String>> clientEnvironmentSupplier =
new MutableSupplier<>();
private ImmutableMap<RepositoryName, PathFragment> overrides = ImmutableMap.of();
Expand All @@ -174,12 +167,16 @@ public class BazelRepositoryModule extends BlazeModule {
private boolean disableNativeRepoRules;
private SingleExtensionEvalFunction singleExtensionEvalFunction;

private final VendorCommand vendorCommand = new VendorCommand(clientEnvironmentSupplier);
private final RegistryFactoryImpl registryFactory =
new RegistryFactoryImpl(clientEnvironmentSupplier);

@Nullable private CredentialModule credentialModule;

private ImmutableMap<String, NonRegistryOverride> builtinModules = null;

public BazelRepositoryModule() {
this.starlarkRepositoryFunction = new StarlarkRepositoryFunction(downloadManager);
this.starlarkRepositoryFunction = new StarlarkRepositoryFunction();
this.repositoryHandlers = repositoryRules();
}

Expand Down Expand Up @@ -228,7 +225,7 @@ public void serverInit(OptionsParsingResult startupOptions, ServerBuilder builde
builder.addCommands(new FetchCommand());
builder.addCommands(new ModCommand());
builder.addCommands(new SyncCommand());
builder.addCommands(new VendorCommand(downloadManager, clientEnvironmentSupplier));
builder.addCommands(vendorCommand);
builder.addInfoItems(new RepositoryCacheInfoItem(repositoryCache));
}

Expand All @@ -252,7 +249,7 @@ public void workspaceInit(
directories,
BazelSkyframeExecutorConstants.EXTERNAL_PACKAGE_HELPER);
singleExtensionEvalFunction =
new SingleExtensionEvalFunction(directories, clientEnvironmentSupplier, downloadManager);
new SingleExtensionEvalFunction(directories, clientEnvironmentSupplier);

if (builtinModules == null) {
builtinModules = ModuleFileFunction.getBuiltinModules(directories.getEmbeddedBinariesRoot());
Expand All @@ -278,9 +275,7 @@ SkyFunctions.BAZEL_LOCK_FILE, new BazelLockFileFunction(directories.getWorkspace
.addSkyFunction(SkyFunctions.SINGLE_EXTENSION_USAGES, new SingleExtensionUsagesFunction())
.addSkyFunction(
SkyFunctions.REGISTRY,
new RegistryFunction(
new RegistryFactoryImpl(downloadManager, clientEnvironmentSupplier),
directories.getWorkspace()))
new RegistryFunction(registryFactory, directories.getWorkspace()))
.addSkyFunction(SkyFunctions.REPO_SPEC, new RepoSpecFunction())
.addSkyFunction(SkyFunctions.YANKED_VERSIONS, new YankedVersionsFunction())
.addSkyFunction(
Expand Down Expand Up @@ -314,6 +309,12 @@ public void initializeRuleClasses(ConfiguredRuleClassProvider.Builder builder) {

@Override
public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
DownloadManager downloadManager =
new DownloadManager(repositoryCache, env.getDownloaderDelegate(), env.getHttpDownloader());
this.starlarkRepositoryFunction.setDownloadManager(downloadManager);
this.vendorCommand.setDownloadManager(downloadManager);
this.registryFactory.setDownloadManager(downloadManager);

clientEnvironmentSupplier.set(env.getRepoEnv());
PackageOptions pkgOptions = env.getOptions().getOptions(PackageOptions.class);
isFetch.set(pkgOptions != null && pkgOptions.fetch);
Expand All @@ -323,6 +324,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
starlarkRepositoryFunction.setProcessWrapper(processWrapper);
starlarkRepositoryFunction.setSyscallCache(env.getSyscallCache());
singleExtensionEvalFunction.setProcessWrapper(processWrapper);
singleExtensionEvalFunction.setDownloadManager(downloadManager);

RepositoryOptions repoOptions = env.getOptions().getOptions(RepositoryOptions.class);
if (repoOptions != null) {
Expand Down Expand Up @@ -453,16 +455,6 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
downloadManager.setDistdir(ImmutableList.of());
}

if (repoOptions.httpTimeoutScaling > 0) {
httpDownloader.setTimeoutScaling((float) repoOptions.httpTimeoutScaling);
} else {
env.getReporter()
.handle(Event.warn("Ignoring request to scale http timeouts by a non-positive factor"));
httpDownloader.setTimeoutScaling(1.0f);
}
httpDownloader.setMaxAttempts(repoOptions.httpConnectorAttempts);
httpDownloader.setMaxRetryTimeout(repoOptions.httpConnectorRetryMaxTimeout);

if (repoOptions.repositoryOverrides != null) {
// To get the usual latest-wins semantics, we need a mutable map, as the builder
// of an immutable map does not allow redefining the values of existing keys.
Expand Down Expand Up @@ -568,7 +560,6 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
}
starlarkRepositoryFunction.setRepositoryRemoteExecutor(remoteExecutor);
singleExtensionEvalFunction.setRepositoryRemoteExecutor(remoteExecutor);
delegatingDownloader.setDelegate(env.getRuntime().getDownloaderSupplier().get());

clock = env.getClock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,21 @@
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/** Prod implementation of {@link RegistryFactory}. */
public class RegistryFactoryImpl implements RegistryFactory {
private final DownloadManager downloadManager;
@Nullable private DownloadManager downloadManager;
private final Supplier<Map<String, String>> clientEnvironmentSupplier;

public RegistryFactoryImpl(
DownloadManager downloadManager, Supplier<Map<String, String>> clientEnvironmentSupplier) {
this.downloadManager = downloadManager;
public RegistryFactoryImpl(Supplier<Map<String, String>> clientEnvironmentSupplier) {
this.clientEnvironmentSupplier = clientEnvironmentSupplier;
}

public void setDownloadManager(DownloadManager downloadManager) {
this.downloadManager = downloadManager;
}

@Override
public Registry createRegistry(
String url,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,19 @@
public class SingleExtensionEvalFunction implements SkyFunction {
private final BlazeDirectories directories;
private final Supplier<Map<String, String>> clientEnvironmentSupplier;
private final DownloadManager downloadManager;

private double timeoutScaling = 1.0;
@Nullable private ProcessWrapper processWrapper = null;
@Nullable private RepositoryRemoteExecutor repositoryRemoteExecutor = null;
@Nullable private DownloadManager downloadManager = null;

public SingleExtensionEvalFunction(
BlazeDirectories directories,
Supplier<Map<String, String>> clientEnvironmentSupplier,
DownloadManager downloadManager) {
BlazeDirectories directories, Supplier<Map<String, String>> clientEnvironmentSupplier) {
this.directories = directories;
this.clientEnvironmentSupplier = clientEnvironmentSupplier;
}

public void setDownloadManager(DownloadManager downloadManager) {
this.downloadManager = downloadManager;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,18 @@
public final class VendorCommand implements BlazeCommand {
public static final String NAME = "vendor";

private final DownloadManager downloadManager;
private final Supplier<Map<String, String>> clientEnvironmentSupplier;
@Nullable private VendorManager vendorManager = null;
@Nullable private DownloadManager downloadManager;

public VendorCommand(
DownloadManager downloadManager, Supplier<Map<String, String>> clientEnvironmentSupplier) {
this.downloadManager = downloadManager;
public VendorCommand(Supplier<Map<String, String>> clientEnvironmentSupplier) {
this.clientEnvironmentSupplier = clientEnvironmentSupplier;
}

public void setDownloadManager(DownloadManager downloadManager) {
this.downloadManager = downloadManager;
}

@Override
public void editOptions(OptionsParser optionsParser) {
// We only need to inject these options with fetch target (when there is a residue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.google.devtools.common.options.OptionMetadataTag;
import com.google.devtools.common.options.OptionsBase;
import com.google.devtools.common.options.OptionsParsingException;
import java.time.Duration;
import java.util.List;

/** Command-line options for repositories. */
Expand Down Expand Up @@ -133,31 +132,7 @@ public class RepositoryOptions extends OptionsBase {
+ "to download them.")
public List<PathFragment> experimentalDistdir;

@Option(
name = "http_timeout_scaling",
defaultValue = "1.0",
documentationCategory = OptionDocumentationCategory.BAZEL_CLIENT_OPTIONS,
effectTags = {OptionEffectTag.BAZEL_INTERNAL_CONFIGURATION},
help = "Scale all timeouts related to http downloads by the given factor")
public double httpTimeoutScaling;

@Option(
name = "http_connector_attempts",
defaultValue = "8",
documentationCategory = OptionDocumentationCategory.BAZEL_CLIENT_OPTIONS,
effectTags = {OptionEffectTag.BAZEL_INTERNAL_CONFIGURATION},
help = "The maximum number of attempts for http downloads.")
public int httpConnectorAttempts;

@Option(
name = "http_connector_retry_max_timeout",
defaultValue = "0s",
documentationCategory = OptionDocumentationCategory.BAZEL_CLIENT_OPTIONS,
effectTags = {OptionEffectTag.BAZEL_INTERNAL_CONFIGURATION},
help =
"The maximum timeout for http download retries. With a value of 0, no timeout maximum is"
+ " defined.")
public Duration httpConnectorRetryMaxTimeout;

@Option(
name = "override_repository",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@
*/
public class DownloadManager {
private final RepositoryCache repositoryCache;
private List<Path> distdir = ImmutableList.of();
private ImmutableList<Path> distdir = ImmutableList.of();
private UrlRewriter rewriter;
private final Downloader downloader;
private final HttpDownloader bzlmodHttpDownloader;
private boolean disableDownload = false;
private int retries = 0;
@Nullable private Credentials netrcCreds;
Expand All @@ -71,9 +72,19 @@ public interface CredentialFactory {
Credentials create(Map<URI, Map<String, List<String>>> authHeaders);
}

public DownloadManager(RepositoryCache repositoryCache, Downloader downloader) {
/**
* Creates a new {@link DownloadManager}.
*
* @param downloader The (delegating) downloader to use to download files. Is either a
* HttpDownloader, or a GrpcRemoteDownloader.
* @param bzlmodHttpDownloader The downloader to use for downloading files from the bzlmod
* registry.
*/
public DownloadManager(
RepositoryCache repositoryCache, Downloader downloader, HttpDownloader bzlmodHttpDownloader) {
this.repositoryCache = repositoryCache;
this.downloader = downloader;
this.bzlmodHttpDownloader = bzlmodHttpDownloader;
}

public void setDistdir(List<Path> distdir) {
Expand Down Expand Up @@ -425,12 +436,11 @@ public byte[] downloadAndReadOneUrlForBzlmod(
throw new IOException(getRewriterBlockedAllUrlsMessage(ImmutableList.of(originalUrl)));
}

HttpDownloader httpDownloader = new HttpDownloader();
byte[] content;
for (int attempt = 0; ; ++attempt) {
try {
content =
httpDownloader.downloadAndReadOneUrl(
bzlmodHttpDownloader.downloadAndReadOneUrl(
rewrittenUrls.get(0),
credentialFactory.create(authHeaders),
checksum,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,31 +46,29 @@
*
* <p>This class uses {@link HttpConnectorMultiplexer} to connect to HTTP mirrors and then reads the
* file to disk.
*
* <p>This class is (outside of tests) a singleton instance, living in `BazelRepositoryModule`.
*/
public class HttpDownloader implements Downloader {
static final int MAX_PARALLEL_DOWNLOADS = 8;

private static final Semaphore SEMAPHORE = new Semaphore(MAX_PARALLEL_DOWNLOADS, true);
private static final Clock CLOCK = new JavaClock();
private static final Sleeper SLEEPER = new JavaSleeper();
private static final Locale LOCALE = Locale.getDefault();

private float timeoutScaling = 1.0f;
private int maxAttempts = 0;
private Duration maxRetryTimeout = Duration.ZERO;

public HttpDownloader() {}
private final Semaphore semaphore;
private final float timeoutScaling;
private final int maxAttempts;
private final Duration maxRetryTimeout;

public void setTimeoutScaling(float timeoutScaling) {
this.timeoutScaling = timeoutScaling;
}

public void setMaxAttempts(int maxAttempts) {
public HttpDownloader(
int maxAttempts, Duration maxRetryTimeout, int maxParallelDownloads, float timeoutScaling) {
this.maxAttempts = maxAttempts;
this.maxRetryTimeout = maxRetryTimeout;
semaphore = new Semaphore(maxParallelDownloads, true);
this.timeoutScaling = timeoutScaling;
}

public void setMaxRetryTimeout(Duration maxRetryTimeout) {
this.maxRetryTimeout = maxRetryTimeout;
public HttpDownloader() {
this(0, Duration.ZERO, 8, 1.0f);
}

@Override
Expand All @@ -94,7 +92,7 @@ public void download(
List<IOException> ioExceptions = ImmutableList.of();

for (URL url : urls) {
SEMAPHORE.acquire();
semaphore.acquire();

try (HttpStream payload = multiplexer.connect(url, checksum, headers, credentials, type);
OutputStream out = destination.getOutputStream()) {
Expand All @@ -119,7 +117,7 @@ public void download(
Event.warn("Download from " + url + " failed: " + e.getClass() + " " + e.getMessage()));
continue;
} finally {
SEMAPHORE.release();
semaphore.release();
eventHandler.post(new FetchEvent(url.toString(), success));
}
}
Expand Down Expand Up @@ -154,7 +152,7 @@ public byte[] downloadAndReadOneUrl(
HttpConnectorMultiplexer multiplexer = setUpConnectorMultiplexer(eventHandler, clientEnv);

ByteArrayOutputStream out = new ByteArrayOutputStream();
SEMAPHORE.acquire();
semaphore.acquire();
try (HttpStream payload =
multiplexer.connect(url, checksum, ImmutableMap.of(), credentials, Optional.empty())) {
ByteStreams.copy(payload, out);
Expand All @@ -166,7 +164,7 @@ public byte[] downloadAndReadOneUrl(
} catch (InterruptedIOException e) {
throw new InterruptedException(e.getMessage());
} finally {
SEMAPHORE.release();
semaphore.release();
// TODO(wyv): Do we need to report any event here?
}
return out.toByteArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,23 @@

/** A repository function to delegate work done by Starlark remote repositories. */
public final class StarlarkRepositoryFunction extends RepositoryFunction {
private final DownloadManager downloadManager;
private double timeoutScaling = 1.0;
private boolean useWorkers;
@Nullable private DownloadManager downloadManager;
@Nullable private ProcessWrapper processWrapper = null;
@Nullable private RepositoryRemoteExecutor repositoryRemoteExecutor;
@Nullable private SyscallCache syscallCache;

public StarlarkRepositoryFunction(DownloadManager downloadManager) {
this.downloadManager = downloadManager;
}
public StarlarkRepositoryFunction() {}

public void setTimeoutScaling(double timeoutScaling) {
this.timeoutScaling = timeoutScaling;
}

public void setDownloadManager(DownloadManager downloadManager) {
this.downloadManager = downloadManager;
}

public void setProcessWrapper(@Nullable ProcessWrapper processWrapper) {
this.processWrapper = processWrapper;
}
Expand Down
Loading

0 comments on commit cab54be

Please sign in to comment.