DevOps OpenStack Ceilometer Collector代码解读

xqb66888 · November 10, 2019 · 15 hits

Collector 功能

Collector 顾名思义是负责数据收集的,它负责搜集来自 OpenStack 其他组件(如 Nova,Glance,Cinder 等)的 Notification 信息,以及从 Compute Agent 和 Central Agent 发送来的数据,然后将这些数据存储在数据库中。

PubSubHubbub

PubSubHubbub是 Google 推出的一个基于 Web-hook 方式的解决方案,它其实是 RSS 的改进。它具体要解决的是 RSS 效率低和压力大的问题,有一个Go real time with pubsubhubbub and feeds讲的挺清楚

Tim的这篇博客也讲了它的机制,其中有这个图:

PubSubHubbub

一个 PubSubHubbub 的大致流程如下:

  1. Sub 找 Pub 订阅内容,Pub 将 Hub 的地址发给 Sub,告诉 Sub:你以后找它要内容去
  2. Sub 将自己要订阅的地址发给 Hub,并在 Hub 那里注册了一个 Callback 函数,以后有新内容麻烦给 Callback 就好啦
  3. Hub 可以主动,也可以被动的从 Pub 那里获得内容,然后再分发给在自己这里注册的 Sub

图中可以看到,有这么几个关键部分,在 Ceilometer 中,它们对应如下:

  • Publisher 内容提供方,OpenStack 的各组件和 Agent 模块的角色
  • Subscriber 内容订阅方,Collector 的角色
  • Hub 中转,Collector 也充当了这个角色

Collector 代码原理

有些相思代码在之前的OpenStack Ceilometer Compute Agent 源码解读讲过

这里只写和 collector 有关的

入口函数

Collector 的核心功能在ceilometer.collector.service:CollectorService中,它是 OpenStack 的 Service 服务,启动以后从initialize_service_hook()开始运行

def initialize_service_hook(self, service):
    self.pipeline_manager = pipeline.setup_pipeline(
        transformer.TransformerExtensionManager(
            'ceilometer.transformer',
        ),
        publisher.PublisherExtensionManager(
            'ceilometer.publisher',
        ),
    )

self.notification_manager = extension_manager.ActivatedExtensionManager( namespace=self.COLLECTOR_NAMESPACE, disabled_names= cfg.CONF.collector.disabled_notification_listeners, )

self.notification_manager.map(self._setup_subscription)

self.conn.create_worker( cfg.CONF.publisher_meter.metering_topic, rpc_dispatcher.RpcDispatcher([self]), 'ceilometer.collector.' + cfg.CONF.publisher_meter.metering_topic, )

这里只说重点的,self.notification_manager是导入所有可用的内容的处理对象,从setup.cfg中可以找到

ceilometer.collector =
    instance = ceilometer.compute.notifications:Instance
    instance_flavor = ceilometer.compute.notifications:InstanceFlavor
    instance_delete = ceilometer.compute.notifications:InstanceDelete
    ...

订阅内容

接着self.notification_manager.map(self._setup_subscription)要对这些对象进行配置,其实就相当于 PubSubHubbub 中的订阅了

def _setup_subscription(self, ext, *args, **kwds):
    handler = ext.obj
    for exchange_topic in handler.get_exchange_topics(cfg.CONF):
        for topic in exchange_topic.topics:
            self.conn.join_consumer_pool(
                callback=self.process_notification,
                pool_name='ceilometer.notifications',
                topic=topic,
                exchange_name=exchange_topic.exchange,
            )

回调函数

这里_setup_subscription()讲每一个订阅对象都join_consumer_pool,即在 AMQP 中接收这些订阅相关 topic 的内容,然后指定了 callback 函数为self.process_notification

def process_notification(self, notification):
    self.notification_manager.map(self._process_notification_for_ext,
                                  notification=notification,
                                  )

def _process_notification_for_ext(self, ext, notification): handler = ext.obj if notification['event_type'] in handler.get_event_types(): ctxt = context.get_admin_context() with self.pipeline_manager.publisher(ctxt, cfg.CONF.counter_source) as p: p(list(handler.process_notification(notification)))

callback 在执行后会调用这些 notification 中的process_notification(),它的作用是对不同的消息进行不同处理,因为从 Nova,Glance 等组件发来的消息 Collector 不一定都读的懂

处理内容

处理好的消息还是会通过 Pipeline 发送到 AMQP 中,然后和 Agent 直接发来的消息类似,Collector 接收并交给

def record_metering_data(self, context, data):
    for meter in data:
        if meter.get('timestamp'):
            ts = timeutils.parse_isotime(meter['timestamp'])
            meter['timestamp'] = timeutils.normalize_time(ts)
        self.storage_conn.record_metering_data(meter)

来处理,其实相当于自己给自己通过 AMQP 发了一条信息,这也就能看出,其实 Collector 充当了 Hub 和 Sub 双重身份

总结

Collector 相对来说不是很复杂,了解了 PubSubHubbub 后再看就相对简单了。

这里没有详细说数据存储部分,因为存储和 API 调用部分联系比较紧密,留给存储部分再讲吧

No Reply at the moment.
You need to Sign in before reply, if you don't have an account, please Sign up first.