首页 > 代码库 > Orleans框架------基于Actor模型生成分布式Id

Orleans框架------基于Actor模型生成分布式Id

一、Actor简介

actor模型是一种并行计算的数学模型。 响应于收到的消息,演员可以:做出决定,创建更多Actor,发送更多消息,并确定如何响应接收到的下一条消息。 演员可以修改自己的状态,但只能通过消息相互影响(避免需要任何锁)。

     actor是一个计算实体,当其收到消息时,可以并发执行如下操作:

   1. 发送有限数量的消息给其他actor

   2. 创建有限数量的新actor

   3. 指定收到下一消息时的行为

   在Orleans中使用的是虚拟Actor方式,详细:http://dotnet.github.io/orleans/Documentation/Introduction.html

    详细参见: https://en.wikipedia.org/wiki/Actor_model

 

二、Orleans框架

      Orleans是一个框架,可以直接构建分布式大规模计算应用程序,而无需学习和应用复杂的并发或其他缩放模式。 它是由Microsoft Research创建的,旨在用于云端。

Orleans已被Microsoft Azure广泛应用于微软的几个产品集团,其中最着名的是343个行业,作为所有Halo 4和Halo 5云服务的平台,以及越来越多的其他公司。

  特性:

    1、默认可扩展
           奥尔良处理构建分布式系统的复杂性,使您的应用程序能够扩展到数百台服务器。
    2、低延迟
           奥尔良允许您保持内存所需的状态,因此您的应用程序可以快速响应传入的请求。

    3、简化并发

          Orleans允许您编写简单的单线程C#代码,通过actor之间的异步消息传递来处理并发。

 

 

三、生成流水号项目实战

      1、场景

          现在系统基于分布式服务开发,数据在客户端处理后提交到服务端入库,但是由于多个系统间的并发而流水号全部在一张表,每次都是先select在update 高并发容易直接死锁。

     如图:技术分享

 

 

     2、基于Orleans的actor

        将每条数据改造成一个Actor,由个Actor之间的状态来保证流水号的递增,这样即使单个流水号访问量大只要扩展Orleans的soli即可。

 

四、关键代码:

         1、利用初始化SerialNumberStorgeProvider初始化管理Grain的状态

 

 

    
public class SerialNumberStorgeProvider : IStorageProvider { public Logger Log { get; set; } public string Name { get; set; } public Task ClearStateAsync(string grainType, GrainReference grainReference, IGrainState grainState) { return TaskDone.Done; } public Task Close() { return TaskDone.Done; } public Task Init(string name, IProviderRuntime providerRuntime, IProviderConfiguration config) { this.Name = nameof(SerialNumberStorgeProvider); this.Log = providerRuntime.GetLogger(this.Name); return TaskDone.Done; } public Task ReadStateAsync(string grainType, GrainReference grainReference, IGrainState grainState) { Console.WriteLine("获取种子信息"); var SerialNumber = grainReference.GetPrimaryKeyString(); using (var db = new DBContext()) { var query = db.SerialNumbers.AsNoTracking().FirstOrDefault(o => o.Name.Equals(SerialNumber)); if (query != null) grainState.State = query; else { db.SerialNumbers.Add(new SerialNumberInfo { Name = grainReference.GetPrimaryKeyString(), Number = 1 }); db.SaveChanges(); grainState.State = new SerialNumberInfo { Name = grainReference.GetPrimaryKeyString(), Number = 1 }; } } return TaskDone.Done; } public async Task WriteStateAsync(string grainType, GrainReference grainReference, IGrainState grainState) { var model = grainState.State as SerialNumberInfo; using (var db = new DBContext()) { var query = db.SerialNumbers.FirstOrDefault(o => o.Name.Equals(model.Name)); query.Number = model.Number; await db.SaveChangesAsync(); } } }

   

2、Grain获取流水号的实现  
[StorageProvider(ProviderName = "SerialNumberStorgeProvider")]
    public class SerialNumberGrain : Grain<SerialNumberInfo>, ISerialNumberGrain
    {
        /// <summary>
        /// 获取多个流水号
        /// </summary>
        /// <param name="number"></param>
        /// <returns></returns>
        public Task<List<long>> GetMutilSerialNumber(int number)
        {
            if (number == 0) { number = 1; }

            List<long> list = new List<long>();
            for (int i = 1; i <= number; i++)
            {
                this.State.Number += 1;
                list.Add(this.State.Number);
            }
            this.WriteStateAsync();
            return Task.FromResult(list);
        }

        /// <summary>
        /// 获取单个流水号
        /// </summary>
        /// <returns></returns>
        public Task<long> GetSerialNumber()
        {
            this.WriteStateAsync();
            return Task.FromResult(this.State.Number);
        }
    }

 


最后,最多说一句,在测试的时候发现如果不适用如下这种方式,并发时会发生Task调度异常

技术分享

 

 

源码地址:https://github.com/liyang-live/MakeSerialNumber

 

    参考资料:http://dotnet.github.io/orleans/index.html

                     http://www.cnblogs.com/joab/p/5657851.html

                     https://github.com/dotnet/orleans

Orleans框架------基于Actor模型生成分布式Id