Development Team/Almanac/Sugar.presence
How do I setup a D-Bus Tube?
Let's first look at what conceptually happens to make activity sharing work. The diagram below shows two instances of the same activity: the "Sharing Activity" and the "Joining Activity". The sharing activity is the activity that is initially run and shared with other buddies. The "Joining Activity" refers to other instances of the activity that are created once buddies decide to join an activity that has been shared. You can allow an activity to be shared (making it a sharing activity) by including the standard Sugar Activity Toolbar.
Once the user decides to share his activity with others, that activity becomes the sharing activity. The activity will receive a "shared" signal from the connection manager indicating the activity has been shared. The sharing activity must then save information about the channel on which the tube will exist and call the OfferDBusTube() method to offer a tube that can be accessed by other XOs that join the activity. The process is similar for XOs that join an activity except they will need to call the ListTubes() method instead of OfferDBusTubes() to find a tube that has been offered by the sharing activity.
Once a tube has been set up by the underlying connection manager, both the sharing and joining XOs get a "NewTube" signal. Upon receiving this signal, each tube can instantiate a new Tube class that has been designed and implemented for the needs of each specific activity. The code example below shows how all of the steps above are actually accomplished (only the code directly relevant to tube setup has been included).
... import telepathy from dbus.service import method, signal from dbus.gobject_service import ExportedGObject from sugar.presence import presenceservice from sugar.presence.tubeconn import TubeConnection from sugar import profile ... SERVICE = "org.laptop.Sample" IFACE = SERVICE PATH = "/org/laptop/Sample" class SampleActivity(activity.Activity): _ps = presenceservice.get_instance() ############################ METHODS INVOLVED IN TUBE SETUP ########################################## #### Method: __init__, initialize this SampleActivity instance def __init__(self, handle): activity.Activity.__init__(self, handle) ... #When you initialize your activity, wait for receipt of "joined" or "shared" signals #and also keep a variable called self.initiating to indicating whether this instance #is the sharing or joining activity. self.initiating = None #indicates whether this instance initiated sharing. self.connect('shared', self._shared_cb) self.connect('joined', self._joined_cb) #### Method self._shared_cb, which is called whenever this activity has been successfully # shared with other XOs on the mesh def _shared_cb(self, activity): #Ensure that this activity is indeed being shared if self._shared_activity is None: _logger.error("Failed to share or join activity ... _shared_activity is null in _shared_cb()") return #This activity initiated sharing, so set self.initiating to be true self.initiating = True #Save information about the telepathy connection, telepathy tubes channel and the text channel #associated with this shared activity self.conn = self._shared_activity.telepathy_conn self.tubes_chan = self._shared_activity.telepathy_tubes_chan self.text_chan = self._shared_activity.telepathy_text_chan #Set up a callback for when we receive the "NewTube" signal later on self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal( 'NewTube', self._new_tube_cb) #Call the OfferDBusTubes() method _id = self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].OfferDBusTube(SERVICE, {}) #### Method self._joined_cb, which is called whenever this XO has successfully joined another # activity. def _joined_cb(self, activity): #Ensure that this activity is indeed being shared if self._shared_activity is None: _logger.error("Failed to share or join activity ... _shared_activity is null in _shared_cb()") return #This activity joined an existing shared activity, so set self.initiating to False self.initiating = False #Save information about the telepathy connection, telepathy tubes channel and the text channel #associated with this shared activity self.conn = self._shared_activity.telepathy_conn self.tubes_chan = self._shared_activity.telepathy_tubes_chan self.text_chan = self._shared_activity.telepathy_text_chan #Set up a callback for when we receive the "NewTube" signal later on self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal( 'NewTube', self._new_tube_cb) #For joining activities, call ListTubes and connect to a callback once a set #shared tubes have been found (list_tubes_reply_cb). self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].ListTubes(reply_handler=self._list_tubes_reply_cb, error_handler=self._list_tubes_error_cb) #### Method _list_tubes_reply_cb, which is called once ListTubes successfully finds # tubes available for a joining activity. def _list_tubes_reply_cb(self, tubes): for tube_info in tubes: self._new_tube_cb(*tube_info) #### Method _list_tubes_error_cb, which is needed in case there was some error in ListTubes # for the joining activity. def _list_tubes_error_cb(self, e): print "Error" + str(e) #### Method _new_tube_cb, which is called for both joining and sharing activities once a tube # is available in the underlying presence system for communication. Several parameters will be # passed to this callback with information about the tube that has been set up. def _new_tube_cb(self, id, initiator, type, service, params, state): if type == telepathy.TUBE_TYPE_DBUS and service == SERVICE: if state == telepathy.TUBE_STATE_LOCAL_PENDING: #Accept the new tube that has been created self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].AcceptDBusTube(id) # The tube connection object gives a handle to the new tube. tube_conn = TubeConnection(self.conn, self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES], id, group_iface=self.text_chan[telepathy.CHANNEL_INTERFACE_GROUP]) # Using the handle saved above, create an instance of your own customized Tube wrapper that # will handle sending and receiving text from the underlying tube conneciton (ChatTube is defined # at the end below this activity class). self.chattube = ChatTube(tube_conn, self.initiating, self.text_received_cb) ... ... ############################# METHODS USED ONCE TUBE IS SET UP ################################### #### Method text_received_cb, which is called once a ChatTube instance is set up and that class # receives any text over the tube. You will pass this method as an argument to ChatTube.__init__() # (see _new_tube_cb() method above). def text_received_cb(self, text): self._chat += "\nSomeone Else" + ":: " + text + "\n" self._chat_buffer.set_text(self._chat) #### Method: _speak_cb, which is called whenever user decides to send whatever chat text # he has written in self._chat_input to the public room. def _speak_cb(self, widget, entry): nick = profile.get_nick_name() nick = nick.upper() self._chat += "\n" + nick + ":: " + entry.get_text() + "\n" self._chat_buffer.set_text(self._chat) if self.chattube is not None: self.chattube.SendText(entry.get_text()) entry.set_text("") ######################## CHATTUBE CLASS - WRAPS LOGIC FOR TUBE COMMUNICATION ##################### class ChatTube(ExportedGObject): #### Method __init__, which sets up a new ChatTube instance given the underlying tube connection # object (tube). def __init__(self, tube, is_initiator, text_received_cb): super(ChatTube, self).__init__(tube, PATH) self.tube = tube self.is_initiator = is_initiator #is this the sharing or joining activity? self.text_received_cb = text_received_cb #callback in main activity when text is received self.text = '' #The sendtext_cb method is called once someone else has sent text to this end of the tube. self.tube.add_signal_receiver(self.sendtext_cb, 'SendText', IFACE, path=PATH, sender_keyword='sender') #### Method sendtext_cb, which is called once this tube receives text that is # sent by someone else def sendtext_cb(self, text, sender=None): if sender == self.tube.get_unique_name(): return self.text = text self.text_received_cb(text) #### Method SendText, which uses a DBus signal to actually send text to others over the tube. @signal(dbus_interface=IFACE, signature='s') def SendText(self, text): self.text = text
Communication in Sugar is achieved through several layers of technologies, including Sugar's own Presence Service, the D-Bus service for process communication, and various implementations of the telepathy system used for real time conversations. In order to get activity communication working between activities, your goal is to ultimately create a D-Bus tube that will handle text messaging between activities (there are also stream tubes which are discussed elsewhere).
The figure below shows a simplified conceptualization of what happens after you set up a D-Bus tube correctly.
The activity code for each XO should handle sending text to the tube and also receiving text through a callback method that is supplied to the tube. The Activity developer will also need to define the Tube Object itself, which should be designed to send and receive text appropriately based on the type of sharing that is desired.
Let's consider a simple, but concrete example of how a D-Bus tube is used (assuming it has already been setup correctly). The code below is from a chat application where users type text and send it to others who are sharing the activity. Only the minimal code needed to send and receive text is included for clarity. If you want to know how the tube is set up from start to finish, then read the section on setting up a D-Bus tube. The comments should make clear what code does what in this example.
import telepathy from dbus.service import method, signal from dbus.gobject_service import ExportedGObject from sugar.presence import presenceservice from sugar.presence.tubeconn import TubeConnection ... SERVICE = "org.laptop.Sample" IFACE = SERVICE PATH = "/org/laptop/Sample" ############## MAIN ACTIVITY CLASS #################### class SampleActivity(activity.Activity): ... # Callback method for text received from tube -- update activity state def text_received_cb(self, text): self._chat += "\nSomeone Else Said" + ":: " + text + "\n" self._chat_buffer.set_text(self._chat) #### Method: _speak_cb, which is called whenever user decides to send whatever chat text # he has written in self._chat_input to the public. def _speak_cb(self, widget, entry): nick = profile.get_nick_name() nick = nick.upper() self._chat += "\n" + nick + ":: " + entry.get_text() + "\n" self._chat_buffer.set_text(self._chat) if self.chattube is not None: self.chattube.SendText(entry.get_text()) entry.set_text("") ... ###################### TUBE CLASS ###################### class ChatTube(ExportedGObject): def __init__(self, tube, is_initiator, text_received_cb): super(ChatTube, self).__init__(tube, PATH) ... self.text_received_cb = text_received_cb self.text = '' self.tube.add_signal_receiver(self.sendtext_cb, 'SendText', IFACE, path=PATH, sender_keyword='sender') ... # This method is called when the XO receives some text through the D-bus tube def sendtext_cb(self, text, sender=None): # Ignore any text that this XO sent to itself. if sender == self.tube.get_unique_name(): return self.text = text self.text_received_cb(text) # This method is used to actually send text to all other XO's who are sharing. @signal(dbus_interface=IFACE, signature='s') def SendText(self, text): self.text = text
How do I set up a simple stream tube that can send data one-way between two instances of an activity?
As with D-Bus tubes, there are a series of coordinated steps that must happen between a sharing and joining activity before data can be shared over a tube. The example we discuss here specifically concerns sharing data one way - from a "sharing" activity to a "joining" activity. Stream tubes are especially useful if you want to share non-text data or if you want to take advantage of communication protocols like TCP/IP and UDP to transfer data between instances of an activity running on separate XOs.
Step 1: Understand how to architect your stream tubes to achieve the goals of your activity.
Before trying to create any working code, you should understand exactly what you want your stream tube to do - when and how will sharing of data occur over the tube during the life of your activity? The example we discuss below is predicated on a specific model of communication.
In particular, the code we use in this section is adapted from the Read activity, but it has been incorporated in to an example activity called "Annotate". In activities like Read or Annotate, the goal is to set up a one way communication between the "sharing" activity and the "joining" activity so that the joining activity can download and take part in whatever document is being edited by the sharer. The document to be sent over the stream tube is shared immediately upon startup and requires no user action to initiate it.
Given such a paradigm for using our stream tube, we can draw a rough picture of what our communication architecture will look like in this case:
Notice that the stream tube exists on top of the socket architecture used by Unix systems to facilitate internet communication.
Step 2: Define or identify the classes you will use to serve data and receive data through your stream tube.
In our case, we want the sharing activity to behave like a one-time server while the joining activity is a client that downloads any document it needs on startup. To accomplish serving, we create the following two classes:
... import telepathy from dbus.service import method, signal from dbus.gobject_service import ExportedGObject from sugar.presence import presenceservice from sugar.presence.tubeconn import TubeConnection from sugar import network ... SERVICE = "org.laptop.Annotate" IFACE = SERVICE PATH = "/org/laptop/Annotate" ########################################################################## REQUESTHANDLER AND HTTPSERVER WORK TOGETHER TO SERVE DATA OVER STREAM TUBE ########################################################################## class AnnotateHTTPRequestHandler(network.ChunkedGlibHTTPRequestHandler): #### Method translate_path, which, for our simple activity, just returns the same # filepath for a document that will be shared. def translate_path(self, path): return self.server._filepath class AnnotateHTTPServer(network.GlibTCPServer): def __init__(self, server_address, filepath): self._filepath = filepath network.GlibTCPServer.__init__(self, server_address, AnnotateHTTPRequestHandler)
In the example code above, most of the work is done by the superclasses defined in sugar.network, namely ChunkedGlibHTTPRequestHandler and GLibTCPServer. However, if you want some custom behavior in how data is served, then it is a good idea to subclass those or other Server and RequestHandler classes.
On the client side, there must be a class that handles receiving data from a server. In Annotate, this is accomplished with the standard sugar.network.GlibURLDownloader class.
Step 3: Coordinate sharing and joining in your activity using stream tubes and the client/server classes defined above.
On the sharing instance, the following must happen:
- Once the activity has been shared by the user, the sharing instance should detect the "shared" signal and setup for sharing.
- To setup for sharing, the sharing instance needs to instantiate a new server (in Annotate, this is an AnnotateHTTPServer) that will deliver content to any client activities.
- The newly instantiated server should listen on a given port and that port should be connected to a new stream tube. The stream tube can be created by calling the iface.OfferStreamTube() method.
For the joining activity, it must coordinate several steps once the "joined" signal has been detected:
- The joining activity needs to wait for a valid stream tube that is provided under the service ANNOTATE_STREAM_SERVICE.
- When such a tube is available, the joining activity should specify the path where the downloaded data will be saved.
- Create a listening client on the address and port associated with the stream tube that has been found. In our case, we create a sugar.network.GlibURLDownloader instance that will manage downloading from the stream tube.
ANNOTATE_STREAM_SERVICE = 'annotate-activity-http' class AnnotateActivity(activity.Activity): #### Method: __init__, initialize this AnnotateActivity instance def __init__(self, handle): ... #Joining activity needs to know which stream tubes are available for downloading #and if it still needs to download document. self.unused_download_tubes = set() self._want_document = True #Set the port number on which sharing activity will serve data h = hash(self._activity_id) self.port = 1024 + (h % 64511) self.connect('shared', self._shared_cb) if self._shared_activity: # We're joining if self.get_shared(): # Already joined for some reason, just get the document self._joined_cb(self) else: # Wait for a successful join before trying to get the document self.connect("joined", self._joined_cb) ... ######################## SETUP SERVER ON SHARING ACTIVITY #################################### #### Method _shared_cb, which is called when this activity is successfully # shared on a mesh channel. def _shared_cb(self, activity): # We initiated this activity and have now shared it, so by # definition we have the file. _logger.debug('SYSTEM:', '_shared_cb(): Activity being shared') self._want_document = False; self._share_document() #### Method: _share_document, which sets up an http server that will serve a specific file to # any joining activities. def _share_document(self): _logger.debug('SYSTEM', '_share_document() -- sharing file ' + str(object_path)) object_path = os.path.join(self.get_activity_root(), 'data', 'sample-shared-file.doc') _logger.debug('SYSTEM', '_share_document() -- Starting HTTP server on port '+str(self.port)) self._fileserver = AnnotateHTTPServer(("", self.port), object_path) # Make a tube for the server chan = self._shared_activity.telepathy_tubes_chan iface = chan[telepathy.CHANNEL_TYPE_TUBES] self._fileserver_tube_id = iface.OfferStreamTube(ANNOTATE_STREAM_SERVICE, {}, telepathy.SOCKET_ADDRESS_TYPE_IPV4, ('127.0.0.1', dbus.UInt16(self.port)), telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, 0) ################### COORDINATE TUBE CREATION AND DOWNLOAD IN JOINING ACTIVITY ################## #### Method _joined_cb, which is called when this activity joins another # instance def _joined_cb(self, also_self): self.watch_for_tubes() self._want_document = True; gobject.idle_add(self._get_document) #### Method watch_for_tubes, which sets up a callback to _new_tube_cb once # a stream tube is made available. def watch_for_tubes(self): tubes_chan = self._shared_activity.telepathy_tubes_chan tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal('NewTube', self._new_tube_cb) tubes_chan[telepathy.CHANNEL_TYPE_TUBES].ListTubes( reply_handler=self._list_tubes_reply_cb, error_handler=self._list_tubes_error_cb) def _list_tubes_reply_cb(self, tubes): for tube_info in tubes: self._new_tube_cb(*tube_info) def _list_tubes_error_cb(self, e): #### Method _new_tube_cb, which is called once a stream tube is available # to download from def _new_tube_cb(self, tube_id, initiator, tube_type, service, params, state): # If the available tube is the stream tube we set up in sharing activity, then # let's download from it. if service == ANNOTATE_STREAM_SERVICE: # Add the newly found stream tube to the available tubes we can download from self.unused_download_tubes.add(tube_id) # if no download is in progress, let's fetch the document if self._want_document: gobject.idle_add(self._get_document) #### Method _get_document, which sets this activity instance up to start downloading # the document from the sharing activity. It is called once a stream tube has been # obtained and saved in self.unused_download_tubes. def _get_document(self): if not self._want_document: return False # Assign a file path to download to -- where downloaded doc will be saved. path = os.path.join(self.get_activity_root(), 'instance', '%i' % time.time()) # Pick an available tube we can try to download the document from try: tube_id = self.unused_download_tubes.pop() except (ValueError, KeyError), e: return False # Avoid trying to download the document multiple times at once self._want_document = False gobject.idle_add(self._download_document, tube_id, path) return False #### Method _download_result_cb, which is called once downloading is complete. def _download_result_cb(self, getter, tempfile, suggested_name, tube_id): del self.unused_download_tubes self.save() #### Method _download_progress_cb, which is called as the file is being downloaded. def _download_progress_cb(self, getter, bytes_downloaded, tube_id): # FIXME: signal the expected size somehow, so we can draw a progress # bar return True; #### Method _download_error_cb, which is called if there was an error downloading. def _download_error_cb(self, getter, err, tube_id): _logger.debug("Error getting document from tube %u: %s", tube_id, err) self._want_document = True gobject.idle_add(self._get_document) def _download_document(self, tube_id, path): # FIXME: should ideally have the CM listen on a Unix socket # instead of IPv4 (might be more compatible with Rainbow) chan = self._shared_activity.telepathy_tubes_chan iface = chan[telepathy.CHANNEL_TYPE_TUBES] addr = iface.AcceptStreamTube(tube_id, telepathy.SOCKET_ADDRESS_TYPE_IPV4, telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, 0, utf8_strings=True) _logger.debug('Accepted stream tube: listening address is %r', addr) # SOCKET_ADDRESS_TYPE_IPV4 is defined to have addresses of type '(sq)' assert isinstance(addr, dbus.Struct) assert len(addr) == 2 assert isinstance(addr[0], str) assert isinstance(addr[1], (int, long)) assert 0 < addr[1] < 65536 port = int(addr[1]) getter = network.GlibURLDownloader("http://%s:%d/document" % (addr[0], port)) getter.connect("finished", self._download_result_cb, tube_id) getter.connect("progress", self._download_progress_cb, tube_id) getter.connect("error", self._download_error_cb, tube_id) getter.start(path) return False
When downloading data from a stream tube, how can I show the download progress?
How do I control the file and path where data is saved when sharing through a stream tube?
Where do I get more information regarding sugar activity sharing and the technologies that support it?
- A brief tutorial on activity sharing for the OLPC laptop.
I can't get tubes to work. Are there unresolved issues?
Which threading patterns are most appropriate for use on the sugar platform?
There are issues regarding the best way to create activities that support media sharing. In particular, the structure of DBus tubes makes it easy to customize the communication process between activity instances running on multiple XOs. However, stream tubes seem to be tied closely to a client server architecture - the servers and handlers provided in sugar.network package are some form of client/server arrangement.
Setting up more advanced stream tubes functionality that would allow sharing of non-text data (eg. binary files, images, etc.) in a continuous manner in the way dbus tubes enable text communication is complicated. To allow activities to communicate constantly over a stream tube, one needs to set up both a client and server side on each activity and coordinate communication between them. Furthermore, most activities would require threading as well, which means resolving best practices for threading. Ultimately, it seems that the existing servers/handlers to support stream tubes may not be adequate to support easy and relatively painless sharing of data between activities on a continuous basis. The code that is on the Almanac right now guides a user through the basic process of setting up a stream tube and allowing communication one time between the server and client. This is useful for sharing/joining activities that require sending binary files (which is why Record and Read use this). However, it does not extend easily to peer to peer communication.
One solution to sending non-text data between activities is to just use the DBus tube and take advantage of its flexibility. It's not clear how well these tubes will support binary data, however and whether they work well with larger media files. It would be helpful to get insight from sugar developers regarding how such communication should work and what facilities sugar needs to add to support this (perhaps more functionality for stream tubes in sugar.network). Given that peer to peer networking through mesh is probably one of the standout features of XO laptops, I think having better functionality (and better documentation) for developers to build rich communicative applications is a high priority.