/* test multithread write/flush */

#include <tap.h>
#include <my_sys.h>
#include <m_string.h>

static const uint number_of_threads= TEST_THREAD;
static const uint sync_period= 8;
#define  COUNTER_LIMIT sync_period*64;
static uint counter_limit;


static uint counter= 0;
static pthread_mutex_t counter_lock;
static pthread_cond_t counter_cond;
static uint in_progress= 0;
static pthread_mutex_t progress_lock;
static pthread_cond_t progress_cond;
static char buffer[1048576];
static File file;


void main_writer()
{
  uint pos;

  for(;;)
  {
    pthread_mutex_lock(&counter_lock);
    if (counter % sync_period == 0)
    {
      pthread_mutex_lock(&progress_lock);
      while (in_progress > 0)
      {
        pthread_cond_wait(&progress_cond, &progress_lock);
      }
      pthread_mutex_unlock(&progress_lock);

      /*if (my_sync(file, MYF(MY_WME)))
        exit(1);*/

      if (counter >= counter_limit)
      {
        my_close(file, MYF(MY_WME));
        exit(0);
      }

      pthread_cond_broadcast(&counter_cond);
    }
    pos= counter++;
    pthread_mutex_unlock(&counter_lock);
    if (my_pwrite(file, buffer, sizeof(buffer), sizeof(buffer) * pos,
                  MYF(MY_WME | MY_NABP)))
      exit(1);
  }
}

void helper_writer()
{
  uint pos;
  for(;;)
  {
    pthread_mutex_lock(&counter_lock);
    while (counter % sync_period == 0)
    {
       pthread_cond_wait(&counter_cond, &counter_lock);
    }
    pos= counter++;
    pthread_mutex_lock(&progress_lock);
    in_progress++;
    pthread_mutex_unlock(&progress_lock);
    pthread_mutex_unlock(&counter_lock);
    if (my_pwrite(file, buffer, sizeof(buffer), sizeof(buffer) * pos,
                  MYF(MY_WME | MY_NABP)))
      exit(1);
    pthread_mutex_lock(&progress_lock);
    in_progress--;
    if (!in_progress)
       pthread_cond_broadcast(&progress_cond);
    pthread_mutex_unlock(&progress_lock);
  }
}


static void *test_thread_writer(void *arg)
{
  int param=*((int*) arg);
  my_thread_init();
  if (number_of_threads == (uint)param)
    main_writer();
  else
    helper_writer();
  free((uchar*) arg);
  my_thread_end();
  return 0;
}

int main(int argc __attribute__((unused)),
         char **argv __attribute__((unused)))
{
  uint i;
  int *param;
  pthread_t tid;
  pthread_attr_t thr_attr;

  counter_limit= COUNTER_LIMIT;

  MY_INIT(argv[0]);
  pthread_mutex_init(&counter_lock, MY_MUTEX_INIT_FAST);
  pthread_mutex_init(&progress_lock, MY_MUTEX_INIT_FAST);
  pthread_cond_init(&counter_cond, NULL);
  pthread_cond_init(&progress_cond, NULL);
  if ((file= my_open("FILE",
                     O_CREAT | O_TRUNC | O_RDWR, MYF(0))) == -1)
    exit(1);

  memset(buffer, '1', sizeof(buffer));
  pthread_attr_init(&thr_attr);
  pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
  for(i= 1; i <= number_of_threads; i++)
  {
    param=(int*) malloc(sizeof(int));
    *param= (int) i;
    if (pthread_create(&tid, &thr_attr, test_thread_writer,
                       (void*) param))
        exit(1);
  }
  for (;;)
  {
    sleep(10);
  }

  my_end(0);
}

