StatProfilerHTML.jl report
Generated on Thu, 26 Mar 2020 19:09:01
File source code
Line Exclusive Inclusive Code
1 # This file is a part of Julia. License is MIT: https://julialang.org/license
2
3 ## basic task functions and TLS
4
# Build a `Task` running `f`. `reserved_stack` is forwarded unchanged and each task
# receives its own freshly constructed `ThreadSynchronizer`.
function Core.Task(@nospecialize(f), reserved_stack::Int=0)
    return Core._Task(f, reserved_stack, ThreadSynchronizer())
end
6
# Pairs a captured exception with its backtrace. Can be serialized.
struct CapturedException <: Exception
    ex::Any
    processed_bt::Vector{Any}

    # Store a backtrace that has already been processed, unchanged.
    CapturedException(ex, processed_bt::Vector{Any}) = new(ex, processed_bt)

    # `bt_raw` MUST be a vector that can be processed by StackTraces.stacktrace;
    # typically it is the result of a `catch_backtrace()`.
    function CapturedException(ex, bt_raw::Vector)
        # Turn the raw trace into a form that is safe to serialize, limited to 100 lines.
        processed = process_backtrace(bt_raw, 100)
        return CapturedException(ex, processed)
    end
end
23
# Display the wrapped exception together with its stored, processed backtrace.
showerror(io::IO, ce::CapturedException) =
    showerror(io, ce.ex, ce.processed_bt, backtrace=true)
27
"""
    CompositeException

Wrap a `Vector` of exceptions thrown by a [`Task`](@ref) (for example one generated from a
remote worker over a channel, an asynchronously executing local I/O write, or a remote
worker under `pmap`) together with information about the series of exceptions.
For example, if a group of workers are executing several tasks and multiple workers fail,
the resulting `CompositeException` contains a "bundle" of information from each worker
indicating where and why the exception(s) occurred.
"""
struct CompositeException <: Exception
    exceptions::Vector{Any}
    # Wrap an existing collection of exceptions.
    CompositeException(exceptions) = new(exceptions)
    # Start with no exceptions recorded.
    CompositeException() = new(Any[])
end
# Collection-style methods: each one forwards to the wrapped `exceptions` vector.
eltype(::Type{CompositeException}) = Any
isempty(ce::CompositeException) = isempty(ce.exceptions)
length(ce::CompositeException) = length(ce.exceptions)
iterate(ce::CompositeException, state...) = iterate(ce.exceptions, state...)
push!(ce::CompositeException, ex) = push!(ce.exceptions, ex)
46
# Print the first wrapped exception followed by a count of the remaining ones; an
# empty composite prints only its constructor form.
function showerror(io::IO, ex::CompositeException)
    if isempty(ex)
        print(io, "CompositeException()\n")
        return
    end
    showerror(io, ex.exceptions[1])
    extra = length(ex) - 1
    if extra > 0
        print(io, string("\n\n...and ", extra, " more exception(s).\n"))
    end
end
58
"""
    TaskFailedException

Exception thrown by a `wait(t)` call when the task `t` has failed.
The failed task itself is stored in the `task` field.
"""
struct TaskFailedException <: Exception
    task::Task
end
68
# Show a task failure. Nested `TaskFailedException`s are unwrapped until the
# underlying exception is reached; the backtraces collected on the way are printed
# after it.
function showerror(io::IO, ex::TaskFailedException)
    outer_bts = []
    while ex.task.exception isa TaskFailedException
        # `pushfirst!` keeps the most deeply nested backtrace at the front.
        pushfirst!(outer_bts, ex.task.backtrace)
        ex = ex.task.exception
    end
    println(io, "TaskFailedException:")
    showerror(io, ex.task.exception, ex.task.backtrace)
    for bt in outer_bts
        show_backtrace(io, bt)
    end
end
83
# Print a task as `Task (<state>) @0x<address>`, with the address in hexadecimal and
# zero-padded to `Sys.WORD_SIZE>>2` digits.
function show(io::IO, t::Task)
    state = t.state
    addr = convert(UInt, pointer_from_objref(t))
    hex = string(addr, base = 16, pad = Sys.WORD_SIZE>>2)
    print(io, "Task ($(state)) @0x$(hex)")
end
87
"""
    @task

Create a [`Task`](@ref) whose body is the given expression and return that
[`Task`](@ref). The task is only created here; it is not run.

# Examples
```jldoctest
julia> a1() = sum(i for i in 1:1000);

julia> b = @task a1();

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
```
"""
macro task(ex)
    # The caller's expression becomes the body of a zero-argument closure.
    thunk = :(() -> $(esc(ex)))
    return :(Task($thunk))
end
114
"""
    current_task()

Return the [`Task`](@ref) that is currently running.
"""
function current_task()
    return ccall(:jl_get_current_task, Ref{Task}, ())
end
121
"""
    istaskdone(t::Task) -> Bool

Return `true` once task `t` has exited, that is, when its state is `:done` or it has
failed.

# Examples
```jldoctest
julia> a2() = sum(i for i in 1:1000);

julia> b = Task(a2);

julia> istaskdone(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
```
"""
function istaskdone(t::Task)
    finished = t.state === :done
    failed = istaskfailed(t)
    return finished | failed
end
145
"""
    istaskstarted(t::Task) -> Bool

Return `true` if task `t` has started executing.

# Examples
```jldoctest
julia> a3() = sum(i for i in 1:1000);

julia> b = Task(a3);

julia> istaskstarted(b)
false
```
"""
function istaskstarted(t::Task)
    started = ccall(:jl_is_task_started, Cint, (Any,), t)
    return started != 0
end
162
"""
    istaskfailed(t::Task) -> Bool

Return `true` if task `t` has exited because an exception was thrown, that is, when
its state is `:failed`.

# Examples
```jldoctest
julia> a4() = error("task failed");

julia> b = Task(a4);

julia> istaskfailed(b)
false

julia> schedule(b);

julia> yield();

julia> istaskfailed(b)
true
```
"""
function istaskfailed(t::Task)
    return t.state === :failed
end
186
# Thread id recorded for task `t`: the value reported by the runtime plus one,
# returned as an `Int`.
function Threads.threadid(t::Task)
    raw = ccall(:jl_get_task_tid, Int16, (Any,), t)
    return Int(raw + 1)
end
188
# Read the `result` field of task `t`.
function task_result(t::Task)
    return t.result
end
190
# Task-local storage dictionary of the currently running task.
task_local_storage() = get_task_tls(current_task())

# Return the storage dictionary of `t`, creating an empty one on first access.
function get_task_tls(t::Task)
    store = t.storage
    if store === nothing
        t.storage = IdDict()
    end
    return (t.storage)::IdDict{Any,Any}
end
198
"""
    task_local_storage(key)

Return the value stored under `key` in the current task's task-local storage.
"""
function task_local_storage(key)
    tls = task_local_storage()
    return tls[key]
end
205
"""
    task_local_storage(key, value)

Store `value` under `key` in the current task's task-local storage.
"""
function task_local_storage(key, val)
    tls = task_local_storage()
    return tls[key] = val
end
212
"""
    task_local_storage(body, key, value)

Run the function `body` while `key` is bound to `value` in the task-local storage.
Afterwards the previous value of `key` is restored, or `key` is removed again if it
was not present before. Useful for emulating dynamic scoping.
"""
function task_local_storage(body::Function, key, val)
    tls = task_local_storage()
    # Remember whether the key existed, since `nothing` may itself be a stored value.
    hadkey = haskey(tls, key)
    previous = get(tls, key, nothing)
    tls[key] = val
    try
        return body()
    finally
        if hadkey
            tls[key] = previous
        else
            delete!(tls, key)
        end
    end
end
231
# Block until task `t` is done; errors of `t` are not propagated.
function _wait(t::Task)
    istaskdone(t) && return nothing
    notifier = t.donenotify
    lock(notifier)
    try
        # Re-check under the lock; loop until the task really is done.
        while !istaskdone(t)
            wait(notifier)
        end
    finally
        unlock(notifier)
    end
    return nothing
end
246
# Make `waiter` wait for `t`: if `t` is still running, `waiter` is appended to the wait
# queue of `t.donenotify`; otherwise `waiter` is scheduled right away.
function _wait2(t::Task, waiter::Task)
    if !istaskdone(t)
        lock(t.donenotify)
        # Check again now that the lock is held.
        still_running = !istaskdone(t)
        if still_running
            push!(t.donenotify.waitq, waiter)
        end
        unlock(t.donenotify)
        still_running && return nothing
    end
    schedule(waiter)
    return nothing
end
262
# Wait for task `t` to finish and throw a `TaskFailedException` if it failed.
# Waiting on the current task is rejected with an error.
function wait(t::Task)
    if t === current_task()
        error("deadlock detected: cannot wait on current task")
    end
    _wait(t)
    istaskfailed(t) && throw(TaskFailedException(t))
    return nothing
end
271
# Fallback method: fetching any other value returns that value unchanged.
function fetch(@nospecialize x)
    return x
end
273
"""
    fetch(t::Task)

Wait for task `t` to finish and return its result value.
If the task fails with an exception, a `TaskFailedException` wrapping the failed task
is thrown instead.
"""
function fetch(t::Task)
    wait(t)
    result = task_result(t)
    return result
end
285
286
287 ## lexically-scoped waiting for multiple items
288
# Wait for every item in `refs`. Failures are gathered into a single
# `CompositeException`, which is thrown after all items have been waited on.
function sync_end(refs)
    failures = nothing  # created lazily on the first failure
    for r in refs
        if r isa Task
            # Tasks are waited on without error propagation, then inspected.
            _wait(r)
            if istaskfailed(r)
                failures === nothing && (failures = CompositeException())
                push!(failures, TaskFailedException(r))
            end
        else
            try
                wait(r)
            catch e
                failures === nothing && (failures = CompositeException())
                push!(failures, e)
            end
        end
    end

    failures === nothing || throw(failures)
    return nothing
end
320
# Generated name of the variable that holds the pool of items to wait on.
const sync_varname = gensym("sync")
322
"""
    @sync

Wait until all lexically-enclosed uses of `@async`, `@spawn`, `@spawnat` and
`@distributed` are complete. Every exception thrown by the enclosed async operations is
collected, and they are thrown together as a `CompositeException`.
"""
macro sync(block)
    pool = esc(sync_varname)
    return quote
        # A fresh pool is bound for the duration of the block.
        let $pool = Any[]
            result = $(esc(block))
            sync_end($pool)
            result
        end
    end
end
340
341 # schedule an expression to run asynchronously
342
"""
    @async

Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue.

Values can be interpolated into `@async` via `\$`, which copies the value directly into the
constructed underlying closure. This allows you to insert the _value_ of a variable,
isolating the asynchronous code from changes to the variable's value in the current task.

!!! compat "Julia 1.4"
    Interpolating values via `\$` is available as of Julia 1.4.
"""
macro async(expr)
    # Lift every `$`-interpolated value out of `expr` into `let` bindings.
    bindings = Base._lift_one_interp!(expr)

    # Profiler annotation carried by the report for the next line:
    # 833 (100 %) samples spent calling run_backend.
    thunk = esc(:(()->($expr)))
    pool = esc(sync_varname)
    return quote
        let $(bindings...)
            local task = Task($thunk)
            # Add the task to the pool variable only when that variable is local here.
            if $(Expr(:islocal, pool))
                push!($pool, task)
            end
            schedule(task)
            task
        end
    end
end
371
# Capture interpolated variables in $() and move them to let-block.
# `e` is rewritten in place; the returned vector holds the generated assignments.
function _lift_one_interp!(e)
    lifted = Any[]  # receives the new gensymed arguments
    # Start out _not_ in a quote context, hence `false`.
    _lift_one_interp_helper(e, false, lifted)
    return lifted
end
# Anything that is not an `Expr` is returned untouched.
_lift_one_interp_helper(v, _, _) = v

# Walk `expr`, replacing each `$(...)` that belongs to us by a fresh gensym and
# recording the matching assignment in `letargs`. `expr` is modified in place.
function _lift_one_interp_helper(expr::Expr, in_quote_context, letargs)
    head = expr.head
    if head === :macrocall
        # Don't recur into macro calls, since some other macros use $
        return expr
    elseif head === :quote
        # Don't try to lift $ directly out of quotes
        in_quote_context = true
    elseif head === :$
        if in_quote_context
            # This $ is simply interpolating out of the quote.
            # Now, we're out of the quote, so any _further_ $ is ours.
            in_quote_context = false
        else
            newarg = gensym()
            push!(letargs, :($(esc(newarg)) = $(esc(expr.args[1]))))
            return newarg  # Don't recurse into the lifted $() exprs
        end
    end
    for i in eachindex(expr.args)
        expr.args[i] = _lift_one_interp_helper(expr.args[i], in_quote_context, letargs)
    end
    return expr
end
399
400
# Add a wait-able object to the sync pool and evaluate to that object.
macro sync_add(expr)
    pool = esc(sync_varname)
    return quote
        local ref = $(esc(expr))
        push!($pool, ref)
        ref
    end
end
410
# runtime system hook called when a task finishes
function task_done_hook(t::Task)
    # `finish_task` sets `sigatomic` before entering this function
    failed = istaskfailed(t)
    result = task_result(t)
    handled = false
    if failed
        t.backtrace = catch_backtrace()
    end

    # Wake up the waiters, if any; `handled` records that somebody was waiting.
    notifier = t.donenotify
    if notifier isa ThreadSynchronizer
        lock(notifier)
        try
            if !isempty(notifier.waitq)
                handled = true
                notify(notifier)
            end
        finally
            unlock(notifier)
        end
    end

    # A failure nobody waited on: on thread 1, an InterruptException is handed to the
    # REPL backend task when that task is runnable, in eval, and the queue is empty.
    if failed && !handled && Threads.threadid() == 1
        if isa(result, InterruptException) && isdefined(Base, :active_repl_backend) &&
            active_repl_backend.backend_task.state === :runnable && isempty(Workqueue) &&
            active_repl_backend.in_eval
            throwto(active_repl_backend.backend_task, result) # this terminates the task
        end
    end
    # Clear sigatomic before waiting
    sigatomic_end()
    try
        wait() # this will not return
    catch e
        # If an InterruptException happens while blocked in the event loop, try handing
        # the exception to the REPL task since the current task is done.
        # issue #19467
        forward = Threads.threadid() == 1 &&
            isa(e, InterruptException) && isdefined(Base, :active_repl_backend) &&
            active_repl_backend.backend_task.state === :runnable && isempty(Workqueue) &&
            active_repl_backend.in_eval
        forward || rethrow()
        throwto(active_repl_backend.backend_task, e)
    end
end
459
460
461 ## scheduler and work queue
462
# An `InvasiveLinkedList` bundled with a `Threads.SpinLock`.
struct InvasiveLinkedListSynchronized{T}
    queue::InvasiveLinkedList{T}
    lock::Threads.SpinLock
    # Start with an empty list and a new lock.
    function InvasiveLinkedListSynchronized{T}() where {T}
        return new(InvasiveLinkedList{T}(), Threads.SpinLock())
    end
end
# Size queries are forwarded to the underlying list without taking the lock.
function isempty(W::InvasiveLinkedListSynchronized)
    return isempty(W.queue)
end
function length(W::InvasiveLinkedListSynchronized)
    return length(W.queue)
end
# Append `t` to the list while holding the lock; returns `W`.
function push!(W::InvasiveLinkedListSynchronized{T}, t::T) where T
    guard = W.lock
    lock(guard)
    try
        push!(W.queue, t)
    finally
        unlock(guard)
    end
    return W
end
# Insert `t` at the front of the list while holding the lock; returns `W`.
function pushfirst!(W::InvasiveLinkedListSynchronized{T}, t::T) where T
    guard = W.lock
    lock(guard)
    try
        pushfirst!(W.queue, t)
    finally
        unlock(guard)
    end
    return W
end
# Remove and return the last element of the list while holding the lock.
function pop!(W::InvasiveLinkedListSynchronized)
    guard = W.lock
    lock(guard)
    try
        item = pop!(W.queue)
        return item
    finally
        unlock(guard)
    end
end
# Remove and return the first element of the list while holding the lock.
function popfirst!(W::InvasiveLinkedListSynchronized)
    guard = W.lock
    lock(guard)
    try
        item = popfirst!(W.queue)
        return item
    finally
        unlock(guard)
    end
end
# Run `list_deletefirst!` for `t` on the underlying list while holding the lock;
# returns `W`.
function list_deletefirst!(W::InvasiveLinkedListSynchronized{T}, t::T) where T
    guard = W.lock
    lock(guard)
    try
        list_deletefirst!(W.queue, t)
    finally
        unlock(guard)
    end
    return W
end
513
# Work queue type and instances. `Workqueues` starts with a single queue, which
# `Workqueue` aliases.
const StickyWorkqueue = InvasiveLinkedListSynchronized{Task}
const Workqueues = [StickyWorkqueue()]
const Workqueue = Workqueues[1] # default work queue is thread 1
# Grow `Workqueues` to one entry per thread, giving entries 2 and up a new queue.
# Nothing is done when there are already at least `Threads.nthreads()` queues.
function __preinit_threads__()
    nt = Threads.nthreads()
    if length(Workqueues) < nt
        resize!(Workqueues, nt)
        for i in 2:nt
            Workqueues[i] = StickyWorkqueue()
        end
    end
    return nothing
end
526
# Put the runnable task `t` on a work queue and wake up the thread it was given to.
# Raises an error unless `t` is runnable and not currently in any queue.
function enq_work(t::Task)
    runnable = t.state === :runnable && t.queue === nothing
    runnable || error("schedule: Task not runnable")
    tid = Threads.threadid(t)
    # Note there are three reasons a Task might be put into a sticky queue
    # even if t.sticky == false:
    # 1. The Task's stack is currently being used by the scheduler for a certain thread.
    # 2. There is only 1 thread.
    # 3. The multiq is full (can be fixed by making it growable).
    use_sticky = t.sticky || tid != 0 || Threads.nthreads() == 1
    if use_sticky
        if tid == 0
            # No thread assigned yet: pin the task to the current thread.
            tid = Threads.threadid()
            ccall(:jl_set_task_tid, Cvoid, (Any, Cint), t, tid-1)
        end
        push!(Workqueues[tid], t)
    else
        tid = 0
        multiq_full = ccall(:jl_enqueue_task, Cint, (Any,), t) != 0
        if multiq_full
            # if multiq is full, give to a random thread (TODO fix)
            tid = mod(time_ns() % Int, Threads.nthreads()) + 1
            ccall(:jl_set_task_tid, Cvoid, (Any, Cint), t, tid-1)
            push!(Workqueues[tid], t)
        end
    end
    ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16)
    return t
end
553
# Single-argument form: hand `t` straight to `enq_work`.
function schedule(t::Task)
    return enq_work(t)
end
555
"""
    schedule(t::Task, [val]; error=false)

Add a [`Task`](@ref) to the scheduler's queue. The task then runs constantly whenever the
system is otherwise idle, unless it performs a blocking operation such as [`wait`](@ref).

If a second argument `val` is provided, it will be passed to the task (via the return
value of [`yieldto`](@ref)) when it runs again. If `error` is `true`, the value is raised
as an exception in the woken task.

# Examples
```jldoctest
julia> a5() = sum(i for i in 1:1000);

julia> b = Task(a5);

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskstarted(b)
true

julia> istaskdone(b)
true
```
"""
function schedule(t::Task, @nospecialize(arg); error=false)
    # schedule a task to be (re)started with the given value or exception
    if t.state !== :runnable
        Base.error("schedule: Task not runnable")
    end
    if error
        # Take the task out of whatever queue it is in before storing the exception.
        if t.queue !== nothing
            Base.list_deletefirst!(t.queue, t)
        end
        setfield!(t, :exception, arg)
    else
        if t.queue !== nothing
            Base.error("schedule: Task not runnable")
        end
        setfield!(t, :result, arg)
    end
    enq_work(t)
    return t
end
599
"""
    yield()

Switch to the scheduler so that another scheduled task can run. The calling task stays
runnable and is restarted immediately if no other task is runnable.
"""
function yield()
    me = current_task()
    enq_work(me)
    try
        return wait()
    catch
        # Make sure the task is no longer sitting in a queue before the error propagates.
        if me.queue !== nothing
            list_deletefirst!(me.queue, me)
        end
        rethrow()
    end
end
617
"""
    yield(t::Task, arg = nothing)

A fast, unfair-scheduling version of `schedule(t, arg); yield()`: it yields to `t`
immediately, before the scheduler is called.
"""
function yield(t::Task, @nospecialize(x=nothing))
    t.result = x
    # Queue the current task so that it gets to run again later.
    enq_work(current_task())
    target = Ref(t)
    return try_yieldto(ensure_rescheduled, target)
end
629
"""
    yieldto(t::Task, arg = nothing)

Switch to the given task. The first time a task is switched to, its function is called
with no arguments. On later switches, `arg` is returned from the task's last call to
`yieldto`. This is a low-level call that only switches tasks and does not consider
states or scheduling in any way. Its use is discouraged.
"""
function yieldto(t::Task, @nospecialize(x=nothing))
    t.result = x
    target = Ref(t)
    return try_yieldto(identity, target)
end
642
# Switch to the task held in `reftask`. If the switch itself throws, `undo` is called
# with that task and the error is rethrown. Once control is back in the current task,
# a pending exception stored on it is thrown; otherwise its stored result is returned.
# Whichever of the two fields is consumed is reset to `nothing`.
function try_yieldto(undo, reftask::Ref{Task})
    try
        ccall(:jl_switchto, Cvoid, (Any,), reftask)
    catch
        undo(reftask[])
        rethrow()
    end
    me = current_task()
    pending = me.exception
    if pending === nothing
        value = me.result
        me.result = nothing
        return value
    end
    me.exception = nothing
    throw(pending)
end
660
# Yield to task `t`, throwing an exception in it: `exc` is stored in the task's
# `exception` field before switching.
function throwto(t::Task, @nospecialize exc)
    t.exception = exc
    switched = yieldto(t)
    return switched
end
666
# Undo handler for a failed switch to `othertask`.
function ensure_rescheduled(othertask::Task)
    me = current_task()
    mine = Workqueues[Threads.threadid()]
    if me !== othertask && othertask.state === :runnable
        # we failed to yield to othertask
        # return it to the head of a queue to be retried later
        tid = Threads.threadid(othertask)
        target = if tid == 0
            mine
        else
            Workqueues[tid]
        end
        pushfirst!(target, othertask)
    end
    # if the current task was queued,
    # also need to return it to the runnable state
    # before throwing an error
    list_deletefirst!(mine, me)
    return nothing
end
683
# Take the first task off `W`. Returns `nothing` when the queue is empty or when the
# task taken off is not runnable (a warning is printed in that case).
function trypoptask(W::StickyWorkqueue)
    if isempty(W)
        return nothing
    end
    t = popfirst!(W)
    t.state === :runnable && return t
    # assume this somehow got queued twice,
    # probably broken now, but try discarding this switch and keep going
    # can't throw here, because it's probably not the fault of the caller to wait
    # and don't want to use print() here, because that may try to incur a task switch
    ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...),
        "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :runnable\n")
    return nothing
end
698
# Return a `Ref` to the next task to run: the head of `W` when `trypoptask` yields a
# task, otherwise whatever the runtime's `jl_task_get_next` returns.
@noinline function poptaskref(W::StickyWorkqueue)
    next = trypoptask(W)
    next isa Task && return Ref(next)
    fallback = ccall(:jl_task_get_next, Ref{Task}, (Any, Any), trypoptask, W)
    return Ref(fallback)
end
706
# Switch away to the next task from this thread's work queue. After control returns,
# `process_events()` is run and the value obtained from the switch is returned.
function wait()
    queue = Workqueues[Threads.threadid()]
    result = try_yieldto(ensure_rescheduled, poptaskref(queue))
    process_events()
    # return when we come out of the queue
    return result
end
715
# `pause()` is defined per platform: on Windows it calls `Sleep` with 0xffffffff,
# everywhere else it calls the C `pause` function.
if Sys.iswindows()
    function pause()
        return ccall(:Sleep, stdcall, Cvoid, (UInt32,), 0xffffffff)
    end
else
    function pause()
        return ccall(:pause, Cvoid, ())
    end
end