一道多線程題目的思考 – JAVA編程語言程序開發技術文章

題目如下:
  啟動4個線程,向4個文件A,B,C,D裡寫入數據,每個線程隻能寫一個值。
  線程1:隻寫1
  線程2:隻寫2
  線程3:隻寫3
  線程4:隻寫4
  4個文件A,B,C,D。
  程序運行起來,4個文件的寫入結果如下:
  A:12341234…
  B:23412341…
  C:34123412…
  D:41234123…

  這是前些天看的一個blog上的題目 https://www.aiwalls.com/kf/201203/123393.html  他的實現我沒有時間看,以後再補上,大概是理解的,簡單的說就是如果線程2來瞭,就想辦法找結尾是1的文件提供給2寫入.我自己亦因為一些問題的困擾想瞭好久
  最初比較容易發現的一個規律:
    線程:1234 1234 1234 1234….
    資源:ABCD DABC CDAB BCDA….
  線程對應資源的調度是呈以上的規律重復出現的。打個很爛的比喻就是:1234手上分別拿著ABCD4種不同的水果 而他們除瞭想吃自己的還想吃別人的而且生怕別人多吃,於是約定瞭一個協議:每個人隻能對自己拿到的水果咬一口,咬完之後就要按一個方向(順時針)給下一個(4給1),而自己咬完瞭上一個人沒咬完就隻能等。所以很直觀的想到按固定順序調度線程組操作有固定流向的資源。而控制線程按著一個固定順序調度是可以做到的,如按1,2,3,4的循環;而固定流向的資源ABCDDABCCDABBCDA亦有規律的:以4位為一組分為[ABCD][DABC][CDAB][BCDA]而每組的最後一位放在最前面就是下一組的結果,而後面的都是重復的,所以考慮循環。而如何協調1->A,2->B,3->C,4->D 可以將資源放到阻塞隊列中 然後按固定順序調度控制對應的線程去取,這個固定順序需要同步機制實現
  而之後有個問題我想瞭好久:有這樣的一種情況就是: 2吃完瞭自己的水果,3把自己的和2的都吃完瞭,就是2,3都等著要吃1的 當1吃完瞭 如何區分是給2還是3呢?後面想到的方法是給他們定義帶順序的號卡,2,3就按著號排隊。1通過號卡區分(就是給1,2,3,4定義各自的對象監視器,而且是有序的,他們通過自身的序號可以取到下一線程等待的監視器,而喚醒下一線程)
  實現如下:<如有不對歡迎指出>
package com.java5.learning.concurrent;

import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 測試類
 * @author zengjf
 *
 */
public class WritingTest {
   
    private static int order = 0;
   
    public static void main(String[] args) {
        final String[] Contents = new String[]{"1","2","3","4"};
        int srcNums = 4, threadNums = 4;
        FileFlow flow = new FileFlow(srcNums);
        final FileScheduler fileScheduler = new FileScheduler(flow,threadNums);
       
        ExecutorService es = Executors.newFixedThreadPool(threadNums);
        for (int i = 0; i < threadNums; i++) {
            es.execute(new Runnable() {
                public void run() {
                    int o = order++;
                    try {
                        while (true) {
                            fileScheduler.write(o, fileScheduler.schedule(o),
                                    Contents[o]);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        es.shutdown();
    }   
}
/**
 * 文件對象的獲取及寫入
 * @author zengjf
 *
 */
class FileScheduler{
   
    FileFlow flow;
    Lock lock;
    Condition[] condition;
    boolean[] turn;//順序
    ConcurrentMap isVisiting;//文件是否被占據
    ConcurrentMap lockObjMap;//號卡集合
    ConcurrentMap outputMap;

    public FileScheduler(FileFlow flow,int threadNums){
        this.flow = flow;
        this.lock = new ReentrantLock();
        this.turn = new boolean[threadNums];
        this.condition = new Condition[threadNums];
        this.isVisiting = new ConcurrentHashMap();
        this.lockObjMap = new ConcurrentHashMap();
        this.outputMap = new ConcurrentHashMap<String,StringBuffer>();

        for (int i = 0; i < threadNums; i++) {
            turn[i] = i == 0 ? true: false;
            condition[i] = this.lock.newCondition();
            lockObjMap.put(i,new Integer(i));
        }       
    }
    /**
     * 按流向獲取文件
     * @param order
     * @return
     * @throws Exception
     */
    public String schedule(int order) throws Exception{
        String file;
        lock.lock();
        try {
            //如果初次進入的非1線程 則等待 又1線程開始獲取 並喚醒下一線程
            while (!turn[order]) {
                condition[order].await();
            }
           
            file = flow.getFileInFlow();
            Thread.sleep(100);
           
            turn[order] = false;//自身等待
            order = order == 3 ? 0 : order + 1;
            turn[order] = true;//喚醒下一位
           
            condition[order].signal();
        } finally {
            lock.unlock();
        }
        return file;   
    }
    /**
     * 寫入文件
     * @param order
     * @param file
     * @param Content
     * @throws Exception
     */
    public void write(int order,String file,String Content)throws Exception{
       
        synchronized (FileScheduler.class) {
            if (!isVisiting.containsKey(file)) {
                isVisiting.put(file, false);
                outputMap.put(file,new StringBuffer(file+":"));
            }
        }
       
        synchronized (FileScheduler.class) {
            while ((Boolean) isVisiting.get(file)) {
                Integer lockObj = (Integer)lockObjMap.get(order);//號卡
                synchronized (lockObj) {
                    System.out.println("線程" + (order+1) + "等待" + file);
                    lockObj.wait();
                }
            }
        }
        //並發寫入
        isVisiting.put(file, true);
        System.out.println("線程" + (order+1) + "開始寫文件" + file + "..");
        Thread.sleep(new Random().nextInt(4)*1000);
        System.out.println("線程" + (order+1) + "寫文件" + file + "結束..");
        StringBuffer sb = (StringBuffer)outputMap.get(file);
        System.out.println(sb.append(Content).toString());
        isVisiting.put(file, false);
       
        //如果存在等待線程 則通過號卡喚醒
        Integer nextLockObj = (Integer)lockObjMap.get(order == 3 ? 0 : order + 1);
        synchronized (nextLockObj) {
            nextLockObj.notify();
        }
    }
}
/**
 * 文件類 按照某個流向供應文件對象
 * @author zengjf
 *
 */
class FileFlow{
    private ArrayList<String> list = new ArrayList<String>();
    private BlockingQueue<String> queue = new ArrayBlockingQueue<String>(4);
    private String[] filenames = {"A","B","C","D"};
    public FileFlow(int nums){   
        for (int i = 0; i < nums && i< filenames.length; i++) {
            list.add(filenames[i]);
        }
       
        ExecutorService es = Executors.newSingleThreadExecutor();   
        es.execute(new Runnable(){
            public void run() {
                while(true){
                    int i ;
                    for ( i = 0; i < list.size(); i++) {
                         try {
                            queue.put(list.get(i));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }       
                    }
                   
                    list.add(0,list.remove(i-1));
                }
            }
        });
    }
   
    public String getFileInFlow(){
        try {
            return queue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
   
    public int length(){
        return list.size();
    }
}

 

摘自  Goodspeed85
 

發佈留言