DIV CSS 佈局教程網

 DIV+CSS佈局教程網 >> 網頁腳本 >> JavaScript入門知識 >> 關於JavaScript >> 簡要介紹實現多線程環形緩沖的方法
簡要介紹實現多線程環形緩沖的方法
編輯:關於JavaScript     

網頁制作poluoluo文章簡介:我平時比較喜歡從網上聽歌,有些鏈接下載速度太慢了。如果用HttpURLConnection類的方法打開連接,然後用InputStream類獲得輸入流,再用BufferedInputStream構造出帶緩沖區的輸入流,如果網速太慢的話,無論緩沖區設置多大,聽起來都是斷斷續續的,達不到真正緩沖的目的

 

我平時比較喜歡從網上聽歌,有些鏈接下載速度太慢了。如果用HttpURLConnection類的方法打開連接,然後用InputStream類獲得輸入流,再用BufferedInputStream構造出帶緩沖區的輸入流,如果網速太慢的話,無論緩沖區設置多大,聽起來都是斷斷續續的,達不到真正緩沖的目的。於是嘗試編寫代碼實現用緩沖方式讀取遠程文件,以下貼出的代碼是我寫的MP3解碼器的一部分。我是不怎麼贊同使用多線程下載的,加之有的鏈接下載速度本身就比較快,所以在下載速度足夠的情況下,就讓下載線程退出,直到只剩下一個下載線程。當然,多線程中令人頭痛的死鎖問題、HttpURLConnection的超時阻塞問題都會使代碼看起來異常復雜。

 

簡要介紹一下實現多線程環形緩沖的方法。將緩沖區buf[]分為16塊,每塊32K,下載線程負責向緩沖區寫數據,每次寫一塊;讀線程(BuffRandAcceURL類)每次讀小於32K的任意字節。同步描述:寫/寫互斥等待空閒塊;寫/寫並發填寫buf[];讀/寫並發使用buf[]。

 

經過我很長一段時間使用,我認為比較滿意地實現了我的目標,同其它MP3播放器對比,我的這種方法能夠比較流暢、穩定地下載並播放。我把實現多線程下載緩沖的方法寫出來,不足之處懇請批評指正。

 

一、HttpReader類功能:HTTP協議從指定URL讀取數據

 

/** *//**
* author by http://www.bt285.cn http://www.5a520.cn
*/
package instream;   
  
import java.io.IOException;   
import java.io.InputStream;   
import java.net.HttpURLConnection;   
import java.net.URL;   
  
public final class HttpReader {   
    public static final int MAX_RETRY = 10;   
    private static long content_length;   
    private URL url;   
    private HttpURLConnection httpConnection;   
    private InputStream in_stream;   
    private long cur_pos;           //用於決定seek方法中是否執行文件定位   
    private int connect_timeout;   
    private int read_timeout;   
       
    public HttpReader(URL u) {   
        this(u, 5000, 5000);   
    }   
       
    public HttpReader(URL u, int connect_timeout, int read_timeout) {   
        this.connect_timeout = connect_timeout;   
        this.read_timeout = read_timeout;   
        url = u;   
        if (content_length == 0) {   
            int retry = 0;   
            while (retry < HttpReader.MAX_RETRY)   
                try {   
                    this.seek(0);   
                    content_length = httpConnection.getContentLength();   
                    break;   
                } catch (Exception e) {   
                    retry++;   
                }   
        }   
    }   
       
    public static long getContentLength() {   
        return content_length;   
    }   
       
    public int read(byte[] b, int off, int len) throws IOException {   
        int r = in_stream.read(b, off, len);   
        cur_pos += r;   
        return r;   
    }   
       
    public int getData(byte[] b, int off, int len) throws IOException {   
        int r, rema = len;   
        while (rema > 0) {   
            if ((r = in_stream.read(b, off, rema)) == -1) {   
                return -1;   
            }   
            rema -= r;   
            off += r;   
            cur_pos += r;   
        }   
        return len;   
    }   
       
    public void close() {   
        if (httpConnection != null) {   
            httpConnection.disconnect();   
            httpConnection = null;   
        }   
        if (in_stream != null) {   
            try {   
                in_stream.close();   
            } catch (IOException e) {}   
            in_stream = null;   
        }   
        url = null;   
    }   
       
    /**//*  
     * 拋出異常通知再試  
     * 響應碼503可能是由某種暫時的原因引起的,例如同一IP頻繁的連接請求可能遭服務器拒絕  
     */  
    public void seek(long start_pos) throws IOException {   
        if (start_pos == cur_pos && in_stream != null)   
            return;   
        if (httpConnection != null) {   
            httpConnection.disconnect();   
            httpConnection = null;   
        }   
        if (in_stream != null) {   
            in_stream.close();   
            in_stream = null;   
        }   
        httpConnection = (HttpURLConnection) url.openConnection();   
        httpConnection.setConnectTimeout(connect_timeout);   
        httpConnection.setReadTimeout(read_timeout);   
        String sProperty = "bytes=" + start_pos + "-";   
        httpConnection.setRequestProperty("Range", sProperty);   
        //httpConnection.setRequestProperty("Connection", "Keep-Alive");   
        int responseCode = httpConnection.getResponseCode();   
        if (responseCode < 200 || responseCode >= 300) {   
            try {   
                Thread.sleep(500);   
            } catch (InterruptedException e) {   
                e.printStackTrace();   
            }   
            throw new IOException("HTTP responseCode="+responseCode);   
        }   
  
        in_stream = httpConnection.getInputStream();   
        cur_pos = start_pos;   
    }   
  
}

 

二、IWriterCallBack接口功能:實現讀/寫通信。

 

package instream;   
  
public interface IWriterCallBack {   
    public boolean tryWriting(Writer w) throws InterruptedException;   
    public void updateBuffer(int i, int len);   
    public void updateWriterCount();   
    public void terminateWriters();   
}

 

 

三、Writer類:下載線程,負責向buf[]寫數據。

 

/** *//**
* http://www.bt285.cn http://www.5a520.cn 
*/
package instream;   
import java.io.IOException;   
import java.net.URL;   
  
public final class Writer implements Runnable {   
    private static boolean isalive = true;   
    private byte[] buf;   
    private IWriterCallBack icb;   
    protected int index;            //buf[]內"塊"索引號   
    protected long start_pos;       //index對應的文件位置(相對於文件首的偏移量)   
    protected int await_count;      //用於判斷:下載速度足夠就退出一個"寫"線程   
    private HttpReader hr;   
       
    public Writer(IWriterCallBack call_back, URL u, byte[] b, int i) {   
        hr = new HttpReader(u);   
        if(HttpReader.getContentLength() == 0)  //實例化HttpReader對象都不成功   
            return;   
        icb = call_back;   
        buf = b;   
        Thread t = new Thread(this,"dt_"+i);   
        t.setPriority(Thread.NORM_PRIORITY + 1);   
        t.start();   
    }   
       
    public void run() {   
        int write_bytes=0, write_pos=0, rema = 0, retry = 0;   
        boolean cont = true;   
        while (cont) {   
            try {   
                // 1.等待空閒塊   
                if(retry == 0) {   
                    if (icb.tryWriting(this) == false)   
                        break;   
                    write_bytes = 0;   
                    rema = BuffRandAcceURL.UNIT_LENGTH;   
                    write_pos = index << BuffRandAcceURL.UNIT_LENGTH_BITS;   
                }   
                   
                // 2.定位   
                hr.seek(start_pos);   
  
                // 3.下載"一塊"   
                int w;   
                while (rema > 0 && isalive) {   
                    w = (rema < 2048) ? rema : 2048; //每次讀幾K合適?   
                    if ((w = hr.read(buf, write_pos, w)) == -1) {   
                        cont = false;   
                        break;   
                    }   
                    rema -= w;   
                    write_pos += w;   
                    start_pos += w;   
                    write_bytes += w;   
                }   
                   
                //4.通知"讀"線程   
                retry = 0;   
                icb.updateBuffer(index, write_bytes);   
            } catch (InterruptedException e) {   
                isalive = false;   
                icb.terminateWriters();   
                break;   
            } catch (IOException e) {   
                if(++retry == HttpReader.MAX_RETRY) {   
                    isalive = false;   
                    icb.terminateWriters();   
                    break;   
                }   
            }   
        }   
        icb.updateWriterCount();   
        try {   
            hr.close();   
        } catch (Exception e) {}   
        hr = null;   
        buf = null;   
        icb = null;   
    }   
  
}

 

四、IRandomAccess接口:

 

隨機讀取文件接口,BuffRandAcceURL類和BuffRandAcceFile類實現接口方法。BuffRandAcceFile類實現讀取本地磁盤文件,這兒就不給出其源碼了。

 

package instream;   
  
public interface IRandomAccess {   
    public int read() throws Exception;   
    public int read(byte b[]) throws Exception;   
    public int read(byte b[], int off, int len) throws Exception;   
    public int dump(int src_off, byte b[], int dst_off, int len) throws Exception;   
    public void seek(long pos) throws Exception;   
    public long length();   
    public long getFilePointer();   
    public void close();   
}

 

五、BuffRandAcceURL類功能:創建下載線程;read方法從buf[]讀數據。

 

關鍵是如何簡單有效防止死鎖?以下只是我的一次嘗試,請指正。

 

/** *//**
* http://www.5a520.cn  http://www.bt285.cn
*/ 
package instream;   
  
import java.net.URL;   
import java.net.URLDecoder;   
import decode.Header;   
import tag.MP3Tag;   
import tag.TagThread;   
  
public final class BuffRandAcceURL implements IRandomAccess, IWriterCallBack {   
    public static final int UNIT_LENGTH_BITS = 15;                  //32K   
    public static final int UNIT_LENGTH = 1 << UNIT_LENGTH_BITS;   
    public static final int BUF_LENGTH = UNIT_LENGTH << 4;            //16塊   
    public static final int UNIT_COUNT = BUF_LENGTH >> UNIT_LENGTH_BITS;   
    public static final int BUF_LENGTH_MASK = (BUF_LENGTH - 1);   
    private static final int MAX_WRITER = 8;   
    private static long file_pointer;   
    private static int read_pos;   
    private static int fill_bytes;   
    private static byte[] buf;      //同時也作讀寫同步鎖:buf.wait()/buf.notify()   
    private static int[] buf_bytes;   
    private static int buf_index;   
    private static int alloc_pos;   
    private static URL url = null;   
    private static boolean isalive = true;   
    private static int writer_count;   
    private static int await_count;   
    private long file_length;   
    private long frame_bytes;   
       
    public BuffRandAcceURL(String sURL) throws Exception {   
        this(sURL,MAX_WRITER);   
    }   
       
    public BuffRandAcceURL(String sURL, int download_threads) throws Exception {   
        buf = new byte[BUF_LENGTH];   
        buf_bytes = new int[UNIT_COUNT];   
        url = new URL(sURL);   
           
        //創建線程以異步方式解析ID3   
        new TagThread(url);   
           
        //打印當前文件名   
        try {   
            String s = URLDecoder.decode(sURL, "GBK");   
            System.out.println("start>> " + s.substring(s.lastIndexOf("/") + 1));   
            s = null;   
        } catch (Exception e) {   
            System.out.println("start>> " + sURL);   
        }   
           
        //創建"寫"線程   
        for(int i = 0; i < download_threads; i++)   
            new Writer(this, url, buf, i+1);   
        frame_bytes = file_length = HttpReader.getContentLength();   
        if(file_length == 0) {   
            Header.strLastErr = "連接URL出錯,重試 " + HttpReader.MAX_RETRY + " 次後放棄。";   
            throw new Exception("retry " + HttpReader.MAX_RETRY);   
        }   
        writer_count = download_threads;   
           
        //緩沖   
        try_cache();   
           
        //跳過ID3 v2   
        MP3Tag mP3Tag = new MP3Tag();   
        int v2_size = mP3Tag.checkID3V2(buf,0);   
        if (v2_size > 0) {   
            frame_bytes -= v2_size;   
            //seek(v2_size):   
            fill_bytes -= v2_size;   
            file_pointer = v2_size;   
            read_pos = v2_size;   
            read_pos &= BUF_LENGTH_MASK;   
            int units = v2_size >> UNIT_LENGTH_BITS;   
            for(int i = 0; i < units; i++) {   
                buf_bytes[i] = 0;   
                this.notifyWriter();   
            }   
            buf_bytes[units] -= v2_size;   
            this.notifyWriter();   
        }   
        mP3Tag = null;   
    }   
       
    private void try_cache() throws InterruptedException {   
        int cache_size = BUF_LENGTH;   
        if(cache_size > (int)file_length - alloc_pos)   
            cache_size = (int)file_length - alloc_pos;   
        cache_size -= UNIT_LENGTH;   
           
        //等待填寫當前正在讀的那"一塊"緩沖區   
        /**//*if(fill_bytes >= cache_size && writer_count > 0) {  
            synchronized (buf) {  
                buf.wait();  
            }  
            return;  
        }*/  
           
        //等待填滿緩沖區   
        while (fill_bytes < cache_size) {   
            if (writer_count == 0 || isalive == false)   
                return;   
            if(BUF_LENGTH > (int)file_length - alloc_pos)   
                cache_size = (int)file_length - alloc_pos - UNIT_LENGTH;   
            System.out.printf("\r[緩沖%1$6.2f%%] ",(float)fill_bytes / cache_size * 100);   
            synchronized (buf) {   
                buf.wait();   
            }   
        }   
        System.out.printf("\r");   
    }   
       
    private int try_reading(int i, int len) throws Exception {   
        int n = (i == UNIT_COUNT - 1) ? 0 : (i + 1);   
        int r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]);   
        while (r < len) {   
            if (writer_count == 0 || isalive == false)   
                return r;   
            try_cache();   
            r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]);   
        }   
           
        return len;   
    }   
       
    /**//*  
     * 各個"寫"線程互斥等待空閒塊  
     */  
    public synchronized boolean tryWriting(Writer w) throws InterruptedException {   
        await_count++;   
        while (buf_bytes[buf_index] != 0 && isalive) {   
            this.wait();   
        }   
           
        //下載速度足夠就結束一個"寫"線程   
        if(writer_count > 1 && w.await_count >= await_count &&   
                w.await_count >= writer_count)   
            return false;   
           
        if(alloc_pos >= file_length)   
            return false;   
        w.await_count = await_count;   
        await_count--;   
        w.start_pos = alloc_pos;   
        w.index = buf_index;   
        alloc_pos += UNIT_LENGTH;   
        buf_index = (buf_index == UNIT_COUNT - 1) ? 0 : buf_index + 1;   
        return isalive;   
    }   
       
    public void updateBuffer(int i, int len) {   
        synchronized (buf) {   
            buf_bytes[i] = len;   
            fill_bytes += len;   
            buf.notify();   
        }   
    }   
       
    public void updateWriterCount() {   
        synchronized (buf) {   
            writer_count--;   
            buf.notify();   
        }   
    }   
       
    public synchronized void notifyWriter() {   
        this.notifyAll();   
    }   
       
    public void terminateWriters() {   
        synchronized (buf) {   
            if (isalive) {   
                isalive = false;   
                Header.strLastErr = "讀取文件超時。重試 " + HttpReader.MAX_RETRY   
                        + " 次後放棄,請您稍後再試。";   
            }   
            buf.notify();   
        }   
           
        notifyWriter();        
    }   
       
    public int read() throws Exception {   
        int iret = -1;   
        int i = read_pos >> UNIT_LENGTH_BITS;   
        // 1."等待"有1字節可讀   
        while (buf_bytes[i] < 1) {   
            try_cache();   
            if (writer_count == 0)   
                return -1;   
        }   
        if(isalive == false)   
            return -1;   
  
        // 2.讀取   
        iret = buf[read_pos] & 0xff;   
        fill_bytes--;   
        file_pointer++;   
        read_pos++;   
        read_pos &= BUF_LENGTH_MASK;   
        if (--buf_bytes[i] == 0)   
            notifyWriter();     // 3.通知   
  
        return iret;   
    }   
       
    public int read(byte b[]) throws Exception {   
        return read(b, 0, b.length);   
    }   
  
    public int read(byte[] b, int off, int len) throws Exception {   
        if(len > UNIT_LENGTH)   
            len = UNIT_LENGTH;   
        int i = read_pos >> UNIT_LENGTH_BITS;   
           
        // 1."等待"有足夠內容可讀   
        if(try_reading(i, len) < len || isalive == false)   
            return -1;   
  
        // 2.讀取   
        int tail_len = BUF_LENGTH - read_pos; // write_pos != BUF_LENGTH   
        if (tail_len < len) {   
            System.arraycopy(buf, read_pos, b, off, tail_len);   
            System.arraycopy(buf, 0, b, off + tail_len, len - tail_len);   
        } else  
            System.arraycopy(buf, read_pos, b, off, len);   
  
        fill_bytes -= len;   
        file_pointer += len;   
        read_pos += len;   
        read_pos &= BUF_LENGTH_MASK;   
        buf_bytes[i] -= len;   
        if (buf_bytes[i] < 0) {   
            int ni = read_pos >> UNIT_LENGTH_BITS;   
            buf_bytes[ni] += buf_bytes[i];   
            buf_bytes[i] = 0;   
            notifyWriter();   
        } else if (buf_bytes[i] == 0)   
            notifyWriter();   
           
        return len;   
    }   
       
    /**//*  
     * 從src_off位置復制,不移動文件"指針"  
     */  
    public int dump(int src_off, byte b[], int dst_off, int len) throws Exception {   
        int rpos = read_pos + src_off;   
        if(try_reading(rpos >> UNIT_LENGTH_BITS, len) < len || isalive == false)   
            return -1;   
        int tail_len = BUF_LENGTH - rpos;   
        if (tail_len < len) {   
            System.arraycopy(buf, rpos, b, dst_off, tail_len);   
            System.arraycopy(buf, 0, b, dst_off + tail_len, len - tail_len);   
        } else  
            System.arraycopy(buf, rpos, b, dst_off, len);   
        // 不發信號   
  
        return len;   
    }   
       
    public long length() {   
        return file_length;   
    }   
       
    public long getFilePointer() {   
        return file_pointer;   
    }   
  
    public void close() {   
        //   
    }   
       
    //   
    public void seek(long pos) throws Exception {   
        //   
    }   
       
}

網頁制作poluoluo文章簡介:我平時比較喜歡從網上聽歌,有些鏈接下載速度太慢了。如果用HttpURLConnection類的方法打開連接,然後用InputStream類獲得輸入流,再用BufferedInputStream構造出帶緩沖區的輸入流,如果網速太慢的話,無論緩沖區設置多大,聽起來都是斷斷續續的,達不到真正緩沖的目的

XML學習教程| jQuery入門知識| AJAX入門| Dreamweaver教程| Fireworks入門知識| SEO技巧| SEO優化集錦|
Copyright © DIV+CSS佈局教程網 All Rights Reserved