From f7a7a5c228d45efc45d6e26a199a3ea13d2f8754 Mon Sep 17 00:00:00 2001 From: Jack Wang Date: Mon, 11 May 2020 15:51:23 +0200 Subject: [PATCH] block/rnbd: client: main functionality This is main functionality of rnbd-client module, which provides interface to map remote device as local block device /dev/rnbd and feeds RTRS with IO requests. Link: https://lore.kernel.org/r/20200511135131.27580-18-danil.kipnis@cloud.ionos.com Signed-off-by: Danil Kipnis Signed-off-by: Jack Wang Reviewed-by: Bart Van Assche Acked-by: Jens Axboe Signed-off-by: Jason Gunthorpe --- drivers/block/rnbd/rnbd-clt.c | 1729 +++++++++++++++++++++++++++++++++ 1 file changed, 1729 insertions(+) create mode 100644 drivers/block/rnbd/rnbd-clt.c diff --git a/drivers/block/rnbd/rnbd-clt.c b/drivers/block/rnbd/rnbd-clt.c new file mode 100644 index 0000000000000..55bff3b1be715 --- /dev/null +++ b/drivers/block/rnbd/rnbd-clt.c @@ -0,0 +1,1729 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * RDMA Network Block Driver + * + * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved. + * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved. + * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved. + */ + +#undef pr_fmt +#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt + +#include +#include +#include +#include +#include + +#include "rnbd-clt.h" + +MODULE_DESCRIPTION("RDMA Network Block Device Client"); +MODULE_LICENSE("GPL"); + +static int rnbd_client_major; +static DEFINE_IDA(index_ida); +static DEFINE_MUTEX(ida_lock); +static DEFINE_MUTEX(sess_lock); +static LIST_HEAD(sess_list); + +/* + * Maximum number of partitions an instance can have. + * 6 bits = 64 minors = 63 partitions (one minor is used for the device itself) + */ +#define RNBD_PART_BITS 6 + +static inline bool rnbd_clt_get_sess(struct rnbd_clt_session *sess) +{ + return refcount_inc_not_zero(&sess->refcount); +} + +static void free_sess(struct rnbd_clt_session *sess); + +static void rnbd_clt_put_sess(struct rnbd_clt_session *sess) +{ + might_sleep(); + + if (refcount_dec_and_test(&sess->refcount)) + free_sess(sess); +} + +static void rnbd_clt_put_dev(struct rnbd_clt_dev *dev) +{ + might_sleep(); + + if (!refcount_dec_and_test(&dev->refcount)) + return; + + mutex_lock(&ida_lock); + ida_simple_remove(&index_ida, dev->clt_device_id); + mutex_unlock(&ida_lock); + kfree(dev->hw_queues); + rnbd_clt_put_sess(dev->sess); + mutex_destroy(&dev->lock); + kfree(dev); +} + +static inline bool rnbd_clt_get_dev(struct rnbd_clt_dev *dev) +{ + return refcount_inc_not_zero(&dev->refcount); +} + +static int rnbd_clt_set_dev_attr(struct rnbd_clt_dev *dev, + const struct rnbd_msg_open_rsp *rsp) +{ + struct rnbd_clt_session *sess = dev->sess; + + if (!rsp->logical_block_size) + return -EINVAL; + + dev->device_id = le32_to_cpu(rsp->device_id); + dev->nsectors = le64_to_cpu(rsp->nsectors); + dev->logical_block_size = le16_to_cpu(rsp->logical_block_size); + dev->physical_block_size = le16_to_cpu(rsp->physical_block_size); + dev->max_write_same_sectors = le32_to_cpu(rsp->max_write_same_sectors); + dev->max_discard_sectors = le32_to_cpu(rsp->max_discard_sectors); + dev->discard_granularity = le32_to_cpu(rsp->discard_granularity); + dev->discard_alignment = le32_to_cpu(rsp->discard_alignment); + dev->secure_discard = le16_to_cpu(rsp->secure_discard); + dev->rotational = rsp->rotational; + + dev->max_hw_sectors = sess->max_io_size / SECTOR_SIZE; + dev->max_segments = BMAX_SEGMENTS; + + dev->max_hw_sectors = min_t(u32, dev->max_hw_sectors, + le32_to_cpu(rsp->max_hw_sectors)); + dev->max_segments = min_t(u16, dev->max_segments, + le16_to_cpu(rsp->max_segments)); + + return 0; +} + +static int rnbd_clt_change_capacity(struct rnbd_clt_dev *dev, + size_t new_nsectors) +{ + int err = 0; + + rnbd_clt_info(dev, "Device size changed from %zu to %zu sectors\n", + dev->nsectors, new_nsectors); + dev->nsectors = new_nsectors; + set_capacity(dev->gd, dev->nsectors); + err = revalidate_disk(dev->gd); + if (err) + rnbd_clt_err(dev, + "Failed to change device size from %zu to %zu, err: %d\n", + dev->nsectors, new_nsectors, err); + return err; +} + +static int process_msg_open_rsp(struct rnbd_clt_dev *dev, + struct rnbd_msg_open_rsp *rsp) +{ + int err = 0; + + mutex_lock(&dev->lock); + if (dev->dev_state == DEV_STATE_UNMAPPED) { + rnbd_clt_info(dev, + "Ignoring Open-Response message from server for unmapped device\n"); + err = -ENOENT; + goto out; + } + if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) { + u64 nsectors = le64_to_cpu(rsp->nsectors); + + /* + * If the device was remapped and the size changed in the + * meantime we need to revalidate it + */ + if (dev->nsectors != nsectors) + rnbd_clt_change_capacity(dev, nsectors); + rnbd_clt_info(dev, "Device online, device remapped successfully\n"); + } + err = rnbd_clt_set_dev_attr(dev, rsp); + if (err) + goto out; + dev->dev_state = DEV_STATE_MAPPED; + +out: + mutex_unlock(&dev->lock); + + return err; +} + +int rnbd_clt_resize_disk(struct rnbd_clt_dev *dev, size_t newsize) +{ + int ret = 0; + + mutex_lock(&dev->lock); + if (dev->dev_state != DEV_STATE_MAPPED) { + pr_err("Failed to set new size of the device, device is not opened\n"); + ret = -ENOENT; + goto out; + } + ret = rnbd_clt_change_capacity(dev, newsize); + +out: + mutex_unlock(&dev->lock); + + return ret; +} + +static inline void rnbd_clt_dev_requeue(struct rnbd_queue *q) +{ + if (WARN_ON(!q->hctx)) + return; + + /* We can come here from interrupt, thus async=true */ + blk_mq_run_hw_queue(q->hctx, true); +} + +enum { + RNBD_DELAY_IFBUSY = -1, +}; + +/** + * rnbd_get_cpu_qlist() - finds a list with HW queues to be rerun + * @sess: Session to find a queue for + * @cpu: Cpu to start the search from + * + * Description: + * Each CPU has a list of HW queues, which needs to be rerun. If a list + * is not empty - it is marked with a bit. This function finds first + * set bit in a bitmap and returns corresponding CPU list. + */ +static struct rnbd_cpu_qlist * +rnbd_get_cpu_qlist(struct rnbd_clt_session *sess, int cpu) +{ + int bit; + + /* Search from cpu to nr_cpu_ids */ + bit = find_next_bit(sess->cpu_queues_bm, nr_cpu_ids, cpu); + if (bit < nr_cpu_ids) { + return per_cpu_ptr(sess->cpu_queues, bit); + } else if (cpu != 0) { + /* Search from 0 to cpu */ + bit = find_next_bit(sess->cpu_queues_bm, cpu, 0); + if (bit < cpu) + return per_cpu_ptr(sess->cpu_queues, bit); + } + + return NULL; +} + +static inline int nxt_cpu(int cpu) +{ + return (cpu + 1) % nr_cpu_ids; +} + +/** + * rnbd_rerun_if_needed() - rerun next queue marked as stopped + * @sess: Session to rerun a queue on + * + * Description: + * Each CPU has it's own list of HW queues, which should be rerun. + * Function finds such list with HW queues, takes a list lock, picks up + * the first HW queue out of the list and requeues it. + * + * Return: + * True if the queue was requeued, false otherwise. + * + * Context: + * Does not matter. + */ +static bool rnbd_rerun_if_needed(struct rnbd_clt_session *sess) +{ + struct rnbd_queue *q = NULL; + struct rnbd_cpu_qlist *cpu_q; + unsigned long flags; + int *cpup; + + /* + * To keep fairness and not to let other queues starve we always + * try to wake up someone else in round-robin manner. That of course + * increases latency but queues always have a chance to be executed. + */ + cpup = get_cpu_ptr(sess->cpu_rr); + for (cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(*cpup)); cpu_q; + cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(cpu_q->cpu))) { + if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags)) + continue; + if (unlikely(!test_bit(cpu_q->cpu, sess->cpu_queues_bm))) + goto unlock; + q = list_first_entry_or_null(&cpu_q->requeue_list, + typeof(*q), requeue_list); + if (WARN_ON(!q)) + goto clear_bit; + list_del_init(&q->requeue_list); + clear_bit_unlock(0, &q->in_list); + + if (list_empty(&cpu_q->requeue_list)) { + /* Clear bit if nothing is left */ +clear_bit: + clear_bit(cpu_q->cpu, sess->cpu_queues_bm); + } +unlock: + spin_unlock_irqrestore(&cpu_q->requeue_lock, flags); + + if (q) + break; + } + + /** + * Saves the CPU that is going to be requeued on the per-cpu var. Just + * incrementing it doesn't work because rnbd_get_cpu_qlist() will + * always return the first CPU with something on the queue list when the + * value stored on the var is greater than the last CPU with something + * on the list. + */ + if (cpu_q) + *cpup = cpu_q->cpu; + put_cpu_var(sess->cpu_rr); + + if (q) + rnbd_clt_dev_requeue(q); + + return q; +} + +/** + * rnbd_rerun_all_if_idle() - rerun all queues left in the list if + * session is idling (there are no requests + * in-flight). + * @sess: Session to rerun the queues on + * + * Description: + * This function tries to rerun all stopped queues if there are no + * requests in-flight anymore. This function tries to solve an obvious + * problem, when number of tags < than number of queues (hctx), which + * are stopped and put to sleep. If last permit, which has been just put, + * does not wake up all left queues (hctxs), IO requests hang forever. + * + * That can happen when all number of permits, say N, have been exhausted + * from one CPU, and we have many block devices per session, say M. + * Each block device has it's own queue (hctx) for each CPU, so eventually + * we can put that number of queues (hctxs) to sleep: M x nr_cpu_ids. + * If number of permits N < M x nr_cpu_ids finally we will get an IO hang. + * + * To avoid this hang last caller of rnbd_put_permit() (last caller is the + * one who observes sess->busy == 0) must wake up all remaining queues. + * + * Context: + * Does not matter. + */ +static void rnbd_rerun_all_if_idle(struct rnbd_clt_session *sess) +{ + bool requeued; + + do { + requeued = rnbd_rerun_if_needed(sess); + } while (atomic_read(&sess->busy) == 0 && requeued); +} + +static struct rtrs_permit *rnbd_get_permit(struct rnbd_clt_session *sess, + enum rtrs_clt_con_type con_type, + int wait) +{ + struct rtrs_permit *permit; + + permit = rtrs_clt_get_permit(sess->rtrs, con_type, + wait ? RTRS_PERMIT_WAIT : + RTRS_PERMIT_NOWAIT); + if (likely(permit)) + /* We have a subtle rare case here, when all permits can be + * consumed before busy counter increased. This is safe, + * because loser will get NULL as a permit, observe 0 busy + * counter and immediately restart the queue himself. + */ + atomic_inc(&sess->busy); + + return permit; +} + +static void rnbd_put_permit(struct rnbd_clt_session *sess, + struct rtrs_permit *permit) +{ + rtrs_clt_put_permit(sess->rtrs, permit); + atomic_dec(&sess->busy); + /* Paired with rnbd_clt_dev_add_to_requeue(). Decrement first + * and then check queue bits. + */ + smp_mb__after_atomic(); + rnbd_rerun_all_if_idle(sess); +} + +static struct rnbd_iu *rnbd_get_iu(struct rnbd_clt_session *sess, + enum rtrs_clt_con_type con_type, + int wait) +{ + struct rnbd_iu *iu; + struct rtrs_permit *permit; + + permit = rnbd_get_permit(sess, con_type, + wait ? RTRS_PERMIT_WAIT : + RTRS_PERMIT_NOWAIT); + if (unlikely(!permit)) + return NULL; + iu = rtrs_permit_to_pdu(permit); + iu->permit = permit; + /* + * 1st reference is dropped after finishing sending a "user" message, + * 2nd reference is dropped after confirmation with the response is + * returned. + * 1st and 2nd can happen in any order, so the rnbd_iu should be + * released (rtrs_permit returned to ibbtrs) only leased after both + * are finished. + */ + atomic_set(&iu->refcount, 2); + init_waitqueue_head(&iu->comp.wait); + iu->comp.errno = INT_MAX; + + return iu; +} + +static void rnbd_put_iu(struct rnbd_clt_session *sess, struct rnbd_iu *iu) +{ + if (atomic_dec_and_test(&iu->refcount)) + rnbd_put_permit(sess, iu->permit); +} + +static void rnbd_softirq_done_fn(struct request *rq) +{ + struct rnbd_clt_dev *dev = rq->rq_disk->private_data; + struct rnbd_clt_session *sess = dev->sess; + struct rnbd_iu *iu; + + iu = blk_mq_rq_to_pdu(rq); + rnbd_put_permit(sess, iu->permit); + blk_mq_end_request(rq, errno_to_blk_status(iu->errno)); +} + +static void msg_io_conf(void *priv, int errno) +{ + struct rnbd_iu *iu = priv; + struct rnbd_clt_dev *dev = iu->dev; + struct request *rq = iu->rq; + int rw = rq_data_dir(rq); + + iu->errno = errno; + + blk_mq_complete_request(rq); + + if (errno) + rnbd_clt_info_rl(dev, "%s I/O failed with err: %d\n", + rw == READ ? "read" : "write", errno); +} + +static void wake_up_iu_comp(struct rnbd_iu *iu, int errno) +{ + iu->comp.errno = errno; + wake_up(&iu->comp.wait); +} + +static void msg_conf(void *priv, int errno) +{ + struct rnbd_iu *iu = priv; + + iu->errno = errno; + schedule_work(&iu->work); +} + +enum wait_type { + NO_WAIT = 0, + WAIT = 1 +}; + +static int send_usr_msg(struct rtrs_clt *rtrs, int dir, + struct rnbd_iu *iu, struct kvec *vec, size_t nr, + size_t len, struct scatterlist *sg, unsigned int sg_len, + void (*conf)(struct work_struct *work), + int *errno, enum wait_type wait) +{ + int err; + struct rtrs_clt_req_ops req_ops; + + INIT_WORK(&iu->work, conf); + req_ops = (struct rtrs_clt_req_ops) { + .priv = iu, + .conf_fn = msg_conf, + }; + err = rtrs_clt_request(dir, &req_ops, rtrs, iu->permit, + vec, nr, len, sg, sg_len); + if (!err && wait) { + wait_event(iu->comp.wait, iu->comp.errno != INT_MAX); + *errno = iu->comp.errno; + } else { + *errno = 0; + } + + return err; +} + +static void msg_close_conf(struct work_struct *work) +{ + struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work); + struct rnbd_clt_dev *dev = iu->dev; + + wake_up_iu_comp(iu, iu->errno); + rnbd_put_iu(dev->sess, iu); + rnbd_clt_put_dev(dev); +} + +static int send_msg_close(struct rnbd_clt_dev *dev, u32 device_id, bool wait) +{ + struct rnbd_clt_session *sess = dev->sess; + struct rnbd_msg_close msg; + struct rnbd_iu *iu; + struct kvec vec = { + .iov_base = &msg, + .iov_len = sizeof(msg) + }; + int err, errno; + + iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT); + if (!iu) + return -ENOMEM; + + iu->buf = NULL; + iu->dev = dev; + + sg_mark_end(&iu->sglist[0]); + + msg.hdr.type = cpu_to_le16(RNBD_MSG_CLOSE); + msg.device_id = cpu_to_le32(device_id); + + WARN_ON(!rnbd_clt_get_dev(dev)); + err = send_usr_msg(sess->rtrs, WRITE, iu, &vec, 1, 0, NULL, 0, + msg_close_conf, &errno, wait); + if (err) { + rnbd_clt_put_dev(dev); + rnbd_put_iu(sess, iu); + } else { + err = errno; + } + + rnbd_put_iu(sess, iu); + return err; +} + +static void msg_open_conf(struct work_struct *work) +{ + struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work); + struct rnbd_msg_open_rsp *rsp = iu->buf; + struct rnbd_clt_dev *dev = iu->dev; + int errno = iu->errno; + + if (errno) { + rnbd_clt_err(dev, + "Opening failed, server responded: %d\n", + errno); + } else { + errno = process_msg_open_rsp(dev, rsp); + if (errno) { + u32 device_id = le32_to_cpu(rsp->device_id); + /* + * If server thinks its fine, but we fail to process + * then be nice and send a close to server. + */ + (void)send_msg_close(dev, device_id, NO_WAIT); + } + } + kfree(rsp); + wake_up_iu_comp(iu, errno); + rnbd_put_iu(dev->sess, iu); + rnbd_clt_put_dev(dev); +} + +static void msg_sess_info_conf(struct work_struct *work) +{ + struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work); + struct rnbd_msg_sess_info_rsp *rsp = iu->buf; + struct rnbd_clt_session *sess = iu->sess; + + if (!iu->errno) + sess->ver = min_t(u8, rsp->ver, RNBD_PROTO_VER_MAJOR); + + kfree(rsp); + wake_up_iu_comp(iu, iu->errno); + rnbd_put_iu(sess, iu); + rnbd_clt_put_sess(sess); +} + +static int send_msg_open(struct rnbd_clt_dev *dev, bool wait) +{ + struct rnbd_clt_session *sess = dev->sess; + struct rnbd_msg_open_rsp *rsp; + struct rnbd_msg_open msg; + struct rnbd_iu *iu; + struct kvec vec = { + .iov_base = &msg, + .iov_len = sizeof(msg) + }; + int err, errno; + + rsp = kzalloc(sizeof(*rsp), GFP_KERNEL); + if (!rsp) + return -ENOMEM; + + iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT); + if (!iu) { + kfree(rsp); + return -ENOMEM; + } + + iu->buf = rsp; + iu->dev = dev; + + sg_init_one(iu->sglist, rsp, sizeof(*rsp)); + + msg.hdr.type = cpu_to_le16(RNBD_MSG_OPEN); + msg.access_mode = dev->access_mode; + strlcpy(msg.dev_name, dev->pathname, sizeof(msg.dev_name)); + + WARN_ON(!rnbd_clt_get_dev(dev)); + err = send_usr_msg(sess->rtrs, READ, iu, + &vec, 1, sizeof(*rsp), iu->sglist, 1, + msg_open_conf, &errno, wait); + if (err) { + rnbd_clt_put_dev(dev); + rnbd_put_iu(sess, iu); + kfree(rsp); + } else { + err = errno; + } + + rnbd_put_iu(sess, iu); + return err; +} + +static int send_msg_sess_info(struct rnbd_clt_session *sess, bool wait) +{ + struct rnbd_msg_sess_info_rsp *rsp; + struct rnbd_msg_sess_info msg; + struct rnbd_iu *iu; + struct kvec vec = { + .iov_base = &msg, + .iov_len = sizeof(msg) + }; + int err, errno; + + rsp = kzalloc(sizeof(*rsp), GFP_KERNEL); + if (!rsp) + return -ENOMEM; + + iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT); + if (!iu) { + kfree(rsp); + return -ENOMEM; + } + + iu->buf = rsp; + iu->sess = sess; + + sg_init_one(iu->sglist, rsp, sizeof(*rsp)); + + msg.hdr.type = cpu_to_le16(RNBD_MSG_SESS_INFO); + msg.ver = RNBD_PROTO_VER_MAJOR; + + if (!rnbd_clt_get_sess(sess)) { + /* + * That can happen only in one case, when RTRS has restablished + * the connection and link_ev() is called, but session is almost + * dead, last reference on session is put and caller is waiting + * for RTRS to close everything. + */ + err = -ENODEV; + goto put_iu; + } + err = send_usr_msg(sess->rtrs, READ, iu, + &vec, 1, sizeof(*rsp), iu->sglist, 1, + msg_sess_info_conf, &errno, wait); + if (err) { + rnbd_clt_put_sess(sess); +put_iu: + rnbd_put_iu(sess, iu); + kfree(rsp); + } else { + err = errno; + } + + rnbd_put_iu(sess, iu); + return err; +} + +static void set_dev_states_to_disconnected(struct rnbd_clt_session *sess) +{ + struct rnbd_clt_dev *dev; + + mutex_lock(&sess->lock); + list_for_each_entry(dev, &sess->devs_list, list) { + rnbd_clt_err(dev, "Device disconnected.\n"); + + mutex_lock(&dev->lock); + if (dev->dev_state == DEV_STATE_MAPPED) + dev->dev_state = DEV_STATE_MAPPED_DISCONNECTED; + mutex_unlock(&dev->lock); + } + mutex_unlock(&sess->lock); +} + +static void remap_devs(struct rnbd_clt_session *sess) +{ + struct rnbd_clt_dev *dev; + struct rtrs_attrs attrs; + int err; + + /* + * Careful here: we are called from RTRS link event directly, + * thus we can't send any RTRS request and wait for response + * or RTRS will not be able to complete request with failure + * if something goes wrong (failing of outstanding requests + * happens exactly from the context where we are blocking now). + * + * So to avoid deadlocks each usr message sent from here must + * be asynchronous. + */ + + err = send_msg_sess_info(sess, NO_WAIT); + if (err) { + pr_err("send_msg_sess_info(\"%s\"): %d\n", sess->sessname, err); + return; + } + + rtrs_clt_query(sess->rtrs, &attrs); + mutex_lock(&sess->lock); + sess->max_io_size = attrs.max_io_size; + + list_for_each_entry(dev, &sess->devs_list, list) { + bool skip; + + mutex_lock(&dev->lock); + skip = (dev->dev_state == DEV_STATE_INIT); + mutex_unlock(&dev->lock); + if (skip) + /* + * When device is establishing connection for the first + * time - do not remap, it will be closed soon. + */ + continue; + + rnbd_clt_info(dev, "session reconnected, remapping device\n"); + err = send_msg_open(dev, NO_WAIT); + if (err) { + rnbd_clt_err(dev, "send_msg_open(): %d\n", err); + break; + } + } + mutex_unlock(&sess->lock); +} + +static void rnbd_clt_link_ev(void *priv, enum rtrs_clt_link_ev ev) +{ + struct rnbd_clt_session *sess = priv; + + switch (ev) { + case RTRS_CLT_LINK_EV_DISCONNECTED: + set_dev_states_to_disconnected(sess); + break; + case RTRS_CLT_LINK_EV_RECONNECTED: + remap_devs(sess); + break; + default: + pr_err("Unknown session event received (%d), session: %s\n", + ev, sess->sessname); + } +} + +static void rnbd_init_cpu_qlists(struct rnbd_cpu_qlist __percpu *cpu_queues) +{ + unsigned int cpu; + struct rnbd_cpu_qlist *cpu_q; + + for_each_possible_cpu(cpu) { + cpu_q = per_cpu_ptr(cpu_queues, cpu); + + cpu_q->cpu = cpu; + INIT_LIST_HEAD(&cpu_q->requeue_list); + spin_lock_init(&cpu_q->requeue_lock); + } +} + +static void destroy_mq_tags(struct rnbd_clt_session *sess) +{ + if (sess->tag_set.tags) + blk_mq_free_tag_set(&sess->tag_set); +} + +static inline void wake_up_rtrs_waiters(struct rnbd_clt_session *sess) +{ + sess->rtrs_ready = true; + wake_up_all(&sess->rtrs_waitq); +} + +static void close_rtrs(struct rnbd_clt_session *sess) +{ + might_sleep(); + + if (!IS_ERR_OR_NULL(sess->rtrs)) { + rtrs_clt_close(sess->rtrs); + sess->rtrs = NULL; + wake_up_rtrs_waiters(sess); + } +} + +static void free_sess(struct rnbd_clt_session *sess) +{ + WARN_ON(!list_empty(&sess->devs_list)); + + might_sleep(); + + close_rtrs(sess); + destroy_mq_tags(sess); + if (!list_empty(&sess->list)) { + mutex_lock(&sess_lock); + list_del(&sess->list); + mutex_unlock(&sess_lock); + } + free_percpu(sess->cpu_queues); + free_percpu(sess->cpu_rr); + mutex_destroy(&sess->lock); + kfree(sess); +} + +static struct rnbd_clt_session *alloc_sess(const char *sessname) +{ + struct rnbd_clt_session *sess; + int err, cpu; + + sess = kzalloc_node(sizeof(*sess), GFP_KERNEL, NUMA_NO_NODE); + if (!sess) + return ERR_PTR(-ENOMEM); + strlcpy(sess->sessname, sessname, sizeof(sess->sessname)); + atomic_set(&sess->busy, 0); + mutex_init(&sess->lock); + INIT_LIST_HEAD(&sess->devs_list); + INIT_LIST_HEAD(&sess->list); + bitmap_zero(sess->cpu_queues_bm, NR_CPUS); + init_waitqueue_head(&sess->rtrs_waitq); + refcount_set(&sess->refcount, 1); + + sess->cpu_queues = alloc_percpu(struct rnbd_cpu_qlist); + if (!sess->cpu_queues) { + err = -ENOMEM; + goto err; + } + rnbd_init_cpu_qlists(sess->cpu_queues); + + /* + * That is simple percpu variable which stores cpu indeces, which are + * incremented on each access. We need that for the sake of fairness + * to wake up queues in a round-robin manner. + */ + sess->cpu_rr = alloc_percpu(int); + if (!sess->cpu_rr) { + err = -ENOMEM; + goto err; + } + for_each_possible_cpu(cpu) + * per_cpu_ptr(sess->cpu_rr, cpu) = cpu; + + return sess; + +err: + free_sess(sess); + + return ERR_PTR(err); +} + +static int wait_for_rtrs_connection(struct rnbd_clt_session *sess) +{ + wait_event(sess->rtrs_waitq, sess->rtrs_ready); + if (IS_ERR_OR_NULL(sess->rtrs)) + return -ECONNRESET; + + return 0; +} + +static void wait_for_rtrs_disconnection(struct rnbd_clt_session *sess) + __releases(&sess_lock) + __acquires(&sess_lock) +{ + DEFINE_WAIT(wait); + + prepare_to_wait(&sess->rtrs_waitq, &wait, TASK_UNINTERRUPTIBLE); + if (IS_ERR_OR_NULL(sess->rtrs)) { + finish_wait(&sess->rtrs_waitq, &wait); + return; + } + mutex_unlock(&sess_lock); + /* loop in caller, see __find_and_get_sess(). + * You can't leave mutex locked and call schedule(), you will catch a + * deadlock with a caller of free_sess(), which has just put the last + * reference and is about to take the sess_lock in order to delete + * the session from the list. + */ + schedule(); + mutex_lock(&sess_lock); +} + +static struct rnbd_clt_session *__find_and_get_sess(const char *sessname) + __releases(&sess_lock) + __acquires(&sess_lock) +{ + struct rnbd_clt_session *sess, *sn; + int err; + +again: + list_for_each_entry_safe(sess, sn, &sess_list, list) { + if (strcmp(sessname, sess->sessname)) + continue; + + if (sess->rtrs_ready && IS_ERR_OR_NULL(sess->rtrs)) + /* + * No RTRS connection, session is dying. + */ + continue; + + if (rnbd_clt_get_sess(sess)) { + /* + * Alive session is found, wait for RTRS connection. + */ + mutex_unlock(&sess_lock); + err = wait_for_rtrs_connection(sess); + if (err) + rnbd_clt_put_sess(sess); + mutex_lock(&sess_lock); + + if (err) + /* Session is dying, repeat the loop */ + goto again; + + return sess; + } + /* + * Ref is 0, session is dying, wait for RTRS disconnect + * in order to avoid session names clashes. + */ + wait_for_rtrs_disconnection(sess); + /* + * RTRS is disconnected and soon session will be freed, + * so repeat a loop. + */ + goto again; + } + + return NULL; +} + +static struct +rnbd_clt_session *find_or_create_sess(const char *sessname, bool *first) +{ + struct rnbd_clt_session *sess = NULL; + + mutex_lock(&sess_lock); + sess = __find_and_get_sess(sessname); + if (!sess) { + sess = alloc_sess(sessname); + if (sess) { + list_add(&sess->list, &sess_list); + *first = true; + } else { + mutex_unlock(&sess_lock); + return ERR_PTR(-ENOMEM); + } + } else + *first = false; + mutex_unlock(&sess_lock); + + return sess; +} + +static int rnbd_client_open(struct block_device *block_device, fmode_t mode) +{ + struct rnbd_clt_dev *dev = block_device->bd_disk->private_data; + + if (dev->read_only && (mode & FMODE_WRITE)) + return -EPERM; + + if (dev->dev_state == DEV_STATE_UNMAPPED || + !rnbd_clt_get_dev(dev)) + return -EIO; + + return 0; +} + +static void rnbd_client_release(struct gendisk *gen, fmode_t mode) +{ + struct rnbd_clt_dev *dev = gen->private_data; + + rnbd_clt_put_dev(dev); +} + +static int rnbd_client_getgeo(struct block_device *block_device, + struct hd_geometry *geo) +{ + u64 size; + struct rnbd_clt_dev *dev; + + dev = block_device->bd_disk->private_data; + size = dev->size * (dev->logical_block_size / SECTOR_SIZE); + geo->cylinders = size >> 6; /* size/64 */ + geo->heads = 4; + geo->sectors = 16; + geo->start = 0; + + return 0; +} + +static const struct block_device_operations rnbd_client_ops = { + .owner = THIS_MODULE, + .open = rnbd_client_open, + .release = rnbd_client_release, + .getgeo = rnbd_client_getgeo +}; + +/* The amount of data that belongs to an I/O and the amount of data that + * should be read or written to the disk (bi_size) can differ. + * + * E.g. When WRITE_SAME is used, only a small amount of data is + * transferred that is then written repeatedly over a lot of sectors. + * + * Get the size of data to be transferred via RTRS by summing up the size + * of the scather-gather list entries. + */ +static size_t rnbd_clt_get_sg_size(struct scatterlist *sglist, u32 len) +{ + struct scatterlist *sg; + size_t tsize = 0; + int i; + + for_each_sg(sglist, sg, len, i) + tsize += sg->length; + return tsize; +} + +static int rnbd_client_xfer_request(struct rnbd_clt_dev *dev, + struct request *rq, + struct rnbd_iu *iu) +{ + struct rtrs_clt *rtrs = dev->sess->rtrs; + struct rtrs_permit *permit = iu->permit; + struct rnbd_msg_io msg; + struct rtrs_clt_req_ops req_ops; + unsigned int sg_cnt = 0; + struct kvec vec; + size_t size; + int err; + + iu->rq = rq; + iu->dev = dev; + msg.sector = cpu_to_le64(blk_rq_pos(rq)); + msg.bi_size = cpu_to_le32(blk_rq_bytes(rq)); + msg.rw = cpu_to_le32(rq_to_rnbd_flags(rq)); + msg.prio = cpu_to_le16(req_get_ioprio(rq)); + + /* + * We only support discards with single segment for now. + * See queue limits. + */ + if (req_op(rq) != REQ_OP_DISCARD) + sg_cnt = blk_rq_map_sg(dev->queue, rq, iu->sglist); + + if (sg_cnt == 0) + /* Do not forget to mark the end */ + sg_mark_end(&iu->sglist[0]); + + msg.hdr.type = cpu_to_le16(RNBD_MSG_IO); + msg.device_id = cpu_to_le32(dev->device_id); + + vec = (struct kvec) { + .iov_base = &msg, + .iov_len = sizeof(msg) + }; + size = rnbd_clt_get_sg_size(iu->sglist, sg_cnt); + req_ops = (struct rtrs_clt_req_ops) { + .priv = iu, + .conf_fn = msg_io_conf, + }; + err = rtrs_clt_request(rq_data_dir(rq), &req_ops, rtrs, permit, + &vec, 1, size, iu->sglist, sg_cnt); + if (unlikely(err)) { + rnbd_clt_err_rl(dev, "RTRS failed to transfer IO, err: %d\n", + err); + return err; + } + + return 0; +} + +/** + * rnbd_clt_dev_add_to_requeue() - add device to requeue if session is busy + * @dev: Device to be checked + * @q: Queue to be added to the requeue list if required + * + * Description: + * If session is busy, that means someone will requeue us when resources + * are freed. If session is not doing anything - device is not added to + * the list and @false is returned. + */ +static bool rnbd_clt_dev_add_to_requeue(struct rnbd_clt_dev *dev, + struct rnbd_queue *q) +{ + struct rnbd_clt_session *sess = dev->sess; + struct rnbd_cpu_qlist *cpu_q; + unsigned long flags; + bool added = true; + bool need_set; + + cpu_q = get_cpu_ptr(sess->cpu_queues); + spin_lock_irqsave(&cpu_q->requeue_lock, flags); + + if (likely(!test_and_set_bit_lock(0, &q->in_list))) { + if (WARN_ON(!list_empty(&q->requeue_list))) + goto unlock; + + need_set = !test_bit(cpu_q->cpu, sess->cpu_queues_bm); + if (need_set) { + set_bit(cpu_q->cpu, sess->cpu_queues_bm); + /* Paired with rnbd_put_permit(). Set a bit first + * and then observe the busy counter. + */ + smp_mb__before_atomic(); + } + if (likely(atomic_read(&sess->busy))) { + list_add_tail(&q->requeue_list, &cpu_q->requeue_list); + } else { + /* Very unlikely, but possible: busy counter was + * observed as zero. Drop all bits and return + * false to restart the queue by ourselves. + */ + if (need_set) + clear_bit(cpu_q->cpu, sess->cpu_queues_bm); + clear_bit_unlock(0, &q->in_list); + added = false; + } + } +unlock: + spin_unlock_irqrestore(&cpu_q->requeue_lock, flags); + put_cpu_ptr(sess->cpu_queues); + + return added; +} + +static void rnbd_clt_dev_kick_mq_queue(struct rnbd_clt_dev *dev, + struct blk_mq_hw_ctx *hctx, + int delay) +{ + struct rnbd_queue *q = hctx->driver_data; + + if (delay != RNBD_DELAY_IFBUSY) + blk_mq_delay_run_hw_queue(hctx, delay); + else if (unlikely(!rnbd_clt_dev_add_to_requeue(dev, q))) + /* + * If session is not busy we have to restart + * the queue ourselves. + */ + blk_mq_delay_run_hw_queue(hctx, 10/*ms*/); +} + +static blk_status_t rnbd_queue_rq(struct blk_mq_hw_ctx *hctx, + const struct blk_mq_queue_data *bd) +{ + struct request *rq = bd->rq; + struct rnbd_clt_dev *dev = rq->rq_disk->private_data; + struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq); + int err; + + if (unlikely(dev->dev_state != DEV_STATE_MAPPED)) + return BLK_STS_IOERR; + + iu->permit = rnbd_get_permit(dev->sess, RTRS_IO_CON, + RTRS_PERMIT_NOWAIT); + if (unlikely(!iu->permit)) { + rnbd_clt_dev_kick_mq_queue(dev, hctx, RNBD_DELAY_IFBUSY); + return BLK_STS_RESOURCE; + } + + blk_mq_start_request(rq); + err = rnbd_client_xfer_request(dev, rq, iu); + if (likely(err == 0)) + return BLK_STS_OK; + if (unlikely(err == -EAGAIN || err == -ENOMEM)) { + rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/); + rnbd_put_permit(dev->sess, iu->permit); + return BLK_STS_RESOURCE; + } + + rnbd_put_permit(dev->sess, iu->permit); + return BLK_STS_IOERR; +} + +static int rnbd_init_request(struct blk_mq_tag_set *set, struct request *rq, + unsigned int hctx_idx, unsigned int numa_node) +{ + struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq); + + sg_init_table(iu->sglist, BMAX_SEGMENTS); + return 0; +} + +static struct blk_mq_ops rnbd_mq_ops = { + .queue_rq = rnbd_queue_rq, + .init_request = rnbd_init_request, + .complete = rnbd_softirq_done_fn, +}; + +static int setup_mq_tags(struct rnbd_clt_session *sess) +{ + struct blk_mq_tag_set *tag_set = &sess->tag_set; + + memset(tag_set, 0, sizeof(*tag_set)); + tag_set->ops = &rnbd_mq_ops; + tag_set->queue_depth = sess->queue_depth; + tag_set->numa_node = NUMA_NO_NODE; + tag_set->flags = BLK_MQ_F_SHOULD_MERGE | + BLK_MQ_F_TAG_SHARED; + tag_set->cmd_size = sizeof(struct rnbd_iu); + tag_set->nr_hw_queues = num_online_cpus(); + + return blk_mq_alloc_tag_set(tag_set); +} + +static struct rnbd_clt_session * +find_and_get_or_create_sess(const char *sessname, + const struct rtrs_addr *paths, + size_t path_cnt, u16 port_nr) +{ + struct rnbd_clt_session *sess; + struct rtrs_attrs attrs; + int err; + bool first; + struct rtrs_clt_ops rtrs_ops; + + sess = find_or_create_sess(sessname, &first); + if (sess == ERR_PTR(-ENOMEM)) + return ERR_PTR(-ENOMEM); + else if (!first) + return sess; + + rtrs_ops = (struct rtrs_clt_ops) { + .priv = sess, + .link_ev = rnbd_clt_link_ev, + }; + /* + * Nothing was found, establish rtrs connection and proceed further. + */ + sess->rtrs = rtrs_clt_open(&rtrs_ops, sessname, + paths, path_cnt, port_nr, + sizeof(struct rnbd_iu), + RECONNECT_DELAY, BMAX_SEGMENTS, + MAX_RECONNECTS); + if (IS_ERR(sess->rtrs)) { + err = PTR_ERR(sess->rtrs); + goto wake_up_and_put; + } + rtrs_clt_query(sess->rtrs, &attrs); + sess->max_io_size = attrs.max_io_size; + sess->queue_depth = attrs.queue_depth; + + err = setup_mq_tags(sess); + if (err) + goto close_rtrs; + + err = send_msg_sess_info(sess, WAIT); + if (err) + goto close_rtrs; + + wake_up_rtrs_waiters(sess); + + return sess; + +close_rtrs: + close_rtrs(sess); +put_sess: + rnbd_clt_put_sess(sess); + + return ERR_PTR(err); + +wake_up_and_put: + wake_up_rtrs_waiters(sess); + goto put_sess; +} + +static inline void rnbd_init_hw_queue(struct rnbd_clt_dev *dev, + struct rnbd_queue *q, + struct blk_mq_hw_ctx *hctx) +{ + INIT_LIST_HEAD(&q->requeue_list); + q->dev = dev; + q->hctx = hctx; +} + +static void rnbd_init_mq_hw_queues(struct rnbd_clt_dev *dev) +{ + int i; + struct blk_mq_hw_ctx *hctx; + struct rnbd_queue *q; + + queue_for_each_hw_ctx(dev->queue, hctx, i) { + q = &dev->hw_queues[i]; + rnbd_init_hw_queue(dev, q, hctx); + hctx->driver_data = q; + } +} + +static int setup_mq_dev(struct rnbd_clt_dev *dev) +{ + dev->queue = blk_mq_init_queue(&dev->sess->tag_set); + if (IS_ERR(dev->queue)) { + rnbd_clt_err(dev, "Initializing multiqueue queue failed, err: %ld\n", + PTR_ERR(dev->queue)); + return PTR_ERR(dev->queue); + } + rnbd_init_mq_hw_queues(dev); + return 0; +} + +static void setup_request_queue(struct rnbd_clt_dev *dev) +{ + blk_queue_logical_block_size(dev->queue, dev->logical_block_size); + blk_queue_physical_block_size(dev->queue, dev->physical_block_size); + blk_queue_max_hw_sectors(dev->queue, dev->max_hw_sectors); + blk_queue_max_write_same_sectors(dev->queue, + dev->max_write_same_sectors); + + /* + * we don't support discards to "discontiguous" segments + * in on request + */ + blk_queue_max_discard_segments(dev->queue, 1); + + blk_queue_max_discard_sectors(dev->queue, dev->max_discard_sectors); + dev->queue->limits.discard_granularity = dev->discard_granularity; + dev->queue->limits.discard_alignment = dev->discard_alignment; + if (dev->max_discard_sectors) + blk_queue_flag_set(QUEUE_FLAG_DISCARD, dev->queue); + if (dev->secure_discard) + blk_queue_flag_set(QUEUE_FLAG_SECERASE, dev->queue); + + blk_queue_flag_set(QUEUE_FLAG_SAME_COMP, dev->queue); + blk_queue_flag_set(QUEUE_FLAG_SAME_FORCE, dev->queue); + blk_queue_max_segments(dev->queue, dev->max_segments); + blk_queue_io_opt(dev->queue, dev->sess->max_io_size); + blk_queue_virt_boundary(dev->queue, SZ_4K - 1); + blk_queue_write_cache(dev->queue, true, true); + dev->queue->queuedata = dev; +} + +static void rnbd_clt_setup_gen_disk(struct rnbd_clt_dev *dev, int idx) +{ + dev->gd->major = rnbd_client_major; + dev->gd->first_minor = idx << RNBD_PART_BITS; + dev->gd->fops = &rnbd_client_ops; + dev->gd->queue = dev->queue; + dev->gd->private_data = dev; + snprintf(dev->gd->disk_name, sizeof(dev->gd->disk_name), "rnbd%d", + idx); + pr_debug("disk_name=%s, capacity=%zu\n", + dev->gd->disk_name, + dev->nsectors * (dev->logical_block_size / SECTOR_SIZE) + ); + + set_capacity(dev->gd, dev->nsectors); + + if (dev->access_mode == RNBD_ACCESS_RO) { + dev->read_only = true; + set_disk_ro(dev->gd, true); + } else { + dev->read_only = false; + } + + if (!dev->rotational) + blk_queue_flag_set(QUEUE_FLAG_NONROT, dev->queue); +} + +static int rnbd_client_setup_device(struct rnbd_clt_session *sess, + struct rnbd_clt_dev *dev, int idx) +{ + int err; + + dev->size = dev->nsectors * dev->logical_block_size; + + err = setup_mq_dev(dev); + if (err) + return err; + + setup_request_queue(dev); + + dev->gd = alloc_disk_node(1 << RNBD_PART_BITS, NUMA_NO_NODE); + if (!dev->gd) { + blk_cleanup_queue(dev->queue); + return -ENOMEM; + } + + rnbd_clt_setup_gen_disk(dev, idx); + + return 0; +} + +static struct rnbd_clt_dev *init_dev(struct rnbd_clt_session *sess, + enum rnbd_access_mode access_mode, + const char *pathname) +{ + struct rnbd_clt_dev *dev; + int ret; + + dev = kzalloc_node(sizeof(*dev), GFP_KERNEL, NUMA_NO_NODE); + if (!dev) + return ERR_PTR(-ENOMEM); + + dev->hw_queues = kcalloc(nr_cpu_ids, sizeof(*dev->hw_queues), + GFP_KERNEL); + if (!dev->hw_queues) { + ret = -ENOMEM; + goto out_alloc; + } + + mutex_lock(&ida_lock); + ret = ida_simple_get(&index_ida, 0, 1 << (MINORBITS - RNBD_PART_BITS), + GFP_KERNEL); + mutex_unlock(&ida_lock); + if (ret < 0) { + pr_err("Failed to initialize device '%s' from session %s, allocating idr failed, err: %d\n", + pathname, sess->sessname, ret); + goto out_queues; + } + dev->clt_device_id = ret; + dev->sess = sess; + dev->access_mode = access_mode; + strlcpy(dev->pathname, pathname, sizeof(dev->pathname)); + mutex_init(&dev->lock); + refcount_set(&dev->refcount, 1); + dev->dev_state = DEV_STATE_INIT; + + /* + * Here we called from sysfs entry, thus clt-sysfs is + * responsible that session will not disappear. + */ + WARN_ON(!rnbd_clt_get_sess(sess)); + + return dev; + +out_queues: + kfree(dev->hw_queues); +out_alloc: + kfree(dev); + return ERR_PTR(ret); +} + +static bool __exists_dev(const char *pathname) +{ + struct rnbd_clt_session *sess; + struct rnbd_clt_dev *dev; + bool found = false; + + list_for_each_entry(sess, &sess_list, list) { + mutex_lock(&sess->lock); + list_for_each_entry(dev, &sess->devs_list, list) { + if (!strncmp(dev->pathname, pathname, + sizeof(dev->pathname))) { + found = true; + break; + } + } + mutex_unlock(&sess->lock); + if (found) + break; + } + + return found; +} + +static bool exists_devpath(const char *pathname) +{ + bool found; + + mutex_lock(&sess_lock); + found = __exists_dev(pathname); + mutex_unlock(&sess_lock); + + return found; +} + +static bool insert_dev_if_not_exists_devpath(const char *pathname, + struct rnbd_clt_session *sess, + struct rnbd_clt_dev *dev) +{ + bool found; + + mutex_lock(&sess_lock); + found = __exists_dev(pathname); + if (!found) { + mutex_lock(&sess->lock); + list_add_tail(&dev->list, &sess->devs_list); + mutex_unlock(&sess->lock); + } + mutex_unlock(&sess_lock); + + return found; +} + +static void delete_dev(struct rnbd_clt_dev *dev) +{ + struct rnbd_clt_session *sess = dev->sess; + + mutex_lock(&sess->lock); + list_del(&dev->list); + mutex_unlock(&sess->lock); +} + +struct rnbd_clt_dev *rnbd_clt_map_device(const char *sessname, + struct rtrs_addr *paths, + size_t path_cnt, u16 port_nr, + const char *pathname, + enum rnbd_access_mode access_mode) +{ + struct rnbd_clt_session *sess; + struct rnbd_clt_dev *dev; + int ret; + + if (exists_devpath(pathname)) + return ERR_PTR(-EEXIST); + + sess = find_and_get_or_create_sess(sessname, paths, path_cnt, port_nr); + if (IS_ERR(sess)) + return ERR_CAST(sess); + + dev = init_dev(sess, access_mode, pathname); + if (IS_ERR(dev)) { + pr_err("map_device: failed to map device '%s' from session %s, can't initialize device, err: %ld\n", + pathname, sess->sessname, PTR_ERR(dev)); + ret = PTR_ERR(dev); + goto put_sess; + } + if (insert_dev_if_not_exists_devpath(pathname, sess, dev)) { + ret = -EEXIST; + goto put_dev; + } + ret = send_msg_open(dev, WAIT); + if (ret) { + rnbd_clt_err(dev, + "map_device: failed, can't open remote device, err: %d\n", + ret); + goto del_dev; + } + mutex_lock(&dev->lock); + pr_debug("Opened remote device: session=%s, path='%s'\n", + sess->sessname, pathname); + ret = rnbd_client_setup_device(sess, dev, dev->clt_device_id); + if (ret) { + rnbd_clt_err(dev, + "map_device: Failed to configure device, err: %d\n", + ret); + mutex_unlock(&dev->lock); + goto del_dev; + } + + rnbd_clt_info(dev, + "map_device: Device mapped as %s (nsectors: %zu, logical_block_size: %d, physical_block_size: %d, max_write_same_sectors: %d, max_discard_sectors: %d, discard_granularity: %d, discard_alignment: %d, secure_discard: %d, max_segments: %d, max_hw_sectors: %d, rotational: %d)\n", + dev->gd->disk_name, dev->nsectors, + dev->logical_block_size, dev->physical_block_size, + dev->max_write_same_sectors, dev->max_discard_sectors, + dev->discard_granularity, dev->discard_alignment, + dev->secure_discard, dev->max_segments, + dev->max_hw_sectors, dev->rotational); + + mutex_unlock(&dev->lock); + + add_disk(dev->gd); + rnbd_clt_put_sess(sess); + + return dev; + +del_dev: + delete_dev(dev); +put_dev: + rnbd_clt_put_dev(dev); +put_sess: + rnbd_clt_put_sess(sess); + + return ERR_PTR(ret); +} + +static void destroy_gen_disk(struct rnbd_clt_dev *dev) +{ + del_gendisk(dev->gd); + blk_cleanup_queue(dev->queue); + put_disk(dev->gd); +} + +static void destroy_sysfs(struct rnbd_clt_dev *dev, + const struct attribute *sysfs_self) +{ + rnbd_clt_remove_dev_symlink(dev); + if (dev->kobj.state_initialized) { + if (sysfs_self) + /* To avoid deadlock firstly remove itself */ + sysfs_remove_file_self(&dev->kobj, sysfs_self); + kobject_del(&dev->kobj); + kobject_put(&dev->kobj); + } +} + +int rnbd_clt_unmap_device(struct rnbd_clt_dev *dev, bool force, + const struct attribute *sysfs_self) +{ + struct rnbd_clt_session *sess = dev->sess; + int refcount, ret = 0; + bool was_mapped; + + mutex_lock(&dev->lock); + if (dev->dev_state == DEV_STATE_UNMAPPED) { + rnbd_clt_info(dev, "Device is already being unmapped\n"); + ret = -EALREADY; + goto err; + } + refcount = refcount_read(&dev->refcount); + if (!force && refcount > 1) { + rnbd_clt_err(dev, + "Closing device failed, device is in use, (%d device users)\n", + refcount - 1); + ret = -EBUSY; + goto err; + } + was_mapped = (dev->dev_state == DEV_STATE_MAPPED); + dev->dev_state = DEV_STATE_UNMAPPED; + mutex_unlock(&dev->lock); + + delete_dev(dev); + destroy_sysfs(dev, sysfs_self); + destroy_gen_disk(dev); + if (was_mapped && sess->rtrs) + send_msg_close(dev, dev->device_id, WAIT); + + rnbd_clt_info(dev, "Device is unmapped\n"); + + /* Likely last reference put */ + rnbd_clt_put_dev(dev); + + /* + * Here device and session can be vanished! + */ + + return 0; +err: + mutex_unlock(&dev->lock); + + return ret; +} + +int rnbd_clt_remap_device(struct rnbd_clt_dev *dev) +{ + int err; + + mutex_lock(&dev->lock); + if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) + err = 0; + else if (dev->dev_state == DEV_STATE_UNMAPPED) + err = -ENODEV; + else if (dev->dev_state == DEV_STATE_MAPPED) + err = -EALREADY; + else + err = -EBUSY; + mutex_unlock(&dev->lock); + if (!err) { + rnbd_clt_info(dev, "Remapping device.\n"); + err = send_msg_open(dev, WAIT); + if (err) + rnbd_clt_err(dev, "remap_device: %d\n", err); + } + + return err; +} + +static void unmap_device_work(struct work_struct *work) +{ + struct rnbd_clt_dev *dev; + + dev = container_of(work, typeof(*dev), unmap_on_rmmod_work); + rnbd_clt_unmap_device(dev, true, NULL); +} + +static void rnbd_destroy_sessions(void) +{ + struct rnbd_clt_session *sess, *sn; + struct rnbd_clt_dev *dev, *tn; + + /* Firstly forbid access through sysfs interface */ + rnbd_clt_destroy_default_group(); + rnbd_clt_destroy_sysfs_files(); + + /* + * Here at this point there is no any concurrent access to sessions + * list and devices list: + * 1. New session or device can'be be created - session sysfs files + * are removed. + * 2. Device or session can't be removed - module reference is taken + * into account in unmap device sysfs callback. + * 3. No IO requests inflight - each file open of block_dev increases + * module reference in get_disk(). + * + * But still there can be user requests inflights, which are sent by + * asynchronous send_msg_*() functions, thus before unmapping devices + * RTRS session must be explicitly closed. + */ + + list_for_each_entry_safe(sess, sn, &sess_list, list) { + WARN_ON(!rnbd_clt_get_sess(sess)); + close_rtrs(sess); + list_for_each_entry_safe(dev, tn, &sess->devs_list, list) { + /* + * Here unmap happens in parallel for only one reason: + * blk_cleanup_queue() takes around half a second, so + * on huge amount of devices the whole module unload + * procedure takes minutes. + */ + INIT_WORK(&dev->unmap_on_rmmod_work, unmap_device_work); + queue_work(system_long_wq, &dev->unmap_on_rmmod_work); + } + rnbd_clt_put_sess(sess); + } + /* Wait for all scheduled unmap works */ + flush_workqueue(system_long_wq); + WARN_ON(!list_empty(&sess_list)); +} + +static int __init rnbd_client_init(void) +{ + int err = 0; + + BUILD_BUG_ON(sizeof(struct rnbd_msg_hdr) != 4); + BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info) != 36); + BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info_rsp) != 36); + BUILD_BUG_ON(sizeof(struct rnbd_msg_open) != 264); + BUILD_BUG_ON(sizeof(struct rnbd_msg_close) != 8); + BUILD_BUG_ON(sizeof(struct rnbd_msg_open_rsp) != 56); + rnbd_client_major = register_blkdev(rnbd_client_major, "rnbd"); + if (rnbd_client_major <= 0) { + pr_err("Failed to load module, block device registration failed\n"); + return -EBUSY; + } + + err = rnbd_clt_create_sysfs_files(); + if (err) { + pr_err("Failed to load module, creating sysfs device files failed, err: %d\n", + err); + unregister_blkdev(rnbd_client_major, "rnbd"); + } + + return err; +} + +static void __exit rnbd_client_exit(void) +{ + rnbd_destroy_sessions(); + unregister_blkdev(rnbd_client_major, "rnbd"); + ida_destroy(&index_ida); +} + +module_init(rnbd_client_init); +module_exit(rnbd_client_exit);