Changes
Development Team/Almanac/Sugar.presence (view source)
Revision as of 10:56, 10 October 2008
, 10:56, 10 October 2008→Step 3: Coordinate sharing and joining in your activity using stream tubes and the client/server classes defined above.
################### COORDINATE TUBE CREATION AND DOWNLOAD IN JOINING ACTIVITY ##################
#### Method _joined_cb, which is called when this activity joins another
# instance
def _joined_cb(self, also_self):
    """Handle this activity having joined another, already shared, instance.

    Starts watching for stream tubes, marks the document as wanted and
    queues a first download attempt with gobject.idle_add.

    also_self -- passed in by the caller of this callback; not used here.
    """
    self._update_chat_text('SYSTEM', '_joined_cb()')

    # Register for tube announcements before flagging that we want the
    # document, keeping the original ordering of side effects.
    self.watch_for_tubes()
    self._want_document = True

    fetch = self._get_document
    gobject.idle_add(fetch)
#### Method watch_for_tubes, which sets up a callback to _new_tube_cb once
# a stream tube is made available.
def watch_for_tubes(self):
    """Subscribe to tube announcements on the shared activity's tubes channel.

    Connects _new_tube_cb to the 'NewTube' signal and then asks for the
    tubes that already exist via ListTubes(); the reply is delivered to
    _list_tubes_reply_cb and a failure to _list_tubes_error_cb.
    """
    self._update_chat_text('SYSTEM', '_watch_for_tubes()')

    channel = self._shared_activity.telepathy_tubes_chan

    # Signal subscription first, then the listing request.
    channel[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal(
        'NewTube', self._new_tube_cb)

    channel[telepathy.CHANNEL_TYPE_TUBES].ListTubes(
        reply_handler=self._list_tubes_reply_cb,
        error_handler=self._list_tubes_error_cb)
def _list_tubes_reply_cb(self, tubes):
for tube_info in tubes:
self._new_tube_cb(*tube_info)
def _list_tubes_error_cb(self, e):
self._update_chat_text('System', '_list_tubes_error_cb(): ListTubes() failed: '+str(e))
#### Method _new_tube_cb, which is called once a stream tube is available
# to download from
def _new_tube_cb(self, tube_id, initiator, tube_type, service, params,
                 state):
    """React to a tube being announced (or listed) on the tubes channel.

    Every tube is reported in the chat text.  Only tubes whose service is
    ANNOTATE_STREAM_SERVICE are remembered in self.unused_download_tubes;
    if the document is still wanted, a download attempt is then queued.

    params and state are accepted but not used by this method.
    """
    description = ''.join([
        '_new_tube_cb() -- ID:', str(tube_id),
        '; Initiator: ', str(initiator),
        '; tube_type: ', str(tube_type),
        '; Service: ', str(service),
    ])
    self._update_chat_text('SYSTEM', description)

    # Tubes for any other service are of no interest here.
    if not service == ANNOTATE_STREAM_SERVICE:
        return

    self._update_chat_text('SYSTEM', 'I could download from that tube')

    # Remember the tube so that _get_document can pick it up.
    self.unused_download_tubes.add(tube_id)

    # Only kick off a fetch when the document is still wanted.
    if self._want_document:
        gobject.idle_add(self._get_document)
#### Method _get_document, which sets this activity instance up to start downloading
# the document from the sharing activity. It is called once a stream tube has been
# obtained and saved in self.unused_download_tubes.
def _get_document(self):
self._update_chat_text('SYSTEM', '_get_document()')
if not self._want_document:
return False
# Assign a file path to download to -- where downloaded doc will be saved.
path = os.path.join(self.get_activity_root(), 'instance',
'%i' % time.time())
# Pick an available tube we can try to download the document from
try:
tube_id = self.unused_download_tubes.pop()
except (ValueError, KeyError), e:
self._update_chat_text('System', 'No tubes to get the document from right now: '+str(e))
return False
# Avoid trying to download the document multiple times at once
self._want_document = False
gobject.idle_add(self._download_document, tube_id, path)
return False
#### Method _download_result_cb, which is called once downloading is complete.
def _download_result_cb(self, getter, tempfile, suggested_name, tube_id):
self._update_chat_text('SYSTEM', '_download_result_cb() -- tempfile = ' + tempfile)
del self.unused_download_tubes
self._update_chat_text('SYSTEM', "Got document %s (%s) from tube %u",
tempfile, suggested_name, tube_id)
self.save()
#### Method _download_progress_cb, which is called as the file is being downloaded.
def _download_progress_cb(self, getter, bytes_downloaded, tube_id):
# FIXME: signal the expected size somehow, so we can draw a progress
# bar
self._update_chat_text("Downloaded " + str(bytes_downloaded) + " bytes from tube " + str(tube_id))
#### Method _download_error_cb, which is called if there was an error downloading.
def _download_error_cb(self, getter, err, tube_id):
    """Handle the downloader's "error" signal and schedule another attempt.

    getter  -- the downloader that emitted the signal (unused).
    err     -- the error reported by the downloader.
    tube_id -- id of the tube the failed download used.
    """
    _logger.debug("Error getting document from tube %u: %s",
                  tube_id, err)

    # The document is wanted again; let _get_document try another tube.
    self._want_document = True
    retry = self._get_document
    gobject.idle_add(retry)
def _download_document(self, tube_id, path):
    """Accept the given stream tube and download the document through it.

    tube_id -- id of the stream tube to accept.
    path    -- file path handed to the downloader's start() call.

    The tube is accepted on an IPv4 socket with localhost access control;
    the address returned is validated and used to build the
    http://<host>:<port>/document URL handed to network.GlibURLDownloader.
    The downloader's "finished", "progress" and "error" signals are wired
    to the matching _download_*_cb methods with tube_id as extra argument.

    Always returns False; this method is scheduled through
    gobject.idle_add by _get_document.
    """
    # FIXME: should ideally have the CM listen on a Unix socket
    # instead of IPv4 (might be more compatible with Rainbow)

    self._update_chat_text('SYSTEM', '_download_document() -- ' + path)

    tubes_channel = self._shared_activity.telepathy_tubes_chan
    tubes_iface = tubes_channel[telepathy.CHANNEL_TYPE_TUBES]
    addr = tubes_iface.AcceptStreamTube(
        tube_id,
        telepathy.SOCKET_ADDRESS_TYPE_IPV4,
        telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST,
        0,
        utf8_strings=True)
    _logger.debug('Accepted stream tube: listening address is %r', addr)

    # SOCKET_ADDRESS_TYPE_IPV4 is defined to have addresses of type '(sq)'
    assert isinstance(addr, dbus.Struct)
    assert len(addr) == 2
    assert isinstance(addr[0], str)
    assert isinstance(addr[1], (int, long))
    assert 0 < addr[1] < 65536

    port = int(addr[1])
    url = "http://%s:%d/document" % (addr[0], port)
    getter = network.GlibURLDownloader(url)

    # Same connection order as before: finished, progress, error.
    getter.connect("finished", self._download_result_cb, tube_id)
    getter.connect("progress", self._download_progress_cb, tube_id)
    getter.connect("error", self._download_error_cb, tube_id)

    self._update_chat_text('System', "Starting download to "+str(path))
    getter.start(path)
    return False
</pre>
</pre>