#!/usr/bin/env python
import os
import re
import pickle
import json
import glob
import numpy as np
from abc import ABC, abstractmethod
from concurrent.futures import ProcessPoolExecutor
from contextlib import contextmanager
from collections import namedtuple, OrderedDict
from tqdm import tqdm
from .utils import img_to_jpeg_bytes, jpeg_bytes_to_img
ImgInfo = namedtuple('ImgInfo', ['loc', 'pad', 'length'])
class AbstractSerializer(ABC): # pragma: no cover
@abstractmethod
def load(self, file_name):
pass
@abstractmethod
def dump(self, thing, file_name):
pass
class PickleSerializer(AbstractSerializer):
def load(self, file_name):
with open(file_name, 'rb') as file_pointer:
return pickle.load(file_pointer)
def dump(self, thing, file_name):
with open(file_name, 'wb') as file_pointer:
pickle.dump(thing, file_pointer)
class JSONSerializer(AbstractSerializer):
def load(self, file_name):
with open(file_name, 'r') as file_pointer:
return json.load(file_pointer, object_pairs_hook=OrderedDict)
def dump(self, thing, file_name):
with open(file_name, 'w') as file_pointer:
json.dump(thing, file_pointer)
pickle_serializer = PickleSerializer()
json_serializer = JSONSerializer()
def extract_input_for_getitem(element):
if isinstance(element, tuple) and len(element) == 2:
id_, slice_ = element
elif isinstance(element, (int, str)):
id_, slice_ = element, None
else:
raise TypeError("Undefined input type! id or (id, slice) expected")
id_ = str(id_)
return id_, slice_
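# For example, extract_input_for_getitem(42) returns ('42', None), while
# extract_input_for_getitem(('vid', slice(0, 5))) returns ('vid', slice(0, 5)).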
class GulpDirectory(object):
""" Represents a directory containing *.gulp and *.gmeta files.
Parameters
----------
output_dir: str
Path to the directory containing the files.
jpeg_decoder: callable that takes a JPEG stored as :py:class:`bytes` and returns
the desired decoded image format (e.g. np.ndarray)
Attributes
----------
all_meta_dicts: list of dicts
All meta dicts from all chunks as a list.
    chunk_lookup: dict: str -> int
        Mapping from element id to the id of the chunk that contains it.
    chunk_objs_lookup: dict: int -> GulpChunk
        Mapping from chunk id to the corresponding GulpChunk object.
    merged_meta_dict: dict: id -> meta dict
        All meta dicts merged into a single dictionary.
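
    Examples
    --------
    A minimal reading sketch; the directory path and item id below are
    hypothetical::

        gulp_dir = GulpDirectory('/path/to/gulp_dir')
        frames, meta = gulp_dir['some_id']                 # all frames
        frames, meta = gulp_dir[('some_id', slice(0, 5))]  # first five frames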
"""
def __init__(self, output_dir, jpeg_decoder=jpeg_bytes_to_img):
self.output_dir = output_dir
self.jpeg_decoder = jpeg_decoder
self.chunk_objs_lookup = OrderedDict(zip(self._chunk_ids(), self._chunks()))
self.all_meta_dicts = [c.meta_dict for c in self.chunk_objs_lookup.values()]
self.num_chunks = len(self.chunk_objs_lookup)
self.chunk_lookup = {}
for chunk_id, chunk in self.chunk_objs_lookup.items():
for id_ in chunk.meta_dict:
self.chunk_lookup[id_] = chunk_id
self.merged_meta_dict = {}
for d in self.all_meta_dicts:
            for k in d.keys():
                assert k not in self.merged_meta_dict, \
                    "Duplicate id detected {}".format(k)
            self.merged_meta_dict.update(d)
def __iter__(self):
return iter(self.chunk_objs_lookup.values())
    def chunks(self):
""" Return a generator over existing GulpChunk objects which are ready
to be opened and read from. """
return self.__iter__()
def _chunks(self):
return (GulpChunk(*paths, jpeg_decoder=self.jpeg_decoder) for paths in
self._existing_file_paths())
    def new_chunks(self, total_new_chunks):
        """ Return a generator over freshly set up GulpChunk objects which
        are ready to be opened and written to.
Parameters
----------
total_new_chunks: int
The total number of new chunks to initialize.
"""
        return (GulpChunk(*paths, jpeg_decoder=self.jpeg_decoder) for paths in
                self._allocate_new_file_paths(total_new_chunks))
def __getitem__(self, element):
id_, _ = extract_input_for_getitem(element)
chunk_id = self.chunk_lookup[id_]
gulp_chunk = self.chunk_objs_lookup[chunk_id]
with gulp_chunk.open():
return gulp_chunk[element]
def _find_existing_data_paths(self):
return sorted(glob.glob(os.path.join(self.output_dir, 'data*.gulp')))
def _find_existing_meta_paths(self):
return sorted(glob.glob(os.path.join(self.output_dir, 'meta*.gmeta')))
    def _load_label_dict(self):
        # Use a context manager so the file handle is closed promptly.
        with open(os.path.join(self.output_dir, 'label2idx.json'), 'r') as f:
            return json.load(f)
def _existing_file_paths(self):
data_paths = self._find_existing_data_paths()
meta_paths = self._find_existing_meta_paths()
assert len(data_paths) == len(meta_paths)
return zip(data_paths, meta_paths)
def _find_ids_from_paths(self, paths):
return [int(re.findall(r'\d+', os.path.basename(p))[0]) for p in paths]
def _chunk_ids(self):
data_paths = self._find_existing_data_paths()
meta_paths = self._find_existing_meta_paths()
data_ids = self._find_ids_from_paths(data_paths)
meta_ids = self._find_ids_from_paths(meta_paths)
assert data_ids == meta_ids
return data_ids
def _next_chunk_id(self):
existing_chunk_ids = self._chunk_ids()
next_chunk_id = 0
if len(existing_chunk_ids) > 0:
next_chunk_id = max([int(i) for i in existing_chunk_ids]) + 1
return next_chunk_id
def _allocate_new_file_paths(self, total_new_chunks):
next_chunk_id = self._next_chunk_id()
return [self._initialize_filenames(i)
for i in range(next_chunk_id,
next_chunk_id + total_new_chunks)]
def _initialize_filenames(self, chunk_id):
data_file_path = os.path.join(
self.output_dir, 'data_{}.gulp'.format(chunk_id))
meta_file_path = os.path.join(
self.output_dir, 'meta_{}.gmeta'.format(chunk_id))
return data_file_path, meta_file_path
class GulpChunk(object):
""" Represents a gulp chunk on disk.
Parameters
----------
data_file_path: str
Path to the *.gulp file.
meta_file_path: str
Path to the *.gmeta file.
serializer: subclass of AbstractSerializer
The type of serializer to use.
jpeg_decoder: callable that takes a JPEG stored as :py:class:`bytes` and returns
the desired decoded image format (e.g. np.ndarray)
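
    Examples
    --------
    A minimal reading sketch; the file paths and item id are hypothetical::

        chunk = GulpChunk('data_0.gulp', 'meta_0.gmeta')
        with chunk.open('rb'):
            frames, meta = chunk.read_frames('some_id')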
"""
def __init__(self, data_file_path, meta_file_path,
serializer=json_serializer, jpeg_decoder=jpeg_bytes_to_img):
self.jpeg_decoder = jpeg_decoder
self.serializer = serializer
self.data_file_path = data_file_path
self.meta_file_path = meta_file_path
self.meta_dict = self._get_or_create_dict()
self._img_info = {}
self.fp = None
def __contains__(self, id_):
return str(id_) in self.meta_dict
def __getitem__(self, element):
id_, slice_ = extract_input_for_getitem(element)
return self.read_frames(id_, slice_)
def __iter__(self):
return self.iter_all()
    def _get_frame_infos(self, id_):
        id_ = str(id_)
        if id_ not in self.meta_dict:
            raise KeyError("Item id not found in chunk: {}".format(id_))
        return (self._get_or_create_img_info(id_),
                self._copy_meta_data(id_))
def _copy_meta_data(self, id_):
return dict(self.meta_dict[id_]['meta_data'][0])
def _get_or_create_img_info(self, id_):
if id_ not in self._img_info:
self._img_info[id_] = [ImgInfo(*info) for info in self.meta_dict[id_]['frame_info']]
return self._img_info[id_]
def _get_or_create_dict(self):
if os.path.exists(self.meta_file_path):
return self.serializer.load(self.meta_file_path)
else:
return OrderedDict()
@staticmethod
def _default_factory():
return OrderedDict([('frame_info', []), ('meta_data', [])])
@staticmethod
def _pad_image(number):
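        # Number of padding bytes needed to align a record of `number` bytes
        # to a 4-byte boundary.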
return (4 - (number % 4)) % 4
def _append_meta(self, id_, meta_data):
id_ = str(id_)
if id_ not in self.meta_dict: # implements an OrderedDefaultDict
self.meta_dict[id_] = self._default_factory()
self.meta_dict[id_]['meta_data'].append(meta_data)
def _write_frame(self, id_, image):
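        # A record is the JPEG byte string padded with NUL bytes to a 4-byte
        # boundary; its location, length and padding are stored in the meta
        # dict so the frame can be seeked to and decoded later.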
loc = self.fp.tell()
img_str = img_to_jpeg_bytes(image)
assert len(img_str) > 0
pad = self._pad_image(len(img_str))
record = img_str.ljust(len(img_str) + pad, b'\0')
assert len(record) > 0
img_info = ImgInfo(loc=loc,
length=len(record),
pad=pad)
id_ = str(id_)
if id_ not in self.meta_dict: # implements an OrderedDefaultDict
self.meta_dict[id_] = self._default_factory()
self.meta_dict[id_]['frame_info'].append(img_info)
self.fp.write(record)
def _write_frames(self, id_, frames):
for frame in frames:
self._write_frame(id_, frame)
    @contextmanager
    def open(self, flag='rb'):
        """Open the gulp chunk for reading, writing or appending.
Parameters
----------
flag: str
'rb': Read binary
'wb': Write binary
'ab': Append to binary
Notes
-----
Works as a context manager but returns None.
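
        For example, reading within the managed block (hypothetical id)::

            with chunk.open('rb'):
                frames, meta = chunk.read_frames('some_id')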
"""
        if flag in ['wb', 'rb', 'ab']:
            self.fp = open(self.data_file_path, flag)
        else:
            m = "This file does not support the mode: '{}'".format(flag)
            raise NotImplementedError(m)
        try:
            yield
        finally:
            # Flush and close even if the body of the with-block raises.
            if flag in ['wb', 'ab']:
                self.flush()
            self.fp.close()
    def flush(self):
"""Flush all buffers and write the meta file."""
self.fp.flush()
self.serializer.dump(self.meta_dict, self.meta_file_path)
    def append(self, id_, meta_data, frames):
""" Append an item to the gulp.
Parameters
----------
id_ : str
The ID of the item
meta_data: dict
The meta-data associated with the item.
        frames: list of numpy arrays
            The frames of the item as a list of numpy arrays containing
            image pixel values.
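
        Examples
        --------
        With a hypothetical id, meta-data dict and list of frames::

            chunk.append('vid_1', {'label': 'cat'}, frames)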
"""
self._append_meta(id_, meta_data)
self._write_frames(id_, frames)
    def read_frames(self, id_, slice_=None):
""" Read frames for a single item.
Parameters
----------
id_: str
The ID of the item
        slice_: slice or list of ints
A slice or list of indices with which to select frames.
Returns
-------
        frames (list of numpy arrays), meta (dict)
            The frames of the item as a list of numpy arrays containing
            image pixel values, together with the associated meta-data.
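
        Examples
        --------
        For a hypothetical id, ``read_frames('vid_1', slice(0, 10, 2))``
        returns every second frame among the first ten, plus the meta dict.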
"""
frame_infos, meta_data = self._get_frame_infos(id_)
slice_element = slice_ if slice_ is not None else slice(0, len(frame_infos))
def extract_frame(frame_info):
self.fp.seek(frame_info.loc)
record = self.fp.read(frame_info.length)
img_str = record[:len(record)-frame_info.pad]
img = self.jpeg_decoder(img_str)
return img
if isinstance(slice_element, (list, np.ndarray)):
selected_frame_infos = [frame_infos[idx] for idx in slice_element]
else:
selected_frame_infos = frame_infos[slice_element]
frames = [extract_frame(frame_info)
for frame_info in selected_frame_infos]
return frames, meta_data
    def iter_all(self, accepted_ids=None, shuffle=False):
        """ Iterate over all items in the gulp chunk.
Parameters
----------
accepted_ids: list of str
A filter for accepted ids.
shuffle: bool
Shuffle the items or not.
Returns
-------
iterator
            An iterator that yields a series of (frames, meta) tuples. See
            `read_frames` for details.
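
        Examples
        --------
        A sketch; ``process`` is a hypothetical consumer::

            for frames, meta in chunk.iter_all(shuffle=True):
                process(frames, meta)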
"""
ids = self.meta_dict.keys()
if accepted_ids is not None:
intersection = list(set(ids) & set(accepted_ids))
ids = [id_ for id_ in ids if id_ in intersection]
if shuffle:
ids = list(ids)
np.random.shuffle(ids)
with self.open('rb'):
for id_ in ids:
frames, meta = self.read_frames(id_)
yield frames, meta
class ChunkWriter(object):
"""Can write from an adapter to a gulp chunk.
Parameters
----------
adapter: subclass of AbstractDatasetAdapter
The adapter to get items from.
"""
def __init__(self, adapter):
self.adapter = adapter
    def write_chunk(self, output_chunk, input_slice):
"""Write from an input slice in the adapter to an output chunk.
Parameters
----------
output_chunk: GulpChunk
The chunk to write to
input_slice: slice
The slice to use from the adapter.
"""
with output_chunk.open('wb'):
for video in self.adapter.iter_data(input_slice):
id_ = video['id']
meta_data = video['meta']
frames = video['frames']
if len(frames) > 0:
output_chunk.append(id_, meta_data, frames)
else:
print("Failed to write video with id: {}; no frames"
.format(id_))
def calculate_chunk_slices(items_per_chunk, num_items):
"""Calculate slices for indexing an adapter.
Parameters
----------
items_per_chunk: int
Approximate number of items per chunk.
num_items: int
Total number of items.
Returns
-------
list of slices
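
    Examples
    --------
    >>> calculate_chunk_slices(2, 5)
    [slice(0, 2, None), slice(2, 4, None), slice(4, 5, None)]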
"""
assert items_per_chunk > 0
assert num_items > 0
return [slice(i, min(i + items_per_chunk, num_items))
for i in range(0, num_items, items_per_chunk)]
class GulpIngestor(object):
    """Ingest items from an adapter into gulp chunks.
Parameters
----------
adapter: subclass of AbstractDatasetAdapter
The adapter to ingest from.
output_folder: str
The folder/directory to write to.
    videos_per_chunk: int
        The number of items to write per chunk; the final chunk may
        contain fewer.
num_workers: int
The level of parallelism.
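
    Examples
    --------
    A minimal sketch; ``MyAdapter`` stands in for a hypothetical
    AbstractDatasetAdapter subclass::

        ingestor = GulpIngestor(MyAdapter(), '/output/dir',
                                videos_per_chunk=100, num_workers=4)
        ingestor()  # writes data_*.gulp and meta_*.gmeta files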
"""
def __init__(self, adapter, output_folder, videos_per_chunk, num_workers):
assert int(num_workers) > 0
self.adapter = adapter
self.output_folder = output_folder
self.videos_per_chunk = int(videos_per_chunk)
self.num_workers = int(num_workers)
def __call__(self):
os.makedirs(self.output_folder, exist_ok=True)
chunk_slices = calculate_chunk_slices(self.videos_per_chunk,
len(self.adapter))
gulp_directory = GulpDirectory(self.output_folder)
new_chunks = gulp_directory.new_chunks(len(chunk_slices))
chunk_writer = ChunkWriter(self.adapter)
with ProcessPoolExecutor(max_workers=self.num_workers) as executor:
result = executor.map(chunk_writer.write_chunk,
new_chunks,
chunk_slices)
for r in tqdm(result,
desc='Chunks finished',
unit='chunk',
dynamic_ncols=True,
total=len(chunk_slices)):
pass