# -*- coding: utf-8 -*-
#
# Copyright © 2012 Pierre Raybaut
# Licensed under the terms of the MIT License
# (see spyderlib/__init__.py for details)
"""IPython Console plugin
Handles IPython clients (and in the future, will handle IPython kernels too
-- meanwhile, the external console plugin is handling them)"""
# pylint: disable=C0103
# pylint: disable=R0903
# pylint: disable=R0911
# pylint: disable=R0201
# Stdlib imports
import atexit
import os
import os.path as osp
import sys
# Qt imports
from spyderlib.qt.QtGui import (QVBoxLayout, QHBoxLayout, QFormLayout,
QMessageBox, QGroupBox, QDialogButtonBox,
QDialog, QTabWidget, QFontComboBox,
QCheckBox, QApplication, QLabel,QLineEdit,
QPushButton, QKeySequence, QWidget)
from spyderlib.qt.compat import getopenfilename
from spyderlib.qt.QtCore import SIGNAL, Qt
# IPython imports
from IPython.core.application import get_ipython_dir
from IPython.kernel.connect import find_connection_file
from IPython.qt.manager import QtKernelManager
try: # IPython = "<=2.0"
from IPython.external.ssh import tunnel as zmqtunnel
import IPython.external.pexpect as pexpect
except ImportError:
from zmq.ssh import tunnel as zmqtunnel # analysis:ignore
try:
import pexpect # analysis:ignore
except ImportError:
pexpect = None # analysis:ignore
# Local imports
from spyderlib import dependencies
from spyderlib.baseconfig import _
from spyderlib.config import CONF
from spyderlib.utils.misc import get_error_match, remove_backslashes
from spyderlib.utils import programs
from spyderlib.utils.qthelpers import get_icon, create_action
from spyderlib.widgets.tabs import Tabs
from spyderlib.widgets.ipython import IPythonClient
from spyderlib.widgets.findreplace import FindReplace
from spyderlib.plugins import SpyderPluginWidget, PluginConfigPage
from spyderlib.py3compat import to_text_string
SYMPY_REQVER = '>=0.7.3'
dependencies.add("sympy", _("Symbolic mathematics in the IPython Console"),
required_version=SYMPY_REQVER)
# Replacing pyzmq openssh_tunnel method to work around the issue
# https://github.com/zeromq/pyzmq/issues/589 which was solved in pyzmq
# https://github.com/zeromq/pyzmq/pull/615
def _stop_tunnel(cmd):
pexpect.run(cmd)
def openssh_tunnel(self, lport, rport, server, remoteip='127.0.0.1',
keyfile=None, password=None, timeout=0.4):
if pexpect is None:
raise ImportError("pexpect unavailable, use paramiko_tunnel")
ssh="ssh "
if keyfile:
ssh += "-i " + keyfile
if ':' in server:
server, port = server.split(':')
ssh += " -p %s" % port
cmd = "%s -O check %s" % (ssh, server)
(output, exitstatus) = pexpect.run(cmd, withexitstatus=True)
if not exitstatus:
pid = int(output[output.find("(pid=")+5:output.find(")")])
cmd = "%s -O forward -L 127.0.0.1:%i:%s:%i %s" % (
ssh, lport, remoteip, rport, server)
(output, exitstatus) = pexpect.run(cmd, withexitstatus=True)
if not exitstatus:
atexit.register(_stop_tunnel, cmd.replace("-O forward",
"-O cancel",
1))
return pid
cmd = "%s -f -S none -L 127.0.0.1:%i:%s:%i %s sleep %i" % (
ssh, lport, remoteip, rport, server, timeout)
# pop SSH_ASKPASS from env
env = os.environ.copy()
env.pop('SSH_ASKPASS', None)
ssh_newkey = 'Are you sure you want to continue connecting'
tunnel = pexpect.spawn(cmd, env=env)
failed = False
while True:
try:
i = tunnel.expect([ssh_newkey, '[Pp]assword:'], timeout=.1)
if i==0:
host = server.split('@')[-1]
question = _("The authenticity of host %s can't be "
"established. Are you sure you want to continue "
"connecting?") % host
reply = QMessageBox.question(self, _('Warning'), question,
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No)
if reply == QMessageBox.Yes:
tunnel.sendline('yes')
continue
else:
tunnel.sendline('no')
raise RuntimeError(
_("The authenticity of the host can't be established"))
if i==1 and password is not None:
tunnel.sendline(password)
except pexpect.TIMEOUT:
continue
except pexpect.EOF:
if tunnel.exitstatus:
raise RuntimeError(_("Tunnel '%s' failed to start") % cmd)
else:
return tunnel.pid
else:
if failed or password is None:
raise RuntimeError(_("Could not connect to remote host"))
# TODO: Use this block when pyzmq bug #620 is fixed
# # Prompt a passphrase dialog to the user for a second attempt
# password, ok = QInputDialog.getText(self, _('Password'),
# _('Enter password for: ') + server,
# echo=QLineEdit.Password)
# if ok is False:
# raise RuntimeError('Could not connect to remote host.')
tunnel.sendline(password)
failed = True
class IPythonConsoleConfigPage(PluginConfigPage):
def __init__(self, plugin, parent):
PluginConfigPage.__init__(self, plugin, parent)
self.get_name = lambda: _("IPython console")
def setup_page(self):
newcb = self.create_checkbox
mpl_present = programs.is_module_installed("matplotlib")
# --- Display ---
font_group = self.create_fontgroup(option=None, text=None,
fontfilters=QFontComboBox.MonospacedFonts)
# Interface Group
interface_group = QGroupBox(_("Interface"))
banner_box = newcb(_("Display initial banner"), 'show_banner',
tip=_("This option lets you hide the message shown at\n"
"the top of the console when it's opened."))
gui_comp_box = newcb(_("Use a completion widget"),
'use_gui_completion',
tip=_("Use a widget instead of plain text "
"output for tab completion"))
pager_box = newcb(_("Use a pager to display additional text inside "
"the console"), 'use_pager',
tip=_("Useful if you don't want to fill the "
"console with long help or completion texts.\n"
"Note: Use the Q key to get out of the "
"pager."))
calltips_box = newcb(_("Display balloon tips"), 'show_calltips')
ask_box = newcb(_("Ask for confirmation before closing"),
'ask_before_closing')
interface_layout = QVBoxLayout()
interface_layout.addWidget(banner_box)
interface_layout.addWidget(gui_comp_box)
interface_layout.addWidget(pager_box)
interface_layout.addWidget(calltips_box)
interface_layout.addWidget(ask_box)
interface_group.setLayout(interface_layout)
# Background Color Group
bg_group = QGroupBox(_("Background color"))
light_radio = self.create_radiobutton(_("Light background"),
'light_color')
dark_radio = self.create_radiobutton(_("Dark background"),
'dark_color')
bg_layout = QVBoxLayout()
bg_layout.addWidget(light_radio)
bg_layout.addWidget(dark_radio)
bg_group.setLayout(bg_layout)
# Source Code Group
source_code_group = QGroupBox(_("Source code"))
buffer_spin = self.create_spinbox(
_("Buffer: "), _(" lines"),
'buffer_size', min_=-1, max_=1000000, step=100,
tip=_("Set the maximum number of lines of text shown in the\n"
"console before truncation. Specifying -1 disables it\n"
"(not recommended!)"))
source_code_layout = QVBoxLayout()
source_code_layout.addWidget(buffer_spin)
source_code_group.setLayout(source_code_layout)
# --- Graphics ---
# Pylab Group
pylab_group = QGroupBox(_("Support for graphics (Matplotlib)"))
pylab_box = newcb(_("Activate support"), 'pylab')
autoload_pylab_box = newcb(_("Automatically load Pylab and NumPy "
"modules"),
'pylab/autoload',
tip=_("This lets you load graphics support "
"without importing \nthe commands to do "
"plots. Useful to work with other\n"
"plotting libraries different to "
"Matplotlib or to develop \nGUIs with "
"Spyder."))
autoload_pylab_box.setEnabled(self.get_option('pylab') and mpl_present)
self.connect(pylab_box, SIGNAL("toggled(bool)"),
autoload_pylab_box.setEnabled)
pylab_layout = QVBoxLayout()
pylab_layout.addWidget(pylab_box)
pylab_layout.addWidget(autoload_pylab_box)
pylab_group.setLayout(pylab_layout)
if not mpl_present:
self.set_option('pylab', False)
self.set_option('pylab/autoload', False)
pylab_group.setEnabled(False)
pylab_tip = _("This feature requires the Matplotlib library.\n"
"It seems you don't have it installed.")
pylab_box.setToolTip(pylab_tip)
# Pylab backend Group
inline = _("Inline")
automatic = _("Automatic")
backend_group = QGroupBox(_("Graphics backend"))
bend_label = QLabel(_("Decide how graphics are going to be displayed "
"in the console. If unsure, please select "
"%s to put graphics inside the "
"console or %s to interact with "
"them (through zooming and panning) in a "
"separate window.") % (inline, automatic))
bend_label.setWordWrap(True)
backends = [(inline, 0), (automatic, 1), ("Qt", 2)]
# TODO: Add gtk3 when 0.13 is released
if sys.platform == 'darwin':
backends.append( ("Mac OSX", 3) )
if programs.is_module_installed('pygtk'):
backends.append( ("Gtk", 4) )
if programs.is_module_installed('wxPython'):
backends.append( ("Wx", 5) )
if programs.is_module_installed('_tkinter'):
backends.append( ("Tkinter", 6) )
backends = tuple(backends)
backend_box = self.create_combobox( _("Backend:")+" ", backends,
'pylab/backend', default=0,
tip=_("This option will be applied the "
"next time a console is opened."))
backend_layout = QVBoxLayout()
backend_layout.addWidget(bend_label)
backend_layout.addWidget(backend_box)
backend_group.setLayout(backend_layout)
backend_group.setEnabled(self.get_option('pylab') and mpl_present)
self.connect(pylab_box, SIGNAL("toggled(bool)"),
backend_group.setEnabled)
# Inline backend Group
inline_group = QGroupBox(_("Inline backend"))
inline_label = QLabel(_("Decide how to render the figures created by "
"this backend"))
inline_label.setWordWrap(True)
formats = (("PNG", 0), ("SVG", 1))
format_box = self.create_combobox(_("Format:")+" ", formats,
'pylab/inline/figure_format', default=0)
resolution_spin = self.create_spinbox(
_("Resolution:")+" ", " "+_("dpi"),
'pylab/inline/resolution', min_=50, max_=150, step=0.1,
tip=_("Only used when the format is PNG. Default is "
"72"))
width_spin = self.create_spinbox(
_("Width:")+" ", " "+_("inches"),
'pylab/inline/width', min_=2, max_=20, step=1,
tip=_("Default is 6"))
height_spin = self.create_spinbox(
_("Height:")+" ", " "+_("inches"),
'pylab/inline/height', min_=1, max_=20, step=1,
tip=_("Default is 4"))
inline_layout = QVBoxLayout()
inline_layout.addWidget(inline_label)
inline_layout.addWidget(format_box)
inline_layout.addWidget(resolution_spin)
inline_layout.addWidget(width_spin)
inline_layout.addWidget(height_spin)
inline_group.setLayout(inline_layout)
inline_group.setEnabled(self.get_option('pylab') and mpl_present)
self.connect(pylab_box, SIGNAL("toggled(bool)"),
inline_group.setEnabled)
# --- Startup ---
# Run lines Group
run_lines_group = QGroupBox(_("Run code"))
run_lines_label = QLabel(_("You can run several lines of code when "
"a console is started. Please introduce "
"each one separated by commas, for "
"example:
"
"import os, import sys"))
run_lines_label.setWordWrap(True)
run_lines_edit = self.create_lineedit(_("Lines:"), 'startup/run_lines',
'', alignment=Qt.Horizontal)
run_lines_layout = QVBoxLayout()
run_lines_layout.addWidget(run_lines_label)
run_lines_layout.addWidget(run_lines_edit)
run_lines_group.setLayout(run_lines_layout)
# Run file Group
run_file_group = QGroupBox(_("Run a file"))
run_file_label = QLabel(_("You can also run a whole file at startup "
"instead of just some lines (This is "
"similar to have a PYTHONSTARTUP file)."))
run_file_label.setWordWrap(True)
file_radio = newcb(_("Use the following file:"),
'startup/use_run_file', False)
run_file_browser = self.create_browsefile('', 'startup/run_file', '')
run_file_browser.setEnabled(False)
self.connect(file_radio, SIGNAL("toggled(bool)"),
run_file_browser.setEnabled)
run_file_layout = QVBoxLayout()
run_file_layout.addWidget(run_file_label)
run_file_layout.addWidget(file_radio)
run_file_layout.addWidget(run_file_browser)
run_file_group.setLayout(run_file_layout)
# ---- Advanced settings ----
# Greedy completer group
greedy_group = QGroupBox(_("Greedy completion"))
greedy_label = QLabel(_("Enable Tab completion on elements "
"of lists, results of function calls, etc, "
"without assigning them to a "
"variable.
"
"For example, you can get completions on "
"things like li[0].<Tab> or "
"ins.meth().<Tab>"))
greedy_label.setWordWrap(True)
greedy_box = newcb(_("Use the greedy completer"), "greedy_completer",
tip="Warning: It can be unsafe because the "
"code is actually evaluated when you press "
"Tab.")
greedy_layout = QVBoxLayout()
greedy_layout.addWidget(greedy_label)
greedy_layout.addWidget(greedy_box)
greedy_group.setLayout(greedy_layout)
# Autocall group
autocall_group = QGroupBox(_("Autocall"))
autocall_label = QLabel(_("Autocall makes IPython automatically call "
"any callable object even if you didn't type "
"explicit parentheses.
"
"For example, if you type str 43 it "
"becomes str(43) automatically."))
autocall_label.setWordWrap(True)
smart = _('Smart')
full = _('Full')
autocall_opts = ((_('Off'), 0), (smart, 1), (full, 2))
autocall_box = self.create_combobox(
_("Autocall: "), autocall_opts, 'autocall', default=0,
tip=_("On %s mode, Autocall is not applied if "
"there are no arguments after the callable. On "
"%s mode, all callable objects are "
"automatically called (even if no arguments are "
"present).") % (smart, full))
autocall_layout = QVBoxLayout()
autocall_layout.addWidget(autocall_label)
autocall_layout.addWidget(autocall_box)
autocall_group.setLayout(autocall_layout)
# Sympy group
sympy_group = QGroupBox(_("Symbolic Mathematics"))
sympy_label = QLabel(_("Perfom symbolic operations in the console "
"(e.g. integrals, derivatives, vector calculus, "
"etc) and get the outputs in a beautifully "
"printed style."))
sympy_label.setWordWrap(True)
sympy_box = newcb(_("Use symbolic math"), "symbolic_math",
tip=_("This option loads the Sympy library to work "
"with.
Please refer to its documentation to "
"learn how to use it."))
sympy_layout = QVBoxLayout()
sympy_layout.addWidget(sympy_label)
sympy_layout.addWidget(sympy_box)
sympy_group.setLayout(sympy_layout)
sympy_present = programs.is_module_installed("sympy")
if not sympy_present:
self.set_option("symbolic_math", False)
sympy_box.setEnabled(False)
sympy_tip = _("This feature requires the Sympy library.\n"
"It seems you don't have it installed.")
sympy_box.setToolTip(sympy_tip)
# Prompts group
prompts_group = QGroupBox(_("Prompts"))
prompts_label = QLabel(_("Modify how Input and Output prompts are "
"shown in the console."))
prompts_label.setWordWrap(True)
in_prompt_edit = self.create_lineedit(_("Input prompt:"),
'in_prompt', '',
_('Default is
'
'In [<span class="in-prompt-number">'
'%i</span>]:'),
alignment=Qt.Horizontal)
out_prompt_edit = self.create_lineedit(_("Output prompt:"),
'out_prompt', '',
_('Default is
'
'Out[<span class="out-prompt-number">'
'%i</span>]:'),
alignment=Qt.Horizontal)
prompts_layout = QVBoxLayout()
prompts_layout.addWidget(prompts_label)
prompts_layout.addWidget(in_prompt_edit)
prompts_layout.addWidget(out_prompt_edit)
prompts_group.setLayout(prompts_layout)
# --- Tabs organization ---
tabs = QTabWidget()
tabs.addTab(self.create_tab(font_group, interface_group, bg_group,
source_code_group), _("Display"))
tabs.addTab(self.create_tab(pylab_group, backend_group, inline_group),
_("Graphics"))
tabs.addTab(self.create_tab(run_lines_group, run_file_group),
_("Startup"))
tabs.addTab(self.create_tab(greedy_group, autocall_group, sympy_group,
prompts_group), _("Advanced Settings"))
vlayout = QVBoxLayout()
vlayout.addWidget(tabs)
self.setLayout(vlayout)
class KernelConnectionDialog(QDialog):
"""Dialog to connect to existing kernels (either local or remote)"""
def __init__(self, parent=None):
super(KernelConnectionDialog, self).__init__(parent)
self.setWindowTitle(_('Connect to an existing kernel'))
main_label = QLabel(_("Please enter the connection info of the kernel "
"you want to connect to. For that you can "
"either select its JSON connection file using "
"the Browse button, or write directly "
"its id, in case it's a local kernel (for "
"example kernel-3764.json or just "
"3764)."))
main_label.setWordWrap(True)
main_label.setAlignment(Qt.AlignJustify)
# connection file
cf_label = QLabel(_('Connection info:'))
self.cf = QLineEdit()
self.cf.setPlaceholderText(_('Path to connection file or kernel id'))
self.cf.setMinimumWidth(250)
cf_open_btn = QPushButton(_('Browse'))
self.connect(cf_open_btn, SIGNAL('clicked()'),
self.select_connection_file)
cf_layout = QHBoxLayout()
cf_layout.addWidget(cf_label)
cf_layout.addWidget(self.cf)
cf_layout.addWidget(cf_open_btn)
# remote kernel checkbox
self.rm_cb = QCheckBox(_('This is a remote kernel'))
# ssh connection
self.hn = QLineEdit()
self.hn.setPlaceholderText(_('username@hostname:port'))
self.kf = QLineEdit()
self.kf.setPlaceholderText(_('Path to ssh key file'))
kf_open_btn = QPushButton(_('Browse'))
self.connect(kf_open_btn, SIGNAL('clicked()'), self.select_ssh_key)
kf_layout = QHBoxLayout()
kf_layout.addWidget(self.kf)
kf_layout.addWidget(kf_open_btn)
self.pw = QLineEdit()
self.pw.setPlaceholderText(_('Password or ssh key passphrase'))
self.pw.setEchoMode(QLineEdit.Password)
ssh_form = QFormLayout()
ssh_form.addRow(_('Host name'), self.hn)
ssh_form.addRow(_('Ssh key'), kf_layout)
ssh_form.addRow(_('Password'), self.pw)
# Ok and Cancel buttons
accept_btns = QDialogButtonBox(
QDialogButtonBox.Ok | QDialogButtonBox.Cancel,
Qt.Horizontal, self)
self.connect(accept_btns, SIGNAL('accepted()'), self.accept)
self.connect(accept_btns, SIGNAL('rejected()'), self.reject)
# Dialog layout
layout = QVBoxLayout(self)
layout.addWidget(main_label)
layout.addLayout(cf_layout)
layout.addWidget(self.rm_cb)
layout.addLayout(ssh_form)
layout.addWidget(accept_btns)
# remote kernel checkbox enables the ssh_connection_form
def ssh_set_enabled(state):
for wid in [self.hn, self.kf, kf_open_btn, self.pw]:
wid.setEnabled(state)
for i in range(ssh_form.rowCount()):
ssh_form.itemAt(2 * i).widget().setEnabled(state)
ssh_set_enabled(self.rm_cb.checkState())
self.connect(self.rm_cb, SIGNAL('stateChanged(int)'), ssh_set_enabled)
def select_connection_file(self):
cf = getopenfilename(self, _('Open IPython connection file'),
osp.join(get_ipython_dir(), 'profile_default', 'security'),
'*.json;;*.*')[0]
self.cf.setText(cf)
def select_ssh_key(self):
kf = getopenfilename(self, _('Select ssh key'),
get_ipython_dir(), '*.pem;;*.*')[0]
self.kf.setText(kf)
@staticmethod
def get_connection_parameters(parent=None):
dialog = KernelConnectionDialog(parent)
result = dialog.exec_()
is_remote = bool(dialog.rm_cb.checkState())
accepted = result == QDialog.Accepted
if is_remote:
falsy_to_none = lambda arg: arg if arg else None
return (dialog.cf.text(), # connection file
falsy_to_none(dialog.hn.text()), # host name
falsy_to_none(dialog.kf.text()), # ssh key file
falsy_to_none(dialog.pw.text()), # ssh password
accepted) # ok
else:
return (dialog.cf.text(), None, None, None, accepted)
class IPythonConsole(SpyderPluginWidget):
"""
IPython Console plugin
This is a widget with tabs where each one is an IPythonClient
"""
CONF_SECTION = 'ipython_console'
CONFIGWIDGET_CLASS = IPythonConsoleConfigPage
DISABLE_ACTIONS_WHEN_HIDDEN = False
def __init__(self, parent):
SpyderPluginWidget.__init__(self, parent)
self.tabwidget = None
self.menu_actions = None
self.extconsole = None # External console plugin
self.inspector = None # Object inspector plugin
self.historylog = None # History log plugin
self.variableexplorer = None # Variable explorer plugin
self.master_clients = 0
self.clients = []
# Initialize plugin
self.initialize_plugin()
layout = QVBoxLayout()
self.tabwidget = Tabs(self, self.menu_actions)
if hasattr(self.tabwidget, 'setDocumentMode')\
and not sys.platform == 'darwin':
# Don't set document mode to true on OSX because it generates
# a crash when the console is detached from the main window
# Fixes Issue 561
self.tabwidget.setDocumentMode(True)
self.connect(self.tabwidget, SIGNAL('currentChanged(int)'),
self.refresh_plugin)
self.connect(self.tabwidget, SIGNAL('move_data(int,int)'),
self.move_tab)
self.tabwidget.set_close_function(self.close_client)
if sys.platform == 'darwin':
tab_container = QWidget()
tab_container.setObjectName('tab-container')
tab_layout = QHBoxLayout(tab_container)
tab_layout.setContentsMargins(0, 0, 0, 0)
tab_layout.addWidget(self.tabwidget)
layout.addWidget(tab_container)
else:
layout.addWidget(self.tabwidget)
# Find/replace widget
self.find_widget = FindReplace(self)
self.find_widget.hide()
self.register_widget_shortcuts("Editor", self.find_widget)
layout.addWidget(self.find_widget)
self.setLayout(layout)
# Accepting drops
self.setAcceptDrops(True)
#------ SpyderPluginMixin API ---------------------------------------------
def on_first_registration(self):
"""Action to be performed on first plugin registration"""
self.main.tabify_plugins(self.main.extconsole, self)
def apply_plugin_settings(self, options):
"""Apply configuration file's plugin settings"""
font_n = 'plugin_font'
font_o = self.get_plugin_font()
inspector_n = 'connect_to_oi'
inspector_o = CONF.get('inspector', 'connect/ipython_console')
for client in self.clients:
control = client.get_control()
if font_n in options:
client.set_font(font_o)
if inspector_n in options and control is not None:
control.set_inspector_enabled(inspector_o)
def toggle_view(self, checked):
"""Toggle view"""
if checked:
self.dockwidget.show()
self.dockwidget.raise_()
# Start a client in case there are none shown
if not self.clients:
if self.main.is_setting_up:
self.create_new_client(give_focus=False)
else:
self.create_new_client(give_focus=True)
else:
self.dockwidget.hide()
#------ SpyderPluginWidget API --------------------------------------------
def get_plugin_title(self):
"""Return widget title"""
return _('IPython console')
def get_plugin_icon(self):
"""Return widget icon"""
return get_icon('ipython_console.png')
def get_focus_widget(self):
"""
Return the widget to give focus to when
this plugin's dockwidget is raised on top-level
"""
client = self.tabwidget.currentWidget()
if client is not None:
return client.get_control()
def closing_plugin(self, cancelable=False):
"""Perform actions before parent main window is closed"""
for client in self.clients:
client.close()
return True
def refresh_plugin(self):
"""Refresh tabwidget"""
client = None
if self.tabwidget.count():
# Give focus to the control widget of the selected tab
client = self.tabwidget.currentWidget()
control = client.get_control()
control.setFocus()
widgets = client.get_toolbar_buttons()+[5]
# Change extconsole tab to the client's kernel widget
idx = self.extconsole.get_shell_index_from_id(
client.kernel_widget_id)
if idx is not None:
self.extconsole.tabwidget.setCurrentIndex(idx)
else:
control = None
widgets = []
self.find_widget.set_editor(control)
self.tabwidget.set_corner_widgets({Qt.TopRightCorner: widgets})
self.main.last_console_plugin_focus_was_python = False
self.emit(SIGNAL('update_plugin_title()'))
def get_plugin_actions(self):
"""Return a list of actions related to plugin"""
ctrl = "Cmd" if sys.platform == "darwin" else "Ctrl"
main_create_client_action = create_action(self,
_("Open an &IPython console"),
None, 'ipython_console.png',
triggered=self.create_new_client,
tip=_("Use %s+T when the console is selected "
"to open a new one") % ctrl)
create_client_action = create_action(self,
_("Open a new console"),
QKeySequence("Ctrl+T"), 'ipython_console.png',
triggered=self.create_new_client)
create_client_action.setShortcutContext(Qt.WidgetWithChildrenShortcut)
connect_to_kernel_action = create_action(self,
_("Connect to an existing kernel"), None, None,
_("Open a new IPython console connected to an existing kernel"),
triggered=self.create_client_for_kernel)
# Add the action to the 'Consoles' menu on the main window
main_consoles_menu = self.main.consoles_menu_actions
main_consoles_menu.insert(0, main_create_client_action)
main_consoles_menu += [None, connect_to_kernel_action]
# Plugin actions
self.menu_actions = [create_client_action, connect_to_kernel_action]
return self.menu_actions
def register_plugin(self):
"""Register plugin in Spyder's main window"""
self.main.add_dockwidget(self)
self.extconsole = self.main.extconsole
self.inspector = self.main.inspector
self.historylog = self.main.historylog
self.variableexplorer = self.main.variableexplorer
self.connect(self, SIGNAL('focus_changed()'),
self.main.plugin_focus_changed)
if self.main.editor is not None:
self.connect(self, SIGNAL("edit_goto(QString,int,QString)"),
self.main.editor.load)
self.connect(self.main.editor,
SIGNAL('run_in_current_ipyclient(QString,QString,QString,bool)'),
self.run_script_in_current_client)
#------ Public API (for clients) ------------------------------------------
def get_clients(self):
"""Return clients list"""
return [cl for cl in self.clients if isinstance(cl, IPythonClient)]
# def get_kernels(self):
# """Return IPython kernel widgets list"""
# return [sw for sw in self.shellwidgets
# if isinstance(sw, IPythonKernel)]
#
def get_focus_client(self):
"""Return current client with focus, if any"""
widget = QApplication.focusWidget()
for client in self.get_clients():
if widget is client or widget is client.get_control():
return client
def get_current_client(self):
"""Return the currently selected client"""
client = self.tabwidget.currentWidget()
if client is not None:
return client
def run_script_in_current_client(self, filename, wdir, args, debug):
"""Run script in current client, if any"""
norm = lambda text: remove_backslashes(to_text_string(text))
client = self.get_current_client()
if client is not None:
# Internal kernels, use runfile
if client.kernel_widget_id is not None:
line = "%s('%s'" % ('debugfile' if debug else 'runfile',
norm(filename))
if args:
line += ", args='%s'" % norm(args)
if wdir:
line += ", wdir='%s'" % norm(wdir)
line += ")"
else: # External kernels, use %run
line = "%run "
if debug:
line += "-d "
line += "\"%s\"" % to_text_string(filename)
if args:
line += " %s" % norm(args)
self.execute_python_code(line)
self.visibility_changed(True)
self.raise_()
else:
#XXX: not sure it can really happen
QMessageBox.warning(self, _('Warning'),
_("No IPython console is currently available to run %s."
"
Please open a new one and try again."
) % osp.basename(filename), QMessageBox.Ok)
def execute_python_code(self, lines):
client = self.get_current_client()
if client is not None:
client.shellwidget.execute(to_text_string(lines))
self.activateWindow()
client.get_control().setFocus()
def write_to_stdin(self, line):
client = self.get_current_client()
if client is not None:
client.shellwidget.write_to_stdin(line)
def create_new_client(self, give_focus=True):
"""Create a new client"""
self.master_clients += 1
name = "%d/A" % self.master_clients
client = IPythonClient(self, name=name, history_filename='history.py',
menu_actions=self.menu_actions)
self.add_tab(client, name=client.get_name())
self.main.extconsole.start_ipykernel(client, give_focus=give_focus)
def register_client(self, client, restart=False, give_focus=True):
"""Register new client"""
self.connect_client_to_kernel(client)
client.show_shellwidget(give_focus=give_focus)
# Local vars
shellwidget = client.shellwidget
control = shellwidget._control
page_control = shellwidget._page_control
# Create new clients with Ctrl+T shortcut
self.connect(shellwidget, SIGNAL('new_ipyclient()'),
self.create_new_client)
# Handle kernel interrupts
extconsoles = self.extconsole.shellwidgets
kernel_widget = None
if extconsoles:
if extconsoles[-1].connection_file == client.connection_file:
kernel_widget = extconsoles[-1]
if restart:
shellwidget.custom_interrupt_requested.disconnect()
shellwidget.custom_interrupt_requested.connect(
kernel_widget.keyboard_interrupt)
if kernel_widget is None:
shellwidget.custom_interrupt_requested.connect(
client.interrupt_message)
# Connect to our variable explorer
if kernel_widget is not None and self.variableexplorer is not None:
nsb = self.variableexplorer.currentWidget()
# When the autorefresh button is active, our kernels
# start to consume more and more CPU during time
# Fix Issue 1450
# ----------------
# When autorefresh is off by default we need the next
# line so that kernels don't start to consume CPU
# Fix Issue 1595
nsb.auto_refresh_button.setChecked(True)
nsb.auto_refresh_button.setChecked(False)
nsb.auto_refresh_button.setEnabled(False)
nsb.set_ipyclient(client)
client.set_namespacebrowser(nsb)
# If we are restarting the kernel we need to rename
# the client tab and do no more from here on
if restart:
self.rename_client_tab(client)
return
# For tracebacks
self.connect(control, SIGNAL("go_to_error(QString)"), self.go_to_error)
# Handle kernel restarts asked by the user
if kernel_widget is not None:
shellwidget.custom_restart_requested.connect(
lambda cl=client: self.restart_kernel(client))
else:
shellwidget.custom_restart_requested.connect(client.restart_message)
# Print a message if kernel dies unexpectedly
shellwidget.custom_restart_kernel_died.connect(
lambda t: client.if_kernel_dies(t))
# Connect text widget to our inspector
if kernel_widget is not None and self.inspector is not None:
control.set_inspector(self.inspector)
control.set_inspector_enabled(CONF.get('inspector',
'connect/ipython_console'))
# Connect client to our history log
if self.historylog is not None:
self.historylog.add_history(client.history_filename)
self.connect(client, SIGNAL('append_to_history(QString,QString)'),
self.historylog.append_to_history)
# Set font for client
client.set_font( self.get_plugin_font() )
# Connect focus signal to client's control widget
self.connect(control, SIGNAL('focus_changed()'),
lambda: self.emit(SIGNAL('focus_changed()')))
# Update the find widget if focus changes between control and
# page_control
self.find_widget.set_editor(control)
if page_control:
self.connect(page_control, SIGNAL('focus_changed()'),
lambda: self.emit(SIGNAL('focus_changed()')))
self.connect(control, SIGNAL('visibility_changed(bool)'),
self.refresh_plugin)
self.connect(page_control, SIGNAL('visibility_changed(bool)'),
self.refresh_plugin)
self.connect(page_control, SIGNAL('show_find_widget()'),
self.find_widget.show)
def close_client(self, index=None, client=None, force=False):
"""Close client tab from index or widget (or close current tab)"""
if not self.tabwidget.count():
return
if client is not None:
index = self.tabwidget.indexOf(client)
if index is None and client is None:
index = self.tabwidget.currentIndex()
if index is not None:
client = self.tabwidget.widget(index)
# Check if related clients or kernels are opened
# and eventually ask before closing them
if not force and isinstance(client, IPythonClient):
kernel_index = self.extconsole.get_shell_index_from_id(
client.kernel_widget_id)
close_all = True
if len(self.get_related_clients(client)) > 0 and \
self.get_option('ask_before_closing'):
ans = QMessageBox.question(self, self.get_plugin_title(),
_("Do you want to close all other consoles connected "
"to the same kernel as this one?"),
QMessageBox.Yes | QMessageBox.No | QMessageBox.Cancel)
if ans == QMessageBox.Cancel:
return
close_all = ans == QMessageBox.Yes
if close_all:
if kernel_index is not None:
self.extconsole.close_console(index=kernel_index,
from_ipyclient=True)
self.close_related_clients(client)
client.close()
# Note: client index may have changed after closing related widgets
self.tabwidget.removeTab(self.tabwidget.indexOf(client))
self.clients.remove(client)
self.emit(SIGNAL('update_plugin_title()'))
def get_client_index_from_id(self, client_id):
"""Return client index from id"""
for index, client in enumerate(self.clients):
if id(client) == client_id:
return index
def rename_client_tab(self, client):
"""Add the pid of the kernel process to client tab"""
index = self.get_client_index_from_id(id(client))
self.tabwidget.setTabText(index, client.get_name())
def get_related_clients(self, client):
"""
Get all other clients that are connected to the same kernel as `client`
"""
related_clients = []
for cl in self.get_clients():
if cl.connection_file == client.connection_file and \
cl is not client:
related_clients.append(cl)
return related_clients
def close_related_clients(self, client):
"""Close all clients related to *client*, except itself"""
related_clients = self.get_related_clients(client)
for cl in related_clients:
self.close_client(client=cl, force=True)
#------ Public API (for kernels) ------------------------------------------
def ssh_tunnel(self, *args, **kwargs):
if sys.platform == 'win32':
return zmqtunnel.paramiko_tunnel(*args, **kwargs)
else:
return openssh_tunnel(self, *args, **kwargs)
def tunnel_to_kernel(self, ci, hostname, sshkey=None, password=None, timeout=10):
"""tunnel connections to a kernel via ssh. remote ports are specified in
the connection info ci."""
lports = zmqtunnel.select_random_ports(4)
rports = ci['shell_port'], ci['iopub_port'], ci['stdin_port'], ci['hb_port']
remote_ip = ci['ip']
for lp, rp in zip(lports, rports):
self.ssh_tunnel(lp, rp, hostname, remote_ip, sshkey, password, timeout)
return tuple(lports)
def create_kernel_manager_and_client(self, connection_file=None,
hostname=None, sshkey=None,
password=None):
"""Create kernel manager and client"""
cf = find_connection_file(connection_file)
kernel_manager = QtKernelManager(connection_file=cf, config=None)
kernel_client = kernel_manager.client()
kernel_client.load_connection_file()
if hostname is not None:
try:
newports = self.tunnel_to_kernel(dict(ip=kernel_client.ip,
shell_port=kernel_client.shell_port,
iopub_port=kernel_client.iopub_port,
stdin_port=kernel_client.stdin_port,
hb_port=kernel_client.hb_port),
hostname, sshkey, password)
(kernel_client.shell_port, kernel_client.iopub_port,
kernel_client.stdin_port, kernel_client.hb_port) = newports
except Exception as e:
QMessageBox.critical(self, _('Connection error'),
_("Could not open ssh tunnel. The "
"error was:\n\n") + to_text_string(e))
return None, None
kernel_client.start_channels()
# To rely on kernel's heartbeat to know when a kernel has died
kernel_client.hb_channel.unpause()
return kernel_manager, kernel_client
def connect_client_to_kernel(self, client):
"""
Connect a client to its kernel
"""
km, kc = self.create_kernel_manager_and_client(client.connection_file,
client.hostname,
client.sshkey,
client.password)
if km is not None:
widget = client.shellwidget
widget.kernel_manager = km
widget.kernel_client = kc
def create_client_for_kernel(self):
"""Create a client connected to an existing kernel"""
(cf, hostname,
kf, pw, ok) = KernelConnectionDialog.get_connection_parameters(self)
if not ok:
return
else:
self._create_client_for_kernel(cf, hostname, kf, pw)
def _create_client_for_kernel(self, cf, hostname, kf, pw):
# Verifying if the connection file exists
cf = osp.basename(cf)
try:
find_connection_file(cf)
except (IOError, UnboundLocalError):
QMessageBox.critical(self, _('IPython'),
_("Unable to connect to IPython %s") % cf)
return
# Getting the master name that corresponds to the client
# (i.e. the i in i/A)
master_name = None
slave_ord = ord('A') - 1
for cl in self.get_clients():
if cf in cl.connection_file:
cf = cl.connection_file
if master_name is None:
master_name = cl.name.split('/')[0]
new_slave_ord = ord(cl.name.split('/')[1])
if new_slave_ord > slave_ord:
slave_ord = new_slave_ord
# If we couldn't find a client with the same connection file,
# it means this is a new master client
if master_name is None:
self.master_clients += 1
master_name = to_text_string(self.master_clients)
# Set full client name
name = master_name + '/' + chr(slave_ord + 1)
# Getting kernel_widget_id from the currently open kernels.
kernel_widget_id = None
for sw in self.extconsole.shellwidgets:
if sw.connection_file == cf.split('/')[-1]:
kernel_widget_id = id(sw)
# Creating the client
client = IPythonClient(self, name=name, history_filename='history.py',
connection_file=cf,
kernel_widget_id=kernel_widget_id,
menu_actions=self.menu_actions,
hostname=hostname, sshkey=kf, password=pw)
# Adding the tab
self.add_tab(client, name=client.get_name())
# Connecting kernel and client
self.register_client(client)
def restart_kernel(self, client):
"""
Create a new kernel and connect it to `client` if the user asks for it
"""
# Took this bit of code (until if result == ) from the IPython project
# (qt/frontend_widget.py - restart_kernel).
# Licensed under the BSD license
message = _('Are you sure you want to restart the kernel?')
buttons = QMessageBox.Yes | QMessageBox.No
result = QMessageBox.question(self, _('Restart kernel?'),
message, buttons)
if result == QMessageBox.Yes:
client.show_restart_animation()
# Close old kernel tab
idx = self.extconsole.get_shell_index_from_id(client.kernel_widget_id)
self.extconsole.close_console(index=idx, from_ipyclient=True)
# Create a new one and connect it to the client
self.extconsole.start_ipykernel(client)
def get_shellwidget_by_kernelwidget_id(self, kernel_id):
"""Return the IPython widget associated to a kernel widget id"""
for cl in self.clients:
if cl.kernel_widget_id == kernel_id:
return cl.shellwidget
else:
raise ValueError("Unknown kernel widget ID %r" % kernel_id)
#------ Public API (for tabs) ---------------------------------------------
def add_tab(self, widget, name):
"""Add tab"""
self.clients.append(widget)
index = self.tabwidget.addTab(widget, get_icon('ipython_console.png'),
name)
self.tabwidget.setCurrentIndex(index)
if self.dockwidget and not self.ismaximized:
self.dockwidget.setVisible(True)
self.dockwidget.raise_()
self.activateWindow()
widget.get_control().setFocus()
def move_tab(self, index_from, index_to):
"""
Move tab (tabs themselves have already been moved by the tabwidget)
"""
client = self.clients.pop(index_from)
self.clients.insert(index_to, client)
self.emit(SIGNAL('update_plugin_title()'))
#------ Public API (for help) ---------------------------------------------
def go_to_error(self, text):
"""Go to error if relevant"""
match = get_error_match(to_text_string(text))
if match:
fname, lnb = match.groups()
self.emit(SIGNAL("edit_goto(QString,int,QString)"),
osp.abspath(fname), int(lnb), '')
def show_intro(self):
"""Show intro to IPython help"""
from IPython.core.usage import interactive_usage
self.inspector.show_rich_text(interactive_usage)
def show_guiref(self):
"""Show qtconsole help"""
from IPython.core.usage import gui_reference
self.inspector.show_rich_text(gui_reference, collapse=True)
def show_quickref(self):
"""Show IPython Cheat Sheet"""
from IPython.core.usage import quick_reference
self.inspector.show_plain_text(quick_reference)
#----Drag and drop
#TODO: try and reimplement this block
# (this is still the original code block copied from externalconsole.py)
# def dragEnterEvent(self, event):
# """Reimplement Qt method
# Inform Qt about the types of data that the widget accepts"""
# source = event.mimeData()
# if source.hasUrls():
# if mimedata2url(source):
# pathlist = mimedata2url(source)
# shellwidget = self.tabwidget.currentWidget()
# if all([is_python_script(unicode(qstr)) for qstr in pathlist]):
# event.acceptProposedAction()
# elif shellwidget is None or not shellwidget.is_running():
# event.ignore()
# else:
# event.acceptProposedAction()
# else:
# event.ignore()
# elif source.hasText():
# event.acceptProposedAction()
#
# def dropEvent(self, event):
# """Reimplement Qt method
# Unpack dropped data and handle it"""
# source = event.mimeData()
# shellwidget = self.tabwidget.currentWidget()
# if source.hasText():
# qstr = source.text()
# if is_python_script(unicode(qstr)):
# self.start(qstr)
# elif shellwidget:
# shellwidget.shell.insert_text(qstr)
# elif source.hasUrls():
# pathlist = mimedata2url(source)
# if all([is_python_script(unicode(qstr)) for qstr in pathlist]):
# for fname in pathlist:
# self.start(fname)
# elif shellwidget:
# shellwidget.shell.drop_pathlist(pathlist)
# event.acceptProposedAction()