blob: a150db73ca9b8ab041cdd377be0b8c8629c4c3b0 [file] [log] [blame]
Milan Verespej2e1328a2019-06-18 13:40:08 +02001# -*- coding: utf-8 -*-
2
3# COPYRIGHT NOTICE STARTS HERE
4
5# Copyright 2019 © Samsung Electronics Co., Ltd.
6#
7# Licensed under the Apache License, Version 2.0 (the "License");
8# you may not use this file except in compliance with the License.
9# You may obtain a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS,
15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18
19# COPYRIGHT NOTICE ENDS HERE
20
21import concurrent.futures
22import logging
23from abc import ABC, abstractmethod
24
25from downloader import AbstractDownloader
26
27log = logging.getLogger(__name__)
28
29
class ConcurrentDownloader(AbstractDownloader, ABC):
    """
    Abstract downloader that processes its items in a thread pool.

    Subclasses implement _download_item(); download() then submits it once
    for every entry of self._missing.
    """

    def __init__(self, list_type, *list_args, workers=None):
        """
        :param list_type: forwarded unchanged to AbstractDownloader.__init__
        :param list_args: forwarded unchanged to AbstractDownloader.__init__
        :param workers: max_workers passed to ThreadPoolExecutor
                        (None lets the executor choose its own default)
        """
        super().__init__(list_type, *list_args)
        self._workers = workers

    @abstractmethod
    def _download_item(self, item):
        """
        Download item from list
        :param item: item to be downloaded (download() passes one element
                     of self._missing.items())
        """
        pass

    def download(self):
        """
        Download all missing items concurrently.

        Returns without doing anything when self._initial_log() is falsy.
        NOTE(review): _initial_log, _missing and _list_type are provided by
        AbstractDownloader; _missing is assumed to be dict-like - confirm.
        :raises RuntimeError: when at least one item failed to download
        """
        if not self._initial_log():
            return
        items_left = len(self._missing)
        try:
            for _ in self.run_concurrent(self._download_item, self._missing.items()):
                items_left -= 1
                log.info('%s %s left to download.', items_left, self._list_type)
        except RuntimeError:
            # run_concurrent raises only after every task has finished, so
            # items_left is the number of items that failed.
            log.error('%s %s were not downloaded.', items_left, self._list_type)
            raise

    def run_concurrent(self, fn, iterable, *args):
        """
        Run function concurrently for iterable.

        This is a generator: it yields (None) once for every successfully
        finished call, in completion order. Failures do not stop the
        remaining tasks; they are logged and reported at the end.
        :param fn: function to run, called as fn(item, *args)
        :param iterable: iterable to process
        :param args: arguments for function (fn)
        :raises RuntimeError: after all tasks finished, if any of them raised;
                              the first failure is attached as __cause__
        """
        first_error = None
        with concurrent.futures.ThreadPoolExecutor(max_workers=self._workers) as executor:
            futures = [executor.submit(fn, item, *args) for item in iterable]

            for future in concurrent.futures.as_completed(futures):
                error = future.exception()
                # Compare with None explicitly: an exception object may be
                # falsy if its class defines __len__ or __bool__.
                if error is None:
                    yield
                    continue
                # Report the real cause instead of silently dropping it.
                log.error('Concurrent task failed: %r', error)
                if first_error is None:
                    first_error = error
        if first_error is not None:
            raise RuntimeError('One or more errors occurred') from first_error