[PATCH RFC 2/3] netdev-linux: add new netdev type afxdp.


From: William Tu <u9012063@...>

The patch creates a new netdev type "afxdp" and copies part of the
AF_XDP API implementation from the Linux kernel sample
samples/bpf/xdpsock_user.c. The AF_XDP eBPF programs and maps are
loaded when the dpif-netdev is created; when a user adds a netdev with
type="afxdp", OVS attaches the eBPF program and map to the netdev and
initializes the AF_XDP socket.
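
A hypothetical usage sketch (bridge and interface names are examples
only; the only piece defined by this patch is type="afxdp"):

  ovs-vsctl add-br br0
  ovs-vsctl add-port br0 eth0 -- set interface eth0 type="afxdp"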

Signed-off-by: William Tu <u9012063@...>
---
lib/automake.mk | 3 +-
lib/dpif-netdev.c | 74 ++++-
lib/if_xdp.h | 79 ++++++
lib/netdev-dummy.c | 1 +
lib/netdev-linux.c | 741 +++++++++++++++++++++++++++++++++++++++++++++++++-
lib/netdev-provider.h | 2 +
lib/netdev-vport.c | 4 +
lib/netdev.c | 11 +
lib/netdev.h | 1 +
9 files changed, 907 insertions(+), 9 deletions(-)
create mode 100644 lib/if_xdp.h

diff --git a/lib/automake.mk b/lib/automake.mk
index 61fef23152d3..e0528f74989f 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -302,7 +302,8 @@ lib_libopenvswitch_la_SOURCES = \
lib/lldp/lldpd.c \
lib/lldp/lldpd.h \
lib/lldp/lldpd-structs.c \
- lib/lldp/lldpd-structs.h
+ lib/lldp/lldpd-structs.h \
+ lib/if_xdp.h

if WIN32
lib_libopenvswitch_la_SOURCES += \
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index baff020fe3d0..9f0300ac4e91 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -76,6 +76,11 @@
#include "unixctl.h"
#include "util.h"

+#include "bpf.h"
+#include "netdev.h"
+#include "openvswitch/thread.h"
+#include <bpf/bpf.h>
+
VLOG_DEFINE_THIS_MODULE(dpif_netdev);

#define FLOW_DUMP_MAX_BATCH 50
@@ -507,6 +512,12 @@ struct tx_port {
struct dp_netdev_rxq *output_pkts_rxqs[NETDEV_MAX_BURST];
};

+static struct dp_bpf {
+ struct bpf_state bpf;
+ struct netdev *outport; /* Used for downcall. */
+} bpf_datapath;
+
/* A set of properties for the current processing loop that is not directly
* associated with the pmd thread itself, but with the packets being
* processed or the short-term system configuration (for example, time).
@@ -1121,6 +1132,8 @@ dpif_netdev_pmd_info(struct unixctl_conn *conn, int argc, const char *argv[],
static int
dpif_netdev_init(void)
{
+ int error;
+ static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
static enum pmd_info_type show_aux = PMD_INFO_SHOW_STATS,
clear_aux = PMD_INFO_CLEAR_STATS,
poll_aux = PMD_INFO_SHOW_RXQ;
@@ -1137,6 +1150,17 @@ dpif_netdev_init(void)
unixctl_command_register("dpif-netdev/pmd-rxq-rebalance", "[dp]",
0, 1, dpif_netdev_pmd_rebalance,
NULL);
+
+ /* Load the BPF datapath program once. */
+ if (ovsthread_once_start(&once)) {
+ /* We do not need a downcall device here. */
+ error = bpf_get(&bpf_datapath.bpf, true);
+ if (error) {
+ VLOG_ERR("%s: loading the BPF datapath program failed", __func__);
+ }
+ }
+ ovsthread_once_done(&once);
+
return 0;
}

@@ -1504,7 +1528,26 @@ dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
ovs_mutex_cond_wait(&pmd->cond, &pmd->cond_mutex);
ovs_mutex_unlock(&pmd->cond_mutex);
}
-
+static bool
+netdev_support_xdp(const char *devname)
+{
+ /* XXX: name-based placeholder; probe the device for XDP support
+ * instead. */
+ return strstr(devname, "afxdp") != NULL;
+}
+
+static int afxdp_idx;
static int
port_create(const char *devname, const char *type,
odp_port_t port_no, struct dp_netdev_port **portp)
@@ -1519,7 +1562,7 @@ port_create(const char *devname, const char *type,

/* Open and validate network device. */
error = netdev_open(devname, type, &netdev);
- VLOG_INFO("%s %s error %d", __func__, devname, error);
+ VLOG_INFO("%s %s type = %s error %d", __func__, devname, type, error);
if (error) {
return error;
}
@@ -1538,6 +1581,23 @@ port_create(const char *devname, const char *type,
goto out;
}

+ if (!strcmp(type, "afxdp")) {
+ /* FIXME: should this be a separate set_af_xdp netdev operation? */
+ VLOG_INFO("using afxdp port idx %d", afxdp_idx);
+ error = netdev_set_xdp(netdev, &bpf_datapath.bpf.afxdp[afxdp_idx]);
+ if (error) {
+ VLOG_WARN("%s XDP set failed", __func__);
+ goto out;
+ }
+ error = netdev_set_xskmap(netdev, bpf_datapath.bpf.xsks_map[afxdp_idx].fd);
+ if (error) {
+ VLOG_ERR("%s XSK map set error\n", __func__);
+ goto out;
+ }
+ afxdp_idx++;
+ }
+
port = xzalloc(sizeof *port);
port->port_no = port_no;
port->netdev = netdev;
@@ -5008,8 +5068,11 @@ emc_processing(struct dp_netdev_pmd_thread *pmd,
struct dp_netdev_flow *flow;

if (OVS_UNLIKELY(dp_packet_size(packet) < ETH_HEADER_LEN)) {
+ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+
+ VLOG_ERR_RL(&rl, "%s: dropped packet of size %"PRIu32,
+ __func__, dp_packet_size(packet));
dp_packet_delete(packet);
n_dropped++;
continue;
}

@@ -5254,6 +5317,13 @@ dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
n_batches = 0;
emc_processing(pmd, packets, keys, batches, &n_batches,
md_is_valid, port_no);
if (!dp_packet_batch_is_empty(packets)) {
/* Get ingress port from first packet's metadata. */
in_port = packets->packets[0]->md.in_port.odp_port;
diff --git a/lib/if_xdp.h b/lib/if_xdp.h
new file mode 100644
index 000000000000..2a8c5780166f
--- /dev/null
+++ b/lib/if_xdp.h
@@ -0,0 +1,79 @@
+/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
+/*
+ * if_xdp: XDP socket user-space interface
+ * Copyright(c) 2018 Intel Corporation.
+ *
+ * Author(s): Björn Töpel <bjorn.topel@...>
+ * Magnus Karlsson <magnus.karlsson@...>
+ */
+
+#ifndef _LINUX_IF_XDP_H
+#define _LINUX_IF_XDP_H
+
+#include <linux/types.h>
+#include <stdbool.h>
+
+/* Options for the sxdp_flags field */
+#define XDP_SHARED_UMEM (1 << 0)
+#define XDP_COPY (1 << 1) /* Force copy-mode */
+#define XDP_ZEROCOPY (1 << 2) /* Force zero-copy mode */
+
+struct sockaddr_xdp {
+ __u16 sxdp_family;
+ __u16 sxdp_flags;
+ __u32 sxdp_ifindex;
+ __u32 sxdp_queue_id;
+ __u32 sxdp_shared_umem_fd;
+};
+
+struct xdp_ring_offset {
+ __u64 producer;
+ __u64 consumer;
+ __u64 desc;
+};
+
+struct xdp_mmap_offsets {
+ struct xdp_ring_offset rx;
+ struct xdp_ring_offset tx;
+ struct xdp_ring_offset fr; /* Fill */
+ struct xdp_ring_offset cr; /* Completion */
+};
+
+/* XDP socket options */
+#define XDP_MMAP_OFFSETS 1
+#define XDP_RX_RING 2
+#define XDP_TX_RING 3
+#define XDP_UMEM_REG 4
+#define XDP_UMEM_FILL_RING 5
+#define XDP_UMEM_COMPLETION_RING 6
+#define XDP_STATISTICS 7
+
+struct xdp_umem_reg {
+ __u64 addr; /* Start of packet data area */
+ __u64 len; /* Length of packet data area */
+ __u32 chunk_size;
+ __u32 headroom;
+};
+
+struct xdp_statistics {
+ __u64 rx_dropped; /* Dropped for reasons other than invalid desc */
+ __u64 rx_invalid_descs; /* Dropped due to invalid descriptor */
+ __u64 tx_invalid_descs; /* Dropped due to invalid descriptor */
+};
+
+/* Pgoff for mmaping the rings */
+#define XDP_PGOFF_RX_RING 0
+#define XDP_PGOFF_TX_RING 0x80000000
+#define XDP_UMEM_PGOFF_FILL_RING 0x100000000ULL
+#define XDP_UMEM_PGOFF_COMPLETION_RING 0x180000000ULL
+
+/* Rx/Tx descriptor */
+struct xdp_desc {
+ __u64 addr;
+ __u32 len;
+ __u32 options;
+};
+
+/* UMEM descriptor is __u64 */
+
+#endif /* _LINUX_IF_XDP_H */
diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
index 44c9458a9a22..c7a065ed7ba8 100644
--- a/lib/netdev-dummy.c
+++ b/lib/netdev-dummy.c
@@ -1429,6 +1429,7 @@ netdev_dummy_update_flags(struct netdev *netdev_,
NULL, /* set_policing */ \
NULL, /* set_filter */ \
NULL, /* set_xdp */ \
+ NULL, /* set_xskmap */ \
NULL, /* get_qos_types */ \
NULL, /* get_qos_capabilities */ \
NULL, /* get_qos */ \
diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
index 121dd3bc738e..6546ff88aee6 100644
--- a/lib/netdev-linux.c
+++ b/lib/netdev-linux.c
@@ -88,6 +88,519 @@ COVERAGE_DEFINE(netdev_set_hwaddr);
COVERAGE_DEFINE(netdev_get_ethtool);
COVERAGE_DEFINE(netdev_set_ethtool);

+#ifdef AFXDP_NETDEV
+/* Code below is adapted from the Linux kernel sample
+ * samples/bpf/xdpsock_user.c. */
+#ifndef SOL_XDP
+#define SOL_XDP 283
+#endif
+
+#ifndef AF_XDP
+#define AF_XDP 44
+#endif
+
+#ifndef PF_XDP
+#define PF_XDP AF_XDP
+#endif
+
+#define NUM_FRAMES 128
+#define FRAME_HEADROOM 0
+#define FRAME_SIZE 2048
+#define NUM_DESCS 32
+
+#define FQ_NUM_DESCS 32
+#define CQ_NUM_DESCS 32
+
+#define DEBUG_HEXDUMP 0
+
+typedef __u32 u32;
+typedef uint64_t u64;
+
+#include "lib/xdpsock.h"
+static u32 opt_xdp_flags; /* Always set to SKB_MODE at bpf_set_link_xdp_fd. */
+static u32 opt_xdp_bind_flags;
+
+struct xdp_uqueue {
+ u32 cached_prod;
+ u32 cached_cons;
+ u32 mask;
+ u32 size;
+ u32 *producer;
+ u32 *consumer;
+ struct xdp_desc *ring;
+ void *map;
+};
+
+struct xdpsock {
+ struct xdp_uqueue rx;
+ struct xdp_uqueue tx;
+ int sfd;
+ struct xdp_umem *umem;
+ u32 outstanding_tx;
+ unsigned long rx_npkts;
+ unsigned long tx_npkts;
+ unsigned long prev_rx_npkts;
+ unsigned long prev_tx_npkts;
+};
+
+#define MAX_SOCKS 4
+
+#define barrier() __asm__ __volatile__("": : :"memory")
+#define u_smp_rmb() barrier()
+#define u_smp_wmb() barrier()
+#define likely(x) __builtin_expect(!!(x), 1)
+#define unlikely(x) __builtin_expect(!!(x), 0)
+
+static const char pkt_data[] =
+ "\x3c\xfd\xfe\x9e\x7f\x71\xec\xb1\xd7\x98\x3a\xc0\x08\x00\x45\x00"
+ "\x00\x2e\x00\x00\x00\x00\x40\x11\x88\x97\x05\x08\x07\x08\xc8\x14"
+ "\x1e\x04\x10\x92\x10\x92\x00\x1a\x6d\xa3\x34\x33\x1f\x69\x40\x6b"
+ "\x54\x59\xb6\x14\x2d\x11\x44\xbf\xaf\xd9\xbe\xaa";
+
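+/* The rx/tx and fill/completion queues are single-producer/single-consumer
+ * rings shared with the kernel. Each side caches its view of the ring
+ * indices ('cached_prod'/'cached_cons') and re-reads the shared index only
+ * when the cache runs out, so most enqueue/dequeue operations touch no
+ * shared state. */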
+static inline u32 xq_nb_avail(struct xdp_uqueue *q, u32 ndescs)
+{
+ u32 entries = q->cached_prod - q->cached_cons;
+
+ if (entries == 0) {
+ q->cached_prod = *q->producer;
+ entries = q->cached_prod - q->cached_cons;
+ }
+
+ return (entries > ndescs) ? ndescs : entries;
+}
+
+static inline u32 umem_nb_free(struct xdp_umem_uqueue *q, u32 nb)
+{
+ u32 free_entries = q->cached_cons - q->cached_prod;
+ VLOG_INFO("0: %s cons %d prod %d\n", __func__, q->cached_cons, q->cached_prod);
+
+ if (free_entries >= nb)
+ return free_entries;
+
+ /* Refresh the local tail pointer */
+ q->cached_cons = (*q->consumer + q->size) & q->mask;
+
+ VLOG_INFO("%s cons %d prod %d\n", __func__, q->cached_cons, q->cached_prod);
+ VLOG_INFO("consumer %d, size %d\n", *q->consumer, q->size);
+
+ return q->cached_cons - q->cached_prod;
+}
+
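+/* Posts 'nb' buffer addresses to the fill queue so the kernel can use them
+ * for received packets; the _ex variant takes xdp_desc entries and uses
+ * only their 'addr' fields. */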
+static inline int umem_fill_to_kernel_ex(struct xdp_umem_uqueue *fq,
+ struct xdp_desc *d,
+ size_t nb)
+{
+ u32 i;
+
+ VLOG_INFO("%s nb = %d", __func__, nb);
+ if (umem_nb_free(fq, nb) < nb) {
+ VLOG_ERR("%s error\n", __func__);
+ return -ENOSPC;
+ }
+
+ for (i = 0; i < nb; i++) {
+ u32 idx = fq->cached_prod++ & fq->mask;
+
+ fq->ring[idx] = d[i].addr;
+ }
+
+ u_smp_wmb();
+
+ *fq->producer = fq->cached_prod;
+
+ VLOG_INFO("%s producer at %d\n", __func__, *fq->producer);
+ return 0;
+}
+
+static inline int umem_fill_to_kernel(struct xdp_umem_uqueue *fq, uint64_t *d,
+ size_t nb)
+{
+ u32 i;
+
+ if (umem_nb_free(fq, nb) < nb) {
+ VLOG_ERR("%s error\n", __func__);
+ return -ENOSPC;
+ }
+
+ for (i = 0; i < nb; i++) {
+ u32 idx = fq->cached_prod++ & fq->mask;
+
+ fq->ring[idx] = d[i];
+ }
+
+ u_smp_wmb();
+
+ *fq->producer = fq->cached_prod;
+
+ VLOG_INFO("%s producer at %d\n", __func__, *fq->producer);
+ return 0;
+}
+
+static inline u32 umem_nb_avail(struct xdp_umem_uqueue *q, u32 nb)
+{
+ u32 entries = q->cached_prod - q->cached_cons;
+
+ if (entries == 0) {
+ q->cached_prod = *q->producer;
+ entries = q->cached_prod - q->cached_cons;
+ }
+
+ return (entries > nb) ? nb : entries;
+}
+
+static inline size_t umem_complete_from_kernel(struct xdp_umem_uqueue *cq,
+ uint64_t *d, size_t nb)
+{
+ u32 idx, i, entries = umem_nb_avail(cq, nb);
+
+ u_smp_rmb();
+
+ for (i = 0; i < entries; i++) {
+ idx = cq->cached_cons++ & cq->mask;
+ d[i] = cq->ring[idx];
+ }
+
+ if (entries > 0) {
+ u_smp_wmb();
+
+ *cq->consumer = cq->cached_cons;
+ }
+
+ return entries;
+}
+
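+/* Registers a buffer area (umem) with AF_XDP socket 'sfd' and maps its
+ * fill and completion rings: XDP_UMEM_REG registers the memory,
+ * XDP_UMEM_FILL_RING and XDP_UMEM_COMPLETION_RING size the rings, and the
+ * offsets from XDP_MMAP_OFFSETS locate the producer/consumer indices and
+ * descriptor arrays inside each mmap()ed region. */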
+static struct xdp_umem *xdp_umem_configure(int sfd)
+{
+ int fq_size = FQ_NUM_DESCS, cq_size = CQ_NUM_DESCS;
+ struct xdp_mmap_offsets off;
+ struct xdp_umem_reg mr;
+ struct xdp_umem *umem;
+ socklen_t optlen;
+ void *bufs;
+
+ umem = calloc(1, sizeof(*umem));
+ ovs_assert(umem);
+
+ VLOG_DBG("enter: %s \n", __func__);
+ ovs_assert(posix_memalign(&bufs, getpagesize(), /* PAGE_SIZE aligned */
+ NUM_FRAMES * FRAME_SIZE) == 0);
+
+ VLOG_INFO("%s shared umem from %p to %p", __func__,
+ bufs, (char*)bufs + NUM_FRAMES * FRAME_SIZE);
+
+ mr.addr = (__u64)bufs;
+ mr.len = NUM_FRAMES * FRAME_SIZE;
+ mr.chunk_size = FRAME_SIZE;
+ mr.headroom = FRAME_HEADROOM;
+
+ ovs_assert(setsockopt(sfd, SOL_XDP, XDP_UMEM_REG, &mr, sizeof(mr)) == 0);
+ ovs_assert(setsockopt(sfd, SOL_XDP, XDP_UMEM_FILL_RING, &fq_size,
+ sizeof(int)) == 0);
+ ovs_assert(setsockopt(sfd, SOL_XDP, XDP_UMEM_COMPLETION_RING, &cq_size,
+ sizeof(int)) == 0);
+
+ optlen = sizeof(off);
+ ovs_assert(getsockopt(sfd, SOL_XDP, XDP_MMAP_OFFSETS, &off,
+ &optlen) == 0);
+
+ umem->fq.map = mmap(0, off.fr.desc +
+ FQ_NUM_DESCS * sizeof(u64),
+ PROT_READ | PROT_WRITE,
+ MAP_SHARED | MAP_POPULATE, sfd,
+ XDP_UMEM_PGOFF_FILL_RING);
+ ovs_assert(umem->fq.map != MAP_FAILED);
+
+ umem->fq.mask = FQ_NUM_DESCS - 1;
+ umem->fq.size = FQ_NUM_DESCS;
+ umem->fq.producer = (void *)((char *)umem->fq.map + off.fr.producer);
+ umem->fq.consumer = (void *)((char *)umem->fq.map + off.fr.consumer);
+ umem->fq.ring = (void *)((char *)umem->fq.map + off.fr.desc);
+ umem->fq.cached_cons = FQ_NUM_DESCS;
+
+ umem->cq.map = mmap(0, off.cr.desc +
+ CQ_NUM_DESCS * sizeof(u64),
+ PROT_READ | PROT_WRITE,
+ MAP_SHARED | MAP_POPULATE, sfd,
+ XDP_UMEM_PGOFF_COMPLETION_RING);
+ ovs_assert(umem->cq.map != MAP_FAILED);
+
+ umem->cq.mask = CQ_NUM_DESCS - 1;
+ umem->cq.size = CQ_NUM_DESCS;
+ umem->cq.producer = (void *)((char *)umem->cq.map + off.cr.producer);
+ umem->cq.consumer = (void *)((char *)umem->cq.map + off.cr.consumer);
+ umem->cq.ring = (void *)((char *)umem->cq.map + off.cr.desc);
+
+ umem->frames = bufs;
+ umem->fd = sfd;
+
+ return umem;
+}
+
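+/* Creates an AF_XDP socket bound to 'ifindex'/'queue': configure a umem
+ * (or share an existing one), size and mmap the rx/tx rings, pre-fill the
+ * fill queue with frame addresses, and finally bind() the socket. Only
+ * the non-shared umem path is exercised so far. */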
+static struct xdpsock *xsk_configure(struct xdp_umem *umem,
+ int ifindex, int queue)
+{
+ struct sockaddr_xdp sxdp = {};
+ struct xdp_mmap_offsets off;
+ int sfd, ndescs = NUM_DESCS;
+ struct xdpsock *xsk;
+ bool shared = false;
+ socklen_t optlen;
+ u64 i;
+
+ opt_xdp_flags |= XDP_FLAGS_SKB_MODE;
+ opt_xdp_bind_flags |= XDP_COPY;
+
+ sfd = socket(PF_XDP, SOCK_RAW, 0);
+ ovs_assert(sfd >= 0);
+
+ xsk = calloc(1, sizeof(*xsk));
+ ovs_assert(xsk);
+
+ xsk->sfd = sfd;
+ xsk->outstanding_tx = 0;
+
+ VLOG_DBG("enter: %s xsk fd %d", __func__, sfd);
+ if (!umem) {
+ xsk->umem = xdp_umem_configure(sfd);
+ } else {
+ /* XDP_SHARED_UMEM is not supported yet. */
+ ovs_assert(0);
+ }
+
+ ovs_assert(setsockopt(sfd, SOL_XDP, XDP_RX_RING,
+ &ndescs, sizeof(int)) == 0);
+ ovs_assert(setsockopt(sfd, SOL_XDP, XDP_TX_RING,
+ &ndescs, sizeof(int)) == 0);
+ optlen = sizeof(off);
+ ovs_assert(getsockopt(sfd, SOL_XDP, XDP_MMAP_OFFSETS, &off,
+ &optlen) == 0);
+
+ /* Rx */
+ xsk->rx.map = mmap(NULL,
+ off.rx.desc +
+ NUM_DESCS * sizeof(struct xdp_desc),
+ PROT_READ | PROT_WRITE,
+ MAP_SHARED | MAP_POPULATE, sfd,
+ XDP_PGOFF_RX_RING);
+ ovs_assert(xsk->rx.map != MAP_FAILED);
+
+ if (!shared) {
+ for (i = 0; i < NUM_DESCS * FRAME_SIZE; i += FRAME_SIZE)
+ ovs_assert(umem_fill_to_kernel(&xsk->umem->fq, &i, 1)
+ == 0);
+ }
+
+ /* FIXME: Tx is also configured here. */
+ /* Tx */
+ xsk->tx.map = mmap(NULL,
+ off.tx.desc +
+ NUM_DESCS * sizeof(struct xdp_desc),
+ PROT_READ | PROT_WRITE,
+ MAP_SHARED | MAP_POPULATE, sfd,
+ XDP_PGOFF_TX_RING);
+ ovs_assert(xsk->tx.map != MAP_FAILED);
+
+ xsk->rx.mask = NUM_DESCS - 1;
+ xsk->rx.size = NUM_DESCS;
+ xsk->rx.producer = (void *)((char *)xsk->rx.map + off.rx.producer);
+ xsk->rx.consumer = (void *)((char *)xsk->rx.map + off.rx.consumer);
+ xsk->rx.ring = (void *)((char *)xsk->rx.map + off.rx.desc);
+
+ xsk->tx.mask = NUM_DESCS - 1;
+ xsk->tx.size = NUM_DESCS;
+ xsk->tx.producer = (void *)((char *)xsk->tx.map + off.tx.producer);
+ xsk->tx.consumer = (void *)((char *)xsk->tx.map + off.tx.consumer);
+ xsk->tx.ring = (void *)((char *)xsk->tx.map + off.tx.desc);
+ xsk->tx.cached_cons = NUM_DESCS;
+
+ /* XSK socket */
+ sxdp.sxdp_family = PF_XDP;
+ sxdp.sxdp_ifindex = ifindex;
+ sxdp.sxdp_queue_id = queue;
+
+ if (shared) {
+ sxdp.sxdp_flags = XDP_SHARED_UMEM;
+ sxdp.sxdp_shared_umem_fd = umem->fd;
+ } else {
+ sxdp.sxdp_flags = opt_xdp_bind_flags;
+ }
+
+ ovs_assert(bind(sfd, (struct sockaddr *)&sxdp, sizeof(sxdp)) == 0);
+
+ return xsk;
+}
+
+static inline int xq_deq(struct xdp_uqueue *uq,
+ struct xdp_desc *descs,
+ int ndescs)
+{
+ struct xdp_desc *r = uq->ring;
+ unsigned int idx;
+ int i, entries;
+
+ entries = xq_nb_avail(uq, ndescs);
+
+ u_smp_rmb();
+
+ for (i = 0; i < entries; i++) {
+ idx = uq->cached_cons++ & uq->mask;
+ descs[i] = r[idx];
+ }
+
+ if (entries > 0) {
+ u_smp_wmb();
+
+ *uq->consumer = uq->cached_cons;
+ VLOG_INFO("%s entries %d consumer %d\n", __func__, entries, *uq->consumer);
+ }
+ return entries;
+}
+
+static inline void *xq_get_data(struct xdpsock *xsk, u64 addr)
+{
+ return &xsk->umem->frames[addr];
+}
+
+static void vlog_hex_dump(const void *buf, size_t count)
+{
+ struct ds ds = DS_EMPTY_INITIALIZER;
+ ds_put_hex_dump(&ds, buf, count, 0, false);
+ VLOG_INFO("\n%s", ds_cstr(&ds));
+ ds_destroy(&ds);
+}
+
+static void kick_tx(int fd)
+{
+ int ret;
+
+ VLOG_DBG("%s: send to fd %d", __func__, fd);
+ ret = sendto(fd, NULL, 0, MSG_DONTWAIT, NULL, 0);
+ if (ret >= 0 || errno == ENOBUFS || errno == EAGAIN)
+ return;
+ ovs_assert(0);
+}
+
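+/* Reclaims completed tx buffers: kick the kernel with a zero-length
+ * sendto(), then drain the completion queue; the l2fwd variant also
+ * recycles the reclaimed buffers into the fill queue for rx reuse. */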
+static inline void complete_tx_l2fwd(struct xdpsock *xsk)
+{
+ u64 descs[BATCH_SIZE];
+ unsigned int rcvd;
+ size_t ndescs;
+
+ if (!xsk->outstanding_tx)
+ return;
+
+ kick_tx(xsk->sfd);
+ ndescs = (xsk->outstanding_tx > BATCH_SIZE) ? BATCH_SIZE :
+ xsk->outstanding_tx;
+
+ /* re-add completed Tx buffers */
+ rcvd = umem_complete_from_kernel(&xsk->umem->cq, descs, ndescs);
+
+ if (rcvd > 0) {
+ umem_fill_to_kernel(&xsk->umem->fq, descs, rcvd);
+ xsk->outstanding_tx -= rcvd;
+ xsk->tx_npkts += rcvd;
+ }
+}
+
+static inline void complete_tx_only(struct xdpsock *xsk)
+{
+ u64 descs[BATCH_SIZE];
+ unsigned int rcvd;
+
+ if (!xsk->outstanding_tx) {
+ VLOG_DBG("no outstanding_tx");
+ return;
+ }
+
+ kick_tx(xsk->sfd);
+
+ rcvd = umem_complete_from_kernel(&xsk->umem->cq, descs, BATCH_SIZE);
+ if (rcvd > 0) {
+ xsk->outstanding_tx -= rcvd;
+ xsk->tx_npkts += rcvd;
+ }
+}
+
+static inline u32 xq_nb_free(struct xdp_uqueue *q, u32 ndescs)
+{
+ u32 free_entries = q->cached_cons - q->cached_prod;
+
+ if (free_entries >= ndescs)
+ return free_entries;
+
+ /* Refresh the local tail pointer */
+ q->cached_cons = *q->consumer + q->size;
+ return q->cached_cons - q->cached_prod;
+}
+
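+/* Enqueues 'ndescs' descriptors on a tx ring: reserve slots through the
+ * cached producer index, copy the descriptors in, then publish the new
+ * producer index after a write barrier so the kernel never sees a
+ * half-written entry. */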
+static inline int xq_enq(struct xdp_uqueue *uq,
+ const struct xdp_desc *descs,
+ unsigned int ndescs)
+{
+ struct xdp_desc *r = uq->ring;
+ unsigned int i;
+
+ if (xq_nb_free(uq, ndescs) < ndescs)
+ return -ENOSPC;
+
+ for (i = 0; i < ndescs; i++) {
+ u32 idx = uq->cached_prod++ & uq->mask;
+
+ r[idx].addr = descs[i].addr;
+ r[idx].len = descs[i].len;
+ }
+
+ u_smp_wmb();
+
+ *uq->producer = uq->cached_prod;
+ return 0;
+}
+
+static inline int xq_enq_tx_only(struct xdp_uqueue *uq,
+ unsigned int id, unsigned int ndescs)
+{
+ struct xdp_desc *r = uq->ring;
+ unsigned int i;
+
+ if (xq_nb_free(uq, ndescs) < ndescs)
+ return -ENOSPC;
+
+ for (i = 0; i < ndescs; i++) {
+ u32 idx = uq->cached_prod++ & uq->mask;
+
+ r[idx].addr = (id + i) << FRAME_SHIFT;
+ r[idx].len = sizeof(pkt_data) - 1;
+ }
+
+ u_smp_wmb();
+
+ *uq->producer = uq->cached_prod;
+ return 0;
+}
+
+static inline void print_xsk_stat(struct xdpsock *xsk)
+{
+ struct xdp_statistics stat;
+ socklen_t optlen;
+
+ optlen = sizeof(stat);
+ ovs_assert(getsockopt(xsk->sfd, SOL_XDP, XDP_STATISTICS,
+ &stat, &optlen) == 0);
+
+ VLOG_INFO("rx dropped %llu, rx_invalid %llu, tx_invalid %llu",
+ stat.rx_dropped, stat.rx_invalid_descs, stat.tx_invalid_descs);
+}
+/* End of code adapted from xdpsock_user.c. */
+#endif

/* These were introduced in Linux 2.6.14, so they might be missing if we have
* old headers. */
@@ -522,6 +1035,8 @@ struct netdev_linux {
int tap_fd;
bool present; /* If the device is present in the namespace */
uint64_t tx_dropped; /* tap device can drop if the iface is down */
+ struct xdpsock *xsk[16]; /* af_xdp socket: each queue has one xdp sock */
+ int xskmap_fd; /* map netdev's queue id to xsk fd */
};

struct netdev_rxq_linux {
@@ -571,6 +1086,12 @@ is_netdev_linux_class(const struct netdev_class *netdev_class)
}

static bool
+is_afxdp_netdev(const struct netdev *netdev)
+{
+ return netdev_get_class(netdev) == &netdev_afxdp_class;
+}
+
+static bool
is_tap_netdev(const struct netdev *netdev)
{
return netdev_get_class(netdev) == &netdev_tap_class;
@@ -921,6 +1442,13 @@ netdev_linux_destruct(struct netdev *netdev_)
atomic_count_dec(&miimon_cnt);
}

+ if (is_afxdp_netdev(netdev_)) {
+ int ifindex;
+
+ get_ifindex(netdev_, &ifindex);
+ bpf_set_link_xdp_fd(ifindex, -1, XDP_FLAGS_SKB_MODE);
+ }
+
ovs_mutex_destroy(&netdev->mutex);
}

@@ -950,6 +1478,44 @@ netdev_linux_rxq_construct(struct netdev_rxq *rxq_)
rx->is_tap = is_tap_netdev(netdev_);
if (rx->is_tap) {
rx->fd = netdev->tap_fd;
+ } else if (is_afxdp_netdev(netdev_)) {
+ /* Set up the AF_XDP socket here; see xsk_configure(). */
+ struct rlimit r = {RLIM_INFINITY, RLIM_INFINITY};
+ int ifindex, num_socks = 0;
+ struct xdpsock *xsk;
+ int queue_id = 0; /* FIXME: only queue 0 is supported. */
+ int key = 0;
+ int xsk_fd;
+
+ if (setrlimit(RLIMIT_MEMLOCK, &r)) {
+ VLOG_ERR("setrlimit(RLIMIT_MEMLOCK) failed: %s", ovs_strerror(errno));
+ ovs_assert(0);
+ }
+
+ VLOG_INFO("%s: %s: queue=%d configuring xdp sock",
+ __func__, netdev_->name, queue_id);
+
+ /* Get ethernet device index. */
+ error = get_ifindex(&netdev->up, &ifindex);
+ if (error) {
+ goto error;
+ }
+
+ xsk = xsk_configure(NULL, ifindex, queue_id);
+
+ netdev->xsk[num_socks++] = xsk;
+ rx->fd = xsk->sfd; /* For the upper layer to poll on. */
+ xsk_fd = xsk->sfd;
+
+ if (xsk_fd >= 0) {
+ error = bpf_map_update_elem(netdev->xskmap_fd, &key, &xsk_fd, 0);
+ if (error) {
+ VLOG_ERR("failed to set xsks_map: %s", ovs_strerror(error));
+ return error;
+ }
+ }
+
} else {
struct sockaddr_ll sll;
int ifindex, val;
@@ -1149,6 +1715,58 @@ netdev_linux_rxq_recv_tap(int fd, struct dp_packet *buffer)
return 0;
}

+/* Receive packet from AF_XDP socket */
+static int
+netdev_linux_rxq_xsk(struct xdpsock *xsk,
+ struct dp_packet_batch *batch)
+{
+ struct xdp_desc descs[NETDEV_MAX_BURST];
+ unsigned int rcvd, i = 0;
+ int ret = 0;
+
+ rcvd = xq_deq(&xsk->rx, descs, NETDEV_MAX_BURST);
+ if (rcvd == 0) {
+ return 0;
+ }
+
+ VLOG_INFO("%s receive %d packets xsk fd %d",
+ __func__, rcvd, xsk->sfd);
+
+ for (i = 0; i < rcvd; i++) {
+ struct dp_packet *packet;
+ void *base, *new_packet;
+
+ packet = xmalloc(sizeof *packet);
+
+ VLOG_INFO("%s packet len %d", __func__, descs[i].len);
+ base = xq_get_data(xsk, descs[i].addr);
+
+ //vlog_hex_dump(base, 14);
+ new_packet = malloc(2048);
+ memcpy(new_packet, base, descs[i].len);
+
+ //dp_packet_use(packet, base, descs[i].len);
+ dp_packet_use(packet, new_packet, descs[i].len);
+
+ packet->source = DPBUF_MALLOC;
+ //dp_packet_set_data(packet, base); // no offset now?
+ dp_packet_set_data(packet, new_packet); // no offset now?
+ dp_packet_set_size(packet, descs[i].len);
+
+ /* add packet into batch, batch->count inc */
+ dp_packet_batch_add(batch, packet);
+ }
+
+ xsk->rx_npkts += rcvd;
+
+ /* Return the consumed buffers to the fill queue. */
+ umem_fill_to_kernel_ex(&xsk->umem->fq, descs, rcvd);
+
+ print_xsk_stat(xsk);
+ return ret;
+}
+
static int
netdev_linux_rxq_recv(struct netdev_rxq *rxq_, struct dp_packet_batch *batch)
{
@@ -1157,6 +1775,8 @@ netdev_linux_rxq_recv(struct netdev_rxq *rxq_, struct dp_packet_batch *batch)
struct dp_packet *buffer;
ssize_t retval;
int mtu;
+ struct netdev_linux *dev = netdev_linux_cast(netdev);

if (netdev_linux_get_mtu__(netdev_linux_cast(netdev), &mtu)) {
mtu = ETH_PAYLOAD_MAX;
@@ -1166,15 +1786,20 @@ netdev_linux_rxq_recv(struct netdev_rxq *rxq_, struct dp_packet_batch *batch)
buffer = dp_packet_new_with_headroom(VLAN_ETH_HEADER_LEN + mtu,
DP_NETDEV_HEADROOM);
retval = (rx->is_tap
- ? netdev_linux_rxq_recv_tap(rx->fd, buffer)
- : netdev_linux_rxq_recv_sock(rx->fd, buffer));
-
+ ? netdev_linux_rxq_recv_tap(rx->fd, buffer)
+ : is_afxdp_netdev(netdev)
+ ? netdev_linux_rxq_xsk(dev->xsk[0], batch)
+ : netdev_linux_rxq_recv_sock(rx->fd, buffer));
if (retval) {
if (retval != EAGAIN && retval != EMSGSIZE) {
VLOG_WARN_RL(&rl, "error receiving Ethernet packet on %s: %s",
netdev_rxq_get_name(rxq_), ovs_strerror(errno));
}
dp_packet_delete(buffer);
+ } else if (is_afxdp_netdev(netdev)) {
+ /* The AF_XDP path fills 'batch' itself and does not use 'buffer'. */
+ dp_packet_delete(buffer);
+ dp_packet_batch_init_packet_fields(batch);
+
+ if (batch->count != 0) {
+ VLOG_DBG("%s: AF_XDP received %"PRIuSIZE" packets",
+ __func__, batch->count);
+ }
} else {
dp_packet_batch_init_packet(batch, buffer);
}
@@ -1208,6 +1833,66 @@ netdev_linux_rxq_drain(struct netdev_rxq *rxq_)
}

static int
+netdev_linux_afxdp_batch_send(struct xdpsock *xsk, /* send to xdp socket! */
+ struct dp_packet_batch *batch)
+{
+ struct dp_packet *packet;
+ struct xdp_uqueue *uq;
+ struct xdp_desc *r;
+ int ndescs = batch->count;
+ u32 id = NUM_FRAMES / 2;
+
+ VLOG_INFO("%s send %lu packet to fd %d", __func__, batch->count, xsk->sfd);
+ VLOG_INFO("%s outstanding tx %d", __func__, xsk->outstanding_tx);
+
+ uq = &xsk->tx;
+ r = uq->ring;
+
+ /* See tx_only() and xq_enq_tx_only() in the kernel sample. */
+ if (xq_nb_free(uq, ndescs) < ndescs) {
+ VLOG_ERR_RL(&rl, "%s: no free tx descriptors", __func__);
+ return ENOSPC;
+ }
+
+ DP_PACKET_BATCH_FOR_EACH (packet, batch) {
+ void *umem_buf;
+
+ u32 idx = uq->cached_prod++ & uq->mask;
+
+ /* FIXME: find a free frame id instead of this fixed range. */
+ umem_buf = xsk->umem->frames + (id << FRAME_SHIFT);
+ memcpy(umem_buf, dp_packet_data(packet), dp_packet_size(packet));
+ r[idx].addr = id << FRAME_SHIFT;
+ r[idx].len = dp_packet_size(packet);
+ id++;
+ /* TODO: when the packet data already lives in the umem, enqueue its
+ * base address directly instead of copying (zero-copy tx). */
+ }
+ u_smp_wmb();
+
+ *uq->producer = uq->cached_prod;
+
+ xsk->outstanding_tx += batch->count;
+
+ complete_tx_only(xsk);
+ print_xsk_stat(xsk);
+
+ return 0;
+}
+
+static int
netdev_linux_sock_batch_send(int sock, int ifindex,
struct dp_packet_batch *batch)
{
@@ -1312,21 +1997,32 @@ netdev_linux_send(struct netdev *netdev_, int qid OVS_UNUSED,
int error = 0;
int sock = 0;

- if (!is_tap_netdev(netdev_)) {
+ if (!is_tap_netdev(netdev_) &&
+ !is_afxdp_netdev(netdev_)) {
sock = af_packet_sock();
if (sock < 0) {
error = -sock;
+ VLOG_WARN("%s af sock < 0", __func__);
goto free_batch;
}

int ifindex = netdev_get_ifindex(netdev_);
if (ifindex < 0) {
+ VLOG_WARN("%s ifindex < 0", __func__);
error = -ifindex;
goto free_batch;
}

error = netdev_linux_sock_batch_send(sock, ifindex, batch);
+ } else if (is_afxdp_netdev(netdev_)) {
+ struct xdpsock *xsk;
+ struct netdev_linux *netdev = netdev_linux_cast(netdev_);
+
+ xsk = netdev->xsk[0]; /* FIXME: always uses queue 0. */
+ VLOG_DBG_RL(&rl, "%s: sending to AF_XDP xsk fd %d", __func__, xsk->sfd);
+ error = netdev_linux_afxdp_batch_send(xsk, batch);
} else {
+ VLOG_INFO_RL(&rl, "%s sent to tap dev", __func__);
error = netdev_linux_tap_batch_send(netdev_, batch);
}
if (error) {
@@ -2426,12 +3122,22 @@ netdev_linux_set_xdp__(struct netdev *netdev_, const struct bpf_prog *prog,
{
struct netdev_linux *netdev = netdev_linux_cast(netdev_);
const char *netdev_name = netdev_get_name(netdev_);
- int ifindex = netdev->ifindex;
+ int ifindex;
int error;

- VLOG_DBG("Setting %s XDP filter %d on %s (ifindex %d)", prog->name,
+ error = get_ifindex(netdev_, &ifindex);
+ if (error) {
+ return ENODEV;
+ }
+
+ VLOG_INFO("Setting %s XDP filter %d on %s (ifindex %d)", prog->name,
prog->fd, netdev_name, ifindex);

+ if (ifindex == 0) {
+ VLOG_WARN("skip device %s", netdev_name);
+ return 0;
+ }
if (netdev->cache_valid & valid_bit) {
error = *filter_error;
if (error || (prog && prog->fd == *netdev_filter)) {
@@ -2456,6 +3162,19 @@ out:
}

static int
+netdev_linux_set_xskmap(struct netdev *netdev_, int xskmap_fd)
+{
+ struct netdev_linux *netdev = netdev_linux_cast(netdev_);
+
+ ovs_assert(xskmap_fd >= 0);
+
+ VLOG_INFO("%s xsks_map fd %d", __func__, xskmap_fd);
+ netdev->xskmap_fd = xskmap_fd;
+
+ return 0;
+}
+
+static int
netdev_linux_set_xdp(struct netdev *netdev_, const struct bpf_prog *prog)
{
struct netdev_linux *netdev = netdev_linux_cast(netdev_);
@@ -3167,6 +3886,7 @@ netdev_linux_update_flags(struct netdev *netdev_, enum netdev_flags off,
netdev_linux_set_policing, \
netdev_linux_set_filter, \
netdev_linux_set_xdp, \
+ netdev_linux_set_xskmap, \
netdev_linux_get_qos_types, \
netdev_linux_get_qos_capabilities, \
netdev_linux_get_qos, \
@@ -3201,6 +3921,15 @@ netdev_linux_update_flags(struct netdev *netdev_, enum netdev_flags off,
FLOW_OFFLOAD_API \
}

+const struct netdev_class netdev_afxdp_class =
+ NETDEV_LINUX_CLASS(
+ "afxdp",
+ netdev_linux_construct,
+ netdev_linux_get_stats,
+ netdev_linux_get_features,
+ netdev_linux_get_status,
+ LINUX_FLOW_OFFLOAD_API);
+
const struct netdev_class netdev_linux_class =
NETDEV_LINUX_CLASS(
"system",
diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
index 3e53a5b76272..df92275d5aff 100644
--- a/lib/netdev-provider.h
+++ b/lib/netdev-provider.h
@@ -515,6 +515,7 @@ struct netdev_class {
*
* This function may be set to null if filters are not supported. */
int (*set_xdp)(struct netdev *netdev, const struct bpf_prog *);
+ int (*set_xskmap)(struct netdev *netdev, int xsks_map_fd);

/* Adds to 'types' all of the forms of QoS supported by 'netdev', or leaves
* it empty if 'netdev' does not support QoS. Any names added to 'types'
@@ -884,6 +885,7 @@ extern const struct netdev_class netdev_bsd_class;
extern const struct netdev_class netdev_windows_class;
#else
extern const struct netdev_class netdev_linux_class;
+extern const struct netdev_class netdev_afxdp_class;
#endif
extern const struct netdev_class netdev_internal_class;
extern const struct netdev_class netdev_tap_class;
diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c
index 4341c89894a3..a61ff4b6808c 100644
--- a/lib/netdev-vport.c
+++ b/lib/netdev-vport.c
@@ -1000,6 +1000,9 @@ netdev_vport_set_xdp(struct netdev *netdev_, const struct bpf_prog *prog)
ifindex = netdev_vport_get_ifindex(netdev_);
error = bpf_set_link_xdp_fd(ifindex, prog->fd,
XDP_FLAGS_SKB_MODE);
+ /* FIXME: also update xsks_map_fd here. */
+
ovs_mutex_unlock(&netdev->mutex);

VLOG_INFO("%s %d", __func__, error);
@@ -1057,6 +1060,7 @@ netdev_vport_set_xdp(struct netdev *netdev_, const struct bpf_prog *prog)
NULL, /* set_policing */ \
netdev_vport_set_filter, /* set_filter */ \
netdev_vport_set_xdp, /* set_xdp */ \
+ NULL, /* set_xskmap */ \
NULL, /* get_qos_types */ \
NULL, /* get_qos_capabilities */ \
NULL, /* get_qos */ \
diff --git a/lib/netdev.c b/lib/netdev.c
index c44a1a683b92..826555dd92f6 100644
--- a/lib/netdev.c
+++ b/lib/netdev.c
@@ -142,6 +142,7 @@ netdev_initialize(void)

#ifdef __linux__
netdev_register_provider(&netdev_linux_class);
+ netdev_register_provider(&netdev_afxdp_class);
netdev_register_provider(&netdev_internal_class);
netdev_register_provider(&netdev_tap_class);
netdev_vport_tunnel_register();
@@ -1474,6 +1475,16 @@ netdev_set_xdp(struct netdev *netdev, struct bpf_prog *prog)
: EOPNOTSUPP);
}

+/* Sets the XSK map fd used to map a netdev queue id to an AF_XDP socket. */
+int
+netdev_set_xskmap(struct netdev *netdev, int xskmap)
+{
+ return (netdev->netdev_class->set_xskmap
+ ? netdev->netdev_class->set_xskmap(netdev, xskmap)
+ : EOPNOTSUPP);
+}
+
/* Adds to 'types' all of the forms of QoS supported by 'netdev', or leaves it
* empty if 'netdev' does not support QoS. Any names added to 'types' should
* be documented as valid for the "type" column in the "QoS" table in
diff --git a/lib/netdev.h b/lib/netdev.h
index 3388504d85c9..3a8d7118378e 100644
--- a/lib/netdev.h
+++ b/lib/netdev.h
@@ -320,6 +320,7 @@ int netdev_set_policing(struct netdev *, uint32_t kbits_rate,
uint32_t kbits_burst);
int netdev_set_filter(struct netdev *netdev, struct bpf_prog *prog);
int netdev_set_xdp(struct netdev *netdev, struct bpf_prog *prog);
+int netdev_set_xskmap(struct netdev *netdev, int xsks_map_fd);

int netdev_get_qos_types(const struct netdev *, struct sset *types);
int netdev_get_qos_capabilities(const struct netdev *,
--
2.7.4