libvirt/src/esx/esx_stream.c
Matthias Bolte 125007d373 esx: Add libcurl based stream driver
This allows to implement libvirt functions that use streams, such as
virDoaminScreenshot, without the need to store the downloaded data in
a temporary file first. The stream driver directly interacts with
libcurl to send and receive data.

The driver uses the libcurl multi interface that allows to do a transfer
in multiple curl_multi_perform() calls. The easy interface would do the
whole transfer in a single curl_easy_perform() call. This doesn't work
with the libvirt stream API that is driven by multiple calls to the
virStreamSend() and virStreamRecv() functions.

The curl_multi_wait() function is used to do blocking operations. But it
was added in libcurl 7.28.0. For older versions it is emulated using the
socket callback of the multi interface.

The current driver only supports blocking operations. There is already
some code in place for non-blocking mode but it is not complete.
2014-10-07 22:05:49 +02:00

479 lines
14 KiB
C

/*
* esx_stream.c: libcurl based stream driver
*
* Copyright (C) 2012-2014 Matthias Bolte <matthias.bolte@googlemail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library. If not, see
* <http://www.gnu.org/licenses/>.
*
*/
#include <config.h>
#include "internal.h"
#include "datatypes.h"
#include "viralloc.h"
#include "virstring.h"
#include "esx_stream.h"
#define VIR_FROM_THIS VIR_FROM_ESX
/*
* This libcurl based stream driver cannot use a libcurl easy handle alone
* because curl_easy_perform would do the whole transfer before it returns.
* But there is no place in the stream handling concept that would allow for
* such a call to be made. The stream is driven by esxStream(Send|Recv) which
* is probably called multiple times to send/receive the stream in chunks.
* Therefore, a libcurl multi handle is used that allows to perform the data
* transfer in chunks and also allows to support non-blocking operations.
*
* In the upload direction esxStreamSend is called to push data into the
* stream and libcurl will call esxVI_CURL_ReadStream to pull data out of
* the stream to upload it via HTTP(S). To realize this esxStreamSend calls
* esxStreamTransfer that uses esxVI_MultiCURL_(Wait|Perform) to drive the
* transfer and makes libcurl read up the data passed to esxStreamSend.
*
* In the download direction esxStreamRecv is called to pull data out of the
* stream and libcurl will call esxVI_CURL_WriteStream to push data into the
* stream that it has downloaded via HTTP(S). To realize this esxStreamRecv
* calls esxStreamTransfer that uses esxVI_MultiCURL_(Wait|Perform) to drive
* the transfer and makes libcurl write to the buffer passed to esxStreamRecv.
*
* The download direction requires some extra logic because libcurl might
* call esxVI_CURL_WriteStream with more data than there is space left in the
* buffer passed to esxStreamRecv. But esxVI_CURL_WriteStream is not allowed
* to handle only a part of the incoming data, it needs to handle it all at
* once. Therefore the stream driver manages a backlog buffer that holds the
* extra data that didn't fit into the esxStreamRecv buffer anymore. The next
* time esxStreamRecv is called it'll read the data from the backlog buffer
* first before asking libcurl for more data.
*
* Typically libcurl will call esxVI_CURL_WriteStream with up to 16kb data
* this means that the typically maximum backlog size should be 16kb as well.
*/
enum _esxStreamMode {
ESX_STREAM_MODE_UPLOAD = 1,
ESX_STREAM_MODE_DOWNLOAD = 2
};
typedef struct _esxStreamPrivate esxStreamPrivate;
typedef enum _esxStreamMode esxStreamMode;
struct _esxStreamPrivate {
esxVI_CURL *curl;
int mode;
/* Backlog of downloaded data that has not been esxStreamRecv'ed yet */
char *backlog;
size_t backlog_size;
size_t backlog_used;
/* Buffer given to esxStream(Send|Recv) to (read|write) data (from|to) */
char *buffer;
size_t buffer_size;
size_t buffer_used;
};
static size_t
esxVI_CURL_ReadStream(char *output, size_t size, size_t nmemb, void *userdata)
{
esxStreamPrivate *priv = userdata;
size_t output_size = size * nmemb;
size_t output_used = 0;
if (output_size > priv->buffer_used)
output_used = priv->buffer_used;
else
output_used = output_size;
memcpy(output, priv->buffer + priv->buffer_size - priv->buffer_used,
output_used);
priv->buffer_used -= output_used;
return output_used;
}
static size_t
esxVI_CURL_WriteStream(char *input, size_t size, size_t nmemb, void *userdata)
{
esxStreamPrivate *priv = userdata;
size_t input_size = size * nmemb;
size_t input_used = priv->buffer_size - priv->buffer_used;
if (input_size == 0)
return input_size;
if (input_used > input_size)
input_used = input_size;
/* Fill buffer */
memcpy(priv->buffer + priv->buffer_used, input, input_used);
priv->buffer_used += input_used;
/* Move rest to backlog */
if (input_size > input_used) {
size_t input_remaining = input_size - input_used;
size_t backlog_remaining = priv->backlog_size - priv->backlog_used;
if (!priv->backlog) {
priv->backlog_size = input_remaining;
priv->backlog_used = 0;
if (VIR_ALLOC_N(priv->backlog, priv->backlog_size) < 0)
return 0;
} else if (input_remaining > backlog_remaining) {
priv->backlog_size += input_remaining - backlog_remaining;
if (VIR_REALLOC_N(priv->backlog, priv->backlog_size) < 0)
return 0;
}
memcpy(priv->backlog + priv->backlog_used, input + input_used,
input_remaining);
priv->backlog_used += input_remaining;
}
return input_size;
}
/* Returns -1 on error, 0 if it needs to be called again, and 1 if it's done for now */
static int
esxStreamTransfer(esxStreamPrivate *priv, bool blocking)
{
int runningHandles = 0;
long responseCode = 0;
int status;
CURLcode errorCode;
if (blocking) {
if (esxVI_MultiCURL_Wait(priv->curl->multi, &runningHandles) < 0)
return -1;
} else {
if (esxVI_MultiCURL_Perform(priv->curl->multi, &runningHandles) < 0)
return -1;
}
if (runningHandles == 0) {
/* Transfer is done check for result */
status = esxVI_MultiCURL_CheckFirstMessage(priv->curl->multi,
&responseCode, &errorCode);
if (status == 0) {
/* No message, transfer finished successfully */
return 1;
}
if (status < 0) {
virReportError(VIR_ERR_INTERNAL_ERROR,
_("Could not complete transfer: %s (%d)"),
curl_easy_strerror(errorCode), errorCode);
return -1;
}
if (responseCode != 200 && responseCode != 206) {
virReportError(VIR_ERR_INTERNAL_ERROR,
_("Unexpected HTTP response code %lu"),
responseCode);
return -1;
}
return 1;
}
return blocking ? 0 : 1;
}
static int
esxStreamSend(virStreamPtr stream, const char *data, size_t nbytes)
{
int result = -1;
esxStreamPrivate *priv = stream->privateData;
int status;
if (nbytes == 0)
return 0;
if (!priv) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("Stream is not open"));
return -1;
}
if (priv->mode != ESX_STREAM_MODE_UPLOAD) {
virReportError(VIR_ERR_INVALID_ARG, "%s", _("Not an upload stream"));
return -1;
}
virMutexLock(&priv->curl->lock);
priv->buffer = (char *)data;
priv->buffer_size = nbytes;
priv->buffer_used = nbytes;
if (stream->flags & VIR_STREAM_NONBLOCK) {
if (esxStreamTransfer(priv, false) < 0)
goto cleanup;
if (priv->buffer_used < priv->buffer_size)
result = priv->buffer_size - priv->buffer_used;
else
result = -2;
} else /* blocking */ {
do {
status = esxStreamTransfer(priv, true);
if (status < 0)
goto cleanup;
if (status > 0)
break;
} while (priv->buffer_used > 0);
result = priv->buffer_size - priv->buffer_used;
}
cleanup:
virMutexUnlock(&priv->curl->lock);
return result;
}
static int
esxStreamRecv(virStreamPtr stream, char *data, size_t nbytes)
{
int result = -1;
esxStreamPrivate *priv = stream->privateData;
int status;
if (nbytes == 0)
return 0;
if (!priv) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("Stream is not open"));
return -1;
}
if (priv->mode != ESX_STREAM_MODE_DOWNLOAD) {
virReportError(VIR_ERR_INVALID_ARG, "%s", _("Not a download stream"));
return -1;
}
virMutexLock(&priv->curl->lock);
priv->buffer = data;
priv->buffer_size = nbytes;
priv->buffer_used = 0;
if (priv->backlog_used > 0) {
if (priv->buffer_size > priv->backlog_used)
priv->buffer_used = priv->backlog_used;
else
priv->buffer_used = priv->buffer_size;
memcpy(priv->buffer, priv->backlog, priv->buffer_used);
memmove(priv->backlog, priv->backlog + priv->buffer_used,
priv->backlog_used - priv->buffer_used);
priv->backlog_used -= priv->buffer_used;
result = priv->buffer_used;
} else if (stream->flags & VIR_STREAM_NONBLOCK) {
if (esxStreamTransfer(priv, false) < 0)
goto cleanup;
if (priv->buffer_used > 0)
result = priv->buffer_used;
else
result = -2;
} else /* blocking */ {
do {
status = esxStreamTransfer(priv, true);
if (status < 0)
goto cleanup;
if (status > 0)
break;
} while (priv->buffer_used < priv->buffer_size);
result = priv->buffer_used;
}
cleanup:
virMutexUnlock(&priv->curl->lock);
return result;
}
static void
esxFreeStreamPrivate(esxStreamPrivate **priv)
{
if (!priv || !*priv)
return;
esxVI_CURL_Free(&(*priv)->curl);
VIR_FREE((*priv)->backlog);
VIR_FREE(*priv);
}
static int
esxStreamClose(virStreamPtr stream, bool finish)
{
int result = 0;
esxStreamPrivate *priv = stream->privateData;
if (!priv)
return 0;
virMutexLock(&priv->curl->lock);
if (finish && priv->backlog_used > 0) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("Stream has untransferred data left"));
result = -1;
}
stream->privateData = NULL;
virMutexUnlock(&priv->curl->lock);
esxFreeStreamPrivate(&priv);
return result;
}
static int
esxStreamFinish(virStreamPtr stream)
{
return esxStreamClose(stream, true);
}
static int
esxStreamAbort(virStreamPtr stream)
{
return esxStreamClose(stream, false);
}
virStreamDriver esxStreamDriver = {
.streamSend = esxStreamSend,
.streamRecv = esxStreamRecv,
/* FIXME: streamAddCallback missing */
/* FIXME: streamUpdateCallback missing */
/* FIXME: streamRemoveCallback missing */
.streamFinish = esxStreamFinish,
.streamAbort = esxStreamAbort,
};
static int
esxStreamOpen(virStreamPtr stream, esxPrivate *priv, const char *url,
unsigned long long offset, unsigned long long length, int mode)
{
int result = -1;
esxStreamPrivate *streamPriv;
char *range = NULL;
char *userpwd = NULL;
esxVI_MultiCURL *multi = NULL;
/* FIXME: Although there is already some code in place to deal with
* non-blocking streams it is currently incomplete, so usage
* of the non-blocking mode is denied here for now. */
if (stream->flags & VIR_STREAM_NONBLOCK) {
virReportError(VIR_ERR_OPERATION_INVALID, "%s",
_("Non-blocking streams are not supported yet"));
return -1;
}
if (VIR_ALLOC(streamPriv) < 0)
return -1;
streamPriv->mode = mode;
if (length > 0) {
if (virAsprintf(&range, "%llu-%llu", offset, offset + length - 1) < 0)
goto cleanup;
} else if (offset > 0) {
if (virAsprintf(&range, "%llu-", offset) < 0)
goto cleanup;
}
if (esxVI_CURL_Alloc(&streamPriv->curl) < 0 ||
esxVI_CURL_Connect(streamPriv->curl, priv->parsedUri) < 0)
goto cleanup;
if (mode == ESX_STREAM_MODE_UPLOAD) {
curl_easy_setopt(streamPriv->curl->handle, CURLOPT_UPLOAD, 1);
curl_easy_setopt(streamPriv->curl->handle, CURLOPT_READFUNCTION,
esxVI_CURL_ReadStream);
curl_easy_setopt(streamPriv->curl->handle, CURLOPT_READDATA, streamPriv);
} else {
curl_easy_setopt(streamPriv->curl->handle, CURLOPT_UPLOAD, 0);
curl_easy_setopt(streamPriv->curl->handle, CURLOPT_HTTPGET, 1);
curl_easy_setopt(streamPriv->curl->handle, CURLOPT_WRITEFUNCTION,
esxVI_CURL_WriteStream);
curl_easy_setopt(streamPriv->curl->handle, CURLOPT_WRITEDATA, streamPriv);
}
curl_easy_setopt(streamPriv->curl->handle, CURLOPT_URL, url);
curl_easy_setopt(streamPriv->curl->handle, CURLOPT_RANGE, range);
#if LIBCURL_VERSION_NUM >= 0x071301 /* 7.19.1 */
curl_easy_setopt(streamPriv->curl->handle, CURLOPT_USERNAME,
priv->primary->username);
curl_easy_setopt(streamPriv->curl->handle, CURLOPT_PASSWORD,
priv->primary->password);
#else
if (virAsprintf(&userpwd, "%s:%s", priv->primary->username,
priv->primary->password) < 0)
goto cleanup;
curl_easy_setopt(streamPriv->curl->handle, CURLOPT_USERPWD, userpwd);
#endif
if (esxVI_MultiCURL_Alloc(&multi) < 0 ||
esxVI_MultiCURL_Add(multi, streamPriv->curl) < 0)
goto cleanup;
stream->driver = &esxStreamDriver;
stream->privateData = streamPriv;
result = 0;
cleanup:
if (result < 0) {
if (streamPriv->curl && multi != streamPriv->curl->multi)
esxVI_MultiCURL_Free(&multi);
esxFreeStreamPrivate(&streamPriv);
}
VIR_FREE(range);
VIR_FREE(userpwd);
return result;
}
int
esxStreamOpenUpload(virStreamPtr stream, esxPrivate *priv, const char *url)
{
return esxStreamOpen(stream, priv, url, 0, 0, ESX_STREAM_MODE_UPLOAD);
}
int
esxStreamOpenDownload(virStreamPtr stream, esxPrivate *priv, const char *url,
unsigned long long offset, unsigned long long length)
{
return esxStreamOpen(stream, priv, url, offset, length, ESX_STREAM_MODE_DOWNLOAD);
}