from __future__ import print_function, division, absolute_import

from functools import partial
from io import BytesIO
from warnings import warn

import numpy as np
import pandas as pd

from ..delayed import delayed
from .io import from_delayed
from ..bytes import read_bytes
from ..bytes.compression import seekable_files, files as cfiles


delayed = delayed(pure=True)


def bytes_read_csv(b, header, kwargs, dtypes=None, columns=None,
                   write_header=True, enforce=False):
    """ Convert a block of bytes to a Pandas DataFrame

    Parameters
    ----------
    b: bytestring
        The content to be parsed with ``pandas.read_csv``
    header: bytestring
        An optional header to prepend to ``b``
    kwargs: dict
        A dictionary of keyword arguments to be passed to ``pandas.read_csv``
    dtypes: dict
        DTypes to assign to columns

    See Also
    --------
    dask.dataframe.csv.read_csv_from_bytes
    """
    bio = BytesIO()
    if write_header and not b.startswith(header.rstrip()):
        bio.write(header)
    bio.write(b)
    bio.seek(0)
    df = pd.read_csv(bio, **kwargs)
    if dtypes:
        coerce_dtypes(df, dtypes)

    if enforce and columns and (list(df.columns) != list(columns)):
        raise ValueError("Columns do not match", df.columns, columns)
    return df


def coerce_dtypes(df, dtypes):
    """ Coerce dataframe to dtypes safely

    Operates in place

    Parameters
    ----------
    df: Pandas DataFrame
    dtypes: dict like {'x': float}
    """
    for c in df.columns:
        if c in dtypes and df.dtypes[c] != dtypes[c]:
            if (np.issubdtype(df.dtypes[c], np.floating) and
                    np.issubdtype(dtypes[c], np.integer)):
                # Refuse to silently truncate non-integral float values
                if (df[c] % 1).any():
                    raise TypeError("Runtime type mismatch. "
                                    "Add {'%s': float} to dtype= keyword in "
                                    "read_csv" % c)
            df[c] = df[c].astype(dtypes[c])


def read_csv_from_bytes(block_lists, header, head, kwargs, collection=True,
                        enforce=False):
    """ Convert blocks of bytes to a dask.dataframe or other high-level object

    This accepts a list of lists of values of bytes where each inner list
    corresponds to one file and the bytes, concatenated in order, comprise
    that entire file.

    Parameters
    ----------
    block_lists: list of lists of delayed values of bytes
        The lists of bytestrings, one list per logical file
    header: bytestring
        The header, found at the front of the first file, to be prepended to
        all blocks
    head: pd.DataFrame
        An example Pandas DataFrame to be used for metadata.
        Can be ``None`` if ``collection==False``
    kwargs: dict
        Keyword arguments to pass down to ``pd.read_csv``
    collection: boolean, optional (defaults to True)
        Return a dask.dataframe if True, otherwise a list of delayed values

    Returns
    -------
    A dask.dataframe or list of delayed values
    """
    dtypes = head.dtypes.to_dict()
    columns = list(head.columns)
    func = delayed(bytes_read_csv)
    dfs = []
    for blocks in block_lists:
        if not blocks:
            continue
        # The first block of each file already starts with that file's header
        df = func(blocks[0], header, kwargs, dtypes, columns,
                  write_header=False, enforce=enforce)
        dfs.append(df)
        for b in blocks[1:]:
            dfs.append(func(b, header, kwargs, dtypes, columns,
                            enforce=enforce))

    if collection:
        return from_delayed(dfs, head)
    else:
        return dfs
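

# A rough sketch of how the helpers above compose for a single file that was
# split into two blocks.  The byte strings and the empty ``kwargs`` dict are
# hypothetical example values, not anything produced by this module.
#
#     header = b'name,amount\n'
#     blocks = [b'name,amount\nAlice,100\nBob,200\n',  # first block: header included
#               b'Charlie,300\nDan,400\n']             # later block: header prepended
#     first = bytes_read_csv(blocks[0], header, {}, write_header=False)
#     rest = bytes_read_csv(blocks[1], header, {})
#
# ``read_csv_from_bytes`` wraps the same calls in ``delayed`` so that each
# block is parsed lazily and in parallel, then hands them to ``from_delayed``.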


def read_csv(filename, blocksize=2**25, chunkbytes=None,
             collection=True, lineterminator=None, compression=None,
             sample=256000, enforce=False, storage_options=None,
             **kwargs):
    """ Read CSV files into a Dask.DataFrame

    This parallelizes the ``pandas.read_csv`` function in the following ways:

    1.  It supports loading many files at once using globstrings as follows:

        >>> df = dd.read_csv('myfiles.*.csv')  # doctest: +SKIP

    2.  In some cases it can break up large files as follows:

        >>> df = dd.read_csv('largefile.csv', blocksize=25e6)  # 25MB chunks  # doctest: +SKIP

    Internally ``dd.read_csv`` uses ``pandas.read_csv`` and so supports many
    of the same keyword arguments with the same performance guarantees.

    See the docstring for ``pandas.read_csv`` for more information on
    available keyword arguments.

    Parameters
    ----------
    filename: string
        Filename or globstring for CSV files.  May include protocols like s3://
    blocksize: int or None
        Number of bytes by which to cut up larger files
    collection: boolean
        Return a dask.dataframe if True or list of dask.delayed objects if False
    sample: int
        Number of bytes to use when determining dtypes
    **kwargs: dict
        Options to pass down to ``pandas.read_csv``
    """
    if lineterminator is not None and len(lineterminator) == 1:
        kwargs['lineterminator'] = lineterminator
    else:
        lineterminator = '\n'
    if chunkbytes is not None:
        warn("Deprecation warning: the 'chunkbytes' keyword has been renamed "
             "to 'blocksize'")
        blocksize = chunkbytes
    if 'index' in kwargs or 'index_col' in kwargs:
        raise ValueError("Keyword 'index' not supported. "
                         "Use dd.read_csv(...).set_index('my-index') instead")
    for kw in ['iterator', 'chunksize']:
        if kw in kwargs:
            raise ValueError("%s not supported for dd.read_csv" % kw)
    if isinstance(kwargs.get('skiprows'), list):
        raise TypeError("List of skiprows not supported for dd.read_csv")
    if isinstance(kwargs.get('header'), list):
        raise TypeError("List of header rows not supported for dd.read_csv")

    if blocksize and compression not in seekable_files:
        warn("Warning: %s compression does not support breaking apart files\n"
             "Please ensure that each individual file can fit in memory and\n"
             "use the keyword ``blocksize=None`` to remove this message.\n"
             "Setting ``blocksize=None``" % compression)
        blocksize = None
    if compression not in seekable_files and compression not in cfiles:
        raise NotImplementedError("Compression format %s not installed" %
                                  compression)

    b_lineterminator = lineterminator.encode()
    sample, values = read_bytes(filename, delimiter=b_lineterminator,
                                blocksize=blocksize, sample=sample,
                                compression=compression,
                                **(storage_options or {}))

    if not isinstance(values[0], (tuple, list)):
        values = [values]

    # With nrows only the first block of the first file is needed
    if 'nrows' in kwargs:
        values = [[values[0][0]]]

    if kwargs.get('header', 'infer') is None:
        header = b''
    else:
        header = sample.split(b_lineterminator)[0] + b_lineterminator

    # Use the sampled bytes to infer column names and dtypes for metadata
    head = pd.read_csv(BytesIO(sample), **kwargs)

    df = read_csv_from_bytes(values, header, head, kwargs,
                             collection=collection, enforce=enforce)

    return df
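

# Illustrative usage (a sketch, not part of this module).  The file pattern
# and column name below are hypothetical examples.
#
#     import dask.dataframe as dd
#
#     # Read a directory of CSVs in 64 MB blocks, forcing a possibly-sparse
#     # integer column to float so that all blocks agree on its dtype
#     df = dd.read_csv('data/2016-*.csv', blocksize=2**26,
#                      dtype={'amount': 'float64'})
#
# With ``collection=False`` the same call instead returns a list of
# ``dask.delayed`` values, one per block, suitable for ``from_delayed``.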