第3回 Rust 勉強会 資料

こんにちは,M2の小倉です.

乃村研では5月から,Rust勉強会を行っています.
ScrapBox の勉強会概要ページ: https://scrapbox.io/nomlab/Rust%E5%8B%89%E5%BC%B7%E4%BC%9A

今回は,第3回Rust勉強会(2019/6/12 17:00-)の資料を掲載します.

The Rust Programming Language 要約 16,20章

http://doc.rust-jp.rs/book/second-edition/

16章 恐れるな!並行性

https://doc.rust-jp.rs/book/second-edition/ch16-00-concurrency.html

スレッド

  • プログラム内で,独立した部分を同時に実行可能
  • 独立した部分を走らせる機能がスレッド
  • プログラム内の計算を複数のスレッドで分割することで,パフォーマンスが向上

spawnで新規スレッドを生成する

  • 新規スレッドの生成は,thread::spawnメソッドを呼び出す
  • 新規スレッドで走らせたいコードは,クロージャで渡す
use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            // やあ!立ち上げたスレッドから数字{}だよ!
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        // メインスレッドから数字{}だよ!
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}
  • 上記プログラムは,メインスレッドと新規スレッドからテキストを出力する
  • 実行結果は以下のようになる
hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the main thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
  • 新規スレッドの処理が途中までしか完了していない
    • 新規スレッドは,実行が終わったかに関わらず,メインスレッドの終了とともに停止するため
    • 新規スレッドの処理が完了されるだけでなく,実行されるかどうかも保証されない

join ハンドルで全スレッドの終了を待つ

  • thread::spawnの戻り値(JoinHandle型)を用いる
  • JoinHandle型のJoinメソッドを呼び出すことで,生成したスレッドの処理が完了することを保証
use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}
  • joinメソッドを呼び出すと,変数handle(thread::spawnの戻り値)が示すスレッドの処理が完了するまで,現在実行中のスレッドをブロック
    • 複数のスレッドを生成した場合,生成したすべてのスレッドに対してjoinメソッドを呼ぶ必要有
  • 実行結果は以下のようになる
hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 1 from the spawned thread!
hi number 3 from the main thread!
hi number 2 from the spawned thread!
hi number 4 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!
  • ここで,handle.joinメソッドを呼び出す位置を変えると…
use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    handle.join().unwrap();

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}
  • 実行結果は以下のようになる
hi number 1 from the spawned thread!
hi number 2 from the spawned thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!
hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 3 from the main thread!
hi number 4 from the main thread!
  • 新規スレッドの処理が完了するまで,メインスレッドの処理が始まらない
  • joinメソッドをどこで呼ぶかによって,スレッドが同時に走らなくなる

スレッドでmoveクロージャを使用する

  • あるスレッドのデータを別スレッドで使用できるようにするためにmoveクロージャがよく使用される
  • メインスレッドで生成したベクタを,立ち上げたスレッドで使用する場合を考える
  • 以下のコードは動かない
use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        // こちらがベクタ: {:?}
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}
  • 上記コードをコンパイルすると以下のようなエラーが出力される
error[E0373]: closure may outlive the current function, but it borrows `v`,
which is owned by the current function
(エラー: クロージャは現在の関数よりも長生きするかもしれませんが、現在の関数が所有している
`v`を借用しています)
 --> src/main.rs:6:32
  |
6 |     let handle = thread::spawn(|| {
  |                                ^^ may outlive borrowed value `v`
7 |         println!("Here's a vector: {:?}", v);
  |                                           - `v` is borrowed here
  |
help: to force the closure to take ownership of `v` (and any other referenced
variables), use the `move` keyword
(助言: `v`(や他の参照されている変数)の所有権をクロージャに奪わせるには、`move`キーワードを使用してください)
  |
6 |     let handle = thread::spawn(move || {
  |                                ^^^^^^^
  • クロージャはベクタvを借用しようとする
  • しかし,立ち上げたスレッドがどの程度走るか分からないため,ベクタvへの参照が常に有効であるか把握できない
  • 以下のコードはもう一つの例
use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {:?}", v);
    });

    // いや〜!
    drop(v); // oh no!

    handle.join().unwrap();
}
  • メインスレッドは,スレッドを立ち上げた後,ベクタvをドロップする
  • スレッドがベクタvを参照する頃には,ベクタvへの参照が不正となる
  • そこで,moveクロージャを使用して,所有権を強制的に奪わせる
use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

メッセージ受け渡し

  • メッセージ(データを含む)をやり取りすることでメモリを共有する
  • チャンネル: メッセージをやり取りするための道具の1つ
    • 標準ライブラリで提供
    • 転送機と受信機の2つで構成
    • パイプに似てる?
  • チャンネルは以下のように生成可能
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}
  • mpsc::channel関数で新規チャンネルを生成
  • tx: 転送機,rx: 受信機
  • 新規スレッドからのメッセージをメインスレッドで受け取るプログラムを以下に示す
use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    // 値は{}です
    println!("Got: {}", received);
}
  • sendメソッド: 受信機へメッセージを送信
    • Result<T, E>型を返す
    • 受信機がドロップされていればエラー
  • recvメソッド: 転送機からのメッセージを受信
    • Result<T, E>型を返す.
    • メッセージが届くまでスレッドをブロック
    • 転送機がドロップするとエラー
  • メッセージの受信には,try_recvメソッドもある.
    • スレッドのブロックがなく,即座にResult<T, E>型を返す
    • メッセージがなければエラー
  • 実行結果は以下のようになる
Got: hi

チャンネルと所有権の転送

  • データをチャンネルで転送した後に使用してみる
use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        // valは{}
        println!("val is {}", val);
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}
  • このコードはコンパイル時に以下のエラーが出力される
error[E0382]: use of moved value: `val`
  --> src/main.rs:10:31
   |
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {}", val);
   |                               ^^^ value used here after move
   |
   = note: move occurs because `val` has type `std::string::String`, which does
not implement the `Copy` trait
  • sendメソッドは,引数の所有権を受信側に渡す
  • 送信後,送信側のスレッドは引数の所有権を持たない

複数の値を送信し,受信側が待機するのを確かめる

  • 受信側がメッセージの送信を待つか確かめる
  • 立ち上げたスレッドが,複数のメッセージを1秒間隔で送信するプログラムを実行
use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        // スレッドからやあ(hi from the thread)
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}
  • メインスレッドでは,recvメソッドを明示的に呼んでいない
  • rxをイテレータとして扱い,受信した値それぞれを出力している
  • 実行結果は以下のようになる
Got: hi
Got: from
Got: the
Got: thread
  • 受信側であるメインスレッドは,停止やsleepしない
  • メッセージを受信するまで待機していることが分かる

転送機をクローンして複数の生成器を作成する

  • 送信機をクローンすることで,同じ受信機に送信する複数のスレッドを生成
let (tx, rx) = mpsc::channel();

let tx1 = mpsc::Sender::clone(&tx);
thread::spawn(move || {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("thread"),
    ];

    for val in vals {
        tx1.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

thread::spawn(move || {
    // 君のためにもっとメッセージを(more messages for you)
    let vals = vec![
        String::from("more"),
        String::from("messages"),
        String::from("for"),
        String::from("you"),
    ];

    for val in vals {
        tx.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

for received in rx {
    println!("Got: {}", received);
}

// --snip--
  • cloneメソッドで転送機を複製
  • 出力は以下のようになる
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

状態共有

ミューティックスを使用して一度に1つのスレッドからデータにアクセスすることを許可する

  • ミューティックスの規則
    • データを使用する前にロックを獲得
    • データの使用が終わったらアンロックする
  • ミューティックスの使用方法を以下に示す
use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {:?}", m);
}
  • newメソッドを使用してミューティックスを生成
    • 上記コードでは内部データは 5
  • lockメソッド: ロックの獲得
    • 戻り値は,中に入っているデータの可変参照
  • ロックはスコープを抜けるとアンロックされる

複数スレッド間でMutex<T>を共有する

  • 10個のスレッドで値を共有するプログラムを以下に示す
    • これはコンパイルエラーを起こす
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
  • コンパイルすると以下のようなエラーが出力
error[E0382]: capture of moved value: `counter`
(エラー: ムーブされた値をキャプチャしています: `counter`)
  --> src/main.rs:10:27
   |
9  |         let handle = thread::spawn(move || {
   |                                    ------- value moved (into closure) here
10 |             let mut num = counter.lock().unwrap();
   |                           ^^^^^^^ value captured here after move
   |
   = note: move occurs because `counter` has type `std::sync::Mutex<i32>`,
   which does not implement the `Copy` trait

error[E0382]: use of moved value: `counter`
  --> src/main.rs:21:29
   |
9  |         let handle = thread::spawn(move || {
   |                                    ------- value moved (into closure) here
...
21 |     println!("Result: {}", *counter.lock().unwrap());
   |                             ^^^^^^^ value used here after move
   |
   = note: move occurs because `counter` has type `std::sync::Mutex<i32>`,
   which does not implement the `Copy` trait

error: aborting due to 2 previous errors
(エラー: 前述の2つのエラーによりアボート)
  • 変数counterがクロージャにムーブされ,lockメソッドを呼び出したときにキャプチャされているらしい
  • よくわからないので,プログラムを単純にしてみる
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    let handle = thread::spawn(move || {
        let mut num = counter.lock().unwrap();

        *num += 1;
    });
    handles.push(handle);

    let handle2 = thread::spawn(move || {
        let mut num2 = counter.lock().unwrap();

        *num2 += 1;
    });
    handles.push(handle2);

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
  • コンパイルエラーは以下のようになる
error[E0382]: capture of moved value: `counter`
  --> src/main.rs:16:24
   |
8  |     let handle = thread::spawn(move || {
   |                                ------- value moved (into closure) here
...
16 |         let mut num2 = counter.lock().unwrap();
   |                        ^^^^^^^ value captured here after move
   |
   = note: move occurs because `counter` has type `std::sync::Mutex<i32>`,
   which does not implement the `Copy` trait

error[E0382]: use of moved value: `counter`
  --> src/main.rs:26:29
   |
8  |     let handle = thread::spawn(move || {
   |                                ------- value moved (into closure) here
...
26 |     println!("Result: {}", *counter.lock().unwrap());
   |                             ^^^^^^^ value used here after move
   |
   = note: move occurs because `counter` has type `std::sync::Mutex<i32>`,
   which does not implement the `Copy` trait

error: aborting due to 2 previous errors
  • 変数counterは最初に生成したスレッドにムーブされている
  • 後から生成したスレッドでcounterをキャプチャしようとしているためエラー
  • 変数counterの所有権を複数スレッドに移すことはできない
  • 変数の所有権を複数のスレッドに移すには,複数の所有権が必要

複数のスレッドで複数の所有権

  • スマートポインタのRc<T>を使用して,変数に複数の所有権を与える
use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
  • コンパイルすると以下のようなエラーが出力
error[E0277]: the trait bound `std::rc::Rc<std::sync::Mutex<i32>>:
std::marker::Send` is not satisfied in `[closure@src/main.rs:11:36:
15:10 counter:std::rc::Rc<std::sync::Mutex<i32>>]`
(エラー: トレイト境界`std::rc::Rc<std::sync::Mutex<i32>>:
std::marker::Send`は`[closure@src/main.rs:11:36:15:10
counter:std::rc::Rc<std::sync::Mutex<i32>>]`で満たされていません)
  --> src/main.rs:11:22
   |
11 |         let handle = thread::spawn(move || {
   |                      ^^^^^^^^^^^^^ `std::rc::Rc<std::sync::Mutex<i32>>`
cannot be sent between threads safely
                          (`std::rc::Rc<std::sync::Mutex<i32>>`は、スレッド間で安全に送信できません)
   |
   = help: within `[closure@src/main.rs:11:36: 15:10
counter:std::rc::Rc<std::sync::Mutex<i32>>]`, the trait `std::marker::Send` is
not implemented for `std::rc::Rc<std::sync::Mutex<i32>>`
     (ヘルプ: `[closure@src/main.rs:11:36 15:10
     counter:std::rc::Rc<std::sync::Mutex<i32>>]`内でトレイト`std::marker::Send`は、
     `std::rc::Rc<std::sync::Mutex<i32>>`に対して実装されていません)
   = note: required because it appears within the type
`[closure@src/main.rs:11:36: 15:10 counter:std::rc::Rc<std::sync::Mutex<i32>>]`
     (注釈: 型`[closure@src/main.rs:11:36 15:10
     counter:std::rc::Rc<std::sync::Mutex<i32>>]`内に出現するので必要です)
   = note: required by `std::thread::spawn`
     (注釈: `std::thread::spawn`により必要とされています)
  • Rc<T>では,スレッド間で変数を安全に共有できない
    • Rc<T>が参照カウントを書き換える際に競合が発生する可能性有

Arc<T>で原始的な参照カウント

  • Arc<T>: 参照カウントの書換えをアトミックに行うRc<T>
  • シングルスレッドで値に処理を施すだけなら,Rc<T>の方が高速
use std::sync::{Mutex, Arc};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
  • 実行すると以下のような出力が得られる
Result: 10

RefCell<T>/Rc<T>とMutex<T>/Arc<T>の類似性

  • Mutex<T>は,Cell系のように内部可変性を提供.
  • コンパイラは,Mutex<T>を使用する際のロジックエラーを保護しない
  • Mutex<T>は,デッドロックを生成する可能性有

SyncとSendトレイトで拡張可能な並行性

  • 並列性概念として,SendとSync

Sendでスレッド間の所有権の転送を許可する

  • Send: 実装されている型の変数は,所有権をスレッド間で転送可能
  • Rustのほとんどの型はSendが実装されているが,一部例外有
    • Rc<T>
    • Rc<T>を別スレッドに転送できると,複数のスレッドが同時に参照カウントを更新可能
    • マルチスレッドではArc<T>を使用する

Syncで複数のスレッドからのアクセスを許可する

  • Sync: 実装されている型の変数は,複数のスレッドから参照されても安全
  • Sendと同様にRc<T>では,Syncは実装されていない
  • RefCell<T>型でも実装されていない
  • マルチスレッドではMutex<T>を使用する

20章 最後のプロジェクト: マルチスレッドのWebサーバを構築する

https://doc.rust-jp.rs/book/second-edition/ch20-00-final-project-a-web-server.html

  • helloと話すWebサーバを作る

シングルスレッドのWebサーバ

  • Webサーバ構築には2つのプロトコルが関係
    • HTTP
    • TCP

TCP接続をリッスンする

  • WebサーバはTCP接続を待ち受ける
use std::net::TcpListener;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        // 接続が確立しました
        println!("Connection established!");
    }
}
  • 127.0.0.1:7878というアドレスに来るTCPストリームを待ち受ける
  • TcpListenerでアドレス127.0.0.1:7878に来るTCP接続を待ち受ける
  • incomingメソッドは一連のTCPストリームを与えるイテレータを返す
  • コードを実行し,Webブラウザでアクセスるると以下のような出力が得られる
     Running `target/debug/hello`
Connection established!
Connection established!
Connection established!
  • 現時点では,ブラウザ上には何も表示されない

リクエストを読み取る

  • ブラウザからのリクエストを読み取る機能を実装
use std::io::prelude::*;
use std::net::TcpStream;
use std::net::TcpListener;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];

    stream.read(&mut buffer).unwrap();

    println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
}
  • std::io::preludeをスコープすることで,ストリームから読み書き可能なトレイトへアクセス可能
  • stream.readメソッドで,TCPストリームからバイトを読み取ってbufferに格納
  • stream.readメソッドの引数bufferは,内部状態が変化する可能性があるため,可変引数としている
  • String::from_utf8_lossyメソッドで,バイトを文字列に変換
    • lossyは無効なUTF-8シーケンスを目の当たりにした際のこの関数の振る舞いを示唆
  • コードを実行し,Webブラウザでアクセスすると以下のような出力が得られる
$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 0.42 secs
     Running `target/debug/hello`
Request: GET / HTTP/1.1
Host: 127.0.0.1:7878
User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64; rv:52.0) Gecko/20100101
Firefox/52.0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
Accept-Language: en-US,en;q=0.5
Accept-Encoding: gzip, deflate
Connection: keep-alive
Upgrade-Insecure-Requests: 1
������������������������������������

HTTPリクエストを詳しく見る

  • HTTPは以下のフォーマットに則る
Method Request-URI HTTP-Version CRLF
headers CRLF
message-body
  • 各行は以下のとおり
    • リクエスト: クライアントの要求についての情報を保持
    • URI
    • クライアントの使用しているHTTPのバージョン
  • リクエストの最後はCRLFで終了する.(CRLFは\r\nとも表記される)
  • 受け取ったリクエストから,以下の3つが確認できる
    • GETがメソッド
    • / が要求URI
    • HTTP/1.1がバージョン

レスポンスを記述する

  • レスポンスは以下のようなフォーマット
HTTP-Version Status-Code Reason-Phrase CRLF
headers CRLF
message-body
  • 1行目は,ステータス
    • HTTPバージョン
    • ステータスコード
    • ステータスコードをテキストで表現する理由フレーズ
    • CRLF
  • CRLFの後には,ヘッダ,別のCRLF,レスポンスの本体がある
  • HTTPバージョン1.1,ステータスコードが200,OKフレーズ,ヘッダと本体なしのレスポンスを以下に示す
HTTP/1.1 200 OK\r\n\r\n
  • このレスポンスを返答するようにhandle_connection関数を改変
fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];

    stream.read(&mut buffer).unwrap();

    let response = "HTTP/1.1 200 OK\r\n\r\n";

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}
  • streamのwriteメソッドを使用し,レスポンスのデータをバイトで送信する
  • flushメソッドで,バイトがすべて送信されるまで実行をブロック
  • 上記コードを実行し,Webブラウザでアクセスすると空のページが表示される

本物のHTMLを返す

  • HTMLファイルを記述し,プロジェクトのルートディレクトリに作成
  • 以下にHTMLファイルの例を示す
<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Hello!</title>
  </head>
  <body>
    <!-- やあ! -->
    <h1>Hello!</h1>
    <!-- Rustからやあ -->
    <p>Hi from Rust</p>
  </body>
</html>
  • このファイルを読み込み,本体としてレスポンスに追加して送る
use std::fs::File;
// --snip--

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();

    let mut file = File::open("hello.html").unwrap();

    let mut contents = String::new();
    file.read_to_string(&mut contents).unwrap();

    let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents);

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}
  • format!マクロで,ファイルの中身をレスポンスの本体として追記
  • 上記コードを実行し,WebブラウザでアクセスするとHTMLが描画される

リクエストにバリデーションをかけ、選択的にレスポンスを返す

  • 現状,クライアントが何を要求しても,作成したHTMLを返す
  • / をリクエストしているか確認し,違う場合エラーを返すようにする
// --snip--

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";

    if buffer.starts_with(get) {
        let mut file = File::open("hello.html").unwrap();

        let mut contents = String::new();
        file.read_to_string(&mut contents).unwrap();

        let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents);

        stream.write(response.as_bytes()).unwrap();
        stream.flush().unwrap();
    } else {
        // 何か他の要求
        // some other request
    }
}
  • / リクエストに対応するデータをget変数にハードコード
  • starts_withメソッドで,リクエストがgetから始まっているか確認
  • 上記コードを実行し,Webブラウザでアクセスすると以下のような結果が得られる
    • 127.0.0.1:7878にアクセスすると,HTMLが描画
    • 127.0.0.1:7878/something-elseにアクセスすると,接続エラー
  • 上記コードの内,elseブロックにステータスコードが404のレスポンスを返すように改変
// --snip--

} else {
    let status_line = "HTTP/1.1 404 NOT FOUND\r\n\r\n";
    let mut file = File::open("404.html").unwrap();
    let mut contents = String::new();

    file.read_to_string(&mut contents).unwrap();

    let response = format!("{}{}", status_line, contents);

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}
  • レスポンスの本体は404.htmlファイルに記述
<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Hello!</title>
  </head>
  <body>
    <!-- ああ! -->
    <h1>Oops!</h1>
    <!-- すいません。要求しているものが理解できません -->
    <p>Sorry, I don't know what you're asking for.</p>
  </body>
</html>

上記コードを実行し,Webブラウザで127.0.0.1:7878/fooなどにアクセスすると,エラーHTMLが返る

リファクタリングの触り

  • 作成したコードでは,ifとelseのブロックに大きな差異はない
  • リファクタリングし,コードを簡潔にする
// --snip--

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let mut file = File::open(filename).unwrap();
    let mut contents = String::new();

    file.read_to_string(&mut contents).unwrap();

    let response = format!("{}{}", status_line, contents);

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

シングルスレッドサーバをマルチスレッド化する

  • シングルスレッドでは,リクエストの処理が終わるまで新しいリクエストに対応できない
  • リクエストをサーバが処理する時間が長いと問題

現在のサーバの実装で遅いリクエストをシミュレーションする

  • 応答する前に5秒スリープさせるレスポンスをシミュレートした /sleepへのリクエストを実装
use std::thread;
use std::time::Duration;
// --snip--

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    // --snip--
}
  • 上記コードを実行し,http://localhost:7878/sleepへアクセスする
  • その後,http://localhost:7878/にアクセスすると,sleepへのアクセスにおける5秒のスリープが終わるまで待機する

スレッドプールでスループットを向上させる

  • スレッドプール: タスクを処理する準備のできたスレッド群
  • タスクに対して,プール中のスレッドのどれかを割り当てることで,並列にタスクを処理可能
  • プール内のスレッド数は小さくするべき
    • DoS攻撃への対処
  • 今回は,プール内に有限数のスレッドを待機
  • スレッドプールの実装にはコンパイラクドウ開発を使用

各リクエストに対してスレッドを立ち上げられる場合のコードの構造

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}
  • リクエストが来るたびにスレッドを生成し,処理を割り当てる
  • リクエストが集中すると,スレッドが大量に生成され,システムが落ちる可能性有

有限数のスレッド用に似たインターフェイスを作成する

  • スレッドプールのインタフェースは,スレッドに似た動作をしてほしい
    • スレッドからの変更に伴う工数の削減
  • thread::spawnの代わりとなるThreadPool構造体の架空のインタフェースを使用する
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}
  • TheadPool::newにより,設定可能なスレッド数で新しいスレッドプールを作成
  • pool.executeで,各ストリームに対して実行すべきクロージャを受け取り,プール内のスレッドに与える
    • thread::spawnと似たインタフェース
  • コンパイルし,修正のヒントを得る
$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve. Use of undeclared type or module `ThreadPool`
(エラー: 解決に失敗しました。未定義の型またはモジュール`ThreadPool`を使用しています)
  --> src\main.rs:10:16
   |
10 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^^^^^^ Use of undeclared type or module
   `ThreadPool`

error: aborting due to previous error
  • エラーから,ThreadPool型かモジュールが必要だとわかる
  • 現在のクレート(ここでは,helloとする)をバイナリクレートからライブラリクレートに切り換え,ThreadPoolの実装を保持させる
    • ThreadPoolの実装はWebサーバが行う仕事の種類とは独立しているため
  • 以下の内容を記述したsrc/lib.rsを生成
pub struct ThreadPool;
  • その後,以下のコマンドを実行(プロジェクトのルートディレクトリにいる前提)
$ mkdir src/bin
$ mv main.rs src/bin
  • これにより,ライブラリクレートに切り替わる
  • ライブラリクレートであっても,src/bin/main.rsのバイナリは cargo run で実行可能
  • src/bin/main.rsの先頭に以下の内容を追記し,ThreadPoolをスコープに導入
extern crate hello;
use hello::ThreadPool;
  • 再度コンパイルし,エラーから修正のヒントを得る
$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for type
`hello::ThreadPool` in the current scope
(エラー: 現在のスコープで型`hello::ThreadPool`の関数または関連アイテムに`new`というものが見つかりません)
 --> src/bin/main.rs:13:16
   |
13 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^^^^^^ function or associated item not found in
   `hello::ThreadPool`
  • エラーから,ThreadPoolに対して,newというメソッドを作成する必要があることが分かる
  • newメソッドに関して,4を引数として受け入れる引数が1つあり,ThreadPoolインスタンスを返すことが分かっている
pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}
  • sizeの引数はusize
    • マイナスのスレッド数はありえないため
  • 再度コンパイル
$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
warning: unused variable: `size`
(警告: 未使用の変数: `size`)
 --> src/lib.rs:4:16
  |
4 |     pub fn new(size: usize) -> ThreadPool {
  |                ^^^^
  |
  = note: #[warn(unused_variables)] on by default
  = note: to avoid this warning, consider using `_size` instead

error[E0599]: no method named `execute` found for type `hello::ThreadPool` in the current scope
  --> src/bin/main.rs:18:14
   |
18 |         pool.execute(|| {
   |              ^^^^^^^
  • 警告は一時的に無視
  • エラーから,ThreadPoolにexecuteメソッドが実装する必要があることが分かる
  • executeメソッドは,thread::spawnと似たインタフェース
  • 与えられたクロージャを受け取り,プール中のスレッドに渡す.
  • thread::spawnを参考に,クロージャを引数として受け取るように定義する
  • 以下にthread::spawnの定義を示す
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static
  • thread::spawnではトレイト協会としてFnOnceを使用している
  • Sendとライフタイム境界の ‘staticも必要
  • これらから,executeメソッドは以下のように定義できる
impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {

    }
}
  • 再度コンパイル
$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
warning: unused variable: `size`
 --> src/lib.rs:4:16
  |
4 |     pub fn new(size: usize) -> ThreadPool {
  |                ^^^^
  |
  = note: #[warn(unused_variables)] on by default
  = note: to avoid this warning, consider using `_size` instead

warning: unused variable: `f`
 --> src/lib.rs:8:30
  |
8 |     pub fn execute<F>(&self, f: F)
  |                              ^
  |
  = note: to avoid this warning, consider using `_f` instead
  • 警告だけになったためコンパイルは可能
  • クロージャを呼び出すように実装していないため,ブラウザにHTMLは描画されない

newでスレッド数を検査する

  • newメソッドの本体を実装
  • まず,引数について考える
  • 0 の場合,プログラムをパニックさせる

impl ThreadPool {
    /// 新しいThreadPoolを生成する。
    ///
    /// sizeがプールのスレッド数です。
    ///
    /// # パニック
    ///
    /// sizeが0なら、`new`関数はパニックします。
    ///
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
}
  • doc comment でThreadPoolにドキュメンテーションを追加

スレッドを格納するスペースを生成

  • thread::spawnシグネチャを参考に,スレッドの格納方法を考える
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static
  • spawnメソッドはJoinHandle<T>を返す
    • Tはクロージャが返す型
  • ThreadPoolの場合,クロージャは何も返さないので T = ()
  • src/lib.rsに以下の変更を加える
    • ThreadPoolがthread::JoinHandle<()>インスタンスのベクタを保持するように定義を変更
    • sizeに応じたのベクタを初期化
    • スレッドを生成する何らかのコードを実行するforループの実装
    • 上記らを含むThreadPoolインスタンスの返却
use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // スレッドを生成してベクタに格納する
            // create some threads and store them in the vector
        }

        ThreadPool {
            threads
        }
    }

    // --snip--
}
  • with_capacity: Vec::newと同様の作業をしつつ,ベクタにあらかじめスペースを確保
    • Vec::newと比べ少し効率的

ThreadPoolからスレッドにコードを送信する責任を負うWorker構造体

  • 生成したスレッドは,実装したいコードを受け取るまで待機してほしい
    • thread::spawnでは,生成されるとすぐに実行すべきコードが与えられる
  • この機能をWorkerというデータ構造を導入することで実装
  • src/lib.rsの変更点を以下に示す
    • idとJoinHandle<()>を保持するWorker構造体を定義
    • ThreadPoolを変更し,Workerインスタンスのベクタを保持
    • idと空のクロージャを受け取るスレッドを保持するWorkerインスタンスを返すWorker::newメソッドを定義
    • hreadPool::newでforループカウンタを使用してidを生成し,そのidで新しいWorkerを生成し,ベクタにWorkerを格納
  • これらの変更を加えたコードを以下に示す
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool {
            workers
        }
    }
    // --snip--
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker {
            id,
            thread,
        }
    }
}
  • このコードはコンパイルできるが,executeメソッドで得るクロージャを処理していない

チャンネル経由でスレッドにリクエストを送信する

  • ThreadPoolの生成中,Workerをそれぞれ生成する際にクロージャをthread::spawnに与える必要がある
  • ThreadPoolが保持するキューから,実行するコードをフェッチし,このコードをWorkerに送信できればいい
  • 上記のため,チャンネルをキューとして使う
  • 以下にチャンネル経由でスレッドにリクエストを送信する計画を示す
    • ThreadPoolはチャンネルを生成し、チャンネルの転送機を持つ
    • Workerそれぞれは、チャンネルの受信機を持つ
    • チャンネルに送信したいクロージャを保持する新しいJob構造体を生成する
    • executeメソッドは、実行したい仕事をチャンネルの送信
    • Workerはチャンネルの受信繰り返し,クロージャを実行
  • まず,ThreadPoolでチャンネルを生成し,転送機を保持させる
// --snip--
use std::sync::mpsc;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool {
            workers,
            sender,
        }
    }
    // --snip--
}
  • 次にWorkerに受信機を渡し,クロージャの内側で使用する
impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool {
            workers,
            sender,
        }
    }
    // --snip--
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker {
            id,
            thread,
        }
    }
}
  • コンパイルすると,以下のエラーが得られる
$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:27:42
   |
27 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here in
   previous iteration of loop
   |
   = note: move occurs because `receiver` has type
   `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
  • エラーから,受信機を複数のWorkerインスタンスに渡そうとしていることが分かる.
  • スマートポインタを使用し,複数のスレッドで所有権を共有させる.
use std::sync::Arc;
use std::sync::Mutex;
// --snip--

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender,
        }
    }

    // --snip--
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
    }
}

executeメソッドを実装する

  • executeメソッドを実装
  • Jobを構造体から,クロージャの型を保持するトレイトオブジェクト型のエイリアスに変更
type Job = Box<FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--
  • 得たクロージャを使用して,新しいJobインスタンスを生成した後,Jobをチャンネルの送信機に送信
  • Worker中のthread::spawnに渡されているクロージャを無限ループさせ,受信した仕事を実行するように変更
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                // ワーカー{}は仕事を得ました; 実行します
                println!("Worker {} got a job; executing.", id);

                (*job)();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}
  • コードをコンパイルすると以下のようなエラーが得られる
error[E0161]: cannot move a value of type std::ops::FnOnce() +
std::marker::Send: the size of std::ops::FnOnce() + std::marker::Send cannot be
statically determined
(エラー: std::ops::FnOnce() + std::marker::Sendの値をムーブできません:
std::ops::FnOnce() + std::marker::Sendのサイズを静的に決定できません)
  --> src/lib.rs:63:17
   |
63 |                 (*job)();
   |                 ^^^^^^
  • Box<T>に格納されたクロージャを呼ぶ際に,クロージャがBox<T>に格納されたSelfの所有権を奪うことが原因
  • 記法self: Box<Self>を用いると,Selfの所有権を奪える
  • しかし,コンパイラはクロージャが呼び出された際には,self: Box<Self>を使用してクロージャの所有権を奪えると認識しない
  • そこで,Box<T>の内部の値の所有権を奪うことができることをコンパイラに明示的に教える
  • 具体的には以下のようにする
    • self: Box<Self>を使用するcall_boxというメソッドのある新しいトレイトFnBoxを定義
    • FnOnce()を実装する任意の型に対してFnBoxを定義
    • 型エイリアスがFnBoxトレイトを使用するように変更
    • Workerがcall_boxメソッドを使用するように変更
trait FnBox {
    fn call_box(self: Box<Self>);
}

impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}

type Job = Box<FnBox + Send + 'static>;

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {} got a job; executing.", id);

                job.call_box();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}
  • FnBoxというトレイトには,call_boxというメソッドがある
  • FnBoxトレイトをFnOnceを実装する任意の型に実装できるようにする
  • コードを実行すると以下のような出力が得られる
$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never used: `workers`
 --> src/lib.rs:7:5
  |
7 |     workers: Vec<Worker>,
  |     ^^^^^^^^^^^^^^^^^^^^
  |
  = note: #[warn(dead_code)] on by default

warning: field is never used: `id`
  --> src/lib.rs:61:5
   |
61 |     id: usize,
   |     ^^^^^^^^^
   |
   = note: #[warn(dead_code)] on by default

warning: field is never used: `thread`
  --> src/lib.rs:62:5
   |
62 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = note: #[warn(dead_code)] on by default

    Finished dev [unoptimized + debuginfo] target(s) in 0.99 secs
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
  • 成功!

優美なシャットダウンと片付け

  • 現時点でのサーバプログラムでは,シャットダウンができない
  • シャットダウンする前に,取り掛かっているリクエストを完了できるようにする

ThreadPoolにDropトレイトを実装する

  • プールがドロップされると,すべてのスレッドがjoinして,作業が完了するのを確かめるべき
  • ThreadPoolにDropトレイトを実装する
impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            // ワーカー{}を終了します
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}
  • Workerそれぞれのスレッドに対して,joinメソッドを呼び出す
  • このコードをコンパイルすると以下のエラーが得られる
error[E0507]: cannot move out of borrowed content
  --> src/lib.rs:65:13
   |
65 |             worker.thread.join().unwrap();
   |             ^^^^^^ cannot move out of borrowed content
  • joinは引数の所有権を奪うことが原因
  • 解決には,joinメソッドがスレッドを消費できるように,Workerからスレッドをムーブする必要有
  • これは,WorkerがOption<thread::JoinHandle<()>>を保持していれば可能
    • Optionに対してtakeメソッドを呼び出し,Some列挙子から値をムーブし,その場所にNone列挙子を残すことができる
    • 言い換えれば,実行中のWorkerにはthreadにSome列挙子があり,Workerを片付けたい時には、 Workerが実行するスレッドがないようにSomeをNoneで置き換える
struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}
  • コンパイルして,修正のヒントを得る
error[E0599]: no method named `join` found for type
`std::option::Option<std::thread::JoinHandle<()>>` in the current scope
  --> src/lib.rs:65:27
   |
65 |             worker.thread.join().unwrap();
   |                           ^^^^

error[E0308]: mismatched types
  --> src/lib.rs:89:13
   |
89 |             thread,
   |             ^^^^^^
   |             |
   |             expected enum `std::option::Option`, found struct
   `std::thread::JoinHandle`
   |             help: try using a variant of the expected type: `Some(thread)`
   |
   = note: expected type `std::option::Option<std::thread::JoinHandle<()>>`
              found type `std::thread::JoinHandle<_>`
  • 2つ目のエラーを見る
    • 新しいWorkerを作成する際に,threadの値をSomeで包む必要有
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--

        Worker {
            id,
            thread: Some(thread),
        }
    }
}
  • 1つ目のエラーはDrop実装内にある
    • Option値に対してtakeを呼び出し、 threadをworkerからムーブする必要有
impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
  • スレッドがNoneならば,何も起きない

スレッドに仕事をリッスンするのを止めるよう通知する

  • Workerインスタンスのスレッドで実行されているクロージャは無限ループするため,スレッドが終了しない
  • スレッドに対して,Jobと無限ループを抜ける通知の2種類を送信できるようにする
  • enum列挙子を使用し,チャンネルに上記2つの内,一方を送信できるようにする
enum Message {
    NewJob(Job),
    Terminate,
}
  • また,チャンネルを調整し,型Messageを使用するように変更
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}

// --snip--

impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);

        self.sender.send(Message::NewJob(job)).unwrap();
    }
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) ->
        Worker {

        let thread = thread::spawn(move ||{
            loop {
                let message = receiver.lock().unwrap().recv().unwrap();

                match message {
                    Message::NewJob(job) => {
                        println!("Worker {} got a job; executing.", id);

                        job.call_box();
                    },
                    Message::Terminate => {
                        // ワーカー{}は停止するよう指示された
                        println!("Worker {} was told to terminate.", id);

                        break;
                    },
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}
  • 変更箇所は,ThreadPoolの定義とWorker::newのシグネチャ
  • 上記コードでもコンパイルは通るが,Terminateのメッセージを生成していないため,警告が出る
impl Drop for ThreadPool {
    fn drop(&mut self) {
        println!("Sending terminate message to all workers.");

        for _ in &mut self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }

        // 全ワーカーを閉じます
        println!("Shutting down all workers.");

        for worker in &mut self.workers {
            // ワーカー{}を閉じます
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}
  • Workerを2回走査している
    • Terminateを送信するため
    • 各Workerのスレッドに対してjoinを呼び出すため
  • このコードが動くことを確認するために,mainを変更し,リクエストを2つ受け付けたのち,サーバを閉じるように変更
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}
  • 実行結果は以下のようになる
$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 1.0 secs
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 3 got a job; executing.
Shutting down.
Sending terminate message to all workers.
Shutting down all workers.
Shutting down worker 0
Worker 1 was told to terminate.
Worker 2 was told to terminate.
Worker 0 was told to terminate.
Worker 3 was told to terminate.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3


Comment

No comment