Prv8 Shell
Server : Apache
System : Linux server.mata-lashes.com 3.10.0-1160.90.1.el7.x86_64 #1 SMP Thu May 4 15:21:22 UTC 2023 x86_64
User : matalashes ( 1004)
PHP Version : 8.1.29
Disable Function : NONE
Directory :  /usr/src/cloud-init/tests/unittests/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //usr/src/cloud-init/tests/unittests/test_builtin_handlers.py
# This file is part of cloud-init. See LICENSE file for license information.

"""Tests of the built-in user data handlers."""

import copy
import errno
import os
from textwrap import dedent

import pytest

from cloudinit import handlers, helpers, util
from cloudinit.cmd.devel import read_cfg_paths
from cloudinit.handlers.cloud_config import CloudConfigPartHandler
from cloudinit.handlers.jinja_template import (
    JinjaLoadError,
    JinjaTemplatePartHandler,
    convert_jinja_instance_data,
    render_jinja_payload,
)
from cloudinit.handlers.shell_script import ShellScriptPartHandler
from cloudinit.handlers.shell_script_by_frequency import (
    get_script_folder_by_frequency,
    path_map,
)
from cloudinit.settings import PER_ALWAYS, PER_INSTANCE, PER_ONCE
from tests.unittests.helpers import CiTestCase, mock, skipUnlessJinja

INSTANCE_DATA_FILE = "instance-data-sensitive.json"


class TestJinjaTemplatePartHandler(CiTestCase):

    with_logs = True

    mpath = "cloudinit.handlers.jinja_template."

    def setUp(self):
        super(TestJinjaTemplatePartHandler, self).setUp()
        self.tmp = self.tmp_dir()
        self.run_dir = os.path.join(self.tmp, "run_dir")
        util.ensure_dir(self.run_dir)
        self.paths = helpers.Paths(
            {"cloud_dir": self.tmp, "run_dir": self.run_dir}
        )

    def test_jinja_template_part_handler_defaults(self):
        """On init, paths are saved and subhandler types are empty."""
        h = JinjaTemplatePartHandler(self.paths)
        self.assertEqual(["## template: jinja"], h.prefixes)
        self.assertEqual(3, h.handler_version)
        self.assertEqual(self.paths, h.paths)
        self.assertEqual({}, h.sub_handlers)

    def test_jinja_template_part_handler_looks_up_sub_handler_types(self):
        """When sub_handlers are passed, init lists types of subhandlers."""
        script_handler = ShellScriptPartHandler(self.paths)
        cloudconfig_handler = CloudConfigPartHandler(self.paths)
        h = JinjaTemplatePartHandler(
            self.paths, sub_handlers=[script_handler, cloudconfig_handler]
        )
        self.assertCountEqual(
            [
                "text/cloud-config",
                "text/cloud-config-jsonp",
                "text/x-shellscript",
            ],
            h.sub_handlers,
        )

    def test_jinja_template_part_handler_looks_up_subhandler_types(self):
        """When sub_handlers are passed, init lists types of subhandlers."""
        script_handler = ShellScriptPartHandler(self.paths)
        cloudconfig_handler = CloudConfigPartHandler(self.paths)
        h = JinjaTemplatePartHandler(
            self.paths, sub_handlers=[script_handler, cloudconfig_handler]
        )
        self.assertCountEqual(
            [
                "text/cloud-config",
                "text/cloud-config-jsonp",
                "text/x-shellscript",
            ],
            h.sub_handlers,
        )

    def test_jinja_template_handle_noop_on_content_signals(self):
        """Perform no part handling when content type is CONTENT_SIGNALS."""
        script_handler = ShellScriptPartHandler(self.paths)

        h = JinjaTemplatePartHandler(self.paths, sub_handlers=[script_handler])
        with mock.patch.object(script_handler, "handle_part") as m_handle_part:
            h.handle_part(
                data="data",
                ctype=handlers.CONTENT_START,
                filename="part-1",
                payload="## template: jinja\n#!/bin/bash\necho himom",
                frequency="freq",
                headers="headers",
            )
        m_handle_part.assert_not_called()

    @skipUnlessJinja()
    def test_jinja_template_handle_subhandler_v2_with_clean_payload(self):
        """Call version 2 subhandler.handle_part with stripped payload."""
        script_handler = ShellScriptPartHandler(self.paths)
        self.assertEqual(2, script_handler.handler_version)

        # Create required instance data json file
        instance_json = os.path.join(self.run_dir, INSTANCE_DATA_FILE)
        instance_data = {"topkey": "echo himom"}
        util.write_file(instance_json, util.json_dumps(instance_data))
        h = JinjaTemplatePartHandler(self.paths, sub_handlers=[script_handler])
        with mock.patch.object(script_handler, "handle_part") as m_part:
            # ctype with leading '!' not in handlers.CONTENT_SIGNALS
            h.handle_part(
                data="data",
                ctype="!" + handlers.CONTENT_START,
                filename="part01",
                payload="## template: jinja   \t \n#!/bin/bash\n{{ topkey }}",
                frequency="freq",
                headers="headers",
            )
        m_part.assert_called_once_with(
            "data", "!__begin__", "part01", "#!/bin/bash\necho himom", "freq"
        )

    @skipUnlessJinja()
    def test_jinja_template_handle_subhandler_v3_with_clean_payload(self):
        """Call version 3 subhandler.handle_part with stripped payload."""
        cloudcfg_handler = CloudConfigPartHandler(self.paths)
        self.assertEqual(3, cloudcfg_handler.handler_version)

        # Create required instance-data.json file
        instance_json = os.path.join(self.run_dir, INSTANCE_DATA_FILE)
        instance_data = {"topkey": {"sub": "runcmd: [echo hi]"}}
        util.write_file(instance_json, util.json_dumps(instance_data))
        h = JinjaTemplatePartHandler(
            self.paths, sub_handlers=[cloudcfg_handler]
        )
        with mock.patch.object(cloudcfg_handler, "handle_part") as m_part:
            # ctype with leading '!' not in handlers.CONTENT_SIGNALS
            h.handle_part(
                data="data",
                ctype="!" + handlers.CONTENT_END,
                filename="part01",
                payload="## template: jinja\n#cloud-config\n{{ topkey.sub }}",
                frequency="freq",
                headers="headers",
            )
        m_part.assert_called_once_with(
            "data",
            "!__end__",
            "part01",
            "#cloud-config\nruncmd: [echo hi]",
            "freq",
            "headers",
        )

    def test_jinja_template_handle_errors_on_missing_instance_data_json(self):
        """If instance-data is absent, raise an error from handle_part."""
        script_handler = ShellScriptPartHandler(self.paths)
        h = JinjaTemplatePartHandler(self.paths, sub_handlers=[script_handler])
        with self.assertRaises(JinjaLoadError) as context_manager:
            h.handle_part(
                data="data",
                ctype="!" + handlers.CONTENT_START,
                filename="part01",
                payload="## template: jinja  \n#!/bin/bash\necho himom",
                frequency="freq",
                headers="headers",
            )
        script_file = os.path.join(script_handler.script_dir, "part01")
        self.assertEqual(
            "Cannot render jinja template vars. Instance data not yet present"
            " at {}/{}".format(self.run_dir, INSTANCE_DATA_FILE),
            str(context_manager.exception),
        )
        self.assertFalse(
            os.path.exists(script_file),
            "Unexpected file created %s" % script_file,
        )

    def test_jinja_template_handle_errors_on_unreadable_instance_data(self):
        """If instance-data is unreadable, raise an error from handle_part."""
        script_handler = ShellScriptPartHandler(self.paths)
        instance_json = os.path.join(self.run_dir, INSTANCE_DATA_FILE)
        util.write_file(instance_json, util.json_dumps({}))
        h = JinjaTemplatePartHandler(self.paths, sub_handlers=[script_handler])
        with mock.patch(self.mpath + "load_file") as m_load:
            with self.assertRaises(JinjaLoadError) as context_manager:
                m_load.side_effect = OSError(errno.EACCES, "Not allowed")
                h.handle_part(
                    data="data",
                    ctype="!" + handlers.CONTENT_START,
                    filename="part01",
                    payload="## template: jinja  \n#!/bin/bash\necho himom",
                    frequency="freq",
                    headers="headers",
                )
        script_file = os.path.join(script_handler.script_dir, "part01")
        self.assertEqual(
            "Cannot render jinja template vars. No read permission on "
            "'{}/{}'. Try sudo".format(self.run_dir, INSTANCE_DATA_FILE),
            str(context_manager.exception),
        )
        self.assertFalse(
            os.path.exists(script_file),
            "Unexpected file created %s" % script_file,
        )

    @skipUnlessJinja()
    def test_jinja_template_handle_renders_jinja_content(self):
        """When present, render jinja variables from instance data"""
        script_handler = ShellScriptPartHandler(self.paths)
        instance_json = os.path.join(self.run_dir, INSTANCE_DATA_FILE)
        instance_data = {"topkey": {"subkey": "echo himom"}}
        util.write_file(instance_json, util.json_dumps(instance_data))
        h = JinjaTemplatePartHandler(self.paths, sub_handlers=[script_handler])
        h.handle_part(
            data="data",
            ctype="!" + handlers.CONTENT_START,
            filename="part01",
            payload=(
                "## template: jinja  \n"
                "#!/bin/bash\n"
                '{{ topkey.subkey|default("nosubkey") }}'
            ),
            frequency="freq",
            headers="headers",
        )
        script_file = os.path.join(script_handler.script_dir, "part01")
        self.assertNotIn(
            "Instance data not yet present at {}/{}".format(
                self.run_dir, INSTANCE_DATA_FILE
            ),
            self.logs.getvalue(),
        )
        self.assertEqual(
            "#!/bin/bash\necho himom", util.load_file(script_file)
        )

    @skipUnlessJinja()
    def test_jinja_template_handle_renders_jinja_content_missing_keys(self):
        """When specified jinja variable is undefined, log a warning."""
        script_handler = ShellScriptPartHandler(self.paths)
        instance_json = os.path.join(self.run_dir, INSTANCE_DATA_FILE)
        instance_data = {"topkey": {"subkey": "echo himom"}}
        util.write_file(instance_json, util.json_dumps(instance_data))
        h = JinjaTemplatePartHandler(self.paths, sub_handlers=[script_handler])
        h.handle_part(
            data="data",
            ctype="!" + handlers.CONTENT_START,
            filename="part01",
            payload="## template: jinja  \n#!/bin/bash\n{{ goodtry }}",
            frequency="freq",
            headers="headers",
        )
        script_file = os.path.join(script_handler.script_dir, "part01")
        self.assertTrue(
            os.path.exists(script_file),
            "Missing expected file %s" % script_file,
        )
        self.assertIn(
            "WARNING: Could not render jinja template variables in file"
            " 'part01': 'goodtry'\n",
            self.logs.getvalue(),
        )


class TestConvertJinjaInstanceData:
    @pytest.mark.parametrize(
        "include_key_aliases,data,expected",
        (
            (False, {"my-key": "my-val"}, {"my-key": "my-val"}),
            (
                True,
                {"my-key": "my-val"},
                {"my-key": "my-val", "my_key": "my-val"},
            ),
            (False, {"my.key": "my.val"}, {"my.key": "my.val"}),
            (
                True,
                {"my.key": "my.val"},
                {"my.key": "my.val", "my_key": "my.val"},
            ),
            (
                True,
                {"my/key": "my/val"},
                {"my/key": "my/val", "my_key": "my/val"},
            ),
        ),
    )
    def test_convert_instance_data_operators_to_underscores(
        self, include_key_aliases, data, expected
    ):
        """Replace Jinja operators keys with underscores in instance-data."""
        assert expected == convert_jinja_instance_data(
            data=data, include_key_aliases=include_key_aliases
        )

    def test_convert_instance_data_promotes_versioned_keys_to_top_level(self):
        """Any versioned keys are promoted as top-level keys

        This provides any cloud-init standardized keys up at a top-level to
        allow ease of reference for users. Intsead of v1.availability_zone,
        the name availability_zone can be used in templates.
        """
        data = {
            "ds": {"dskey1": 1, "dskey2": 2},
            "v1": {"v1key1": "v1.1"},
            "v2": {"v2key1": "v2.1"},
        }
        expected_data = copy.deepcopy(data)
        expected_data.update({"v1key1": "v1.1", "v2key1": "v2.1"})

        converted_data = convert_jinja_instance_data(data=data)
        assert sorted(["ds", "v1", "v2", "v1key1", "v2key1"]) == sorted(
            converted_data.keys()
        )
        assert expected_data == converted_data

    def test_convert_instance_data_most_recent_version_of_promoted_keys(self):
        """The most-recent versioned key value is promoted to top-level."""
        data = {
            "v1": {"key1": "old v1 key1", "key2": "old v1 key2"},
            "v2": {"key1": "newer v2 key1", "key3": "newer v2 key3"},
            "v3": {"key1": "newest v3 key1"},
        }
        expected_data = copy.deepcopy(data)
        expected_data.update(
            {
                "key1": "newest v3 key1",
                "key2": "old v1 key2",
                "key3": "newer v2 key3",
            }
        )

        converted_data = convert_jinja_instance_data(data=data)
        assert expected_data == converted_data

    def test_convert_instance_data_decodes_decode_paths(self):
        """Any decode_paths provided are decoded by convert_instance_data."""
        data = {"key1": {"subkey1": "aGkgbW9t"}, "key2": "aGkgZGFk"}
        expected_data = copy.deepcopy(data)
        expected_data["key1"]["subkey1"] = "hi mom"

        converted_data = convert_jinja_instance_data(
            data=data, decode_paths=("key1/subkey1",)
        )
        assert expected_data == converted_data


class TestRenderJinjaPayload(CiTestCase):

    with_logs = True

    @skipUnlessJinja()
    def test_render_jinja_payload_logs_jinja_vars_on_debug(self):
        """When debug is True, log jinja varables available."""
        payload = (
            "## template: jinja\n#!/bin/sh\necho hi from {{ v1.hostname }}"
        )
        instance_data = {"v1": {"hostname": "foo"}, "instance-id": "iid"}
        expected_log = dedent(
            """\
            DEBUG: Converted jinja variables
            {
             "hostname": "foo",
             "instance-id": "iid",
             "instance_id": "iid",
             "v1": {
              "hostname": "foo"
             }
            }
            """
        )
        self.assertEqual(
            render_jinja_payload(
                payload=payload,
                payload_fn="myfile",
                instance_data=instance_data,
                debug=True,
            ),
            "#!/bin/sh\necho hi from foo",
        )
        self.assertEqual(expected_log, self.logs.getvalue())

    @skipUnlessJinja()
    def test_render_jinja_payload_replaces_missing_variables_and_warns(self):
        """Warn on missing jinja variables and replace the absent variable."""
        payload = "## template: jinja\n#!/bin/sh\necho hi from {{ NOTHERE }}"
        instance_data = {"v1": {"hostname": "foo"}, "instance-id": "iid"}
        self.assertEqual(
            render_jinja_payload(
                payload=payload,
                payload_fn="myfile",
                instance_data=instance_data,
            ),
            "#!/bin/sh\necho hi from CI_MISSING_JINJA_VAR/NOTHERE",
        )
        expected_log = (
            "WARNING: Could not render jinja template variables in file"
            " 'myfile': 'NOTHERE'"
        )
        self.assertIn(expected_log, self.logs.getvalue())


class TestShellScriptByFrequencyHandlers:
    def do_test_frequency(self, frequency):
        ci_paths = read_cfg_paths()
        scripts_dir = ci_paths.get_cpath("scripts")
        testFolder = os.path.join(scripts_dir, path_map[frequency])
        folder = get_script_folder_by_frequency(frequency, scripts_dir)
        assert testFolder == folder

    def test_get_script_folder_per_boot(self):
        self.do_test_frequency(PER_ALWAYS)

    def test_get_script_folder_per_instance(self):
        self.do_test_frequency(PER_INSTANCE)

    def test_get_script_folder_per_once(self):
        self.do_test_frequency(PER_ONCE)


# vi: ts=4 expandtab

haha - 2025