Line | Exclusive | Inclusive | Code |
---|---|---|---|
1 | # This file is a part of Julia. License is MIT: https://julialang.org/license | ||
2 | |||
3 | ## basic task functions and TLS | ||
4 | |||
5 | Core.Task(@nospecialize(f), reserved_stack::Int=0) = Core._Task(f, reserved_stack, ThreadSynchronizer()) # every new Task gets its own ThreadSynchronizer (presumably stored as `donenotify`; confirm in Core._Task) | ||
6 | |||
7 | # Serializable pairing of an exception with an already-processed backtrace. | ||
8 | struct CapturedException <: Exception | ||
9 | ex::Any | ||
10 | processed_bt::Vector{Any} | ||
11 | |||
12 | # Store a backtrace that has already been processed, without touching it. | ||
13 | CapturedException(ex, processed_bt::Vector{Any}) = new(ex, processed_bt) | ||
14 | |||
15 | # `bt_raw` MUST be a vector that StackTraces.stacktrace can process; it is | ||
16 | # typically the result of a catch_backtrace(). It is converted here into a | ||
17 | # form that can be safely serialized, limited to 100 lines. | ||
18 | function CapturedException(ex, bt_raw::Vector) | ||
19 | processed = process_backtrace(bt_raw, 100) | ||
20 | return CapturedException(ex, processed) | ||
21 | end | ||
22 | end | ||
23 | |||
24 | # Print the wrapped exception followed by its stored, pre-processed backtrace. | ||
25 | showerror(io::IO, ce::CapturedException) = | ||
26 | showerror(io, ce.ex, ce.processed_bt, backtrace=true) | ||
27 | |||
28 | """ | ||
29 | CompositeException | ||
30 | |||
31 | An exception that bundles a `Vector` of exceptions thrown by a [`Task`](@ref) (e.g. generated from a remote worker | ||
32 | over a channel, an asynchronously executing local I/O write, or a remote worker under `pmap`). | ||
33 | For example, if a group of workers is executing several tasks and multiple workers fail, the resulting | ||
34 | `CompositeException` contains one entry per failure, indicating where and why each exception occurred. | ||
35 | """ | ||
36 | struct CompositeException <: Exception | ||
37 | exceptions::Vector{Any} | ||
38 | CompositeException() = new(Any[]) | ||
39 | CompositeException(exceptions) = new(exceptions) | ||
40 | end | ||
41 | length(c::CompositeException) = length(c.exceptions) # number of bundled exceptions | ||
42 | push!(c::CompositeException, ex) = push!(c.exceptions, ex) # forwards the result of `push!` on the underlying vector | ||
43 | isempty(c::CompositeException) = isempty(c.exceptions) | ||
44 | iterate(c::CompositeException, state...) = iterate(c.exceptions, state...) # iteration is delegated to the vector of exceptions | ||
45 | eltype(::Type{CompositeException}) = Any # matches the `Vector{Any}` field | ||
46 | |||
47 | function showerror(io::IO, ex::CompositeException) | ||
48 | # An empty bundle has nothing to delegate to, so a placeholder is printed. | ||
49 | isempty(ex) && return print(io, "CompositeException()\n") | ||
50 | # Only the first exception is shown in full; the others are merely counted, | ||
51 | # and the count is printed only when there is at least one of them. | ||
52 | showerror(io, ex.exceptions[1]) | ||
53 | extra = length(ex) - 1 | ||
54 | if extra > 0 | ||
55 | print(io, string("\n\n...and ", extra, " more exception(s).\n")) | ||
56 | end | ||
57 | end | ||
58 | |||
59 | """ | ||
60 | TaskFailedException | ||
61 | |||
62 | This exception is thrown by a `wait(t)` call (and therefore also by `fetch(t)`) when task `t` fails. | ||
63 | `TaskFailedException` wraps the failed task `t`, which stays accessible through the `task` field. | ||
64 | """ | ||
65 | struct TaskFailedException <: Exception | ||
66 | task::Task | ||
67 | end | ||
68 | |||
69 | function showerror(io::IO, ex::TaskFailedException) | ||
70 | # Unwrap nested TaskFailedExceptions until the task holding the root cause is | ||
71 | # reached; the backtraces of the outer tasks are kept, most deeply nested first. | ||
72 | outer_bts = [] | ||
73 | failed = ex.task | ||
74 | while isa(failed.exception, TaskFailedException) | ||
75 | pushfirst!(outer_bts, failed.backtrace) | ||
76 | failed = failed.exception.task | ||
77 | end | ||
78 | println(io, "TaskFailedException:") | ||
79 | showerror(io, failed.exception, failed.backtrace) | ||
80 | # Visiting an empty vector is a no-op, so no emptiness check is needed. | ||
81 | foreach(bt -> show_backtrace(io, bt), outer_bts) | ||
82 | end | ||
83 | |||
84 | show(io::IO, t::Task) = print(io, string("Task (", t.state, ") @0x", | ||
85 | # address of the task object in hex, zero-padded to one digit per four bits of a word | ||
86 | string(convert(UInt, pointer_from_objref(t)), base = 16, pad = Sys.WORD_SIZE>>2))) | ||
87 | |||
88 | """ | ||
89 | @task | ||
90 | |||
91 | Wrap an expression in a [`Task`](@ref) without executing it, and return the [`Task`](@ref). This only | ||
92 | creates a task, and does not run it. | ||
93 | |||
94 | # Examples | ||
95 | ```jldoctest | ||
96 | julia> a1() = sum(i for i in 1:1000); | ||
97 | |||
98 | julia> b = @task a1(); | ||
99 | |||
100 | julia> istaskstarted(b) | ||
101 | false | ||
102 | |||
103 | julia> schedule(b); | ||
104 | |||
105 | julia> yield(); | ||
106 | |||
107 | julia> istaskdone(b) | ||
108 | true | ||
109 | ``` | ||
110 | """ | ||
111 | macro task(ex) | ||
112 | :(Task(()->$(esc(ex)))) # only `ex` is escaped; it becomes the body of a zero-argument closure handed to `Task` | ||
113 | end | ||
114 | |||
115 | """ | ||
116 | current_task() | ||
117 | |||
118 | Get the currently running [`Task`](@ref). | ||
119 | """ | ||
120 | current_task() = ccall(:jl_get_current_task, Ref{Task}, ()) # argument-less call into the runtime | ||
121 | |||
122 | """ | ||
123 | istaskdone(t::Task) -> Bool | ||
124 | |||
125 | Determine whether a task has exited. | ||
126 | |||
127 | # Examples | ||
128 | ```jldoctest | ||
129 | julia> a2() = sum(i for i in 1:1000); | ||
130 | |||
131 | julia> b = Task(a2); | ||
132 | |||
133 | julia> istaskdone(b) | ||
134 | false | ||
135 | |||
136 | julia> schedule(b); | ||
137 | |||
138 | julia> yield(); | ||
139 | |||
140 | julia> istaskdone(b) | ||
141 | true | ||
142 | ``` | ||
143 | """ | ||
144 | istaskdone(t::Task) = ((t.state === :done) | istaskfailed(t)) # a failed task counts as done too; both operands are always evaluated (non-short-circuit OR) | ||
145 | |||
146 | """ | ||
147 | istaskstarted(t::Task) -> Bool | ||
148 | |||
149 | Determine whether a task has started executing. | ||
150 | |||
151 | # Examples | ||
152 | ```jldoctest | ||
153 | julia> a3() = sum(i for i in 1:1000); | ||
154 | |||
155 | julia> b = Task(a3); | ||
156 | |||
157 | julia> istaskstarted(b) | ||
158 | false | ||
159 | ``` | ||
160 | """ | ||
161 | istaskstarted(t::Task) = ccall(:jl_is_task_started, Cint, (Any,), t) != 0 # the runtime answers with a C int; nonzero means started | ||
162 | |||
163 | """ | ||
164 | istaskfailed(t::Task) -> Bool | ||
165 | |||
166 | Determine whether a task has exited because an exception was thrown. | ||
167 | |||
168 | # Examples | ||
169 | ```jldoctest | ||
170 | julia> a4() = error("task failed"); | ||
171 | |||
172 | julia> b = Task(a4); | ||
173 | |||
174 | julia> istaskfailed(b) | ||
175 | false | ||
176 | |||
177 | julia> schedule(b); | ||
178 | |||
179 | julia> yield(); | ||
180 | |||
181 | julia> istaskfailed(b) | ||
182 | true | ||
183 | ``` | ||
184 | """ | ||
185 | istaskfailed(t::Task) = (t.state === :failed) # pure state check; never blocks | ||
186 | |||
187 | Threads.threadid(t::Task) = Int(ccall(:jl_get_task_tid, Int16, (Any,), t)+1) # runtime tid plus one; callers in this file treat 0 as "no thread assigned yet" | ||
188 | |||
189 | task_result(t::Task) = t.result # raw `result` field, read regardless of the task's state | ||
190 | |||
191 | task_local_storage() = get_task_tls(current_task()) | ||
192 | function get_task_tls(t::Task) | ||
193 | # The storage dictionary is created lazily, the first time it is asked for. | ||
194 | t.storage === nothing && (t.storage = IdDict()) | ||
195 | # The field can also hold `nothing`, hence the type assertion on the way out. | ||
196 | return (t.storage)::IdDict{Any,Any} | ||
197 | end | ||
198 | |||
199 | """ | ||
200 | task_local_storage(key) | ||
201 | |||
202 | Look up the value of a key in the current task's task-local storage. | ||
203 | """ | ||
204 | task_local_storage(key) = task_local_storage()[key] # plain indexing into the current task's storage dictionary | ||
205 | |||
206 | """ | ||
207 | task_local_storage(key, value) | ||
208 | |||
209 | Assign a value to a key in the current task's task-local storage. | ||
210 | """ | ||
211 | task_local_storage(key, val) = (task_local_storage()[key] = val) # an assignment expression, so the call evaluates to `val` | ||
212 | |||
213 | """ | ||
214 | task_local_storage(body, key, value) | ||
215 | |||
216 | Call the function `body` with a modified task-local storage, in which `value` is assigned to | ||
217 | `key`; the previous value of `key`, or lack thereof, is restored afterwards. Useful | ||
218 | for emulating dynamic scoping. | ||
219 | """ | ||
220 | function task_local_storage(body::Function, key, val) | ||
221 | storage = task_local_storage() | ||
222 | # Record whether `key` was bound, and to what, so the old state can be put back. | ||
223 | was_bound, previous = haskey(storage, key), get(storage, key, nothing) | ||
224 | storage[key] = val | ||
225 | try | ||
226 | return body() | ||
227 | finally | ||
228 | was_bound ? setindex!(storage, previous, key) : delete!(storage, key) | ||
229 | end | ||
230 | end | ||
231 | |||
232 | # Block until `t` has finished; a failure of `t` is not propagated from here. | ||
233 | function _wait(t::Task) | ||
234 | istaskdone(t) && return nothing # already finished: no need to take the lock | ||
235 | lock(t.donenotify) | ||
236 | try | ||
237 | # The state is re-tested under the lock and again after every wake-up. | ||
238 | while !istaskdone(t) | ||
239 | wait(t.donenotify) | ||
240 | end | ||
241 | finally | ||
242 | unlock(t.donenotify) | ||
243 | end | ||
244 | return nothing | ||
245 | end | ||
246 | |||
247 | # Have `waiter` woken once `t` finishes; if `t` is already done, schedule it at once. | ||
248 | function _wait2(t::Task, waiter::Task) | ||
249 | if !istaskdone(t) | ||
250 | lock(t.donenotify) | ||
251 | try | ||
252 | # Re-check under the lock so that completion cannot slip in unnoticed; the | ||
253 | # `finally` releases the lock on every path, including a throwing `push!`. | ||
254 | istaskdone(t) || return (push!(t.donenotify.waitq, waiter); nothing) | ||
255 | finally | ||
256 | unlock(t.donenotify) | ||
257 | end | ||
258 | end | ||
259 | schedule(waiter) | ||
260 | nothing | ||
261 | end | ||
262 | |||
263 | function wait(t::Task) | ||
264 | # A task waiting on itself could never be woken, so that is rejected up front. | ||
265 | t === current_task() && error("deadlock detected: cannot wait on current task") | ||
266 | _wait(t) | ||
267 | # `_wait` does not propagate errors; a failure is reported here, wrapping `t`. | ||
268 | istaskfailed(t) && throw(TaskFailedException(t)) | ||
269 | return nothing | ||
270 | end | ||
271 | |||
272 | fetch(@nospecialize x) = x # generic fallback: the argument is returned unchanged | ||
273 | |||
274 | """ | ||
275 | fetch(t::Task) | ||
276 | |||
277 | Wait for a Task to finish, then return its result value. | ||
278 | If the task fails with an exception, a `TaskFailedException` (which wraps the failed task) | ||
279 | is thrown. | ||
280 | """ | ||
281 | function fetch(t::Task) | ||
282 | wait(t) # throws TaskFailedException if `t` failed, so the next line only runs on success | ||
283 | return task_result(t) | ||
284 | end | ||
285 | |||
286 | |||
287 | ## lexically-scoped waiting for multiple items | ||
288 | |||
289 | function sync_end(refs) | ||
290 | # Every item is waited for, even after a failure has been seen. The | ||
291 | # CompositeException is allocated lazily, when the first failure shows up. | ||
292 | failures = nothing | ||
293 | for item in refs | ||
294 | local failure | ||
295 | if item isa Task | ||
296 | # `_wait` does not propagate errors, so a failed task is recognised by its state. | ||
297 | _wait(item) | ||
298 | istaskfailed(item) || continue | ||
299 | failure = TaskFailedException(item) | ||
300 | else | ||
301 | # Any other waitable signals failure by throwing from `wait`. | ||
302 | try | ||
303 | wait(item) | ||
304 | continue | ||
305 | catch err | ||
306 | failure = err | ||
307 | end | ||
308 | end | ||
309 | if failures === nothing | ||
310 | failures = CompositeException() | ||
311 | end | ||
312 | push!(failures, failure) | ||
313 | end | ||
314 | |||
315 | if failures !== nothing | ||
316 | throw(failures) | ||
317 | end | ||
318 | nothing | ||
319 | end | ||
320 | |||
321 | const sync_varname = gensym(:sync) # name of the hidden variable shared by `@sync`, `@async` and `@sync_add` | ||
322 | |||
323 | """ | ||
324 | @sync | ||
325 | |||
326 | Wait until all lexically-enclosed uses of `@async`, `@spawn`, `@spawnat` and `@distributed` | ||
327 | are complete. All exceptions thrown by enclosed async operations are collected and thrown as | ||
328 | a `CompositeException`. | ||
329 | """ | ||
330 | macro sync(block) | ||
331 | var = esc(sync_varname) # escaped, so expansions of `@async`/`@sync_add` inside `block` refer to the same variable | ||
332 | quote | ||
333 | let $var = Any[] # fresh collection of the things to wait for | ||
334 | v = $(esc(block)) | ||
335 | sync_end($var) # waits for everything collected; throws a CompositeException if anything failed | ||
336 | v # the value of `block` is the value of the whole `@sync` expression | ||
337 | end | ||
338 | end | ||
339 | end | ||
340 | |||
341 | # schedule an expression to run asynchronously | ||
342 | |||
343 | """ | ||
344 | @async | ||
345 | |||
346 | Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue. | ||
347 | |||
348 | Values can be interpolated into `@async` via `\$`, which copies the value directly into the | ||
349 | constructed underlying closure. This allows you to insert the _value_ of a variable, | ||
350 | isolating the asynchronous code from changes to the variable's value in the current task. | ||
351 | |||
352 | !!! compat "Julia 1.4" | ||
353 | Interpolating values via `\$` is available as of Julia 1.4. | ||
354 | """ | ||
355 | macro async(expr) | ||
356 | letargs = Base._lift_one_interp!(expr) # rewrites `expr` in place and returns the `symbol = value` bindings for the `let` below | ||
357 | |||
358 | 833 (100 %) |
833 (100 %)
samples spent calling
run_backend
thunk = esc(:(()->($expr)))
|
|
359 | var = esc(sync_varname) | ||
360 | quote | ||
361 | let $(letargs...) | ||
362 | local task = Task($thunk) | ||
363 | if $(Expr(:islocal, var)) # is the sync variable a local here? presumably true only inside an enclosing `@sync` | ||
364 | push!($var, task) | ||
365 | end | ||
366 | schedule(task) | ||
367 | task | ||
368 | end | ||
369 | end | ||
370 | end | ||
371 | |||
372 | # Move every `$(...)` interpolation found in `e` into a binding for a let-block. | ||
373 | function _lift_one_interp!(e) | ||
374 | bindings = Any[] # one `gensym = value` assignment per lifted interpolation | ||
375 | _lift_one_interp_helper(e, false, bindings) # the walk starts outside of any quote (false) | ||
376 | return bindings | ||
377 | end | ||
378 | _lift_one_interp_helper(v, _, _) = v | ||
379 | function _lift_one_interp_helper(expr::Expr, in_quote_context, letargs) | ||
380 | head = expr.head | ||
381 | if head === :macrocall | ||
382 | return expr # macro calls are left alone, since some other macros use $ | ||
383 | elseif head === :quote | ||
384 | in_quote_context = true # a $ directly inside a quote is not lifted | ||
385 | elseif head === :$ | ||
386 | if !in_quote_context | ||
387 | # Swap `$(x)` for a fresh symbol and record `symbol = x` for the let-block. | ||
388 | newarg = gensym() | ||
389 | push!(letargs, :($(esc(newarg)) = $(esc(expr.args[1])))) | ||
390 | return newarg # the lifted expression itself is not descended into | ||
391 | end | ||
392 | in_quote_context = false # this $ leaves the quote, so any further $ is ours | ||
393 | end | ||
394 | for i in eachindex(expr.args) | ||
395 | expr.args[i] = _lift_one_interp_helper(expr.args[i], in_quote_context, letargs) | ||
396 | end | ||
397 | return expr | ||
398 | end | ||
399 | |||
400 | |||
401 | # add a wait-able object to the sync pool | ||
402 | macro sync_add(expr) | ||
403 | var = esc(sync_varname) # the same escaped variable that `@sync` binds | ||
404 | quote | ||
405 | local ref = $(esc(expr)) | ||
406 | push!($var, ref) # unconditional, unlike `@async`; presumably only valid inside an enclosing `@sync` | ||
407 | ref | ||
408 | end | ||
409 | end | ||
410 | |||
411 | # runtime system hook called when a task finishes | ||
412 | function task_done_hook(t::Task) | ||
413 | # `finish_task` sets `sigatomic` before entering this function | ||
414 | err = istaskfailed(t) | ||
415 | result = task_result(t) | ||
416 | handled = false # becomes true once at least one waiter has been notified | ||
417 | if err | ||
418 | t.backtrace = catch_backtrace() # keep the backtrace of the failure on the task itself | ||
419 | end | ||
420 | |||
421 | donenotify = t.donenotify | ||
422 | if isa(donenotify, ThreadSynchronizer) | ||
423 | lock(donenotify) | ||
424 | try | ||
425 | if !isempty(donenotify.waitq) # only notify when somebody is actually waiting | ||
426 | handled = true | ||
427 | notify(donenotify) | ||
428 | end | ||
429 | finally | ||
430 | unlock(donenotify) | ||
431 | end | ||
432 | end | ||
433 | |||
434 | if err && !handled && Threads.threadid() == 1 # a failure nobody was waiting for, seen on thread 1 | ||
435 | if isa(result, InterruptException) && isdefined(Base, :active_repl_backend) && | ||
436 | active_repl_backend.backend_task.state === :runnable && isempty(Workqueue) && | ||
437 | active_repl_backend.in_eval | ||
438 | throwto(active_repl_backend.backend_task, result) # this terminates the task | ||
439 | end | ||
440 | end | ||
441 | # Clear sigatomic before waiting | ||
442 | sigatomic_end() | ||
443 | try | ||
444 | wait() # this will not return | ||
445 | catch e | ||
446 | # If an InterruptException happens while blocked in the event loop, try handing | ||
447 | # the exception to the REPL task since the current task is done. | ||
448 | # issue #19467 | ||
449 | if Threads.threadid() == 1 && | ||
450 | isa(e, InterruptException) && isdefined(Base, :active_repl_backend) && | ||
451 | active_repl_backend.backend_task.state === :runnable && isempty(Workqueue) && | ||
452 | active_repl_backend.in_eval | ||
453 | throwto(active_repl_backend.backend_task, e) | ||
454 | else | ||
455 | rethrow() # anything else is not ours to handle | ||
456 | end | ||
457 | end | ||
458 | end | ||
459 | |||
460 | |||
461 | ## scheduler and work queue | ||
462 | |||
463 | struct InvasiveLinkedListSynchronized{T} | ||
464 | queue::InvasiveLinkedList{T} # the underlying list, which does no locking of its own here | ||
465 | lock::Threads.SpinLock # held around each mutating operation on `queue` | ||
466 | InvasiveLinkedListSynchronized{T}() where {T} = new(InvasiveLinkedList{T}(), Threads.SpinLock()) | ||
467 | end | ||
468 | isempty(W::InvasiveLinkedListSynchronized) = isempty(W.queue) # read without taking the lock | ||
469 | length(W::InvasiveLinkedListSynchronized) = length(W.queue) # read without taking the lock | ||
470 | function push!(W::InvasiveLinkedListSynchronized{T}, t::T) where T | ||
471 | # Append `t` at the tail; the spin lock is released even if the insertion throws. | ||
472 | spin = W.lock | ||
473 | lock(spin) | ||
474 | try push!(W.queue, t) | ||
475 | finally unlock(spin) | ||
476 | end | ||
477 | return W | ||
478 | end | ||
479 | function pushfirst!(W::InvasiveLinkedListSynchronized{T}, t::T) where T | ||
480 | # Insert `t` at the head; the spin lock is released even if the insertion throws. | ||
481 | spin = W.lock | ||
482 | lock(spin) | ||
483 | try pushfirst!(W.queue, t) | ||
484 | finally unlock(spin) | ||
485 | end | ||
486 | return W | ||
487 | end | ||
488 | function pop!(W::InvasiveLinkedListSynchronized) | ||
489 | # Remove and return the tail element; the lock is released even if `pop!` throws. | ||
490 | spin = W.lock | ||
491 | lock(spin) | ||
492 | try return pop!(W.queue) | ||
493 | finally unlock(spin) | ||
494 | end | ||
495 | end | ||
496 | function popfirst!(W::InvasiveLinkedListSynchronized) | ||
497 | # Remove and return the head element; the lock is released even if `popfirst!` throws. | ||
498 | spin = W.lock | ||
499 | lock(spin) | ||
500 | try return popfirst!(W.queue) | ||
501 | finally unlock(spin) | ||
502 | end | ||
503 | end | ||
504 | function list_deletefirst!(W::InvasiveLinkedListSynchronized{T}, t::T) where T | ||
505 | # Delegate the removal of `t` to the underlying list while holding the spin lock. | ||
506 | spin = W.lock | ||
507 | lock(spin) | ||
508 | try list_deletefirst!(W.queue, t) | ||
509 | finally unlock(spin) | ||
510 | end | ||
511 | return W | ||
512 | end | ||
513 | |||
514 | const StickyWorkqueue = InvasiveLinkedListSynchronized{Task} | ||
515 | global const Workqueues = [StickyWorkqueue()] # indexed by thread id; grown to one queue per thread by `__preinit_threads__` | ||
516 | global const Workqueue = Workqueues[1] # default work queue is thread 1 | ||
517 | function __preinit_threads__() | ||
518 | nt = Threads.nthreads() | ||
519 | length(Workqueues) < nt || return nothing | ||
520 | resize!(Workqueues, nt) # one queue per thread; slot 1 keeps the queue it already has | ||
521 | for slot in 2:nt | ||
522 | Workqueues[slot] = StickyWorkqueue() | ||
523 | end | ||
524 | nothing | ||
525 | end | ||
526 | |||
527 | function enq_work(t::Task) | ||
528 | (t.state === :runnable && t.queue === nothing) || error("schedule: Task not runnable") | ||
529 | tid = Threads.threadid(t) # 0 is treated below as "not assigned to a thread yet" | ||
530 | # Note there are three reasons a Task might be put into a sticky queue | ||
531 | # even if t.sticky == false: | ||
532 | # 1. The Task's stack is currently being used by the scheduler for a certain thread. | ||
533 | # 2. There is only 1 thread. | ||
534 | # 3. The multiq is full (can be fixed by making it growable). | ||
535 | if t.sticky || tid != 0 || Threads.nthreads() == 1 | ||
536 | if tid == 0 | ||
537 | tid = Threads.threadid() # first scheduling: tie the task to the current thread | ||
538 | ccall(:jl_set_task_tid, Cvoid, (Any, Cint), t, tid-1) # the runtime is given tid-1 | ||
539 | end | ||
540 | push!(Workqueues[tid], t) | ||
541 | else | ||
542 | tid = 0 # stays 0 when the multiq accepts the task, so the wake-up below gets -1 | ||
543 | if ccall(:jl_enqueue_task, Cint, (Any,), t) != 0 | ||
544 | # if multiq is full, give to a random thread (TODO fix) | ||
545 | tid = mod(time_ns() % Int, Threads.nthreads()) + 1 | ||
546 | ccall(:jl_set_task_tid, Cvoid, (Any, Cint), t, tid-1) | ||
547 | push!(Workqueues[tid], t) | ||
548 | end | ||
549 | end | ||
550 | ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16) # NOTE(review): -1 presumably means "any thread"; confirm in the runtime | ||
551 | return t | ||
552 | end | ||
553 | |||
554 | schedule(t::Task) = enq_work(t) # `enq_work` returns `t`, so this does too | ||
555 | |||
556 | """ | ||
557 | schedule(t::Task, [val]; error=false) | ||
558 | |||
559 | Add a [`Task`](@ref) to the scheduler's queue. This causes the task to run constantly when the system | ||
560 | is otherwise idle, unless the task performs a blocking operation such as [`wait`](@ref). | ||
561 | |||
562 | If a second argument `val` is provided, it will be passed to the task (via the return value of | ||
563 | [`yieldto`](@ref)) when it runs again. If `error` is `true`, the value is raised as an exception in | ||
564 | the woken task. | ||
565 | |||
566 | # Examples | ||
567 | ```jldoctest | ||
568 | julia> a5() = sum(i for i in 1:1000); | ||
569 | |||
570 | julia> b = Task(a5); | ||
571 | |||
572 | julia> istaskstarted(b) | ||
573 | false | ||
574 | |||
575 | julia> schedule(b); | ||
576 | |||
577 | julia> yield(); | ||
578 | |||
579 | julia> istaskstarted(b) | ||
580 | true | ||
581 | |||
582 | julia> istaskdone(b) | ||
583 | true | ||
584 | ``` | ||
585 | """ | ||
586 | function schedule(t::Task, @nospecialize(arg); error=false) | ||
587 | # `error` is the keyword argument in here, hence the qualified `Base.error` calls. | ||
588 | t.state === :runnable || Base.error("schedule: Task not runnable") | ||
589 | if error | ||
590 | t.queue === nothing || Base.list_deletefirst!(t.queue, t) # an already queued task is dequeued first | ||
591 | else | ||
592 | t.queue === nothing || Base.error("schedule: Task not runnable") | ||
593 | end | ||
594 | # (Re)start the task with `arg`, either as its exception or as its value. | ||
595 | setfield!(t, error ? :exception : :result, arg) | ||
596 | enq_work(t) | ||
597 | return t | ||
598 | end | ||
599 | |||
600 | """ | ||
601 | yield() | ||
602 | |||
603 | Switch to the scheduler to allow another scheduled task to run. A task that calls this | ||
604 | function is still runnable, and will be restarted immediately if there are no other runnable | ||
605 | tasks. | ||
606 | """ | ||
607 | function yield() | ||
608 | me = current_task() | ||
609 | enq_work(me) # stay runnable: queue ourselves before handing the thread over | ||
610 | try | ||
611 | return wait() | ||
612 | catch | ||
613 | me.queue === nothing || list_deletefirst!(me.queue, me) # `wait` threw: do not stay queued | ||
614 | rethrow() | ||
615 | end | ||
616 | end | ||
617 | |||
618 | """ | ||
619 | yield(t::Task, arg = nothing) | ||
620 | |||
621 | A fast, unfair-scheduling version of `schedule(t, arg); yield()` which | ||
622 | immediately yields to `t` before calling the scheduler. | ||
623 | """ | ||
624 | function yield(t::Task, @nospecialize(x=nothing)) | ||
625 | t.result = x # the value `t` gets back from its own pending switch (see `try_yieldto`) | ||
626 | enq_work(current_task()) # unlike `yieldto`, the current task is queued so that it runs again | ||
627 | return try_yieldto(ensure_rescheduled, Ref(t)) | ||
628 | end | ||
629 | |||
630 | """ | ||
631 | yieldto(t::Task, arg = nothing) | ||
632 | |||
633 | Switch to the given task. The first time a task is switched to, the task's function is | ||
634 | called with no arguments. On subsequent switches, `arg` is returned from the task's last | ||
635 | call to `yieldto`. This is a low-level call that only switches tasks, not considering states | ||
636 | or scheduling in any way. Its use is discouraged. | ||
637 | """ | ||
638 | function yieldto(t::Task, @nospecialize(x=nothing)) | ||
639 | t.result = x | ||
640 | return try_yieldto(identity, Ref(t)) # `identity` as the undo step: nothing is rescheduled if the switch fails | ||
641 | end | ||
642 | |||
643 | function try_yieldto(undo, reftask::Ref{Task}) | ||
644 | try | ||
645 | ccall(:jl_switchto, Cvoid, (Any,), reftask) # NOTE(review): presumably returns only once this task is switched back to; confirm in the runtime | ||
646 | catch | ||
647 | undo(reftask[]) # the switch itself threw: let the caller-supplied `undo` clean up, then propagate | ||
648 | rethrow() | ||
649 | end | ||
650 | ct = current_task() | ||
651 | exc = ct.exception | ||
652 | if exc !== nothing | ||
653 | ct.exception = nothing # a pending exception (e.g. stored by `throwto`) is cleared before it is raised | ||
654 | throw(exc) | ||
655 | end | ||
656 | result = ct.result | ||
657 | ct.result = nothing # the field is reset once the value has been read | ||
658 | return result | ||
659 | end | ||
660 | |||
661 | # yield to a task, throwing an exception in it | ||
662 | function throwto(t::Task, @nospecialize exc) | ||
663 | t.exception = exc # found and thrown by `try_yieldto` when `t` resumes | ||
664 | return yieldto(t) | ||
665 | end | ||
666 | |||
667 | function ensure_rescheduled(othertask::Task) | ||
668 | self = current_task() | ||
669 | local_queue = Workqueues[Threads.threadid()] | ||
670 | if self !== othertask && othertask.state === :runnable | ||
671 | # The switch to `othertask` did not happen. Put it back at the head of the | ||
672 | # queue of its own thread (or of this one, if its thread id is 0) so that it | ||
673 | # is retried later. | ||
674 | owner = Threads.threadid(othertask) | ||
675 | target = owner == 0 ? local_queue : Workqueues[owner] | ||
676 | pushfirst!(target, othertask) | ||
677 | end | ||
678 | # If the current task was queued, it is taken out of this thread's queue again, | ||
679 | # returning it to the runnable state before an error is thrown. | ||
680 | list_deletefirst!(local_queue, self) | ||
681 | nothing | ||
682 | end | ||
683 | |||
684 | function trypoptask(W::StickyWorkqueue) | ||
685 | isempty(W) && return | ||
686 | candidate = popfirst!(W) | ||
687 | if candidate.state === :runnable | ||
688 | return candidate | ||
689 | end | ||
690 | # A queued task that is not runnable was presumably queued twice. Things are | ||
691 | # probably broken by now, but the entry is discarded and scheduling carries on: | ||
692 | # throwing is not an option, because the waiting caller is probably not at | ||
693 | # fault, and print() is avoided because it may try to incur a task switch. | ||
694 | ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...), | ||
695 | "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :runnable\n") | ||
696 | return | ||
697 | end | ||
698 | |||
699 | @noinline function poptaskref(W::StickyWorkqueue) | ||
700 | next = trypoptask(W) | ||
701 | # Nothing usable came out of the local queue: ask the runtime for the next | ||
702 | # task, handing it `trypoptask` and `W`. | ||
703 | next isa Task || (next = ccall(:jl_task_get_next, Ref{Task}, (Any, Any), trypoptask, W)) | ||
704 | return Ref(next) | ||
705 | end | ||
706 | |||
707 | function wait() | ||
708 | # Pick the next task for this thread and switch to it. | ||
709 | next = poptaskref(Workqueues[Threads.threadid()]) | ||
710 | result = try_yieldto(ensure_rescheduled, next) | ||
711 | # Execution continues here when we come out of the queue again. | ||
712 | process_events() | ||
713 | return result | ||
714 | end | ||
715 | |||
716 | if Sys.iswindows() | ||
717 | pause() = ccall(:Sleep, stdcall, Cvoid, (UInt32,), 0xffffffff) # 0xffffffff is the INFINITE timeout of the Win32 Sleep call | ||
718 | else | ||
719 | pause() = ccall(:pause, Cvoid, ()) # C library pause(): suspends until a signal is delivered | ||
720 | end |