mirror of
https://gitlab.com/libvirt/libvirt.git
synced 2024-10-30 18:03:32 +00:00
fdstream: Implement sparse stream
Basically, what is needed here is to introduce new message type for the messages passed between the event loop callbacks and the worker thread that does all the I/O. The idea is that instead of a queue of read buffers we will have a queue where "hole of size X" messages appear. That way the event loop callbacks can just check the head of the queue and see if the worker thread is in data or a hole section and how long the section is. Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
This commit is contained in:
parent
89a0e69cec
commit
895479647b
@ -2427,7 +2427,7 @@ virStorageBackendVolUploadLocal(virConnectPtr conn ATTRIBUTE_UNUSED,
|
|||||||
/* Not using O_CREAT because the file is required to already exist at
|
/* Not using O_CREAT because the file is required to already exist at
|
||||||
* this point */
|
* this point */
|
||||||
ret = virFDStreamOpenBlockDevice(stream, target_path,
|
ret = virFDStreamOpenBlockDevice(stream, target_path,
|
||||||
offset, len, O_WRONLY);
|
offset, len, false, O_WRONLY);
|
||||||
|
|
||||||
cleanup:
|
cleanup:
|
||||||
VIR_FREE(path);
|
VIR_FREE(path);
|
||||||
@ -2465,7 +2465,7 @@ virStorageBackendVolDownloadLocal(virConnectPtr conn ATTRIBUTE_UNUSED,
|
|||||||
}
|
}
|
||||||
|
|
||||||
ret = virFDStreamOpenBlockDevice(stream, target_path,
|
ret = virFDStreamOpenBlockDevice(stream, target_path,
|
||||||
offset, len, O_RDONLY);
|
offset, len, false, O_RDONLY);
|
||||||
|
|
||||||
cleanup:
|
cleanup:
|
||||||
VIR_FREE(path);
|
VIR_FREE(path);
|
||||||
|
@ -51,6 +51,7 @@ VIR_LOG_INIT("fdstream");
|
|||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
VIR_FDSTREAM_MSG_TYPE_DATA,
|
VIR_FDSTREAM_MSG_TYPE_DATA,
|
||||||
|
VIR_FDSTREAM_MSG_TYPE_HOLE,
|
||||||
} virFDStreamMsgType;
|
} virFDStreamMsgType;
|
||||||
|
|
||||||
typedef struct _virFDStreamMsg virFDStreamMsg;
|
typedef struct _virFDStreamMsg virFDStreamMsg;
|
||||||
@ -66,6 +67,9 @@ struct _virFDStreamMsg {
|
|||||||
size_t len;
|
size_t len;
|
||||||
size_t offset;
|
size_t offset;
|
||||||
} data;
|
} data;
|
||||||
|
struct {
|
||||||
|
long long len;
|
||||||
|
} hole;
|
||||||
} stream;
|
} stream;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -198,6 +202,9 @@ virFDStreamMsgFree(virFDStreamMsgPtr msg)
|
|||||||
case VIR_FDSTREAM_MSG_TYPE_DATA:
|
case VIR_FDSTREAM_MSG_TYPE_DATA:
|
||||||
VIR_FREE(msg->stream.data.buf);
|
VIR_FREE(msg->stream.data.buf);
|
||||||
break;
|
break;
|
||||||
|
case VIR_FDSTREAM_MSG_TYPE_HOLE:
|
||||||
|
/* nada */
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
VIR_FREE(msg);
|
VIR_FREE(msg);
|
||||||
@ -385,6 +392,7 @@ struct _virFDStreamThreadData {
|
|||||||
virStreamPtr st;
|
virStreamPtr st;
|
||||||
size_t length;
|
size_t length;
|
||||||
bool doRead;
|
bool doRead;
|
||||||
|
bool sparse;
|
||||||
int fdin;
|
int fdin;
|
||||||
char *fdinname;
|
char *fdinname;
|
||||||
int fdout;
|
int fdout;
|
||||||
@ -407,19 +415,50 @@ virFDStreamThreadDataFree(virFDStreamThreadDataPtr data)
|
|||||||
|
|
||||||
static ssize_t
|
static ssize_t
|
||||||
virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
|
virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
|
||||||
|
bool sparse,
|
||||||
const int fdin,
|
const int fdin,
|
||||||
const int fdout,
|
const int fdout,
|
||||||
const char *fdinname,
|
const char *fdinname,
|
||||||
const char *fdoutname,
|
const char *fdoutname,
|
||||||
|
size_t *dataLen,
|
||||||
size_t buflen)
|
size_t buflen)
|
||||||
{
|
{
|
||||||
virFDStreamMsgPtr msg = NULL;
|
virFDStreamMsgPtr msg = NULL;
|
||||||
|
int inData = 0;
|
||||||
|
long long sectionLen = 0;
|
||||||
char *buf = NULL;
|
char *buf = NULL;
|
||||||
ssize_t got;
|
ssize_t got;
|
||||||
|
|
||||||
|
if (sparse && *dataLen == 0) {
|
||||||
|
if (virFileInData(fdin, &inData, §ionLen) < 0)
|
||||||
|
goto error;
|
||||||
|
|
||||||
|
if (inData)
|
||||||
|
*dataLen = sectionLen;
|
||||||
|
}
|
||||||
|
|
||||||
if (VIR_ALLOC(msg) < 0)
|
if (VIR_ALLOC(msg) < 0)
|
||||||
goto error;
|
goto error;
|
||||||
|
|
||||||
|
if (sparse && *dataLen == 0) {
|
||||||
|
msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE;
|
||||||
|
msg->stream.hole.len = sectionLen;
|
||||||
|
got = sectionLen;
|
||||||
|
|
||||||
|
/* HACK: The message queue is one directional. So caller
|
||||||
|
* cannot make us skip the hole. Do that for them instead. */
|
||||||
|
if (sectionLen &&
|
||||||
|
lseek(fdin, sectionLen, SEEK_CUR) == (off_t) -1) {
|
||||||
|
virReportSystemError(errno,
|
||||||
|
_("unable to seek in %s"),
|
||||||
|
fdinname);
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (sparse &&
|
||||||
|
buflen > *dataLen)
|
||||||
|
buflen = *dataLen;
|
||||||
|
|
||||||
if (VIR_ALLOC_N(buf, buflen) < 0)
|
if (VIR_ALLOC_N(buf, buflen) < 0)
|
||||||
goto error;
|
goto error;
|
||||||
|
|
||||||
@ -434,6 +473,9 @@ virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
|
|||||||
msg->stream.data.buf = buf;
|
msg->stream.data.buf = buf;
|
||||||
msg->stream.data.len = got;
|
msg->stream.data.len = got;
|
||||||
buf = NULL;
|
buf = NULL;
|
||||||
|
if (sparse)
|
||||||
|
*dataLen -= got;
|
||||||
|
}
|
||||||
|
|
||||||
virFDStreamMsgQueuePush(fdst, msg, fdout, fdoutname);
|
virFDStreamMsgQueuePush(fdst, msg, fdout, fdoutname);
|
||||||
msg = NULL;
|
msg = NULL;
|
||||||
@ -449,6 +491,7 @@ virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
|
|||||||
|
|
||||||
static ssize_t
|
static ssize_t
|
||||||
virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
|
virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
|
||||||
|
bool sparse,
|
||||||
const int fdin,
|
const int fdin,
|
||||||
const int fdout,
|
const int fdout,
|
||||||
const char *fdinname,
|
const char *fdinname,
|
||||||
@ -456,6 +499,7 @@ virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
|
|||||||
{
|
{
|
||||||
ssize_t got = 0;
|
ssize_t got = 0;
|
||||||
virFDStreamMsgPtr msg = fdst->msg;
|
virFDStreamMsgPtr msg = fdst->msg;
|
||||||
|
off_t off;
|
||||||
bool pop = false;
|
bool pop = false;
|
||||||
|
|
||||||
switch (msg->type) {
|
switch (msg->type) {
|
||||||
@ -474,6 +518,32 @@ virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
|
|||||||
|
|
||||||
pop = msg->stream.data.offset == msg->stream.data.len;
|
pop = msg->stream.data.offset == msg->stream.data.len;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case VIR_FDSTREAM_MSG_TYPE_HOLE:
|
||||||
|
if (!sparse) {
|
||||||
|
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
|
||||||
|
_("unexpected stream hole"));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
got = msg->stream.hole.len;
|
||||||
|
off = lseek(fdout, got, SEEK_CUR);
|
||||||
|
if (off == (off_t) -1) {
|
||||||
|
virReportSystemError(errno,
|
||||||
|
_("unable to seek in %s"),
|
||||||
|
fdoutname);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ftruncate(fdout, off) < 0) {
|
||||||
|
virReportSystemError(errno,
|
||||||
|
_("unable to truncate %s"),
|
||||||
|
fdoutname);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pop = true;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pop) {
|
if (pop) {
|
||||||
@ -491,6 +561,7 @@ virFDStreamThread(void *opaque)
|
|||||||
virFDStreamThreadDataPtr data = opaque;
|
virFDStreamThreadDataPtr data = opaque;
|
||||||
virStreamPtr st = data->st;
|
virStreamPtr st = data->st;
|
||||||
size_t length = data->length;
|
size_t length = data->length;
|
||||||
|
bool sparse = data->sparse;
|
||||||
int fdin = data->fdin;
|
int fdin = data->fdin;
|
||||||
char *fdinname = data->fdinname;
|
char *fdinname = data->fdinname;
|
||||||
int fdout = data->fdout;
|
int fdout = data->fdout;
|
||||||
@ -499,6 +570,7 @@ virFDStreamThread(void *opaque)
|
|||||||
bool doRead = fdst->threadDoRead;
|
bool doRead = fdst->threadDoRead;
|
||||||
size_t buflen = 256 * 1024;
|
size_t buflen = 256 * 1024;
|
||||||
size_t total = 0;
|
size_t total = 0;
|
||||||
|
size_t dataLen = 0;
|
||||||
|
|
||||||
virObjectRef(fdst);
|
virObjectRef(fdst);
|
||||||
virObjectLock(fdst);
|
virObjectLock(fdst);
|
||||||
@ -533,12 +605,12 @@ virFDStreamThread(void *opaque)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (doRead)
|
if (doRead)
|
||||||
got = virFDStreamThreadDoRead(fdst,
|
got = virFDStreamThreadDoRead(fdst, sparse,
|
||||||
fdin, fdout,
|
fdin, fdout,
|
||||||
fdinname, fdoutname,
|
fdinname, fdoutname,
|
||||||
buflen);
|
&dataLen, buflen);
|
||||||
else
|
else
|
||||||
got = virFDStreamThreadDoWrite(fdst,
|
got = virFDStreamThreadDoWrite(fdst, sparse,
|
||||||
fdin, fdout,
|
fdin, fdout,
|
||||||
fdinname, fdoutname);
|
fdinname, fdoutname);
|
||||||
|
|
||||||
@ -809,6 +881,14 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Shortcut, if the stream is in the trailing hole,
|
||||||
|
* return 0 immediately. */
|
||||||
|
if (msg->type == VIR_FDSTREAM_MSG_TYPE_HOLE &&
|
||||||
|
msg->stream.hole.len == 0) {
|
||||||
|
ret = 0;
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
if (msg->type != VIR_FDSTREAM_MSG_TYPE_DATA) {
|
if (msg->type != VIR_FDSTREAM_MSG_TYPE_DATA) {
|
||||||
/* Nope, nope, I'm outta here */
|
/* Nope, nope, I'm outta here */
|
||||||
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
|
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
|
||||||
@ -859,11 +939,124 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int
|
||||||
|
virFDStreamSendHole(virStreamPtr st,
|
||||||
|
long long length,
|
||||||
|
unsigned int flags)
|
||||||
|
{
|
||||||
|
virFDStreamDataPtr fdst = st->privateData;
|
||||||
|
virFDStreamMsgPtr msg = NULL;
|
||||||
|
off_t off;
|
||||||
|
int ret = -1;
|
||||||
|
|
||||||
|
virCheckFlags(0, -1);
|
||||||
|
|
||||||
|
virObjectLock(fdst);
|
||||||
|
if (fdst->length) {
|
||||||
|
if (length > fdst->length - fdst->offset)
|
||||||
|
length = fdst->length - fdst->offset;
|
||||||
|
fdst->offset += length;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fdst->thread) {
|
||||||
|
/* Things are a bit complicated here. If FDStream is in a
|
||||||
|
* read mode, then if the message at the queue head is
|
||||||
|
* HOLE, just pop it. The thread has lseek()-ed anyway.
|
||||||
|
* However, if the FDStream is in write mode, then tell
|
||||||
|
* the thread to do the lseek() for us. Under no
|
||||||
|
* circumstances we can do the lseek() ourselves here. We
|
||||||
|
* might mess up file position for the thread. */
|
||||||
|
if (fdst->threadDoRead) {
|
||||||
|
msg = fdst->msg;
|
||||||
|
if (msg->type != VIR_FDSTREAM_MSG_TYPE_HOLE) {
|
||||||
|
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
|
||||||
|
_("Invalid stream hole"));
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
virFDStreamMsgQueuePop(fdst, fdst->fd, "pipe");
|
||||||
|
} else {
|
||||||
|
if (VIR_ALLOC(msg) < 0)
|
||||||
|
goto cleanup;
|
||||||
|
|
||||||
|
msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE;
|
||||||
|
msg->stream.hole.len = length;
|
||||||
|
virFDStreamMsgQueuePush(fdst, msg, fdst->fd, "pipe");
|
||||||
|
msg = NULL;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
off = lseek(fdst->fd, length, SEEK_CUR);
|
||||||
|
if (off == (off_t) -1) {
|
||||||
|
virReportSystemError(errno, "%s",
|
||||||
|
_("unable to seek"));
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ftruncate(fdst->fd, off) < 0) {
|
||||||
|
virReportSystemError(errno, "%s",
|
||||||
|
_("unable to truncate"));
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = 0;
|
||||||
|
cleanup:
|
||||||
|
virObjectUnlock(fdst);
|
||||||
|
virFDStreamMsgFree(msg);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int
|
||||||
|
virFDStreamInData(virStreamPtr st,
|
||||||
|
int *inData,
|
||||||
|
long long *length)
|
||||||
|
{
|
||||||
|
virFDStreamDataPtr fdst = st->privateData;
|
||||||
|
int ret = -1;
|
||||||
|
|
||||||
|
virObjectLock(fdst);
|
||||||
|
|
||||||
|
if (fdst->thread) {
|
||||||
|
virFDStreamMsgPtr msg;
|
||||||
|
|
||||||
|
while (!(msg = fdst->msg)) {
|
||||||
|
if (fdst->threadQuit) {
|
||||||
|
*inData = *length = 0;
|
||||||
|
ret = 0;
|
||||||
|
goto cleanup;
|
||||||
|
} else {
|
||||||
|
virObjectUnlock(fdst);
|
||||||
|
virCondSignal(&fdst->threadCond);
|
||||||
|
virObjectLock(fdst);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (msg->type == VIR_FDSTREAM_MSG_TYPE_DATA) {
|
||||||
|
*inData = 1;
|
||||||
|
*length = msg->stream.data.len - msg->stream.data.offset;
|
||||||
|
} else {
|
||||||
|
*inData = 0;
|
||||||
|
*length = msg->stream.hole.len;
|
||||||
|
}
|
||||||
|
ret = 0;
|
||||||
|
} else {
|
||||||
|
ret = virFileInData(fdst->fd, inData, length);
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanup:
|
||||||
|
virObjectUnlock(fdst);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static virStreamDriver virFDStreamDrv = {
|
static virStreamDriver virFDStreamDrv = {
|
||||||
.streamSend = virFDStreamWrite,
|
.streamSend = virFDStreamWrite,
|
||||||
.streamRecv = virFDStreamRead,
|
.streamRecv = virFDStreamRead,
|
||||||
.streamFinish = virFDStreamClose,
|
.streamFinish = virFDStreamClose,
|
||||||
.streamAbort = virFDStreamAbort,
|
.streamAbort = virFDStreamAbort,
|
||||||
|
.streamSendHole = virFDStreamSendHole,
|
||||||
|
.streamInData = virFDStreamInData,
|
||||||
.streamEventAddCallback = virFDStreamAddCallback,
|
.streamEventAddCallback = virFDStreamAddCallback,
|
||||||
.streamEventUpdateCallback = virFDStreamUpdateCallback,
|
.streamEventUpdateCallback = virFDStreamUpdateCallback,
|
||||||
.streamEventRemoveCallback = virFDStreamRemoveCallback
|
.streamEventRemoveCallback = virFDStreamRemoveCallback
|
||||||
@ -1004,7 +1197,8 @@ virFDStreamOpenFileInternal(virStreamPtr st,
|
|||||||
unsigned long long length,
|
unsigned long long length,
|
||||||
int oflags,
|
int oflags,
|
||||||
int mode,
|
int mode,
|
||||||
bool forceIOHelper)
|
bool forceIOHelper,
|
||||||
|
bool sparse)
|
||||||
{
|
{
|
||||||
int fd = -1;
|
int fd = -1;
|
||||||
int pipefds[2] = { -1, -1 };
|
int pipefds[2] = { -1, -1 };
|
||||||
@ -1071,6 +1265,7 @@ virFDStreamOpenFileInternal(virStreamPtr st,
|
|||||||
|
|
||||||
threadData->st = virObjectRef(st);
|
threadData->st = virObjectRef(st);
|
||||||
threadData->length = length;
|
threadData->length = length;
|
||||||
|
threadData->sparse = sparse;
|
||||||
|
|
||||||
if ((oflags & O_ACCMODE) == O_RDONLY) {
|
if ((oflags & O_ACCMODE) == O_RDONLY) {
|
||||||
threadData->fdin = fd;
|
threadData->fdin = fd;
|
||||||
@ -1120,7 +1315,7 @@ int virFDStreamOpenFile(virStreamPtr st,
|
|||||||
}
|
}
|
||||||
return virFDStreamOpenFileInternal(st, path,
|
return virFDStreamOpenFileInternal(st, path,
|
||||||
offset, length,
|
offset, length,
|
||||||
oflags, 0, false);
|
oflags, 0, false, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
int virFDStreamCreateFile(virStreamPtr st,
|
int virFDStreamCreateFile(virStreamPtr st,
|
||||||
@ -1133,7 +1328,7 @@ int virFDStreamCreateFile(virStreamPtr st,
|
|||||||
return virFDStreamOpenFileInternal(st, path,
|
return virFDStreamOpenFileInternal(st, path,
|
||||||
offset, length,
|
offset, length,
|
||||||
oflags | O_CREAT, mode,
|
oflags | O_CREAT, mode,
|
||||||
false);
|
false, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef HAVE_CFMAKERAW
|
#ifdef HAVE_CFMAKERAW
|
||||||
@ -1149,7 +1344,7 @@ int virFDStreamOpenPTY(virStreamPtr st,
|
|||||||
if (virFDStreamOpenFileInternal(st, path,
|
if (virFDStreamOpenFileInternal(st, path,
|
||||||
offset, length,
|
offset, length,
|
||||||
oflags | O_CREAT, 0,
|
oflags | O_CREAT, 0,
|
||||||
false) < 0)
|
false, false) < 0)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
fdst = st->privateData;
|
fdst = st->privateData;
|
||||||
@ -1186,7 +1381,7 @@ int virFDStreamOpenPTY(virStreamPtr st,
|
|||||||
return virFDStreamOpenFileInternal(st, path,
|
return virFDStreamOpenFileInternal(st, path,
|
||||||
offset, length,
|
offset, length,
|
||||||
oflags | O_CREAT, 0,
|
oflags | O_CREAT, 0,
|
||||||
false);
|
false, false);
|
||||||
}
|
}
|
||||||
#endif /* !HAVE_CFMAKERAW */
|
#endif /* !HAVE_CFMAKERAW */
|
||||||
|
|
||||||
@ -1194,11 +1389,12 @@ int virFDStreamOpenBlockDevice(virStreamPtr st,
|
|||||||
const char *path,
|
const char *path,
|
||||||
unsigned long long offset,
|
unsigned long long offset,
|
||||||
unsigned long long length,
|
unsigned long long length,
|
||||||
|
bool sparse,
|
||||||
int oflags)
|
int oflags)
|
||||||
{
|
{
|
||||||
return virFDStreamOpenFileInternal(st, path,
|
return virFDStreamOpenFileInternal(st, path,
|
||||||
offset, length,
|
offset, length,
|
||||||
oflags, 0, true);
|
oflags, 0, true, sparse);
|
||||||
}
|
}
|
||||||
|
|
||||||
int virFDStreamSetInternalCloseCb(virStreamPtr st,
|
int virFDStreamSetInternalCloseCb(virStreamPtr st,
|
||||||
|
@ -59,6 +59,7 @@ int virFDStreamOpenBlockDevice(virStreamPtr st,
|
|||||||
const char *path,
|
const char *path,
|
||||||
unsigned long long offset,
|
unsigned long long offset,
|
||||||
unsigned long long length,
|
unsigned long long length,
|
||||||
|
bool sparse,
|
||||||
int oflags);
|
int oflags);
|
||||||
|
|
||||||
int virFDStreamSetInternalCloseCb(virStreamPtr st,
|
int virFDStreamSetInternalCloseCb(virStreamPtr st,
|
||||||
|
Loading…
Reference in New Issue
Block a user