首页 > 代码库 > c#之Redis队列在邮件提醒中的应用

c#之Redis队列在邮件提醒中的应用

场景

有这样一个场景,一个邮件提醒的windows服务,获取所有开启邮件提醒的用户,循环获取这些用户的邮件,发送一条服务号消息。但问题来了,用户比较少的情况下,轮询一遍时间还能忍受,如果用户多了,那用户名称排序靠后的人,收到邮件提醒的消息,延迟时间就非常长了。

准备

c#之Redis实践list,hashtable

c#之Redis队列

方案

1、生产者线程一获取所有开启邮件提醒的用户。

2、根据配置来决定使用多少个队列,以及每个队列的容量。

3、线程一,获取未满的队列,将当前用户入队。如果所有的队列已满,则挂起2s,然后重新获取未满的队列,用户入队。

4、根据配置开启消费者线程,每个线程独立处理逻辑。如果获取的用户为空或者当前队列为空,挂起2s。否则通过EWS服务拉取该用户的邮件,并提醒。

5、如果在获取用户邮件的过程中出错,则将该用户重新入当前队列,等待下次拉取。

测试

技术分享

队列

技术分享

测试代码

    /// <summary>    /// 消息队列管理    /// </summary>    public class MyRedisQueueBus : IDisposable    {        /// <summary>        /// 线程个数        /// </summary>        private int _threadCount;        /// <summary>        /// 每个线程中itcode的容量        /// </summary>        private int _threadCapacity;        /// <summary>        /// 线程        /// </summary>        private Thread[] _threads;        /// <summary>        /// 生产者线程        /// </summary>        private Thread _producerThread;        /// <summary>        /// 挂起时间        /// </summary>        private const int WAITSECONDE = 2000;        /// <summary>        /// 队列名称前缀        /// </summary>        private string _queuePrefix;        /// <summary>        /// 构造函数        /// </summary>        /// <param name="threadCount">线程个数</param>        /// <param name="threadCapacity">每个线程处理的队列容量</param>        ///  <param name="queuePrefix">每个线程处理的队列容量</param>        public MyRedisQueueBus(int threadCount, int threadCapacity, string queuePrefix)        {            this._threadCapacity = threadCapacity;            this._threadCount = threadCount;            this._queuePrefix = queuePrefix + "_{0}";        }        /// <summary>        /// 开启生产者        /// </summary>        public void StartProducer()        {            _producerThread = new Thread(() =>            {                IRedisClientFactory factory = RedisClientFactory.Instance;                EmailAlertsData emailAlertsData = new EmailAlertsData();                //白名单                string[] userIdsWhiteArray = TaskGloableParameter.WhiteList.Split(new char[] { ,,  },
StringSplitOptions.RemoveEmptyEntries);
while (true) { //获取所有开启邮件提醒的用户 List<EmailAlerts> lstEmails = emailAlertsData.GetAllStartAlerts(userIdsWhiteArray); //入队 using (IRedisClient client = factory.CreateRedisClient(WebConfig.RedisServer, WebConfig.RedisPort)) { client.Password = WebConfig.RedisPwd; client.Db = WebConfig.RedisServerDb; foreach (var item in lstEmails) { int queueIndex = -1; string queueName = string.Format(this._queuePrefix, queueIndex); for (int i = 0; i < _threadCount; i++) { queueName = string.Format(this._queuePrefix, i); //如果当前队列没有填满,则直接跳出,使用该队列进行入队 if (client.GetListCount(queueName) < _threadCapacity) { queueIndex = i; break; } } //如果所有队列都已经满了,则挂起2s等待消费者消耗一部分数据,然后重新开始 if (queueIndex == -1) { Thread.SpinWait(WAITSECONDE); //重新获取队列 for (int i = 0; i < _threadCount; i++) { queueName = string.Format(this._queuePrefix, i); //如果当前队列没有填满,则直接跳出,使用该队列进行入队 if (client.GetListCount(queueName) < _threadCapacity) { queueIndex = i; break; } } } else { //入队 client.EnqueueItemOnList(queueName, JsonConvert.SerializeObject(new MyQueueItem { UserId = item.itcode, SyncState = item.Email_SyncState })); } } } } }); _producerThread.Start(); } /// <summary> /// 开启消费者 /// </summary> public void StartCustomer() { _threads = new Thread[_threadCount]; for (int i = 0; i < _threads.Length; i++) { _threads[i] = new Thread(CustomerRun); _threads[i].Start(i); } } private void CustomerRun(object obj) { int threadIndex = Convert.ToInt32(obj); string queueName = string.Format(this._queuePrefix, threadIndex); IRedisClientFactory factory = RedisClientFactory.Instance; while (true) { using (IRedisClient client = factory.CreateRedisClient(WebConfig.RedisServer, WebConfig.RedisPort)) { client.Password = WebConfig.RedisPwd; client.Db = WebConfig.RedisServerDb; if (client.GetListCount(queueName) > 0) { string resultJson = client.DequeueItemFromList(queueName); //如果获取的结果为空,则挂起2s if (string.IsNullOrEmpty(resultJson)) { Thread.SpinWait(WAITSECONDE); } else { try { //耗时业务处理 MyQueueItem item = JsonConvert.DeserializeObject<MyQueueItem>(resultJson); Console.WriteLine("Threadid:{0},User:{1}", Thread.CurrentThread.ManagedThreadId.ToString(), item.UserId); } catch (Exception ex) { //如果出错,重新入队 client.EnqueueItemOnList(queueName, resultJson); } } } else { //当前队列为空,挂起2s Thread.SpinWait(WAITSECONDE); } } } } public void Dispose() { //释放资源时,销毁线程 if (this._threads != null) { for (int i = 0; i < this._threads.Length; i++) { this._threads[i].Abort(); } } GC.Collect(); } }

Main方法调用

        static void Main(string[] args)        {                     MyRedisQueueBus bus = new MyRedisQueueBus(10, 10, "mail_reminder_queue");            bus.StartProducer();            Thread.SpinWait(2000);            bus.StartCustomer();            Console.Read();        }

总结

通过配置的方式,确定开启的队列数和线程数,如果用户增加可以增加线程数,或者添加机器的方式解决。这样,可以解决排名靠后的用户,通过随机分发队列,有机会提前获取邮件提醒,可以缩短邮件提醒的延迟时间。当然,这种方案并不太完美,目前也只能想到这里了。这里把这个思路写出来,也是希望获取一个更好的解决方案。

c#之Redis队列在邮件提醒中的应用