blob: d819a70ab6d5246441fa93d03d137e12eb0631dd [file] [log] [blame]
# Copyright (c) 2018 Intel Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from collections import namedtuple
import os
import tempfile
import udatetime
from vnfsdk_pkgtools.packager import utils
METADATA_KEYS = [ 'vnf_provider_id',
'vnf_product_name',
'vnf_release_data_time',
'vnf_package_version']
DIGEST_KEYS = [ 'Source', 'Algorithm', 'Hash' ]
SUPPORTED_HASH_ALGO = ['SHA256', 'SHA512']
class ManifestException(Exception):
pass
class Manifest(object):
' Manifest file in CSAR package'
def __init__(self, root_path, manifest_path):
self.path = manifest_path
self.root = root_path
self.metadata = {}
# digest dict
# :key = source
# :value = (algorithm, hash)
self.digests = {}
self.signature = None
self.blocks = [ ]
self._split_blocks()
self._parse_blocks()
@staticmethod
def __split_line(s):
remain=s
try:
(key, value)=s.split(':', 1)
value = value.strip()
remain = None
except ValueError:
key = None
value = None
return (key, value, remain)
def _split_blocks(self):
'''
Split manifest file into blocks, each block is seperated by a empty
line or a line with only spaces and tabs.
'''
block_content = [ ]
with open(os.path.join(self.root, self.path),'rU') as fp:
for line in fp:
line = line.strip(' \t\n')
if line:
block_content.append(line)
else:
if len(block_content):
self.blocks.append(block_content)
block_content = []
if len(block_content):
self.blocks.append(block_content)
def _parse_blocks(self):
for block in self.blocks:
(key, value, remain) = self.__split_line(block.pop(0))
if key == 'metadata':
# metadata block
for line in block:
(key, value, remain) = self.__split_line(line)
if key in METADATA_KEYS:
self.metadata[key] = value
else:
raise ManifestException("Unrecognized metadata %s:" % line)
#validate metadata keys
missing_keys = set(METADATA_KEYS) - set(self.metadata.keys())
if missing_keys:
raise ManifestException("Missing metadata keys: %s" % ','.join(missing_keys))
# validate vnf_release_data_time
try:
udatetime.from_string(self.metadata['vnf_release_data_time'])
except ValueError:
raise ManifestException("Non IETF RFC 3339 vnf_release_data_time: %s"
% self.metadata['vnf_release_data_time'])
elif key in DIGEST_KEYS:
# file digest block
desc = {}
desc[key] = value
for line in block:
(key, value, remain) = self.__split_line(line)
if key in DIGEST_KEYS:
desc[key] = value
else:
raise ManifestException("Unrecognized file digest line %s:" % line)
# validate file digest keys
missing_keys = set(DIGEST_KEYS) - set(desc.keys())
if missing_keys:
raise ManifestException("Missing file digest keys: %s" % ','.join(missing_keys))
# validate file digest algo
desc['Algorithm'] = desc['Algorithm'].upper()
if desc['Algorithm'] not in SUPPORTED_HASH_ALGO:
raise ManifestException("Unsupported hash algorithm: %s" % desc['Algorithm'])
# validate file digest hash
# TODO need to support remote file
if "://" not in desc['Source']:
hash = utils.cal_file_hash(self.root, desc['Source'], desc['Algorithm'])
if hash != desc['Hash']:
raise ManifestException("Mismatched hash for file %s" % desc['Source'])
# nothing is wrong, let's store this
self.digests[desc['Source']] = (desc['Algorithm'], desc['Hash'])
elif key:
raise ManifestException("Unknown key in line '%s:%s'" % (key, value))
else:
# TODO signature block
pass
if not self.metadata:
raise ManifestException("No metadata")
def add_file(self, rel_path, algo='SHA256'):
'''Add file to the manifest and calculate the digest
'''
algo = algo.upper()
if algo not in SUPPORTED_HASH_ALGO:
raise ManifestException("Unsupported hash algorithm: %s" % algo)
hash = utils.cal_file_hash(self.root, rel_path, algo)
self.digests[rel_path] = (algo, hash)
def return_as_string(self):
'''Return the manifest file content as a string
'''
ret = ""
# metadata
ret += "metadata:\n"
ret += "vnf_product_name: %s\n" % (self.metadata['vnf_product_name'])
ret += "vnf_provider_id: %s\n" % (self.metadata['vnf_provider_id'])
ret += "vnf_package_version: %s\n" % (self.metadata['vnf_package_version'])
ret += "vnf_release_data_time: %s\n" % (self.metadata['vnf_release_data_time'])
# degist
for (key, digest) in self.digests.iteritems():
ret += "\n"
ret += "Source: %s\n" % key
ret += "Algorithm: %s\n" % digest[0]
ret += "Hash: %s\n" % digest[1]
return ret
def update_to_file(self, temporary=False):
content = self.return_as_string()
if temporary:
abs_path = tempfile.mktemp()
else:
abs_path = os.path.abspath(os.path.join(self.root, self.path))
with open(abs_path, 'w') as fp:
fp.write(content)
return abs_path