Monday, March 18, 2013

CIFS: Async Reads


A read call on the cifs fs goes through the following code path

sys_read -> vfs_read -> file->f_op->read() -> do_sync_read() -> filp->f_op->aio_read -> do_generic_file_read()


Within do_generic_file_read,

static void do_generic_file_read(struct file *filp, loff_t *ppos,
                read_descriptor_t *desc, read_actor_t actor)
{
..
        //Index of the page which contains the ppos.
        index = *ppos >> PAGE_CACHE_SHIFT;
..
        for (;;) {
..
find_page:
                //Find the page and get a reference.
                page = find_get_page(mapping, index);
                if (!page) {
                        page_cache_sync_readahead(mapping,
                                        ra, filp,
                                        index, last_index - index);
..
}

We use the readahead code to fetch the pages for the read using the async read code.
This is implemented using the address_space_operations->readpages() method.

We go through the following code path before we get to the address_space_operations
page_cache_sync_readahead() -> ondemand_readahead() -> __do_page_cache_readahead()

We allocate all the pages required in __do_page_cache_readahead(), following which we
call read_pages()

static int
__do_page_cache_readahead(struct address_space *mapping, struct file *filp,
                        pgoff_t offset, unsigned long nr_to_read,
                        unsigned long lookahead_size)
{
..
        /*
         * Preallocate as many pages as we will need.
         */
        for (page_idx = 0; page_idx < nr_to_read; page_idx++) {
..
                page = page_cache_alloc_readahead(mapping);
                if (!page)
                        break;
                page->index = page_offset;
                list_add(&page->lru, &page_pool);
                if (page_idx == nr_to_read - lookahead_size)
                        SetPageReadahead(page);
                ret++;
        }
..
        //then call read_pages().
        if (ret)
                read_pages(mapping, filp, &page_pool, ret);

..
}

static int read_pages(struct address_space *mapping, struct file *filp,
                struct list_head *pages, unsigned nr_pages)
{
..
        if (mapping->a_ops->readpages) {
                ret = mapping->a_ops->readpages(filp, mapping, pages, nr_pages);
..
}

const struct address_space_operations cifs_addr_ops = {
..
        .readpages = cifs_readpages,
..
}

We first start by locking the page. The page is not unlocked until the read response
from the server is received or the request times out. This is the mechanism used to
block the thread which started the read while it waits for the response to return.

static int cifs_readpages(struct file *file, struct address_space *mapping,
        struct list_head *page_list, unsigned num_pages)
{
..
        while (!list_empty(page_list)) {
..
                //We first lock the page, then we add it to the page cache.
                __set_page_locked(page);
                rc = add_to_page_cache_locked(page, mapping,
                                              page->index, GFP_KERNEL);

                /* give up if we can't stick it in the cache */
                if (rc) {
                        __clear_page_locked(page);
                        break;
                }
..
                rdata = cifs_readdata_alloc(nr_pages, cifs_readv_complete);
..
}

Within cifs_readdata_alloc, we initialise the work_struct and set it to call
cifs_readv_complete()

static struct cifs_readdata *
cifs_readdata_alloc(unsigned int nr_pages, work_func_t complete)
{
..
                INIT_WORK(&rdata->work, complete);
..
}

Back to cifs_readpages()

static int cifs_readpages(struct file *file, struct address_space *mapping,
        struct list_head *page_list, unsigned num_pages)
{
..
        while (!list_empty(page_list)) {
..
                rdata = cifs_readdata_alloc(nr_pages, cifs_readv_complete);
                //If there was an error, free up all the pages we allocated
                // and return an error
                if (!rdata) {
                        /* best to give up if we're out of mem */
                        list_for_each_entry_safe(page, tpage, &tmplist, lru) {
                                list_del(&page->lru);
                                lru_cache_add_file(page);
                                unlock_page(page);
                                page_cache_release(page);
                        }
                        rc = -ENOMEM;
                        break;
                }
                //We get a reference to the open_file.
                rdata->cfile = cifsFileInfo_get(open_file);
                rdata->mapping = mapping;
                rdata->offset = offset;
                rdata->bytes = bytes;
                rdata->pid = pid;
                rdata->pagesz = PAGE_CACHE_SIZE;
                rdata->read_into_pages = cifs_readpages_read_into_pages;

                list_for_each_entry_safe(page, tpage, &tmplist, lru) {
                        list_del(&page->lru);
                        rdata->pages[rdata->nr_pages++] = page;
                }

                rc = cifs_retry_async_readv(rdata);
..
}

static int
cifs_retry_async_readv(struct cifs_readdata *rdata)
{
..
        do {
                //We re-open the file if the handle is invalid.
                if (rdata->cfile->invalidHandle) {
                        rc = cifs_reopen_file(rdata->cfile, true);
                        if (rc != 0)
                                continue;
                }
                //The async readv call is protocol specific.
                rc = server->ops->async_readv(rdata);
        } while (rc == -EAGAIN);
..
}

struct smb_version_operations smb1_operations = {
..
        .async_readv = cifs_async_readv,
..
}

int
cifs_async_readv(struct cifs_readdata *rdata)
{
..
        rc = small_smb_init(SMB_COM_READ_ANDX, wct, tcon, (void **)&smb);
..
        rc = cifs_call_async(tcon->ses->server, &rqst, cifs_readv_receive,
                             cifs_readv_callback, rdata, 0);
..
        return rc;
}

We call cifs_call_async() with the argument
mid_receive_t *receive set to cifs_readv_receive
and the argument
mid_callback_t *callback set to cifs_readv_callback

int
cifs_call_async(struct TCP_Server_Info *server, struct smb_rqst *rqst,
                mid_receive_t *receive, mid_callback_t *callback,
                void *cbdata, const int flags)
{
..
        //We first allocate a mid for the async_request.
        mid = server->ops->setup_async_request(server, rqst);
..
        //We set the receive function pointer to cifs_readv_receive()
        mid->receive = receive;
        //We set the callback function pointer to cifs_readv_callback()
        mid->callback = callback;
        mid->callback_data = cbdata;
        mid->mid_state = MID_REQUEST_SUBMITTED;

        /* put it on the pending_mid_q */
        spin_lock(&GlobalMid_Lock);
        list_add_tail(&mid->qhead, &server->pending_mid_q);
        spin_unlock(&GlobalMid_Lock);
..
        rc = smb_send_rqst(server, rqst);
..
        if (rc == 0)
                return 0;

        //If there was an error, delete the mid and return an error.
        cifs_delete_mid(mid);
..
        return rc;
}

The data is sent to the server using the smb_send_rqst() call. cifs_call_async() then returns immediately.
If we encountered an error at this point, the error is propagated back to cifs_readpages() where the
allocated pages are freed.

static void do_generic_file_read(struct file *filp, loff_t *ppos,
                read_descriptor_t *desc, read_actor_t actor)
{
..
                if (!page) {
                        page_cache_sync_readahead(mapping,
                                        ra, filp,
                                        index, last_index - index);
                        //Fetch the page again. This should get you the page since we have allocated and
                        //attached the page to the page cache above.
                        page = find_get_page(mapping, index);
                        if (unlikely(page == NULL))
                                goto no_cached_page;
                }
..
                if (!PageUptodate(page)) {
                        if (inode->i_blkbits == PAGE_CACHE_SHIFT ||
                                        !mapping->a_ops->is_partially_uptodate)
                                goto page_not_up_to_date;
                        //This trylock should fail since the page was locked in cifs_readpages(),
                        //so we fall through to page_not_up_to_date below.
                        if (!trylock_page(page))
                                goto page_not_up_to_date;
..
page_not_up_to_date:
                //We first attempt to lock the page. It blocks here since we have locked the page in
                //cifs_readpages() and do not unlock it until we have received the response from the server.
                /* Get exclusive access to the page ... */
                error = lock_page_killable(page);
..
}

The thread which triggered the read is blocked at this point waiting for the response to the read call sent earlier.

The response to the read call is received by the demultiplex thread for that share.

static int
cifs_demultiplex_thread(void *p)
{
..
        struct mid_q_entry *mid_entry;
..
        while (server->tcpStatus != CifsExiting) {
..
                mid_entry = server->ops->find_mid(server, buf);
..
                if (!mid_entry || !mid_entry->receive)
                        length = standard_receive3(server, mid_entry);
                else
                        length = mid_entry->receive(server, mid_entry);
..
                if (mid_entry != NULL) {
                        if (!mid_entry->multiRsp || mid_entry->multiEnd)
                                mid_entry->callback(mid_entry);
..
}

mid->receive is set to cifs_readv_receive()
mid->callback is set to cifs_readv_callback()
with the call made to cifs_call_async() in cifs_async_readv()

We first call the receive function.

int
cifs_readv_receive(struct TCP_Server_Info *server, struct mid_q_entry *mid)
{
..
        //We receive the data
        length = cifs_readv_from_socket(server, &rdata->iov, 1, len);
        if (length < 0)
                return length;
        server->total_read += length;

        //If we did encounter an error, discard the data returned.
        /* Was the SMB read successful? */
        rdata->result = server->ops->map_error(buf, false);
        if (rdata->result != 0) {
                cFYI(1, "%s: server returned error %d", __func__,
                        rdata->result);
                return cifs_readv_discard(server, mid);
        }
..
        //read the read data
        data_offset = server->ops->read_data_offset(buf) + 4;
..
       length = rdata->read_into_pages(server, rdata, data_len);
..
}

We then call the callback function from the demultiplex thread.

static void
cifs_readv_callback(struct mid_q_entry *mid)
{
        struct cifs_readdata *rdata = mid->callback_data;
        struct cifs_tcon *tcon = tlink_tcon(rdata->cfile->tlink);
        struct TCP_Server_Info *server = tcon->ses->server;
        struct smb_rqst rqst = { .rq_iov = &rdata->iov,
                                 .rq_nvec = 1,
                                 .rq_pages = rdata->pages,
                                 .rq_npages = rdata->nr_pages,
                                 .rq_pagesz = rdata->pagesz,
                                 .rq_tailsz = rdata->tailsz };

..
//The work is queued in the cifsiod workqueue.
        queue_work(cifsiod_wq, &rdata->work);
        DeleteMidQEntry(mid);
        add_credits(server, 1, 0);
}

The work function is set to
cifs_readv_complete()
in
cifs_readpages() -> cifs_readdata_alloc()

static void
cifs_readv_complete(struct work_struct *work)
{
        unsigned int i;
        struct cifs_readdata *rdata = container_of(work,
                                                struct cifs_readdata, work);

        //For each page
        for (i = 0; i < rdata->nr_pages; i++) {
                struct page *page = rdata->pages[i];

                //add the page to the lru_cache
                lru_cache_add_file(page);

                //Set the page as Uptodate.
                if (rdata->result == 0) {
                        flush_dcache_page(page);
                        SetPageUptodate(page);
                }

                //And unlock the page.
                unlock_page(page);
..
}

As we unlock the page, we unblock the read thread which was blocked
and waiting for the page lock at do_generic_file_read().

We thus complete a successful read.

What happens if we encounter an error when performing an Async read?

We look at the code path when we encounter an error in the response from the server

int
cifs_readv_receive(struct TCP_Server_Info *server, struct mid_q_entry *mid)
{
..
        //We receive the data
        length = cifs_readv_from_socket(server, &rdata->iov, 1, len);
        if (length < 0)
                return length;
        server->total_read += length;
        //If we did encounter an error, discard the data returned.
        /* Was the SMB read successful? */
        rdata->result = server->ops->map_error(buf, false);
        if (rdata->result != 0) {
                cFYI(1, "%s: server returned error %d", __func__,
                        rdata->result);
                return cifs_readv_discard(server, mid);
        }
..
}


The result is stored in rdata->result. A non zero value indicates an error. On encountering
an error, the mid is dequeued.

static int
cifs_readv_discard(struct TCP_Server_Info *server, struct mid_q_entry *mid)
{
..
        dequeue_mid(mid, rdata->result);
        return 0;
}


It returns 0 which is propagated to cifs_readv_receive() -> cifs_demultiplex_thread()

static int
cifs_demultiplex_thread(void *p)
{
..
        struct mid_q_entry *mid_entry;
..
        while (server->tcpStatus != CifsExiting) {
..
                mid_entry = server->ops->find_mid(server, buf);
..
                if (!mid_entry || !mid_entry->receive)
                        length = standard_receive3(server, mid_entry);
                else
                        length = mid_entry->receive(server, mid_entry);

                if (length < 0)
                        continue;
..
                if (mid_entry != NULL) {
                        if (!mid_entry->multiRsp || mid_entry->multiEnd)
                                mid_entry->callback(mid_entry); //<-- the callback is invoked here
..
}

It then proceeds to call the callback function cifs_readv_callback().
We queue the work and delete the mid.

static void
cifs_readv_callback(struct mid_q_entry *mid)
{
..
        queue_work(cifsiod_wq, &rdata->work);
        DeleteMidQEntry(mid);
..
}

The thread cifsiod runs the workqueue. This calls the function cifs_readv_complete()

static void
cifs_readv_complete(struct work_struct *work)
{
        unsigned int i;
        struct cifs_readdata *rdata = container_of(work,
                                                struct cifs_readdata, work);

        //For each page
        for (i = 0; i < rdata->nr_pages; i++) {
                struct page *page = rdata->pages[i];

                //Since we hit an error, we do not set
                //page uptodate here
                if (rdata->result == 0) {
                        flush_dcache_page(page);
                        SetPageUptodate(page);
                }

                //We unlock the page.
                unlock_page(page);
..
}

As we unlock the page, we unblock the read thread which was blocked
and waiting for the page lock at do_generic_file_read().

static void do_generic_file_read(struct file *filp, loff_t *ppos,
                read_descriptor_t *desc, read_actor_t actor)
{
..
page_not_up_to_date:
                /* Get exclusive access to the page ... */
                error = lock_page_killable(page);
                if (unlikely(error))
                        goto readpage_error;
                //Once we receive the lock, we proceed.

                //the page is not set as uptodate. We therefore skip this.
                /* Did somebody else fill it already? */
                if (PageUptodate(page)) {
                        unlock_page(page);
                        goto page_ok;
                }
..
readpage:
..
                //Call a_ops->readpage() which calls the sync version of cifs_read
                /* Start the actual read. The read will unlock the page. */
                error = mapping->a_ops->readpage(filp, page);
..
}

On receiving an error when attempting an async read, we ignore the error and drop back to using sync reads.