aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/binary-cache-store.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore/binary-cache-store.cc')
-rw-r--r--src/libstore/binary-cache-store.cc274
1 files changed, 191 insertions, 83 deletions
diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc
index 9f52ddafa..74eb0a9ab 100644
--- a/src/libstore/binary-cache-store.cc
+++ b/src/libstore/binary-cache-store.cc
@@ -11,17 +11,20 @@
#include "nar-accessor.hh"
#include "json.hh"
#include "thread-pool.hh"
+#include "callback.hh"
#include <chrono>
#include <future>
#include <regex>
+#include <fstream>
#include <nlohmann/json.hpp>
namespace nix {
BinaryCacheStore::BinaryCacheStore(const Params & params)
- : Store(params)
+ : BinaryCacheStoreConfig(params)
+ , Store(params)
{
if (secretKeyFile != "")
secretKey = std::unique_ptr<SecretKey>(new SecretKey(readFile(secretKeyFile)));
@@ -57,6 +60,13 @@ void BinaryCacheStore::init()
}
}
+void BinaryCacheStore::upsertFile(const std::string & path,
+ std::string && data,
+ const std::string & mimeType)
+{
+ upsertFile(path, std::make_shared<std::stringstream>(std::move(data)), mimeType);
+}
+
void BinaryCacheStore::getFile(const std::string & path,
Callback<std::shared_ptr<std::string>> callback) noexcept
{
@@ -76,8 +86,7 @@ void BinaryCacheStore::getFile(const std::string & path, Sink & sink)
promise.set_exception(std::current_exception());
}
}});
- auto data = promise.get_future().get();
- sink((unsigned char *) data->data(), data->size());
+ sink(*promise.get_future().get());
}
std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path)
@@ -113,13 +122,63 @@ void BinaryCacheStore::writeNarInfo(ref<NarInfo> narInfo)
diskCache->upsertNarInfo(getUri(), hashPart, std::shared_ptr<NarInfo>(narInfo));
}
-void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource,
- RepairFlag repair, CheckSigsFlag checkSigs, std::shared_ptr<FSAccessor> accessor)
+AutoCloseFD openFile(const Path & path)
+{
+ auto fd = open(path.c_str(), O_RDONLY | O_CLOEXEC);
+ if (!fd)
+ throw SysError("opening file '%1%'", path);
+ return fd;
+}
+
+ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon(
+ Source & narSource, RepairFlag repair, CheckSigsFlag checkSigs,
+ std::function<ValidPathInfo(HashResult)> mkInfo)
{
- // FIXME: See if we can use the original source to reduce memory usage.
- auto nar = make_ref<std::string>(narSource.drain());
+ auto [fdTemp, fnTemp] = createTempFile();
+
+ AutoDelete autoDelete(fnTemp);
+
+ auto now1 = std::chrono::steady_clock::now();
+
+ /* Read the NAR simultaneously into a CompressionSink+FileSink (to
+ write the compressed NAR to disk), into a HashSink (to get the
+ NAR hash), and into a NarAccessor (to get the NAR listing). */
+ HashSink fileHashSink { htSHA256 };
+ std::shared_ptr<FSAccessor> narAccessor;
+ HashSink narHashSink { htSHA256 };
+ {
+ FdSink fileSink(fdTemp.get());
+ TeeSink teeSinkCompressed { fileSink, fileHashSink };
+ auto compressionSink = makeCompressionSink(compression, teeSinkCompressed);
+ TeeSink teeSinkUncompressed { *compressionSink, narHashSink };
+ TeeSource teeSource { narSource, teeSinkUncompressed };
+ narAccessor = makeNarAccessor(teeSource);
+ compressionSink->finish();
+ fileSink.flush();
+ }
+
+ auto now2 = std::chrono::steady_clock::now();
+
+ auto info = mkInfo(narHashSink.finish());
+ auto narInfo = make_ref<NarInfo>(info);
+ narInfo->compression = compression;
+ auto [fileHash, fileSize] = fileHashSink.finish();
+ narInfo->fileHash = fileHash;
+ narInfo->fileSize = fileSize;
+ narInfo->url = "nar/" + narInfo->fileHash->to_string(Base32, false) + ".nar"
+ + (compression == "xz" ? ".xz" :
+ compression == "bzip2" ? ".bz2" :
+ compression == "zstd" ? ".zst" :
+ compression == "lzip" ? ".lzip" :
+ compression == "lz4" ? ".lz4" :
+ compression == "br" ? ".br" :
+ "");
- if (!repair && isValidPath(info.path)) return;
+ auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
+ printMsg(lvlTalkative, "copying path '%1%' (%2% bytes, compressed %3$.1f%% in %4% ms) to binary cache",
+ printStorePath(narInfo->path), info.narSize,
+ ((1.0 - (double) fileSize / info.narSize) * 100.0),
+ duration);
/* Verify that all references are valid. This may do some .narinfo
reads, but typically they'll already be cached. */
@@ -132,23 +191,6 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource
printStorePath(info.path), printStorePath(ref));
}
- assert(nar->compare(0, narMagic.size(), narMagic) == 0);
-
- auto narInfo = make_ref<NarInfo>(info);
-
- narInfo->narSize = nar->size();
- narInfo->narHash = hashString(htSHA256, *nar);
-
- if (info.narHash && info.narHash != narInfo->narHash)
- throw Error("refusing to copy corrupted path '%1%' to binary cache", printStorePath(info.path));
-
- auto accessor_ = std::dynamic_pointer_cast<RemoteFSAccessor>(accessor);
-
- auto narAccessor = makeNarAccessor(nar);
-
- if (accessor_)
- accessor_->addToCache(printStorePath(info.path), *nar, narAccessor);
-
/* Optionally write a JSON file containing a listing of the
contents of the NAR. */
if (writeNARListing) {
@@ -160,33 +202,13 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource
{
auto res = jsonRoot.placeholder("root");
- listNar(res, narAccessor, "", true);
+ listNar(res, ref<FSAccessor>(narAccessor), "", true);
}
}
- upsertFile(std::string(info.path.to_string()) + ".ls", jsonOut.str(), "application/json");
+ upsertFile(std::string(info.path.hashPart()) + ".ls", jsonOut.str(), "application/json");
}
- /* Compress the NAR. */
- narInfo->compression = compression;
- auto now1 = std::chrono::steady_clock::now();
- auto narCompressed = compress(compression, *nar, parallelCompression);
- auto now2 = std::chrono::steady_clock::now();
- narInfo->fileHash = hashString(htSHA256, *narCompressed);
- narInfo->fileSize = narCompressed->size();
-
- auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
- printMsg(lvlTalkative, "copying path '%1%' (%2% bytes, compressed %3$.1f%% in %4% ms) to binary cache",
- printStorePath(narInfo->path), narInfo->narSize,
- ((1.0 - (double) narCompressed->size() / nar->size()) * 100.0),
- duration);
-
- narInfo->url = "nar/" + narInfo->fileHash.to_string(Base32, false) + ".nar"
- + (compression == "xz" ? ".xz" :
- compression == "bzip2" ? ".bz2" :
- compression == "br" ? ".br" :
- "");
-
/* Optionally maintain an index of DWARF debug info files
consisting of JSON files named 'debuginfo/<build-id>' that
specify the NAR file and member containing the debug info. */
@@ -247,12 +269,14 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource
/* Atomically write the NAR file. */
if (repair || !fileExists(narInfo->url)) {
stats.narWrite++;
- upsertFile(narInfo->url, *narCompressed, "application/x-nix-nar");
+ upsertFile(narInfo->url,
+ std::make_shared<std::fstream>(fnTemp, std::ios_base::in | std::ios_base::binary),
+ "application/x-nix-nar");
} else
stats.narWriteAverted++;
- stats.narWriteBytes += nar->size();
- stats.narWriteCompressedBytes += narCompressed->size();
+ stats.narWriteBytes += info.narSize;
+ stats.narWriteCompressedBytes += fileSize;
stats.narWriteCompressionTimeMs += duration;
/* Atomically write the NAR info file.*/
@@ -261,6 +285,41 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource
writeNarInfo(narInfo);
stats.narInfoWrite++;
+
+ return narInfo;
+}
+
+void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource,
+ RepairFlag repair, CheckSigsFlag checkSigs)
+{
+ if (!repair && isValidPath(info.path)) {
+ // FIXME: copyNAR -> null sink
+ narSource.drain();
+ return;
+ }
+
+ addToStoreCommon(narSource, repair, checkSigs, {[&](HashResult nar) {
+ /* FIXME reinstate these, once we can correctly do hash modulo sink as
+ needed. We need to throw here in case we uploaded a corrupted store path. */
+ // assert(info.narHash == nar.first);
+ // assert(info.narSize == nar.second);
+ return info;
+ }});
+}
+
+StorePath BinaryCacheStore::addToStoreFromDump(Source & dump, const string & name,
+ FileIngestionMethod method, HashType hashAlgo, RepairFlag repair)
+{
+ if (method != FileIngestionMethod::Recursive || hashAlgo != htSHA256)
+ unsupported("addToStoreFromDump");
+ return addToStoreCommon(dump, repair, CheckSigs, [&](HashResult nar) {
+ ValidPathInfo info {
+ makeFixedOutputPath(method, nar.first, name),
+ nar.first,
+ };
+ info.narSize = nar.second;
+ return info;
+ })->path;
}
bool BinaryCacheStore::isValidPathUncached(const StorePath & storePath)
@@ -275,14 +334,10 @@ void BinaryCacheStore::narFromPath(const StorePath & storePath, Sink & sink)
{
auto info = queryPathInfo(storePath).cast<const NarInfo>();
- uint64_t narSize = 0;
-
- LambdaSink wrapperSink([&](const unsigned char * data, size_t len) {
- sink(data, len);
- narSize += len;
- });
+ LengthSink narSize;
+ TeeSink tee { sink, narSize };
- auto decompressor = makeDecompressionSink(info->compression, wrapperSink);
+ auto decompressor = makeDecompressionSink(info->compression, tee);
try {
getFile(info->url, *decompressor);
@@ -294,7 +349,7 @@ void BinaryCacheStore::narFromPath(const StorePath & storePath, Sink & sink)
stats.narRead++;
//stats.narReadCompressedBytes += nar->size(); // FIXME
- stats.narReadBytes += narSize;
+ stats.narReadBytes += narSize.length;
}
void BinaryCacheStore::queryPathInfoUncached(const StorePath & storePath,
@@ -332,44 +387,97 @@ void BinaryCacheStore::queryPathInfoUncached(const StorePath & storePath,
StorePath BinaryCacheStore::addToStore(const string & name, const Path & srcPath,
FileIngestionMethod method, HashType hashAlgo, PathFilter & filter, RepairFlag repair)
{
- // FIXME: some cut&paste from LocalStore::addToStore().
+ /* FIXME: Make BinaryCacheStore::addToStoreCommon support
+ non-recursive+sha256 so we can just use the default
+ implementation of this method in terms of addToStoreFromDump. */
- /* Read the whole path into memory. This is not a very scalable
- method for very large paths, but `copyPath' is mainly used for
- small files. */
- StringSink sink;
- Hash h;
+ HashSink sink { hashAlgo };
if (method == FileIngestionMethod::Recursive) {
dumpPath(srcPath, sink, filter);
- h = hashString(hashAlgo, *sink.s);
} else {
- auto s = readFile(srcPath);
- dumpString(s, sink);
- h = hashString(hashAlgo, s);
+ readFile(srcPath, sink);
}
+ auto h = sink.finish().first;
- ValidPathInfo info(makeFixedOutputPath(method, h, name));
-
- auto source = StringSource { *sink.s };
- addToStore(info, source, repair, CheckSigs, nullptr);
-
- return std::move(info.path);
+ auto source = sinkToSource([&](Sink & sink) {
+ dumpPath(srcPath, sink, filter);
+ });
+ return addToStoreCommon(*source, repair, CheckSigs, [&](HashResult nar) {
+ ValidPathInfo info {
+ makeFixedOutputPath(method, h, name),
+ nar.first,
+ };
+ info.narSize = nar.second;
+ info.ca = FixedOutputHash {
+ .method = method,
+ .hash = h,
+ };
+ return info;
+ })->path;
}
StorePath BinaryCacheStore::addTextToStore(const string & name, const string & s,
const StorePathSet & references, RepairFlag repair)
{
- ValidPathInfo info(computeStorePathForText(name, s, references));
- info.references = references;
-
- if (repair || !isValidPath(info.path)) {
- StringSink sink;
- dumpString(s, sink);
- auto source = StringSource { *sink.s };
- addToStore(info, source, repair, CheckSigs, nullptr);
+ auto textHash = hashString(htSHA256, s);
+ auto path = makeTextPath(name, textHash, references);
+
+ if (!repair && isValidPath(path))
+ return path;
+
+ StringSink sink;
+ dumpString(s, sink);
+ auto source = StringSource { *sink.s };
+ return addToStoreCommon(source, repair, CheckSigs, [&](HashResult nar) {
+ ValidPathInfo info { path, nar.first };
+ info.narSize = nar.second;
+ info.ca = TextHash { textHash };
+ info.references = references;
+ return info;
+ })->path;
+}
+
+std::optional<const Realisation> BinaryCacheStore::queryRealisation(const DrvOutput & id)
+{
+ if (diskCache) {
+ auto [cacheOutcome, maybeCachedRealisation] =
+ diskCache->lookupRealisation(getUri(), id);
+ switch (cacheOutcome) {
+ case NarInfoDiskCache::oValid:
+ debug("Returning a cached realisation for %s", id.to_string());
+ return *maybeCachedRealisation;
+ case NarInfoDiskCache::oInvalid:
+ debug("Returning a cached missing realisation for %s", id.to_string());
+ return {};
+ case NarInfoDiskCache::oUnknown:
+ break;
+ }
+ }
+
+ auto outputInfoFilePath = realisationsPrefix + "/" + id.to_string() + ".doi";
+ auto rawOutputInfo = getFile(outputInfoFilePath);
+
+ if (rawOutputInfo) {
+ auto realisation = Realisation::fromJSON(
+ nlohmann::json::parse(*rawOutputInfo), outputInfoFilePath);
+
+ if (diskCache)
+ diskCache->upsertRealisation(
+ getUri(), realisation);
+
+ return {realisation};
+ } else {
+ if (diskCache)
+ diskCache->upsertAbsentRealisation(getUri(), id);
+ return std::nullopt;
}
+}
- return std::move(info.path);
+void BinaryCacheStore::registerDrvOutput(const Realisation& info) {
+ if (diskCache)
+ diskCache->upsertRealisation(getUri(), info);
+ auto filePath = realisationsPrefix + "/" + info.id.to_string() + ".doi";
+ upsertFile(filePath, info.toJSON().dump(), "application/json");
}
ref<FSAccessor> BinaryCacheStore::getFSAccessor()