-
Notifications
You must be signed in to change notification settings - Fork 185
[RFC] src/aiori-CEPHFS: New libcephfs backend #217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,361 @@ | ||
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- | ||
* vim:expandtab:shiftwidth=8:tabstop=8: | ||
*/ | ||
/******************************************************************************\ | ||
* * | ||
* (C) 2015 The University of Chicago * | ||
* (C) 2020 Red Hat, Inc. * | ||
* * | ||
* See COPYRIGHT in top-level directory. * | ||
* * | ||
******************************************************************************** | ||
* | ||
* Implement abstract I/O interface for CEPHFS. | ||
* | ||
\******************************************************************************/ | ||
|
||
#ifdef HAVE_CONFIG_H | ||
# include "config.h" | ||
#endif | ||
10000 |
|
|
#include <stdio.h> | ||
#include <stdlib.h> | ||
#include <sys/stat.h> | ||
#include <cephfs/libcephfs.h> | ||
|
||
#include "ior.h" | ||
#include "iordef.h" | ||
#include "aiori.h" | ||
#include "utilities.h" | ||
|
||
#define CEPH_O_RDONLY 00000000 | ||
#define CEPH_O_WRONLY 00000001 | ||
#define CEPH_O_RDWR 00000002 | ||
#define CEPH_O_CREAT 00000100 | ||
#define CEPH_O_EXCL 00000200 | ||
#define CEPH_O_TRUNC 00001000 | ||
#define CEPH_O_LAZY 00020000 | ||
#define CEPH_O_DIRECTORY 00200000 | ||
#define CEPH_O_NOFOLLOW 00400000 | ||
|
||
/************************** O P T I O N S *****************************/ | ||
struct cephfs_options{ | ||
char * user; | ||
char * conf; | ||
char * prefix; | ||
}; | ||
|
||
static struct cephfs_options o = { | ||
.user = NULL, | ||
.conf = NULL, | ||
.prefix = NULL, | ||
}; | ||
|
||
static option_help options [] = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. great that you use the new options ^-^ |
||
{0, "cephfs.user", "Username for the ceph cluster", OPTION_REQUIRED_ARGUMENT, 's', & o.user}, | ||
{0, "cephfs.conf", "Config file for the ceph cluster", OPTION_REQUIRED_ARGUMENT, 's', & o.conf}, | ||
{0, "cephfs.prefix", "mount prefix", OPTION_OPTIONAL_ARGUMENT, 's', & o.prefix}, | ||
LAST_OPTION | ||
}; | ||
|
||
static struct ceph_mount_info *cmount; | ||
|
||
/**************************** P R O T O T Y P E S *****************************/ | ||
static void CEPHFS_Init(); | ||
static void CEPHFS_Final(); | ||
static void *CEPHFS_Create(char *, IOR_param_t *); | ||
static void *CEPHFS_Open(char *, IOR_param_t *); | ||
static IOR_offset_t CEPHFS_Xfer(int, void *, IOR_size_t *, | ||
IOR_offset_t, IOR_param_t *); | ||
static void CEPHFS_Close(void *, IOR_param_t *); | ||
static void CEPHFS_Delete(char *, IOR_param_t *); | ||
static void CEPHFS_Fsync(void *, IOR_param_t *); | ||
static IOR_offset_t CEPHFS_GetFileSize(IOR_param_t *, MPI_Comm, char *); | ||
static int CEPHFS_StatFS(const char *, ior_aiori_statfs_t *, IOR_param_t *); | ||
static int CEPHFS_MkDir(const char *, mode_t, IOR_param_t *); | ||
static int CEPHFS_RmDir(const char *, IOR_param_t *); | ||
static int CEPHFS_Access(const char *, int, IOR_param_t *); | ||
static int CEPHFS_Stat(const char *, struct stat *, IOR_param_t *); | ||
static option_help * CEPHFS_options(); | ||
|
||
/************************** D E C L A R A T I O N S ***************************/ | ||
ior_aiori_t cephfs_aiori = { | ||
.name = "CEPHFS", | ||
.name_legacy = NULL, | ||
.initialize = CEPHFS_Init, | ||
.finalize = CEPHFS_Final, | ||
.create = CEPHFS_Create, | ||
.open = CEPHFS_Open, | ||
.xfer = CEPHFS_Xfer, | ||
.close = CEPHFS_Close, | ||
.delete = CEPHFS_Delete, | ||
.get_version = aiori_get_version, | ||
.fsync = CEPHFS_Fsync, | ||
.get_file_size = CEPHFS_GetFileSize, | ||
.statfs = CEPHFS_StatFS, | ||
.mkdir = CEPHFS_MkDir, | ||
.rmdir = CEPHFS_RmDir, | ||
.access = CEPHFS_Access, | ||
.stat = CEPHFS_Stat, | ||
.get_options = CEPHFS_options, | ||
}; | ||
|
||
#define CEPHFS_ERR(__err_str, __ret) do { \ | ||
errno = -__ret; \ | ||
ERR(__err_str); \ | ||
} while(0) | ||
|
||
/***************************** F U N C T I O N S ******************************/ | ||
static const char* pfix(const char* path) { | ||
const char* npath = path; | ||
const char* prefix = o.prefix; | ||
while (*prefix) { | ||
if(*prefix++ != *npath++) { | ||
return path; | ||
} | ||
} | ||
return npath; | ||
} | ||
|
||
static option_help * CEPHFS_options(){ | ||
return options; | ||
} | ||
|
||
static void CEPHFS_Init() | ||
{ | ||
/* Short circuit if the mount handle already exists */ | ||
if (cmount) { | ||
return; | ||
} | ||
|
||
int ret; | ||
/* create CEPHFS mount handle */ | ||
ret = ceph_create(&cmount, o.user); | ||
if (ret) { | ||
CEPHFS_ERR("unable to create CEPHFS mount handle", ret); | ||
} | ||
|
||
/* set the handle using the Ceph config */ | ||
ret = ceph_conf_read_file(cmount, o.conf); | ||
if (ret) { | ||
CEPHFS_ERR("unable to read ceph config file", ret); | ||
} | ||
|
||
/* mount the handle */ | ||
ret = ceph_mount(cmount, "/"); | ||
if (ret) { | ||
CEPHFS_ERR("unable to mount cephfs", ret); | ||
ceph_shutdown(cmount); | ||
|
||
} | ||
|
||
Inode *root; | ||
|
||
/* try retrieving the root cephfs inode */ | ||
ret = ceph_ll_lookup_root(cmount, &root); | ||
if (ret) { | ||
CEPHFS_ERR("uanble to retrieve root cephfs inode", ret); | ||
ceph_shutdown(cmount); | ||
|
||
} | ||
|
||
return; | ||
} | ||
|
||
static void CEPHFS_Final() | ||
{ | ||
/* shutdown */ | ||
ceph_shutdown(cmount); | ||
} | ||
|
||
static void *CEPHFS_Create(char *testFileName, IOR_param_t * param) | ||
{ | ||
return CEPHFS_Open(testFileName, param); | ||
} | ||
|
||
static void *CEPHFS_Open(char *testFileName, IOR_param_t * param) | ||
{ | ||
const char *file = pfix(testFileName); | ||
int* fd; | ||
fd = (int *)malloc(sizeof(int)); | ||
|
||
mode_t mode = 0664; | ||
int flags = (int) 0; | ||
|
||
/* set IOR file flags to CephFS flags */ | ||
/* -- file open flags -- */ | ||
if (param->openFlags & IOR_RDONLY) { | ||
flags |= CEPH_O_RDONLY; | ||
} | ||
if (param->openFlags & IOR_WRONLY) { | ||
flags |= CEPH_O_WRONLY; | ||
} | ||
if (param->openFlags & IOR_RDWR) { | ||
flags |= CEPH_O_RDWR; | ||
} | ||
if (param->openFlags & IOR_APPEND) { | ||
fprintf(stdout, "File append not implemented in CephFS\n"); | ||
} | ||
if (param->openFlags & IOR_CREAT) { | ||
flags |= CEPH_O_CREAT; | ||
} | ||
if (param->openFlags & IOR_EXCL) { | ||
flags |= CEPH_O_EXCL; | ||
} | ||
if (param->openFlags & IOR_TRUNC) { | ||
flags |= CEPH_O_TRUNC; | ||
} | ||
if (param->openFlags & IOR_DIRECT) { | ||
fprintf(stdout, "O_DIRECT not implemented in CephFS\n"); | ||
} | ||
*fd = ceph_open(cmount, file, flags, mode); | ||
if (*fd < 0) { | ||
CEPHFS_ERR("ceph_open failed", *fd); | ||
} | ||
return (void *) fd; | ||
} | ||
|
||
static IOR_offset_t CEPHFS_Xfer(int access, void *file, IOR_size_t * buffer, | ||
IOR_offset_t length, IOR_param_t * param) | ||
{ | ||
uint64_t size = (uint64_t) length; | ||
char *buf = (char *) buffer; | ||
int fd = *(int *) file; | ||
int ret; | ||
|
||
if (access == WRITE) | ||
{ | ||
ret = ceph_write(cmount, fd, buf, size, param->offset); | ||
if (ret < 0) { | ||
CEPHFS_ERR("unable to write file to CephFS", ret); | ||
} else if (ret < size) { | ||
CEPHFS_ERR("short write to CephFS", ret); | ||
} | ||
if (param->fsyncPerWrite == TRUE) { | ||
CEPHFS_Fsync(&fd, param); | ||
} | ||
} | ||
else /* READ */ | ||
{ | ||
ret = ceph_read(cmount, fd, buf, size, param->offset); | ||
if (ret < 0) { | ||
CEPHFS_ERR("unable to read file from CephFS", ret); | ||
} else if (ret < size) { | ||
CEPHFS_ERR("short read from CephFS", ret); | ||
} | ||
|
||
} | ||
return length; | ||
} | ||
|
||
static void CEPHFS_Fsync(void *file, IOR_param_t * param) | ||
{ | ||
int fd = *(int *) file; | ||
int ret = ceph_fsync(cmount, fd, 0); | ||
if (ret < 0) { | ||
CEPHFS_ERR("ceph_fsync failed", ret); | ||
} | ||
} | ||
|
||
static void CEPHFS_Close(void *file, IOR_param_t * param) | ||
{ | ||
int fd = *(int *) file; | ||
int ret = ceph_close(cmount, fd); | ||
if (ret < 0) { | ||
CEPHFS_ERR("ceph_close failed", ret); | ||
} | ||
free(file); | ||
|
||
return; | ||
} | ||
|
||
static void CEPHFS_Delete(char *testFileName, IOR_param_t * param) | ||
{ | ||
int ret = ceph_unlink(cmount, pfix(testFileName)); | ||
if (ret < 0) { | ||
CEPHFS_ERR("ceph_unlink failed", ret); | ||
} | ||
return; | ||
} | ||
|
||
static IOR_offset_t CEPHFS_GetFileSize(IOR_param_t * param, MPI_Comm testComm, | ||
char *testFileName) | ||
{ | ||
struct stat stat_buf; | ||
IOR_offset_t aggFileSizeFromStat, tmpMin, tmpMax, tmpSum; | ||
|
||
int ret = ceph_stat(cmount, pfix(testFileName), &stat_buf); | ||
if (ret < 0) { | ||
CEPHFS_ERR("ceph_stat failed", ret); | ||
} | ||
aggFileSizeFromStat = stat_buf.st_size; | ||
|
||
if (param->filePerProc == TRUE) { | ||
MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpSum, 1, | ||
MPI_LONG_LONG_INT, MPI_SUM, testComm), | ||
"cannot total data moved"); | ||
aggFileSizeFromStat = tmpSum; | ||
} else { | ||
MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpMin, 1, | ||
MPI_LONG_LONG_INT, MPI_MIN, testComm), | ||
"cannot total data moved"); | ||
MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpMax, 1, | ||
MPI_LONG_LONG_INT, MPI_MAX, testComm), | ||
"cannot total data moved"); | ||
if (tmpMin != tmpMax) { | ||
if (rank == 0) { | ||
WARN("inconsistent file size by different tasks"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice check, albeit it costs a little performance. |
||
} | ||
/* incorrect, but now consistent across tasks */ | ||
aggFileSizeFromStat = tmpMin; | ||
} | ||
} | ||
|
||
return (aggFileSizeFromStat); | ||
|
||
} | ||
|
||
static int CEPHFS_StatFS(const char *path, ior_aiori_statfs_t *stat_buf, | ||
IOR_param_t *param) | ||
{ | ||
#if defined(HAVE_STATVFS) | ||
struct statvfs statfs_buf; | ||
int ret = ceph_statfs(cmount, pfix(path), &statfs_buf); | ||
if (ret < 0) { | ||
CEPHFS_ERR("ceph_statfs failed", ret); | ||
return -1; | ||
} | ||
|
||
stat_buf->f_bsize = statfs_buf.f_bsize; | ||
stat_buf->f_blocks = statfs_buf.f_blocks; | ||
stat_buf->f_bfree = statfs_buf.f_bfree; | ||
stat_buf->f_files = statfs_buf.f_files; | ||
stat_buf->f_ffree = statfs_buf.f_ffree; | ||
|
||
return 0; | ||
#else | ||
WARN("ceph_statfs requires statvfs!"); | ||
return -1; | ||
#endif | ||
} | ||
|
||
static int CEPHFS_MkDir(const char *path, mode_t mode, IOR_param_t *param) | ||
{ | ||
return ceph_mkdir(cmount, pfix(path), mode); | ||
} | ||
|
||
static int CEPHFS_RmDir(const char *path, IOR_param_t *param) | ||
{ | ||
return ceph_rmdir(cmount, pfix(path)); | ||
} | ||
|
||
static int CEPHFS_Access(const char *testFileName, int mode, IOR_param_t *param) | ||
{ | ||
struct stat buf; | ||
return ceph_stat(cmount, pfix(testFileName), &buf); | ||
} | ||
|
||
static int CEPHFS_Stat(const char *testFileName, struct stat *buf, IOR_param_t *param) | ||
{ | ||
return ceph_stat(cmount, pfix(testFileName), buf); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the future, the code might benefit from automatic detection of include/library files. It is reasonable to keep it function for now though.