# -*- coding: utf-8 -*- # # Copyright © 2009-2010 Pierre Raybaut # Licensed under the terms of the MIT License # (see spyderlib/__init__.py for details) """Internal shell widget : PythonShellWidget + Interpreter""" # pylint: disable=C0103 # pylint: disable=R0903 # pylint: disable=R0911 # pylint: disable=R0201 #FIXME: Internal shell MT: for i in range(100000): print i -> bug #----Builtins* from spyderlib.py3compat import builtins from spyderlib.widgets.objecteditor import oedit builtins.oedit = oedit import os import threading from time import time from subprocess import Popen from spyderlib.qt.QtGui import QMessageBox from spyderlib.qt.QtCore import SIGNAL, QObject, QEventLoop # Local import from spyderlib import get_versions from spyderlib.utils.qthelpers import create_action, get_std_icon from spyderlib.interpreter import Interpreter from spyderlib.utils.dochelpers import getargtxt, getsource, getdoc, getobjdir from spyderlib.utils.misc import get_error_match #TODO: remove the CONF object and make it work anyway # In fact, this 'CONF' object has nothing to do in package spyderlib.widgets # which should not contain anything directly related to Spyder's main app from spyderlib.baseconfig import get_conf_path, _, DEBUG from spyderlib.config import CONF from spyderlib.widgets.shell import PythonShellWidget from spyderlib.py3compat import to_text_string, getcwd, to_binary_string, u def create_banner(message): """Create internal shell banner""" if message is None: versions = get_versions() return 'Python %s %dbits [%s]'\ % (versions['python'], versions['bitness'], versions['system']) else: return message class SysOutput(QObject): """Handle standard I/O queue""" def __init__(self): QObject.__init__(self) self.queue = [] self.lock = threading.Lock() def write(self, val): self.lock.acquire() self.queue.append(val) self.lock.release() self.emit(SIGNAL("void data_avail()")) def empty_queue(self): self.lock.acquire() s = "".join(self.queue) self.queue = [] self.lock.release() return s # We need to add this method to fix Issue 1789 def flush(self): pass # This is needed to fix Issue 2984 @property def closed(self): return False class WidgetProxyData(object): pass class WidgetProxy(QObject): """Handle Shell widget refresh signal""" def __init__(self, input_condition): QObject.__init__(self) self.input_data = None self.input_condition = input_condition def new_prompt(self, prompt): self.emit(SIGNAL("new_prompt(QString)"), prompt) def set_readonly(self, state): self.emit(SIGNAL("set_readonly(bool)"), state) def edit(self, filename, external_editor=False): self.emit(SIGNAL("edit(QString,bool)"), filename, external_editor) def data_available(self): """Return True if input data is available""" return self.input_data is not WidgetProxyData def wait_input(self, prompt=''): self.input_data = WidgetProxyData self.emit(SIGNAL("wait_input(QString)"), prompt) def end_input(self, cmd): self.input_condition.acquire() self.input_data = cmd self.input_condition.notify() self.input_condition.release() class InternalShell(PythonShellWidget): """Shell base widget: link between PythonShellWidget and Interpreter""" def __init__(self, parent=None, namespace=None, commands=[], message=None, max_line_count=300, font=None, exitfunc=None, profile=False, multithreaded=True, light_background=True): PythonShellWidget.__init__(self, parent, get_conf_path('history_internal.py'), profile) self.set_light_background(light_background) self.multithreaded = multithreaded self.setMaximumBlockCount(max_line_count) # For compatibility with ExtPythonShellWidget self.is_ipykernel = False if font is not None: self.set_font(font) # Allow raw_input support: self.input_loop = None self.input_mode = False # KeyboardInterrupt support self.interrupted = False # used only for not-multithreaded mode self.connect(self, SIGNAL("keyboard_interrupt()"), self.keyboard_interrupt) # Code completion / calltips getcfg = lambda option: CONF.get('internal_console', option) case_sensitive = getcfg('codecompletion/case_sensitive') self.set_codecompletion_case(case_sensitive) # keyboard events management self.eventqueue = [] # Init interpreter self.exitfunc = exitfunc self.commands = commands self.message = message self.interpreter = None self.start_interpreter(namespace) # Clear status bar self.emit(SIGNAL("status(QString)"), '') # Embedded shell -- requires the monitor (which installs the # 'open_in_spyder' function in builtins) if hasattr(builtins, 'open_in_spyder'): self.connect(self, SIGNAL("go_to_error(QString)"), self.open_with_external_spyder) #------ Interpreter def start_interpreter(self, namespace): """Start Python interpreter""" self.clear() if self.interpreter is not None: self.interpreter.closing() self.interpreter = Interpreter(namespace, self.exitfunc, SysOutput, WidgetProxy, DEBUG) self.connect(self.interpreter.stdout_write, SIGNAL("void data_avail()"), self.stdout_avail) self.connect(self.interpreter.stderr_write, SIGNAL("void data_avail()"), self.stderr_avail) self.connect(self.interpreter.widget_proxy, SIGNAL("set_readonly(bool)"), self.setReadOnly) self.connect(self.interpreter.widget_proxy, SIGNAL("new_prompt(QString)"), self.new_prompt) self.connect(self.interpreter.widget_proxy, SIGNAL("edit(QString,bool)"), self.edit_script) self.connect(self.interpreter.widget_proxy, SIGNAL("wait_input(QString)"), self.wait_input) if self.multithreaded: self.interpreter.start() # Interpreter banner banner = create_banner(self.message) self.write(banner, prompt=True) # Initial commands for cmd in self.commands: self.run_command(cmd, history=False, new_prompt=False) # First prompt self.new_prompt(self.interpreter.p1) self.emit(SIGNAL("refresh()")) return self.interpreter def exit_interpreter(self): """Exit interpreter""" self.interpreter.exit_flag = True if self.multithreaded: self.interpreter.stdin_write.write(to_binary_string('\n')) self.interpreter.restore_stds() def edit_script(self, filename, external_editor): filename = to_text_string(filename) if external_editor: self.external_editor(filename) else: self.parent().edit_script(filename) def stdout_avail(self): """Data is available in stdout, let's empty the queue and write it!""" data = self.interpreter.stdout_write.empty_queue() if data: self.write(data) def stderr_avail(self): """Data is available in stderr, let's empty the queue and write it!""" data = self.interpreter.stderr_write.empty_queue() if data: self.write(data, error=True) self.flush(error=True) #------Raw input support def wait_input(self, prompt=''): """Wait for input (raw_input support)""" self.new_prompt(prompt) self.setFocus() self.input_mode = True self.input_loop = QEventLoop() self.input_loop.exec_() self.input_loop = None def end_input(self, cmd): """End of wait_input mode""" self.input_mode = False self.input_loop.exit() self.interpreter.widget_proxy.end_input(cmd) #----- Menus, actions, ... def setup_context_menu(self): """Reimplement PythonShellWidget method""" PythonShellWidget.setup_context_menu(self) self.help_action = create_action(self, _("Help..."), icon=get_std_icon('DialogHelpButton'), triggered=self.help) self.menu.addAction(self.help_action) def help(self): """Help on Spyder console""" QMessageBox.about(self, _("Help"), """%s

%s
edit foobar.py

%s
xedit foobar.py

%s
run foobar.py

%s
clear x, y

%s
!ls

%s
object?

%s
result = oedit(object) """ % (_('Shell special commands:'), _('Internal editor:'), _('External editor:'), _('Run script:'), _('Remove references:'), _('System commands:'), _('Python help:'), _('GUI-based editor:'))) #------ External editing def open_with_external_spyder(self, text): """Load file in external Spyder's editor, if available This method is used only for embedded consoles (could also be useful if we ever implement the magic %edit command)""" match = get_error_match(to_text_string(text)) if match: fname, lnb = match.groups() builtins.open_in_spyder(fname, int(lnb)) def external_editor(self, filename, goto=-1): """Edit in an external editor Recommended: SciTE (e.g. to go to line where an error did occur)""" editor_path = CONF.get('internal_console', 'external_editor/path') goto_option = CONF.get('internal_console', 'external_editor/gotoline') try: if goto > 0 and goto_option: Popen(r'%s "%s" %s%d' % (editor_path, filename, goto_option, goto)) else: Popen(r'%s "%s"' % (editor_path, filename)) except OSError: self.write_error("External editor was not found:" " %s\n" % editor_path) #------ I/O def flush(self, error=False, prompt=False): """Reimplement ShellBaseWidget method""" PythonShellWidget.flush(self, error=error, prompt=prompt) if self.interrupted: self.interrupted = False raise KeyboardInterrupt #------ Clear terminal def clear_terminal(self): """Reimplement ShellBaseWidget method""" self.clear() self.new_prompt(self.interpreter.p2 if self.interpreter.more else self.interpreter.p1) #------ Keyboard events def on_enter(self, command): """on_enter""" if self.profile: # Simple profiling test t0 = time() for _ in range(10): self.execute_command(command) self.insert_text(u("\n<Δt>=%dms\n") % (1e2*(time()-t0))) self.new_prompt(self.interpreter.p1) else: self.execute_command(command) self.__flush_eventqueue() def keyPressEvent(self, event): """ Reimplement Qt Method Enhanced keypress event handler """ if self.preprocess_keyevent(event): # Event was accepted in self.preprocess_keyevent return self.postprocess_keyevent(event) def __flush_eventqueue(self): """Flush keyboard event queue""" while self.eventqueue: past_event = self.eventqueue.pop(0) self.postprocess_keyevent(past_event) #------ Command execution def keyboard_interrupt(self): """Simulate keyboard interrupt""" if self.multithreaded: self.interpreter.raise_keyboard_interrupt() else: if self.interpreter.more: self.write_error("\nKeyboardInterrupt\n") self.interpreter.more = False self.new_prompt(self.interpreter.p1) self.interpreter.resetbuffer() else: self.interrupted = True def execute_lines(self, lines): """ Execute a set of lines as multiple command lines: multiple lines of text to be executed as single commands """ for line in lines.splitlines(): stripped_line = line.strip() if stripped_line.startswith('#'): continue self.write(line+os.linesep, flush=True) self.execute_command(line+"\n") self.flush() def execute_command(self, cmd): """ Execute a command cmd: one-line command only, with '\n' at the end """ if self.input_mode: self.end_input(cmd) return if cmd.endswith('\n'): cmd = cmd[:-1] # cls command if cmd == 'cls': self.clear_terminal() return self.run_command(cmd) def run_command(self, cmd, history=True, new_prompt=True): """Run command in interpreter""" if not cmd: cmd = '' else: if history: self.add_to_history(cmd) self.interpreter.stdin_write.write(to_binary_string(cmd + '\n')) if not self.multithreaded: self.interpreter.run_line() self.emit(SIGNAL("refresh()")) #------ Code completion / Calltips def _eval(self, text): """Is text a valid object?""" return self.interpreter.eval(text) def get_dir(self, objtxt): """Return dir(object)""" obj, valid = self._eval(objtxt) if valid: return getobjdir(obj) def get_globals_keys(self): """Return shell globals() keys""" return list(self.interpreter.namespace.keys()) def get_cdlistdir(self): """Return shell current directory list dir""" return os.listdir(getcwd()) def iscallable(self, objtxt): """Is object callable?""" obj, valid = self._eval(objtxt) if valid: return callable(obj) def get_arglist(self, objtxt): """Get func/method argument list""" obj, valid = self._eval(objtxt) if valid: return getargtxt(obj) def get__doc__(self, objtxt): """Get object __doc__""" obj, valid = self._eval(objtxt) if valid: return obj.__doc__ def get_doc(self, objtxt): """Get object documentation dictionary""" obj, valid = self._eval(objtxt) if valid: return getdoc(obj) def get_source(self, objtxt): """Get object source""" obj, valid = self._eval(objtxt) if valid: return getsource(obj) def is_defined(self, objtxt, force_import=False): """Return True if object is defined""" return self.interpreter.is_defined(objtxt, force_import)