/* * Copyright (c) 2013-2015, dennis wang * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #ifndef RUST_WRAPPER #include "tcp_appender.h" #include #include #include #include "logger_impl.h" #include "util.h" #include "util/string_util.hh" TcpAppender::TcpAppender(LoggerImpl *impl, const std::string &name, const std::string &attribute) : _realName(name), _name("[" + name + "]"), _impl(impl), _port(12288), _running(true), _level(Logger::MIN), _loop(0), _channel(0), _maxLog(512) { // 获取配置 parseAttribute(attribute); // 建立网络循环 _loop = knet_loop_create(); // 建立网络连接 kchannel_ref_t *channel = knet_loop_create_channel(_loop, 128, 1024 * 256); knet_channel_ref_set_auto_reconnect(channel, 1); knet_channel_ref_set_ptr(channel, this); knet_channel_ref_set_cb(channel, connetor_cb); if (error_ok != knet_channel_ref_connect(channel, _ip.c_str(), _port, 5)) { throw LoggerException("Connect to logger server faild"); } // 启动异步日志线程 startThread(); } TcpAppender::~TcpAppender() { // 销毁未发送的日志 LogList::iterator guard = _logList.begin(); for (; guard != _logList.end(); guard++) { delete[] * guard; } if (_loop) { knet_loop_destroy(_loop); } } void TcpAppender::reload(const std::string &attribute) { parseAttribute(attribute); } void TcpAppender::show_level(bool) {} void TcpAppender::show_name(bool) {} void TcpAppender::write(int level, const char *format, ...) { if (level < _level) { return; } va_list va_ptr; va_start(va_ptr, format); try { write(getLevelString(level), format, va_ptr); } catch (std::exception &) { // 这里只会出现std::exception异常,吃掉这个异常,外部无法处理这个异常 return; } va_end(va_ptr); } void TcpAppender::setLevel(int level) { _level = level; } int TcpAppender::getLevel() { return _level; } void TcpAppender::destroy() { stopThread(); // 从管理器内删除 if (_impl) { _impl->remove(_realName); } // 销毁自己 delete this; } void TcpAppender::setChannel(kchannel_ref_t *channel) { _channel = channel; } void TcpAppender::write(const std::string &level, const char *format, va_list va_ptr) { std::string pattern = _pattern.getPrefix(); int len = 0; char buffer[64 * 1024] = {0}; if (!pattern.empty()) { // 日志行前缀 memcpy(buffer + len, pattern.c_str(), (int)pattern.size()); len += (int)pattern.size(); } if (!level.empty()) { // 日志等级 memcpy(buffer + len, level.c_str(), (int)level.size()); len += (int)level.size(); } if (!_name.empty()) { // 日志名称 memcpy(buffer + len, _name.c_str(), (int)_name.size()); len += (int)_name.size(); } int bytes = vsnprintf(buffer + len, sizeof(buffer) - len, format, va_ptr); if (0 >= bytes) { return; } len += bytes; char *log = new char[len + 1]; // 异常外部处理 log[len] = '\n'; log[len + 1] = 0; memcpy(log, buffer, len); ScopeLock guard(&_logLock); if ((int32_t)_logList.size() > _maxLog) { delete[] _logList.back(); _logList.pop_back(); } _logList.push_front(log); // 异常外部处理 } void TcpAppender::parseAttribute(const std::string &attribute) { std::vector tokens; kratos::util::split(attribute, ";", tokens); std::vector::iterator guard = tokens.begin(); // 解析日志属性配置 for (; guard != tokens.end(); guard++) { if (std::string::npos != guard->find("line=")) { _line = getAttribute("line=", *guard); _pattern.parse(_line); } else if (std::string::npos != guard->find("tcp://")) { std::string tcp = getAttribute("tcp://", *guard); std::vector result; kratos::util::split(tcp, ":", result); if (result.empty()) { throw LoggerException("Invalid TCP logger configuration"); } _ip = result[0]; if (result.size() == 2) { _port = atoi(result[1].c_str()); } } else if (std::string::npos != guard->find("level=")) { std::string level = getAttribute("level=", *guard); _level = atoi(level.c_str()); } else if (std::string::npos != guard->find("maxLog=")) { std::string maxLog = getAttribute("maxLog=", *guard); _maxLog = atoi(maxLog.c_str()); } } } std::string TcpAppender::getAttribute(const std::string &name, const std::string &attribute) { size_t pos = attribute.find(name); if (pos == std::string::npos) { throw LoggerException("invalid attribute"); } return attribute.substr(pos + name.size()); } void TcpAppender::doWrite(char *log) { kstream_t *stream = knet_channel_ref_get_stream(_channel); if (error_ok != knet_stream_push_varg(stream, "%s", log)) { pushLog(this, log); } } void TcpAppender::run() { char *log = 0; for (; _running;) { log = 0; // 当前日志行 knet_loop_run_once(_loop); // 运行网络循环 if (_channel) { log = popLog(this); // 取一个日志 if (log) { doWrite(log); } } } } void TcpAppender::connetor_cb(kchannel_ref_t *channel, knet_channel_cb_event_e e) { TcpAppender *appender = (TcpAppender *)knet_channel_ref_get_ptr(channel); if (e & channel_cb_event_connect) { appender->setChannel(channel); } else if (e & channel_cb_event_close) { appender->setChannel(0); } else if (e & channel_cb_event_timeout) { } else if (e & channel_cb_event_connect_timeout) { } else if (e & channel_cb_event_recv) { // 不处理返回数据 kstream_t *stream = knet_channel_ref_get_stream(channel); knet_stream_eat_all(stream); } } char *TcpAppender::popLog(TcpAppender *ap) { char *log = 0; ScopeLock guard(&ap->_logLock); if (!ap->_logList.empty()) { log = ap->_logList.back(); ap->_logList.pop_back(); } return log; } void TcpAppender::pushLog(TcpAppender *ap, char *log) { ScopeLock guard(&ap->_logLock); ap->_logList.push_back(log); } void TcpAppender::startThread() { #if defined(_WIN32) || defined(WIN32) || defined(_WIN64) _tid = (HANDLE)_beginthread(&TcpAppender::write_thread, 0, this); #else pthread_create(&_tid, 0, &TcpAppender::write_thread, this); #endif // _WIN32 } void TcpAppender::stopThread() { _running = false; // 停止 #if defined(_WIN32) || defined(WIN32) || defined(_WIN64) WaitForSingleObject(_tid, INFINITE); #else pthread_join(_tid, 0); #endif // _WIN32 } #if defined(_WIN32) || defined(WIN32) || defined(_WIN64) void TcpAppender::write_thread(void *params) { TcpAppender *ap = (TcpAppender *)params; ap->run(); } #else void *TcpAppender::write_thread(void *params) { TcpAppender *ap = (TcpAppender *)params; ap->run(); return 0; } #endif // _WIN32 #endif // !RUST_WRAPPER