Processor Counter Monitor
topology.h
1 // SPDX-License-Identifier: BSD-3-Clause
2 // 2016-2020, Intel Corporation
3 
4 #pragma once
5 
#include <algorithm>
#include <chrono>
#include <future>
#include <iomanip>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

#include "types.h"
#include "cpucounters.h"
#include "threadpool.h"
14 
15 namespace pcm {
16 
// all can be done with forwards, anything that actually uses PCM should be put in the topology.cpp file
18 class PCM;
19 
20 class SystemRoot;
21 class Socket;
22 class Core;
23 class HyperThread;
24 class ServerUncore;
25 class ClientUncore;
26 
27 class Visitor {
28 public:
29  Visitor() {
30  // Set the floatingpoint format to fixed. Setting the number of decimal digits to 3.
31  ss << std::fixed << std::setprecision(3);
32  }
33 
34  Visitor(const Visitor &) = delete;
35  Visitor & operator = (const Visitor &) = delete;
36 
37 public:
38  virtual void dispatch( SystemRoot const & ) = 0;
39  virtual void dispatch( Socket* ) = 0;
40  virtual void dispatch( Core* ) = 0;
41  virtual void dispatch( HyperThread* ) = 0;
42  virtual void dispatch( ServerUncore* ) = 0;
43  virtual void dispatch( ClientUncore* ) = 0;
44 
45  virtual ~Visitor() {};
46 
47 protected:
48  std::stringstream ss{};
49 };
50 
52 {
53 public:
54  virtual void accept( Visitor & v ) = 0;
55  virtual ~SystemObject() {};
56 };
57 
/// Online state recorded for a HyperThread when it is created.
enum Status
{
    Offline = 0, ///< used for threads stored in SystemRoot's offlined-at-start list
    Online  = 1  ///< used for threads registered through Core::addHyperThreadInfo()
};
62 
63 class HyperThread : public SystemObject
64 {
65 public:
66  HyperThread( PCM* m, int32 threadID, int32 osID, enum Status status ) : pcm_(m), threadID_(threadID), osID_(osID), status_(status) {}
67  virtual ~HyperThread() { pcm_ = nullptr; }
68 
69  virtual void accept( Visitor& v ) override {
70  v.dispatch( this );
71  }
72 
73  CoreCounterState coreCounterState() const {
74  CoreCounterState ccs;
75  // fill ccs
76  ccs.BasicCounterState::readAndAggregate( msrHandle_ );
77  return ccs;
78  }
79 
80  void addMSRHandle( std::shared_ptr<SafeMsrHandle> handle ) {
81  msrHandle_ = handle;
82  }
83 
84  int32 threadID() const {
85  return threadID_;
86  }
87 
88  int32 osID() const {
89  return osID_;
90  }
91 
92  // We simply pass by value, this way the refcounting works best and as expected
93  std::shared_ptr<SafeMsrHandle> msrHandle() const {
94  return msrHandle_;
95  }
96 
97  bool isOnline() const {
98  return (status_ == Status::Online);
99  }
100 
101 private:
102  PCM* pcm_;
103  std::shared_ptr<SafeMsrHandle> msrHandle_;
104  int32 threadID_;
105  int32 osID_;
106  enum Status status_;
107 };
108 
109 class Core : public SystemObject
110 {
111  constexpr static int32 MAX_THREADS_PER_CORE = 4;
112 
113 public:
114  Core( PCM* m, int32 coreID, int32 tileID, int32 socketID ) {
115  pcm_ = m;
116  coreID_ = coreID;
117  tileID_ = tileID;
118  socketID_ = socketID;
119  }
120  virtual ~Core() {
121  pcm_ = nullptr;
122  for ( auto& thread : threads_ )
123  delete thread;
124  }
125 
126  virtual void accept( Visitor& v ) override {
127  v.dispatch( this );
128  }
129 
130  CoreCounterState coreCounterState() const {
131  CoreCounterState ccs;
132  // Fill bcs
133  for ( HyperThread* thread : threads_ ) {
134  ccs += thread->coreCounterState();
135  }
136  return ccs;
137  }
138 
139  void addHyperThreadInfo( int32 threadID, int32 osID ) {
140  if ( threadID >= MAX_THREADS_PER_CORE ) {
141  std::stringstream ss;
142  ss << "ERROR: Core: threadID cannot be larger than " << MAX_THREADS_PER_CORE << ".\n";
143  throw std::runtime_error( ss.str() );
144  }
145  if ( threads_.size() == 0 ||
146  std::find_if( threads_.begin(), threads_.end(),
147  [osID]( HyperThread const * ht ) -> bool {
148  return ht->osID() == osID;
149  }
150  ) == threads_.end() )
151  {
152  // std::cerr << "Core::addHyperThreadInfo: " << threadID << ", " << osID << "\n";
153  threads_.push_back( new HyperThread( pcm_, threadID, osID, Status::Online ) );
154  }
155  }
156 
157  HyperThread* hyperThread( size_t threadNo ) const {
158  if ( threadNo >= threads_.size() )
159  throw std::runtime_error( "ERROR: hyperThread: threadNo larger than vector." );
160  return threads_[ threadNo ];
161  }
162 
163  HyperThread* findThreadByOSID( int32 osID ) {
164  for ( HyperThread* thread : threads_ ) {
165  if ( thread->osID() == osID )
166  return thread;
167  }
168  return nullptr;
169  }
170 
171  std::vector<HyperThread*> threads() const {
172  return threads_;
173  }
174 
175  std::shared_ptr<SafeMsrHandle> msrHandle() const {
176  if ( 0 == threads_.size() )
177  throw std::runtime_error("BUG: No threads yet but asking for a msrHandle!");
178  return threads_.front()->msrHandle();
179  }
180 
181  int32 coreID() const {
182  return coreID_;
183  }
184 
185  int32 tileID() const {
186  return tileID_;
187  }
188 
189  int32 socketID() const {
190  return socketID_;
191  }
192 
193  bool isOnline() const {
194  for( auto& thread : threads_ )
195  if ( thread->isOnline() )
196  return true;
197  return false;
198  }
199 
200 private:
201  PCM* pcm_;
202  std::vector<HyperThread*> threads_;
203  int32 coreID_;
204  int32 tileID_;
205  int32 socketID_;
206 };
207 
208 class Uncore : public SystemObject
209 {
210 public:
211  Uncore( PCM* m, int32 socketID ) : pcm_( m ), refCore_( nullptr ), socketID_( socketID ) {}
212  virtual ~Uncore() {
213  pcm_ = nullptr;
214  refCore_ = nullptr;
215  }
216 
217  virtual void accept( Visitor& v ) = 0;
218 
219  virtual UncoreCounterState uncoreCounterState( void ) const = 0;
220 
221  Core* refCore() const {
222  if ( refCore_ == nullptr )
223  throw std::runtime_error( "BUG: Uncore: refCore was never set!" );
224  return refCore_;
225  }
226 
227  int32 socketID() const {
228  return socketID_;
229  }
230 
231  void setRefCore( Core* refCore ) {
232  refCore_ = refCore;
233  }
234 
235 private:
236  PCM* pcm_;
237  Core* refCore_;
238  int32 socketID_;
239 };
240 
241 class ServerUncore : public Uncore
242 {
243 public:
244  ServerUncore( PCM* m, int32 socketID ) : Uncore( m, socketID ) {}
245  virtual ~ServerUncore() {}
246 
247  virtual void accept( Visitor& v ) override {
248  v.dispatch( this );
249  }
250 
251  virtual UncoreCounterState uncoreCounterState( void ) const override;
252 };
253 
254 class ClientUncore : public Uncore
255 {
256 public:
257  ClientUncore( PCM* m, int32 socketID ) : Uncore( m, socketID ) {}
258  virtual ~ClientUncore() {}
259 
260  virtual void accept( Visitor& v ) override {
261  v.dispatch( this );
262  }
263 
264  virtual UncoreCounterState uncoreCounterState( void ) const override {
265  UncoreCounterState ucs;
266  // TODO: Fill the ucs
267  return ucs;
268  }
269 };
270 
271 class Socket : public SystemObject {
272  Socket(const Socket &) = delete;
273  Socket & operator = (const Socket &) = delete;
274 public:
275  Socket( PCM* m, int32 apicID, int32 logicalID );
276  virtual ~Socket() {
277  pcm_ = nullptr;
278  for ( auto& core : cores_ )
279  delete core;
280  refCore_ = nullptr; // cores_ is owner so it is already deleted by here
281  delete uncore_;
282  }
283 
284  virtual void accept( Visitor& v ) override {
285  v.dispatch( this );
286  }
287 
288  void addCore( Core* c ) {
289  cores_.push_back( c );
290  }
291 
292  HyperThread* findThreadByOSID( int32 osID ) {
293  HyperThread* thread;
294  for ( Core* core : cores_ ) {
295  thread = core->findThreadByOSID(osID);
296  if ( nullptr != thread )
297  return thread;
298  }
299  return nullptr;
300  }
301 
302  void setRefCore() {
303  if ( cores_.size() == 0 )
304  throw std::runtime_error("No cores added to the socket so cannot set reference core");
305  refCore_ = cores_.front();
306  // uncore_ cannot be null, it is set in the constructor
307  uncore_->setRefCore( refCore_ );
308  }
309 
310  SocketCounterState socketCounterState( void ) const;
311 
312  Core* findCoreByTileID( int32 tileID ) {
313  for ( auto& core : cores_ ) {
314  if ( core->tileID() == tileID )
315  return core;
316  }
317  return nullptr;
318  }
319 
320  std::vector<Core*> const & cores( void ) const {
321  return cores_;
322  }
323 
324  Uncore* uncore( void ) const {
325  return uncore_;
326  }
327 
328  int32 apicId() const {
329  return apicID_;
330  }
331 
332  int32 socketID() const {
333  return logicalID_;
334  }
335 
336  bool isOnline() const {
337  return refCore_->isOnline();
338  }
339 
340 private:
341  std::vector<Core*> cores_;
342  PCM* pcm_;
343  Core* refCore_;
344  Uncore* uncore_;
345  int32 apicID_;
346  int32 logicalID_;
347 };
348 
349 class SystemRoot : public SystemObject {
350 public:
351  SystemRoot(PCM * p) : pcm_(p) {}
352 
353  SystemRoot( SystemRoot const & ) = delete; // do not try to copy this please
354  SystemRoot & operator = ( SystemRoot const & ) = delete; // do not try to copy this please
355 
356  virtual ~SystemRoot() {
357  pcm_ = nullptr;
358  for ( auto& socket : sockets_ )
359  delete socket;
360  for ( auto& thread : offlinedThreadsAtStart_ )
361  delete thread;
362  }
363 
364  virtual void accept( Visitor& v ) override {
365  v.dispatch( *this );
366  }
367 
368  void addSocket( int32 apic_id, int32 logical_id ) {
369  Socket* s = new Socket( pcm_, apic_id, logical_id );
370  sockets_.push_back( s );
371  }
372 
373  // osID is the expected os_id, this is used in case te.os_id = -1 (offlined core)
374  void addThread( int32 osID, TopologyEntry& te ) {
375  // quick check during development to see if expected osId == te.os_id for onlined cores
376  // assert( te.os_id != -1 && osID == te.os_id );
377  bool entryAdded = false;
378  for ( auto& socket : sockets_ ) {
379  if ( socket->apicId() == te.socket ) {
380  Core* core = nullptr;
381  if ( (core = socket->findCoreByTileID( te.tile_id )) == nullptr ) {
382  // std::cerr << "SystemRoot::addThread: " << te.tile_id << ", " << osID << "\n";
383  core = new Core( pcm_, te.core_id, te.tile_id, te.socket );
384  // std::cerr << "new Core ThreadID: " << te.thread_id << "\n";
385  core->addHyperThreadInfo( te.thread_id, osID );
386  socket->addCore( core );
387  // std::cerr << "Added core " << te.core_id << " with os_id " << osID << ", threadid " << te.thread_id << " and tileid " << te.tile_id << " to socket " << te.socket << ".\n";
388  } else {
389  // std::cerr << "existing Core ThreadID: " << te.thread_id << "\n";
390  core->addHyperThreadInfo( te.thread_id, osID );
391  // std::cerr << "Augmented core " << te.core_id << " with osID " << osID << " and threadid " << te.thread_id << " for the hyperthread to socket " << te.socket << ".\n";
392  }
393  entryAdded = true;
394  break;
395  }
396  }
397  if ( !entryAdded ) {
398  // if ( te.os_id == -1 )
399  // std::cerr << "TE not added because os_id == -1, core is offline\n";
400  offlinedThreadsAtStart_.push_back( new HyperThread( pcm_, -1, osID, Status::Offline ) );
401  }
402  }
403 
404  HyperThread* findThreadByOSID( int32 osID ) {
405  HyperThread* thread;
406  for ( Socket* socket : sockets_ ) {
407  thread = socket->findThreadByOSID( osID );
408  if ( nullptr != thread )
409  return thread;
410  }
411  for ( HyperThread* ht: offlinedThreadsAtStart_ )
412  if ( ht->osID() == osID )
413  return ht;
414  return nullptr;
415  }
416 
417  void addMSRHandleToOSThread( std::shared_ptr<SafeMsrHandle> handle, uint32 osID )
418  {
419  // std::cerr << "addMSRHandleToOSThread: osID: " << osID << "\n";
420  HyperThread* thread = findThreadByOSID( osID );
421  if ( nullptr == thread )
422  throw std::runtime_error( "SystemRoot::addMSRHandleToOSThread osID not found" );
423  thread->addMSRHandle( handle );
424  }
425 
426  SystemCounterState systemCounterState() const {
427  SystemCounterState scs;
428  // Fill scs
429  // by iterating the sockets
430  for ( auto& socket : sockets_ ) {
431  scs += ( socket->socketCounterState() );
432  }
433  return scs;
434  }
435 
436  std::vector<Socket*> const & sockets( void ) const {
437  return sockets_;
438  }
439 
440  std::vector<HyperThread*> const & offlinedThreadsAtStart( void ) const {
441  return offlinedThreadsAtStart_;
442  }
443 
444 private:
445  std::vector<Socket*> sockets_;
446  std::vector<HyperThread*> offlinedThreadsAtStart_;
447  PCM* pcm_;
448 };
449 
450 
451 /* Method used here: while walking the tree and iterating the vector
452  * elements, collect the counters. Once all elements have been walked
453  * the vectors are filled with the aggregates.
454  */
456 {
457 public:
458  Aggregator();
459  virtual ~Aggregator() {}
460 
461 public:
462  virtual void dispatch( SystemRoot const& syp ) override;
463 
464  virtual void dispatch( Socket* sop ) override {
465  // std::cerr << "Aggregator::dispatch( Socket )\n";
466  // Fetch CoreCounterStates
467  for ( auto* core : sop->cores() )
468  core->accept( *this );
469  // Fetch UncoreCounterState async result
470  auto job = new LambdaJob<UncoreCounterState>(
471  []( Socket* s ) -> UncoreCounterState {
472  DBG( 3, "Lambda fetching UncoreCounterState async" );
473  UncoreCounterState ucs;
474  if ( !s->isOnline() )
475  return ucs;
476  return s->uncore()->uncoreCounterState();
477  }, sop
478  );
479  ucsFutures_[ sop->socketID() ] = job->getFuture();
480  wq_.addWork( job );
481  // For now execute directly to compile test
482  //job->execute();
483  }
484 
485  virtual void dispatch( Core* cop ) override {
486  // std::cerr << "Aggregator::dispatch( Core )\n";
487  // Loop each HyperThread
488  for ( auto* thread : cop->threads() ) {
489  // Fetch the CoreCounterState
490  thread->accept( *this );
491  }
492  }
493 
494  virtual void dispatch( HyperThread* htp ) override {
495  // std::cerr << "Aggregator::dispatch( HyperThread )\n";
496  // std::cerr << "Dispatch htp with osID=" << htp->osID() << "\n";
497  auto job = new LambdaJob<CoreCounterState>(
498  []( HyperThread* h ) -> CoreCounterState {
499  DBG( 3, "Lambda fetching CoreCounterState async" );
500  CoreCounterState ccs;
501  if ( !h->isOnline() )
502  return ccs;
503  return h->coreCounterState();
504  }, htp
505  );
506  ccsFutures_[ htp->osID() ] = job->getFuture();
507  wq_.addWork( job );
508  }
509 
510  virtual void dispatch( ServerUncore* /*sup*/ ) override {
511  // std::cerr << "Aggregator::dispatch( ServerUncore )\n";
512  }
513 
514  virtual void dispatch( ClientUncore* /*cup*/ ) override {
515  // std::cerr << "Aggregator::dispatch( ClientUncore )\n";
516  }
517 
518  std::vector<CoreCounterState>const & coreCounterStates( void ) const {
519  return ccsVector_;
520  }
521 
522  std::vector<SocketCounterState>const & socketCounterStates( void ) const {
523  return socsVector_;
524  }
525 
526  SystemCounterState const & systemCounterState( void ) const {
527  return sycs_;
528  }
529 
530  std::chrono::steady_clock::time_point dispatchedAt( void ) const {
531  return dispatchedAt_;
532  }
533 
534 private:
535  std::vector<CoreCounterState> ccsVector_;
536  std::vector<SocketCounterState> socsVector_;
537  SystemCounterState sycs_;
538  std::vector<std::future<CoreCounterState>> ccsFutures_;
539  std::vector<std::future<UncoreCounterState>> ucsFutures_;
540  std::chrono::steady_clock::time_point dispatchedAt_{};
541  WorkQueue wq_;
542 };
543 
544 } // namespace pcm
Internal type and constant definitions.
(Logical) core-wide counter state
Definition: cpucounters.h:2925
Definition: threadpool.h:107
Definition: topology.h:349
Definition: topology.h:208
Socket-wide counter state.
Definition: cpucounters.h:2938
Definition: topology.h:63
Definition: topology.h:254
CPU Performance Monitor.
Definition: cpucounters.h:543
Main CPU counters header.
Definition: bw.cpp:12
Definition: threadpool.h:25
Definition: topology.h:241
Definition: topology.h:455
Definition: topology.h:27
System-wide counter state.
Definition: cpucounters.h:2978
Definition: topology.h:271
Definition: topology.h:109
Definition: topology.h:51
Basic uncore counter state.
Definition: cpucounters.h:2678
Definition: cpucounters.h:87