From 3a1e2e920e0467be8511e83fe5f7ce44af997cd3 Mon Sep 17 00:00:00 2001 From: Michal Privoznik Date: Sat, 21 May 2016 15:17:51 +0200 Subject: [PATCH] Introduce virStreamSparseRecvAll This is just a wrapper over new functions that have been just introduced: virStreamRecvFlags(), virStreamRecvHole(). It's very similar to virStreamRecvAll() except it handles sparse streams well. Signed-off-by: Michal Privoznik --- include/libvirt/libvirt-stream.h | 33 ++++++++- src/libvirt-stream.c | 123 +++++++++++++++++++++++++++++++ src/libvirt_public.syms | 1 + 3 files changed, 154 insertions(+), 3 deletions(-) diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h index c4baaf7a3d..a5e69a1c1a 100644 --- a/include/libvirt/libvirt-stream.h +++ b/include/libvirt/libvirt-stream.h @@ -104,9 +104,9 @@ int virStreamSendAll(virStreamPtr st, * @nbytes: size of the data array * @opaque: optional application provided data * - * The virStreamSinkFunc callback is used together - * with the virStreamRecvAll function for libvirt to - * provide the data that has been received. + * The virStreamSinkFunc callback is used together with the + * virStreamRecvAll or virStreamSparseRecvAll functions for + * libvirt to provide the data that has been received. * * The callback will be invoked multiple times, * providing data in small chunks. The application @@ -129,6 +129,33 @@ int virStreamRecvAll(virStreamPtr st, virStreamSinkFunc handler, void *opaque); +/** + * virStreamSinkHoleFunc: + * @st: the stream object + * @length: stream hole size + * @opaque: optional application provided data + * + * This callback is used together with the virStreamSparseRecvAll + * function for libvirt to provide the size of a hole that + * occurred in the stream. + * + * The callback may be invoked multiple times as holes are found + * during processing a stream. The application should create the + * hole in the stream target and then return. A return value of + * -1 at any time will abort the receive operation. + * + * Returns 0 on success, + * -1 upon error + */ +typedef int (*virStreamSinkHoleFunc)(virStreamPtr st, + long long length, + void *opaque); + +int virStreamSparseRecvAll(virStreamPtr stream, + virStreamSinkFunc handler, + virStreamSinkHoleFunc holeHandler, + void *opaque); + typedef enum { VIR_STREAM_EVENT_READABLE = (1 << 0), VIR_STREAM_EVENT_WRITABLE = (1 << 1), diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c index bedb6159a8..6bf4c4f29f 100644 --- a/src/libvirt-stream.c +++ b/src/libvirt-stream.c @@ -668,6 +668,129 @@ virStreamRecvAll(virStreamPtr stream, } +/** + * virStreamSparseRecvAll: + * @stream: pointer to the stream object + * @handler: sink callback for writing data to application + * @holeHandler: stream hole callback for skipping holes + * @opaque: application defined data + * + * Receive the entire data stream, sending the data to the + * requested data sink @handler and calling the skip @holeHandler + * to generate holes for sparse stream targets. This is simply a + * convenient alternative to virStreamRecvFlags, for apps that do + * blocking-I/O. + * + * An example using this with a hypothetical file download + * API looks like: + * + * int mysink(virStreamPtr st, const char *buf, int nbytes, void *opaque) { + * int *fd = opaque; + * + * return write(*fd, buf, nbytes); + * } + * + * int myskip(virStreamPtr st, long long offset, void *opaque) { + * int *fd = opaque; + * + * return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0; + * } + * + * virStreamPtr st = virStreamNew(conn, 0); + * int fd = open("demo.iso", O_WRONLY); + * + * virConnectDownloadSparseFile(conn, st); + * if (virStreamSparseRecvAll(st, mysink, myskip, &fd) < 0) { + * ...report an error ... + * goto done; + * } + * if (virStreamFinish(st) < 0) + * ...report an error... + * virStreamFree(st); + * close(fd); + * + * Note that @opaque data is shared between both @handler and + * @holeHandler callbacks. + * + * Returns 0 if all the data was successfully received. The caller + * should invoke virStreamFinish(st) to flush the stream upon + * success and then virStreamFree(st). + * + * Returns -1 upon any error, with virStreamAbort() already + * having been called, so the caller need only call virStreamFree(). + */ +int +virStreamSparseRecvAll(virStreamPtr stream, + virStreamSinkFunc handler, + virStreamSinkHoleFunc holeHandler, + void *opaque) +{ + char *bytes = NULL; + size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX; + const unsigned int flags = VIR_STREAM_RECV_STOP_AT_HOLE; + int ret = -1; + + VIR_DEBUG("stream=%p handler=%p holeHandler=%p opaque=%p", + stream, handler, holeHandler, opaque); + + virResetLastError(); + + virCheckStreamReturn(stream, -1); + virCheckNonNullArgGoto(handler, cleanup); + virCheckNonNullArgGoto(holeHandler, cleanup); + + if (stream->flags & VIR_STREAM_NONBLOCK) { + virReportError(VIR_ERR_OPERATION_INVALID, "%s", + _("data sinks cannot be used for non-blocking streams")); + goto cleanup; + } + + if (VIR_ALLOC_N(bytes, want) < 0) + goto cleanup; + + for (;;) { + int got, offset = 0; + long long holeLen; + const unsigned int holeFlags = 0; + + got = virStreamRecvFlags(stream, bytes, want, flags); + if (got == -3) { + if (virStreamRecvHole(stream, &holeLen, holeFlags) < 0) { + virStreamAbort(stream); + goto cleanup; + } + + if (holeHandler(stream, holeLen, opaque) < 0) { + virStreamAbort(stream); + goto cleanup; + } + continue; + } else if (got < 0) { + goto cleanup; + } else if (got == 0) { + break; + } + while (offset < got) { + int done; + done = (handler)(stream, bytes + offset, got - offset, opaque); + if (done < 0) { + virStreamAbort(stream); + goto cleanup; + } + offset += done; + } + } + ret = 0; + + cleanup: + VIR_FREE(bytes); + + if (ret != 0) + virDispatchError(stream->conn); + + return ret; +} + /** * virStreamEventAddCallback: * @stream: pointer to the stream object diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index b73cc8af1e..37fc4e2243 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -764,6 +764,7 @@ LIBVIRT_3.4.0 { virStreamRecvFlags; virStreamRecvHole; virStreamSendHole; + virStreamSparseRecvAll; } LIBVIRT_3.1.0; # .... define new API here using predicted next version number ....