node_device_udev: Use a worker pool for processing events and emitting nodedev events

Use a worker pool for processing the events (e.g. udev, mdevctl config changes)
and the initialization instead of a separate initThread and an mdevctl-thread.
This has the large advantage that we can leverage the job API. The thread pool
is now responsible for doing all the "costly work" and for emitting the libvirt
nodedev events.

Reviewed-by: Jonathon Jongsma <jjongsma@redhat.com>
Reviewed-by: Boris Fiuczynski <fiuczy@linux.ibm.com>
Signed-off-by: Marc Hartmayer <mhartmay@linux.ibm.com>
This commit is contained in:
Marc Hartmayer 2024-04-23 20:09:02 +02:00 committed by Jonathon Jongsma
parent 01ab7047e9
commit b56458d443
3 changed files with 185 additions and 73 deletions

View File

@ -1419,10 +1419,11 @@ nodeDeviceDestroy(virNodeDevicePtr device)
goto cleanup; goto cleanup;
/* Because we're about to release the lock and thus run into a race /* Because we're about to release the lock and thus run into a race
* possibility (however improbable) with a udevAddOneDevice change * possibility (however improbable) with a
* event which would essentially free the existing @def (obj->def) and * processNodeDeviceAddAndChangeEvent change event which would
* replace it with something new, we need to grab the parent field * essentially free the existing @def (obj->def) and replace it with
* and then find the parent obj in order to manage the vport */ * something new, we need to grab the parent field and then find the
* parent obj in order to manage the vport */
parent = g_strdup(def->parent); parent = g_strdup(def->parent);
virNodeDeviceObjEndAPI(&obj); virNodeDeviceObjEndAPI(&obj);

View File

@ -43,6 +43,7 @@
#include "virnetdev.h" #include "virnetdev.h"
#include "virmdev.h" #include "virmdev.h"
#include "virutil.h" #include "virutil.h"
#include "virthreadpool.h"
#include "configmake.h" #include "configmake.h"
@ -69,13 +70,13 @@ struct _udevEventData {
bool udevThreadQuit; bool udevThreadQuit;
bool udevDataReady; bool udevDataReady;
/* init thread */
virThread *initThread;
/* Protects @mdevctlMonitors */ /* Protects @mdevctlMonitors */
virMutex mdevctlLock; virMutex mdevctlLock;
GList *mdevctlMonitors; GList *mdevctlMonitors;
int mdevctlTimeout; int mdevctlTimeout;
/* Immutable pointer, self-locking APIs */
virThreadPool *workerPool;
}; };
static virClass *udevEventDataClass; static virClass *udevEventDataClass;
@ -86,8 +87,6 @@ udevEventDataDispose(void *obj)
struct udev *udev = NULL; struct udev *udev = NULL;
udevEventData *priv = obj; udevEventData *priv = obj;
g_clear_pointer(&priv->initThread, g_free);
VIR_WITH_MUTEX_LOCK_GUARD(&priv->mdevctlLock) { VIR_WITH_MUTEX_LOCK_GUARD(&priv->mdevctlLock) {
g_list_free_full(g_steal_pointer(&priv->mdevctlMonitors), g_object_unref); g_list_free_full(g_steal_pointer(&priv->mdevctlMonitors), g_object_unref);
} }
@ -100,6 +99,8 @@ udevEventDataDispose(void *obj)
udev_unref(udev); udev_unref(udev);
} }
g_clear_pointer(&priv->workerPool, virThreadPoolFree);
virMutexDestroy(&priv->mdevctlLock); virMutexDestroy(&priv->mdevctlLock);
virCondDestroy(&priv->udevThreadCond); virCondDestroy(&priv->udevThreadCond);
@ -143,6 +144,66 @@ udevEventDataNew(void)
return ret; return ret;
} }
typedef enum {
NODE_DEVICE_EVENT_INIT = 0,
NODE_DEVICE_EVENT_UDEV_ADD,
NODE_DEVICE_EVENT_UDEV_REMOVE,
NODE_DEVICE_EVENT_UDEV_CHANGE,
NODE_DEVICE_EVENT_UDEV_MOVE,
NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED,
NODE_DEVICE_EVENT_LAST
} nodeDeviceEventType;
/* A single job for the node device worker pool. */
struct _nodeDeviceEvent {
/* selects the branch taken in nodeDeviceEventHandler() */
nodeDeviceEventType eventType;
/* event specific payload, may be NULL; owned by the event */
void *data;
/* if non-NULL, called on @data by nodeDeviceEventFree() */
virFreeCallback dataFreeFunc;
};
typedef struct _nodeDeviceEvent nodeDeviceEvent;
/**
 * nodeDeviceEventFree:
 * @event: event to release, may be NULL
 *
 * Releases the payload of @event through its dataFreeFunc callback (when one
 * is set) and then frees @event itself. Passing NULL is a no-op.
 */
static void
nodeDeviceEventFree(nodeDeviceEvent *event)
{
    if (event != NULL) {
        virFreeCallback payloadFree = event->dataFreeFunc;

        if (payloadFree != NULL)
            payloadFree(event->data);

        g_free(event);
    }
}
G_DEFINE_AUTOPTR_CLEANUP_FUNC(nodeDeviceEvent, nodeDeviceEventFree);
/**
 * nodeDeviceEventSubmit:
 * @eventType: the event to be processed
 * @data: additional data for the event processor (the pointer is stolen and it
 *        will be properly freed using @dataFreeFunc)
 * @dataFreeFunc: callback to free @data
 *
 * Submits @eventType to be processed by the asynchronous event handling
 * thread.
 *
 * Ownership of @data is taken unconditionally: on success it is released once
 * the worker has processed the event, on failure it is released before this
 * function returns. The caller must not touch or free @data afterwards.
 *
 * Returns 0 if the job was queued, -1 if there is no driver or the job could
 * not be sent to the worker pool.
 */
static int
nodeDeviceEventSubmit(nodeDeviceEventType eventType,
                      void *data,
                      virFreeCallback dataFreeFunc)
{
    nodeDeviceEvent *event = g_new0(nodeDeviceEvent, 1);
    udevEventData *priv = NULL;

    /* Fill in the event before any check that can fail, so that every error
     * path below can release both the event and the stolen @data through
     * nodeDeviceEventFree(). */
    event->eventType = eventType;
    event->data = data;
    event->dataFreeFunc = dataFreeFunc;

    if (!driver) {
        nodeDeviceEventFree(event);
        return -1;
    }

    priv = driver->privateData;

    if (virThreadPoolSendJob(priv->workerPool, 0, event) < 0) {
        nodeDeviceEventFree(event);
        return -1;
    }

    /* The worker pool owns @event from here on; it is freed by the handler. */
    return 0;
}
static bool static bool
udevHasDeviceProperty(struct udev_device *dev, udevHasDeviceProperty(struct udev_device *dev,
@ -1446,8 +1507,8 @@ udevGetDeviceDetails(virNodeDeviceDriverState *driver_state,
static int static int
udevRemoveOneDeviceSysPath(virNodeDeviceDriverState *driver_state, processNodeDeviceRemoveEvent(virNodeDeviceDriverState *driver_state,
const char *path) const char *path)
{ {
virNodeDeviceObj *obj = NULL; virNodeDeviceObj *obj = NULL;
virNodeDeviceDef *def; virNodeDeviceDef *def;
@ -1529,8 +1590,8 @@ udevSetParent(virNodeDeviceDriverState *driver_state,
} }
static int static int
udevAddOneDevice(virNodeDeviceDriverState *driver_state, processNodeDeviceAddAndChangeEvent(virNodeDeviceDriverState *driver_state,
struct udev_device *device) struct udev_device *device)
{ {
g_autofree char *sysfs_path = NULL; g_autofree char *sysfs_path = NULL;
virNodeDeviceDef *def = NULL; virNodeDeviceDef *def = NULL;
@ -1643,7 +1704,7 @@ udevProcessDeviceListEntry(virNodeDeviceDriverState *driver_state,
device = udev_device_new_from_syspath(udev, name); device = udev_device_new_from_syspath(udev, name);
if (device != NULL) { if (device != NULL) {
if (udevAddOneDevice(driver_state, device) != 0) { if (processNodeDeviceAddAndChangeEvent(driver_state, device) != 0) {
VIR_DEBUG("Failed to create node device for udev device '%s'", VIR_DEBUG("Failed to create node device for udev device '%s'",
name); name);
} }
@ -1752,26 +1813,23 @@ udevHandleOneDevice(struct udev_device *device)
VIR_DEBUG("udev action: '%s': %s", action, udev_device_get_syspath(device)); VIR_DEBUG("udev action: '%s': %s", action, udev_device_get_syspath(device));
if (STREQ(action, "add") || STREQ(action, "change")) /* Reference is either released via workerpool logic or at the end of this
return udevAddOneDevice(driver, device); * function. */
device = udev_device_ref(device);
if (STREQ(action, "remove")) { if (STREQ(action, "add")) {
const char *path = udev_device_get_syspath(device); return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_ADD, device,
(virFreeCallback)udev_device_unref);
return udevRemoveOneDeviceSysPath(driver, path); } else if (STREQ(action, "change")) {
} return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_CHANGE, device,
(virFreeCallback)udev_device_unref);
if (STREQ(action, "move")) { } else if (STREQ(action, "remove")) {
const char *devpath_old = udevGetDeviceProperty(device, "DEVPATH_OLD"); return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_REMOVE, device,
(virFreeCallback)udev_device_unref);
if (devpath_old) { } else if (STREQ(action, "move")) {
g_autofree char *devpath_old_fixed = g_strdup_printf("/sys%s", devpath_old); return nodeDeviceEventSubmit(NODE_DEVICE_EVENT_UDEV_MOVE, device,
(virFreeCallback)udev_device_unref);
udevRemoveOneDeviceSysPath(driver, devpath_old_fixed);
}
return udevAddOneDevice(driver, device);
} }
udev_device_unref(device);
return 0; return 0;
} }
@ -1990,23 +2048,24 @@ udevSetupSystemDev(void)
static void static void
nodeStateInitializeEnumerate(void *opaque) processNodeStateInitializeEnumerate(virNodeDeviceDriverState *driver_state,
void *opaque)
{ {
struct udev *udev = opaque; struct udev *udev = opaque;
udevEventData *priv = driver->privateData; udevEventData *priv = driver_state->privateData;
/* Populate with known devices */ /* Populate with known devices */
if (udevEnumerateDevices(driver, udev) != 0) if (udevEnumerateDevices(driver_state, udev) != 0)
goto error; goto error;
/* Load persistent mdevs (which might not be activated yet) and additional /* Load persistent mdevs (which might not be activated yet) and additional
* information about active mediated devices from mdevctl */ * information about active mediated devices from mdevctl */
if (nodeDeviceUpdateMediatedDevices(driver) != 0) if (nodeDeviceUpdateMediatedDevices(driver_state) != 0)
goto error; goto error;
cleanup: cleanup:
VIR_WITH_MUTEX_LOCK_GUARD(&driver->lock) { VIR_WITH_MUTEX_LOCK_GUARD(&driver_state->lock) {
driver->initialized = true; driver_state->initialized = true;
virCondBroadcast(&driver->initCond); virCondBroadcast(&driver_state->initCond);
} }
return; return;
@ -2048,31 +2107,16 @@ udevPCITranslateInit(bool privileged G_GNUC_UNUSED)
static void static void
mdevctlUpdateThreadFunc(void *opaque) submitMdevctlUpdate(int timer G_GNUC_UNUSED, void *opaque)
{
virNodeDeviceDriverState *driver_state = opaque;
if (nodeDeviceUpdateMediatedDevices(driver_state) < 0)
VIR_WARN("mdevctl failed to update mediated devices");
}
static void
launchMdevctlUpdateThread(int timer G_GNUC_UNUSED, void *opaque)
{ {
udevEventData *priv = opaque; udevEventData *priv = opaque;
virThread thread;
if (priv->mdevctlTimeout != -1) { if (priv->mdevctlTimeout != -1) {
virEventRemoveTimeout(priv->mdevctlTimeout); virEventRemoveTimeout(priv->mdevctlTimeout);
priv->mdevctlTimeout = -1; priv->mdevctlTimeout = -1;
} }
if (virThreadCreateFull(&thread, false, mdevctlUpdateThreadFunc, nodeDeviceEventSubmit(NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED, NULL, NULL);
"mdevctl-thread", false, driver) < 0) {
virReportSystemError(errno, "%s",
_("failed to create mdevctl thread"));
}
} }
@ -2167,7 +2211,7 @@ mdevctlEnableMonitor(udevEventData *priv)
/* Schedules an mdevctl update for 100ms in the future, canceling any existing /* Schedules an mdevctl update for 100ms in the future, canceling any existing
* timeout that may have been set. In this way, multiple update requests in * timeout that may have been set. In this way, multiple update requests in
* quick succession can be collapsed into a single update. if @force is true, * quick succession can be collapsed into a single update. if @force is true,
* an update thread will be spawned immediately. */ * the worker job is submitted immediately. */
static void static void
scheduleMdevctlUpdate(udevEventData *data, scheduleMdevctlUpdate(udevEventData *data,
bool force) bool force)
@ -2175,12 +2219,12 @@ scheduleMdevctlUpdate(udevEventData *data,
if (!force) { if (!force) {
if (data->mdevctlTimeout != -1) if (data->mdevctlTimeout != -1)
virEventRemoveTimeout(data->mdevctlTimeout); virEventRemoveTimeout(data->mdevctlTimeout);
data->mdevctlTimeout = virEventAddTimeout(100, launchMdevctlUpdateThread, data->mdevctlTimeout = virEventAddTimeout(100, submitMdevctlUpdate,
data, NULL); data, NULL);
return; return;
} }
launchMdevctlUpdateThread(-1, data); submitMdevctlUpdate(-1, data);
} }
@ -2220,6 +2264,62 @@ mdevctlEventHandleCallback(GFileMonitor *monitor G_GNUC_UNUSED,
} }
static void nodeDeviceEventHandler(void *data, void *opaque)
{
virNodeDeviceDriverState *driver_state = opaque;
g_autoptr(nodeDeviceEvent) processEvent = data;
switch (processEvent->eventType) {
case NODE_DEVICE_EVENT_INIT:
{
struct udev *udev = processEvent->data;
processNodeStateInitializeEnumerate(driver_state, udev);
}
break;
case NODE_DEVICE_EVENT_UDEV_ADD:
case NODE_DEVICE_EVENT_UDEV_CHANGE:
{
struct udev_device *device = processEvent->data;
processNodeDeviceAddAndChangeEvent(driver_state, device);
}
break;
case NODE_DEVICE_EVENT_UDEV_REMOVE:
{
struct udev_device *device = processEvent->data;
const char *path = udev_device_get_syspath(device);
processNodeDeviceRemoveEvent(driver_state, path);
}
break;
case NODE_DEVICE_EVENT_UDEV_MOVE:
{
struct udev_device *device = processEvent->data;
const char *devpath_old = udevGetDeviceProperty(device, "DEVPATH_OLD");
if (devpath_old) {
g_autofree char *devpath_old_fixed = g_strdup_printf("/sys%s", devpath_old);
processNodeDeviceRemoveEvent(driver_state, devpath_old_fixed);
}
processNodeDeviceAddAndChangeEvent(driver_state, device);
}
break;
case NODE_DEVICE_EVENT_MDEVCTL_CONFIG_CHANGED:
{
if (nodeDeviceUpdateMediatedDevices(driver_state) < 0)
VIR_WARN("mdevctl failed to update mediated devices");
}
break;
case NODE_DEVICE_EVENT_LAST:
g_assert_not_reached();
break;
}
}
/* Note: It must be safe to call this function even if the driver was not /* Note: It must be safe to call this function even if the driver was not
* successfully initialized. This must be considered when changing this * successfully initialized. This must be considered when changing this
* function. */ * function. */
@ -2255,6 +2355,9 @@ nodeStateShutdownPrepare(void)
priv->udevThreadQuit = true; priv->udevThreadQuit = true;
virCondSignal(&priv->udevThreadCond); virCondSignal(&priv->udevThreadCond);
} }
if (priv->workerPool)
virThreadPoolStop(priv->workerPool);
return 0; return 0;
} }
@ -2275,11 +2378,12 @@ nodeStateShutdownWait(void)
return 0; return 0;
VIR_WITH_OBJECT_LOCK_GUARD(priv) { VIR_WITH_OBJECT_LOCK_GUARD(priv) {
if (priv->initThread)
virThreadJoin(priv->initThread);
if (priv->udevThread) if (priv->udevThread)
virThreadJoin(priv->udevThread); virThreadJoin(priv->udevThread);
} }
if (priv->workerPool)
virThreadPoolDrain(priv->workerPool);
return 0; return 0;
} }
@ -2350,6 +2454,19 @@ nodeStateInitialize(bool privileged,
driver->parserCallbacks.postParse = nodeDeviceDefPostParse; driver->parserCallbacks.postParse = nodeDeviceDefPostParse;
driver->parserCallbacks.validate = nodeDeviceDefValidate; driver->parserCallbacks.validate = nodeDeviceDefValidate;
/* With the current design, we can only have exactly *one* worker thread as
* otherwise we cannot guarantee that the 'order(udev_events) ==
* order(nodedev_events)' is preserved. The worker pool must be initialized
* before trying to reconnect to all the running mdevs since there might
* occur some mdevctl monitor events that will be dispatched to the worker
* pool. */
priv->workerPool = virThreadPoolNewFull(1, 1, 0, nodeDeviceEventHandler,
"nodev-device-event",
NULL,
driver);
if (!priv->workerPool)
goto unlock;
if (udevPCITranslateInit(privileged) < 0) if (udevPCITranslateInit(privileged) < 0)
goto unlock; goto unlock;
@ -2407,13 +2524,7 @@ nodeStateInitialize(bool privileged,
if (udevSetupSystemDev() != 0) if (udevSetupSystemDev() != 0)
goto cleanup; goto cleanup;
priv->initThread = g_new0(virThread, 1); nodeDeviceEventSubmit(NODE_DEVICE_EVENT_INIT, udev_ref(udev), (virFreeCallback)udev_unref);
if (virThreadCreateFull(priv->initThread, true, nodeStateInitializeEnumerate,
"nodedev-init", false, udev) < 0) {
virReportSystemError(errno, "%s",
_("failed to create udev enumerate thread"));
goto cleanup;
}
return VIR_DRV_STATE_INIT_COMPLETE; return VIR_DRV_STATE_INIT_COMPLETE;

View File

@ -7768,10 +7768,10 @@ testNodeDeviceDestroy(virNodeDevicePtr dev)
if (virNodeDeviceGetWWNs(def, &wwnn, &wwpn) == -1) if (virNodeDeviceGetWWNs(def, &wwnn, &wwpn) == -1)
goto cleanup; goto cleanup;
/* Unlike the real code we cannot run into the udevAddOneDevice race /* Unlike the real code we cannot run into the
* which would replace obj->def, so no need to save off the parent, * processNodeDeviceAddAndChangeEvent race which would replace obj->def, so
* but do need to drop the @obj lock so that the FindByName code doesn't * no need to save off the parent, but do need to drop the @obj lock so that
* deadlock on ourselves */ * the FindByName code doesn't deadlock on ourselves */
virObjectUnlock(obj); virObjectUnlock(obj);
/* We do this just for basic validation and throw away the parentobj /* We do this just for basic validation and throw away the parentobj