Development Team/Almanac/Sugar.presence: Difference between revisions
| Line 367: | Line 367: | ||
################### COORDINATE TUBE CREATION AND DOWNLOAD IN JOINING ACTIVITY ################## | |||
#### Method _joined_cb, which is called when this activity joins another | |||
# instance | |||
def _joined_cb(self, also_self): | |||
self._update_chat_text('SYSTEM', '_joined_cb()') | |||
self.watch_for_tubes() | |||
self._want_document = True; | |||
gobject.idle_add(self._get_document) | |||
#### Method watch_for_tubes, which sets up a callback to _new_tube_cb once | |||
# a stream tube is made available. | |||
def watch_for_tubes(self): | |||
self._update_chat_text('SYSTEM', '_watch_for_tubes()') | |||
tubes_chan = self._shared_activity.telepathy_tubes_chan | |||
tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal('NewTube', | |||
self._new_tube_cb) | |||
tubes_chan[telepathy.CHANNEL_TYPE_TUBES].ListTubes( | |||
reply_handler=self._list_tubes_reply_cb, | |||
error_handler=self._list_tubes_error_cb) | |||
def _list_tubes_reply_cb(self, tubes): | |||
for tube_info in tubes: | |||
self._new_tube_cb(*tube_info) | |||
def _list_tubes_error_cb(self, e): | |||
self._update_chat_text('System', '_list_tubes_error_cb(): ListTubes() failed: '+str(e)) | |||
#### Method _new_tube_cb, which is called once a stream tube is available | |||
# to download from | |||
def _new_tube_cb(self, tube_id, initiator, tube_type, service, params, | |||
state): | |||
self._update_chat_text('SYSTEM', '_new_tube_cb() -- ID:' + str(tube_id) + '; Initiator: ' + str(initiator) + '; tube_type: ' + str(tube_type) + '; Service: ' + str(service)) | |||
# If the available tube is the stream tube we set up in sharing activity, then | |||
# let's download from it. | |||
if service == ANNOTATE_STREAM_SERVICE: | |||
self._update_chat_text('SYSTEM', 'I could download from that tube') | |||
# Add the newly found stream tube to the available tubes we can download from | |||
self.unused_download_tubes.add(tube_id) | |||
# if no download is in progress, let's fetch the document | |||
if self._want_document: | |||
gobject.idle_add(self._get_document) | |||
#### Method _get_document, which sets this activity instance up to start downloading | |||
# the document from the sharing activity. It is called once a stream tube has been | |||
# obtained and saved in self.unused_download_tubes. | |||
def _get_document(self): | |||
self._update_chat_text('SYSTEM', '_get_document()') | |||
if not self._want_document: | |||
return False | |||
# Assign a file path to download to -- where downloaded doc will be saved. | |||
path = os.path.join(self.get_activity_root(), 'instance', | |||
'%i' % time.time()) | |||
# Pick an available tube we can try to download the document from | |||
try: | |||
tube_id = self.unused_download_tubes.pop() | |||
except (ValueError, KeyError), e: | |||
self._update_chat_text('System', 'No tubes to get the document from right now: '+str(e)) | |||
return False | |||
# Avoid trying to download the document multiple times at once | |||
self._want_document = False | |||
gobject.idle_add(self._download_document, tube_id, path) | |||
return False | |||
#### Method _download_result_cb, which is called once downloading is complete. | |||
def _download_result_cb(self, getter, tempfile, suggested_name, tube_id): | |||
self._update_chat_text('SYSTEM', '_download_result_cb() -- tempfile = ' + tempfile) | |||
del self.unused_download_tubes | |||
self._update_chat_text('SYSTEM', "Got document %s (%s) from tube %u", | |||
tempfile, suggested_name, tube_id) | |||
self.save() | |||
#### Method _download_progress_cb, which is called as the file is being downloaded. | |||
def _download_progress_cb(self, getter, bytes_downloaded, tube_id): | |||
# FIXME: signal the expected size somehow, so we can draw a progress | |||
# bar | |||
self._update_chat_text("Downloaded " + str(bytes_downloaded) + " bytes from tube " + str(tube_id)) | |||
#### Method _download_error_cb, which is called if there was an error downloading. | |||
def _download_error_cb(self, getter, err, tube_id): | |||
_logger.debug("Error getting document from tube %u: %s", | |||
tube_id, err) | |||
self._want_document = True | |||
gobject.idle_add(self._get_document) | |||
def _download_document(self, tube_id, path): | |||
# FIXME: should ideally have the CM listen on a Unix socket | |||
# instead of IPv4 (might be more compatible with Rainbow) | |||
self._update_chat_text('SYSTEM', '_download_document() -- ' + path) | |||
chan = self._shared_activity.telepathy_tubes_chan | |||
iface = chan[telepathy.CHANNEL_TYPE_TUBES] | |||
addr = iface.AcceptStreamTube(tube_id, | |||
telepathy.SOCKET_ADDRESS_TYPE_IPV4, | |||
telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, 0, | |||
utf8_strings=True) | |||
_logger.debug('Accepted stream tube: listening address is %r', addr) | |||
# SOCKET_ADDRESS_TYPE_IPV4 is defined to have addresses of type '(sq)' | |||
assert isinstance(addr, dbus.Struct) | |||
assert len(addr) == 2 | |||
assert isinstance(addr[0], str) | |||
assert isinstance(addr[1], (int, long)) | |||
assert addr[1] > 0 and addr[1] < 65536 | |||
port = int(addr[1]) | |||
getter = network.GlibURLDownloader("http://%s:%d/document" | |||
% (addr[0], port)) | |||
getter.connect("finished", self._download_result_cb, tube_id) | |||
getter.connect("progress", self._download_progress_cb, tube_id) | |||
getter.connect("error", self._download_error_cb, tube_id) | |||
self._update_chat_text('System', "Starting download to "+str(path)) | |||
getter.start(path) | |||
return False | |||
</pre> | </pre> | ||