diff --git a/src/storage/storage_util.c b/src/storage/storage_util.c index 43f3561f8e..908cad8746 100644 --- a/src/storage/storage_util.c +++ b/src/storage/storage_util.c @@ -2427,7 +2427,7 @@ virStorageBackendVolUploadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, /* Not using O_CREAT because the file is required to already exist at * this point */ ret = virFDStreamOpenBlockDevice(stream, target_path, - offset, len, O_WRONLY); + offset, len, false, O_WRONLY); cleanup: VIR_FREE(path); @@ -2465,7 +2465,7 @@ virStorageBackendVolDownloadLocal(virConnectPtr conn ATTRIBUTE_UNUSED, } ret = virFDStreamOpenBlockDevice(stream, target_path, - offset, len, O_RDONLY); + offset, len, false, O_RDONLY); cleanup: VIR_FREE(path); diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c index 2be45dd63e..6870d8846c 100644 --- a/src/util/virfdstream.c +++ b/src/util/virfdstream.c @@ -51,6 +51,7 @@ VIR_LOG_INIT("fdstream"); typedef enum { VIR_FDSTREAM_MSG_TYPE_DATA, + VIR_FDSTREAM_MSG_TYPE_HOLE, } virFDStreamMsgType; typedef struct _virFDStreamMsg virFDStreamMsg; @@ -66,6 +67,9 @@ struct _virFDStreamMsg { size_t len; size_t offset; } data; + struct { + long long len; + } hole; } stream; }; @@ -198,6 +202,9 @@ virFDStreamMsgFree(virFDStreamMsgPtr msg) case VIR_FDSTREAM_MSG_TYPE_DATA: VIR_FREE(msg->stream.data.buf); break; + case VIR_FDSTREAM_MSG_TYPE_HOLE: + /* nada */ + break; } VIR_FREE(msg); @@ -385,6 +392,7 @@ struct _virFDStreamThreadData { virStreamPtr st; size_t length; bool doRead; + bool sparse; int fdin; char *fdinname; int fdout; @@ -407,34 +415,68 @@ virFDStreamThreadDataFree(virFDStreamThreadDataPtr data) static ssize_t virFDStreamThreadDoRead(virFDStreamDataPtr fdst, + bool sparse, const int fdin, const int fdout, const char *fdinname, const char *fdoutname, + size_t *dataLen, size_t buflen) { virFDStreamMsgPtr msg = NULL; + int inData = 0; + long long sectionLen = 0; char *buf = NULL; ssize_t got; + if (sparse && *dataLen == 0) { + if (virFileInData(fdin, &inData, §ionLen) < 0) + goto error; + + if (inData) + *dataLen = sectionLen; + } + if (VIR_ALLOC(msg) < 0) goto error; - if (VIR_ALLOC_N(buf, buflen) < 0) - goto error; + if (sparse && *dataLen == 0) { + msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE; + msg->stream.hole.len = sectionLen; + got = sectionLen; - if ((got = saferead(fdin, buf, buflen)) < 0) { - virReportSystemError(errno, - _("Unable to read %s"), - fdinname); - goto error; + /* HACK: The message queue is one directional. So caller + * cannot make us skip the hole. Do that for them instead. */ + if (sectionLen && + lseek(fdin, sectionLen, SEEK_CUR) == (off_t) -1) { + virReportSystemError(errno, + _("unable to seek in %s"), + fdinname); + goto error; + } + } else { + if (sparse && + buflen > *dataLen) + buflen = *dataLen; + + if (VIR_ALLOC_N(buf, buflen) < 0) + goto error; + + if ((got = saferead(fdin, buf, buflen)) < 0) { + virReportSystemError(errno, + _("Unable to read %s"), + fdinname); + goto error; + } + + msg->type = VIR_FDSTREAM_MSG_TYPE_DATA; + msg->stream.data.buf = buf; + msg->stream.data.len = got; + buf = NULL; + if (sparse) + *dataLen -= got; } - msg->type = VIR_FDSTREAM_MSG_TYPE_DATA; - msg->stream.data.buf = buf; - msg->stream.data.len = got; - buf = NULL; - virFDStreamMsgQueuePush(fdst, msg, fdout, fdoutname); msg = NULL; @@ -449,6 +491,7 @@ virFDStreamThreadDoRead(virFDStreamDataPtr fdst, static ssize_t virFDStreamThreadDoWrite(virFDStreamDataPtr fdst, + bool sparse, const int fdin, const int fdout, const char *fdinname, @@ -456,6 +499,7 @@ virFDStreamThreadDoWrite(virFDStreamDataPtr fdst, { ssize_t got = 0; virFDStreamMsgPtr msg = fdst->msg; + off_t off; bool pop = false; switch (msg->type) { @@ -474,6 +518,32 @@ virFDStreamThreadDoWrite(virFDStreamDataPtr fdst, pop = msg->stream.data.offset == msg->stream.data.len; break; + + case VIR_FDSTREAM_MSG_TYPE_HOLE: + if (!sparse) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("unexpected stream hole")); + return -1; + } + + got = msg->stream.hole.len; + off = lseek(fdout, got, SEEK_CUR); + if (off == (off_t) -1) { + virReportSystemError(errno, + _("unable to seek in %s"), + fdoutname); + return -1; + } + + if (ftruncate(fdout, off) < 0) { + virReportSystemError(errno, + _("unable to truncate %s"), + fdoutname); + return -1; + } + + pop = true; + break; } if (pop) { @@ -491,6 +561,7 @@ virFDStreamThread(void *opaque) virFDStreamThreadDataPtr data = opaque; virStreamPtr st = data->st; size_t length = data->length; + bool sparse = data->sparse; int fdin = data->fdin; char *fdinname = data->fdinname; int fdout = data->fdout; @@ -499,6 +570,7 @@ virFDStreamThread(void *opaque) bool doRead = fdst->threadDoRead; size_t buflen = 256 * 1024; size_t total = 0; + size_t dataLen = 0; virObjectRef(fdst); virObjectLock(fdst); @@ -533,12 +605,12 @@ virFDStreamThread(void *opaque) } if (doRead) - got = virFDStreamThreadDoRead(fdst, + got = virFDStreamThreadDoRead(fdst, sparse, fdin, fdout, fdinname, fdoutname, - buflen); + &dataLen, buflen); else - got = virFDStreamThreadDoWrite(fdst, + got = virFDStreamThreadDoWrite(fdst, sparse, fdin, fdout, fdinname, fdoutname); @@ -809,6 +881,14 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) } } + /* Shortcut, if the stream is in the trailing hole, + * return 0 immediately. */ + if (msg->type == VIR_FDSTREAM_MSG_TYPE_HOLE && + msg->stream.hole.len == 0) { + ret = 0; + goto cleanup; + } + if (msg->type != VIR_FDSTREAM_MSG_TYPE_DATA) { /* Nope, nope, I'm outta here */ virReportError(VIR_ERR_INTERNAL_ERROR, "%s", @@ -859,11 +939,124 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes) } +static int +virFDStreamSendHole(virStreamPtr st, + long long length, + unsigned int flags) +{ + virFDStreamDataPtr fdst = st->privateData; + virFDStreamMsgPtr msg = NULL; + off_t off; + int ret = -1; + + virCheckFlags(0, -1); + + virObjectLock(fdst); + if (fdst->length) { + if (length > fdst->length - fdst->offset) + length = fdst->length - fdst->offset; + fdst->offset += length; + } + + if (fdst->thread) { + /* Things are a bit complicated here. If FDStream is in a + * read mode, then if the message at the queue head is + * HOLE, just pop it. The thread has lseek()-ed anyway. + * However, if the FDStream is in write mode, then tell + * the thread to do the lseek() for us. Under no + * circumstances we can do the lseek() ourselves here. We + * might mess up file position for the thread. */ + if (fdst->threadDoRead) { + msg = fdst->msg; + if (msg->type != VIR_FDSTREAM_MSG_TYPE_HOLE) { + virReportError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Invalid stream hole")); + goto cleanup; + } + + virFDStreamMsgQueuePop(fdst, fdst->fd, "pipe"); + } else { + if (VIR_ALLOC(msg) < 0) + goto cleanup; + + msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE; + msg->stream.hole.len = length; + virFDStreamMsgQueuePush(fdst, msg, fdst->fd, "pipe"); + msg = NULL; + } + } else { + off = lseek(fdst->fd, length, SEEK_CUR); + if (off == (off_t) -1) { + virReportSystemError(errno, "%s", + _("unable to seek")); + goto cleanup; + } + + if (ftruncate(fdst->fd, off) < 0) { + virReportSystemError(errno, "%s", + _("unable to truncate")); + goto cleanup; + } + } + + ret = 0; + cleanup: + virObjectUnlock(fdst); + virFDStreamMsgFree(msg); + return ret; +} + + +static int +virFDStreamInData(virStreamPtr st, + int *inData, + long long *length) +{ + virFDStreamDataPtr fdst = st->privateData; + int ret = -1; + + virObjectLock(fdst); + + if (fdst->thread) { + virFDStreamMsgPtr msg; + + while (!(msg = fdst->msg)) { + if (fdst->threadQuit) { + *inData = *length = 0; + ret = 0; + goto cleanup; + } else { + virObjectUnlock(fdst); + virCondSignal(&fdst->threadCond); + virObjectLock(fdst); + } + } + + if (msg->type == VIR_FDSTREAM_MSG_TYPE_DATA) { + *inData = 1; + *length = msg->stream.data.len - msg->stream.data.offset; + } else { + *inData = 0; + *length = msg->stream.hole.len; + } + ret = 0; + } else { + ret = virFileInData(fdst->fd, inData, length); + } + + cleanup: + virObjectUnlock(fdst); + return ret; +} + + static virStreamDriver virFDStreamDrv = { .streamSend = virFDStreamWrite, .streamRecv = virFDStreamRead, .streamFinish = virFDStreamClose, .streamAbort = virFDStreamAbort, + .streamSendHole = virFDStreamSendHole, + .streamInData = virFDStreamInData, .streamEventAddCallback = virFDStreamAddCallback, .streamEventUpdateCallback = virFDStreamUpdateCallback, .streamEventRemoveCallback = virFDStreamRemoveCallback @@ -1004,7 +1197,8 @@ virFDStreamOpenFileInternal(virStreamPtr st, unsigned long long length, int oflags, int mode, - bool forceIOHelper) + bool forceIOHelper, + bool sparse) { int fd = -1; int pipefds[2] = { -1, -1 }; @@ -1071,6 +1265,7 @@ virFDStreamOpenFileInternal(virStreamPtr st, threadData->st = virObjectRef(st); threadData->length = length; + threadData->sparse = sparse; if ((oflags & O_ACCMODE) == O_RDONLY) { threadData->fdin = fd; @@ -1120,7 +1315,7 @@ int virFDStreamOpenFile(virStreamPtr st, } return virFDStreamOpenFileInternal(st, path, offset, length, - oflags, 0, false); + oflags, 0, false, false); } int virFDStreamCreateFile(virStreamPtr st, @@ -1133,7 +1328,7 @@ int virFDStreamCreateFile(virStreamPtr st, return virFDStreamOpenFileInternal(st, path, offset, length, oflags | O_CREAT, mode, - false); + false, false); } #ifdef HAVE_CFMAKERAW @@ -1149,7 +1344,7 @@ int virFDStreamOpenPTY(virStreamPtr st, if (virFDStreamOpenFileInternal(st, path, offset, length, oflags | O_CREAT, 0, - false) < 0) + false, false) < 0) return -1; fdst = st->privateData; @@ -1186,7 +1381,7 @@ int virFDStreamOpenPTY(virStreamPtr st, return virFDStreamOpenFileInternal(st, path, offset, length, oflags | O_CREAT, 0, - false); + false, false); } #endif /* !HAVE_CFMAKERAW */ @@ -1194,11 +1389,12 @@ int virFDStreamOpenBlockDevice(virStreamPtr st, const char *path, unsigned long long offset, unsigned long long length, + bool sparse, int oflags) { return virFDStreamOpenFileInternal(st, path, offset, length, - oflags, 0, true); + oflags, 0, true, sparse); } int virFDStreamSetInternalCloseCb(virStreamPtr st, diff --git a/src/util/virfdstream.h b/src/util/virfdstream.h index 34c4c3fc68..887c991d69 100644 --- a/src/util/virfdstream.h +++ b/src/util/virfdstream.h @@ -59,6 +59,7 @@ int virFDStreamOpenBlockDevice(virStreamPtr st, const char *path, unsigned long long offset, unsigned long long length, + bool sparse, int oflags); int virFDStreamSetInternalCloseCb(virStreamPtr st,