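In C#, Stream is the abstract base class for all streams; it defines the basic operations for reading and writing sequences of bytes. I like to compare it to a plumbing system: data flows through the pipes like water, and a Stream is the valve and pump station that controls that flow. Understanding how Stream works is essential for file handling, network communication, and in-memory operations.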
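Common Stream-derived classes in .NET include FileStream, MemoryStream, NetworkStream, CryptoStream, and GZipStream, all of which are used in the examples that follow.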
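Important: Stream implements IDisposable, so every stream must be released when you are done with it, either by calling Dispose() or by wrapping it in a using statement. Many beginners overlook this.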
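To make this concrete, here is a minimal sketch of roughly what a using block expands to: a try/finally whose finally block calls Dispose(), so the stream is released even when the body throws. UsingDemo and WriteThenFail are illustrative names, not part of any API.

```csharp
using System;
using System.IO;

public static class UsingDemo
{
    // Hand-written equivalent of:
    //   using (var ms = new MemoryStream()) { ms.WriteByte(42); throw ...; }
    public static MemoryStream WriteThenFail()
    {
        var ms = new MemoryStream();
        try
        {
            try
            {
                ms.WriteByte(42);
                throw new InvalidOperationException("simulated failure");
            }
            finally
            {
                // This is the call a using statement generates for you
                ms.Dispose();
            }
        }
        catch (InvalidOperationException)
        {
            // By the time we get here, the stream has already been disposed
        }
        return ms;
    }
}
```

After the call returns, CanRead on the returned stream is false, which confirms that the finally block ran despite the exception.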
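Synchronous reading and writing is the most basic form of stream I/O. It is built on two core methods, Read and Write. Here is the classic file-copy example: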
```csharp
using (FileStream source = new FileStream("source.txt", FileMode.Open))
using (FileStream destination = new FileStream("destination.txt", FileMode.Create))
{
    byte[] buffer = new byte[4096]; // 4 KB buffer
    int bytesRead;
    // Read returns the number of bytes actually read, and 0 at the end of the stream
    while ((bytesRead = source.Read(buffer, 0, buffer.Length)) > 0)
    {
        destination.Write(buffer, 0, bytesRead);
    }
}
```
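This example illustrates a few key points:
- The using statements close both files even if an exception is thrown.
- A single buffer is reused on every iteration.
- Read returns the number of bytes actually read. That can be fewer than requested, and it is 0 at the end of the stream, so only bytesRead bytes are written each time.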
Asynchronous operations matter more and more in modern applications. .NET provides ReadAsync and WriteAsync for non-blocking I/O:
```csharp
public async Task CopyFileAsync(string sourcePath, string destPath)
{
    using (FileStream source = new FileStream(sourcePath, FileMode.Open))
    using (FileStream destination = new FileStream(destPath, FileMode.Create))
    {
        byte[] buffer = new byte[8192]; // 8 KB buffer
        int bytesRead;
        while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            await destination.WriteAsync(buffer, 0, bytesRead);
        }
    }
}
```
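Key considerations for asynchronous operations:
- A FileStream performs truly asynchronous I/O only when it is opened with useAsync: true (or FileOptions.Asynchronous). Otherwise ReadAsync and WriteAsync may simply run synchronous I/O on a thread-pool thread.
- Long-running copies should accept a CancellationToken and pass it to every ReadAsync and WriteAsync call.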
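Here is a minimal sketch of both points. It assumes .NET Core 2.1 or later for the Memory<byte> overloads of ReadAsync and WriteAsync. The copy loop passes a CancellationToken to every call, and the file variant opens both FileStreams with useAsync: true. AsyncCopy, CopyAsync, and CopyFileAsync are illustrative names.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncCopy
{
    // Copies source to destination with an 8 KB buffer, honoring cancellation.
    public static async Task<long> CopyAsync(Stream source, Stream destination,
        CancellationToken cancellationToken = default)
    {
        byte[] buffer = new byte[8192];
        long total = 0;
        int bytesRead;
        // The Memory<byte> overloads take the token directly
        while ((bytesRead = await source.ReadAsync(buffer.AsMemory(), cancellationToken)) > 0)
        {
            await destination.WriteAsync(buffer.AsMemory(0, bytesRead), cancellationToken);
            total += bytesRead;
        }
        return total;
    }

    // Opens both files for true asynchronous I/O (useAsync: true).
    public static async Task<long> CopyFileAsync(string sourcePath, string destPath,
        CancellationToken cancellationToken = default)
    {
        using var source = new FileStream(sourcePath, FileMode.Open, FileAccess.Read,
            FileShare.Read, 8192, useAsync: true);
        using var destination = new FileStream(destPath, FileMode.Create, FileAccess.Write,
            FileShare.None, 8192, useAsync: true);
        return await CopyAsync(source, destination, cancellationToken);
    }
}
```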
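The Position property gives the current position within the stream. It is especially useful when parsing files with a fixed layout. For example, parsing a file header: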
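```csharp
using (FileStream fs = new FileStream("data.bin", FileMode.Open))
{
    // Read the file header (first 4 bytes)
    byte[] header = new byte[4];
    // Read may return fewer bytes than requested; .NET 7+ also offers fs.ReadExactly(header)
    if (fs.Read(header, 0, header.Length) != header.Length)
        throw new EndOfStreamException("File is too short to contain a header.");
    // Jump to the data region named in the header
    // (BitConverter uses the machine's byte order, little-endian on x86/x64)
    uint dataOffset = BitConverter.ToUInt32(header, 0);
    fs.Position = dataOffset;
    // Read the actual data
    // ...
}
```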
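Read is allowed to return fewer bytes than requested, so a header read should loop until it has everything. Below is a minimal sketch of such a helper for runtimes before .NET 7, which added Stream.ReadExactly. The ReadFully name is hypothetical.

```csharp
using System.IO;

public static class StreamExtensions
{
    // Loops until exactly `count` bytes have been read; throws if the stream ends first.
    public static void ReadFully(this Stream stream, byte[] buffer, int offset, int count)
    {
        while (count > 0)
        {
            int n = stream.Read(buffer, offset, count);
            if (n == 0)
                throw new EndOfStreamException($"Stream ended with {count} byte(s) still expected.");
            offset += n;
            count -= n;
        }
    }
}
```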
The Seek method gives you more flexible positioning, relative to different reference points:
```csharp
// Move 100 bytes forward from the current position
stream.Seek(100, SeekOrigin.Current);
// Move to 50 bytes before the end of the file
stream.Seek(-50, SeekOrigin.End);
// Absolute positioning: jump to byte 200
stream.Seek(200, SeekOrigin.Begin);
```
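From experience: when working with large files, seeking straight to the region you need avoids loading unnecessary data into memory and can noticeably improve performance.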
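This small sketch shows that Seek returns the new absolute position. It uses a 300-byte MemoryStream so the arithmetic is easy to follow. On streams whose CanSeek is false, such as NetworkStream, Seek throws NotSupportedException instead. SeekDemo is an illustrative name.

```csharp
using System.IO;

public static class SeekDemo
{
    // Applies Seek with each origin to a 300-byte stream and records
    // the absolute position each call returns.
    public static long[] Run()
    {
        using var stream = new MemoryStream(new byte[300]);
        long a = stream.Seek(100, SeekOrigin.Begin);   // absolute: 100
        long b = stream.Seek(100, SeekOrigin.Current); // relative: 100 + 100 = 200
        long c = stream.Seek(-50, SeekOrigin.End);     // from the end: 300 - 50 = 250
        return new[] { a, b, c, stream.Position };
    }
}
```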
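One powerful feature of Stream is that several streams can be chained into a processing pipeline. For example, compressing and then encrypting a file: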
```csharp
// `encryptor` is an ICryptoTransform created elsewhere, for example aes.CreateEncryptor()
using (FileStream fileStream = File.Create("output.compressed"))
using (CryptoStream cryptoStream = new CryptoStream(fileStream, encryptor, CryptoStreamMode.Write))
using (GZipStream compressStream = new GZipStream(cryptoStream, CompressionMode.Compress))
using (StreamWriter writer = new StreamWriter(compressStream))
{
    writer.Write("Sensitive data that needs to be compressed and encrypted");
}
```
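Key points of this chained structure:
- Data written to the outermost stream (the StreamWriter) flows inward. It is therefore compressed first and encrypted second, which is the right order because encrypted output is essentially incompressible.
- Disposing the outermost stream flushes and closes each inner stream in turn. This is also what makes CryptoStream write its final padded block.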
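This sketch shows both the ordering and the disposal chain at work. It round-trips a string through compress-then-encrypt and back through decrypt-then-decompress, entirely in memory. It assumes a single freshly created Aes instance (random key and IV) shared by both sides. PipelineRoundTrip, Protect, and Unprotect are illustrative names.

```csharp
using System.IO;
using System.IO.Compression;
using System.Security.Cryptography;
using System.Text;

public static class PipelineRoundTrip
{
    // Write path: StreamWriter -> GZipStream -> CryptoStream -> MemoryStream
    public static byte[] Protect(string text, Aes aes)
    {
        var output = new MemoryStream();
        using (var crypto = new CryptoStream(output, aes.CreateEncryptor(), CryptoStreamMode.Write))
        using (var gzip = new GZipStream(crypto, CompressionMode.Compress))
        using (var writer = new StreamWriter(gzip, new UTF8Encoding(false)))
        {
            writer.Write(text);
        } // disposing the writer closes gzip, then crypto (final block), then output
        // ToArray still works after the MemoryStream has been closed
        return output.ToArray();
    }

    // Read path nests the streams in the opposite order: decrypt first, then decompress
    public static string Unprotect(byte[] data, Aes aes)
    {
        using var input = new MemoryStream(data);
        using var crypto = new CryptoStream(input, aes.CreateDecryptor(), CryptoStreamMode.Read);
        using var gzip = new GZipStream(crypto, CompressionMode.Decompress);
        using var reader = new StreamReader(gzip, Encoding.UTF8);
        return reader.ReadToEnd();
    }
}
```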
MemoryStream is very handy for testing and for working with temporary data:
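```csharp
// Create a MemoryStream and write to it
var ms = new MemoryStream();
var sw = new StreamWriter(ms);
sw.Write("Hello, MemoryStream!");
sw.Flush(); // Required: otherwise the text may still be sitting in the writer's buffer
// Read the contents back
ms.Position = 0; // Rewind to the start
var sr = new StreamReader(ms);
string content = sr.ReadToEnd();
// Get the raw bytes
byte[] data = ms.ToArray();
```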
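Things to keep in mind with MemoryStream:
- Reset Position to 0 before reading back what you wrote.
- ToArray() returns a copy of the entire contents regardless of Position. GetBuffer() returns the underlying array, which may be longer than Length.
- Disposing a StreamWriter or StreamReader that wraps the MemoryStream also disposes the MemoryStream, unless you pass leaveOpen: true.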
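Here is a small sketch of those behaviors. It uses the StreamWriter(Stream, Encoding, int, bool) overload to get leaveOpen. MemoryStreamDemo is an illustrative name.

```csharp
using System.IO;
using System.Text;

public static class MemoryStreamDemo
{
    public static (long length, int arrayLength, int bufferLength, string text) Run()
    {
        var ms = new MemoryStream();
        // leaveOpen: true keeps ms usable after the writer is disposed
        using (var writer = new StreamWriter(ms, new UTF8Encoding(false), 1024, leaveOpen: true))
        {
            writer.Write("Hello");
        } // disposing the writer flushes it

        byte[] copy = ms.ToArray();   // exactly Length bytes
        byte[] raw = ms.GetBuffer();  // underlying array, possibly larger than Length

        ms.Position = 0;              // rewind before reading
        using var reader = new StreamReader(ms, Encoding.UTF8);
        string text = reader.ReadToEnd();
        return (ms.Length, copy.Length, raw.Length, text);
    }
}
```

Length and ToArray().Length are both 5 here. GetBuffer() typically returns a larger array, because MemoryStream grows its capacity in steps.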
Buffer size has a significant effect on stream performance. These are my own measurements:
| Buffer size | Time to read a 1 GB file | Memory usage |
|---|---|---|
| 1 KB | 12.3 s | Low |
| 4 KB | 4.7 s | Low |
| 8 KB | 3.8 s | Low |
| 64 KB | 3.2 s | Medium |
| 1 MB | 3.1 s | High |
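Recommendation: in these measurements most of the gain is already realized by 8 KB. Going from 64 KB to 1 MB saves only about 0.1 s while using considerably more memory, so a buffer between 8 KB and 64 KB is a sensible default.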
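One way to see why small buffers cost more is to count Read calls. This sketch drains a stream with a given buffer size. It does not reproduce the timings above. It only shows that the number of calls, and with it the per-call overhead, scales inversely with buffer size. BufferSizeDemo and CountReads are illustrative names.

```csharp
using System.IO;

public static class BufferSizeDemo
{
    // Counts how many non-empty Read calls it takes to drain a stream.
    public static int CountReads(Stream stream, int bufferSize)
    {
        byte[] buffer = new byte[bufferSize];
        int calls = 0;
        while (stream.Read(buffer, 0, buffer.Length) > 0)
        {
            calls++;
        }
        return calls;
    }
}
```

Draining 1 MiB from a MemoryStream takes 1024 calls with a 1 KB buffer but only 16 with a 64 KB buffer.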
For high-frequency stream operations, ArrayPool can reduce GC pressure:
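```csharp
// Rent a buffer (it may be larger than the 8192 bytes requested)
byte[] buffer = ArrayPool<byte>.Shared.Rent(8192);
try
{
    int bytesRead;
    while ((bytesRead = stream.Read(buffer, 0, buffer.Length)) > 0)
    {
        // Process the data...
    }
}
finally
{
    // Return the buffer; do not touch it after this point
    ArrayPool<byte>.Shared.Return(buffer);
}
```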
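This approach is particularly well suited to high-performance server applications, where it can significantly reduce allocation overhead.
"ObjectDisposedException: Cannot access a closed Stream" is a common error. It usually happens because a wrapper such as StreamReader or StreamWriter was disposed, which by default also disposes the underlying stream, and that stream was then used again.
Solution: keep the underlying stream open when disposing the wrapper.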
```csharp
// Several wrapper constructors take a leaveOpen parameter
var reader = new StreamReader(stream, Encoding.UTF8, detectEncodingFromByteOrderMarks: true, bufferSize: 1024, leaveOpen: true);
```
Blocking synchronously on an asynchronous stream operation from the UI thread can cause a deadlock:
```csharp
// Wrong: blocking on the task can deadlock when a SynchronizationContext is present
int bytesRead = stream.ReadAsync(buffer, 0, buffer.Length).Result;
// Right: await it
int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
```
Rule of thumb: async/await should propagate "all the way up" the call chain. Don't mix in synchronous waits partway.
Large files need special attention to memory usage:
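```csharp
// SequentialScan hints that the file will be read front to back;
// Asynchronous enables true async I/O for ReadAsync/WriteAsync
var options = FileOptions.SequentialScan | FileOptions.Asynchronous;
using (var fs = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, options))
{
    // Chunked processing logic
}
```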
With these Stream fundamentals we can implement reliable chunked uploads:
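```csharp
// UploadChunkAsync, ReportProgress and CompleteUploadAsync are application-specific helpers not shown here
public async Task UploadInChunksAsync(Stream source, string targetUrl, int chunkSize = 8192)
{
    byte[] buffer = new byte[chunkSize];
    int chunkNumber = 0;
    int bytesRead;
    while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
    {
        // Wrap the bytes just read as a chunk (no copy; safe because the upload
        // is awaited before the buffer is reused)
        using var chunkData = new MemoryStream(buffer, 0, bytesRead);
        // Upload the chunk (with checksum information)
        await UploadChunkAsync(chunkData, targetUrl, chunkNumber++);
        // Report progress (Position and Length require a seekable source)
        ReportProgress(source.Position, source.Length);
    }
    // Tell the server the upload is complete
    await CompleteUploadAsync(targetUrl);
}
```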
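Below is a testable variant of the same loop. It assumes the actual HTTP upload is supplied as a callback: the uploadChunk delegate is hypothetical and stands in for UploadChunkAsync. Each chunk is copied into its own array, so the callback can keep it after the read buffer is reused. A chunk may be shorter than chunkSize whenever ReadAsync returns a short read, not only at the end.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class ChunkedUploader
{
    // uploadChunk is a hypothetical stand-in for the real upload call:
    // it receives the chunk number and a private copy of the chunk's bytes.
    public static async Task<int> UploadInChunksAsync(
        Stream source, Func<int, byte[], Task> uploadChunk, int chunkSize = 8192)
    {
        byte[] buffer = new byte[chunkSize];
        int chunkNumber = 0;
        int bytesRead;
        while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            // Copy so the callback may keep the chunk after the buffer is reused
            byte[] chunk = new byte[bytesRead];
            Array.Copy(buffer, chunk, bytesRead);
            await uploadChunk(chunkNumber++, chunk);
        }
        return chunkNumber; // number of chunks sent
    }
}
```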
Combining CryptoStream with a compression stream for secure storage:
```csharp
public void EncryptAndCompressFile(string inputPath, string outputPath, byte[] key, byte[] iv)
{
    using (Aes aes = Aes.Create())
    using (FileStream input = File.OpenRead(inputPath))
    using (FileStream output = File.Create(outputPath))
    {
        aes.Key = key;
        aes.IV = iv;
        // Data flows input -> GZipStream -> CryptoStream -> output: compress first, then encrypt
        using (CryptoStream cryptoStream = new CryptoStream(
            output,
            aes.CreateEncryptor(),
            CryptoStreamMode.Write))
        using (GZipStream compressStream = new GZipStream(
            cryptoStream,
            CompressionMode.Compress))
        {
            input.CopyTo(compressStream);
        }
    }
}
```
When the built-in streams don't meet your needs, you can derive from Stream to implement a custom stream:
```csharp
public class XORStream : Stream
{
    private readonly Stream _baseStream;
    private readonly byte _xorKey;

    public XORStream(Stream baseStream, byte xorKey)
    {
        _baseStream = baseStream;
        _xorKey = xorKey;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int bytesRead = _baseStream.Read(buffer, offset, count);
        // XOR only the bytes actually read
        for (int i = 0; i < bytesRead; i++)
        {
            buffer[offset + i] ^= _xorKey;
        }
        return bytesRead;
    }

    // Implement the other required members (CanRead, Write, Seek, ...)
}
```
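Here is a complete version of the XOR stream. It is a sketch that assumes every other member should simply delegate to the wrapped stream. Write XORs into a temporary copy so the caller's buffer is not modified. XOR is its own inverse, so the same key both scrambles and restores the data.

```csharp
using System;
using System.IO;

public class XORStream : Stream
{
    private readonly Stream _baseStream;
    private readonly byte _xorKey;

    public XORStream(Stream baseStream, byte xorKey)
    {
        _baseStream = baseStream ?? throw new ArgumentNullException(nameof(baseStream));
        _xorKey = xorKey;
    }

    public override bool CanRead => _baseStream.CanRead;
    public override bool CanSeek => _baseStream.CanSeek;
    public override bool CanWrite => _baseStream.CanWrite;
    public override long Length => _baseStream.Length;
    public override long Position
    {
        get => _baseStream.Position;
        set => _baseStream.Position = value;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int bytesRead = _baseStream.Read(buffer, offset, count);
        for (int i = 0; i < bytesRead; i++)
            buffer[offset + i] ^= _xorKey;
        return bytesRead;
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        // XOR into a copy so the caller's buffer is left untouched
        byte[] transformed = new byte[count];
        for (int i = 0; i < count; i++)
            transformed[i] = (byte)(buffer[offset + i] ^ _xorKey);
        _baseStream.Write(transformed, 0, count);
    }

    public override void Flush() => _baseStream.Flush();
    public override long Seek(long offset, SeekOrigin origin) => _baseStream.Seek(offset, origin);
    public override void SetLength(long value) => _baseStream.SetLength(value);

    protected override void Dispose(bool disposing)
    {
        if (disposing)
            _baseStream.Dispose();
        base.Dispose(disposing);
    }
}
```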
A wrapper stream that reports progress:
```csharp
public class ProgressStream : Stream
{
    private readonly Stream _innerStream;
    private readonly Action<long, long> _progressCallback;
    private long _totalRead;

    public ProgressStream(Stream innerStream, Action<long, long> progressCallback)
    {
        _innerStream = innerStream;
        _progressCallback = progressCallback;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int bytesRead = _innerStream.Read(buffer, offset, count);
        _totalRead += bytesRead;
        // Length throws NotSupportedException on non-seekable streams, so report -1 ("unknown") instead
        long total = _innerStream.CanSeek ? _innerStream.Length : -1;
        _progressCallback?.Invoke(_totalRead, total);
        return bytesRead;
    }

    // Implement the other required members...
}
```
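C# 7.2, together with .NET Core 2.1, introduced Span<T> and Memory<T> and added Stream overloads that accept them. These let you process stream data more efficiently: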
```csharp
public async Task ProcessStreamAsync(Stream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(4096);
    Memory<byte> memory = buffer.AsMemory();
    try
    {
        int bytesRead;
        // ReadAsync(Memory<byte>) returns a ValueTask<int>, which can avoid a Task allocation
        while ((bytesRead = await stream.ReadAsync(memory)) > 0)
        {
            ProcessData(memory.Span.Slice(0, bytesRead));
        }
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(buffer);
    }
}

private void ProcessData(Span<byte> data)
{
    // Hot-path processing logic (here: XOR each byte with 0x55 in place)
    for (int i = 0; i < data.Length; i++)
    {
        data[i] = (byte)(data[i] ^ 0x55);
    }
}
```
For the most performance-critical scenarios, you can use System.IO.Pipelines:
```csharp
// Requires: using System.Buffers; using System.IO.Pipelines;
// (available as the System.IO.Pipelines NuGet package)
async Task ProcessLinesAsync(Stream stream)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(stream, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);
    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Stream stream, PipeWriter writer)
{
    const int minimumBufferSize = 512;
    while (true)
    {
        // Ask the writer for at least 512 bytes of memory to read into
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        int bytesRead = await stream.ReadAsync(memory);
        if (bytesRead == 0) break;
        // Tell the writer how much was actually filled
        writer.Advance(bytesRead);
        // Make the data available to the reader
        FlushResult result = await writer.FlushAsync();
        if (result.IsCompleted) break;
    }
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;
        // Process the data... (this sketch consumes everything it receives)
        reader.AdvanceTo(buffer.End);
        if (result.IsCompleted) break;
    }
    await reader.CompleteAsync();
}
```
Use MemoryStream to simulate various stream scenarios in unit tests:
```csharp
[Test]
public void TestStreamProcessor()
{
    // Arrange: the bytes 0..255
    byte[] testData = Enumerable.Range(0, 256).Select(i => (byte)i).ToArray();
    using var ms = new MemoryStream(testData);
    // Create the object under test
    var processor = new StreamProcessor();
    // Act
    var result = processor.Process(ms);
    // Assert
    Assert.AreEqual(256, result.TotalBytes);
    Assert.AreEqual(127.5, result.AverageValue, 0.1);
}
```
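The test assumes a StreamProcessor whose Process method returns TotalBytes and AverageValue. Below is a minimal hypothetical implementation that would satisfy it. It sums every byte and divides by the count; for the bytes 0 to 255 that is 32640 / 256 = 127.5.

```csharp
using System.IO;

// Hypothetical result type matching the properties the test reads.
public sealed class ProcessResult
{
    public long TotalBytes { get; set; }
    public double AverageValue { get; set; }
}

// Hypothetical processor: counts bytes and averages their values.
public sealed class StreamProcessor
{
    public ProcessResult Process(Stream stream)
    {
        byte[] buffer = new byte[4096];
        long total = 0;
        long sum = 0;
        int bytesRead;
        while ((bytesRead = stream.Read(buffer, 0, buffer.Length)) > 0)
        {
            for (int i = 0; i < bytesRead; i++)
            {
                sum += buffer[i];
            }
            total += bytesRead;
        }
        return new ProcessResult
        {
            TotalBytes = total,
            AverageValue = total == 0 ? 0 : (double)sum / total
        };
    }
}
```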
Create a diagnostic stream to trace the read and write operations that actually take place:
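```csharp
public class DiagnosticStream : Stream
{
    private readonly Stream _innerStream;
    public event EventHandler<StreamOperationEventArgs> OperationPerformed;

    public DiagnosticStream(Stream innerStream)
    {
        _innerStream = innerStream;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        // Time the underlying read
        var sw = Stopwatch.StartNew();
        int bytesRead = _innerStream.Read(buffer, offset, count);
        sw.Stop();
        OperationPerformed?.Invoke(this, new StreamOperationEventArgs
        {
            Operation = "Read",
            Offset = offset,
            Count = count,
            ActualBytes = bytesRead,
            Elapsed = sw.Elapsed
        });
        return bytesRead;
    }

    // Implement the other members...
}

// Event payload describing a single stream operation
public class StreamOperationEventArgs : EventArgs
{
    public string Operation { get; set; } = "";
    public int Offset { get; set; }
    public int Count { get; set; }
    public int ActualBytes { get; set; }
    public TimeSpan Elapsed { get; set; }
}
```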
When working with file streams in cross-platform applications:
```csharp
// Bad: hard-coded path separator
string path = "folder\\file.txt";
// Good: let the Path class supply the separator
string path = Path.Combine("folder", "file.txt");
```
When processing text streams, watch out for line-ending differences between platforms:
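```csharp
// Reading: ReadLine recognizes \n, \r\n and \r line endings automatically
using var reader = new StreamReader(stream);
while (!reader.EndOfStream)
{
    string line = reader.ReadLine();
    // Process the line
}

// Writing: NewLine only controls what WriteLine appends; Write(content)
// would emit any mixed "\n" and "\r\n" inside a string unchanged
using var writer = new StreamWriter(stream, new UTF8Encoding(false))
{
    NewLine = "\n" // Use Unix-style line endings
};
writer.WriteLine("Line1");
writer.WriteLine("Line2");
writer.WriteLine("Line3");
```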
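To normalize existing text, read it line by line and write each line back with WriteLine. Here is a minimal sketch that assumes UTF-8 input; LineEndings and NormalizeToLf are illustrative names. Every line, including the last one, ends up terminated with "\n".

```csharp
using System.IO;
using System.Text;

public static class LineEndings
{
    // Reads UTF-8 text line by line and rewrites it with "\n" after every line.
    public static byte[] NormalizeToLf(Stream input)
    {
        var output = new MemoryStream();
        using (var reader = new StreamReader(input, Encoding.UTF8))
        using (var writer = new StreamWriter(output, new UTF8Encoding(false)) { NewLine = "\n" })
        {
            // ReadLine strips "\n", "\r\n" or "\r"; WriteLine then appends NewLine ("\n")
            while (reader.ReadLine() is string line)
            {
                writer.WriteLine(line);
            }
        }
        // ToArray still works after the writer has closed the MemoryStream
        return output.ToArray();
    }
}
```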
When several resources need to be cleaned up:
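```csharp
// Traditional: stacked using statements
using (var resource1 = new Resource1())
using (var resource2 = new Resource2())
{
    // Work with the resources
}
// C# 8.0 using declarations: disposed in reverse order at the end of the enclosing scope
using var resource1 = new Resource1();
using var resource2 = new Resource2();
// Work with the resources
```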
The standard Dispose pattern for a custom stream:
```csharp
public class CustomStream : Stream
{
    private bool _disposed;
    private Stream _innerStream;

    protected override void Dispose(bool disposing)
    {
        if (!_disposed)
        {
            if (disposing)
            {
                // Release managed resources
                _innerStream?.Dispose();
            }
            // Release unmanaged resources (none here)
            _disposed = true;
        }
        base.Dispose(disposing);
    }

    // Other members...
}
```
For performance-sensitive scenarios, consider memory-mapped files:
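```csharp
public void ProcessLargeFile(string filePath)
{
    // Note: CreateFromFile throws for an empty file
    long fileLength = new FileInfo(filePath).Length;
    using (var mmf = MemoryMappedFile.CreateFromFile(filePath))
    using (var accessor = mmf.CreateViewAccessor())
    {
        // Access the file's bytes directly through the mapped view
        byte firstByte = accessor.ReadByte(0);
        // Bulk read, clamped to the file length so a small file doesn't cause an out-of-range read
        const int bufferSize = 10000;
        int count = (int)Math.Min(bufferSize, fileLength);
        byte[] buffer = new byte[count];
        accessor.ReadArray(0, buffer, 0, count);
    }
}
```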
Large files that can be split into independent parts can be processed in parallel:
```csharp
public void ParallelProcess(string filePath)
{
    var fileInfo = new FileInfo(filePath);
    long chunkSize = fileInfo.Length / Environment.ProcessorCount;
    Parallel.For(0, Environment.ProcessorCount, i =>
    {
        long start = i * chunkSize;
        // The last chunk also takes the remainder of the division
        long end = (i == Environment.ProcessorCount - 1) ? fileInfo.Length : start + chunkSize;
        // Each worker opens its own FileStream, so positions don't interfere
        using (var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
        {
            stream.Position = start;
            byte[] buffer = new byte[8192];
            long remaining = end - start;
            while (remaining > 0)
            {
                int toRead = (int)Math.Min(buffer.Length, remaining);
                int bytesRead = stream.Read(buffer, 0, toRead);
                if (bytesRead == 0) break; // the file shrank underneath us
                // ProcessChunk is application-specific; records that straddle
                // chunk boundaries must be handled there
                ProcessChunk(buffer, bytesRead, i);
                remaining -= bytesRead;
            }
        }
    });
}
```
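Key points for using CryptoStream correctly:
- Derive the key and IV from the password with a random salt and a high iteration count.
- Store the salt alongside the ciphertext so the key can be derived again.
- Make sure the CryptoStream is disposed so the final block is written.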
```csharp
public void EncryptFile(string inputFile, string outputFile, string password)
{
    using (Aes aes = Aes.Create())
    {
        // Derive the key and IV securely from the password and a random salt
        var salt = new byte[16];
        RandomNumberGenerator.Fill(salt);
        // Pass the hash explicitly; the constructors that default to SHA-1 are obsolete in .NET 7+
        using var pbkdf2 = new Rfc2898DeriveBytes(password, salt, 100000, HashAlgorithmName.SHA256);
        aes.Key = pbkdf2.GetBytes(32); // 256-bit key
        aes.IV = pbkdf2.GetBytes(16);  // 128-bit IV
        using (var input = File.OpenRead(inputFile))
        using (var output = File.Create(outputFile))
        {
            // Write the salt in the clear so decryption can derive the same key
            output.Write(salt, 0, salt.Length);
            using (var crypto = new CryptoStream(
                output,
                aes.CreateEncryptor(),
                CryptoStreamMode.Write))
            {
                input.CopyTo(crypto);
            }
        }
    }
}
```
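The decrypting side has to read the salt back first and derive the same key and IV again. This is a self-contained sketch of both directions over arbitrary streams. It mirrors the file layout above: a 16-byte salt followed by the ciphertext, with 100,000 PBKDF2 iterations. It assumes SHA-256 is passed to PBKDF2 explicitly. The encrypting side must use the same hash algorithm, or the derived key will not match. PasswordCrypto is an illustrative name. Disposing the CryptoStream closes the stream it wraps, so Encrypt closes its output.

```csharp
using System.IO;
using System.Security.Cryptography;

public static class PasswordCrypto
{
    private const int SaltSize = 16;
    private const int Iterations = 100_000;

    // PBKDF2 with SHA-256; same key/IV sizes as EncryptFile above (256-bit key, 128-bit IV)
    private static Aes CreateAes(string password, byte[] salt)
    {
        using var pbkdf2 = new Rfc2898DeriveBytes(password, salt, Iterations, HashAlgorithmName.SHA256);
        Aes aes = Aes.Create();
        aes.Key = pbkdf2.GetBytes(32);
        aes.IV = pbkdf2.GetBytes(16);
        return aes;
    }

    public static void Encrypt(Stream input, Stream output, string password)
    {
        byte[] salt = new byte[SaltSize];
        RandomNumberGenerator.Fill(salt);
        output.Write(salt, 0, salt.Length); // salt is stored in the clear
        using Aes aes = CreateAes(password, salt);
        // Disposing the CryptoStream writes the final padded block and closes output
        using var crypto = new CryptoStream(output, aes.CreateEncryptor(), CryptoStreamMode.Write);
        input.CopyTo(crypto);
    }

    public static void Decrypt(Stream input, Stream output, string password)
    {
        // Read the full salt prefix (Read may return fewer bytes than asked)
        byte[] salt = new byte[SaltSize];
        int read = 0;
        while (read < SaltSize)
        {
            int n = input.Read(salt, read, SaltSize - read);
            if (n == 0) throw new EndOfStreamException("Missing salt prefix.");
            read += n;
        }
        using Aes aes = CreateAes(password, salt);
        using var crypto = new CryptoStream(input, aes.CreateDecryptor(), CryptoStreamMode.Read);
        crypto.CopyTo(output);
    }
}
```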
Optimization tips for working with NetworkStream:
```csharp
public async Task SendLargeDataAsync(NetworkStream networkStream, Stream dataStream)
{
    // Send a length prefix (dataStream must be seekable for Length to work).
    // BinaryPrimitives fixes the byte order; BitConverter would use the machine's own.
    byte[] lengthBytes = new byte[sizeof(long)];
    BinaryPrimitives.WriteInt64LittleEndian(lengthBytes, dataStream.Length);
    await networkStream.WriteAsync(lengthBytes, 0, lengthBytes.Length);
    // Send the data in chunks
    byte[] buffer = ArrayPool<byte>.Shared.Rent(8192);
    try
    {
        int bytesRead;
        while ((bytesRead = await dataStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            await networkStream.WriteAsync(buffer, 0, bytesRead);
        }
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(buffer);
    }
}
```
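This sketch adds the matching receive side as a length-prefixed framing helper. It writes the length as 8 little-endian bytes via BinaryPrimitives, so both ends agree regardless of CPU byte order. On the way back it reads exactly that many bytes, looping because a network read can return fewer bytes than requested. It works over any Stream. LengthPrefixedFraming and its methods are illustrative names.

```csharp
using System.Buffers.Binary;
using System.IO;
using System.Threading.Tasks;

public static class LengthPrefixedFraming
{
    // Writes an 8-byte little-endian length, then the payload.
    public static async Task WriteMessageAsync(Stream stream, byte[] payload)
    {
        byte[] prefix = new byte[8];
        BinaryPrimitives.WriteInt64LittleEndian(prefix, payload.Length);
        await stream.WriteAsync(prefix, 0, prefix.Length);
        await stream.WriteAsync(payload, 0, payload.Length);
    }

    // Reads the prefix, then exactly that many payload bytes.
    public static async Task<byte[]> ReadMessageAsync(Stream stream)
    {
        byte[] prefix = await ReadExactAsync(stream, 8);
        long length = BinaryPrimitives.ReadInt64LittleEndian(prefix);
        if (length < 0 || length > int.MaxValue)
            throw new InvalidDataException($"Unsupported message length {length}.");
        return await ReadExactAsync(stream, (int)length);
    }

    // Loops because a single read may return fewer bytes than requested.
    private static async Task<byte[]> ReadExactAsync(Stream stream, int count)
    {
        byte[] buffer = new byte[count];
        int total = 0;
        while (total < count)
        {
            int n = await stream.ReadAsync(buffer, total, count - total);
            if (n == 0) throw new EndOfStreamException();
            total += n;
        }
        return buffer;
    }
}
```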
Making network reads more robust:
```csharp
public async Task<byte[]> ReadWithTimeoutAsync(NetworkStream stream, int count, int timeoutMs)
{
    byte[] buffer = new byte[count];
    int totalRead = 0;
    // Cancels automatically after timeoutMs; disposed when the method exits
    using var cts = new CancellationTokenSource(timeoutMs);
    try
    {
        while (totalRead < count)
        {
            int bytesRead = await stream.ReadAsync(
                buffer,
                totalRead,
                count - totalRead,
                cts.Token);
            if (bytesRead == 0)
                throw new EndOfStreamException();
            totalRead += bytesRead;
        }
        return buffer;
    }
    catch (OperationCanceledException) when (cts.IsCancellationRequested)
    {
        throw new TimeoutException($"Read operation timed out after {timeoutMs}ms");
    }
}
```
Implementing a data-processing pipeline:
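```csharp
public Stream CreateProcessingPipeline(Stream source)
{
    // Undo compress-then-encrypt in reverse order: decrypt first, then decompress.
    // CreateDecryptor() is an application helper returning the matching ICryptoTransform.
    // Disposing these wrappers also closes source once its data has been consumed.
    using var decrypt = new CryptoStream(source, CreateDecryptor(), CryptoStreamMode.Read);
    using var decompress = new GZipStream(decrypt, CompressionMode.Decompress);
    // Buffer the result so the caller gets a seekable stream;
    // wrap it in new StreamReader(result, Encoding.UTF8) when text is needed
    var result = new MemoryStream();
    decompress.CopyTo(result);
    result.Position = 0;
    return result;
}
```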
Combining streams with Reactive Extensions (System.Reactive):
```csharp
public IObservable<byte[]> CreateStreamObserver(Stream stream)
{
    return Observable.Create<byte[]>(async (observer, cancellationToken) =>
    {
        byte[] buffer = new byte[4096];
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken);
                if (bytesRead == 0) break;
                // Always emit a fresh copy: the read buffer is overwritten on the next
                // iteration, so passing it directly would corrupt data held by subscribers
                var data = new byte[bytesRead];
                Array.Copy(buffer, data, bytesRead);
                observer.OnNext(data);
            }
            observer.OnCompleted();
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
        }
    });
}
```
Handling binary data structures:
```csharp
public struct DataRecord
{
    public int Id;
    public float Value;
    public DateTime Timestamp;
}

// BinaryWriter always writes little-endian; one record takes 4 + 4 + 8 = 16 bytes
public void WriteRecord(BinaryWriter writer, DataRecord record)
{
    writer.Write(record.Id);
    writer.Write(record.Value);
    writer.Write(record.Timestamp.ToBinary());
}

public DataRecord ReadRecord(BinaryReader reader)
{
    return new DataRecord
    {
        Id = reader.ReadInt32(),
        Value = reader.ReadSingle(),
        Timestamp = DateTime.FromBinary(reader.ReadInt64())
    };
}
```
Using Span<T> and BinaryPrimitives to encode the same 16-byte layout without a BinaryWriter:
```csharp
// WriteSingleLittleEndian/ReadSingleLittleEndian require .NET 5 or later
public void WriteRecord(Span<byte> buffer, DataRecord record)
{
    BinaryPrimitives.WriteInt32LittleEndian(buffer.Slice(0, 4), record.Id);
    BinaryPrimitives.WriteSingleLittleEndian(buffer.Slice(4, 4), record.Value);
    BinaryPrimitives.WriteInt64LittleEndian(buffer.Slice(8, 8), record.Timestamp.ToBinary());
}

public DataRecord ReadRecord(ReadOnlySpan<byte> buffer)
{
    return new DataRecord
    {
        Id = BinaryPrimitives.ReadInt32LittleEndian(buffer.Slice(0, 4)),
        Value = BinaryPrimitives.ReadSingleLittleEndian(buffer.Slice(4, 4)),
        Timestamp = DateTime.FromBinary(BinaryPrimitives.ReadInt64LittleEndian(buffer.Slice(8, 8)))
    };
}
```
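BinaryWriter always writes little-endian, so the Span<T> version should produce exactly the same 16 bytes as the BinaryWriter version. Here is a sketch that checks this and reads the record back. It assumes .NET 5 or later for the Single overloads of BinaryPrimitives. RecordCodec is an illustrative name.

```csharp
using System;
using System.Buffers.Binary;
using System.IO;

public struct DataRecord
{
    public int Id;
    public float Value;
    public DateTime Timestamp;
}

public static class RecordCodec
{
    public const int Size = 16; // 4 (int) + 4 (float) + 8 (DateTime.ToBinary)

    public static byte[] WithBinaryWriter(DataRecord record)
    {
        var ms = new MemoryStream();
        using (var writer = new BinaryWriter(ms))
        {
            writer.Write(record.Id);
            writer.Write(record.Value);
            writer.Write(record.Timestamp.ToBinary());
        }
        return ms.ToArray(); // valid even though the writer closed ms
    }

    public static byte[] WithSpan(DataRecord record)
    {
        byte[] bytes = new byte[Size];
        Span<byte> buffer = bytes;
        BinaryPrimitives.WriteInt32LittleEndian(buffer.Slice(0, 4), record.Id);
        BinaryPrimitives.WriteSingleLittleEndian(buffer.Slice(4, 4), record.Value);
        BinaryPrimitives.WriteInt64LittleEndian(buffer.Slice(8, 8), record.Timestamp.ToBinary());
        return bytes;
    }

    public static DataRecord Read(ReadOnlySpan<byte> buffer) => new DataRecord
    {
        Id = BinaryPrimitives.ReadInt32LittleEndian(buffer.Slice(0, 4)),
        Value = BinaryPrimitives.ReadSingleLittleEndian(buffer.Slice(4, 4)),
        Timestamp = DateTime.FromBinary(BinaryPrimitives.ReadInt64LittleEndian(buffer.Slice(8, 8)))
    };
}
```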
Processing large text files efficiently:
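```csharp
public IEnumerable<string> ReadLines(string filePath)
{
    // SequentialScan hints at front-to-back access; lines are yielded lazily,
    // so only the current line (plus the reader's buffer) is held in memory
    using var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, FileOptions.SequentialScan);
    using var reader = new StreamReader(stream);
    string line;
    while ((line = reader.ReadLine()) != null)
    {
        yield return line;
    }
}

// Usage (the built-in File.ReadLines offers the same lazy behavior)
foreach (var line in ReadLines("hugefile.txt"))
{
    // Process each line
}
```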
Handling text encodings correctly:
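```csharp
public string DetectEncodingAndRead(string filePath)
{
    // Read up to the first 4 bytes to look for a byte order mark (BOM)
    byte[] bom = new byte[4];
    int bomLength;
    using (var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read))
    {
        bomLength = stream.Read(bom, 0, 4); // may be < 4 for tiny files
    }
    // Pick the encoding from the BOM; UTF-32 LE (FF FE 00 00) must be checked before UTF-16 LE (FF FE)
    Encoding encoding = bom switch
    {
        _ when bomLength >= 3 && bom[0] == 0xEF && bom[1] == 0xBB && bom[2] == 0xBF => Encoding.UTF8,
        _ when bomLength >= 4 && bom[0] == 0xFF && bom[1] == 0xFE && bom[2] == 0 && bom[3] == 0 => Encoding.UTF32,
        _ when bomLength >= 2 && bom[0] == 0xFF && bom[1] == 0xFE => Encoding.Unicode,
        _ when bomLength >= 2 && bom[0] == 0xFE && bom[1] == 0xFF => Encoding.BigEndianUnicode,
        _ when bomLength >= 4 && bom[0] == 0 && bom[1] == 0 && bom[2] == 0xFE && bom[3] == 0xFF
            => new UTF32Encoding(bigEndian: true, byteOrderMark: true),
        // No BOM: fall back to UTF-8 (which is what Encoding.Default is on .NET Core / .NET 5+)
        _ => Encoding.UTF8
    };
    // Read the file with the detected encoding. File.ReadAllText also detects
    // these BOMs itself; the encoding argument mainly matters when there is none.
    return File.ReadAllText(filePath, encoding);
}
```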
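In practice StreamReader can do the BOM detection for you. With detectEncodingFromByteOrderMarks set to true, the encoding you pass is only the fallback used when no BOM is found. Here is a minimal sketch over in-memory bytes; BomDetection is an illustrative name.

```csharp
using System.IO;
using System.Text;

public static class BomDetection
{
    // StreamReader inspects the first bytes for a BOM; `fallback` applies only when none is found.
    public static (string Text, string EncodingName) Read(byte[] bytes, Encoding fallback)
    {
        using var reader = new StreamReader(new MemoryStream(bytes), fallback,
            detectEncodingFromByteOrderMarks: true);
        string text = reader.ReadToEnd();
        // CurrentEncoding reflects the detected encoding once data has been read
        return (text, reader.CurrentEncoding.WebName);
    }
}
```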
A stream that retries automatically:
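```csharp
public class RetryStream : Stream
{
    private readonly Func<Stream> _streamFactory;
    private readonly int _maxRetries;
    private Stream _currentStream;
    private long _position; // bytes consumed so far, used to resume after reconnecting

    public RetryStream(Func<Stream> streamFactory, int maxRetries = 3)
    {
        _streamFactory = streamFactory;
        _maxRetries = maxRetries;
        _currentStream = streamFactory();
    }

    private T RetryOnFailure<T>(Func<T> operation)
    {
        int attempts = 0;
        while (true)
        {
            try
            {
                return operation();
            }
            catch (IOException) when (attempts < _maxRetries)
            {
                attempts++;
                _currentStream.Dispose();
                _currentStream = _streamFactory();
                // A fresh stream starts at 0; skip what was already read.
                // Without CanSeek, a replacement stream would replay data from the start.
                if (_currentStream.CanSeek)
                    _currentStream.Position = _position;
            }
        }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int bytesRead = RetryOnFailure(() => _currentStream.Read(buffer, offset, count));
        _position += bytesRead;
        return bytesRead;
    }

    // Implement the remaining members...
}
```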
An HTTP chunked transfer-encoding stream:
```csharp
public class ChunkedStream : Stream
{
    private static readonly byte[] CrLf = { (byte)'\r', (byte)'\n' };
    private readonly Stream _innerStream;
    private readonly bool _leaveOpen;
    private bool _disposed;

    public ChunkedStream(Stream innerStream, bool leaveOpen = false)
    {
        _innerStream = innerStream;
        _leaveOpen = leaveOpen;
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        // A zero-length chunk would signal the end of the body, so skip empty writes
        if (count == 0) return;
        // Each Write becomes one complete chunk: size in hex, CRLF, data, CRLF
        byte[] header = Encoding.ASCII.GetBytes($"{count:X}\r\n");
        _innerStream.Write(header, 0, header.Length);
        _innerStream.Write(buffer, offset, count);
        _innerStream.Write(CrLf, 0, CrLf.Length);
    }

    public override void Flush() => _innerStream.Flush();

    protected override void Dispose(bool disposing)
    {
        if (disposing && !_disposed)
        {
            _disposed = true;
            // Always write the terminating chunk; leaveOpen only decides
            // whether the inner stream is closed afterwards
            byte[] terminator = Encoding.ASCII.GetBytes("0\r\n\r\n");
            _innerStream.Write(terminator, 0, terminator.Length);
            _innerStream.Flush();
            if (!_leaveOpen)
                _innerStream.Dispose();
        }
        base.Dispose(disposing);
    }

    // Implement the remaining members...
}
```
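As the .NET platform continues to evolve, stream processing keeps evolving with it. In my own projects I've noticed several trends worth watching:
Async streams: IAsyncEnumerable<T> (C# 8.0) lets callers consume stream data chunk by chunk with await foreach: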
```csharp
public async IAsyncEnumerable<byte[]> ReadChunksAsync(Stream stream, int chunkSize)
{
    byte[] buffer = new byte[chunkSize];
    int bytesRead;
    while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length)) > 0)
    {
        // Always yield a copy: the read buffer is overwritten on the next iteration,
        // so yielding it directly would corrupt chunks the caller still holds
        var chunk = new byte[bytesRead];
        Array.Copy(buffer, chunk, bytesRead);
        yield return chunk;
    }
}
```
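This usage sketch shows a caller consuming such a reader with await foreach (C# 8.0, .NET Core 3.0 or later). ChunkReader and ChunkSizesAsync are illustrative names. Reading 10 bytes with a chunk size of 4 yields chunks of 4, 4, and 2 bytes.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

public static class ChunkReader
{
    public static async IAsyncEnumerable<byte[]> ReadChunksAsync(Stream stream, int chunkSize)
    {
        byte[] buffer = new byte[chunkSize];
        int bytesRead;
        while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            var chunk = new byte[bytesRead];
            Array.Copy(buffer, chunk, bytesRead);
            yield return chunk;
        }
    }

    // Consumes the chunks with await foreach and records each chunk's size.
    public static async Task<List<int>> ChunkSizesAsync(Stream stream, int chunkSize)
    {
        var sizes = new List<int>();
        await foreach (byte[] chunk in ReadChunksAsync(stream, chunkSize))
        {
            sizes.Add(chunk.Length);
        }
        return sizes;
    }
}
```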
Zero-allocation APIs: Span<T>…
Cross-platform improvements: .NET Core and .NET 5+ support Linux and macOS in depth, which opens up more cross-platform stream-processing scenarios.
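Cloud-native stream processing: deep integration with cloud storage services such as Azure Blob Storage and AWS S3 has given rise to new stream-processing patterns.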
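In real projects I've found that combining these techniques sensibly produces stream-processing systems that are both fast and easy to maintain. For example, when importing GB-scale CSV files, combining FileStream, Span<T>…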