自己做免费手机网站吗,网络营销难不难学,现在做跨境电商平台有哪些,中企高呈网站建设本地短视频服务器 背景#xff1a;我的NAS中存放了很多短视频#xff0c;多到很多没看过#xff0c;于是写了这个程序来随机查看并删除短视频 运行#xff1a; 安装依赖后运行main.py 直接使用docker#xff1a; docker pull realwang/short_video docker run -d -p 3000:…本地短视频服务器 背景我的NAS中存放了很多短视频多到很多没看过于是写了这个程序来随机查看并删除短视频 运行 安装依赖后运行main.py 直接使用docker docker pull realwang/short_video docker run -d -p 3000:3000 -v /path/to/your/video:/app/video realwang/short_video 功能 扫描本地视频和图片并在网页上显示 在网页上 点赞和删除文件 上下滑动来切换文件 媒体文件放在video目录下 本地数据库
代码由4个文件组成
1.数据库操作
# database.pyimport sqlite3def init_db():conn sqlite3.connect(media.db)c conn.cursor()c.execute(CREATE TABLE IF NOT EXISTS media(id INTEGER PRIMARY KEY, name TEXT, path TEXT, views INTEGER DEFAULT 0, likes INTEGER DEFAULT 0))conn.commit()conn.close()def add_media(name, path):try:conn sqlite3.connect(media.db)c conn.cursor()c.execute(INSERT INTO media (name, path) VALUES (?, ?), (name, path))conn.commit()conn.close()return Trueexcept sqlite3.IntegrityError:# 处理重复键错误等数据库约束错误return Falseexcept Exception as e:print(fError inserting {name} into database: {str(e)})return Falsedef update_views(media_id):conn sqlite3.connect(media.db)c conn.cursor()c.execute(UPDATE media SET views views 1 WHERE id ?, (media_id,))conn.commit()conn.close()def update_likes(media_id):conn sqlite3.connect(media.db)c conn.cursor()c.execute(UPDATE media SET likes likes 1 WHERE id ?, (media_id,))conn.commit()conn.close()def delete_media(media_id):conn sqlite3.connect(media.db)c conn.cursor()c.execute(DELETE FROM media WHERE id ?, (media_id,))conn.commit()conn.close()def get_random_media():conn sqlite3.connect(media.db)c conn.cursor()c.execute(SELECT id, name, path FROM media ORDER BY RANDOM() LIMIT 1)media c.fetchone()conn.close()return media
2.文件扫描
#scanner.py
import os
import time
import sqlite3
from database import init_db, add_mediadef scan_directory(directoryvideo):init_db()print(f扫描目录: {directory})for root, dirs, files in os.walk(directory):for file in files:if file.endswith((.mp4, .jpg, .png, .gif)):path os.path.join(root, file)print(f发现文件: {file} 路径: {path})try:result add_media(file, path)#print(fadd_media 返回: {result}) # Debug: Print the return valueif result:print(f插入 {file} 到数据库)else:print(f插入 {file} 到数据库失败)except Exception as e:print(f由于异常无法插入 {file} 到数据库: {str(e)})def incremental_scan(directoryvideo):scanned_files set()conn sqlite3.connect(media.db)c conn.cursor()c.execute(SELECT path FROM media)for row in c.fetchall():scanned_files.add(row[0])conn.close()print(开始增量扫描...)for root, dirs, files in os.walk(directory):for file in files:if file.endswith((.mp4, .jpg, .png, .gif)):path os.path.join(root, file)if path not in scanned_files:print(f发现新文件: {file} 路径: {path})try:result add_media(file, path)print(fadd_media 返回: {result}) # Debug: Print the return valueif result:print(f插入 {file} 到数据库)else:print(f插入 {file} 到数据库失败)except Exception as e:print(f由于异常无法插入 {file} 到数据库: {str(e)})else:print(f跳过已存在文件: {file})if __name__ __main__:init_db()while True:incremental_scan()time.sleep(3600) # 每小时扫描一次
3.web服务
#app.pyfrom flask import Flask, render_template, request, send_file, jsonify, send_from_directory
import os
import sqlite3
import random # 导入 random 模块
from database import init_db, update_views, update_likes, delete_media, get_random_mediaapp Flask(__name__)def is_video_file(path):return path.lower().endswith(.mp4)def is_image_file(path):return path.lower().endswith(.jpg) or path.lower().endswith(.jpeg) or path.lower().endswith(.png)def get_random_media():conn sqlite3.connect(media.db)c conn.cursor()rand_num random.random() # Generate a random number between 0 and 1if rand_num 0.9:c.execute(SELECT id, name, path FROM media WHERE path LIKE %.mp4 ORDER BY RANDOM() LIMIT 1;)else:c.execute(SELECT id, name, path FROM media WHERE path LIKE %.jpg OR path LIKE %.jpeg OR path LIKE %.png ORDER BY RANDOM() LIMIT 1;)media c.fetchone()conn.close()return mediaapp.route(/)
def index():media get_random_media()if media:media_id, name, path mediaupdate_views(media_id)conn sqlite3.connect(media.db)c conn.cursor()c.execute(SELECT likes FROM media WHERE id ?, (media_id,))likes c.fetchone()[0]conn.close()return render_template(index.html, media_idmedia_id, namename, pathpath, likeslikes,is_videois_video_file(path), is_imageis_image_file(path))return No media foundapp.route(/media/path:filename)
def media(filename):return send_from_directory(directoryos.path.dirname(filename), pathos.path.basename(filename))app.route(/like/int:media_id, methods[POST])
def like(media_id):update_likes(media_id)conn sqlite3.connect(media.db)c conn.cursor()c.execute(SELECT likes FROM media WHERE id ?, (media_id,))likes c.fetchone()[0]conn.close()return jsonify(successTrue, likeslikes)app.route(/delete/int:media_id, methods[POST])
def delete(media_id):conn sqlite3.connect(media.db)c conn.cursor()c.execute(SELECT path FROM media WHERE id ?, (media_id,))path c.fetchone()[0]conn.close()if os.path.exists(path):os.remove(path)delete_media(media_id)return jsonify(successTrue)app.route(/download/int:media_id)
def download(media_id):conn sqlite3.connect(media.db)c conn.cursor()c.execute(SELECT path FROM media WHERE id ?, (media_id,))path c.fetchone()[0]conn.close()return send_file(path, as_attachmentTrue)if __name__ __main__:init_db() # 确保数据库和表被初始化app.run(host0.0.0.0, port3000, debugTrue)
4.启动器
import os
import threading
import time# 定义运行scanner.py的函数
def run_scanner():# 清空并创建scanner.log文件with open(log/scanner.log, wb) as f:passos.system(python scanner.py log/scanner.log 21)# 定义运行app.py的函数
def run_app():# 清空并创建app.log文件with open(log/app.log, wb) as f:passos.system(python app.py log/app.log 21)if __name__ __main__:# 创建log子目录os.makedirs(log, exist_okTrue)# 创建并启动线程运行scanner.pyscanner_thread threading.Thread(targetrun_scanner)scanner_thread.start()# 等待3秒钟time.sleep(3)# 创建并启动线程运行app.pyapp_thread threading.Thread(targetrun_app)app_thread.start()