blob: c84dac86e0bbf042c2dbfca0baf1bf551bf5a063 [file] [log] [blame]
Milan Verespej2e1328a2019-06-18 13:40:08 +02001#! /usr/bin/env python
2# -*- coding: utf-8 -*-
3
4# COPYRIGHT NOTICE STARTS HERE
5
6# Copyright 2019 © Samsung Electronics Co., Ltd.
7#
8# Licensed under the Apache License, Version 2.0 (the "License");
9# you may not use this file except in compliance with the License.
10# You may obtain a copy of the License at
11#
12# http://www.apache.org/licenses/LICENSE-2.0
13#
14# Unless required by applicable law or agreed to in writing, software
15# distributed under the License is distributed on an "AS IS" BASIS,
16# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17# See the License for the specific language governing permissions and
18# limitations under the License.
19
20# COPYRIGHT NOTICE ENDS HERE
21
22import concurrent.futures
23import logging
24from abc import ABC, abstractmethod
25
26from downloader import AbstractDownloader
27
28log = logging.getLogger(__name__)
29
30
class ConcurrentDownloader(AbstractDownloader, ABC):
    """
    Base class for downloaders that process their items in a thread pool.

    Subclasses implement ``_download_item``; ``download`` then runs it once
    for every entry of ``self._missing.items()``.
    NOTE(review): ``_missing``, ``_list_type`` and ``_initial_log`` are not
    defined in this class - presumably provided by AbstractDownloader; confirm.
    """

    def __init__(self, list_type, *list_args, workers=None):
        """
        :param list_type: forwarded unchanged to AbstractDownloader
        :param list_args: forwarded unchanged to AbstractDownloader
        :param workers: max_workers for the thread pool; None lets
                        ThreadPoolExecutor pick its own default
        """
        super().__init__(list_type, *list_args)
        self._workers = workers

    @abstractmethod
    def _download_item(self, item):
        """
        Download item from list
        :param item: item to be downloaded
        """
        pass

    def download(self):
        """
        Download images concurrently from data lists.

        Returns early without downloading when ``self._initial_log()`` is falsy.
        :raises RuntimeError: propagated from ``run_concurrent`` when at least
                              one item failed to download
        """
        if not self._initial_log():
            return
        items_left = len(self._missing)
        try:
            # run_concurrent yields once per successfully finished item
            for _ in self.run_concurrent(self._download_item, self._missing.items()):
                items_left -= 1
                log.info('%s %s left to download.', items_left, self._list_type)
        except RuntimeError:
            log.error('%s %s were not downloaded.', items_left, self._list_type)
            # bare raise keeps the original traceback and exception chain
            raise

    def run_concurrent(self, fn, iterable, *args):
        """
        Run function concurrently for iterable

        This is a generator: it yields (None) once for every call of fn that
        finished without an exception, in order of completion. A failing call
        does not stop the remaining ones; every failure is logged with its
        traceback as soon as it is observed.
        :param fn: function to run
        :param iterable: iterable to process
        :param args: arguments for function (fn)
        :raises RuntimeError: after all calls finished, if any of them raised;
                              the first observed failure is attached as cause
        """
        first_error = None
        with concurrent.futures.ThreadPoolExecutor(max_workers=self._workers) as executor:
            futures = [executor.submit(fn, item, *args) for item in iterable]

            for future in concurrent.futures.as_completed(futures):
                error = future.exception()
                if error is None:
                    yield
                    continue
                # Report the real failure instead of silently dropping it
                log.error('Concurrent task failed: %r', error,
                          exc_info=(type(error), error, error.__traceback__))
                if first_error is None:
                    first_error = error
        if first_error is not None:
            raise RuntimeError('One or more errors occurred') from first_error