I've added support for parallel training to my gonline library. Training in a distributed environment looked like it would take a lot of work, so only parallel training on a single machine is supported. For parallel training, Iterative Parameter Mixture (pdf) is provided.

It is faster than training on a single core, but the cost of averaging the models is a significant bottleneck, so when the amount of training data is not that large there does not seem to be much benefit (in the experiment below, the training data is artificially enlarged). Increasing the number of CPU cores also increases the cost of computing the average, so training does not simply get faster as cores are added. The averaging itself is parallelized as a binary tree, but that only turns O(N) into O(log N), so with a small number of cores the averaging does not become dramatically faster either. Using a 1.7 GHz Intel Core i5, I compared the training speed with 4 cores against the training speed with a single core.

$wc -l news20.scale
   15935 news20.scale
$touch news20.scale.big
$for i in 1 2 3 4 5; do cat news20.scale >> news20.scale.big; done
$wc -l news20.scale.big
   79675 news20.scale.big
$time ./gonline train -a arow -m model -i 10 -t ./news20.t.scale -withoutshuffle -p 4 -s ipm ./news20.scale.big
./gonline train -a arow -m model -i 10 -t ./news20.t.scale -withoutshuffle -p  272.55s user 8.83s system 181% cpu 2:34.95 total
$time ./gonline train -a arow -m model -i 10 -t ./news20.t.scale -withoutshuffle -p 1 -s ipm ./news20.scale.big
./gonline train -a arow -m model -i 10 -t ./news20.t.scale -withoutshuffle -p  169.83s user 5.84s system 97% cpu 3:00.66 total
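To make the bottleneck mentioned above concrete: after each pass over the data, Iterative Parameter Mixture replaces every learner's weights with the element-wise mean of all learners' weights. The following is a minimal sketch of that mixing step on plain weight vectors; it is illustrative only and is not gonline's API (gonline's real averaging, shown below, also has to merge each learner's feature and label dictionaries and is parallelized as a binary tree).

// averageWeights returns the element-wise mean of several weight vectors.
// Illustrative sketch only, not part of gonline.
func averageWeights(ws [][]float64) []float64 {
	avg := make([]float64, len(ws[0]))
	for _, w := range ws {
		for i, v := range w {
			avg[i] += v / float64(len(ws))
		}
	}
	return avg
}

With S learners and D weights this is O(S*D) work per mixing step; the binary-tree version below cuts the number of sequential rounds from O(N) to O(log N), but not the total amount of work.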

The code related to Iterative Parameter Mixture is as follows.

ipm.go

// FitLearners splits the training data into contiguous chunks and lets each
// learner fit its own chunk in parallel.
func FitLearners(learners *[]LearnerInterface, x *[]map[string]float64, y *[]string) {
	var wg sync.WaitGroup
	num_learner := len(*learners)
	num_data := len(*x)
	buffer := make(chan int, num_learner)
	sizechunk := num_data/num_learner + 1
	for i := 0; i < num_learner; i++ {
		wg.Add(1)
		go func(ch chan int) {
			defer wg.Done()
			for j := range ch {
				start := j * sizechunk
				end := (j + 1) * sizechunk
				if end > num_data {
					// the upper slice bound is exclusive, so clamp to num_data
					// (clamping to num_data-1 would drop the last example)
					end = num_data
				}
				x_j := (*x)[start:end]
				y_j := (*y)[start:end]
				(*learners)[j].Fit(&x_j, &y_j)
			}
		}(buffer)
	}
	for i := 0; i < num_learner; i++ {
		buffer <- i
	}
	close(buffer)
	wg.Wait()
}

// average_two computes the element-wise average of the two learners'
// parameters and stores it in learner1. Each learner has its own feature
// and label dictionaries (built from its own data chunk), so the parameters
// are re-indexed through freshly merged dictionaries while averaging.
func average_two(learner1, learner2 *LearnerInterface) *LearnerInterface {
	params := (*learner1).GetParams()
	num_params := len(*params)
	avg_params := make([][][]float64, num_params, num_params)
	for i := 0; i < num_params; i++ {
		avg_params[i] = make([][]float64, 10)
	}

	avg_ftdic := NewDict()
	avg_labeldic := NewDict()
	learners := []LearnerInterface{*learner1, *learner2}
	for _, learner := range learners {
		params := learner.GetParams()
		num_params := len(*params)
		ftdict, labeldict := learner.GetDics()

		for p := 0; p < num_params; p++ {
			param := (*params)[p]
			avg_param := &avg_params[p]
			for yid := 0; yid < len(labeldict.Id2elem); yid++ {
				// map this learner's label id to an id in the merged label dictionary
				y := labeldict.Id2elem[yid]
				if !avg_labeldic.HasElem(y) {
					avg_labeldic.AddElem(y)
				}
				yid_avg := avg_labeldic.Elem2id[y]
				for i := len(*avg_param); i <= yid_avg; i++ {
					*avg_param = append(*avg_param, make([]float64, 0, 1000))
				}
				avg_param_y := &avg_params[p][yid_avg]
				param_y := param[yid]

				for ftid := 0; ftid < len(param[yid]); ftid++ {
					// map this learner's feature id to an id in the merged feature dictionary
					ft := ftdict.Id2elem[ftid]
					if !avg_ftdic.HasElem(ft) {
						avg_ftdic.AddElem(ft)
					}
					ftid_avg := avg_ftdic.Elem2id[ft]
					for i := len(*avg_param_y); i <= ftid_avg; i++ {
						*avg_param_y = append(*avg_param_y, 0.)
					}
					// accumulate half of each learner's weight: the element-wise average
					(*avg_param_y)[ftid_avg] += param_y[ftid] / float64(len(learners))
				}
			}
		}
	}
	(*learner1).SetParams(&avg_params)
	(*learner1).SetDics(&avg_ftdic, &avg_labeldic)
	return learner1
}

// AverageModels averages the learners pairwise, halving their number each
// round (a binary tree of average_two calls), and recurses until a single
// averaged learner remains.
func AverageModels(learners []LearnerInterface) *LearnerInterface {
	if len(learners)%2 != 0 { /* duplicate one learner so that the number of learners is even */
		learners = append(learners, learners[len(learners)/2])
	}
	num_learner := len(learners)
	buffer := make(chan int, num_learner)
	results := make(chan *LearnerInterface, num_learner)

	var wg sync.WaitGroup
	for i := 0; i < num_learner; i++ {
		wg.Add(1)
		go func(ch chan int) {
			defer wg.Done()
			for j := range ch {
				// pair the j-th learner in the first half with its counterpart in the second half
				l1 := learners[j]
				l2 := learners[j+num_learner/2]
				l_avg := average_two(&l1, &l2)
				results <- l_avg
			}
		}(buffer)
	}
	for i := 0; i < num_learner/2; i++ {
		buffer <- i
	}
	close(buffer)
	wg.Wait()
	close(results)
	learners_avg := make([]LearnerInterface, 0, num_learner/2)
	for l_avg := range results {
		learners_avg = append(learners_avg, *l_avg)
	}

	if len(learners_avg) == 1 {
		return &learners_avg[0]
	}
	return AverageModels(learners_avg)
}

// BroadCastModel copies the averaged parameters and dictionaries back into
// every learner so that the next iteration starts from the mixed model.
func BroadCastModel(avg_learner *LearnerInterface, learners *[]LearnerInterface) {
	params := (*avg_learner).GetParams()
	avg_ftdic, avg_labeldic := (*avg_learner).GetDics()
	num_learner := len(*learners)
	var wg sync.WaitGroup
	buffer := make(chan int, num_learner)
	for i := 0; i < num_learner; i++ {
		wg.Add(1)
		go func(ch chan int) {
			defer wg.Done()
			for j := range ch {
				(*learners)[j].SetParams(params)
				(*learners)[j].SetDics(avg_ftdic, avg_labeldic)
			}
		}(buffer)
	}
	for i := 0; i < num_learner; i++ {
		buffer <- i
	}
	close(buffer)
	wg.Wait()
}

main.go

gonline.FitLearners(&learners, x_train, y_train)
learner_avg = gonline.AverageModels(learners)
gonline.BroadCastModel(learner_avg, &learners)
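Presumably the -i option wraps these three calls in a loop, one cycle per IPM iteration. A minimal sketch of such a loop follows; the loop itself, maxIter, and the variable declaration are my own illustration, not gonline's actual main.go, and learners, x_train, and y_train are assumed to be set up as in the snippet above.

// One IPM iteration: fit each chunk in parallel, average the models,
// then broadcast the averaged model back to every learner.
// Sketch only; maxIter is an illustrative placeholder.
var learner_avg *gonline.LearnerInterface
for it := 0; it < maxIter; it++ {
	gonline.FitLearners(&learners, x_train, y_train)
	learner_avg = gonline.AverageModels(learners)
	gonline.BroadCastModel(learner_avg, &learners)
}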

