|
Server : Apache System : Linux server.mata-lashes.com 3.10.0-1160.90.1.el7.x86_64 #1 SMP Thu May 4 15:21:22 UTC 2023 x86_64 User : matalashes ( 1004) PHP Version : 8.1.29 Disable Function : NONE Directory : /proc/17567/root/usr/src/cloud-init/tests/unittests/ |
Upload File : |
# This file is part of cloud-init. See LICENSE file for license information.
import os
import stat
from functools import partial
from textwrap import dedent
from typing import NamedTuple
from unittest import mock
from unittest.mock import patch
import pytest
from cloudinit import ssh_util, util
# Dotted prefix of the module under test; prepended to attribute names to
# build mock.patch targets (e.g. M_PATH + "util.load_file").
M_PATH = "cloudinit.ssh_util."
class FakePwEnt(NamedTuple):
    """Minimal stand-in for a password-database entry.

    The field names are those of ``pwd.struct_passwd``. Every field
    defaults to an ``UNSET_<field name>`` placeholder so a test only has
    to supply the attributes it actually relies on.
    """

    pw_name: str = "UNSET_pw_name"
    # Previously "UNSET_w_passwd"; aligned with the UNSET_<field name>
    # pattern used by every other default.
    pw_passwd: str = "UNSET_pw_passwd"
    pw_uid: str = "UNSET_pw_uid"
    pw_gid: str = "UNSET_pw_gid"
    pw_gecos: str = "UNSET_pw_gecos"
    pw_dir: str = "UNSET_pw_dir"
    pw_shell: str = "UNSET_pw_shell"
def mock_get_owner(updated_permissions, value):
    """Return the fake owner recorded for ``value``.

    ``updated_permissions`` is indexed by ``value`` and the owner is
    element 0 of the entry found there. Only a ``ValueError`` raised by
    that lookup falls back to ``util.get_owner``.

    NOTE(review): a plain dict raises ``KeyError`` for a missing key, which
    is not caught here and therefore propagates -- confirm this is intended.
    """
    try:
        entry = updated_permissions[value]
        return entry[0]
    except ValueError:
        return util.get_owner(value)
def mock_get_group(updated_permissions, value):
    """Return the fake group recorded for ``value``.

    ``updated_permissions`` is indexed by ``value`` and the group is
    element 1 of the entry found there. Only a ``ValueError`` raised by
    that lookup falls back to ``util.get_group``.

    NOTE(review): a plain dict raises ``KeyError`` for a missing key, which
    is not caught here and therefore propagates -- confirm this is intended.
    """
    try:
        entry = updated_permissions[value]
        return entry[1]
    except ValueError:
        return util.get_group(value)
def mock_get_user_groups(username):
    """Return ``username`` unchanged.

    NOTE(review): presumably a stand-in for a user-groups lookup; no use of
    this helper is visible in this part of the file -- confirm at call sites.
    """
    return username
def mock_get_permissions(updated_permissions, value):
    """Return the fake permission bits recorded for ``value``.

    ``updated_permissions`` is indexed by ``value`` and the mode is
    element 2 of the entry found there. Only a ``ValueError`` raised by
    that lookup falls back to ``util.get_permissions``.

    NOTE(review): a plain dict raises ``KeyError`` for a missing key, which
    is not caught here and therefore propagates -- confirm this is intended.
    """
    try:
        entry = updated_permissions[value]
        return entry[2]
    except ValueError:
        return util.get_permissions(value)
def mock_getpwnam(users, username):
    """Look up ``username`` in ``users`` and return the entry stored there.

    A name that is absent from ``users`` raises whatever the container
    raises for a failed subscript (``KeyError`` for a dict).
    """
    pw_entry = users[username]
    return pw_entry
# Do not use these public keys: most of them are taken from the OpenSSH
# test data, and their private keys are publicly available at
# https://github.com/openssh/openssh-portable/tree/master/regress/unittests/sshkey/testdata
VALID_CONTENT = {
"dsa": (
"AAAAB3NzaC1kc3MAAACBAIrjOQSlSea19bExXBMBKBvcLhBoVvNBjCppNzllipF"
"W4jgIOMcNanULRrZGjkOKat6MWJNetSbV1E6IOFDQ16rQgsh/OvYU9XhzM8seLa"
"A21VszZuhIV7/2DE3vxu7B54zVzueG1O1Deq6goQCRGWBUnqO2yluJiG4HzrnDa"
"jzRAAAAFQDMPO96qXd4F5A+5b2f2MO7SpVomQAAAIBpC3K2zIbDLqBBs1fn7rsv"
"KcJvwihdlVjG7UXsDB76P2GNqVG+IlYPpJZ8TO/B/fzTMtrdXp9pSm9OY1+BgN4"
"REsZ2WNcvfgY33aWaEM+ieCcQigvxrNAF2FTVcbUIIxAn6SmHuQSWrLSfdHc8H7"
"hsrgeUPPdzjBD/cv2ZmqwZ1AAAAIAplIsScrJut5wJMgyK1JG0Kbw9JYQpLe95P"
"obB069g8+mYR8U0fysmTEdR44mMu0VNU5E5OhTYoTGfXrVrkR134LqFM2zpVVbE"
"JNDnIqDHxTkc6LY2vu8Y2pQ3/bVnllZZOda2oD5HQ7ovygQa6CH+fbaZHbdDUX/"
"5z7u2rVAlDw=="
),
"ecdsa": (
"AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBITrGBB3cgJ"
"J7fPxvtMW9H3oRisNpJ3OAslxZeyP7I0A9BPAW0RQIwHVtVnM7zrp4nI+JLZov/"
"Ql7lc2leWL7CY="
),
"rsa": (
"AAAAB3NzaC1yc2EAAAABIwAAAQEA3I7VUf2l5gSn5uavROsc5HRDpZdQueUq5oz"
"emNSj8T7enqKHOEaFoU2VoPgGEWC9RyzSQVeyD6s7APMcE82EtmW4skVEgEGSbD"
"c1pvxzxtchBj78hJP6Cf5TCMFSXw+Fz5rF1dR23QDbN1mkHs7adr8GW4kSWqU7Q"
"7NDwfIrJJtO7Hi42GyXtvEONHbiRPOe8stqUly7MvUoN+5kfjBM8Qqpfl2+FNhT"
"YWpMfYdPUnE7u536WqzFmsaqJctz3gBxH9Ex7dFtrxR4qiqEr9Qtlu3xGn7Bw07"
"/+i1D+ey3ONkZLN+LQ714cgj8fRS4Hj29SCmXp5Kt5/82cD/VN3NtHw=="
),
"ed25519": (
"AAAAC3NzaC1lZDI1NTE5AAAAIA1J77+CrJ8p6/vWCEzuylqJNMHUP/XmeYyGVWb8lnDd"
),
"ecdsa-sha2-nistp256-cert-v01@openssh.com": (
"AAAAKGVjZHNhLXNoYTItbmlzdHAyNTYtY2VydC12MDFAb3BlbnNzaC5jb20AAAA"
"gQIfwT/+UX68/hlKsdKuaOuAVB6ftTg03SlP/uH4OBEwAAAAIbmlzdHAyNTYAAA"
"BBBEjA0gjJmPM6La3sXyfNlnjilvvGY6I2M8SvJj4o3X/46wcUbPWTaj4RF3EXw"
"HvNxplYBwdPlk2zEecvf9Cs2BMAAAAAAAAAAAAAAAEAAAAYa2V5cy9lY2RzYS1z"
"aGEyLW5pc3RwMjU2AAAAAAAAAAAAAAAA//////////8AAAAAAAAAggAAABVwZXJ"
"taXQtWDExLWZvcndhcmRpbmcAAAAAAAAAF3Blcm1pdC1hZ2VudC1mb3J3YXJkaW"
"5nAAAAAAAAABZwZXJtaXQtcG9ydC1mb3J3YXJkaW5nAAAAAAAAAApwZXJtaXQtc"
"HR5AAAAAAAAAA5wZXJtaXQtdXNlci1yYwAAAAAAAAAAAAAAaAAAABNlY2RzYS1z"
"aGEyLW5pc3RwMjU2AAAACG5pc3RwMjU2AAAAQQRH6Y9Q1+ocQ8ETKW3LjQqtxg7"
"OuSSDacxmmQatQVaIawwjCbmntyEAqmVj3v9ElDSXnO5m7TyYMBQu4+vsh76RAA"
"AAZQAAABNlY2RzYS1zaGEyLW5pc3RwMjU2AAAASgAAACEA47Cl2MMhr+glPGuxx"
"2tM3QXkDcwdP0SxSEW5yy4XV5oAAAAhANNMm1cdVlAt3hmycQgdD82zPlg5YvVO"
"iN0SQTbgVD8i"
),
"ecdsa-sha2-nistp256": (
"AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBEjA0gjJmPM"
"6La3sXyfNlnjilvvGY6I2M8SvJj4o3X/46wcUbPWTaj4RF3EXwHvNxplYBwdPlk"
"2zEecvf9Cs2BM="
),
"ecdsa-sha2-nistp384-cert-v01@openssh.com": (
"AAAAKGVjZHNhLXNoYTItbmlzdHAzODQtY2VydC12MDFAb3BlbnNzaC5jb20AAAA"
"grnSvDsK1EnCZndO1IyGWcGkVgVSkPWi/XO2ybPFyLVUAAAAIbmlzdHAzODQAAA"
"BhBAaYSQs+8TT0Tzciy0dorwhur6yzOGUrYQ6ueUQYWbE7eNdHmhsVrlpGPgSaY"
"ByhXtAJiPOMqLU5h0eb3sCtM3ek4NvjXFTGTqPrrxJI6q0OsgrtkGE7UM9ZsfMm"
"7q6BOAAAAAAAAAAAAAAAAQAAABhrZXlzL2VjZHNhLXNoYTItbmlzdHAzODQAAAA"
"AAAAAAAAAAAD//////////wAAAAAAAACCAAAAFXBlcm1pdC1YMTEtZm9yd2FyZG"
"luZwAAAAAAAAAXcGVybWl0LWFnZW50LWZvcndhcmRpbmcAAAAAAAAAFnBlcm1pd"
"C1wb3J0LWZvcndhcmRpbmcAAAAAAAAACnBlcm1pdC1wdHkAAAAAAAAADnBlcm1p"
"dC11c2VyLXJjAAAAAAAAAAAAAACIAAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAA"
"IbmlzdHAzODQAAABhBLWbubcMzcWc7lMTCMGVXZlaVvUOHLjpr6SOOScFFrd8K9"
"Gl8nYELST5HZ1gym65m+MG6/tbrUWIY/flLWNIe+WtqxrdPPGdIhFruCwNw2peZ"
"SbQOa/o3AGnJ/vO6EKEGAAAAIQAAAATZWNkc2Etc2hhMi1uaXN0cDM4NAAAAGkA"
"AAAxAL10JHd5bvnbpD+fet/k1YE1BEIrqGXaoIIJ9ReE5H4nTK1uQJzMD7+wwGK"
"RVYqYQgAAADAiit0UCMDAUbjD+R2x4LvU3x/t8G3sdqDLRNfMRpjZpvcS8AwC+Y"
"VFVSQNn0AyzW0="
),
"ecdsa-sha2-nistp384": (
"AAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAAIbmlzdHAzODQAAABhBAaYSQs+8TT"
"0Tzciy0dorwhur6yzOGUrYQ6ueUQYWbE7eNdHmhsVrlpGPgSaYByhXtAJiPOMqL"
"U5h0eb3sCtM3ek4NvjXFTGTqPrrxJI6q0OsgrtkGE7UM9ZsfMm7q6BOA=="
),
"ecdsa-sha2-nistp521-cert-v01@openssh.com": (
"AAAAKGVjZHNhLXNoYTItbmlzdHA1MjEtY2VydC12MDFAb3BlbnNzaC5jb20AAAA"
"gGmRzkkMvRFk1V5U3m3mQ2nfW20SJVXk1NKnT5iZGDcEAAAAIbmlzdHA1MjEAAA"
"CFBAHosAOHAI1ZkerbKYQ72S6uit1u77PCj/OalZtXgsxv0TTAZB273puG2X94C"
"Q8yyNHcby87zFZHdv5BSKyZ/cyREAAeiAcSakop9VS3+bUfZpEIqwBZXarwUjnR"
"nxprkcQ0rfCCdagkGZr/OA7DemK2D8tKLTHsKoEEWNImo6/pXDkFxAAAAAAAAAA"
"AAAAAAQAAABhrZXlzL2VjZHNhLXNoYTItbmlzdHA1MjEAAAAAAAAAAAAAAAD///"
"///////wAAAAAAAACCAAAAFXBlcm1pdC1YMTEtZm9yd2FyZGluZwAAAAAAAAAXc"
"GVybWl0LWFnZW50LWZvcndhcmRpbmcAAAAAAAAAFnBlcm1pdC1wb3J0LWZvcndh"
"cmRpbmcAAAAAAAAACnBlcm1pdC1wdHkAAAAAAAAADnBlcm1pdC11c2VyLXJjAAA"
"AAAAAAAAAAACsAAAAE2VjZHNhLXNoYTItbmlzdHA1MjEAAAAIbmlzdHA1MjEAAA"
"CFBAC6hFVXM1XEg/7qKkp5sLZuANGQVW88b5pPn2ZcK0td9IQstLH6BwWuZ6MPE"
"ogiDlvx9HD1BaKGBBfkxgOY8NGFzQHbjU9eTWH3gt0RATDbZsij1pSkFPnAXdU9"
"SjfogYloI2xdHaTCgWp3zgsUV+BBQ0QGGv2MqqcOmrF0f5YEJeOffAAAAKcAAAA"
"TZWNkc2Etc2hhMi1uaXN0cDUyMQAAAIwAAABCAT+vSOYPuYVTDopDW08576d5Sb"
"edXQMOu1op4CQIm98VKtAXvu5dfioi5VYAqpte8M+UxEMOMiQWJp+U9exYf6LuA"
"AAAQgEzkIpX3yKXPaPcK17mNx40ujEDitm4ARmbhAge0sFhZtf7YIgI55b6vkI8"
"JvMJkzQCBF1cpNOaIpVh1nFZNBphMQ=="
),
"ecdsa-sha2-nistp521": (
"AAAAE2VjZHNhLXNoYTItbmlzdHA1MjEAAAAIbmlzdHA1MjEAAACFBAHosAOHAI1"
"ZkerbKYQ72S6uit1u77PCj/OalZtXgsxv0TTAZB273puG2X94CQ8yyNHcby87zF"
"ZHdv5BSKyZ/cyREAAeiAcSakop9VS3+bUfZpEIqwBZXarwUjnRnxprkcQ0rfCCd"
"agkGZr/OA7DemK2D8tKLTHsKoEEWNImo6/pXDkFxA=="
),
"sk-ecdsa-sha2-nistp256-cert-v01@openssh.com": (
"AAAAIHNzaC1lZDI1NTE5LWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIIxzuxl4z3u"
"wAIslne8Huft+1n1IhHAlNbWZkQyyECCGAAAAIFOG6kY7Rf4UtCFvPwKgo/BztX"
"ck2xC4a2WyA34XtIwZAAAAAAAAAAgAAAACAAAABmp1bGl1cwAAABIAAAAFaG9zd"
"DEAAAAFaG9zdDIAAAAANowB8AAAAABNHmBwAAAAAAAAAAAAAAAAAAAAMwAAAAtz"
"c2gtZWQyNTUxOQAAACBThupGO0X+FLQhbz8CoKPwc7V3JNsQuGtlsgN+F7SMGQA"
"AAFMAAAALc3NoLWVkMjU1MTkAAABABGTn+Bmz86Ajk+iqKCSdP5NClsYzn4alJd"
"0V5bizhP0Kumc/HbqQfSt684J1WdSzih+EjvnTgBhK9jTBKb90AQ=="
),
"sk-ecdsa-sha2-nistp256@openssh.com": (
"AAAAInNrLWVjZHNhLXNoYTItbmlzdHAyNTZAb3BlbnNzaC5jb20AAAAIbmlzdHA"
"yNTYAAABBBIELQJ2DgvaX1yQlKFokfWM2suuaCFI2qp0eJodHyg6O4ifxc3XpRK"
"d1OS8dNYQtE/YjdXSrA+AOnMF5ns2Nkx4AAAAEc3NoOg=="
),
"sk-ssh-ed25519-cert-v01@openssh.com": (
"AAAAIHNzaC1lZDI1NTE5LWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIIxzuxl4z3u"
"wAIslne8Huft+1n1IhHAlNbWZkQyyECCGAAAAIFOG6kY7Rf4UtCFvPwKgo/BztX"
"ck2xC4a2WyA34XtIwZAAAAAAAAAAgAAAACAAAABmp1bGl1cwAAABIAAAAFaG9zd"
"DEAAAAFaG9zdDIAAAAANowB8AAAAABNHmBwAAAAAAAAAAAAAAAAAAAAMwAAAAtz"
"c2gtZWQyNTUxOQAAACBThupGO0X+FLQhbz8CoKPwc7V3JNsQuGtlsgN+F7SMGQA"
"AAFMAAAALc3NoLWVkMjU1MTkAAABABGTn+Bmz86Ajk+iqKCSdP5NClsYzn4alJd"
"0V5bizhP0Kumc/HbqQfSt684J1WdSzih+EjvnTgBhK9jTBKb90AQ=="
),
"sk-ssh-ed25519@openssh.com": (
"AAAAGnNrLXNzaC1lZDI1NTE5QG9wZW5zc2guY29tAAAAICFo/k5LU8863u66YC9"
"eUO2170QduohPURkQnbLa/dczAAAABHNzaDo="
),
"ssh-dss-cert-v01@openssh.com": (
"AAAAHHNzaC1kc3MtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgdTlbNU9Hn9Qng3F"
"HxwH971bxCIoq1ern/QWFFDWXgmYAAACBAPqS600VGwdPAQC/p3f0uGyrLVql0c"
"Fn1zYd/JGvtabKnIYjLaYprje/NcjwI3CZFJiz4Dp3S8kLs+X5/1DMn/Tg1Y4D4"
"yLB+6vCtHcJF7rVBFhvw/KZwc7G54ez3khyOtsg82fzpyOc8/mq+/+C5TMKO7DD"
"jMF0k5emWKCsa3ZfAAAAFQCjA/+dKkMu4/CWjJPtfl7YNaStNQAAAIEA7uX1BVV"
"tJKjLmWrpw62+l/xSXA5rr7MHBuWjiCYV3VHBfXJaQDyRDtGuEJKDwdzqYgacpG"
"ApGWL/cuBtJ9nShsUl6GRG0Ra03g+Hx9VR5LviJBsjAVB4qVgciU1NGga0Bt2Le"
"cd1X4EGQRBzVXeuOpiqGM6jP/I2yDMs0Pboet0AAACBAOdXpyfmobEBaOqZAuvg"
"j1P0uhjG2P31Ufurv22FWPBU3A9qrkxbOXwE0LwvjCvrsQV/lrYhJz/tiys40Ve"
"ahulWZE5SAHMXGIf95LiLSgaXMjko7joot+LK84ltLymwZ4QMnYjnZSSclf1Uuy"
"QMcUtb34+I0u9Ycnyhp2mSFsQtAAAAAAAAAAYAAAACAAAABmp1bGl1cwAAABIAA"
"AAFaG9zdDEAAAAFaG9zdDIAAAAANowB8AAAAABNHmBwAAAAAAAAAAAAAAAAAAAA"
"MwAAAAtzc2gtZWQyNTUxOQAAACBThupGO0X+FLQhbz8CoKPwc7V3JNsQuGtlsgN"
"+F7SMGQAAAFMAAAALc3NoLWVkMjU1MTkAAABAh/z1LIdNL1b66tQ8t9DY9BTB3B"
"QKpTKmc7ezyFKLwl96yaIniZwD9Ticdbe/8i/Li3uCFE3EAt8NAIv9zff8Bg=="
),
"ssh-dss": (
"AAAAB3NzaC1kc3MAAACBAPqS600VGwdPAQC/p3f0uGyrLVql0cFn1zYd/JGvtab"
"KnIYjLaYprje/NcjwI3CZFJiz4Dp3S8kLs+X5/1DMn/Tg1Y4D4yLB+6vCtHcJF7"
"rVBFhvw/KZwc7G54ez3khyOtsg82fzpyOc8/mq+/+C5TMKO7DDjMF0k5emWKCsa"
"3ZfAAAAFQCjA/+dKkMu4/CWjJPtfl7YNaStNQAAAIEA7uX1BVVtJKjLmWrpw62+"
"l/xSXA5rr7MHBuWjiCYV3VHBfXJaQDyRDtGuEJKDwdzqYgacpGApGWL/cuBtJ9n"
"ShsUl6GRG0Ra03g+Hx9VR5LviJBsjAVB4qVgciU1NGga0Bt2Lecd1X4EGQRBzVX"
"euOpiqGM6jP/I2yDMs0Pboet0AAACBAOdXpyfmobEBaOqZAuvgj1P0uhjG2P31U"
"furv22FWPBU3A9qrkxbOXwE0LwvjCvrsQV/lrYhJz/tiys40VeahulWZE5SAHMX"
"GIf95LiLSgaXMjko7joot+LK84ltLymwZ4QMnYjnZSSclf1UuyQMcUtb34+I0u9"
"Ycnyhp2mSFsQt"
),
"ssh-ed25519-cert-v01@openssh.com": (
"AAAAIHNzaC1lZDI1NTE5LWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIIxzuxl4z3u"
"wAIslne8Huft+1n1IhHAlNbWZkQyyECCGAAAAIFOG6kY7Rf4UtCFvPwKgo/BztX"
"ck2xC4a2WyA34XtIwZAAAAAAAAAAgAAAACAAAABmp1bGl1cwAAABIAAAAFaG9zd"
"DEAAAAFaG9zdDIAAAAANowB8AAAAABNHmBwAAAAAAAAAAAAAAAAAAAAMwAAAAtz"
"c2gtZWQyNTUxOQAAACBThupGO0X+FLQhbz8CoKPwc7V3JNsQuGtlsgN+F7SMGQA"
"AAFMAAAALc3NoLWVkMjU1MTkAAABABGTn+Bmz86Ajk+iqKCSdP5NClsYzn4alJd"
"0V5bizhP0Kumc/HbqQfSt684J1WdSzih+EjvnTgBhK9jTBKb90AQ=="
),
"ssh-ed25519": (
"AAAAC3NzaC1lZDI1NTE5AAAAIFOG6kY7Rf4UtCFvPwKgo/BztXck2xC4a2WyA34XtIwZ"
),
"ssh-rsa-cert-v01@openssh.com": (
"AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAg98LhS2EHxLOWCLo"
"pZPwHdg/RJXusnkOqQXSc9R7aITkAAAADAQABAAAAgQDLV5lUTt7FrADseB/CGh"
"EZzpoojjEW5y8+ePvLppmK3MmMI18ud6vxzpK3bwZLYkVSyfJYI0HmIuGhdu7yM"
"rW6wb84gbq8C31Xoe9EORcIUuGSvDKdNSM1SjlhDquRblDFB8kToqXyx1lqrXec"
"XylxIUOL0jE+u0rU1967pDJx+wAAAAAAAAAFAAAAAgAAAAZqdWxpdXMAAAASAAA"
"ABWhvc3QxAAAABWhvc3QyAAAAADaMAfAAAAAATR5gcAAAAAAAAAAAAAAAAAAAAD"
"MAAAALc3NoLWVkMjU1MTkAAAAgU4bqRjtF/hS0IW8/AqCj8HO1dyTbELhrZbIDf"
"he0jBkAAABTAAAAC3NzaC1lZDI1NTE5AAAAQI3QGlUCzC07KorupxpDkkGy6tni"
"aZ8EvBflzvv+itXWNchGvfUeHmVT6aX0sRqehdz/lR+GmXRoZBhofwh0qAM="
),
"ssh-rsa": (
"AAAAB3NzaC1yc2EAAAADAQABAAAAgQDLV5lUTt7FrADseB/CGhEZzpoojjEW5y8"
"+ePvLppmK3MmMI18ud6vxzpK3bwZLYkVSyfJYI0HmIuGhdu7yMrW6wb84gbq8C3"
"1Xoe9EORcIUuGSvDKdNSM1SjlhDquRblDFB8kToqXyx1lqrXecXylxIUOL0jE+u"
"0rU1967pDJx+w=="
),
"ssh-xmss-cert-v01@openssh.com": (
"AAAAHXNzaC14bXNzLWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIM2UD0IH+Igsekq"
"xjTO5f36exX4WGRMCtDGPjwfbXblxAAAAFVhNU1NfU0hBMi0yNTZfVzE2X0gxMA"
"AAAEDI83/K5JMOy0BMJgQypRdz35ApAnoQinMJ8ZMoZPaEJF8Z4rANQlfzaAXum"
"N3RDU5CGIUGGw+WJ904G/wwEq9CAAAAAAAAAAAAAAABAAAACWtleXMveG1zcwAA"
"AAAAAAAAAAAAAP//////////AAAAAAAAAIIAAAAVcGVybWl0LVgxMS1mb3J3YXJ"
"kaW5nAAAAAAAAABdwZXJtaXQtYWdlbnQtZm9yd2FyZGluZwAAAAAAAAAWcGVybW"
"l0LXBvcnQtZm9yd2FyZGluZwAAAAAAAAAKcGVybWl0LXB0eQAAAAAAAAAOcGVyb"
"Wl0LXVzZXItcmMAAAAAAAAAAAAAAHUAAAAUc3NoLXhtc3NAb3BlbnNzaC5jb20A"
"AAAVWE1TU19TSEEyLTI1Nl9XMTZfSDEwAAAAQA+irIyT2kaOd07YWZT/QItzNBZ"
"kUYwnqZJihQ7BxuyiDP4HEFbnfYnnIZXx9Asyi7vDyZRvi+AMSOzmMSq4JnkAAA"
"ngAAAAFHNzaC14bXNzQG9wZW5zc2guY29tAAAJxAAAAAAFjaKTDc+7Hu2uFGIab"
"3NAku8HbbGtrq/uGXOxmqxu4RaLqmwofl5iXk3nMwWEhQAb99vAc9D9ZFtfxJO4"
"STYUTjbj4BxToov/uvbYfE5VeO6sMvkGglgh9YHkCTAItsG8EmGT1SIPfKYzLlN"
"jvUlbcv0PaPFMJ0wzS9mNfuRf+KUhf3dxQ6zaMrBH3KEJ8Me2kNjhnh6rNPROeI"
"N+IcStSKsydYuiySGKS/orsH38XysuK5QqLizbHJY3cqLbkW9LsIijb+pfEJh4Y"
"bOoAbraWAv9ySnWCyRhvw2x8uJ0ZM+p5WSRiZfB3JxCpOhHgiKa9TdmdjnAtnED"
"zqKOj/gM7y9mesn5ydQI0bENOGymlw0ThUGKbXMxn87Hc9dDPURUBmoO3NGjPDf"
"7meS39A1ZEGtCe/pbZU9iwxqGx4wJYvB4lutRP2tYC1pA6hjQCcHibvxl5iqj+1"
"jRjwPr8dbTm4PdETW/7JDSVQXKjxOT0kRLHLelJNeviGx5zSHR5PtnUP3nOBMme"
"hk9DwcQW9vfKeWSnu9CMnF8xvYJxoPKQwmz0TKo+YVOUnc9/Ma+Ykseof9/W+rk"
"USQGELc4x7XE5XBKYZZP2PmtxirQ3qTWFw+CeTX2Oa+jPYkzOa7jgmHJ3Fi9Xqw"
"3L844vRl97e28GmwS0M1SXH+ohES0mO4EcrGh5OLyXBaRTV5QMo+4Bg6FH/HwEn"
"gG1mdEOAqvctK2QC70c4lHGzfexqwQ2U6WUADPcd/BLOE8Noj1EiXYwZrSA1okZ"
"FYnS/b89Uo51D2FE4A33V4gcxAglGzVNtrPulkguNT9B4jjNgdIwkTBL9k3ujkG"
"og6pyYjZ0J5Jp5XPBn+y0LqrpOdZijzrc1OJbX59tTeIbDkM7Fw8As4a03hQPDU"
"FTOdyMHgLnuLhLXOcqIjvW5axZL/Kx3UET8wrSHizPoa6NErCG4v5mC2M4kBSOW"
"In1QV27QMaHkL/ZAa3mPsW5iFZtOVEGzw2BW4MZs0qOrcloCENZzOHiMBroKEkH"
"AbzX6D1FLwml2JpXq4JXlCrdIiFm4+co5ygnWPqb4QGzMlcbjW/x/A16TthNuok"
"wwlmK5ndKZ76LahyGKEwx2Nv0D+0xilEC1EldtiYRdBNlcGbU/A5EhH5bQ9KVIH"
"wjWm35pRPLl5224//nqvQKhwFCn9otsR35XHXev3IQ0or3HmQxIvSDOwir1l66z"
"FFrkyHMWexoucbTBxw1MN3hLb247lcVYJ5+hspJgyoYbfR5RkQVDzhpzskogP7l"
"K5t0bphu+f+hpvrca7DAiiIZkcR4R1UUQoRnJPRXyXOxlxwS10b51cP9p9jzvZj"
"d2LUs8yx1KXWSxNHo6WmtYONNaUfdX2OB5+QCvPULfLfFeBrqpX6Yp5wQMM5Cup"
"k8FEfV07eEgQkVE9nDGKHglWo3kUdOF+XCqWAnXn0b/2bNS9/SSAz6gB1GTFcN/"
"QsFGlC0QgbCJbQ7LQM6hilRWupWvN5zZ/+HJyyRHuSs5VnQnKiGbIa6AIhx7mP7"
"8T82gKjU3mHLJWMGKcT3cY8R958Gs+w4OT71VJRMw3kK6qk02WCbD5OtbFeC6ib"
"KRJKdLK3BzjVs/Fzu3mHVucVby3jpvG1Z8HKspKFhvV7gjFEPu8qHKi4MdAlif/"
"KakyPk8yZB/dMfaxh7Kv/WpJuSwWNs7RNh29e+ZG+POxqRPWiHqiVw7P17a4dN7"
"nkVOawdBEyxI4NAY+4zW+0r0bAy6zNBitBvkq3IXfr3De6Upex52sPHvK04PXoV"
"RI6gjnpPSbLLjpSpcHPKgB7DWefLfhd63BUQbc57D8zm8Jd6qtmzcSKn+wz5/zT"
"0I6v9I4a+DOjjyqpPpzzNU76pt+Y8SuBgHzMm1vcAdNWlbQrqtScvm0T9AkYni6"
"47vSh77uwRZKDtMCMSU151tVUavXhtLYLZ6/ll5NhMXkkx8//i7pk1OBjN5LHVQ"
"0QeimRmavlXU1dJ2rwsFAV+9dDdJXUNOq3VLTo9FrbOzZiWtzzjkJpVJAFREnBn"
"yIDBK5AXtXE1RzfzaBHzbI2e2kO3t+CSNLWYMFYHBDqaeICYQ9+I9aO/8hnzVSo"
"fp+8IfWO8iJhppqynUniicW2oCzrn4oczzYNEjImt8CGY7g90GxWfX+ZgXMJfy/"
"bQiFQL3dZvVypDHEbFoIGz+sxkL83xrP4MZV1V9Wwa64lDXYv01Kp4kQXmmnAZY"
"KlxBoWqYDXLeLLguSOZxDSCIDpd+YPm39wQ3wOysHW2fmsWtp6FPPlQRUYjsGIP"
"lfrkJzpoeaPKDtF1m+mOULfEh9kvTKCmKRi385T9ON39D97eWqaM4CCfUGImvdR"
"DlZLXvjmaAh5BVJ8VJxk75OkP14vWFFlTMv0/k4BYLDKsrNqCREC/G9nQBGcD2D"
"CLwC2zPNaX2Y9dnyDs2csjN1ibsYttUMnXMgBcnCOkIkVS496Bpc0jQMf35GUgb"
"PSyliwqCoXjEBP/2eyq0VLFKQ0fXGsHWvElT+Y/7RYNTiYVWttFMxN5H/2EGcgn"
"lfNHLpQvXH9u/3YminS9GX30hQ7jFhpHXxkK8gZ1mpHL9K3pfKS3lG6EF9wQ23O"
"qS8m995SG3dp3MzmywxXen/ukXx6bDiEl5VaOvdRUcbhr5Eb3exVDfdWiaJdTYF"
"WfIfJOWx88drB3J9vFwjmuaoNEOjFsoNAMYthYOxXraXaJblvmUKz6tJ3T8/G7x"
"B9QGYNBsOqBolKoKHBtsWCosLdWhEZr9VFFh2AJrOW1fx24CIkHnvfTtwYORvQq"
"Ckuq2bZS1EOdsFkU/X5gwPl6gSUTNhV3IooXkBFL3iBEbfZ6JpQHVVyIuNWjIyN"
"b2liCn9Nn0VHeNMMRLl7uyw4eKlOX2ogom8SLvihYxcJoqlCwtehpLsKsU4iwME"
"PmDteW5GBGf4GbnqPFkpIT5ed1jGhdZt/dpsp+v6QhYH1uX4pPxdkdnuc84/yb9"
"k4SQdKBJ+l3KZkfIxApNWOZqicJfz/eWwS/15hiamRKRuiiUV2zS1V+l8bV7g9O"
"gy5scPBMONxtfFlGEKikZKurFmzboCOGQKRBEUCpsY44IAp443h59pQdVIb0YAS"
"kfp2xKHwYij6ELRNdH5MrlFa3bNTskGO4k5XDR4cl/Sma2SXgBKb5XjTtlNmCQG"
"Gv6lOW7pGXNhs5wfd8K9Ukm6KeLTIlYn1iiKM37YQpa+4JQYljCYhumbqNCkPTZ"
"rNYClh8fQEQ8XuOCDpomMWu58YOTfbZNMDWs/Ou7RfCjX+VNwjPShDK9joMwWKc"
"Jy3QalZbaoWtcyyvXxR2sqhVR9F7Cmasq4="
),
"ssh-xmss@openssh.com": (
"AAAAFHNzaC14bXNzQG9wZW5zc2guY29tAAAAFVhNU1NfU0hBMi0yNTZfVzE2X0g"
"xMAAAAECqptWnK94d+Sj2xcdTu8gz+75lawZoLSZFqC5IhbYuT/Z3oBZCim6yt+"
"HAmk6MKldl3Fg+74v4sR/SII0I0Jv/"
),
}
# Every key type that has a sample blob in VALID_CONTENT, in definition order.
KEY_TYPES = list(VALID_CONTENT.keys())
# Sample options field for an authorized_keys line: three flag options
# followed by a double-quoted command="..." option that itself contains
# spaces and nested quotes. The three adjacent literals are concatenated
# as written, with no separator inserted between them.
TEST_OPTIONS = (
"no-port-forwarding,no-agent-forwarding,no-X11-forwarding,"
'command="echo \'Please login as the user "ubuntu" rather than the'
'user "root".\';echo;sleep 10"'
)
class TestAuthKeyLineParser:
    """Checks for ``ssh_util.AuthKeyLineParser.parse``."""

    @pytest.mark.parametrize("with_options", [True, False])
    @pytest.mark.parametrize("with_comment", [True, False])
    @pytest.mark.parametrize("ktype", KEY_TYPES)
    def test_parse(self, ktype, with_comment, with_options):
        """Parse every sample key with and without options and comment."""
        content = VALID_CONTENT[ktype]
        comment = f"user-{ktype}@host"

        # Field order on the line: [options] keytype base64 [comment]
        fields = [TEST_OPTIONS] if with_options else []
        fields += [ktype, content]
        if with_comment:
            fields.append(comment)

        key = ssh_util.AuthKeyLineParser().parse(" ".join(fields))

        assert key.base64 == content
        assert key.keytype == ktype
        if with_options:
            assert key.options == TEST_OPTIONS
        else:
            assert key.options is None
        assert key.comment == (comment if with_comment else "")

    def test_parse_with_options_passed_in(self):
        """Options passed as an argument replace the ones on the line."""
        parser = ssh_util.AuthKeyLineParser()
        baseline = " ".join(["rsa", VALID_CONTENT["rsa"], "user@host"])
        passed_opts = "no-port-forwarding,no-agent-forwarding"

        # No ``options`` argument: the leading token is reported as options.
        key = parser.parse(f"allowedopt {baseline}")
        assert key.options == "allowedopt"

        # ``options`` argument given: it is reported instead of the token
        # found on the line.
        key = parser.parse(f"overridden_opt {baseline}", options=passed_opts)
        assert key.options == passed_opts

    def test_parse_invalid_keytype(self):
        """A line whose key type is unknown yields a key that is not valid."""
        line = " ".join(("badkeytype", VALID_CONTENT["rsa"]))
        key = ssh_util.AuthKeyLineParser().parse(line)
        assert not key.valid()
class TestUpdateAuthorizedKeys:
    """Checks for ``ssh_util.update_authorized_keys``."""

    @pytest.mark.parametrize(
        "new_entries",
        [
            [" ".join(("rsa", VALID_CONTENT["rsa"], "new_comment1"))],
            pytest.param(
                [
                    " ".join(("rsa", VALID_CONTENT["rsa"], "new_comment1")),
                    "xxx-invalid-thing1",
                    "xxx-invalid-blob2",
                ],
                id="skip-invalid-entries",
            ),
        ],
    )
    def test_new_keys_replace(self, new_entries):
        """new entries with the same base64 should replace old."""
        orig_rsa = " ".join(("rsa", VALID_CONTENT["rsa"], "orig_comment1"))
        orig_dsa = " ".join(("dsa", VALID_CONTENT["dsa"], "orig_comment2"))
        # The new rsa line takes the place of the old one; the dsa line is
        # kept; every line, including the last, ends with a newline.
        expected = f"{new_entries[0]}\n{orig_dsa}\n"

        parser = ssh_util.AuthKeyLineParser()
        old_keys = [parser.parse(entry) for entry in (orig_rsa, orig_dsa)]
        new_keys = [parser.parse(entry) for entry in new_entries]

        found = ssh_util.update_authorized_keys(old_keys, new_keys)
        assert expected == found
@mock.patch(M_PATH + "util.load_file")
@mock.patch(M_PATH + "os.path.isfile")
class TestParseSSHConfig:
    """Checks for ``ssh_util.parse_ssh_config`` with file access mocked.

    The class-level patches hand every test ``m_is_file`` (the patched
    ``os.path.isfile``) followed by ``m_load_file`` (the patched
    ``util.load_file``).
    """

    @pytest.mark.parametrize(
        "is_file, file_content",
        [
            pytest.param(True, ("",), id="empty-file"),
            pytest.param(False, IOError, id="not-a-file"),
        ],
    )
    def test_dummy_file(self, m_is_file, m_load_file, is_file, file_content):
        """An empty file and a missing file both parse to an empty list."""
        m_is_file.return_value = is_file
        m_load_file.side_effect = file_content
        assert [] == ssh_util.parse_ssh_config("notmatter")

    @pytest.mark.parametrize(
        "file_content",
        [
            pytest.param(["# This is a comment"], id="comment_line"),
            pytest.param(
                ["# This is a comment", "# This is another comment"],
                id="two-comment_lines",
            ),
        ],
    )
    def test_comment_line(self, m_is_file, m_load_file, file_content):
        """Comment lines are kept: one parsed entry per input line."""
        m_is_file.return_value = True
        m_load_file.return_value = "\n".join(file_content)

        parsed = ssh_util.parse_ssh_config("some real file")

        assert len(file_content) == len(parsed)
        assert file_content[0] == parsed[0].line

    def test_blank_lines(self, m_is_file, m_load_file):
        """Whitespace-only lines are kept and their text is emptied."""
        blanks = ["", "\t", " "]
        m_is_file.return_value = True
        m_load_file.return_value = "\n".join(blanks)

        parsed = ssh_util.parse_ssh_config("some real file")

        assert len(blanks) == len(parsed)
        for entry in parsed:
            assert "" == entry.line

    @pytest.mark.parametrize(
        "file_content, expected_key, expected_value",
        [
            pytest.param("foo bar", "foo", "bar", id="lower-case"),
            pytest.param("Foo Bar", "foo", "Bar", id="upper-case"),
            pytest.param("foo=bar", "foo", "bar", id="lower-case-with-equals"),
            pytest.param("Foo=bar", "foo", "bar", id="upper-case-with-equals"),
        ],
    )
    def test_case_config(
        self,
        m_is_file,
        m_load_file,
        file_content,
        expected_key,
        expected_value,
    ):
        """Keys come back lower-cased; values keep their original case."""
        m_is_file.return_value = True
        m_load_file.return_value = file_content

        parsed = ssh_util.parse_ssh_config("some real file")

        assert 1 == len(parsed)
        (entry,) = parsed
        assert expected_key == entry.key
        assert expected_value == entry.value

    def test_duplicated_keys(self, m_is_file, m_load_file):
        """Two lines with the same key are both kept, in file order."""
        file_content = [
            "HostCertificate /data/ssh/ssh_host_rsa_cert",
            "HostCertificate /data/ssh/ssh_host_ed25519_cert",
        ]
        m_is_file.return_value = True
        m_load_file.return_value = "\n".join(file_content)

        parsed = ssh_util.parse_ssh_config("some real file")

        assert len(file_content) == len(parsed)
        # Lengths were just checked equal, so zip() covers every line.
        for expected_line, entry in zip(file_content, parsed):
            assert expected_line == entry.line
class TestUpdateSshConfigLines:
    """Test the update_ssh_config_lines method."""

    # Sample sshd_config content shared by every test: a commented-out
    # option, three active options and one plain comment.
    exlines = [
        "#PasswordAuthentication yes",
        "UsePAM yes",
        "# Comment line",
        "AcceptEnv LANG LC_*",
        "X11Forwarding no",
    ]
    # Only present in exlines in commented-out form.
    pwauth = "PasswordAuthentication"

    def check_line(self, line, opt, val):
        """Assert that a parsed ``line`` carries option ``opt`` set to ``val``.

        The key is compared lower-cased; the rendered line must contain
        both ``opt`` (original case) and ``val``.
        """
        assert line.key == opt.lower()
        assert line.value == val
        assert opt in str(line)
        assert val in str(line)

    @pytest.mark.parametrize(
        "key, value",
        [
            pytest.param("MyKey", "MyVal", id="new_option_added"),
            pytest.param(
                pwauth, "no", id="commented_out_not_updated_but_appended"
            ),
        ],
    )
    def test_update_ssh_config_lines(self, key, value):
        """An option with no active line is appended as the last line."""
        lines = ssh_util.parse_ssh_config_lines(list(self.exlines))
        result = ssh_util.update_ssh_config_lines(lines, {key: value})
        assert [key] == result
        self.check_line(lines[-1], key, value)

    def test_option_without_value(self):
        """Implementation only accepts key-value pairs."""
        denyusers_opt = "DenyUsers"
        extended_exlines = [*self.exlines, denyusers_opt]
        lines = ssh_util.parse_ssh_config_lines(extended_exlines)
        # Inspect each parsed line's own text. str() of a list is built
        # from the repr() of its elements, so checking against str(lines)
        # would only look at line content if the line class defined a
        # content-bearing __repr__; otherwise it would pass vacuously.
        rendered = [str(line) for line in lines]
        assert all(denyusers_opt not in text for text in rendered)

    def test_single_option_updated(self):
        """A single update should have change made and line updated."""
        opt, val = ("UsePAM", "no")
        lines = ssh_util.parse_ssh_config_lines(list(self.exlines))
        result = ssh_util.update_ssh_config_lines(lines, {opt: val})
        assert [opt] == result
        # "UsePAM" sits at index 1 of exlines.
        self.check_line(lines[1], opt, val)

    def test_multiple_updates_with_add(self):
        """Verify multiple updates some added some changed, some not."""
        updates = {
            "UsePAM": "no",
            "X11Forwarding": "no",  # same value as in exlines: not reported
            "NewOpt": "newval",
            "AcceptEnv": "LANG ADD LC_*",
        }
        lines = ssh_util.parse_ssh_config_lines(list(self.exlines))
        result = ssh_util.update_ssh_config_lines(lines, updates)
        assert {"UsePAM", "NewOpt", "AcceptEnv"} == set(result)
        # "AcceptEnv" sits at index 3 of exlines.
        self.check_line(lines[3], "AcceptEnv", updates["AcceptEnv"])

    def test_return_empty_if_no_changes(self):
        """If there are no changes, then return should be empty list."""
        updates = {"UsePAM": "yes"}
        lines = ssh_util.parse_ssh_config_lines(list(self.exlines))
        result = ssh_util.update_ssh_config_lines(lines, updates)
        assert [] == result
        assert self.exlines == [str(line) for line in lines]

    def test_keycase_not_modified(self):
        """Original case of key should not be changed on update.

        This behavior is to keep original config as much intact as can be.
        """
        updates = {"usepam": "no"}
        lines = ssh_util.parse_ssh_config_lines(list(self.exlines))
        result = ssh_util.update_ssh_config_lines(lines, updates)
        assert ["usepam"] == result
        assert "UsePAM no" == str(lines[1])
class TestUpdateSshConfig:
    """Checks for ``ssh_util.update_ssh_config`` on files under tmpdir."""

    # One commented-out option and one active option, newline-terminated.
    cfgdata = "\n".join(["#Option val", "MyKey ORIG_VAL", ""])

    def test_modified(self, tmpdir):
        """A changed value is written to the file and True is returned."""
        cfg_path = tmpdir.join("ssh_config_1")
        util.write_file(cfg_path, self.cfgdata)

        changed = ssh_util.update_ssh_config({"MyKey": "NEW_VAL"}, cfg_path)
        assert changed is True

        on_disk = util.load_file(cfg_path)
        assert on_disk == self.cfgdata.replace("ORIG_VAL", "NEW_VAL")
        # assert there is a newline at end of file (LP: #1677205)
        assert on_disk[-1] == "\n"

    def test_not_modified(self, tmpdir):
        """An unchanged value returns False and nothing is written."""
        cfg_path = tmpdir.join("ssh_config_2")
        util.write_file(cfg_path, self.cfgdata)

        with patch("cloudinit.ssh_util.util.write_file") as m_write_file:
            changed = ssh_util.update_ssh_config(
                {"MyKey": "ORIG_VAL"}, cfg_path
            )

        assert changed is False
        assert util.load_file(cfg_path) == self.cfgdata
        m_write_file.assert_not_called()

    def test_without_include(self, tmpdir):
        """With no Include line the option is appended to the main file."""
        cfg_path = tmpdir.join("sshd_config")
        util.write_file(cfg_path, "X Y")

        assert ssh_util.update_ssh_config({"key": "value"}, cfg_path)

        assert util.load_file(cfg_path) == "X Y\nkey value\n"
        drop_in = f"{cfg_path}.d/50-cloud-init.conf"
        assert not os.path.isfile(drop_in)

    @pytest.mark.parametrize(
        "cfg",
        ["Include {mycfg}.d/*.conf", "Include {mycfg}.d/*.conf # comment"],
    )
    def test_with_include(self, cfg, tmpdir):
        """With an Include of <cfg>.d the option goes to a 0600 drop-in."""
        cfg_path = tmpdir.join("sshd_config")
        util.write_file(cfg_path, cfg.format(mycfg=cfg_path))

        assert ssh_util.update_ssh_config({"key": "value"}, cfg_path)

        drop_in = f"{cfg_path}.d/50-cloud-init.conf"
        assert os.path.isfile(drop_in)
        drop_in_mode = stat.S_IMODE(os.stat(drop_in).st_mode)
        assert drop_in_mode == 0o600
        assert util.load_file(drop_in) == "key value\n"

    def test_with_commented_include(self, tmpdir):
        """A commented-out Include is ignored; the main file is updated."""
        cfg_path = tmpdir.join("sshd_config")
        cfg = f"# Include {cfg_path}.d/*.conf"
        util.write_file(cfg_path, cfg)

        assert ssh_util.update_ssh_config({"key": "value"}, cfg_path)

        assert util.load_file(cfg_path) == f"{cfg}\nkey value\n"
        drop_in = f"{cfg_path}.d/50-cloud-init.conf"
        assert not os.path.isfile(drop_in)

    def test_with_other_include(self, tmpdir):
        """An Include of a different directory does not create a drop-in."""
        cfg_path = tmpdir.join("sshd_config")
        cfg = f"Include other_{cfg_path}.d/*.conf"
        util.write_file(cfg_path, cfg)

        assert ssh_util.update_ssh_config({"key": "value"}, cfg_path)

        assert util.load_file(cfg_path) == f"{cfg}\nkey value\n"
        drop_in = f"{cfg_path}.d/50-cloud-init.conf"
        assert not os.path.isfile(drop_in)
        assert not os.path.isfile(f"other_{cfg_path}.d/50-cloud-init.conf")
class TestAppendSshConfig:
    """Checks for ``ssh_util.append_ssh_config``."""

    # One commented-out option and one active option, newline-terminated.
    cfgdata = "\n".join(["#Option val", "MyKey ORIG_VAL", ""])

    @mock.patch(M_PATH + "_ensure_cloud_init_ssh_config_file")
    def test_append_ssh_config(self, m_ensure_cloud_init_config_file, tmpdir):
        """Each pair is appended as a new line, even for a repeated key."""
        cfg_path = tmpdir.join("ssh_config")
        util.write_file(cfg_path, self.cfgdata)
        # Make the patched helper hand back the file written above.
        m_ensure_cloud_init_config_file.return_value = str(cfg_path)

        appended = [("MyKey", "NEW_VAL"), ("MyKey", "NEW_VAL_2")]
        ssh_util.append_ssh_config(appended, cfg_path)

        on_disk = util.load_file(cfg_path)
        expected_cfg = dedent(
            """\
            #Option val
            MyKey ORIG_VAL
            MyKey NEW_VAL
            MyKey NEW_VAL_2
            """
        )
        assert on_disk == expected_cfg
        # assert there is a newline at end of file (LP: #1677205)
        assert on_disk[-1] == "\n"
class TestBasicAuthorizedKeyParse:
    """Checks for ``ssh_util.render_authorizedkeysfile_paths``."""

    @pytest.mark.parametrize(
        "value, homedir, username, expected_rendered",
        [
            # %u is replaced by the user name, wherever it appears.
            pytest.param(
                "/opt/%u/keys",
                "/home/bobby",
                "bobby",
                ["/opt/bobby/keys"],
                id="user",
            ),
            pytest.param(
                "/opt/%u",
                "/home/bobby",
                "bobby",
                ["/opt/bobby"],
                id="user_file",
            ),
            pytest.param(
                "/opt/%u/%u",
                "/home/bobby",
                "bobby",
                ["/opt/bobby/bobby"],
                id="user_file_2",
            ),
            # Whitespace separates several paths.
            pytest.param(
                "/keys/path1 /keys/path2",
                "/home/bobby",
                "bobby",
                ["/keys/path1", "/keys/path2"],
                id="multiple",
            ),
            pytest.param(
                "/keys/path1 /keys/%u",
                "/home/bobby",
                "bobby",
                ["/keys/path1", "/keys/bobby"],
                id="multiple_2",
            ),
            # A relative path is anchored at the home directory.
            pytest.param(
                ".secret/keys",
                "/home/bobby",
                "bobby",
                ["/home/bobby/.secret/keys"],
                id="relative",
            ),
            # %h is replaced by the home directory.
            pytest.param(
                "%h/.keys",
                "/homedirs/bobby",
                "bobby",
                ["/homedirs/bobby/.keys"],
                id="home",
            ),
            pytest.param(
                "%h/.keys .secret/keys /keys/path1 /opt/%u/keys",
                "/homedirs/bobby",
                "bobby",
                [
                    "/homedirs/bobby/.keys",
                    "/homedirs/bobby/.secret/keys",
                    "/keys/path1",
                    "/opt/bobby/keys",
                ],
                id="all",
            ),
        ],
    )
    def test_render_authorizedkeysfile_paths(
        self, value, homedir, username, expected_rendered
    ):
        """An AuthorizedKeysFile value renders to the expected path list."""
        rendered = ssh_util.render_authorizedkeysfile_paths(
            value, homedir, username
        )
        assert expected_rendered == rendered
class TestMultipleSshAuthorizedKeysFile:
def create_fake_users(
    self,
    names,
    mock_permissions,
    m_get_group,
    m_get_owner,
    m_get_permissions,
    m_getpwnam,
    users,
    tmpdir,
):
    """Register fake passwd entries and wire up the permission mocks.

    Adds a ``root`` entry (home ``<tmpdir>/root``) plus one entry per name
    in ``names`` (home ``<tmpdir>/home/<name>``) to ``users``, then points
    the four mocks at the module-level fake lookups, bound to
    ``mock_permissions`` / ``users``.

    Returns the home directories of ``names`` as strings, in the same
    order; root's home is not included.
    """
    users["root"] = FakePwEnt(pw_name="root", pw_dir=str(tmpdir.join("root")))

    homes = []
    for name in names:
        home_dir = str(tmpdir.join("home", name))
        users[name] = FakePwEnt(pw_name=name, pw_dir=home_dir)
        homes.append(home_dir)

    # Each permission-related mock answers from mock_permissions.
    permission_fakes = (
        (m_get_permissions, mock_get_permissions),
        (m_get_owner, mock_get_owner),
        (m_get_group, mock_get_group),
    )
    for patched, fake in permission_fakes:
        patched.side_effect = partial(fake, mock_permissions)
    m_getpwnam.side_effect = partial(mock_getpwnam, users)

    return homes
def create_user_authorized_file(self, home, filename, content_key, keys):
    """Write a sample key to ``<home>/.ssh/<filename>``.

    The file content is ``VALID_CONTENT[content_key]``. The path is
    recorded in ``keys`` (path -> content key) and returned as a string.
    """
    # /tmp/home/<user>/.ssh/authorized_keys = content_key
    key_file = str(os.path.join(home, ".ssh", filename))
    util.write_file(key_file, VALID_CONTENT[content_key])
    keys[key_file] = content_key
    return key_file
def create_global_authorized_file(self, filename, content_key, keys, tmpdir):
    """Write a sample key to ``<tmpdir>/<filename>``.

    The file content is ``VALID_CONTENT[content_key]``. The path is
    recorded in ``keys`` (path -> content key) and returned as a string.
    """
    key_file = str(tmpdir.join(filename))
    util.write_file(key_file, VALID_CONTENT[content_key])
    keys[key_file] = content_key
    return key_file
def create_sshd_config(self, authorized_keys_files, tmpdir):
    """Write ``<tmpdir>/sshd_config`` with a single AuthorizedKeysFile line.

    ``authorized_keys_files`` is the already space-separated value of the
    option. Returns the path of the written file as a string.
    """
    config_line = "AuthorizedKeysFile " + authorized_keys_files
    config_path = str(tmpdir.join("sshd_config"))
    util.write_file(config_path, config_line)
    return config_path
def execute_and_check(self, user, sshd_config, solution, keys):
    """Extract ``user``'s authorized keys and verify which file was used.

    Asserts that the file name reported by
    ``ssh_util.extract_authorized_keys`` is ``solution``, and that the
    rendered key content holds the sample key of that file and of no other
    file recorded in ``keys`` (path -> content key).
    """
    selected_fn, entries = ssh_util.extract_authorized_keys(user, sshd_config)
    content = ssh_util.update_authorized_keys(entries, [])

    assert selected_fn == solution
    for path, content_key in keys.items():
        blob = VALID_CONTENT[content_key]
        if path != solution:
            assert blob not in content
        else:
            assert blob in content
@pytest.mark.parametrize("inverted", [False, True])
@patch("cloudinit.ssh_util.pwd.getpwnam")
@patch("cloudinit.util.get_permissions")
@patch("cloudinit.util.get_owner")
@patch("cloudinit.util.get_group")
def test_single_user_two_local_files(
    self,
    m_get_group,
    m_get_owner,
    m_get_permissions,
    m_getpwnam,
    inverted,
    tmpdir,
):
    """Of two per-user files, the one listed first is the one selected.

    ``inverted`` swaps the order in which the two files are listed in
    AuthorizedKeysFile; the expected file swaps accordingly.
    """
    user_bobby = "bobby"
    keys = {}
    users = {}
    # path -> (owner, group, mode), served by the patched util lookups.
    mock_permissions = {
        tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700),
        tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700),
        tmpdir.join("home", "bobby", ".ssh", "user_keys"): (
            "bobby",
            "bobby",
            0o600,
        ),
        tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): (
            "bobby",
            "bobby",
            0o600,
        ),
    }

    (home,) = self.create_fake_users(
        [user_bobby],
        mock_permissions,
        m_get_group,
        m_get_owner,
        m_get_permissions,
        m_getpwnam,
        users,
        tmpdir,
    )

    # /tmp/home/bobby/.ssh/authorized_keys = rsa
    authorized_keys = self.create_user_authorized_file(
        home, "authorized_keys", "rsa", keys
    )
    # /tmp/home/bobby/.ssh/user_keys = dsa
    user_keys = self.create_user_authorized_file(
        home, "user_keys", "dsa", keys
    )

    if inverted:
        options = f"{user_keys} {authorized_keys}"
        expected_file = user_keys
    else:
        options = f"{authorized_keys} {user_keys}"
        expected_file = authorized_keys

    # /tmp/sshd_config
    sshd_config = self.create_sshd_config(options, tmpdir)

    self.execute_and_check(user_bobby, sshd_config, expected_file, keys)
@pytest.mark.parametrize("inverted", [False, True])
@patch("cloudinit.ssh_util.pwd.getpwnam")
@patch("cloudinit.util.get_permissions")
@patch("cloudinit.util.get_owner")
@patch("cloudinit.util.get_group")
def test_single_user_local_global_files(
    self,
    m_get_group,
    m_get_owner,
    m_get_permissions,
    m_getpwnam,
    inverted,
    tmpdir,
):
    """A global file listed first is passed over for a per-user file.

    The global file always comes first in AuthorizedKeysFile; ``inverted``
    swaps the order of the two per-user files that follow it. The expected
    file is whichever per-user file is listed first.
    """
    user_bobby = "bobby"
    keys = {}
    users = {}
    # path -> (owner, group, mode), served by the patched util lookups.
    mock_permissions = {
        tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700),
        tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700),
        tmpdir.join("home", "bobby", ".ssh", "user_keys"): (
            "bobby",
            "bobby",
            0o600,
        ),
        tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): (
            "bobby",
            "bobby",
            0o600,
        ),
    }

    (home,) = self.create_fake_users(
        [user_bobby],
        mock_permissions,
        m_get_group,
        m_get_owner,
        m_get_permissions,
        m_getpwnam,
        users,
        tmpdir,
    )

    # /tmp/home/bobby/.ssh/authorized_keys = rsa
    authorized_keys = self.create_user_authorized_file(
        home, "authorized_keys", "rsa", keys
    )
    # /tmp/home/bobby/.ssh/user_keys = dsa
    user_keys = self.create_user_authorized_file(
        home, "user_keys", "dsa", keys
    )
    # /tmp/etc/ssh/authorized_keys = ecdsa
    authorized_keys_global = self.create_global_authorized_file(
        "etc/ssh/authorized_keys", "ecdsa", keys, tmpdir
    )

    if inverted:
        options = f"{authorized_keys_global} {authorized_keys} {user_keys}"
        expected_file = authorized_keys
    else:
        options = f"{authorized_keys_global} {user_keys} {authorized_keys}"
        expected_file = user_keys

    sshd_config = self.create_sshd_config(options, tmpdir)

    self.execute_and_check(user_bobby, sshd_config, expected_file, keys)
@patch("cloudinit.ssh_util.pwd.getpwnam")
@patch("cloudinit.util.get_permissions")
@patch("cloudinit.util.get_owner")
@patch("cloudinit.util.get_group")
def test_single_user_global_file(
    self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, tmpdir
):
    """With only a global file configured, the per-user default is used.

    The expected file is ``<home>/.ssh/authorized_keys``, and the key
    stored in the global file must not show up in the rendered content.
    """
    user_bobby = "bobby"
    keys = {}
    users = {}
    # path -> (owner, group, mode), served by the patched util lookups.
    mock_permissions = {
        tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700),
        tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700),
        tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): (
            "bobby",
            "bobby",
            0o600,
        ),
    }

    (home,) = self.create_fake_users(
        [user_bobby],
        mock_permissions,
        m_get_group,
        m_get_owner,
        m_get_permissions,
        m_getpwnam,
        users,
        tmpdir,
    )

    # /tmp/etc/ssh/authorized_keys = rsa
    authorized_keys_global = self.create_global_authorized_file(
        "etc/ssh/authorized_keys", "rsa", keys, tmpdir
    )

    # The global file is the only AuthorizedKeysFile entry.
    options = f"{authorized_keys_global}"
    sshd_config = self.create_sshd_config(options, tmpdir)

    default = f"{home}/.ssh/authorized_keys"
    self.execute_and_check(user_bobby, sshd_config, default, keys)
@patch("cloudinit.ssh_util.pwd.getpwnam")
@patch("cloudinit.util.get_permissions")
@patch("cloudinit.util.get_owner")
@patch("cloudinit.util.get_group")
def test_two_users_local_file_standard(
    self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, tmpdir
):
    """Two users, each with a key file at ``.ssh/authorized_keys``.

    The options string is the single relative entry
    ``.ssh/authorized_keys``; each user's own file is handed to
    ``execute_and_check`` as the expected result.
    """
    user_bobby = "bobby"
    user_suzie = "suzie"
    keys, users = {}, {}

    # Each user owns their home, its .ssh directory and the key file.
    mock_permissions = {}
    for name in (user_bobby, user_suzie):
        owned_dir = (name, name, 0o700)
        mock_permissions[tmpdir.join("home", name)] = owned_dir
        mock_permissions[tmpdir.join("home", name, ".ssh")] = owned_dir
        key_file = tmpdir.join("home", name, ".ssh", "authorized_keys")
        mock_permissions[key_file] = (name, name, 0o600)

    home_bobby, home_suzie, *_ = self.create_fake_users(
        [user_bobby, user_suzie],
        mock_permissions,
        m_get_group,
        m_get_owner,
        m_get_permissions,
        m_getpwnam,
        users,
        tmpdir,
    )

    # <tmpdir>/home/bobby/.ssh/authorized_keys = rsa
    authorized_keys = self.create_user_authorized_file(
        home_bobby, "authorized_keys", "rsa", keys
    )
    # <tmpdir>/home/suzie/.ssh/authorized_keys = ssh-xmss@openssh.com
    authorized_keys2 = self.create_user_authorized_file(
        home_suzie, "authorized_keys", "ssh-xmss@openssh.com", keys
    )

    sshd_config = self.create_sshd_config(".ssh/authorized_keys", tmpdir)

    expectations = (
        (user_bobby, authorized_keys),
        (user_suzie, authorized_keys2),
    )
    for user, expected_file in expectations:
        self.execute_and_check(user, sshd_config, expected_file, keys)
@patch("cloudinit.ssh_util.pwd.getpwnam")
@patch("cloudinit.util.get_permissions")
@patch("cloudinit.util.get_owner")
@patch("cloudinit.util.get_group")
def test_two_users_local_file_custom(
    self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, tmpdir
):
    """Two users, each with a key file at ``.ssh/authorized_keys2``.

    The options string is the single relative entry
    ``.ssh/authorized_keys2``; each user's own file is handed to
    ``execute_and_check`` as the expected result.
    """
    user_bobby = "bobby"
    user_suzie = "suzie"
    keys, users = {}, {}

    # Each user owns their home, its .ssh directory and the key file.
    mock_permissions = {}
    for name in (user_bobby, user_suzie):
        owned_dir = (name, name, 0o700)
        mock_permissions[tmpdir.join("home", name)] = owned_dir
        mock_permissions[tmpdir.join("home", name, ".ssh")] = owned_dir
        key_file = tmpdir.join("home", name, ".ssh", "authorized_keys2")
        mock_permissions[key_file] = (name, name, 0o600)

    home_bobby, home_suzie, *_ = self.create_fake_users(
        [user_bobby, user_suzie],
        mock_permissions,
        m_get_group,
        m_get_owner,
        m_get_permissions,
        m_getpwnam,
        users,
        tmpdir,
    )

    # <tmpdir>/home/bobby/.ssh/authorized_keys2 = rsa
    authorized_keys = self.create_user_authorized_file(
        home_bobby, "authorized_keys2", "rsa", keys
    )
    # <tmpdir>/home/suzie/.ssh/authorized_keys2 = ssh-xmss@openssh.com
    authorized_keys2 = self.create_user_authorized_file(
        home_suzie, "authorized_keys2", "ssh-xmss@openssh.com", keys
    )

    sshd_config = self.create_sshd_config(".ssh/authorized_keys2", tmpdir)

    expectations = (
        (user_bobby, authorized_keys),
        (user_suzie, authorized_keys2),
    )
    for user, expected_file in expectations:
        self.execute_and_check(user, sshd_config, expected_file, keys)
@patch("cloudinit.ssh_util.pwd.getpwnam")
@patch("cloudinit.util.get_permissions")
@patch("cloudinit.util.get_owner")
@patch("cloudinit.util.get_group")
def test_two_users_local_global_files(
    self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, tmpdir
):
    """Two users with a global file, a hard-coded file and a %h entry.

    The options string lists, in order: the global file, the absolute
    path of bobby's ``user_keys3`` and ``%h/.ssh/authorized_keys2``.
    The expected result handed to ``execute_and_check`` is bobby's
    ``user_keys3`` for bobby and suzie's ``authorized_keys2`` for suzie.
    """
    keys = {}
    users = {}
    mock_permissions = {
        tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700),
        tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700),
        tmpdir.join("home", "bobby", ".ssh", "authorized_keys2"): (
            "bobby",
            "bobby",
            0o600,
        ),
        tmpdir.join("home", "bobby", ".ssh", "user_keys3"): (
            "bobby",
            "bobby",
            0o600,
        ),
        tmpdir.join("home", "suzie"): ("suzie", "suzie", 0o700),
        tmpdir.join("home", "suzie", ".ssh"): ("suzie", "suzie", 0o700),
        tmpdir.join("home", "suzie", ".ssh", "authorized_keys2"): (
            "suzie",
            "suzie",
            0o600,
        ),
        tmpdir.join("home", "suzie", ".ssh", "user_keys3"): (
            "suzie",
            "suzie",
            0o600,
        ),
    }
    user_bobby = "bobby"
    user_suzie = "suzie"
    homes = self.create_fake_users(
        [user_bobby, user_suzie],
        mock_permissions,
        m_get_group,
        m_get_owner,
        m_get_permissions,
        m_getpwnam,
        users,
        tmpdir,
    )
    home_bobby = homes[0]
    home_suzie = homes[1]

    # <tmpdir>/home/bobby/.ssh/authorized_keys2 = rsa
    # (created and registered in ``keys`` but not the expected result)
    self.create_user_authorized_file(
        home_bobby, "authorized_keys2", "rsa", keys
    )
    # <tmpdir>/home/bobby/.ssh/user_keys3 = dsa
    user_keys = self.create_user_authorized_file(
        home_bobby, "user_keys3", "dsa", keys
    )
    # <tmpdir>/home/suzie/.ssh/authorized_keys2 = ssh-xmss@openssh.com
    authorized_keys2 = self.create_user_authorized_file(
        home_suzie, "authorized_keys2", "ssh-xmss@openssh.com", keys
    )
    # <tmpdir>/etc/ssh/authorized_keys2 = ecdsa
    authorized_keys_global = self.create_global_authorized_file(
        "etc/ssh/authorized_keys2", "ecdsa", keys, tmpdir
    )

    # "%h" is written literally into the config; no escaping is needed
    # inside an f-string.
    options = (
        f"{authorized_keys_global} {user_keys} "
        "%h/.ssh/authorized_keys2"
    )
    sshd_config = self.create_sshd_config(options, tmpdir)

    self.execute_and_check(user_bobby, sshd_config, user_keys, keys)
    self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys)
@patch("cloudinit.util.get_user_groups")
@patch("cloudinit.ssh_util.pwd.getpwnam")
@patch("cloudinit.util.get_permissions")
@patch("cloudinit.util.get_owner")
@patch("cloudinit.util.get_group")
def test_two_users_local_global_files_badguy(
    self,
    m_get_group,
    m_get_owner,
    m_get_permissions,
    m_getpwnam,
    m_get_user_groups,
    tmpdir,
):
    """Two users where the first entry is a file inside badguy's home.

    The options string lists, in order: the hard-coded path
    ``<tmpdir>/home/badguy/home/bobby``, ``%h/.ssh/authorized_keys2``,
    the global file and bobby's ``user_keys3``.  The expected result
    handed to ``execute_and_check`` is bobby's ``authorized_keys2`` for
    bobby and the hard-coded file for badguy.
    """
    keys = {}
    users = {}
    mock_permissions = {
        tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700),
        tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700),
        tmpdir.join("home", "bobby", ".ssh", "authorized_keys2"): (
            "bobby",
            "bobby",
            0o600,
        ),
        tmpdir.join("home", "bobby", ".ssh", "user_keys3"): (
            "bobby",
            "bobby",
            0o600,
        ),
        tmpdir.join("home", "badguy"): ("root", "root", 0o755),
        tmpdir.join("home", "badguy", "home"): ("root", "root", 0o755),
        tmpdir.join("home", "badguy", "home", "bobby"): (
            "root",
            "root",
            0o655,
        ),
    }
    user_bobby = "bobby"
    user_badguy = "badguy"
    # Only bobby's home is needed below; badguy's is discarded.
    home_bobby, *_ = self.create_fake_users(
        [user_bobby, user_badguy],
        mock_permissions,
        m_get_group,
        m_get_owner,
        m_get_permissions,
        m_getpwnam,
        users,
        tmpdir,
    )
    m_get_user_groups.side_effect = mock_get_user_groups

    # <tmpdir>/home/bobby/.ssh/authorized_keys2 = rsa
    authorized_keys = self.create_user_authorized_file(
        home_bobby, "authorized_keys2", "rsa", keys
    )
    # <tmpdir>/home/bobby/.ssh/user_keys3 = dsa
    user_keys = self.create_user_authorized_file(
        home_bobby, "user_keys3", "dsa", keys
    )
    # <tmpdir>/home/badguy/home/bobby = "" (empty, not registered in keys)
    authorized_keys2 = str(tmpdir.join("home", "badguy", "home", "bobby"))
    util.write_file(authorized_keys2, "")
    # <tmpdir>/etc/ssh/authorized_keys2 = ecdsa
    authorized_keys_global = self.create_global_authorized_file(
        "etc/ssh/authorized_keys2", "ecdsa", keys, tmpdir
    )

    # <tmpdir>/sshd_config; "%h" is written literally into the config.
    options = (
        f"{authorized_keys2} %h/.ssh/authorized_keys2 "
        f"{authorized_keys_global} {user_keys}"
    )
    sshd_config = self.create_sshd_config(options, tmpdir)

    self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys)
    self.execute_and_check(
        user_badguy, sshd_config, authorized_keys2, keys
    )
@patch("cloudinit.util.get_user_groups")
@patch("cloudinit.ssh_util.pwd.getpwnam")
@patch("cloudinit.util.get_permissions")
@patch("cloudinit.util.get_owner")
@patch("cloudinit.util.get_group")
def test_two_users_unaccessible_file(
    self,
    m_get_group,
    m_get_owner,
    m_get_permissions,
    m_getpwnam,
    m_get_user_groups,
    tmpdir,
):
    """Two users with per-user files under a root-only ``userkeys`` dir.

    ``<tmpdir>/etc/ssh/userkeys`` is mocked as ``root:root`` with mode
    ``0o700``.  The options string lists ``.../userkeys/%u`` followed by
    ``.ssh/authorized_keys``; the expected result handed to
    ``execute_and_check`` is each user's own ``.ssh/authorized_keys``.
    """
    keys = {}
    users = {}
    mock_permissions = {
        tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700),
        tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700),
        tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): (
            "bobby",
            "bobby",
            0o600,
        ),
        tmpdir.join("etc"): ("root", "root", 0o755),
        tmpdir.join("etc", "ssh"): ("root", "root", 0o755),
        # Mode 0o700 here is what distinguishes this test from
        # test_two_users_accessible_file.
        tmpdir.join("etc", "ssh", "userkeys"): ("root", "root", 0o700),
        tmpdir.join("etc", "ssh", "userkeys", "bobby"): (
            "bobby",
            "bobby",
            0o600,
        ),
        tmpdir.join("etc", "ssh", "userkeys", "badguy"): (
            "badguy",
            "badguy",
            0o600,
        ),
        tmpdir.join("home", "badguy"): ("badguy", "badguy", 0o700),
        tmpdir.join("home", "badguy", ".ssh"): ("badguy", "badguy", 0o700),
        tmpdir.join("home", "badguy", ".ssh", "authorized_keys"): (
            "badguy",
            "badguy",
            0o600,
        ),
    }
    user_bobby = "bobby"
    user_badguy = "badguy"
    homes = self.create_fake_users(
        [user_bobby, user_badguy],
        mock_permissions,
        m_get_group,
        m_get_owner,
        m_get_permissions,
        m_getpwnam,
        users,
        tmpdir,
    )
    m_get_user_groups.side_effect = mock_get_user_groups
    home_bobby = homes[0]
    home_badguy = homes[1]

    # <tmpdir>/home/bobby/.ssh/authorized_keys = rsa
    authorized_keys = self.create_user_authorized_file(
        home_bobby, "authorized_keys", "rsa", keys
    )
    # <tmpdir>/etc/ssh/userkeys/bobby = dsa
    # assume here that we can bypass userkeys, despite permissions
    self.create_global_authorized_file(
        "etc/ssh/userkeys/bobby", "dsa", keys, tmpdir
    )
    # <tmpdir>/home/badguy/.ssh/authorized_keys = ssh-xmss@openssh.com
    authorized_keys2 = self.create_user_authorized_file(
        home_badguy, "authorized_keys", "ssh-xmss@openssh.com", keys
    )
    # <tmpdir>/etc/ssh/userkeys/badguy = ecdsa
    self.create_global_authorized_file(
        "etc/ssh/userkeys/badguy", "ecdsa", keys, tmpdir
    )

    # <tmpdir>/sshd_config
    # The option reads as two space-separated entries: a per-user file
    # under userkeys/ ("%u" is the user-name token described in
    # sshd_config(5)), then the relative ".ssh/authorized_keys".
    userkeys_entry = tmpdir.join("etc", "ssh", "userkeys", "%u")
    options = f"{userkeys_entry} .ssh/authorized_keys"
    sshd_config = self.create_sshd_config(options, tmpdir)

    self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys)
    self.execute_and_check(
        user_badguy, sshd_config, authorized_keys2, keys
    )
@patch("cloudinit.util.get_user_groups")
@patch("cloudinit.ssh_util.pwd.getpwnam")
@patch("cloudinit.util.get_permissions")
@patch("cloudinit.util.get_owner")
@patch("cloudinit.util.get_group")
def test_two_users_accessible_file(
    self,
    m_get_group,
    m_get_owner,
    m_get_permissions,
    m_getpwnam,
    m_get_user_groups,
    tmpdir,
):
    """Two users with per-user files under a 0o755 ``userkeys`` dir.

    ``<tmpdir>/etc/ssh/userkeys`` is mocked as ``root:root`` with mode
    ``0o755``.  The options string lists ``.../userkeys/%u`` followed by
    ``.ssh/authorized_keys``; the expected result handed to
    ``execute_and_check`` is each user's file under ``userkeys``.
    """
    keys = {}
    users = {}
    mock_permissions = {
        tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700),
        tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700),
        tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): (
            "bobby",
            "bobby",
            0o600,
        ),
        tmpdir.join("etc"): ("root", "root", 0o755),
        tmpdir.join("etc", "ssh"): ("root", "root", 0o755),
        # Mode 0o755 here is what distinguishes this test from
        # test_two_users_unaccessible_file.
        tmpdir.join("etc", "ssh", "userkeys"): ("root", "root", 0o755),
        tmpdir.join("etc", "ssh", "userkeys", "bobby"): (
            "bobby",
            "bobby",
            0o600,
        ),
        tmpdir.join("etc", "ssh", "userkeys", "badguy"): (
            "badguy",
            "badguy",
            0o600,
        ),
        tmpdir.join("home", "badguy"): ("badguy", "badguy", 0o700),
        tmpdir.join("home", "badguy", ".ssh"): ("badguy", "badguy", 0o700),
        tmpdir.join("home", "badguy", ".ssh", "authorized_keys"): (
            "badguy",
            "badguy",
            0o600,
        ),
    }
    user_bobby = "bobby"
    user_badguy = "badguy"
    homes = self.create_fake_users(
        [user_bobby, user_badguy],
        mock_permissions,
        m_get_group,
        m_get_owner,
        m_get_permissions,
        m_getpwnam,
        users,
        tmpdir,
    )
    m_get_user_groups.side_effect = mock_get_user_groups
    home_bobby = homes[0]
    home_badguy = homes[1]

    # <tmpdir>/home/bobby/.ssh/authorized_keys = rsa
    # (created and registered in ``keys`` but not the expected result)
    self.create_user_authorized_file(
        home_bobby, "authorized_keys", "rsa", keys
    )
    # <tmpdir>/etc/ssh/userkeys/bobby = dsa
    # assume here that we can bypass userkeys, despite permissions
    authorized_keys = self.create_global_authorized_file(
        "etc/ssh/userkeys/bobby", "dsa", keys, tmpdir
    )
    # <tmpdir>/home/badguy/.ssh/authorized_keys = ssh-xmss@openssh.com
    # (created and registered in ``keys`` but not the expected result)
    self.create_user_authorized_file(
        home_badguy, "authorized_keys", "ssh-xmss@openssh.com", keys
    )
    # <tmpdir>/etc/ssh/userkeys/badguy = ecdsa
    authorized_keys2 = self.create_global_authorized_file(
        "etc/ssh/userkeys/badguy", "ecdsa", keys, tmpdir
    )

    # <tmpdir>/sshd_config
    # The option reads as two space-separated entries: a per-user file
    # under userkeys/ ("%u" is the user-name token described in
    # sshd_config(5)), then the relative ".ssh/authorized_keys".
    userkeys_entry = tmpdir.join("etc", "ssh", "userkeys", "%u")
    options = f"{userkeys_entry} .ssh/authorized_keys"
    sshd_config = self.create_sshd_config(options, tmpdir)

    self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys)
    self.execute_and_check(
        user_badguy, sshd_config, authorized_keys2, keys
    )
@pytest.mark.parametrize("inverted", [False, True])
@patch("cloudinit.util.get_user_groups")
@patch("cloudinit.ssh_util.pwd.getpwnam")
@patch("cloudinit.util.get_permissions")
@patch("cloudinit.util.get_owner")
@patch("cloudinit.util.get_group")
def test_two_users_hardcoded_single_user_file(
    self,
    m_get_group,
    m_get_owner,
    m_get_permissions,
    m_getpwnam,
    m_get_user_groups,
    inverted,
    tmpdir,
):
    """Two users; sshd_config hard-codes one user's key file path.

    With ``inverted`` false the options string is the absolute path of
    bobby's file, otherwise of suzie's.  The expected result handed to
    ``execute_and_check`` is that hard-coded file for its owner and
    ``<home>/.ssh/authorized_keys`` for the other user.
    """
    keys = {}
    users = {}
    mock_permissions = {
        tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700),
        tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700),
        tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): (
            "bobby",
            "bobby",
            0o600,
        ),
        tmpdir.join("home", "suzie"): ("suzie", "suzie", 0o700),
        tmpdir.join("home", "suzie", ".ssh"): ("suzie", "suzie", 0o700),
        tmpdir.join("home", "suzie", ".ssh", "authorized_keys"): (
            "suzie",
            "suzie",
            0o600,
        ),
    }
    user_bobby = "bobby"
    user_suzie = "suzie"
    homes = self.create_fake_users(
        [user_bobby, user_suzie],
        mock_permissions,
        m_get_group,
        m_get_owner,
        m_get_permissions,
        m_getpwnam,
        users,
        tmpdir,
    )
    home_bobby = homes[0]
    home_suzie = homes[1]
    m_get_user_groups.side_effect = mock_get_user_groups

    # <tmpdir>/home/bobby/.ssh/authorized_keys = rsa
    authorized_keys = self.create_user_authorized_file(
        home_bobby, "authorized_keys", "rsa", keys
    )
    # <tmpdir>/home/suzie/.ssh/authorized_keys = ssh-xmss@openssh.com
    authorized_keys2 = self.create_user_authorized_file(
        home_suzie, "authorized_keys", "ssh-xmss@openssh.com", keys
    )

    # Pick the hard-coded file and both expectations in one branch: the
    # owner of the hard-coded file expects it, the other user expects
    # the default path inside their own home.
    if not inverted:
        expected_keys = authorized_keys
        expected_bobby = expected_keys
        expected_suzie = f"{home_suzie}/.ssh/authorized_keys"
    else:
        expected_keys = authorized_keys2
        expected_bobby = f"{home_bobby}/.ssh/authorized_keys"
        expected_suzie = expected_keys

    # <tmpdir>/sshd_config
    options = str(expected_keys)
    sshd_config = self.create_sshd_config(options, tmpdir)

    self.execute_and_check(user_bobby, sshd_config, expected_bobby, keys)
    self.execute_and_check(user_suzie, sshd_config, expected_suzie, keys)
@patch("cloudinit.util.get_user_groups")
@patch("cloudinit.ssh_util.pwd.getpwnam")
@patch("cloudinit.util.get_permissions")
@patch("cloudinit.util.get_owner")
@patch("cloudinit.util.get_group")
def test_two_users_hardcoded_user_files(
    self,
    m_get_group,
    m_get_owner,
    m_get_permissions,
    m_getpwnam,
    m_get_user_groups,
    tmpdir,
):
    """Two users; sshd_config hard-codes both users' key file paths.

    The options string lists, in order: the global file, the absolute
    path of bobby's file and the absolute path of suzie's file.  The
    expected result handed to ``execute_and_check`` is each user's own
    file.
    """
    keys = {}
    users = {}
    mock_permissions = {
        tmpdir.join("home", "bobby"): ("bobby", "bobby", 0o700),
        tmpdir.join("home", "bobby", ".ssh"): ("bobby", "bobby", 0o700),
        tmpdir.join("home", "bobby", ".ssh", "authorized_keys"): (
            "bobby",
            "bobby",
            0o600,
        ),
        tmpdir.join("home", "suzie"): ("suzie", "suzie", 0o700),
        tmpdir.join("home", "suzie", ".ssh"): ("suzie", "suzie", 0o700),
        tmpdir.join("home", "suzie", ".ssh", "authorized_keys"): (
            "suzie",
            "suzie",
            0o600,
        ),
    }
    user_bobby = "bobby"
    user_suzie = "suzie"
    homes = self.create_fake_users(
        [user_bobby, user_suzie],
        mock_permissions,
        m_get_group,
        m_get_owner,
        m_get_permissions,
        m_getpwnam,
        users,
        tmpdir,
    )
    home_bobby = homes[0]
    home_suzie = homes[1]
    m_get_user_groups.side_effect = mock_get_user_groups

    # <tmpdir>/home/bobby/.ssh/authorized_keys = rsa
    authorized_keys = self.create_user_authorized_file(
        home_bobby, "authorized_keys", "rsa", keys
    )
    # <tmpdir>/home/suzie/.ssh/authorized_keys = ssh-xmss@openssh.com
    authorized_keys2 = self.create_user_authorized_file(
        home_suzie, "authorized_keys", "ssh-xmss@openssh.com", keys
    )
    # <tmpdir>/etc/ssh/authorized_keys = ecdsa
    authorized_keys_global = self.create_global_authorized_file(
        "etc/ssh/authorized_keys", "ecdsa", keys, tmpdir
    )

    # <tmpdir>/sshd_config
    options = (
        f"{authorized_keys_global} {authorized_keys} {authorized_keys2}"
    )
    sshd_config = self.create_sshd_config(options, tmpdir)

    self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys)
    self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys)
# vi: ts=4 expandtab