/* * Copyright (l) 2013-2015, dennis wang * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #ifndef RUST_WRAPPER #include #include #include #include "http_appender.h" #include "logger_impl.h" #include "util.h" HttpAppender::HttpAppender(LoggerImpl *impl, const std::string &name, const std::string &attribute) : _realName(name), _name("[" + name + "]"), _method("POST"), // 默认为POST _impl(impl), _uri(""), _poolSize(32), // 默认32个连接管道 _port(80), _running(true), _level(Logger::MIN), _chanCount(0), _loop(0) { // 获取配置 parseAttribute(attribute); // 建立网络循环 _loop = knet_loop_create(); // 获取主机地址 resolveHost(); // 启动异步日志线程 startThread(); } HttpAppender::~HttpAppender() { if (_loop) { knet_loop_destroy(_loop); } // 销毁未发送的日志 LogList::iterator guard = _logList.begin(); for (; guard != _logList.end(); guard++) { delete[] * guard; } } void HttpAppender::write(int level, const char *format, ...) { if (level < _level) { return; } va_list va_ptr; va_start(va_ptr, format); try { write(getLevelString(level), format, va_ptr); } catch (std::exception &) { // 这里只会出现std::exception异常,吃掉这个异常,外部无法处理这个异常 return; } va_end(va_ptr); } void HttpAppender::reload(const std::string &attribute) { parseAttribute(attribute); } void HttpAppender::show_level(bool) {} void HttpAppender::show_name(bool) {} void HttpAppender::setLevel(int level) { _level = level; } void HttpAppender::destroy() { stopThread(); // 从管理器内删除 if (_impl) { _impl->remove(_realName); } // 销毁自己 delete this; } void HttpAppender::write(const std::string &level, const char *format, va_list va_ptr) { std::string pattern = _pattern.getPrefix(); int len = 0; char buffer[64 * 1024] = {0}; if (!pattern.empty()) { // 日志行前缀 memcpy(buffer + len, pattern.c_str(), (int)pattern.size()); len += (int)pattern.size(); } if (!level.empty()) { // 日志等级 memcpy(buffer + len, level.c_str(), (int)level.size()); len += (int)level.size(); } if (!_name.empty()) { // 日志名称 memcpy(buffer + len, _name.c_str(), (int)_name.size()); len += (int)_name.size(); } int bytes = vsnprintf(buffer + len, sizeof(buffer) - len, format, va_ptr); if (0 >= bytes) { return; } len += bytes; char *log = new char[len + 1]; // 异常外部处理 memcpy(log, buffer, len); log[len] = 0; ScopeLock guard(&_logLock); _logList.push_front(log); // 异常外部处理 } void HttpAppender::addChan() { _chanCount += 1; } void HttpAppender::subChan() { _chanCount -= 1; } void HttpAppender::parseAttribute(const std::string &attribute) { std::vector tokens; kratos::util::split(attribute, ";", tokens); std::vector::iterator guard = tokens.begin(); // 解析日志属性配置 for (; guard != tokens.end(); guard++) { if (std::string::npos != guard->find("line=")) { _line = getAttribute("line=", *guard); _pattern.parse(_line); } else if (std::string::npos != guard->find("http://")) { _host = getAttribute("http://", *guard); } else if (std::string::npos != guard->find("port=")) { std::string port = getAttribute("port=", *guard); _port = atoi(port.c_str()); } else if (std::string::npos != guard->find("pool=")) { std::string pool = getAttribute("pool=", *guard); _poolSize = atoi(pool.c_str()); } else if (std::string::npos != guard->find("uri=")) { _uri = getAttribute("uri=", *guard); } else if (std::string::npos != guard->find("method=")) { _method = getAttribute("method=", *guard); } else if (std::string::npos != guard->find("level=")) { std::string level = getAttribute("level=", *guard); _level = atoi(level.c_str()); } } } std::string HttpAppender::getAttribute(const std::string &name, const std::string &attribute) { size_t pos = attribute.find(name); if (pos == std::string::npos) { throw LoggerException("invalid attribute"); } return attribute.substr(pos + name.size()); } int HttpAppender::postLog(kstream_t *stream, HttpAppender *ap, const char *log) { // 发送HTTP请求 return knet_stream_push_varg(stream, "%s /%s HTTP/1.1\r\n" "Host: %s\r\n" "Content-type: text/plain\r\n" "Content-length: %d\r\n" "%s\r\n\r\n", ap->_method.c_str(), ap->_uri.c_str(), ap->_host.c_str(), strlen(log), log); } void HttpAppender::doHttpWrite(char *log) { if (_chanCount < _poolSize) { // 建立一个管道 kchannel_ref_t *channel = knet_loop_create_channel(_loop, 128, 512); knet_channel_ref_set_cb(channel, &HttpAppender::connetor_cb); // 连接HTTP服务器 if (error_ok == knet_channel_ref_connect(channel, _ip.c_str(), _port, 5)) { Log_ *l = new Log_(this, log); knet_channel_ref_set_ptr(channel, l); knet_channel_ref_set_timeout(channel, 5); // 5秒后断开 addChan(); } } else { // 管道数量到达上限 // 重新放回日志队列 pushLog(this, log); } } char *HttpAppender::popLog(HttpAppender *ap) { char *log = 0; ScopeLock guard(&ap->_logLock); if (!ap->_logList.empty()) { log = ap->_logList.back(); ap->_logList.pop_back(); } return log; } void HttpAppender::pushLog(HttpAppender *ap, char *log) { ScopeLock guard(&ap->_logLock); ap->_logList.push_back(log); } void HttpAppender::run() { char *log = 0; for (; _running;) { log = 0; // 当前日志行 knet_loop_run_once(_loop); // 运行网络循环 log = popLog(this); // 取一个日志 if (log) { doHttpWrite(log); } } } #if defined(_WIN32) || defined(WIN32) || defined(_WIN64) void HttpAppender::write_thread(void *params) { HttpAppender *ap = (HttpAppender *)params; ap->run(); } #else void *HttpAppender::write_thread(void *params) { HttpAppender *ap = (HttpAppender *)params; ap->run(); return 0; } #endif // _WIN32 void HttpAppender::doChanClose(kchannel_ref_t *channel) { Log_ *l = (Log_ *)knet_channel_ref_get_ptr(channel); if (l) { if (l->getLog()) { if (l->isPosted()) { // 发送成功销毁 l->delLog(); } else { // 未发送成功的log重新放回 l->repush(); } } l->subChan(); delete l; } } void HttpAppender::doChanConnect(kchannel_ref_t *channel) { Log_ *l = (Log_ *)knet_channel_ref_get_ptr(channel); if (l) { kstream_t *stream = knet_channel_ref_get_stream(channel); // post Log l->setPosted(false); if (error_ok == postLog(stream, l->getAppender(), l->getLog())) { l->setPosted(true); } else { // 失败时,在channel_cb_event_close处理内处理 } } } void HttpAppender::connetor_cb(kchannel_ref_t *channel, knet_channel_cb_event_e e) { if (e & channel_cb_event_connect) { doChanConnect(channel); } else if (e & channel_cb_event_close) { doChanClose(channel); } else if (e & channel_cb_event_timeout) { knet_channel_ref_close(channel); } else if (e & channel_cb_event_connect_timeout) { knet_channel_ref_close(channel); } else if (e & channel_cb_event_recv) { // 不处理HTTP服务的返回数据 kstream_t *stream = knet_channel_ref_get_stream(channel); knet_stream_eat_all(stream); } } void HttpAppender::resolveHost() { char ip[32] = {0}; if (error_ok != get_host_ip_string(_host.c_str(), ip, sizeof(ip))) { throw LoggerException("resolve host failed"); } _ip = ip; } void HttpAppender::startThread() { #if defined(_WIN32) || defined(WIN32) || defined(_WIN64) _tid = (HANDLE)_beginthread(&HttpAppender::write_thread, 0, this); #else pthread_create(&_tid, 0, &HttpAppender::write_thread, this); #endif // _WIN32 } void HttpAppender::stopThread() { _running = false; // 停止 #if defined(_WIN32) || defined(WIN32) || defined(_WIN64) WaitForSingleObject(_tid, INFINITE); #else pthread_join(_tid, 0); #endif // _WIN32 } int HttpAppender::getLevel() { return _level; } #endif // !RUST_WRAPPER