재직중인 회사에서 RDS 환경에 대해 취약점을 진단해야하는 업무가 있어서 CCE 진단 자동화 Python 스크립트를 작성했다.
원격에서 RDS Enpoint로 직접 접속해서 쿼리문을 통해 진단하는 방식으로 개발했는데, 사실 DB의 경우 스크립트 수행만 가지고는 100% 정확한 취약점 진단이 불가능하다.
보통 CCE 취약점이란게 계정이나 권한은 어떻게 관리하는지, 불필요한 테이블정보는 없는지를 체크하는 체크리스트를 통해 확인하게 되는데, 규모가 어느정도 있는 회사에선 접근제어를 위한 솔루션이 필요하고 RDS 나 on-prem에서 관리되는 DB들은 이런 솔루션을 거쳐서 접근하게끔 아키텍쳐가 짜여있다. 쿼리 조회만으로는 이런 현황을 파악할 순 없다.. 따라서 자세한 정보는 별도로 확인하고 스크립트를 통해 조회한 정보는 심사 시 제출할 증적을 만들어낼 용도로 사용했다.
진단이 필요한 RDS 종류는 Aurora mysql 2~3 버전이라 각 버전별 특성을 고려해서 스크립트를 작성했다.
Aurora Mysql 2/3 진단 스크립트
class template():
def __init__(self) -> None:
self.data = {"info":[], "result":[]}
self.type = None
self.db_info = None
self.cursor = None
self.endpoint = None
self.CLUSTER_NAME = None
self.RESULT_FILE = ""
self.endpoint = None
self.user = None
self.pwd = None
self.endpoint_multi = []
self.user_multi = []
self.pwd_multi = []
self.put_s3 = None
def put_result(self, id, description, check, result):
if type(result) is list:
[{"user":"admin"},]
results = [value for inner_dict in result for value in inner_dict.values()]
result_str = '\n'.join(results)
a = {"id":id, "description":description, "check":check, "result":result_str}
self.data["result"].append(a)
else:
a = {"id":id, "description":description, "check":check, "result":result}
self.data["result"].append(a)
def put_info(self, info):
self.data["info"] = info
def get_query(self, sql: str):
self.cursor.execute(sql)
a = self.cursor.fetchall()
return a
def print_banner(self):
print("""
██████╗ ██████╗ ███████╗ ██████╗ ██████╗███████╗ ███████╗ ██████╗██████╗ ██╗██████╗ ████████╗
██╔══██╗██╔══██╗██╔════╝ ██╔════╝██╔════╝██╔════╝ ██╔════╝██╔════╝██╔══██╗██║██╔══██╗╚══██╔══╝
██████╔╝██║ ██║███████╗ ██║ ██║ █████╗ ███████╗██║ ██████╔╝██║██████╔╝ ██║
██╔══██╗██║ ██║╚════██║ ██║ ██║ ██╔══╝ ╚════██║██║ ██╔══██╗██║██╔═══╝ ██║
██║ ██║██████╔╝███████║ ╚██████╗╚██████╗███████╗ ███████║╚██████╗██║ ██║██║██║ ██║
╚═╝ ╚═╝╚═════╝ ╚══════╝ ╚═════╝ ╚═════╝╚══════╝ ╚══════╝ ╚═════╝╚═╝ ╚═╝╚═╝╚═╝ ╚═╝
""")
def output_jsonflie(self):
self.RESULT_FILE = f"{self.CLUSTER_NAME}_{datetime.now().date()}_securitycheck.json"
with open(self.RESULT_FILE, 'w') as f:
json.dump(self.data, f, ensure_ascii=False)
def connect(self, user, pwd, endpoint):
self.CLUSTER_NAME = endpoint.split('.')[0]
try:
self.db_info = pymysql.connect (
user = user,
password = pwd,
host = endpoint,
db = 'mysql',
charset = 'utf8'
)
self.cursor = self.db_info.cursor(pymysql.cursors.DictCursor)
self.endpoint = endpoint
except pymysql.err.OperationalError as e:
if e.args[0] == 2003:
a = f"\033[91m [-] {endpoint} - 존재하지 않는 Endpoint 주소입니다.\033[0m\n Error Meesage : {e.args[1]}"
print(a)
sys.exit(1)
elif e.args[0] == 1045:
a = f"\033[91m [-] 계정정보가 일치하지 않습니다.\033[0m\n Error Meesage : {e.args[1]}"
print(a)
sys.exit(1)
def argparse(self):
parser = argparse.ArgumentParser(description="RDS 취약점 점검 스크립트")
parser.add_argument('--type','-t', type=int, default=0, help="0 : 단일 인스턴스 점검 / 1 : 멀티 인스턴스 점검(CSV)")
parser.add_argument('--host','-host', type=str, help="Endpoint 주소 또는 IP 입력")
parser.add_argument('--password','-p', type=str, help="패스워드 입력")
parser.add_argument('--user','-u', type=str, help="계정명 입력")
parser.add_argument('--file','-f', type=str, help="CSV 파일 절대경로 입력")
parser.add_argument('--s3', '-s', type=str, help="s3 파일 전송 시 사용, 버킷명 입력")
args = parser.parse_args()
self.type = args.type
self.put_s3 = args.s3
if args.type == 0 and args.host == None and args.password == None and args.user == None:
self.endpoint = input("Endpoint 주소입력: ")
self.user = input("계정명 입력: ")
self.pwd = getpass.getpass("패스워드 입력: ")
elif args.type == 1:
try:
with open(args.file, 'r', encoding='utf-8-sig') as f:
file = csv.reader(f)
for line in file:
self.endpoint_multi.append(line[0])
self.user_multi.append(line[1])
self.pwd_multi.append(line[2])
except FileNotFoundError as e:
print(f"\033[91m [-] {args.file} 파일을 찾을 수 없습니다.\033[0m")
sys.exit(1)
else:
self.endpoint = args.host
self.user = args.user
self.pwd = args.password
def send_s3(self):
s3 = boto3.client('s3')
res = s3.upload_file(os.getcwd()+"/"+self.RESULT_FILE, self.put_s3, self.RESULT_FILE)
class checklist(template):
def __init__(self) -> None:
super().__init__()
super().print_banner()
self.aurora_ver = None
self.mysql_ver = None
def HEAD(self):
self.get_query("select version()")
version1 = self.get_query("select version()")
version2 = self.get_query("select aurora_version()")
self.mysql_ver = version1[0]['version()']
self.aurora_ver = version2[0]['aurora_version()']
info = {"clustername":self.CLUSTER_NAME, "mysql_ver":self.mysql_ver, "aurora_ver":self.aurora_ver}
self.put_info(info)
def DBM001(self):
id = "DBM-001"
description = "취약하게 설정된 비밀번호 존재"
check = "인터뷰"
result = ""
sql = self.get_query("select user from mysql.user where account_locked='N'")
result += "[활성화된 계정]\n"
for user in sql:
result += user["user"] + "\n"
self.put_result(id, description, check, result)
template 클래스에 필요한 기능들을 작성했다. 제일 중요한 결과 수집 및 저장기능, 결과 관리를 위한 S3로 파일전송 기능, 단일 혹은 멀티 클러스터 동시 진단을 위한 기능 정도가 있다.
진단할 체크리스트 기준은 checklist 클래스를 만들어서 관리할 목적으로 작성했다. 체크리스트 항목별로 기준에 맞게 조회할 수 있는 쿼리를 작성하고, 결과를 파싱했다.
동작 테스트
RDS 클러스터를 하나 생성해서 테스트해봤더니 의도한 대로 클러스터 주소와 계정정보를 입력 시 결과가 잘 생성되었다. 수집된 결과는JSON 으로 저장된다. JSON 포맷으로 저장하면 SIEM으로 진단결과를 바로 보내줄 수도 있고, json 파싱 스크립트를 별도로 만들어서 결과를 관리하기 용이하다.
{
"info": {
"clustername": "database-1",
"mysql_ver": "5.7.12",
"aurora_ver": "2.11.2"
},
"result": [
{
"id": "DBM-001",
"description": "취약하게 설정된 비밀번호 존재",
"check": "인터뷰",
"result": "[활성화된 계정]\nadmin\nrdsadmin\n"
},
{
"id": "DBM-003",
"description": "업무상 불필요한 계정 존재",
"check": "Y",
"result": "[활성화된 계정]\nadmin\nrdsadmin\n"
},
{
"id": "DBM-004",
"description": "업무상 불필요하게 관리자 권한이 부여된 계정 존재",
"check": "Y",
"result": "\n# SHOW GRANTS FOR 'admin'@'%'\nGRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, REFERENCES, INDEX, ALTER, SHOW DATABASES, CREATE TEMPORARY TABLES, LOCK TABLES, EXECUTE, REPLICATION SLAVE, REPLICATION CLIENT, CREATE VIEW, SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER, LOAD FROM S3, SELECT INTO S3, INVOKE LAMBDA, INVOKE SAGEMAKER, INVOKE COMPREHEND ON *.* TO 'admin'@'%' WITH GRANT OPTION\n\n# SHOW GRANTS FOR 'rdsadmin'@'localhost'\nGRANT ALL PRIVILEGES ON *.* TO 'rdsadmin'@'localhost' WITH GRANT OPTION\n"
},
{
"id": "DBM-005",
"description": "데이터베이스 내 중요정보 암호화 미적용",
"check": "인터뷰",
"result": ""
},
{
"id": "DBM-006",
"description": "로그인 실패 횟수에 따른 접속 제한 설정 미흡",
"check": "Y",
"result": "Mysql aurora 3(mysql v8.0.19) 부터 Login Attempts 기능 제공\n현재 버전: 2.11.2"
},
{
"id": "DBM-007",
"description": "비밀번호의 복잡도 정책 설정 미흡",
"check": "N",
"result": "비밀번호 복잡도 정책 설정 미존재"
},
{
"id": "DBM-008",
"description": "주기적인 비밀번호 변경 미흡",
"check": "Y",
"result": "admin\n - 패스워드 마지막 변경일 : 2023-08-06 14:32:19\n - 패스워드 사용기간 : None\nrdsadmin\n - 패스워드 마지막 변경일 : 2023-08-06 14:32:18\n - 패스워드 사용기간 : 0\n"
},
{
"id": "DBM-009",
"description": "사용되지 않는 세션 종료 미흡",
"check": "N",
"result": "interactive_timeout 설정 값 : 28800"
},
{
"id": "DBM-011",
"description": "감사 로그 수집 및 백업 미흡",
"check": "인터뷰",
"result": ""
},
{
"id": "DBM-013",
"description": "원격 접속에 대한 접근 제어 미흡",
"check": "N",
"result": "[User] : [접속 허용위치]\nadmin : %\nrdsadmin : localhost\n"
},
{
"id": "DBM-016",
"description": "최신 보안패치와 벤더 권고사항 미적용",
"check": "인터뷰",
"result": "Aurora Mysql Version : 2.11.2\nMysql Version : 5.7.12"
},
{
"id": "DBM-017",
"description": "업무상 불필요한 시스템 테이블 접근 권한 존재",
"check": "인터뷰",
"result": "\n# SHOW GRANTS FOR 'admin'@'%'\nGRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, REFERENCES, INDEX, ALTER, SHOW DATABASES, CREATE TEMPORARY TABLES, LOCK TABLES, EXECUTE, REPLICATION SLAVE, REPLICATION CLIENT, CREATE VIEW, SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER, LOAD FROM S3, SELECT INTO S3, INVOKE LAMBDA, INVOKE SAGEMAKER, INVOKE COMPREHEND ON *.* TO 'admin'@'%' WITH GRANT OPTION\n\n# SHOW GRANTS FOR 'rdsadmin'@'localhost'\nGRANT ALL PRIVILEGES ON *.* TO 'rdsadmin'@'localhost' WITH GRANT OPTION\n"
},
{
"id": "DBM-019",
"description": "비밀번호 재사용 방지 설정 미흡",
"check": "Y",
"result": "Mysql aurora 3(mysql v8.0.19) 부터 비밀번호 재사용 방지 기능 제공\n현재 버전: 2.11.2"
},
{
"id": "DBM-020",
"description": "사용자별 계정 분리 미흡",
"check": "인터뷰",
"result": "[활성화된 계정]\nadmin\nrdsadmin\n"
},
{
"id": "DBM-024",
"description": "불필요하게 WITH GRANT OPTION 옵션이 설정된 권한 존재",
"check": "인터뷰",
"result": "\n# SHOW GRANTS FOR 'admin'@'%'\nGRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, REFERENCES, INDEX, ALTER, SHOW DATABASES, CREATE TEMPORARY TABLES, LOCK TABLES, EXECUTE, REPLICATION SLAVE, REPLICATION CLIENT, CREATE VIEW, SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER, LOAD FROM S3, SELECT INTO S3, INVOKE LAMBDA, INVOKE SAGEMAKER, INVOKE COMPREHEND ON *.* TO 'admin'@'%' WITH GRANT OPTION\n\n# SHOW GRANTS FOR 'rdsadmin'@'localhost'\nGRANT ALL PRIVILEGES ON *.* TO 'rdsadmin'@'localhost' WITH GRANT OPTION\n"
},
{
"id": "DBM-025",
"description": "서비스 지원이 종료된(EoS) 데이터 베이스 사용",
"check": "Y",
"result": "Aurora Mysql Version : 2.11.2\nMysql Version : 5.7.12"
},
{
"id": "DBM-028",
"description": "업무상 불필요한 데이터베이스 Object 존재",
"check": "Y",
"result": "[DB명]:[테이블명]:[컬럼명]\n"
}
]
}
온프렘환경에서 스크립트를 통해 서버 취약점을 진단하듯이 RDS 환경도 비슷한 방법으로 진단해봤는데, 충분한 권한이 있는 계정정보를 알아야 하고 RDS에 접근할 수 있도록 보안그룹도 다시 셋팅이 필요해서 서버 진단과 달리 여간 귀찮은게 한두가지가 아니였다..
이런 부분도 얼른 국내리전에서 AWS Native 서비스로 지원해줬으면 좋을 것 같다는 생각이 든다. AWS Inspector 에서 충분히 가능하지 않을까? 나와 같은 보이스가 많아져서 국내 인증요건에 맞는 CCE 부분도 지원되었으면 하는 바램이 있다.
'AWS' 카테고리의 다른 글
Organization 환경에서 AWS Config 적용 (0) | 2023.08.08 |
---|---|
Multi-Account 환경에서 사용중인 서비스 조회 (0) | 2023.08.06 |
AWS Config 테라폼 모듈 구현 (0) | 2023.08.05 |