
Merge pull request #61 from minrk/parallel-to-ipyparallel
move ipyparallel-specific code to this package
minrk committed Nov 3, 2015
2 parents 81d0319 + e98b6b3 commit 6e4c1f7
Showing 21 changed files with 1,020 additions and 95 deletions.
42 changes: 36 additions & 6 deletions docs/source/development/messages.rst
@@ -67,7 +67,7 @@ Clients use the same socket as engines to start their connections. Connection requests
from clients need no information:

Message type: ``connection_request``::

    content = {}

The reply to a Client registration request contains the connection information for the
@@ -136,7 +136,7 @@ cannot handle number keys. The three keys of each dict are::

    'completed' : messages submitted via any queue that ran on the engine
    'queue' : jobs submitted via MUX queue, whose results have not been received
    'tasks' : tasks that are known to have been submitted to the engine, but
        have not completed. Note that with the pure zmq scheduler, this will
        always be 0/[].
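
For illustration, a minimal sketch of one engine's status dict, with made-up
counts (keys as described above)::

    {
        'completed' : 12, # ran on this engine, via any queue
        'queue' : 2,      # in the MUX queue, results not yet received
        'tasks' : 1,      # submitted as tasks, not yet complete
    }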

@@ -166,7 +166,7 @@ Message type: ``result_request``::

The :func:`result_request` reply contains the content objects of the actual execution
reply messages. If `statusonly=True`, then there will be only the 'pending' and
'completed' lists.


Message type: ``result_reply``::
@@ -181,7 +181,7 @@
        'completed' : ['msg_id','...'], # list of completed msg_ids
    }
    buffers = ['bufs','...'] # the buffers that contained the results of the objects.
                             # this will be empty if no messages are complete, or if
                             # statusonly is True.
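
A hedged sketch of triggering this exchange through the high-level client API
(assumption: ``Client.get_result`` falls back to a ``result_request`` to the
Hub for results it does not hold locally):

.. sourcecode:: python

    import ipyparallel as ipp

    def answer():
        return 42

    rc = ipp.Client()
    ar = rc[0].apply_async(answer)
    # re-fetch by msg_id, e.g. from another client process
    result = rc.get_result(ar.msg_ids[0]).get()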

For memory management purposes, Clients can also instruct the hub to forget the
@@ -223,7 +223,7 @@ The MUX and Control schedulers are simple MonitoredQueue ØMQ devices, with ``ROUTER``
sockets on either side. This allows the queue to relay individual messages to particular
targets via ``zmq.IDENTITY`` routing. The Task scheduler may be a MonitoredQueue ØMQ
device, in which case the client-facing socket is ``ROUTER``, and the engine-facing socket
is ``DEALER``. The result of this is that client-submitted messages are load-balanced via
the ``DEALER`` socket, but the engine's replies to each message go to the requesting client.
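
A minimal sketch of such a device with pyzmq (ports are illustrative; in a
real cluster the controller configures these):

.. sourcecode:: python

    import zmq
    from zmq.devices import ThreadMonitoredQueue

    # ROUTER facing clients, DEALER facing engines, PUB mirroring traffic.
    mq = ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'in', b'out')
    mq.bind_in('tcp://127.0.0.1:5555')   # client-facing ROUTER
    mq.bind_out('tcp://127.0.0.1:5556')  # engine-facing DEALER
    mq.bind_mon('tcp://127.0.0.1:5557')  # every message is also published here
    mq.start()                           # runs the device in a background thread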

Raw ``DEALER`` scheduling is quite primitive, and doesn't allow message introspection, so
@@ -252,7 +252,7 @@ The `Namespace <http://gist.github.com/483294>`_ model suggests that execution b
use the model::

    ns.apply(f, *args, **kwargs)

which takes `f`, a function in the user's namespace, and executes ``f(*args, **kwargs)``
on a remote engine, returning the result (or, for non-blocking, information facilitating
later retrieval of the result). This model, unlike the execute message which just uses a
@@ -291,6 +291,36 @@ Message type: ``apply_reply``::

All engine execution and data movement is performed via apply messages.
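
In practice these messages are generated by the high-level API; a hedged
sketch (assuming a running cluster):

.. sourcecode:: python

    import ipyparallel as ipp

    def add(a, b):
        return a + b

    rc = ipp.Client()
    view = rc[:]                      # DirectView over all engines
    ar = view.apply_async(add, 1, 2)  # one apply_request per engine
    print(ar.get())                   # e.g. [3, 3, 3, 3] on four engines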

Raw Data Publication
********************

``display_data`` lets you publish *representations* of data, such as images and HTML.
The ``data_pub`` message lets you publish *actual raw data*, sent via message buffers.

``data_pub`` messages are constructed via the :func:`ipyparallel.datapub.publish_data` function:

.. sourcecode:: python

    from ipyparallel.datapub import publish_data
    ns = dict(x=my_array)
    publish_data(ns)


Message type: ``data_pub``::

    content = {
        # the keys of the data dict, after it has been unserialized
        'keys' : ['a', 'b']
    }
    # the namespace dict will be serialized in the message buffers,
    # which will have a length of at least one
    buffers = [b'pdict', ...]


A sequence of ``data_pub`` messages for a given parent request should be
interpreted as updates to a single namespace, with later messages overwriting
earlier keys.
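
A minimal sketch of that merging rule, applied to a hypothetical sequence of
already-deserialized ``data_pub`` namespaces:

.. sourcecode:: python

    def merge_data_pub(namespaces):
        """Fold an ordered sequence of data_pub namespace dicts into one."""
        ns = {}
        for update in namespaces:
            ns.update(update)  # later messages overwrite earlier keys
        return ns

    # merge_data_pub([{'x': 1}, {'x': 2, 'y': 3}]) -> {'x': 2, 'y': 3}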


Control Messages
----------------

2 changes: 1 addition & 1 deletion examples/Using Dill.ipynb
@@ -272,7 +272,7 @@
"This is equivalent to\n",
"\n",
"```python\n",
"from ipykernel.pickleutil import use_dill\n",
"from ipyparallel.serialize import use_dill\n",
"use_dill()\n",
"rc[:].apply(use_dill)\n",
"```"
6 changes: 1 addition & 5 deletions ipyparallel/__init__.py
@@ -11,20 +11,16 @@

from traitlets.config.configurable import MultipleInstanceError

-from ipykernel.pickleutil import Reference, can_map


from ._version import version_info, __version__
+from .serialize import *
from .client.asyncresult import *
from .client.client import Client
from .client.remotefunction import *
from .client.view import *
from .controller.dependency import *
from .error import *
from .util import interactive
-from . import pickleutil
-
-pickleutil.enable()

#-----------------------------------------------------------------------------
# Functions
20 changes: 3 additions & 17 deletions ipyparallel/apps/ipengineapp.py
@@ -2,24 +2,10 @@
# encoding: utf-8
"""
The IPython engine application
-Authors:
-* Brian Granger
-* MinRK
"""

-#-----------------------------------------------------------------------------
-# Copyright (C) 2008-2011 The IPython Development Team
-#
-# Distributed under the terms of the BSD License. The full license is in
-# the file COPYING, distributed as part of this software.
-#-----------------------------------------------------------------------------

-#-----------------------------------------------------------------------------
-# Imports
-#-----------------------------------------------------------------------------
+# Copyright (c) IPython Development Team.
+# Distributed under the terms of the Modified BSD License.

import json
import os
@@ -36,7 +22,6 @@
base_flags,
catch_config_error,
)
-from ipykernel.log import EnginePUBHandler
from ipykernel.ipkernel import IPythonKernel as Kernel
from ipykernel.kernelapp import IPKernelApp
from jupyter_client.session import (
@@ -46,6 +31,7 @@

from traitlets.config.configurable import Configurable

+from ipyparallel.engine.log import EnginePUBHandler
from ipyparallel.engine.engine import EngineFactory
from ipyparallel.util import disambiguate_ip_address

2 changes: 1 addition & 1 deletion ipyparallel/client/client.py
@@ -42,7 +42,7 @@
from ipyparallel import util

from jupyter_client.session import Session
-from ipykernel import serialize
+from ipyparallel import serialize

from .asyncresult import AsyncResult, AsyncHubResult
from .view import DirectView, LoadBalancedView
16 changes: 8 additions & 8 deletions ipyparallel/client/view.py
@@ -6,14 +6,12 @@
from __future__ import print_function

import imp
import sys
import warnings
from contextlib import contextmanager
from types import ModuleType

import zmq

-from ipykernel import pickleutil
+from .. import serialize
from traitlets import (
HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
)
@@ -500,16 +498,18 @@ def use_dill(self):
        adds support for closures, etc.

-        This calls ipykernel.pickleutil.use_dill() here and on each engine.
+        This calls ipyparallel.serialize.use_dill() here and on each engine.
        """
-        pickleutil.use_dill()
-        return self.apply(pickleutil.use_dill)
+        serialize.use_dill()
+        return self.apply(serialize.use_dill)

    def use_cloudpickle(self):
        """Expand serialization support with cloudpickle.

+        This calls ipyparallel.serialize.use_cloudpickle() here and on each engine.
        """
-        pickleutil.use_cloudpickle()
-        return self.apply(pickleutil.use_cloudpickle)
+        serialize.use_cloudpickle()
+        return self.apply(serialize.use_cloudpickle)


    @sync_results
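A hedged usage sketch for the two view methods above (assuming a connected
client):

```python
import ipyparallel as ipp

rc = ipp.Client()
rc[:].use_dill()  # enable dill-based serialization locally and on every engine
```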
4 changes: 2 additions & 2 deletions ipyparallel/controller/dependency.py
@@ -18,7 +18,7 @@
from ipyparallel.util import interactive
from ipython_genutils import py3compat
from ipython_genutils.py3compat import string_types
-from ipykernel.pickleutil import can, uncan
+from ipyparallel.serialize import can, uncan

class depend(object):
"""Dependency decorator, for use with tasks.
@@ -81,7 +81,7 @@ def __name__(self):
def _require(*modules, **mapping):
"""Helper for @require decorator."""
from ipyparallel.error import UnmetDependency
from ipykernel.pickleutil import uncan
from ipyparallel.serialize import uncan
user_ns = globals()
for name in modules:
try:
1 change: 1 addition & 0 deletions ipyparallel/datapub.py
@@ -0,0 +1 @@
from .engine.datapub import publish_data
55 changes: 55 additions & 0 deletions ipyparallel/engine/datapub.py
@@ -0,0 +1,55 @@
"""Publishing native (typically pickled) objects."""

# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

from traitlets.config import Configurable
from traitlets import Instance, Dict, CBytes, Any
from ipykernel.jsonutil import json_clean
from ipyparallel.serialize import serialize_object
from jupyter_client.session import Session, extract_header

class ZMQDataPublisher(Configurable):

    topic = CBytes(b'datapub')
    session = Instance(Session, allow_none=True)
    pub_socket = Any(allow_none=True)
    parent_header = Dict({})

    def set_parent(self, parent):
        """Set the parent for outbound messages."""
        self.parent_header = extract_header(parent)

    def publish_data(self, data):
        """publish a data_message on the IOPub channel

        Parameters
        ----------
        data : dict
            The data to be published. Think of it as a namespace.
        """
        session = self.session
        buffers = serialize_object(data,
            buffer_threshold=session.buffer_threshold,
            item_threshold=session.item_threshold,
        )
        content = json_clean(dict(keys=list(data.keys())))
        session.send(self.pub_socket, 'data_message', content=content,
            parent=self.parent_header,
            buffers=buffers,
            ident=self.topic,
        )


def publish_data(data):
    """publish a data_message on the IOPub channel

    Parameters
    ----------
    data : dict
        The data to be published. Think of it as a namespace.
    """
    from ipykernel.zmqshell import ZMQInteractiveShell
    ZMQInteractiveShell.instance().data_pub.publish_data(data)
4 changes: 2 additions & 2 deletions ipyparallel/engine/engine.py
@@ -25,7 +25,7 @@
from ipyparallel.factory import RegistrationFactory
from ipyparallel.util import disambiguate_url

-from ipykernel.ipkernel import IPythonKernel as Kernel
+from .kernel import IPythonParallelKernel as Kernel
from ipykernel.kernelapp import IPKernelApp

class EngineFactory(RegistrationFactory):
@@ -225,7 +225,7 @@ def url(key):
        sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
        sys.displayhook.topic = cast_bytes('engine.%i.execute_result' % self.id)

-        self.kernel = Kernel(parent=self, int_id=self.id, ident=self.ident, session=self.session,
+        self.kernel = Kernel(parent=self, engine_id=self.id, ident=self.ident, session=self.session,
                control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
                loop=loop, user_ns=self.user_ns, log=self.log)
