diff options
Diffstat (limited to 'src/libstore/binary-cache-store.cc')
-rw-r--r-- | src/libstore/binary-cache-store.cc | 274 |
1 files changed, 191 insertions, 83 deletions
diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index 9f52ddafa..74eb0a9ab 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -11,17 +11,20 @@ #include "nar-accessor.hh" #include "json.hh" #include "thread-pool.hh" +#include "callback.hh" #include <chrono> #include <future> #include <regex> +#include <fstream> #include <nlohmann/json.hpp> namespace nix { BinaryCacheStore::BinaryCacheStore(const Params & params) - : Store(params) + : BinaryCacheStoreConfig(params) + , Store(params) { if (secretKeyFile != "") secretKey = std::unique_ptr<SecretKey>(new SecretKey(readFile(secretKeyFile))); @@ -57,6 +60,13 @@ void BinaryCacheStore::init() } } +void BinaryCacheStore::upsertFile(const std::string & path, + std::string && data, + const std::string & mimeType) +{ + upsertFile(path, std::make_shared<std::stringstream>(std::move(data)), mimeType); +} + void BinaryCacheStore::getFile(const std::string & path, Callback<std::shared_ptr<std::string>> callback) noexcept { @@ -76,8 +86,7 @@ void BinaryCacheStore::getFile(const std::string & path, Sink & sink) promise.set_exception(std::current_exception()); } }}); - auto data = promise.get_future().get(); - sink((unsigned char *) data->data(), data->size()); + sink(*promise.get_future().get()); } std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path) @@ -113,13 +122,63 @@ void BinaryCacheStore::writeNarInfo(ref<NarInfo> narInfo) diskCache->upsertNarInfo(getUri(), hashPart, std::shared_ptr<NarInfo>(narInfo)); } -void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource, - RepairFlag repair, CheckSigsFlag checkSigs, std::shared_ptr<FSAccessor> accessor) +AutoCloseFD openFile(const Path & path) +{ + auto fd = open(path.c_str(), O_RDONLY | O_CLOEXEC); + if (!fd) + throw SysError("opening file '%1%'", path); + return fd; +} + +ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon( + Source & narSource, RepairFlag repair, CheckSigsFlag checkSigs, + std::function<ValidPathInfo(HashResult)> mkInfo) { - // FIXME: See if we can use the original source to reduce memory usage. - auto nar = make_ref<std::string>(narSource.drain()); + auto [fdTemp, fnTemp] = createTempFile(); + + AutoDelete autoDelete(fnTemp); + + auto now1 = std::chrono::steady_clock::now(); + + /* Read the NAR simultaneously into a CompressionSink+FileSink (to + write the compressed NAR to disk), into a HashSink (to get the + NAR hash), and into a NarAccessor (to get the NAR listing). */ + HashSink fileHashSink { htSHA256 }; + std::shared_ptr<FSAccessor> narAccessor; + HashSink narHashSink { htSHA256 }; + { + FdSink fileSink(fdTemp.get()); + TeeSink teeSinkCompressed { fileSink, fileHashSink }; + auto compressionSink = makeCompressionSink(compression, teeSinkCompressed); + TeeSink teeSinkUncompressed { *compressionSink, narHashSink }; + TeeSource teeSource { narSource, teeSinkUncompressed }; + narAccessor = makeNarAccessor(teeSource); + compressionSink->finish(); + fileSink.flush(); + } + + auto now2 = std::chrono::steady_clock::now(); + + auto info = mkInfo(narHashSink.finish()); + auto narInfo = make_ref<NarInfo>(info); + narInfo->compression = compression; + auto [fileHash, fileSize] = fileHashSink.finish(); + narInfo->fileHash = fileHash; + narInfo->fileSize = fileSize; + narInfo->url = "nar/" + narInfo->fileHash->to_string(Base32, false) + ".nar" + + (compression == "xz" ? ".xz" : + compression == "bzip2" ? ".bz2" : + compression == "zstd" ? ".zst" : + compression == "lzip" ? ".lzip" : + compression == "lz4" ? ".lz4" : + compression == "br" ? ".br" : + ""); - if (!repair && isValidPath(info.path)) return; + auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); + printMsg(lvlTalkative, "copying path '%1%' (%2% bytes, compressed %3$.1f%% in %4% ms) to binary cache", + printStorePath(narInfo->path), info.narSize, + ((1.0 - (double) fileSize / info.narSize) * 100.0), + duration); /* Verify that all references are valid. This may do some .narinfo reads, but typically they'll already be cached. */ @@ -132,23 +191,6 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource printStorePath(info.path), printStorePath(ref)); } - assert(nar->compare(0, narMagic.size(), narMagic) == 0); - - auto narInfo = make_ref<NarInfo>(info); - - narInfo->narSize = nar->size(); - narInfo->narHash = hashString(htSHA256, *nar); - - if (info.narHash && info.narHash != narInfo->narHash) - throw Error("refusing to copy corrupted path '%1%' to binary cache", printStorePath(info.path)); - - auto accessor_ = std::dynamic_pointer_cast<RemoteFSAccessor>(accessor); - - auto narAccessor = makeNarAccessor(nar); - - if (accessor_) - accessor_->addToCache(printStorePath(info.path), *nar, narAccessor); - /* Optionally write a JSON file containing a listing of the contents of the NAR. */ if (writeNARListing) { @@ -160,33 +202,13 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource { auto res = jsonRoot.placeholder("root"); - listNar(res, narAccessor, "", true); + listNar(res, ref<FSAccessor>(narAccessor), "", true); } } - upsertFile(std::string(info.path.to_string()) + ".ls", jsonOut.str(), "application/json"); + upsertFile(std::string(info.path.hashPart()) + ".ls", jsonOut.str(), "application/json"); } - /* Compress the NAR. */ - narInfo->compression = compression; - auto now1 = std::chrono::steady_clock::now(); - auto narCompressed = compress(compression, *nar, parallelCompression); - auto now2 = std::chrono::steady_clock::now(); - narInfo->fileHash = hashString(htSHA256, *narCompressed); - narInfo->fileSize = narCompressed->size(); - - auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); - printMsg(lvlTalkative, "copying path '%1%' (%2% bytes, compressed %3$.1f%% in %4% ms) to binary cache", - printStorePath(narInfo->path), narInfo->narSize, - ((1.0 - (double) narCompressed->size() / nar->size()) * 100.0), - duration); - - narInfo->url = "nar/" + narInfo->fileHash.to_string(Base32, false) + ".nar" - + (compression == "xz" ? ".xz" : - compression == "bzip2" ? ".bz2" : - compression == "br" ? ".br" : - ""); - /* Optionally maintain an index of DWARF debug info files consisting of JSON files named 'debuginfo/<build-id>' that specify the NAR file and member containing the debug info. */ @@ -247,12 +269,14 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource /* Atomically write the NAR file. */ if (repair || !fileExists(narInfo->url)) { stats.narWrite++; - upsertFile(narInfo->url, *narCompressed, "application/x-nix-nar"); + upsertFile(narInfo->url, + std::make_shared<std::fstream>(fnTemp, std::ios_base::in | std::ios_base::binary), + "application/x-nix-nar"); } else stats.narWriteAverted++; - stats.narWriteBytes += nar->size(); - stats.narWriteCompressedBytes += narCompressed->size(); + stats.narWriteBytes += info.narSize; + stats.narWriteCompressedBytes += fileSize; stats.narWriteCompressionTimeMs += duration; /* Atomically write the NAR info file.*/ @@ -261,6 +285,41 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource writeNarInfo(narInfo); stats.narInfoWrite++; + + return narInfo; +} + +void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource, + RepairFlag repair, CheckSigsFlag checkSigs) +{ + if (!repair && isValidPath(info.path)) { + // FIXME: copyNAR -> null sink + narSource.drain(); + return; + } + + addToStoreCommon(narSource, repair, checkSigs, {[&](HashResult nar) { + /* FIXME reinstate these, once we can correctly do hash modulo sink as + needed. We need to throw here in case we uploaded a corrupted store path. */ + // assert(info.narHash == nar.first); + // assert(info.narSize == nar.second); + return info; + }}); +} + +StorePath BinaryCacheStore::addToStoreFromDump(Source & dump, const string & name, + FileIngestionMethod method, HashType hashAlgo, RepairFlag repair) +{ + if (method != FileIngestionMethod::Recursive || hashAlgo != htSHA256) + unsupported("addToStoreFromDump"); + return addToStoreCommon(dump, repair, CheckSigs, [&](HashResult nar) { + ValidPathInfo info { + makeFixedOutputPath(method, nar.first, name), + nar.first, + }; + info.narSize = nar.second; + return info; + })->path; } bool BinaryCacheStore::isValidPathUncached(const StorePath & storePath) @@ -275,14 +334,10 @@ void BinaryCacheStore::narFromPath(const StorePath & storePath, Sink & sink) { auto info = queryPathInfo(storePath).cast<const NarInfo>(); - uint64_t narSize = 0; - - LambdaSink wrapperSink([&](const unsigned char * data, size_t len) { - sink(data, len); - narSize += len; - }); + LengthSink narSize; + TeeSink tee { sink, narSize }; - auto decompressor = makeDecompressionSink(info->compression, wrapperSink); + auto decompressor = makeDecompressionSink(info->compression, tee); try { getFile(info->url, *decompressor); @@ -294,7 +349,7 @@ void BinaryCacheStore::narFromPath(const StorePath & storePath, Sink & sink) stats.narRead++; //stats.narReadCompressedBytes += nar->size(); // FIXME - stats.narReadBytes += narSize; + stats.narReadBytes += narSize.length; } void BinaryCacheStore::queryPathInfoUncached(const StorePath & storePath, @@ -332,44 +387,97 @@ void BinaryCacheStore::queryPathInfoUncached(const StorePath & storePath, StorePath BinaryCacheStore::addToStore(const string & name, const Path & srcPath, FileIngestionMethod method, HashType hashAlgo, PathFilter & filter, RepairFlag repair) { - // FIXME: some cut&paste from LocalStore::addToStore(). + /* FIXME: Make BinaryCacheStore::addToStoreCommon support + non-recursive+sha256 so we can just use the default + implementation of this method in terms of addToStoreFromDump. */ - /* Read the whole path into memory. This is not a very scalable - method for very large paths, but `copyPath' is mainly used for - small files. */ - StringSink sink; - Hash h; + HashSink sink { hashAlgo }; if (method == FileIngestionMethod::Recursive) { dumpPath(srcPath, sink, filter); - h = hashString(hashAlgo, *sink.s); } else { - auto s = readFile(srcPath); - dumpString(s, sink); - h = hashString(hashAlgo, s); + readFile(srcPath, sink); } + auto h = sink.finish().first; - ValidPathInfo info(makeFixedOutputPath(method, h, name)); - - auto source = StringSource { *sink.s }; - addToStore(info, source, repair, CheckSigs, nullptr); - - return std::move(info.path); + auto source = sinkToSource([&](Sink & sink) { + dumpPath(srcPath, sink, filter); + }); + return addToStoreCommon(*source, repair, CheckSigs, [&](HashResult nar) { + ValidPathInfo info { + makeFixedOutputPath(method, h, name), + nar.first, + }; + info.narSize = nar.second; + info.ca = FixedOutputHash { + .method = method, + .hash = h, + }; + return info; + })->path; } StorePath BinaryCacheStore::addTextToStore(const string & name, const string & s, const StorePathSet & references, RepairFlag repair) { - ValidPathInfo info(computeStorePathForText(name, s, references)); - info.references = references; - - if (repair || !isValidPath(info.path)) { - StringSink sink; - dumpString(s, sink); - auto source = StringSource { *sink.s }; - addToStore(info, source, repair, CheckSigs, nullptr); + auto textHash = hashString(htSHA256, s); + auto path = makeTextPath(name, textHash, references); + + if (!repair && isValidPath(path)) + return path; + + StringSink sink; + dumpString(s, sink); + auto source = StringSource { *sink.s }; + return addToStoreCommon(source, repair, CheckSigs, [&](HashResult nar) { + ValidPathInfo info { path, nar.first }; + info.narSize = nar.second; + info.ca = TextHash { textHash }; + info.references = references; + return info; + })->path; +} + +std::optional<const Realisation> BinaryCacheStore::queryRealisation(const DrvOutput & id) +{ + if (diskCache) { + auto [cacheOutcome, maybeCachedRealisation] = + diskCache->lookupRealisation(getUri(), id); + switch (cacheOutcome) { + case NarInfoDiskCache::oValid: + debug("Returning a cached realisation for %s", id.to_string()); + return *maybeCachedRealisation; + case NarInfoDiskCache::oInvalid: + debug("Returning a cached missing realisation for %s", id.to_string()); + return {}; + case NarInfoDiskCache::oUnknown: + break; + } + } + + auto outputInfoFilePath = realisationsPrefix + "/" + id.to_string() + ".doi"; + auto rawOutputInfo = getFile(outputInfoFilePath); + + if (rawOutputInfo) { + auto realisation = Realisation::fromJSON( + nlohmann::json::parse(*rawOutputInfo), outputInfoFilePath); + + if (diskCache) + diskCache->upsertRealisation( + getUri(), realisation); + + return {realisation}; + } else { + if (diskCache) + diskCache->upsertAbsentRealisation(getUri(), id); + return std::nullopt; } +} - return std::move(info.path); +void BinaryCacheStore::registerDrvOutput(const Realisation& info) { + if (diskCache) + diskCache->upsertRealisation(getUri(), info); + auto filePath = realisationsPrefix + "/" + info.id.to_string() + ".doi"; + upsertFile(filePath, info.toJSON().dump(), "application/json"); } ref<FSAccessor> BinaryCacheStore::getFSAccessor() |