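Preface
When it comes to monitoring, most people probably think of UptimeRobot: the free personal plan covers 50 sites, and its status page looks great.
One drawback, though: custom domains appear to be a paid feature.
I tried scraping the status page to host it myself, but that seems to run into CORS (cross-origin) problems ((((
A few days ago I found an alternative through a friend, and its page looks even better than UptimeRobot's *
See the screenshot 👀
The admin dashboard looks great too ~
Detail page
On top of that, WeChat push notifications have been implemented as well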
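Implementation steps
Setup
Project address
Replit
Recommended repository to deploy: github.com/valetzx/uptimekumaonreplit
Just follow the tutorial there to set it up. What I will focus on is how to get the notifications into [regular WeChat].
It ships with a built-in WeCom (企业微信, enterprise WeChat) channel, but what normal person actually uses that ))))
The project used here is the open-source version from Fangtang (方糖) (I'm a broke student, bear with me))
- easychen/wecomchan: configuration docs, a direct-push function, and self-hostable online service code for pushing messages to WeChat through WeCom. It can be seen as one of the open-source alternatives to Server酱.
The README.md in this project is already very clear, and it even includes usage examples......
Python version: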
def send_to_wecom(text, wecom_cid, wecom_aid, wecom_secret, wecom_touid='@all'):
    # ... (rest of the body is in the project's README.md)
        return response
    else:
        return False

def send_to_wecom_image(base64_content, wecom_cid, wecom_aid, wecom_secret, wecom_touid='@all'):
    # ... (rest of the body is in the project's README.md)
        return response
    else:
        return False

def send_to_wecom_markdown(text, wecom_cid, wecom_aid, wecom_secret, wecom_touid='@all'):
    # ... (rest of the body is in the project's README.md)
        return response
    else:
        return False
Usage examples:
ret = send_to_wecom("Push test\r\nTesting a line break", "Corp ID ③", "App ID ①", "App secret ②")
print(ret)
ret = send_to_wecom('<a href="https://www.github.com/">Hyperlinks are supported in text</a>', "Corp ID ③", "App ID ①", "App secret ②")
print(ret)
ret = send_to_wecom_image("Put the image Base64 here", "Corp ID ③", "App ID ①", "App secret ②")
print(ret)
ret = send_to_wecom_markdown("**Markdown content**", "Corp ID ③", "App ID ①", "App secret ②")
print(ret)
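There isn't much more to explain, so let's go straight to how to set it up.
First, follow the tutorial
- github.com/easychen/wecomchan#企业微信应用消息配置说明
to configure your own WeCom account. That gives us the following fields: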
wecom_cid = "wwXXXXXXXXXXXXXXX"
wecom_aid = "1000XXX"
wecom_secret = "XXXXXXXXXXXX-XXXXXXXXXXX-XXXXXXXXXXXXXXXX"
These three fields are all we need.
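If you would rather keep these three values out of the source file, they could be read from environment variables. Below is a minimal sketch of that idea; the helper name `load_wecom_config` and the choice to pass the environment in as a mapping are my own assumptions, not something wecomchan provides.

```python
import os

# The three field names used throughout this post.
REQUIRED_KEYS = ("wecom_cid", "wecom_aid", "wecom_secret")


def load_wecom_config(env=None):
    """Return the three WeCom fields from a mapping (defaults to os.environ).

    Hypothetical helper: raises KeyError naming every missing or empty field.
    """
    env = os.environ if env is None else env
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise KeyError("missing WeCom settings: " + ", ".join(missing))
    return {key: env[key] for key in REQUIRED_KEYS}


# Example with placeholder values instead of the real environment.
demo = load_wecom_config({
    "wecom_cid": "wwXXXXXXXXXXXXXXX",
    "wecom_aid": "1000XXX",
    "wecom_secret": "XXXXXXXXXXXX",
})
print(sorted(demo))
```

Failing early with a list of the missing names is just a convenience; a silent fallback to placeholder values would only surface later as a rejected token request.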
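Next, write the FastAPI service.
I reworked it a little.... it now caches the token in Redis. It needs:
- Redis: a running Redis environment
- loguru: logging
- subprocess: used to start Redis
- FastAPI: the main framework
- uvicorn: the FastAPI launcher
That said, I recommend deploying directly on Replit: copy and paste the files below, click RUN, and that's it (if it doesn't work, head to the comments).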
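If you deploy somewhere other than Replit, it may help to check which of these packages are actually installed before running anything. The snippet below is a small sketch using only the standard library; the function name `missing_modules` is made up for this example. As far as I know, FastAPI's `Form` also needs the `python-multipart` package, which is not in the list because its import name differs between versions.

```python
import importlib.util

# Import names of the third-party packages used by main.py and logger.py.
REQUIRED_MODULES = ["fastapi", "uvicorn", "redis", "loguru", "requests"]


def missing_modules(names):
    """Return the top-level module names that cannot be found here."""
    return [name for name in names if importlib.util.find_spec(name) is None]


print("missing:", missing_modules(REQUIRED_MODULES))
```

An empty list means every import in the files below should resolve; anything listed still has to be installed.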
.\main.py
import base64
import json
import subprocess

import redis
import requests
import uvicorn
from fastapi import FastAPI, Form

import logger as lg

# Command used to launch the Redis server with the config file below.
start_redis = "redis-server redis.conf"
r = redis.Redis(host='localhost', port=6379, db=0)

# If you are on the personal plan, put the following values into environment
# variables, uncomment the code below, and remove the hard-coded values.
# import os
# wecom_cid = os.environ['wecom_cid']
# wecom_aid = os.environ['wecom_aid']
# wecom_secret = os.environ['wecom_secret']
wecom_cid = "wwXXXXXXXXXXXXXXX"
wecom_aid = "1000XXX"
wecom_secret = "XXXXXXXXXXXX-XXXXXXXXXXX-XXXXXXXXXXXXXXXX"

app = FastAPI()

def get_token():
    """Return a WeCom access token, using Redis as a cache."""
    access_token = r.get('token')
    if not access_token:
        lg.logger_info('Redis access_token is empty, get from wecom')
        get_token_url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={wecom_cid}&corpsecret={wecom_secret}"
        # Careful: this line writes the URL, including the secret, to the log.
        lg.logger_info('get_token_url: ' + get_token_url)
        response = requests.get(get_token_url).content
        lg.logger_info('response: ' + str(response))
        access_token = json.loads(response).get('access_token')
        lg.logger_success('access_token: ' + str(access_token))
        if access_token:
            # Cache only a real token; r.set() rejects None.
            r.set('token', access_token, ex=7000)
    else:
        # Redis returns bytes, so decode back to str.
        access_token = access_token.decode('utf-8')
        lg.logger_success("Got access_token from Redis")
    return access_token

def send_to_wecom(text, wecom_touid='@all'):
    """Send a plain-text message through the WeCom application."""
    access_token = get_token()
    lg.logger_success("message: " + str(text))
    if access_token and len(access_token) > 0:
        send_msg_url = f'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}'
        data = {
            "touser": wecom_touid,
            "agentid": wecom_aid,
            "msgtype": "text",
            "text": {
                "content": text
            },
            "duplicate_check_interval": 600
        }
        response = requests.post(send_msg_url, data=json.dumps(data)).content
        lg.logger_info("response: " + str(response))
        return response
    else:
        return False

def send_to_wecom_image(base64_content, wecom_touid='@all'):
    """Upload a Base64-encoded image, then send it as an image message."""
    access_token = get_token()
    lg.logger_info('access_token: ' + str(access_token))
    if access_token and len(access_token) > 0:
        upload_url = f'https://qyapi.weixin.qq.com/cgi-bin/media/upload?access_token={access_token}&type=image'
        upload_response = requests.post(upload_url, files={
            "picture": base64.b64decode(base64_content)
        }).json()
        if "media_id" in upload_response:
            media_id = upload_response['media_id']
        else:
            # The upload failed, so there is nothing to send.
            return False
        send_msg_url = f'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}'
        data = {
            "touser": wecom_touid,
            "agentid": wecom_aid,
            "msgtype": "image",
            "image": {
                "media_id": media_id
            },
            "duplicate_check_interval": 600
        }
        response = requests.post(send_msg_url, data=json.dumps(data)).content
        lg.logger_success("response: " + str(response))
        return response
    else:
        return False

def send_to_wecom_markdown(text, wecom_touid='@all'):
    """Send a Markdown message through the WeCom application."""
    access_token = get_token()
    if access_token and len(access_token) > 0:
        send_msg_url = f'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}'
        data = {
            "touser": wecom_touid,
            "agentid": wecom_aid,
            "msgtype": "markdown",
            "markdown": {
                "content": text
            },
            "duplicate_check_interval": 600
        }
        lg.logger_success("message: " + str(text))
        response = requests.post(send_msg_url, data=json.dumps(data)).content
        lg.logger_success("response: " + str(response))
        return response
    else:
        return False

@app.post("/")
def main(type: str = Form(...), title: str = Form(...), body: str = Form(...), wecom_touid: str = Form(...)):
    # Dispatch on the "type" form field; unknown types fall back to plain text.
    if type == 'note':
        lg.logger_info(f'Received note message: {title}')
        data = send_to_wecom(title + '\n' + body, wecom_touid)
    elif type == 'image':
        lg.logger_info(f'Received image message: {title}')
        data = send_to_wecom_image(body, wecom_touid)
    elif type == 'markdown':
        lg.logger_info(f'Received markdown message: {title}')
        data = send_to_wecom_markdown(title + '\n' + body, wecom_touid)
    else:
        data = send_to_wecom(title + '\n' + body, wecom_touid)
    return data


@app.get("/")
def get():
    return {"msg": "Hooray, the deployment works! Please do not share this address with anyone ((so your WeChat does not get flooded with messages"}


@app.head("/")
def head():
    return {"msg": "Hooray, the deployment works! Please do not share this address with anyone ((so your WeChat does not get flooded with messages"}


if __name__ == "__main__":
    print("start redis")
    # Launch Redis in the background, then start the web server.
    subprocess.Popen(start_redis, shell=True)
    uvicorn.run("main:app", host="0.0.0.0", port=8080, log_level="info")
Logging
.\logger.py
# coding:utf-8
from loguru import logger

# Write logs to ./log/ and start a new file once one reaches 20 MB.
logger.add("./log/file_{time}.log", rotation="20 MB")


def logger_error(msg):
    logger.error(msg)


def logger_warning(msg):
    logger.warning(msg)


def logger_debug(msg):
    logger.debug(msg)


def logger_exception(msg):
    logger.exception(msg)


def logger_critical(msg):
    logger.critical(msg)


def logger_success(msg):
    logger.success(msg)


def logger_info(msg):
    logger.info(msg)


def logger_trace(msg):
    logger.trace(msg)
.\redis.conf
bind 127.0.0.1 -::1
protected-mode yes
port 6379
tcp-backlog 511
timeout 0
tcp-keepalive 300
daemonize no
pidfile /var/run/redis_6379.pid
loglevel notice
logfile ""
databases 16
always-show-logo no
set-proc-title yes
proc-title-template "{title} {listen-addr} {server-mode}"
stop-writes-on-bgsave-error yes
rdbcompression yes
rdbchecksum yes
dbfilename dump.rdb
rdb-del-sync-files no
dir ./
replica-serve-stale-data yes
replica-read-only yes
repl-diskless-sync no
repl-diskless-sync-delay 5
repl-diskless-load disabled
repl-disable-tcp-nodelay no
replica-priority 100
acllog-max-len 128
lazyfree-lazy-eviction no
lazyfree-lazy-expire no
lazyfree-lazy-server-del no
replica-lazy-flush no
lazyfree-lazy-user-del no
lazyfree-lazy-user-flush no
oom-score-adj no
oom-score-adj-values 0 200 800
disable-thp yes
appendonly no
appendfilename "appendonly.aof"
appendfsync everysec
no-appendfsync-on-rewrite no
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
aof-load-truncated yes
aof-use-rdb-preamble yes
lua-time-limit 5000
slowlog-log-slower-than 10000
slowlog-max-len 128
latency-monitor-threshold 0
notify-keyspace-events ""
hash-max-ziplist-entries 512
hash-max-ziplist-value 64
list-max-ziplist-size -2
list-compress-depth 0
set-max-intset-entries 512
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
hll-sparse-max-bytes 3000
stream-node-max-bytes 4096
stream-node-max-entries 100
activerehashing yes
client-output-buffer-limit normal 0 0 0
client-output-buffer-limit replica 256mb 64mb 60
client-output-buffer-limit pubsub 32mb 8mb 60
hz 10
dynamic-hz yes
aof-rewrite-incremental-fsync yes
rdb-save-incremental-fsync yes
jemalloc-bg-thread yes
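One thing to keep in mind: main.py launches `redis-server` with `subprocess.Popen` and starts uvicorn right away, so a very early request could in principle arrive before Redis is listening on port 6379. If that ever bites you, a small wait loop like the sketch below might help. The function name `wait_for_port` and its default timings are my own choices, not part of the original files.

```python
import socket
import time


def wait_for_port(host="127.0.0.1", port=6379, timeout=5.0, interval=0.1):
    """Poll until a TCP connection to host:port succeeds or timeout elapses.

    Returns True once the port accepts a connection, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

It could be called between the `subprocess.Popen(...)` line and `uvicorn.run(...)`, for example `wait_for_port(port=6379)`, matching the `port 6379` setting in redis.conf.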
.\replit.nix
(if you are using Replit)
{ pkgs }: {
  deps = [
    pkgs.python38Full
    pkgs.redis
  ];
  env = {
    PYTHON_LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath [
      # Needed for pandas / numpy
      pkgs.stdenv.cc.cc.lib
      pkgs.zlib
      # Needed for pygame
      pkgs.glib
      # Needed for matplotlib
      pkgs.xorg.libX11
    ];
    PYTHONBIN = "${pkgs.python38Full}/bin/python3.8";
    LANG = "en_US.UTF-8";
  };
}
.\.replit
# The command that runs the program.
run = ["python3", "main.py"]
# The primary language of the repl. There can be others, though!
language = "python3"
# The main file, which will be shown by default in the editor.
entrypoint = "main.py"
# A list of globs that specify which files and directories should
# be hidden in the workspace.
hidden = ["venv", ".config", "**/__pycache__", "**/.mypy_cache", "**/*.pyc"]
# Specifies which nix channel to use when building the environment.
[nix]
channel = "stable-21_11"
# Per-language configuration: python3
[languages.python3]
# Treats all files that end with `.py` as Python.
pattern = "**/*.py"
# Tells the workspace editor to syntax-highlight these files as
# Python.
syntax = "python"
# The command needed to start the Language Server Protocol. For
# linting and formatting.
[languages.python3.languageServer]
start = ["pyls"]
# The command to start the interpreter.
[interpreter]
[interpreter.command]
args = [
"stderred",
"--",
"prybar-python3",
"-q",
"--ps1",
"\u0001\u001b[33m\u0002\u0001\u001b[00m\u0002 ",
"-i",
]
env = { LD_LIBRARY_PATH = "$PYTHON_LD_LIBRARY_PATH" }
# The environment variables needed to correctly start Python and use the
# package proxy.
[env]
VIRTUAL_ENV = "/home/runner/${REPL_SLUG}/venv"
PATH = "${VIRTUAL_ENV}/bin"
PYTHONPATH="${VIRTUAL_ENV}/lib/python3.8/site-packages"
REPLIT_POETRY_PYPI_REPOSITORY="https://package-proxy.replit.com/pypi/"
MPLBACKEND="TkAgg"
# Enable unit tests. This is only supported for a few languages.
[unitTest]
language = "python3"
# Add a debugger!
[debugger]
support = true
# How to start the debugger.
[debugger.interactive]
transport = "localhost:0"
startCommand = ["dap-python", "main.py"]
# How to communicate with the debugger.
[debugger.interactive.integratedAdapter]
dapTcpAddress = "localhost:0"
# How to tell the debugger to start a debugging session.
[debugger.interactive.initializeMessage]
command = "initialize"
type = "request"
[debugger.interactive.initializeMessage.arguments]
adapterID = "debugpy"
clientID = "replit"
clientName = "replit.com"
columnsStartAt1 = true
linesStartAt1 = true
locale = "en-us"
pathFormat = "path"
supportsInvalidatedEvent = true
supportsProgressReporting = true
supportsRunInTerminalRequest = true
supportsVariablePaging = true
supportsVariableType = true
# How to tell the debugger to start the debuggee application.
[debugger.interactive.launchMessage]
command = "attach"
type = "request"
[debugger.interactive.launchMessage.arguments]
logging = {}
# Configures the packager.
[packager]
# Search packages in PyPI.
language = "python3"
# Never attempt to install `unit_tests`. If there are packages that are being
# guessed wrongly, add them here.
ignoredPackages = ["unit_tests"]
[packager.features]
enabledForHosting = false
# Enable searching packages from the sidebar.
packageSearch = true
# Enable guessing what packages are needed from the code.
guessImports = true
OK, once it is running, you have a service that pushes to WeChat and automatically refreshes the cached token.
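The caching in `get_token()` boils down to "use the stored token if it is still there, otherwise fetch a new one and store it with an expiry". Here is a rough in-memory sketch of that same pattern without Redis, mainly to make the logic easier to see. The `TokenCache` class and its injectable `clock` are assumptions for illustration; the 7000-second lifetime mirrors the `ex=7000` used in main.py.

```python
import time


class TokenCache:
    """Cache one token in memory and refetch it after ttl seconds."""

    def __init__(self, fetch, ttl=7000, clock=time.monotonic):
        self._fetch = fetch        # callable returning a fresh token (or None)
        self._ttl = ttl            # seconds a cached token stays valid
        self._clock = clock        # injectable clock, handy for testing
        self._token = None
        self._expires_at = 0.0

    def get(self):
        now = self._clock()
        if self._token is None or now >= self._expires_at:
            token = self._fetch()
            if not token:
                # Do not cache a failed fetch; try again on the next call.
                self._token = None
                return None
            self._token = token
            self._expires_at = now + self._ttl
        return self._token


# Usage with a stand-in fetch function instead of a real WeCom request.
cache = TokenCache(lambda: "demo-token")
print(cache.get())
```

Redis does the expiry bookkeeping for you through the `ex` argument, which is why the real code is shorter; the sketch just spells out what that expiry amounts to.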
Try sending a POST request to the address of your deployment:
POST Method
Body: x-www-form-urlencoded form
"type": "note",
"title": "Test title",
"body": "Test body",
"wecom_touid": "@all"
You should now receive the message, which means the push service is up.
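For reference, the same test request can be put together from Python with just the standard library. This is only a sketch: `PUSH_URL` is a placeholder you would replace with your own deployment address, and `build_push_request` is a name I made up. It builds the request without sending it, so you can inspect the encoded body first.

```python
from urllib.parse import urlencode
from urllib.request import Request

# Placeholder address; replace it with the URL of your own deployment.
PUSH_URL = "https://example.invalid/"


def build_push_request(title, body, msg_type="note", wecom_touid="@all", url=PUSH_URL):
    """Build (but do not send) the form-encoded POST that main.py expects."""
    form = {
        "type": msg_type,
        "title": title,
        "body": body,
        "wecom_touid": wecom_touid,
    }
    data = urlencode(form).encode("utf-8")
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    return Request(url, data=data, headers=headers, method="POST")


request = build_push_request("Test title", "Test body")
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
# To actually send it, pass the request to urllib.request.urlopen().
```

All four form fields are required by the endpoint in main.py, since each one is declared with `Form(...)`.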
All that's left is hooking it up to Uptime Kuma. I chose to modify the pushbullet.js notification provider (change the push URL inside to your own).
.\server\notification-providers\pushbullet.js
const NotificationProvider = require("./notification-provider");
const axios = require("axios");
const qs = require("qs");
const { DOWN, UP } = require("../../src/util");

class Pushbullet extends NotificationProvider {

    name = "pushbullet";

    async send(notification, msg, monitorJSON = null, heartbeatJSON = null) {
        let okMsg = "Sent Successfully.";

        try {
            // Address of your own push service; change this to yours.
            let pushbulletUrl = "https://xxxxx.xxxxxxxxxxxxxx.xxxx.xx";
            let payload = null;

            if (heartbeatJSON == null) {
                // No heartbeat means this is the "Test" button.
                payload = {
                    "type": "note",
                    "title": "Uptime Kuma Alert",
                    "body": "Testing Successful.",
                    "wecom_touid": notification.pushbulletAccessToken
                };
            } else if (heartbeatJSON["status"] == DOWN) {
                payload = {
                    "type": "note",
                    "title": "UptimeKuma Alert: " + monitorJSON["name"],
                    "body": "[🔴 Down] " + heartbeatJSON["msg"] + "\nTime (UTC): " + heartbeatJSON["time"],
                    "wecom_touid": notification.pushbulletAccessToken
                };
            } else if (heartbeatJSON["status"] == UP) {
                payload = {
                    "type": "note",
                    "title": "UptimeKuma Alert: " + monitorJSON["name"],
                    "body": "[✅ Up] " + heartbeatJSON["msg"] + "\nTime (UTC): " + heartbeatJSON["time"],
                    "wecom_touid": notification.pushbulletAccessToken
                };
            }

            if (payload !== null) {
                // Encode the payload as a form and submit it.
                await axios({
                    method: "post",
                    url: pushbulletUrl,
                    headers: {
                        "Content-Type": "application/x-www-form-urlencoded"
                    },
                    data: qs.stringify(payload)
                });
            }

            return okMsg;
        } catch (error) {
            this.throwGeneralAxiosError(error);
        }
    }
}

module.exports = Pushbullet;
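If you want to exercise the push service without waiting for a real outage, the three payloads this provider produces can be reproduced in a few lines of Python. The sketch below mirrors the branches above as plain dictionaries. The values `DOWN = 0` and `UP = 1` are my assumption about the constants in Uptime Kuma's `src/util`, and `build_payload` is a hypothetical helper, not something from the project.

```python
# Assumed to match the DOWN/UP constants exported by Uptime Kuma's src/util.
DOWN = 0
UP = 1


def build_payload(touid, monitor=None, heartbeat=None):
    """Mirror the three branches of the modified provider as plain dicts."""
    if heartbeat is None:
        # Same content as the provider's "Test" button branch.
        return {
            "type": "note",
            "title": "Uptime Kuma Alert",
            "body": "Testing Successful.",
            "wecom_touid": touid,
        }
    if heartbeat["status"] == DOWN:
        prefix = "[🔴 Down] "
    elif heartbeat["status"] == UP:
        prefix = "[✅ Up] "
    else:
        # Any other status sends nothing, as in the provider.
        return None
    return {
        "type": "note",
        "title": "UptimeKuma Alert: " + monitor["name"],
        "body": prefix + heartbeat["msg"] + "\nTime (UTC): " + heartbeat["time"],
        "wecom_touid": touid,
    }


print(build_payload("@all"))
```

Each dictionary can then be form-encoded and posted to the service to see exactly what would land in WeChat.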
Because my skills are limited, I pulled in one extra library here, qs, which has to be added as a dependency in
package.json
"dependencies": {
+   "qs": "6.10.3",
},
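OK, that completes the deployment, but one parameter is still missing:
wecom_touid
Who should the message go to? You have two options:
- @all pushes to everyone who follows the service
- or fill in a user ID
You can find the user ID here; the value shown there is the user ID.
Once the monitoring project is up and running, add a notification, choose pushbullet, and in the Access Token field enter the user ID to notify, or @all.
Click Test; if the message arrives, the setup works.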
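As far as I know, the WeCom `touser` field also accepts several user IDs at once, separated by `|`, in addition to the special value `@all`. A tiny helper along these lines could build that value; `format_touser` is a hypothetical name, and the sample IDs are made up.

```python
def format_touser(user_ids=None):
    """Return the touser value: '@all' for everyone, else IDs joined by '|'."""
    # Drop empty entries and surrounding whitespace before joining.
    ids = [uid.strip() for uid in (user_ids or []) if uid and uid.strip()]
    return "|".join(ids) if ids else "@all"


print(format_touser())                        # no IDs given: everyone
print(format_touser(["ZhangSan", "LiSi"]))    # two made-up user IDs
```

The resulting string is what you would paste into the Access Token field.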
By the way, I also got Fangtang's PushDeer hooked up successfully.
It is the same file again:
.\server\notification-providers\pushbullet.js
const NotificationProvider = require("./notification-provider");
const axios = require("axios");
const qs = require("qs");
const { DOWN, UP } = require("../../src/util");

class Pushbullet extends NotificationProvider {

    name = "pushbullet";

    async send(notification, msg, monitorJSON = null, heartbeatJSON = null) {
        let okMsg = "Sent Successfully.";

        try {
            // The Access Token field holds your key and becomes part of the URL.
            let pushbulletUrl = "https://sc.ftqq.com/" + notification.pushbulletAccessToken + ".send";
            let payload = null;

            if (heartbeatJSON == null) {
                // No heartbeat means this is the "Test" button.
                payload = {
                    "title": "Uptime Kuma Alert",
                    "desp": "Testing Successful."
                };
            } else if (heartbeatJSON["status"] == DOWN) {
                payload = {
                    "title": "UptimeKuma Alert: " + monitorJSON["name"],
                    "desp": "[🔴 Down] " + heartbeatJSON["msg"] + "\nTime (UTC): " + heartbeatJSON["time"]
                };
            } else if (heartbeatJSON["status"] == UP) {
                payload = {
                    "title": "UptimeKuma Alert: " + monitorJSON["name"],
                    "desp": "[✅ Up] " + heartbeatJSON["msg"] + "\nTime (UTC): " + heartbeatJSON["time"]
                };
            }

            if (payload !== null) {
                // Encode the payload as a form and submit it.
                await axios({
                    method: "post",
                    url: pushbulletUrl,
                    headers: {
                        "Content-Type": "application/x-www-form-urlencoded"
                    },
                    data: qs.stringify(payload)
                });
            }

            return okMsg;
        } catch (error) {
            this.throwGeneralAxiosError(error);
        }
    }
}

module.exports = Pushbullet;
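This one is for Fangtang subscribers ((
Use the same pushbullet channel and put your own key in the Access Token field. It looks something like
SCT888888XXXXXXXXXXXXXXXXXXXXXXX
Enter it and run a test; if the message arrives, the integration works (as before, add the qs library first).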
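To double-check a key before touching Uptime Kuma, you could assemble the same URL and form body by hand. The sketch below only follows the URL pattern used in the provider above; `build_send_request` is a name I made up, and the key is the placeholder from this post, not a real one.

```python
from urllib.parse import urlencode


def build_send_request(send_key, title, desp):
    """Return (url, form_body) following the URL pattern in the provider."""
    url = "https://sc.ftqq.com/" + send_key + ".send"
    body = urlencode({"title": title, "desp": desp})
    return url, body


url, body = build_send_request(
    "SCT888888XXXXXXXXXXXXXXXXXXXXXXX",   # placeholder key
    "Uptime Kuma Alert",
    "Testing Successful.",
)
print(url)
print(body)
```

Posting `body` to `url` with the `application/x-www-form-urlencoded` content type should then be equivalent to pressing Test in the notification settings.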
If you run into problems, reach out in the comments (