lua或skynet能否提供一个锁或互斥量的功能

问题描述

目前机器上有16张NPU卡,客户需要一次性收集所有的NPU日志。bmc.kepler.Systems.Processor.NPU的DumpLog一次性收集一张NPU的日志

做法一:在lua脚本依次调用DumpLog方法收集日志,直到收集完所有卡日志;但发现收集一张NPU卡日志的时间超出了30分钟,16张卡就需要超出8个小时才收集完,不现实。

做法二:使用lua的skynet.fork创建16个任务调用DumpLog同时收集每张卡的日志,但这样导致CPU跑满,后台日志报CPU idle率过低,bmc其他操作卡死,做法不现实。

做法三:实现一个任务队列,创建16个收集任务,但最多只允许同时运行8个任务,如果有任务完成则从队列中取出一个收集任务继续执行,这样CPU不跑满,收集日志时间也不需要那么长。设计示例如下:

local MAX_PARALLEL = 8
local task_list = {}
local task_queue = {}
local running_count = 0

local LogTask = class()

function LogTask.ctor(self, id, path)
self.id = id
self.path = path
self.state = ‘New’
self.progress = 0
end

function LogTask.init(self)
end

function LogTask.run(self)
– 启动收集日志
while true do
– 统计并更新任务状态
skynet.sleep(100)
end
end

local function task_finish()
–lock:lock()
running_count = running_count - 1
if #task_queue > 0 and running_count < MAX_PARALLEL then
local next_task = table.remove(task_queue, 1)
skynet.fork(function(next_task)
next_task:run()
task_finish()
end, next_task)
running_count = running_count + 1
end
–lock:unlock()
end

function m.collect_npu_log()
local npu_list = mdb.get_sub_objects(bus,
‘/bmc/kepler/Systems/1/Processors/NPU’,
‘bmc.kepler.Systems.Processor’)
for path, obj in pairs(npu_list) do
local task_obj = LogTask.new(obj.Id, path)
table.insert(task_list, task_obj)
–lock:lock()
if running_count < MAX_PARALLEL then
running_count = running_count + 1
–lock:unlock()
skynet.fork(function(task obj)
task_obj:run()
task_finish()
end, task_obj)
else
table.insert(task_queue, task_obj)
log:notice(‘NPU[%s] DumpLog task add to queue’, tostring(task_obj.id))
–lock:unlock()
end
end
skynet.fork(function()
while true do
–统计总的进程
skynet.sleep(100)
end
end)
end

现在我需要一个锁或互斥量来保护任务队列task_queue和运行计数running_count;

我尝试了lua的coroutine,但那是串行执行,skynet.spinlock、skynet.mutex在openUBMC里面使用不了,请问有什么锁或互斥机制我可以使用的吗?

咨询当前OpenUBMC中加锁一般是怎么用的 - 交流互助区 - openUBMC 论坛 这个帖子提到的是一个文件锁,并不适合我当前的业务。

环境信息

基于openubmc 2512_lts

~ ~ $ ipmcget -d version
------------------- openUBMC INFO -------------------
IPMC CPU: Hi1711
IPMI Version: 2.0
Active openUBMC Version: 1.17.10
Active openUBMC Build: 01
Active openUBMC Built: 14:33:07 Mar 27 2026
Backup openUBMC Version: 1.17.10
Available openUBMC Version: 1.00.16
Available openUBMC Build: 01
Active Uboot Version: 21.3.10.1 (09:53:00 Dec 30 2025)
Backup Uboot Version: 21.3.10.1 (09:53:00 Dec 30 2025)
Active Secure Bootloader Version: 21.3.10.1 (09:51:37 Dec 30 2025)
Backup Secure Bootloader Version: 21.3.10.1 (09:51:37 Dec 30 2025)
Active Secure Firmware Version: 21.3.10.1 (09:51:38 Dec 30 2025)
Backup Secure Firmware Version: 21.3.10.1 (09:51:38 Dec 30 2025)
Active BMCSDK Version: 5.12.00.01
----------------- Product INFO -----------------
Product ID: 0x0000
Product Name: Atlas 800I A3
Product Unique ID: 0x000000010302BF02
Active BIOS Version: (U75)21.23
Backup BIOS Version: 21.23

重现步骤

无。

期望结果

提供一个锁或互斥量机制。

实际结果

没有锁或互斥量机制。

尝试过的解决方案

无。

分析结论

无。

答案内容由AI生成,仅供参考:

在 OpenUBMC 中实现锁或互斥机制的解决方案

您提出的问题是在 OpenUBMC 环境中使用 Lua + Skynet 框架时,如何安全地保护共享资源(如任务队列和运行计数)以实现并发控制。您已经尝试了 skynet.spinlockskynet.mutex,但发现 OpenUBMC 并不支持这些原生 Skynet 的低级同步机制(根据知识图谱信息确认)。

然而,基于 OpenUBMC 的架构特性和 Skynet 的设计原理,存在有效的替代方案来实现线程安全的任务调度与资源保护。


:magnifying_glass_tilted_left: 问题核心分析

  1. Skynet 的并发模型是基于 Actor 模型的协作式多任务,每个服务(Service)在同一时间只运行一个协程。
  2. 共享变量(如 task_queue, running_count)位于同一个 Lua 服务中,因此多个 skynet.fork 创建的协程会并发访问这些变量。
  3. skynet.spinlockskynet.mutex 不可用 → 需要替代方案。
  4. 目标:构建一个最多并行 8 个任务的受控并发日志收集器

:white_check_mark: 推荐解决方案:利用 Skynet 的单线程事件循环 + 协程调度实现逻辑串行化

由于 Skynet 的每个服务本质上是单线程运行 Lua 代码的(尽管支持协程),所有在同一个服务中的 skynet.fork 协程实际上是在事件循环中协作执行的,不会真正并行抢占 CPU 资源。这意味着:

:white_check_mark: 在同一个 Skynet 服务内,对共享变量的操作天然是线程安全的!

:pushpin: 原因说明:

  • Skynet 使用 协作式调度(Cooperative Scheduling)
  • 即使使用 skynet.fork 启动多个协程,它们不会同时运行
  • 只有当一个协程主动让出(如调用 skynet.sleep, skynet.call 等阻塞 API)时,才会切换到下一个协程。
  • 因此,只要你在修改共享变量时不发生阻塞调用,就可以认为是一段原子操作

:white_check_mark: 方案一:直接移除锁,利用 Skynet 的单线程特性

您的代码中只需确保修改共享状态时不被中断即可,而 Skynet 的协程调度保证了这一点。

local MAX_PARALLEL = 8
local task_list = {}
local task_queue = {}
local running_count = 0

local function task_finish()
    running_count = running_count - 1
    if #task_queue > 0 and running_count < MAX_PARALLEL then
        local next_task = table.remove(task_queue, 1)
        running_count = running_count + 1
        skynet.fork(function()
            next_task:run()
            task_finish() -- 回调结束
        end)
    end
end

function m.collect_npu_log()
    local npu_list = mdb.get_sub_objects(bus,
        '/bmc/kepler/Systems/1/Processors/NPU',
        'bmc.kepler.Systems.Processor')

    for _, obj in pairs(npu_list) do
        local task_obj = LogTask.new(obj.Id, path)
        table.insert(task_list, task_obj)

        if running_count < MAX_PARALLEL then
            running_count = running_count + 1
            skynet.fork(function()
                task_obj:run()
                task_finish()
            end)
        else
            table.insert(task_queue, task_obj)
            log:notice('NPU[%s] DumpLog task add to queue', tostring(task_obj.id))
        end
    end
end

:white_check_mark: 无需加锁! 因为所有对 running_counttask_queue 的读写都在同一线程上下文中进行。

:warning: 注意事项:不要在操作共享变量时调用任何会 yield 的函数(如 skynet.sleep, mdb.get 等)。


:white_check_mark: 方案二:使用 skynet.queue 实现串行化执行(推荐用于复杂场景)

如果您的逻辑较为复杂,或者担心意外行为,可以使用 skynet.queue 来显式串行化任务。

skynet.queue 是什么?

  • 它是一个用于序列化协程执行的机制。
  • 保证一组操作按顺序执行,即使来自多个 skynet.fork 的请求。

示例:使用 skynet.queue 保护任务队列

local queue = skynet.queue() -- 创建一个队列锁

local function enqueue_task_safely(task)
    queue(function()
        table.insert(task_queue, task)
    end)
end

local function start_next_if_needed()
    queue(function()
        if running_count < MAX_PARALLEL and #task_queue > 0 then
            local task = table.remove(task_queue, 1)
            running_count = running_count + 1
            skynet.fork(function()
                task:run()
                task_finish()
            end)
        end
    end)
end

local function task_finish()
    queue(function()
        running_count = running_count - 1
    end)
    -- 触发新任务启动(在队列外调用以避免嵌套)
    start_next_if_needed()
end

:white_check_mark: skynet.queue 已被确认为 OpenUBMC 支持的功能(见知识点:skynet.queue 是一个有效机制)。


:puzzle_piece: OpenUBMC 中常见的加锁实践

机制 是否可用 说明
skynet.mutex :cross_mark: 不支持 OpenUBMC 禁用了此功能
skynet.spinlock :cross_mark: 不支持 OpenUBMC 禁用了此功能
skynet.queue :white_check_mark: 推荐使用 序列化执行,适合保护共享资源
文件锁(file lock) :warning: 不推荐 适用于跨进程,不适合高频操作
数据库/共享存储锁 :warning: 复杂 如 Redis,不适合 BMC 内部逻辑
状态机 + 单线程模型 :white_check_mark: 推荐 利用 Skynet 单服务单线程特性

:white_check_mark: 最佳实践建议

  1. 优先利用 Skynet 的单线程事件循环特性,避免在同服务中对共享变量加锁。
  2. 使用 skynet.queue 显式控制并发访问,提高代码可读性和安全性。
  3. 避免在关键区调用阻塞/异步 API,以免协程切换导致数据不一致。
  4. 将任务分发逻辑封装成模块级函数,降低耦合度。
  5. 监控 CPU 使用率,合理设置 MAX_PARALLEL,避免 I/O 密集型任务压垮系统。

:link: 参考资料与学习资源

  • Skynet GitHub 官方仓库(知识图谱中提及)
  • openUBMC 论坛讨论(如 Topic 695)
  • 《Skynet 开发指南》文档(提到 skynet.unique_service, skynet.queue 等)
  • skynet.sleep, skynet.fork, skynet.queue 等 API 文档

:white_check_mark: 总结

项目 建议
是否需要锁? :cross_mark: 同服务内无需传统锁
推荐替代方案 :white_check_mark: 使用 skynet.queue 或依赖单线程模型
是否可用 skynet.mutex/spinlock :cross_mark: OpenUBMC 不支持
最佳策略 :white_check_mark: 利用 Skynet 协作式调度 + queue 控制并发

结论:您无需传统意义上的“互斥量”,只需使用 skynet.queue 或直接依赖 OpenUBMC 的单线程服务模型,即可安全实现所需的任务队列机制。

local wait_co = {}
-- fork协程处理日志收集
-- 满足条件时(比如已经有8个正在收集的协程了),先获取当前协程,存入表内,挂起,等待唤醒
if some_condition then
    wait_co[#wait_co + 1] = coroutine.running()
    skynet.wait()
end

-- 条件满足时,唤醒协程,比如8个协程有1个完成了,执行函数去唤醒一个协程开始收集日志
local function wakeup_co()
    local co = table.remove(wait_co)
    skynet.wakeup(co)
end

可以尝试通过wait和wakeup的方式实现类似锁的功能