Logo Search packages:      
Sourcecode: lfc version File versions  Download package

dpm_main.c

/*
 * Copyright (C) 2004-2008 by CERN/IT/GD/CT
 * All rights reserved
 */

#ifndef lint
static char sccsid[] = "@(#)$RCSfile: dpm_main.c,v $ $Revision: 1.45 $ $Date: 2009/01/07 09:49:55 $ CERN IT-GD/CT Jean-Philippe Baud";
#endif /* not lint */

#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#if defined(_WIN32)
#include <winsock2.h>
#else
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#endif
#include "Cgrp.h"
#include "Cinit.h"
#include "Cnetdb.h"
#include "Cpool_api.h"
#include "Cpwd.h"
#ifdef CSEC
#include "Csec_api.h"
#endif
#include "Cthread_api.h"
#include "dpm.h"
#include "dpm_server.h"
#include "dpm_util.h"
#include "dpns_api.h"
#include "marshall.h"
#include "net.h"
#include "patchlevel.h"
#include "serrno.h"

/*
 * Maximum number of sockets on which the server can listen
 */
#define MAX_LISTEN_SOCKS        16

/* Tested by the main loop and by the helper threads to stop their work;
 * it is not assigned in this file (presumably set by the shutdown request
 * handler - confirm against dpm_srv_shutdown). */
int being_shutdown;
/* Database login information, filled in by dpm_main from the first line
 * of the configuration file (user/password@server[/dbname]). */
char db_name[33];
char db_pwd[33];
char db_srvr[33];
char db_user[33];
/* Group id of STAGERSUPERGROUP, looked up in dpm_main. */
gid_t dpm_gid;
/* One entry per thread of the fast-operation pool; entry.s holds the
 * client socket while a request is being served and -1 otherwise. */
struct dpm_srv_thread_info dpm_srv_thread_info[DPM_NBFTHREADS];
/* User id of STAGERSUPERUSER, looked up in dpm_main. */
uid_t dpm_uid;
/* Copy of the DPNS_HOST environment variable (left empty when unset). */
char dpnshost[CA_MAXHOSTNAMELEN+1];
/* Name tag passed to dpmlogit; set to "dpm_serv" by dpm_main. Several
 * threads declare a local variable of the same name that shadows it. */
char func[16];
/* Process id of the daemon, taken from getpid() in dpm_main. */
int jid;
/* Local domain name and fully qualified local host name. */
char localdomain[CA_MAXHOSTNAMELEN+1];
char localhost[CA_MAXHOSTNAMELEN+1];
/* Log file path: LOGFILE unless overridden with the -l option. */
char logfile[CA_MAXPATHLEN+1];
/* Value returned by Cinitdaemon in main (not set on _WIN32). */
int maxfds;
/* Request counter changed by inc_reqctr/dec_reqctr, and the Cthread lock
 * address that protects it (obtained in dpm_main). */
int reqctr;
void *reqctr_lock;
/* Protocol list returned by get_supported_protocols in dpm_main. */
int nb_supported_protocols;
char **supported_protocols;
/* Incremented by the -4 and -6 command line options respectively. */
int listen_ipv4only=0, listen_ipv6only=0;
extern int Cdomainname (char *, int);
extern int dpm_rm_onereplica (struct Cns_fileid *, char *, u_signed64, int *, struct dpm_dbfd *dbfd, char *, int, u_signed64);
extern int isTrustedHost (int, char *, char *, char *, char *);
/* Key handed to Cgetnetaddress by doit and getreq (looks like a
 * thread-specific storage key - TODO confirm in Cnetdb). */
static int na_key = -1;

dpm_main(main_args)
struct main_args *main_args;
{
      int c;
      FILE *cf;
      char cfbuf[80];
      struct dpm_dbfd dbfd;
      void *doit(void *);
      char dpmconfigfile[CA_MAXPATHLEN+1];
      struct sockaddr_storage from;
      int fromlen;
      char *getconfent();
      struct group *gr;
      int i;
      int ipool;
      char logbuf[LOGBUFSZ];
      void *msthread(void *);
      int ms_tid;
      int on = 1; /* for REUSEADDR and IPV6_V6ONLY */
      char *p;
      char *p_n, *p_p, *p_s, *p_u;
      void *pooladdr;
      struct passwd *pw;
      fd_set readfd, readmask;
      void *rexthread(void *);
      int rex_tid;
      int rqfd;
      int s;
      struct addrinfo hints, *ai, *aitop;
      char strport[NI_MAXSERV];
      int gaierrno,nfds,num_listen_socks;
      int listen_socks[MAX_LISTEN_SOCKS];
      int thread_index;
      struct timeval timeval;

      jid = getpid();
      strcpy (func, "dpm_serv");
      dpmconfigfile[0] = '\0';
      strcpy (logfile, LOGFILE);

      /* process command line options if any */

      while ((c = getopt (main_args->argc, main_args->argv, "46c:l:")) != EOF) {
            switch (c) {
            case '4':
                  listen_ipv4only++;
                  break;
            case '6':
                  listen_ipv6only++;
                  break;
            case 'c':
                  strncpy (dpmconfigfile, optarg, sizeof(dpmconfigfile));
                  dpmconfigfile[sizeof(dpmconfigfile) - 1] = '\0';
                  break;
            case 'l':
                  strncpy (logfile, optarg, sizeof(logfile));
                  logfile[sizeof(logfile) - 1] = '\0';
                  break;
            }
      }

      if (listen_ipv4only && listen_ipv6only) {
            dpmlogit (func, "Can not choose to listen for only IPv4 and "
                  "also only for IPv6\n");
            return (USERR);
      }

      dpmlogit (func, "started (DPM %s-%d)\n", BASEVERSION, PATCHLEVEL);
      gethostname (localhost, CA_MAXHOSTNAMELEN+1);
      if (Cdomainname (localdomain, sizeof(localdomain)) < 0) {
            dpmlogit (func, "Unable to get local domain name\n");
            return (SYERR);
      }
      if (strchr (localhost, '.') == NULL) {
            strcat (localhost, ".");
            strcat (localhost, localdomain);
      }

      if ((p = getenv ("DPNS_HOST")))
            strcpy (dpnshost, p);
      dpmlogit (func, "DPNS_HOST = %s\n", dpnshost);

      if ((pw = Cgetpwnam (STAGERSUPERUSER)) == NULL) {
            dpmlogit (func, "%s account is not defined in passwd file\n",
                STAGERSUPERUSER);
            return (CONFERR);
      }
      dpm_uid = pw->pw_uid;
      if ((gr = Cgetgrnam (STAGERSUPERGROUP)) == NULL) {
            dpmlogit (func, "%s account is not defined in group file\n",
                STAGERSUPERGROUP);
            return (CONFERR);
      }
      dpm_gid = gr->gr_gid;

      /* Get list of supported protocols */

      if ((nb_supported_protocols = get_supported_protocols (&supported_protocols)) < 0) {
            dpmlogit (func, "malloc error\n");
            return (SYERR);
      }
      strcpy (logbuf, "Supported protocols are:");
      for (i = 0; i < nb_supported_protocols; i++)
            sprintf (logbuf + strlen (logbuf), " %s", supported_protocols[i]);
      dpmlogit (func, "%s\n", logbuf);

      /* get DB login info from the Disk Pool Manager server config file */

      if (! *dpmconfigfile) {
            if (strncmp (DPMCONFIG, "%SystemRoot%\\", 13) == 0 &&
                (p = getenv ("SystemRoot")))
                  sprintf (dpmconfigfile, "%s%s", p, strchr (DPMCONFIG, '\\'));
            else
                  strcpy (dpmconfigfile, DPMCONFIG);
      }
      if ((cf = fopen (dpmconfigfile, "r")) == NULL) {
            dpmlogit (func, DP023, dpmconfigfile);
            return (CONFERR);
      }
      if (fgets (cfbuf, sizeof(cfbuf), cf) &&
          strlen (cfbuf) >= 5 && (p_u = strtok (cfbuf, "/\n")) &&
          (p_p = strtok (NULL, "@\n")) && (p_s = strtok (NULL, "/\n"))) {
            if ((p_n = strtok (NULL, "\n")))
                  strcpy (db_name, p_n);
            else
                  strcpy (db_name, "dpm_db");
            strcpy (db_user, p_u);
            strcpy (db_pwd, p_p);
            strcpy (db_srvr, p_s);
      } else {
            dpmlogit (func, DP009, dpmconfigfile, "incorrect");
            return (CONFERR);
      }
      (void) fclose (cf);

      /* Initialize the request counter mutex and condition variable */

      if (Cthread_mutex_lock (&reqctr) < 0) {
            dpmlogit (func, DP002, "Cthread_mutex_lock", sstrerror (serrno));
            return (SYERR);
      }
      if ((reqctr_lock = Cthread_mutex_lock_addr (&reqctr)) == NULL) {
            dpmlogit (func, DP002, "Cthread_mutex_lock_addr", sstrerror (serrno));
            return (SYERR);
      }
      if (Cthread_mutex_unlock_ext (reqctr_lock) < 0) {
            dpmlogit (func, DP002, "Cthread_mutex_unlock_ext", sstrerror (serrno));
            return (SYERR);
      }

      (void) dpm_init_dbpkg ();
      memset (&dbfd, 0, sizeof(dbfd));
      dbfd.idx = DPM_NBFTHREADS + DPM_NBSTHREADS + 1;
      if (dpm_opendb (db_srvr, db_user, db_pwd, db_name, &dbfd) < 0)
            return (SYERR);

      /* Get current disk pool configuration */

      if (dpm_getpoolconf (&dbfd)) {
            (void) dpm_closedb (&dbfd);
            return (SYERR);
      }

      /* Recover the queue of uncompleted requests */

      c = dpm_recover_queue (&dbfd);

      /* Reallocate space for put requests in active/running state */

      c += dpm_reallocate_space (&dbfd);

      (void) dpm_closedb (&dbfd);
      if (c)
            return (SYERR);

      /* Create the thread to remove expired puts */

      if ((rex_tid = Cthread_create (&rexthread, NULL)) < 0) {
            dpmlogit (func, DP002, "Cthread_create", sstrerror (serrno));
            return (SYERR);
      }

      /* Create main thread for slow operations */

      if ((ms_tid = Cthread_create (&msthread, NULL)) < 0) {
            dpmlogit (func, DP002, "Cthread_create", sstrerror (serrno));
            return (SYERR);
      }

      /* Create a pool of threads for fast operations */

      if ((ipool = Cpool_create_ext (DPM_NBFTHREADS, NULL, &pooladdr)) < 0) {
            dpmlogit (func, DP002, "Cpool_create", sstrerror (serrno));
            return (SYERR);
      }
      for (i = 0; i < DPM_NBFTHREADS; i++) {
            dpm_srv_thread_info[i].s = -1;
            dpm_srv_thread_info[i].dbfd.idx = i;
      }

      FD_ZERO (&readmask);
      FD_ZERO (&readfd);
#if ! defined(_WIN32)
      signal (SIGPIPE, SIG_IGN);
      signal (SIGXFSZ, SIG_IGN);
#endif

      /* open request sockets */

      serrno = 0;
      memset (&hints, 0, sizeof(struct addrinfo));
      if (listen_ipv4only)
            hints.ai_family = PF_INET;  
      else if (listen_ipv6only)
            hints.ai_family = PF_INET6;
      else
            hints.ai_family = PF_UNSPEC;
      hints.ai_socktype = SOCK_STREAM;
      hints.ai_flags = AI_PASSIVE;
      if ((p = getenv ("DPM_PORT")) || (p = getconfent ("DPM", "PORT", 0))) {
            strncpy (strport, p, sizeof(strport));
            strport[sizeof(strport)-1] = '\0';
      } else {
            snprintf (strport, sizeof(strport), "%u", DPM_PORT);
      }

      if (gaierrno=Cgetaddrinfo (NULL, strport, &hints, &aitop)) {
            dpmlogit (func, DP002, "Cgetaddrinfo",
                  (gaierrno != EAI_SYSTEM) ? Cgai_strerror(gaierrno) : neterror());
            return (CONFERR);
      }

      num_listen_socks = 0;
        for (ai = aitop; ai; ai = ai->ai_next) {
                int fo = 0;
                if (ai->ai_family != PF_INET && ai->ai_family != PF_INET6)
                        continue;
                if (num_listen_socks >= MAX_LISTEN_SOCKS) {
                        dpmlogit (func, "Too many listen sockets\n");  
                        freeaddrinfo (aitop);
                        return (CONFERR);
                }
                if ((s = socket (ai->ai_family, ai->ai_socktype, ai->ai_protocol))<0)
                        continue;
                if (setsockopt (s, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on))) {
                        dpmlogit (func, DP002, "setsockopt (SO_REUSEADDR)", neterror()); 
                        close (s);
                        continue;
                }
                if (ai->ai_family == PF_INET6) {
#ifdef IPV6_V6ONLY
                        if (setsockopt (s, IPPROTO_IPV6, IPV6_V6ONLY,
                                        (char *)&on, sizeof(on))) {   
                                fo = 1;
                                dpmlogit (func, DP002, "setsockopt (IPV6_V6ONLY)", neterror());
                        }
#else
                        fo = 1;
#endif
                }
                if (bind (s, ai->ai_addr, ai->ai_addrlen) < 0) {
                        dpmlogit (func, DP002, "bind", neterror());
                        close (s);
                        continue; 
                }
                if (fo) {
#ifdef IPV6_V6ONLY
                        dpmlogit (func, "Was not able to set the IPV6_V6ONLY "
                                        "socket option on the IPv6 listen socket\n");
#else
                        dpmlogit (func, "Was compiled on a system that does not "
                                        "support the IPV6_V6ONLY socket option\n");
#endif
                        if (listen_ipv6only) {
                                dpmlogit (func, "Not proceeding as the IPv6 only flag was specified\n");
                                exit (CONFERR);
                        }
                        dpmlogit (func, "Incoming IPv4 will be accepted and handled as "
                                        "IPv4-mapped IPv6 addresses\n");
                }
                listen_socks[num_listen_socks] = s;
                ++num_listen_socks;
                listen (s, 5);
        }
        freeaddrinfo (aitop);
                                        
        if (num_listen_socks == 0) {
                dpmlogit (func, "Could not listen on any sockets\n");
                return (CONFERR);
        }
                        
        nfds = -1;
        for (i=0; i<num_listen_socks; ++i) {   
                FD_SET (listen_socks[i], &readmask);
                if (listen_socks[i]>nfds)
                        nfds = listen_socks[i];
        }
        ++nfds;

      /* main loop */

      while (1) {
            if (being_shutdown) {
                  int nb_active_threads = 0;
                  for (i = 0; i < DPM_NBFTHREADS; i++) {
                        if (dpm_srv_thread_info[i].s >= 0) {
                              nb_active_threads++;
                              continue;
                        }
                        if (dpm_srv_thread_info[i].db_open_done)
                              (void) dpm_closedb (&dpm_srv_thread_info[i].dbfd);
#ifdef CSEC
                        (void) Csec_clearContext (&dpm_srv_thread_info[i].sec_ctx);
#endif
                  }
                  if (nb_active_threads == 0) {
                        (void) Cthread_join (ms_tid, NULL);
                        (void) Cthread_join (rex_tid, NULL);
                        (void) dpm_wait4allgcs ();
                        return (0);
                  }
            }
            for (i=0; i < num_listen_socks; ++i) {
                  s = listen_socks[i];
                  if (FD_ISSET (s, &readfd)) {
                        FD_CLR (s, &readfd);
                        fromlen = sizeof(from);
                        rqfd = accept (s, (struct sockaddr *) &from, &fromlen);
                        if ((thread_index =
                            Cpool_next_index_timeout_ext (ipool, pooladdr, -1)) < 0) {
                              dpmlogit (func, DP002, "Cpool_next_index",
                                    sstrerror (serrno));
                              if (serrno == SEWOULDBLOCK) {
                                    sendrep (rqfd, DPM_RC, serrno);
                                    continue;
                              } else
                                    return (SYERR);
                        }
                        dpm_srv_thread_info[thread_index].s = rqfd;
                        if (Cpool_assign_ext (ipool, pooladdr, &doit,
                            &dpm_srv_thread_info[thread_index], 1) < 0) {
                              dpm_srv_thread_info[thread_index].s = -1;
                              dpmlogit (func, DP002, "Cpool_assign", sstrerror (serrno));
                              return (SYERR);
                        }
                  }
            }
            memcpy (&readfd, &readmask, sizeof(readmask));
            timeval.tv_sec = CHECKI;
            timeval.tv_usec = 0;
            if (select (nfds, &readfd, (fd_set *)0, (fd_set *)0, &timeval) < 0) {
                  FD_ZERO (&readfd);
            }
      }
}

/*
 * Program entry point.
 *
 * On non-Windows builds the process is initialised with Cinitdaemon (whose
 * return value is kept in maxfds) and then runs dpm_main, exiting with its
 * return code. On _WIN32 builds dpm_main is registered with Cinitservice.
 * Exits with SYERR when either initialisation call fails.
 */
int
main(int argc, char **argv)
{
#if ! defined(_WIN32)
      struct main_args main_args;

      if ((maxfds = Cinitdaemon ("dpmdaemon", NULL)) < 0)
            exit (SYERR);
      main_args.argc = argc;
      main_args.argv = argv;
      exit (dpm_main (&main_args));
#else
      if (Cinitservice ("dpm", &dpm_main))
            exit (SYERR);
#endif
      /* reached on the _WIN32 path only; gives main a defined status */
      return (0);
}

/*
 * doit - serve one client connection on a thread of the fast pool.
 *
 * arg points to the dpm_srv_thread_info entry reserved for this thread;
 * its member s is the connected client socket. When built with CSEC the
 * client is authenticated and mapped to a uid/gid (or to virtual ids when
 * VIRTUAL_ID is defined) before the request is read with getreq and
 * dispatched with procreq.
 *
 * Always returns NULL. thip->s is set back to -1 on every exit path, which
 * is how the main loop sees that the entry is idle again.
 */
void *
doit(void *arg)
{
      int alloced_gids = 0;
      int c;
      const char *clienthost = NULL;
      /* initialised so that the NULL tests below are well defined even if
         Cgetnetaddress fails without storing anything */
      const char *clientip = NULL;
      const char *clientname = NULL;
      char **fqan = NULL;
      int magic;
      int nbfqans = 0;
      char *req_data;
      int req_type = 0;
      char reqbuf[REQBUFSZ-3*LONGSIZE];
      int set_authorization_id = 1;
      struct dpm_srv_thread_info *thip = (struct dpm_srv_thread_info *) arg;
      char *voname = NULL;

      (void) Cgetnetaddress (thip->s, NULL, 0, &na_key, &clientip, &clientname, 0, 0);
      if (clientip == NULL)
            clientip = "unknown";
      if (clientname == NULL)
            clientname = "unknown";

#ifdef CSEC
      Csec_server_reinitContext (&thip->sec_ctx, CSEC_SERVICE_TYPE_HOST, NULL);
      if (Csec_server_establishContext (&thip->sec_ctx, thip->s) < 0) {
            dpmlogit (func, "[%s] (%s): Could not establish an authenticated connection: %s !\n",
                  clientip, clientname, Csec_getErrorMessageSummary (LOGBUFSZ-140));
            sendrep (thip->s, DPM_RC, ESEC_NO_CONTEXT);
            thip->s = -1;
            return NULL;
      }
      thip->Csec_gids = NULL;
      Csec_server_getClientId (&thip->sec_ctx, &thip->Csec_mech, &thip->Csec_auth_id);
      if (strcmp (thip->Csec_mech, "ID") == 0 ||
          Csec_isIdAService (thip->Csec_mech, thip->Csec_auth_id) >= 0) {
            /* identity asserted by a service: only accepted from trusted hosts */
            if (isTrustedHost (thip->s, localhost, localdomain, "DPM", "TRUST")) {
                  if (Csec_server_getAuthorizationId (&thip->sec_ctx,
                      &thip->Csec_mech, &thip->Csec_auth_id) < 0) {
                        thip->Csec_uid = 0;
                        thip->Csec_gid = 0;
                        thip->Csec_nbgids = 1;
                        thip->Csec_gids = &thip->Csec_gid;
                        set_authorization_id = 0;
#ifndef VIRTUAL_ID
                  } else if (Csec_mapToLocalUser (thip->Csec_mech, thip->Csec_auth_id,
                      NULL, 0, &thip->Csec_uid, &thip->Csec_gid) < 0) {
                        dpmlogit (func, "[%s] (%s): Could not map (%s,\"%s\") to local user: %s !\n",
                            clientip, clientname, thip->Csec_mech, thip->Csec_auth_id, sstrerror (serrno));
                        sendrep (thip->s, DPM_RC, serrno);
                        thip->s = -1;
                        return NULL;
#else
                  } else {    /* mapping will be done later */
                        thip->Csec_uid = (uid_t) -1;
                        thip->Csec_gid = (gid_t) -1;
                        alloced_gids = 1;
#endif
                  }
            } else {
                  dpmlogit (func, "[%s] (%s): Host is not trusted, identity provided was (%s,\"%s\")\n",
                      clientip, clientname, thip->Csec_mech, thip->Csec_auth_id);
                  sendrep (thip->s, DPM_RC, EACCES);
                  thip->s = -1;
                  return NULL;
            }
#ifndef VIRTUAL_ID
      } else if (Csec_mapToLocalUser (thip->Csec_mech, thip->Csec_auth_id,
          NULL, 0, &thip->Csec_uid, &thip->Csec_gid) < 0) {
            dpmlogit (func, "[%s] (%s): Could not map (%s,\"%s\") to local user: %s !\n",
                clientip, clientname, thip->Csec_mech, thip->Csec_auth_id, sstrerror (serrno));
            sendrep (thip->s, DPM_RC, serrno);
            thip->s = -1;
            return NULL;
#else
      } else {    /* mapping will be done later */
            thip->Csec_uid = (uid_t) -1;
            thip->Csec_gid = (gid_t) -1;
            alloced_gids = 1;
#endif
      }
#ifdef VIRTUAL_ID
      if (thip->Csec_uid == -1) {
#ifdef USE_VOMS
            voname = Csec_server_get_client_vo (&thip->sec_ctx);
            fqan = Csec_server_get_client_fqans (&thip->sec_ctx, &nbfqans);
#endif
            /* must reset VOMS pointers in Cns client context */
            Cns_client_setAuthorizationId (thip->Csec_uid, thip->Csec_gid,
                thip->Csec_mech, thip->Csec_auth_id);
            if (voname && fqan)
                  Cns_client_setVOMS_data (voname, fqan, nbfqans);

            /* one gid per FQAN, or a single gid when there is none */
            thip->Csec_nbgids = nbfqans ? nbfqans : 1;
            if ((thip->Csec_gids =
                malloc (thip->Csec_nbgids * sizeof(gid_t))) == NULL) {
                  dpmlogit (func,
                      "[%s] (%s): Could not allocate memory for gids, identity was (%s,\"%s\")\n",
                      clientip, clientname, thip->Csec_mech, thip->Csec_auth_id);
                  sendrep (thip->s, DPM_RC, ENOMEM);
                  thip->s = -1;
                  return NULL;
            }
            if (Cns_getidmap (thip->Csec_auth_id, nbfqans,
                (const char **)fqan, &thip->Csec_uid, thip->Csec_gids)) {
                  dpmlogit (func, "[%s] (%s): Could not get virtual id for (%s,\"%s\"): %s !\n",
                      clientip, clientname, thip->Csec_mech, thip->Csec_auth_id, sstrerror (serrno));
                  sendrep (thip->s, DPM_RC, serrno);
                  /* release the gid array allocated just above */
                  free (thip->Csec_gids);
                  thip->Csec_gids = NULL;
                  thip->s = -1;
                  return NULL;
            }
      }
#endif
      if (set_authorization_id) {
            Cns_client_setAuthorizationId (thip->Csec_uid, thip->Csec_gid,
                thip->Csec_mech, thip->Csec_auth_id);
            if (voname && fqan)
                  Cns_client_setVOMS_data (voname, fqan, nbfqans);
      } else
            Cns_client_resetAuthorizationId ();
#endif
      /* getreq replaces req_data with a malloc'ed buffer for large requests */
      req_data = reqbuf;
      if ((c = getreq (thip->s, &magic, &req_type, &req_data, &clienthost)) == 0) {
            procreq (magic, req_type, req_data, clienthost, thip);
            if (req_data != reqbuf)
                  free (req_data);
      } else {
            dpmlogit (func, "[%s] (%s): Failure getting the request: %s\n",
                clientip, clientname, sstrerror(c));
            if (c > 0)
                  sendrep (thip->s, DPM_RC, c);
            else
                  netclose (thip->s);
      }
#ifdef CSEC
      if (alloced_gids) {
            free (thip->Csec_gids);
            /* the entry is reused for later connections: leave no stale pointer */
            thip->Csec_gids = NULL;
      }
#endif
      thip->s = -1;
      return (NULL);
}

/*
 * dec_reqctr - decrement the request counter under reqctr_lock.
 *
 * The counter is never taken below zero. Returns 0 on success or
 * SEINTERNAL when the lock cannot be taken or released.
 */
int
dec_reqctr(void)
{
      if (Cthread_mutex_lock_ext (reqctr_lock) < 0) {
            dpmlogit (func, DP002, "Cthread_mutex_lock", sstrerror (serrno));
            return (SEINTERNAL);
      }
      if (reqctr > 0) {
            dpmlogit (func, "decrementing reqctr\n");
            reqctr--;
      }
      if (Cthread_mutex_unlock_ext (reqctr_lock) < 0) {
            dpmlogit (func, DP002, "Cthread_mutex_unlock", sstrerror (serrno));
            return (SEINTERNAL);
      }
      return (0);
}

/*
 * gcthread - garbage collector thread for one disk pool.
 *
 * arg points to the gc_entry describing the pool. The thread waits until
 * dpm_enoughfreespace (pool, 1) reports that space is short, then removes
 * the replicas listed by Cns_listrep4gc one by one, marking the matching
 * get/put file requests DPM_EXPIRED, until dpm_enoughfreespace (pool, 0)
 * is satisfied. It stops when being_shutdown is set or when the entry
 * status is no longer 1.
 *
 * Always returns NULL.
 */
void *
gcthread(void *arg)
{
      int bol;
      time_t curtime;
      struct dpm_dbfd dbfd;
      DBLISTPTR dblistptr;
      struct Cns_fileid file_uniqueid;
      int flags;
      char func[16];
      struct gc_entry *gc_entry = (struct gc_entry *) arg;
      struct dpm_get_filereq gfr_entry;
      time_t last_db_use;
      Cns_list list;
      struct Cns_filereplicax *lp;
      int nbremoved;
      struct dpm_put_filereq pfr_entry;
      dpm_dbrec_addr rec_addr;
      int status;

      strcpy (func, "gcthread");
      memset (&dbfd, 0, sizeof(dbfd));
      dbfd.idx = DPM_NBFTHREADS + DPM_NBSTHREADS + 2 + gc_entry->gc_idx;
      /* NOTE(review): the gc entry is not reset on this path, unlike the
         normal exit below - confirm that this is intended */
      if (dpm_opendb (db_srvr, db_user, db_pwd, db_name, &dbfd) < 0)
            return (NULL);
      last_db_use = time (0);
      memset (&file_uniqueid, 0, sizeof(file_uniqueid));
      while (! being_shutdown) {

            /* Wait while free space > GC_START_THRESH */

            while (dpm_enoughfreespace (gc_entry->poolname, 1)) {
                  if (being_shutdown || gc_entry->status != 1) break;
                  sleep (5);
            }
            if (being_shutdown || gc_entry->status != 1) break;

            /* ping the database after a long idle period */
            if ((curtime = time (0)) > last_db_use + DPM_DBPINGI)
                  (void) dpm_pingdb (&dbfd);
            last_db_use = curtime;

            flags = CNS_LIST_BEGIN;
            nbremoved = 0;
            while ((lp = Cns_listrep4gc (gc_entry->poolname, flags, &list)) != NULL) {
                  if (being_shutdown || gc_entry->status != 1) break;
                  flags = CNS_LIST_CONTINUE;
                  dpmlogit (func, "removing file: %s\n", lp->sfn);
                  file_uniqueid.fileid = lp->fileid;
                  if (dpm_rm_onereplica (&file_uniqueid, lp->sfn, 0, &status,
                      &dbfd, lp->setname, 1, -1) == 0) {
                        nbremoved++;
                        /* mark the requests that refer to this file as expired */
                        (void) dpm_start_tr (0, &dbfd);
                        bol = 1;
                        while (dpm_get_gfr_by_pfn (&dbfd, bol, lp->sfn,
                            &gfr_entry, 1, &rec_addr, 0, &dblistptr) == 0) {
                              bol = 0;
                              gfr_entry.status = DPM_EXPIRED;
                              dpm_update_gfr_entry (&dbfd, &rec_addr, &gfr_entry);
                        }
                        /* same call with the end-of-list flag set */
                        (void) dpm_get_gfr_by_pfn (&dbfd, bol, lp->sfn,
                            &gfr_entry, 1, &rec_addr, 1, &dblistptr);
                        if (dpm_get_pfr_by_pfn (&dbfd, lp->sfn,
                            &pfr_entry, 1, &rec_addr) == 0) {
                              pfr_entry.status = DPM_EXPIRED;
                              dpm_update_pfr_entry (&dbfd, &rec_addr, &pfr_entry);
                        }
                        (void) dpm_end_tr (&dbfd);
                  }

                  /* Is free space > GC_STOP_THRESH ? */

                  if (dpm_enoughfreespace (gc_entry->poolname, 0)) break;
            }
            (void) Cns_listrep4gc (gc_entry->poolname, CNS_LIST_END, &list);
            if (being_shutdown || gc_entry->status != 1) break;
            /* nothing could be removed: do not spin */
            if (nbremoved == 0)
                  sleep (5);
      }
      (void) dpm_closedb (&dbfd);
      (void) dpm_reset_gc_entry (gc_entry->gc_idx);
      return (NULL);
}

/*
 * getreq - read one request from socket s.
 *
 * The request starts with a header of three LONGs: magic number, request
 * type and total message length (header included). The body is read into
 * *req_data; when the message is longer than REQBUFSZ a buffer is
 * allocated here and *req_data is changed to point to it, in which case
 * the caller must free it after a successful return.
 * If *clienthost is NULL on entry it is set from Cgetnetaddress.
 *
 * Returns 0 on success, otherwise E2BIG, EINVAL, ENOMEM, EDPMNACT (server
 * being shut down), SETIMEDOUT or SEINTERNAL. On any error a buffer
 * allocated here is released and *req_data is left as it was on entry.
 */
int
getreq(int s, int *magic, int *req_type, char **req_data, const char **clienthost)
{
      char *allocated = NULL;       /* body buffer obtained by this call */
      char *caller_buf = *req_data; /* restored if the call fails */
      int l;
      unsigned int msglen;
      int n;
      char *rbp;
      int rc = SEINTERNAL;
      char req_hdr[3*LONGSIZE];

      serrno = 0;
      l = netread_timeout (s, req_hdr, sizeof(req_hdr), DPM_TIMEOUT);
      if (l == sizeof(req_hdr)) {
            rbp = req_hdr;
            unmarshall_LONG (rbp, n);
            *magic = n;
            unmarshall_LONG (rbp, n);
            *req_type = n;
            unmarshall_LONG (rbp, msglen);
            if (msglen > 10*ONE_MB) {
                  dpmlogit (func, DP046, 10*ONE_MB);
                  return (E2BIG);
            }
            /* msglen includes the header: a smaller value would make the
               body length below wrap to a huge/negative number */
            if (msglen < sizeof(req_hdr)) {
                  dpmlogit (func, "Invalid request length %u\n", msglen);
                  return (EINVAL);
            }
            l = msglen - sizeof(req_hdr);
            if (msglen > REQBUFSZ) {
                  if ((allocated = malloc (l)) == NULL)
                        return (ENOMEM);
                  *req_data = allocated;
            }
            n = netread_timeout (s, *req_data, l, DPM_TIMEOUT);
            if (being_shutdown) {
                  rc = EDPMNACT;
                  goto fail;
            }
            if (n > 0 && n == l) {
                  if (*clienthost == NULL) {
                        if ((*clienthost =
                            Cgetnetaddress (s, NULL, 0, &na_key, NULL, NULL, 0, 0)) == NULL) {
                              dpmlogit (func, "Could not find the address of the client\n");
                              goto fail;
                        }
                  }
                  return (0);
            }
            l = n;
      }
      /* short read (l > 0) or read error (l < 0) on header or body */
      if (l > 0)
            dpmlogit (func, DP004, l);
      else if (l < 0) {
            dpmlogit (func, DP002, "netread", neterror());
            if (serrno == SETIMEDOUT)
                  rc = SETIMEDOUT;
      }
fail:
      if (allocated) {
            /* the caller frees the buffer only after a successful return */
            free (allocated);
            *req_data = caller_buf;
      }
      return (rc);
}

inc_reqctr()
{
      if (Cthread_mutex_lock_ext (reqctr_lock) < 0) {
            dpmlogit (func, DP002, "Cthread_mutex_lock", sstrerror (serrno));
            return (SEINTERNAL);
      }
      dpmlogit (func, "incrementing reqctr\n");
      reqctr++;
      if (Cthread_cond_signal_ext (reqctr_lock) < 0) {
            dpmlogit (func, DP002, "Cthread_cond_signal", sstrerror (serrno));
            return (SEINTERNAL);
      }
      dpmlogit (func, "msthread signalled\n");
      if (Cthread_mutex_unlock_ext (reqctr_lock) < 0) {
            dpmlogit (func, DP002, "Cthread_mutex_unlock", sstrerror (serrno));
            return (SEINTERNAL);
      }
      return (0);
}

/*
 * procreq - dispatch one request to its dpm_srv_* handler and send the
 * final DPM_RC reply on thip->s.
 *
 * magic, req_type and req_data are the values produced by getreq;
 * clienthost is the client host name; thip is the state of the serving
 * thread. The per-thread database connection is opened on first use (or
 * pinged after DPM_DBPINGI seconds without use), except for the request
 * types that are served without the database.
 *
 * Returns the status that was sent to the client: the handler return
 * code, SEOPNOTSUP for an unknown request type, or the serrno value of a
 * failed database open.
 */
int
procreq(int magic, int req_type, char *req_data, const char *clienthost,
      struct dpm_srv_thread_info *thip)
{
      int c;
      time_t curtime;
      int needs_db;

      /* these request types are served without a database connection */
      needs_db = (req_type != DPM_GETPOOLS && req_type != DPM_GETPOOLFS &&
          req_type != DPM_GETPROTO && req_type != DPM_SHUTDOWN &&
          req_type != DPM_INCREQCTR && req_type != DPM_PING);

      /* connect to the database if not done yet */

      if (! thip->db_open_done) {
            Cns_seterrbuf (thip->errbuf, sizeof(thip->errbuf));
            if (needs_db) {
                  if (dpm_opendb (db_srvr, db_user, db_pwd, db_name,
                      &thip->dbfd) < 0) {
                        c = serrno;
                        sendrep (thip->s, MSG_ERR, "db open error: %d\n", c);
                        sendrep (thip->s, DPM_RC, c);
                        return (c);
                  }
                  thip->db_open_done = 1;
                  thip->last_db_use = time (0);
            }
      } else if (needs_db) {
            /* ping the connection after a long idle period */
            if ((curtime = time (0)) > thip->last_db_use + DPM_DBPINGI)
                  (void) dpm_pingdb (&thip->dbfd);
            thip->last_db_use = curtime;
      }
      switch (req_type) {
      case DPM_ABORTFILES:
            c = dpm_srv_abortfiles (magic, req_data, clienthost, thip);
            break;
      case DPM_ABORTREQ:
            c = dpm_srv_abortreq (magic, req_data, clienthost, thip);
            break;
      case DPM_ADDFS:
            c = dpm_srv_addfs (magic, req_data, clienthost, thip);
            break;
      case DPM_ADDPOOL:
            c = dpm_srv_addpool (magic, req_data, clienthost, thip);
            break;
      case DPM_COPY:
            c = dpm_srv_copy (magic, req_data, clienthost, thip);
            break;
      case DPM_DELREPLICA:
            c = dpm_srv_delreplica (magic, req_data, clienthost, thip);
            break;
      case DPM_EXTENDLIFE:
            c = dpm_srv_extendfilelife (magic, req_data, clienthost, thip);
            break;
      case DPM_GET:
            c = dpm_srv_get (magic, req_data, clienthost, thip);
            break;
      case DPM_GETPOOLFS:
            c = dpm_srv_getpoolfs (magic, req_data, clienthost, thip);
            break;
      case DPM_GETPOOLS:
            c = dpm_srv_getpools (magic, req_data, clienthost, thip);
            break;
      case DPM_GETPROTO:
            c = dpm_srv_getprotocols (magic, req_data, clienthost, thip);
            break;
      case DPM_GETREQID:
            c = dpm_srv_getreqid (magic, req_data, clienthost, thip);
            break;
      case DPM_GETREQSUM:
            c = dpm_srv_getreqsummary (magic, req_data, clienthost, thip);
            break;
      case DPM_GETSPACEMD:
            c = dpm_srv_getspacemd (magic, req_data, clienthost, thip);
            break;
      case DPM_GETSPACETKN:
            c = dpm_srv_getspacetoken (magic, req_data, clienthost, thip);
            break;
      case DPM_GETSTSCOPY:
            c = dpm_srv_getstatus_copyreq (magic, req_data, clienthost, thip);
            break;
      case DPM_GETSTSGET:
            c = dpm_srv_getstatus_getreq (magic, req_data, clienthost, thip);
            break;
      case DPM_GETSTSPUT:
            c = dpm_srv_getstatus_putreq (magic, req_data, clienthost, thip);
            break;
      case DPM_INCREQCTR:
            c = dpm_srv_inc_reqctr (magic, req_data, clienthost, thip);
            break;
      case DPM_MODFS:
            c = dpm_srv_modifyfs (magic, req_data, clienthost, thip);
            break;
      case DPM_MODPOOL:
            c = dpm_srv_modifypool (magic, req_data, clienthost, thip);
            break;
      case DPM_PING:
            c = dpm_srv_ping (magic, req_data, clienthost, thip);
            break;
      case DPM_PUT:
            c = dpm_srv_put (magic, req_data, clienthost, thip);
            break;
      case DPM_PUTDONE:
            c = dpm_srv_putdone (magic, req_data, clienthost, thip);
            break;
      case DPM_RLSSPACE:
            c = dpm_srv_releasespace (magic, req_data, clienthost, thip);
            break;
      case DPM_RELFILES:
            c = dpm_srv_relfiles (magic, req_data, clienthost, thip);
            break;
      case DPM_RSVSPACE:
            c = dpm_srv_reservespace (magic, req_data, clienthost, thip);
            break;
      case DPM_RM:
            c = dpm_srv_rm (magic, req_data, clienthost, thip);
            break;
      case DPM_RMFS:
            c = dpm_srv_rmfs (magic, req_data, clienthost, thip);
            break;
      case DPM_RMPOOL:
            c = dpm_srv_rmpool (magic, req_data, clienthost, thip);
            break;
      case DPM_SHUTDOWN:
            c = dpm_srv_shutdown (magic, req_data, clienthost, thip);
            break;
      case DPM_UPDSPACE:
            c = dpm_srv_updatespace (magic, req_data, clienthost, thip);
            break;
      case DPM_UPDFILSTS:
            c = dpm_srv_updatefilestatus (magic, req_data, clienthost, thip);
            break;
      default:
            sendrep (thip->s, MSG_ERR, DP003, req_type);
            c = SEOPNOTSUP;
      }
      sendrep (thip->s, DPM_RC, c);
      return (c);
}

/*
 * rexthread - background thread removing expired space reservations and
 * expired put files.
 *
 * Each cycle deletes the space reservations returned by
 * dpm_list_expired_spaces (giving the space back to the pool), then
 * removes the replicas returned by dpm_list_expired_puts and marks the
 * corresponding put requests DPM_EXPIRED. Between cycles it waits
 * CHECKEXP seconds in steps of CHECKI seconds so that being_shutdown is
 * noticed quickly. arg is not used.
 *
 * Always returns NULL.
 */
void *
rexthread(void *arg)
{
      int bol;
      time_t curtime;
      struct dpm_dbfd dbfd;
      DBLISTPTR dblistptr;
      struct dpm_space_reserv dpm_spcmd;
      char func[16];
      time_t last_db_use;
      int nextcheck;
      struct dpm_put_filereq pfr_entry;
      dpm_dbrec_addr rec_addr;
      int status;
      struct timeval timeval;

      strcpy (func, "rexthread");
      memset (&dbfd, 0, sizeof(dbfd));
      dbfd.idx = DPM_NBFTHREADS + DPM_NBSTHREADS;
      if (dpm_opendb (db_srvr, db_user, db_pwd, db_name, &dbfd) < 0)
            return (NULL);
      last_db_use = time (0);
      while (! being_shutdown) {
            /* ping the database after a long idle period */
            if ((curtime = time (0)) > last_db_use + DPM_DBPINGI)
                  (void) dpm_pingdb (&dbfd);
            last_db_use = curtime;
            nextcheck = CHECKEXP;

            /* expired space reservations */
            bol = 1;
            while (dpm_list_expired_spaces (&dbfd, bol, &dpm_spcmd, 0, &dblistptr) == 0) {
                  bol = 0;
                  if (being_shutdown) break;
                  dpmlogit (func, "removing expired space: %s\n", dpm_spcmd.s_token);
                  (void) dpm_start_tr (0, &dbfd);
                  if (dpm_get_spcmd_by_token (&dbfd, dpm_spcmd.s_token,
                      &dpm_spcmd, 1, &rec_addr) == 0) {
                        if (dpm_delete_spcmd_entry (&dbfd, &rec_addr))
                              dpmlogit (func, "could not remove space\n");
                        else
                              dpm_updpoolfreespace (dpm_spcmd.poolname, dpm_spcmd.u_space);
                  }
                  (void) dpm_end_tr (&dbfd);
            }
            /* same call with the end-of-list flag set */
            (void) dpm_list_expired_spaces (&dbfd, bol, &dpm_spcmd, 1, &dblistptr);
            if (being_shutdown) break;

            /* expired put files */
            bol = 1;
            while (dpm_list_expired_puts (&dbfd, bol, &pfr_entry, 0, &dblistptr) == 0) {
                  bol = 0;
                  if (being_shutdown) break;
                  dpmlogit (func, "removing expired put file: %s\n", pfr_entry.pfn);
                  if (dpm_rm_onereplica (NULL, pfr_entry.pfn,
                      pfr_entry.requested_size, &status, &dbfd, pfr_entry.s_token,
                      pfr_entry.actual_size ? 1 : 0, -1) == 0) {
                        (void) dpm_start_tr (0, &dbfd);
                        if (dpm_get_pfr_by_fullid (&dbfd, pfr_entry.r_token,
                            pfr_entry.f_ordinal, &pfr_entry, 1, &rec_addr) == 0) {
                              pfr_entry.status = DPM_EXPIRED;
                              dpm_update_pfr_entry (&dbfd, &rec_addr, &pfr_entry);
                        }
                        (void) dpm_end_tr (&dbfd);
                  }
            }
            (void) dpm_list_expired_puts (&dbfd, bol, &pfr_entry, 1, &dblistptr);
            if (being_shutdown) break;

            /* sleep until the next check, waking up every CHECKI seconds */
            while (nextcheck > 0) {
                  timeval.tv_sec = CHECKI;
                  timeval.tv_usec = 0;
                  select (0, NULL, NULL, NULL, &timeval);
                  if (being_shutdown) break;
                  nextcheck -= CHECKI;
            }
      }
      (void) dpm_closedb (&dbfd);
      return (NULL);
}

Generated by  Doxygen 1.6.0   Back to index