Changes

Line 367: Line 367:        +
    ################### COORDINATE TUBE CREATION AND DOWNLOAD IN JOINING ACTIVITY  ##################
    +
    #### Method _joined_cb, which is called when this activity joins another
 +
    # instance
 +
    def _joined_cb(self, also_self):
 +
        self._update_chat_text('SYSTEM', '_joined_cb()')
 +
        self.watch_for_tubes()
 +
        self._want_document = True;
 +
        gobject.idle_add(self._get_document)
 +
 +
    #### Method watch_for_tubes, which sets up a callback to _new_tube_cb once
 +
    # a stream tube is made available.
 +
    def watch_for_tubes(self):
 +
        self._update_chat_text('SYSTEM', '_watch_for_tubes()')
 +
        tubes_chan = self._shared_activity.telepathy_tubes_chan
 +
 +
        tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal('NewTube',
 +
            self._new_tube_cb)
 +
        tubes_chan[telepathy.CHANNEL_TYPE_TUBES].ListTubes(
 +
            reply_handler=self._list_tubes_reply_cb,
 +
            error_handler=self._list_tubes_error_cb)
 +
 +
    def _list_tubes_reply_cb(self, tubes):
 +
        for tube_info in tubes:
 +
            self._new_tube_cb(*tube_info)
 +
 +
    def _list_tubes_error_cb(self, e):
 +
        self._update_chat_text('System', '_list_tubes_error_cb(): ListTubes() failed: '+str(e))
 +
 +
    #### Method _new_tube_cb, which is called once a stream tube is available
 +
    # to download from
 +
    def _new_tube_cb(self, tube_id, initiator, tube_type, service, params,
 +
                    state):
 +
        self._update_chat_text('SYSTEM', '_new_tube_cb() -- ID:' + str(tube_id) + '; Initiator: ' + str(initiator) + '; tube_type: ' + str(tube_type) + '; Service: ' + str(service))
 +
 +
        # If the available tube is the stream tube we set up in sharing activity, then
 +
        # let's download from it.
 +
        if service == ANNOTATE_STREAM_SERVICE:
 +
            self._update_chat_text('SYSTEM', 'I could download from that tube')
 +
           
 +
            # Add the newly found stream tube to the available tubes we can download from
 +
            self.unused_download_tubes.add(tube_id)
 +
           
 +
            # if no download is in progress, let's fetch the document
 +
            if self._want_document:
 +
                gobject.idle_add(self._get_document)
 +
 +
 +
    #### Method _get_document, which sets this activity instance up to start downloading
 +
    # the document from the sharing activity. It is called once a stream tube has been
 +
    # obtained and saved in self.unused_download_tubes.
 +
    def _get_document(self):
 +
        self._update_chat_text('SYSTEM', '_get_document()')
 +
        if not self._want_document:
 +
            return False
 +
 +
        # Assign a file path to download to -- where downloaded doc will be saved.
 +
        path = os.path.join(self.get_activity_root(), 'instance',
 +
                                '%i' % time.time())
 +
 +
        # Pick an available tube we can try to download the document from
 +
        try:
 +
            tube_id = self.unused_download_tubes.pop()
 +
        except (ValueError, KeyError), e:
 +
            self._update_chat_text('System', 'No tubes to get the document from right now: '+str(e))
 +
            return False
 +
 +
        # Avoid trying to download the document multiple times at once
 +
        self._want_document = False
 +
        gobject.idle_add(self._download_document, tube_id, path)
 +
        return False
 +
 +
 +
    #### Method _download_result_cb, which is called once downloading is complete.
 +
    def _download_result_cb(self, getter, tempfile, suggested_name, tube_id):
 +
        self._update_chat_text('SYSTEM', '_download_result_cb() -- tempfile = ' + tempfile)
 +
        del self.unused_download_tubes
 +
 +
        self._update_chat_text('SYSTEM', "Got document %s (%s) from tube %u",
 +
                      tempfile, suggested_name, tube_id)
 +
        self.save()
 +
 +
    #### Method _download_progress_cb, which is called as the file is being downloaded.
 +
    def _download_progress_cb(self, getter, bytes_downloaded, tube_id):
 +
        # FIXME: signal the expected size somehow, so we can draw a progress
 +
        # bar
 +
        self._update_chat_text("Downloaded " + str(bytes_downloaded) + " bytes from tube " + str(tube_id))
 +
 +
    #### Method _download_error_cb, which is called if there was an error downloading.
 +
    def _download_error_cb(self, getter, err, tube_id):
 +
        _logger.debug("Error getting document from tube %u: %s",
 +
                      tube_id, err)
 +
        self._want_document = True
 +
        gobject.idle_add(self._get_document)
 +
 +
   
 +
    def _download_document(self, tube_id, path):
 +
        # FIXME: should ideally have the CM listen on a Unix socket
 +
        # instead of IPv4 (might be more compatible with Rainbow)
 +
        self._update_chat_text('SYSTEM', '_download_document() -- ' + path)
 +
        chan = self._shared_activity.telepathy_tubes_chan
 +
        iface = chan[telepathy.CHANNEL_TYPE_TUBES]
 +
        addr = iface.AcceptStreamTube(tube_id,
 +
                telepathy.SOCKET_ADDRESS_TYPE_IPV4,
 +
                telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, 0,
 +
                utf8_strings=True)
 +
        _logger.debug('Accepted stream tube: listening address is %r', addr)
 +
        # SOCKET_ADDRESS_TYPE_IPV4 is defined to have addresses of type '(sq)'
 +
        assert isinstance(addr, dbus.Struct)
 +
        assert len(addr) == 2
 +
        assert isinstance(addr[0], str)
 +
        assert isinstance(addr[1], (int, long))
 +
        assert addr[1] > 0 and addr[1] < 65536
 +
        port = int(addr[1])
 +
 +
        getter = network.GlibURLDownloader("http://%s:%d/document"
 +
                                          % (addr[0], port))
 +
        getter.connect("finished", self._download_result_cb, tube_id)
 +
        getter.connect("progress", self._download_progress_cb, tube_id)
 +
        getter.connect("error", self._download_error_cb, tube_id)
 +
        self._update_chat_text('System', "Starting download to "+str(path))
 +
        getter.start(path)
 +
        return False
    
</pre>
 
</pre>
Anonymous user