Convert libvirtd over to the new RPC handling APIs

This guts the libvirtd daemon, removing all its networking and
RPC handling code. Instead it calls out to the new virServerPtr
APIs for all its RPC & networking work

As a fallout all libvirtd daemon error reporting now takes place
via the normal internal error reporting APIs. There is no need
to call separate error reporting APIs in RPC code, nor should
code use VIR_WARN/VIR_ERROR for reporting fatal problems anymore.

* daemon/qemu_dispatch_*.h, daemon/remote_dispatch_*.h: Remove
  old generated dispatcher code
* daemon/qemu_dispatch.h, daemon/remote_dispatch.h: New dispatch
  code
* daemon/dispatch.c, daemon/dispatch.h: Remove obsoleted code
* daemon/remote.c, daemon/remote.h: Rewrite for new dispatch
  APIs
* daemon/libvirtd.c, daemon/libvirtd.h: Remove all networking
  code
* daemon/stream.c, daemon/stream.h: Update for new APIs
* daemon/Makefile.am: Link to libvirt-net-rpc-server.la
This commit is contained in:
Daniel P. Berrange 2011-05-16 18:13:11 +01:00
parent c1b2264477
commit df0b57a95a
15 changed files with 1909 additions and 4965 deletions

2
.gitignore vendored
View File

@ -34,7 +34,7 @@
/config.sub /config.sub
/configure /configure
/configure.lineno /configure.lineno
/daemon/*_dispatch_*.h /daemon/*_dispatch.h
/docs/hvsupport.html.in /docs/hvsupport.html.in
/gnulib/ /gnulib/
/libtool /libtool

View File

@ -299,17 +299,6 @@ if test x"$enable_debug" = x"yes"; then
fi fi
AC_MSG_CHECKING([where to write libvirtd PID file])
AC_ARG_WITH([remote-pid-file], [AC_HELP_STRING([--with-remote-pid-file=@<:@pidfile|none@:>@], [PID file for libvirtd])])
if test "x$with_remote_pid_file" = "x" ; then
REMOTE_PID_FILE="$localstatedir/run/libvirtd.pid"
elif test "x$with_remote_pid_file" = "xnone" ; then
REMOTE_PID_FILE=""
else
REMOTE_PID_FILE="$with_remote_pid_file"
fi
AC_SUBST([REMOTE_PID_FILE])
AC_MSG_RESULT($REMOTE_PID_FILE)
dnl dnl
dnl init script flavor dnl init script flavor

View File

@ -3,33 +3,21 @@
CLEANFILES = CLEANFILES =
DAEMON_GENERATED = \ DAEMON_GENERATED = \
$(srcdir)/remote_dispatch_prototypes.h \ $(srcdir)/remote_dispatch.h \
$(srcdir)/remote_dispatch_table.h \ $(srcdir)/qemu_dispatch.h
$(srcdir)/remote_dispatch_args.h \
$(srcdir)/remote_dispatch_ret.h \
$(srcdir)/remote_dispatch_bodies.h \
$(srcdir)/qemu_dispatch_prototypes.h \
$(srcdir)/qemu_dispatch_table.h \
$(srcdir)/qemu_dispatch_args.h \
$(srcdir)/qemu_dispatch_ret.h \
$(srcdir)/qemu_dispatch_bodies.h
DAEMON_SOURCES = \ DAEMON_SOURCES = \
libvirtd.c libvirtd.h \ libvirtd.c libvirtd.h \
remote.c remote.h \ remote.c remote.h \
dispatch.c dispatch.h \
stream.c stream.h \ stream.c stream.h \
../src/remote/remote_protocol.c \ ../src/remote/remote_protocol.c \
../src/remote/qemu_protocol.c \ ../src/remote/qemu_protocol.c \
$(DAEMON_GENERATED) $(DAEMON_GENERATED)
AVAHI_SOURCES = \
mdns.c mdns.h
DISTCLEANFILES = DISTCLEANFILES =
EXTRA_DIST = \ EXTRA_DIST = \
remote_dispatch_bodies.h \ remote_dispatch.h \
qemu_dispatch_bodies.h \ qemu_dispatch.h \
libvirtd.conf \ libvirtd.conf \
libvirtd.init.in \ libvirtd.init.in \
libvirtd.upstart \ libvirtd.upstart \
@ -47,7 +35,6 @@ EXTRA_DIST = \
libvirtd.pod.in \ libvirtd.pod.in \
libvirtd.8.in \ libvirtd.8.in \
libvirtd.stp \ libvirtd.stp \
$(AVAHI_SOURCES) \
$(DAEMON_SOURCES) $(DAEMON_SOURCES)
BUILT_SOURCES = BUILT_SOURCES =
@ -55,52 +42,12 @@ BUILT_SOURCES =
REMOTE_PROTOCOL = $(top_srcdir)/src/remote/remote_protocol.x REMOTE_PROTOCOL = $(top_srcdir)/src/remote/remote_protocol.x
QEMU_PROTOCOL = $(top_srcdir)/src/remote/qemu_protocol.x QEMU_PROTOCOL = $(top_srcdir)/src/remote/qemu_protocol.x
$(srcdir)/remote_dispatch_prototypes.h: $(srcdir)/../src/rpc/gendispatch.pl \ $(srcdir)/remote_dispatch.h: $(srcdir)/../src/rpc/gendispatch.pl \
$(REMOTE_PROTOCOL)
$(AM_V_GEN)perl -w $(srcdir)/../src/rpc/gendispatch.pl -c -p remote \
$(REMOTE_PROTOCOL) > $@
$(srcdir)/remote_dispatch_table.h: $(srcdir)/../src/rpc/gendispatch.pl \
$(REMOTE_PROTOCOL)
$(AM_V_GEN)perl -w $(srcdir)/../src/rpc/gendispatch.pl -c -t remote \
$(REMOTE_PROTOCOL) > $@
$(srcdir)/remote_dispatch_args.h: $(srcdir)/../src/rpc/gendispatch.pl \
$(REMOTE_PROTOCOL)
$(AM_V_GEN)perl -w $(srcdir)/../src/rpc/gendispatch.pl -c -a remote \
$(REMOTE_PROTOCOL) > $@
$(srcdir)/remote_dispatch_ret.h: $(srcdir)/../src/rpc/gendispatch.pl \
$(REMOTE_PROTOCOL)
$(AM_V_GEN)perl -w $(srcdir)/../src/rpc/gendispatch.pl -c -r remote \
$(REMOTE_PROTOCOL) > $@
$(srcdir)/remote_dispatch_bodies.h: $(srcdir)/../src/rpc/gendispatch.pl \
$(REMOTE_PROTOCOL) $(REMOTE_PROTOCOL)
$(AM_V_GEN)perl -w $(srcdir)/../src/rpc/gendispatch.pl -c -b remote \ $(AM_V_GEN)perl -w $(srcdir)/../src/rpc/gendispatch.pl -c -b remote \
$(REMOTE_PROTOCOL) > $@ $(REMOTE_PROTOCOL) > $@
$(srcdir)/qemu_dispatch_prototypes.h: $(srcdir)/../src/rpc/gendispatch.pl \ $(srcdir)/qemu_dispatch.h: $(srcdir)/../src/rpc/gendispatch.pl \
$(QEMU_PROTOCOL)
$(AM_V_GEN)perl -w $(srcdir)/../src/rpc/gendispatch.pl -p qemu \
$(QEMU_PROTOCOL) > $@
$(srcdir)/qemu_dispatch_table.h: $(srcdir)/../src/rpc/gendispatch.pl \
$(QEMU_PROTOCOL)
$(AM_V_GEN)perl -w $(srcdir)/../src/rpc/gendispatch.pl -t qemu \
$(QEMU_PROTOCOL) > $@
$(srcdir)/qemu_dispatch_args.h: $(srcdir)/../src/rpc/gendispatch.pl \
$(QEMU_PROTOCOL)
$(AM_V_GEN)perl -w $(srcdir)/../src/rpc/gendispatch.pl -a qemu \
$(QEMU_PROTOCOL) > $@
$(srcdir)/qemu_dispatch_ret.h: $(srcdir)/../src/rpc/gendispatch.pl \
$(QEMU_PROTOCOL)
$(AM_V_GEN)perl -w $(srcdir)/../src/rpc/gendispatch.pl -r qemu \
$(QEMU_PROTOCOL) > $@
$(srcdir)/qemu_dispatch_bodies.h: $(srcdir)/../src/rpc/gendispatch.pl \
$(QEMU_PROTOCOL) $(QEMU_PROTOCOL)
$(AM_V_GEN)perl -w $(srcdir)/../src/rpc/gendispatch.pl -b qemu \ $(AM_V_GEN)perl -w $(srcdir)/../src/rpc/gendispatch.pl -b qemu \
$(QEMU_PROTOCOL) > $@ $(QEMU_PROTOCOL) > $@
@ -137,6 +84,7 @@ libvirtd_CFLAGS = \
-I$(top_srcdir)/src \ -I$(top_srcdir)/src \
-I$(top_srcdir)/src/util \ -I$(top_srcdir)/src/util \
-I$(top_srcdir)/src/conf \ -I$(top_srcdir)/src/conf \
-I$(top_srcdir)/src/rpc \
-I$(top_srcdir)/src/remote \ -I$(top_srcdir)/src/remote \
$(LIBXML_CFLAGS) $(GNUTLS_CFLAGS) $(SASL_CFLAGS) \ $(LIBXML_CFLAGS) $(GNUTLS_CFLAGS) $(SASL_CFLAGS) \
$(XDR_CFLAGS) $(POLKIT_CFLAGS) \ $(XDR_CFLAGS) $(POLKIT_CFLAGS) \
@ -155,7 +103,10 @@ libvirtd_LDADD = \
$(SASL_LIBS) \ $(SASL_LIBS) \
$(POLKIT_LIBS) $(POLKIT_LIBS)
libvirtd_LDADD += ../src/libvirt-qemu.la libvirtd_LDADD += \
../src/libvirt-net-rpc-server.la \
../src/libvirt-net-rpc.la \
../src/libvirt-qemu.la
if ! WITH_DRIVER_MODULES if ! WITH_DRIVER_MODULES
if WITH_QEMU if WITH_QEMU
@ -211,12 +162,6 @@ policyfile = libvirtd.policy-1
endif endif
endif endif
if HAVE_AVAHI
libvirtd_SOURCES += $(AVAHI_SOURCES)
libvirtd_CFLAGS += $(AVAHI_CFLAGS)
libvirtd_LDADD += $(AVAHI_LIBS)
endif
if WITH_DTRACE if WITH_DTRACE
libvirtd_LDADD += probes.o libvirtd_LDADD += probes.o
nodist_libvirtd_SOURCES = probes.h nodist_libvirtd_SOURCES = probes.h

View File

@ -1,693 +0,0 @@
/*
* dispatch.h: RPC message dispatching infrastructure
*
* Copyright (C) 2007, 2008, 2009 Red Hat, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Author: Richard W.M. Jones <rjones@redhat.com>
* Author: Daniel P. Berrange <berrange@redhat.com>
*/
#include <config.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include "dispatch.h"
#include "remote.h"
#include "memory.h"
/* Convert a libvirt virError object into wire format */
static void
remoteDispatchCopyError (remote_error *rerr,
virErrorPtr verr)
{
rerr->code = verr->code;
rerr->domain = verr->domain;
rerr->message = verr->message ? malloc(sizeof(char*)) : NULL;
if (rerr->message) *rerr->message = strdup(verr->message);
rerr->level = verr->level;
rerr->str1 = verr->str1 ? malloc(sizeof(char*)) : NULL;
if (rerr->str1) *rerr->str1 = strdup(verr->str1);
rerr->str2 = verr->str2 ? malloc(sizeof(char*)) : NULL;
if (rerr->str2) *rerr->str2 = strdup(verr->str2);
rerr->str3 = verr->str3 ? malloc(sizeof(char*)) : NULL;
if (rerr->str3) *rerr->str3 = strdup(verr->str3);
rerr->int1 = verr->int1;
rerr->int2 = verr->int2;
}
/* A set of helpers for sending back errors to client
in various ways .... */
static void
remoteDispatchStringError (remote_error *rerr,
int code, const char *msg)
{
virError verr;
memset(&verr, 0, sizeof verr);
/* Construct the dummy libvirt virError. */
verr.code = code;
verr.domain = VIR_FROM_REMOTE;
verr.message = (char *)msg;
verr.level = VIR_ERR_ERROR;
verr.str1 = (char *)msg;
remoteDispatchCopyError(rerr, &verr);
}
void remoteDispatchAuthError (remote_error *rerr)
{
remoteDispatchStringError (rerr, VIR_ERR_AUTH_FAILED, "authentication failed");
}
void remoteDispatchFormatError (remote_error *rerr,
const char *fmt, ...)
{
va_list args;
char msgbuf[1024];
char *msg = msgbuf;
va_start (args, fmt);
vsnprintf (msgbuf, sizeof msgbuf, fmt, args);
va_end (args);
remoteDispatchStringError (rerr, VIR_ERR_RPC, msg);
}
void remoteDispatchGenericError (remote_error *rerr)
{
remoteDispatchStringError(rerr,
VIR_ERR_INTERNAL_ERROR,
"library function returned error but did not set virterror");
}
void remoteDispatchError(remote_error *rerr)
{
virErrorPtr verr = virGetLastError();
if (verr)
remoteDispatchCopyError(rerr, verr);
else
remoteDispatchGenericError(rerr);
}
static int
remoteSerializeError(struct qemud_client *client,
remote_error *rerr,
int program,
int version,
int procedure,
int type,
int serial)
{
XDR xdr;
unsigned int len;
struct qemud_client_message *msg = NULL;
VIR_DEBUG("prog=%d ver=%d proc=%d type=%d serial=%d, msg=%s",
program, version, procedure, type, serial,
rerr->message ? *rerr->message : "(none)");
if (VIR_ALLOC(msg) < 0)
goto fatal_error;
/* Return header. */
msg->hdr.prog = program;
msg->hdr.vers = version;
msg->hdr.proc = procedure;
msg->hdr.type = type;
msg->hdr.serial = serial;
msg->hdr.status = REMOTE_ERROR;
msg->bufferLength = sizeof(msg->buffer);
/* Serialise the return header. */
xdrmem_create (&xdr,
msg->buffer,
msg->bufferLength,
XDR_ENCODE);
len = 0; /* We'll come back and write this later. */
if (!xdr_u_int (&xdr, &len))
goto xdr_error;
if (!xdr_remote_message_header (&xdr, &msg->hdr))
goto xdr_error;
/* Error was not set, so synthesize a generic error message. */
if (rerr->code == 0)
remoteDispatchGenericError(rerr);
if (!xdr_remote_error (&xdr, rerr))
goto xdr_error;
/* Write the length word. */
len = xdr_getpos (&xdr);
if (xdr_setpos (&xdr, 0) == 0)
goto xdr_error;
if (!xdr_u_int (&xdr, &len))
goto xdr_error;
xdr_destroy (&xdr);
msg->bufferLength = len;
msg->bufferOffset = 0;
/* Put reply on end of tx queue to send out */
qemudClientMessageQueuePush(&client->tx, msg);
qemudUpdateClientEvent(client);
xdr_free((xdrproc_t)xdr_remote_error, (char *)rerr);
return 0;
xdr_error:
VIR_WARN("Failed to serialize remote error '%s' as XDR",
rerr->message ? *rerr->message : "<unknown>");
xdr_destroy(&xdr);
VIR_FREE(msg);
fatal_error:
xdr_free((xdrproc_t)xdr_remote_error, (char *)rerr);
return -1;
}
/*
* @client: the client to send the error to
* @rerr: the error object to send
* @req: the message this error is in reply to
*
* Send an error message to the client
*
* Returns 0 if the error was sent, -1 upon fatal error
*/
int
remoteSerializeReplyError(struct qemud_client *client,
remote_error *rerr,
remote_message_header *req) {
/*
* For data streams, errors are sent back as data streams
* For method calls, errors are sent back as method replies
*/
return remoteSerializeError(client,
rerr,
req->prog,
req->vers,
req->proc,
req->type == REMOTE_STREAM ? REMOTE_STREAM : REMOTE_REPLY,
req->serial);
}
int
remoteSerializeStreamError(struct qemud_client *client,
remote_error *rerr,
int proc,
int serial)
{
return remoteSerializeError(client,
rerr,
REMOTE_PROGRAM,
REMOTE_PROTOCOL_VERSION,
proc,
REMOTE_STREAM,
serial);
}
/*
* @msg: the complete incoming message, whose header to decode
*
* Decodes the header part of the client message, but does not
* validate the decoded fields in the header. It expects
* bufferLength to refer to length of the data packet. Upon
* return bufferOffset will refer to the amount of the packet
* consumed by decoding of the header.
*
* returns 0 if successfully decoded, -1 upon fatal error
*/
int
remoteDecodeClientMessageHeader (struct qemud_client_message *msg)
{
XDR xdr;
int ret = -1;
msg->bufferOffset = REMOTE_MESSAGE_HEADER_XDR_LEN;
/* Parse the header. */
xdrmem_create (&xdr,
msg->buffer + msg->bufferOffset,
msg->bufferLength - msg->bufferOffset,
XDR_DECODE);
if (!xdr_remote_message_header (&xdr, &msg->hdr))
goto cleanup;
msg->bufferOffset += xdr_getpos(&xdr);
ret = 0;
cleanup:
xdr_destroy(&xdr);
return ret;
}
/*
* @msg: the outgoing message, whose header to encode
*
* Encodes the header part of the client message, setting the
* message offset ready to encode the payload. Leaves space
* for the length field later. Upon return bufferLength will
* refer to the total available space for message, while
* bufferOffset will refer to current space used by header
*
* returns 0 if successfully encoded, -1 upon fatal error
*/
int
remoteEncodeClientMessageHeader (struct qemud_client_message *msg)
{
XDR xdr;
int ret = -1;
unsigned int len = 0;
msg->bufferLength = sizeof(msg->buffer);
msg->bufferOffset = 0;
/* Format the header. */
xdrmem_create (&xdr,
msg->buffer,
msg->bufferLength,
XDR_ENCODE);
/* The real value is filled in shortly */
if (!xdr_u_int (&xdr, &len)) {
goto cleanup;
}
if (!xdr_remote_message_header (&xdr, &msg->hdr))
goto cleanup;
len = xdr_getpos(&xdr);
xdr_setpos(&xdr, 0);
/* Fill in current length - may be re-written later
* if a payload is added
*/
if (!xdr_u_int (&xdr, &len)) {
goto cleanup;
}
msg->bufferOffset += len;
ret = 0;
cleanup:
xdr_destroy(&xdr);
return ret;
}
static int
remoteDispatchClientCall (struct qemud_server *server,
struct qemud_client *client,
struct qemud_client_message *msg,
bool qemu_protocol);
/*
* @server: the unlocked server object
* @client: the locked client object
* @msg: the complete incoming message packet, with header already decoded
*
* This function gets called from qemud when it pulls a incoming
* remote protocol message off the dispatch queue for processing.
*
* The @msg parameter must have had its header decoded already by
* calling remoteDecodeClientMessageHeader
*
* Returns 0 if the message was dispatched, -1 upon fatal error
*/
int
remoteDispatchClientRequest(struct qemud_server *server,
struct qemud_client *client,
struct qemud_client_message *msg)
{
int ret;
remote_error rerr;
bool qemu_call;
VIR_DEBUG("prog=%d ver=%d type=%d status=%d serial=%d proc=%d",
msg->hdr.prog, msg->hdr.vers, msg->hdr.type,
msg->hdr.status, msg->hdr.serial, msg->hdr.proc);
memset(&rerr, 0, sizeof rerr);
/* Check version, etc. */
if (msg->hdr.prog == REMOTE_PROGRAM)
qemu_call = false;
else if (msg->hdr.prog == QEMU_PROGRAM)
qemu_call = true;
else {
remoteDispatchFormatError (&rerr,
_("program mismatch (actual %x, expected %x or %x)"),
msg->hdr.prog, REMOTE_PROGRAM, QEMU_PROGRAM);
goto error;
}
if (!qemu_call && msg->hdr.vers != REMOTE_PROTOCOL_VERSION) {
remoteDispatchFormatError (&rerr,
_("version mismatch (actual %x, expected %x)"),
msg->hdr.vers, REMOTE_PROTOCOL_VERSION);
goto error;
}
else if (qemu_call && msg->hdr.vers != QEMU_PROTOCOL_VERSION) {
remoteDispatchFormatError (&rerr,
_("version mismatch (actual %x, expected %x)"),
msg->hdr.vers, QEMU_PROTOCOL_VERSION);
goto error;
}
switch (msg->hdr.type) {
case REMOTE_CALL:
return remoteDispatchClientCall(server, client, msg, qemu_call);
case REMOTE_STREAM:
/* Since stream data is non-acked, async, we may continue to received
* stream packets after we closed down a stream. Just drop & ignore
* these.
*/
VIR_INFO("Ignoring unexpected stream data serial=%d proc=%d status=%d",
msg->hdr.serial, msg->hdr.proc, msg->hdr.status);
qemudClientMessageRelease(client, msg);
break;
default:
remoteDispatchFormatError (&rerr, _("type (%d) != REMOTE_CALL"),
(int) msg->hdr.type);
goto error;
}
return 0;
error:
ret = remoteSerializeReplyError(client, &rerr, &msg->hdr);
if (ret >= 0)
VIR_FREE(msg);
return ret;
}
/*
* @server: the unlocked server object
* @client: the locked client object
* @msg: the complete incoming method call, with header already decoded
*
* This method is used to dispatch an message representing an
* incoming method call from a client. It decodes the payload
* to obtain method call arguments, invokves the method and
* then sends a reply packet with the return values
*
* Returns 0 if the reply was sent, or -1 upon fatal error
*/
static int
remoteDispatchClientCall (struct qemud_server *server,
struct qemud_client *client,
struct qemud_client_message *msg,
bool qemu_protocol)
{
XDR xdr;
remote_error rerr;
dispatch_args args;
dispatch_ret ret;
const dispatch_data *data = NULL;
int rv = -1;
unsigned int len;
virConnectPtr conn = NULL;
memset(&args, 0, sizeof args);
memset(&ret, 0, sizeof ret);
memset(&rerr, 0, sizeof rerr);
if (msg->hdr.status != REMOTE_OK) {
remoteDispatchFormatError (&rerr, _("status (%d) != REMOTE_OK"),
(int) msg->hdr.status);
goto rpc_error;
}
/* If client is marked as needing auth, don't allow any RPC ops,
* except for authentication ones
*/
if (client->auth) {
if (msg->hdr.proc != REMOTE_PROC_AUTH_LIST &&
msg->hdr.proc != REMOTE_PROC_AUTH_SASL_INIT &&
msg->hdr.proc != REMOTE_PROC_AUTH_SASL_START &&
msg->hdr.proc != REMOTE_PROC_AUTH_SASL_STEP &&
msg->hdr.proc != REMOTE_PROC_AUTH_POLKIT
) {
/* Explicitly *NOT* calling remoteDispatchAuthError() because
we want back-compatability with libvirt clients which don't
support the VIR_ERR_AUTH_FAILED error code */
remoteDispatchFormatError (&rerr, "%s", _("authentication required"));
goto rpc_error;
}
}
if (qemu_protocol)
data = qemuGetDispatchData(msg->hdr.proc);
else
data = remoteGetDispatchData(msg->hdr.proc);
if (!data) {
remoteDispatchFormatError (&rerr, _("unknown procedure: %d"),
msg->hdr.proc);
goto rpc_error;
}
/* De-serialize payload with args from the wire message */
xdrmem_create (&xdr,
msg->buffer + msg->bufferOffset,
msg->bufferLength - msg->bufferOffset,
XDR_DECODE);
if (!((data->args_filter)(&xdr, &args))) {
xdr_destroy (&xdr);
remoteDispatchFormatError (&rerr, "%s", _("parse args failed"));
goto rpc_error;
}
xdr_destroy (&xdr);
/* Call function. */
conn = client->conn;
virMutexUnlock(&client->lock);
/*
* When the RPC handler is called:
*
* - Server object is unlocked
* - Client object is unlocked
*
* Without locking, it is safe to use:
*
* 'conn', 'rerr', 'args and 'ret'
*/
rv = (data->fn)(server, client, conn, &msg->hdr, &rerr, &args, &ret);
virMutexLock(&server->lock);
virMutexLock(&client->lock);
virMutexUnlock(&server->lock);
xdr_free (data->args_filter, (char*)&args);
if (rv < 0)
goto rpc_error;
/* Return header. We're re-using same message object, so
* only need to tweak type/status fields */
/*msg->hdr.prog = msg->hdr.prog;*/
/*msg->hdr.vers = msg->hdr.vers;*/
/*msg->hdr.proc = msg->hdr.proc;*/
msg->hdr.type = REMOTE_REPLY;
/*msg->hdr.serial = msg->hdr.serial;*/
msg->hdr.status = REMOTE_OK;
if (remoteEncodeClientMessageHeader(msg) < 0) {
xdr_free (data->ret_filter, (char*)&ret);
remoteDispatchFormatError(&rerr, "%s", _("failed to serialize reply header"));
goto xdr_hdr_error;
}
/* Now for the payload */
xdrmem_create (&xdr,
msg->buffer,
msg->bufferLength,
XDR_ENCODE);
if (xdr_setpos(&xdr, msg->bufferOffset) == 0) {
remoteDispatchFormatError(&rerr, "%s", _("failed to change XDR reply offset"));
goto xdr_error;
}
/* If OK, serialise return structure, if error serialise error. */
/* Serialise reply data */
if (!((data->ret_filter) (&xdr, &ret))) {
remoteDispatchFormatError(&rerr, "%s", _("failed to serialize reply payload (probable message size limit)"));
goto xdr_error;
}
/* Update the length word. */
msg->bufferOffset += xdr_getpos (&xdr);
len = msg->bufferOffset;
if (xdr_setpos (&xdr, 0) == 0) {
remoteDispatchFormatError(&rerr, "%s", _("failed to change XDR reply offset"));
goto xdr_error;
}
if (!xdr_u_int (&xdr, &len)) {
remoteDispatchFormatError(&rerr, "%s", _("failed to update reply length header"));
goto xdr_error;
}
xdr_destroy (&xdr);
xdr_free (data->ret_filter, (char*)&ret);
/* Reset ready for I/O */
msg->bufferLength = len;
msg->bufferOffset = 0;
/* Put reply on end of tx queue to send out */
qemudClientMessageQueuePush(&client->tx, msg);
qemudUpdateClientEvent(client);
return 0;
xdr_error:
/* Bad stuff serializing reply. Try to send a little info
* back to client to assist in bug reporting/diagnosis */
xdr_free (data->ret_filter, (char*)&ret);
xdr_destroy (&xdr);
/* fallthrough */
xdr_hdr_error:
VIR_WARN("Failed to serialize reply for program '%d' proc '%d' as XDR",
msg->hdr.prog, msg->hdr.proc);
/* fallthrough */
rpc_error:
/* Bad stuff (de-)serializing message, but we have an
* RPC error message we can send back to the client */
rv = remoteSerializeReplyError(client, &rerr, &msg->hdr);
if (rv >= 0)
VIR_FREE(msg);
return rv;
}
int
remoteSendStreamData(struct qemud_client *client,
struct qemud_client_stream *stream,
const char *data,
unsigned int len)
{
struct qemud_client_message *msg;
XDR xdr;
VIR_DEBUG("client=%p stream=%p data=%p len=%d", client, stream, data, len);
if (VIR_ALLOC(msg) < 0) {
return -1;
}
/* Return header. We're re-using same message object, so
* only need to tweak type/status fields */
msg->hdr.prog = REMOTE_PROGRAM;
msg->hdr.vers = REMOTE_PROTOCOL_VERSION;
msg->hdr.proc = stream->procedure;
msg->hdr.type = REMOTE_STREAM;
msg->hdr.serial = stream->serial;
/*
* NB
* data != NULL + len > 0 => REMOTE_CONTINUE (Sending back data)
* data != NULL + len == 0 => REMOTE_CONTINUE (Sending read EOF)
* data == NULL => REMOTE_OK (Sending finish handshake confirmation)
*/
msg->hdr.status = data ? REMOTE_CONTINUE : REMOTE_OK;
if (remoteEncodeClientMessageHeader(msg) < 0)
goto fatal_error;
if (data && len) {
if ((msg->bufferLength - msg->bufferOffset) < len)
goto fatal_error;
/* Now for the payload */
xdrmem_create (&xdr,
msg->buffer,
msg->bufferLength,
XDR_ENCODE);
/* Skip over existing header already written */
if (xdr_setpos(&xdr, msg->bufferOffset) == 0)
goto xdr_error;
memcpy(msg->buffer + msg->bufferOffset, data, len);
msg->bufferOffset += len;
/* Update the length word. */
len = msg->bufferOffset;
if (xdr_setpos (&xdr, 0) == 0)
goto xdr_error;
if (!xdr_u_int (&xdr, &len))
goto xdr_error;
xdr_destroy (&xdr);
VIR_DEBUG("Total %d", msg->bufferOffset);
}
if (data)
msg->streamTX = 1;
/* Reset ready for I/O */
msg->bufferLength = msg->bufferOffset;
msg->bufferOffset = 0;
/* Put reply on end of tx queue to send out */
qemudClientMessageQueuePush(&client->tx, msg);
qemudUpdateClientEvent(client);
return 0;
xdr_error:
xdr_destroy (&xdr);
fatal_error:
VIR_FREE(msg);
VIR_WARN("Failed to serialize stream data for proc %d as XDR",
stream->procedure);
return -1;
}

View File

@ -1,68 +0,0 @@
/*
* dispatch.h: RPC message dispatching infrastructure
*
* Copyright (C) 2007, 2008, 2009 Red Hat, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Author: Richard W.M. Jones <rjones@redhat.com>
* Author: Daniel P. Berrange <berrange@redhat.com>
*/
#ifndef __LIBVIRTD_DISPATCH_H__
# define __LIBVIRTD_DISPATCH_H__
# include "libvirtd.h"
int
remoteDecodeClientMessageHeader (struct qemud_client_message *req);
int
remoteEncodeClientMessageHeader (struct qemud_client_message *req);
int
remoteDispatchClientRequest (struct qemud_server *server,
struct qemud_client *client,
struct qemud_client_message *req);
void remoteDispatchFormatError (remote_error *rerr,
const char *fmt, ...)
ATTRIBUTE_FMT_PRINTF(2, 3);
void remoteDispatchAuthError (remote_error *rerr);
void remoteDispatchGenericError (remote_error *rerr);
void remoteDispatchError(remote_error *rerr);
int
remoteSerializeReplyError(struct qemud_client *client,
remote_error *rerr,
remote_message_header *req);
int
remoteSerializeStreamError(struct qemud_client *client,
remote_error *rerr,
int proc,
int serial);
int
remoteSendStreamData(struct qemud_client *client,
struct qemud_client_stream *stream,
const char *data,
unsigned int len);
#endif /* __LIBVIRTD_DISPATCH_H__ */

File diff suppressed because it is too large Load Diff

View File

@ -27,13 +27,6 @@
# include <config.h> # include <config.h>
# include <gnutls/gnutls.h>
# include <gnutls/x509.h>
# include "gnutls_1_0_compat.h"
# if HAVE_SASL
# include <sasl/sasl.h>
# endif
# if HAVE_POLKIT0 # if HAVE_POLKIT0
# include <dbus/dbus.h> # include <dbus/dbus.h>
# endif # endif
@ -45,6 +38,8 @@
# include "logging.h" # include "logging.h"
# include "threads.h" # include "threads.h"
# include "network.h" # include "network.h"
# include "virnetsaslcontext.h"
# include "virnetserverprogram.h"
# if WITH_DTRACE # if WITH_DTRACE
# ifndef LIBVIRTD_PROBES_H # ifndef LIBVIRTD_PROBES_H
@ -63,230 +58,37 @@
#NAME ": " FMT, __VA_ARGS__); #NAME ": " FMT, __VA_ARGS__);
# endif # endif
# ifdef __GNUC__ typedef struct daemonClientStream daemonClientStream;
# ifdef HAVE_ANSIDECL_H typedef daemonClientStream *daemonClientStreamPtr;
# include <ansidecl.h> typedef struct daemonClientPrivate daemonClientPrivate;
# endif typedef daemonClientPrivate *daemonClientPrivatePtr;
# ifndef __GNUC_PREREQ
# if defined __GNUC__ && defined __GNUC_MINOR__
# define __GNUC_PREREQ(maj, min) \
((__GNUC__ << 16) + __GNUC_MINOR__ >= ((maj) << 16) + (min))
# else
# define __GNUC_PREREQ(maj,min) 0
# endif
# endif
/**
* ATTRIBUTE_UNUSED:
*
* Macro to flag conciously unused parameters to functions
*/
# ifndef ATTRIBUTE_UNUSED
# define ATTRIBUTE_UNUSED __attribute__((__unused__))
# endif
/**
* ATTRIBUTE_FMT_PRINTF
*
* Macro used to check printf like functions, if compiling
* with gcc.
*
* We use gnulib which guarentees we always have GNU style
* printf format specifiers even on broken Win32 platforms
* hence we have to force 'gnu_printf' for new GCC
*/
# ifndef ATTRIBUTE_FMT_PRINTF
# if __GNUC_PREREQ (4, 4)
# define ATTRIBUTE_FMT_PRINTF(fmtpos,argpos) __attribute__((__format__ (gnu_printf, fmtpos,argpos)))
# else
# define ATTRIBUTE_FMT_PRINTF(fmtpos,argpos) __attribute__((__format__ (printf, fmtpos,argpos)))
# endif
# endif
# ifndef ATTRIBUTE_RETURN_CHECK
# if __GNUC_PREREQ (3, 4)
# define ATTRIBUTE_RETURN_CHECK __attribute__((__warn_unused_result__))
# else
# define ATTRIBUTE_RETURN_CHECK
# endif
# endif
# else
# ifndef ATTRIBUTE_UNUSED
# define ATTRIBUTE_UNUSED
# endif
# ifndef ATTRIBUTE_FMT_PRINTF
# define ATTRIBUTE_FMT_PRINTF(...)
# endif
# ifndef ATTRIBUTE_RETURN_CHECK
# define ATTRIBUTE_RETURN_CHECK
# endif
# endif
/* Whether we're passing reads & writes through a sasl SSF */
enum qemud_sasl_ssf {
QEMUD_SASL_SSF_NONE = 0,
QEMUD_SASL_SSF_READ = 1,
QEMUD_SASL_SSF_WRITE = 2,
};
enum qemud_sock_type {
QEMUD_SOCK_TYPE_UNIX = 0,
QEMUD_SOCK_TYPE_TCP = 1,
QEMUD_SOCK_TYPE_TLS = 2,
};
struct qemud_client_message {
char buffer [REMOTE_MESSAGE_MAX + REMOTE_MESSAGE_HEADER_XDR_LEN];
unsigned int bufferLength;
unsigned int bufferOffset;
unsigned int async : 1;
unsigned int streamTX : 1;
remote_message_header hdr;
struct qemud_client_message *next;
};
struct qemud_client;
/* Allow for filtering of incoming messages to a custom
* dispatch processing queue, instead of client->dx.
*/
typedef int (*qemud_client_filter_func)(struct qemud_client *client,
struct qemud_client_message *msg, void *opaque);
struct qemud_client_filter {
qemud_client_filter_func query;
void *opaque;
struct qemud_client_filter *next;
};
struct qemud_client_stream {
virStreamPtr st;
int procedure;
int serial;
unsigned int recvEOF : 1;
unsigned int closed : 1;
struct qemud_client_filter filter;
struct qemud_client_message *rx;
int tx;
struct qemud_client_stream *next;
};
/* Stores the per-client connection state */ /* Stores the per-client connection state */
struct qemud_client { struct daemonClientPrivate {
/* Hold while accessing any data except conn */
virMutex lock; virMutex lock;
int magic;
int fd;
int watch;
unsigned int readonly :1;
unsigned int closing :1;
int domainEventCallbackID[VIR_DOMAIN_EVENT_ID_LAST]; int domainEventCallbackID[VIR_DOMAIN_EVENT_ID_LAST];
virSocketAddr addr; virNetSASLSessionPtr sasl;
const char *addrstr;
int type; /* qemud_sock_type */
gnutls_session_t tlssession;
int auth;
unsigned int handshake :1; /* If we're in progress for TLS handshake */
# if HAVE_SASL
sasl_conn_t *saslconn;
int saslSSF;
const char *saslDecoded;
unsigned int saslDecodedLength;
unsigned int saslDecodedOffset;
const char *saslEncoded;
unsigned int saslEncodedLength;
unsigned int saslEncodedOffset;
char *saslUsername;
char saslTemporary[8192]; /* temorary holds data to be decoded */
# endif
/* Count of meages in 'dx' or 'tx' queue
* ie RPC calls in progress. Does not count
* async events which are not used for
* throttling calculations */
int nrequests;
/* Zero or one messages being received. Zero if
* nrequests >= max_clients and throttling */
struct qemud_client_message *rx;
/* Zero or many messages waiting for a worker
* to process them */
struct qemud_client_message *dx;
/* Zero or many messages waiting for transmit
* back to client, including async events */
struct qemud_client_message *tx;
/* Filters to capture messages that would otherwise
* end up on the 'dx' queue */
struct qemud_client_filter *filters;
/* Data streams */
struct qemud_client_stream *streams;
/* This is only valid if a remote open call has been made on this /* This is only valid if a remote open call has been made on this
* connection, otherwise it will be NULL. Also if remote close is * connection, otherwise it will be NULL. Also if remote close is
* called, it will be set back to NULL if that succeeds. * called, it will be set back to NULL if that succeeds.
*/ */
virConnectPtr conn; virConnectPtr conn;
int refs;
daemonClientStreamPtr streams;
}; };
# define QEMUD_CLIENT_MAGIC 0x7788aaee extern virNetSASLContextPtr saslCtxt;
extern virNetServerProgramPtr remoteProgram;
extern virNetServerProgramPtr qemuProgram;
struct qemud_socket {
char *path;
virSocketAddr addr;
const char *addrstr;
int fd;
int watch;
int readonly;
int type; /* qemud_sock_type */
int auth;
struct qemud_socket *next;
};
struct qemud_worker {
pthread_t thread;
unsigned int hasThread :1;
unsigned int processingCall :1;
unsigned int quitRequest :1;
/* back-pointer to our server */
struct qemud_server *server;
};
/* Main server state */ /* Main server state */
struct qemud_server { struct qemud_server {
virMutex lock;
virCond job;
int privileged; int privileged;
size_t nworkers;
size_t nactiveworkers;
struct qemud_worker *workers;
size_t nsockets;
struct qemud_socket *sockets;
size_t nclients;
size_t nclients_max;
struct qemud_client **clients;
int sigread; int sigread;
int sigwrite; int sigwrite;
char *logDir; char *logDir;
@ -304,27 +106,6 @@ struct qemud_server {
# endif # endif
}; };
void qemudLog(int priority, const char *fmt, ...)
ATTRIBUTE_FMT_PRINTF(2,3);
int qemudRegisterClientEvent(struct qemud_server *server,
struct qemud_client *client);
void qemudUpdateClientEvent(struct qemud_client *client);
void qemudDispatchClientFailure(struct qemud_client *client);
void
qemudClientMessageQueuePush(struct qemud_client_message **queue,
struct qemud_client_message *msg);
struct qemud_client_message *
qemudClientMessageQueueServe(struct qemud_client_message **queue);
void
qemudClientMessageRelease(struct qemud_client *client,
struct qemud_client_message *msg);
# if HAVE_POLKIT # if HAVE_POLKIT
int qemudGetSocketIdentity(int fd, uid_t *uid, pid_t *pid); int qemudGetSocketIdentity(int fd, uid_t *uid, pid_t *pid);

File diff suppressed because it is too large Load Diff

View File

@ -24,62 +24,20 @@
#ifndef __LIBVIRTD_REMOTE_H__ #ifndef __LIBVIRTD_REMOTE_H__
# define __LIBVIRTD_REMOTE_H__ # define __LIBVIRTD_REMOTE_H__
# include "remote_protocol.h"
# include "libvirtd.h" # include "rpc/virnetserverprogram.h"
# include "rpc/virnetserverclient.h"
typedef union {
# include "remote_dispatch_args.h"
} dispatch_args;
verify(sizeof(dispatch_args) > 0);
typedef union {
# include "remote_dispatch_ret.h"
} dispatch_ret;
verify(sizeof(dispatch_ret) > 0);
typedef union {
# include "qemu_dispatch_args.h"
} qemu_dispatch_args;
verify(sizeof(qemu_dispatch_args) > 0);
typedef union {
# include "qemu_dispatch_ret.h"
} qemu_dispatch_ret;
verify(sizeof(qemu_dispatch_ret) > 0);
extern virNetServerProgramProc remoteProcs[];
extern size_t remoteNProcs;
extern virNetServerProgramErrorHander remoteErr;
/** extern virNetServerProgramProc qemuProcs[];
* When the RPC handler is called: extern size_t qemuNProcs;
* extern virNetServerProgramErrorHander qemuErr;
* - Server object is unlocked
* - Client object is unlocked
*
* Both must be locked before use. Server lock must
* be held before attempting to lock client.
*
* Without any locking, it is safe to use:
*
* 'conn', 'rerr', 'args and 'ret'
*/
typedef int (*dispatch_fn) (struct qemud_server *server,
struct qemud_client *client,
virConnectPtr conn,
remote_message_header *hdr,
remote_error *err,
dispatch_args *args,
dispatch_ret *ret);
typedef struct {
dispatch_fn fn;
xdrproc_t args_filter;
xdrproc_t ret_filter;
} dispatch_data;
const dispatch_data const *remoteGetDispatchData(int proc);
const dispatch_data const *qemuGetDispatchData(int proc);
int remoteClientInitHook(virNetServerPtr srv,
virNetServerClientPtr client);
#endif /* __LIBVIRTD_REMOTE_H__ */ #endif /* __LIBVIRTD_REMOTE_H__ */

View File

@ -24,32 +24,57 @@
#include <config.h> #include <config.h>
#include "stream.h" #include "stream.h"
#include "remote.h"
#include "memory.h" #include "memory.h"
#include "dispatch.h"
#include "logging.h" #include "logging.h"
#include "virnetserverclient.h"
#include "virterror_internal.h" #include "virterror_internal.h"
#define VIR_FROM_THIS VIR_FROM_STREAMS #define VIR_FROM_THIS VIR_FROM_STREAMS
#define virNetError(code, ...) \
virReportErrorHelper(VIR_FROM_THIS, code, __FILE__, \
__FUNCTION__, __LINE__, __VA_ARGS__)
struct daemonClientStream {
daemonClientPrivatePtr priv;
virNetServerProgramPtr prog;
virStreamPtr st;
int procedure;
int serial;
unsigned int recvEOF : 1;
unsigned int closed : 1;
int filterID;
virNetMessagePtr rx;
int tx;
daemonClientStreamPtr next;
};
static int static int
remoteStreamHandleWrite(struct qemud_client *client, daemonStreamHandleWrite(virNetServerClientPtr client,
struct qemud_client_stream *stream); daemonClientStream *stream);
static int static int
remoteStreamHandleRead(struct qemud_client *client, daemonStreamHandleRead(virNetServerClientPtr client,
struct qemud_client_stream *stream); daemonClientStream *stream);
static int static int
remoteStreamHandleFinish(struct qemud_client *client, daemonStreamHandleFinish(virNetServerClientPtr client,
struct qemud_client_stream *stream, daemonClientStream *stream,
struct qemud_client_message *msg); virNetMessagePtr msg);
static int static int
remoteStreamHandleAbort(struct qemud_client *client, daemonStreamHandleAbort(virNetServerClientPtr client,
struct qemud_client_stream *stream, daemonClientStream *stream,
struct qemud_client_message *msg); virNetMessagePtr msg);
static void static void
remoteStreamUpdateEvents(struct qemud_client_stream *stream) daemonStreamUpdateEvents(daemonClientStream *stream)
{ {
int newEvents = 0; int newEvents = 0;
if (stream->rx) if (stream->rx)
@ -60,24 +85,43 @@ remoteStreamUpdateEvents(struct qemud_client_stream *stream)
virStreamEventUpdateCallback(stream->st, newEvents); virStreamEventUpdateCallback(stream->st, newEvents);
} }
/*
* Invoked when an outgoing data packet message has been fully sent.
* This simply re-enables TX of further data.
*
* The idea is to stop the daemon growing without bound due to
* fast stream, but slow client
*/
static void
daemonStreamMessageFinished(virNetMessagePtr msg,
void *opaque)
{
daemonClientStream *stream = opaque;
VIR_DEBUG("stream=%p proc=%d serial=%d",
stream, msg->header.proc, msg->header.serial);
stream->tx = 1;
daemonStreamUpdateEvents(stream);
}
/* /*
* Callback that gets invoked when a stream becomes writable/readable * Callback that gets invoked when a stream becomes writable/readable
*/ */
static void static void
remoteStreamEvent(virStreamPtr st, int events, void *opaque) daemonStreamEvent(virStreamPtr st, int events, void *opaque)
{ {
struct qemud_client *client = opaque; virNetServerClientPtr client = opaque;
struct qemud_client_stream *stream; daemonClientStream *stream;
daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client);
/* XXX sub-optimal - we really should be taking the server lock virMutexLock(&priv->lock);
* first, but we have no handle to the server object
* We're lucky to get away with it for now, due to this callback
* executing in the main thread, but this should really be fixed
*/
virMutexLock(&client->lock);
stream = remoteFindClientStream(client, st); stream = priv->streams;
while (stream) {
if (stream->st == st)
break;
stream = stream->next;
}
if (!stream) { if (!stream) {
VIR_WARN("event for client=%p stream st=%p, but missing stream state", client, st); VIR_WARN("event for client=%p stream st=%p, but missing stream state", client, st);
@ -85,12 +129,12 @@ remoteStreamEvent(virStreamPtr st, int events, void *opaque)
goto cleanup; goto cleanup;
} }
VIR_DEBUG("st=%p events=%d", st, events); VIR_DEBUG("st=%p events=%d EOF=%d closed=%d", st, events, stream->recvEOF, stream->closed);
if (events & VIR_STREAM_EVENT_WRITABLE) { if (events & VIR_STREAM_EVENT_WRITABLE) {
if (remoteStreamHandleWrite(client, stream) < 0) { if (daemonStreamHandleWrite(client, stream) < 0) {
remoteRemoveClientStream(client, stream); daemonRemoveClientStream(client, stream);
qemudDispatchClientFailure(client); virNetServerClientClose(client);
goto cleanup; goto cleanup;
} }
} }
@ -98,40 +142,84 @@ remoteStreamEvent(virStreamPtr st, int events, void *opaque)
if (!stream->recvEOF && if (!stream->recvEOF &&
(events & (VIR_STREAM_EVENT_READABLE | VIR_STREAM_EVENT_HANGUP))) { (events & (VIR_STREAM_EVENT_READABLE | VIR_STREAM_EVENT_HANGUP))) {
events = events & ~(VIR_STREAM_EVENT_READABLE | VIR_STREAM_EVENT_HANGUP); events = events & ~(VIR_STREAM_EVENT_READABLE | VIR_STREAM_EVENT_HANGUP);
if (remoteStreamHandleRead(client, stream) < 0) { if (daemonStreamHandleRead(client, stream) < 0) {
remoteRemoveClientStream(client, stream); daemonRemoveClientStream(client, stream);
qemudDispatchClientFailure(client); virNetServerClientClose(client);
goto cleanup; goto cleanup;
} }
} }
/* If we have a completion/abort message, always process it */
if (stream->rx) {
virNetMessagePtr msg = stream->rx;
switch (msg->header.status) {
case VIR_NET_CONTINUE:
/* nada */
break;
case VIR_NET_OK:
virNetMessageQueueServe(&stream->rx);
if (daemonStreamHandleFinish(client, stream, msg) < 0) {
virNetMessageFree(msg);
daemonRemoveClientStream(client, stream);
virNetServerClientClose(client);
goto cleanup;
}
break;
case VIR_NET_ERROR:
default:
virNetMessageQueueServe(&stream->rx);
if (daemonStreamHandleAbort(client, stream, msg) < 0) {
virNetMessageFree(msg);
daemonRemoveClientStream(client, stream);
virNetServerClientClose(client);
goto cleanup;
}
break;
}
}
if (!stream->closed && if (!stream->closed &&
(events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP))) { (events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP))) {
int ret; int ret;
remote_error rerr; virNetMessagePtr msg;
memset(&rerr, 0, sizeof rerr); virNetMessageError rerr;
memset(&rerr, 0, sizeof(rerr));
stream->closed = 1; stream->closed = 1;
virStreamEventRemoveCallback(stream->st); virStreamEventRemoveCallback(stream->st);
virStreamAbort(stream->st); virStreamAbort(stream->st);
if (events & VIR_STREAM_EVENT_HANGUP) if (events & VIR_STREAM_EVENT_HANGUP)
remoteDispatchFormatError(&rerr, "%s", _("stream had unexpected termination")); virNetError(VIR_ERR_RPC,
"%s", _("stream had unexpected termination"));
else else
remoteDispatchFormatError(&rerr, "%s", _("stream had I/O failure")); virNetError(VIR_ERR_RPC,
ret = remoteSerializeStreamError(client, &rerr, stream->procedure, stream->serial); "%s", _("stream had I/O failure"));
remoteRemoveClientStream(client, stream);
msg = virNetMessageNew();
if (!msg) {
ret = -1;
} else {
ret = virNetServerProgramSendStreamError(remoteProgram,
client,
msg,
&rerr,
stream->procedure,
stream->serial);
}
daemonRemoveClientStream(client, stream);
if (ret < 0) if (ret < 0)
qemudDispatchClientFailure(client); virNetServerClientClose(client);
goto cleanup; goto cleanup;
} }
if (stream->closed) { if (stream->closed) {
remoteRemoveClientStream(client, stream); daemonRemoveClientStream(client, stream);
} else { } else {
remoteStreamUpdateEvents(stream); daemonStreamUpdateEvents(stream);
} }
cleanup: cleanup:
virMutexUnlock(&client->lock); virMutexUnlock(&priv->lock);
} }
@ -144,90 +232,72 @@ cleanup:
* -1 on fatal client error * -1 on fatal client error
*/ */
static int static int
remoteStreamFilter(struct qemud_client *client, daemonStreamFilter(virNetServerClientPtr client,
struct qemud_client_message *msg, void *opaque) virNetMessagePtr msg,
void *opaque)
{ {
struct qemud_client_stream *stream = opaque; daemonClientStream *stream = opaque;
if (msg->hdr.serial == stream->serial &&
msg->hdr.proc == stream->procedure &&
msg->hdr.type == REMOTE_STREAM) {
VIR_DEBUG("Incoming rx=%p serial=%d proc=%d status=%d",
stream->rx, msg->hdr.proc, msg->hdr.serial, msg->hdr.status);
/* If there are queued packets, we need to queue all further
* messages, since they must be processed strictly in order.
* If there are no queued packets, then OK/ERROR messages
* should be processed immediately. Data packets are still
* queued to only be processed when the stream is marked as
* writable.
*/
if (stream->rx) {
qemudClientMessageQueuePush(&stream->rx, msg);
remoteStreamUpdateEvents(stream);
} else {
int ret = 0; int ret = 0;
switch (msg->hdr.status) {
case REMOTE_OK:
ret = remoteStreamHandleFinish(client, stream, msg);
if (ret == 0)
qemudClientMessageRelease(client, msg);
break;
case REMOTE_CONTINUE: virMutexLock(&stream->priv->lock);
qemudClientMessageQueuePush(&stream->rx, msg);
remoteStreamUpdateEvents(stream);
break;
case REMOTE_ERROR: if (msg->header.type != VIR_NET_STREAM)
default: goto cleanup;
ret = remoteStreamHandleAbort(client, stream, msg);
if (ret == 0)
qemudClientMessageRelease(client, msg);
break;
}
if (ret < 0) if (!virNetServerProgramMatches(stream->prog, msg))
return -1; goto cleanup;
}
return 1; if (msg->header.proc != stream->procedure ||
} msg->header.serial != stream->serial)
return 0; goto cleanup;
VIR_DEBUG("Incoming client=%p, rx=%p, serial=%d, proc=%d, status=%d",
client, stream->rx, msg->header.proc,
msg->header.serial, msg->header.status);
virNetMessageQueuePush(&stream->rx, msg);
daemonStreamUpdateEvents(stream);
ret = 1;
cleanup:
virMutexUnlock(&stream->priv->lock);
return ret;
} }
/* /*
* @conn: a connection object to associate the stream with * @conn: a connection object to associate the stream with
* @hdr: the method call to associate with the stram * @header: the method call to associate with the stream
* *
* Creates a new stream for this conn * Creates a new stream for this conn
* *
* Returns a new stream object, or NULL upon OOM * Returns a new stream object, or NULL upon OOM
*/ */
struct qemud_client_stream * daemonClientStream *
remoteCreateClientStream(virConnectPtr conn, daemonCreateClientStream(virNetServerClientPtr client,
remote_message_header *hdr) virStreamPtr st,
virNetServerProgramPtr prog,
virNetMessageHeaderPtr header)
{ {
struct qemud_client_stream *stream; daemonClientStream *stream;
daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client);
VIR_DEBUG("proc=%d serial=%d", hdr->proc, hdr->serial); VIR_DEBUG("client=%p, proc=%d, serial=%d, st=%p",
client, header->proc, header->serial, st);
if (VIR_ALLOC(stream) < 0) { if (VIR_ALLOC(stream) < 0) {
virReportOOMError(); virReportOOMError();
return NULL; return NULL;
} }
stream->procedure = hdr->proc; stream->priv = priv;
stream->serial = hdr->serial; stream->prog = prog;
stream->procedure = header->proc;
stream->serial = header->serial;
stream->filterID = -1;
stream->st = st;
stream->st = virStreamNew(conn, VIR_STREAM_NONBLOCK); virNetServerProgramRef(prog);
if (!stream->st) {
VIR_FREE(stream);
return NULL;
}
stream->filter.query = remoteStreamFilter;
stream->filter.opaque = stream;
return stream; return stream;
} }
@ -238,25 +308,37 @@ remoteCreateClientStream(virConnectPtr conn,
* Frees the memory associated with this inactive client * Frees the memory associated with this inactive client
* stream * stream
*/ */
void remoteFreeClientStream(struct qemud_client *client, int daemonFreeClientStream(virNetServerClientPtr client,
struct qemud_client_stream *stream) daemonClientStream *stream)
{ {
struct qemud_client_message *msg; virNetMessagePtr msg;
int ret = 0;
if (!stream) if (!stream)
return; return 0;
VIR_DEBUG("proc=%d serial=%d", stream->procedure, stream->serial); VIR_DEBUG("client=%p, proc=%d, serial=%d",
client, stream->procedure, stream->serial);
virNetServerProgramFree(stream->prog);
msg = stream->rx; msg = stream->rx;
while (msg) { while (msg) {
struct qemud_client_message *tmp = msg->next; virNetMessagePtr tmp = msg->next;
qemudClientMessageRelease(client, msg); /* Send a dummy reply to free up 'msg' & unblock client rx */
memset(msg, 0, sizeof(*msg));
if (virNetServerClientSendMessage(client, msg) < 0) {
virNetServerClientMarkClose(client);
virNetMessageFree(msg);
ret = -1;
}
msg = tmp; msg = tmp;
} }
virStreamFree(stream->st); virStreamFree(stream->st);
VIR_FREE(stream); VIR_FREE(stream);
return ret;
} }
@ -264,63 +346,45 @@ void remoteFreeClientStream(struct qemud_client *client,
* @client: a locked client to add the stream to * @client: a locked client to add the stream to
* @stream: a stream to add * @stream: a stream to add
*/ */
int remoteAddClientStream(struct qemud_client *client, int daemonAddClientStream(virNetServerClientPtr client,
struct qemud_client_stream *stream, daemonClientStream *stream,
int transmit) bool transmit)
{ {
struct qemud_client_stream *tmp = client->streams; VIR_DEBUG("client=%p, proc=%d, serial=%d, st=%p, transmit=%d",
client, stream->procedure, stream->serial, stream->st, transmit);
daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client);
VIR_DEBUG("client=%p proc=%d serial=%d", client, stream->procedure, stream->serial); if (stream->filterID != -1) {
VIR_WARN("Filter already added to client %p", client);
if (virStreamEventAddCallback(stream->st, 0,
remoteStreamEvent, client, NULL) < 0)
return -1; return -1;
if (tmp) {
while (tmp->next)
tmp = tmp->next;
tmp->next = stream;
} else {
client->streams = stream;
} }
stream->filter.next = client->filters; if (virStreamEventAddCallback(stream->st, 0,
client->filters = &stream->filter; daemonStreamEvent, client, NULL) < 0)
return -1;
if ((stream->filterID = virNetServerClientAddFilter(client,
daemonStreamFilter,
stream)) < 0) {
virStreamEventRemoveCallback(stream->st);
return -1;
}
if (transmit) if (transmit)
stream->tx = 1; stream->tx = 1;
remoteStreamUpdateEvents(stream); virMutexLock(&priv->lock);
stream->next = priv->streams;
priv->streams = stream;
daemonStreamUpdateEvents(stream);
virMutexUnlock(&priv->lock);
return 0; return 0;
} }
/*
* @client: a locked client object
* @procedure: procedure associated with the stream
* @serial: serial number associated with the stream
*
* Finds a existing active stream
*
* Returns a stream object matching the procedure+serial number, or NULL
*/
struct qemud_client_stream *
remoteFindClientStream(struct qemud_client *client,
virStreamPtr st)
{
struct qemud_client_stream *stream = client->streams;
while (stream) {
if (stream->st == st)
return stream;
stream = stream->next;
}
return NULL;
}
/* /*
* @client: a locked client object * @client: a locked client object
* @stream: an inactive, closed stream object * @stream: an inactive, closed stream object
@ -330,26 +394,19 @@ remoteFindClientStream(struct qemud_client *client,
* Returns 0 if the stream was removd, -1 if it doesn't exist * Returns 0 if the stream was removd, -1 if it doesn't exist
*/ */
int int
remoteRemoveClientStream(struct qemud_client *client, daemonRemoveClientStream(virNetServerClientPtr client,
struct qemud_client_stream *stream) daemonClientStream *stream)
{ {
VIR_DEBUG("client=%p proc=%d serial=%d", client, stream->procedure, stream->serial); VIR_DEBUG("client=%p, proc=%d, serial=%d, st=%p",
client, stream->procedure, stream->serial, stream->st);
daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client);
daemonClientStream *curr = priv->streams;
daemonClientStream *prev = NULL;
struct qemud_client_stream *curr = client->streams; if (stream->filterID != -1) {
struct qemud_client_stream *prev = NULL; virNetServerClientRemoveFilter(client,
struct qemud_client_filter *filter = NULL; stream->filterID);
stream->filterID = -1;
if (client->filters == &stream->filter) {
client->filters = client->filters->next;
} else {
filter = client->filters;
while (filter) {
if (filter->next == &stream->filter) {
filter->next = filter->next->next;
break;
}
filter = filter->next;
}
} }
if (!stream->closed) { if (!stream->closed) {
@ -362,9 +419,8 @@ remoteRemoveClientStream(struct qemud_client *client,
if (prev) if (prev)
prev->next = curr->next; prev->next = curr->next;
else else
client->streams = curr->next; priv->streams = curr->next;
remoteFreeClientStream(client, stream); return daemonFreeClientStream(client, stream);
return 0;
} }
prev = curr; prev = curr;
curr = curr->next; curr = curr->next;
@ -380,17 +436,15 @@ remoteRemoveClientStream(struct qemud_client *client,
* 1 if message is still being processed * 1 if message is still being processed
*/ */
static int static int
remoteStreamHandleWriteData(struct qemud_client *client, daemonStreamHandleWriteData(virNetServerClientPtr client,
struct qemud_client_stream *stream, daemonClientStream *stream,
struct qemud_client_message *msg) virNetMessagePtr msg)
{ {
remote_error rerr;
int ret; int ret;
VIR_DEBUG("stream=%p proc=%d serial=%d len=%d offset=%d", VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%d, len=%zu, offset=%zu",
stream, msg->hdr.proc, msg->hdr.serial, msg->bufferLength, msg->bufferOffset); client, stream, msg->header.proc, msg->header.serial,
msg->bufferLength, msg->bufferOffset);
memset(&rerr, 0, sizeof rerr);
ret = virStreamSend(stream->st, ret = virStreamSend(stream->st,
msg->buffer + msg->bufferOffset, msg->buffer + msg->bufferOffset,
@ -402,14 +456,25 @@ remoteStreamHandleWriteData(struct qemud_client *client,
/* Partial write, so indicate we have more todo later */ /* Partial write, so indicate we have more todo later */
if (msg->bufferOffset < msg->bufferLength) if (msg->bufferOffset < msg->bufferLength)
return 1; return 1;
/* A dummy 'send' just to free up 'msg' object */
memset(msg, 0, sizeof(*msg));
return virNetServerClientSendMessage(client, msg);
} else if (ret == -2) { } else if (ret == -2) {
/* Blocking, so indicate we have more todo later */ /* Blocking, so indicate we have more todo later */
return 1; return 1;
} else { } else {
virNetMessageError rerr;
memset(&rerr, 0, sizeof(rerr));
VIR_INFO("Stream send failed"); VIR_INFO("Stream send failed");
stream->closed = 1; stream->closed = 1;
remoteDispatchError(&rerr); return virNetServerProgramSendReplyError(stream->prog,
return remoteSerializeReplyError(client, &rerr, &msg->hdr); client,
msg,
&rerr,
&msg->header);
} }
return 0; return 0;
@ -419,38 +484,42 @@ remoteStreamHandleWriteData(struct qemud_client *client,
/* /*
* Process an finish handshake from the client. * Process an finish handshake from the client.
* *
* Returns a REMOTE_OK confirmation if successful, or a REMOTE_ERROR * Returns a VIR_NET_OK confirmation if successful, or a VIR_NET_ERROR
* if there was a stream error * if there was a stream error
* *
* Returns 0 if successfully sent RPC reply, -1 upon fatal error * Returns 0 if successfully sent RPC reply, -1 upon fatal error
*/ */
static int static int
remoteStreamHandleFinish(struct qemud_client *client, daemonStreamHandleFinish(virNetServerClientPtr client,
struct qemud_client_stream *stream, daemonClientStream *stream,
struct qemud_client_message *msg) virNetMessagePtr msg)
{ {
remote_error rerr;
int ret; int ret;
VIR_DEBUG("stream=%p proc=%d serial=%d", VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%d",
stream, msg->hdr.proc, msg->hdr.serial); client, stream, msg->header.proc, msg->header.serial);
memset(&rerr, 0, sizeof rerr);
stream->closed = 1; stream->closed = 1;
virStreamEventRemoveCallback(stream->st); virStreamEventRemoveCallback(stream->st);
ret = virStreamFinish(stream->st); ret = virStreamFinish(stream->st);
if (ret < 0) { if (ret < 0) {
remoteDispatchError(&rerr); virNetMessageError rerr;
return remoteSerializeReplyError(client, &rerr, &msg->hdr); memset(&rerr, 0, sizeof(rerr));
return virNetServerProgramSendReplyError(stream->prog,
client,
msg,
&rerr,
&msg->header);
} else { } else {
/* Send zero-length confirm */ /* Send zero-length confirm */
if (remoteSendStreamData(client, stream, NULL, 0) < 0) return virNetServerProgramSendStreamData(stream->prog,
return -1; client,
msg,
stream->procedure,
stream->serial,
NULL, 0);
} }
return 0;
} }
@ -460,30 +529,35 @@ remoteStreamHandleFinish(struct qemud_client *client,
* Returns 0 if successfully aborted, -1 upon error * Returns 0 if successfully aborted, -1 upon error
*/ */
static int static int
remoteStreamHandleAbort(struct qemud_client *client, daemonStreamHandleAbort(virNetServerClientPtr client,
struct qemud_client_stream *stream, daemonClientStream *stream,
struct qemud_client_message *msg) virNetMessagePtr msg)
{ {
remote_error rerr; VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%d",
client, stream, msg->header.proc, msg->header.serial);
virNetMessageError rerr;
VIR_DEBUG("stream=%p proc=%d serial=%d", memset(&rerr, 0, sizeof(rerr));
stream, msg->hdr.proc, msg->hdr.serial);
memset(&rerr, 0, sizeof rerr);
stream->closed = 1; stream->closed = 1;
virStreamEventRemoveCallback(stream->st); virStreamEventRemoveCallback(stream->st);
virStreamAbort(stream->st); virStreamAbort(stream->st);
if (msg->hdr.status == REMOTE_ERROR) if (msg->header.status == VIR_NET_ERROR)
remoteDispatchFormatError(&rerr, "%s", _("stream aborted at client request")); virNetError(VIR_ERR_RPC,
"%s", _("stream aborted at client request"));
else { else {
VIR_WARN("unexpected stream status %d", msg->hdr.status); VIR_WARN("unexpected stream status %d", msg->header.status);
remoteDispatchFormatError(&rerr, _("stream aborted with unexpected status %d"), virNetError(VIR_ERR_RPC,
msg->hdr.status); _("stream aborted with unexpected status %d"),
msg->header.status);
} }
return remoteSerializeReplyError(client, &rerr, &msg->hdr); return virNetServerProgramSendReplyError(remoteProgram,
client,
msg,
&rerr,
&msg->header);
} }
@ -496,41 +570,39 @@ remoteStreamHandleAbort(struct qemud_client *client,
* Returns 0 on success, or -1 upon fatal error * Returns 0 on success, or -1 upon fatal error
*/ */
static int static int
remoteStreamHandleWrite(struct qemud_client *client, daemonStreamHandleWrite(virNetServerClientPtr client,
struct qemud_client_stream *stream) daemonClientStream *stream)
{ {
struct qemud_client_message *msg, *tmp; VIR_DEBUG("client=%p, stream=%p", client, stream);
VIR_DEBUG("stream=%p", stream); while (stream->rx && !stream->closed) {
virNetMessagePtr msg = stream->rx;
msg = stream->rx;
while (msg && !stream->closed) {
int ret; int ret;
switch (msg->hdr.status) {
case REMOTE_OK: switch (msg->header.status) {
ret = remoteStreamHandleFinish(client, stream, msg); case VIR_NET_OK:
ret = daemonStreamHandleFinish(client, stream, msg);
break; break;
case REMOTE_CONTINUE: case VIR_NET_CONTINUE:
ret = remoteStreamHandleWriteData(client, stream, msg); ret = daemonStreamHandleWriteData(client, stream, msg);
break; break;
case REMOTE_ERROR: case VIR_NET_ERROR:
default: default:
ret = remoteStreamHandleAbort(client, stream, msg); ret = daemonStreamHandleAbort(client, stream, msg);
break; break;
} }
if (ret == 0) if (ret > 0)
qemudClientMessageQueueServe(&stream->rx); break; /* still processing data from msg */
else if (ret < 0)
return -1;
else
break; /* still processing data */
tmp = msg->next; virNetMessageQueueServe(&stream->rx);
qemudClientMessageRelease(client, msg); if (ret < 0) {
msg = tmp; virNetMessageFree(msg);
virNetServerClientMarkClose(client);
return -1;
}
} }
return 0; return 0;
@ -549,14 +621,14 @@ remoteStreamHandleWrite(struct qemud_client *client,
* be killed * be killed
*/ */
static int static int
remoteStreamHandleRead(struct qemud_client *client, daemonStreamHandleRead(virNetServerClientPtr client,
struct qemud_client_stream *stream) daemonClientStream *stream)
{ {
char *buffer; char *buffer;
size_t bufferLen = REMOTE_MESSAGE_PAYLOAD_MAX; size_t bufferLen = VIR_NET_MESSAGE_PAYLOAD_MAX;
int ret; int ret;
VIR_DEBUG("stream=%p", stream); VIR_DEBUG("client=%p, stream=%p", client, stream);
/* Shouldn't ever be called unless we're marked able to /* Shouldn't ever be called unless we're marked able to
* transmit, but doesn't hurt to check */ * transmit, but doesn't hurt to check */
@ -572,47 +644,41 @@ remoteStreamHandleRead(struct qemud_client *client,
* we're readable, but hey things change... */ * we're readable, but hey things change... */
ret = 0; ret = 0;
} else if (ret < 0) { } else if (ret < 0) {
remote_error rerr; virNetMessagePtr msg;
memset(&rerr, 0, sizeof rerr); virNetMessageError rerr;
remoteDispatchError(&rerr);
ret = remoteSerializeStreamError(client, &rerr, stream->procedure, stream->serial); memset(&rerr, 0, sizeof(rerr));
if (!(msg = virNetMessageNew()))
ret = -1;
else
ret = virNetServerProgramSendStreamError(remoteProgram,
client,
msg,
&rerr,
stream->procedure,
stream->serial);
} else { } else {
virNetMessagePtr msg;
stream->tx = 0; stream->tx = 0;
if (ret == 0) if (ret == 0)
stream->recvEOF = 1; stream->recvEOF = 1;
ret = remoteSendStreamData(client, stream, buffer, ret); if (!(msg = virNetMessageNew()))
ret = -1;
if (msg) {
msg->cb = daemonStreamMessageFinished;
msg->opaque = stream;
virNetServerClientRef(client);
ret = virNetServerProgramSendStreamData(remoteProgram,
client,
msg,
stream->procedure,
stream->serial,
buffer, ret);
}
} }
VIR_FREE(buffer); VIR_FREE(buffer);
return ret; return ret;
} }
/*
* Invoked when an outgoing data packet message has been fully sent.
* This simply re-enables TX of further data.
*
* The idea is to stop the daemon growing without bound due to
* fast stream, but slow client
*/
void
remoteStreamMessageFinished(struct qemud_client *client,
struct qemud_client_message *msg)
{
struct qemud_client_stream *stream = client->streams;
while (stream) {
if (msg->hdr.proc == stream->procedure &&
msg->hdr.serial == stream->serial)
break;
stream = stream->next;
}
VIR_DEBUG("Message client=%p stream=%p proc=%d serial=%d", client, stream, msg->hdr.proc, msg->hdr.serial);
if (stream) {
stream->tx = 1;
remoteStreamUpdateEvents(stream);
}
}

View File

@ -28,27 +28,21 @@
struct qemud_client_stream * daemonClientStream *
remoteCreateClientStream(virConnectPtr conn, daemonCreateClientStream(virNetServerClientPtr client,
remote_message_header *hdr); virStreamPtr st,
virNetServerProgramPtr prog,
virNetMessageHeaderPtr hdr);
void remoteFreeClientStream(struct qemud_client *client, int daemonFreeClientStream(virNetServerClientPtr client,
struct qemud_client_stream *stream); daemonClientStream *stream);
int remoteAddClientStream(struct qemud_client *client, int daemonAddClientStream(virNetServerClientPtr client,
struct qemud_client_stream *stream, daemonClientStream *stream,
int transmit); bool transmit);
struct qemud_client_stream *
remoteFindClientStream(struct qemud_client *client,
virStreamPtr stream);
int int
remoteRemoveClientStream(struct qemud_client *client, daemonRemoveClientStream(virNetServerClientPtr client,
struct qemud_client_stream *stream); daemonClientStream *stream);
void
remoteStreamMessageFinished(struct qemud_client *client,
struct qemud_client_message *msg);
#endif /* __LIBVIRTD_STREAM_H__ */ #endif /* __LIBVIRTD_STREAM_H__ */

View File

@ -1,4 +1,3 @@
daemon/dispatch.c
daemon/libvirtd.c daemon/libvirtd.c
daemon/remote.c daemon/remote.c
daemon/remote_dispatch_bodies.h daemon/remote_dispatch_bodies.h

View File

@ -258,80 +258,6 @@ if ($opt_d) {
} }
} }
# Prototypes for dispatch functions ("remote_dispatch_prototypes.h").
elsif ($opt_p) {
my @keys = sort (keys %calls);
foreach (@keys) {
# Skip things which are REMOTE_MESSAGE
next if $calls{$_}->{msg};
print "static int ${structprefix}Dispatch$calls{$_}->{ProcName}(\n";
print " struct qemud_server *server,\n";
print " struct qemud_client *client,\n";
print " virConnectPtr conn,\n";
print " remote_message_header *hdr,\n";
print " remote_error *rerr,\n";
print " $calls{$_}->{args} *args,\n";
print " $calls{$_}->{ret} *ret);\n";
}
}
# Union of all arg types
# ("remote_dispatch_args.h").
elsif ($opt_a) {
for ($id = 0 ; $id <= $#calls ; $id++) {
if (defined $calls[$id] &&
!$calls[$id]->{msg} &&
$calls[$id]->{args} ne "void") {
print " $calls[$id]->{args} val_$calls[$id]->{args};\n";
}
}
}
# Union of all arg types
# ("remote_dispatch_ret.h").
elsif ($opt_r) {
for ($id = 0 ; $id <= $#calls ; $id++) {
if (defined $calls[$id] &&
!$calls[$id]->{msg} &&
$calls[$id]->{ret} ne "void") {
print " $calls[$id]->{ret} val_$calls[$id]->{ret};\n";
}
}
}
# Inside the switch statement, prepare the 'fn', 'args_filter', etc
# ("remote_dispatch_table.h").
elsif ($opt_t) {
for ($id = 0 ; $id <= $#calls ; $id++) {
if (defined $calls[$id] && !$calls[$id]->{msg}) {
print "{ /* $calls[$id]->{ProcName} => $id */\n";
print " .fn = (dispatch_fn) ${structprefix}Dispatch$calls[$id]->{ProcName},\n";
if ($calls[$id]->{args} ne "void") {
print " .args_filter = (xdrproc_t) xdr_$calls[$id]->{args},\n";
} else {
print " .args_filter = (xdrproc_t) xdr_void,\n";
}
if ($calls[$id]->{ret} ne "void") {
print " .ret_filter = (xdrproc_t) xdr_$calls[$id]->{ret},\n";
} else {
print " .ret_filter = (xdrproc_t) xdr_void,\n";
}
print "},\n";
} else {
if ($calls[$id]->{msg}) {
print "{ /* Async event $calls[$id]->{ProcName} => $id */\n";
} else {
print "{ /* (unused) => $id */\n";
}
print " .fn = NULL,\n";
print " .args_filter = (xdrproc_t) xdr_void,\n";
print " .ret_filter = (xdrproc_t) xdr_void,\n";
print "},\n";
}
}
}
# Bodies for dispatch functions ("remote_dispatch_bodies.h"). # Bodies for dispatch functions ("remote_dispatch_bodies.h").
elsif ($opt_b) { elsif ($opt_b) {
my %generate = map { $_ => 1 } @autogen; my %generate = map { $_ => 1 } @autogen;
@ -343,8 +269,58 @@ elsif ($opt_b) {
# skip things which are REMOTE_MESSAGE # skip things which are REMOTE_MESSAGE
next if $call->{msg}; next if $call->{msg};
# skip procedures not on generate list my $name = $structprefix . "Dispatch" . $call->{ProcName};
next if ! exists($generate{$call->{ProcName}}); my $argtype = $call->{args};
my $rettype = $call->{ret};
my $argann = $argtype ne "void" ? "" : " ATTRIBUTE_UNUSED";
my $retann = $rettype ne "void" ? "" : " ATTRIBUTE_UNUSED";
# First we print out a function declaration for the
# real dispatcher body
print "static int ${name}(\n";
print " virNetServerPtr server,\n";
print " virNetServerClientPtr client,\n";
print " virNetMessageHeaderPtr hdr,\n";
print " virNetMessageErrorPtr rerr";
if ($argtype ne "void") {
print ",\n $argtype *args";
}
if ($rettype ne "void") {
print ",\n $rettype *ret";
}
print ");\n";
# Next we print out a generic wrapper method which has
# fixed function signature, for use in the dispatcher
# table. This simply callers the real dispatcher method
print "static int ${name}Helper(\n";
print " virNetServerPtr server,\n";
print " virNetServerClientPtr client,\n";
print " virNetMessageHeaderPtr hdr,\n";
print " virNetMessageErrorPtr rerr,\n";
print " void *args$argann,\n";
print " void *ret$retann)\n";
print "{\n";
print " VIR_DEBUG(\"server=%p client=%p hdr=%p rerr=%p args=%p ret=%p\", server, client, hdr, rerr, args, ret);\n";
print " return $name(server, client, hdr, rerr";
if ($argtype ne "void") {
print ", args";
}
if ($rettype ne "void") {
print ", ret";
}
print ");\n";
print "}\n";
# Finally we print out the dispatcher method body impl
# (if possible)
if (!exists($generate{$call->{ProcName}})) {
print "/* ${structprefix}Dispatch$call->{ProcName} body has " .
"to be implemented manually */\n\n\n\n";
next;
}
my $has_node_device = 0; my $has_node_device = 0;
my @vars_list = (); my @vars_list = ();
@ -354,18 +330,18 @@ elsif ($opt_b) {
my @prepare_ret_list = (); my @prepare_ret_list = ();
my @ret_list = (); my @ret_list = ();
my @free_list = (); my @free_list = ();
my @free_list_on_error = ("remoteDispatchError(rerr);"); my @free_list_on_error = ("virNetMessageSaveError(rerr);");
# handle arguments to the function # handle arguments to the function
if ($call->{args} ne "void") { if ($argtype ne "void") {
# node device is special, as it's identified by name # node device is special, as it's identified by name
if ($call->{args} =~ m/^remote_node_device_/ and if ($argtype =~ m/^remote_node_device_/ and
!($call->{args} =~ m/^remote_node_device_lookup_by_name_/) and !($argtype =~ m/^remote_node_device_lookup_by_name_/) and
!($call->{args} =~ m/^remote_node_device_create_xml_/)) { !($argtype =~ m/^remote_node_device_create_xml_/)) {
$has_node_device = 1; $has_node_device = 1;
push(@vars_list, "virNodeDevicePtr dev = NULL"); push(@vars_list, "virNodeDevicePtr dev = NULL");
push(@getters_list, push(@getters_list,
" if (!(dev = virNodeDeviceLookupByName(conn, args->name)))\n" . " if (!(dev = virNodeDeviceLookupByName(priv->conn, args->name)))\n" .
" goto cleanup;\n"); " goto cleanup;\n");
push(@args_list, "dev"); push(@args_list, "dev");
push(@free_list, push(@free_list,
@ -382,7 +358,7 @@ elsif ($opt_b) {
push(@vars_list, "vir${type_name}Ptr $2 = NULL"); push(@vars_list, "vir${type_name}Ptr $2 = NULL");
push(@getters_list, push(@getters_list,
" if (!($2 = get_nonnull_$1(conn, args->$2)))\n" . " if (!($2 = get_nonnull_$1(priv->conn, args->$2)))\n" .
" goto cleanup;\n"); " goto cleanup;\n");
push(@args_list, "$2"); push(@args_list, "$2");
push(@free_list, push(@free_list,
@ -392,7 +368,7 @@ elsif ($opt_b) {
push(@vars_list, "virDomainPtr dom = NULL"); push(@vars_list, "virDomainPtr dom = NULL");
push(@vars_list, "virDomainSnapshotPtr snapshot = NULL"); push(@vars_list, "virDomainSnapshotPtr snapshot = NULL");
push(@getters_list, push(@getters_list,
" if (!(dom = get_nonnull_domain(conn, args->snap.dom)))\n" . " if (!(dom = get_nonnull_domain(priv->conn, args->snap.dom)))\n" .
" goto cleanup;\n" . " goto cleanup;\n" .
"\n" . "\n" .
" if (!(snapshot = get_nonnull_domain_snapshot(dom, args->snap)))\n" . " if (!(snapshot = get_nonnull_domain_snapshot(dom, args->snap)))\n" .
@ -405,14 +381,14 @@ elsif ($opt_b) {
" virDomainFree(dom);"); " virDomainFree(dom);");
} elsif ($args_member =~ m/^(?:remote_string|remote_uuid) (\S+)<\S+>;/) { } elsif ($args_member =~ m/^(?:remote_string|remote_uuid) (\S+)<\S+>;/) {
if (! @args_list) { if (! @args_list) {
push(@args_list, "conn"); push(@args_list, "priv->conn");
} }
push(@args_list, "args->$1.$1_val"); push(@args_list, "args->$1.$1_val");
push(@args_list, "args->$1.$1_len"); push(@args_list, "args->$1.$1_len");
} elsif ($args_member =~ m/^(?:opaque|remote_nonnull_string) (\S+)<\S+>;(.*)$/) { } elsif ($args_member =~ m/^(?:opaque|remote_nonnull_string) (\S+)<\S+>;(.*)$/) {
if (! @args_list) { if (! @args_list) {
push(@args_list, "conn"); push(@args_list, "priv->conn");
} }
my $cast = ""; my $cast = "";
@ -431,7 +407,7 @@ elsif ($opt_b) {
push(@args_list, "args->$arg_name.${arg_name}_len"); push(@args_list, "args->$arg_name.${arg_name}_len");
} elsif ($args_member =~ m/^(?:unsigned )?int (\S+)<\S+>;/) { } elsif ($args_member =~ m/^(?:unsigned )?int (\S+)<\S+>;/) {
if (! @args_list) { if (! @args_list) {
push(@args_list, "conn"); push(@args_list, "priv->conn");
} }
push(@args_list, "args->$1.$1_val"); push(@args_list, "args->$1.$1_val");
@ -452,13 +428,13 @@ elsif ($opt_b) {
die "unhandled type for argument value: $args_member"; die "unhandled type for argument value: $args_member";
} elsif ($args_member =~ m/^remote_uuid (\S+);/) { } elsif ($args_member =~ m/^remote_uuid (\S+);/) {
if (! @args_list) { if (! @args_list) {
push(@args_list, "conn"); push(@args_list, "priv->conn");
} }
push(@args_list, "(unsigned char *) args->$1"); push(@args_list, "(unsigned char *) args->$1");
} elsif ($args_member =~ m/^remote_string (\S+);/) { } elsif ($args_member =~ m/^remote_string (\S+);/) {
if (! @args_list) { if (! @args_list) {
push(@args_list, "conn"); push(@args_list, "priv->conn");
} }
push(@vars_list, "char *$1"); push(@vars_list, "char *$1");
@ -466,19 +442,19 @@ elsif ($opt_b) {
push(@args_list, "$1"); push(@args_list, "$1");
} elsif ($args_member =~ m/^remote_nonnull_string (\S+);/) { } elsif ($args_member =~ m/^remote_nonnull_string (\S+);/) {
if (! @args_list) { if (! @args_list) {
push(@args_list, "conn"); push(@args_list, "priv->conn");
} }
push(@args_list, "args->$1"); push(@args_list, "args->$1");
} elsif ($args_member =~ m/^(unsigned )?int (\S+);/) { } elsif ($args_member =~ m/^(unsigned )?int (\S+);/) {
if (! @args_list) { if (! @args_list) {
push(@args_list, "conn"); push(@args_list, "priv->conn");
} }
push(@args_list, "args->$2"); push(@args_list, "args->$2");
} elsif ($args_member =~ m/^(unsigned )?hyper (\S+);/) { } elsif ($args_member =~ m/^(unsigned )?hyper (\S+);/) {
if (! @args_list) { if (! @args_list) {
push(@args_list, "conn"); push(@args_list, "priv->conn");
} }
my $arg_name = $2; my $arg_name = $2;
@ -511,12 +487,12 @@ elsif ($opt_b) {
my $single_ret_list_max_define = "undefined"; my $single_ret_list_max_define = "undefined";
my $multi_ret = 0; my $multi_ret = 0;
if ($call->{ret} ne "void" and if ($rettype ne "void" and
scalar(@{$call->{ret_members}}) > 1) { scalar(@{$call->{ret_members}}) > 1) {
$multi_ret = 1; $multi_ret = 1;
} }
if ($call->{ret} ne "void") { if ($rettype ne "void") {
foreach my $ret_member (@{$call->{ret_members}}) { foreach my $ret_member (@{$call->{ret_members}}) {
if ($multi_ret) { if ($multi_ret) {
if ($ret_member =~ m/^(unsigned )?(char|short|int|hyper) (\S+)\[\S+\];/) { if ($ret_member =~ m/^(unsigned )?(char|short|int|hyper) (\S+)\[\S+\];/) {
@ -715,8 +691,8 @@ elsif ($opt_b) {
die "multi-return-value without insert@<offset> annotation: $call->{ret}"; die "multi-return-value without insert@<offset> annotation: $call->{ret}";
} }
if (!@args_list) { if (! @args_list) {
push(@args_list, "conn"); push(@args_list, "priv->conn");
} }
my $struct_name = $call->{ProcName}; my $struct_name = $call->{ProcName};
@ -737,35 +713,27 @@ elsif ($opt_b) {
} }
if ($call->{streamflag} ne "none") { if ($call->{streamflag} ne "none") {
splice(@args_list, $call->{streamoffset}, 0, ("stream->st")); splice(@args_list, $call->{streamoffset}, 0, ("st"));
push(@free_list_on_error, "if (stream) {"); push(@free_list_on_error, "if (stream) {");
push(@free_list_on_error, " virStreamAbort(stream->st);"); push(@free_list_on_error, " virStreamAbort(st);");
push(@free_list_on_error, " remoteFreeClientStream(client, stream);"); push(@free_list_on_error, " daemonFreeClientStream(client, stream);");
push(@free_list_on_error, "} else {");
push(@free_list_on_error, " virStreamFree(st);");
push(@free_list_on_error, "}"); push(@free_list_on_error, "}");
} }
# print functions signature # print functions signature
print "\n"; print "static int $name(\n";
print "static int\n"; print " virNetServerPtr server ATTRIBUTE_UNUSED,\n";
print "${structprefix}Dispatch$call->{ProcName}(\n"; print " virNetServerClientPtr client,\n";
print " struct qemud_server *server ATTRIBUTE_UNUSED,\n"; print " virNetMessageHeaderPtr hdr ATTRIBUTE_UNUSED,\n";
print " struct qemud_client *client ATTRIBUTE_UNUSED,\n"; print " virNetMessageErrorPtr rerr";
print " virConnectPtr conn,\n"; if ($argtype ne "void") {
print " remote_message_header *hdr ATTRIBUTE_UNUSED,\n"; print ",\n $argtype *args";
print " remote_error *rerr,\n";
print " $call->{args} *args";
if ($call->{args} eq "void") {
print " ATTRIBUTE_UNUSED"
} }
if ($rettype ne "void") {
print ",\n"; print ",\n $rettype *ret";
print " $call->{ret} *ret";
if ($call->{ret} eq "void") {
print " ATTRIBUTE_UNUSED"
} }
print ")\n"; print ")\n";
# print function body # print function body
@ -775,13 +743,16 @@ elsif ($opt_b) {
foreach my $var (@vars_list) { foreach my $var (@vars_list) {
print " $var;\n"; print " $var;\n";
} }
print " struct daemonClientPrivate *priv =\n";
print " virNetServerClientGetPrivateData(client);\n";
if ($call->{streamflag} ne "none") { if ($call->{streamflag} ne "none") {
print " struct qemud_client_stream *stream = NULL;\n"; print " virStreamPtr st = NULL;\n";
print " daemonClientStreamPtr stream = NULL;\n";
} }
print "\n"; print "\n";
print " if (!conn) {\n"; print " if (!priv->conn) {\n";
print " virNetError(VIR_ERR_INTERNAL_ERROR, \"%s\", _(\"connection not open\"));\n"; print " virNetError(VIR_ERR_INTERNAL_ERROR, \"%s\", _(\"connection not open\"));\n";
print " goto cleanup;\n"; print " goto cleanup;\n";
print " }\n"; print " }\n";
@ -811,12 +782,15 @@ elsif ($opt_b) {
} }
if ($call->{streamflag} ne "none") { if ($call->{streamflag} ne "none") {
print " if (!(stream = remoteCreateClientStream(conn, hdr)))\n"; print " if (!(st = virStreamNew(priv->conn, VIR_STREAM_NONBLOCK)))\n";
print " goto cleanup;\n";
print "\n";
print " if (!(stream = daemonCreateClientStream(client, st, remoteProgram, hdr)))\n";
print " goto cleanup;\n"; print " goto cleanup;\n";
print "\n"; print "\n";
} }
if ($call->{ret} eq "void") { if ($rettype eq "void") {
print " if (vir$call->{ProcName}("; print " if (vir$call->{ProcName}(";
print join(', ', @args_list); print join(', ', @args_list);
print ") < 0)\n"; print ") < 0)\n";
@ -827,7 +801,7 @@ elsif ($opt_b) {
my $proc_name = $call->{ProcName}; my $proc_name = $call->{ProcName};
if (! @args_list) { if (! @args_list) {
push(@args_list, "conn"); push(@args_list, "priv->conn");
if ($call->{ProcName} ne "NodeGetFreeMemory") { if ($call->{ProcName} ne "NodeGetFreeMemory") {
$prefix = "Connect" $prefix = "Connect"
@ -885,12 +859,12 @@ elsif ($opt_b) {
} }
if ($call->{streamflag} ne "none") { if ($call->{streamflag} ne "none") {
print " if (remoteAddClientStream(client, stream, "; print " if (daemonAddClientStream(client, stream, ";
if ($call->{streamflag} eq "write") { if ($call->{streamflag} eq "write") {
print "0"; print "false";
} else { } else {
print "1"; print "true";
} }
print ") < 0)\n"; print ") < 0)\n";
@ -934,8 +908,46 @@ elsif ($opt_b) {
} }
print " return rv;\n"; print " return rv;\n";
print "}\n"; print "}\n\n\n\n";
} }
# Finally we write out the huge dispatch table which lists
# the dispatch helper method. the XDR proc for processing
# args and return values, and the size of the args and
# return value structs. All methods are marked as requiring
# authentication. Methods are selectively relaxed in the
# daemon code which registers the program.
print "virNetServerProgramProc ${structprefix}Procs[] = {\n";
for ($id = 0 ; $id <= $#calls ; $id++) {
my ($comment, $name, $argtype, $arglen, $argfilter, $retlen, $retfilter);
if (defined $calls[$id] && !$calls[$id]->{msg}) {
$comment = "/* Method $calls[$id]->{ProcName} => $id */";
$name = $structprefix . "Dispatch" . $calls[$id]->{ProcName} . "Helper";
my $argtype = $calls[$id]->{args};
my $rettype = $calls[$id]->{ret};
$arglen = $argtype ne "void" ? "sizeof($argtype)" : "0";
$retlen = $rettype ne "void" ? "sizeof($rettype)" : "0";
$argfilter = $argtype ne "void" ? "xdr_$argtype" : "xdr_void";
$retfilter = $rettype ne "void" ? "xdr_$rettype" : "xdr_void";
} else {
if ($calls[$id]->{msg}) {
$comment = "/* Async event $calls[$id]->{ProcName} => $id */";
} else {
$comment = "/* Unused $id */";
}
$name = "NULL";
$arglen = $retlen = 0;
$argfilter = "xdr_void";
$retfilter = "xdr_void";
}
print "{ $comment\n ${name},\n $arglen,\n (xdrproc_t)$argfilter,\n $retlen,\n (xdrproc_t)$retfilter,\n true \n},\n";
}
print "};\n";
print "size_t ${structprefix}NProcs = ARRAY_CARDINALITY(${structprefix}Procs);\n";
} }
# Bodies for client functions ("remote_client_bodies.h"). # Bodies for client functions ("remote_client_bodies.h").
@ -952,6 +964,9 @@ elsif ($opt_k) {
# skip procedures not on generate list # skip procedures not on generate list
next if ! exists($generate{$call->{ProcName}}); next if ! exists($generate{$call->{ProcName}});
my $argtype = $call->{args};
my $rettype = $call->{ret};
# handle arguments to the function # handle arguments to the function
my @args_list = (); my @args_list = ();
my @vars_list = (); my @vars_list = ();
@ -962,18 +977,18 @@ elsif ($opt_k) {
my $priv_name = "privateData"; my $priv_name = "privateData";
my $call_args = "&args"; my $call_args = "&args";
if ($call->{args} eq "void") { if ($argtype eq "void") {
$call_args = "NULL"; $call_args = "NULL";
} else { } else {
push(@vars_list, "$call->{args} args"); push(@vars_list, "$argtype args");
my $is_first_arg = 1; my $is_first_arg = 1;
my $has_node_device = 0; my $has_node_device = 0;
# node device is special # node device is special
if ($call->{args} =~ m/^remote_node_/ and if ($argtype =~ m/^remote_node_/ and
!($call->{args} =~ m/^remote_node_device_lookup_by_name_/) and !($argtype =~ m/^remote_node_device_lookup_by_name_/) and
!($call->{args} =~ m/^remote_node_device_create_xml_/)) { !($argtype =~ m/^remote_node_device_create_xml_/)) {
$has_node_device = 1; $has_node_device = 1;
$priv_name = "devMonPrivateData"; $priv_name = "devMonPrivateData";
} }
@ -1150,15 +1165,15 @@ elsif ($opt_k) {
my $single_ret_cleanup = 0; my $single_ret_cleanup = 0;
my $multi_ret = 0; my $multi_ret = 0;
if ($call->{ret} ne "void" and if ($rettype ne "void" and
scalar(@{$call->{ret_members}}) > 1) { scalar(@{$call->{ret_members}}) > 1) {
$multi_ret = 1; $multi_ret = 1;
} }
if ($call->{ret} eq "void") { if ($rettype eq "void") {
$call_ret = "NULL"; $call_ret = "NULL";
} else { } else {
push(@vars_list, "$call->{ret} ret"); push(@vars_list, "$rettype ret");
foreach my $ret_member (@{$call->{ret_members}}) { foreach my $ret_member (@{$call->{ret_members}}) {
if ($multi_ret) { if ($multi_ret) {
@ -1233,7 +1248,7 @@ elsif ($opt_k) {
push(@ret_list, "rv = get_nonnull_$name($priv_src, ret.$arg_name);"); push(@ret_list, "rv = get_nonnull_$name($priv_src, ret.$arg_name);");
} }
push(@ret_list, "xdr_free((xdrproc_t)xdr_$call->{ret}, (char *)&ret);"); push(@ret_list, "xdr_free((xdrproc_t)xdr_$rettype, (char *)&ret);");
$single_ret_var = "vir${type_name}Ptr rv = NULL"; $single_ret_var = "vir${type_name}Ptr rv = NULL";
$single_ret_type = "vir${type_name}Ptr"; $single_ret_type = "vir${type_name}Ptr";
} }
@ -1397,15 +1412,15 @@ elsif ($opt_k) {
print "\n"; print "\n";
} }
if ($call->{ret} ne "void") { if ($rettype ne "void") {
print "\n"; print "\n";
print " memset(&ret, 0, sizeof ret);\n"; print " memset(&ret, 0, sizeof ret);\n";
} }
print "\n"; print "\n";
print " if (call($priv_src, priv, 0, ${procprefix}_PROC_$call->{UC_NAME},\n"; print " if (call($priv_src, priv, 0, ${procprefix}_PROC_$call->{UC_NAME},\n";
print " (xdrproc_t)xdr_$call->{args}, (char *)$call_args,\n"; print " (xdrproc_t)xdr_$argtype, (char *)$call_args,\n";
print " (xdrproc_t)xdr_$call->{ret}, (char *)$call_ret) == -1) {\n"; print " (xdrproc_t)xdr_$rettype, (char *)$call_ret) == -1) {\n";
if ($call->{streamflag} ne "none") { if ($call->{streamflag} ne "none") {
print " virNetClientRemoveStream(priv->client, netst);\n"; print " virNetClientRemoveStream(priv->client, netst);\n";

View File

@ -51,6 +51,9 @@ void virNetMessageFree(virNetMessagePtr msg)
if (!msg) if (!msg)
return; return;
if (msg->cb)
msg->cb(msg, msg->opaque);
VIR_DEBUG("msg=%p", msg); VIR_DEBUG("msg=%p", msg);
VIR_FREE(msg); VIR_FREE(msg);

View File

@ -29,6 +29,8 @@ typedef struct virNetMessageError *virNetMessageErrorPtr;
typedef struct _virNetMessage virNetMessage; typedef struct _virNetMessage virNetMessage;
typedef virNetMessage *virNetMessagePtr; typedef virNetMessage *virNetMessagePtr;
typedef void (*virNetMessageFreeCallback)(virNetMessagePtr msg, void *opaque);
/* Never allocate this (huge) buffer on the stack. Always /* Never allocate this (huge) buffer on the stack. Always
* use virNetMessageNew() to allocate on the heap * use virNetMessageNew() to allocate on the heap
*/ */
@ -39,6 +41,9 @@ struct _virNetMessage {
virNetMessageHeader header; virNetMessageHeader header;
virNetMessageFreeCallback cb;
void *opaque;
virNetMessagePtr next; virNetMessagePtr next;
}; };