How do I set up a D-Bus Tube?

Let's first look at what conceptually happens to make activity sharing work. The diagram below shows two instances of the same activity: the "Sharing Activity" and the "Joining Activity". The sharing activity is the activity that is initially run and shared with other buddies. The "Joining Activity" refers to other instances of the activity that are created once buddies decide to join an activity that has been shared. You can allow an activity to be shared (making it a sharing activity) by including the standard Sugar Activity Toolbar.

 

Once the user decides to share an activity with others, that instance becomes the sharing activity. It receives a "shared" signal from the connection manager indicating that it has been shared. The sharing activity must then save information about the channel on which the tube will exist and call the OfferDBusTube() method to offer a tube that other XOs can access when they join the activity. The process is similar for XOs that join an activity, except that they call the ListTubes() method instead of OfferDBusTube() to find a tube that the sharing activity has already offered.

Once a tube has been set up by the underlying connection manager, both the sharing and joining XOs get a "NewTube" signal. Upon receiving this signal, each activity can instantiate a Tube class that has been designed and implemented for its specific needs. The code example below shows how all of the steps above are actually accomplished (only the code directly relevant to tube setup has been included).

...
import telepathy
from dbus.service import method, signal
from dbus.gobject_service import ExportedGObject
from sugar.presence import presenceservice
from sugar.presence.tubeconn import TubeConnection
from sugar import profile
...

SERVICE = "org.laptop.Sample"
IFACE = SERVICE
PATH = "/org/laptop/Sample"

class SampleActivity(activity.Activity):
    
    _ps = presenceservice.get_instance()
    
    ############################ METHODS INVOLVED IN TUBE SETUP ##########################################

    #### Method: __init__, initialize this SampleActivity instance
    def __init__(self, handle):
        activity.Activity.__init__(self, handle)
        ...
        #When you initialize your activity, wait for receipt of "joined" or "shared" signals
        #and also keep a variable called self.initiating to indicate whether this instance
        #is the sharing or joining activity. 
        self.initiating = None #indicates whether this instance initiated sharing. 
        self.connect('shared', self._shared_cb)
        self.connect('joined', self._joined_cb)


    #### Method self._shared_cb, which is called whenever this activity has been successfully 
    # shared with other XOs on the mesh
    def _shared_cb(self, activity):
        
        #Ensure that this activity is indeed being shared
        if self._shared_activity is None:
            _logger.error("Failed to share or join activity ... _shared_activity is null in _shared_cb()")
            return

        #This activity initiated sharing, so set self.initiating to be true
        self.initiating = True
 
        #Save information about the telepathy connection, telepathy tubes channel and the text channel
        #associated with this shared activity
        self.conn = self._shared_activity.telepathy_conn
        self.tubes_chan = self._shared_activity.telepathy_tubes_chan
        self.text_chan = self._shared_activity.telepathy_text_chan

        #Set up a callback for when we receive the "NewTube" signal later on
        self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal(
             'NewTube', self._new_tube_cb)

        #Call the OfferDBusTube() method to offer a tube for our service
        _id = self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].OfferDBusTube(SERVICE, {})

    #### Method self._joined_cb, which is called whenever this XO has successfully joined another
    # activity. 
    def _joined_cb(self, activity):

        #Ensure that this activity is indeed being shared
        if self._shared_activity is None:
            _logger.error("Failed to share or join activity ... _shared_activity is null in _joined_cb()")
            return

        #This activity joined an existing shared activity, so set self.initiating to False
        self.initiating = False

        #Save information about the telepathy connection, telepathy tubes channel and the text channel
        #associated with this shared activity
        self.conn = self._shared_activity.telepathy_conn
        self.tubes_chan = self._shared_activity.telepathy_tubes_chan
        self.text_chan = self._shared_activity.telepathy_text_chan

        #Set up a callback for when we receive the "NewTube" signal later on
        self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal(
             'NewTube', self._new_tube_cb)

        #For joining activities, call ListTubes and connect to a callback that runs once the set 
        #of shared tubes has been found (_list_tubes_reply_cb). 
        self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].ListTubes(reply_handler=self._list_tubes_reply_cb, error_handler=self._list_tubes_error_cb)


    #### Method _list_tubes_reply_cb, which is called once ListTubes successfully finds
    # tubes available for a joining activity. 
    def _list_tubes_reply_cb(self, tubes):
        for tube_info in tubes:
            self._new_tube_cb(*tube_info)
            
    #### Method _list_tubes_error_cb, which is needed in case there was some error in ListTubes
    # for the joining activity. 
    def _list_tubes_error_cb(self, e):
        print "Error" + str(e)

    #### Method _new_tube_cb, which is called for both joining and sharing activities once a tube
    # is available in the underlying presence system for communication. Several parameters will be 
    # passed to this callback with information about the tube that has been set up. 
    def _new_tube_cb(self, id, initiator, type, service, params, state):
        
        if (type == telepathy.TUBE_TYPE_DBUS and service == SERVICE):
            if state == telepathy.TUBE_STATE_LOCAL_PENDING:
                #Accept the new tube that has been created 
                self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].AcceptDBusTube(id)

            # The tube connection object gives a handle to the new tube. 
            tube_conn = TubeConnection(self.conn, self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES], id, group_iface=self.text_chan[telepathy.CHANNEL_INTERFACE_GROUP])

            # Using the handle saved above, create an instance of your own customized Tube wrapper that 
            # will handle sending and receiving text from the underlying tube connection (ChatTube is defined
            # at the end below this activity class). 
            self.chattube = ChatTube(tube_conn, self.initiating, self.text_received_cb)

    ...
    ... 
    ############################# METHODS USED ONCE TUBE IS SET UP ###################################
    
    #### Method text_received_cb, which is called once a ChatTube instance is set up and that class
    # receives any text over the tube. You will pass this method as an argument to ChatTube.__init__()
    # (see _new_tube_cb() method above). 
    def text_received_cb(self, text):
        self._chat += "\nSomeone Else" + ":: " + text + "\n"
        self._chat_buffer.set_text(self._chat)

    #### Method: _speak_cb, which is called whenever user decides to send whatever chat text
    # he has written in self._chat_input to the public room. 
    def _speak_cb(self, widget, entry):
        nick = profile.get_nick_name()
        nick = nick.upper()
        self._chat += "\n" + nick + ":: " + entry.get_text() + "\n"
        self._chat_buffer.set_text(self._chat)
        if self.chattube is not None:
            self.chattube.SendText(entry.get_text())
        entry.set_text("")
        

######################## CHATTUBE CLASS - WRAPS LOGIC FOR TUBE COMMUNICATION #####################
class ChatTube(ExportedGObject):
    #### Method __init__, which sets up a new ChatTube instance given the underlying tube connection 
    # object (tube). 
    def __init__(self, tube, is_initiator, text_received_cb):
        super(ChatTube, self).__init__(tube, PATH)
        self.tube = tube
        self.is_initiator = is_initiator #is this the sharing or joining activity?
        self.text_received_cb = text_received_cb #callback in main activity when text is received
        self.text = ''
 
        #The sendtext_cb method is called once someone else has sent text to this end of the tube. 
        self.tube.add_signal_receiver(self.sendtext_cb, 'SendText', IFACE, path=PATH, sender_keyword='sender')

    #### Method sendtext_cb, which is called once this tube receives text that is
    # sent by someone else
    def sendtext_cb(self, text, sender=None):
        if sender == self.tube.get_unique_name():
            return
        self.text = text
        self.text_received_cb(text)

    #### Method SendText, which uses a DBus signal to actually send text to others over the tube. 
    @signal(dbus_interface=IFACE, signature='s')
    def SendText(self, text):
        self.text = text
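
The decision made inside _new_tube_cb() can be looked at in isolation. The sketch below is only an illustration, not part of the Sugar or Telepathy API: the constants are stand-ins whose values are assumed here (real code should use the ones exported by the telepathy module), and plan_for_tube() and plan_for_tube_list() are hypothetical helper names. It shows which tubes a callback like the one above would accept and wrap, and which it would ignore.

```python
# Stand-in constants for illustration only (assumed values); real code
# should use telepathy.TUBE_TYPE_DBUS, telepathy.TUBE_STATE_LOCAL_PENDING, etc.
TUBE_TYPE_DBUS = 0
TUBE_TYPE_STREAM = 1
TUBE_STATE_LOCAL_PENDING = 0
TUBE_STATE_OPEN = 2

SERVICE = "org.laptop.Sample"


def plan_for_tube(tube_type, service, state):
    """Return the actions a callback like _new_tube_cb would take for one tube."""
    # Tubes of another type, or offered by another service, are ignored.
    if tube_type != TUBE_TYPE_DBUS or service != SERVICE:
        return []
    actions = []
    # A tube that is still waiting for us must be accepted first.
    if state == TUBE_STATE_LOCAL_PENDING:
        actions.append('accept')
    # In either case the tube is then wrapped (TubeConnection + ChatTube).
    actions.append('wrap')
    return actions


def plan_for_tube_list(tubes):
    """Apply plan_for_tube to (id, initiator, type, service, params, state) tuples,
    the same shape that _list_tubes_reply_cb unpacks with *tube_info."""
    plan = {}
    for tube_id, initiator, tube_type, service, params, state in tubes:
        plan[tube_id] = plan_for_tube(tube_type, service, state)
    return plan


example_tubes = [
    (1, 10, TUBE_TYPE_DBUS, SERVICE, {}, TUBE_STATE_LOCAL_PENDING),
    (2, 10, TUBE_TYPE_STREAM, 'some-other-service', {}, TUBE_STATE_OPEN),
    (3, 11, TUBE_TYPE_DBUS, SERVICE, {}, TUBE_STATE_OPEN),
]
example_plan = plan_for_tube_list(example_tubes)
```

Keeping this check free of D-Bus calls makes it easy to exercise without a running connection manager, which may help when debugging why a joining activity never creates its tube wrapper.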

How is data shared between activities through a D-Bus tube?

Communication in Sugar is achieved through several layers of technology, including Sugar's own Presence Service, the D-Bus service for interprocess communication, and various implementations of the Telepathy system used for real-time conversations. To get communication working between instances of an activity, your goal is ultimately to create a D-Bus tube that will handle text messaging between them (there are also stream tubes, which are discussed elsewhere).

The figure below shows a simplified conceptualization of what happens after you set up a D-Bus tube correctly.

 

The activity code for each XO should handle sending text to the tube and also receiving text through a callback method that is supplied to the tube. The Activity developer will also need to define the Tube Object itself, which should be designed to send and receive text appropriately based on the type of sharing that is desired.

Let's consider a simple but concrete example of how a D-Bus tube is used (assuming it has already been set up correctly). The code below is from a chat application where users type text and send it to others who are sharing the activity. Only the minimal code needed to send and receive text is included for clarity. If you want to know how the tube is set up from start to finish, then read the section on setting up a D-Bus tube. The comments should make clear what code does what in this example.


import telepathy
from dbus.service import method, signal
from dbus.gobject_service import ExportedGObject
from sugar.presence import presenceservice
from sugar.presence.tubeconn import TubeConnection
from sugar import profile  # needed by _speak_cb() for profile.get_nick_name()
...
SERVICE = "org.laptop.Sample"
IFACE = SERVICE
PATH = "/org/laptop/Sample"

############## MAIN ACTIVITY CLASS ####################
class SampleActivity(activity.Activity):
    ...
    # Callback method for text received from tube -- update activity state
    def text_received_cb(self, text):
        self._chat += "\nSomeone Else Said" + ":: " + text + "\n"
        self._chat_buffer.set_text(self._chat)

    #### Method: _speak_cb, which is called whenever user decides to send whatever chat text
    # he has written in self._chat_input to the public. 
    def _speak_cb(self, widget, entry):
        nick = profile.get_nick_name()
        nick = nick.upper()
        self._chat += "\n" + nick + ":: " + entry.get_text() + "\n"
        self._chat_buffer.set_text(self._chat)
        if self.chattube is not None:
            self.chattube.SendText(entry.get_text())
        entry.set_text("")

     ...


###################### TUBE CLASS ######################
class ChatTube(ExportedGObject):
    
    def __init__(self, tube, is_initiator, text_received_cb):
        super(ChatTube, self).__init__(tube, PATH)
        ...
        self.text_received_cb = text_received_cb
        self.text = ''
        self.tube.add_signal_receiver(self.sendtext_cb, 'SendText', IFACE, path=PATH, sender_keyword='sender')
        ...        

    # This method is called when the XO receives some text through the D-Bus tube
    def sendtext_cb(self, text, sender=None):
        # Ignore any text that this XO sent to itself. 
        if sender == self.tube.get_unique_name():
            return

        self.text = text
        self.text_received_cb(text)

    # This method is used to actually send text to all other XOs that are sharing. 
    @signal(dbus_interface=IFACE, signature='s')
    def SendText(self, text):
        self.text = text
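
To see why sendtext_cb() has to compare the sender with its own unique name, it can help to model the tube as a simple broadcast bus. The sketch below is an in-memory stand-in written for illustration only: FakeTube and ChatEndpoint are hypothetical names, not Sugar or dbus-python classes, and the assumption is that a signal emitted on the tube is delivered to every participant, including the one that emitted it.

```python
class FakeTube(object):
    """In-memory stand-in for a shared D-Bus tube (not a Sugar class)."""

    def __init__(self):
        self._receivers = []
        self._next_id = 0

    def join(self, callback):
        """Register a signal receiver and hand back a made-up unique name."""
        self._next_id += 1
        self._receivers.append(callback)
        return ':1.%d' % self._next_id

    def emit(self, sender, text):
        """Deliver the signal to every participant, the sender included."""
        for callback in list(self._receivers):
            callback(text, sender=sender)


class ChatEndpoint(object):
    """Plays the role of ChatTube for one XO."""

    def __init__(self, tube, text_received_cb):
        self.tube = tube
        self.text_received_cb = text_received_cb
        self.unique_name = tube.join(self.sendtext_cb)

    def sendtext_cb(self, text, sender=None):
        # Ignore the echo of our own message, as ChatTube.sendtext_cb does.
        if sender == self.unique_name:
            return
        self.text_received_cb(text)

    def send_text(self, text):
        self.tube.emit(self.unique_name, text)


tube = FakeTube()
inbox_a, inbox_b = [], []
endpoint_a = ChatEndpoint(tube, inbox_a.append)
endpoint_b = ChatEndpoint(tube, inbox_b.append)
endpoint_a.send_text('hello')
endpoint_b.send_text('hi there')
```

After the two sends, each inbox holds only the other participant's message. Without the sender check, every XO would see its own text twice: once when _speak_cb() appends it locally and once more when the signal comes back through the tube.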
 

How do I set up a simple stream tube that can send data one-way between two instances of an activity?

As with D-Bus tubes, a series of coordinated steps must happen between a sharing and a joining activity before data can be shared over a tube. The example we discuss here specifically concerns sharing data one way, from a "sharing" activity to a "joining" activity. Stream tubes are especially useful if you want to share non-text data or if you want to take advantage of communication protocols like TCP/IP and UDP to transfer data between instances of an activity running on separate XOs.

Step 1: Understand how to architect your stream tubes to achieve the goals of your activity.

Before trying to create any working code, you should understand exactly what you want your stream tube to do - when and how will sharing of data occur over the tube during the life of your activity? The example we discuss below is predicated on a specific model of communication.

In particular, the code we use in this section is adapted from the Read activity, but it has been incorporated into an example activity called "Annotate". In activities like Read or Annotate, the goal is to set up one-way communication between the "sharing" activity and the "joining" activity so that the joining activity can download and take part in whatever document is being edited by the sharer. The document to be sent over the stream tube is shared immediately upon startup and requires no user action to initiate it.

Given such a paradigm for using our stream tube, we can draw a rough picture of what our communication architecture will look like in this case:

 

Notice that the stream tube exists as a complement to the socket architecture used by Unix systems to facilitate internet communication.

Step 2: Define or identify the classes you will use to serve data and receive data through your stream tube.

In our case, we want the sharing activity to behave like a one-time server while the joining activity is a client that downloads any document it needs on startup. To accomplish serving, we create the following two classes:

...
import telepathy
from dbus.service import method, signal
from dbus.gobject_service import ExportedGObject
from sugar.presence import presenceservice
from sugar.presence.tubeconn import TubeConnection
from sugar import network
...

SERVICE = "org.laptop.Annotate"
IFACE = SERVICE
PATH = "/org/laptop/Annotate"


##########################################################################
# REQUESTHANDLER AND HTTPSERVER WORK TOGETHER TO SERVE DATA OVER STREAM TUBE 
##########################################################################
class AnnotateHTTPRequestHandler(network.ChunkedGlibHTTPRequestHandler):
    #### Method translate_path, which, for our simple activity, just returns the same
    # filepath for a document that will be shared. 
    def translate_path(self, path):
        return self.server._filepath

class AnnotateHTTPServer(network.GlibTCPServer):
    def __init__(self, server_address, filepath):
        self._filepath = filepath
        network.GlibTCPServer.__init__(self, server_address, AnnotateHTTPRequestHandler)

In the example code above, most of the work is done by the superclasses defined in sugar.network, namely ChunkedGlibHTTPRequestHandler and GlibTCPServer. However, if you want some custom behavior in how data is served, then it is a good idea to subclass those or other Server and RequestHandler classes.

On the client side, there must be a class that handles receiving data from a server. In Annotate, this is accomplished with the standard sugar.network.GlibURLDownloader class.
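
The serve-one-file idea can be tried outside Sugar. The following is a rough sketch that uses only the Python 3 standard library in place of the sugar.network classes, so it says nothing about how ChunkedGlibHTTPRequestHandler or GlibURLDownloader behave internally. SingleFileHandler, SingleFileServer and serve_and_fetch are hypothetical names chosen for this illustration. As in AnnotateHTTPRequestHandler, translate_path() ignores the requested path and always returns the one file being shared.

```python
import http.server
import os
import tempfile
import threading
import urllib.request


class SingleFileHandler(http.server.SimpleHTTPRequestHandler):
    def translate_path(self, path):
        # Whatever URL path the client asks for, serve the same file.
        return self.server.filepath

    def log_message(self, format, *args):
        # Keep the example quiet; the default logs every request to stderr.
        pass


class SingleFileServer(http.server.HTTPServer):
    def __init__(self, server_address, filepath):
        self.filepath = filepath
        http.server.HTTPServer.__init__(self, server_address, SingleFileHandler)


def serve_and_fetch(payload):
    """Serve payload from a temporary file and download it over localhost."""
    fd, filepath = tempfile.mkstemp(suffix='.txt')
    with os.fdopen(fd, 'wb') as f:
        f.write(payload)
    # Port 0 lets the operating system pick a free port for this demo.
    server = SingleFileServer(('127.0.0.1', 0), filepath)
    port = server.server_address[1]
    thread = threading.Thread(target=server.serve_forever)
    thread.daemon = True
    thread.start()
    try:
        # Bypass any configured proxy, since the server is on localhost.
        opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
        response = opener.open('http://127.0.0.1:%d/document' % port, timeout=5)
        try:
            return response.read()
        finally:
            response.close()
    finally:
        server.shutdown()
        server.server_close()
        os.remove(filepath)


downloaded = serve_and_fetch(b'sample shared document')
```

In the real activity the client does not connect to the sharer's port directly. It connects to the local address that the stream tube hands back, and the tube carries the traffic between the two XOs.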


Step 3: Coordinate sharing and joining in your activity using stream tubes and the client/server classes defined above.

ANNOTATE_STREAM_SERVICE = 'annotate-activity-http'

class AnnotateActivity(activity.Activity):

    #### Method: __init__, initialize this AnnotateActivity instance
    def __init__(self, handle):

        ...


        #Joining activity needs to know which stream tubes are available for downloading 
        #and if it still needs to download document. 
        self.unused_download_tubes = set() 
        self._want_document = True

        #Set the port number on which sharing activity will serve data
        h = hash(self._activity_id)
        self.port = 1024 + (h % 64511)

        self.connect('shared', self._shared_cb)
        if self._shared_activity:
            # We're joining
            if self.get_shared():
                # Already joined for some reason, just get the document
                self._joined_cb(self)
            else:
                # Wait for a successful join before trying to get the document
                self.connect("joined", self._joined_cb)

        ...

    ######################## SETUP SERVER ON SHARING ACTIVITY ####################################

    #### Method _shared_cb, which is called when this activity is successfully 
    # shared on a mesh channel. 
    def _shared_cb(self, activity):
        # We initiated this activity and have now shared it, so by
        # definition we have the file.
        _logger.debug('SYSTEM: _shared_cb(): Activity being shared')
        self._want_document = False
        self._share_document()

    #### Method: _share_document, which sets up an http server that will serve a specific file to
    # any joining activities. 
    def _share_document(self):
        # Work out the path first so that it can be logged and served
        object_path = os.path.join(self.get_activity_root(), 'data', 'sample-shared-file.doc')
        _logger.debug('SYSTEM: _share_document() -- sharing file ' + str(object_path))

        _logger.debug('SYSTEM: _share_document() -- Starting HTTP server on port '+str(self.port))
        self._fileserver = AnnotateHTTPServer(("", self.port), object_path)

        # Make a tube for the server
        chan = self._shared_activity.telepathy_tubes_chan
        iface = chan[telepathy.CHANNEL_TYPE_TUBES]
        self._fileserver_tube_id = iface.OfferStreamTube(ANNOTATE_STREAM_SERVICE,
                {},
                telepathy.SOCKET_ADDRESS_TYPE_IPV4,
                ('127.0.0.1', dbus.UInt16(self.port)),
                telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, 0)


    ################### COORDINATE TUBE CREATION AND DOWNLOAD IN JOINING ACTIVITY  ##################

    #### Method _joined_cb, which is called when this activity joins another
    # instance
    def _joined_cb(self, also_self):
        self._update_chat_text('SYSTEM', '_joined_cb()')
        self.watch_for_tubes()
        self._want_document = True
        gobject.idle_add(self._get_document)

    #### Method watch_for_tubes, which sets up a callback to _new_tube_cb once
    # a stream tube is made available. 
    def watch_for_tubes(self):
        self._update_chat_text('SYSTEM', 'watch_for_tubes()')
        tubes_chan = self._shared_activity.telepathy_tubes_chan

        tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal('NewTube',
            self._new_tube_cb)
        tubes_chan[telepathy.CHANNEL_TYPE_TUBES].ListTubes(
            reply_handler=self._list_tubes_reply_cb,
            error_handler=self._list_tubes_error_cb)

    def _list_tubes_reply_cb(self, tubes):
        for tube_info in tubes:
            self._new_tube_cb(*tube_info)

    def _list_tubes_error_cb(self, e):
        self._update_chat_text('System', '_list_tubes_error_cb(): ListTubes() failed: '+str(e))

    #### Method _new_tube_cb, which is called once a stream tube is available 
    # to download from
    def _new_tube_cb(self, tube_id, initiator, tube_type, service, params,
                     state):
        self._update_chat_text('SYSTEM', '_new_tube_cb() -- ID:' + str(tube_id) + '; Initiator: ' + str(initiator) + '; tube_type: ' + str(tube_type) + '; Service: ' + str(service))

        # If the available tube is the stream tube we set up in sharing activity, then 
        # let's download from it. 
        if service == ANNOTATE_STREAM_SERVICE:
            self._update_chat_text('SYSTEM', 'I could download from that tube')
            
            # Add the newly found stream tube to the available tubes we can download from
            self.unused_download_tubes.add(tube_id)
            
            # if no download is in progress, let's fetch the document
            if self._want_document:
                gobject.idle_add(self._get_document)


    #### Method _get_document, which sets this activity instance up to start downloading
    # the document from the sharing activity. It is called once a stream tube has been 
    # obtained and saved in self.unused_download_tubes. 
    def _get_document(self):
        self._update_chat_text('SYSTEM', '_get_document()')
        if not self._want_document:
            return False

        # Assign a file path to download to -- where downloaded doc will be saved. 
        path = os.path.join(self.get_activity_root(), 'instance',
                                '%i' % time.time())

        # Pick an available tube we can try to download the document from
        try:
            tube_id = self.unused_download_tubes.pop()
        except (ValueError, KeyError), e:
            self._update_chat_text('System', 'No tubes to get the document from right now: '+str(e))
            return False

        # Avoid trying to download the document multiple times at once
        self._want_document = False
        gobject.idle_add(self._download_document, tube_id, path)
        return False


    #### Method _download_result_cb, which is called once downloading is complete. 
    def _download_result_cb(self, getter, tempfile, suggested_name, tube_id):
        self._update_chat_text('SYSTEM', '_download_result_cb() -- tempfile = ' + tempfile)
        del self.unused_download_tubes

        self._update_chat_text('SYSTEM', "Got document %s (%s) from tube %u"
                      % (tempfile, suggested_name, tube_id))
        self.save()

    #### Method _download_progress_cb, which is called as the file is being downloaded. 
    def _download_progress_cb(self, getter, bytes_downloaded, tube_id):
        # FIXME: signal the expected size somehow, so we can draw a progress
        # bar
        self._update_chat_text('SYSTEM', "Downloaded " + str(bytes_downloaded) + " bytes from tube " + str(tube_id))

    #### Method _download_error_cb, which is called if there was an error downloading. 
    def _download_error_cb(self, getter, err, tube_id):
        _logger.debug("Error getting document from tube %u: %s",
                      tube_id, err)
        self._want_document = True
        gobject.idle_add(self._get_document)

    
    def _download_document(self, tube_id, path):
        # FIXME: should ideally have the CM listen on a Unix socket
        # instead of IPv4 (might be more compatible with Rainbow)
        self._update_chat_text('SYSTEM', '_download_document() -- ' + path)
        chan = self._shared_activity.telepathy_tubes_chan
        iface = chan[telepathy.CHANNEL_TYPE_TUBES]
        addr = iface.AcceptStreamTube(tube_id,
                telepathy.SOCKET_ADDRESS_TYPE_IPV4,
                telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, 0,
                utf8_strings=True)
        _logger.debug('Accepted stream tube: listening address is %r', addr)
        # SOCKET_ADDRESS_TYPE_IPV4 is defined to have addresses of type '(sq)'
        assert isinstance(addr, dbus.Struct)
        assert len(addr) == 2
        assert isinstance(addr[0], str)
        assert isinstance(addr[1], (int, long))
        assert addr[1] > 0 and addr[1] < 65536
        port = int(addr[1])

        getter = network.GlibURLDownloader("http://%s:%d/document"
                                           % (addr[0], port))
        getter.connect("finished", self._download_result_cb, tube_id)
        getter.connect("progress", self._download_progress_cb, tube_id)
        getter.connect("error", self._download_error_cb, tube_id)
        self._update_chat_text('System', "Starting download to "+str(path))
        getter.start(path)
        return False
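
The joining side's bookkeeping (the unused_download_tubes set together with the _want_document flag) is easier to follow when separated from the Telepathy and GLib calls. The sketch below is a simplified model written for illustration: DownloadCoordinator and its method names are hypothetical, and it calls the next step directly where the activity above schedules it with gobject.idle_add().

```python
class DownloadCoordinator(object):
    """Simplified model of the joiner's tube and retry bookkeeping."""

    def __init__(self, start_download):
        self.unused_download_tubes = set()
        self.want_document = True
        # Stand-in for _download_document(); called with the chosen tube id.
        self._start_download = start_download

    def new_tube(self, tube_id):
        """Mirror of _new_tube_cb for a tube offering our stream service."""
        self.unused_download_tubes.add(tube_id)
        if self.want_document:
            self.get_document()

    def get_document(self):
        """Mirror of _get_document: start at most one download at a time."""
        if not self.want_document:
            return False
        try:
            tube_id = self.unused_download_tubes.pop()
        except KeyError:
            # No tube is known yet; a later new_tube() call will retry.
            return False
        self.want_document = False
        self._start_download(tube_id)
        return True

    def download_failed(self):
        """Mirror of _download_error_cb: ask again, using another tube."""
        self.want_document = True
        return self.get_document()


started = []
coordinator = DownloadCoordinator(started.append)
first_try = coordinator.get_document()   # no tubes are known yet
coordinator.new_tube(7)                   # triggers a download from tube 7
coordinator.new_tube(9)                   # remembered, but nothing new starts
retried = coordinator.download_failed()   # falls back to tube 9
```

The flag prevents two downloads from running at once, while the set keeps spare tubes around so that a failed download can be retried from a different one.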

When downloading data from a stream tube, how can I show the download progress?

How do I control the file and path where data is saved when sharing through a stream tube?

Where do I get more information regarding Sugar activity sharing and the technologies that support it?