| Line | Exclusive | Inclusive | Code |
|---|---|---|---|
| 1 | # This file is a part of Julia. License is MIT: https://julialang.org/license | ||
| 2 | |||
| 3 | ## basic task functions and TLS | ||
| 4 | |||
| 5 | Core.Task(@nospecialize(f), reserved_stack::Int=0) = Core._Task(f, reserved_stack, ThreadSynchronizer()) | ||
| 6 | |||
| 7 | # Container for a captured exception and its backtrace. Can be serialized. | ||
| 8 | struct CapturedException <: Exception | ||
| 9 | ex::Any | ||
| 10 | processed_bt::Vector{Any} | ||
| 11 | |||
| 12 | function CapturedException(ex, bt_raw::Vector) | ||
| 13 | # bt_raw MUST be a vector that can be processed by StackTraces.stacktrace | ||
| 14 | # Typically the result of a catch_backtrace() | ||
| 15 | |||
| 16 | # Process bt_raw so that it can be safely serialized | ||
| 17 | bt_lines = process_backtrace(bt_raw, 100) # Limiting this to 100 lines. | ||
| 18 | CapturedException(ex, bt_lines) | ||
| 19 | end | ||
| 20 | |||
| 21 | CapturedException(ex, processed_bt::Vector{Any}) = new(ex, processed_bt) | ||
| 22 | end | ||
| 23 | |||
| 24 | function showerror(io::IO, ce::CapturedException) | ||
| 25 | showerror(io, ce.ex, ce.processed_bt, backtrace=true) | ||
| 26 | end | ||
| 27 | |||
| 28 | """ | ||
| 29 | CompositeException | ||
| 30 | |||
| 31 | Wrap a `Vector` of exceptions thrown by a [`Task`](@ref) (e.g. generated from a remote worker over a channel | ||
| 32 | or an asynchronously executing local I/O write or a remote worker under `pmap`) with information about the series of exceptions. | ||
| 33 | For example, if a group of workers are executing several tasks, and multiple workers fail, the resulting `CompositeException` will | ||
| 34 | contain a "bundle" of information from each worker indicating where and why the exception(s) occurred. | ||
| 35 | """ | ||
| 36 | struct CompositeException <: Exception | ||
| 37 | exceptions::Vector{Any} | ||
| 38 | CompositeException() = new(Any[]) | ||
| 39 | CompositeException(exceptions) = new(exceptions) | ||
| 40 | end | ||
| 41 | length(c::CompositeException) = length(c.exceptions) | ||
| 42 | push!(c::CompositeException, ex) = push!(c.exceptions, ex) | ||
| 43 | isempty(c::CompositeException) = isempty(c.exceptions) | ||
| 44 | iterate(c::CompositeException, state...) = iterate(c.exceptions, state...) | ||
| 45 | eltype(::Type{CompositeException}) = Any | ||
| 46 | |||
| 47 | function showerror(io::IO, ex::CompositeException) | ||
| 48 | if !isempty(ex) | ||
| 49 | showerror(io, ex.exceptions[1]) | ||
| 50 | remaining = length(ex) - 1 | ||
| 51 | if remaining > 0 | ||
| 52 | print(io, string("\n\n...and ", remaining, " more exception(s).\n")) | ||
| 53 | end | ||
| 54 | else | ||
| 55 | print(io, "CompositeException()\n") | ||
| 56 | end | ||
| 57 | end | ||
| 58 | |||
| 59 | """ | ||
| 60 | TaskFailedException | ||
| 61 | |||
| 62 | This exception is thrown by a `wait(t)` call when task `t` fails. | ||
| 63 | `TaskFailedException` wraps the failed task `t`. | ||
| 64 | """ | ||
| 65 | struct TaskFailedException <: Exception | ||
| 66 | task::Task | ||
| 67 | end | ||
| 68 | |||
| 69 | function showerror(io::IO, ex::TaskFailedException) | ||
| 70 | stacks = [] | ||
| 71 | while isa(ex.task.exception, TaskFailedException) | ||
| 72 | pushfirst!(stacks, ex.task.backtrace) | ||
| 73 | ex = ex.task.exception | ||
| 74 | end | ||
| 75 | println(io, "TaskFailedException:") | ||
| 76 | showerror(io, ex.task.exception, ex.task.backtrace) | ||
| 77 | if !isempty(stacks) | ||
| 78 | for bt in stacks | ||
| 79 | show_backtrace(io, bt) | ||
| 80 | end | ||
| 81 | end | ||
| 82 | end | ||
| 83 | |||
| 84 | function show(io::IO, t::Task) | ||
| 85 | print(io, "Task ($(t.state)) @0x$(string(convert(UInt, pointer_from_objref(t)), base = 16, pad = Sys.WORD_SIZE>>2))") | ||
| 86 | end | ||
| 87 | |||
| 88 | """ | ||
| 89 | @task | ||
| 90 | |||
| 91 | Wrap an expression in a [`Task`](@ref) without executing it, and return the [`Task`](@ref). This only | ||
| 92 | creates a task, and does not run it. | ||
| 93 | |||
| 94 | # Examples | ||
| 95 | ```jldoctest | ||
| 96 | julia> a1() = sum(i for i in 1:1000); | ||
| 97 | |||
| 98 | julia> b = @task a1(); | ||
| 99 | |||
| 100 | julia> istaskstarted(b) | ||
| 101 | false | ||
| 102 | |||
| 103 | julia> schedule(b); | ||
| 104 | |||
| 105 | julia> yield(); | ||
| 106 | |||
| 107 | julia> istaskdone(b) | ||
| 108 | true | ||
| 109 | ``` | ||
| 110 | """ | ||
| 111 | macro task(ex) | ||
| 112 | :(Task(()->$(esc(ex)))) | ||
| 113 | end | ||
| 114 | |||
| 115 | """ | ||
| 116 | current_task() | ||
| 117 | |||
| 118 | Get the currently running [`Task`](@ref). | ||
| 119 | """ | ||
| 120 | current_task() = ccall(:jl_get_current_task, Ref{Task}, ()) | ||
| 121 | |||
| 122 | """ | ||
| 123 | istaskdone(t::Task) -> Bool | ||
| 124 | |||
| 125 | Determine whether a task has exited. | ||
| 126 | |||
| 127 | # Examples | ||
| 128 | ```jldoctest | ||
| 129 | julia> a2() = sum(i for i in 1:1000); | ||
| 130 | |||
| 131 | julia> b = Task(a2); | ||
| 132 | |||
| 133 | julia> istaskdone(b) | ||
| 134 | false | ||
| 135 | |||
| 136 | julia> schedule(b); | ||
| 137 | |||
| 138 | julia> yield(); | ||
| 139 | |||
| 140 | julia> istaskdone(b) | ||
| 141 | true | ||
| 142 | ``` | ||
| 143 | """ | ||
| 144 | istaskdone(t::Task) = ((t.state === :done) | istaskfailed(t)) | ||
| 145 | |||
| 146 | """ | ||
| 147 | istaskstarted(t::Task) -> Bool | ||
| 148 | |||
| 149 | Determine whether a task has started executing. | ||
| 150 | |||
| 151 | # Examples | ||
| 152 | ```jldoctest | ||
| 153 | julia> a3() = sum(i for i in 1:1000); | ||
| 154 | |||
| 155 | julia> b = Task(a3); | ||
| 156 | |||
| 157 | julia> istaskstarted(b) | ||
| 158 | false | ||
| 159 | ``` | ||
| 160 | """ | ||
| 161 | istaskstarted(t::Task) = ccall(:jl_is_task_started, Cint, (Any,), t) != 0 | ||
| 162 | |||
| 163 | """ | ||
| 164 | istaskfailed(t::Task) -> Bool | ||
| 165 | |||
| 166 | Determine whether a task has exited because an exception was thrown. | ||
| 167 | |||
| 168 | # Examples | ||
| 169 | ```jldoctest | ||
| 170 | julia> a4() = error("task failed"); | ||
| 171 | |||
| 172 | julia> b = Task(a4); | ||
| 173 | |||
| 174 | julia> istaskfailed(b) | ||
| 175 | false | ||
| 176 | |||
| 177 | julia> schedule(b); | ||
| 178 | |||
| 179 | julia> yield(); | ||
| 180 | |||
| 181 | julia> istaskfailed(b) | ||
| 182 | true | ||
| 183 | ``` | ||
| 184 | """ | ||
| 185 | istaskfailed(t::Task) = (t.state === :failed) | ||
| 186 | |||
| 187 | Threads.threadid(t::Task) = Int(ccall(:jl_get_task_tid, Int16, (Any,), t)+1) | ||
| 188 | |||
| 189 | task_result(t::Task) = t.result | ||
| 190 | |||
| 191 | task_local_storage() = get_task_tls(current_task()) | ||
| 192 | function get_task_tls(t::Task) | ||
| 193 | if t.storage === nothing | ||
| 194 | t.storage = IdDict() | ||
| 195 | end | ||
| 196 | return (t.storage)::IdDict{Any,Any} | ||
| 197 | end | ||
| 198 | |||
| 199 | """ | ||
| 200 | task_local_storage(key) | ||
| 201 | |||
| 202 | Look up the value of a key in the current task's task-local storage. | ||
| 203 | """ | ||
| 204 | task_local_storage(key) = task_local_storage()[key] | ||
| 205 | |||
| 206 | """ | ||
| 207 | task_local_storage(key, value) | ||
| 208 | |||
| 209 | Assign a value to a key in the current task's task-local storage. | ||
| 210 | """ | ||
| 211 | task_local_storage(key, val) = (task_local_storage()[key] = val) | ||
| 212 | |||
| 213 | """ | ||
| 214 | task_local_storage(body, key, value) | ||
| 215 | |||
| 216 | Call the function `body` with a modified task-local storage, in which `value` is assigned to | ||
| 217 | `key`; the previous value of `key`, or lack thereof, is restored afterwards. Useful | ||
| 218 | for emulating dynamic scoping. | ||
| 219 | """ | ||
| 220 | function task_local_storage(body::Function, key, val) | ||
| 221 | tls = task_local_storage() | ||
| 222 | hadkey = haskey(tls, key) | ||
| 223 | old = get(tls, key, nothing) | ||
| 224 | tls[key] = val | ||
| 225 | try | ||
| 226 | return body() | ||
| 227 | finally | ||
| 228 | hadkey ? (tls[key] = old) : delete!(tls, key) | ||
| 229 | end | ||
| 230 | end | ||
| 231 | |||
| 232 | # just wait for a task to be done, no error propagation | ||
| 233 | function _wait(t::Task) | ||
| 234 | if !istaskdone(t) | ||
| 235 | lock(t.donenotify) | ||
| 236 | try | ||
| 237 | while !istaskdone(t) | ||
| 238 | wait(t.donenotify) | ||
| 239 | end | ||
| 240 | finally | ||
| 241 | unlock(t.donenotify) | ||
| 242 | end | ||
| 243 | end | ||
| 244 | nothing | ||
| 245 | end | ||
| 246 | |||
| 247 | # have `waiter` wait for `t` | ||
| 248 | function _wait2(t::Task, waiter::Task) | ||
| 249 | if !istaskdone(t) | ||
| 250 | lock(t.donenotify) | ||
| 251 | if !istaskdone(t) | ||
| 252 | push!(t.donenotify.waitq, waiter) | ||
| 253 | unlock(t.donenotify) | ||
| 254 | return nothing | ||
| 255 | else | ||
| 256 | unlock(t.donenotify) | ||
| 257 | end | ||
| 258 | end | ||
| 259 | schedule(waiter) | ||
| 260 | nothing | ||
| 261 | end | ||
| 262 | |||
| 263 | function wait(t::Task) | ||
| 264 | t === current_task() && error("deadlock detected: cannot wait on current task") | ||
| 265 | _wait(t) | ||
| 266 | if istaskfailed(t) | ||
| 267 | throw(TaskFailedException(t)) | ||
| 268 | end | ||
| 269 | nothing | ||
| 270 | end | ||
| 271 | |||
| 272 | fetch(@nospecialize x) = x | ||
| 273 | |||
| 274 | """ | ||
| 275 | fetch(t::Task) | ||
| 276 | |||
| 277 | Wait for a Task to finish, then return its result value. | ||
| 278 | If the task fails with an exception, a `TaskFailedException` (which wraps the failed task) | ||
| 279 | is thrown. | ||
| 280 | """ | ||
| 281 | function fetch(t::Task) | ||
| 282 | wait(t) | ||
| 283 | return task_result(t) | ||
| 284 | end | ||
| 285 | |||
| 286 | |||
| 287 | ## lexically-scoped waiting for multiple items | ||
| 288 | |||
| 289 | function sync_end(refs) | ||
| 290 | local c_ex | ||
| 291 | defined = false | ||
| 292 | for r in refs | ||
| 293 | if isa(r, Task) | ||
| 294 | _wait(r) | ||
| 295 | if istaskfailed(r) | ||
| 296 | if !defined | ||
| 297 | defined = true | ||
| 298 | c_ex = CompositeException() | ||
| 299 | end | ||
| 300 | push!(c_ex, TaskFailedException(r)) | ||
| 301 | end | ||
| 302 | else | ||
| 303 | try | ||
| 304 | wait(r) | ||
| 305 | catch e | ||
| 306 | if !defined | ||
| 307 | defined = true | ||
| 308 | c_ex = CompositeException() | ||
| 309 | end | ||
| 310 | push!(c_ex, e) | ||
| 311 | end | ||
| 312 | end | ||
| 313 | end | ||
| 314 | |||
| 315 | if defined | ||
| 316 | throw(c_ex) | ||
| 317 | end | ||
| 318 | nothing | ||
| 319 | end | ||
| 320 | |||
| 321 | const sync_varname = gensym(:sync) | ||
| 322 | |||
| 323 | """ | ||
| 324 | @sync | ||
| 325 | |||
| 326 | Wait until all lexically-enclosed uses of `@async`, `@spawn`, `@spawnat` and `@distributed` | ||
| 327 | are complete. All exceptions thrown by enclosed async operations are collected and thrown as | ||
| 328 | a `CompositeException`. | ||
| 329 | """ | ||
| 330 | macro sync(block) | ||
| 331 | var = esc(sync_varname) | ||
| 332 | quote | ||
| 333 | let $var = Any[] | ||
| 334 | v = $(esc(block)) | ||
| 335 | sync_end($var) | ||
| 336 | v | ||
| 337 | end | ||
| 338 | end | ||
| 339 | end | ||
| 340 | |||
| 341 | # schedule an expression to run asynchronously | ||
| 342 | |||
| 343 | """ | ||
| 344 | @async | ||
| 345 | |||
| 346 | Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue. | ||
| 347 | |||
| 348 | Values can be interpolated into `@async` via `\$`, which copies the value directly into the | ||
| 349 | constructed underlying closure. This allows you to insert the _value_ of a variable, | ||
| 350 | isolating the aysnchronous code from changes to the variable's value in the current task. | ||
| 351 | |||
| 352 | !!! compat "Julia 1.4" | ||
| 353 | Interpolating values via `\$` is available as of Julia 1.4. | ||
| 354 | """ | ||
| 355 | macro async(expr) | ||
| 356 | letargs = Base._lift_one_interp!(expr) | ||
| 357 | |||
| 358 | 833 (100 %) |
833 (100 %)
samples spent calling
run_backend
thunk = esc(:(()->($expr)))
|
|
| 359 | var = esc(sync_varname) | ||
| 360 | quote | ||
| 361 | let $(letargs...) | ||
| 362 | local task = Task($thunk) | ||
| 363 | if $(Expr(:islocal, var)) | ||
| 364 | push!($var, task) | ||
| 365 | end | ||
| 366 | schedule(task) | ||
| 367 | task | ||
| 368 | end | ||
| 369 | end | ||
| 370 | end | ||
| 371 | |||
| 372 | # Capture interpolated variables in $() and move them to let-block | ||
| 373 | function _lift_one_interp!(e) | ||
| 374 | letargs = Any[] # store the new gensymed arguments | ||
| 375 | _lift_one_interp_helper(e, false, letargs) # Start out _not_ in a quote context (false) | ||
| 376 | letargs | ||
| 377 | end | ||
| 378 | _lift_one_interp_helper(v, _, _) = v | ||
| 379 | function _lift_one_interp_helper(expr::Expr, in_quote_context, letargs) | ||
| 380 | if expr.head === :$ | ||
| 381 | if in_quote_context # This $ is simply interpolating out of the quote | ||
| 382 | # Now, we're out of the quote, so any _further_ $ is ours. | ||
| 383 | in_quote_context = false | ||
| 384 | else | ||
| 385 | newarg = gensym() | ||
| 386 | push!(letargs, :($(esc(newarg)) = $(esc(expr.args[1])))) | ||
| 387 | return newarg # Don't recurse into the lifted $() exprs | ||
| 388 | end | ||
| 389 | elseif expr.head === :quote | ||
| 390 | in_quote_context = true # Don't try to lift $ directly out of quotes | ||
| 391 | elseif expr.head === :macrocall | ||
| 392 | return expr # Don't recur into macro calls, since some other macros use $ | ||
| 393 | end | ||
| 394 | for (i,e) in enumerate(expr.args) | ||
| 395 | expr.args[i] = _lift_one_interp_helper(e, in_quote_context, letargs) | ||
| 396 | end | ||
| 397 | expr | ||
| 398 | end | ||
| 399 | |||
| 400 | |||
| 401 | # add a wait-able object to the sync pool | ||
| 402 | macro sync_add(expr) | ||
| 403 | var = esc(sync_varname) | ||
| 404 | quote | ||
| 405 | local ref = $(esc(expr)) | ||
| 406 | push!($var, ref) | ||
| 407 | ref | ||
| 408 | end | ||
| 409 | end | ||
| 410 | |||
| 411 | # runtime system hook called when a task finishes | ||
| 412 | function task_done_hook(t::Task) | ||
| 413 | # `finish_task` sets `sigatomic` before entering this function | ||
| 414 | err = istaskfailed(t) | ||
| 415 | result = task_result(t) | ||
| 416 | handled = false | ||
| 417 | if err | ||
| 418 | t.backtrace = catch_backtrace() | ||
| 419 | end | ||
| 420 | |||
| 421 | donenotify = t.donenotify | ||
| 422 | if isa(donenotify, ThreadSynchronizer) | ||
| 423 | lock(donenotify) | ||
| 424 | try | ||
| 425 | if !isempty(donenotify.waitq) | ||
| 426 | handled = true | ||
| 427 | notify(donenotify) | ||
| 428 | end | ||
| 429 | finally | ||
| 430 | unlock(donenotify) | ||
| 431 | end | ||
| 432 | end | ||
| 433 | |||
| 434 | if err && !handled && Threads.threadid() == 1 | ||
| 435 | if isa(result, InterruptException) && isdefined(Base, :active_repl_backend) && | ||
| 436 | active_repl_backend.backend_task.state === :runnable && isempty(Workqueue) && | ||
| 437 | active_repl_backend.in_eval | ||
| 438 | throwto(active_repl_backend.backend_task, result) # this terminates the task | ||
| 439 | end | ||
| 440 | end | ||
| 441 | # Clear sigatomic before waiting | ||
| 442 | sigatomic_end() | ||
| 443 | try | ||
| 444 | wait() # this will not return | ||
| 445 | catch e | ||
| 446 | # If an InterruptException happens while blocked in the event loop, try handing | ||
| 447 | # the exception to the REPL task since the current task is done. | ||
| 448 | # issue #19467 | ||
| 449 | if Threads.threadid() == 1 && | ||
| 450 | isa(e, InterruptException) && isdefined(Base, :active_repl_backend) && | ||
| 451 | active_repl_backend.backend_task.state === :runnable && isempty(Workqueue) && | ||
| 452 | active_repl_backend.in_eval | ||
| 453 | throwto(active_repl_backend.backend_task, e) | ||
| 454 | else | ||
| 455 | rethrow() | ||
| 456 | end | ||
| 457 | end | ||
| 458 | end | ||
| 459 | |||
| 460 | |||
| 461 | ## scheduler and work queue | ||
| 462 | |||
| 463 | struct InvasiveLinkedListSynchronized{T} | ||
| 464 | queue::InvasiveLinkedList{T} | ||
| 465 | lock::Threads.SpinLock | ||
| 466 | InvasiveLinkedListSynchronized{T}() where {T} = new(InvasiveLinkedList{T}(), Threads.SpinLock()) | ||
| 467 | end | ||
| 468 | isempty(W::InvasiveLinkedListSynchronized) = isempty(W.queue) | ||
| 469 | length(W::InvasiveLinkedListSynchronized) = length(W.queue) | ||
| 470 | function push!(W::InvasiveLinkedListSynchronized{T}, t::T) where T | ||
| 471 | lock(W.lock) | ||
| 472 | try | ||
| 473 | push!(W.queue, t) | ||
| 474 | finally | ||
| 475 | unlock(W.lock) | ||
| 476 | end | ||
| 477 | return W | ||
| 478 | end | ||
| 479 | function pushfirst!(W::InvasiveLinkedListSynchronized{T}, t::T) where T | ||
| 480 | lock(W.lock) | ||
| 481 | try | ||
| 482 | pushfirst!(W.queue, t) | ||
| 483 | finally | ||
| 484 | unlock(W.lock) | ||
| 485 | end | ||
| 486 | return W | ||
| 487 | end | ||
| 488 | function pop!(W::InvasiveLinkedListSynchronized) | ||
| 489 | lock(W.lock) | ||
| 490 | try | ||
| 491 | return pop!(W.queue) | ||
| 492 | finally | ||
| 493 | unlock(W.lock) | ||
| 494 | end | ||
| 495 | end | ||
| 496 | function popfirst!(W::InvasiveLinkedListSynchronized) | ||
| 497 | lock(W.lock) | ||
| 498 | try | ||
| 499 | return popfirst!(W.queue) | ||
| 500 | finally | ||
| 501 | unlock(W.lock) | ||
| 502 | end | ||
| 503 | end | ||
| 504 | function list_deletefirst!(W::InvasiveLinkedListSynchronized{T}, t::T) where T | ||
| 505 | lock(W.lock) | ||
| 506 | try | ||
| 507 | list_deletefirst!(W.queue, t) | ||
| 508 | finally | ||
| 509 | unlock(W.lock) | ||
| 510 | end | ||
| 511 | return W | ||
| 512 | end | ||
| 513 | |||
| 514 | const StickyWorkqueue = InvasiveLinkedListSynchronized{Task} | ||
| 515 | global const Workqueues = [StickyWorkqueue()] | ||
| 516 | global const Workqueue = Workqueues[1] # default work queue is thread 1 | ||
| 517 | function __preinit_threads__() | ||
| 518 | if length(Workqueues) < Threads.nthreads() | ||
| 519 | resize!(Workqueues, Threads.nthreads()) | ||
| 520 | for i = 2:length(Workqueues) | ||
| 521 | Workqueues[i] = StickyWorkqueue() | ||
| 522 | end | ||
| 523 | end | ||
| 524 | nothing | ||
| 525 | end | ||
| 526 | |||
| 527 | function enq_work(t::Task) | ||
| 528 | (t.state === :runnable && t.queue === nothing) || error("schedule: Task not runnable") | ||
| 529 | tid = Threads.threadid(t) | ||
| 530 | # Note there are three reasons a Task might be put into a sticky queue | ||
| 531 | # even if t.sticky == false: | ||
| 532 | # 1. The Task's stack is currently being used by the scheduler for a certain thread. | ||
| 533 | # 2. There is only 1 thread. | ||
| 534 | # 3. The multiq is full (can be fixed by making it growable). | ||
| 535 | if t.sticky || tid != 0 || Threads.nthreads() == 1 | ||
| 536 | if tid == 0 | ||
| 537 | tid = Threads.threadid() | ||
| 538 | ccall(:jl_set_task_tid, Cvoid, (Any, Cint), t, tid-1) | ||
| 539 | end | ||
| 540 | push!(Workqueues[tid], t) | ||
| 541 | else | ||
| 542 | tid = 0 | ||
| 543 | if ccall(:jl_enqueue_task, Cint, (Any,), t) != 0 | ||
| 544 | # if multiq is full, give to a random thread (TODO fix) | ||
| 545 | tid = mod(time_ns() % Int, Threads.nthreads()) + 1 | ||
| 546 | ccall(:jl_set_task_tid, Cvoid, (Any, Cint), t, tid-1) | ||
| 547 | push!(Workqueues[tid], t) | ||
| 548 | end | ||
| 549 | end | ||
| 550 | ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16) | ||
| 551 | return t | ||
| 552 | end | ||
| 553 | |||
| 554 | schedule(t::Task) = enq_work(t) | ||
| 555 | |||
| 556 | """ | ||
| 557 | schedule(t::Task, [val]; error=false) | ||
| 558 | |||
| 559 | Add a [`Task`](@ref) to the scheduler's queue. This causes the task to run constantly when the system | ||
| 560 | is otherwise idle, unless the task performs a blocking operation such as [`wait`](@ref). | ||
| 561 | |||
| 562 | If a second argument `val` is provided, it will be passed to the task (via the return value of | ||
| 563 | [`yieldto`](@ref)) when it runs again. If `error` is `true`, the value is raised as an exception in | ||
| 564 | the woken task. | ||
| 565 | |||
| 566 | # Examples | ||
| 567 | ```jldoctest | ||
| 568 | julia> a5() = sum(i for i in 1:1000); | ||
| 569 | |||
| 570 | julia> b = Task(a5); | ||
| 571 | |||
| 572 | julia> istaskstarted(b) | ||
| 573 | false | ||
| 574 | |||
| 575 | julia> schedule(b); | ||
| 576 | |||
| 577 | julia> yield(); | ||
| 578 | |||
| 579 | julia> istaskstarted(b) | ||
| 580 | true | ||
| 581 | |||
| 582 | julia> istaskdone(b) | ||
| 583 | true | ||
| 584 | ``` | ||
| 585 | """ | ||
| 586 | function schedule(t::Task, @nospecialize(arg); error=false) | ||
| 587 | # schedule a task to be (re)started with the given value or exception | ||
| 588 | t.state === :runnable || Base.error("schedule: Task not runnable") | ||
| 589 | if error | ||
| 590 | t.queue === nothing || Base.list_deletefirst!(t.queue, t) | ||
| 591 | setfield!(t, :exception, arg) | ||
| 592 | else | ||
| 593 | t.queue === nothing || Base.error("schedule: Task not runnable") | ||
| 594 | setfield!(t, :result, arg) | ||
| 595 | end | ||
| 596 | enq_work(t) | ||
| 597 | return t | ||
| 598 | end | ||
| 599 | |||
| 600 | """ | ||
| 601 | yield() | ||
| 602 | |||
| 603 | Switch to the scheduler to allow another scheduled task to run. A task that calls this | ||
| 604 | function is still runnable, and will be restarted immediately if there are no other runnable | ||
| 605 | tasks. | ||
| 606 | """ | ||
| 607 | function yield() | ||
| 608 | ct = current_task() | ||
| 609 | enq_work(ct) | ||
| 610 | try | ||
| 611 | wait() | ||
| 612 | catch | ||
| 613 | ct.queue === nothing || list_deletefirst!(ct.queue, ct) | ||
| 614 | rethrow() | ||
| 615 | end | ||
| 616 | end | ||
| 617 | |||
| 618 | """ | ||
| 619 | yield(t::Task, arg = nothing) | ||
| 620 | |||
| 621 | A fast, unfair-scheduling version of `schedule(t, arg); yield()` which | ||
| 622 | immediately yields to `t` before calling the scheduler. | ||
| 623 | """ | ||
| 624 | function yield(t::Task, @nospecialize(x=nothing)) | ||
| 625 | t.result = x | ||
| 626 | enq_work(current_task()) | ||
| 627 | return try_yieldto(ensure_rescheduled, Ref(t)) | ||
| 628 | end | ||
| 629 | |||
| 630 | """ | ||
| 631 | yieldto(t::Task, arg = nothing) | ||
| 632 | |||
| 633 | Switch to the given task. The first time a task is switched to, the task's function is | ||
| 634 | called with no arguments. On subsequent switches, `arg` is returned from the task's last | ||
| 635 | call to `yieldto`. This is a low-level call that only switches tasks, not considering states | ||
| 636 | or scheduling in any way. Its use is discouraged. | ||
| 637 | """ | ||
| 638 | function yieldto(t::Task, @nospecialize(x=nothing)) | ||
| 639 | t.result = x | ||
| 640 | return try_yieldto(identity, Ref(t)) | ||
| 641 | end | ||
| 642 | |||
| 643 | function try_yieldto(undo, reftask::Ref{Task}) | ||
| 644 | try | ||
| 645 | ccall(:jl_switchto, Cvoid, (Any,), reftask) | ||
| 646 | catch | ||
| 647 | undo(reftask[]) | ||
| 648 | rethrow() | ||
| 649 | end | ||
| 650 | ct = current_task() | ||
| 651 | exc = ct.exception | ||
| 652 | if exc !== nothing | ||
| 653 | ct.exception = nothing | ||
| 654 | throw(exc) | ||
| 655 | end | ||
| 656 | result = ct.result | ||
| 657 | ct.result = nothing | ||
| 658 | return result | ||
| 659 | end | ||
| 660 | |||
| 661 | # yield to a task, throwing an exception in it | ||
| 662 | function throwto(t::Task, @nospecialize exc) | ||
| 663 | t.exception = exc | ||
| 664 | return yieldto(t) | ||
| 665 | end | ||
| 666 | |||
| 667 | function ensure_rescheduled(othertask::Task) | ||
| 668 | ct = current_task() | ||
| 669 | W = Workqueues[Threads.threadid()] | ||
| 670 | if ct !== othertask && othertask.state === :runnable | ||
| 671 | # we failed to yield to othertask | ||
| 672 | # return it to the head of a queue to be retried later | ||
| 673 | tid = Threads.threadid(othertask) | ||
| 674 | Wother = tid == 0 ? W : Workqueues[tid] | ||
| 675 | pushfirst!(Wother, othertask) | ||
| 676 | end | ||
| 677 | # if the current task was queued, | ||
| 678 | # also need to return it to the runnable state | ||
| 679 | # before throwing an error | ||
| 680 | list_deletefirst!(W, ct) | ||
| 681 | nothing | ||
| 682 | end | ||
| 683 | |||
| 684 | function trypoptask(W::StickyWorkqueue) | ||
| 685 | isempty(W) && return | ||
| 686 | t = popfirst!(W) | ||
| 687 | if t.state !== :runnable | ||
| 688 | # assume this somehow got queued twice, | ||
| 689 | # probably broken now, but try discarding this switch and keep going | ||
| 690 | # can't throw here, because it's probably not the fault of the caller to wait | ||
| 691 | # and don't want to use print() here, because that may try to incur a task switch | ||
| 692 | ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...), | ||
| 693 | "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :runnable\n") | ||
| 694 | return | ||
| 695 | end | ||
| 696 | return t | ||
| 697 | end | ||
| 698 | |||
| 699 | @noinline function poptaskref(W::StickyWorkqueue) | ||
| 700 | task = trypoptask(W) | ||
| 701 | if !(task isa Task) | ||
| 702 | task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any), trypoptask, W) | ||
| 703 | end | ||
| 704 | return Ref(task) | ||
| 705 | end | ||
| 706 | |||
| 707 | function wait() | ||
| 708 | W = Workqueues[Threads.threadid()] | ||
| 709 | reftask = poptaskref(W) | ||
| 710 | result = try_yieldto(ensure_rescheduled, reftask) | ||
| 711 | process_events() | ||
| 712 | # return when we come out of the queue | ||
| 713 | return result | ||
| 714 | end | ||
| 715 | |||
| 716 | if Sys.iswindows() | ||
| 717 | pause() = ccall(:Sleep, stdcall, Cvoid, (UInt32,), 0xffffffff) | ||
| 718 | else | ||
| 719 | pause() = ccall(:pause, Cvoid, ()) | ||
| 720 | end |