blockjob.c 34.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
/*
 * QEMU System Emulator block driver
 *
 * Copyright (c) 2011 IBM Corp.
 * Copyright (c) 2012 Red Hat, Inc.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

Peter Maydell's avatar
Peter Maydell committed
26
#include "qemu/osdep.h"
27
#include "qemu-common.h"
28
#include "block/block.h"
29
#include "block/blockjob_int.h"
30
#include "block/block_int.h"
31
#include "block/trace.h"
32
#include "sysemu/block-backend.h"
33
#include "qapi/error.h"
34
#include "qapi/qapi-events-block-core.h"
35
#include "qapi/qmp/qerror.h"
36
#include "qemu/coroutine.h"
37
#include "qemu/id.h"
38
#include "qemu/timer.h"
39

40 41 42 43 44
/* Right now, this mutex is only needed to synchronize accesses to job->busy
 * and job->sleep_timer, such as concurrent calls to block_job_do_yield and
 * block_job_enter.  Initialized in block_job_init() below. */
static QemuMutex block_job_mutex;

45 46
/* BlockJob State Transition Table */
bool BlockJobSTT[BLOCK_JOB_STATUS__MAX][BLOCK_JOB_STATUS__MAX] = {
47 48 49 50 51 52 53 54 55 56 57 58
                                          /* U, C, R, P, Y, S, W, D, X, E, N */
    /* U: */ [BLOCK_JOB_STATUS_UNDEFINED] = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
    /* C: */ [BLOCK_JOB_STATUS_CREATED]   = {0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1},
    /* R: */ [BLOCK_JOB_STATUS_RUNNING]   = {0, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0},
    /* P: */ [BLOCK_JOB_STATUS_PAUSED]    = {0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
    /* Y: */ [BLOCK_JOB_STATUS_READY]     = {0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0},
    /* S: */ [BLOCK_JOB_STATUS_STANDBY]   = {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
    /* W: */ [BLOCK_JOB_STATUS_WAITING]   = {0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0},
    /* D: */ [BLOCK_JOB_STATUS_PENDING]   = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* X: */ [BLOCK_JOB_STATUS_ABORTING]  = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* E: */ [BLOCK_JOB_STATUS_CONCLUDED] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
    /* N: */ [BLOCK_JOB_STATUS_NULL]      = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
59 60
};

61
bool BlockJobVerbTable[BLOCK_JOB_VERB__MAX][BLOCK_JOB_STATUS__MAX] = {
62 63 64 65 66 67
                                          /* U, C, R, P, Y, S, W, D, X, E, N */
    [BLOCK_JOB_VERB_CANCEL]               = {0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0},
    [BLOCK_JOB_VERB_PAUSE]                = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [BLOCK_JOB_VERB_RESUME]               = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [BLOCK_JOB_VERB_SET_SPEED]            = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [BLOCK_JOB_VERB_COMPLETE]             = {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
68
    [BLOCK_JOB_VERB_FINALIZE]             = {0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
69
    [BLOCK_JOB_VERB_DISMISS]              = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
70 71
};

72 73 74 75 76 77
/* Move @job to status @s1, tracing the transition and asserting that the
 * state table permits it. */
static void block_job_state_transition(BlockJob *job, BlockJobStatus s1)
{
    BlockJobStatus s0 = job->status;
    /* s1 must be a valid table index; BLOCK_JOB_STATUS__MAX itself is one
     * past the last status, so '<=' would allow an out-of-bounds read of
     * BlockJobSTT below. */
    assert(s1 >= 0 && s1 < BLOCK_JOB_STATUS__MAX);
    trace_block_job_state_transition(job, job->ret, BlockJobSTT[s0][s1] ?
                                     "allowed" : "disallowed",
                                     BlockJobStatus_str(s0),
                                     BlockJobStatus_str(s1));
    assert(BlockJobSTT[s0][s1]);
    job->status = s1;
}

84 85 86
/* Check whether user verb @bv is permitted in @job's current status.
 * Returns 0 when allowed, -EPERM (with @errp set) otherwise. */
static int block_job_apply_verb(BlockJob *job, BlockJobVerb bv, Error **errp)
{
    /* BLOCK_JOB_VERB__MAX is one past the last verb; '<=' would permit an
     * out-of-bounds read of BlockJobVerbTable below. */
    assert(bv >= 0 && bv < BLOCK_JOB_VERB__MAX);
    trace_block_job_apply_verb(job, BlockJobStatus_str(job->status),
                               BlockJobVerb_str(bv),
                               BlockJobVerbTable[bv][job->status] ?
                               "allowed" : "prohibited");
    if (BlockJobVerbTable[bv][job->status]) {
        return 0;
    }
    error_setg(errp, "Job '%s' in state '%s' cannot accept command verb '%s'",
               job->id, BlockJobStatus_str(job->status), BlockJobVerb_str(bv));
    return -EPERM;
}

99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
/* Acquire the global block job mutex (see block_job_mutex above). */
static void block_job_lock(void)
{
    qemu_mutex_lock(&block_job_mutex);
}

/* Release the global block job mutex. */
static void block_job_unlock(void)
{
    qemu_mutex_unlock(&block_job_mutex);
}

/* Constructor attribute: initializes the mutex once, before main() runs. */
static void __attribute__((__constructor__)) block_job_init(void)
{
    qemu_mutex_init(&block_job_mutex);
}

114 115
static void block_job_event_cancelled(BlockJob *job);
static void block_job_event_completed(BlockJob *job, const char *msg);
116
static int block_job_event_pending(BlockJob *job);
117
static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job));
118

119 120 121 122 123 124 125 126 127 128 129 130 131
/* Transactional group of block jobs */
struct BlockJobTxn {

    /* Is this txn being cancelled?  Set by the first failing job so the
     * remaining jobs do not restart the abort sequence. */
    bool aborting;

    /* List of jobs, linked through BlockJob.txn_list */
    QLIST_HEAD(, BlockJob) jobs;

    /* Reference count; the txn is freed when it drops to zero */
    int refcnt;
};

132 133
/* Global list of all block jobs, linked through BlockJob.job_list. */
static QLIST_HEAD(, BlockJob) block_jobs = QLIST_HEAD_INITIALIZER(block_jobs);

134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
/*
 * The block job API is composed of two categories of functions.
 *
 * The first includes functions used by the monitor.  The monitor is
 * peculiar in that it accesses the block job list with block_job_get, and
 * therefore needs consistency across block_job_get and the actual operation
 * (e.g. block_job_set_speed).  The consistency is achieved with
 * aio_context_acquire/release.  These functions are declared in blockjob.h.
 *
 * The second includes functions used by the block job drivers and sometimes
 * by the core block layer.  These do not care about locking, because the
 * whole coroutine runs under the AioContext lock, and are declared in
 * blockjob_int.h.
 */

149 150 151 152 153 154 155 156
/* Iterate the global job list: return the job after @job, or the first job
 * when @job is NULL.  Returns NULL past the end of the list. */
BlockJob *block_job_next(BlockJob *job)
{
    return job ? QLIST_NEXT(job, job_list) : QLIST_FIRST(&block_jobs);
}

157 158 159 160 161
/* Look up a job by user-visible ID.  Internal jobs (NULL id) never match.
 * Returns NULL when no job with that ID exists. */
BlockJob *block_job_get(const char *id)
{
    BlockJob *candidate;

    QLIST_FOREACH(candidate, &block_jobs, job_list) {
        if (candidate->id && strcmp(id, candidate->id) == 0) {
            return candidate;
        }
    }

    return NULL;
}

170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202
/* Allocate an empty transaction; the caller owns the initial reference. */
BlockJobTxn *block_job_txn_new(void)
{
    BlockJobTxn *txn = g_new0(BlockJobTxn, 1);

    QLIST_INIT(&txn->jobs);
    txn->refcnt = 1;
    return txn;
}

/* Take an additional reference on @txn; paired with block_job_txn_unref(). */
static void block_job_txn_ref(BlockJobTxn *txn)
{
    txn->refcnt++;
}

/* Drop one reference; the transaction is freed when the last one goes.
 * @txn may be NULL. */
void block_job_txn_unref(BlockJobTxn *txn)
{
    if (!txn) {
        return;
    }
    if (--txn->refcnt == 0) {
        g_free(txn);
    }
}

/* Attach @job to @txn (no-op when @txn is NULL).  The transaction gains a
 * reference that block_job_txn_del_job() drops again. */
void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
{
    if (txn == NULL) {
        return;
    }

    /* A job belongs to at most one transaction. */
    assert(!job->txn);
    job->txn = txn;

    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
    block_job_txn_ref(txn);
}

203 204 205 206 207 208 209 210 211
/* Detach @job from its transaction, releasing the reference taken when it
 * was added.  Safe to call on a job that is not in a transaction. */
static void block_job_txn_del_job(BlockJob *job)
{
    BlockJobTxn *txn = job->txn;

    if (!txn) {
        return;
    }
    QLIST_REMOVE(job, txn_list);
    job->txn = NULL;
    block_job_txn_unref(txn);
}

212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
/* Increase the pause counter; the job stops at its next pause point. */
static void block_job_pause(BlockJob *job)
{
    job->pause_count++;
}

/* Undo one block_job_pause(); kicks the job once no pause requests remain. */
static void block_job_resume(BlockJob *job)
{
    assert(job->pause_count > 0);
    if (--job->pause_count == 0) {
        block_job_enter(job);
    }
}

227
/* Take a reference on @job; paired with block_job_unref(). */
void block_job_ref(BlockJob *job)
{
    job->refcnt++;
}

static void block_job_attached_aio_context(AioContext *new_context,
                                           void *opaque);
static void block_job_detach_aio_context(void *opaque);

236
/* Drop one reference on @job.  The last unref tears the job down and frees
 * it; by then it must already have reached the NULL state and have left its
 * transaction. */
void block_job_unref(BlockJob *job)
{
    if (--job->refcnt > 0) {
        return;
    }

    assert(job->status == BLOCK_JOB_STATUS_NULL);
    assert(!job->txn);

    BlockDriverState *bs = blk_bs(job->blk);
    QLIST_REMOVE(job, job_list);
    bs->job = NULL;
    block_job_remove_all_bdrv(job);
    blk_remove_aio_context_notifier(job->blk,
                                    block_job_attached_aio_context,
                                    block_job_detach_aio_context, job);
    blk_unref(job->blk);
    error_free(job->blocker);
    g_free(job->id);
    /* The sleep timer must have fired or been cancelled by now. */
    assert(!timer_pending(&job->sleep_timer));
    g_free(job);
}

256 257 258 259 260 261 262 263 264 265 266 267
/* AioContext notifier: the job's BlockBackend moved to @new_context.  Let
 * the driver react, then undo the pause taken in
 * block_job_detach_aio_context(). */
static void block_job_attached_aio_context(AioContext *new_context,
                                           void *opaque)
{
    BlockJob *job = opaque;

    if (job->driver->attached_aio_context) {
        job->driver->attached_aio_context(job, new_context);
    }
    block_job_resume(job);
}

268 269 270 271 272 273 274 275 276 277 278
/* Flush @job's in-flight requests and give its driver a chance to drain. */
static void block_job_drain(BlockJob *job)
{
    /* If job is !job->busy this kicks it into the next pause point. */
    block_job_enter(job);

    blk_drain(job->blk);
    if (job->driver->drain) {
        job->driver->drain(job);
    }
}

279 280 281 282 283 284 285 286 287 288
/* AioContext notifier: the job's BlockBackend is about to leave its
 * AioContext.  Pause the job and drain until it actually stops. */
static void block_job_detach_aio_context(void *opaque)
{
    BlockJob *job = opaque;

    /* In case the job terminates during aio_poll()... */
    block_job_ref(job);

    block_job_pause(job);

    /* Keep draining until the job reaches its pause point or finishes. */
    while (!job->paused && !job->completed) {
        block_job_drain(job);
    }

    block_job_unref(job);
}

295 296 297 298
/* Human-readable parent description for error messages, e.g.
 * "stream job 'job0'".  Caller frees the returned string. */
static char *child_job_get_parent_desc(BdrvChild *c)
{
    BlockJob *job = c->opaque;

    return g_strdup_printf("%s job '%s'",
                           BlockJobType_str(job->driver->job_type),
                           job->id);
}

303
/* BdrvChild callback: pause the owning job while its node is drained. */
static void child_job_drained_begin(BdrvChild *c)
{
    BlockJob *job = c->opaque;

    block_job_pause(job);
}

309
/* BdrvChild callback: resume the owning job once the drain section ends. */
static void child_job_drained_end(BdrvChild *c)
{
    BlockJob *job = c->opaque;

    block_job_resume(job);
}

315 316 317 318 319
/* Role the job plays as parent of the nodes it operates on. */
static const BdrvChildRole child_job = {
    .get_parent_desc    = child_job_get_parent_desc,
    .drained_begin      = child_job_drained_begin,
    .drained_end        = child_job_drained_end,
    .stay_at_node       = true,
};

322 323 324 325 326 327 328 329 330 331 332 333
/* Unblock and release every node the job attached itself to via
 * block_job_add_bdrv(). */
void block_job_remove_all_bdrv(BlockJob *job)
{
    GSList *l;

    for (l = job->nodes; l != NULL; l = l->next) {
        BdrvChild *c = l->data;

        bdrv_op_unblock_all(c->bs, job->blocker);
        bdrv_root_unref_child(c);
    }
    g_slist_free(job->nodes);
    job->nodes = NULL;
}

334 335
/* Attach @bs to @job as a child named @name with the given permissions,
 * referencing the node and blocking operations on it.  Returns 0 on
 * success, -EPERM (with @errp set) when the child cannot be attached. */
int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
                       uint64_t perm, uint64_t shared_perm, Error **errp)
{
    BdrvChild *c = bdrv_root_attach_child(bs, name, &child_job, perm,
                                          shared_perm, job, errp);

    if (!c) {
        return -EPERM;
    }

    job->nodes = g_slist_prepend(job->nodes, c);
    bdrv_ref(bs);
    bdrv_op_block_all(bs, job->blocker);

    return 0;
}

352 353 354 355 356
/* Internal jobs carry no ID and are hidden from the external interface. */
bool block_job_is_internal(BlockJob *job)
{
    return job->id == NULL;
}

357 358 359 360 361
/* A job counts as started once its coroutine exists (block_job_start). */
static bool block_job_started(BlockJob *job)
{
    return job->co != NULL;
}

362 363 364 365 366 367 368 369 370 371 372 373 374
/**
 * All jobs must allow a pause point before entering their job proper. This
 * ensures that jobs can be paused prior to being started, then resumed later.
 */
static void coroutine_fn block_job_co_entry(void *opaque)
{
    BlockJob *job = opaque;

    /* A runnable job must have a driver with a .start callback. */
    assert(job && job->driver && job->driver->start);
    block_job_pause_point(job);
    job->driver->start(job);
}

375 376 377 378 379 380 381
/* Timer callback armed by block_job_do_yield(): wake the job coroutine. */
static void block_job_sleep_timer_cb(void *opaque)
{
    BlockJob *job = opaque;

    block_job_enter(job);
}

382 383 384
/* Create the job coroutine and start running it.  The job must have been
 * created (paused, not yet started) by block_job_create(). */
void block_job_start(BlockJob *job)
{
    assert(job && !block_job_started(job) && job->paused &&
           job->driver && job->driver->start);
    job->co = qemu_coroutine_create(block_job_co_entry, job);
    /* Drop the implicit pause taken at creation time. */
    job->pause_count--;
    job->busy = true;
    job->paused = false;
    block_job_state_transition(job, BLOCK_JOB_STATUS_RUNNING);
    bdrv_coroutine_enter(blk_bs(job->blk), job->co);
}

John Snow's avatar
John Snow committed
394 395 396 397 398 399 400
/* Final teardown: mark the job inert, drop it from its transaction, move it
 * to the NULL state and release the creator's reference. */
static void block_job_decommission(BlockJob *job)
{
    assert(job);
    job->completed = true;
    job->busy = false;
    job->paused = false;
    job->deferred_to_main_loop = true;
    block_job_txn_del_job(job);
    block_job_state_transition(job, BLOCK_JOB_STATUS_NULL);
    block_job_unref(job);
}

406 407 408 409 410
/* Remove a concluded job from the system. */
static void block_job_do_dismiss(BlockJob *job)
{
    block_job_decommission(job);
}

411 412 413
/* Enter CONCLUDED; jobs with auto_dismiss set, and jobs that never ran,
 * are dismissed right away. */
static void block_job_conclude(BlockJob *job)
{
    block_job_state_transition(job, BLOCK_JOB_STATUS_CONCLUDED);
    if (job->auto_dismiss || !block_job_started(job)) {
        block_job_do_dismiss(job);
    }
}

419 420 421 422 423 424 425 426 427 428
/* Fold a pending cancellation into job->ret and move any failed job to
 * the ABORTING state. */
static void block_job_update_rc(BlockJob *job)
{
    if (job->ret == 0 && block_job_is_cancelled(job)) {
        job->ret = -ECANCELED;
    }
    if (job->ret) {
        block_job_state_transition(job, BLOCK_JOB_STATUS_ABORTING);
    }
}

429 430 431 432 433 434 435 436
/* Run the driver's optional .prepare() step for a still-successful job and
 * return the (possibly updated) job return code. */
static int block_job_prepare(BlockJob *job)
{
    if (!job->ret && job->driver->prepare) {
        job->ret = job->driver->prepare(job);
    }
    return job->ret;
}

437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459
/* Run the driver's optional .commit() hook; only valid for a successful job. */
static void block_job_commit(BlockJob *job)
{
    assert(!job->ret);
    if (job->driver->commit) {
        job->driver->commit(job);
    }
}

/* Run the driver's optional .abort() hook; only valid for a failed job. */
static void block_job_abort(BlockJob *job)
{
    assert(job->ret);
    if (job->driver->abort) {
        job->driver->abort(job);
    }
}

/* Run the driver's optional .clean() hook; invoked after commit or abort. */
static void block_job_clean(BlockJob *job)
{
    if (job->driver->clean) {
        job->driver->clean(job);
    }
}

460
/* Finish one completed job: commit or abort, clean up, invoke the
 * completion callback, emit the QMP event, leave the transaction and
 * conclude.  Always returns 0 (signature fits block_job_txn_apply). */
static int block_job_finalize_single(BlockJob *job)
{
    assert(job->completed);

    /* Ensure abort is called for late-transactional failures */
    block_job_update_rc(job);

    if (job->ret == 0) {
        block_job_commit(job);
    } else {
        block_job_abort(job);
    }
    block_job_clean(job);

    if (job->cb) {
        job->cb(job->opaque, job->ret);
    }

    /* Emit events only if we actually started */
    if (block_job_started(job)) {
        if (block_job_is_cancelled(job)) {
            block_job_event_cancelled(job);
        } else {
            const char *msg = NULL;

            if (job->ret < 0) {
                msg = strerror(-job->ret);
            }
            block_job_event_completed(job, msg);
        }
    }

    block_job_txn_del_job(job);
    block_job_conclude(job);
    return 0;
}

496
/* Request asynchronous cancellation: reset the iostatus, lift a user pause
 * and flag the job cancelled.  Does not wake the job itself. */
static void block_job_cancel_async(BlockJob *job, bool force)
{
    if (job->iostatus != BLOCK_DEVICE_IO_STATUS_OK) {
        block_job_iostatus_reset(job);
    }
    if (job->user_paused) {
        /* Do not call block_job_enter here, the caller will handle it.  */
        job->user_paused = false;
        job->pause_count--;
    }
    job->cancelled = true;
    /* To prevent 'force == false' overriding a previous 'force == true' */
    job->force |= force;
}

511
/* Invoke @fn on every job of @txn, optionally under each job's AioContext
 * lock.  Stops at the first non-zero return value and passes it back. */
static int block_job_txn_apply(BlockJobTxn *txn, int fn(BlockJob *), bool lock)
{
    BlockJob *job, *next;
    int rc = 0;

    QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
        AioContext *ctx = NULL;

        if (lock) {
            ctx = blk_get_aio_context(job->blk);
            aio_context_acquire(ctx);
        }
        rc = fn(job);
        if (lock) {
            aio_context_release(ctx);
        }
        if (rc) {
            break;
        }
    }
    return rc;
}

533 534 535 536 537 538 539 540 541 542 543
/* Run @finish on @job and then wait synchronously until the job completes.
 * Returns the job's final return code, -ECANCELED for a cancelled job, or
 * -EBUSY when @finish itself failed. */
static int block_job_finish_sync(BlockJob *job,
                                 void (*finish)(BlockJob *, Error **errp),
                                 Error **errp)
{
    Error *local_err = NULL;
    int ret;

    assert(blk_bs(job->blk)->job == job);

    block_job_ref(job);

    if (finish) {
        finish(job, &local_err);
    }
    if (local_err) {
        error_propagate(errp, local_err);
        block_job_unref(job);
        return -EBUSY;
    }
    /* block_job_drain calls block_job_enter, and it should be enough to
     * induce progress until the job completes or moves to the main thread.
    */
    while (!job->deferred_to_main_loop && !job->completed) {
        block_job_drain(job);
    }
    while (!job->completed) {
        aio_poll(qemu_get_aio_context(), true);
    }
    ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
    block_job_unref(job);
    return ret;
}

566 567 568 569
/* @job failed (or was cancelled): cancel every other job in its transaction
 * and finalize them all.  Re-entrant calls from the jobs we cancel are cut
 * short by the txn->aborting flag. */
static void block_job_completed_txn_abort(BlockJob *job)
{
    AioContext *ctx;
    BlockJobTxn *txn = job->txn;
    BlockJob *other_job;

    if (txn->aborting) {
        /*
         * We are cancelled by another job, which will handle everything.
         */
        return;
    }
    txn->aborting = true;
    block_job_txn_ref(txn);

    /* We are the first failed job. Cancel other jobs. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        ctx = blk_get_aio_context(other_job->blk);
        aio_context_acquire(ctx);
    }

    /* Other jobs are effectively cancelled by us, set the status for
     * them; this job, however, may or may not be cancelled, depending
     * on the caller, so leave it. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (other_job != job) {
            block_job_cancel_async(other_job, false);
        }
    }

    /* Wait for each job to finish, then finalize it; finalization drops
     * the job from txn->jobs, emptying the list. */
    while (!QLIST_EMPTY(&txn->jobs)) {
        other_job = QLIST_FIRST(&txn->jobs);
        ctx = blk_get_aio_context(other_job->blk);
        if (!other_job->completed) {
            assert(other_job->cancelled);
            block_job_finish_sync(other_job, NULL, NULL);
        }
        block_job_finalize_single(other_job);
        aio_context_release(ctx);
    }

    block_job_txn_unref(txn);
}

609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627
/* Non-zero when the user must finalize this job explicitly. */
static int block_job_needs_finalize(BlockJob *job)
{
    return job->auto_finalize ? 0 : 1;
}

/* Complete @job's whole transaction: run the prepare phase on every member
 * and, depending on its outcome, finalize or abort them all. */
static void block_job_do_finalize(BlockJob *job)
{
    int rc;

    assert(job && job->txn);

    /* prepare the transaction to complete */
    rc = block_job_txn_apply(job->txn, block_job_prepare, true);
    if (rc) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_txn_apply(job->txn, block_job_finalize_single, true);
    }
}

628 629 630
/* @job finished successfully: move it to WAITING and, once every member of
 * the transaction has completed, advance the whole group to PENDING (and
 * auto-finalize it when no member demands manual finalization). */
static void block_job_completed_txn_success(BlockJob *job)
{
    BlockJobTxn *txn = job->txn;
    BlockJob *other_job;

    block_job_state_transition(job, BLOCK_JOB_STATUS_WAITING);

    /*
     * Successful completion, see if there are other running jobs in this
     * txn.
     */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (!other_job->completed) {
            return;
        }
        assert(other_job->ret == 0);
    }

    block_job_txn_apply(txn, block_job_event_pending, false);

    /* If no jobs need manual finalization, automatically do so */
    if (block_job_txn_apply(txn, block_job_needs_finalize, false) == 0) {
        block_job_do_finalize(job);
    }
}

654 655 656 657 658 659
/* Assumes the block_job_mutex is held */
static bool block_job_timer_pending(BlockJob *job)
{
    /* True while the sleep timer armed in block_job_do_yield() is pending. */
    return timer_pending(&job->sleep_timer);
}

660 661 662
/* Set the job's rate limit to @speed via the driver callback.  Kicks a
 * sleeping job when the limit was raised so it can re-evaluate its delay. */
void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
{
    Error *local_err = NULL;
    int64_t old_speed = job->speed;

    if (!job->driver->set_speed) {
        error_setg(errp, QERR_UNSUPPORTED);
        return;
    }
    if (block_job_apply_verb(job, BLOCK_JOB_VERB_SET_SPEED, errp)) {
        return;
    }
    job->driver->set_speed(job, speed, &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        return;
    }

    job->speed = speed;
    if (speed && speed <= old_speed) {
        return;
    }

    /* kick only if a timer is pending */
    block_job_enter_cond(job, block_job_timer_pending);
}

687 688
/* Ask a READY job to conclude gracefully through its .complete callback. */
void block_job_complete(BlockJob *job, Error **errp)
{
    /* Should not be reachable via external interface for internal jobs */
    assert(job->id);
    if (block_job_apply_verb(job, BLOCK_JOB_VERB_COMPLETE, errp)) {
        return;
    }
    if (job->pause_count || job->cancelled || !job->driver->complete) {
        error_setg(errp, "The active block job '%s' cannot be completed",
                   job->id);
        return;
    }

    job->driver->complete(job, errp);
}

703 704 705 706 707 708 709 710 711
/* User-requested finalization of a PENDING job's transaction. */
void block_job_finalize(BlockJob *job, Error **errp)
{
    assert(job && job->id && job->txn);
    if (block_job_apply_verb(job, BLOCK_JOB_VERB_FINALIZE, errp) == 0) {
        block_job_do_finalize(job);
    }
}

712 713 714 715 716 717 718 719 720 721 722 723 724
/* User-requested removal of a CONCLUDED job; clears the caller's pointer
 * on success since the job may be freed. */
void block_job_dismiss(BlockJob **jobptr, Error **errp)
{
    BlockJob *job = *jobptr;

    /* similarly to _complete, this is QMP-interface only. */
    assert(job->id);
    if (block_job_apply_verb(job, BLOCK_JOB_VERB_DISMISS, errp)) {
        return;
    }

    block_job_do_dismiss(job);
    *jobptr = NULL;
}

725
/* User-requested pause; errors out when the job is already user-paused. */
void block_job_user_pause(BlockJob *job, Error **errp)
{
    if (block_job_apply_verb(job, BLOCK_JOB_VERB_PAUSE, errp)) {
        return;
    }
    if (job->user_paused) {
        error_setg(errp, "Job is already paused");
        return;
    }
    job->user_paused = true;
    block_job_pause(job);
}

/* True only for pauses requested through block_job_user_pause(). */
bool block_job_user_paused(BlockJob *job)
{
    return job->user_paused;
}

743
/* User-requested resume of a user-paused job; also clears the iostatus. */
void block_job_user_resume(BlockJob *job, Error **errp)
{
    assert(job);
    if (!job->user_paused || job->pause_count <= 0) {
        error_setg(errp, "Can't resume a job that was not paused");
        return;
    }
    if (block_job_apply_verb(job, BLOCK_JOB_VERB_RESUME, errp)) {
        return;
    }
    block_job_iostatus_reset(job);
    job->user_paused = false;
    block_job_resume(job);
}

758
/* Cancel @job.  A concluded job is simply dismissed; otherwise the job is
 * flagged for cancellation and nudged towards completion by the path that
 * matches its current stage. */
void block_job_cancel(BlockJob *job, bool force)
{
    if (job->status == BLOCK_JOB_STATUS_CONCLUDED) {
        block_job_do_dismiss(job);
        return;
    }

    block_job_cancel_async(job, force);
    if (!block_job_started(job)) {
        /* Never ran: complete it right here with -ECANCELED. */
        block_job_completed(job, -ECANCELED);
    } else if (job->deferred_to_main_loop) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_enter(job);
    }
}

774
/* User-facing cancel: verify the verb is allowed first. */
void block_job_user_cancel(BlockJob *job, bool force, Error **errp)
{
    if (block_job_apply_verb(job, BLOCK_JOB_VERB_CANCEL, errp)) {
        return;
    }
    block_job_cancel(job, force);
}

782 783 784 785 786
/* A wrapper around block_job_cancel() taking an Error ** parameter so it may be
 * used with block_job_finish_sync() without the need for (rather nasty)
 * function pointer casts there. */
static void block_job_cancel_err(BlockJob *job, Error **errp)
{
    block_job_cancel(job, false);
}

/* Cancel @job and wait for it to finish; returns its final return code. */
int block_job_cancel_sync(BlockJob *job)
{
    return block_job_finish_sync(job, &block_job_cancel_err, NULL);
}

795 796 797 798 799 800
void block_job_cancel_sync_all(void)
{
    BlockJob *job;
    AioContext *aio_context;

    while ((job = QLIST_FIRST(&block_jobs))) {
801
        aio_context = blk_get_aio_context(job->blk);
802 803 804 805 806 807
        aio_context_acquire(aio_context);
        block_job_cancel_sync(job);
        aio_context_release(aio_context);
    }
}

808 809 810 811 812
/* Complete @job and wait for it to finish; returns its final return code. */
int block_job_complete_sync(BlockJob *job, Error **errp)
{
    return block_job_finish_sync(job, &block_job_complete, errp);
}

813
/* Build a freshly-allocated BlockJobInfo snapshot of @job for QMP.
 * Returns NULL (with @errp set) for internal jobs, which are not
 * queryable.  Caller owns the returned structure. */
BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
{
    BlockJobInfo *info;

    if (block_job_is_internal(job)) {
        error_setg(errp, "Cannot query QEMU internal jobs");
        return NULL;
    }

    info = g_new0(BlockJobInfo, 1);
    info->type      = g_strdup(BlockJobType_str(job->driver->job_type));
    info->device    = g_strdup(job->id);
    info->len       = job->len;
    info->busy      = atomic_read(&job->busy);
    info->paused    = job->pause_count > 0;
    info->offset    = job->offset;
    info->speed     = job->speed;
    info->io_status = job->iostatus;
    info->ready     = job->ready;
    info->status    = job->status;
    info->auto_finalize = job->auto_finalize;
    info->auto_dismiss  = job->auto_dismiss;
    return info;
}
836 837 838 839 840 841 842 843 844

/* Record the first I/O error category; later errors do not overwrite it. */
static void block_job_iostatus_set_err(BlockJob *job, int error)
{
    if (job->iostatus != BLOCK_DEVICE_IO_STATUS_OK) {
        return;
    }
    job->iostatus = (error == ENOSPC) ? BLOCK_DEVICE_IO_STATUS_NOSPACE
                                      : BLOCK_DEVICE_IO_STATUS_FAILED;
}

845
/* Emit the BLOCK_JOB_CANCELLED QMP event; suppressed for internal jobs. */
static void block_job_event_cancelled(BlockJob *job)
{
    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_cancelled(job->driver->job_type,
                                        job->id,
                                        job->len,
                                        job->offset,
                                        job->speed,
                                        &error_abort);
}
858

859
/* Emit the BLOCK_JOB_COMPLETED QMP event, with an optional error message
 * @msg (NULL on success); suppressed for internal jobs. */
static void block_job_event_completed(BlockJob *job, const char *msg)
{
    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_completed(job->driver->job_type,
                                        job->id,
                                        job->len,
                                        job->offset,
                                        job->speed,
                                        !!msg,
                                        msg,
                                        &error_abort);
}

875 876 877 878 879 880 881 882 883 884 885
/* Move @job to PENDING and, for manual-finalize external jobs, emit the
 * BLOCK_JOB_PENDING QMP event.  Always returns 0 so it can be used with
 * block_job_txn_apply(). */
static int block_job_event_pending(BlockJob *job)
{
    block_job_state_transition(job, BLOCK_JOB_STATUS_PENDING);
    if (!job->auto_finalize && !block_job_is_internal(job)) {
        qapi_event_send_block_job_pending(job->driver->job_type,
                                          job->id,
                                          &error_abort);
    }
    return 0;
}

886 887 888 889 890 891
/*
 * API for block job drivers and the block layer.  These functions are
 * declared in blockjob_int.h.
 */

void *block_job_create(const char *job_id, const BlockJobDriver *driver,
892
                       BlockJobTxn *txn, BlockDriverState *bs, uint64_t perm,
893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946
                       uint64_t shared_perm, int64_t speed, int flags,
                       BlockCompletionFunc *cb, void *opaque, Error **errp)
{
    BlockBackend *blk;
    BlockJob *job;
    int ret;

    if (bs->job) {
        error_setg(errp, QERR_DEVICE_IN_USE, bdrv_get_device_name(bs));
        return NULL;
    }

    if (job_id == NULL && !(flags & BLOCK_JOB_INTERNAL)) {
        job_id = bdrv_get_device_name(bs);
        if (!*job_id) {
            error_setg(errp, "An explicit job ID is required for this node");
            return NULL;
        }
    }

    if (job_id) {
        if (flags & BLOCK_JOB_INTERNAL) {
            error_setg(errp, "Cannot specify job ID for internal block job");
            return NULL;
        }

        if (!id_wellformed(job_id)) {
            error_setg(errp, "Invalid job ID '%s'", job_id);
            return NULL;
        }

        if (block_job_get(job_id)) {
            error_setg(errp, "Job ID '%s' already in use", job_id);
            return NULL;
        }
    }

    blk = blk_new(perm, shared_perm);
    ret = blk_insert_bs(blk, bs, errp);
    if (ret < 0) {
        blk_unref(blk);
        return NULL;
    }

    job = g_malloc0(driver->instance_size);
    job->driver        = driver;
    job->id            = g_strdup(job_id);
    job->blk           = blk;
    job->cb            = cb;
    job->opaque        = opaque;
    job->busy          = false;
    job->paused        = true;
    job->pause_count   = 1;
    job->refcnt        = 1;
947
    job->auto_finalize = !(flags & BLOCK_JOB_MANUAL_FINALIZE);
948
    job->auto_dismiss  = !(flags & BLOCK_JOB_MANUAL_DISMISS);
949
    block_job_state_transition(job, BLOCK_JOB_STATUS_CREATED);
950 951 952
    aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
                   QEMU_CLOCK_REALTIME, SCALE_NS,
                   block_job_sleep_timer_cb, job);
953 954

    error_setg(&job->blocker, "block device is in use by block job: %s",
955
               BlockJobType_str(driver->job_type));
956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971
    block_job_add_bdrv(job, "main node", bs, 0, BLK_PERM_ALL, &error_abort);
    bs->job = job;

    bdrv_op_unblock(bs, BLOCK_OP_TYPE_DATAPLANE, job->blocker);

    QLIST_INSERT_HEAD(&block_jobs, job, job_list);

    blk_add_aio_context_notifier(blk, block_job_attached_aio_context,
                                 block_job_detach_aio_context, job);

    /* Only set speed when necessary to avoid NotSupported error */
    if (speed != 0) {
        Error *local_err = NULL;

        block_job_set_speed(job, speed, &local_err);
        if (local_err) {
John Snow's avatar
John Snow committed
972
            block_job_early_fail(job);
973 974 975 976
            error_propagate(errp, local_err);
            return NULL;
        }
    }
977 978 979 980 981 982 983 984 985 986 987

    /* Single jobs are modeled as single-job transactions for sake of
     * consolidating the job management logic */
    if (!txn) {
        txn = block_job_txn_new();
        block_job_txn_add_job(txn, job);
        block_job_txn_unref(txn);
    } else {
        block_job_txn_add_job(txn, job);
    }

988 989 990
    return job;
}

991 992 993 994 995 996 997
void block_job_pause_all(void)
{
    BlockJob *job = NULL;
    while ((job = block_job_next(job))) {
        AioContext *aio_context = blk_get_aio_context(job->blk);

        aio_context_acquire(aio_context);
998
        block_job_ref(job);
999 1000 1001 1002 1003
        block_job_pause(job);
        aio_context_release(aio_context);
    }
}

1004 1005
void block_job_early_fail(BlockJob *job)
{
John Snow's avatar
John Snow committed
1006 1007
    assert(job->status == BLOCK_JOB_STATUS_CREATED);
    block_job_decommission(job);
1008 1009 1010 1011
}

/* Record the completion of @job with result @ret (0 on success,
 * -errno on failure) and drive its transaction forward: a failure
 * aborts the whole transaction, a success lets it try to converge.
 * Must be called exactly once per job. */
void block_job_completed(BlockJob *job, int ret)
{
    assert(job && job->txn && !job->completed);
    assert(blk_bs(job->blk)->job == job);
    job->completed = true;
    job->ret = ret;
    /* Fold cancellation into the final return code before acting on it */
    block_job_update_rc(job);
    trace_block_job_completed(job, ret, job->ret);
    if (job->ret) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_completed_txn_success(job);
    }
}

/* True while at least one pause request (user or internal) is
 * outstanding; the job must not make progress until the count
 * drops back to zero. */
static bool block_job_should_pause(BlockJob *job)
{
    return job->pause_count > 0;
}

1030 1031 1032 1033 1034 1035 1036
/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
 * Reentering the job coroutine with block_job_enter() before the timer has
 * expired is allowed and cancels the timer.
 *
 * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be
 * called explicitly. */
static void block_job_do_yield(BlockJob *job, uint64_t ns)
1037
{
1038 1039 1040 1041
    block_job_lock();
    if (ns != -1) {
        timer_mod(&job->sleep_timer, ns);
    }
1042
    job->busy = false;
1043
    block_job_unlock();
1044 1045 1046 1047 1048 1049
    qemu_coroutine_yield();

    /* Set by block_job_enter before re-entering the coroutine.  */
    assert(job->busy);
}

1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065
void coroutine_fn block_job_pause_point(BlockJob *job)
{
    assert(job && block_job_started(job));

    if (!block_job_should_pause(job)) {
        return;
    }
    if (block_job_is_cancelled(job)) {
        return;
    }

    if (job->driver->pause) {
        job->driver->pause(job);
    }

    if (block_job_should_pause(job) && !block_job_is_cancelled(job)) {
John Snow's avatar
John Snow committed
1066
        BlockJobStatus status = job->status;
1067 1068 1069
        block_job_state_transition(job, status == BLOCK_JOB_STATUS_READY ? \
                                   BLOCK_JOB_STATUS_STANDBY :           \
                                   BLOCK_JOB_STATUS_PAUSED);
1070
        job->paused = true;
1071
        block_job_do_yield(job, -1);
1072
        job->paused = false;
1073
        block_job_state_transition(job, status);
1074 1075 1076 1077 1078 1079 1080
    }

    if (job->driver->resume) {
        job->driver->resume(job);
    }
}

1081 1082
void block_job_resume_all(void)
{
1083 1084 1085
    BlockJob *job, *next;

    QLIST_FOREACH_SAFE(job, &block_jobs, job_list, next) {
1086 1087 1088 1089
        AioContext *aio_context = blk_get_aio_context(job->blk);

        aio_context_acquire(aio_context);
        block_job_resume(job);
1090
        block_job_unref(job);
1091 1092 1093 1094
        aio_context_release(aio_context);
    }
}

1095 1096 1097 1098 1099
/*
 * Conditionally enter a block_job pending a call to fn() while
 * under the block_job_lock critical section.
 */
static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job))
1100
{
1101 1102 1103 1104 1105 1106 1107
    if (!block_job_started(job)) {
        return;
    }
    if (job->deferred_to_main_loop) {
        return;
    }

1108
    block_job_lock();
1109
    if (job->busy) {
1110
        block_job_unlock();
1111
        return;
1112
    }
1113

1114 1115 1116 1117 1118
    if (fn && !fn(job)) {
        block_job_unlock();
        return;
    }

1119 1120
    assert(!job->deferred_to_main_loop);
    timer_del(&job->sleep_timer);
1121
    job->busy = true;
1122
    block_job_unlock();
1123
    aio_co_wake(job->co);
1124 1125
}

/* Unconditionally wake the job coroutine (if it is not already busy);
 * thin wrapper around block_job_enter_cond() with no predicate. */
void block_job_enter(BlockJob *job)
{
    block_job_enter_cond(job, NULL);
}

/* True once cancellation has been requested for @job. */
bool block_job_is_cancelled(BlockJob *job)
{
    return job->cancelled;
}

1136
void block_job_sleep_ns(BlockJob *job, int64_t ns)
1137 1138 1139 1140 1141 1142 1143 1144 1145
{
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (block_job_is_cancelled(job)) {
        return;
    }

    if (!block_job_should_pause(job)) {
1146
        block_job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161
    }

    block_job_pause_point(job);
}

/* Yield the job coroutine with no wakeup timer; block_job_enter() must
 * be called to resume it.  Returns immediately if cancelled; a pending
 * pause is honoured at the pause point instead of yielding here. */
void block_job_yield(BlockJob *job)
{
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (block_job_is_cancelled(job)) {
        return;
    }

    if (!block_job_should_pause(job)) {
        block_job_do_yield(job, -1);
    }

    block_job_pause_point(job);
}

1168 1169
void block_job_iostatus_reset(BlockJob *job)
{
1170 1171 1172 1173
    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
        return;
    }
    assert(job->user_paused && job->pause_count > 0);
1174 1175 1176
    job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
}

1177
void block_job_event_ready(BlockJob *job)
1178
{
1179
    block_job_state_transition(job, BLOCK_JOB_STATUS_READY);
Max Reitz's avatar
Max Reitz committed
1180 1181
    job->ready = true;

1182 1183 1184 1185
    if (block_job_is_internal(job)) {
        return;
    }

1186
    qapi_event_send_block_job_ready(job->driver->job_type,
1187
                                    job->id,
1188 1189 1190
                                    job->len,
                                    job->offset,
                                    job->speed, &error_abort);
1191 1192
}

1193
BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
1194 1195 1196 1197 1198 1199
                                        int is_read, int error)
{
    BlockErrorAction action;

    switch (on_err) {
    case BLOCKDEV_ON_ERROR_ENOSPC:
1200
    case BLOCKDEV_ON_ERROR_AUTO:
1201 1202
        action = (error == ENOSPC) ?
                 BLOCK_ERROR_ACTION_STOP : BLOCK_ERROR_ACTION_REPORT;
1203 1204
        break;
    case BLOCKDEV_ON_ERROR_STOP:
1205
        action = BLOCK_ERROR_ACTION_STOP;
1206 1207
        break;
    case BLOCKDEV_ON_ERROR_REPORT:
1208
        action = BLOCK_ERROR_ACTION_REPORT;
1209 1210
        break;
    case BLOCKDEV_ON_ERROR_IGNORE:
1211
        action = BLOCK_ERROR_ACTION_IGNORE;
1212 1213 1214 1215
        break;
    default:
        abort();
    }
1216 1217 1218 1219 1220 1221
    if (!block_job_is_internal(job)) {
        qapi_event_send_block_job_error(job->id,
                                        is_read ? IO_OPERATION_TYPE_READ :
                                        IO_OPERATION_TYPE_WRITE,
                                        action, &error_abort);
    }
1222
    if (action == BLOCK_ERROR_ACTION_STOP) {
1223
        block_job_pause(job);
Fam Zheng's avatar
Fam Zheng committed
1224
        /* make the pause user visible, which will be resumed from QMP. */
1225
        job->user_paused = true;
1226 1227 1228 1229
        block_job_iostatus_set_err(job, error);
    }
    return action;
}
1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246

/* State handed from a job coroutine to the main-loop bottom half
 * scheduled by block_job_defer_to_main_loop(). */
typedef struct {
    BlockJob *job;               /* job on whose behalf @fn runs */
    AioContext *aio_context;     /* job's AioContext at scheduling time */
    BlockJobDeferToMainLoopFn *fn;  /* callback to run in the main loop */
    void *opaque;                /* caller data passed through to @fn */
} BlockJobDeferToMainLoopData;

static void block_job_defer_to_main_loop_bh(void *opaque)
{
    BlockJobDeferToMainLoopData *data = opaque;
    AioContext *aio_context;

    /* Prevent race with block_job_defer_to_main_loop() */
    aio_context_acquire(data->aio_context);

    /* Fetch BDS AioContext again, in case it has changed */
1247
    aio_context = blk_get_aio_context(data->job->blk);
1248 1249 1250
    if (aio_context != data->aio_context) {
        aio_context_acquire(aio_context);
    }
1251 1252 1253

    data->fn(data->job, data->opaque);

1254 1255 1256
    if (aio_context != data->aio_context) {
        aio_context_release(aio_context);
    }
1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268

    aio_context_release(data->aio_context);

    g_free(data);
}

/* Schedule @fn(@job, @opaque) to run once in the main loop's
 * AioContext.  Marks the job deferred_to_main_loop so that
 * block_job_enter_cond() will not reenter the coroutine meanwhile.
 * Ownership of the heap-allocated handoff data passes to the bottom
 * half, which frees it. */
void block_job_defer_to_main_loop(BlockJob *job,
                                  BlockJobDeferToMainLoopFn *fn,
                                  void *opaque)
{
    BlockJobDeferToMainLoopData *data = g_malloc(sizeof(*data));
    data->job = job;
    /* Remembered so the BH can detect an AioContext change in between */
    data->aio_context = blk_get_aio_context(job->blk);
    data->fn = fn;
    data->opaque = opaque;
    job->deferred_to_main_loop = true;

    aio_bh_schedule_oneshot(qemu_get_aio_context(),
                            block_job_defer_to_main_loop_bh, data);
}