Go 并发异常处理和 Pipeline 模式结合接口的实践

TLDR: 这篇文章讲解如何优雅地对 Golang 并发异常处理和 Pipeline 模式,最后会给出笔者开发区块链过程中结合接口类型的最佳实践。

并发中的异常处理

遇到异常时,是返回给上层调用栈还是直接处理呢?通常上在 main goroutine 中, 提倡更多的是直接对异常处理而不是把异常传递到上层调用栈,以此能够缩小异常排查范围,但是在构建并发的 goroutine 返回值时我们应当把异常 (Error) 视为“一等公民”,goroutine 产生的异常返回给上层的 goroutine 。并发程序中处理的过程依赖上下游 goroutine ,异常出错时要尽可能地把异常传递到程序全局状态的上下文 goroutine 中。

举个例子发出 HTTP 请求检测服务状态的需求。新建的 goroutine 发出 HTTP 请求检测是否能正常响应,如果出现第三方服务响应异常,直接在子 goroutine 中处理还是通过 channel 传递给 main goroutine 做响应的处理呢?给出代码继续讨论:

checkStatus := func(
done <-chan interface{},
urls ...string
)<-chan *http.Response {
responses := make(chan *http.Response)
go func() {
  defer close(responses)
  for _, url := range urls {
    resp, err := http.Get(url) if err!=nil{
      fmt.Println(err)
      continue
    }
    select {
    case <- done:
      return
    case response <- resp:
    }
  }
}()
return response
}

done := make(chan interface{})
defer close(done)

urls := []string{"https://www.google.com", "https://badhost"}
for response := range checkStatus(done, urls...) {
fmt.Printf("Response: %v\n", response.Status)
}

发出 HTTP 请求若收到响应错误,直接在 goroutine 输出警告信息并继续下一个 url 请求。如果我们把子 goroutine 中的异常传达到 main goroutine ,主程序统计异常次数,异常达到设定的阈值就退出状态检测。

type Result struct {
Error error
Response *http.Response
}

checkStatus := func(
done <-chan interface{},
urls ...string
)<-chan Result {
results := make(chan Result)
go func() {
  defer close(results)
  for _, url := range urls {
    var result Result
    resp, err := http.Get(url)
    result = Result{Error: err, Response: resp}
    select {
    case <- done:
      return
    case results <- result
    }
  }
}()
return results
}

done := make(chan interface{})
defer close(done)

errCount := 0
urls := []string{"a", "https://www.google.com", "b", "c", "d"}
for result := range checkStatus(done, urls...) {
if result.Error != nil {
  fmt.Printf("error: %v\n", result.Error)
  errCount++
  if errCount >= 3 {
    fmt.Println("Too many errors, breaking!")
    break
  }
  continue
}
fmt.Printf("Response: %v\n", result.Response.Status)
}

总结出来的最佳实践就是:并发的 goroutine 产生的异常尽可能地传达到具备掌控全局状态的上层 goroutine 。为 被嵌套的子级 goroutine 的返回数据构建一个包含 Error 接收类型 (read) 的的结构体通道 (<-chan Result),在接收到通道的 goroutine 对异常的具体处理,或者继续上传到顶级 goroutine 。

并发模式之 pipeline

一个函数代码行数不宜过长,反之则需要继续抽象函数中的某个模块独立成另外一个函数调用。并且单一个业务逻辑依赖多个独立的功能时,低耦合的代码组装起来非常方便,此外大多数情况下数据流都是依赖的,此时 pipeline 模式非常适用。在 Golang 中,通道 (Channel) 具备读写数据且并发安全的功能,所以结合通道可实现并发模式的 pipeline 模式。

比如曾遇到的业务场景需要通过 JSON-RPC 查询区块信息,然后解析需要的数据保存到数据库中。通过封装查询区块函数、从链上拿到区块数据之后传到封装好的数据入库的函数中。你会发现这其实是一种“流”的体现,如果不抽象两个独立函数而把这个业务逻辑放到一个函数中,代码会非常丑陋和难以维护。接下来看一下我的封装代码片段:

queryBlockResult := b.Query.Block(int64(ledgerInfo.Headers - 5))
createBlockResul := <- sqldb.CreateBitcoinBlockWithUTXOs(queryBlockResult)
if createBlockResul.Error != nil{
  configure.Sugar.Fatal(createBlockResul.Error.Error())
}

以上是查询区块并让数据入库的方法调用,Block 方法是 ChainQuery 接口声明的区块查询的具体实现,ChainQueryBlockchain 结构体 Query 字段的具体类型,不同的公链通过 interface 的方式对 Blockchain 结构体中各种接口类型字段声明函数的实现具体方法,这也是 Go “面向接口编程”的好处。

// QueryBlockResult query block result
type QueryBlockResult struct {
  Error error
  Chain string
  Block interface{}
}
...
// Blockchain chain info
type Blockchain struct {
  Operator  TxOperator
  Wallet    ChainWallet
  Query     ChainQuery
}
...
// ChainQuery blockchain client query
type ChainQuery interface {
  Ledger() (interface{}, error)
  Balance(ctx context.Context, account, symbol, code string) (string, error)
  Block(height int64) (<-chan common.QueryBlockResult)
}
...

// Block query bitcoin block interface method
func (c BitcoinCoreChain) Block(height int64) (<-chan common.QueryBlockResult) {
  blockCh := make(chan common.QueryBlockResult)
  go func (height int64)  {
    defer close(blockCh)
    blockHash, err := c.Client.GetBlockHash(height)
    if err != nil {
      blockCh <- common.QueryBlockResult{Error: fmt.Errorf("Query bitcoin block hash error: %s", err), Chain: Bitcoin}
      return
    }

    block, err := c.Client.GetBlockVerboseTxM(blockHash)
    if err != nil {
      blockCh <- common.QueryBlockResult{Error: fmt.Errorf("Query bitcoin block error %s", err), Chain: Bitcoin}
      return
    }
    blockCh <- common.QueryBlockResult{Block: block, Chain: Bitcoin}
    return
  }(height)
  return blockCh
}

使用公链的结构体(在这里是 BitcoinCoreChain )初始化 Blockchain 结构体的对象,并且 BitcoinCoreChain 结构体实现了 ChainQuery 接口声明的所有方法,所以我们能通过 b.Query.Block(int64(blockHeight)) 查询区块信息。

重点讨论 Block 方法,其返回一个包含异常信息字段的 common.QueryBlockResult 结构体只读通道类型。在方法体中新建的 goroutine 查询链上区块数据,若遇到异常使用通道传回给上层 goroutine ,通过该方法的返回的只读类型通道携带的数据传达到其他 goroutine 。

接下来继续看看 CreateBitcoinBlockWithUTXOs 方法:

// CreateBitcoinBlockWithUTXOs save block and utxo related with subAddress blockResultCh <-chan
func (db *GormDB) CreateBitcoinBlockWithUTXOs(queryBlockResultCh <- chan common.QueryBlockResult) (<-chan common.CreateBlockResult) {
  createBlockCh := make(chan common.CreateBlockResult)
  go func() {
    defer close(createBlockCh)
    var (
      rawBlock *btcjson.GetBlockVerboseResult
      chain string
    )
    b := <- queryBlockResultCh
    if b.Error != nil {
      createBlockCh <- common.CreateBlockResult{Error: b.Error}
      return
    }
    rawBlock = b.Block.(*btcjson.GetBlockVerboseResult)
    chain = b.Chain

    var block SimpleBitcoinBlock
    ts := db.Begin()
    if err := ts.FirstOrCreate(&block, SimpleBitcoinBlock{
      Hash: rawBlock.Hash,
      Height: rawBlock.Height,
      Chain: chain,
    }).Error; err != nil {
      ts.Rollback()
      createBlockCh <- common.CreateBlockResult{Error: fmt.Errorf("create block error: %s", err)}
      return
    }

    for _, tx := range rawBlock.Tx {
      for _, vout := range tx.Vout {
        for _, address := range vout.ScriptPubKey.Addresses {
          var (
            addr SubAddress
            utxo UTXO
          )
          if err := db.Where("address = ? AND asset = ?", address, chain).First(&addr).Error; err != nil && err.Error() == "record not found" {
            continue
          }else if err != nil {
            createBlockCh <- common.CreateBlockResult{Error: fmt.Errorf("Query sub address err: %s", err)}
            return
          }
          ts.FirstOrCreate(&utxo, UTXO{Txid: tx.Txid,
            Amount: vout.Value,
            Height: rawBlock.Height,
            VoutIndex: vout.N,
            SubAddressID: addr.ID})
        }
      }
    }
    if err := ts.Commit().Error; err != nil {
      ts.Rollback()
      createBlockCh <- common.CreateBlockResult{Error: fmt.Errorf("database transaction err: %s", err)}
      return
    }
    configure.Sugar.Info("Saved block to database, height: ", block.Height, " hash: ", block.Hash)
    createBlockCh <- common.CreateBlockResult{Block: block}
  }()
  return createBlockCh
}

可以看出 CreateBitcoinBlockWithUTXOs 方法接收一个只读类型的通道参数,其实就是把Block 函数的返回值作为该方法的参数。新建的 goroutine 读取 queryBlockResultCh 通道的数据,做业务上的数据入库处理,最后把处理结果返回到一个只读通道中 (<-chan common.CreateBlockResult),传递到上层 goroutine 。

createBlockResul := <- sqldb.CreateBitcoinBlockWithUTXOs(queryBlockResult)

接收只读通道的返回并赋值给 createBlockResul

推荐阅读

0 条评论
您想说点什么吗?