Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(pip): avoid response buffering #873

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 82 additions & 38 deletions src/Microsoft.ComponentDetection.Detectors/pip/IPyPiClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public sealed class PyPiClient : IPyPiClient, IDisposable

private static readonly ProductInfoHeaderValue CommentValue = new("(+https://github.com/microsoft/component-detection)");

private static readonly TimeSpan CacheSlidingExpiration = TimeSpan.FromSeconds(CACHEINTERVALSECONDS);

// Keep telemetry on how the cache is being used for future refinements
private readonly PypiCacheTelemetryRecord cacheTelemetry;

Expand All @@ -63,7 +65,13 @@ public sealed class PyPiClient : IPyPiClient, IDisposable
/// A thread safe cache implementation which contains a mapping of URI -> HttpResponseMessage
/// and has a limited number of entries which will expire after the cache fills or a specified interval.
/// </summary>
private MemoryCache cachedResponses = new MemoryCache(new MemoryCacheOptions { SizeLimit = DEFAULTCACHEENTRIES });
private MemoryCache cachedResponses = new(new MemoryCacheOptions { SizeLimit = DEFAULTCACHEENTRIES });

/// <summary>
/// A thread safe cache mapping <see cref="Uri"/> -> <see cref="List&lt;PipDependencySpecification&gt;"/>
/// and has a limited number of entries which will expire after the cache fills or a specified interval.
/// </summary>
private MemoryCache cachedDependencies = new(new MemoryCacheOptions { SizeLimit = DEFAULTCACHEENTRIES });

public PyPiClient() => this.cacheTelemetry = new PypiCacheTelemetryRecord()
{
Expand All @@ -84,54 +92,88 @@ public PyPiClient(IEnvironmentVariableService environmentVariableService, ILogge

public static HttpClient HttpClient { get; internal set; } = new HttpClient(HttpClientHandler);

public async Task<IList<PipDependencySpecification>> FetchPackageDependenciesAsync(string name, string version, PythonProjectRelease release)
private static async Task<HttpResponseMessage> SendGetRequestAsync(
Uri uri,
HttpCompletionOption completionOption = HttpCompletionOption.ResponseContentRead)
{
var dependencies = new List<PipDependencySpecification>();
using var request = new HttpRequestMessage(HttpMethod.Get, uri);
request.Headers.UserAgent.Add(ProductValue);
request.Headers.UserAgent.Add(CommentValue);
var response = await HttpClient.SendAsync(request, completionOption);
return response;
}

public async Task<IList<PipDependencySpecification>> FetchPackageDependenciesAsync(string name, string version, PythonProjectRelease release)
{
var uri = release.Url;
var response = await this.GetAndCachePyPiResponseAsync(uri);
var key = new CacheDependencyKey(name, version, uri);

if (!response.IsSuccessStatusCode)
return (await this.cachedDependencies.GetOrCreateAsync(key, FetchDependencies)).ToList();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: .ToList might be redundant since the task result is already a list and it eventually gets converted to dictionary in pythonresolver.


async Task<List<PipDependencySpecification>> FetchDependencies(ICacheEntry cacheEntry)
{
this.logger.LogWarning("Http GET at {ReleaseUrl} failed with status code {ResponseStatusCode}", release.Url, response.StatusCode);
return dependencies;
}
cacheEntry.SlidingExpiration = CacheSlidingExpiration;
cacheEntry.Size = 1;
using var response = await SendGetRequestAsync(uri, HttpCompletionOption.ResponseHeadersRead);

var package = new ZipArchive(await response.Content.ReadAsStreamAsync());
if (!response.IsSuccessStatusCode)
{
this.logger.LogWarning(
"Http GET at {ReleaseUrl} failed with status code {ResponseStatusCode}",
release.Url,
response.StatusCode);
return new List<PipDependencySpecification>();
}

var entry = package.GetEntry($"{name.Replace('-', '_')}-{version}.dist-info/METADATA");
// Store the .whl file on the disk temporarily
var tempFilePath = Path.GetTempFileName();
await using (var fileStreamWrite =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed? I know these are temp files, but depending on how big the file can be, couldn't writing to disk cause errors for systems that are particularly low on disk space?

Copy link
Author

@asaf92-legit asaf92-legit Nov 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We encountered the opposite problem very often. We had enough disk space, but got OOM when buffering (and caching) large .whl files into memory in the HTTP SendAsync part.

Perhaps we can make this behavior configurable, so that consumers will be able to select whether to use disk storage for the downloads?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can start by adding some telemetry on the size of some wheels. I am a bit worried that we would eat up all the disk space downloading the wheels to disk, as technically we'd download them twice in a given CI build (pip install + running CD).

A slider or a configurable limit on either:

  1. The size of the given wheel (only write wheels smaller than x bytes to disk)
  2. Total disk space used downloading wheels

sounds like the best options.

Copy link
Author

@asaf92-legit asaf92-legit Dec 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we have 2 options:

  • Store wheels in memory (Default)
  • Store wheels on the disk

Would that be fine?

It would leave current behavior unchanged, but give an option to use disk space instead of process memory for those who want it. A more fine-grained control like you suggested can be added at a later point if necessary.

We can set this with an environment variable (or maybe even an Options<T>)

new FileStream(tempFilePath, FileMode.Create, FileAccess.Write, FileShare.None))
{
await (await response.Content.ReadAsStreamAsync()).CopyToAsync(fileStreamWrite);
}

// If there is no metadata file, the package doesn't have any declared dependencies
if (entry == null)
{
return dependencies;
}
var content = new List<string>();
await using (var fileStreamRead =
new FileStream(tempFilePath, FileMode.Open, FileAccess.Read, FileShare.None))
{
using var package = new ZipArchive(fileStreamRead);

var content = new List<string>();
using (var stream = entry.Open())
{
using var streamReader = new StreamReader(stream);
var entry = package.GetEntry($"{name.Replace('-', '_')}-{version}.dist-info/METADATA");

while (!streamReader.EndOfStream)
{
var line = await streamReader.ReadLineAsync();
// If there is no metadata file, the package doesn't have any declared dependencies
if (entry == null)
{
return new List<PipDependencySpecification>();
}

await using var stream = entry.Open();
using var streamReader = new StreamReader(stream);

if (PipDependencySpecification.RequiresDistRegex.IsMatch(line))
while (!streamReader.EndOfStream)
{
content.Add(line);
var line = await streamReader.ReadLineAsync();
if (line is null)
{
continue;
}

if (PipDependencySpecification.RequiresDistRegex.IsMatch(line))
{
content.Add(line);
}
}
}
}

// Pull the packages that aren't conditional based on "extras"
// Right now we just want to resolve the graph as most comsumers will
// experience it
foreach (var deps in content.Where(x => !x.Contains("extra ==")))
{
dependencies.Add(new PipDependencySpecification(deps, true));
}
File.Delete(tempFilePath);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if this fails?

The detector can run in parallel and potentially cause lockups.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code uses a different temporary path every time (GetTempFileName), so there won't be issues


return dependencies;
// Pull the packages that aren't conditional based on "extras"
// Right now we just want to resolve the graph as most consumers will
// experience it
return content.Where(x => !x.Contains("extra =="))
.Select(deps => new PipDependencySpecification(deps, true))
.ToList();
}
}

public async Task<SortedDictionary<string, IList<PythonProjectRelease>>> GetReleasesAsync(PipDependencySpecification spec)
Expand Down Expand Up @@ -247,15 +289,13 @@ private async Task<HttpResponseMessage> GetAndCachePyPiResponseAsync(Uri uri)
}

this.logger.LogInformation("Getting Python data from {Uri}", uri);
using var request = new HttpRequestMessage(HttpMethod.Get, uri);
request.Headers.UserAgent.Add(ProductValue);
request.Headers.UserAgent.Add(CommentValue);
var response = await HttpClient.SendAsync(request);
var response = await SendGetRequestAsync(uri);

// The `first - wins` response accepted into the cache. This might be different from the input if another caller wins the race.
return await this.cachedResponses.GetOrCreateAsync(uri, cacheEntry =>
{
cacheEntry.SlidingExpiration = TimeSpan.FromSeconds(CACHEINTERVALSECONDS); // This entry will expire after CACHEINTERVALSECONDS seconds from last use
cacheEntry.SlidingExpiration =
CacheSlidingExpiration; // This entry will expire after CACHEINTERVALSECONDS seconds from last use
cacheEntry.Size = 1; // Specify a size of 1 so a set number of entries can always be in the cache
return Task.FromResult(response);
});
Expand All @@ -272,6 +312,7 @@ private void InitializeNonDefaultMemoryCache()
{
this.logger.LogInformation("Setting IPyPiClient max cache entries to {MaxEntries}", maxEntries);
this.cachedResponses = new MemoryCache(new MemoryCacheOptions { SizeLimit = maxEntries });
this.cachedDependencies = new MemoryCache(new MemoryCacheOptions { SizeLimit = maxEntries });
}

this.checkedMaxEntriesVariable = true;
Expand All @@ -282,5 +323,8 @@ public void Dispose()
this.cacheTelemetry.FinalCacheSize = this.cachedResponses.Count;
this.cacheTelemetry.Dispose();
this.cachedResponses.Dispose();
this.cachedDependencies.Dispose();
}

private record CacheDependencyKey(string Name, string Version, Uri Uri);
}
Loading