实现线程池

在github上有很多线程库可以参考,这里总结一下主要的实现方法。
/github.com/mtrebi/thread-pool c++11
/github.com/mbrossard/threadpool pthread, c
/github.com/vit-vit/CTPL/blob/master/ctpl.h c++11
/github.com/bilash/threadpool pthread, class封装了condvar和mutex.
/github.com/nbsdx/ThreadPool/ c++11
/github.com/Pithikos/C-Thread-Pool/ pthread, c

C++11的实现方式基本上是相同的,以函数对象std::function<void()>作为任务构建queue, thread或者worker从queue获取任务,然后执行。
在往queue里面添加任务的时候,可以添加任何函数形式。
通过std::bind,将函数和函数参数bind在一起,得到函数对象,再通过lanmda构造void(void)函数。
另外,返回值用到了std::future, 存储未来的值,如果值还没有设置,对它的访问会阻塞。
在notify_one调用之前没有加锁,而在CTPL使用了锁。但在notify的时候确实不需要锁。/blog.csdn.net/fengbingchun/article/details/73695596
对于queue, CTPL使用了boost::lock_free::queue.

  / Submit a function to be executed asynchronously by the pool
  template<typename F, typename...Args>
  auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
    / Create a function with bounded parameters ready to execute
    std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
    / Encapsulate it into a shared ptr in order to be able to copy construct / assign 
    auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);

    / Wrap packaged task into void function
    std::function<void()> wrapper_func = [task_ptr]() {
      (*task_ptr)(); 
    };

    / Enqueue generic wrapper function
    m_queue.enqueue(wrapper_func);

    / Wake up one thread if its waiting
    m_conditional_lock.notify_one();

    / Return future from promise
    return task_ptr->get_future();
  }

在nbsdx/ThreadPool,只支持把std::function<void()>作为参数来添加任务。

在Pithikos/C-Thread-Pool定义了job,来打包函数指针和参数,只能支持没有参数或者一个参数的情况,多个参数的话,需要自己打包。
/* Job /
typedef struct job{
struct job
prev; /* pointer to previous job /
void (
function)(void* arg); /* function pointer /
void
arg; /* function's argument */
} job;
在mbrossard/threadpool类似,对函数参数的支持不好。

在bilash/threadpool定义了Task, C风格的,都差不多。

这样看下来,还是第一个的实现最好。

6136640255

一些参考
类似于vtune的profile, 需要自己添加profile对象
/github.com/yse/easy_profiler
用于做benchmark,测单一功能比较有用
/github.com/google/benchmark
benchmark测试的一个例子
/github.com/agustingianni/ThreadProfiler
简单的timer
/github.com/d3rrial/timer/blob/master/timer.hpp
rdtsc timer
/github.com/jffrosen/rdtscp_timer/blob/master/timing.cc
rdtsc介绍
/blog.csdn.net/zdy0_2004/article/details/52971940

三种计时方法 clock, gettimeofday (精度us), time (精度s)
/blog.csdn.net/qq_19764963/article/details/79468683

Google的gperftools套件中包含有CPU Profiler(以下简称:pprof),原生支持C/C++多线程程序的性能剖析。相比GNU Profiler无法原生支持多线程性能剖析,pprof更具实用价值。

简单的timer

#include <chrono>

class timer {
public:
    timer(){ _start_time = std::chrono::high_resolution_clock::now(); }
    double elapsed(){ return std::chrono::duration<double, std::ratio<1, 1>>( std::chrono::high_resolution_clock::now()-_start_time).count(); }
private:
    std::chrono::high_resolution_clock::time_point _start_time;
};

easy_profile里面在函数开始的地方,加上EASY_FUNCTION就可以work,那么它是怎么获取函数名的呢?
通过编译器的宏, func和FUNCTION都可以,这里针对gcc/g++, 其他的编译器可能会不一样。
为了防止有重复的函数名,可以加上文件名和行号作为辅助.
FILE和LINE.
对于类函数,输出的也只是函数名。
更多获取函数名,调用栈,添加打印信息的参考
/blog.csdn.net/CaspianSea/article/details/9957743
/blog.csdn.net/adream307/article/details/6621754
/blog.csdn.net/madbunny/article/details/79801902

benchmark里面怎么统计cpu time的呢?
在src/benchmark_runner.cc里面记录时间,使用的timer是
src/thread_timer.h, 而thread_timer又包含real_time, 从chrono::high_resolution_clock得到,和cpu_time, 从clock_gettime得到。
int clock_gettime(clockid_t clk_id, struct timespec *tp);
CLOCK_THREAD_CPUTIME_ID,本线程到当前代码系统CPU花费的时间
有的地方用rdtsc代替clock_gettime.
实际上在线程里面统计的时间,就是cpu时间。跟使用什么timer没有关系。
如果起并行的接口是parallel(function f)
那么在parallel内部,记录elaps时间,而在function内部,记录的则是cpu时间。

用于简单bench测试,cpp
template<typename time_type=std::chrono::milliseconds, typename clock=std::chrono::steady_clock>
struct measure {
template<typename F, typename ...Args>
static std::pair<typename std::result_of<F(Args...)>::type, typename time_type::rep> execution(F func, Args&&... args) {
auto t0 = clock::now();
auto ret = func(std::forward<Args>(args)...);
auto t1 = clock::now();
auto delta = std::chrono::duration_cast<time_type>(t1 - t0);
return std::make_pair(ret, delta.count());
}
};

BitMap用于稀疏矩阵pattern构建

在构建稀疏矩阵pattern的时候,一般会使用std::vector< std::set > 来存储pattern。vector对应每一列,而每一列中不为0的位置就存到set里面,排序并且去重。当稀疏矩阵比较dense的时候,那么set的开销就比较明显了。
当稀疏比较dense的时候,那么考虑使用bitmap来记录有值的位置,不仅仅比set省内存,而且会比set快,因为set是O(n), bitmap是O(1)的。
对bitmap, 比较方便的是用char存储,因为可以通过shift来计算位置,也可以使用short和long。
下面是一个char的实现。

class CharBitMap
{
public:
    CharBitMap(unsigned int size)
        : mem_(NULL), size_(size)
    {
        memSize_ = (size + 7) >> 3;
        mem_ = new char[memSize_];
        memset(mem_, 0, sizeof(char) * memSize_);
    }
    void Set(unsigned int pos)
    {
        unsigned int memOffset = 0;
        unsigned int bitOffset = 0;
        GetLocation(pos, memOffset, bitOffset);
        mem_[memOffset] |= 1 << bitOffset;
    }
    void Clear(unsigned int pos)
    {
        unsigned int memOffset = 0;
        unsigned int bitOffset = 0;
        GetLocation(pos, memOffset, bitOffset);
        mem_[memOffset] &= ~(1 << bitOffset);
    }
    bool Get(unsigned int pos)
    {
        unsigned int memOffset = 0;
        unsigned int bitOffset = 0;
        GetLocation(pos, memOffset, bitOffset);
        return mem_[memOffset] & (1 << bitOffset);
    }
    void ClearAll()
    {
        memset(mem_, 0, sizeof(char) * memSize_);
    }
    unsigned int GetNNZ()
    {
        unsigned int nnz = 0;
        for(unsigned int i = 0; i < memSize_; ++i)
        {
            nnz += CountBit(mem_[i]);
        }
        return nnz;
    }
    void GetIndexSet(std::vector<unsigned int>& idx)
    {
        unsigned pos = 0;
        idx.reserve(16);
        for(unsigned int i = 0; i < memSize_; ++i)
        {
            char x = mem_[i];
            for(unsigned int j = 0; j < 8; ++j)
            {
                if(x & ( 1 << j))
                    idx.push_back(pos);
                ++pos;
            }
        }
    }
private:
    void GetLocation(unsigned int size, unsigned int& memOffset, unsigned int& bitOffset)
    {
        memOffset = size >> 3;
        bitOffset = size - (memOffset << 3);
    }
    unsigned int CountBit(char x)
    {
        unsigned int cnt = 0;
        while(x &= x - 1) ++cnt;
        return cnt;
    }
public:
    char* mem_;
    unsigned int size_;
    unsigned int memSize_;
};

测试代码,给定维度10000, entry数从10到2000, 测试用char存储和long存储的bitmap,set,以及直接用vector的结果,后续需要对vector内容排序并去重,开销没有包含进去。
测试代码

template<typename BitMap>
void Test()
{
    const unsigned int N = 10000;
    const unsigned int mSize = 10;
    const unsigned int M[] = {10, 20, 30, 50, 100, 200, 300, 500, 1000, 2000};
    std::vector<unsigned int> seeds(M[mSize - 1]);
    for(unsigned int i = 0; i < seeds.size(); ++i)
    {
        unsigned int x = rand() % N;
        seeds[i] = x;
    }
    for(unsigned int j = 0; j < mSize; ++j)
    {
        double t_start = omp_get_wtime();
        for(unsigned int i = 0; i < N; ++i)
        {
            BitMap bits(N);
            for(unsigned int k = 0; k < M[j]; ++k)
            {
                bits.Set(seeds[k]);
            }
        }
        double t_end = omp_get_wtime();
        printf("bit map for size: %d, take time: %.4f\n", M[j], t_end - t_start);
    }
}

bool Equal(std::vector<unsigned int>& a, std::vector<unsigned int>& b)
{
    if(a.size() != b.size()) return false;
    for(unsigned int i = 0; i < a.size(); ++i)
    {
        if(a[i] != b[i]) return false;
    }
    return true;
}
template<typename BitMap>
void TestCorrect()
{
    const unsigned int N = 10000;
    const unsigned int M = 10;
    std::vector<unsigned int> seeds(M);
    BitMap bits(N);
    for(unsigned int i = 0; i < seeds.size(); ++i)
    {
        unsigned int x = rand() % N;
        seeds[i] = x;
        bits.Set(x);
    }
    std::vector<unsigned int> idx;
    bits.GetIndexSet(idx);

    std::sort(seeds.begin(), seeds.end());
    auto last = std::unique(seeds.begin(), seeds.end());
    seeds.erase(last, seeds.end());
    if(Equal(idx, seeds))
        printf("verify success.\n");
    else
    {
        printf("idx  : ");
        PrintVec(idx);
        printf("seeds: ");
        PrintVec(seeds);
        printf("verify failed.\n");
    }
}
int main()
{
    Test<CharBitMap>();
    Test<IntBitMap>();
    Test<IndexSet>();
    Test<IndexVec>();
    TestCorrect<CharBitMap>();
    TestCorrect<IntBitMap>();
    return 0;
}

数据显示charBitMap最快,LongIntBitMap慢一倍,vector在entry数小于30的时候是最快的,set最慢。

OpenMP 并行编程

最近看OpenMP并行编程。

可以参考的资料
OpenMP编译原理及实现技术, 2012.
/max.book118.com/html/2015/0703/20321922.shtm

比较好的turorial介绍
/bisqwit.iki.fi/story/howto/openmp/#ExampleInitializingATableInParallelSingleThreadSimd

下面的文章强调了for循环并行化里面需要注意的依赖问题:数据依赖和循环依赖。如在fibonacci计算, f[i]= f[i-1] + f[i-2]
/www.cnblogs.com/hantan2008/p/5961312.html
如果使用两个线程,计算f(8), 那么第一个线程会计算f[1]~ f[4], 第二个线程计算f[5]~ f[8].
f(5) 依赖于上一次循环的结果,分到不同线程里面计算导致不能拿到正确的f(4)的值。
如向量相加, y[i] = x[i] , 数据依赖,这个并不会存在问题。

自己实现了一个例子,gcc8的结果比较正常,4线程加速比可以接近4, gcc5反倒只有2,可能是环境配置问题。

#include <cstdio>

#include <vector>

#include <omp.h>

/ compile cmd
/ gcc calcPI.cpp --std=c++14 -fopenmp -O3 -o aa

#define NUM_STEPS 100000000
#define NUM_THREADS 4

void calcPI()
{
    double t_start = omp_get_wtime();

    double step = 1.0 / NUM_STEPS;
    double sum = 0;

    for(int i=0; i < NUM_STEPS; ++i)
    {
        double x = (i+0.5) * step;
        sum = sum + 4.0 / (1.0 + x * x);
    }

    double pi = step * sum;

    double t_end = omp_get_wtime();
    printf("pi = %.8f, time: %.4f \n", pi, t_end - t_start);
}

void calcPI_parallel()
{
    double t_start = omp_get_wtime();

    double step = 1.0 / NUM_STEPS;
    / 4 threads 
    /std::vector<double> sum_tmp;
    /sum_tmp.resize(NUM_THREADS, 0.0);
    / non-shared data in the same cache line so each update invalidates the cache line..., really ?  maybe not
    double sum_tmp[NUM_THREADS] = {0};

#pragma omp parallel for num_threads(NUM_THREADS)
    for(int i=0; i< NUM_STEPS; ++i)
    {
        int thread_id = omp_get_thread_num();
        double x = (i+0.5) * step; 
        sum_tmp[thread_id] += 4.0 / (1.0 + x * x);
    }
    double sum = 0;
    for(int i=0; i< NUM_THREADS; ++i)
    {
        sum += sum_tmp[i];
    }
    double pi = step * sum;

    double t_end = omp_get_wtime();
    printf("pi = %.8f, time: %.4f \n", pi, t_end - t_start);
}

void calcPI_atomic()
{
    double t_start = omp_get_wtime();

    double step = 1.0 / NUM_STEPS;
    / 4 threads
    double sum = 0;

#pragma omp parallel for reduction(+:sum) num_threads(NUM_THREADS)
    for(int i=0; i< NUM_STEPS; ++i)
    {
        double x = (i+0.5) * step;
        double t = 4.0 / (1.0 + x * x);
/#pragma omp atomic
        sum += t;
        / but use atomic is very slow
    }
    double pi = step * sum;

    double t_end = omp_get_wtime();
    printf("pi = %.8f, time: %.4f \n", pi, t_end - t_start);
}

int main()
{
    calcPI();
    calcPI_parallel();
    calcPI_atomic();
    return 0;
}


编译:
gcc calcPI.cpp --std=c++14 -fopenmp -O3 -o aa

519-580-2072

WPS for Linux 2018 发布了,去更新了一下,结果出现闪退。
在命令行启动,就可以用。
这个版本还是没有数学公式,查到有其它工具可以用:KLatexFormula,于是就试了一下。
下载/github.com/TobiasWinchen/klatexformula_debian
参考
/blog.csdn.net/ouening/article/details/79008636
启动的时候遇到问题,找不到qt的一些库。添加qt的路径,可以使用locate查找它缺的库,比如libQt5Core.so。
按照参考,将drag和copy都改成png格式,确实可以拖到wps里面。但是显示的字比较小,虽然可以在wps里面更改png的大小,但还是不方便。可以在setting里面更改字体大小,但是也只是更改这个软件里面的显示大小。说不定可以看看code,更改一下。