Introduction

Definition

According to Wikipedia: Structured concurrency is a programming paradigm aimed at improving the clarity, quality, and development time of a computer program by using a structured approach to concurrent programming.

The core concept is the encapsulation of concurrent threads of execution (here encompassing kernel and userland threads and processes) by way of control flow constructs that have clear entry and exit points and that ensure all spawned threads have completed before exit. Such encapsulation allows errors in concurrent threads to be propagated to the control structure’s parent scope and managed by the native error handling mechanisms of each particular computer language. It allows control flow to remain readily evident by the structure of the source code despite the presence of concurrency. To be effective, this model must be applied consistently throughout all levels of the program – otherwise concurrent threads may leak out, become orphaned, or fail to have runtime errors correctly propagated.

Structured concurrency is analogous to structured programming, which introduced control flow constructs that encapsulated sequential statements and subroutines.

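In short, structured concurrency is a term coined by analogy with structured programming. Its goal is to make concurrent code more readable and more controllable, which improves both the development experience and the quality of concurrent code.
The basic idea is to encapsulate concurrent threads of execution so that they have clear entry and exit points, and so that every thread has completed before the enclosing scope exits.
This encapsulation also covers errors: an error raised inside a thread is handed to the parent scope to handle.
Even with concurrency in play, the control flow stays clearly visible in the structure of the source code.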
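
The definition can be made concrete with a small Go sketch. Go has no built-in structured concurrency construct, so the scope here is assembled by hand from sync.WaitGroup; the names square and squareAll are hypothetical, and errors.Join assumes Go 1.20 or newer.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// square is a stand-in for real work; it fails on negative input.
func square(n int) (int, error) {
	if n < 0 {
		return 0, fmt.Errorf("negative input: %d", n)
	}
	return n * n, nil
}

// squareAll is the "scope": every goroutine it starts has finished by the
// time it returns, and their errors surface as an ordinary return value.
func squareAll(inputs []int) ([]int, error) {
	results := make([]int, len(inputs))
	errs := make([]error, len(inputs))

	var wg sync.WaitGroup
	for i, n := range inputs {
		wg.Add(1)
		go func(i, n int) {
			defer wg.Done()
			results[i], errs[i] = square(n) // each goroutine owns slot i
		}(i, n)
	}
	wg.Wait() // exit point: no child outlives this line

	// errors.Join returns nil when every element is nil.
	if err := errors.Join(errs...); err != nil {
		return nil, err
	}
	return results, nil
}

func main() {
	out, err := squareAll([]int{1, 2, 3})
	fmt.Println(out, err) // [1 4 9] <nil>

	_, err = squareAll([]int{1, -2})
	fmt.Println(err) // negative input: -2
}
```

The caller sees one entry point and one exit point, and handles the children's errors with a plain if err != nil, exactly as it would for sequential code.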


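Structured programming vs. goto

Structured programming: basic control flow is encapsulated in code blocks, including the familiar conditionals, loops, and function calls.

goto: unrestricted jumps.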
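
As a small illustration of the contrast, the following sketch writes the same summation twice in Go, once with a label and goto and once with a structured loop. The function names sumGoto and sumLoop are made up for this example.

```go
package main

import "fmt"

// sumGoto adds 1..n using only a label and unrestricted jumps.
// The reader has to trace the jumps to see that this is a loop.
func sumGoto(n int) int {
	total, i := 0, 1
loop:
	if i > n {
		return total
	}
	total += i
	i++
	goto loop
}

// sumLoop does the same with a structured loop: the block has one
// entry and one exit, visible from the shape of the code.
func sumLoop(n int) int {
	total := 0
	for i := 1; i <= n; i++ {
		total += i
	}
	return total
}

func main() {
	fmt.Println(sumGoto(10), sumLoop(10)) // 55 55
}
```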


Diagram



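Background

History

In 2016, Martin Sústrik, the author of ZeroMQ, gave the first formal description of structured concurrency in a blog post.
In 2018, Nathaniel J. Smith implemented the paradigm in Python as the trio library and elaborated on it in the article "Notes on structured concurrency, or: Go statement considered harmful".
Around the same time, Roman Elizarov put forward the same idea and implemented it in Kotlin's well-known kotlinx.coroutines.
In 2019, OpenJDK's Project Loom began introducing structured concurrency as part of its work on lightweight threads and coroutines; Java 19 includes the feature as an incubating API.
In 2022, Python 3.11 introduced task groups and exception groups, giving structured concurrency official support.
Newer languages such as Swift and Rust also have official or third-party libraries that implement structured concurrency.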


Example

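import asyncio
import httpx

client = httpx.AsyncClient()  # shared client, reused by every task


async def run():
    r = await client.get("http://127.0.0.1:8000", timeout=3)  # per-request timeout: 3 s
    print(r.text)


async def main():
    async with asyncio.timeout(1):  # deadline for the whole group: 1 s
        async with asyncio.TaskGroup() as tg:  # exits only after all child tasks finish
            for i in range(2):
                tg.create_task(run())
        print("all tasks have completed now.")  # not reached if the timeout fires


asyncio.run(main())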

The requests do not finish within the 1-second deadline, so the task group is cancelled and the run fails with:

Traceback (most recent call last):
  File "C:\Users\jack\PycharmProjects\demo\main.py", line 14, in main
    async with asyncio.TaskGroup() as tg:
  File "C:\Users\jack\.pyenv\pyenv-win\versions\3.11.0b4\Lib\asyncio\taskgroups.py", line 121, in __aexit__
    raise propagate_cancellation_error
  File "C:\Users\jack\.pyenv\pyenv-win\versions\3.11.0b4\Lib\asyncio\taskgroups.py", line 96, in __aexit__
    await self._on_completed_fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\jack\PycharmProjects\demo\main.py", line 20, in <module>
    asyncio.run(main())
  File "C:\Users\jack\.pyenv\pyenv-win\versions\3.11.0b4\Lib\asyncio\runners.py", line 187, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\jack\.pyenv\pyenv-win\versions\3.11.0b4\Lib\asyncio\runners.py", line 120, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\jack\.pyenv\pyenv-win\versions\3.11.0b4\Lib\asyncio\base_events.py", line 650, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "C:\Users\jack\PycharmProjects\demo\main.py", line 13, in main
    async with asyncio.timeout(1):
  File "C:\Users\jack\.pyenv\pyenv-win\versions\3.11.0b4\Lib\asyncio\timeouts.py", line 98, in __aexit__
    raise TimeoutError
TimeoutError

Process finished with exit code 1

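The Go proposal

Someone filed a proposal with the Go team, "proposal: Go 2: use structured concurrency" (#29011). According to ianlancetaylor and other Go contributors, the proposal could cause compatibility problems for existing Go code, and Go already has tools such as context.Context, sync.WaitGroup, and x/sync/errgroup. The proposal was ultimately not accepted.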


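Practicing structured concurrency in Go

Questions concurrent code has to answer

  • When does a task start, and when does it end?
  • How do we make the parent task finish only after all of its subtasks have finished?
  • If one subtask fails, how does the parent cancel the remaining subtasks?
  • How do we guarantee that every subtask returns within a given timeout, whether it succeeds or fails?
  • Going further, how do we guarantee that the parent itself returns within a given time, success or failure, and cancels every subtask it spawned?
  • If the parent has finished but a subtask is still running, is that a resource leak?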

Go's building blocks

  • channel
  • context
  • waitgroup
  • errgroup

Example 1: errgroup

Source: the Pipeline example from the golang.org/x/sync/errgroup package.

package main

import (
	"context"
	"crypto/md5"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"

	"golang.org/x/sync/errgroup"
)

// Pipeline demonstrates the use of a Group to implement a multi-stage
// pipeline: a version of the MD5All function with bounded parallelism from
// https://blog.golang.org/pipelines.
func main() {
	m, err := MD5All(context.Background(), ".")
	if err != nil {
		log.Fatal(err)
	}

	for k, sum := range m {
		fmt.Printf("%s:\t%x\n", k, sum)
	}
}

type result struct {
	path string
	sum  [md5.Size]byte
}

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
	// ctx is canceled when g.Wait() returns. When this version of MD5All returns
	// - even in case of error! - we know that all of the goroutines have finished
	// and the memory they were using can be garbage-collected.
	g, ctx := errgroup.WithContext(ctx)
	paths := make(chan string)

	g.Go(func() error {
		defer close(paths)
		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case paths <- path:
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		})
	})

	// Start a fixed number of goroutines to read and digest files.
	c := make(chan result)
	const numDigesters = 20
	for i := 0; i < numDigesters; i++ {
		g.Go(func() error {
			for path := range paths {
				data, err := ioutil.ReadFile(path)
				if err != nil {
					return err
				}
				select {
				case c <- result{path, md5.Sum(data)}:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}
	go func() {
		g.Wait()
		close(c)
	}()

	m := make(map[string][md5.Size]byte)
	for r := range c {
		m[r.path] = r.sum
	}
	// Check whether any of the goroutines failed. Since g is accumulating the
	// errors, we don't need to send them (or check for them) in the individual
	// results sent on the channel.
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return m, nil
}

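errgroup source walkthrough

type Group struct {
	cancel  func()         // wraps the context's cancel function
	wg      sync.WaitGroup // waits for every goroutine started by Go
	sem     chan token     // buffered channel that caps the number of active goroutines
	errOnce sync.Once      // ensures only the first error is recorded
	err     error          // the first non-nil error returned by a goroutine
}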


// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
	g.wg.Wait()
	if g.cancel != nil {
		g.cancel()
	}
	return g.err
}

// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
	if g.sem != nil {
		g.sem <- token{}
	}

	g.wg.Add(1)
	go func() {
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
}
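
The excerpt above does not show how the sem channel is released. The sketch below isolates that part: a buffered channel used as a counting semaphore, with a done method that frees the slot. The limiter type and peakConcurrency helper are hypothetical names for illustration, and the body of done is an assumption about what the elided method does rather than a copy of the library source. atomic.Int32 assumes Go 1.19 or newer.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type token struct{}

// limiter is a cut-down, hypothetical stand-in for the wg/sem part of Group.
type limiter struct {
	wg  sync.WaitGroup
	sem chan token // buffered; capacity = max goroutines in flight
}

func newLimiter(n int) *limiter {
	return &limiter{sem: make(chan token, n)}
}

// Go blocks while n goroutines are already running, then starts f.
func (l *limiter) Go(f func()) {
	l.sem <- token{} // acquire a slot; blocks when the buffer is full
	l.wg.Add(1)
	go func() {
		defer l.done()
		f()
	}()
}

// done releases the slot and marks the goroutine as finished.
func (l *limiter) done() {
	<-l.sem
	l.wg.Done()
}

func (l *limiter) Wait() { l.wg.Wait() }

// peakConcurrency runs `jobs` short tasks through a limiter of size
// `limit` and reports the highest number observed running at once.
func peakConcurrency(limit, jobs int) int32 {
	var running, peak atomic.Int32
	l := newLimiter(limit)
	for i := 0; i < jobs; i++ {
		l.Go(func() {
			n := running.Add(1)
			for { // raise peak to n if n is larger
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond)
			running.Add(-1)
		})
	}
	l.Wait()
	return peak.Load()
}

func main() {
	fmt.Println(peakConcurrency(3, 12) <= 3) // true
}
```

Because a slot is acquired before the goroutine starts and released only after f returns, the number of running tasks can never exceed the channel's capacity.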

Example 2: WaitGroup

https://github.com/arunsworld/nursery/blob/ecfe7a688cfd866de0da8ecff34de72b34d22f53/nursery.go


References

https://en.wikipedia.org/wiki/Structured_concurrency
http://250bpm.com/blog:71
https://trio.readthedocs.io/en/stable/
https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
https://medium.com/@elizarov/structured-concurrency-722d765aa952
https://kotlinlang.org/docs/coroutines-basics.html
https://github.com/golang/go/issues/29011
https://github.com/arunsworld/nursery/blob/master/nursery.go
https://arunsworld.medium.com/structured-concurrency-in-go-b800c7c4434e
https://realpython.com/python311-exception-groups/
https://zhuanlan.zhihu.com/p/108759542
https://onevcat.com/2021/09/structured-concurrency/
https://blog.softwaremill.com/structured-concurrency-and-pure-functions-92dd8ed1a9f2