Go/chan
表示
< Go
chanはGoにおける重要なキーワードで、チャネル(channel)を宣言するために使用されます。チャネルはゴルーチン間で値を送受信するための型付きの通信パイプです。
基本的な宣言と使用方法
[編集]// チャネルの宣言 ch := make(chan int) // int型の値を送受信できるチャネル ch := make(chan string) // string型の値を送受信できるチャネル // 送信 ch <- 42 // チャネルに値を送信 // 受信 value := <-ch // チャネルから値を受信
chanと他のキーワードの組み合わせ
[編集]// バッファなしチャネル ch1 := make(chan int) // バッファありチャネル(キャパシティ指定) ch2 := make(chan int, 10) // 最大10個の値を保持できる
方向性の指定
[編集]// 双方向チャネル(デフォルト) ch := make(chan int) // 送信専用チャネル sendCh := make(chan<- int) // 受信専用チャネル recvCh := make(<-chan int) // 関数のパラメータで使用する例 func sendOnly(ch chan<- int) { ch <- 42 // 送信のみ可能 } func recvOnly(ch <-chan int) { val := <-ch // 受信のみ可能 }
select { case v1 := <-ch1: fmt.Println("ch1から受信:", v1) case ch2 <- 42: fmt.Println("ch2へ送信成功") case v3, ok := <-ch3: if !ok { fmt.Println("ch3は閉じられています") } else { fmt.Println("ch3から受信:", v3) } case <-time.After(1 * time.Second): fmt.Println("タイムアウト") default: fmt.Println("どのチャネル操作もブロックされる場合の処理") }
ch := make(chan int) close(ch) // チャネルを閉じる // 閉じられたチャネルの確認 val, ok := <-ch if !ok { fmt.Println("チャネルは閉じられています") } // 閉じられたチャネルからの反復処理 for val := range ch { // 閉じられるまで値を受信 }
ch := make(chan int, 5) // チャネルに値を送信 ch <- 1 ch <- 2 ch <- 3 close(ch) // レンジループの終了条件として閉じる // チャネルから全ての値を受信 for v := range ch { fmt.Println(v) // 1, 2, 3 の順に出力 }
nil チャネル
[編集]var ch chan int // デフォルト値はnil // nil チャネルへの送受信は永遠にブロックする // select で無効化する用途などに使用
主なユースケース
[編集]1. ゴルーチン間の通信
[編集]func main() { ch := make(chan string) go func() { ch <- "Hello from goroutine!" }() msg := <-ch fmt.Println(msg) }
2. 同期(処理の完了待ち)
[編集]func main() { done := make(chan bool) go func() { // 何らかの処理 time.Sleep(2 * time.Second) done <- true }() <-done // ゴルーチンの完了を待つ fmt.Println("処理が完了しました") }
3. 複数の結果を集める(ファンイン)
[編集]func fanIn(ch1, ch2 <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for { select { case v, ok := <-ch1: if !ok { ch1 = nil if ch2 == nil { return } continue } out <- v case v, ok := <-ch2: if !ok { ch2 = nil if ch1 == nil { return } continue } out <- v } } }() return out }
4. 値の分散処理(ワーカープール)
[編集]func worker(id int, jobs <-chan int, results chan<- int) { for j := range jobs { fmt.Printf("worker %d started job %d\n", id, j) time.Sleep(time.Second) // 処理時間の模擬 fmt.Printf("worker %d finished job %d\n", id, j) results <- j * 2 } } func main() { jobs := make(chan int, 100) results := make(chan int, 100) // ワーカーの起動 for w := 1; w <= 3; w++ { go worker(w, jobs, results) } // ジョブの送信 for j := 1; j <= 5; j++ { jobs <- j } close(jobs) // 結果の収集 for a := 1; a <= 5; a++ { <-results } }
5. タイムアウト処理
[編集]func main() { ch := make(chan string) go func() { time.Sleep(2 * time.Second) ch <- "処理が完了しました" }() select { case result := <-ch: fmt.Println(result) case <-time.After(1 * time.Second): fmt.Println("タイムアウトしました") } }
6. キャンセレーション(処理の中断)
[編集]func main() { done := make(chan struct{}) go func() { // キャンセル信号(Ctrl+C)を受け取る c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) <-c close(done) // キャンセル信号を送信 }() go func() { for { select { case <-done: fmt.Println("処理を中断します") return default: // 通常の処理 fmt.Println("作業中...") time.Sleep(500 * time.Millisecond) } } }() // メインゴルーチンを終了させない time.Sleep(5 * time.Second) }
7. レート制限
[編集]func main() { requests := make(chan int, 5) limiter := time.Tick(200 * time.Millisecond) // リクエストの生成 for i := 1; i <= 10; i++ { requests <- i } close(requests) // レート制限付きの処理 for req := range requests { <-limiter // 200msごとに1つの処理を許可 fmt.Println("リクエスト処理:", req) } }
8. パイプライン処理
[編集]func generator(nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { out <- n } }() return out } func square(in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { out <- n * n } }() return out } func main() { // パイプライン: generator -> square -> print for n := range square(generator(1, 2, 3, 4, 5)) { fmt.Println(n) // 1, 4, 9, 16, 25 } }
9. コンテキストパッケージとの連携
[編集]func main() { // タイムアウト付きコンテキストの作成 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() // コンテキストとチャネルの連携 go func(ctx context.Context) { for { select { case <-ctx.Done(): fmt.Println("コンテキストが終了しました:", ctx.Err()) return default: fmt.Println("処理中...") time.Sleep(500 * time.Millisecond) } } }(ctx) // メインゴルーチンを待機 time.Sleep(3 * time.Second) }
10. エラー処理用チャネル
[編集]func runTask(errCh chan<- error) { // 何らかの処理 if rand.Intn(2) == 0 { errCh <- errors.New("タスク実行中にエラーが発生しました") } else { errCh <- nil // 成功 } } func main() { errCh := make(chan error, 3) // 複数のタスクを実行 for i := 0; i < 3; i++ { go runTask(errCh) } // すべてのエラーを収集 for i := 0; i < 3; i++ { if err := <-errCh; err != nil { fmt.Println("エラー:", err) } else { fmt.Println("タスクが成功しました") } } }
11. 優先度キュー(複数チャネル使用)
[編集]func main() { high := make(chan string) medium := make(chan string) low := make(chan string) go func() { time.Sleep(1 * time.Second) high <- "高優先度タスク" }() go func() { time.Sleep(500 * time.Millisecond) medium <- "中優先度タスク" }() go func() { low <- "低優先度タスク" }() // 優先度の高いチャネルから順に処理 for i := 0; i < 3; i++ { select { case msg := <-high: fmt.Println("処理:", msg) case msg := <-medium: // highからの受信がなければmediumを処理 fmt.Println("処理:", msg) case msg := <-low: // highとmediumからの受信がなければlowを処理 fmt.Println("処理:", msg) } } }
12. セマフォとしての使用(並行処理数の制限)
[編集]func main() { const maxConcurrent = 3 sem := make(chan struct{}, maxConcurrent) for i := 1; i <= 10; i++ { // セマフォ獲得(空きがなければブロック) sem <- struct{}{} go func(id int) { defer func() { <-sem }() // セマフォ解放 fmt.Printf("Worker %d starting\n", id) time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) fmt.Printf("Worker %d done\n", id) }(i) } // すべてのゴルーチンが終了するのを待つ // (セマフォがmaxConcurrent個すべて解放されるまで) for i := 0; i < maxConcurrent; i++ { sem <- struct{}{} } }
13. ブロードキャスト(単一送信-複数受信)
[編集]func main() { listenerCount := 3 ch := make(chan int) done := make(chan struct{}) // 複数のリスナーを起動 for i := 0; i < listenerCount; i++ { go func(id int) { for val := range ch { fmt.Printf("リスナー %d: 値 %d を受信\n", id, val) } done <- struct{}{} }(i) } // 値の送信 for i := 1; i <= 5; i++ { ch <- i } close(ch) // すべてのリスナーの終了を待つ for i := 0; i < listenerCount; i++ { <-done } }
14. オルタネーター(複数チャネルを交互に処理)
[編集]func alternator(ch1, ch2 <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for ch1 != nil || ch2 != nil { if ch1 != nil { if val, ok := <-ch1; ok { out <- val } else { ch1 = nil } } if ch2 != nil { if val, ok := <-ch2; ok { out <- val } else { ch2 = nil } } } }() return out } func generator(values ...int) <-chan int { ch := make(chan int) go func() { defer close(ch) for _, v := range values { ch <- v } }() return ch } func main() { ch1 := generator(1, 3, 5) ch2 := generator(2, 4, 6) for val := range alternator(ch1, ch2) { fmt.Println(val) // 1,2,3,4,5,6の順に出力されることが期待される } }
15. チャネルの動的生成と管理
[編集]func manageChannels() { // チャネルのスライス channels := make([]chan int, 0) // チャネルの動的作成 for i := 0; i < 3; i++ { ch := make(chan int) channels = append(channels, ch) go func(id int, ch chan int) { for val := range ch { fmt.Printf("チャネル %d: %d を受信\n", id, val) } }(i, ch) } // すべてのチャネルに値を送信 for i, ch := range channels { ch <- i * 10 } // すべてのチャネルを閉じる for _, ch := range channels { close(ch) } } func main() { manageChannels() time.Sleep(time.Second) // ゴルーチンの終了を待つ }
16. 再試行パターン
[編集]func retryOperation(maxRetries int) error { backoff := 100 * time.Millisecond for attempt := 0; attempt < maxRetries; attempt++ { // 操作の実行 err := someOperation() if err == nil { return nil // 成功 } fmt.Printf("操作に失敗しました(試行 %d/%d): %v\n", attempt+1, maxRetries, err) // 次の試行の前に待機(バックオフ) select { case <-time.After(backoff): // バックオフ時間を徐々に増加 backoff *= 2 } } return fmt.Errorf("最大試行回数 %d に達しました", maxRetries) } func someOperation() error { // 実際の操作(ここでは50%の確率で失敗を模倣) if rand.Intn(2) == 0 { return errors.New("接続エラー") } return nil } func main() { if err := retryOperation(5); err != nil { fmt.Println("最終エラー:", err) } else { fmt.Println("操作が成功しました") } }
17. チャネルによる非同期結果の収集
[編集]type Result struct { Value int Err error } func asyncOperation(id int) <-chan Result { resultCh := make(chan Result, 1) go func() { defer close(resultCh) // 操作の実行(遅延をシミュレート) time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) // 結果の生成(ランダムに成功または失敗) if rand.Intn(3) == 0 { resultCh <- Result{Err: fmt.Errorf("操作 %d でエラーが発生", id)} } else { resultCh <- Result{Value: id * 10} } }() return resultCh } func main() { // 複数の非同期操作を開始 operations := make([]<-chan Result, 5) for i := 0; i < 5; i++ { operations[i] = asyncOperation(i) } // すべての結果を収集 for i, resultCh := range operations { result := <-resultCh if result.Err != nil { fmt.Printf("操作 %d: エラー - %v\n", i, result.Err) } else { fmt.Printf("操作 %d: 成功 - 値 = %d\n", i, result.Value) } } }
18. リクエスト/レスポンスパターン
[編集]type Request struct { ID int Data string } type Response struct { RequestID int Result string Err error } func server(reqCh <-chan Request, respCh chan<- Response) { for req := range reqCh { // リクエスト処理をシミュレート time.Sleep(100 * time.Millisecond) // レスポンスを返す respCh <- Response{ RequestID: req.ID, Result: fmt.Sprintf("処理済み: %s", req.Data), } } } func client(reqCh chan<- Request, respCh <-chan Response) { // リクエスト送信 reqCh <- Request{ID: 1, Data: "サンプルデータ1"} reqCh <- Request{ID: 2, Data: "サンプルデータ2"} // レスポンス待機 for i := 0; i < 2; i++ { resp := <-respCh fmt.Printf("リクエスト %d の応答: %s\n", resp.RequestID, resp.Result) } } func main() { reqCh := make(chan Request) respCh := make(chan Response) // サーバーの起動 go server(reqCh, respCh) // クライアントの実行 client(reqCh, respCh) // 終了 close(reqCh) }
19. チャネルを使った状態マシン
[編集]type State int const ( Idle State = iota Running Paused Stopped ) type Event string const ( Start Event = "start" Pause Event = "pause" Resume Event = "resume" Stop Event = "stop" ) type StateMachine struct { State State Transitions map[State]map[Event]State EventCh chan Event } func NewStateMachine() *StateMachine { sm := &StateMachine{ State: Idle, EventCh: make(chan Event), } // 状態遷移定義 sm.Transitions = map[State]map[Event]State{ Idle: { Start: Running, }, Running: { Pause: Paused, Stop: Stopped, }, Paused: { Resume: Running, Stop: Stopped, }, Stopped: {}, // 最終状態 } // 状態マシンを実行 go sm.Run() return sm } func (sm *StateMachine) Run() { for event := range sm.EventCh { if transitions, ok := sm.Transitions[sm.State]; ok { if newState, validTransition := transitions[event]; validTransition { fmt.Printf("状態遷移: %v --%v--> %v\n", sm.State, event, newState) sm.State = newState } else { fmt.Printf("無効な遷移: 状態 %v ではイベント %v は処理できません\n", sm.State, event) } } } } func main() { sm := NewStateMachine() // イベントを送信 sm.EventCh <- Start sm.EventCh <- Pause sm.EventCh <- Resume sm.EventCh <- Stop // 無効な遷移を試す sm.EventCh <- Start close(sm.EventCh) time.Sleep(100 * time.Millisecond) // 出力を確認するための待機 }
20. チャネルのチャネル(メタチャネル)
[編集]func main() { // チャネルのチャネル controlCh := make(chan chan string) go func() { // 通信用の新しいチャネルを作成 comm := make(chan string) // コントロールチャネル経由で通信チャネルを送信 controlCh <- comm // 通信チャネルでメッセージを受信 msg := <-comm fmt.Println("受信:", msg) // 返信 comm <- "応答: " + msg }() // 通信チャネルを受信 comm := <-controlCh // 通信チャネルでメッセージを送信 comm <- "こんにちは、ゴルーチン" // 応答を受信 reply := <-comm fmt.Println("応答を受信:", reply) }
チャネルとゴルーチンのベストプラクティス
[編集]1. チャネルの所有権モデル
[編集]// チャネルの生成者(所有者)がチャネルを閉じるべき func producer(ch chan<- int, count int) { defer close(ch) // 生成者がチャネルを閉じる for i := 0; i < count; i++ { ch <- i } } func consumer(ch <-chan int) { for v := range ch { fmt.Println("消費:", v) } } func main() { ch := make(chan int) go producer(ch, 5) consumer(ch) }
2. チャネルサイズの選択
[編集]// バッファなしチャネル - 同期通信 syncCh := make(chan int) // 小さいバッファサイズ - 少し非同期性を持たせる smallBufCh := make(chan int, 2) // バッファサイズを動的に決定 numWorkers := runtime.NumCPU() workCh := make(chan int, numWorkers)
3. チャネルのクローズ処理とイディオム
[編集]func main() { ch := make(chan int) // クローズ検出パターン go func() { for { val, ok := <-ch if !ok { fmt.Println("チャネルが閉じられました") return } fmt.Println("受信:", val) } }() ch <- 1 ch <- 2 close(ch) // このパターンは「閉じられたチャネルからは常にゼロ値を読み取れる」 // という性質を利用している time.Sleep(100 * time.Millisecond) }
4. select文でのデフォルトケース使用パターン
[編集]func nonBlockingReceive(ch <-chan string) { select { case msg := <-ch: fmt.Println("メッセージ受信:", msg) default: fmt.Println("メッセージはありません") } } func nonBlockingSend(ch chan<- string, msg string) { select { case ch <- msg: fmt.Println("メッセージを送信しました") default: fmt.Println("送信できません(バッファが満杯)") } } func main() { ch := make(chan string, 1) nonBlockingReceive(ch) // "メッセージはありません" nonBlockingSend(ch, "こんにちは") // "メッセージを送信しました" nonBlockingSend(ch, "送れるかな?") // "送信できません" nonBlockingReceive(ch) // "メッセージ受信: こんにちは" }
これらのパターンとユースケースを組み合わせることで、Goのチャネルとゴルーチンを最大限に活用した効率的な並行プログラミングが可能になります。チャネルは単なる値の受け渡しだけでなく、フロー制御や同期、調整、シグナリングなど、様々な目的に使用できるGoの強力な機能です。