Logo Search packages:      
Sourcecode: lfc version File versions  Download package

dpm-drain.c

/*
 * Copyright (C) 2005-2008 by CERN/IT/GD/SC
 * All rights reserved
 */

#ifndef lint
static char sccsid[] = "@(#)$RCSfile: dpm-drain.c,v $ $Revision: 1.21 $ $Date: 2009/02/26 16:45:44 $ CERN IT-GD/SC Jean-Philippe Baud";
#endif /* not lint */

/*      dpm-drain - drain a component of the Light Weight Disk Pool Manager */

#include <grp.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include "Cgetopt.h"
#include "dpm_api.h"
#include "dpns_api.h"
#include "serrno.h"
#include "u64subr.h"
/* Target of the "--help" entry in main's longopts table (flag value 1);
 * main prints the usage text and exits 0 when it is non-zero. */
int help_flag = 0;
/* Count of SIGINTs seen so far: incremented by sig_handler, polled by main
 * to stop draining after the current replica. */
volatile sig_atomic_t sig_int_received = 0;

/*
 * sig_handler - SIGINT handler installed by main via sigaction().
 *
 * Counts the interrupts in sig_int_received:
 *   1st: writes a notice to stderr (main polls the counter and stops
 *        after the replica it is currently working on),
 *   2nd: writes a warning that one more interrupt exits immediately,
 *   3rd: terminates the process at once with _exit (USERR).
 *
 * Only write() and _exit() are called here, and errno is saved and
 * restored so that a failing write() cannot disturb the errno of the
 * code that was interrupted.
 */
void sig_handler(int signum)
{
      /* sizeof - 1 below yields the text length without the trailing NUL */
      static const char notice[] = "Interrupt received, finishing up.\n";
      static const char warn[] = "Interrupt received. One more to exit immediately.\n";
      int saved_errno = errno;

      (void) signum;    /* the handler is only installed for SIGINT */

      switch (++sig_int_received) {
            case 1:
                  /* nothing useful can be done if stderr is not writable */
                  (void) write (STDERR_FILENO, notice, sizeof (notice) - 1);
                  break;
            case 2:
                  (void) write (STDERR_FILENO, warn, sizeof (warn) - 1);
                  break;
            case 3:
                  _exit (USERR);
                  break;
            default:
                  break;
      }

      errno = saved_errno;
}

/*
 * main - command-line entry point of dpm-drain.
 *
 * Options (long options only, parsed with Cgetopt_long):
 *   --poolname name   restrict the drain to one pool
 *   --server host     restrict the drain to one disk server
 *   --fs name         restrict the drain to one file system; requires
 *                     --server and cannot be combined with --poolname
 *   --gid list        comma-separated numeric gids; only replicas whose
 *                     Cns_statr gid is in the list are drained
 *   --group list      same filter given by group name; a value containing
 *                     "ALL" is ignored (adds no filter)
 *   --size amount     stop once at least this many bytes have been drained
 *                     (digits with an optional k/M/G/T/P suffix)
 *   --help            print the usage text and exit 0
 * At least one of --poolname, --server, --fs must be given.
 *
 * Flow:
 *   1. Select the matching file systems and call dpm_modifyfs on each.
 *   2. Install sig_handler for SIGINT.
 *   3. List the replicas with Cns_listreplicax; for each eligible one,
 *      create a new replica with dpm_replicatex (unless it is a volatile
 *      file that already expired) and delete the original with
 *      dpm_delreplica. The listing is restarted while it ends with
 *      SETIMEDOUT and the previous pass deleted at least one replica.
 *
 * Exit status: USERR on usage or setup errors; otherwise 0 on success and
 * 1 when errors occurred, the run was interrupted, or fewer than --size
 * bytes were drained.
 */
int main(int argc, char **argv)
{
      char buf[256];
      int c;
      u_signed64 current_size = 0;
      time_t current_time;
      char *dp;
      struct dpm_fs *dpm_fs;
      struct dpm_pool *dpm_pools;
      int errflg = 0;
      time_t f_lifetime;
      int first_gid;
      int flags;
      int found_fs = 0;
      char *fs = NULL;
      struct fs_list_s {
            char server[CA_MAXHOSTNAMELEN+1];
            char fs[80];
      } *fs_list = NULL, *new_fs_list;
      int fs_status;
      gid_t *gid_list = NULL;
      long gid_value;
#ifndef VIRTUAL_ID
      struct group *gr;
#endif
      char groupname[256];
      int i = 0;
      int j = 0;
      Cns_list list;
      static struct Coptions longopts[] = {
            {"fs", REQUIRED_ARGUMENT, 0, OPT_FS},
            {"gid",REQUIRED_ARGUMENT,0,OPT_POOL_GID},
            {"group",REQUIRED_ARGUMENT,0,OPT_POOL_GROUP},
            {"help", NO_ARGUMENT, &help_flag, 1},
            {"size", REQUIRED_ARGUMENT, 0, OPT_DRAIN_SZ},
            {"poolname", REQUIRED_ARGUMENT, 0, OPT_POOL_NAME},
            {"server", REQUIRED_ARGUMENT, 0, OPT_FS_SERVER},
            {0, 0, 0, 0}
      };
      struct Cns_filereplica *lp;
      u_signed64 min_size = 0;
      int nbentries;
      int nbfs;
      int nbgids = 0;
      int nbpools;
      char *p = NULL;
      int pass_good;
      int pass_rc;
      char path[CA_MAXPATHLEN+1];
      char pfn[CA_MAXSFNLEN+1];
      char *poolname = NULL;
      int rc = 0;
      int replicate;
      struct dpns_filereplicax *rep_entries = NULL;
      int save_serrno;
      struct sigaction sigact;
      char *server = NULL;
      struct dpm_space_metadata *spacemd = NULL;
      int target_fs_status;
      struct Cns_filestatg statbuf;
      char u64buf[21];

      /* ---- command line parsing ---- */

      Copterr = 1;
      Coptind = 1;
      while ((c = Cgetopt_long (argc, argv, "", longopts, NULL)) != EOF) {
            switch (c) {
            case OPT_FS:
                  /* must fit in fs_list_s.fs (80 bytes including the NUL) */
                  if (strlen (Coptarg) > 79) {
                        fprintf (stderr,
                              "filesystem name too long: %s\n", Coptarg);
                        errflg++;
                  } else
                        fs = Coptarg;
                  break;
            case OPT_FS_SERVER:
                  if (strlen (Coptarg) > CA_MAXHOSTNAMELEN) {
                        fprintf (stderr,
                              "server name too long: %s\n", Coptarg);
                        errflg++;
                  } else
                        server = Coptarg;
                  break;
            case OPT_DRAIN_SZ:
                  /* accept digits optionally followed by one unit letter */
                  p = Coptarg;
                  while (*p >= '0' && *p <= '9') p++;
                  if (! (*p == '\0' || ((*p == 'k' || *p == 'M' ||
                      *p == 'G' || *p == 'T' || *p == 'P') && *(p+1) == '\0'))) {
                        fprintf (stderr,
                            "invalid minimum amount to drain %s\n", Coptarg);
                        errflg++;
                  } else
                        min_size = strutou64 (Coptarg);
                  break;
            case OPT_POOL_NAME:
                  if (strlen (Coptarg) > CA_MAXPOOLNAMELEN) {
                        fprintf (stderr,
                              "pool name too long: %s\n", Coptarg);
                        errflg++;
                  } else
                        poolname = Coptarg;
                  break;
            case OPT_POOL_GID:
                  p = Coptarg;
                  /* check that the user didn't provide already a list of gid */
                  if (nbgids == 0) i = 0;
                  /* commas + 1 is an upper bound on the number of tokens */
                  while (*p) {
                        if (*p == ',') nbgids++;
                        p++;
                  }
                  nbgids++;
                  if ((gid_list = (gid_t *) realloc (gid_list, nbgids * sizeof(gid_t))) == NULL) {
                        fprintf (stderr, "Could not allocate memory for gids\n");
                        exit (USERR);
                  }
                  first_gid = i;
                  p = strtok (Coptarg, ",");
                  while (p) {
                        /*
                         * Parse into a long first: gid_t may be unsigned, in
                         * which case a "< 0" test on the stored value could
                         * never reject a negative or out-of-range gid.
                         */
                        errno = 0;
                        gid_value = strtol (p, &dp, 10);
                        if (dp == p || *dp != '\0' || errno == ERANGE ||
                            gid_value < 0 ||
                            (long) (gid_t) gid_value != gid_value) {
                              fprintf (stderr, "Invalid gid %s \n",p);
                              errflg++;
                        } else {
                              gid_list[i] = (gid_t) gid_value;
#ifdef VIRTUAL_ID
                              if (gid_list[i] > 0 && Cns_getgrpbygid (gid_list[i], groupname) < 0) {
#else
                              if (gid_list[i] > 0 && ! getgrgid (gid_list[i])) {
#endif
                                    fprintf (stderr, "Invalid gid %s \n", p);
                                    errflg++;
                              }
                        }
                        i++;
                        /* put back the separator strtok overwrote */
                        if ((p = strtok (NULL, ",")))
                              *(p - 1) = ',';
                  }
                  if (i == first_gid) {
                        /* e.g. "" or ",": no gid was given at all */
                        fprintf (stderr, "Invalid gid %s \n", Coptarg);
                        errflg++;
                  }
                  /*
                   * strtok skips empty tokens ("1,,2"), so fewer entries than
                   * counted may have been stored; never expose unset slots.
                   */
                  nbgids = i;
                  break;
            case OPT_POOL_GROUP:
                  /* if it contains ALL, it is like a normal dpm-drain without filtering */
                  if (strstr (Coptarg, "ALL") != NULL)
                        break;
                  /* check that the user didn't provide already a list of gid */
                  if (nbgids == 0) i = 0;
                  p = Coptarg;
                  while (*p) {
                        if (*p == ',') nbgids++;
                        p++;
                  }
                  nbgids++;
                  if ((gid_list = (gid_t *) realloc (gid_list, nbgids * sizeof(gid_t))) == NULL) {
                        fprintf (stderr, "Could not allocate memory for gids\n");
                        exit (USERR);
                  }
                  first_gid = i;
                  p = strtok (Coptarg, ",");
                  while (p) {
#ifdef VIRTUAL_ID
                        if (strcmp (p, "root") == 0)
                              gid_list[i] = 0;
                        else if (Cns_getgrpbynam (p, &gid_list[i]) < 0) {
#else
                        if ((gr = getgrnam (p)))
                              gid_list[i] = gr->gr_gid;
                        else {
#endif
                              fprintf (stderr, "Invalid group : %s\n", p);
                              errflg++;
                        }
                        i++;
                        /* put back the separator strtok overwrote */
                        if ((p = strtok (NULL, ",")))
                              *(p - 1) = ',';
                  }
                  if (i == first_gid) {
                        /* e.g. "" or ",": no group name was given at all */
                        fprintf (stderr, "Invalid group : %s\n", Coptarg);
                        errflg++;
                  }
                  /* keep nbgids equal to the number of entries actually stored */
                  nbgids = i;
                  break;
            case '?':
                  errflg++;
                  break;
            default:
                  break;
            }
      }
      /* no positional arguments; at least one selector is required */
      if (Coptind < argc || (poolname == NULL && server == NULL && fs == NULL))
            errflg++;
      if (fs && server == NULL)
            errflg++;
      if (fs && poolname)
            errflg++;
      if (errflg || help_flag) {
            fprintf (stderr, "usage:\n%s\n%s\n%s\n%s",
                "dpm-drain --poolname pool_name [--server fs_server] [--gid gid(s)]\n"
                "\t[--group group_name(s)] [--size amount_to_drain]\n",
                "dpm-drain --server fs_server [--gid gid(s)] [--group group_name(s)]\n"
                "\t[--size amount_to_drain]\n",
                "dpm-drain --server fs_server --fs fs_name [--gid gid(s)]\n"
                "\t[--group group_name(s)] [--size amount_to_drain]\n",
                "dpm-drain --help\n");
            exit (help_flag ? 0 : USERR);
      }

      /* ---- select the file systems to drain ---- */

      /* set status to FS_RDONLY unless a specific server & fs is specified along with a limit */
      target_fs_status = ((nbgids == 0 && min_size == 0) || !(server && fs)) ? FS_RDONLY : -1;

      if (dpm_getpools (&nbpools, &dpm_pools) < 0) {
            fprintf (stderr, "dpm_getpools: %s\n", sstrerror (serrno));
            exit (USERR);
      }
      for (i = 0; i < nbpools; i++) {
            if (poolname && strcmp ((dpm_pools + i)->poolname, poolname))
                  continue;
            if (dpm_getpoolfs ((dpm_pools + i)->poolname, &nbfs, &dpm_fs) < 0) {
                  fprintf (stderr,
                        "dpm_getpoolfs %s: %s\n", (dpm_pools + i)->poolname,
                        sstrerror (serrno));
                  exit (USERR);
            }
            for (j = 0; j < nbfs; j++) {
                  if (server && strcmp ((dpm_fs + j)->server, server))
                        continue;
                  if (fs && strcmp (dpm_fs[j].fs, fs))
                        continue;
                  /*
                   * NOTE(review): -1 presumably tells dpm_modifyfs to leave
                   * the status unchanged (used for disabled file systems and
                   * for limited drains) - confirm against the DPM API docs.
                   */
                  fs_status = target_fs_status;
                  if (dpm_fs[j].status & FS_DISABLED)
                        fs_status = -1;
                  if (dpm_modifyfs ((dpm_fs + j)->server, (dpm_fs + j)->fs, fs_status) < 0) {
                        fprintf (stderr,
                              "dpm_modifyfs %s %s: %s\n", (dpm_fs + j)->server,
                              (dpm_fs + j)->fs, sstrerror (serrno));
                        exit (USERR);
                  }
                  /* remember the selected (server, fs) pair for the replica filter below */
                  new_fs_list = realloc (fs_list, sizeof(struct fs_list_s)*(found_fs+1));
                  if (new_fs_list == NULL) {
                        fprintf (stderr, "Could not allocate memory for file system list\n");
                        exit (USERR);
                  }
                  fs_list = new_fs_list;
                  strcpy (fs_list[found_fs].server,(dpm_fs + j)->server);
                  strcpy (fs_list[found_fs].fs,(dpm_fs + j)->fs);
                  found_fs++;
            }
      }
      if (!found_fs) {
            fprintf (stderr, "No file systems matching specification\n");
            exit (USERR);
      }

      /* install a signal handler for SIGINT */

      memset(&sigact, '\0', sizeof(sigact));
      sigemptyset(&sigact.sa_mask);
      sigact.sa_handler = &sig_handler;
#ifdef SA_RESTART
      sigact.sa_flags |= SA_RESTART;
#endif
      sigaction(SIGINT, &sigact, (struct sigaction *)NULL);

      /* ---- drain: one iteration of the outer loop is one listing pass ---- */

      while (1) {
            pass_rc = 0;      /* set when some replica of this pass could not be drained */
            pass_good = 0;    /* replicas successfully deleted in this pass */
            flags = CNS_LIST_BEGIN;
            /* serrno is cleared before each call so that the reason for a NULL return can be examined afterwards */
            while (!sig_int_received && (serrno=0, lp = Cns_listreplicax (poolname, server, fs, flags, &list)) != NULL) {
                  if (flags != CNS_LIST_CONTINUE)
                        flags = CNS_LIST_CONTINUE;

                  /* only handle replicas located on one of the selected file systems */
                  for (i=0; i < found_fs; i++) {
                        if (!strcmp (fs_list[i].server, lp->host) &&
                            !strcmp (fs_list[i].fs, lp->fs))
                              break;
                  }
                  if (i >= found_fs)
                        continue;

                  printf ("\n");

                  /* statbuf is needed only for the gid filter and for size accounting */
                  if (nbgids != 0 || min_size != 0) {
                        if (Cns_statr (lp->sfn, &statbuf) < 0) {
                              fprintf (stderr, "Cns_statr %s: %s\n", lp->sfn, sstrerror (serrno));
                              pass_rc = 1;
                              continue;
                        }
                  }
                  if (nbgids != 0) {
                        for (i=0; i < nbgids; i++) {
                              if (statbuf.gid == gid_list[i])
                                    break;
                        }
                        if (i >= nbgids)
                              continue;
                  }
                  if (lp->status != '-') {      /* file is being populated/deleted */
                        if (lp->status == 'P') {
                              printf ("The file %s is in the process of being uploaded, ignoring\n", lp->sfn);
                        } else {
                              printf ("The file %s is recorded as being in the process of being deleted, ignoring it during drain\n", lp->sfn);
                        }
                        fflush (stdout);
                        /* a skipped replica counts as a failure only for a full (unlimited) drain */
                        if (min_size == 0)
                              pass_rc = 1;
                        continue;
                  }
                  current_time = time (0);
                  if (lp->ptime > current_time) {     /* file is pinned */
                        printf ("%s pinned until %s", lp->sfn, ctime (&lp->ptime));
                        fflush (stdout);
                        if (min_size == 0)
                              pass_rc = 1;
                        continue;
                  }

                  if (dpns_getpath (NULL, lp->fileid, path) < 0) {
                        fprintf (stderr,
                              "dpns_getpath: %s (%s): %s\n", lp->sfn,
                              u64tostr (lp->fileid, u64buf, 0), sstrerror (serrno));
                        pass_rc = 1;
                        continue;
                  }

                  /* drop the replica list left over from the previous file */
                  free (rep_entries);
                  rep_entries = NULL;

                  if (dpns_getreplicax (path, NULL, NULL, &nbentries, &rep_entries) < 0) {
                        fprintf (stderr, "dpns_getreplicax: %s: %s\n",
                              path, sstrerror (serrno));
                        pass_rc = 1;
                        continue;
                  }
                  /* locate the entry describing the replica being drained */
                  for (i=0; i < nbentries; i++) {
                        if (!strcmp (lp->sfn, rep_entries[i].sfn))
                              break;
                  }
                  if (i >= nbentries) {
                        fprintf (stderr, "could not find replica of %s with pfn %s\n", path, lp->sfn);
                        pass_rc = 1;
                        continue;
                  }

                  /* report what is about to be drained */
                  printf ("File:\t\t%s\n", path);
                  printf ("pfn:\t\t%s (of %d)\n", lp->sfn, nbentries);
                  printf ("replica type:\t%s\n",
                      (rep_entries[i].r_type == 'P') ? "primary" : "secondary");
                  switch(lp->f_type) {
                        case 'V':
                              strcpy (buf,"volatile");
                              break;
                        case 'D':
                              strcpy (buf,"durable");
                              break;
                        case 'P':
                              strcpy (buf,"permanent");
                              break;
                        default:
                              sprintf (buf,"'%c'",lp->f_type);
                              break;
                  }
                  printf ("file type:\t%s", buf);
                  if (lp->f_type == 'P')
                        printf (" (does not expire)\n");
                  else {
                        /* ctime text without its trailing newline; empty if ctime fails */
                        buf[0] = '\0';
                        if ((p = ctime (&rep_entries[i].ltime)))
                              strcpy (buf, p);
                        if ((p = strchr (buf, '\n')))
                              *p = '\0';
                        printf (" (%s on %s)\n",
                            (rep_entries[i].ltime <= current_time) ? "expired" : "expires", buf);
                  }
                  if (min_size != 0)
                        printf ("size:\t\t%s\n", u64tostru (statbuf.filesize, u64buf, 0));
                  if (rep_entries[i].setname == NULL || *rep_entries[i].setname == '\0')
                        printf ("space:\t\tnot in any space\n");
                  else {
                        p = rep_entries[i].setname;
                        printf ("space:\t\t%s", p);
                        j = 0;
                        if (dpm_getspacemd (1, &p, &j, &spacemd) < 0) {
                              if (serrno == EINVAL) {
                                    printf (" (invalid space)\n");
                                    /* blank the set name so the dpm_replicatex call below gets an empty space name */
                                    *p = '\0';
                              } else
                                    printf ("\n");
                        } else if (j == 1 && spacemd)
                              printf (" (%s)\n", spacemd[0].u_token);
                        else
                              printf ("\n");
                        free (spacemd);
                        spacemd = NULL;
                  }

                  /* an expired volatile file is deleted without being replicated first */
                  replicate = 1;
                  if (lp->f_type == 'V' && rep_entries[i].ltime <= current_time)
                        replicate = 0;
                  if (replicate) {
                        printf ("replicating... ");
                        fflush (stdout);
                        f_lifetime = rep_entries[i].ltime;
                        if (dpm_replicatex (lp->sfn, lp->f_type, rep_entries[i].setname, f_lifetime, pfn) < 0) {
                              printf ("failed\n");
                              fprintf (stderr, "dpm_replicatex %s: %s\n", lp->sfn, sstrerror (serrno));
                              pass_rc = 1;
                              continue;
                        }
                        /*
                         * For files that can expire, verify that the new
                         * replica (pfn) exists and lives at least as long as
                         * the one being removed; otherwise delete the new
                         * replica again and keep the original.
                         */
                        if (lp->f_type != 'P') {
                              free (rep_entries);
                              rep_entries = NULL;
                              if (dpns_getreplicax (path, NULL, NULL, &nbentries, &rep_entries) < 0) {
                                    printf ("failed\n");
                                    fprintf (stderr, "dpns_getreplicax: %s: %s\n",
                                        path, sstrerror (serrno));
                                    if (dpm_delreplica (pfn) < 0)
                                          rc = 1;
                                    pass_rc = 1;
                                    continue;
                              }
                              for (i=0; i < nbentries; i++) {
                                    if (!strcmp (pfn, rep_entries[i].sfn))
                                          break;
                              }
                              if (i>=nbentries) {
                                    printf ("failed\n");
                                    fprintf (stderr, "could not find new replica\n");
                                    if (dpm_delreplica (pfn) < 0)
                                          rc = 1;
                                    pass_rc = 1;
                                    continue;
                              }
                              if (rep_entries[i].ltime < f_lifetime) {
                                    printf ("failed\n");
                                    fprintf (stderr, "could not replicate to a new file with sufficient lifetime\n");
                                    if (dpm_delreplica (pfn) < 0)
                                          rc = 1;
                                    pass_rc = 1;
                                    continue;
                              }
                        }
                  }
                  printf ("deleting\n");
                  fflush (stdout);
                  if (dpm_delreplica (lp->sfn) < 0) {
                        fprintf (stderr, "dpm_delreplica %s: %s\n", lp->sfn, sstrerror (serrno));
                        pass_rc = 1;
                        rc = 1;
                        continue;
                  }
                  if (min_size != 0) {
                        current_size += statbuf.filesize;
                        /* requested amount reached: leave with lp != NULL so the outer loop stops too */
                        if (current_size >= min_size)
                              break;
                  }
                  pass_good++;
            }
            /* keep the listing result before the CNS_LIST_END call can change serrno */
            save_serrno = serrno;
            (void) Cns_listreplicax (poolname, server, fs, CNS_LIST_END, &list);
            free (rep_entries);
            rep_entries = NULL;
            /*
             * Another pass is made only if the listing ended with
             * SETIMEDOUT, this pass deleted at least one replica, no hard
             * error occurred and no interrupt was received. pass_rc is
             * folded into rc on the final pass only.
             */
            if (sig_int_received || lp != NULL || rc || save_serrno != SETIMEDOUT || pass_good == 0) {
                  if (pass_rc)
                        rc = 1;
                  if (!sig_int_received && lp == NULL && save_serrno)
                        fprintf (stderr, "Cns_listreplicax: %s\n", sstrerror (save_serrno));
                  break;
            }
            printf ("\n");
      }

      /* ---- final report ---- */

      if (sig_int_received) {
            fprintf (stderr, "\nFinishing after interrupt\n");
            rc = 1;
      } else if (rc)
            fprintf (stderr, "\nThere were some errors which prevented dpm-drain from completing fully\n");
      if (min_size != 0) {
            printf ("\nnumber of bytes drained %s\n", u64tostru (current_size, u64buf, 0));
            fflush (stdout);
            if (current_size < min_size)
                  rc = 1;
      }
      exit (rc);
}

Generated by  Doxygen 1.6.0   Back to index