/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "exception.h" #include "hdfs.h" #include "jni_helper.h" #include #include #include /* Some frequently used Java paths */ #define HADOOP_CONF "org/apache/hadoop/conf/Configuration" #define HADOOP_PATH "org/apache/hadoop/fs/Path" #define HADOOP_LOCALFS "org/apache/hadoop/fs/LocalFileSystem" #define HADOOP_FS "org/apache/hadoop/fs/FileSystem" #define HADOOP_FSSTATUS "org/apache/hadoop/fs/FsStatus" #define HADOOP_BLK_LOC "org/apache/hadoop/fs/BlockLocation" #define HADOOP_DFS "org/apache/hadoop/hdfs/DistributedFileSystem" #define HADOOP_ISTRM "org/apache/hadoop/fs/FSDataInputStream" #define HADOOP_OSTRM "org/apache/hadoop/fs/FSDataOutputStream" #define HADOOP_STAT "org/apache/hadoop/fs/FileStatus" #define HADOOP_FSPERM "org/apache/hadoop/fs/permission/FsPermission" #define JAVA_NET_ISA "java/net/InetSocketAddress" #define JAVA_NET_URI "java/net/URI" #define JAVA_STRING "java/lang/String" #define JAVA_VOID "V" /* Macros for constructing method signatures */ #define JPARAM(X) "L" X ";" #define JARRPARAM(X) "[L" X ";" #define JMETHOD1(X, R) "(" X ")" R #define JMETHOD2(X, Y, R) "(" X Y ")" R #define JMETHOD3(X, Y, Z, R) "(" X Y Z")" R #define KERBEROS_TICKET_CACHE_PATH "hadoop.security.kerberos.ticket.cache.path" // Bit fields for hdfsFile_internal flags #define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0) tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length); static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo); /** * The C equivalent of org.apache.org.hadoop.FSData(Input|Output)Stream . */ enum hdfsStreamType { UNINITIALIZED = 0, INPUT = 1, OUTPUT = 2, }; /** * The 'file-handle' to a file in hdfs. */ struct hdfsFile_internal { void* file; enum hdfsStreamType type; int flags; }; int hdfsFileIsOpenForRead(hdfsFile file) { return (file->type == INPUT); } int hdfsFileGetReadStatistics(hdfsFile file, struct hdfsReadStatistics **stats) { jthrowable jthr; jobject readStats = NULL; jvalue jVal; struct hdfsReadStatistics *s = NULL; int ret; JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } if (file->type != INPUT) { ret = EINVAL; goto done; } jthr = invokeMethod(env, &jVal, INSTANCE, file->file, "org/apache/hadoop/hdfs/client/HdfsDataInputStream", "getReadStatistics", "()Lorg/apache/hadoop/hdfs/DFSInputStream$ReadStatistics;"); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsFileGetReadStatistics: getReadStatistics failed"); goto done; } readStats = jVal.l; s = malloc(sizeof(struct hdfsReadStatistics)); if (!s) { ret = ENOMEM; goto done; } jthr = invokeMethod(env, &jVal, INSTANCE, readStats, "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics", "getTotalBytesRead", "()J"); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsFileGetReadStatistics: getTotalBytesRead failed"); goto done; } s->totalBytesRead = jVal.j; jthr = invokeMethod(env, &jVal, INSTANCE, readStats, "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics", "getTotalLocalBytesRead", "()J"); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsFileGetReadStatistics: getTotalLocalBytesRead failed"); goto done; } s->totalLocalBytesRead = jVal.j; jthr = invokeMethod(env, &jVal, INSTANCE, readStats, "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics", "getTotalShortCircuitBytesRead", "()J"); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsFileGetReadStatistics: getTotalShortCircuitBytesRead failed"); goto done; } s->totalShortCircuitBytesRead = jVal.j; *stats = s; s = NULL; ret = 0; done: destroyLocalReference(env, readStats); free(s); if (ret) { errno = ret; return -1; } return 0; } int64_t hdfsReadStatisticsGetRemoteBytesRead( const struct hdfsReadStatistics *stats) { return stats->totalBytesRead - stats->totalLocalBytesRead; } void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats) { free(stats); } int hdfsFileIsOpenForWrite(hdfsFile file) { return (file->type == OUTPUT); } int hdfsFileUsesDirectRead(hdfsFile file) { return !!(file->flags & HDFS_FILE_SUPPORTS_DIRECT_READ); } void hdfsFileDisableDirectRead(hdfsFile file) { file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ; } /** * hdfsJniEnv: A wrapper struct to be used as 'value' * while saving thread -> JNIEnv* mappings */ typedef struct { JNIEnv* env; } hdfsJniEnv; /** * Helper function to create a org.apache.hadoop.fs.Path object. * @param env: The JNIEnv pointer. * @param path: The file-path for which to construct org.apache.hadoop.fs.Path * object. * @return Returns a jobject on success and NULL on error. */ static jthrowable constructNewObjectOfPath(JNIEnv *env, const char *path, jobject *out) { jthrowable jthr; jstring jPathString; jobject jPath; //Construct a java.lang.String object jthr = newJavaStr(env, path, &jPathString); if (jthr) return jthr; //Construct the org.apache.hadoop.fs.Path object jthr = constructNewObjectOfClass(env, &jPath, "org/apache/hadoop/fs/Path", "(Ljava/lang/String;)V", jPathString); destroyLocalReference(env, jPathString); if (jthr) return jthr; *out = jPath; return NULL; } /** * Set a configuration value. * * @param env The JNI environment * @param jConfiguration The configuration object to modify * @param key The key to modify * @param value The value to set the key to * * @return NULL on success; exception otherwise */ static jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration, const char *key, const char *value) { jthrowable jthr; jstring jkey = NULL, jvalue = NULL; jthr = newJavaStr(env, key, &jkey); if (jthr) goto done; jthr = newJavaStr(env, value, &jvalue); if (jthr) goto done; jthr = invokeMethod(env, NULL, INSTANCE, jConfiguration, HADOOP_CONF, "set", JMETHOD2(JPARAM(JAVA_STRING), JPARAM(JAVA_STRING), JAVA_VOID), jkey, jvalue); if (jthr) goto done; done: destroyLocalReference(env, jkey); destroyLocalReference(env, jvalue); return jthr; } static jthrowable hadoopConfGetStr(JNIEnv *env, jobject jConfiguration, const char *key, char **val) { jthrowable jthr; jvalue jVal; jstring jkey = NULL, jRet = NULL; jthr = newJavaStr(env, key, &jkey); if (jthr) goto done; jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration, HADOOP_CONF, "get", JMETHOD1(JPARAM(JAVA_STRING), JPARAM(JAVA_STRING)), jkey); if (jthr) goto done; jRet = jVal.l; jthr = newCStr(env, jRet, val); done: destroyLocalReference(env, jkey); destroyLocalReference(env, jRet); return jthr; } int hdfsConfGetStr(const char *key, char **val) { JNIEnv *env; int ret; jthrowable jthr; jobject jConfiguration = NULL; env = getJNIEnv(); if (env == NULL) { ret = EINTERNAL; goto done; } jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V"); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsConfGetStr(%s): new Configuration", key); goto done; } jthr = hadoopConfGetStr(env, jConfiguration, key, val); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsConfGetStr(%s): hadoopConfGetStr", key); goto done; } ret = 0; done: destroyLocalReference(env, jConfiguration); if (ret) errno = ret; return ret; } void hdfsConfStrFree(char *val) { free(val); } static jthrowable hadoopConfGetInt(JNIEnv *env, jobject jConfiguration, const char *key, int32_t *val) { jthrowable jthr = NULL; jvalue jVal; jstring jkey = NULL; jthr = newJavaStr(env, key, &jkey); if (jthr) return jthr; jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration, HADOOP_CONF, "getInt", JMETHOD2(JPARAM(JAVA_STRING), "I", "I"), jkey, (jint)(*val)); destroyLocalReference(env, jkey); if (jthr) return jthr; *val = jVal.i; return NULL; } int hdfsConfGetInt(const char *key, int32_t *val) { JNIEnv *env; int ret; jobject jConfiguration = NULL; jthrowable jthr; env = getJNIEnv(); if (env == NULL) { ret = EINTERNAL; goto done; } jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V"); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsConfGetInt(%s): new Configuration", key); goto done; } jthr = hadoopConfGetInt(env, jConfiguration, key, val); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsConfGetInt(%s): hadoopConfGetInt", key); goto done; } ret = 0; done: destroyLocalReference(env, jConfiguration); if (ret) errno = ret; return ret; } struct hdfsBuilderConfOpt { struct hdfsBuilderConfOpt *next; const char *key; const char *val; }; struct hdfsBuilder { int forceNewInstance; const char *nn; tPort port; const char *kerbTicketCachePath; const char *userName; struct hdfsBuilderConfOpt *opts; }; struct hdfsBuilder *hdfsNewBuilder(void) { struct hdfsBuilder *bld = calloc(1, sizeof(struct hdfsBuilder)); if (!bld) { errno = ENOMEM; return NULL; } return bld; } int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key, const char *val) { struct hdfsBuilderConfOpt *opt, *next; opt = calloc(1, sizeof(struct hdfsBuilderConfOpt)); if (!opt) return -ENOMEM; next = bld->opts; bld->opts = opt; opt->next = next; opt->key = key; opt->val = val; return 0; } void hdfsFreeBuilder(struct hdfsBuilder *bld) { struct hdfsBuilderConfOpt *cur, *next; cur = bld->opts; for (cur = bld->opts; cur; ) { next = cur->next; free(cur); cur = next; } free(bld); } void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld) { bld->forceNewInstance = 1; } void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn) { bld->nn = nn; } void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port) { bld->port = port; } void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName) { bld->userName = userName; } void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld, const char *kerbTicketCachePath) { bld->kerbTicketCachePath = kerbTicketCachePath; } hdfsFS hdfsConnect(const char* host, tPort port) { struct hdfsBuilder *bld = hdfsNewBuilder(); if (!bld) return NULL; hdfsBuilderSetNameNode(bld, host); hdfsBuilderSetNameNodePort(bld, port); return hdfsBuilderConnect(bld); } /** Always return a new FileSystem handle */ hdfsFS hdfsConnectNewInstance(const char* host, tPort port) { struct hdfsBuilder *bld = hdfsNewBuilder(); if (!bld) return NULL; hdfsBuilderSetNameNode(bld, host); hdfsBuilderSetNameNodePort(bld, port); hdfsBuilderSetForceNewInstance(bld); return hdfsBuilderConnect(bld); } hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user) { struct hdfsBuilder *bld = hdfsNewBuilder(); if (!bld) return NULL; hdfsBuilderSetNameNode(bld, host); hdfsBuilderSetNameNodePort(bld, port); hdfsBuilderSetUserName(bld, user); return hdfsBuilderConnect(bld); } /** Always return a new FileSystem handle */ hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, const char *user) { struct hdfsBuilder *bld = hdfsNewBuilder(); if (!bld) return NULL; hdfsBuilderSetNameNode(bld, host); hdfsBuilderSetNameNodePort(bld, port); hdfsBuilderSetForceNewInstance(bld); hdfsBuilderSetUserName(bld, user); return hdfsBuilderConnect(bld); } /** * Calculate the effective URI to use, given a builder configuration. * * If there is not already a URI scheme, we prepend 'hdfs://'. * * If there is not already a port specified, and a port was given to the * builder, we suffix that port. If there is a port specified but also one in * the URI, that is an error. * * @param bld The hdfs builder object * @param uri (out param) dynamically allocated string representing the * effective URI * * @return 0 on success; error code otherwise */ static int calcEffectiveURI(struct hdfsBuilder *bld, char ** uri) { const char *scheme; char suffix[64]; const char *lastColon; char *u; size_t uriLen; if (!bld->nn) return EINVAL; scheme = (strstr(bld->nn, "://")) ? "" : "hdfs://"; if (bld->port == 0) { suffix[0] = '\0'; } else { lastColon = rindex(bld->nn, ':'); if (lastColon && (strspn(lastColon + 1, "0123456789") == strlen(lastColon + 1))) { fprintf(stderr, "port %d was given, but URI '%s' already " "contains a port!\n", bld->port, bld->nn); return EINVAL; } snprintf(suffix, sizeof(suffix), ":%d", bld->port); } uriLen = strlen(scheme) + strlen(bld->nn) + strlen(suffix); u = malloc((uriLen + 1) * (sizeof(char))); if (!u) { fprintf(stderr, "calcEffectiveURI: out of memory"); return ENOMEM; } snprintf(u, uriLen + 1, "%s%s%s", scheme, bld->nn, suffix); *uri = u; return 0; } static const char *maybeNull(const char *str) { return str ? str : "(NULL)"; } static const char *hdfsBuilderToStr(const struct hdfsBuilder *bld, char *buf, size_t bufLen) { snprintf(buf, bufLen, "forceNewInstance=%d, nn=%s, port=%d, " "kerbTicketCachePath=%s, userName=%s", bld->forceNewInstance, maybeNull(bld->nn), bld->port, maybeNull(bld->kerbTicketCachePath), maybeNull(bld->userName)); return buf; } hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) { JNIEnv *env = 0; jobject jConfiguration = NULL, jFS = NULL, jURI = NULL, jCachePath = NULL; jstring jURIString = NULL, jUserString = NULL; jvalue jVal; jthrowable jthr = NULL; char *cURI = 0, buf[512]; int ret; jobject jRet = NULL; struct hdfsBuilderConfOpt *opt; //Get the JNIEnv* corresponding to current thread env = getJNIEnv(); if (env == NULL) { ret = EINTERNAL; goto done; } // jConfiguration = new Configuration(); jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V"); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf))); goto done; } // set configuration values for (opt = bld->opts; opt; opt = opt->next) { jthr = hadoopConfSetStr(env, jConfiguration, opt->key, opt->val); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsBuilderConnect(%s): error setting conf '%s' to '%s'", hdfsBuilderToStr(bld, buf, sizeof(buf)), opt->key, opt->val); goto done; } } //Check what type of FileSystem the caller wants... if (bld->nn == NULL) { // Get a local filesystem. if (bld->forceNewInstance) { // fs = FileSytem#newInstanceLocal(conf); jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, "newInstanceLocal", JMETHOD1(JPARAM(HADOOP_CONF), JPARAM(HADOOP_LOCALFS)), jConfiguration); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf))); goto done; } jFS = jVal.l; } else { // fs = FileSytem#getLocal(conf); jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, "getLocal", JMETHOD1(JPARAM(HADOOP_CONF), JPARAM(HADOOP_LOCALFS)), jConfiguration); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf))); goto done; } jFS = jVal.l; } } else { if (!strcmp(bld->nn, "default")) { // jURI = FileSystem.getDefaultUri(conf) jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, "getDefaultUri", "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/URI;", jConfiguration); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf))); goto done; } jURI = jVal.l; } else { // fs = FileSystem#get(URI, conf, ugi); ret = calcEffectiveURI(bld, &cURI); if (ret) goto done; jthr = newJavaStr(env, cURI, &jURIString); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf))); goto done; } jthr = invokeMethod(env, &jVal, STATIC, NULL, JAVA_NET_URI, "create", "(Ljava/lang/String;)Ljava/net/URI;", jURIString); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf))); goto done; } jURI = jVal.l; } if (bld->kerbTicketCachePath) { jthr = hadoopConfSetStr(env, jConfiguration, KERBEROS_TICKET_CACHE_PATH, bld->kerbTicketCachePath); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf))); goto done; } } jthr = newJavaStr(env, bld->userName, &jUserString); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf))); goto done; } if (bld->forceNewInstance) { jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, "newInstance", JMETHOD3(JPARAM(JAVA_NET_URI), JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING), JPARAM(HADOOP_FS)), jURI, jConfiguration, jUserString); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf))); goto done; } jFS = jVal.l; } else { jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, "get", JMETHOD3(JPARAM(JAVA_NET_URI), JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING), JPARAM(HADOOP_FS)), jURI, jConfiguration, jUserString); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf))); goto done; } jFS = jVal.l; } } jRet = (*env)->NewGlobalRef(env, jFS); if (!jRet) { ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf))); goto done; } ret = 0; done: // Release unnecessary local references destroyLocalReference(env, jConfiguration); destroyLocalReference(env, jFS); destroyLocalReference(env, jURI); destroyLocalReference(env, jCachePath); destroyLocalReference(env, jURIString); destroyLocalReference(env, jUserString); free(cURI); hdfsFreeBuilder(bld); if (ret) { errno = ret; return NULL; } return (hdfsFS)jRet; } int hdfsDisconnect(hdfsFS fs) { // JAVA EQUIVALENT: // fs.close() //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); int ret; if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jFS = (jobject)fs; //Sanity check if (fs == NULL) { errno = EBADF; return -1; } jthrowable jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS, "close", "()V"); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsDisconnect: FileSystem#close"); } else { ret = 0; } (*env)->DeleteGlobalRef(env, jFS); if (ret) { errno = ret; return -1; } return 0; } /** * Get the default block size of a FileSystem object. * * @param env The Java env * @param jFS The FileSystem object * @param jPath The path to find the default blocksize at * @param out (out param) the default block size * * @return NULL on success; or the exception */ static jthrowable getDefaultBlockSize(JNIEnv *env, jobject jFS, jobject jPath, jlong *out) { jthrowable jthr; jvalue jVal; jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, "getDefaultBlockSize", JMETHOD1(JPARAM(HADOOP_PATH), "J"), jPath); if (jthr) return jthr; *out = jVal.j; return NULL; } hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, int bufferSize, short replication, tSize blockSize) { /* JAVA EQUIVALENT: File f = new File(path); FSData{Input|Output}Stream f{is|os} = fs.create(f); return f{is|os}; */ /* Get the JNIEnv* corresponding to current thread */ JNIEnv* env = getJNIEnv(); int accmode = flags & O_ACCMODE; if (env == NULL) { errno = EINTERNAL; return NULL; } jstring jStrBufferSize = NULL, jStrReplication = NULL; jobject jConfiguration = NULL, jPath = NULL, jFile = NULL; jobject jFS = (jobject)fs; jthrowable jthr; jvalue jVal; hdfsFile file = NULL; int ret; if (accmode == O_RDONLY || accmode == O_WRONLY) { /* yay */ } else if (accmode == O_RDWR) { fprintf(stderr, "ERROR: cannot open an hdfs file in O_RDWR mode\n"); errno = ENOTSUP; return NULL; } else { fprintf(stderr, "ERROR: cannot open an hdfs file in mode 0x%x\n", accmode); errno = EINVAL; return NULL; } if ((flags & O_CREAT) && (flags & O_EXCL)) { fprintf(stderr, "WARN: hdfs does not truly support O_CREATE && O_EXCL\n"); } /* The hadoop java api/signature */ const char* method = NULL; const char* signature = NULL; if (accmode == O_RDONLY) { method = "open"; signature = JMETHOD2(JPARAM(HADOOP_PATH), "I", JPARAM(HADOOP_ISTRM)); } else if (flags & O_APPEND) { method = "append"; signature = JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_OSTRM)); } else { method = "create"; signature = JMETHOD2(JPARAM(HADOOP_PATH), "ZISJ", JPARAM(HADOOP_OSTRM)); } /* Create an object of org.apache.hadoop.fs.Path */ jthr = constructNewObjectOfPath(env, path, &jPath); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsOpenFile(%s): constructNewObjectOfPath", path); goto done; } /* Get the Configuration object from the FileSystem object */ jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, "getConf", JMETHOD1("", JPARAM(HADOOP_CONF))); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsOpenFile(%s): FileSystem#getConf", path); goto done; } jConfiguration = jVal.l; jint jBufferSize = bufferSize; jshort jReplication = replication; jStrBufferSize = (*env)->NewStringUTF(env, "io.file.buffer.size"); if (!jStrBufferSize) { ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "OOM"); goto done; } jStrReplication = (*env)->NewStringUTF(env, "dfs.replication"); if (!jStrReplication) { ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "OOM"); goto done; } if (!bufferSize) { jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration, HADOOP_CONF, "getInt", "(Ljava/lang/String;I)I", jStrBufferSize, 4096); if (jthr) { ret = printExceptionAndFree(env, jthr, NOPRINT_EXC_FILE_NOT_FOUND | NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_UNRESOLVED_LINK, "hdfsOpenFile(%s): Configuration#getInt(io.file.buffer.size)", path); goto done; } jBufferSize = jVal.i; } if ((accmode == O_WRONLY) && (flags & O_APPEND) == 0) { if (!replication) { jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration, HADOOP_CONF, "getInt", "(Ljava/lang/String;I)I", jStrReplication, 1); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsOpenFile(%s): Configuration#getInt(dfs.replication)", path); goto done; } jReplication = jVal.i; } } /* Create and return either the FSDataInputStream or FSDataOutputStream references jobject jStream */ // READ? if (accmode == O_RDONLY) { jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, method, signature, jPath, jBufferSize); } else if ((accmode == O_WRONLY) && (flags & O_APPEND)) { // WRITE/APPEND? jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, method, signature, jPath); } else { // WRITE/CREATE jboolean jOverWrite = 1; jlong jBlockSize = blockSize; if (jBlockSize == 0) { jthr = getDefaultBlockSize(env, jFS, jPath, &jBlockSize); if (jthr) { ret = EIO; goto done; } } jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, method, signature, jPath, jOverWrite, jBufferSize, jReplication, jBlockSize); } if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsOpenFile(%s): FileSystem#%s(%s)", path, method, signature); goto done; } jFile = jVal.l; file = calloc(1, sizeof(struct hdfsFile_internal)); if (!file) { fprintf(stderr, "hdfsOpenFile(%s): OOM create hdfsFile\n", path); ret = ENOMEM; goto done; } file->file = (*env)->NewGlobalRef(env, jFile); if (!file->file) { ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "hdfsOpenFile(%s): NewGlobalRef", path); goto done; } file->type = (((flags & O_WRONLY) == 0) ? INPUT : OUTPUT); file->flags = 0; if ((flags & O_WRONLY) == 0) { // Try a test read to see if we can do direct reads char buf; if (readDirect(fs, file, &buf, 0) == 0) { // Success - 0-byte read should return 0 file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ; } else if (errno != ENOTSUP) { // Unexpected error. Clear it, don't set the direct flag. fprintf(stderr, "hdfsOpenFile(%s): WARN: Unexpected error %d when testing " "for direct read compatibility\n", path, errno); } } ret = 0; done: destroyLocalReference(env, jStrBufferSize); destroyLocalReference(env, jStrReplication); destroyLocalReference(env, jConfiguration); destroyLocalReference(env, jPath); destroyLocalReference(env, jFile); if (ret) { if (file) { if (file->file) { (*env)->DeleteGlobalRef(env, file->file); } free(file); } errno = ret; return NULL; } return file; } int hdfsCloseFile(hdfsFS fs, hdfsFile file) { int ret; // JAVA EQUIVALENT: // file.close //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Caught exception jthrowable jthr; //Sanity check if (!file || file->type == UNINITIALIZED) { errno = EBADF; return -1; } //The interface whose 'close' method to be called const char* interface = (file->type == INPUT) ? HADOOP_ISTRM : HADOOP_OSTRM; jthr = invokeMethod(env, NULL, INSTANCE, file->file, interface, "close", "()V"); if (jthr) { const char *interfaceShortName = (file->type == INPUT) ? "FSDataInputStream" : "FSDataOutputStream"; ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "%s#close", interfaceShortName); } else { ret = 0; } //De-allocate memory (*env)->DeleteGlobalRef(env, file->file); free(file); if (ret) { errno = ret; return -1; } return 0; } int hdfsExists(hdfsFS fs, const char *path) { JNIEnv *env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jPath; jvalue jVal; jobject jFS = (jobject)fs; jthrowable jthr; if (path == NULL) { errno = EINVAL; return -1; } jthr = constructNewObjectOfPath(env, path, &jPath); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsExists: constructNewObjectOfPath"); return -1; } jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, "exists", JMETHOD1(JPARAM(HADOOP_PATH), "Z"), jPath); destroyLocalReference(env, jPath); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsExists: invokeMethod(%s)", JMETHOD1(JPARAM(HADOOP_PATH), "Z")); return -1; } if (jVal.z) { return 0; } else { errno = ENOENT; return -1; } } // Checks input file for readiness for reading. static int readPrepare(JNIEnv* env, hdfsFS fs, hdfsFile f, jobject* jInputStream) { *jInputStream = (jobject)(f ? f->file : NULL); //Sanity check if (!f || f->type == UNINITIALIZED) { errno = EBADF; return -1; } //Error checking... make sure that this file is 'readable' if (f->type != INPUT) { fprintf(stderr, "Cannot read from a non-InputStream object!\n"); errno = EINVAL; return -1; } return 0; } tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length) { if (length == 0) { return 0; } else if (length < 0) { errno = EINVAL; return -1; } if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) { return readDirect(fs, f, buffer, length); } // JAVA EQUIVALENT: // byte [] bR = new byte[length]; // fis.read(bR); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jInputStream; if (readPrepare(env, fs, f, &jInputStream) == -1) { return -1; } jbyteArray jbRarray; jint noReadBytes = length; jvalue jVal; jthrowable jthr; //Read the requisite bytes jbRarray = (*env)->NewByteArray(env, length); if (!jbRarray) { errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "hdfsRead: NewByteArray"); return -1; } jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream, HADOOP_ISTRM, "read", "([B)I", jbRarray); if (jthr) { destroyLocalReference(env, jbRarray); errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsRead: FSDataInputStream#read"); return -1; } if (jVal.i < 0) { // EOF destroyLocalReference(env, jbRarray); return 0; } else if (jVal.i == 0) { destroyLocalReference(env, jbRarray); errno = EINTR; return -1; } (*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer); destroyLocalReference(env, jbRarray); if ((*env)->ExceptionCheck(env)) { errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "hdfsRead: GetByteArrayRegion"); return -1; } return jVal.i; } // Reads using the read(ByteBuffer) API, which does fewer copies tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length) { // JAVA EQUIVALENT: // ByteBuffer bbuffer = ByteBuffer.allocateDirect(length) // wraps C buffer // fis.read(bbuffer); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jInputStream; if (readPrepare(env, fs, f, &jInputStream) == -1) { return -1; } jvalue jVal; jthrowable jthr; //Read the requisite bytes jobject bb = (*env)->NewDirectByteBuffer(env, buffer, length); if (bb == NULL) { errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "readDirect: NewDirectByteBuffer"); return -1; } jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream, HADOOP_ISTRM, "read", "(Ljava/nio/ByteBuffer;)I", bb); destroyLocalReference(env, bb); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "readDirect: FSDataInputStream#read"); return -1; } return (jVal.i < 0) ? 0 : jVal.i; } tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position, void* buffer, tSize length) { JNIEnv* env; jbyteArray jbRarray; jvalue jVal; jthrowable jthr; if (length == 0) { return 0; } else if (length < 0) { errno = EINVAL; return -1; } if (!f || f->type == UNINITIALIZED) { errno = EBADF; return -1; } env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Error checking... make sure that this file is 'readable' if (f->type != INPUT) { fprintf(stderr, "Cannot read from a non-InputStream object!\n"); errno = EINVAL; return -1; } // JAVA EQUIVALENT: // byte [] bR = new byte[length]; // fis.read(pos, bR, 0, length); jbRarray = (*env)->NewByteArray(env, length); if (!jbRarray) { errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "hdfsPread: NewByteArray"); return -1; } jthr = invokeMethod(env, &jVal, INSTANCE, f->file, HADOOP_ISTRM, "read", "(J[BII)I", position, jbRarray, 0, length); if (jthr) { destroyLocalReference(env, jbRarray); errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsPread: FSDataInputStream#read"); return -1; } if (jVal.i < 0) { // EOF destroyLocalReference(env, jbRarray); return 0; } else if (jVal.i == 0) { destroyLocalReference(env, jbRarray); errno = EINTR; return -1; } (*env)->GetByteArrayRegion(env, jbRarray, 0, jVal.i, buffer); destroyLocalReference(env, jbRarray); if ((*env)->ExceptionCheck(env)) { errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "hdfsPread: GetByteArrayRegion"); return -1; } return jVal.i; } tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length) { // JAVA EQUIVALENT // byte b[] = str.getBytes(); // fso.write(b); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Sanity check if (!f || f->type == UNINITIALIZED) { errno = EBADF; return -1; } jobject jOutputStream = f->file; jbyteArray jbWarray; jthrowable jthr; if (length < 0) { errno = EINVAL; return -1; } //Error checking... make sure that this file is 'writable' if (f->type != OUTPUT) { fprintf(stderr, "Cannot write into a non-OutputStream object!\n"); errno = EINVAL; return -1; } if (length < 0) { errno = EINVAL; return -1; } if (length == 0) { return 0; } //Write the requisite bytes into the file jbWarray = (*env)->NewByteArray(env, length); if (!jbWarray) { errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "hdfsWrite: NewByteArray"); return -1; } (*env)->SetByteArrayRegion(env, jbWarray, 0, length, buffer); if ((*env)->ExceptionCheck(env)) { destroyLocalReference(env, jbWarray); errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "hdfsWrite(length = %d): SetByteArrayRegion", length); return -1; } jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream, HADOOP_OSTRM, "write", "([B)V", jbWarray); destroyLocalReference(env, jbWarray); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsWrite: FSDataOutputStream#write"); return -1; } // Unlike most Java streams, FSDataOutputStream never does partial writes. // If we succeeded, all the data was written. return length; } int hdfsSeek(hdfsFS fs, hdfsFile f, tOffset desiredPos) { // JAVA EQUIVALENT // fis.seek(pos); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Sanity check if (!f || f->type != INPUT) { errno = EBADF; return -1; } jobject jInputStream = f->file; jthrowable jthr = invokeMethod(env, NULL, INSTANCE, jInputStream, HADOOP_ISTRM, "seek", "(J)V", desiredPos); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsSeek(desiredPos=%" PRId64 ")" ": FSDataInputStream#seek", desiredPos); return -1; } return 0; } tOffset hdfsTell(hdfsFS fs, hdfsFile f) { // JAVA EQUIVALENT // pos = f.getPos(); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Sanity check if (!f || f->type == UNINITIALIZED) { errno = EBADF; return -1; } //Parameters jobject jStream = f->file; const char* interface = (f->type == INPUT) ? HADOOP_ISTRM : HADOOP_OSTRM; jvalue jVal; jthrowable jthr = invokeMethod(env, &jVal, INSTANCE, jStream, interface, "getPos", "()J"); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsTell: %s#getPos", ((f->type == INPUT) ? "FSDataInputStream" : "FSDataOutputStream")); return -1; } return jVal.j; } int hdfsFlush(hdfsFS fs, hdfsFile f) { // JAVA EQUIVALENT // fos.flush(); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Sanity check if (!f || f->type != OUTPUT) { errno = EBADF; return -1; } jthrowable jthr = invokeMethod(env, NULL, INSTANCE, f->file, HADOOP_OSTRM, "flush", "()V"); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsFlush: FSDataInputStream#flush"); return -1; } return 0; } int hdfsHFlush(hdfsFS fs, hdfsFile f) { //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Sanity check if (!f || f->type != OUTPUT) { errno = EBADF; return -1; } jobject jOutputStream = f->file; jthrowable jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream, HADOOP_OSTRM, "hflush", "()V"); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsHFlush: FSDataOutputStream#hflush"); return -1; } return 0; } int hdfsHSync(hdfsFS fs, hdfsFile f) { //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Sanity check if (!f || f->type != OUTPUT) { errno = EBADF; return -1; } jobject jOutputStream = f->file; jthrowable jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream, HADOOP_OSTRM, "hsync", "()V"); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsHSync: FSDataOutputStream#hsync"); return -1; } return 0; } int hdfsAvailable(hdfsFS fs, hdfsFile f) { // JAVA EQUIVALENT // fis.available(); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Sanity check if (!f || f->type != INPUT) { errno = EBADF; return -1; } //Parameters jobject jInputStream = f->file; jvalue jVal; jthrowable jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream, HADOOP_ISTRM, "available", "()I"); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsAvailable: FSDataInputStream#available"); return -1; } return jVal.i; } static int hdfsCopyImpl(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst, jboolean deleteSource) { //JAVA EQUIVALENT // FileUtil#copy(srcFS, srcPath, dstFS, dstPath, // deleteSource = false, conf) //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jSrcFS = (jobject)srcFS; jobject jDstFS = (jobject)dstFS; jobject jConfiguration = NULL, jSrcPath = NULL, jDstPath = NULL; jthrowable jthr; jvalue jVal; int ret; jthr = constructNewObjectOfPath(env, src, &jSrcPath); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsCopyImpl(src=%s): constructNewObjectOfPath", src); goto done; } jthr = constructNewObjectOfPath(env, dst, &jDstPath); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsCopyImpl(dst=%s): constructNewObjectOfPath", dst); goto done; } //Create the org.apache.hadoop.conf.Configuration object jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V"); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsCopyImpl: Configuration constructor"); goto done; } //FileUtil#copy jthr = invokeMethod(env, &jVal, STATIC, NULL, "org/apache/hadoop/fs/FileUtil", "copy", "(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;" "Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;" "ZLorg/apache/hadoop/conf/Configuration;)Z", jSrcFS, jSrcPath, jDstFS, jDstPath, deleteSource, jConfiguration); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsCopyImpl(src=%s, dst=%s, deleteSource=%d): " "FileUtil#copy", src, dst, deleteSource); goto done; } if (!jVal.z) { ret = EIO; goto done; } ret = 0; done: destroyLocalReference(env, jConfiguration); destroyLocalReference(env, jSrcPath); destroyLocalReference(env, jDstPath); if (ret) { errno = ret; return -1; } return 0; } int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) { return hdfsCopyImpl(srcFS, src, dstFS, dst, 0); } int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) { return hdfsCopyImpl(srcFS, src, dstFS, dst, 1); } int hdfsDelete(hdfsFS fs, const char* path, int recursive) { // JAVA EQUIVALENT: // Path p = new Path(path); // bool retval = fs.delete(p, recursive); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; jthrowable jthr; jobject jPath; jvalue jVal; jthr = constructNewObjectOfPath(env, path, &jPath); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsDelete(path=%s): constructNewObjectOfPath", path); return -1; } jboolean jRecursive = recursive ? JNI_TRUE : JNI_FALSE; jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, "delete", "(Lorg/apache/hadoop/fs/Path;Z)Z", jPath, jRecursive); destroyLocalReference(env, jPath); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsDelete(path=%s, recursive=%d): " "FileSystem#delete", path, recursive); return -1; } if (!jVal.z) { errno = EIO; return -1; } return 0; } int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) { // JAVA EQUIVALENT: // Path old = new Path(oldPath); // Path new = new Path(newPath); // fs.rename(old, new); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; jthrowable jthr; jobject jOldPath = NULL, jNewPath = NULL; int ret = -1; jvalue jVal; jthr = constructNewObjectOfPath(env, oldPath, &jOldPath ); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsRename: constructNewObjectOfPath(%s)", oldPath); goto done; } jthr = constructNewObjectOfPath(env, newPath, &jNewPath); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsRename: constructNewObjectOfPath(%s)", newPath); goto done; } // Rename the file // TODO: use rename2 here? (See HDFS-3592) jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, "rename", JMETHOD2(JPARAM(HADOOP_PATH), JPARAM(HADOOP_PATH), "Z"), jOldPath, jNewPath); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsRename(oldPath=%s, newPath=%s): FileSystem#rename", oldPath, newPath); goto done; } if (!jVal.z) { errno = EIO; goto done; } ret = 0; done: destroyLocalReference(env, jOldPath); destroyLocalReference(env, jNewPath); return ret; } char* hdfsGetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize) { // JAVA EQUIVALENT: // Path p = fs.getWorkingDirectory(); // return p.toString() //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return NULL; } jobject jPath = NULL; jstring jPathString = NULL; jobject jFS = (jobject)fs; jvalue jVal; jthrowable jthr; int ret; const char *jPathChars = NULL; //FileSystem#getWorkingDirectory() jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, "getWorkingDirectory", "()Lorg/apache/hadoop/fs/Path;"); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsGetWorkingDirectory: FileSystem#getWorkingDirectory"); goto done; } jPath = jVal.l; if (!jPath) { fprintf(stderr, "hdfsGetWorkingDirectory: " "FileSystem#getWorkingDirectory returned NULL"); ret = -EIO; goto done; } //Path#toString() jthr = invokeMethod(env, &jVal, INSTANCE, jPath, "org/apache/hadoop/fs/Path", "toString", "()Ljava/lang/String;"); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsGetWorkingDirectory: Path#toString"); goto done; } jPathString = jVal.l; jPathChars = (*env)->GetStringUTFChars(env, jPathString, NULL); if (!jPathChars) { ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "hdfsGetWorkingDirectory: GetStringUTFChars"); goto done; } //Copy to user-provided buffer ret = snprintf(buffer, bufferSize, "%s", jPathChars); if (ret >= bufferSize) { ret = ENAMETOOLONG; goto done; } ret = 0; done: if (jPathChars) { (*env)->ReleaseStringUTFChars(env, jPathString, jPathChars); } destroyLocalReference(env, jPath); destroyLocalReference(env, jPathString); if (ret) { errno = ret; return NULL; } return buffer; } int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) { // JAVA EQUIVALENT: // fs.setWorkingDirectory(Path(path)); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; jthrowable jthr; jobject jPath; //Create an object of org.apache.hadoop.fs.Path jthr = constructNewObjectOfPath(env, path, &jPath); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsSetWorkingDirectory(%s): constructNewObjectOfPath", path); return -1; } //FileSystem#setWorkingDirectory() jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS, "setWorkingDirectory", "(Lorg/apache/hadoop/fs/Path;)V", jPath); destroyLocalReference(env, jPath); if (jthr) { errno = printExceptionAndFree(env, jthr, NOPRINT_EXC_ILLEGAL_ARGUMENT, "hdfsSetWorkingDirectory(%s): FileSystem#setWorkingDirectory", path); return -1; } return 0; } int hdfsCreateDirectory(hdfsFS fs, const char* path) { // JAVA EQUIVALENT: // fs.mkdirs(new Path(path)); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; jobject jPath; jthrowable jthr; //Create an object of org.apache.hadoop.fs.Path jthr = constructNewObjectOfPath(env, path, &jPath); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsCreateDirectory(%s): constructNewObjectOfPath", path); return -1; } //Create the directory jvalue jVal; jVal.z = 0; jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, "mkdirs", "(Lorg/apache/hadoop/fs/Path;)Z", jPath); destroyLocalReference(env, jPath); if (jthr) { errno = printExceptionAndFree(env, jthr, NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND | NOPRINT_EXC_UNRESOLVED_LINK | NOPRINT_EXC_PARENT_NOT_DIRECTORY, "hdfsCreateDirectory(%s): FileSystem#mkdirs", path); return -1; } if (!jVal.z) { // It's unclear under exactly which conditions FileSystem#mkdirs // is supposed to return false (as opposed to throwing an exception.) // It seems like the current code never actually returns false. // So we're going to translate this to EIO, since there seems to be // nothing more specific we can do with it. errno = EIO; return -1; } return 0; } int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) { // JAVA EQUIVALENT: // fs.setReplication(new Path(path), replication); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; jthrowable jthr; //Create an object of org.apache.hadoop.fs.Path jobject jPath; jthr = constructNewObjectOfPath(env, path, &jPath); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsSetReplication(path=%s): constructNewObjectOfPath", path); return -1; } //Create the directory jvalue jVal; jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, "setReplication", "(Lorg/apache/hadoop/fs/Path;S)Z", jPath, replication); destroyLocalReference(env, jPath); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsSetReplication(path=%s, replication=%d): " "FileSystem#setReplication", path, replication); return -1; } if (!jVal.z) { // setReplication returns false "if file does not exist or is a // directory." So the nearest translation to that is ENOENT. errno = ENOENT; return -1; } return 0; } int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group) { // JAVA EQUIVALENT: // fs.setOwner(path, owner, group) //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } if (owner == NULL && group == NULL) { return 0; } jobject jFS = (jobject)fs; jobject jPath = NULL; jstring jOwner = NULL, jGroup = NULL; jthrowable jthr; int ret; jthr = constructNewObjectOfPath(env, path, &jPath); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsChown(path=%s): constructNewObjectOfPath", path); goto done; } jthr = newJavaStr(env, owner, &jOwner); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsChown(path=%s): newJavaStr(%s)", path, owner); goto done; } jthr = newJavaStr(env, group, &jGroup); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsChown(path=%s): newJavaStr(%s)", path, group); goto done; } //Create the directory jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS, "setOwner", JMETHOD3(JPARAM(HADOOP_PATH), JPARAM(JAVA_STRING), JPARAM(JAVA_STRING), JAVA_VOID), jPath, jOwner, jGroup); if (jthr) { ret = printExceptionAndFree(env, jthr, NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND | NOPRINT_EXC_UNRESOLVED_LINK, "hdfsChown(path=%s, owner=%s, group=%s): " "FileSystem#setOwner", path, owner, group); goto done; } ret = 0; done: destroyLocalReference(env, jPath); destroyLocalReference(env, jOwner); destroyLocalReference(env, jGroup); if (ret) { errno = ret; return -1; } return 0; } int hdfsChmod(hdfsFS fs, const char* path, short mode) { int ret; // JAVA EQUIVALENT: // fs.setPermission(path, FsPermission) //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jthrowable jthr; jobject jPath = NULL, jPermObj = NULL; jobject jFS = (jobject)fs; // construct jPerm = FsPermission.createImmutable(short mode); jshort jmode = mode; jthr = constructNewObjectOfClass(env, &jPermObj, HADOOP_FSPERM,"(S)V",jmode); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "constructNewObjectOfClass(%s)", HADOOP_FSPERM); return -1; } //Create an object of org.apache.hadoop.fs.Path jthr = constructNewObjectOfPath(env, path, &jPath); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsChmod(%s): constructNewObjectOfPath", path); goto done; } //Create the directory jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS, "setPermission", JMETHOD2(JPARAM(HADOOP_PATH), JPARAM(HADOOP_FSPERM), JAVA_VOID), jPath, jPermObj); if (jthr) { ret = printExceptionAndFree(env, jthr, NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND | NOPRINT_EXC_UNRESOLVED_LINK, "hdfsChmod(%s): FileSystem#setPermission", path); goto done; } ret = 0; done: destroyLocalReference(env, jPath); destroyLocalReference(env, jPermObj); if (ret) { errno = ret; return -1; } return 0; } int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) { // JAVA EQUIVALENT: // fs.setTimes(src, mtime, atime) jthrowable jthr; //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; //Create an object of org.apache.hadoop.fs.Path jobject jPath; jthr = constructNewObjectOfPath(env, path, &jPath); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsUtime(path=%s): constructNewObjectOfPath", path); return -1; } const tTime NO_CHANGE = -1; jlong jmtime = (mtime == NO_CHANGE) ? -1 : (mtime * (jlong)1000); jlong jatime = (atime == NO_CHANGE) ? -1 : (atime * (jlong)1000); jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS, "setTimes", JMETHOD3(JPARAM(HADOOP_PATH), "J", "J", JAVA_VOID), jPath, jmtime, jatime); destroyLocalReference(env, jPath); if (jthr) { errno = printExceptionAndFree(env, jthr, NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND | NOPRINT_EXC_UNRESOLVED_LINK, "hdfsUtime(path=%s): FileSystem#setTimes", path); return -1; } return 0; } char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length) { // JAVA EQUIVALENT: // fs.getFileBlockLoctions(new Path(path), start, length); jthrowable jthr; jobject jPath = NULL; jobject jFileStatus = NULL; jvalue jFSVal, jVal; jobjectArray jBlockLocations = NULL, jFileBlockHosts = NULL; jstring jHost = NULL; char*** blockHosts = NULL; int i, j, ret; jsize jNumFileBlocks = 0; //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return NULL; } jobject jFS = (jobject)fs; //Create an object of org.apache.hadoop.fs.Path jthr = constructNewObjectOfPath(env, path, &jPath); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsGetHosts(path=%s): constructNewObjectOfPath", path); goto done; } jthr = invokeMethod(env, &jFSVal, INSTANCE, jFS, HADOOP_FS, "getFileStatus", "(Lorg/apache/hadoop/fs/Path;)" "Lorg/apache/hadoop/fs/FileStatus;", jPath); if (jthr) { ret = printExceptionAndFree(env, jthr, NOPRINT_EXC_FILE_NOT_FOUND, "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):" "FileSystem#getFileStatus", path, start, length); destroyLocalReference(env, jPath); goto done; } jFileStatus = jFSVal.l; //org.apache.hadoop.fs.FileSystem#getFileBlockLocations jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, "getFileBlockLocations", "(Lorg/apache/hadoop/fs/FileStatus;JJ)" "[Lorg/apache/hadoop/fs/BlockLocation;", jFileStatus, start, length); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):" "FileSystem#getFileBlockLocations", path, start, length); goto done; } jBlockLocations = jVal.l; //Figure out no of entries in jBlockLocations //Allocate memory and add NULL at the end jNumFileBlocks = (*env)->GetArrayLength(env, jBlockLocations); blockHosts = calloc(jNumFileBlocks + 1, sizeof(char**)); if (blockHosts == NULL) { ret = ENOMEM; goto done; } if (jNumFileBlocks == 0) { ret = 0; goto done; } //Now parse each block to get hostnames for (i = 0; i < jNumFileBlocks; ++i) { jobject jFileBlock = (*env)->GetObjectArrayElement(env, jBlockLocations, i); if (!jFileBlock) { ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):" "GetObjectArrayElement(%d)", path, start, length, i); goto done; } jthr = invokeMethod(env, &jVal, INSTANCE, jFileBlock, HADOOP_BLK_LOC, "getHosts", "()[Ljava/lang/String;"); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):" "BlockLocation#getHosts", path, start, length); goto done; } jFileBlockHosts = jVal.l; if (!jFileBlockHosts) { fprintf(stderr, "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):" "BlockLocation#getHosts returned NULL", path, start, length); ret = EINTERNAL; goto done; } //Figure out no of hosts in jFileBlockHosts, and allocate the memory jsize jNumBlockHosts = (*env)->GetArrayLength(env, jFileBlockHosts); blockHosts[i] = calloc(jNumBlockHosts + 1, sizeof(char*)); if (!blockHosts[i]) { ret = ENOMEM; goto done; } //Now parse each hostname const char *hostName; for (j = 0; j < jNumBlockHosts; ++j) { jHost = (*env)->GetObjectArrayElement(env, jFileBlockHosts, j); if (!jHost) { ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"): " "NewByteArray", path, start, length); goto done; } hostName = (const char*)((*env)->GetStringUTFChars(env, jHost, NULL)); if (!hostName) { ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64", " "j=%d out of %d): GetStringUTFChars", path, start, length, j, jNumBlockHosts); goto done; } blockHosts[i][j] = strdup(hostName); (*env)->ReleaseStringUTFChars(env, jHost, hostName); if (!blockHosts[i][j]) { ret = ENOMEM; goto done; } destroyLocalReference(env, jHost); jHost = NULL; } destroyLocalReference(env, jFileBlockHosts); jFileBlockHosts = NULL; } ret = 0; done: destroyLocalReference(env, jPath); destroyLocalReference(env, jFileStatus); destroyLocalReference(env, jBlockLocations); destroyLocalReference(env, jFileBlockHosts); destroyLocalReference(env, jHost); if (ret) { if (blockHosts) { hdfsFreeHosts(blockHosts); } return NULL; } return blockHosts; } void hdfsFreeHosts(char ***blockHosts) { int i, j; for (i=0; blockHosts[i]; i++) { for (j=0; blockHosts[i][j]; j++) { free(blockHosts[i][j]); } free(blockHosts[i]); } free(blockHosts); } tOffset hdfsGetDefaultBlockSize(hdfsFS fs) { // JAVA EQUIVALENT: // fs.getDefaultBlockSize(); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; //FileSystem#getDefaultBlockSize() jvalue jVal; jthrowable jthr; jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, "getDefaultBlockSize", "()J"); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsGetDefaultBlockSize: FileSystem#getDefaultBlockSize"); return -1; } return jVal.j; } tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path) { // JAVA EQUIVALENT: // fs.getDefaultBlockSize(path); jthrowable jthr; jobject jFS = (jobject)fs; jobject jPath; tOffset blockSize; JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jthr = constructNewObjectOfPath(env, path, &jPath); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsGetDefaultBlockSize(path=%s): constructNewObjectOfPath", path); return -1; } jthr = getDefaultBlockSize(env, jFS, jPath, &blockSize); (*env)->DeleteLocalRef(env, jPath); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsGetDefaultBlockSize(path=%s): " "FileSystem#getDefaultBlockSize", path); return -1; } return blockSize; } tOffset hdfsGetCapacity(hdfsFS fs) { // JAVA EQUIVALENT: // FsStatus fss = fs.getStatus(); // return Fss.getCapacity(); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; //FileSystem#getStatus jvalue jVal; jthrowable jthr; jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, "getStatus", "()Lorg/apache/hadoop/fs/FsStatus;"); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsGetCapacity: FileSystem#getStatus"); return -1; } jobject fss = (jobject)jVal.l; jthr = invokeMethod(env, &jVal, INSTANCE, fss, HADOOP_FSSTATUS, "getCapacity", "()J"); destroyLocalReference(env, fss); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsGetCapacity: FsStatus#getCapacity"); return -1; } return jVal.j; } tOffset hdfsGetUsed(hdfsFS fs) { // JAVA EQUIVALENT: // FsStatus fss = fs.getStatus(); // return Fss.getUsed(); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; //FileSystem#getStatus jvalue jVal; jthrowable jthr; jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, "getStatus", "()Lorg/apache/hadoop/fs/FsStatus;"); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsGetUsed: FileSystem#getStatus"); return -1; } jobject fss = (jobject)jVal.l; jthr = invokeMethod(env, &jVal, INSTANCE, fss, HADOOP_FSSTATUS, "getUsed", "()J"); destroyLocalReference(env, fss); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsGetUsed: FsStatus#getUsed"); return -1; } return jVal.j; } static jthrowable getFileInfoFromStat(JNIEnv *env, jobject jStat, hdfsFileInfo *fileInfo) { jvalue jVal; jthrowable jthr; jobject jPath = NULL; jstring jPathName = NULL; jstring jUserName = NULL; jstring jGroupName = NULL; jobject jPermission = NULL; jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT, "isDir", "()Z"); if (jthr) goto done; fileInfo->mKind = jVal.z ? kObjectKindDirectory : kObjectKindFile; jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT, "getReplication", "()S"); if (jthr) goto done; fileInfo->mReplication = jVal.s; jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT, "getBlockSize", "()J"); if (jthr) goto done; fileInfo->mBlockSize = jVal.j; jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT, "getModificationTime", "()J"); if (jthr) goto done; fileInfo->mLastMod = jVal.j / 1000; jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT, "getAccessTime", "()J"); if (jthr) goto done; fileInfo->mLastAccess = (tTime) (jVal.j / 1000); if (fileInfo->mKind == kObjectKindFile) { jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT, "getLen", "()J"); if (jthr) goto done; fileInfo->mSize = jVal.j; } jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT, "getPath", "()Lorg/apache/hadoop/fs/Path;"); if (jthr) goto done; jPath = jVal.l; if (jPath == NULL) { jthr = newRuntimeError(env, "org.apache.hadoop.fs.FileStatus#" "getPath returned NULL!"); goto done; } jthr = invokeMethod(env, &jVal, INSTANCE, jPath, HADOOP_PATH, "toString", "()Ljava/lang/String;"); if (jthr) goto done; jPathName = jVal.l; const char *cPathName = (const char*) ((*env)->GetStringUTFChars(env, jPathName, NULL)); if (!cPathName) { jthr = getPendingExceptionAndClear(env); goto done; } fileInfo->mName = strdup(cPathName); (*env)->ReleaseStringUTFChars(env, jPathName, cPathName); jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT, "getOwner", "()Ljava/lang/String;"); if (jthr) goto done; jUserName = jVal.l; const char* cUserName = (const char*) ((*env)->GetStringUTFChars(env, jUserName, NULL)); if (!cUserName) { jthr = getPendingExceptionAndClear(env); goto done; } fileInfo->mOwner = strdup(cUserName); (*env)->ReleaseStringUTFChars(env, jUserName, cUserName); const char* cGroupName; jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT, "getGroup", "()Ljava/lang/String;"); if (jthr) goto done; jGroupName = jVal.l; cGroupName = (const char*) ((*env)->GetStringUTFChars(env, jGroupName, NULL)); if (!cGroupName) { jthr = getPendingExceptionAndClear(env); goto done; } fileInfo->mGroup = strdup(cGroupName); (*env)->ReleaseStringUTFChars(env, jGroupName, cGroupName); jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT, "getPermission", "()Lorg/apache/hadoop/fs/permission/FsPermission;"); if (jthr) goto done; if (jVal.l == NULL) { jthr = newRuntimeError(env, "%s#getPermission returned NULL!", HADOOP_STAT); goto done; } jPermission = jVal.l; jthr = invokeMethod(env, &jVal, INSTANCE, jPermission, HADOOP_FSPERM, "toShort", "()S"); if (jthr) goto done; fileInfo->mPermissions = jVal.s; jthr = NULL; done: if (jthr) hdfsFreeFileInfoEntry(fileInfo); destroyLocalReference(env, jPath); destroyLocalReference(env, jPathName); destroyLocalReference(env, jUserName); destroyLocalReference(env, jGroupName); destroyLocalReference(env, jPermission); destroyLocalReference(env, jPath); return jthr; } static jthrowable getFileInfo(JNIEnv *env, jobject jFS, jobject jPath, hdfsFileInfo **fileInfo) { // JAVA EQUIVALENT: // fs.isDirectory(f) // fs.getModificationTime() // fs.getAccessTime() // fs.getLength(f) // f.getPath() // f.getOwner() // f.getGroup() // f.getPermission().toShort() jobject jStat; jvalue jVal; jthrowable jthr; jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, "exists", JMETHOD1(JPARAM(HADOOP_PATH), "Z"), jPath); if (jthr) return jthr; if (jVal.z == 0) { *fileInfo = NULL; return NULL; } jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, "getFileStatus", JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_STAT)), jPath); if (jthr) return jthr; jStat = jVal.l; *fileInfo = calloc(1, sizeof(hdfsFileInfo)); if (!*fileInfo) { destroyLocalReference(env, jStat); return newRuntimeError(env, "getFileInfo: OOM allocating hdfsFileInfo"); } jthr = getFileInfoFromStat(env, jStat, *fileInfo); destroyLocalReference(env, jStat); return jthr; } hdfsFileInfo* hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) { // JAVA EQUIVALENT: // Path p(path); // Path []pathList = fs.listPaths(p) // foreach path in pathList // getFileInfo(path) jthrowable jthr; jobject jPath = NULL; hdfsFileInfo *pathList = NULL; jobjectArray jPathList = NULL; jvalue jVal; jsize jPathListSize = 0; int ret; //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return NULL; } jobject jFS = (jobject)fs; //Create an object of org.apache.hadoop.fs.Path jthr = constructNewObjectOfPath(env, path, &jPath); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsListDirectory(%s): constructNewObjectOfPath", path); goto done; } jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_DFS, "listStatus", JMETHOD1(JPARAM(HADOOP_PATH), JARRPARAM(HADOOP_STAT)), jPath); if (jthr) { ret = printExceptionAndFree(env, jthr, NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND | NOPRINT_EXC_UNRESOLVED_LINK, "hdfsListDirectory(%s): FileSystem#listStatus", path); goto done; } jPathList = jVal.l; //Figure out the number of entries in that directory jPathListSize = (*env)->GetArrayLength(env, jPathList); if (jPathListSize == 0) { ret = 0; goto done; } //Allocate memory pathList = calloc(jPathListSize, sizeof(hdfsFileInfo)); if (pathList == NULL) { ret = ENOMEM; goto done; } //Save path information in pathList jsize i; jobject tmpStat; for (i=0; i < jPathListSize; ++i) { tmpStat = (*env)->GetObjectArrayElement(env, jPathList, i); if (!tmpStat) { ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "hdfsListDirectory(%s): GetObjectArrayElement(%d out of %d)", path, i, jPathListSize); goto done; } jthr = getFileInfoFromStat(env, tmpStat, &pathList[i]); destroyLocalReference(env, tmpStat); if (jthr) { ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsListDirectory(%s): getFileInfoFromStat(%d out of %d)", path, i, jPathListSize); goto done; } } ret = 0; done: destroyLocalReference(env, jPath); destroyLocalReference(env, jPathList); if (ret) { hdfsFreeFileInfo(pathList, jPathListSize); errno = ret; return NULL; } *numEntries = jPathListSize; return pathList; } hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) { // JAVA EQUIVALENT: // File f(path); // fs.isDirectory(f) // fs.lastModified() ?? // fs.getLength(f) // f.getPath() //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return NULL; } jobject jFS = (jobject)fs; //Create an object of org.apache.hadoop.fs.Path jobject jPath; jthrowable jthr = constructNewObjectOfPath(env, path, &jPath); if (jthr) { errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsGetPathInfo(%s): constructNewObjectOfPath", path); return NULL; } hdfsFileInfo *fileInfo; jthr = getFileInfo(env, jFS, jPath, &fileInfo); destroyLocalReference(env, jPath); if (jthr) { errno = printExceptionAndFree(env, jthr, NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND | NOPRINT_EXC_UNRESOLVED_LINK, "hdfsGetPathInfo(%s): getFileInfo", path); return NULL; } if (!fileInfo) { errno = ENOENT; return NULL; } return fileInfo; } static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo) { free(hdfsFileInfo->mName); free(hdfsFileInfo->mOwner); free(hdfsFileInfo->mGroup); memset(hdfsFileInfo, 0, sizeof(hdfsFileInfo)); } void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries) { //Free the mName, mOwner, and mGroup int i; for (i=0; i < numEntries; ++i) { hdfsFreeFileInfoEntry(hdfsFileInfo + i); } //Free entire block free(hdfsFileInfo); } /** * vim: ts=4: sw=4: et: */