.NET开源分布式事务解决方案-CAP

CAP 是一个在分布式系统中(SOA,MicroService)实现事件总线及最终一致性(分布式事务)的一个开源的 C# 库,她具有轻量级,高性能,易使用等特点。

你可以轻松的在基于 .NET Core 技术的分布式系统中引入CAP,包括但限于 ASP.NET Core 和 ASP.NET Core on .NET Framework。

  • CAP 以 NuGet 包的形式提供,对项目无任何入侵,你仍然可以以你喜爱的方式来构建分布式系统。
  • CAP 具有 Event Bus 的所有功能,并且CAP提供了更加简化的方式来处理EventBus中的发布/订阅。
  • CAP 具有消息持久化的功能,也就是当你的服务进行重启或者宕机时,她可以保证消息的可靠性。
  • CAP 实现了分布式事务中的最终一致性,你不用再去处理这些琐碎的细节。
  • CAP 提供了基于 Microsoft DI 的 API 服务,她可以和你的 ASP.NET Core 系统进行无缝结合,并且能够和你的业务代码集成支持强一致性的事务处理。
  • CAP 是开源免费的。CAP基于MIT协议开源,你可以免费的在你的私人或者商业项目中使用,不会有人向你收取任何费用。

Github:https://github.com/dotnetcore/CAP

目前, CAP 同时支持使用 RabbitMQKafkaAzure Service Bus 等进行底层之间的消息发送,你不需要具备这些消息队列的使用经验,仍然可以轻松的集成到项目中。

CAP 目前支持使用 Sql ServerMySqlPostgreSqlMongoDB 数据库的项目。

CAP 同时支持使用 EntityFrameworkCoreADO.NET 的项目,你可以根据需要选择不同的配置方式。

Getting Started

NuGet

你可以运行以下下命令在你的项目中安装 CAP。

1
PM> Install-Package DotNetCore.CAP

CAP 支持 Kafka、RabbitMQ、AzureServiceBus 等消息队列,你可以按需选择下面的包进行安装:

1
2
3
PM> Install-Package DotNetCore.CAP.Kafka
PM> Install-Package DotNetCore.CAP.RabbitMQ
PM> Install-Package DotNetCore.CAP.AzureServiceBus

CAP 提供了 Sql Server, MySql, PostgreSQL,MongoDB 的扩展作为数据库存储:

1
2
3
4
5
// 按需选择安装你正在使用的数据库
PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql
PM> Install-Package DotNetCore.CAP.MongoDB

Configuration

首先配置CAP到 Startup.cs 文件中,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void ConfigureServices(IServiceCollection services)
{
......

services.AddDbContext<AppDbContext>();

services.AddCap(x =>
{
//如果你使用的 EF 进行数据操作,你需要添加如下配置:
x.UseEntityFramework<AppDbContext>(); //可选项,你不需要再次配置 x.UseSqlServer 了

//如果你使用的ADO.NET,根据数据库选择进行配置:
x.UseSqlServer("数据库连接字符串");
x.UseMySql("数据库连接字符串");
x.UsePostgreSql("数据库连接字符串");

//如果你使用的 MongoDB,你可以添加如下配置:
x.UseMongoDB("ConnectionStrings"); //注意,仅支持MongoDB 4.0+集群

//CAP支持 RabbitMQ、Kafka、AzureServiceBus 等作为MQ,根据使用选择配置:
x.UseRabbitMQ("ConnectionStrings");
x.UseKafka("ConnectionStrings");
x.UseAzureServiceBus("ConnectionStrings");
});
}

发布

在 Controller 中注入 ICapPublisher 然后使用 ICapPublisher 进行消息发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class PublishController : Controller
{
private readonly ICapPublisher _capBus;

public PublishController(ICapPublisher capPublisher)
{
_capBus = capPublisher;
}

//不使用事务
[Route("~/without/transaction")]
public IActionResult WithoutTransaction()
{
_capBus.Publish("xxx.services.show.time", DateTime.Now);

return Ok();
}

//Ado.Net 中使用事务,自动提交
[Route("~/adonet/transaction")]
public IActionResult AdonetWithTransaction()
{
using (var connection = new MySqlConnection(ConnectionString))
{
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true))
{
//业务代码

_capBus.Publish("xxx.services.show.time", DateTime.Now);
}
}
return Ok();
}

//EntityFramework 中使用事务,自动提交
[Route("~/ef/transaction")]
public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext)
{
using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true))
{
//业务代码

_capBus.Publish("xxx.services.show.time", DateTime.Now);
}
return Ok();
}
}

订阅

Action Method

在 Action 上添加 CapSubscribeAttribute 来订阅相关消息。

1
2
3
4
5
6
7
8
public class PublishController : Controller
{
[CapSubscribe("xxx.services.show.time")]
public void CheckReceivedMessage(DateTime datetime)
{
Console.WriteLine(datetime);
}
}

Service Method

如果你的订阅方法没有位于 Controller 中,则你订阅的类需要继承 ICapSubscribe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
namespace xxx.Service
{
public interface ISubscriberService
{
public void CheckReceivedMessage(DateTime datetime);
}

public class SubscriberService: ISubscriberService, ICapSubscribe
{
[CapSubscribe("xxx.services.show.time")]
public void CheckReceivedMessage(DateTime datetime)
{
}
}
}

然后在 Startup.cs 中的 ConfigureServices() 中注入你的 ISubscriberService

1
2
3
4
5
6
7
public void ConfigureServices(IServiceCollection services)
{
//注意: 注入的服务需要在 `services.AddCap()` 之前
services.AddTransient<ISubscriberService,SubscriberService>();

services.AddCap(x=>{});
}

订阅者组

订阅者组的概念类似于 Kafka 中的消费者组,它和消息队列中的广播模式相同,用来处理不同微服务实例之间同时消费相同的消息。

当CAP启动的时候,她将创建一个默认的消费者组,如果多个相同消费者组的消费者消费同一个Topic消息的时候,只会有一个消费者被执行。 相反,如果消费者都位于不同的消费者组,则所有的消费者都会被执行。

相同的实例中,你可以通过下面的方式来指定他们位于不同的消费者组。

1
2
3
4
5
6
7
8
9
[CapSubscribe("xxx.services.show.time", Group = "group1" )]
public void ShowTime1(DateTime datetime)
{
}

[CapSubscribe("xxx.services.show.time", Group = "group2")]
public void ShowTime2(DateTime datetime)
{
}

ShowTime1ShowTime2 处于不同的组,他们将会被同时调用。

PS,你可以通过下面的方式来指定默认的消费者组名称:

1
2
3
4
services.AddCap(x =>
{
x.DefaultGroup = "default-group-name";
});

Dashboard

CAP 2.1+ 以上版本中提供了仪表盘(Dashboard)功能,你可以很方便的查看发出和接收到的消息。除此之外,你还可以在仪表盘中实时查看发送或者接收到的消息。

使用一下命令安装 Dashboard:

1
PM> Install-Package DotNetCore.CAP.Dashboard

在分布式环境中,仪表盘内置集成了 Consul 作为节点的注册发现,同时实现了网关代理功能,你同样可以方便的查看本节点或者其他节点的数据,它就像你访问本地资源一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
services.AddCap(x =>
{
//...

// 注册 Dashboard
x.UseDashboard();

// 注册节点到 Consul
x.UseDiscovery(d =>
{
d.DiscoveryServerHostName = "localhost";
d.DiscoveryServerPort = 8500;
d.CurrentNodeHostName = "localhost";
d.CurrentNodePort = 5800;
d.NodeId = 1;
d.NodeName = "CAP No.1 Node";
});
});

仪表盘默认的访问地址是:http://localhost:xxx/cap,你可以在`d.MatchPath`配置项中修改`cap`路径后缀为其他的名字。

本文转载自 -README