当前位置: 首页 > news >正文

重庆 机械有限公司 江北网站建设短视频营销案例

重庆 机械有限公司 江北网站建设,短视频营销案例,海米云网站建设,python做网站入门EventBus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。 它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中,我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件…

EventBus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。

它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中,我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件的发布和订阅。

首先,我们有两个基本的约束接口:IEventIAsyncEventHandler<TEvent>

IEvent是一个空接口,用于约束事件的类型。IAsyncEventHandler<TEvent>是一个泛型接口,用于约束事件处理程序的类型。它定义了处理事件的异步方法HandleAsync和处理异常的方法HandleException。接下来,我们有一个IEventBus接口,它定义了一些操作方法用于发布和订阅事件。

其中,Publish<TEvent>PublishAsync<TEvent>方法用于发布事件,而OnSubscribe<TEvent>方法用于订阅事件。然后,我们看到一个实现了本地事件总线的类LocalEventBusManager<TEvent>。它实现了ILocalEventBusManager<TEvent>接口,用于在单一管道内处理本地事件。它使用了一个Channel<TEvent>来存储事件,并提供了发布事件的方法PublishPublishAsync。此外,它还提供了一个自动处理事件的方法AutoHandle

总的来说Event Bus提供了一种方便的方式来实现组件之间的松耦合通信。

通过发布和订阅事件,组件可以独立地进行操作,而不需要直接依赖于彼此的实现细节。

这种机制可以提高代码的可维护性和可扩展性。

Github仓库地址:https://github.com/DonPangPang/soda-event-bus

实现一些基本约束

先实现一些约束,实现IEvent约束事件,实现IAsyncEvnetHandler<TEvent> where TEvent:IEvent来约束事件的处理程序。

public interface IEvent
{}public interface IAsyncEventHandler<in TEvent> where TEvent : IEvent
{Task HandleAsync(IEvent @event);void HandleException(IEvent @event, Exception ex);
}

接下来规定一下咱们的IEventBus,会有哪些操作方法。基本就是发布和订阅。

public interface IEventBus
{void Publish<TEvent>(TEvent @event) where TEvent : IEvent;Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent;void OnSubscribe<TEvent>() where TEvent : IEvent;
}

实现一个本地事件总线

本地事件处理

本地事件的处理我打算采用两种方式实现,一种是LocalEventBusManager即本地事件管理,第二种是LocalEventBusPool池化本地事件。

LocalEvnetBusManager

LocalEventBusManager主要在单一管道内进行处理,集中进行消费。

public interface ILocalEventBusManager<in TEvent>where TEvent : IEvent
{void Publish(TEvent @event);Task PublishAsync(TEvent @event) ;void AutoHandle();
}public class LocalEventBusManager<TEvent>(IServiceProvider serviceProvider):ILocalEventBusManager<TEvent>where TEvent: IEvent
{readonly IServiceProvider _servicesProvider = serviceProvider;private readonly Channel<TEvent> _eventChannel = Channel.CreateUnbounded<TEvent>();public void Publish(TEvent @event){Debug.Assert(_eventChannel != null, nameof(_eventChannel) + " != null");_eventChannel.Writer.WriteAsync(@event);}private CancellationTokenSource Cts { get; } = new();public void Cancel(){Cts.Cancel();}public async Task PublishAsync(TEvent @event){await _eventChannel.Writer.WriteAsync(@event);}public void AutoHandle(){// 确保只启动一次if (!Cts.IsCancellationRequested) return;Task.Run(async () =>{while (!Cts.IsCancellationRequested){var reader = await _eventChannel.Reader.ReadAsync();await HandleAsync(reader);}}, Cts.Token);}async Task HandleAsync(TEvent @event){var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>();if (handler is null){throw new NullReferenceException($"No handler for event {@event.GetType().Name}");}try{await handler.HandleAsync(@event);}catch (Exception ex){handler.HandleException( @event, ex);}}
}
LocalEventBusPool

LocalEventBusPool即所有的Event都会有一个单独的管道处理,单独消费处理,并行能力更好一些。

public sealed class LocalEventBusPool(IServiceProvider serviceProvider)
{private readonly IServiceProvider _serviceProvider = serviceProvider;private class ChannelKey{public required string Key { get; init; }public int Subscribers { get; set; }public override bool Equals(object? obj){if (obj is ChannelKey key){return string.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase);}return false;}public override int GetHashCode(){return 0;}}private Channel<IEvent> Rent(string channel){_channels.TryGetValue(new ChannelKey() { Key = channel }, out var value);if (value != null) return value;value = Channel.CreateUnbounded<IEvent>();_channels.TryAdd(new ChannelKey() { Key = channel }, value);return value;}private Channel<IEvent> Rent(ChannelKey channelKey){_channels.TryGetValue(channelKey, out var value);if (value != null) return value;value = Channel.CreateUnbounded<IEvent>();_channels.TryAdd(channelKey, value);return value;}private readonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new();private CancellationTokenSource Cts { get; } = new();public void Cancel(){Cts.Cancel();_channels.Clear();Cts.TryReset();}public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent{await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event);}public void Publish<TEvent>(TEvent @event) where TEvent : IEvent{Rent(typeof(TEvent).Name).Writer.TryWrite(@event);}public void OnSubscribe<TEvent>() where TEvent : IEvent{var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ??new ChannelKey() { Key = typeof(TEvent).Name };channelKey.Subscribers++;Task.Run(async () =>{try{while (!Cts.IsCancellationRequested){var @event = await ReadAsync(channelKey);var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>();if (handler == null) throw new NullReferenceException($"No handler for Event {typeof(TEvent).Name}");try{await handler.HandleAsync((TEvent)@event);}catch (Exception ex){handler.HandleException((TEvent)@event, ex);}}}catch (Exception e){throw new InvalidOperationException("Error on onSubscribe handler", e);}}, Cts.Token);}private async Task<IEvent> ReadAsync(string channel){return await Rent(channel).Reader.ReadAsync(Cts.Token);}private async Task<IEvent> ReadAsync(ChannelKey channel){return await Rent(channel).Reader.ReadAsync(Cts.Token);}
}
LocalEventBus

实现LocalEventBus继承自IEventBus即可,如果有需要扩展的方法自行添加,池化和管理器的情况单独处理。

public interface ILocalEventBus: IEventBus
{}
public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus
{private  LocalEventBusPool? EventBusPool => serviceProvider.GetService<LocalEventBusPool>();public void Publish<TEvent>(TEvent @event) where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");EventBusPool.Publish(@event);}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");manager.Publish(@event);}}public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");await EventBusPool.PublishAsync(@event);}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");await manager.PublishAsync(@event);}}public void OnSubscribe<TEvent>() where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");EventBusPool.OnSubscribe<TEvent>();}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");manager.AutoHandle();}}
}

分布式事件总线

根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。

http://www.hkea.cn/news/739290/

相关文章:

  • 有什么网站是python做的网站营销策划公司
  • 长春有什么好的网站制作公司链接购买
  • 毕设网站佛山网站建设十年乐云seo
  • 北京做网站建设的公司哪家好手机怎么创建网站
  • winforms做网站注册百度账号
  • 玉泉路网站建设营销培训课程有哪些
  • 渭南做网站费用搜索引擎排名优化是什么意思
  • 做网站开发需要学什么软件微信公众平台开发
  • 网站整体营销方案网络营销的特点是什么?
  • 国内知名的网站建设公司有哪些百度指数专业版app
  • 画画外包网站如何推广一个网站
  • 互联网公司响应式网站深圳google推广
  • 深圳网站设计哪好什么推广平台比较好
  • 打开英文网站字体不对教程seo推广排名网站
  • 昭通市建设局网站太原百度关键词优化
  • 个人建网站允许吗seo职位要求
  • 环保网站设计网络营销优化推广
  • 网页设计网站制作公司冯耀宗seo视频教程
  • 怎么用路由器做网站百度指数平台官网
  • 济南做网站互联网公司有哪些seo是什么公司
  • 辛集seo网站优化价格许昌网站seo
  • 网站建设后期维护百度快速收录技术
  • 网站建设中的推广工作seo学校培训
  • 上海专业网站建设网百度搜索推广开户
  • 做学校网站素材图片合肥seo代理商
  • 真题真做报名网站淘宝搜索关键词排名
  • 免费的黄冈网站有哪些平台?培训行业seo整站优化
  • 寿县住房与城乡建设局网站真正免费的网站建站平台
  • 常德seo招聘网站seo站长工具
  • 网站开发多久完成俄罗斯搜索引擎yandex推广入口