[Go] Concurrency: Pipeline pattern
ํ์ดํ๋ผ์ธ ํจํด์ List ํํ์ ๋ฒกํฐ ๋ฐ์ดํฐ๋ฅผ ์ฐ์์ ์ผ๋ก ๊ฐ๊ณตํ ๋ ์ฌ์ฉํ๊ธฐ ์ ๋นํ ๋์์ฑ ์ฒ๋ฆฌ ํจํด์ด๋ค.
๊ตฌํ ๋ฐฉ์์ ์กฐ๊ธ์ฉ ๋ค๋ฅด์ง๋ง ์ด๊ฑด ๋ค๋ฅธ ์ธ์ด๋ค์์๋ ๋ฒ์ฉ์ ์ผ๋ก ์ฐ์ด๋ ๊ฐ๋ ์ด๋ค. Rust์ async Stream, Node.js์ Stream ๋ฑ๊ณผ๋ ์ ์ฌํ๋ค.
๊ธฐ๋ณธ์ ์ธ ์๋ฆฌ๋ ์ด๋ ๋ค.
๋ฐฐ์ด์ด๋ ๋ฐ์ดํฐ์ ๊ฐ๋
์ "์ฑ๋"๋ก์ ๊ฐ์ธ๊ณ , ๊ทธ๊ฑธ ํตํด์ ์ฐ์์ ์ผ๋ก chaining ํธ์ถ์ ํ ์ ์๋๋ก ํ๋ ๊ฒ์ด๋ค.
๋จผ์ ํ์ดํ๋ผ์ธ ์ฑ๋์ ์์ฑํ๋ generator ํจ์๊ฐ ์์ด์ผ ํ๋ค.
๊ฐ๋ณ์ธ์๋ฅผ ๋ฐ์์ ๊ทธ๊ฑธ ๊ธฐ๋ฐ์ผ๋ก ์ฑ๋์ ๋ง๋ค๊ณ ๋ฐํํด์ฃผ๋๋ก ํ๋ค.
๋ฌผ๋ก ์ด ์ฑ๋์ ์์ฑ์ ๋น๋๊ธฐ์ ์ผ๋ก ์ด๋ฃจ์ด์ง๋ค. ๊ทธ๋๋ ๊ด์ฐฎ๋ค.
๋ค์๊ณผ ๊ฐ์ด ํ์ดํ๋ผ์ธ ์คํธ๋ฆผ์ ์ด๊ธฐํํ ์ ์๋ค.

์ด ๋ค์์๋ ํ์ดํ๋ผ์ธ ์ฐ์ฐ์ ๊ตฌํํด์ผ ํ๋ค.
์ฌ๊ธฐ์๋ ๋ง์
๊ณผ ๊ณฑ์
์ฐ์ฐ 2๊ฐ์ง๋ฅผ ๋ง๋ค์ด๋ณด๊ฒ ๋ค.

๋์ด ๊ตฌ์กฐ๋ ์๋ฆฌ๋ ๊ฐ๋ค.
๊ธฐ์กด ์คํธ๋ฆผ ์ฑ๋์ ํ๋ผ๋ฏธํฐ๋ก ์ ๋ฌ๋ฐ๊ณ , ๊ทธ๊ฑธ ๋บ๋บ ๋๋ฉด์ ์ ์คํธ๋ฆผ์ ์์ฑํด์ ๋ฐํํ๋ค.
์ด ๋ํ ์ฒ๋ฆฌ๊ฐ ๋น๋๊ธฐ์ ์ผ๋ก ์์ฑ๋๋, ์ด์ฐจํผ ์ฑ๋์ ํตํด ํธ๋ค๋งํ๋ฏ๋ก ์ ๊ฒฝ์ธ ๊ฒ์ด ์๋ค.
๊ทธ๋ฌ๋ฉด ์ด์ ์๋์ ๊ฐ์ ํํ๋ก ์คํธ๋ฆผ์ ๋ฃ๊ณ , ๋ค์ ๋ฐํ๋ฐ๊ณ , ๋ฃ๊ณ ๋ฐํ๋ฐ๋ ์์ผ๋ก ์ฐ์์ ์ผ๋ก ๋ณ๋ ฌ ํธ์ถ์ ์ด์ด๋๊ฐ ์ ์๋ค.
์ด๋ฌํ ํํ๋ก ํ์ดํ๋ผ์ด๋ ๋น์ค๋ฌด๋ฆฌํ๊ฒ ํ ์ ์๊ธฐ ๋๋ฌธ์ ํ์ดํ๋ผ์ธ ํจํด์ด๋ผ ๋ถ๋ฅด๋ ๊ฒ์ด๋ค.
๊ฐ step์ ์๋ฒฝํ๊ฒ ๋์์ ์ผ๋ก ์คํ๋๋ฉฐ ๊ฐ ์์์ ์์ ๋ํ ๋ณด์ฅ๋๋ค.
์คํํด๋ณด๋ฉด
๊ธฐ๋ํ ๋๋ก ๋์ํ๋ค.
๊ณฑ์
2๊ฐ ์ํ๋ ์ดํ์ ๋ง์
์ด ์ ๋ถ์ด์๋ค.
๋ฌผ๋ก ์ด ์์ ์ฒ๋ผ ๊ฐ๋จํ ์ฐ์ฐ๋ง์ ํ๋ค๋ฉด ์ด๊ฑธ ์ฐ๋ ์๋ฏธ๊ฐ ์๋ค. ๊ทธ๋ฅ ์ฑ๋ ์์ด ๋๊ธฐ์ ์ผ๋ก ์คํํ๋ ๊ฒ์ด ํจ์ฌ ๋น ๋ฅผ ๊ฒ์ด๋ค. ์ด๊ฑด ๊ฐ ์์๋ณ ์์ฑ์๊ฐ์ด๋ ์ฐ์ฐ์๊ฐ์ด ๊ธธ์ด์ ธ์ ๋๊ธฐ์๊ฐ์ด ๋์ด์ง ์ ์์ ๋ ์ ์ฉํ๋ค.
์๋๋ ์ ์ฒด ์ฝ๋๋ค.
package main
func gererateStream[T any](done <-chan struct{}, values ...T) <-chan T {
stream := make(chan T)
go func() {
defer close(stream)
for _, value := range values {
select {
case <-done:
return
case stream <- value:
}
}
}()
return stream
}
func multiply(done <-chan struct{}, valueStream <-chan int, multiplier int) <-chan int {
multipliedStream := make(chan int)
go func() {
defer close(multipliedStream)
for value := range valueStream {
select {
case <-done:
return
case multipliedStream <- value * multiplier:
}
}
}()
return multipliedStream
}
func add(done <-chan struct{}, valueStream <-chan int, additive int) <-chan int {
addedStream := make(chan int)
go func() {
defer close(addedStream)
for value := range valueStream {
select {
case <-done:
return
case addedStream <- value + additive:
}
}
}()
return addedStream
}
func main() {
done := make(chan struct{})
defer close(done)
intStream := gererateStream(done, 1, 2, 3, 4, 5)
multipliedStream := multiply(done, intStream, 2)
addedStream := add(done, multipliedStream, 1)
for result := range addedStream {
println(result)
}
}