From 2d37e88319441958b7eed876042a106e79c7c85d Mon Sep 17 00:00:00 2001
From: Eelco Dolstra <edolstra@gmail.com>
Date: Mon, 24 Sep 2018 13:53:44 +0200
Subject: [PATCH] Move most of the daemon implementation to libstore

---
 src/libstore/daemon.cc          | 798 +++++++++++++++++++++++++++++++
 src/libstore/daemon.hh          |  12 +
 src/libstore/worker-protocol.hh |   3 +
 src/nix-daemon/nix-daemon.cc    | 802 +-------------------------------
 4 files changed, 821 insertions(+), 794 deletions(-)
 create mode 100644 src/libstore/daemon.cc
 create mode 100644 src/libstore/daemon.hh

diff --git a/src/libstore/daemon.cc b/src/libstore/daemon.cc
new file mode 100644
index 000000000..2e1d10bcc
--- /dev/null
+++ b/src/libstore/daemon.cc
@@ -0,0 +1,798 @@
+#include "daemon.hh"
+#include "monitor-fd.hh"
+#include "worker-protocol.hh"
+#include "store-api.hh"
+#include "local-store.hh"
+#include "finally.hh"
+#include "affinity.hh"
+#include "archive.hh"
+#include "derivations.hh"
+#include "args.hh"
+
+namespace nix::daemon {
+
+Sink & operator << (Sink & sink, const Logger::Fields & fields)
+{
+    sink << fields.size();
+    for (auto & f : fields) {
+        sink << f.type;
+        if (f.type == Logger::Field::tInt)
+            sink << f.i;
+        else if (f.type == Logger::Field::tString)
+            sink << f.s;
+        else abort();
+    }
+    return sink;
+}
+
+/* Logger that forwards log messages to the client, *if* we're in a
+   state where the protocol allows it (i.e., when canSendStderr is
+   true). */
+struct TunnelLogger : public Logger
+{
+    FdSink & to;
+
+    struct State
+    {
+        bool canSendStderr = false;
+        std::vector<std::string> pendingMsgs;
+    };
+
+    Sync<State> state_;
+
+    unsigned int clientVersion;
+
+    TunnelLogger(FdSink & to, unsigned int clientVersion)
+        : to(to), clientVersion(clientVersion) { }
+
+    void enqueueMsg(const std::string & s)
+    {
+        auto state(state_.lock());
+
+        if (state->canSendStderr) {
+            assert(state->pendingMsgs.empty());
+            try {
+                to(s);
+                to.flush();
+            } catch (...) {
+                /* Write failed; that means that the other side is
+                   gone. */
+                state->canSendStderr = false;
+                throw;
+            }
+        } else
+            state->pendingMsgs.push_back(s);
+    }
+
+    void log(Verbosity lvl, const FormatOrString & fs) override
+    {
+        if (lvl > verbosity) return;
+
+        StringSink buf;
+        buf << STDERR_NEXT << (fs.s + "\n");
+        enqueueMsg(*buf.s);
+    }
+
+    /* startWork() means that we're starting an operation for which we
+      want to send out stderr to the client. */
+    void startWork()
+    {
+        auto state(state_.lock());
+        state->canSendStderr = true;
+
+        for (auto & msg : state->pendingMsgs)
+            to(msg);
+
+        state->pendingMsgs.clear();
+
+        to.flush();
+    }
+
+    /* stopWork() means that we're done; stop sending stderr to the
+       client. */
+    void stopWork(bool success = true, const string & msg = "", unsigned int status = 0)
+    {
+        auto state(state_.lock());
+
+        state->canSendStderr = false;
+
+        if (success)
+            to << STDERR_LAST;
+        else {
+            to << STDERR_ERROR << msg;
+            if (status != 0) to << status;
+        }
+    }
+
+    void startActivity(ActivityId act, Verbosity lvl, ActivityType type,
+        const std::string & s, const Fields & fields, ActivityId parent) override
+    {
+        if (GET_PROTOCOL_MINOR(clientVersion) < 20) {
+            if (!s.empty())
+                log(lvl, s + "...");
+            return;
+        }
+
+        StringSink buf;
+        buf << STDERR_START_ACTIVITY << act << lvl << type << s << fields << parent;
+        enqueueMsg(*buf.s);
+    }
+
+    void stopActivity(ActivityId act) override
+    {
+        if (GET_PROTOCOL_MINOR(clientVersion) < 20) return;
+        StringSink buf;
+        buf << STDERR_STOP_ACTIVITY << act;
+        enqueueMsg(*buf.s);
+    }
+
+    void result(ActivityId act, ResultType type, const Fields & fields) override
+    {
+        if (GET_PROTOCOL_MINOR(clientVersion) < 20) return;
+        StringSink buf;
+        buf << STDERR_RESULT << act << type << fields;
+        enqueueMsg(*buf.s);
+    }
+};
+
+struct TunnelSink : Sink
+{
+    Sink & to;
+    TunnelSink(Sink & to) : to(to) { }
+    virtual void operator () (const unsigned char * data, size_t len)
+    {
+        to << STDERR_WRITE;
+        writeString(data, len, to);
+    }
+};
+
+struct TunnelSource : BufferedSource
+{
+    Source & from;
+    BufferedSink & to;
+    TunnelSource(Source & from, BufferedSink & to) : from(from), to(to) { }
+    size_t readUnbuffered(unsigned char * data, size_t len) override
+    {
+        to << STDERR_READ << len;
+        to.flush();
+        size_t n = readString(data, len, from);
+        if (n == 0) throw EndOfFile("unexpected end-of-file");
+        return n;
+    }
+};
+
+/* If the NAR archive contains a single file at top-level, then save
+   the contents of the file to `s'.  Otherwise barf. */
+struct RetrieveRegularNARSink : ParseSink
+{
+    bool regular;
+    string s;
+
+    RetrieveRegularNARSink() : regular(true) { }
+
+    void createDirectory(const Path & path)
+    {
+        regular = false;
+    }
+
+    void receiveContents(unsigned char * data, unsigned int len)
+    {
+        s.append((const char *) data, len);
+    }
+
+    void createSymlink(const Path & path, const string & target)
+    {
+        regular = false;
+    }
+};
+
+static void performOp(TunnelLogger * logger, ref<Store> store,
+    bool trusted, unsigned int clientVersion,
+    Source & from, BufferedSink & to, unsigned int op)
+{
+    switch (op) {
+
+    case wopIsValidPath: {
+        /* 'readStorePath' could raise an error leading to the connection
+           being closed.  To be able to recover from an invalid path error,
+           call 'startWork' early, and do 'assertStorePath' afterwards so
+           that the 'Error' exception handler doesn't close the
+           connection.  */
+        Path path = readString(from);
+        logger->startWork();
+        store->assertStorePath(path);
+        bool result = store->isValidPath(path);
+        logger->stopWork();
+        to << result;
+        break;
+    }
+
+    case wopQueryValidPaths: {
+        PathSet paths = readStorePaths<PathSet>(*store, from);
+        logger->startWork();
+        PathSet res = store->queryValidPaths(paths);
+        logger->stopWork();
+        to << res;
+        break;
+    }
+
+    case wopHasSubstitutes: {
+        Path path = readStorePath(*store, from);
+        logger->startWork();
+        PathSet res = store->querySubstitutablePaths({path});
+        logger->stopWork();
+        to << (res.find(path) != res.end());
+        break;
+    }
+
+    case wopQuerySubstitutablePaths: {
+        PathSet paths = readStorePaths<PathSet>(*store, from);
+        logger->startWork();
+        PathSet res = store->querySubstitutablePaths(paths);
+        logger->stopWork();
+        to << res;
+        break;
+    }
+
+    case wopQueryPathHash: {
+        Path path = readStorePath(*store, from);
+        logger->startWork();
+        auto hash = store->queryPathInfo(path)->narHash;
+        logger->stopWork();
+        to << hash.to_string(Base16, false);
+        break;
+    }
+
+    case wopQueryReferences:
+    case wopQueryReferrers:
+    case wopQueryValidDerivers:
+    case wopQueryDerivationOutputs: {
+        Path path = readStorePath(*store, from);
+        logger->startWork();
+        PathSet paths;
+        if (op == wopQueryReferences)
+            paths = store->queryPathInfo(path)->references;
+        else if (op == wopQueryReferrers)
+            store->queryReferrers(path, paths);
+        else if (op == wopQueryValidDerivers)
+            paths = store->queryValidDerivers(path);
+        else paths = store->queryDerivationOutputs(path);
+        logger->stopWork();
+        to << paths;
+        break;
+    }
+
+    case wopQueryDerivationOutputNames: {
+        Path path = readStorePath(*store, from);
+        logger->startWork();
+        StringSet names;
+        names = store->queryDerivationOutputNames(path);
+        logger->stopWork();
+        to << names;
+        break;
+    }
+
+    case wopQueryDeriver: {
+        Path path = readStorePath(*store, from);
+        logger->startWork();
+        auto deriver = store->queryPathInfo(path)->deriver;
+        logger->stopWork();
+        to << deriver;
+        break;
+    }
+
+    case wopQueryPathFromHashPart: {
+        string hashPart = readString(from);
+        logger->startWork();
+        Path path = store->queryPathFromHashPart(hashPart);
+        logger->stopWork();
+        to << path;
+        break;
+    }
+
+    case wopAddToStore: {
+        bool fixed, recursive;
+        std::string s, baseName;
+        from >> baseName >> fixed /* obsolete */ >> recursive >> s;
+        /* Compatibility hack. */
+        if (!fixed) {
+            s = "sha256";
+            recursive = true;
+        }
+        HashType hashAlgo = parseHashType(s);
+
+        TeeSource savedNAR(from);
+        RetrieveRegularNARSink savedRegular;
+
+        if (recursive) {
+            /* Get the entire NAR dump from the client and save it to
+               a string so that we can pass it to
+               addToStoreFromDump(). */
+            ParseSink sink; /* null sink; just parse the NAR */
+            parseDump(sink, savedNAR);
+        } else
+            parseDump(savedRegular, from);
+
+        logger->startWork();
+        if (!savedRegular.regular) throw Error("regular file expected");
+
+        auto store2 = store.dynamic_pointer_cast<LocalStore>();
+        if (!store2) throw Error("operation is only supported by LocalStore");
+
+        Path path = store2->addToStoreFromDump(recursive ? *savedNAR.data : savedRegular.s, baseName, recursive, hashAlgo);
+        logger->stopWork();
+
+        to << path;
+        break;
+    }
+
+    case wopAddTextToStore: {
+        string suffix = readString(from);
+        string s = readString(from);
+        PathSet refs = readStorePaths<PathSet>(*store, from);
+        logger->startWork();
+        Path path = store->addTextToStore(suffix, s, refs, NoRepair);
+        logger->stopWork();
+        to << path;
+        break;
+    }
+
+    case wopExportPath: {
+        Path path = readStorePath(*store, from);
+        readInt(from); // obsolete
+        logger->startWork();
+        TunnelSink sink(to);
+        store->exportPath(path, sink);
+        logger->stopWork();
+        to << 1;
+        break;
+    }
+
+    case wopImportPaths: {
+        logger->startWork();
+        TunnelSource source(from, to);
+        Paths paths = store->importPaths(source, nullptr,
+            trusted ? NoCheckSigs : CheckSigs);
+        logger->stopWork();
+        to << paths;
+        break;
+    }
+
+    case wopBuildPaths: {
+        PathSet drvs = readStorePaths<PathSet>(*store, from);
+        BuildMode mode = bmNormal;
+        if (GET_PROTOCOL_MINOR(clientVersion) >= 15) {
+            mode = (BuildMode) readInt(from);
+
+            /* Repairing is not atomic, so disallowed for "untrusted"
+               clients.  */
+            if (mode == bmRepair && !trusted)
+                throw Error("repairing is not allowed because you are not in 'trusted-users'");
+        }
+        logger->startWork();
+        store->buildPaths(drvs, mode);
+        logger->stopWork();
+        to << 1;
+        break;
+    }
+
+    case wopBuildDerivation: {
+        Path drvPath = readStorePath(*store, from);
+        BasicDerivation drv;
+        readDerivation(from, *store, drv);
+        BuildMode buildMode = (BuildMode) readInt(from);
+        logger->startWork();
+        if (!trusted)
+            throw Error("you are not privileged to build derivations");
+        auto res = store->buildDerivation(drvPath, drv, buildMode);
+        logger->stopWork();
+        to << res.status << res.errorMsg;
+        break;
+    }
+
+    case wopEnsurePath: {
+        Path path = readStorePath(*store, from);
+        logger->startWork();
+        store->ensurePath(path);
+        logger->stopWork();
+        to << 1;
+        break;
+    }
+
+    case wopAddTempRoot: {
+        Path path = readStorePath(*store, from);
+        logger->startWork();
+        store->addTempRoot(path);
+        logger->stopWork();
+        to << 1;
+        break;
+    }
+
+    case wopAddIndirectRoot: {
+        Path path = absPath(readString(from));
+        logger->startWork();
+        store->addIndirectRoot(path);
+        logger->stopWork();
+        to << 1;
+        break;
+    }
+
+    case wopSyncWithGC: {
+        logger->startWork();
+        store->syncWithGC();
+        logger->stopWork();
+        to << 1;
+        break;
+    }
+
+    case wopFindRoots: {
+        logger->startWork();
+        Roots roots = store->findRoots(!trusted);
+        logger->stopWork();
+
+        size_t size = 0;
+        for (auto & i : roots)
+            size += i.second.size();
+
+        to << size;
+
+        for (auto & [target, links] : roots)
+            for (auto & link : links)
+                to << link << target;
+
+        break;
+    }
+
+    case wopCollectGarbage: {
+        GCOptions options;
+        options.action = (GCOptions::GCAction) readInt(from);
+        options.pathsToDelete = readStorePaths<PathSet>(*store, from);
+        from >> options.ignoreLiveness >> options.maxFreed;
+        // obsolete fields
+        readInt(from);
+        readInt(from);
+        readInt(from);
+
+        GCResults results;
+
+        logger->startWork();
+        if (options.ignoreLiveness)
+            throw Error("you are not allowed to ignore liveness");
+        store->collectGarbage(options, results);
+        logger->stopWork();
+
+        to << results.paths << results.bytesFreed << 0 /* obsolete */;
+
+        break;
+    }
+
+    case wopSetOptions: {
+        settings.keepFailed = readInt(from);
+        settings.keepGoing = readInt(from);
+        settings.tryFallback = readInt(from);
+        verbosity = (Verbosity) readInt(from);
+        settings.maxBuildJobs.assign(readInt(from));
+        settings.maxSilentTime = readInt(from);
+        readInt(from); // obsolete useBuildHook
+        settings.verboseBuild = lvlError == (Verbosity) readInt(from);
+        readInt(from); // obsolete logType
+        readInt(from); // obsolete printBuildTrace
+        settings.buildCores = readInt(from);
+        settings.useSubstitutes  = readInt(from);
+
+        StringMap overrides;
+        if (GET_PROTOCOL_MINOR(clientVersion) >= 12) {
+            unsigned int n = readInt(from);
+            for (unsigned int i = 0; i < n; i++) {
+                string name = readString(from);
+                string value = readString(from);
+                overrides.emplace(name, value);
+            }
+        }
+
+        logger->startWork();
+
+        for (auto & i : overrides) {
+            auto & name(i.first);
+            auto & value(i.second);
+
+            auto setSubstituters = [&](Setting<Strings> & res) {
+                if (name != res.name && res.aliases.count(name) == 0)
+                    return false;
+                StringSet trusted = settings.trustedSubstituters;
+                for (auto & s : settings.substituters.get())
+                    trusted.insert(s);
+                Strings subs;
+                auto ss = tokenizeString<Strings>(value);
+                for (auto & s : ss)
+                    if (trusted.count(s))
+                        subs.push_back(s);
+                    else
+                        warn("ignoring untrusted substituter '%s'", s);
+                res = subs;
+                return true;
+            };
+
+            try {
+                if (name == "ssh-auth-sock") // obsolete
+                    ;
+                else if (trusted
+                    || name == settings.buildTimeout.name
+                    || name == "connect-timeout"
+                    || (name == "builders" && value == ""))
+                    settings.set(name, value);
+                else if (setSubstituters(settings.substituters))
+                    ;
+                else if (setSubstituters(settings.extraSubstituters))
+                    ;
+                else
+                    warn("ignoring the user-specified setting '%s', because it is a restricted setting and you are not a trusted user", name);
+            } catch (UsageError & e) {
+                warn(e.what());
+            }
+        }
+
+        logger->stopWork();
+        break;
+    }
+
+    case wopQuerySubstitutablePathInfo: {
+        Path path = absPath(readString(from));
+        logger->startWork();
+        SubstitutablePathInfos infos;
+        store->querySubstitutablePathInfos({path}, infos);
+        logger->stopWork();
+        SubstitutablePathInfos::iterator i = infos.find(path);
+        if (i == infos.end())
+            to << 0;
+        else {
+            to << 1 << i->second.deriver << i->second.references << i->second.downloadSize << i->second.narSize;
+        }
+        break;
+    }
+
+    case wopQuerySubstitutablePathInfos: {
+        PathSet paths = readStorePaths<PathSet>(*store, from);
+        logger->startWork();
+        SubstitutablePathInfos infos;
+        store->querySubstitutablePathInfos(paths, infos);
+        logger->stopWork();
+        to << infos.size();
+        for (auto & i : infos) {
+            to << i.first << i.second.deriver << i.second.references
+               << i.second.downloadSize << i.second.narSize;
+        }
+        break;
+    }
+
+    case wopQueryAllValidPaths: {
+        logger->startWork();
+        PathSet paths = store->queryAllValidPaths();
+        logger->stopWork();
+        to << paths;
+        break;
+    }
+
+    case wopQueryPathInfo: {
+        Path path = readStorePath(*store, from);
+        std::shared_ptr<const ValidPathInfo> info;
+        logger->startWork();
+        try {
+            info = store->queryPathInfo(path);
+        } catch (InvalidPath &) {
+            if (GET_PROTOCOL_MINOR(clientVersion) < 17) throw;
+        }
+        logger->stopWork();
+        if (info) {
+            if (GET_PROTOCOL_MINOR(clientVersion) >= 17)
+                to << 1;
+            to << info->deriver << info->narHash.to_string(Base16, false) << info->references
+               << info->registrationTime << info->narSize;
+            if (GET_PROTOCOL_MINOR(clientVersion) >= 16) {
+                to << info->ultimate
+                   << info->sigs
+                   << info->ca;
+            }
+        } else {
+            assert(GET_PROTOCOL_MINOR(clientVersion) >= 17);
+            to << 0;
+        }
+        break;
+    }
+
+    case wopOptimiseStore:
+        logger->startWork();
+        store->optimiseStore();
+        logger->stopWork();
+        to << 1;
+        break;
+
+    case wopVerifyStore: {
+        bool checkContents, repair;
+        from >> checkContents >> repair;
+        logger->startWork();
+        if (repair && !trusted)
+            throw Error("you are not privileged to repair paths");
+        bool errors = store->verifyStore(checkContents, (RepairFlag) repair);
+        logger->stopWork();
+        to << errors;
+        break;
+    }
+
+    case wopAddSignatures: {
+        Path path = readStorePath(*store, from);
+        StringSet sigs = readStrings<StringSet>(from);
+        logger->startWork();
+        if (!trusted)
+            throw Error("you are not privileged to add signatures");
+        store->addSignatures(path, sigs);
+        logger->stopWork();
+        to << 1;
+        break;
+    }
+
+    case wopNarFromPath: {
+        auto path = readStorePath(*store, from);
+        logger->startWork();
+        logger->stopWork();
+        dumpPath(path, to);
+        break;
+    }
+
+    case wopAddToStoreNar: {
+        bool repair, dontCheckSigs;
+        ValidPathInfo info;
+        info.path = readStorePath(*store, from);
+        from >> info.deriver;
+        if (!info.deriver.empty())
+            store->assertStorePath(info.deriver);
+        info.narHash = Hash(readString(from), htSHA256);
+        info.references = readStorePaths<PathSet>(*store, from);
+        from >> info.registrationTime >> info.narSize >> info.ultimate;
+        info.sigs = readStrings<StringSet>(from);
+        from >> info.ca >> repair >> dontCheckSigs;
+        if (!trusted && dontCheckSigs)
+            dontCheckSigs = false;
+        if (!trusted)
+            info.ultimate = false;
+
+        std::string saved;
+        std::unique_ptr<Source> source;
+        if (GET_PROTOCOL_MINOR(clientVersion) >= 21)
+            source = std::make_unique<TunnelSource>(from, to);
+        else {
+            TeeSink tee(from);
+            parseDump(tee, tee.source);
+            saved = std::move(*tee.source.data);
+            source = std::make_unique<StringSource>(saved);
+        }
+
+        logger->startWork();
+
+        // FIXME: race if addToStore doesn't read source?
+        store->addToStore(info, *source, (RepairFlag) repair,
+            dontCheckSigs ? NoCheckSigs : CheckSigs, nullptr);
+
+        logger->stopWork();
+        break;
+    }
+
+    case wopQueryMissing: {
+        PathSet targets = readStorePaths<PathSet>(*store, from);
+        logger->startWork();
+        PathSet willBuild, willSubstitute, unknown;
+        unsigned long long downloadSize, narSize;
+        store->queryMissing(targets, willBuild, willSubstitute, unknown, downloadSize, narSize);
+        logger->stopWork();
+        to << willBuild << willSubstitute << unknown << downloadSize << narSize;
+        break;
+    }
+
+    default:
+        throw Error(format("invalid operation %1%") % op);
+    }
+}
+
+void processConnection(
+    FdSource & from,
+    FdSink & to,
+    bool trusted,
+    const std::string & userName,
+    uid_t userId)
+{
+    MonitorFdHup monitor(from.fd);
+
+    /* Exchange the greeting. */
+    unsigned int magic = readInt(from);
+    if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch");
+    to << WORKER_MAGIC_2 << PROTOCOL_VERSION;
+    to.flush();
+    unsigned int clientVersion = readInt(from);
+
+    if (clientVersion < 0x10a)
+        throw Error("the Nix client version is too old");
+
+    auto tunnelLogger = new TunnelLogger(to, clientVersion);
+    auto prevLogger = nix::logger;
+    logger = tunnelLogger;
+
+    unsigned int opCount = 0;
+
+    Finally finally([&]() {
+        _isInterrupted = false;
+        prevLogger->log(lvlDebug, fmt("%d operations", opCount));
+    });
+
+    if (GET_PROTOCOL_MINOR(clientVersion) >= 14 && readInt(from))
+        setAffinityTo(readInt(from));
+
+    readInt(from); // obsolete reserveSpace
+
+    /* Send startup error messages to the client. */
+    tunnelLogger->startWork();
+
+    try {
+
+        /* If we can't accept clientVersion, then throw an error
+           *here* (not above). */
+
+#if 0
+        /* Prevent users from doing something very dangerous. */
+        if (geteuid() == 0 &&
+            querySetting("build-users-group", "") == "")
+            throw Error("if you run 'nix-daemon' as root, then you MUST set 'build-users-group'!");
+#endif
+
+        /* Open the store. */
+        Store::Params params; // FIXME: get params from somewhere
+        // Disable caching since the client already does that.
+        params["path-info-cache-size"] = "0";
+        auto store = openStore(settings.storeUri, params);
+
+        store->createUser(userName, userId);
+
+        tunnelLogger->stopWork();
+        to.flush();
+
+        /* Process client requests. */
+        while (true) {
+            WorkerOp op;
+            try {
+                op = (WorkerOp) readInt(from);
+            } catch (Interrupted & e) {
+                break;
+            } catch (EndOfFile & e) {
+                break;
+            }
+
+            opCount++;
+
+            try {
+                performOp(tunnelLogger, store, trusted, clientVersion, from, to, op);
+            } catch (Error & e) {
+                /* If we're not in a state where we can send replies, then
+                   something went wrong processing the input of the
+                   client.  This can happen especially if I/O errors occur
+                   during addTextToStore() / importPath().  If that
+                   happens, just send the error message and exit. */
+                bool errorAllowed = tunnelLogger->state_.lock()->canSendStderr;
+                tunnelLogger->stopWork(false, e.msg(), e.status);
+                if (!errorAllowed) throw;
+            } catch (std::bad_alloc & e) {
+                tunnelLogger->stopWork(false, "Nix daemon out of memory", 1);
+                throw;
+            }
+
+            to.flush();
+
+            assert(!tunnelLogger->state_.lock()->canSendStderr);
+        };
+
+    } catch (std::exception & e) {
+        tunnelLogger->stopWork(false, e.what(), 1);
+        to.flush();
+        return;
+    }
+}
+
+}
diff --git a/src/libstore/daemon.hh b/src/libstore/daemon.hh
new file mode 100644
index 000000000..6d4015e34
--- /dev/null
+++ b/src/libstore/daemon.hh
@@ -0,0 +1,12 @@
+#include "serialise.hh"
+
+namespace nix::daemon {
+
+void processConnection(
+    FdSource & from,
+    FdSink & to,
+    bool trusted,
+    const std::string & userName,
+    uid_t userId);
+
+}
diff --git a/src/libstore/worker-protocol.hh b/src/libstore/worker-protocol.hh
index 5ebdfaf13..a03edd10f 100644
--- a/src/libstore/worker-protocol.hh
+++ b/src/libstore/worker-protocol.hh
@@ -62,6 +62,9 @@ typedef enum {
 #define STDERR_RESULT         0x52534c54
 
 
+class Store;
+class Source;
+
 Path readStorePath(Store & store, Source & from);
 template<class T> T readStorePaths(Store & store, Source & from);
 
diff --git a/src/nix-daemon/nix-daemon.cc b/src/nix-daemon/nix-daemon.cc
index cd18489b0..799d2bacf 100644
--- a/src/nix-daemon/nix-daemon.cc
+++ b/src/nix-daemon/nix-daemon.cc
@@ -2,14 +2,12 @@
 #include "local-store.hh"
 #include "util.hh"
 #include "serialise.hh"
-#include "worker-protocol.hh"
 #include "archive.hh"
-#include "affinity.hh"
 #include "globals.hh"
-#include "monitor-fd.hh"
 #include "derivations.hh"
 #include "finally.hh"
 #include "legacy.hh"
+#include "daemon.hh"
 
 #include <algorithm>
 
@@ -32,6 +30,7 @@
 #endif
 
 using namespace nix;
+using namespace nix::daemon;
 
 #ifndef __linux__
 #define SPLICE_F_MOVE 0
@@ -53,793 +52,6 @@ static ssize_t splice(int fd_in, void *off_in, int fd_out, void *off_out, size_t
 }
 #endif
 
-static FdSource from(STDIN_FILENO);
-static FdSink to(STDOUT_FILENO);
-
-
-Sink & operator << (Sink & sink, const Logger::Fields & fields)
-{
-    sink << fields.size();
-    for (auto & f : fields) {
-        sink << f.type;
-        if (f.type == Logger::Field::tInt)
-            sink << f.i;
-        else if (f.type == Logger::Field::tString)
-            sink << f.s;
-        else abort();
-    }
-    return sink;
-}
-
-
-/* Logger that forwards log messages to the client, *if* we're in a
-   state where the protocol allows it (i.e., when canSendStderr is
-   true). */
-struct TunnelLogger : public Logger
-{
-    struct State
-    {
-        bool canSendStderr = false;
-        std::vector<std::string> pendingMsgs;
-    };
-
-    Sync<State> state_;
-
-    unsigned int clientVersion;
-
-    TunnelLogger(unsigned int clientVersion) : clientVersion(clientVersion) { }
-
-    void enqueueMsg(const std::string & s)
-    {
-        auto state(state_.lock());
-
-        if (state->canSendStderr) {
-            assert(state->pendingMsgs.empty());
-            try {
-                to(s);
-                to.flush();
-            } catch (...) {
-                /* Write failed; that means that the other side is
-                   gone. */
-                state->canSendStderr = false;
-                throw;
-            }
-        } else
-            state->pendingMsgs.push_back(s);
-    }
-
-    void log(Verbosity lvl, const FormatOrString & fs) override
-    {
-        if (lvl > verbosity) return;
-
-        StringSink buf;
-        buf << STDERR_NEXT << (fs.s + "\n");
-        enqueueMsg(*buf.s);
-    }
-
-    /* startWork() means that we're starting an operation for which we
-      want to send out stderr to the client. */
-    void startWork()
-    {
-        auto state(state_.lock());
-        state->canSendStderr = true;
-
-        for (auto & msg : state->pendingMsgs)
-            to(msg);
-
-        state->pendingMsgs.clear();
-
-        to.flush();
-    }
-
-    /* stopWork() means that we're done; stop sending stderr to the
-       client. */
-    void stopWork(bool success = true, const string & msg = "", unsigned int status = 0)
-    {
-        auto state(state_.lock());
-
-        state->canSendStderr = false;
-
-        if (success)
-            to << STDERR_LAST;
-        else {
-            to << STDERR_ERROR << msg;
-            if (status != 0) to << status;
-        }
-    }
-
-    void startActivity(ActivityId act, Verbosity lvl, ActivityType type,
-        const std::string & s, const Fields & fields, ActivityId parent) override
-    {
-        if (GET_PROTOCOL_MINOR(clientVersion) < 20) {
-            if (!s.empty())
-                log(lvl, s + "...");
-            return;
-        }
-
-        StringSink buf;
-        buf << STDERR_START_ACTIVITY << act << lvl << type << s << fields << parent;
-        enqueueMsg(*buf.s);
-    }
-
-    void stopActivity(ActivityId act) override
-    {
-        if (GET_PROTOCOL_MINOR(clientVersion) < 20) return;
-        StringSink buf;
-        buf << STDERR_STOP_ACTIVITY << act;
-        enqueueMsg(*buf.s);
-    }
-
-    void result(ActivityId act, ResultType type, const Fields & fields) override
-    {
-        if (GET_PROTOCOL_MINOR(clientVersion) < 20) return;
-        StringSink buf;
-        buf << STDERR_RESULT << act << type << fields;
-        enqueueMsg(*buf.s);
-    }
-};
-
-
-struct TunnelSink : Sink
-{
-    Sink & to;
-    TunnelSink(Sink & to) : to(to) { }
-    virtual void operator () (const unsigned char * data, size_t len)
-    {
-        to << STDERR_WRITE;
-        writeString(data, len, to);
-    }
-};
-
-
-struct TunnelSource : BufferedSource
-{
-    Source & from;
-    TunnelSource(Source & from) : from(from) { }
-protected:
-    size_t readUnbuffered(unsigned char * data, size_t len) override
-    {
-        to << STDERR_READ << len;
-        to.flush();
-        size_t n = readString(data, len, from);
-        if (n == 0) throw EndOfFile("unexpected end-of-file");
-        return n;
-    }
-};
-
-
-/* If the NAR archive contains a single file at top-level, then save
-   the contents of the file to `s'.  Otherwise barf. */
-struct RetrieveRegularNARSink : ParseSink
-{
-    bool regular;
-    string s;
-
-    RetrieveRegularNARSink() : regular(true) { }
-
-    void createDirectory(const Path & path)
-    {
-        regular = false;
-    }
-
-    void receiveContents(unsigned char * data, unsigned int len)
-    {
-        s.append((const char *) data, len);
-    }
-
-    void createSymlink(const Path & path, const string & target)
-    {
-        regular = false;
-    }
-};
-
-
-static void performOp(TunnelLogger * logger, ref<Store> store,
-    bool trusted, unsigned int clientVersion,
-    Source & from, Sink & to, unsigned int op)
-{
-    switch (op) {
-
-    case wopIsValidPath: {
-        /* 'readStorePath' could raise an error leading to the connection
-           being closed.  To be able to recover from an invalid path error,
-           call 'startWork' early, and do 'assertStorePath' afterwards so
-           that the 'Error' exception handler doesn't close the
-           connection.  */
-        Path path = readString(from);
-        logger->startWork();
-        store->assertStorePath(path);
-        bool result = store->isValidPath(path);
-        logger->stopWork();
-        to << result;
-        break;
-    }
-
-    case wopQueryValidPaths: {
-        PathSet paths = readStorePaths<PathSet>(*store, from);
-        logger->startWork();
-        PathSet res = store->queryValidPaths(paths);
-        logger->stopWork();
-        to << res;
-        break;
-    }
-
-    case wopHasSubstitutes: {
-        Path path = readStorePath(*store, from);
-        logger->startWork();
-        PathSet res = store->querySubstitutablePaths({path});
-        logger->stopWork();
-        to << (res.find(path) != res.end());
-        break;
-    }
-
-    case wopQuerySubstitutablePaths: {
-        PathSet paths = readStorePaths<PathSet>(*store, from);
-        logger->startWork();
-        PathSet res = store->querySubstitutablePaths(paths);
-        logger->stopWork();
-        to << res;
-        break;
-    }
-
-    case wopQueryPathHash: {
-        Path path = readStorePath(*store, from);
-        logger->startWork();
-        auto hash = store->queryPathInfo(path)->narHash;
-        logger->stopWork();
-        to << hash.to_string(Base16, false);
-        break;
-    }
-
-    case wopQueryReferences:
-    case wopQueryReferrers:
-    case wopQueryValidDerivers:
-    case wopQueryDerivationOutputs: {
-        Path path = readStorePath(*store, from);
-        logger->startWork();
-        PathSet paths;
-        if (op == wopQueryReferences)
-            paths = store->queryPathInfo(path)->references;
-        else if (op == wopQueryReferrers)
-            store->queryReferrers(path, paths);
-        else if (op == wopQueryValidDerivers)
-            paths = store->queryValidDerivers(path);
-        else paths = store->queryDerivationOutputs(path);
-        logger->stopWork();
-        to << paths;
-        break;
-    }
-
-    case wopQueryDerivationOutputNames: {
-        Path path = readStorePath(*store, from);
-        logger->startWork();
-        StringSet names;
-        names = store->queryDerivationOutputNames(path);
-        logger->stopWork();
-        to << names;
-        break;
-    }
-
-    case wopQueryDeriver: {
-        Path path = readStorePath(*store, from);
-        logger->startWork();
-        auto deriver = store->queryPathInfo(path)->deriver;
-        logger->stopWork();
-        to << deriver;
-        break;
-    }
-
-    case wopQueryPathFromHashPart: {
-        string hashPart = readString(from);
-        logger->startWork();
-        Path path = store->queryPathFromHashPart(hashPart);
-        logger->stopWork();
-        to << path;
-        break;
-    }
-
-    case wopAddToStore: {
-        bool fixed, recursive;
-        std::string s, baseName;
-        from >> baseName >> fixed /* obsolete */ >> recursive >> s;
-        /* Compatibility hack. */
-        if (!fixed) {
-            s = "sha256";
-            recursive = true;
-        }
-        HashType hashAlgo = parseHashType(s);
-
-        TeeSource savedNAR(from);
-        RetrieveRegularNARSink savedRegular;
-
-        if (recursive) {
-            /* Get the entire NAR dump from the client and save it to
-               a string so that we can pass it to
-               addToStoreFromDump(). */
-            ParseSink sink; /* null sink; just parse the NAR */
-            parseDump(sink, savedNAR);
-        } else
-            parseDump(savedRegular, from);
-
-        logger->startWork();
-        if (!savedRegular.regular) throw Error("regular file expected");
-
-        auto store2 = store.dynamic_pointer_cast<LocalStore>();
-        if (!store2) throw Error("operation is only supported by LocalStore");
-
-        Path path = store2->addToStoreFromDump(recursive ? *savedNAR.data : savedRegular.s, baseName, recursive, hashAlgo);
-        logger->stopWork();
-
-        to << path;
-        break;
-    }
-
-    case wopAddTextToStore: {
-        string suffix = readString(from);
-        string s = readString(from);
-        PathSet refs = readStorePaths<PathSet>(*store, from);
-        logger->startWork();
-        Path path = store->addTextToStore(suffix, s, refs, NoRepair);
-        logger->stopWork();
-        to << path;
-        break;
-    }
-
-    case wopExportPath: {
-        Path path = readStorePath(*store, from);
-        readInt(from); // obsolete
-        logger->startWork();
-        TunnelSink sink(to);
-        store->exportPath(path, sink);
-        logger->stopWork();
-        to << 1;
-        break;
-    }
-
-    case wopImportPaths: {
-        logger->startWork();
-        TunnelSource source(from);
-        Paths paths = store->importPaths(source, nullptr,
-            trusted ? NoCheckSigs : CheckSigs);
-        logger->stopWork();
-        to << paths;
-        break;
-    }
-
-    case wopBuildPaths: {
-        PathSet drvs = readStorePaths<PathSet>(*store, from);
-        BuildMode mode = bmNormal;
-        if (GET_PROTOCOL_MINOR(clientVersion) >= 15) {
-            mode = (BuildMode) readInt(from);
-
-            /* Repairing is not atomic, so disallowed for "untrusted"
-               clients.  */
-            if (mode == bmRepair && !trusted)
-                throw Error("repairing is not allowed because you are not in 'trusted-users'");
-        }
-        logger->startWork();
-        store->buildPaths(drvs, mode);
-        logger->stopWork();
-        to << 1;
-        break;
-    }
-
-    case wopBuildDerivation: {
-        Path drvPath = readStorePath(*store, from);
-        BasicDerivation drv;
-        readDerivation(from, *store, drv);
-        BuildMode buildMode = (BuildMode) readInt(from);
-        logger->startWork();
-        if (!trusted)
-            throw Error("you are not privileged to build derivations");
-        auto res = store->buildDerivation(drvPath, drv, buildMode);
-        logger->stopWork();
-        to << res.status << res.errorMsg;
-        break;
-    }
-
-    case wopEnsurePath: {
-        Path path = readStorePath(*store, from);
-        logger->startWork();
-        store->ensurePath(path);
-        logger->stopWork();
-        to << 1;
-        break;
-    }
-
-    case wopAddTempRoot: {
-        Path path = readStorePath(*store, from);
-        logger->startWork();
-        store->addTempRoot(path);
-        logger->stopWork();
-        to << 1;
-        break;
-    }
-
-    case wopAddIndirectRoot: {
-        Path path = absPath(readString(from));
-        logger->startWork();
-        store->addIndirectRoot(path);
-        logger->stopWork();
-        to << 1;
-        break;
-    }
-
-    case wopSyncWithGC: {
-        logger->startWork();
-        store->syncWithGC();
-        logger->stopWork();
-        to << 1;
-        break;
-    }
-
-    case wopFindRoots: {
-        logger->startWork();
-        Roots roots = store->findRoots(!trusted);
-        logger->stopWork();
-
-        size_t size = 0;
-        for (auto & i : roots)
-            size += i.second.size();
-
-        to << size;
-
-        for (auto & [target, links] : roots)
-            for (auto & link : links)
-                to << link << target;
-
-        break;
-    }
-
-    case wopCollectGarbage: {
-        GCOptions options;
-        options.action = (GCOptions::GCAction) readInt(from);
-        options.pathsToDelete = readStorePaths<PathSet>(*store, from);
-        from >> options.ignoreLiveness >> options.maxFreed;
-        // obsolete fields
-        readInt(from);
-        readInt(from);
-        readInt(from);
-
-        GCResults results;
-
-        logger->startWork();
-        if (options.ignoreLiveness)
-            throw Error("you are not allowed to ignore liveness");
-        store->collectGarbage(options, results);
-        logger->stopWork();
-
-        to << results.paths << results.bytesFreed << 0 /* obsolete */;
-
-        break;
-    }
-
-    case wopSetOptions: {
-        settings.keepFailed = readInt(from);
-        settings.keepGoing = readInt(from);
-        settings.tryFallback = readInt(from);
-        verbosity = (Verbosity) readInt(from);
-        settings.maxBuildJobs.assign(readInt(from));
-        settings.maxSilentTime = readInt(from);
-        readInt(from); // obsolete useBuildHook
-        settings.verboseBuild = lvlError == (Verbosity) readInt(from);
-        readInt(from); // obsolete logType
-        readInt(from); // obsolete printBuildTrace
-        settings.buildCores = readInt(from);
-        settings.useSubstitutes  = readInt(from);
-
-        StringMap overrides;
-        if (GET_PROTOCOL_MINOR(clientVersion) >= 12) {
-            unsigned int n = readInt(from);
-            for (unsigned int i = 0; i < n; i++) {
-                string name = readString(from);
-                string value = readString(from);
-                overrides.emplace(name, value);
-            }
-        }
-
-        logger->startWork();
-
-        for (auto & i : overrides) {
-            auto & name(i.first);
-            auto & value(i.second);
-
-            auto setSubstituters = [&](Setting<Strings> & res) {
-                if (name != res.name && res.aliases.count(name) == 0)
-                    return false;
-                StringSet trusted = settings.trustedSubstituters;
-                for (auto & s : settings.substituters.get())
-                    trusted.insert(s);
-                Strings subs;
-                auto ss = tokenizeString<Strings>(value);
-                for (auto & s : ss)
-                    if (trusted.count(s))
-                        subs.push_back(s);
-                    else
-                        warn("ignoring untrusted substituter '%s'", s);
-                res = subs;
-                return true;
-            };
-
-            try {
-                if (name == "ssh-auth-sock") // obsolete
-                    ;
-                else if (trusted
-                    || name == settings.buildTimeout.name
-                    || name == "connect-timeout"
-                    || (name == "builders" && value == ""))
-                    settings.set(name, value);
-                else if (setSubstituters(settings.substituters))
-                    ;
-                else if (setSubstituters(settings.extraSubstituters))
-                    ;
-                else
-                    warn("ignoring the user-specified setting '%s', because it is a restricted setting and you are not a trusted user", name);
-            } catch (UsageError & e) {
-                warn(e.what());
-            }
-        }
-
-        logger->stopWork();
-        break;
-    }
-
-    case wopQuerySubstitutablePathInfo: {
-        Path path = absPath(readString(from));
-        logger->startWork();
-        SubstitutablePathInfos infos;
-        store->querySubstitutablePathInfos({path}, infos);
-        logger->stopWork();
-        SubstitutablePathInfos::iterator i = infos.find(path);
-        if (i == infos.end())
-            to << 0;
-        else {
-            to << 1 << i->second.deriver << i->second.references << i->second.downloadSize << i->second.narSize;
-        }
-        break;
-    }
-
-    case wopQuerySubstitutablePathInfos: {
-        PathSet paths = readStorePaths<PathSet>(*store, from);
-        logger->startWork();
-        SubstitutablePathInfos infos;
-        store->querySubstitutablePathInfos(paths, infos);
-        logger->stopWork();
-        to << infos.size();
-        for (auto & i : infos) {
-            to << i.first << i.second.deriver << i.second.references
-               << i.second.downloadSize << i.second.narSize;
-        }
-        break;
-    }
-
-    case wopQueryAllValidPaths: {
-        logger->startWork();
-        PathSet paths = store->queryAllValidPaths();
-        logger->stopWork();
-        to << paths;
-        break;
-    }
-
-    case wopQueryPathInfo: {
-        Path path = readStorePath(*store, from);
-        std::shared_ptr<const ValidPathInfo> info;
-        logger->startWork();
-        try {
-            info = store->queryPathInfo(path);
-        } catch (InvalidPath &) {
-            if (GET_PROTOCOL_MINOR(clientVersion) < 17) throw;
-        }
-        logger->stopWork();
-        if (info) {
-            if (GET_PROTOCOL_MINOR(clientVersion) >= 17)
-                to << 1;
-            to << info->deriver << info->narHash.to_string(Base16, false) << info->references
-               << info->registrationTime << info->narSize;
-            if (GET_PROTOCOL_MINOR(clientVersion) >= 16) {
-                to << info->ultimate
-                   << info->sigs
-                   << info->ca;
-            }
-        } else {
-            assert(GET_PROTOCOL_MINOR(clientVersion) >= 17);
-            to << 0;
-        }
-        break;
-    }
-
-    case wopOptimiseStore:
-        logger->startWork();
-        store->optimiseStore();
-        logger->stopWork();
-        to << 1;
-        break;
-
-    case wopVerifyStore: {
-        bool checkContents, repair;
-        from >> checkContents >> repair;
-        logger->startWork();
-        if (repair && !trusted)
-            throw Error("you are not privileged to repair paths");
-        bool errors = store->verifyStore(checkContents, (RepairFlag) repair);
-        logger->stopWork();
-        to << errors;
-        break;
-    }
-
-    case wopAddSignatures: {
-        Path path = readStorePath(*store, from);
-        StringSet sigs = readStrings<StringSet>(from);
-        logger->startWork();
-        if (!trusted)
-            throw Error("you are not privileged to add signatures");
-        store->addSignatures(path, sigs);
-        logger->stopWork();
-        to << 1;
-        break;
-    }
-
-    case wopNarFromPath: {
-        auto path = readStorePath(*store, from);
-        logger->startWork();
-        logger->stopWork();
-        dumpPath(path, to);
-        break;
-    }
-
-    case wopAddToStoreNar: {
-        bool repair, dontCheckSigs;
-        ValidPathInfo info;
-        info.path = readStorePath(*store, from);
-        from >> info.deriver;
-        if (!info.deriver.empty())
-            store->assertStorePath(info.deriver);
-        info.narHash = Hash(readString(from), htSHA256);
-        info.references = readStorePaths<PathSet>(*store, from);
-        from >> info.registrationTime >> info.narSize >> info.ultimate;
-        info.sigs = readStrings<StringSet>(from);
-        from >> info.ca >> repair >> dontCheckSigs;
-        if (!trusted && dontCheckSigs)
-            dontCheckSigs = false;
-        if (!trusted)
-            info.ultimate = false;
-
-        std::string saved;
-        std::unique_ptr<Source> source;
-        if (GET_PROTOCOL_MINOR(clientVersion) >= 21)
-            source = std::make_unique<TunnelSource>(from);
-        else {
-            TeeSink tee(from);
-            parseDump(tee, tee.source);
-            saved = std::move(*tee.source.data);
-            source = std::make_unique<StringSource>(saved);
-        }
-
-        logger->startWork();
-
-        // FIXME: race if addToStore doesn't read source?
-        store->addToStore(info, *source, (RepairFlag) repair,
-            dontCheckSigs ? NoCheckSigs : CheckSigs, nullptr);
-
-        logger->stopWork();
-        break;
-    }
-
-    case wopQueryMissing: {
-        PathSet targets = readStorePaths<PathSet>(*store, from);
-        logger->startWork();
-        PathSet willBuild, willSubstitute, unknown;
-        unsigned long long downloadSize, narSize;
-        store->queryMissing(targets, willBuild, willSubstitute, unknown, downloadSize, narSize);
-        logger->stopWork();
-        to << willBuild << willSubstitute << unknown << downloadSize << narSize;
-        break;
-    }
-
-    default:
-        throw Error(format("invalid operation %1%") % op);
-    }
-}
-
-
-static void processConnection(bool trusted,
-    const std::string & userName, uid_t userId)
-{
-    MonitorFdHup monitor(from.fd);
-
-    /* Exchange the greeting. */
-    unsigned int magic = readInt(from);
-    if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch");
-    to << WORKER_MAGIC_2 << PROTOCOL_VERSION;
-    to.flush();
-    unsigned int clientVersion = readInt(from);
-
-    if (clientVersion < 0x10a)
-        throw Error("the Nix client version is too old");
-
-    auto tunnelLogger = new TunnelLogger(clientVersion);
-    auto prevLogger = nix::logger;
-    logger = tunnelLogger;
-
-    unsigned int opCount = 0;
-
-    Finally finally([&]() {
-        _isInterrupted = false;
-        prevLogger->log(lvlDebug, fmt("%d operations", opCount));
-    });
-
-    if (GET_PROTOCOL_MINOR(clientVersion) >= 14 && readInt(from))
-        setAffinityTo(readInt(from));
-
-    readInt(from); // obsolete reserveSpace
-
-    /* Send startup error messages to the client. */
-    tunnelLogger->startWork();
-
-    try {
-
-        /* If we can't accept clientVersion, then throw an error
-           *here* (not above). */
-
-#if 0
-        /* Prevent users from doing something very dangerous. */
-        if (geteuid() == 0 &&
-            querySetting("build-users-group", "") == "")
-            throw Error("if you run 'nix-daemon' as root, then you MUST set 'build-users-group'!");
-#endif
-
-        /* Open the store. */
-        Store::Params params; // FIXME: get params from somewhere
-        // Disable caching since the client already does that.
-        params["path-info-cache-size"] = "0";
-        auto store = openStore(settings.storeUri, params);
-
-        store->createUser(userName, userId);
-
-        tunnelLogger->stopWork();
-        to.flush();
-
-        /* Process client requests. */
-        while (true) {
-            WorkerOp op;
-            try {
-                op = (WorkerOp) readInt(from);
-            } catch (Interrupted & e) {
-                break;
-            } catch (EndOfFile & e) {
-                break;
-            }
-
-            opCount++;
-
-            try {
-                performOp(tunnelLogger, store, trusted, clientVersion, from, to, op);
-            } catch (Error & e) {
-                /* If we're not in a state where we can send replies, then
-                   something went wrong processing the input of the
-                   client.  This can happen especially if I/O errors occur
-                   during addTextToStore() / importPath().  If that
-                   happens, just send the error message and exit. */
-                bool errorAllowed = tunnelLogger->state_.lock()->canSendStderr;
-                tunnelLogger->stopWork(false, e.msg(), e.status);
-                if (!errorAllowed) throw;
-            } catch (std::bad_alloc & e) {
-                tunnelLogger->stopWork(false, "Nix daemon out of memory", 1);
-                throw;
-            }
-
-            to.flush();
-
-            assert(!tunnelLogger->state_.lock()->canSendStderr);
-        };
-
-    } catch (std::exception & e) {
-        tunnelLogger->stopWork(false, e.what(), 1);
-        to.flush();
-        return;
-    }
-}
-
 
 static void sigChldHandler(int sigNo)
 {
@@ -1054,9 +266,9 @@ static void daemonLoop(char * * argv)
                 }
 
                 /* Handle the connection. */
-                from.fd = remote.get();
-                to.fd = remote.get();
-                processConnection(trusted, user, peer.uid);
+                FdSource from(remote.get());
+                FdSink to(remote.get());
+                processConnection(from, to, trusted, user, peer.uid);
 
                 exit(0);
             }, options);
@@ -1136,7 +348,9 @@ static int _main(int argc, char * * argv)
                     }
                 }
             } else {
-                processConnection(true, "root", 0);
+                FdSource from(STDIN_FILENO);
+                FdSink to(STDOUT_FILENO);
+                processConnection(from, to, true, "root", 0);
             }
         } else {
             daemonLoop(argv);
-- 
GitLab