blob: 3c65b948bdd6ef6d5ff5cc51708605dc57f8de6e [file] [log] [blame]
# Copyright (c) 2018 Intel Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import os
import re
import sys
import tempfile

import six
import udatetime

from vnfsdk_pkgtools.packager import utils
# Keys recognised inside the "metadata:" block of a manifest.
METADATA_KEYS = [
    'vnf_provider_id',
    'vnf_product_name',
    'vnf_release_data_time',
    'vnf_package_version',
]

# Keys recognised on the lines of a file digest block.
DIGEST_KEYS = ['Source', 'Algorithm', 'Hash']

# Hash algorithm names accepted for file digests.
SUPPORTED_HASH_ALGO = ['SHA-256', 'SHA-512']

# Matches a non-MANO artifact set identifier line: dot-separated groups of
# lowercase letters, digits, '_' and '-', terminated by a single ':'.
NON_MANO_ARTIFACT_RE = re.compile(r'^[0-9a-z_-]+(\.[0-9a-z_-]+)*:$')
class ManifestException(Exception):
    """Raised when a CSAR manifest is malformed or fails validation."""
class Manifest(object):
    """Manifest file in CSAR package.

    On construction the manifest file at ``root_path/manifest_path`` is read,
    split into blocks separated by blank lines, and each block is parsed as
    metadata, a non-MANO artifact set list, a CMS signature or file digests.

    Attributes:
        path: manifest path, relative to ``root``.
        root: root directory of the package.
        metadata: dict of the values found in the ``metadata:`` block.
        digests: dict mapping a source path to ``(algorithm, hash)``;
            both are ``None`` when the source carries no digest.
        signature: the CMS signature block as one string, or ``None``.
        non_mano_artifacts: dict mapping a set identifier to a list of files.
        blocks: list of blocks, each one a list of stripped non-empty lines.
    """

    def __init__(self, root_path, manifest_path):
        """Read and parse the manifest.

        :param root_path: root directory of the package
        :param manifest_path: manifest location relative to ``root_path``
        :raises ManifestException: if the manifest content is invalid
        """
        self.path = manifest_path
        self.root = root_path
        self.metadata = {}
        # digest dict
        #   :key = source
        #   :value = (algorithm, hash)
        self.digests = {}
        # signature string, in CMS format
        self.signature = None
        # non_mano_artifact dict
        #   :key = set identifier
        #   :value = list of files
        self.non_mano_artifacts = {}
        self.blocks = []
        self._split_blocks()
        self._parse_all_blocks()

    @staticmethod
    def _open_text(path):
        """Open ``path`` for reading text with universal newlines."""
        # The 'U' mode flag is needed on Python 2 to get newline translation,
        # but it was removed in Python 3.11 (open() raises ValueError there).
        # Python 3 text mode translates newlines by default.
        mode = 'rU' if sys.version_info[0] < 3 else 'r'
        return open(path, mode)

    @staticmethod
    def __split_line(s):
        """Split a ``key: value`` line at the first colon.

        :returns: ``(key, value, None)`` with ``value`` stripped when the line
            contains a colon, otherwise ``(None, None, s)``.
        """
        key, sep, value = s.partition(':')
        if not sep:
            return (None, None, s)
        return (key, value.strip(), None)

    def _split_blocks(self):
        '''
        Split manifest file into blocks, each block is separated by an empty
        line or a line with only spaces and tabs.
        '''
        block_content = []
        with self._open_text(os.path.join(self.root, self.path)) as fp:
            for line in fp:
                line = line.strip(' \t\n')
                if line:
                    block_content.append(line)
                elif block_content:
                    # separator line: close the block collected so far
                    self.blocks.append(block_content)
                    block_content = []
        # the last block is not necessarily followed by a separator
        if block_content:
            self.blocks.append(block_content)

    def _parse_all_blocks(self):
        """Dispatch every block to its parser, chosen by the first line.

        :raises ManifestException: if no metadata was found, or a block
            parser rejects its block
        """
        for block in self.blocks:
            if block[0] == 'metadata:':
                self.parse_metadata(block)
            elif block[0] == 'non_mano_artifact_sets:':
                self.parse_non_mano_artifacts(block)
            elif '--BEGIN CMS--' in block[0]:
                self.parse_cms(block)
            else:
                self.parse_digest(block)

        if not self.metadata:
            raise ManifestException("No metadata")

    def parse_metadata(self, lines):
        """Parse a ``metadata:`` block into ``self.metadata``.

        :param lines: lines of the block, the first one being the header
        :raises ManifestException: on an unrecognized key, a missing key or
            a release time rejected by ``udatetime.from_string``
        """
        # Skip the first line
        for line in lines[1:]:
            (key, value, remain) = self.__split_line(line)
            if key in METADATA_KEYS:
                self.metadata[key] = value
            else:
                raise ManifestException("Unrecognized metadata %s:" % line)

        # validate metadata keys
        missing_keys = set(METADATA_KEYS) - set(self.metadata.keys())
        if missing_keys:
            # sorted so that the message does not depend on set ordering
            raise ManifestException("Missing metadata keys: %s" % ','.join(sorted(missing_keys)))

        # validate vnf_release_data_time
        try:
            udatetime.from_string(self.metadata['vnf_release_data_time'])
        except ValueError:
            raise ManifestException("Non IETF RFC 3339 vnf_release_data_time: %s"
                                    % self.metadata['vnf_release_data_time'])

    def parse_cms(self, lines):
        """Store a CMS signature block in ``self.signature``.

        :param lines: lines of the block, starting with the BEGIN CMS marker
        :raises ManifestException: if the last line has no END CMS marker
        """
        if '--END CMS--' not in lines[-1]:
            raise ManifestException("Can NOT find end of sigature block")
        self.signature = '\n'.join(lines)

    def parse_digest(self, lines):
        """Parse a file digest block into ``self.digests``.

        Every ``Source`` is recorded immediately with an empty digest; once
        ``Source``, ``Algorithm`` and ``Hash`` have all been seen, the hash is
        checked against the file and the entry is completed.

        :param lines: lines of the block
        :raises ManifestException: on an unrecognized line, an unsupported
            algorithm or a hash that does not match the file
        """
        desc = {}
        for line in lines:
            (key, value, remain) = self.__split_line(line)
            if key in DIGEST_KEYS:
                desc[key] = value
            else:
                raise ManifestException("Unrecognized file digest line %s:" % line)

            if key == 'Source':
                self.digests[value] = (None, None)
            elif key == 'Algorithm':
                # validate algorithm
                desc['Algorithm'] = desc['Algorithm'].upper()
                if desc['Algorithm'] not in SUPPORTED_HASH_ALGO:
                    raise ManifestException("Unsupported hash algorithm: %s" % desc['Algorithm'])

            # validate hash
            if desc.get('Algorithm') and desc.get('Hash') and desc.get('Source'):
                actual_hash = utils.cal_file_hash(self.root, desc['Source'], desc['Algorithm'])
                if actual_hash != desc['Hash']:
                    raise ManifestException("Mismatched hash for file %s" % desc['Source'])
                # nothing is wrong, let's store this and start a new round
                self.digests[desc['Source']] = (desc['Algorithm'], desc['Hash'])
                desc = {}

    def parse_non_mano_artifacts(self, lines):
        """Parse a ``non_mano_artifact_sets:`` block.

        :param lines: lines of the block, the first one being the header
        :raises ManifestException: on a line that is neither a set identifier
            nor a ``Source`` line following an identifier
        """
        identifier = None
        # Skip the first line
        for line in lines[1:]:
            if NON_MANO_ARTIFACT_RE.match(line):
                # new non mano artifact identifier (drop the trailing ':')
                identifier = line[:-1]
                self.non_mano_artifacts[identifier] = []
                continue

            (key, value, remain) = self.__split_line(line)
            if key == 'Source' and value and not remain and identifier:
                # check for file existence
                utils.check_file_dir(self.root, value)
                self.non_mano_artifacts[identifier].append(value)
            else:
                raise ManifestException("Unrecogized non mano artifacts line %s:" % line)

    def add_file(self, rel_path, algo='SHA-256'):
        '''Add file to the manifest and calculate the digest

        :param rel_path: file path relative to the package root
        :param algo: hash algorithm name (case-insensitive); a false value
            records the file without a digest. The default is 'SHA-256';
            the previous default 'SHA256' was not a supported name, so
            calling without ``algo`` always raised.
        :raises ManifestException: if ``algo`` is not supported
        '''
        if algo:
            algo = algo.upper()
            if algo not in SUPPORTED_HASH_ALGO:
                raise ManifestException("Unsupported hash algorithm: %s" % algo)
            file_hash = utils.cal_file_hash(self.root, rel_path, algo)
        else:
            file_hash = None
        self.digests[rel_path] = (algo, file_hash)

    def return_as_string(self):
        '''Return the manifest file content as a string
        '''
        # metadata
        parts = [
            "metadata:\n",
            "vnf_product_name: %s\n" % (self.metadata['vnf_product_name']),
            "vnf_provider_id: %s\n" % (self.metadata['vnf_provider_id']),
            "vnf_package_version: %s\n" % (self.metadata['vnf_package_version']),
            "vnf_release_data_time: %s\n" % (self.metadata['vnf_release_data_time']),
        ]

        # non_mano_artifacts
        if self.non_mano_artifacts:
            parts.append("\nnon_mano_artifact_sets:\n")
            for (key, sources) in self.non_mano_artifacts.items():
                parts.append(key + ":\n")
                for s in sources:
                    parts.append("Source: %s\n" % s)

        # digest
        for (key, digest) in self.digests.items():
            parts.append("\n")
            parts.append("Source: %s\n" % key)
            if digest[0]:
                parts.append("Algorithm: %s\n" % digest[0])
                parts.append("Hash: %s\n" % digest[1])
        if self.digests:
            # empty line between digest and signature section
            parts.append("\n")

        # signature
        if self.signature:
            parts.append(self.signature)
        return "".join(parts)

    def update_to_file(self, temporary=False):
        """Write the manifest content to a file.

        :param temporary: when true, write to a newly created temporary file
            instead of overwriting the manifest itself
        :returns: path of the file that was written
        """
        content = self.return_as_string()
        if temporary:
            # mkstemp creates the file atomically, unlike the race-prone and
            # deprecated mktemp which only returns a name
            (fd, abs_path) = tempfile.mkstemp()
            fp = os.fdopen(fd, 'w')
        else:
            abs_path = os.path.abspath(os.path.join(self.root, self.path))
            fp = open(abs_path, 'w')
        with fp:
            fp.write(content)
        return abs_path

    def save_to_temp_without_cms(self):
        """Copy the manifest to a temporary file, leaving out the CMS block.

        :returns: path of the temporary file; it is not deleted automatically
        """
        # we need to strip cms block with out changing the order of the
        # file digest content before we verify the signature
        skip = False
        lines = []
        with self._open_text(os.path.join(self.root, self.path)) as fp:
            for line in fp:
                if '--BEGIN CMS--' in line:
                    skip = True
                elif '--END CMS--' in line:
                    skip = False
                elif not skip:
                    lines.append(line)
        content = ''.join(lines)
        with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmpfile:
            tmpfile.write(content)
        return tmpfile.name