Development Team/Almanac/Sugar.presence
How do I setup a D-Bus Tube?
Let's first look at what conceptually happens to make activity sharing work. The diagram below shows two instances of the same activity: the "Sharing Activity" and the "Joining Activity". The sharing activity is the activity that is initially run and shared with other buddies. The "Joining Activity" refers to other instances of the activity that are created once buddies decide to join an activity that has been shared. You can allow an activity to be shared (making it a sharing activity) by including the standard Sugar Activity Toolbar.
Once the user decides to share his activity with others, that activity becomes the sharing activity. The activity will receive a "shared" signal from the connection manager indicating the activity has been shared. The sharing activity must then save information about the channel on which the tube will exist and call the OfferDBusTube() method to offer a tube that can be accessed by other XOs that join the activity. The process is similar for XOs that join an activity except they will need to call the ListTubes() method instead of OfferDBusTubes() to find a tube that has been offered by the sharing activity.
Once a tube has been set up by the underlying connection manager, both the sharing and joining XOs get a "NewTube" signal. Upon receiving this signal, each tube can instantiate a new Tube class that has been designed and implemented for the needs of each specific activity. The code example below shows how all of the steps above are actually accomplished (only the code directly relevant to tube setup has been included).
... import telepathy from dbus.service import method, signal from dbus.gobject_service import ExportedGObject from sugar.presence import presenceservice from sugar.presence.tubeconn import TubeConnection from sugar import profile ... SERVICE = "org.laptop.Sample" IFACE = SERVICE PATH = "/org/laptop/Sample" class SampleActivity(activity.Activity): _ps = presenceservice.get_instance() ############################ METHODS INVOLVED IN TUBE SETUP ########################################## #### Method: __init__, initialize this SampleActivity instance def __init__(self, handle): activity.Activity.__init__(self, handle) ... #When you initialize your activity, wait for receipt of "joined" or "shared" signals #and also keep a variable called self.initiating to indicating whether this instance #is the sharing or joining activity. self.initiating = None #indicates whether this instance initiated sharing. self.connect('shared', self._shared_cb) self.connect('joined', self._joined_cb) #### Method self._shared_cb, which is called whenever this activity has been successfully # shared with other XOs on the mesh def _shared_cb(self, activity): #Ensure that this activity is indeed being shared if self._shared_activity is None: _logger.error("Failed to share or join activity ... _shared_activity is null in _shared_cb()") return #This activity initiated sharing, so set self.initiating to be true self.initiating = True #Save information about the telepathy connection, telepathy tubes channel and the text channel #associated with this shared activity self.conn = self._shared_activity.telepathy_conn self.tubes_chan = self._shared_activity.telepathy_tubes_chan self.text_chan = self._shared_activity.telepathy_text_chan #Set up a callback for when we receive the "NewTube" signal later on self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal( 'NewTube', self._new_tube_cb) #Call the OfferDBusTubes() method _id = self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].OfferDBusTube(SERVICE, {}) #### Method self._joined_cb, which is called whenever this XO has successfully joined another # activity. def _joined_cb(self, activity): #Ensure that this activity is indeed being shared if self._shared_activity is None: _logger.error("Failed to share or join activity ... _shared_activity is null in _shared_cb()") return #This activity joined an existing shared activity, so set self.initiating to False self.initiating = False #Save information about the telepathy connection, telepathy tubes channel and the text channel #associated with this shared activity self.conn = self._shared_activity.telepathy_conn self.tubes_chan = self._shared_activity.telepathy_tubes_chan self.text_chan = self._shared_activity.telepathy_text_chan #Set up a callback for when we receive the "NewTube" signal later on self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal( 'NewTube', self._new_tube_cb) #For joining activities, call ListTubes and connect to a callback once a set #shared tubes have been found (list_tubes_reply_cb). self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].ListTubes(reply_handler=self._list_tubes_reply_cb, error_handler=self._list_tubes_error_cb) #### Method _list_tubes_reply_cb, which is called once ListTubes successfully finds # tubes available for a joining activity. def _list_tubes_reply_cb(self, tubes): for tube_info in tubes: self._new_tube_cb(*tube_info) #### Method _list_tubes_error_cb, which is needed in case there was some error in ListTubes # for the joining activity. def _list_tubes_error_cb(self, e): print "Error" + str(e) #### Method _new_tube_cb, which is called for both joining and sharing activities once a tube # is available in the underlying presence system for communication. Several parameters will be # passed to this callback with information about the tube that has been set up. def _new_tube_cb(self, id, initiator, type, service, params, state): if (type == telepathy.TUBE_TYPE_DBUS and service == SERVICE): if state == telepathy.TUBE_STATE_LOCAL_PENDING: #Accept the new tube that has been created self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].AcceptDBusTube(id) # The tube connection object gives a handle to the new tube. tube_conn = TubeConnection(self.conn, self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES], id, group_iface=self.text_chan[telepathy.CHANNEL_INTERFACE_GROUP]) # Using the handle saved above, create an instance of your own customized Tube wrapper that # will handle sending and receiving text from the underlying tube conneciton (ChatTube is defined # at the end below this activity class). self.chattube = ChatTube(tube_conn, self.initiating, self.text_received_cb) ... ... ############################# METHODS USED ONCE TUBE IS SET UP ################################### #### Method text_received_cb, which is called once a ChatTube instance is set up and that class # receives any text over the tube. You will pass this method as an argument to ChatTube.__init__() # (see _new_tube_cb() method above). def text_received_cb(self, text): self._chat += "\nSomeone Else" + ":: " + text + "\n" self._chat_buffer.set_text(self._chat) #### Method: _speak_cb, which is called whenever user decides to send whatever chat text # he has written in self._chat_input to the public room. def _speak_cb(self, widget, entry): nick = profile.get_nick_name() nick = nick.upper() self._chat += "\n" + nick + ":: " + entry.get_text() + "\n" self._chat_buffer.set_text(self._chat) if self.chattube is not None: self.chattube.SendText(entry.get_text()) entry.set_text("") ######################## CHATTUBE CLASS - WRAPS LOGIC FOR TUBE COMMUNICATION ##################### class ChatTube(ExportedGObject): #### Method __init__, which sets up a new ChatTube instance given the underlying tube connection # object (tube). def __init__(self, tube, is_initiator, text_received_cb): super(ChatTube, self).__init__(tube, PATH) self.tube = tube self.is_initiator = is_initiator #is this the sharing or joining activity? self.text_received_cb = text_received_cb #callback in main activity when text is received self.text = '' #The sendtext_cb method is called once someone else has sent text to this end of the tube. self.tube.add_signal_receiver(self.sendtext_cb, 'SendText', IFACE, path=PATH, sender_keyword='sender') #### Method sendtext_cb, which is called once this tube receives text that is # sent by someone else def sendtext_cb(self, text, sender=None): if sender == self.tube.get_unique_name(): return self.text = text self.text_received_cb(text) #### Method SendText, which uses a DBus signal to actually send text to others over the tube. @signal(dbus_interface=IFACE, signature='s') def SendText(self, text): self.text = text
Communication in Sugar is achieved through several layers of technologies, including Sugar's own Presence Service, the D-Bus service for process communication, and various implementations of the telepathy system used for real time conversations. In order to get activity communication working between activities, your goal is to ultimately create a D-Bus tube that will handle text messaging between activities (there are also stream tubes which are discussed elsewhere).
The figure below shows a simplified conceptualization of what happens after you set up a D-Bus tube correctly.
The activity code for each XO should handle sending text to the tube and also receiving text through a callback method that is supplied to the tube. The Activity developer will also need to define the Tube Object itself, which should be designed to send and receive text appropriately based on the type of sharing that is desired.
Let's consider a simple, but concrete example of how a D-Bus tube is used (assuming it has already been setup correctly). The code below is from a chat application where users type text and send it to others who are sharing the activity. Only the minimal code needed to send and receive text is included for clarity. If you want to know how the tube is set up from start to finish, then read the section on setting up a D-Bus tube. The comments should make clear what code does what in this example.
import telepathy from dbus.service import method, signal from dbus.gobject_service import ExportedGObject from sugar.presence import presenceservice from sugar.presence.tubeconn import TubeConnection ... SERVICE = "org.laptop.Sample" IFACE = SERVICE PATH = "/org/laptop/Sample" ############## MAIN ACTIVITY CLASS #################### class SampleActivity(activity.Activity): ... # Callback method for text received from tube -- update activity state def text_received_cb(self, text): self._chat += "\nSomeone Else Said" + ":: " + text + "\n" self._chat_buffer.set_text(self._chat) #### Method: _speak_cb, which is called whenever user decides to send whatever chat text # he has written in self._chat_input to the public. def _speak_cb(self, widget, entry): nick = profile.get_nick_name() nick = nick.upper() self._chat += "\n" + nick + ":: " + entry.get_text() + "\n" self._chat_buffer.set_text(self._chat) if self.chattube is not None: self.chattube.SendText(entry.get_text()) entry.set_text("") ... ###################### TUBE CLASS ###################### class ChatTube(ExportedGObject): def __init__(self, tube, is_initiator, text_received_cb): super(ChatTube, self).__init__(tube, PATH) ... self.text_received_cb = text_received_cb self.text = '' self.tube.add_signal_receiver(self.sendtext_cb, 'SendText', IFACE, path=PATH, sender_keyword='sender') ... # This method is called when the XO receives some text through the D-bus tube def sendtext_cb(self, text, sender=None): # Ignore any text that this XO sent to itself. if sender == self.tube.get_unique_name(): return self.text = text self.text_received_cb(text) # This method is used to actually send text to all other XO's who are sharing. @signal(dbus_interface=IFACE, signature='s') def SendText(self, text): self.text = text
How do I set up a simple stream tube that can send data one-way between two instances of an activity?
As with D-Bus tubes, there are a series of coordinated steps that must happen between a sharing and joining activity before data can be shared over a tube. The example we discuss here specifically concerns sharing data one way - from a "sharing" activity to a "joining" activity. Stream tubes are especially useful if you want to share non-text data or if you want to take advantage of communication protocols like TCP/IP and UDP to transfer data between instances of an activity running on separate XOs.
Step 1: Understand how to architect your stream tubes to achieve the goals of your activity.
Before trying to create any working code, you should understand exactly what you want your stream tube to do - when and how will sharing of data occur over the tube during the life of your activity? The example we discuss below is predicated on a specific model of communication.
In particular, the code we use in this section is adapted from the Read activity, but it has been incorporated in to an example activity called "Annotate". In activities like Read or Annotate, the goal is to set up a one way communication between the "sharing" activity and the "joining" activity so that the joining activity can download and take part in whatever document is being edited by the sharer. The document to be sent over the stream tube is shared immediately upon startup and requires no user action to initiate it.
Given such a paradigm for using our stream tube, we can draw a rough picture of what our communication architecture will look like in this case:
Notice that the stream tube exists as a complement to the socket architecture used by Unix systems to facilitate internet communication.
Step 2: Define or identify the classes you will use to serve data and receive data through your stream tube.
In our case, we want the sharing activity to behave like a one-time server while the joining activity is a client that downloads any document it needs on startup. To accomplish serving, we create the following two classes:
... import telepathy from dbus.service import method, signal from dbus.gobject_service import ExportedGObject from sugar.presence import presenceservice from sugar.presence.tubeconn import TubeConnection from sugar import network ... SERVICE = "org.laptop.Annotate" IFACE = SERVICE PATH = "/org/laptop/Annotate" ########################################################################## REQUESTHANDLER AND HTTPSERVER WORK TOGETHER TO SERVE DATA OVER STREAM TUBE ########################################################################## class AnnotateHTTPRequestHandler(network.ChunkedGlibHTTPRequestHandler): #### Method translate_path, which, for our simple activity, just returns the same # filepath for a document that will be shared. def translate_path(self, path): return self.server._filepath class AnnotateHTTPServer(network.GlibTCPServer): def __init__(self, server_address, filepath): self._filepath = filepath network.GlibTCPServer.__init__(self, server_address, AnnotateHTTPRequestHandler)
In the example code above, most of the work is done by the superclasses defined in sugar.network, namely ChunkedGlibHTTPRequestHandler and GLibTCPServer. However, if you want some custom behavior in how data is served, then it is a good idea to subclass those or other Server and RequestHandler classes.
On the client side, there must be a class that handles receiving data from a server. In Annotate, this is accomplished with the standard sugar.network.GlibURLDownloader class.
Step 3: Coordinate sharing and joining in your activity using stream tubes and the client/server classes defined above.
ANNOTATE_STREAM_SERVICE = 'annotate-activity-http' class AnnotateActivity(activity.Activity): #### Method: __init__, initialize this AnnotateActivity instance def __init__(self, handle): ... #Joining activity needs to know which stream tubes are available for downloading #and if it still needs to download document. self.unused_download_tubes = set() self._want_document = True #Set the port number on which sharing activity will serve data h = hash(self._activity_id) self.port = 1024 + (h % 64511) self.connect('shared', self._shared_cb) if self._shared_activity: # We're joining if self.get_shared(): # Already joined for some reason, just get the document self._joined_cb(self) else: # Wait for a successful join before trying to get the document self.connect("joined", self._joined_cb) ... ######################## SETUP SERVER ON SHARING ACTIVITY #################################### #### Method _shared_cb, which is called when this activity is successfully # shared on a mesh channel. def _shared_cb(self, activity): # We initiated this activity and have now shared it, so by # definition we have the file. _logger.debug('SYSTEM:', '_shared_cb(): Activity being shared') self._want_document = False; self._share_document() #### Method: _share_document, which sets up an http server that will serve a specific file to # any joining activities. def _share_document(self): _logger.debug('SYSTEM', '_share_document() -- sharing file ' + str(object_path)) object_path = os.path.join(self.get_activity_root(), 'data', 'sample-shared-file.doc') _logger.debug('SYSTEM', '_share_document() -- Starting HTTP server on port '+str(self.port)) self._fileserver = AnnotateHTTPServer(("", self.port), object_path) # Make a tube for the server chan = self._shared_activity.telepathy_tubes_chan iface = chan[telepathy.CHANNEL_TYPE_TUBES] self._fileserver_tube_id = iface.OfferStreamTube(ANNOTATE_STREAM_SERVICE, {}, telepathy.SOCKET_ADDRESS_TYPE_IPV4, ('127.0.0.1', dbus.UInt16(self.port)), telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, 0)
When downloading data from a stream tube, how can I show the download progress?
How do I control the file and path where data is saved when sharing through a stream tube?
Where do I get more information regarding sugar activity sharing and the technologies that support it?
- A brief tutorial on activity sharing for the OLPC laptop.