Seata Introduction

Seata is a distributed transaction service open-sourced by Alibaba. It currently offers four transaction modes (AT, TCC, SAGA, and XA), all built on a two-phase commit protocol. The Go version, seata-golang, currently seems to implement only the MySQL AT mode and the TCC mode, and its author no longer updates it much.

Seata has three core roles.

  • TC (Transaction Coordinator): maintains the state of global and branch transactions, and drives global transaction commit or rollback.
  • TM (Transaction Manager): defines the scope of a global transaction: it begins the global transaction and commits or rolls it back.
  • RM (Resource Manager): manages the resources that branch transactions work on, talks to the TC to register branch transactions and report their status, and drives branch transaction commit or rollback.

Described this way, the roles may be hard to picture, so here is a diagram from the official website.

(Figure: Seata architecture diagram from the official website)

As the diagram shows, the three roles are responsible for the following tasks.

TC

  • Maintains global and branch transaction state, which must be persisted.
  • When a distributed transaction finishes processing, notifies each RM whether to commit or roll back.

TM

  • Asks the TC to open a distributed transaction and obtains a globally unique transaction ID.
  • Based on the results reported by each RM participating in the first phase, decides whether to ask the TC to commit or roll back the distributed transaction in the second phase (in most scenarios, if any RM fails in the first phase, the distributed transaction fails).

RM

Put plainly, the RM manages the services involved in the distributed transaction (in the classic order scenario: the order service, inventory service, marketing service, and so on).

P.S. My personal feeling is that the RM here is somewhat similar to the intermediate processing layer in microservices (the technical term is BFF, backend for frontend).

  • Phase-one prepare (active): each RM calls its custom prepare logic.
  • Phase-two commit (passively triggered): if every RM succeeded in the first phase of this distributed transaction, the TC updates its own state and then invokes each RM's custom commit logic.
  • Phase-two rollback (passively triggered): if any RM failed in the first phase of this distributed transaction, the TC updates its own state and then invokes each RM's custom rollback logic.
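
The two phases described above can be sketched in a few lines of Go. This is only an illustration under stated assumptions: Participant and RunGlobalTransaction are hypothetical names, not part of seata-golang, and a single function plays both the TM and TC roles in-process instead of talking over gRPC.

```go
package main

import (
	"errors"
	"fmt"
)

// Participant is a hypothetical stand-in for an RM's custom two-phase logic.
type Participant struct {
	Name        string
	FailPrepare bool   // simulate a phase-one failure
	State       string // "", "prepared", "committed", or "rolledback"
}

func (p *Participant) Prepare() error {
	if p.FailPrepare {
		return errors.New(p.Name + ": prepare failed")
	}
	p.State = "prepared"
	return nil
}

func (p *Participant) Commit()   { p.State = "committed" }
func (p *Participant) Rollback() { p.State = "rolledback" }

// RunGlobalTransaction plays the TM and TC roles together: phase one calls
// Prepare on every participant; phase two commits all of them only if every
// Prepare succeeded, and otherwise rolls all of them back.
func RunGlobalTransaction(parts []*Participant) bool {
	ok := true
	for _, p := range parts {
		if err := p.Prepare(); err != nil {
			ok = false
			break
		}
	}
	for _, p := range parts {
		if ok {
			p.Commit()
		} else {
			p.Rollback()
		}
	}
	return ok
}

func main() {
	order := &Participant{Name: "order"}
	stock := &Participant{Name: "inventory", FailPrepare: true}
	committed := RunGlobalTransaction([]*Participant{order, stock})
	fmt.Println(committed, order.State, stock.State)
}
```

In the real system the phase-two calls are triggered by the TC over the network, which is why the RM side is described as passive.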

Next, let's look at some details of the seata-golang implementation, which uses gRPC for its underlying communication.

seata-golang

Let's look at the structure of the RM first.

type ResourceManager struct {
	addressing     string                                                     // RM address
	rpcClient      apis.ResourceManagerServiceClient                          // RM RPC client
	managers       map[apis.BranchSession_BranchType]ResourceManagerInterface // handlers per transaction mode (e.g. TCC, AT)
	branchMessages chan *apis.BranchMessage                                   // messages waiting to be sent back to the TC
}

type ResourceManagerServiceClient interface {
	// Streaming interface for exchanging data with the TC
	BranchCommunicate(ctx context.Context, opts ...grpc.CallOption) (ResourceManagerService_BranchCommunicateClient, error)
	// Register a branch with the TC
	BranchRegister(ctx context.Context, in *BranchRegisterRequest, opts ...grpc.CallOption) (*BranchRegisterResponse, error)
	// ...
}

The managers field stores the supported transaction modes (TCC and so on); each mode only needs to implement this interface.

type ResourceManagerInterface interface {
	// Commit a branch
	BranchCommit(ctx context.Context, request *apis.BranchCommitRequest) (*apis.BranchCommitResponse, error)
	// Roll back a branch
	BranchRollback(ctx context.Context, request *apis.BranchRollbackRequest) (*apis.BranchRollbackResponse, error)
	// ...
}
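
To make the "one handler per mode" idea concrete, here is a minimal sketch. BranchType, BranchHandler, Registry, and tccHandler are simplified, hypothetical stand-ins for the seata-golang types, and the method signatures are reduced to plain strings so the example runs on its own.

```go
package main

import "fmt"

// BranchType and BranchHandler are simplified, hypothetical stand-ins for
// apis.BranchSession_BranchType and ResourceManagerInterface.
type BranchType int

const (
	AT BranchType = iota
	TCC
)

type BranchHandler interface {
	BranchCommit(xid string) (string, error)
	BranchRollback(xid string) (string, error)
}

// tccHandler is a toy handler that only reports what it was asked to do.
type tccHandler struct{}

func (tccHandler) BranchCommit(xid string) (string, error)   { return "tcc commit " + xid, nil }
func (tccHandler) BranchRollback(xid string) (string, error) { return "tcc rollback " + xid, nil }

// Registry maps each transaction mode to its handler.
type Registry struct {
	managers map[BranchType]BranchHandler
}

func NewRegistry() *Registry {
	return &Registry{managers: make(map[BranchType]BranchHandler)}
}

func (r *Registry) Register(t BranchType, h BranchHandler) { r.managers[t] = h }

// Commit dispatches to the handler registered for the branch type.
func (r *Registry) Commit(t BranchType, xid string) (string, error) {
	h, ok := r.managers[t]
	if !ok {
		return "", fmt.Errorf("there is no resource manager for branch type %d", t)
	}
	return h.BranchCommit(xid)
}

func main() {
	r := NewRegistry()
	r.Register(TCC, tccHandler{})
	fmt.Println(r.Commit(TCC, "xid-1"))
	fmt.Println(r.Commit(AT, "xid-2"))
}
```

Supporting a new mode then means writing one more handler and registering it; the dispatch code does not change.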

Now look at the structure of the TC (only the fields discussed here are kept; you can check the other details yourself).

type TransactionCoordinator struct {
	// ...
	holder *holder.SessionHolder // data storage
	// ...
	futures *sync.Map // temporarily holds RM responses; a goroutine receives and processes them
	// ...
	callBackMessages *sync.Map // temporarily holds messages the TC is about to send to RMs; a separate goroutine sends them
}

type SessionHolder struct {
	manager storage.SessionManager
}

// SessionManager stores the global transactions and branch transactions.
type SessionManager interface {
	// Add global session.
	AddGlobalSession(session *apis.GlobalSession) error

	// Find global session.
	FindGlobalSession(xid string) *apis.GlobalSession

	// Find global sessions list.
	FindGlobalSessions(statuses []apis.GlobalSession_GlobalStatus) []*apis.GlobalSession
}

The TC currently supports MySQL and PostgreSQL for storing its data: each is an implementation of the SessionManager interface, which is then injected into the SessionHolder's manager field.
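
As an illustration of how a storage backend plugs in, the sketch below implements a cut-down version of the interface with an in-memory map. GlobalSession, SessionStore, and memoryStore are hypothetical simplifications (integer statuses instead of apis.GlobalSession_GlobalStatus); seata-golang itself is described above as shipping database-backed implementations, not this one.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// GlobalSession and SessionStore are simplified, hypothetical stand-ins for
// apis.GlobalSession and storage.SessionManager.
type GlobalSession struct {
	XID    string
	Status int
}

type SessionStore interface {
	AddGlobalSession(session *GlobalSession) error
	FindGlobalSession(xid string) *GlobalSession
	FindGlobalSessions(statuses []int) []*GlobalSession
}

// memoryStore keeps sessions in a map guarded by a mutex.
type memoryStore struct {
	mu       sync.RWMutex
	sessions map[string]*GlobalSession
}

func NewMemoryStore() SessionStore {
	return &memoryStore{sessions: make(map[string]*GlobalSession)}
}

func (m *memoryStore) AddGlobalSession(s *GlobalSession) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, exists := m.sessions[s.XID]; exists {
		return errors.New("global session already exists: " + s.XID)
	}
	m.sessions[s.XID] = s
	return nil
}

func (m *memoryStore) FindGlobalSession(xid string) *GlobalSession {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.sessions[xid]
}

func (m *memoryStore) FindGlobalSessions(statuses []int) []*GlobalSession {
	m.mu.RLock()
	defer m.mu.RUnlock()
	var out []*GlobalSession
	for _, s := range m.sessions {
		for _, st := range statuses {
			if s.Status == st {
				out = append(out, s)
				break
			}
		}
	}
	return out
}

// SessionHolder receives whichever store implementation is injected.
type SessionHolder struct {
	manager SessionStore
}

func main() {
	holder := SessionHolder{manager: NewMemoryStore()}
	_ = holder.manager.AddGlobalSession(&GlobalSession{XID: "xid-1", Status: 1})
	fmt.Println(holder.manager.FindGlobalSession("xid-1").Status)
}
```

Because the holder only depends on the interface, swapping the backend does not touch the coordinator logic.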

With these two basic structures introduced, recall the relationship between them described above:

In the second phase, the TC tells each RM whether to commit or roll back, according to the current transaction status.

When the ResourceManager is initialized:

func InitResourceManager(addressing string, client apis.ResourceManagerServiceClient) {
	defaultResourceManager = &ResourceManager{
		addressing:     addressing,
		rpcClient:      client,
		managers:       make(map[apis.BranchSession_BranchType]ResourceManagerInterface),
		branchMessages: make(chan *apis.BranchMessage),
	}
	runtime.GoWithRecover(func() {
		// Runs in its own goroutine
		defaultResourceManager.branchCommunicate()
	}, nil)
}

func (manager *ResourceManager) branchCommunicate() {
	// code omitted ...
	stream, err := manager.rpcClient.BranchCommunicate(ctx)
	// code omitted ...
}

We can see that this ends up calling BranchCommunicate, a gRPC interface served by the TC.

BranchCommunicate(ctx context.Context, opts ...grpc.CallOption) (ResourceManagerService_BranchCommunicateClient, error)

type ResourceManagerService_BranchCommunicateClient interface {
	Send(*BranchMessage) error
	Recv() (*BranchMessage, error)
	grpc.ClientStream
}

The corresponding server side:

rpc BranchCommunicate(stream BranchMessage) returns (stream BranchMessage);

type ResourceManagerService_BranchCommunicateServer interface {
	Send(*BranchMessage) error
	Recv() (*BranchMessage, error)
	grpc.ServerStream
}

gRPC has four basic communication modes.

  • Unary RPC
  • Server streaming RPC
  • Client streaming RPC
  • Bidirectional streaming RPC

Getting a streaming interface is simple: add the stream keyword in front of the request or response parameter in the proto method definition. Which kind of stream you get depends on which side carries the keyword; if both the request and the response have it, the stream is bidirectional.

With a bidirectional stream, both the client and the server can send messages over the stream.

When the RM calls BranchCommunicate:

func (manager *ResourceManager) branchCommunicate() {
	for {
		ctx := metadata.AppendToOutgoingContext(context.Background(), "addressing", manager.addressing)
		// Open the stream. The returned value implements:
		//
		//	type ResourceManagerService_BranchCommunicateClient interface {
		//		Send(*BranchMessage) error
		//		Recv() (*BranchMessage, error)
		//		grpc.ClientStream
		//	}
		stream, err := manager.rpcClient.BranchCommunicate(ctx)
		if err != nil {
			time.Sleep(time.Second)
			continue
		}

		done := make(chan bool)
		// Start another goroutine that sends responses back to the TC.
		runtime.GoWithRecover(func() {
			for {
				select {
				case _, ok := <-done:
					if !ok {
						return
					}
				// A message in branchMessages is the result of handling a TC
				// request; send it back to the TC.
				case msg := <-manager.branchMessages:
					err := stream.Send(msg)
					if err != nil {
						return
					}
				}
			}
		}, nil)

		// This loop receives the messages sent by the TC.
		for {
			msg, err := stream.Recv()
			if err == io.EOF {
				close(done)
				break
			}
			if err != nil {
				close(done)
				break
			}
			// Check whether the TC is asking for a branch commit or a branch rollback.
			switch msg.BranchMessageType {
			// Branch commit request.
			case apis.TypeBranchCommit:
				request := &apis.BranchCommitRequest{}
				data := msg.GetMessage().GetValue()
				err := request.Unmarshal(data)
				if err != nil {
					log.Error(err)
					continue
				}
				// Handle the branch commit and get the result.
				response, err := manager.BranchCommit(context.Background(), request)
				if err == nil {
					content, err := types.MarshalAny(response)
					if err == nil {
						// Wrap the result and put it into branchMessages to be sent out.
						manager.branchMessages <- &apis.BranchMessage{
							ID:                msg.ID,
							BranchMessageType: apis.TypeBranchCommitResult,
							Message:           content,
						}
					}
				}

			// Branch rollback request; the flow is essentially the same.
			case apis.TypeBranchRollback:
				request := &apis.BranchRollbackRequest{}
				data := msg.GetMessage().GetValue()
				err := request.Unmarshal(data)
				if err != nil {
					log.Error(err)
					continue
				}
				response, err := manager.BranchRollback(context.Background(), request)
				if err == nil {
					content, err := types.MarshalAny(response)
					if err == nil {
						manager.branchMessages <- &apis.BranchMessage{
							ID:                msg.ID,
							BranchMessageType: apis.TypeBranchRollBackResult,
							Message:           content,
						}
					}
				}
			}
		}
		// Close the send side of the stream.
		err = stream.CloseSend()
		if err != nil {
			log.Error(err)
		}
	}
}
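
The shape of this function (one goroutine draining an outbox into Send while the main loop blocks on Recv) can be tried out without gRPC. In the sketch below, Message, Stream, chanStream, and Serve are hypothetical names, and a pair of Go channels fakes the bidirectional stream; it illustrates the pattern and is not the seata-golang code.

```go
package main

import (
	"fmt"
	"io"
)

// Message and Stream are hypothetical, simplified stand-ins for
// apis.BranchMessage and the generated gRPC client stream.
type Message struct {
	ID   int64
	Kind string // "commit", "rollback", or "<kind>-result"
}

type Stream interface {
	Send(*Message) error
	Recv() (*Message, error)
}

// chanStream fakes a bidirectional stream with two channels.
type chanStream struct {
	in  chan *Message // TC -> RM
	out chan *Message // RM -> TC
}

func (s *chanStream) Send(m *Message) error { s.out <- m; return nil }

func (s *chanStream) Recv() (*Message, error) {
	m, ok := <-s.in
	if !ok {
		return nil, io.EOF
	}
	return m, nil
}

// Serve mirrors the shape of branchCommunicate: one goroutine drains the
// outbox into Send while the calling goroutine loops on Recv.
func Serve(stream Stream, outbox chan *Message) {
	done := make(chan struct{})
	go func() {
		for {
			select {
			case <-done:
				return
			case msg := <-outbox:
				if err := stream.Send(msg); err != nil {
					return
				}
			}
		}
	}()
	for {
		msg, err := stream.Recv()
		if err != nil { // io.EOF or a transport error ends the session
			close(done)
			return
		}
		// A real RM would run its commit or rollback logic here.
		outbox <- &Message{ID: msg.ID, Kind: msg.Kind + "-result"}
	}
}

func main() {
	s := &chanStream{in: make(chan *Message), out: make(chan *Message, 2)}
	go Serve(s, make(chan *Message))
	s.in <- &Message{ID: 1, Kind: "commit"}
	reply := <-s.out
	fmt.Println(reply.ID, reply.Kind)
	close(s.in)
}
```

Echoing the request ID in the reply is what lets the other side match a result to the request that caused it.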

The branch transaction is ultimately handled by manager.BranchCommit.

func (manager ResourceManager) BranchCommit(ctx context.Context, request *apis.BranchCommitRequest) (*apis.BranchCommitResponse, error) {
	// Dispatch to the handler registered for this branch's transaction mode.
	rm, ok := manager.managers[request.BranchType]
	if ok {
		return rm.BranchCommit(ctx, request)
	}
	return &apis.BranchCommitResponse{
		ResultCode: apis.ResultCodeFailed,
		Message:    fmt.Sprintf("there is no resource manager for %s", request.BranchType.String()),
	}, nil
}

Correspondingly, this is what happens on the TC when an RM calls BranchCommunicate.

func (tc *TransactionCoordinator) BranchCommunicate(stream apis.ResourceManagerService_BranchCommunicateServer) error {
	var addressing string
	done := make(chan bool)

	ctx := stream.Context()
	md, ok := metadata.FromIncomingContext(ctx)

	// Some unrelated code omitted
	// ............
	addressing = md.Get("addressing")[0]

	// Get (or create) the queue of messages destined for this RM address.
	queue, _ := tc.callBackMessages.LoadOrStore(addressing, make(chan *apis.BranchMessage))
	q := queue.(chan *apis.BranchMessage)

	// A separate goroutine takes messages from the callBackMessages queue
	// and sends them to the client.
	runtime.GoWithRecover(func() {
		for {
			select {
			case _, ok := <-done:
				if !ok {
					return
				}
			// A message here means there is data to send to the client.
			case msg := <-q:
				err := stream.Send(msg)
				if err != nil {
					return
				}
			}
		}
	}, nil)

	// The loop below receives the client's responses from the stream and hands
	// each one to its future, looked up by the unique branch message ID.
	for {
		select {
		case <-ctx.Done():
			close(done)
			return ctx.Err()
		default:
			// Wait for the next message.
			branchMessage, err := stream.Recv()
			if err == io.EOF {
				close(done)
				return nil
			}
			if err != nil {
				close(done)
				return err
			}
			// Distinguish the kind of response: a commit result and a
			// rollback result are handled by different branches.
			switch branchMessage.GetBranchMessageType() {
			// Response to a branch commit.
			case apis.TypeBranchCommitResult:
				response := &apis.BranchCommitResponse{}
				data := branchMessage.GetMessage().GetValue()
				// Decode the payload.
				err := response.Unmarshal(data)
				if err != nil {
					log.Error(err)
					continue
				}

				// A stored future means the sender of the commit is waiting for this response.
				resp, loaded := tc.futures.Load(branchMessage.ID)
				if loaded {
					// Attach the response.
					future := resp.(*common2.MessageFuture)
					future.Response = response
					// Signal that the response has arrived.
					future.Done <- true
					tc.futures.Delete(branchMessage.ID)
				}
			// Same logic as above, except for the response type.
			case apis.TypeBranchRollBackResult:
				response := &apis.BranchRollbackResponse{}
				data := branchMessage.GetMessage().GetValue()
				err := response.Unmarshal(data)
				if err != nil {
					log.Error(err)
					continue
				}
				resp, loaded := tc.futures.Load(branchMessage.ID)
				if loaded {
					future := resp.(*common2.MessageFuture)
					future.Response = response
					future.Done <- true
					tc.futures.Delete(branchMessage.ID)
				}
			}
		}
	}
}
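
The LoadOrStore call is what lets two independent pieces of code meet on the same channel using nothing but the RM's address. A minimal sketch, assuming hypothetical Message and Queues types (and a buffer of one so the example can run in a single goroutine, whereas the code above uses unbuffered channels):

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a hypothetical stand-in for apis.BranchMessage.
type Message struct{ ID int64 }

// Queues hands out one channel per RM address.
type Queues struct {
	m sync.Map // address -> chan *Message
}

// For returns the queue for an address, creating it on first use.
// LoadOrStore guarantees that concurrent callers agree on a single channel.
func (q *Queues) For(addressing string) chan *Message {
	queue, _ := q.m.LoadOrStore(addressing, make(chan *Message, 1))
	return queue.(chan *Message)
}

func main() {
	var q Queues
	producer := q.For("rm-1") // e.g. the code that wants to notify an RM
	consumer := q.For("rm-1") // e.g. the stream handler for that RM
	producer <- &Message{ID: 42}
	fmt.Println((<-consumer).ID, producer == consumer)
}
```

Whichever side asks first creates the channel; the other side receives the same one, so neither needs to know about the other's startup order.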

So how do the commit or rollback notifications above get queued for the RM?

When the TC wants to notify an RM of a branch commit:

func (tc *TransactionCoordinator) branchCommit(bs *apis.BranchSession) (apis.BranchSession_BranchStatus, error) {
	request := &apis.BranchCommitRequest{
		XID:             bs.XID,
		BranchID:        bs.BranchID,
		ResourceID:      bs.ResourceID,
		LockKey:         bs.LockKey,
		BranchType:      bs.Type,
		ApplicationData: bs.ApplicationData,
	}

	content, err := types.MarshalAny(request)
	if err != nil {
		return bs.Status, err
	}
	message := &apis.BranchMessage{
		ID:                int64(tc.idGenerator.Inc()),
		BranchMessageType: apis.TypeBranchCommit,
		Message:           content,
	}

	// The code above builds the message.
	queue, _ := tc.callBackMessages.LoadOrStore(bs.Addressing, make(chan *apis.BranchMessage))
	q := queue.(chan *apis.BranchMessage)
	select {
	// Put the message on the callBackMessages queue for this RM.
	case q <- message:
	default:
		// No receiver is ready on the queue; return without blocking.
		return bs.Status, err
	}

	// Create a future for the RM's response and store it in futures
	// (a sync.Map) under the message ID.
	resp := common2.NewMessageFuture(message)
	tc.futures.Store(message.ID, resp)

	timer := time.NewTimer(tc.streamMessageTimeout)
	select {
	// The RM's response timed out.
	case <-timer.C:
		tc.futures.Delete(resp.ID)
		return bs.Status, fmt.Errorf("wait branch commit response timeout")
	case <-resp.Done:
		timer.Stop()
	}

	// A response arrived; check its type and handle it.
	response, ok := resp.Response.(*apis.BranchCommitResponse)
	if !ok {
		log.Infof("rollback response: %v", resp.Response)
		return bs.Status, fmt.Errorf("response type not right")
	}
	if response.ResultCode == apis.ResultCodeSuccess {
		return response.BranchStatus, nil
	}
	return bs.Status, fmt.Errorf(response.Message)
}

Last is the TM, which is not hard to understand.

// Requests the TM sends to the TC.
type TransactionManagerInterface interface {
	// Begin a new global transaction.
	Begin(ctx context.Context, name string, timeout int32) (string, error)

	// Global commit.
	Commit(ctx context.Context, xid string) (apis.GlobalSession_GlobalStatus, error)

	// Global rollback.
	Rollback(ctx context.Context, xid string) (apis.GlobalSession_GlobalStatus, error)

	// Get current status of the given transaction.
	GetStatus(ctx context.Context, xid string) (apis.GlobalSession_GlobalStatus, error)

	// Global report.
	GlobalReport(ctx context.Context, xid string, globalStatus apis.GlobalSession_GlobalStatus) (apis.GlobalSession_GlobalStatus, error)
}

type TransactionManager struct {
	addressing string
	rpcClient  apis.TransactionManagerServiceClient
}

func InitTransactionManager(addressing string, client apis.TransactionManagerServiceClient) {
	defaultTransactionManager = &TransactionManager{
		addressing: addressing,
		rpcClient:  client,
	}
}