Многонишково програмиране

02 декември 2025

Административни неща

Административни неща

Административни неща

Fearless concurrency

Fearless concurrency

Fearless concurrency

Fearless concurrency

Fearless concurrency

Fearless concurrency

Fearless concurrency

Нишки

1 2 3 4 5 6 7
use std::thread;

fn main() {
    thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
}
use std::thread;

fn main() {
    thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
}

Нишки

1 2 3 4 5 6 7 8 9 10 11
use std::thread;

fn main() {
    thread::spawn(|| {
        // това няма да се принтира, защото програмата
        // ще завърши преди втората нишка да е започнала
        println!("hi from spawned thread");
    });

    println!("hi from main thread");
}
hi from main thread
use std::thread;

fn main() {
    thread::spawn(|| {
        // това няма да се принтира, защото програмата
        // ще завърши преди втората нишка да е започнала
        println!("hi from spawned thread");
    });

    println!("hi from main thread");
}

Нишки

1 2 3 4 5 6 7 8 9 10 11 12
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        // това ще се принтира, защото главната нишка
        // ни изчаква
        println!("hi from spawned thread")
    });

    println!("hi from main thread");
    let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        // това ще се принтира, защото главната нишка
        // ни изчаква
        println!("hi from spawned thread")
    });

    println!("hi from main thread");
    let _ = handle.join();
}

Нишки

Сигнатурата на std::thread::spawn

1 2 3
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,

Нишки

1 2 3 4 5 6 7 8 9 10 11
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        // това ще се принтира, защото изчакваме
        println!("hi from spawned thread")
    });

    println!("hi from main thread");
    let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        // това ще се принтира, защото изчакваме
        println!("hi from spawned thread")
    });

    println!("hi from main thread");
    let _ = handle.join();
}

Нишки

1 2 3 4 5 6 7 8 9 10 11
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        // very hard computation ...
        42
    });

    let answ = handle.join();
    println!("The answer is {:?}", answ);
}
The answer is Ok(42)
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        // very hard computation ...
        42
    });

    let answ = handle.join();
    println!("The answer is {:?}", answ);
}

Нишки

1 2 3 4 5 6 7 8 9 10
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        panic!("too hard computation ...");
    });

    let answ = handle.join();
    println!("The answer is {:?}", answ);
}
thread '<unnamed>' panicked at src/bin/main_480a341d0afaddfcfc405e2dd9b455c8eb27e962.rs:5:9: too hard computation ... note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace The answer is Err(Any { .. })
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        panic!("too hard computation ...");
    });

    let answ = handle.join();
    println!("The answer is {:?}", answ);
}

Panic в нишка

Panic в нишка

Panic в нишка

Panic в нишка

Споделяне на стойности

Споделяне на стойности

Нека искаме да достъпим една и съща стойност от няколко нишки.
Тривиалният подход …

1 2 3 4 5 6 7 8 9 10 11 12 13
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let handle = thread::spawn(|| {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}

Споделяне на стойности

Нека искаме да достъпим една и съща стойност от няколко нишки.
Тривиалният подход - води до компилационна грешка.

1 2 3 4 5 6 7 8 9 10 11 12 13
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let handle = thread::spawn(|| {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}
error[E0373]: closure may outlive the current function, but it borrows `nums`, which is owned by the current function --> src/bin/main_44c598e1dd6669cdea1e15fcdbf808719a029646.rs:6:32 | 6 | let handle = thread::spawn(|| { | ^^ may outlive borrowed value `nums` 7 | for i in &nums { | ---- `nums` is borrowed here | note: function requires argument type to outlive `'static` --> src/bin/main_44c598e1dd6669cdea1e15fcdbf808719a029646.rs:6:18 | 6 | let handle = thread::spawn(|| { | __________________^ 7 | | for i in &nums { 8 | | println!("number {}", i); 9 | | } 10 | | }); | |______^ help: to force the closure to take ownership of `nums` (and any other referenced variables), use the `move` keyword | 6 | let handle = thread::spawn(move || { | ++++ For more information about this error, try `rustc --explain E0373`. error: could not compile `rust` (bin "main_44c598e1dd6669cdea1e15fcdbf808719a029646") due to 1 previous error
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let handle = thread::spawn(|| {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}

Споделяне на стойности

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
fn thread1() {
    println!("thread 1 started");

    thread::spawn(|| {
        println!("thread 2 started");
        thread::sleep(std::time::Duration::from_millis(1));
        println!("thread 2 will exit");
    });

    println!("thread 1 will exit");
}

fn main() {
    let _ = thread::spawn(thread1).join();
    println!("thread 1 exited");

    thread::sleep(std::time::Duration::from_millis(100));
}
thread 1 started thread 1 will exit thread 1 exited thread 2 started thread 2 will exit
use std::thread;
fn thread1() {
    println!("thread 1 started");

    thread::spawn(|| {
        println!("thread 2 started");
        thread::sleep(std::time::Duration::from_millis(1));
        println!("thread 2 will exit");
    });

    println!("thread 1 will exit");
}

fn main() {
    let _ = thread::spawn(thread1).join();
    println!("thread 1 exited");

    thread::sleep(std::time::Duration::from_millis(100));
}

Споделяне на стойности

1 2 3
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static

Споделяне на стойности

Ако използваме стойността само от новата нишка, можем да я преместим с move closure

1 2 3 4 5 6 7 8 9 10 11 12 13
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let handle = thread::spawn(move || {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}
number 0 number 1 number 2 number 3
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let handle = thread::spawn(move || {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}

Отклонение

Move closure

1 2 3 4 5 6 7 8 9 10 11 12
let nums = vec![0, 1, 2, 3];

// Прихваща `nums` то референция (`&Vec<i32>`),
// защото това се използва в тялото
let closure = || {
    for i in &nums {
        println!("number {}", i);
    }
};

closure();
println!("{:?}", nums);
number 0 number 1 number 2 number 3 [0, 1, 2, 3]
fn main() {
let nums = vec![0, 1, 2, 3];

// Прихваща `nums` то референция (`&Vec`),
// защото това се използва в тялото
let closure = || {
    for i in &nums {
        println!("number {}", i);
    }
};

closure();
println!("{:?}", nums);
}

Отклонение

Move closure

1 2 3 4 5 6 7 8 9 10 11 12 13 14
let nums = vec![0, 1, 2, 3];

// Прихваща `nums` то стойност (`Vec<i32>`),
// защото това се използва в тялото
let closure = || {
    let nums = nums;

    for i in &nums {
        println!("number {}", i);
    }
};

closure();
// println!("{:?}", nums); // error: use of moved value
number 0 number 1 number 2 number 3
fn main() {
let nums = vec![0, 1, 2, 3];

// Прихваща `nums` то стойност (`Vec`),
// защото това се използва в тялото
let closure = || {
    let nums = nums;

    for i in &nums {
        println!("number {}", i);
    }
};

closure();
// println!("{:?}", nums); // error: use of moved value
}

Отклонение

Move closure

1 2 3 4 5 6 7 8 9 10 11 12
let nums = vec![0, 1, 2, 3];

// Прихваща `nums` то стойност (`Vec<i32>`),
// независимо как се използва в тялото
let closure = move || {
    for i in &nums {
        println!("number {}", i);
    }
};

closure();
// println!("{:?}", nums); // error: use of moved value
number 0 number 1 number 2 number 3
fn main() {
let nums = vec![0, 1, 2, 3];

// Прихваща `nums` то стойност (`Vec`),
// независимо как се използва в тялото
let closure = move || {
    for i in &nums {
        println!("number {}", i);
    }
};

closure();
// println!("{:?}", nums); // error: use of moved value
}

Обратно към споделяне на стойности

Ако използваме стойността само от новата нишка, можем да я преместим с move closure

1 2 3 4 5 6 7 8 9 10 11 12 13
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let handle = thread::spawn(move || {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}
number 0 number 1 number 2 number 3
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let handle = thread::spawn(move || {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}

Споделяне на стойности

Но това не би работило ако имаме повече от една нишка

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let mut handles = vec![];
    for _ in 0..2 {
        handles.push(thread::spawn(move || {
            for i in &nums {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}
error[E0382]: use of moved value: `nums` --> src/bin/main_fb665c10ea221cdd41e075bfd1468b11553fe02f.rs:8:36 | 4 | let nums = vec![0, 1, 2, 3]; | ---- move occurs because `nums` has type `Vec<i32>`, which does not implement the `Copy` trait ... 7 | for _ in 0..2 { | ------------- inside of this loop 8 | handles.push(thread::spawn(move || { | ^^^^^^^ value moved into closure here, in previous iteration of loop 9 | for i in &nums { | ---- use occurs due to use in closure | help: consider cloning the value before moving it into the closure | 8 ~ let value = nums.clone(); 9 ~ handles.push(thread::spawn(move || { 10~ for i in &value { | For more information about this error, try `rustc --explain E0382`. error: could not compile `rust` (bin "main_fb665c10ea221cdd41e075bfd1468b11553fe02f") due to 1 previous error
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    let mut handles = vec![];
    for _ in 0..2 {
        handles.push(thread::spawn(move || {
            for i in &nums {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}

Scoped threads

Един вариант е да използваме scoped threads API-то

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    thread::scope(|s| {
        for _ in 0..2 {
            s.spawn(|| {
                for i in &nums {
                    println!("number {}", i);
                }
            });
        }
    });
}
number 0 number 1 number 2 number 0 number 1 number 2 number 3 number 3
use std::thread;

fn main() {
    let nums = vec![0, 1, 2, 3];

    thread::scope(|s| {
        for _ in 0..2 {
            s.spawn(|| {
                for i in &nums {
                    println!("number {}", i);
                }
            });
        }
    });
}

Scoped threads

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
thread::scope(|s /*: thread::Scope<'_, '_> */| {
    // тази функция се изпълнява в същата нишка

    for _ in 0..2 {
        // Scope::spawn създава нова нишка
        // Новата нишка може да държи референции към локални променливи
        s.spawn(|| {
            for i in &nums {
                println!("number {}", i);
            }
        });
    }

    // на края на функцията всички нишки създадени чрез Scope::spawn
    // се join-ват.
});
use std::thread;
fn main() {
let nums = vec![1, 2, 3];
thread::scope(|s /*: thread::Scope<'_, '_> */| {
    // тази функция се изпълнява в същата нишка

    for _ in 0..2 {
        // Scope::spawn създава нова нишка
        // Новата нишка може да държи референции към локални променливи
        s.spawn(|| {
            for i in &nums {
                println!("number {}", i);
            }
        });
    }

    // на края на функцията всички нишки създадени чрез Scope::spawn
    // се join-ват.
});
}

Scoped threads

В сигнатурата на Scope::spawn ограничението е F: 'scope, а не F: 'static

1 2 3 4 5 6 7
impl<'scope, 'env> Scope<'scope, 'env> {
    pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
    where
        F: FnOnce() -> T + Send + 'scope,
        T: Send + 'scope,
    { /* ... */ }
}

Споделяне на стойности - Rc

Друг вариант е да използваме споделена собственост (shared ownership).
Дали ще проработи с Rc?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
fn main() {
    let nums_vec = vec![0, 1, 2, 3];
    let nums_rc = Rc::new(nums_vec);

    let mut handles = vec![];
    for _ in 0..2 {
        let nums_rc = Rc::clone(&nums_rc);
        handles.push(thread::spawn(move || {
            for i in &*nums_rc {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}

Споделяне на стойности - Rc

Друг вариант е да използваме споделена собственост (shared ownership).
Дали ще проработи с Rc? - не

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
fn main() {
    let nums_vec = vec![0, 1, 2, 3];
    let nums_rc = Rc::new(nums_vec);

    let mut handles = vec![];
    for _ in 0..2 {
        let nums_rc = Rc::clone(&nums_rc);
        handles.push(thread::spawn(move || {
            for i in &*nums_rc {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}
error[E0277]: `Rc<Vec<i32>>` cannot be sent between threads safely --> src/bin/main_1c4b38ddc008640338e13791daaf3eccafcab0dc.rs:11:36 | 11 | handles.push(thread::spawn(move || { | ------------- ^------ | | | | ______________________|_____________within this `{closure@src/bin/main_1c4b38ddc008640338e13791daaf3eccafcab0dc.rs:11:36: 11:43}` | | | | | required by a bound introduced by this call 12 | | for i in &*nums_rc { 13 | | println!("number {}", i); 14 | | } 15 | | })); | |_________^ `Rc<Vec<i32>>` cannot be sent between threads safely | = help: within `{closure@src/bin/main_1c4b38ddc008640338e13791daaf3eccafcab0dc.rs:11:36: 11:43}`, the trait `Send` is not implemented for `Rc<Vec<i32>>` note: required because it's used within this closure --> src/bin/main_1c4b38ddc008640338e13791daaf3eccafcab0dc.rs:11:36 | 11 | handles.push(thread::spawn(move || { | ^^^^^^^ note: required by a bound in `spawn` --> /home/nikola/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/mod.rs:726:8 | 723 | pub fn spawn<F, T>(f: F) -> JoinHandle<T> | ----- required by a bound in this function ... 726 | F: Send + 'static, | ^^^^ required by this bound in `spawn` For more information about this error, try `rustc --explain E0277`. error: could not compile `rust` (bin "main_1c4b38ddc008640338e13791daaf3eccafcab0dc") due to 1 previous error
use std::thread;
use std::rc::Rc;

fn main() {
    let nums_vec = vec![0, 1, 2, 3];
    let nums_rc = Rc::new(nums_vec);

    let mut handles = vec![];
    for _ in 0..2 {
        let nums_rc = Rc::clone(&nums_rc);
        handles.push(thread::spawn(move || {
            for i in &*nums_rc {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}

Споделяне на стойности - Arc

Проблема е, че Rc не е thread safe.
Трябва да използваме Arc.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
fn main() {
    let nums_vec = vec![0, 1, 2, 3];
    let nums_arc = Arc::new(nums_vec);

    let mut handles = vec![];
    for _ in 0..2 {
        let nums_arc = Arc::clone(&nums_arc);
        handles.push(thread::spawn(move || {
            for i in &*nums_arc {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}
number 0 number 1 number 2 number 3 number 0 number 1 number 2 number 3
use std::thread;
use std::sync::Arc;

fn main() {
    let nums_vec = vec![0, 1, 2, 3];
    let nums_arc = Arc::new(nums_vec);

    let mut handles = vec![];
    for _ in 0..2 {
        let nums_arc = Arc::clone(&nums_arc);
        handles.push(thread::spawn(move || {
            for i in &*nums_arc {
                println!("number {}", i);
            }
        }));
    }

    for h in handles {
        let _ = h.join();
    }
}

Споделяне на стойности - Arc

std::sync::Arc

Споделяне на стойности - Arc

std::sync::Arc

Споделяне на стойности - Arc

std::sync::Arc

Споделяне на стойности - Arc

std::sync::Arc

Споделяне на стойности - Arc

std::sync::Arc

Send и Sync

1 2 3
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static

Send и Sync

Трейтовете std::marker::Send и std::marker::Sync показват дали един тип е thread safe.
Т.е. дали обекти от този тип могат да се използват безопасно в многонишков контекст.

1 2
pub unsafe auto trait Send { }
pub unsafe auto trait Sync { }

Send и Sync

Send

Send и Sync

Send

Send и Sync

Send

Send и Sync

Send

Send и Sync

Send

Send и Sync

Sync

Send и Sync

Sync

Send и Sync

Sync

Send и Sync

Sync

Send и Sync

Sync

Send и Sync

Sync

Send и Sync

Въпрос

Дали обикновен тип като Vec<T> имплементира Sync?

Send и Sync

Въпрос

Дали обикновен тип като Vec<T> имплементира Sync?

Send и Sync

Въпрос

Дали обикновен тип като Vec<T> имплементира Sync?

Send и Sync

Аuto traits

1
pub struct Token(u32);
pub struct Token(u32);
fn main() {}

Auto trait docs

Send и Sync

Unsafe traits

1 2 3 4
struct MyBox(*mut u8);

unsafe impl Send for MyBox {}
unsafe impl Sync for MyBox {}
fn main() {}
struct MyBox(*mut u8);

unsafe impl Send for MyBox {}
unsafe impl Sync for MyBox {}

Send и Sync

Деимплементация

1 2 3 4 5 6 7
// Само на nightly
#![feature(optin_builtin_traits)]

struct SpecialToken(u8);

impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}

Send и Sync

Деимплементация

1 2 3
use std::marker::PhantomData;

struct SpecialToken(u8, PhantomData<*const ()>);
fn main() {}
use std::marker::PhantomData;

struct SpecialToken(u8, PhantomData<*const ()>);

Send и Sync

Деимплементация

1 2 3
use std::marker::PhantomData;

struct SpecialToken(u8, PhantomData<*const ()>);
fn main() {}
use std::marker::PhantomData;

struct SpecialToken(u8, PhantomData<*const ()>);

Примитиви за синхронизация

Примитиви за синхронизация

Ако искаме да модифицираме стойност от няколко нишки?

Примитиви за синхронизация

Ако искаме да модифицираме стойност от няколко нишки?

Примитиви за синхронизация

Ако искаме да модифицираме стойност от няколко нишки?

Примитиви за синхронизация

Ако искаме да модифицираме стойност от няколко нишки?

Примитиви за синхронизация

Ако искаме да модифицираме стойност от няколко нишки?

Примитиви за синхронизация

Ако искаме да модифицираме стойност от няколко нишки?

Примитиви за синхронизация

Модула std::sync

Mutex

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
use std::sync::Mutex;

fn main() {
    // мутекса опакова стойността, която предпазва
    let mutex = Mutex::new(10);

    {
        // заключваме мутекса
        // `lock()` връща "умен указател" с deref до `&T` и `&mut T`
        let mut lock = mutex.lock().unwrap();
        *lock += 32;

        // мутекса се отключва когато `lock` се деалокира
    }
}
use std::sync::Mutex;

fn main() {
    // мутекса опакова стойността, която предпазва
    let mutex = Mutex::new(10);

    {
        // заключваме мутекса
        // `lock()` връща "умен указател" с deref до `&T` и `&mut T`
        let mut lock = mutex.lock().unwrap();
        *lock += 32;

        // мутекса се отключва когато `lock` се деалокира
    }
}

Mutex

Mutex

Mutex

Mutex

Mutex

Mutex

Mutex

Mutex

Мutex

Обикновенно мутекса се възприема като примитива която определя критична секция

1 2 3 4 5 6 7
lock(my_mutex);
// начало на критичната секция

do_stuff(shared_data);

// край на критичната секция
unlock(my_mutex);

В Ръст това не би било удобно, защото не дава достатъчна информация на компилатора как ползваме данните.
Затова Mutex е generic и опакова данните.

Mutex

Mutex

Mutex

Mutex

Mutex

Mutex

Mutex

Panic

Mutex

Panic

Mutex

Panic

Mutex

Panic

Mutex

Panic

RwLock

RwLock

RwLock

RwLock

Mutex или RwLock

Mutex или RwLock

Mutex или RwLock

Mutex или RwLock

Arc + Mutex

Често използвани са:

Подобно на Rc<RefCell<T>>

Arc + Mutex

Пример

(Това всъщост е доста неоптимално решение, но…)

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
let nums = (0..20).collect::<Vec<_>>();

let mut odds = Arc::new(Mutex::new(vec![]));
let mut evens = Arc::new(Mutex::new(vec![]));

thread::scope(|s| {
    let ranges = [0..10, 10..20];

    for range in ranges {
        s.spawn(|| {
            for n in &nums[range] {
                if n % 2 == 0 {
                    evens.lock().unwrap().push(n);
                } else {
                    odds.lock().unwrap().push(n);
                }
            }
        });
    }
});

println!("evens: {:?}", evens.lock().unwrap());
println!("odds:  {:?}", odds.lock().unwrap());
evens: [0, 2, 10, 4, 12, 6, 14, 8, 16, 18] odds: [1, 3, 11, 5, 13, 7, 15, 9, 17, 19]
use std::thread;
use std::sync::{Arc, Mutex};

fn main() {
let nums = (0..20).collect::>();

let mut odds = Arc::new(Mutex::new(vec![]));
let mut evens = Arc::new(Mutex::new(vec![]));

thread::scope(|s| {
    let ranges = [0..10, 10..20];

    for range in ranges {
        s.spawn(|| {
            for n in &nums[range] {
                if n % 2 == 0 {
                    evens.lock().unwrap().push(n);
                } else {
                    odds.lock().unwrap().push(n);
                }
            }
        });
    }
});

println!("evens: {:?}", evens.lock().unwrap());
println!("odds:  {:?}", odds.lock().unwrap());
}

Други примитиви

Канали

MPSC

Канали

Do not communicate by sharing memory;
instead, share memory by communicating.

-- slogan from "Effective Go"

Канали в стандартната библиотека

1 2 3 4 5 6 7 8 9 10 11 12
use std::sync::mpsc;
use std::thread;

fn main() {
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        sender.send(10).unwrap();
    });

    println!("received {}", receiver.recv().unwrap());
}
received 10
use std::sync::mpsc;
use std::thread;

fn main() {
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        sender.send(10).unwrap();
    });

    println!("received {}", receiver.recv().unwrap());
}

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

тип метод резултат грешки
Sender send(T) Result<(), SendError<T>> disconnected
Receiver recv() Result<T, RecvError> disconnected
Receiver try_recv() Result<T, TryRecvError> Empty, Disconnected
Receiver recv_timeout(Duration) Result<T, RecvTimeoutError> Timeout, Disconnected

Типове канали

Неограничен канал

1 2 3 4 5 6 7 8 9 10 11
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    sender.send(1).unwrap();
    sender.send(2).unwrap();
    sender.send(3).unwrap();
});

assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
assert_eq!(receiver.recv().unwrap(), 3);
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    sender.send(1).unwrap();
    sender.send(2).unwrap();
    sender.send(3).unwrap();
});

assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
assert_eq!(receiver.recv().unwrap(), 3);
}

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Ограничен канал

тип метод резултат грешки
SyncSender send(T) Result<(), SendError<T>> disconnected
SyncSender try_send(T) Result<(), TrySendError<T>> Full, Disconnected
Receiver recv() Result<T, RecvError> disconnected
Receiver try_recv() Result<T, TryRecvError> Empty, Disconnected
Receiver recv_timeout(Duration) Result<T, RecvTimeoutError> Timeout, Disconnected

Типове канали

Ограничен канал

1 2 3 4 5 6 7 8 9 10 11 12
let (sender, receiver) = mpsc::sync_channel(1);

thread::spawn(move || {
    // записва съобщението и връща веднага
    sender.send(1).unwrap();

    // ще блокира докато главната нишка не извика `receiver.recv()`
    sender.send(2).unwrap();
});

assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::sync_channel(1);

thread::spawn(move || {
    // записва съобщението и връща веднага
    sender.send(1).unwrap();

    // ще блокира докато главната нишка не извика `receiver.recv()`
    sender.send(2).unwrap();
});

assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
}

Канали

Множество изпращачи

Изпращащата част може да се клонира

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
let (sender1, receiver) = mpsc::channel();
let sender2 = sender1.clone();

thread::spawn(move || {
    sender1.send(1).unwrap();
    sender1.send(2).unwrap();
});

thread::spawn(move || {
    sender2.send(3).unwrap();
    sender2.send(4).unwrap();
});

println!("{} {} {} {}",
    receiver.recv().unwrap(), receiver.recv().unwrap(),
    receiver.recv().unwrap(), receiver.recv().unwrap());
1 3 2 4
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender1, receiver) = mpsc::channel();
let sender2 = sender1.clone();

thread::spawn(move || {
    sender1.send(1).unwrap();
    sender1.send(2).unwrap();
});

thread::spawn(move || {
    sender2.send(3).unwrap();
    sender2.send(4).unwrap();
});

println!("{} {} {} {}",
    receiver.recv().unwrap(), receiver.recv().unwrap(),
    receiver.recv().unwrap(), receiver.recv().unwrap());
}

Канали

Множество получатели

Канали

Множество получатели

Канали

Множество получатели

Канали

Множество получатели

Канали

Итератори

1 2 3 4 5 6 7 8 9 10 11 12
// обхожда всички съобщения в канала
// ако няма налично съобщение блокира
// излиза от цикъла когато всички изпращачи са унищожени
for msg in receiver.iter() {
    println!("received {}", msg);
}

// обхожда всички вече изпратени съобения в канала,
// след което излиза от цикъла
for msg in receiver.try_iter() {
    println!("received {}", msg);
}
use std::sync::mpsc;
fn main() {
let (sender, receiver) = mpsc::channel::();
// обхожда всички съобщения в канала
// ако няма налично съобщение блокира
// излиза от цикъла когато всички изпращачи са унищожени
for msg in receiver.iter() {
    println!("received {}", msg);
}

// обхожда всички вече изпратени съобения в канала,
// след което излиза от цикъла
for msg in receiver.try_iter() {
    println!("received {}", msg);
}
}

Crossbeam channel

Crossbeam channel

Разлики в API-то

std::sync::mpsc crossbeam_channel
вид MPSC MPMC
неограничен channel() unbounded()
неограничен - типове (Sender, Receiver) (Sender, Receiver)
ограничен sync_channel(k) bounded(k)
ограничен - типове (SyncSender, Receiver) (Sender, Receiver)

Външни библиотеки

Crossbeam

Външни библиотеки

Parking lot

Въпроси