Nornir自定义Processor-4


到目前为止我们都是通过print_result打印执行结果,理所当然的我们可以自定义一个处理器(Processor)插件来达到相同的结果。

它就是一个可以处理任务的装饰器,在不改变任务结果的前提下,让我们可以自己编写代码对任务结果进行加工,添加私货,为处理任务提供了更多的扩展性。

同时由于Processor插件基于事件的特性,可以异步处理事件,在某一台主机完成任务后立即处理任务结果,而无需等待其他主机完成。

参考官网文档,我们先写一个打印一些信息的processor:

class PrintProcessor:  # 自定义processor
    def task_started(self, task: Task) -> None:
        """
        This method is called right before starting the task
        """
        print(f">>> starting task: {task.name}")

    def task_completed(self, task: Task, result: AggregatedResult) -> None:
        """
        This method is called when all the hosts have completed executing their respective task
        """
        print(f">>> completed task: {task.name}")

    def task_instance_started(self, task: Task, host: Host) -> None:
        """
        This method is called before a host starts executing its instance of the task
        """
        pass

    def task_instance_completed(
        self, task: Task, host: Host, result: MultiResult
    ) -> None:
        """
        This method is called when a host completes its instance of a task
        """
        print(f"  - {host.name}: - {result.result}")

    def subtask_instance_started(self, task: Task, host: Host) -> None:
        """
        This method is called before a host starts executing a subtask
        """
        pass

    def subtask_instance_completed(
        self, task: Task, host: Host, result: MultiResult
    ) -> None:
        """
        This method is called when a host completes executing a subtask
        """
        pass

可以看出一个processor插件就是一个类,需要实现一些事件的方法,从名字也能简单看出每个方法是什么时候调用。

再来一个保存信息的processor:

class SaveResultToDict:
    def __init__(self, data: Dict[str, None]) -> None:
        self.data = data

    def task_started(self, task: Task) -> None:
        self.data[task.name] = {}
        self.data[task.name]["task started"] = True

    def task_completed(self, task: Task, result: AggregatedResult) -> None:
        self.data[task.name]["task completed"] = True

    def task_instance_started(self, task: Task, host: Host) -> None:
        self.data[task.name][host.name] = {"host started": True}

    def task_instance_completed(
        self, task: Task, host: Host, result: MultiResult
    ) -> None:
        self.data[task.name][host.name] = {
            "host completed": True,
            "host result": result.result,
        }

    def subtask_instance_started(self, task: Task, host: Host) -> None:
        pass  # to keep example short and sweet we ignore subtasks

    def subtask_instance_completed(
        self, task: Task, host: Host, result: MultiResult
    ) -> None:
        pass  # to keep example short and sweet we ignore subtasks

demo

import json
from nornir import InitNornir
from nornir.core.plugins.inventory import InventoryPluginRegister
from nornir_paramiko.plugins.tasks import paramiko_command, paramiko_commands

from lab2.cmdb import CMDBInventory
from typing import Dict

from nornir.core.inventory import Host
from nornir.core.task import AggregatedResult, MultiResult, Result, Task

InventoryPluginRegister.register("CMDBInventory", CMDBInventory)

# device_type改为platform也是兼容的
devices = [
    {'ip': '192.168.11.11', 'username': 'python',
        'password': '123', 'device_type': 'huawei'},
    {'ip': '192.168.11.12', 'username': 'python',
        'password': '123', 'platform': 'huawei'},
    {'ip': '192.168.11.13', 'username': 'python',
        'password': '123', 'platform': 'huawei'},
]

nr = InitNornir(
    runner={
        "plugin": "threaded",
        "options": {
            "num_workers": 100,
        },
    },
    inventory={
        "plugin": "CMDBInventory",
        "options": {
            "devices": devices,
        },
    },
)

data = {}
commands = ['sys', 'dis this', 'return', 'quit']

nr_with_processors = nr.with_processors([PrintProcessor(), SaveResultToDict(data)])

nr_with_processors.run(task=paramiko_commands,
                       name='paramiko processors', commands=commands)

print(json.dumps(data, indent=4))