[Go] Concurrency: Pipeline pattern

ํŒŒ์ดํ”„๋ผ์ธ ํŒจํ„ด์€ List ํ˜•ํƒœ์˜ ๋ฒกํ„ฐ ๋ฐ์ดํ„ฐ๋ฅผ ์—ฐ์†์ ์œผ๋กœ ๊ฐ€๊ณตํ•  ๋•Œ ์‚ฌ์šฉํ•˜๊ธฐ ์ ๋‹นํ•œ ๋™์‹œ์„ฑ ์ฒ˜๋ฆฌ ํŒจํ„ด์ด๋‹ค.

๊ตฌํ˜„ ๋ฐฉ์‹์€ ์กฐ๊ธˆ์”ฉ ๋‹ค๋ฅด์ง€๋งŒ ์ด๊ฑด ๋‹ค๋ฅธ ์–ธ์–ด๋“ค์—์„œ๋„ ๋ฒ”์šฉ์ ์œผ๋กœ ์“ฐ์ด๋Š” ๊ฐœ๋…์ด๋‹ค. Rust์˜ async Stream, Node.js์˜ Stream ๋“ฑ๊ณผ๋„ ์œ ์‚ฌํ•˜๋‹ค.

๊ธฐ๋ณธ์ ์ธ ์›๋ฆฌ๋Š” ์ด๋ ‡๋‹ค.
๋ฐฐ์—ด์ด๋ž€ ๋ฐ์ดํ„ฐ์˜ ๊ฐœ๋…์„ "์ฑ„๋„"๋กœ์„œ ๊ฐ์‹ธ๊ณ , ๊ทธ๊ฑธ ํ†ตํ•ด์„œ ์—ฐ์‡„์ ์œผ๋กœ chaining ํ˜ธ์ถœ์„ ํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•˜๋Š” ๊ฒƒ์ด๋‹ค.

๋จผ์ € ํŒŒ์ดํ”„๋ผ์ธ ์ฑ„๋„์„ ์ƒ์„ฑํ•˜๋Š” generator ํ•จ์ˆ˜๊ฐ€ ์žˆ์–ด์•ผ ํ•œ๋‹ค.

๊ฐ€๋ณ€์ธ์ž๋ฅผ ๋ฐ›์•„์„œ ๊ทธ๊ฑธ ๊ธฐ๋ฐ˜์œผ๋กœ ์ฑ„๋„์„ ๋งŒ๋“ค๊ณ  ๋ฐ˜ํ™˜ํ•ด์ฃผ๋„๋ก ํ–ˆ๋‹ค.
๋ฌผ๋ก  ์ด ์ฑ„๋„์˜ ์™„์„ฑ์€ ๋น„๋™๊ธฐ์ ์œผ๋กœ ์ด๋ฃจ์–ด์ง„๋‹ค. ๊ทธ๋ž˜๋„ ๊ดœ์ฐฎ๋‹ค.

๋‹ค์Œ๊ณผ ๊ฐ™์ด ํŒŒ์ดํ”„๋ผ์ธ ์ŠคํŠธ๋ฆผ์„ ์ดˆ๊ธฐํ™”ํ•  ์ˆ˜ ์žˆ๋‹ค.

์ด ๋‹ค์Œ์—๋Š” ํŒŒ์ดํ”„๋ผ์ธ ์—ฐ์‚ฐ์„ ๊ตฌํ˜„ํ•ด์•ผ ํ•œ๋‹ค.
์—ฌ๊ธฐ์„œ๋Š” ๋ง์…ˆ๊ณผ ๊ณฑ์…ˆ ์—ฐ์‚ฐ 2๊ฐ€์ง€๋ฅผ ๋งŒ๋“ค์–ด๋ณด๊ฒ ๋‹ค.

๋‘˜์ด ๊ตฌ์กฐ๋‚˜ ์›๋ฆฌ๋Š” ๊ฐ™๋‹ค.
๊ธฐ์กด ์ŠคํŠธ๋ฆผ ์ฑ„๋„์„ ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ์ „๋‹ฌ๋ฐ›๊ณ , ๊ทธ๊ฑธ ๋บ‘๋บ‘ ๋Œ๋ฉด์„œ ์ƒˆ ์ŠคํŠธ๋ฆผ์„ ์ƒ์„ฑํ•ด์„œ ๋ฐ˜ํ™˜ํ•œ๋‹ค.

์ด ๋˜ํ•œ ์ฒ˜๋ฆฌ๊ฐ€ ๋น„๋™๊ธฐ์ ์œผ๋กœ ์™„์„ฑ๋˜๋‚˜, ์–ด์ฐจํ”ผ ์ฑ„๋„์„ ํ†ตํ•ด ํ•ธ๋“ค๋งํ•˜๋ฏ€๋กœ ์‹ ๊ฒฝ์“ธ ๊ฒƒ์ด ์—†๋‹ค.

๊ทธ๋Ÿฌ๋ฉด ์ด์ œ ์•„๋ž˜์™€ ๊ฐ™์€ ํ˜•ํƒœ๋กœ ์ŠคํŠธ๋ฆผ์„ ๋„ฃ๊ณ , ๋‹ค์‹œ ๋ฐ˜ํ™˜๋ฐ›๊ณ , ๋„ฃ๊ณ  ๋ฐ˜ํ™˜๋ฐ›๋Š” ์‹์œผ๋กœ ์—ฐ์‡„์ ์œผ๋กœ ๋ณ‘๋ ฌ ํ˜ธ์ถœ์„ ์ด์–ด๋‚˜๊ฐˆ ์ˆ˜ ์žˆ๋‹ค.

์ด๋Ÿฌํ•œ ํ˜•ํƒœ๋กœ ํŒŒ์ดํ”„๋ผ์ด๋‹ ๋น„์Šค๋ฌด๋ฆฌํ•˜๊ฒŒ ํ•  ์ˆ˜ ์žˆ๊ธฐ ๋•Œ๋ฌธ์— ํŒŒ์ดํ”„๋ผ์ธ ํŒจํ„ด์ด๋ผ ๋ถ€๋ฅด๋Š” ๊ฒƒ์ด๋‹ค.
๊ฐ step์€ ์™„๋ฒฝํ•˜๊ฒŒ ๋™์‹œ์ ์œผ๋กœ ์‹คํ–‰๋˜๋ฉฐ ๊ฐ ์š”์†Œ์˜ ์ˆœ์„œ ๋˜ํ•œ ๋ณด์žฅ๋œ๋‹ค.

์‹คํ–‰ํ•ด๋ณด๋ฉด

๊ธฐ๋Œ€ํ•œ ๋Œ€๋กœ ๋™์ž‘ํ•œ๋‹ค.
๊ณฑ์…ˆ 2๊ฐ€ ์ˆ˜ํ–‰๋œ ์ดํ›„์— ๋ง์…ˆ์ด ์ž˜ ๋ถ™์–ด์žˆ๋‹ค.

๋ฌผ๋ก  ์ด ์˜ˆ์ œ์ฒ˜๋Ÿผ ๊ฐ„๋‹จํ•œ ์—ฐ์‚ฐ๋งŒ์„ ํ•œ๋‹ค๋ฉด ์ด๊ฑธ ์“ฐ๋Š” ์˜๋ฏธ๊ฐ€ ์—†๋‹ค. ๊ทธ๋ƒฅ ์ฑ„๋„ ์—†์ด ๋™๊ธฐ์ ์œผ๋กœ ์‹คํ–‰ํ•˜๋Š” ๊ฒƒ์ด ํ›จ์”ฌ ๋น ๋ฅผ ๊ฒƒ์ด๋‹ค. ์ด๊ฑด ๊ฐ ์š”์†Œ๋ณ„ ์ƒ์„ฑ์‹œ๊ฐ„์ด๋‚˜ ์—ฐ์‚ฐ์‹œ๊ฐ„์ด ๊ธธ์–ด์ ธ์„œ ๋Œ€๊ธฐ์‹œ๊ฐ„์ด ๋Š˜์–ด์งˆ ์ˆ˜ ์žˆ์„ ๋•Œ ์œ ์šฉํ•˜๋‹ค.

์•„๋ž˜๋Š” ์ „์ฒด ์ฝ”๋“œ๋‹ค.

package main

func gererateStream[T any](done <-chan struct{}, values ...T) <-chan T {
	stream := make(chan T)
	go func() {
		defer close(stream)
		for _, value := range values {
			select {
			case <-done:
				return
			case stream <- value:
			}
		}
	}()
	return stream
}

func multiply(done <-chan struct{}, valueStream <-chan int, multiplier int) <-chan int {
	multipliedStream := make(chan int)
	go func() {
		defer close(multipliedStream)
		for value := range valueStream {
			select {
			case <-done:
				return
			case multipliedStream <- value * multiplier:
			}
		}
	}()
	return multipliedStream
}

func add(done <-chan struct{}, valueStream <-chan int, additive int) <-chan int {
	addedStream := make(chan int)
	go func() {
		defer close(addedStream)
		for value := range valueStream {
			select {
			case <-done:
				return
			case addedStream <- value + additive:
			}
		}
	}()
	return addedStream
}

func main() {
	done := make(chan struct{})
	defer close(done)

	intStream := gererateStream(done, 1, 2, 3, 4, 5)

	multipliedStream := multiply(done, intStream, 2)
	addedStream := add(done, multipliedStream, 1)

	for result := range addedStream {
		println(result)
	}
}