Development Team/Almanac/Sugar.presence: Difference between revisions

Line 367: Line 367:




    ################### COORDINATE TUBE CREATION AND DOWNLOAD IN JOINING ACTIVITY  ##################


    #### Method _joined_cb, which is called when this activity joins another
    # instance
    def _joined_cb(self, also_self):
        self._update_chat_text('SYSTEM', '_joined_cb()')
        self.watch_for_tubes()
        self._want_document = True;
        gobject.idle_add(self._get_document)
    #### Method watch_for_tubes, which sets up a callback to _new_tube_cb once
    # a stream tube is made available.
    def watch_for_tubes(self):
        self._update_chat_text('SYSTEM', '_watch_for_tubes()')
        tubes_chan = self._shared_activity.telepathy_tubes_chan
        tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal('NewTube',
            self._new_tube_cb)
        tubes_chan[telepathy.CHANNEL_TYPE_TUBES].ListTubes(
            reply_handler=self._list_tubes_reply_cb,
            error_handler=self._list_tubes_error_cb)
    def _list_tubes_reply_cb(self, tubes):
        for tube_info in tubes:
            self._new_tube_cb(*tube_info)
    def _list_tubes_error_cb(self, e):
        self._update_chat_text('System', '_list_tubes_error_cb(): ListTubes() failed: '+str(e))
    #### Method _new_tube_cb, which is called once a stream tube is available
    # to download from
    def _new_tube_cb(self, tube_id, initiator, tube_type, service, params,
                    state):
        self._update_chat_text('SYSTEM', '_new_tube_cb() -- ID:' + str(tube_id) + '; Initiator: ' + str(initiator) + '; tube_type: ' + str(tube_type) + '; Service: ' + str(service))
        # If the available tube is the stream tube we set up in sharing activity, then
        # let's download from it.
        if service == ANNOTATE_STREAM_SERVICE:
            self._update_chat_text('SYSTEM', 'I could download from that tube')
           
            # Add the newly found stream tube to the available tubes we can download from
            self.unused_download_tubes.add(tube_id)
           
            # if no download is in progress, let's fetch the document
            if self._want_document:
                gobject.idle_add(self._get_document)
    #### Method _get_document, which sets this activity instance up to start downloading
    # the document from the sharing activity. It is called once a stream tube has been
    # obtained and saved in self.unused_download_tubes.
    def _get_document(self):
        self._update_chat_text('SYSTEM', '_get_document()')
        if not self._want_document:
            return False
        # Assign a file path to download to -- where downloaded doc will be saved.
        path = os.path.join(self.get_activity_root(), 'instance',
                                '%i' % time.time())
        # Pick an available tube we can try to download the document from
        try:
            tube_id = self.unused_download_tubes.pop()
        except (ValueError, KeyError), e:
            self._update_chat_text('System', 'No tubes to get the document from right now: '+str(e))
            return False
        # Avoid trying to download the document multiple times at once
        self._want_document = False
        gobject.idle_add(self._download_document, tube_id, path)
        return False
    #### Method _download_result_cb, which is called once downloading is complete.
    def _download_result_cb(self, getter, tempfile, suggested_name, tube_id):
        self._update_chat_text('SYSTEM', '_download_result_cb() -- tempfile = ' + tempfile)
        del self.unused_download_tubes
        self._update_chat_text('SYSTEM', "Got document %s (%s) from tube %u",
                      tempfile, suggested_name, tube_id)
        self.save()
    #### Method _download_progress_cb, which is called as the file is being downloaded.
    def _download_progress_cb(self, getter, bytes_downloaded, tube_id):
        # FIXME: signal the expected size somehow, so we can draw a progress
        # bar
        self._update_chat_text("Downloaded " + str(bytes_downloaded) + " bytes from tube " + str(tube_id))
    #### Method _download_error_cb, which is called if there was an error downloading.
    def _download_error_cb(self, getter, err, tube_id):
        _logger.debug("Error getting document from tube %u: %s",
                      tube_id, err)
        self._want_document = True
        gobject.idle_add(self._get_document)
   
    def _download_document(self, tube_id, path):
        # FIXME: should ideally have the CM listen on a Unix socket
        # instead of IPv4 (might be more compatible with Rainbow)
        self._update_chat_text('SYSTEM', '_download_document() -- ' + path)
        chan = self._shared_activity.telepathy_tubes_chan
        iface = chan[telepathy.CHANNEL_TYPE_TUBES]
        addr = iface.AcceptStreamTube(tube_id,
                telepathy.SOCKET_ADDRESS_TYPE_IPV4,
                telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, 0,
                utf8_strings=True)
        _logger.debug('Accepted stream tube: listening address is %r', addr)
        # SOCKET_ADDRESS_TYPE_IPV4 is defined to have addresses of type '(sq)'
        assert isinstance(addr, dbus.Struct)
        assert len(addr) == 2
        assert isinstance(addr[0], str)
        assert isinstance(addr[1], (int, long))
        assert addr[1] > 0 and addr[1] < 65536
        port = int(addr[1])
        getter = network.GlibURLDownloader("http://%s:%d/document"
                                          % (addr[0], port))
        getter.connect("finished", self._download_result_cb, tube_id)
        getter.connect("progress", self._download_progress_cb, tube_id)
        getter.connect("error", self._download_error_cb, tube_id)
        self._update_chat_text('System', "Starting download to "+str(path))
        getter.start(path)
        return False


</pre>
</pre>