ETJava Beta | Java    注册   登录
  • 搜索:
  • WPF下使用FreeRedis操作RedisStream实现简单的消息队列

    发表于      阅读(1)     博客类别:Crawler     转自:https://www.cnblogs.com/wdw984/p/18440864
    如有侵权 请联系我们删除  (页面底部联系我们)  

    Redis Stream简介

    Redis Stream是随着5.0版本发布的一种新的Redis数据类型:

    高效消费者组:允许多个消费者组从同一数据流的不同部分消费数据,每个消费者组都能独立地处理消息,这样可以并行处理和提高效率。

    阻塞操作:消费者可以设置阻塞操作,这样它们会在流中有新数据添加时被唤醒并开始处理,这有助于减少资源消耗并提高响应速度。

    数据持久化:它可以将数据持久化到内存(配置本地持久化后会写入到存储设备)中进行保存,等待消费。

    多生产者多消费者:Redis Streams能够在多个生产者和消费者之间建立一个数据通道,使得数据的流动和处理更加灵活。

    扩展性和异步通信:用户可以通过应用程序轻松扩展消费者数量,并且生产者和消费者之间的通信可以是异步的,这有助于提高系统的整体性能。

    满足多样化需求:Redis Streams满足从实时数据处理到历史数据访问的各种需求,同时保持易于管理。

    Redis Stream可以干什么

    消息队列:Redis Stream可以用作一个可靠的消息队列系统,支持发布/订阅模式,生产者和消费者可以异步地发送和接收消息。

    任务调度:Redis Stream可以用于实现分布式任务调度系统,将任务分发到多个消费者进行处理,从而提高处理速度和系统可扩展性。

    事件驱动架构:Redis Stream可以作为事件驱动架构中的核心组件,用于处理来自不同服务的事件,实现解耦和灵活性。

    FreeRedis简介

    FreeRedis 的命名来自,“自由”、“免费”,它和名字与 FreeSql 是一个理念,简易是他们一致的追寻方向,最低可支持 .NET Framework 4.0 运行环境,支持到 Redis-server 7.2。

    github MIT开源协议

    作者博客园地址

    官方介绍

    基于 .NET 的 Redis 客户端,支持 .NET Core 2.1+、.NET Framework 4.0+、Xamarin 以及 AOT。

    • 所有方法名与 redis-cli 保持一致
    • 支持 Redis 集群(服务端要求 3.2 及以上版本)
    • 支持 Redis 哨兵模式
    • 支持主从分离(Master-Slave)
    • 支持发布订阅(Pub-Sub)
    • 支持 Redis Lua 脚本
    • 支持管道(Pipeline)
    • 支持事务
    • 支持 GEO 命令(服务端要求 3.2 及以上版本)
    • 支持 STREAM 类型命令(服务端要求 5.0 及以上版本)
    • 支持本地缓存(Client-side-cahing,服务端要求 6.0 及以上版本)
    • 支持 Redis 6 的 RESP3 协议

    要实现的功能

    1、生产者生产数据

    2、消费者消费数据后确认

    3、消费者消费数据后不确认

    4、已消费但超时未确认的消息监控

    5、已消费但超时未确认的消息二次消费

    项目依赖

    WPF
    CommunityToolkit.Mvvm
    FreeRedis
    Newtonsoft.Json
    NLog
    redis-windows-7.2.5

    业务场景代码

    涉及到的Redis命令

    创建消费者组 XGROUP CREATE key group <id | $> [MKSTREAM] [ENTRIESREAD entries-read]

    查询消费者组信息 XINFO STREAM key [FULL [COUNT count]]

    消息队列数量(长度) XLEN key

    添加消息到队列尾部 XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]

    消费组成员读取消息 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]

    确认消息 XACK key group id [id ...]

    删除消息 XDEL key id [id ...]

    获取消费未确认消息的队列信息 XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

    把未消费的消息消费者转移到当前消费者名下 XAUTOCLAIM key group consumer min-idle-time start [COUNT count] [JUSTID]

    代码

    App.xaml.cs

        public partial class App : Application
        {
            public static Logger Logger = LogManager.GetCurrentClassLogger();        
            public static RedisClient RedisHelper;
            public static MainViewModel MainViewModel;
            private void App_OnStartup(object sender, StartupEventArgs e)
            {
                Current.DispatcherUnhandledException += Current_DispatcherUnhandledException;
    
                try
                {
                    //redis6以上版本启用了ACL用户管理机制,默认用户名是default,可以忽略密码
                    RedisHelper = new RedisClient("127.0.0.1:6379,user=defualt,defaultDatabase=13");
                    RedisHelper.Serialize = JsonConvert.SerializeObject;//序列化
                    RedisHelper.Deserialize = JsonConvert.DeserializeObject;//反序列化
                    RedisHelper.Notice += (s, ee) => Console.WriteLine(ee.Log); //打印命令日志
                    MainViewModel = new MainViewModel();
                }
                catch (Exception exception)
                {
                    MessageBox.Show(exception.Message);
                    Current.Shutdown(-100);
                }
            }
    
            private void Current_DispatcherUnhandledException(object sender, System.Windows.Threading.DispatcherUnhandledExceptionEventArgs e)
            {
                e.Handled = true;
                Logger.Error($"未捕获的错误:来源:{sender},错误:{e}");
            }
    
            private void App_OnExit(object sender, ExitEventArgs e)
            {
                RedisHelper.Dispose();
            }
        }
    

    MainWindow.xaml的主要内容

        <StackPanel>
            <WrapPanel ItemHeight="40" Margin="10,10,0,0" VerticalAlignment="Center">
                <TextBlock Text="生产数据数:" VerticalAlignment="Center"></TextBlock>
                <TextBox Text="{Binding RecordCount}" Width="100" MaxLength="9" VerticalAlignment="Center"></TextBox>
    
                <TextBlock Text="生产者数量:" Margin="10,0,0,0" VerticalAlignment="Center"></TextBlock>
                <TextBox Text="{Binding TaskCount}" Width="100" MaxLength="6" VerticalAlignment="Center"></TextBox>
    
                <TextBlock Margin="10,0,0,0" VerticalAlignment="Center">
                    <Run Text="正在产生第:"></Run>
                    <Run Text="{Binding ProducerIndex}"></Run>
                    <Run Text="条数据"></Run>
                </TextBlock>
    
                <Button Content="生成数据" HorizontalAlignment="Left" Margin="10,0,0,0" VerticalAlignment="Center" Width="103" Command="{Binding ProducerRelayCommand}" />
            </WrapPanel>
    
            <WrapPanel ItemHeight="40" Margin="0,10,0,0" VerticalAlignment="Center">
                <TextBlock Text="消费者数量:" Margin="10,0,0,0" VerticalAlignment="Center"></TextBlock>
                <TextBox Text="{Binding ConsumerCount}" Width="100" MaxLength="6" VerticalAlignment="Center"></TextBox>
    
                <TextBlock Margin="10,0,0,0" VerticalAlignment="Center">
                    <Run Text="剩余未消费:"></Run>
                    <Run Text="{Binding ConsumeIndex}"></Run>
                    <Run Text="条数据。"></Run>
                </TextBlock>
                <CheckBox IsChecked="{Binding IsAutoAck,Mode=TwoWay}" Content="自动确认" VerticalAlignment="Center" ></CheckBox>
                <Button Content="开始消费" HorizontalAlignment="Left" Margin="10,0,0,0" VerticalAlignment="Center" Width="103" Command="{Binding ConsumeRelayCommand}" />
                <Button Content="消费未确认消费队列" HorizontalAlignment="Left" Margin="10,0,0,0" VerticalAlignment="Center" Width="103" Command="{Binding PendingRelayCommand}" />
            </WrapPanel>
    
            <WrapPanel>
                <StackPanel>
                    <TextBlock Margin="10,0,0,0" Text="队列信息:" VerticalAlignment="Center"></TextBlock>
                    <TextBox Margin="10,0,0,0" VerticalAlignment="Center" Height="310" Width="250" Text="{Binding StreamInfo}" VerticalScrollBarVisibility="Auto"></TextBox>
                </StackPanel>
    
                <StackPanel>
                    <TextBlock Margin="13,0,0,0" Text="消费信息:" VerticalAlignment="Center"></TextBlock>
                    <TextBox Margin="13,0,0,0" VerticalAlignment="Center" Height="310" Width="250" Text="{Binding ConsumeInfo}" VerticalScrollBarVisibility="Auto" TextWrapping="WrapWithOverflow"></TextBox>
                </StackPanel>
    
                <StackPanel>
                    <TextBlock Margin="13,0,0,0" Text="未确认消费信息:" VerticalAlignment="Center"></TextBlock>
                    <TextBox Margin="13,0,0,0" VerticalAlignment="Center" Height="310" Width="250" Text="{Binding PendingInfo}" VerticalScrollBarVisibility="Auto" TextWrapping="WrapWithOverflow"></TextBox>
                </StackPanel>
    
                <StackPanel>
                    <TextBlock Margin="13,0,0,0" Text="消费未确认信息:" VerticalAlignment="Center"></TextBlock>
                    <TextBox Margin="13,0,0,0" VerticalAlignment="Center" Height="310" Width="250" Text="{Binding PendingConsumeInfo}" VerticalScrollBarVisibility="Visible" HorizontalScrollBarVisibility="Auto" TextWrapping="WrapWithOverflow"></TextBox>
                </StackPanel>
            </WrapPanel>
        </StackPanel>
    </Grid>
    

    image

    MainWindow.xaml.cs

    	public partial class MainWindow : Window
        {
            public MainWindow()
            {
                InitializeComponent();
                DataContext = App.MainViewModel;
            }
    
            private void MainWindow_OnLoaded(object sender, RoutedEventArgs e)
            {
                App.MainViewModel.RecordCount = 1000;
                App.MainViewModel.TaskCount = 5;
                App.MainViewModel.ConsumerCount = 1;
                App.MainViewModel.ConsumeInfo = "等待消费信息……";
                App.MainViewModel.PendingInfo = "加载中……";
                App.MainViewModel.StreamInfo = "加载中……";
            }
        }
    

    MainViewModel的声明和变量定义

    	public class MainViewModel : ObservableObject
    	{
    		#region 变量定义
    		private readonly string _streamKey = "redisstream";
    		private readonly string _consumeGroupName = "counsumeGroup";
    
    		private DateTime _utcTime = new DateTime(1970, 1, 1, 0, 0, 0);
    		/// <summary>
    		/// 生成的消息条数
    		/// </summary>
    		private static int _exchangeValue;
    		/// <summary>
    		/// 剩余未消费条数
    		/// </summary>
    		private static int _consumeValue;
    		/// <summary>
    		/// 消费信息展示队列
    		/// </summary>
    		private static ConcurrentQueue<string> _consumedQueue = new ConcurrentQueue<string>();
    		/// <summary>
    		/// 消费未确认展示队列
    		/// </summary>
    		private static ConcurrentQueue<string> _pendingConsumedQueue = new ConcurrentQueue<string>();
    		/// <summary>
    		/// 退出令牌
    		/// </summary>
    		private CancellationTokenSource _cancellationTokenSource;
    		/// <summary>
    		/// 生成消息
    		/// </summary>
    		public RelayCommand ProducerRelayCommand { get; }
    		/// <summary>
    		/// 消费消息
    		/// </summary>
    		public RelayCommand ConsumeRelayCommand { get; }
    		/// <summary>
    		/// 消费未确认信息队列消费
    		/// </summary>
    		public RelayCommand PendingRelayCommand { get; }
    		private int _recordCount;
    		/// <summary>
    		/// 数据条数
    		/// </summary>
    		public int RecordCount
    		{
    			get => _recordCount;
    			set => SetProperty(ref _recordCount, value);
    		}
    
    		private int _taskCount;
    		/// <summary>
    		/// 开启后台生产者数量
    		/// </summary>
    		public int TaskCount
    		{
    			get => _taskCount;
    			set => SetProperty(ref _taskCount, value);
    		}
    
    		private int _consumerCount;
    		/// <summary>
    		/// 消费者数量
    		/// </summary>
    		public int ConsumerCount
    		{
    			get => _consumerCount;
    			set => SetProperty(ref _consumerCount, value);
    		}
    
    		private int _producerIndex;
    		/// <summary>
    		/// 正在生产的序列号
    		/// </summary>
    		public int ProducerIndex
    		{
    			get => Interlocked.Exchange(ref _producerIndex, _exchangeValue);
    			set
    			{
    				SetProperty(ref _producerIndex, _exchangeValue);
    			}
    		}
    
    		private long _consumeIndex;
    		/// <summary>
    		/// 正在消费的序列号
    		/// </summary>
    		public long ConsumeIndex
    		{
    			get => Interlocked.Read(ref _consumeIndex);
    			set => SetProperty(ref _consumeIndex, value);
    		}
    
    		private string _streamInfo;
    		/// <summary>
    		/// 队列信息展示
    		/// </summary>
    		public string StreamInfo
    		{
    			get => _streamInfo;
    			set => SetProperty(ref _streamInfo, value);
    		}
    
    		private string _consumeInfo;
    		/// <summary>
    		/// 消费信息展示
    		/// </summary>
    		public string ConsumeInfo
    		{
    			get => _consumeInfo;
    			set
    			{
    				value = $"暂无消费消息[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]";
    				if (_consumedQueue.TryDequeue(out var message))
    				{
    					value = _consumeInfo + Environment.NewLine + message;
    				}
    				SetProperty(ref _consumeInfo, value);
    			}
    		}
    
    		private string _pendingInfo;
    		/// <summary>
    		/// 消费未确认队列信息展示
    		/// </summary>
    		public string PendingInfo
    		{
    			get => _pendingInfo;
    			set => SetProperty(ref _pendingInfo, value);
    		}
    
    		private string _pedingConsumeInfo;
    		/// <summary>
    		/// 消费未确认队列的展示
    		/// </summary>
    		public string PendingConsumeInfo
    		{
    			get => _pedingConsumeInfo;
    			set
    			{
    				value = $"暂无未确认消费信息[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]";
    				if (_pendingConsumedQueue.TryDequeue(out var message))
    				{
    					value = _pedingConsumeInfo + Environment.NewLine + message;
    				}
    
    				SetProperty(ref _pedingConsumeInfo, value);
    			}
    		}
    
    		private bool _isProduceCanExec;
    		/// <summary>
    		/// 是否可以执行生成任务
    		/// </summary>
    		public bool IsProduceCanExec
    		{
    			get => _isProduceCanExec;
    			set
    			{
    				SetProperty(ref _isProduceCanExec, value);
    				ProducerRelayCommand.NotifyCanExecuteChanged();
    			}
    		}
    
    		private bool _isAutoAck;
    		/// <summary>
    		/// 是否自动确认消费信息
    		/// </summary>
    		public bool IsAutoAck
    		{
    			get => _isAutoAck;
    			set
    			{
    				SetProperty(ref _isAutoAck, value);
    				ProducerRelayCommand.NotifyCanExecuteChanged();
    			}
    		}
    		#endregion
    
    		public MainViewModel()
    		{
    			ProducerRelayCommand = new RelayCommand(async () => await DoProduce(), () => !_isProduceCanExec);
    
    			ConsumeRelayCommand = new RelayCommand(async () => await DoConsume());
    
    			PendingRelayCommand = new RelayCommand(async () => await DoPendingConsume());
    
    			_cancellationTokenSource = new CancellationTokenSource();
    
    			var exist = App.RedisHelper.Exists(_streamKey);
    			if (!exist)
    			{
    				//创建消费组,同一个消费组可以有多个消费者,它们直接不会重复读取到同一条消息
    				App.RedisHelper.XGroupCreate(_streamKey, _consumeGroupName, MkStream: true);
    			}
    			else
    			{
    				var groups = App.RedisHelper.XInfoGroups(_streamKey);
    
    				if (groups == null || !groups.Any())
    				{
    					App.RedisHelper.XGroupCreate(_streamKey, _consumeGroupName);
    				}
    			}
    
    			ConsumeIndex = App.RedisHelper.XLen(_streamKey);
    
    			DoLoadStreamInfo();
    			DoLoadPendingInfo();
    		}
    	}
    

    消息生成者

    private async Task DoProduce()
    {
    	if (IsProduceCanExec)
    	{
    		return;
    	}
    	IsProduceCanExec = true;
    
    	if (TaskCount > RecordCount)
    	{
    		TaskCount = RecordCount;
    	}
    
    	_exchangeValue = 0;
    	ProducerIndex = 0;
    
    	if (TaskCount > 0)
    	{
    		var pageSize = RecordCount / TaskCount;
    		var tasks = Enumerable.Range(1, TaskCount).Select(x =>
    		{
    			return Task.Run(() =>
    			{
    				var internalPageSize = pageSize;
    
    				if (TaskCount > 1 && x == TaskCount)
    				{
    					if (x * internalPageSize < RecordCount)
    					{
    						internalPageSize = RecordCount - (TaskCount - 1) * internalPageSize;
    					}
    				}
    				
    				for (var i = 1; i <= internalPageSize; i++)
    				{
    					ProducerIndex = Interlocked.Increment(ref _exchangeValue);
    					ConsumeIndex = Interlocked.Increment(ref _consumeValue);
    
    					var dic = new Dictionary<string, MessageModel> { { $"user_{x}", new MessageModel { Age = 16, Description = $"描述:{ProducerIndex}", Id = 1, Name = "wang", Status = 1 } } };
    					App.RedisHelper.XAdd(_streamKey, 0, "*", dic);
    				}
    				return Task.CompletedTask;
    			});
    		});
    		await Task.WhenAll(tasks);
    	}
    	IsProduceCanExec = false;
    }
    
    #endregion
    

    image

    消息消费者

    private Task DoConsume()
    {
    	var groups = App.RedisHelper.XInfoGroups(_streamKey);
    	if (groups == null || !groups.Any())
    	{
    		App.RedisHelper.XGroupCreate(_streamKey, _consumeGroupName);
    	}
    
    	//添加消费者
    	var tasks = Enumerable.Range(1, _consumerCount).Select(c =>
    	{
    		var task = Task.Run(async () =>
    		{
    			//从消费组中读取消息,同一个组内的成员不会重复获取同一条消息。
    			var streamRead = App.RedisHelper.XReadGroup(_consumeGroupName, $"consumer{c}", 0, _streamKey, ">");
    			if (streamRead != null)
    			{
    				//取得消息
    				var id = streamRead.id;
    				var model = new Dictionary<string, MessageModel>(1)
    				{
    					{ streamRead.fieldValues[0].ToString(), JsonConvert.DeserializeObject<MessageModel>(streamRead.fieldValues[1].ToString()) }
    				};
    				_consumedQueue.Enqueue($"consumer{c}取到了消息{id},{DateTime.Now:yyyy-MM-dd HH:mm:ss}");
    				ConsumeInfo = "";
    				await Task.Delay(100);//模拟业务逻辑耗时
    				
    				if (IsAutoAck)
    				{
    					//ACK
    					var success = App.RedisHelper.XAck(_streamKey, _consumeGroupName, id);
    					if (success > 0)
    					{
    						//xdel
    						App.RedisHelper.XDel(_streamKey, id);
    						_consumedQueue.Enqueue($"consumer{c}成功消费了消息{id},{DateTime.Now:yyyy-MM-dd HH:mm:ss}");
    					}
    					else
    					{
    						_consumedQueue.Enqueue($"consumer{c}的消息{id}加入了未确认队列,{DateTime.Now:yyyy-MM-dd HH:mm:ss}");
    					}
    					ConsumeInfo = "";
    				}
    			}
    		});
    		return task;
    	});
    	Task.WhenAll(tasks);
    
    	return Task.CompletedTask;
    }
    #endregion
    

    image

    未消费信息队列监控

    private Task DoLoadStreamInfo()
    {
    	Task.Factory.StartNew(async () =>
    	{
    		while (!_cancellationTokenSource.IsCancellationRequested)
    		{
    			StreamInfo = "正在查询队列信息……";
    			try
    			{
    				if (!App.RedisHelper.Exists(_streamKey))
    				{
    					StreamInfo = "队列尚未创建";
    					await Task.Delay(3000);
    					continue;
    				}
    				var info = App.RedisHelper.XInfoStream(_streamKey);
    				StreamInfo = info?.first_entry == null ? "队列数据为空。" : $"队列长度:{info.length}{Environment.NewLine}第一个编号:{info.first_entry.id}{Environment.NewLine}最后一个编号:{info.last_entry.id}{Environment.NewLine}更新时间:{DateTime.Now:yyyy-MM-dd HH:mm:ss}";
    				ConsumeIndex = info?.first_entry == null ? 0 : (int)info.length;
    			}
    			catch (Exception e)
    			{
    				StreamInfo = $"获取队列信息失败:{e.Message}";
    			}
    			await Task.Delay(3000);
    		}
    	}, _cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    	return Task.CompletedTask;
    }
    #endregion
    

    未消费成功(超时或业务逻辑执行失败)的消息队列消息展示

    private Task DoLoadPendingInfo()
    {
    	Task.Run(async () =>
    	{
    		while (!_cancellationTokenSource.IsCancellationRequested)
    		{
    			PendingInfo = "正在查询未确认队列……";
    			if (!App.RedisHelper.Exists(_streamKey))
    			{
    				PendingInfo = "队列尚未创建";
    				await Task.Delay(3000);
    				continue;
    			}
    			try
    			{
    				var info = App.RedisHelper.XPending(_streamKey, _consumeGroupName);
    				if (info == null || info.count == 0)
    				{
    					PendingInfo = $"暂无未确认信息。[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]";
    					await Task.Delay(3000);
    
    					continue;
    				}
    				var infoTxt = $"未消费数量:{info.count}{Environment.NewLine}涉及{info.consumers.Length}个消费者{Environment.NewLine}最小编号:{info.minId}{Environment.NewLine}最大编号:{info.maxId}{Environment.NewLine}更新时间:[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]";
    				PendingInfo = infoTxt;
    			}
    			catch (Exception e)
    			{
    				PendingInfo = $"获取队列信息失败:{e.Message}";
    			}
    
    			await Task.Delay(3000);
    		}
    	}, _cancellationTokenSource.Token);
    	return Task.CompletedTask;
    }
    #endregion
    

    image

    未消费成功的消息重新消费

    private Task DoPendingConsume()
    {
    	Task.Run(async () =>
    	{
    		while (!_cancellationTokenSource.IsCancellationRequested)
    		{
    			_pendingConsumedQueue.Enqueue($"正在查询未确认队列……[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]");
    			PendingConsumeInfo = "";
    			if (!App.RedisHelper.Exists(_streamKey))
    			{
    				_pendingConsumedQueue.Enqueue($"队列尚未创建……[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]");
    				PendingConsumeInfo = "";
    				await Task.Delay(3000);
    				continue;
    			}
    			try
    			{
    				//从stream队列的头部(0-0的位置)获取2条已读取时间超过2分钟且未确认的消息,修改所有者为pendingUser重新消费并确认。
    				var info = App.RedisHelper.XAutoClaim(_streamKey, _consumeGroupName, "pendingUser", 120000, "0-0", 2);
    				if (info == null || info.entries == null || info.entries.Length == 0)
    				{
    					_pendingConsumedQueue.Enqueue("未确认队列中暂无信息。[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]");
    					await Task.Delay(3000);
    					continue;
    				}
    
    				foreach (var entry in info.entries)
    				{
    					if (entry == null) continue;
    					_pendingConsumedQueue.Enqueue($"未确认消费信息:{entry.id}[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]");
    
    					Debug.WriteLine(JsonConvert.DeserializeObject<MessageModel>(entry.fieldValues[1].ToString()));
    					PendingConsumeInfo = "";
    					//ACK
    					await Task.Delay(100);//模拟业务逻辑执行时间
    					var success = App.RedisHelper.XAck(_streamKey, _consumeGroupName, entry.id);
    					if (success > 0)
    					{
    						//xdel
    						if (App.RedisHelper.XDel(_streamKey, entry.id) > 0)
    						{
    							_pendingConsumedQueue.Enqueue($"consumer[pendingUser]成功消费了消息{entry.id},{DateTime.Now:yyyy-MM-dd HH:mm:ss}");
    						}
    						else
    						{
    							_pendingConsumedQueue.Enqueue($"[pendingUser]删除{entry.id}[失败],{DateTime.Now:yyyy-MM-dd HH:mm:ss}");
    						}
    					}
    					else
    					{
    						_pendingConsumedQueue.Enqueue($"[pendingUser]消费{entry.id}[失败],{DateTime.Now:yyyy-MM-dd HH:mm:ss}");
    					}
    				}
    			}
    			catch (Exception e)
    			{
    				PendingConsumeInfo = $"获取队列信息失败:{e.Message}";
    			}
    			PendingConsumeInfo = "";
    
    			await Task.Delay(3000);
    		}
    	}, _cancellationTokenSource.Token);
    	return Task.CompletedTask;
    }
    
    #endregion
    

    image

    总结

    本次我们通过Redis的Stream数据类型实现了部署简单、高性能、高可用性的消息队列,在中小型项目上可适用于需要处理数据流转的场景。

    参考资料

    Redis Streams

    Redis Commands