Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate ipyparallel-specific code #71

Merged
merged 5 commits into from
Feb 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ipykernel/codeutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

import warnings
warnings.warn("ipykernel.codeutil is deprecated. It has moved to ipyparallel.serialize", DeprecationWarning)

import sys
import types
try:
Expand Down
5 changes: 5 additions & 0 deletions ipykernel/datapub.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Publishing native (typically pickled) objects.
"""

import warnings
warnings.warn("ipykernel.datapub is deprecated. It has moved to ipyparallel.datapub", DeprecationWarning)

# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

Expand Down Expand Up @@ -54,5 +57,7 @@ def publish_data(data):
data : dict
The data to be published. Think of it as a namespace.
"""
warnings.warn("ipykernel.datapub is deprecated. It has moved to ipyparallel.datapub", DeprecationWarning)

from ipykernel.zmqshell import ZMQInteractiveShell
ZMQInteractiveShell.instance().data_pub.publish_data(data)
45 changes: 35 additions & 10 deletions ipykernel/ipkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from .comm import CommManager
from .kernelbase import Kernel as KernelBase
from .serialize import serialize_object, unpack_apply_message
from .zmqshell import ZMQInteractiveShell


Expand Down Expand Up @@ -51,8 +50,6 @@ def __init__(self, **kwargs):
self.shell.displayhook.topic = self._topic('execute_result')
self.shell.display_pub.session = self.session
self.shell.display_pub.pub_socket = self.iopub_socket
self.shell.data_pub.session = self.session
self.shell.data_pub.pub_socket = self.iopub_socket

# TMP - hack while developing
self.shell._reply_content = None
Expand Down Expand Up @@ -124,6 +121,33 @@ def set_parent(self, ident, parent):
super(IPythonKernel, self).set_parent(ident, parent)
self.shell.set_parent(parent)

def init_metadata(self, parent):
    """Build the metadata dict for an execution request.

    Called at the beginning of each execution request. Extends the base
    class metadata with fields that ipyparallel < 5.0 expects.
    """
    metadata = super(IPythonKernel, self).init_metadata(parent)
    # FIXME: remove deprecated ipyparallel-specific code
    # These two keys are required for ipyparallel < 5.0
    metadata['dependencies_met'] = True
    metadata['engine'] = self.ident
    return metadata

def finish_metadata(self, parent, metadata, reply_content):
    """Finish populating metadata.

    Run after completing an execution request. Copies the reply status
    into the metadata and, for ipyparallel < 5.0 compatibility, flags
    unmet dependencies.

    Parameters
    ----------
    parent : dict
        The parent request message (unused here).
    metadata : dict
        The metadata dict created by init_metadata; mutated in place.
    reply_content : dict
        The reply content; must contain a 'status' key.

    Returns
    -------
    dict
        The updated metadata dict.
    """
    # FIXME: remove deprecated ipyparallel-specific code
    # This is required by ipyparallel < 5.0
    metadata['status'] = reply_content['status']
    # Use .get() so an error reply that lacks 'ename' does not raise
    # KeyError while we are assembling the reply.
    if (reply_content['status'] == 'error'
            and reply_content.get('ename') == 'UnmetDependency'):
        metadata['dependencies_met'] = False

    return metadata

def _forward_input(self, allow_stdin=False):
"""Forward raw_input and getpass to the current frontend.

Expand Down Expand Up @@ -198,10 +222,11 @@ def do_execute(self, code, silent, store_history=True,
# runlines. We'll need to clean up this logic later.
if shell._reply_content is not None:
reply_content.update(shell._reply_content)
e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
reply_content['engine_info'] = e_info
# reset after use
shell._reply_content = None
# FIXME: deprecate piece for ipyparallel:
e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
reply_content['engine_info'] = e_info

if 'traceback' in reply_content:
self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
Expand Down Expand Up @@ -289,6 +314,7 @@ def do_is_complete(self, code):
return r

def do_apply(self, content, bufs, msg_id, reply_metadata):
from .serialize import serialize_object, unpack_apply_message
shell = self.shell
try:
working = shell.user_ns
Expand Down Expand Up @@ -328,18 +354,17 @@ def do_apply(self, content, bufs, msg_id, reply_metadata):
reply_content = {}
if shell._reply_content is not None:
reply_content.update(shell._reply_content)
e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
reply_content['engine_info'] = e_info
# reset after use
shell._reply_content = None

# FIXME: deprecate piece for ipyparallel:
e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
reply_content['engine_info'] = e_info

self.send_response(self.iopub_socket, u'error', reply_content,
ident=self._topic('error'))
self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
result_buf = []

if reply_content['ename'] == 'UnmetDependency':
reply_metadata['dependencies_met'] = False
else:
reply_content = {'status' : 'ok'}

Expand Down
119 changes: 65 additions & 54 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,25 +111,29 @@ def _ident_default(self):
# Track execution count here. For IPython, we override this to use the
# execution count we store in the shell.
execution_count = 0


msg_types = [
'execute_request', 'complete_request',
'inspect_request', 'history_request',
'comm_info_request', 'kernel_info_request',
'connect_request', 'shutdown_request',
'is_complete_request',
# deprecated:
'apply_request',
]
# add deprecated ipyparallel control messages
control_msg_types = msg_types + ['clear_request', 'abort_request']

def __init__(self, **kwargs):
super(Kernel, self).__init__(**kwargs)

# Build dict of handlers for message types
msg_types = [ 'execute_request', 'complete_request',
'inspect_request', 'history_request',
'comm_info_request', 'kernel_info_request',
'connect_request', 'shutdown_request',
'apply_request', 'is_complete_request',
]
self.shell_handlers = {}
for msg_type in msg_types:
for msg_type in self.msg_types:
self.shell_handlers[msg_type] = getattr(self, msg_type)

control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
self.control_handlers = {}
for msg_type in control_msg_types:
for msg_type in self.control_msg_types:
self.control_handlers[msg_type] = getattr(self, msg_type)


Expand Down Expand Up @@ -164,6 +168,25 @@ def dispatch_control(self, msg):
sys.stderr.flush()
self._publish_status(u'idle')

def should_handle(self, stream, msg, idents):
    """Return True if a shell-channel message should be handled.

    Allows subclasses to prevent handling of certain messages (e.g.
    aborted requests). Note: this hook is consulted only for the shell
    channel; the control channel does not check it.

    If the message was previously aborted, a ``<type>_reply`` with
    status 'aborted' is sent back on *stream* and False is returned.
    """
    msg_id = msg['header']['msg_id']
    if msg_id not in self.aborted:
        return True

    # is it safe to assume a msg_id will not be resubmitted?
    self.aborted.remove(msg_id)
    msg_type = msg['header']['msg_type']
    reply_type = msg_type.split('_')[0] + '_reply'
    status = {'status': 'aborted'}
    md = {'engine': self.ident}
    md.update(status)
    self.session.send(stream, reply_type, metadata=md,
                      content=status, parent=msg, ident=idents)
    return False

def dispatch_shell(self, stream, msg):
"""dispatch shell requests"""
# flush control requests first
Expand Down Expand Up @@ -191,15 +214,7 @@ def dispatch_shell(self, stream, msg):
self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
self.log.debug(' Content: %s\n --->\n ', msg['content'])

if msg_id in self.aborted:
self.aborted.remove(msg_id)
# is it safe to assume a msg_id will not be resubmitted?
reply_type = msg_type.split('_')[0] + '_reply'
status = {'status' : 'aborted'}
md = {'engine' : self.ident}
md.update(status)
self.session.send(stream, reply_type, metadata=md,
content=status, parent=msg, ident=idents)
if not self.should_handle(stream, msg, idents):
return

handler = self.shell_handlers.get(msg_type, None)
Expand Down Expand Up @@ -288,17 +303,6 @@ def record_ports(self, ports):
# Kernel request handlers
#---------------------------------------------------------------------------

def _make_metadata(self, other=None):
"""init metadata dict, for execute/apply_reply"""
new_md = {
'dependencies_met' : True,
'engine' : self.ident,
'started': datetime.now(),
}
if other:
new_md.update(other)
return new_md

def _publish_execute_input(self, code, parent, execution_count):
"""Publish the code request on the iopub stream."""

Expand Down Expand Up @@ -340,6 +344,22 @@ def send_response(self, stream, msg_or_type, content=None, ident=None,
"""
return self.session.send(stream, msg_or_type, content, self._parent_header,
ident, buffers, track, header, metadata)

def init_metadata(self, parent):
    """Create the initial metadata dict for an execution request.

    Run at the beginning of execution requests; records the wall-clock
    start time. Subclasses may extend the returned dict.
    """
    md = {}
    md['started'] = datetime.now()
    return md

def finish_metadata(self, parent, metadata, reply_content):
    """Finalize metadata after an execution request completes.

    The base implementation returns *metadata* unchanged; subclasses
    may override to add fields based on the reply content.
    """
    return metadata

def execute_request(self, stream, ident, parent):
"""handle an execute_request"""
Expand All @@ -358,7 +378,7 @@ def execute_request(self, stream, ident, parent):

stop_on_error = content.get('stop_on_error', True)

md = self._make_metadata(parent['metadata'])
metadata = self.init_metadata(parent)

# Re-broadcast our input for the benefit of listening clients, and
# start computing output
Expand All @@ -380,14 +400,10 @@ def execute_request(self, stream, ident, parent):

# Send the reply.
reply_content = json_clean(reply_content)

md['status'] = reply_content['status']
if reply_content['status'] == 'error' and \
reply_content['ename'] == 'UnmetDependency':
md['dependencies_met'] = False
metadata = self.finish_metadata(parent, metadata, reply_content)

reply_msg = self.session.send(stream, u'execute_reply',
reply_content, parent, metadata=md,
reply_content, parent, metadata=metadata,
ident=ident)

self.log.debug("%s", reply_msg)
Expand Down Expand Up @@ -532,10 +548,11 @@ def do_is_complete(self, code):
}

#---------------------------------------------------------------------------
# Engine methods
# Engine methods (DEPRECATED)
#---------------------------------------------------------------------------

def apply_request(self, stream, ident, parent):
self.log.warn("""apply_request is deprecated in kernel_base, moving to ipyparallel.""")
try:
content = parent[u'content']
bufs = parent[u'buffers']
Expand All @@ -544,31 +561,30 @@ def apply_request(self, stream, ident, parent):
self.log.error("Got bad msg: %s", parent, exc_info=True)
return

md = self._make_metadata(parent['metadata'])
md = self.init_metadata(parent)

reply_content, result_buf = self.do_apply(content, bufs, msg_id, md)

# put 'ok'/'error' status in header, for scheduler introspection:
md['status'] = reply_content['status']


# flush i/o
sys.stdout.flush()
sys.stderr.flush()

md = self.finish_metadata(parent, md, reply_content)

self.session.send(stream, u'apply_reply', reply_content,
parent=parent, ident=ident,buffers=result_buf, metadata=md)

def do_apply(self, content, bufs, msg_id, reply_metadata):
"""Override in subclasses to support the IPython parallel framework.
"""
"""DEPRECATED"""
raise NotImplementedError

#---------------------------------------------------------------------------
# Control messages
# Control messages (DEPRECATED)
#---------------------------------------------------------------------------

def abort_request(self, stream, ident, parent):
"""abort a specific msg by id"""
self.log.warn("abort_request is deprecated in kernel_base. It os only part of IPython parallel")
msg_ids = parent['content'].get('msg_ids', None)
if isinstance(msg_ids, string_types):
msg_ids = [msg_ids]
Expand All @@ -584,15 +600,13 @@ def abort_request(self, stream, ident, parent):

def clear_request(self, stream, idents, parent):
    """Clear our namespace (DEPRECATED, ipyparallel-only message).

    Warns that the message type is deprecated, delegates the actual
    clearing to do_clear(), and sends a 'clear_reply' with its result.
    """
    # Fix typo in the deprecation message ("os" -> "is").
    self.log.warn("clear_request is deprecated in kernel_base. It is only part of IPython parallel")
    content = self.do_clear()
    self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
                      content=content)

def do_clear(self):
"""Override in subclasses to clear the namespace

This is only required for IPython.parallel.
"""
"""DEPRECATED"""
raise NotImplementedError

#---------------------------------------------------------------------------
Expand All @@ -601,10 +615,7 @@ def do_clear(self):

def _topic(self, topic):
"""prefixed topic for IOPub messages"""
if self.int_id >= 0:
base = "engine.%i" % self.int_id
else:
base = "kernel.%s" % self.ident
base = "kernel.%s" % self.ident

return py3compat.cast_bytes("%s.%s" % (base, topic))

Expand Down
3 changes: 3 additions & 0 deletions ipykernel/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

from zmq.log.handlers import PUBHandler

import warnings
warnings.warn("ipykernel.log is deprecated. It has moved to ipyparallel.engine.log", DeprecationWarning)

class EnginePUBHandler(PUBHandler):
"""A simple PUBHandler subclass that sets root_topic"""
engine=None
Expand Down
3 changes: 3 additions & 0 deletions ipykernel/pickleutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

import warnings
warnings.warn("ipykernel.pickleutil is deprecated. It has moved to ipyparallel.", DeprecationWarning)

import copy
import logging
import sys
Expand Down
3 changes: 3 additions & 0 deletions ipykernel/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

import warnings
warnings.warn("ipykernel.serialize is deprecated. It has moved to ipyparallel.serialize", DeprecationWarning)

try:
import cPickle
pickle = cPickle
Expand Down
Loading