在之前的项目中介绍了
这次通过 Netty 传递文件
此项目地址: https://github.com/haoxiaoyong1014/netty-file
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.21.Final</version>
</dependency>
和之前的两个例子中的依赖是一样的
-
客户端
- FileUploadClientHandler
public class FileUploadClientHandler extends ChannelInboundHandlerAdapter {
private int byteRead;
private volatile int start = 0;
private volatile int lastLength = 0;
public RandomAccessFile randomAccessFile;
private FileUploadFile fileUploadFile;
private final static Logger LOGGER = LoggerFactory.getLogger(FileUploadClientHandler.class);
public FileUploadClientHandler(FileUploadFile ef) {
if (ef.getFile().exists()) {
if (!ef.getFile().isFile()) {
System.out.println("Not a file :" + ef.getFile());
return;
}
}
this.fileUploadFile = ef;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelInactive(ctx);
LOGGER.info("客户端结束传递文件channelInactive()");
}
public void channelActive(ChannelHandlerContext ctx) {
LOGGER.info("正在执行channelActive()方法.....");
try {
randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(),
"r");
randomAccessFile.seek(fileUploadFile.getStarPos());
// lastLength = (int) randomAccessFile.length() / 10;
lastLength = 1024 * 10;
byte[] bytes = new byte[lastLength];
if ((byteRead = randomAccessFile.read(bytes)) != -1) {
fileUploadFile.setEndPos(byteRead);
fileUploadFile.setBytes(bytes);
ctx.writeAndFlush(fileUploadFile); //发送消息到服务端
} else {
}
LOGGER.info("channelActive()文件已经读完 " + byteRead);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException i) {
i.printStackTrace();
}
LOGGER.info("channelActive()方法执行结束");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
if (msg instanceof Integer) {
start = (Integer) msg;
if (start != -1) {
randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(), "r");
randomAccessFile.seek(start); //将文件定位到start
LOGGER.info("长度:" + (randomAccessFile.length() - start));
int a = (int) (randomAccessFile.length() - start);
int b = (int) (randomAccessFile.length() / 1024 * 2);
if (a < lastLength) {
lastLength = a;
}
LOGGER.info("文件长度:" + (randomAccessFile.length()) + ",start:" + start + ",a:" + a + ",b:" + b + ",lastLength:" + lastLength);
byte[] bytes = new byte[lastLength];
LOGGER.info("bytes的长度是="+bytes.length);
if ((byteRead = randomAccessFile.read(bytes)) != -1 && (randomAccessFile.length() - start) > 0) {
LOGGER.info("byteRead = " + byteRead);
fileUploadFile.setEndPos(byteRead);
fileUploadFile.setBytes(bytes);
try {
ctx.writeAndFlush(fileUploadFile);
} catch (Exception e) {
e.printStackTrace();
}
} else {
randomAccessFile.close();
ctx.close();
LOGGER.info("文件已经读完channelRead()--------" + byteRead);
}
}
}
}
}
这里使用了RandomAccessFile
对RandomAccessFile 做一个简单的介绍更有助于理解代码
RandomAccessFile特点
RandomAccessFile是java Io体系中功能最丰富的文件内容访问类。即可以读取文件内容,也可以向文件中写入内容。但是和其他输入/输入流不同的是,程序可以直接跳到文件的任意位置来读写数据。 因为RandomAccessFile可以自由访问文件的任意位置,所以如果我们希望只访问文件的部分内容,那就可以使用RandomAccessFile类。 与OutputStearm,Writer等输出流不同的是,RandomAccessFile类允许自由定位文件记录指针,所以RandomAccessFile可以不从文件开始的地方进行输出,所以RandomAccessFile可以向已存在的文件后追加内容。则应该使用RandomAccessFile。
RandomAccessFile类包含了一个记录指针,用以标识当前读写处的位置,当程序新创建一个RandomAccessFile对象时,该对象的文件记录指针位于文件头(也就是0处), 当读/写了n个字节后,文件记录指针将会向后移动n个字节。除此之外,RandomAccessFile可以自由的移动记录指针,即可以向前移动,也可以向后移动。 RandomAccessFile包含了以下两个方法来操作文件的记录指针.
long getFilePointer(); 返回文件记录指针的当前位置
void seek(long pos); 将文件记录指针定位到pos位置
RandomAccessFile即可以读文件,也可以写,所以它即包含了完全类似于InputStream的3个read()方法,其用法和InputStream的3个read()方法完全一样; 也包含了完全类似于OutputStream的3个write()方法,其用法和OutputStream的3个Writer()方法完全一样。 除此之外,RandomAccessFile还包含了一系类的readXXX()和writeXXX()方法来完成输入和输出。
-
RandomAccessFile有两个构造器,其实这两个构造器基本相同,只是指定文件的形式不同而已,一个使用String参数来指定文件名,一个使用File参数来指定文件本身。 除此之外,创建RandomAccessFile对象还需要指定一个mode参数。该参数指定RandomAccessFile的访问模式,有以下4个值:
- “r” 以只读方式来打开指定文件夹。如果试图对该RandomAccessFile执行写入方法,都将抛出IOException异常。
- “rw” 以读,写方式打开指定文件。如果该文件尚不存在,则试图创建该文件。
- “rws” 以读,写方式打开指定文件。相对于”rw” 模式,还要求对文件内容或元数据的每个更新都同步写入到底层设备。
- “rwd” 以读,写方式打开指定文件。相对于”rw” 模式,还要求对文件内容每个更新都同步写入到底层设备
下面对上面的FileUploadClientHandler类中的代码进行说明:
channelActive()
方法中有这么一段代码:
ctx.writeAndFlush(fileUploadFile);
这段代码的意思是向服务端发送消息,消息内容就是FileUploadFile
对象, FileUploadFile对象中包含 :文件,文件名,开始位置,文件字节数组,结尾位置
服务端代码:
- FileUploadServerHandler
public class FileUploadServerHandler extends ChannelInboundHandlerAdapter {
private int byteRead;
private volatile int start = 0;
private String file_dir = "/tmp";
private final static Logger LOGGER = LoggerFactory.getLogger(FileUploadServerHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelActive(ctx);
LOGGER.info("服务端:channelActive()");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelInactive(ctx);
LOGGER.info("服务端:channelInactive()");
ctx.flush();
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
LOGGER.info("收到客户端发来的文件,正在处理....");
if (msg instanceof FileUploadFile) {
FileUploadFile ef = (FileUploadFile) msg;
byte[] bytes = ef.getBytes();
byteRead = ef.getEndPos();
String md5 = ef.getFile_md5();//文件名
String path = file_dir + File.separator + md5;
File file = new File(path);
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");//r: 只读模式 rw:读写模式
randomAccessFile.seek(start);//移动文件记录指针的位置,
randomAccessFile.write(bytes);//调用了seek(start)方法,是指把文件的记录指针定位到start字节的位置。也就是说程序将从start字节开始写数据
start = start + byteRead;
if (byteRead > 0) {
ctx.writeAndFlush(start);//向客户端发送消息
randomAccessFile.close();
if(byteRead!=1024 * 10){
Thread.sleep(1000);
channelInactive(ctx);
}
} else {
ctx.close();
}
LOGGER.info("处理完毕,文件路径:"+path+","+byteRead);
}
}
}
channelRead()
方法即是接收客户端的代码,客户端也有这个方法,客户端channelRead()
方法主要是负责读文件,服务端主要是写文件.
注意:
在服务端FileUploadServerHandler
这个类中我们要将file_dir
的路径改为自己电脑上的路径,mac: /tmp; windows: F:
我们可以先运行服务端的ServerFileTest
测试类,然后运行客户端的 ClientFileTest
测试类进行
具体代码就不贴上去了,可以在这里进行下载这个案例 netty-file 如果对你有帮助还请给个Star哦
新增多个文件同时异步上传功能
引入线程,在开发中如果有此需求尽量使用线程池;
更新的代码有:
public class FileUploadClient implements Runnable {
private final static Logger LOGGER = LoggerFactory.getLogger(FileUploadClient.class);
private int port;
private String host;
private FileUploadFile fileUploadFile;
public FileUploadClient(int port, String host, FileUploadFile fileUploadFile) {
this.port = port;
this.host = host;
this.fileUploadFile = fileUploadFile;
}
@Override
public void run() {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
//是禁用nagle算法
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(
new ObjectDecoder(
ClassResolvers
.weakCachingConcurrentResolver(null)));
ch.pipeline().addLast(
new FileUploadClientHandler(
fileUploadFile));
}
});
ChannelFuture f = null;
try {
f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
LOGGER.info("FileUploadClient connect()结束");
} finally {
group.shutdownGracefully();
}
}
}
FileUploadClient类实现Runnable并重写run方法;
测试类:
public static void main(String[] args) {
final int FILE_PORT = 9991;
try {
List<String> fileNameList = new ArrayList();
fileNameList.add("/test-1.zip");
fileNameList.add("/test-2.zip");
for (String fileName : fileNameList) {
FileUploadFile uploadFile = new FileUploadFile();
File file = new File(fileName);
String fileMd5 = file.getName();// 文件名
uploadFile.setFile(file);
uploadFile.setFile_md5(fileMd5);
uploadFile.setStarPos(0);// 文件开始位置
Thread thread = new Thread(new FileUploadClient(FILE_PORT, "127.0.0.1", uploadFile));
thread.start();
System.out.println(fileName + "开始传输。。。。。。");
}
} catch (Exception e) {
e.printStackTrace();
}
}