# -*- coding:utf-8 -*- # # Copyright © 2009-2010 Pierre Raybaut # Licensed under the terms of the MIT License # (see spyderlib/__init__.py for details) """ Input/Output Utilities Note: 'load' functions has to return a dictionary from which a globals() namespace may be updated """ from __future__ import print_function import sys import os import tarfile import os.path as osp import shutil import warnings import json import inspect import dis # - If pandas fails to import here (for any reason), Spyder # will crash at startup (e.g. see Issue 2300) # - This also prevents Spyder to start IPython kernels # (see Issue 2456) try: import pandas as pd except: pd = None #analysis:ignore # Local imports from spyderlib.py3compat import pickle, to_text_string, getcwd, PY2 class MatlabStruct(dict): """ Matlab style struct, enhanced. Supports dictionary and attribute style access. Can be pickled, and supports code completion in a REPL. Examples ======== >>> from spyderlib.utils.iofuncs import MatlabStruct >>> a = MatlabStruct() >>> a.b = 'spam' # a["b"] == 'spam' >>> a.c["d"] = 'eggs' # a.c.d == 'eggs' >>> print(a) {'c': {'d': 'eggs'}, 'b': 'spam'} """ def __getattr__(self, attr): """Access the dictionary keys for unknown attributes.""" try: return self[attr] except KeyError: msg = "'MatlabStruct' object has no attribute %s" % attr raise AttributeError(msg) def __getitem__(self, attr): """ Get a dict value; create a MatlabStruct if requesting a submember. Do not create a key if the attribute starts with an underscore. """ if attr in self.keys() or attr.startswith('_'): return dict.__getitem__(self, attr) frame = inspect.currentframe() # step into the function that called us if frame.f_back.f_back and self._is_allowed(frame.f_back.f_back): dict.__setitem__(self, attr, MatlabStruct()) elif self._is_allowed(frame.f_back): dict.__setitem__(self, attr, MatlabStruct()) return dict.__getitem__(self, attr) def _is_allowed(self, frame): """Check for allowed op code in the calling frame""" allowed = [dis.opmap['STORE_ATTR'], dis.opmap['LOAD_CONST'], dis.opmap.get('STOP_CODE', 0)] bytecode = frame.f_code.co_code instruction = bytecode[frame.f_lasti + 3] instruction = ord(instruction) if PY2 else instruction return instruction in allowed __setattr__ = dict.__setitem__ __delattr__ = dict.__delitem__ @property def __dict__(self): """Allow for code completion in a REPL""" return self.copy() def get_matlab_value(val): """ Extract a value from a Matlab file From the oct2py project, see http://pythonhosted.org/oct2py/conversions.html """ import numpy as np if not isinstance(val, np.ndarray): return val # check for objects if "'|O" in str(val.dtype) or "O'" in str(val.dtype): data = MatlabStruct() for key in val.dtype.fields.keys(): data[key] = get_matlab_value(val[key][0]) return data # handle cell arrays if val.dtype == np.object: if val.size == 1: val = val[0] if "'|O" in str(val.dtype) or "O'" in str(val.dtype): val = get_matlab_value(val) if isinstance(val, MatlabStruct): return val if val.size == 1: val = val.flatten() if val.dtype == np.object: if len(val.shape) > 2: val = val.T val = np.array([get_matlab_value(val[i].T) for i in range(val.shape[0])]) if len(val.shape) > 1: if len(val.shape) == 2: val = val.T try: return val.astype(val[0][0].dtype) except ValueError: # dig into the cell type for row in range(val.shape[0]): for i in range(val[row].size): if not np.isscalar(val[row][i]): if val[row][i].size > 1: val[row][i] = val[row][i].squeeze() else: val[row][i] = val[row][i][0] else: val = np.array([get_matlab_value(val[i]) for i in range(val.size)]) if len(val.shape) == 1 or val.shape[0] == 1 or val.shape[1] == 1: val = val.flatten() val = val.tolist() elif val.size == 1: if hasattr(val, 'flatten'): val = val.flatten()[0] if isinstance(val, MatlabStruct) and isinstance(val.size, MatlabStruct): del val['size'] del val['dtype'] return val try: import numpy as np try: with warnings.catch_warnings(): warnings.simplefilter("ignore") import scipy.io as spio except AttributeError: # Python 2.5: warnings.catch_warnings was introduced in Python 2.6 import scipy.io as spio # analysis:ignore except: spio = None if spio is None: load_matlab = None save_matlab = None else: def load_matlab(filename): try: out = spio.loadmat(filename) for key, value in list(out.items()): out[key] = get_matlab_value(value) return out, None except Exception as error: return None, str(error) def save_matlab(data, filename): try: spio.savemat(filename, data, oned_as='row') except Exception as error: return str(error) except: load_matlab = None save_matlab = None try: import numpy as np # analysis:ignore def load_array(filename): try: name = osp.splitext(osp.basename(filename))[0] data = np.load(filename) if hasattr(data, 'keys'): return data, None else: return {name: data}, None except Exception as error: return None, str(error) def __save_array(data, basename, index): """Save numpy array""" fname = basename + '_%04d.npy' % index np.save(fname, data) return fname except: load_array = None try: from spyderlib.pil_patch import Image if sys.byteorder == 'little': _ENDIAN = '<' else: _ENDIAN = '>' DTYPES = { "1": ('|b1', None), "L": ('|u1', None), "I": ('%si4' % _ENDIAN, None), "F": ('%sf4' % _ENDIAN, None), "I;16": ('|u2', None), "I;16S": ('%si2' % _ENDIAN, None), "P": ('|u1', None), "RGB": ('|u1', 3), "RGBX": ('|u1', 4), "RGBA": ('|u1', 4), "CMYK": ('|u1', 4), "YCbCr": ('|u1', 4), } def __image_to_array(filename): img = Image.open(filename) try: dtype, extra = DTYPES[img.mode] except KeyError: raise RuntimeError("%s mode is not supported" % img.mode) shape = (img.size[1], img.size[0]) if extra is not None: shape += (extra,) return np.array(img.getdata(), dtype=np.dtype(dtype)).reshape(shape) def load_image(filename): try: name = osp.splitext(osp.basename(filename))[0] return {name: __image_to_array(filename)}, None except Exception as error: return None, str(error) except: load_image = None def load_pickle(filename): """Load a pickle file as a dictionary""" try: if pd: return pd.read_pickle(filename), None else: with open(filename, 'rb') as fid: data = pickle.load(fid) return data, None except Exception as err: return None, str(err) def load_json(filename): """Load a json file as a dictionary""" try: with open(filename, 'rb') as fid: data = json.load(fid) return data, None except Exception as err: return None, str(err) def save_dictionary(data, filename): """Save dictionary in a single file .spydata file""" filename = osp.abspath(filename) old_cwd = getcwd() os.chdir(osp.dirname(filename)) error_message = None try: saved_arrays = {} if load_array is not None: # Saving numpy arrays with np.save arr_fname = osp.splitext(filename)[0] for name in list(data.keys()): if isinstance(data[name], np.ndarray) and data[name].size > 0: # Saving arrays at data root fname = __save_array(data[name], arr_fname, len(saved_arrays)) saved_arrays[(name, None)] = osp.basename(fname) data.pop(name) elif isinstance(data[name], (list, dict)): # Saving arrays nested in lists or dictionaries if isinstance(data[name], list): iterator = enumerate(data[name]) else: iterator = iter(list(data[name].items())) to_remove = [] for index, value in iterator: if isinstance(value, np.ndarray) and value.size > 0: fname = __save_array(value, arr_fname, len(saved_arrays)) saved_arrays[(name, index)] = osp.basename(fname) to_remove.append(index) for index in sorted(to_remove, reverse=True): data[name].pop(index) if saved_arrays: data['__saved_arrays__'] = saved_arrays pickle_filename = osp.splitext(filename)[0]+'.pickle' with open(pickle_filename, 'wb') as fdesc: pickle.dump(data, fdesc, 2) tar = tarfile.open(filename, "w") for fname in [pickle_filename]+[fn for fn in list(saved_arrays.values())]: tar.add(osp.basename(fname)) os.remove(fname) tar.close() if saved_arrays: data.pop('__saved_arrays__') except (RuntimeError, pickle.PicklingError, TypeError) as error: error_message = to_text_string(error) os.chdir(old_cwd) return error_message def load_dictionary(filename): """Load dictionary from .spydata file""" filename = osp.abspath(filename) old_cwd = getcwd() os.chdir(osp.dirname(filename)) data = None error_message = None try: tar = tarfile.open(filename, "r") tar.extractall() pickle_filename = osp.splitext(filename)[0]+'.pickle' try: # Old format (Spyder 2.0-2.1 for Python 2) with open(pickle_filename, 'U') as fdesc: data = pickle.loads(fdesc.read()) except (pickle.PickleError, TypeError, UnicodeDecodeError): # New format (Spyder >=2.2 for Python 2 and Python 3) with open(pickle_filename, 'rb') as fdesc: data = pickle.loads(fdesc.read()) saved_arrays = {} if load_array is not None: # Loading numpy arrays saved with np.save try: saved_arrays = data.pop('__saved_arrays__') for (name, index), fname in list(saved_arrays.items()): arr = np.load( osp.join(osp.dirname(filename), fname) ) if index is None: data[name] = arr elif isinstance(data[name], dict): data[name][index] = arr else: data[name].insert(index, arr) except KeyError: pass for fname in [pickle_filename]+[fn for fn in list(saved_arrays.values())]: os.remove(fname) except (EOFError, ValueError) as error: error_message = to_text_string(error) os.chdir(old_cwd) return data, error_message from spyderlib.baseconfig import get_conf_path, STDERR SAVED_CONFIG_FILES = ('inspector', 'onlinehelp', 'path', 'pylint.results', 'spyder.ini', 'temp.py', 'temp.spydata', 'template.py', 'history.py', 'history_internal.py', 'workingdir', '.projects', '.spyderproject', '.ropeproject', 'monitor.log', 'monitor_debug.log', 'rope.log') def reset_session(): """Remove all config files""" print("*** Reset Spyder settings to defaults ***", file=STDERR) for fname in SAVED_CONFIG_FILES: cfg_fname = get_conf_path(fname) if osp.isfile(cfg_fname): os.remove(cfg_fname) elif osp.isdir(cfg_fname): shutil.rmtree(cfg_fname) else: continue print("removing:", cfg_fname, file=STDERR) def save_session(filename): """Save Spyder session""" local_fname = get_conf_path(osp.basename(filename)) filename = osp.abspath(filename) old_cwd = getcwd() os.chdir(get_conf_path()) error_message = None try: tar = tarfile.open(local_fname, "w") for fname in SAVED_CONFIG_FILES: if osp.isfile(fname): tar.add(fname) tar.close() shutil.move(local_fname, filename) except Exception as error: error_message = to_text_string(error) os.chdir(old_cwd) return error_message def load_session(filename): """Load Spyder session""" filename = osp.abspath(filename) old_cwd = getcwd() os.chdir(osp.dirname(filename)) error_message = None renamed = False try: tar = tarfile.open(filename, "r") extracted_files = tar.getnames() # Rename original config files for fname in extracted_files: orig_name = get_conf_path(fname) bak_name = get_conf_path(fname+'.bak') if osp.isfile(bak_name): os.remove(bak_name) if osp.isfile(orig_name): os.rename(orig_name, bak_name) renamed = True tar.extractall() for fname in extracted_files: shutil.move(fname, get_conf_path(fname)) except Exception as error: error_message = to_text_string(error) if renamed: # Restore original config files for fname in extracted_files: orig_name = get_conf_path(fname) bak_name = get_conf_path(fname+'.bak') if osp.isfile(orig_name): os.remove(orig_name) if osp.isfile(bak_name): os.rename(bak_name, orig_name) finally: # Removing backup config files for fname in extracted_files: bak_name = get_conf_path(fname+'.bak') if osp.isfile(bak_name): os.remove(bak_name) os.chdir(old_cwd) return error_message from spyderlib.baseconfig import _ class IOFunctions(object): def __init__(self): self.load_extensions = None self.save_extensions = None self.load_filters = None self.save_filters = None self.load_funcs = None self.save_funcs = None def setup(self): iofuncs = self.get_internal_funcs()+self.get_3rd_party_funcs() load_extensions = {} save_extensions = {} load_funcs = {} save_funcs = {} load_filters = [] save_filters = [] load_ext = [] for ext, name, loadfunc, savefunc in iofuncs: filter_str = to_text_string(name + " (*%s)" % ext) if loadfunc is not None: load_filters.append(filter_str) load_extensions[filter_str] = ext load_funcs[ext] = loadfunc load_ext.append(ext) if savefunc is not None: save_extensions[filter_str] = ext save_filters.append(filter_str) save_funcs[ext] = savefunc load_filters.insert(0, to_text_string(_("Supported files")+" (*"+\ " *".join(load_ext)+")")) load_filters.append(to_text_string(_("All files (*.*)"))) self.load_filters = "\n".join(load_filters) self.save_filters = "\n".join(save_filters) self.load_funcs = load_funcs self.save_funcs = save_funcs self.load_extensions = load_extensions self.save_extensions = save_extensions def get_internal_funcs(self): return [ ('.spydata', _("Spyder data files"), load_dictionary, save_dictionary), ('.npy', _("NumPy arrays"), load_array, None), ('.npz', _("NumPy zip arrays"), load_array, None), ('.mat', _("Matlab files"), load_matlab, save_matlab), ('.csv', _("CSV text files"), 'import_wizard', None), ('.txt', _("Text files"), 'import_wizard', None), ('.jpg', _("JPEG images"), load_image, None), ('.png', _("PNG images"), load_image, None), ('.gif', _("GIF images"), load_image, None), ('.tif', _("TIFF images"), load_image, None), ('.pkl', _("Pickle files"), load_pickle, None), ('.pickle', _("Pickle files"), load_pickle, None), ('.json', _("JSON files"), load_json, None), ] def get_3rd_party_funcs(self): other_funcs = [] from spyderlib.otherplugins import get_spyderplugins_mods for mod in get_spyderplugins_mods(prefix='io_', extension='.py'): try: other_funcs.append((mod.FORMAT_EXT, mod.FORMAT_NAME, mod.FORMAT_LOAD, mod.FORMAT_SAVE)) except AttributeError as error: print("%s: %s" % (mod, str(error)), file=STDERR) return other_funcs def save(self, data, filename): ext = osp.splitext(filename)[1].lower() if ext in self.save_funcs: return self.save_funcs[ext](data, filename) else: return _("Unsupported file type '%s'") % ext def load(self, filename): ext = osp.splitext(filename)[1].lower() if ext in self.load_funcs: return self.load_funcs[ext](filename) else: return None, _("Unsupported file type '%s'") % ext iofunctions = IOFunctions() iofunctions.setup() def save_auto(data, filename): """Save data into filename, depending on file extension""" pass if __name__ == "__main__": from spyderlib.py3compat import u import datetime testdict = {'d': 1, 'a': np.random.rand(10, 10), 'b': [1, 2]} testdate = datetime.date(1945, 5, 8) example = {'str': 'kjkj kj k j j kj k jkj', 'unicode': u('éù'), 'list': [1, 3, [4, 5, 6], 'kjkj', None], 'tuple': ([1, testdate, testdict], 'kjkj', None), 'dict': testdict, 'float': 1.2233, 'array': np.random.rand(4000, 400), 'empty_array': np.array([]), 'date': testdate, 'datetime': datetime.datetime(1945, 5, 8), } import time t0 = time.time() save_dictionary(example, "test.spydata") print(" Data saved in %.3f seconds" % (time.time()-t0)) t0 = time.time() example2, ok = load_dictionary("test.spydata") print("Data loaded in %.3f seconds" % (time.time()-t0)) # for key in example: # print key, ":", example[key] == example2[key] a = MatlabStruct() a.b = 'spam' assert a["b"] == 'spam' a.c["d"] = 'eggs' assert a.c.d == 'eggs' assert a == {'c': {'d': 'eggs'}, 'b': 'spam'}