Commit a41c6621 authored by Kirill Terekhov's avatar Kirill Terekhov

Integrate K-means algorithm with Partitioner

not tested yet
parent f7249beb
......@@ -1265,7 +1265,296 @@ namespace INMOST
}
if( package == 3 ) //KMEANS
{
int total_points = 0;
int K = (int)m->GetProcessorsNumber(); //number of clusters
int max_iterations = 100;
#if defined(USE_OMP)
#pragma omp parallel for reduction(+:total_points)
#endif
for(int q = 0; q < m->CellLastLocalID(); ++q) if( m->isValidCell(q) )
{
Cell n = m->CellByLocalID(q);
if( n->GetStatus() != Element::Ghost ) total_points++;
}
std::vector< int > points_node(total_points);
std::vector< double > points_center(total_points*3);
std::vector< int > points_cluster(total_points,-1);
int k = 0;
for(int q = 0; q < m->CellLastLocalID(); ++q) if( m->isValidCell(q) )
{
Cell n = m->CellByLocalID(q);
if( n->GetStatus() != Element::Ghost ) points_node[k++] = n->LocalID();
}
#if defined(USE_OMP)
#pragma omp paralell for
#endif
for(int q = 0; q < total_points; ++q)
{
Cell n = m->CellByLocalID(points_node[q]);
double cnt[3];
n->Centroid(cnt);
points_center[q*3+0] = cnt[0];
points_center[q*3+1] = cnt[1];
points_center[q*3+2] = cnt[2];
}
#if defined(USE_MPI)
std::vector<int> displs(m->GetProcessorsNumber());
std::vector<int> counts(m->GetProcessorsNumber());
std::vector<int> npoints(m->GetProcessorsNumber());
int total_global_points = 0;
int total_local_points = 0;
bool balance = false;
MPI_Allgather(&total_points,1,MPI_INT,&npoints[0],1,MPI_INT,MPI_COMM_WORLD);
double imbalance = 1;
for(int k = 0; k < (int) m->GetProcessorsNumber(); ++k)
{
for(int l = k+1; l < (int) m->GetProcessorsNumber(); ++l)
{
imbalance = std::max(imbalance,npoints[k]/(double)npoints[l]);
imbalance = std::max(imbalance,npoints[l]/(double)npoints[k]);
}
}
//if( m->GetProcessorRank() == 0 )
// std::cout << "Imbalance is " << imbalance << std::endl;
if( imbalance > 1.2 )
balance = true;
if( balance )//redistribute points
{
for(int k = 0; k < (int) m->GetProcessorsNumber(); ++k)
total_global_points += npoints[k];
total_local_points = (int)ceil((double)total_global_points/(double)m->GetProcessorsNumber());
std::vector<double> points_center_global(m->GetProcessorRank() == 0 ? total_global_points*3 : 1);
displs[0] = 0;
counts[0] = npoints[0]*3;
for(int k = 1; k < (int) m->GetProcessorsNumber(); ++k)
{
displs[k] = displs[k-1] + counts[k-1];
counts[k] = npoints[k]*3;
}
MPI_Gatherv(&points_center[0],total_points*3,MPI_DOUBLE,&points_center_global[0],&counts[0],&displs[0],MPI_DOUBLE,0,MPI_COMM_WORLD);
displs[0] = 0;
counts[0] = total_local_points*3;
for(int k = 1; k < (int) m->GetProcessorsNumber(); ++k)
{
displs[k] = displs[k-1] + counts[k-1];
counts[k] = total_local_points*3;
}
counts.back() = total_global_points*3 - displs.back();
total_points = counts[m->GetProcessorRank()]/3;
points_center.resize(total_points*3);
MPI_Scatterv(&points_center_global[0],&counts[0],&displs[0],MPI_DOUBLE,&points_center[0],total_points*3,MPI_DOUBLE,0,MPI_COMM_WORLD);
points_cluster.resize(total_points,-1);
}
#endif
std::vector< double > cluster_center(K*3);
std::vector< int > cluster_npoints(K);
#if defined(USE_MPI)
std::vector< double > cluster_center_tmp(K*3);
std::vector< int > cluster_npoints_tmp(K);
#endif
//if( m->GetProcessorRank() == 0 )
// std::cout << "Init clusters" << std::endl;
// choose K distinct values for the centers of the clusters
{
std::vector<int> prohibited_indexes;
int Kpart = (int)ceil((double)K/(double)m->GetProcessorsNumber());
int Kstart = Kpart * m->GetProcessorRank();
int Kend = Kstart + Kpart;
if( Kend > K ) Kend = K;
for(int i = Kstart; i < Kend; i++)
{
while(true)
{
int index_point = rand() % total_points;
if(find(prohibited_indexes.begin(), prohibited_indexes.end(),
index_point) == prohibited_indexes.end())
{
prohibited_indexes.push_back(index_point);
cluster_center[i*3+0] = points_center[index_point*3+0];
cluster_center[i*3+1] = points_center[index_point*3+1];
cluster_center[i*3+2] = points_center[index_point*3+2];
points_cluster[index_point] = i;
break;
}
}
}
//guter cluster centers over all processes
#if defined(USE_MPI)
{
//std::vector<int> counts(m->GetProcessorsNumber()), displs(m->GetProcessorsNumber());
displs[0] = 0;
counts[0] = Kpart*3;
for(int i = 1; i < (int)m->GetProcessorsNumber(); ++i)
{
displs[i] = displs[i-1] + counts[i-1];
counts[i] = Kpart*3;
}
counts.back() = K*3 - displs.back();
for(int i = Kstart; i < Kend; ++i)
{
cluster_center_tmp[i*3+0] = cluster_center[i*3+0];
cluster_center_tmp[i*3+1] = cluster_center[i*3+1];
cluster_center_tmp[i*3+2] = cluster_center[i*3+2];
}
MPI_Allgatherv(&cluster_center_tmp[Kstart*3],(Kend-Kstart)*3,MPI_DOUBLE,&cluster_center[0],&counts[0],&displs[0],MPI_DOUBLE,MPI_COMM_WORLD);
}
#endif
}
//if( m->GetProcessorRank() == 0 )
// std::cout << "Start clustering" << std::endl;
int iter = 1;
double t = Timer();
while(true)
{
int changed = 0;
// associates each point to the nearest center
#if defined(USE_OMP)
#pragma omp parallel for reduction(+:changed)
#endif
for(int i = 0; i < total_points; i++)
{
int id_old_cluster = points_cluster[i];
int id_nearest_center = -1;
double lmin = 1.0e+100;
for(int j = 0; j < K; ++j)
{
double v[3];
v[0] = (points_center[i*3+0] - cluster_center[j*3+0]);
v[1] = (points_center[i*3+1] - cluster_center[j*3+1]);
v[2] = (points_center[i*3+2] - cluster_center[j*3+2]);
double l = v[0]*v[0] + v[1]*v[1] + v[2]*v[2];
if( l < lmin )
{
lmin = l;
id_nearest_center = j;
}
}
//id_nearest_center = tree.closest(&points_center[i*3]);
if(id_old_cluster != id_nearest_center)
{
points_cluster[i] = id_nearest_center;
changed++;
}
}
#if defined(USE_MPI)
int tmp = changed;
MPI_Allreduce(&tmp,&changed,1,MPI_INT,MPI_SUM,MPI_COMM_WORLD);
#endif
if(changed == 0 || iter >= max_iterations)
{
//if( m->GetProcessorRank() == 0 )
// std::cout << "Break in iteration " << iter << std::endl;
break;
}
for(int i = 0; i < K; i++)
{
cluster_center[i*3+0] = 0;
cluster_center[i*3+1] = 0;
cluster_center[i*3+2] = 0;
cluster_npoints[i] = 0;
}
// recalculating the center of each cluster
#if defined(USE_OMP)
#pragma omp parallel
#endif
{
std::vector< double > local_sum(K*3,0.0);
std::vector< int > local_npoints(K,0);
for(int j = 0; j < total_points; ++j)
{
local_sum[points_cluster[j]*3+0] += points_center[j*3+0];
local_sum[points_cluster[j]*3+1] += points_center[j*3+1];
local_sum[points_cluster[j]*3+2] += points_center[j*3+2];
local_npoints[points_cluster[j]]++;
}
#if defined(USE_OMP)
#pragma omp critical
#endif
{
for(int i = 0; i < K; ++i)
{
cluster_center[i*3+0] += local_sum[i*3+0];
cluster_center[i*3+1] += local_sum[i*3+1];
cluster_center[i*3+2] += local_sum[i*3+2];
cluster_npoints[i] += local_npoints[i];
}
}
}
#if defined(USE_MPI)
MPI_Allreduce(&cluster_center[0],&cluster_center_tmp[0],K*3,MPI_DOUBLE,MPI_SUM,MPI_COMM_WORLD);
MPI_Allreduce(&cluster_npoints[0],&cluster_npoints_tmp[0],K,MPI_INT,MPI_SUM,MPI_COMM_WORLD);
cluster_center.swap(cluster_center_tmp);
cluster_npoints.swap(cluster_npoints_tmp);
#endif
for(int i = 0; i < K; i++)
{
cluster_center[i*3+0] /= (double) cluster_npoints[i];
cluster_center[i*3+1] /= (double) cluster_npoints[i];
cluster_center[i*3+2] /= (double) cluster_npoints[i];
}
//if( m->GetProcessorRank() == 0 )
// std::cout << "Iteration " << iter << " changed " << changed << std::endl;
iter++;
}
//tree.clear();
//std::cout << "Clustering in " << Timer() - t << " secs " << std::endl;
#if defined(USE_MPI)
if( balance )
{
std::vector<int> points_cluster_global(total_global_points);
displs[0] = 0;
counts[0] = total_local_points;
for(int k = 1; k < (int) m->GetProcessorsNumber(); ++k)
{
displs[k] = displs[k-1] + counts[k-1];
counts[k] = total_local_points;
}
counts.back() = total_global_points - displs.back();
MPI_Gatherv(&points_cluster[0],total_points,MPI_INT,&points_cluster_global[0],&counts[0],&displs[0],MPI_INT,0,MPI_COMM_WORLD);
displs[0] = 0;
counts[0] = npoints[0];
for(int k = 1; k < (int) m->GetProcessorsNumber(); ++k)
{
displs[k] = displs[k-1] + counts[k-1];
counts[k] = npoints[k];
}
total_points = counts[m->GetProcessorRank()];
points_cluster.resize(total_points);
MPI_Scatterv(&points_cluster_global[0],&counts[0],&displs[0],MPI_INT,&points_cluster[0],total_points,MPI_INT,0,MPI_COMM_WORLD);
}
#endif
// shows elements of clusters
TagInteger mat = m->RedistributeTag();
#if defined(USE_OMP)
#pragma omp parallel for
#endif
for(int j = 0; j < total_points; ++j)
mat[m->CellByLocalID(points_node[j])] = points_cluster[j]+1;
//m->ExchangeData(mat,CELL,0);
}
EXIT_FUNC();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment