在工业自动化领域,上位机系统需要与各类PLC、传感器和控制器进行稳定可靠的通信。传统做法往往针对单一协议开发独立采集模块,导致代码冗余、维护困难。我在某半导体工厂的MES系统升级项目中,就曾遇到需要同时对接Modbus、OPC UA和自定义串口协议的痛点场景。
这个模板框架的诞生,源于产线上一个真实教训:当时由于不同协议采集线程相互阻塞,导致压力传感器数据延迟超过2秒,险些造成批次产品报废。事后我们重构了整套采集架构,提炼出这套经过产线验证的解决方案。其核心价值在于:
框架采用明确的三层分离设计:
code复制采集层(Collectors) → 传输层(Channel) → 应用层(Services/UI)
这种架构带来的最大优势是:当需要新增西门子S7协议时,只需在采集层实现S7Collector,其他层代码零改动。
Tag.cs的设计经历了三次迭代。最终版本包含工业场景必需的元数据:
csharp复制public class Tag
{
public string Name { get; set; } // 点位名称(如"窑炉温度")
public string Protocol { get; set; } // 协议类型过滤标识
public string Address { get; set; } // 协议特定地址格式
public string DataType { get; set; } // 值类型校验依据
public object Value { get; set; } // 装箱的采集值
public DateTime Timestamp { get; set; }// 采集时间戳(重要!)
public double HighLimit { get; set; } // 报警阈值
public double LowLimit { get; set; } // 报警阈值
}
经验提示:Timestamp一定要在采集端生成,而非收到数据时生成。我们在某汽车焊装线就曾因时间戳不同步导致数据分析错误。
每个采集器独立使用System.Threading.Channels创建有界队列:
csharp复制private readonly Channel<Tag> _channel = Channel.CreateBounded<Tag>(
new BoundedChannelOptions(100){
FullMode = BoundedChannelFullMode.DropOldest // 关键配置!
});
这种设计带来两个工业场景优势:
原始代码中的ModbusCollector有几个工业场景需要加强的点:
产线实测表明,单寄存器读取效率极低。应改为批量读取:
csharp复制// 按地址连续分组
var addrGroups = _tags.GroupBy(t => ushort.Parse(t.Address) / 10);
foreach(var group in addrGroups){
ushort startAddr = ushort.Parse(group.First().Address);
ushort numRegs = (ushort)(group.Last().Address - startAddr + 1);
var values = await _master.ReadHoldingRegistersAsync(1, startAddr, numRegs);
foreach(var tag in group){
int offset = ushort.Parse(tag.Address) - startAddr;
tag.Value = ConvertValue(values[offset], tag.DataType);
}
}
频繁断开重连会导致某些设备进入保护状态。建议增加心跳维持:
csharp复制// 在CollectLoopAsync中添加
if((DateTime.Now - _lastActiveTime).TotalMinutes > 1){
await _master.ReadHoldingRegistersAsync(1, 0, 1); // 读保持寄存器0
_lastActiveTime = DateTime.Now;
}
工业环境中必须启用安全策略。修改ConnectAsync方法:
csharp复制private async Task ConnectAsync()
{
var config = new ApplicationConfiguration{
ApplicationUri = $"urn:{Dns.GetHostName()}:HMI",
SecurityConfiguration = new SecurityConfiguration{
ApplicationCertificate = new CertificateIdentifier{
StoreType = "X509Store",
StorePath = "CurrentUser\\My",
SubjectName = "CN=HMI"
},
TrustedPeerCertificates = new CertificateTrustList{
StoreType = "Directory",
StorePath = "C:\\Certs\\Trusted"
}
}
};
var endpoint = new EndpointDescription{
EndpointUrl = _endpointUrl,
SecurityPolicyUri = SecurityPolicies.Basic256Sha256
};
_session = await Session.Create(config, endpoint, false, "HMI", 60000,
new UserIdentity(new AnonymousIdentityToken()), null);
}
原始代码中的重试机制需要增加随机因子,避免多台设备同时重连:
csharp复制private int _retryDelayMs = 1000;
async Task ReconnectAsync(){
var random = new Random();
int actualDelay = _retryDelayMs + random.Next(500); // 添加随机扰动
await Task.Delay(actualDelay);
_retryDelayMs = Math.Min(_retryDelayMs * 2, 30000);
}
不同异常需要差异化处理:
csharp复制catch(ModbusIOException ex) when(ex.InnerException is SocketException){
// 网络异常立即重试
_retryDelayMs = 1000;
}
catch(ModbusIOException ex) when(ex.Message.Contains("IllegalFunction")){
// 功能码错误需要人工干预
LogError($"配置错误:{ex.Message}");
await Task.Delay(30000);
}
高频采集场景应避免内存分配:
csharp复制// 在SerialCollector中
private readonly ArrayPool<byte> _pool = ArrayPool<byte>.Shared;
async Task ReadLoopAsync(){
var buffer = _pool.Rent(1024);
try{
int len = await _port.BaseStream.ReadAsync(buffer, 0, buffer.Length);
// 处理数据...
}finally{
_pool.Return(buffer);
}
}
根据系统负载自动调节:
csharp复制// 在MainForm中
private async Task MonitorSystemLoadAsync(){
while(true){
float cpuLoad = GetCpuUsage();
if(cpuLoad > 80){
foreach(var collector in _collectors){
collector.Interval *= 1.5f;
}
}
await Task.Delay(5000);
}
}
在飞腾D2000上需特别注意:
-p:DefineConstants="ARM64"bash复制sudo usermod -a -G dialout $USER
powershell复制Stop-Service "SysMain" -Force
Set-Service "SysMain" -StartupType Disabled
powershell复制powercfg /setactive 8c5e7fda-e8bf-4a96-9a85-a6e23a8c635c
新增协议的标准流程:
XXXCollector.cs实现IDisposable以MQTT为例:
csharp复制public class MqttCollector : IDisposable
{
private readonly IMqttClient _client;
private readonly Channel<Tag> _channel;
public MqttCollector(string brokerUrl, List<Tag> tags){
_client = new MqttFactory().CreateMqttClient();
_client.ConnectedAsync += e => {
foreach(var tag in tags){
_client.SubscribeAsync(tag.Address);
}
return Task.CompletedTask;
};
_client.ApplicationMessageReceivedAsync += e => {
var tag = tags.First(t => t.Address == e.ApplicationMessage.Topic);
tag.Value = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
_channel.Writer.TryWrite(tag);
return Task.CompletedTask;
};
}
}
推荐采用时序数据库存储:
csharp复制// 在MainForm中添加
private async Task SaveToInfluxDBAsync()
{
await foreach(var tag in _centralChannel.Reader.ReadAllAsync()){
var point = new PointData(tag.Name)
.Field("value", Convert.ToDouble(tag.Value))
.Timestamp(tag.Timestamp, InfluxDB.Client.Api.Domain.WritePrecision.Ns);
await _writeApi.WritePointAsync(point);
}
}
这套框架已在多个工业现场稳定运行,包括:
实际部署时建议根据具体场景调整: