xbmc
EventStream.h
1 /*
2  * Copyright (C) 2016-2018 Team Kodi
3  * This file is part of Kodi - https://kodi.tv
4  *
5  * SPDX-License-Identifier: GPL-2.0-or-later
6  * See LICENSES/README.md for more information.
7  */
8 
9 #pragma once
10 
11 #include "EventStreamDetail.h"
12 #include "JobManager.h"
13 #include "threads/CriticalSection.h"
14 
15 #include <algorithm>
16 #include <memory>
17 #include <mutex>
18 #include <vector>
19 
20 
21 template<typename Event>
23 {
24 public:
25 
26  template<typename A>
27  void Subscribe(A* owner, void (A::*fn)(const Event&))
28  {
29  auto subscription = std::make_shared<detail::CSubscription<Event, A>>(owner, fn);
30  std::unique_lock<CCriticalSection> lock(m_criticalSection);
31  m_subscriptions.emplace_back(std::move(subscription));
32  }
33 
34  template<typename A>
35  void Unsubscribe(A* obj)
36  {
37  std::vector<std::shared_ptr<detail::ISubscription<Event>>> toCancel;
38  {
39  std::unique_lock<CCriticalSection> lock(m_criticalSection);
40  auto it = m_subscriptions.begin();
41  while (it != m_subscriptions.end())
42  {
43  if ((*it)->IsOwnedBy(obj))
44  {
45  toCancel.push_back(*it);
46  it = m_subscriptions.erase(it);
47  }
48  else
49  {
50  ++it;
51  }
52  }
53  }
54  for (auto& subscription : toCancel)
55  subscription->Cancel();
56  }
57 
58 protected:
59  std::vector<std::shared_ptr<detail::ISubscription<Event>>> m_subscriptions;
60  CCriticalSection m_criticalSection;
61 };
62 
63 
64 template<typename Event>
65 class CEventSource : public CEventStream<Event>
66 {
67 public:
68  explicit CEventSource() : m_queue(false, 1, CJob::PRIORITY_HIGH) {}
69 
70  template<typename A>
71  void Publish(A event)
72  {
73  std::unique_lock<CCriticalSection> lock(this->m_criticalSection);
74  auto& subscriptions = this->m_subscriptions;
75  auto task = [subscriptions, event](){
76  for (auto& s: subscriptions)
77  s->HandleEvent(event);
78  };
79  lock.unlock();
80  m_queue.Submit(std::move(task));
81  }
82 
83 private:
84  CJobQueue m_queue;
85 };
86 
87 template<typename Event>
88 class CBlockingEventSource : public CEventStream<Event>
89 {
90 public:
91  template<typename A>
92  void HandleEvent(A event)
93  {
94  std::unique_lock<CCriticalSection> lock(this->m_criticalSection);
95  for (const auto& subscription : this->m_subscriptions)
96  {
97  subscription->HandleEvent(event);
98  }
99  }
100 };
Definition: EventStream.h:65
Definition: ArraysTest1.cpp:27
Definition: EventStream.h:88
Definition: EventStream.h:22
Job Queue class to handle a queue of unique jobs to be processed sequentially.
Definition: JobManager.h:63