Skip to content

Commit

Permalink
Merge pull request #71 from minrk/deprecate-parallel
Browse files Browse the repository at this point in the history
Deprecate ipyparallel-specific code
  • Loading branch information
takluyver committed Feb 8, 2016
2 parents 6e67bfd + 07da04e commit 3cf2ab5
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 67 deletions.
3 changes: 3 additions & 0 deletions ipykernel/codeutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

import warnings

# Emit the deprecation at import time; stacklevel=2 attributes the warning
# to the importing module rather than to this line.
warnings.warn(
    "ipykernel.codeutil is deprecated. It has moved to ipyparallel.serialize",
    DeprecationWarning,
    stacklevel=2,
)

import sys
import types
try:
Expand Down
5 changes: 5 additions & 0 deletions ipykernel/datapub.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Publishing native (typically pickled) objects.
"""

import warnings

# Emit the deprecation at import time; stacklevel=2 attributes the warning
# to the importing module rather than to this line.
warnings.warn(
    "ipykernel.datapub is deprecated. It has moved to ipyparallel.datapub",
    DeprecationWarning,
    stacklevel=2,
)

# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

Expand Down Expand Up @@ -53,5 +56,7 @@ def publish_data(data):
data : dict
The data to be published. Think of it as a namespace.
"""
warnings.warn("ipykernel.datapub is deprecated. It has moved to ipyparallel.datapub", DeprecationWarning)

from ipykernel.zmqshell import ZMQInteractiveShell
ZMQInteractiveShell.instance().data_pub.publish_data(data)
45 changes: 35 additions & 10 deletions ipykernel/ipkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from .comm import CommManager
from .kernelbase import Kernel as KernelBase
from .serialize import serialize_object, unpack_apply_message
from .zmqshell import ZMQInteractiveShell


Expand Down Expand Up @@ -51,8 +50,6 @@ def __init__(self, **kwargs):
self.shell.displayhook.topic = self._topic('execute_result')
self.shell.display_pub.session = self.session
self.shell.display_pub.pub_socket = self.iopub_socket
self.shell.data_pub.session = self.session
self.shell.data_pub.pub_socket = self.iopub_socket

# TMP - hack while developing
self.shell._reply_content = None
Expand Down Expand Up @@ -124,6 +121,33 @@ def set_parent(self, ident, parent):
super(IPythonKernel, self).set_parent(ident, parent)
self.shell.set_parent(parent)

def init_metadata(self, parent):
    """Initialize the metadata dict for an execution request.

    Extends the base implementation with fields required by
    ipyparallel < 5.0.
    """
    md = super(IPythonKernel, self).init_metadata(parent)
    # FIXME: remove deprecated ipyparallel-specific code
    # This is required for ipyparallel < 5.0
    md['dependencies_met'] = True
    md['engine'] = self.ident
    return md

def finish_metadata(self, parent, metadata, reply_content):
    """Finish populating the reply metadata after an execution request.

    Copies the reply's status into the metadata and, for the deprecated
    ipyparallel (< 5.0) protocol, flags unmet dependencies.
    """
    # FIXME: remove deprecated ipyparallel-specific code
    # This is required by ipyparallel < 5.0
    status = reply_content['status']
    metadata['status'] = status
    if status == 'error' and reply_content['ename'] == 'UnmetDependency':
        metadata['dependencies_met'] = False
    return metadata

def _forward_input(self, allow_stdin=False):
"""Forward raw_input and getpass to the current frontend.
Expand Down Expand Up @@ -198,10 +222,11 @@ def do_execute(self, code, silent, store_history=True,
# runlines. We'll need to clean up this logic later.
if shell._reply_content is not None:
reply_content.update(shell._reply_content)
e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
reply_content['engine_info'] = e_info
# reset after use
shell._reply_content = None
# FIXME: deprecate piece for ipyparallel:
e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
reply_content['engine_info'] = e_info

if 'traceback' in reply_content:
self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
Expand Down Expand Up @@ -289,6 +314,7 @@ def do_is_complete(self, code):
return r

def do_apply(self, content, bufs, msg_id, reply_metadata):
from .serialize import serialize_object, unpack_apply_message
shell = self.shell
try:
working = shell.user_ns
Expand Down Expand Up @@ -328,18 +354,17 @@ def do_apply(self, content, bufs, msg_id, reply_metadata):
reply_content = {}
if shell._reply_content is not None:
reply_content.update(shell._reply_content)
e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
reply_content['engine_info'] = e_info
# reset after use
shell._reply_content = None

# FIXME: deprecate piece for ipyparallel:
e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
reply_content['engine_info'] = e_info

self.send_response(self.iopub_socket, u'error', reply_content,
ident=self._topic('error'))
self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
result_buf = []

if reply_content['ename'] == 'UnmetDependency':
reply_metadata['dependencies_met'] = False
else:
reply_content = {'status' : 'ok'}

Expand Down
119 changes: 65 additions & 54 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,25 +112,29 @@ def _ident_default(self):
# Track execution count here. For IPython, we override this to use the
# execution count we store in the shell.
execution_count = 0


# Shell-channel message types; __init__ registers a handler for each entry
# by looking up a method of the same name on the kernel instance.
# NOTE(review): presumably these are class attributes of Kernel — the
# enclosing class header is outside this view; confirm indentation.
msg_types = [
    'execute_request', 'complete_request',
    'inspect_request', 'history_request',
    'comm_info_request', 'kernel_info_request',
    'connect_request', 'shutdown_request',
    'is_complete_request',
    # deprecated:
    'apply_request',
]
# add deprecated ipyparallel control messages
control_msg_types = msg_types + ['clear_request', 'abort_request']

def __init__(self, **kwargs):
    """Set up the handler tables for shell and control messages.

    Handlers are resolved from the class-level ``msg_types`` and
    ``control_msg_types`` lists: each entry names a method on this
    instance that handles that message type.
    """
    # The scraped diff left both the old local msg_types lists and the new
    # class-attribute loops in place (two consecutive `for` headers — a
    # syntax error); this is the resolved, post-change form.
    super(Kernel, self).__init__(**kwargs)

    # Build dict of handlers for message types
    self.shell_handlers = {}
    for msg_type in self.msg_types:
        self.shell_handlers[msg_type] = getattr(self, msg_type)

    self.control_handlers = {}
    for msg_type in self.control_msg_types:
        self.control_handlers[msg_type] = getattr(self, msg_type)


Expand Down Expand Up @@ -165,6 +169,25 @@ def dispatch_control(self, msg):
sys.stderr.flush()
self._publish_status(u'idle')

def should_handle(self, stream, msg, idents):
    """Decide whether a shell-channel message should be dispatched.

    Subclasses may override to filter messages. The base implementation
    drops requests whose msg_id has been aborted, sending a reply with
    status ``'aborted'`` in place of handling them.

    Returns True to handle the message, False to skip it.
    """
    header = msg['header']
    if header['msg_id'] not in self.aborted:
        return True
    # Acknowledge the aborted request with a '<type>_reply' instead of
    # executing it.  (Is it safe to assume a msg_id is never resubmitted?)
    self.aborted.remove(header['msg_id'])
    reply_type = header['msg_type'].split('_')[0] + '_reply'
    status = {'status': 'aborted'}
    md = {'engine': self.ident, 'status': 'aborted'}
    self.session.send(stream, reply_type, metadata=md,
                      content=status, parent=msg, ident=idents)
    return False

def dispatch_shell(self, stream, msg):
"""dispatch shell requests"""
# flush control requests first
Expand Down Expand Up @@ -192,15 +215,7 @@ def dispatch_shell(self, stream, msg):
self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
self.log.debug(' Content: %s\n --->\n ', msg['content'])

if msg_id in self.aborted:
self.aborted.remove(msg_id)
# is it safe to assume a msg_id will not be resubmitted?
reply_type = msg_type.split('_')[0] + '_reply'
status = {'status' : 'aborted'}
md = {'engine' : self.ident}
md.update(status)
self.session.send(stream, reply_type, metadata=md,
content=status, parent=msg, ident=idents)
if not self.should_handle(stream, msg, idents):
return

handler = self.shell_handlers.get(msg_type, None)
Expand Down Expand Up @@ -289,17 +304,6 @@ def record_ports(self, ports):
# Kernel request handlers
#---------------------------------------------------------------------------

def _make_metadata(self, other=None):
    """Build the base metadata dict for execute/apply replies.

    Starts from the default fields and overlays ``other`` (e.g. the
    request's own metadata) when provided.
    """
    md = {
        'dependencies_met': True,
        'engine': self.ident,
        'started': datetime.now(),
    }
    md.update(other or {})
    return md

def _publish_execute_input(self, code, parent, execution_count):
"""Publish the code request on the iopub stream."""

Expand Down Expand Up @@ -341,6 +345,22 @@ def send_response(self, stream, msg_or_type, content=None, ident=None,
"""
return self.session.send(stream, msg_or_type, content, self._parent_header,
ident, buffers, track, header, metadata)

def init_metadata(self, parent):
    """Create the metadata dict for an execution request reply.

    Called at the beginning of every execution request; records the
    start timestamp.  Subclasses may extend the returned dict.
    """
    md = {'started': datetime.now()}
    return md

def finish_metadata(self, parent, metadata, reply_content):
    """Hook run after completing an execution request.

    The base implementation returns ``metadata`` unchanged; subclasses
    override to add reply-dependent fields.
    """
    return metadata

def execute_request(self, stream, ident, parent):
"""handle an execute_request"""
Expand All @@ -359,7 +379,7 @@ def execute_request(self, stream, ident, parent):

stop_on_error = content.get('stop_on_error', True)

md = self._make_metadata(parent['metadata'])
metadata = self.init_metadata(parent)

# Re-broadcast our input for the benefit of listening clients, and
# start computing output
Expand All @@ -381,14 +401,10 @@ def execute_request(self, stream, ident, parent):

# Send the reply.
reply_content = json_clean(reply_content)

md['status'] = reply_content['status']
if reply_content['status'] == 'error' and \
reply_content['ename'] == 'UnmetDependency':
md['dependencies_met'] = False
metadata = self.finish_metadata(parent, metadata, reply_content)

reply_msg = self.session.send(stream, u'execute_reply',
reply_content, parent, metadata=md,
reply_content, parent, metadata=metadata,
ident=ident)

self.log.debug("%s", reply_msg)
Expand Down Expand Up @@ -533,10 +549,11 @@ def do_is_complete(self, code):
}

#---------------------------------------------------------------------------
# Engine methods
# Engine methods (DEPRECATED)
#---------------------------------------------------------------------------

def apply_request(self, stream, ident, parent):
self.log.warn("""apply_request is deprecated in kernel_base, moving to ipyparallel.""")
try:
content = parent[u'content']
bufs = parent[u'buffers']
Expand All @@ -545,31 +562,30 @@ def apply_request(self, stream, ident, parent):
self.log.error("Got bad msg: %s", parent, exc_info=True)
return

md = self._make_metadata(parent['metadata'])
md = self.init_metadata(parent)

reply_content, result_buf = self.do_apply(content, bufs, msg_id, md)

# put 'ok'/'error' status in header, for scheduler introspection:
md['status'] = reply_content['status']


# flush i/o
sys.stdout.flush()
sys.stderr.flush()

md = self.finish_metadata(parent, md, reply_content)

self.session.send(stream, u'apply_reply', reply_content,
parent=parent, ident=ident,buffers=result_buf, metadata=md)

def do_apply(self, content, bufs, msg_id, reply_metadata):
    """DEPRECATED: override in subclasses to support ipyparallel's
    apply protocol.  The base implementation is not provided.

    Raises
    ------
    NotImplementedError
        Always, in the base class.
    """
    # The merged diff left both the old and new docstrings stacked here;
    # this keeps a single docstring with the deprecation notice.
    raise NotImplementedError

#---------------------------------------------------------------------------
# Control messages
# Control messages (DEPRECATED)
#---------------------------------------------------------------------------

def abort_request(self, stream, ident, parent):
"""abort a specific msg by id"""
self.log.warn("abort_request is deprecated in kernel_base. It os only part of IPython parallel")
msg_ids = parent['content'].get('msg_ids', None)
if isinstance(msg_ids, string_types):
msg_ids = [msg_ids]
Expand All @@ -585,15 +601,13 @@ def abort_request(self, stream, ident, parent):

def clear_request(self, stream, idents, parent):
    """Clear our namespace (DEPRECATED: ipyparallel-only message).

    Delegates to ``do_clear`` and sends a 'clear_reply' with its result.
    """
    # logging.warn is a deprecated alias of warning; also fixes the
    # "It os only" typo in the message.
    self.log.warning("clear_request is deprecated in kernel_base. It is only part of IPython parallel")
    content = self.do_clear()
    self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
                      content=content)

def do_clear(self):
    """DEPRECATED: override in subclasses to clear the namespace.

    Only required by ipyparallel.

    Raises
    ------
    NotImplementedError
        Always, in the base class.
    """
    # The merged diff left both the old and new docstrings stacked here;
    # this keeps a single docstring with the deprecation notice.
    raise NotImplementedError

#---------------------------------------------------------------------------
Expand All @@ -602,10 +616,7 @@ def do_clear(self):

def _topic(self, topic):
    """Return the prefixed topic (as bytes) for IOPub messages."""
    # The merged diff left the old ipyparallel if/else (engine.%i form) in
    # place, but it was dead code: base was unconditionally reassigned to
    # the kernel form below.  Keep only the live assignment.
    base = "kernel.%s" % self.ident

    return py3compat.cast_bytes("%s.%s" % (base, topic))

Expand Down
3 changes: 3 additions & 0 deletions ipykernel/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

from zmq.log.handlers import PUBHandler

import warnings

# Emit the deprecation at import time; stacklevel=2 attributes the warning
# to the importing module rather than to this line.
warnings.warn(
    "ipykernel.log is deprecated. It has moved to ipyparallel.engine.log",
    DeprecationWarning,
    stacklevel=2,
)

class EnginePUBHandler(PUBHandler):
"""A simple PUBHandler subclass that sets root_topic"""
engine=None
Expand Down
3 changes: 3 additions & 0 deletions ipykernel/pickleutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

import warnings

# Emit the deprecation at import time; stacklevel=2 attributes the warning
# to the importing module rather than to this line.
warnings.warn(
    "ipykernel.pickleutil is deprecated. It has moved to ipyparallel.",
    DeprecationWarning,
    stacklevel=2,
)

import copy
import logging
import sys
Expand Down
3 changes: 3 additions & 0 deletions ipykernel/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

import warnings

# Emit the deprecation at import time; stacklevel=2 attributes the warning
# to the importing module rather than to this line.
warnings.warn(
    "ipykernel.serialize is deprecated. It has moved to ipyparallel.serialize",
    DeprecationWarning,
    stacklevel=2,
)

try:
import cPickle
pickle = cPickle
Expand Down
Loading

0 comments on commit 3cf2ab5

Please sign in to comment.