# -*- coding: utf-8 -*-
# Copyright (C) 2012 Anaconda, Inc
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import absolute_import, division, print_function, unicode_literals

from errno import ENOENT
import json
import os
from os.path import abspath, basename, dirname, expanduser, expandvars, isdir, join, exists
import re
import sys
from textwrap import dedent

# Since we have to have configuration context here, anything imported by
#   conda.base.context is fair game, but nothing more.
from . import CONDA_PACKAGE_ROOT, CONDA_SOURCE_ROOT
from ._vendor.toolz import concatv, drop
from .auxlib.compat import Utf8NamedTemporaryFile
from .base.constants import PREFIX_STATE_FILE, PACKAGE_ENV_VARS_DIR, CONDA_ENV_VARS_UNSET_VAR
from .base.context import ROOT_ENV_NAME, context, locate_prefix_by_name
from .common.compat import FILESYSTEM_ENCODING, on_win
from .common.path import paths_equal


class _Activator(object):
    # Activate and deactivate have three tasks
    #   1. Set and unset environment variables
    #   2. Execute/source activate.d/deactivate.d scripts
    #   3. Update the command prompt
    #
    # Shells should also use 'reactivate' following conda's install, update, and
    #   remove/uninstall commands.
    #
    # All core logic is in build_activate() or build_deactivate(), and is independent of
    # shell type.  Each returns a map containing the keys:
    #   export_vars
    #   unset_var
    #   activate_scripts
    #   deactivate_scripts
    #
    # The value of the CONDA_PROMPT_MODIFIER environment variable holds conda's contribution
    #   to the command prompt.
    #
    # To implement support for a new shell, ideally one would only need to add shell-specific
    # information to the __init__ method of this class.

    # The following instance variables must be defined by each implementation.
    pathsep_join = None
    sep = None
    path_conversion = None
    script_extension = None
    tempfile_extension = None  # None means write instructions to stdout rather than a temp file
    command_join = None

    unset_var_tmpl = None
    export_var_tmpl = None
    set_var_tmpl = None
    run_script_tmpl = None

    hook_source_path = None

    def __init__(self, arguments=None):
        self._raw_arguments = arguments
        self.environ = os.environ.copy()

    def get_export_unset_vars(self, export_metavars=True, **kwargs):
        """
        :param export_metavars: whether to export `conda_exe_vars` meta variables.
        :param kwargs: environment variables to export.
                       .. if you pass and set any other variable to None, then it
                       emits it to the dict with a value of None.

        :return: A dict of env vars to export ordered the same way as kwargs.
                 And a list of env vars to unset.
        """
        unset_vars = []
        export_vars = {}

        # split provided environment variables into exports vs unsets
        for name, value in kwargs.items():
            if value is None:
                unset_vars.append(name.upper())
            else:
                export_vars[name.upper()] = value

        if export_metavars:
            # split meta variables into exports vs unsets
            for name, value in context.conda_exe_vars_dict.items():
                if value is None:
                    unset_vars.append(name.upper())
                else:
                    export_vars[name.upper()] = self.path_conversion(value) if value else value
        else:
            # unset all meta variables
            unset_vars.extend(context.conda_exe_vars_dict)

        return export_vars, unset_vars

    # Used in tests only.
    def add_export_unset_vars(self, export_vars, unset_vars, **kwargs):
        new_export_vars, new_unset_vars = self.get_export_unset_vars(**kwargs)
        if export_vars is not None:
            export_vars = {**export_vars, **new_export_vars}
        if unset_vars is not None:
            unset_vars = [*unset_vars, *new_unset_vars]
        return export_vars, unset_vars

    # Used in tests only.
    def get_scripts_export_unset_vars(self, **kwargs):
        export_vars, unset_vars = self.get_export_unset_vars(**kwargs)
        script_export_vars = script_unset_vars = None
        if export_vars:
            script_export_vars = self.command_join.join(
                [self.export_var_tmpl % (k, v) for k, v in export_vars.items()])
        if unset_vars:
            script_unset_vars = self.command_join.join(
                [self.unset_var_tmpl % (k) for k in unset_vars])
        return script_export_vars or '', script_unset_vars or ''

    def _finalize(self, commands, ext):
        commands = concatv(commands, ('',))  # add terminating newline
        if ext is None:
            return self.command_join.join(commands)
        elif ext:
            with Utf8NamedTemporaryFile('w+', suffix=ext, delete=False) as tf:
                # the default mode is 'w+b', and universal new lines don't work in that mode
                # command_join should account for that
                tf.write(self.command_join.join(commands))
            return tf.name
        else:
            raise NotImplementedError()

    def activate(self):
        if self.stack:
            builder_result = self.build_stack(self.env_name_or_prefix)
        else:
            builder_result = self.build_activate(self.env_name_or_prefix)
        return self._finalize(self._yield_commands(builder_result), self.tempfile_extension)

    def deactivate(self):
        return self._finalize(self._yield_commands(self.build_deactivate()),
                              self.tempfile_extension)

    def reactivate(self):
        return self._finalize(self._yield_commands(self.build_reactivate()),
                              self.tempfile_extension)

    def hook(self, auto_activate_base=None):
        builder = []
        builder.append(self._hook_preamble())
        with open(self.hook_source_path) as fsrc:
            builder.append(fsrc.read())
        if auto_activate_base is None and context.auto_activate_base or auto_activate_base:
            builder.append("conda activate base\n")
        postamble = self._hook_postamble()
        if postamble is not None:
            builder.append(postamble)
        return "\n".join(builder)

    def execute(self):
        # return value meant to be written to stdout
        self._parse_and_set_args(self._raw_arguments)
        return getattr(self, self.command)()

    def commands(self):
        """
        Returns a list of possible subcommands that are valid
        immediately following `conda` at the command line.
        This method is generally only used by tab-completion.
        """
        # Import locally to reduce impact on initialization time.
        from .cli.find_commands import find_commands
        from .cli.conda_argparse import generate_parser, find_builtin_commands
        # return value meant to be written to stdout
        # Hidden commands to provide metadata to shells.
        return "\n".join(sorted(
            find_builtin_commands(generate_parser()) +
            tuple(find_commands(True))
        ))

    def _hook_preamble(self):
        # must be implemented in subclass
        raise NotImplementedError()

    def _hook_postamble(self):
        return None

    def _parse_and_set_args(self, arguments):

        def raise_invalid_command_error(actual_command=None):
            from .exceptions import ArgumentError
            message = "'activate', 'deactivate', 'hook', 'commands', or 'reactivate' " \
                "command must be given"
            if actual_command:
                message += ". Instead got '%s'." % actual_command
            raise ArgumentError(message)

        if arguments is None or len(arguments) < 1:
            raise_invalid_command_error()

        command = arguments[0]
        arguments = tuple(drop(1, arguments))
        help_flags = ('-h', '--help', '/?')
        non_help_args = tuple(arg for arg in arguments if arg not in help_flags)
        help_requested = len(arguments) != len(non_help_args)
        remainder_args = list(arg for arg in non_help_args if arg and arg != command)

        if not command:
            raise_invalid_command_error()
        elif help_requested:
            from .exceptions import ActivateHelp, DeactivateHelp, GenericHelp
            help_classes = {
                'activate': ActivateHelp(),
                'deactivate': DeactivateHelp(),
                'hook': GenericHelp('hook'),
                'commands': GenericHelp('commands'),
                'reactivate': GenericHelp('reactivate'),
            }
            raise help_classes[command]
        elif command not in ('activate', 'deactivate', 'reactivate', 'hook', 'commands'):
            raise_invalid_command_error(actual_command=command)

        if command.endswith('activate') or command == 'hook':
            try:
                dev_idx = remainder_args.index('--dev')
            except ValueError:
                context.dev = False
            else:
                del remainder_args[dev_idx]
                context.dev = True

        if command == 'activate':
            self.stack = context.auto_stack and context.shlvl <= context.auto_stack
            try:
                stack_idx = remainder_args.index('--stack')
            except ValueError:
                stack_idx = -1
            try:
                no_stack_idx = remainder_args.index('--no-stack')
            except ValueError:
                no_stack_idx = -1
            if stack_idx >= 0 and no_stack_idx >= 0:
                from .exceptions import ArgumentError
                raise ArgumentError('cannot specify both --stack and --no-stack to ' + command)
            if stack_idx >= 0:
                self.stack = True
                del remainder_args[stack_idx]
            if no_stack_idx >= 0:
                self.stack = False
                del remainder_args[no_stack_idx]
            if len(remainder_args) > 1:
                from .exceptions import ArgumentError
                raise ArgumentError(command + ' does not accept more than one argument:\n'
                                    + str(remainder_args) + '\n')
            self.env_name_or_prefix = remainder_args and remainder_args[0] or 'base'

        else:
            if remainder_args:
                from .exceptions import ArgumentError
                raise ArgumentError('%s does not accept arguments\nremainder_args: %s\n'
                                    % (command, remainder_args))

        self.command = command

    def _yield_commands(self, cmds_dict):
        for key, value in sorted(cmds_dict.get('export_path', {}).items()):
            yield self.export_var_tmpl % (key, value)

        for script in cmds_dict.get('deactivate_scripts', ()):
            yield self.run_script_tmpl % script

        for key in cmds_dict.get('unset_vars', ()):
            yield self.unset_var_tmpl % key

        for key, value in cmds_dict.get('set_vars', {}).items():
            yield self.set_var_tmpl % (key, value)

        for key, value in cmds_dict.get('export_vars', {}).items():
            yield self.export_var_tmpl % (key, value)

        for script in cmds_dict.get('activate_scripts', ()):
            yield self.run_script_tmpl % script

    def build_activate(self, env_name_or_prefix):
        return self._build_activate_stack(env_name_or_prefix, False)

    def build_stack(self, env_name_or_prefix):
        return self._build_activate_stack(env_name_or_prefix, True)

    def _build_activate_stack(self, env_name_or_prefix, stack):
        if re.search(r'\\|/', env_name_or_prefix):
            prefix = expand(env_name_or_prefix)
            if not isdir(join(prefix, 'conda-meta')):
                from .exceptions import EnvironmentLocationNotFound
                raise EnvironmentLocationNotFound(prefix)
        elif env_name_or_prefix in (ROOT_ENV_NAME, 'root'):
            prefix = context.root_prefix
        else:
            prefix = locate_prefix_by_name(env_name_or_prefix)

        # query environment
        old_conda_shlvl = int(self.environ.get('CONDA_SHLVL', '').strip() or 0)
        new_conda_shlvl = old_conda_shlvl + 1
        old_conda_prefix = self.environ.get('CONDA_PREFIX')

        if old_conda_prefix == prefix and old_conda_shlvl > 0:
            return self.build_reactivate()

        activate_scripts = self._get_activate_scripts(prefix)
        conda_default_env = self._default_env(prefix)
        conda_prompt_modifier = self._prompt_modifier(prefix, conda_default_env)
        conda_environment_env_vars = self._get_environment_env_vars(prefix)
        unset_env_vars = [k for k, v in conda_environment_env_vars.items()
                          if v == CONDA_ENV_VARS_UNSET_VAR]
        [conda_environment_env_vars.pop(_) for _ in unset_env_vars]

        clobbering_env_vars = [k for k in conda_environment_env_vars.keys()
                               if k in os.environ.keys()]

        for cvar in clobbering_env_vars:
            save_var = "__CONDA_SHLVL_%s_%s" % (old_conda_shlvl, cvar)
            conda_environment_env_vars[save_var] = os.environ.get(cvar)

        if clobbering_env_vars:
            print("WARNING: overwriting environment variables set in the machine", file=sys.stderr)
            print("overwriting variable %s" % ' '.join(clobbering_env_vars), file=sys.stderr)

        unset_vars = []
        if old_conda_shlvl == 0:
            export_vars, unset_vars = self.get_export_unset_vars(
                path=self.pathsep_join(self._add_prefix_to_path(prefix)),
                conda_prefix=prefix,
                conda_shlvl=new_conda_shlvl,
                conda_default_env=conda_default_env,
                conda_prompt_modifier=conda_prompt_modifier,
                **conda_environment_env_vars,
            )
            deactivate_scripts = ()
        else:
            if self.environ.get('CONDA_PREFIX_%s' % (old_conda_shlvl - 1)) == prefix:
                # in this case, user is attempting to activate the previous environment,
                #  i.e. step back down
                return self.build_deactivate()
            if stack:
                deactivate_scripts = ()
                export_vars, unset_vars = self.get_export_unset_vars(
                    path=self.pathsep_join(self._add_prefix_to_path(prefix)),
                    conda_prefix=prefix,
                    conda_shlvl=new_conda_shlvl,
                    conda_default_env=conda_default_env,
                    conda_prompt_modifier=conda_prompt_modifier,
                    **conda_environment_env_vars,
                )
                export_vars['CONDA_PREFIX_%d' % old_conda_shlvl] = old_conda_prefix
                export_vars['CONDA_STACKED_%d' % new_conda_shlvl] = 'true'
            else:
                deactivate_scripts = self._get_deactivate_scripts(old_conda_prefix)
                export_vars, unset_vars = self.get_export_unset_vars(
                    path=self.pathsep_join(self._replace_prefix_in_path(old_conda_prefix, prefix)),
                    conda_prefix=prefix,
                    conda_shlvl=new_conda_shlvl,
                    conda_default_env=conda_default_env,
                    conda_prompt_modifier=conda_prompt_modifier,
                    **conda_environment_env_vars,
                )
                export_vars['CONDA_PREFIX_%d' % old_conda_shlvl] = old_conda_prefix

        set_vars = {}
        if context.changeps1:
            self._update_prompt(set_vars, conda_prompt_modifier)

        self._build_activate_shell_custom(export_vars)

        return {
            'unset_vars': unset_vars,
            'set_vars': set_vars,
            'export_vars': export_vars,
            'deactivate_scripts': deactivate_scripts,
            'activate_scripts': activate_scripts,
        }

    def build_deactivate(self):
        self._deactivate = True
        # query environment
        old_conda_prefix = self.environ.get('CONDA_PREFIX')
        old_conda_shlvl = int(self.environ.get('CONDA_SHLVL', '').strip() or 0)
        if not old_conda_prefix or old_conda_shlvl < 1:
            # no active environment, so cannot deactivate; do nothing
            return {
                "unset_vars": (),
                "set_vars": {},
                "export_vars": {},
                "deactivate_scripts": (),
                "activate_scripts": (),
            }
        deactivate_scripts = self._get_deactivate_scripts(old_conda_prefix)
        old_conda_environment_env_vars = self._get_environment_env_vars(old_conda_prefix)

        new_conda_shlvl = old_conda_shlvl - 1
        set_vars = {}
        if old_conda_shlvl == 1:
            new_path = self.pathsep_join(self._remove_prefix_from_path(old_conda_prefix))
            # You might think that you can remove the CONDA_EXE vars with export_metavars=False
            # here so that "deactivate means deactivate" but you cannot since the conda shell
            # scripts still refer to them and they only set them once at the top. We could change
            # that though, the conda() shell function could set them instead of doing it at the
            # top.  This would be *much* cleaner. I personally cannot abide that I have
            # deactivated conda and anything at all in my env still references it (apart from the
            # shell script, we need something I suppose!)
            export_vars, unset_vars = self.get_export_unset_vars(
                conda_prefix=None,
                conda_shlvl=new_conda_shlvl,
                conda_default_env=None,
                conda_prompt_modifier=None,
            )
            conda_prompt_modifier = ''
            activate_scripts = ()
            export_path = {'PATH': new_path, }
        else:
            assert old_conda_shlvl > 1
            new_prefix = self.environ.get('CONDA_PREFIX_%d' % new_conda_shlvl)
            conda_default_env = self._default_env(new_prefix)
            conda_prompt_modifier = self._prompt_modifier(new_prefix, conda_default_env)
            new_conda_environment_env_vars = self._get_environment_env_vars(new_prefix)

            old_prefix_stacked = 'CONDA_STACKED_%d' % old_conda_shlvl in self.environ
            new_path = ''

            unset_vars = ['CONDA_PREFIX_%d' % new_conda_shlvl]
            if old_prefix_stacked:
                new_path = self.pathsep_join(self._remove_prefix_from_path(old_conda_prefix))
                unset_vars.append('CONDA_STACKED_%d' % old_conda_shlvl)
            else:
                new_path = self.pathsep_join(
                    self._replace_prefix_in_path(old_conda_prefix, new_prefix)
                )

            export_vars, unset_vars2 = self.get_export_unset_vars(
                conda_prefix=new_prefix,
                conda_shlvl=new_conda_shlvl,
                conda_default_env=conda_default_env,
                conda_prompt_modifier=conda_prompt_modifier,
                **new_conda_environment_env_vars,
            )
            unset_vars += unset_vars2
            export_path = {'PATH': new_path, }
            activate_scripts = self._get_activate_scripts(new_prefix)

        if context.changeps1:
            self._update_prompt(set_vars, conda_prompt_modifier)

        for env_var in old_conda_environment_env_vars.keys():
            unset_vars.append(env_var)
            save_var = "__CONDA_SHLVL_%s_%s" % (new_conda_shlvl, env_var)
            if save_var in os.environ.keys():
                export_vars[env_var] = os.environ[save_var]
        return {
            'unset_vars': unset_vars,
            'set_vars': set_vars,
            'export_vars': export_vars,
            'export_path': export_path,
            'deactivate_scripts': deactivate_scripts,
            'activate_scripts': activate_scripts,
        }

    def build_reactivate(self):
        self._reactivate = True
        conda_prefix = self.environ.get('CONDA_PREFIX')
        conda_shlvl = int(self.environ.get('CONDA_SHLVL', '').strip() or 0)
        if not conda_prefix or conda_shlvl < 1:
            # no active environment, so cannot reactivate; do nothing
            return {
                "unset_vars": (),
                "set_vars": {},
                "export_vars": {},
                "deactivate_scripts": (),
                "activate_scripts": (),
            }
        conda_default_env = self.environ.get('CONDA_DEFAULT_ENV', self._default_env(conda_prefix))
        new_path = self.pathsep_join(self._replace_prefix_in_path(conda_prefix, conda_prefix))
        set_vars = {}
        conda_prompt_modifier = self._prompt_modifier(conda_prefix, conda_default_env)
        if context.changeps1:
            self._update_prompt(set_vars, conda_prompt_modifier)

        env_vars_to_unset = ()
        env_vars_to_export = {
            "PATH": new_path,
            "CONDA_SHLVL": conda_shlvl,
            "CONDA_PROMPT_MODIFIER": self._prompt_modifier(conda_prefix, conda_default_env),
        }
        conda_environment_env_vars = self._get_environment_env_vars(conda_prefix)
        for k, v in conda_environment_env_vars.items():
            if v == CONDA_ENV_VARS_UNSET_VAR:
                env_vars_to_unset = env_vars_to_unset + (k,)
            else:
                env_vars_to_export[k] = v
        # environment variables are set only to aid transition from conda 4.3 to conda 4.4
        return {
            'unset_vars': env_vars_to_unset,
            'set_vars': set_vars,
            'export_vars': env_vars_to_export,
            'deactivate_scripts': self._get_deactivate_scripts(conda_prefix),
            'activate_scripts': self._get_activate_scripts(conda_prefix),
        }

    def _get_starting_path_list(self):
        # For isolation, running the conda test suite *without* env. var. inheritance
        # every so often is a good idea. We should probably make this a pytest fixture
        # along with one that tests both hardlink-only and copy-only, but before that
        # conda's testsuite needs to be a lot faster!
        clean_paths = {'darwin': '/usr/bin:/bin:/usr/sbin:/sbin',
                       # You may think 'let us do something more clever here and interpolate
                       # `%windir%`' but the point here is the the whole env. is cleaned out
                       'win32': 'C:\\Windows\\system32;'
                                'C:\\Windows;'
                                'C:\\Windows\\System32\\Wbem;'
                                'C:\\Windows\\System32\\WindowsPowerShell\\v1.0\\'
                       }
        path = self.environ.get('PATH',
                                clean_paths[sys.platform] if sys.platform in clean_paths else
                                '/usr/bin')
        path_split = path.split(os.pathsep)
        # We used to prepend sys.prefix\Library\bin to PATH on startup but not anymore.
        # Instead, in conda 4.6 we add the full suite of entries. This is performed in
        # condabin\conda.bat and condabin\ _conda_activate.bat. However, we
        # need to ignore the stuff we add there, and only consider actual PATH entries.
        prefix_dirs = tuple(self._get_path_dirs(sys.prefix))
        start_index = 0
        while (start_index < len(prefix_dirs) and
               start_index < len(path_split) and
               paths_equal(path_split[start_index], prefix_dirs[start_index])):
            start_index += 1
        path_split = path_split[start_index:]
        library_bin_dir = self.path_conversion(
                self.sep.join((sys.prefix, 'Library', 'bin')))
        if paths_equal(path_split[0], library_bin_dir):
            path_split = path_split[1:]
        return path_split

    def _get_path_dirs(self, prefix, extra_library_bin=False):
        if on_win:  # pragma: unix no cover
            yield prefix.rstrip("\\")
            yield self.sep.join((prefix, 'Library', 'mingw-w64', 'bin'))
            yield self.sep.join((prefix, 'Library', 'usr', 'bin'))
            yield self.sep.join((prefix, 'Library', 'bin'))
            yield self.sep.join((prefix, 'Scripts'))
            yield self.sep.join((prefix, 'bin'))
        else:
            yield self.sep.join((prefix, 'bin'))

    def _add_prefix_to_path(self, prefix, starting_path_dirs=None):
        prefix = self.path_conversion(prefix)
        if starting_path_dirs is None:
            path_list = list(self.path_conversion(self._get_starting_path_list()))
        else:
            path_list = list(self.path_conversion(starting_path_dirs))

        # If this is the first time we're activating an environment, we need to ensure that
        # the condabin directory is included in the path list.
        # Under normal conditions, if the shell hook is working correctly, this should
        # never trigger.
        old_conda_shlvl = int(self.environ.get('CONDA_SHLVL', '').strip() or 0)
        if not old_conda_shlvl and not any(p.endswith("condabin") for p in path_list):
            condabin_dir = self.path_conversion(join(context.conda_prefix, "condabin"))
            path_list.insert(0, condabin_dir)

        path_list[0:0] = list(self.path_conversion(self._get_path_dirs(prefix)))
        return tuple(path_list)

    def _remove_prefix_from_path(self, prefix, starting_path_dirs=None):
        return self._replace_prefix_in_path(prefix, None, starting_path_dirs)

    def _replace_prefix_in_path(self, old_prefix, new_prefix, starting_path_dirs=None):
        old_prefix = self.path_conversion(old_prefix)
        new_prefix = self.path_conversion(new_prefix)
        if starting_path_dirs is None:
            path_list = list(self.path_conversion(self._get_starting_path_list()))
        else:
            path_list = list(self.path_conversion(starting_path_dirs))

        def index_of_path(paths, test_path):
            for q, path in enumerate(paths):
                if paths_equal(path, test_path):
                    return q
            return None

        if old_prefix is not None:
            prefix_dirs = tuple(self._get_path_dirs(old_prefix))
            first_idx = index_of_path(path_list, prefix_dirs[0])
            if first_idx is None:
                first_idx = 0
            else:
                prefix_dirs_idx = len(prefix_dirs) - 1
                last_idx = None
                while last_idx is None and prefix_dirs_idx > -1:
                    last_idx = index_of_path(path_list, prefix_dirs[prefix_dirs_idx])
                    if last_idx is None:
                        print(
                            "Did not find path entry {0}".format(prefix_dirs[prefix_dirs_idx]),
                            file=sys.stderr
                        )
                    prefix_dirs_idx = prefix_dirs_idx - 1
                # this compensates for an extra Library/bin dir entry from the interpreter on
                #     windows.  If that entry isn't being added, it should have no effect.
                library_bin_dir = self.path_conversion(
                    self.sep.join((sys.prefix, 'Library', 'bin')))
                if path_list[last_idx + 1] == library_bin_dir:
                    last_idx += 1
                del path_list[first_idx:last_idx + 1]
        else:
            first_idx = 0

        if new_prefix is not None:
            path_list[first_idx:first_idx] = list(self._get_path_dirs(new_prefix))

        return tuple(path_list)

    def _build_activate_shell_custom(self, export_vars):
        # A method that can be overridden by shell-specific implementations.
        # The signature of this method may change in the future.
        pass

    def _update_prompt(self, set_vars, conda_prompt_modifier):
        pass

    def _default_env(self, prefix):
        if paths_equal(prefix, context.root_prefix):
            return 'base'
        return basename(prefix) if basename(dirname(prefix)) == 'envs' else prefix

    def _prompt_modifier(self, prefix, conda_default_env):
        if context.changeps1:
            # Get current environment and prompt stack
            env_stack = []
            prompt_stack = []
            old_shlvl = int(self.environ.get('CONDA_SHLVL', '0').rstrip())
            for i in range(1, old_shlvl + 1):
                if i == old_shlvl:
                    env_i = self._default_env(self.environ.get('CONDA_PREFIX', ''))
                else:
                    env_i = self._default_env(
                        self.environ.get('CONDA_PREFIX_{}'.format(i), '').rstrip()
                    )
                stacked_i = bool(self.environ.get('CONDA_STACKED_{}'.format(i), '').rstrip())
                env_stack.append(env_i)
                if not stacked_i:
                    prompt_stack = prompt_stack[0:-1]
                prompt_stack.append(env_i)

            # Modify prompt stack according to pending operation
            deactivate = getattr(self, '_deactivate', False)
            reactivate = getattr(self, '_reactivate', False)
            if deactivate:
                prompt_stack = prompt_stack[0:-1]
                env_stack = env_stack[0:-1]
                stacked = bool(self.environ.get('CONDA_STACKED_{}'.format(old_shlvl), '').rstrip())
                if not stacked and env_stack:
                    prompt_stack.append(env_stack[-1])
            elif reactivate:
                pass
            else:
                stack = getattr(self, 'stack', False)
                if not stack:
                    prompt_stack = prompt_stack[0:-1]
                prompt_stack.append(conda_default_env)

            conda_stacked_env = ','.join(prompt_stack[::-1])

            return context.env_prompt.format(
                default_env=conda_default_env,
                stacked_env=conda_stacked_env,
                prefix=prefix,
                name=basename(prefix),
            )
        else:
            return ""

    def _get_activate_scripts(self, prefix):
        _script_extension = self.script_extension
        se_len = -len(_script_extension)
        try:
            paths = (
                entry.path for entry in os.scandir(join(prefix, "etc", "conda", "activate.d"))
            )
        except EnvironmentError:
            return ()
        return self.path_conversion(sorted(
            p for p in paths if p[se_len:] == _script_extension
        ))

    def _get_deactivate_scripts(self, prefix):
        _script_extension = self.script_extension
        se_len = -len(_script_extension)
        try:
            paths = (
                entry.path for entry in os.scandir(join(prefix, "etc", "conda", "deactivate.d"))
            )
        except EnvironmentError:
            return ()
        return self.path_conversion(sorted(
            (p for p in paths if p[se_len:] == _script_extension),
            reverse=True
        ))

    def _get_environment_env_vars(self, prefix):
        env_vars_file = join(prefix, PREFIX_STATE_FILE)
        pkg_env_var_dir = join(prefix, PACKAGE_ENV_VARS_DIR)
        env_vars = {}

        # First get env vars from packages
        if exists(pkg_env_var_dir):
            for pkg_env_var_path in sorted(entry.path for entry in os.scandir(pkg_env_var_dir)):
                with open(pkg_env_var_path, 'r') as f:
                    env_vars.update(json.loads(f.read()))

        # Then get env vars from environment specification
        if exists(env_vars_file):
            with open(env_vars_file, 'r') as f:
                prefix_state = json.loads(f.read())
                prefix_state_env_vars = prefix_state.get('env_vars', {})
                dup_vars = [ev for ev in env_vars.keys() if ev in prefix_state_env_vars.keys()]
                for dup in dup_vars:
                    print("WARNING: duplicate env vars detected. Vars from the environment "
                          "will overwrite those from packages", file=sys.stderr)
                    print("variable %s duplicated" % dup, file=sys.stderr)
                env_vars.update(prefix_state_env_vars)

        return env_vars


def expand(path):
    return abspath(expanduser(expandvars(path)))


def ensure_binary(value):
    try:
        return value.encode('utf-8')
    except AttributeError:  # pragma: no cover
        # AttributeError: '<>' object has no attribute 'encode'
        # In this case assume already binary type and do nothing
        return value


def ensure_fs_path_encoding(value):
    try:
        return value.decode(FILESYSTEM_ENCODING)
    except AttributeError:
        return value


def native_path_to_unix(paths):  # pragma: unix no cover
    # on windows, uses cygpath to convert windows native paths to posix paths
    if not on_win:
        return path_identity(paths)
    if paths is None:
        return None
    from subprocess import CalledProcessError, PIPE, Popen
    from conda.auxlib.compat import shlex_split_unicode
    # It is very easy to end up with a bash in one place and a cygpath in another due to e.g.
    # using upstream MSYS2 bash, but with a conda env that does not have bash but does have
    # cygpath.  When this happens, we have two different virtual POSIX machines, rooted at
    # different points in the Windows filesystem.  We do our path conversions with one and
    # expect the results to work with the other.  It does not.
    from .common.path import which
    bash = which('bash')
    command = os.path.join(dirname(bash), 'cygpath') if bash else 'cygpath'
    command += ' --path -f -'

    single_path = isinstance(paths, str)
    joined = paths if single_path else ("%s" % os.pathsep).join(paths)

    if hasattr(joined, 'encode'):
        joined = joined.encode('utf-8')

    try:
        p = Popen(shlex_split_unicode(command), stdin=PIPE, stdout=PIPE, stderr=PIPE)
    except EnvironmentError as e:
        if e.errno != ENOENT:
            raise
        # This code path should (hopefully) never be hit be real conda installs. It's here
        # as a backup for tests run under cmd.exe with cygpath not available.
        def _translation(found_path):  # NOQA
            found = found_path.group(1).replace("\\", "/").replace(":", "").replace("//", "/")
            return "/" + found.rstrip("/")
        joined = ensure_fs_path_encoding(joined)
        stdout = re.sub(
            r'([a-zA-Z]:[\/\\\\]+(?:[^:*?\"<>|;]+[\/\\\\]*)*)',
            _translation,
            joined
        ).replace(";/", ":/").rstrip(";")
    else:
        stdout, stderr = p.communicate(input=joined)
        rc = p.returncode
        if rc != 0 or stderr:
            message = "\n  stdout: %s\n  stderr: %s\n  rc: %s\n" % (stdout, stderr, rc)
            print(message, file=sys.stderr)
            raise CalledProcessError(rc, command, message)
        if hasattr(stdout, 'decode'):
            stdout = stdout.decode('utf-8')
        stdout = stdout.strip()
    final = stdout and stdout.split(':') or ()
    return final[0] if single_path else tuple(final)


def path_identity(paths):
    if isinstance(paths, str):
        return os.path.normpath(paths)
    elif paths is None:
        return None
    else:
        return tuple(os.path.normpath(_) for _ in paths)


class PosixActivator(_Activator):

    def __init__(self, arguments=None):
        self.pathsep_join = ':'.join
        self.sep = '/'
        self.path_conversion = native_path_to_unix
        self.script_extension = '.sh'
        self.tempfile_extension = None  # write instructions to stdout rather than a temp file
        self.command_join = '\n'

        self.unset_var_tmpl = 'unset %s'
        self.export_var_tmpl = "export %s='%s'"
        self.set_var_tmpl = "%s='%s'"
        self.run_script_tmpl = '. "%s"'

        self.hook_source_path = join(CONDA_PACKAGE_ROOT, 'shell', 'etc', 'profile.d', 'conda.sh')

        super(PosixActivator, self).__init__(arguments)

    def _update_prompt(self, set_vars, conda_prompt_modifier):
        ps1 = self.environ.get('PS1', '')
        if 'POWERLINE_COMMAND' in ps1:
            # Defer to powerline (https://github.com/powerline/powerline) if it's in use.
            return
        current_prompt_modifier = self.environ.get('CONDA_PROMPT_MODIFIER')
        if current_prompt_modifier:
            ps1 = re.sub(re.escape(current_prompt_modifier), r'', ps1)
        # Because we're using single-quotes to set shell variables, we need to handle the
        # proper escaping of single quotes that are already part of the string.
        # Best solution appears to be https://stackoverflow.com/a/1250279
        ps1 = ps1.replace("'", "'\"'\"'")
        set_vars.update({
            'PS1': conda_prompt_modifier + ps1,
        })

    def _hook_preamble(self):
        result = ''
        for key, value in context.conda_exe_vars_dict.items():
            if value is None:
                # Using `unset_var_tmpl` would cause issues for people running
                # with shell flag -u set (error on unset).
                # result += join(self.unset_var_tmpl % key) + '\n'
                result += join(self.export_var_tmpl % (key, '')) + '\n'
            else:
                if key in ('PYTHONPATH', 'CONDA_EXE'):
                    result += join(self.export_var_tmpl % (
                        key, self.path_conversion(value))) + '\n'
                else:
                    result += join(self.export_var_tmpl % (key, value)) + '\n'
        return result


class CshActivator(_Activator):

    def __init__(self, arguments=None):
        self.pathsep_join = ':'.join
        self.sep = '/'
        self.path_conversion = native_path_to_unix
        self.script_extension = '.csh'
        self.tempfile_extension = None  # write instructions to stdout rather than a temp file
        self.command_join = ';\n'

        self.unset_var_tmpl = 'unsetenv %s'
        self.export_var_tmpl = 'setenv %s "%s"'
        self.set_var_tmpl = "set %s='%s'"
        self.run_script_tmpl = 'source "%s"'

        self.hook_source_path = join(CONDA_PACKAGE_ROOT, 'shell', 'etc', 'profile.d', 'conda.csh')

        super(CshActivator, self).__init__(arguments)

    def _update_prompt(self, set_vars, conda_prompt_modifier):
        prompt = self.environ.get('prompt', '')
        current_prompt_modifier = self.environ.get('CONDA_PROMPT_MODIFIER')
        if current_prompt_modifier:
            prompt = re.sub(re.escape(current_prompt_modifier), r'', prompt)
        set_vars.update({
            'prompt': conda_prompt_modifier + prompt,
        })

    def _hook_preamble(self):
        if on_win:
            return dedent(
                f"""
                setenv CONDA_EXE `cygpath {context.conda_exe}`
                setenv _CONDA_ROOT `cygpath {context.conda_prefix}`
                setenv _CONDA_EXE `cygpath {context.conda_exe}`
                setenv CONDA_PYTHON_EXE `cygpath {sys.executable}`
                """
            ).strip()
        else:
            return dedent(
                f"""
                setenv CONDA_EXE "{context.conda_exe}"
                setenv _CONDA_ROOT "{context.conda_prefix}"
                setenv _CONDA_EXE "{context.conda_exe}"
                setenv CONDA_PYTHON_EXE "{sys.executable}"
                """
            ).strip()


class XonshActivator(_Activator):

    @staticmethod
    def path_conversion(paths):
        if not on_win:
            return path_identity(paths)
        elif isinstance(paths, str):
            return paths.replace('\\', '/')
        elif paths is None:
            return None
        else:
            return tuple([path.replace('\\', '/') for path in paths])

    def __init__(self, arguments=None):
        self.pathsep_join = ';'.join if on_win else ':'.join
        self.sep = '/'
        self.tempfile_extension = None
        self.command_join = '\n'

        self.unset_var_tmpl = 'del $%s'
        self.export_var_tmpl = "$%s = '%s'"
        self.set_var_tmpl = "$%s = '%s'"  # TODO: determine if different than export_var_tmpl

        # 'scripts' really refer to de/activation scripts, not scripts in the language per se
        # xonsh can piggy-back activation scripts from other languages depending on the platform
        import platform
        if platform.system() == 'Windows':
            self.script_extension = '.bat'
            self.run_script_tmpl = 'source-cmd --suppress-skip-message "%s"'
        else:
            self.script_extension = '.sh'
            self.run_script_tmpl = 'source-bash --suppress-skip-message "%s"'

        self.hook_source_path = join(CONDA_PACKAGE_ROOT, 'shell', 'conda.xsh')

        super(XonshActivator, self).__init__(arguments)

    def _hook_preamble(self):
        return '$CONDA_EXE = "%s"' % self.path_conversion(context.conda_exe)


class CmdExeActivator(_Activator):

    def __init__(self, arguments=None):
        self.pathsep_join = ';'.join
        self.sep = '\\'
        self.path_conversion = path_identity
        self.script_extension = '.bat'
        self.tempfile_extension = '.bat'
        self.command_join = '\n'

        self.unset_var_tmpl = '@SET %s='
        self.export_var_tmpl = '@SET "%s=%s"'
        self.set_var_tmpl = '@SET "%s=%s"'  # TODO: determine if different than export_var_tmpl
        self.run_script_tmpl = '@CALL "%s"'

        self.hook_source_path = None
        # TODO: cmd.exe doesn't get a hook function? Or do we need to do something different?
        #       Like, for cmd.exe only, put a special directory containing only conda.bat on PATH?

        super(CmdExeActivator, self).__init__(arguments)

    # def _hook_preamble(self):
    #     if on_win:
    #         return '@chcp 65001'


class FishActivator(_Activator):

    def __init__(self, arguments=None):
        self.pathsep_join = '" "'.join
        self.sep = '/'
        self.path_conversion = native_path_to_unix
        self.script_extension = '.fish'
        self.tempfile_extension = None  # write instructions to stdout rather than a temp file
        self.command_join = ';\n'

        self.unset_var_tmpl = 'set -e %s'
        self.export_var_tmpl = 'set -gx %s "%s"'
        self.set_var_tmpl = 'set -g %s "%s"'
        self.run_script_tmpl = 'source "%s"'

        self.hook_source_path = join(CONDA_PACKAGE_ROOT, 'shell', 'etc', 'fish', 'conf.d',
                                     'conda.fish')

        super(FishActivator, self).__init__(arguments)

    def _hook_preamble(self):
        if on_win:
            return dedent(
                f"""
                set -gx CONDA_EXE (cygpath "{context.conda_exe}")
                set _CONDA_ROOT (cygpath "{context.conda_prefix}")
                set _CONDA_EXE (cygpath "{context.conda_exe}")
                set -gx CONDA_PYTHON_EXE (cygpath "{sys.executable}")
                """
            ).strip()
        else:
            return dedent(
                f"""
                set -gx CONDA_EXE "{context.conda_exe}"
                set _CONDA_ROOT "{context.conda_prefix}"
                set _CONDA_EXE "{context.conda_exe}"
                set -gx CONDA_PYTHON_EXE "{sys.executable}"
                """
            ).strip()


class PowerShellActivator(_Activator):

    def __init__(self, arguments=None):
        self.pathsep_join = ';'.join if on_win else ':'.join
        self.sep = '\\' if on_win else '/'
        self.path_conversion = path_identity
        self.script_extension = '.ps1'
        self.tempfile_extension = None  # write instructions to stdout rather than a temp file
        self.command_join = '\n'

        self.unset_var_tmpl = '$Env:%s = ""'
        self.export_var_tmpl = '$Env:%s = "%s"'
        self.set_var_tmpl = '$Env:%s = "%s"'
        self.run_script_tmpl = '. "%s"'

        self.hook_source_path = join(CONDA_PACKAGE_ROOT, 'shell', 'condabin', 'conda-hook.ps1')

        super(PowerShellActivator, self).__init__(arguments)

    def _hook_preamble(self):
        if context.dev:
            return dedent(
                f"""
                $Env:PYTHONPATH = "{CONDA_SOURCE_ROOT}"
                $Env:CONDA_EXE = "{sys.executable}"
                $Env:_CE_M = "-m"
                $Env:_CE_CONDA = "conda"
                $Env:_CONDA_ROOT = "{CONDA_PACKAGE_ROOT}"
                $Env:_CONDA_EXE = "{context.conda_exe}"
                $CondaModuleArgs = @{{ChangePs1 = ${context.changeps1}}}
                """
            ).strip()
        else:
            return dedent(
                f"""
                $Env:CONDA_EXE = "{context.conda_exe}"
                $Env:_CE_M = ""
                $Env:_CE_CONDA = ""
                $Env:_CONDA_ROOT = "{context.conda_prefix}"
                $Env:_CONDA_EXE = "{context.conda_exe}"
                $CondaModuleArgs = @{{ChangePs1 = ${context.changeps1}}}
                """
            ).strip()

    def _hook_postamble(self):
        return "Remove-Variable CondaModuleArgs"


class JSONFormatMixin(_Activator):
    """Returns the necessary values for activation as JSON, so that tools can use them."""

    def __init__(self, arguments=None):
        self.pathsep_join = list
        self.tempfile_extension = None  # write instructions to stdout rather than a temp file
        self.command_join = list

        super(JSONFormatMixin, self).__init__(arguments)

    def _hook_preamble(self):
        if context.dev:
            return {
                "PYTHONPATH": CONDA_SOURCE_ROOT,
                "CONDA_EXE": sys.executable,
                "_CE_M": "-m",
                "_CE_CONDA": "conda",
                "_CONDA_ROOT": CONDA_PACKAGE_ROOT,
                "_CONDA_EXE": context.conda_exe,
            }
        else:
            return {
                'CONDA_EXE': context.conda_exe,
                '_CE_M': '',
                '_CE_CONDA': '',
                '_CONDA_ROOT': context.conda_prefix,
                '_CONDA_EXE': context.conda_exe,
            }

    def get_scripts_export_unset_vars(self, **kwargs):
        export_vars, unset_vars = self.get_export_unset_vars(**kwargs)
        script_export_vars = script_unset_vars = None
        if export_vars:
            script_export_vars = dict(export_vars.items())
        if unset_vars:
            script_unset_vars = unset_vars
        return script_export_vars or {}, script_unset_vars or []

    def _finalize(self, commands, ext):
        merged = {}
        for _cmds in commands:
            merged.update(_cmds)

        commands = merged
        if ext is None:
            return json.dumps(commands, indent=2)
        elif ext:
            with Utf8NamedTemporaryFile('w+', suffix=ext, delete=False) as tf:
                # the default mode is 'w+b', and universal new lines don't work in that mode
                # command_join should account for that
                json.dump(commands, tf, indent=2)
            return tf.name
        else:
            raise NotImplementedError()

    def _yield_commands(self, cmds_dict):
        # TODO: _Is_ defining our own object shape here any better than
        # just dumping the `cmds_dict`?
        path = cmds_dict.get('export_path', {})
        export_vars = cmds_dict.get('export_vars', {})
        # treat PATH specially
        if 'PATH' in export_vars:
            new_path = path.get('PATH', [])
            new_path.extend(export_vars.pop('PATH'))
            path['PATH'] = new_path

        yield {
            'path': path,
            'vars': {
                'export': export_vars,
                'unset': cmds_dict.get('unset_vars', ()),
                'set': cmds_dict.get('set_vars', {}),
            },
            'scripts': {
                'activate': cmds_dict.get('activate_scripts', ()),
                'deactivate': cmds_dict.get('deactivate_scripts', ()),
            }
        }


activator_map = {
    'posix': PosixActivator,
    'ash': PosixActivator,
    'bash': PosixActivator,
    'dash': PosixActivator,
    'zsh': PosixActivator,
    'csh': CshActivator,
    'tcsh': CshActivator,
    'xonsh': XonshActivator,
    'cmd.exe': CmdExeActivator,
    'fish': FishActivator,
    'powershell': PowerShellActivator,
}

formatter_map = {
    'json': JSONFormatMixin,
}


def _build_activator_cls(shell):
    """Construct the activator class dynamically from a base activator and any
    number of formatters, appended using '+' to the name. For example,
    `posix+json` (as in `conda shell.posix+json activate`) would use the
    `PosixActivator` base class and add the `JSONFormatMixin`."""
    shell_etc = shell.split('+')
    activator, formatters = shell_etc[0], shell_etc[1:]
    bases = [activator_map[activator]]

    for f in formatters:
        bases.append(formatter_map[f])

    cls = type(str('Activator'), tuple(bases), {})
    return cls
